AbstractQueuedSynchronizer
AQS 的作用
ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock 等类具有的 协作
(同步)功能,底层都是一个共同的基类 AQS
。
AQS 是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了 AQS 以后,更多的协作工具类都可以很方便得被编写出来。
一句话总结:有了 AQS,构建线程协作类就容易多了。
Semaphore 和 AQS 的关系
Semaphore 内部有一个 Sync 类, Sync 类继承了 AQS。
如果没有 AQS
- 每个协作工具自己实现一些功能:(这些功能都是通用的)
1* 同步状态的原子性管理(例如:CountDownLatch 计数器)
2* 线程的阻塞与解除阻塞
3* 队列的管理
- 在并发场景下,自己正确且高效实现这些内容,是相当有难度的,所以我们用 AQS 来帮我们把这些脏活累活都搞定,我们只关注业务逻辑就行了。甚至我们可以利用 AQS 自定义一个线程协作类。
AQS 的重要性、地位
AbstactQueuedSynchronizer 是 Doug Lea 写的,从 JDK1.5 加入的一个基于 FIFO 等待队列实现的一个用于实现同步器的基础框架,我们用 IDE 看 AQS 的实现类,可以发现实现类有以下这些:
AQS 内部原理解析
AQS 最核心的就是三大部分:
1* state
2* 控制线程抢锁和配合的 FIFO 队列
3* 期望协作工具类去实现的获取/释放等重要方法
state
state 的具体含义,会根据具体实现类的不同而不同,比如在 Semaphore 里,它表示 “剩余的许可证的数量”,而在 CountDownLatch 里,它表示 “还需要倒数的数量”。
state 是 volatile 修饰的,会被并发地修改,所以所有修改 state 的方法都需要保证线程安全,比如 getState、setState 以及 compareAndSetState 操作来读取和更新这个状态。这些方法都依赖于 j.u.c.atomic 包的支持。
控制线程抢锁和配合的 FIFO 队列
该队列用来存放 “等待的线程”,AQS 就是一个 “排队管理器”,当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁。
AQS 会维护一个等待的线程队列,把线程都放到这个队列里,队列是双向链表的数据结构。
期望协作工具类去实现的获取/释放等重要方法
这里的获取和释放的方法, 是利用 AQS 的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义个各不相同。
- 获取方法:依赖于 state 变量 (不同类不同含义),经常会阻塞(比如获取不到锁的时候)
- 释放方法:释放操作不会被阻塞。
AQS 用法
- 写一个类,想好协作逻辑,实现获取/释放方法。
- 内部写一个 Sync 类继承 AbstractQueuedSynchronizer
- 根据是否独占,重写 tryAcquire/tryRelease 或者 tryAcquireShared(int acquires)/tryReleaseShared(int releases) 等方法,在获取/释放方法中调用 AQS 的 acquire/release 或者 Shared 方法。
利用 AQS 实现简化版 CountDownLatch
 只有一个门栓,1 到 0。简化版的 CountDownLatch。
示例:用 AQS 实现一个简单的线程协作器
1import java.util.concurrent.locks.AbstractQueuedSynchronizer;
2
3/**
4 * 描述: 自己用AQS实现一个简单的线程协作器
5 */
6public class OneShotLatch {
7
8 private final Sync sync = new Sync();
9
10 //release
11 public void signal() {
12 sync.releaseShared(0);
13 }
14
15 //acquire
16 public void await() {
17 sync.acquireShared(0);
18 }
19
20 private class Sync extends AbstractQueuedSynchronizer {
21
22 /**
23 * 门栓值 1 代表 已打开 获取锁成功 > 0
24 * 门栓值 0 代表 未打开 获取锁失败 进入阻塞队列 < 0
25 * @param arg
26 * @return
27 */
28 @Override
29 protected int tryAcquireShared(int arg) {
30 return (getState() == 1) ? 1 : -1;
31 }
32
33 /**
34 * true 代表 唤醒锁 setState(1);
35 * @param arg
36 * @return
37 */
38 @Override
39 protected boolean tryReleaseShared(int arg) {
40 setState(1);
41 return true;
42 }
43 }
44
45
46 public static void main(String[] args) throws InterruptedException {
47 OneShotLatch oneShotLatch = new OneShotLatch();
48 for (int i = 0; i < 10; i++) {
49 new Thread(new Runnable() {
50 @Override
51 public void run() {
52 System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
53 oneShotLatch.await();
54 System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
55 }
56 }).start();
57 }
58 Thread.sleep(5000);
59 oneShotLatch.signal();
60
61 new Thread(new Runnable() {
62 @Override
63 public void run() {
64 System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
65 oneShotLatch.await();
66 System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
67 }
68 }).start();
69 }
70}
1Thread-0尝试获取latch,获取失败那就等待
2Thread-4尝试获取latch,获取失败那就等待
3Thread-3尝试获取latch,获取失败那就等待
4Thread-2尝试获取latch,获取失败那就等待
5Thread-1尝试获取latch,获取失败那就等待
6Thread-7尝试获取latch,获取失败那就等待
7Thread-6尝试获取latch,获取失败那就等待
8Thread-5尝试获取latch,获取失败那就等待
9Thread-9尝试获取latch,获取失败那就等待
10Thread-8尝试获取latch,获取失败那就等待
11开闸放行Thread-0继续运行
12开闸放行Thread-3继续运行
13开闸放行Thread-7继续运行
14开闸放行Thread-4继续运行
15开闸放行Thread-9继续运行
16开闸放行Thread-5继续运行
17开闸放行Thread-6继续运行
18开闸放行Thread-1继续运行
19开闸放行Thread-2继续运行
20开闸放行Thread-8继续运行
21Thread-10尝试获取latch,获取失败那就等待
22开闸放行Thread-10继续运行