目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

AbstractQueuedSynchronizer

AQS 的作用

 ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock 等类具有的 协作(同步)功能,底层都是一个共同的基类 AQS
AQS 是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了 AQS 以后,更多的协作工具类都可以很方便得被编写出来。
 一句话总结:有了 AQS,构建线程协作类就容易多了。

Semaphore 和 AQS 的关系

 Semaphore 内部有一个 Sync 类, Sync 类继承了 AQS。

如果没有 AQS

  • 每个协作工具自己实现一些功能:(这些功能都是通用的)
* 同步状态的原子性管理(例如:CountDownLatch 计数器)
* 线程的阻塞与解除阻塞
* 队列的管理
  • 在并发场景下,自己正确且高效实现这些内容,是相当有难度的,所以我们用 AQS 来帮我们把这些脏活累活都搞定,我们只关注业务逻辑就行了。甚至我们可以利用 AQS 自定义一个线程协作类。

AQS 的重要性、地位

 AbstactQueuedSynchronizer 是 Doug Lea 写的,从 JDK1.5 加入的一个基于 FIFO 等待队列实现的一个用于实现同步器的基础框架,我们用 IDE 看 AQS 的实现类,可以发现实现类有以下这些:
AQS

AQS 内部原理解析

 AQS 最核心的就是三大部分

* state 
* 控制线程抢锁和配合的 FIFO 队列
* 期望协作工具类去实现的获取/释放等重要方法

state
 state 的具体含义,会根据具体实现类的不同而不同,比如在 Semaphore 里,它表示 “剩余的许可证的数量”,而在 CountDownLatch 里,它表示 “还需要倒数的数量”。
state 是 volatile 修饰的,会被并发地修改,所以所有修改 state 的方法都需要保证线程安全,比如 getState、setState 以及 compareAndSetState 操作来读取和更新这个状态。这些方法都依赖于 j.u.c.atomic 包的支持。

控制线程抢锁和配合的 FIFO 队列
 该队列用来存放 “等待的线程”,AQS 就是一个 “排队管理器”,当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁。
 AQS 会维护一个等待的线程队列,把线程都放到这个队列里,队列是双向链表的数据结构

期望协作工具类去实现的获取/释放等重要方法
 这里的获取和释放的方法, 是利用 AQS 的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义个各不相同。

  • 获取方法:依赖于 state 变量 (不同类不同含义),经常会阻塞(比如获取不到锁的时候)
  • 释放方法:释放操作不会被阻塞。

AQS 用法

  1. 写一个类,想好协作逻辑,实现获取/释放方法。
  2. 内部写一个 Sync 类继承 AbstractQueuedSynchronizer
  3. 根据是否独占,重写 tryAcquire/tryRelease 或者 tryAcquireShared(int acquires)/tryReleaseShared(int releases) 等方法,在获取/释放方法中调用 AQS 的 acquire/release 或者 Shared 方法。

利用 AQS 实现简化版 CountDownLatch

 只有一个门栓,1 到 0。简化版的 CountDownLatch。

示例:用 AQS 实现一个简单的线程协作器

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * 描述:     自己用AQS实现一个简单的线程协作器
 */
public class OneShotLatch {

    private final Sync sync = new Sync();

    //release
    public void signal() {
        sync.releaseShared(0);
    }

    //acquire
    public void await() {
        sync.acquireShared(0);
    }

    private class Sync extends AbstractQueuedSynchronizer {

        /**
         * 门栓值 1 代表 已打开     获取锁成功                > 0
         * 门栓值 0 代表 未打开     获取锁失败 进入阻塞队列    < 0
         * @param arg
         * @return
         */
        @Override
        protected int tryAcquireShared(int arg) {
            return (getState() == 1) ? 1 : -1;
        }

        /**
         *  true 代表  唤醒锁 setState(1);
         * @param arg
         * @return
         */
        @Override
        protected boolean tryReleaseShared(int arg) {
           setState(1);
           return true;
        }
    }


    public static void main(String[] args) throws InterruptedException {
        OneShotLatch oneShotLatch = new OneShotLatch();
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
                    oneShotLatch.await();
                    System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
                }
            }).start();
        }
        Thread.sleep(5000);
        oneShotLatch.signal();

        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
                oneShotLatch.await();
                System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
            }
        }).start();
    }
}
Thread-0尝试获取latch,获取失败那就等待
Thread-4尝试获取latch,获取失败那就等待
Thread-3尝试获取latch,获取失败那就等待
Thread-2尝试获取latch,获取失败那就等待
Thread-1尝试获取latch,获取失败那就等待
Thread-7尝试获取latch,获取失败那就等待
Thread-6尝试获取latch,获取失败那就等待
Thread-5尝试获取latch,获取失败那就等待
Thread-9尝试获取latch,获取失败那就等待
Thread-8尝试获取latch,获取失败那就等待
开闸放行Thread-0继续运行
开闸放行Thread-3继续运行
开闸放行Thread-7继续运行
开闸放行Thread-4继续运行
开闸放行Thread-9继续运行
开闸放行Thread-5继续运行
开闸放行Thread-6继续运行
开闸放行Thread-1继续运行
开闸放行Thread-2继续运行
开闸放行Thread-8继续运行
Thread-10尝试获取latch,获取失败那就等待
开闸放行Thread-10继续运行

作者:Soulboy