AbstractQueuedSynchronizer
AQS 的作用
ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock 等类具有的 协作
(同步)功能,底层都是一个共同的基类 AQS
。
AQS 是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了 AQS 以后,更多的协作工具类都可以很方便得被编写出来。
一句话总结:有了 AQS,构建线程协作类就容易多了。
Semaphore 和 AQS 的关系
Semaphore 内部有一个 Sync 类, Sync 类继承了 AQS。
如果没有 AQS
- 每个协作工具自己实现一些功能:(这些功能都是通用的)
* 同步状态的原子性管理(例如:CountDownLatch 计数器)
* 线程的阻塞与解除阻塞
* 队列的管理
- 在并发场景下,自己正确且高效实现这些内容,是相当有难度的,所以我们用 AQS 来帮我们把这些脏活累活都搞定,我们只关注业务逻辑就行了。甚至我们可以利用 AQS 自定义一个线程协作类。
AQS 的重要性、地位
AbstactQueuedSynchronizer 是 Doug Lea 写的,从 JDK1.5 加入的一个基于 FIFO 等待队列实现的一个用于实现同步器的基础框架,我们用 IDE 看 AQS 的实现类,可以发现实现类有以下这些:
AQS 内部原理解析
AQS 最核心的就是三大部分:
* state
* 控制线程抢锁和配合的 FIFO 队列
* 期望协作工具类去实现的获取/释放等重要方法
state
state 的具体含义,会根据具体实现类的不同而不同,比如在 Semaphore 里,它表示 “剩余的许可证的数量”,而在 CountDownLatch 里,它表示 “还需要倒数的数量”。
state 是 volatile 修饰的,会被并发地修改,所以所有修改 state 的方法都需要保证线程安全,比如 getState、setState 以及 compareAndSetState 操作来读取和更新这个状态。这些方法都依赖于 j.u.c.atomic 包的支持。
控制线程抢锁和配合的 FIFO 队列
该队列用来存放 “等待的线程”,AQS 就是一个 “排队管理器”,当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁。
AQS 会维护一个等待的线程队列,把线程都放到这个队列里,队列是双向链表的数据结构。
期望协作工具类去实现的获取/释放等重要方法
这里的获取和释放的方法, 是利用 AQS 的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义个各不相同。
- 获取方法:依赖于 state 变量 (不同类不同含义),经常会阻塞(比如获取不到锁的时候)
- 释放方法:释放操作不会被阻塞。
AQS 用法
- 写一个类,想好协作逻辑,实现获取/释放方法。
- 内部写一个 Sync 类继承 AbstractQueuedSynchronizer
- 根据是否独占,重写 tryAcquire/tryRelease 或者 tryAcquireShared(int acquires)/tryReleaseShared(int releases) 等方法,在获取/释放方法中调用 AQS 的 acquire/release 或者 Shared 方法。
利用 AQS 实现简化版 CountDownLatch
 只有一个门栓,1 到 0。简化版的 CountDownLatch。
示例:用 AQS 实现一个简单的线程协作器
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* 描述: 自己用AQS实现一个简单的线程协作器
*/
public class OneShotLatch {
private final Sync sync = new Sync();
//release
public void signal() {
sync.releaseShared(0);
}
//acquire
public void await() {
sync.acquireShared(0);
}
private class Sync extends AbstractQueuedSynchronizer {
/**
* 门栓值 1 代表 已打开 获取锁成功 > 0
* 门栓值 0 代表 未打开 获取锁失败 进入阻塞队列 < 0
* @param arg
* @return
*/
@Override
protected int tryAcquireShared(int arg) {
return (getState() == 1) ? 1 : -1;
}
/**
* true 代表 唤醒锁 setState(1);
* @param arg
* @return
*/
@Override
protected boolean tryReleaseShared(int arg) {
setState(1);
return true;
}
}
public static void main(String[] args) throws InterruptedException {
OneShotLatch oneShotLatch = new OneShotLatch();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
oneShotLatch.await();
System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
}
}).start();
}
Thread.sleep(5000);
oneShotLatch.signal();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
oneShotLatch.await();
System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
}
}).start();
}
}
Thread-0尝试获取latch,获取失败那就等待
Thread-4尝试获取latch,获取失败那就等待
Thread-3尝试获取latch,获取失败那就等待
Thread-2尝试获取latch,获取失败那就等待
Thread-1尝试获取latch,获取失败那就等待
Thread-7尝试获取latch,获取失败那就等待
Thread-6尝试获取latch,获取失败那就等待
Thread-5尝试获取latch,获取失败那就等待
Thread-9尝试获取latch,获取失败那就等待
Thread-8尝试获取latch,获取失败那就等待
开闸放行Thread-0继续运行
开闸放行Thread-3继续运行
开闸放行Thread-7继续运行
开闸放行Thread-4继续运行
开闸放行Thread-9继续运行
开闸放行Thread-5继续运行
开闸放行Thread-6继续运行
开闸放行Thread-1继续运行
开闸放行Thread-2继续运行
开闸放行Thread-8继续运行
Thread-10尝试获取latch,获取失败那就等待
开闸放行Thread-10继续运行