目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

控制并发流程的工具类

什么是控制并发流程?

 控制并发流程的工具类,作用就是帮助开发者更容易得让线程之间合作。
 让线程之间相互配合,来满足业务逻辑(按照预想的方式并发执行)。

比如:让线程 A 等待线程 B 执行完毕再执行等合作策略。

控制并发流程工具类纵览
控制并发流程工具类纵览

CountDownLatch

 直译 倒数门栓,类似于过山车,人满才会发车。
 执行流程如下:

1. 线程开始运行
2. 进入等待,倒数结束之前,一直处于`等待`状态
3. 等待的过程中进行倒数(减)
4. 倒数结束,线程`继续`工作。

CountDownLatch 的执行流程

CountDownLatch 主要方法概览

* CountDownLatch(int count):仅有一个构造函数,参数 count 为需要倒数的数值。
* await():调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行。
* countDown():将 count 值减1,直到为0时,等待的线程会被唤醒。

CountDownLatch 用法

* 一个线程等待多个线程都执行完毕,再继续自己的工作。 一等多
* 多个线程等待某个线程的信号,同时开始执行。	多等一
* 扩展用法:多个线程等待多个线程完成执行后,再同时执行
* CountDownLatch 是不能够重用的,如果需要重新计数,可以考虑使用 CyclicBarrier 或者创建新的 CountDownLatch 实例。

示例:一个线程等待多个线程都执行完毕,再继续自己的工作。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 描述:     工厂中,质检,5个工人检查,所有人都认为通过,才通过
 */
public class CountDownLatchDemo1 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
                final int no = i + 1;
                Runnable runnable = new Runnable() {

                    @Override
                    public void run() {
                        try {
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("No." + no + "完成了检查。");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            latch.countDown();
                        }
                    }
                };
            service.submit(runnable);
        }
        System.out.println("等待5个人检查完.....");
        latch.await();
        System.out.println("所有人都完成了工作,进入下一个环节。");
    }
}
等待5个人检查完.....
No.1完成了检查。
No.5完成了检查。
No.3完成了检查。
No.2完成了检查。
No.4完成了检查。
所有人都完成了工作,进入下一个环节。

案例:模拟 100 米跑步,5 名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 描述:     模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。
 */
public class CountDownLatchDemo2 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("No." + no + "准备完毕,等待发令枪");
                    try {
                        begin.await();
                        System.out.println("No." + no + "开始跑步了");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.submit(runnable);
        }
        //裁判员检查发令枪...
        Thread.sleep(5000);
        System.out.println("发令枪响,比赛开始!");
        begin.countDown();
    }
}
No.1准备完毕,等待发令枪
No.4准备完毕,等待发令枪
No.2准备完毕,等待发令枪
No.3准备完毕,等待发令枪
No.5准备完毕,等待发令枪
发令枪响,比赛开始!
No.1开始跑步了
No.4开始跑步了
No.3开始跑步了
No.2开始跑步了
No.5开始跑步了

案例:模拟 100 米跑步,5 名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。当所有人都到终点后,比赛结束。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 描述:     模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。当所有人都到终点后,比赛结束。
 */
public class CountDownLatchDemo1And2 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);

        CountDownLatch end = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("No." + no + "准备完毕,等待发令枪");
                    try {
                        begin.await();
                        System.out.println("No." + no + "开始跑步了");
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + no + "跑到终点了");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        end.countDown();
                    }
                }
            };
            service.submit(runnable);
        }
        //裁判员检查发令枪...
        Thread.sleep(5000);
        System.out.println("发令枪响,比赛开始!");
        begin.countDown();

        end.await();
        System.out.println("所有人到达终点,比赛结束");
    }
}
No.1准备完毕,等待发令枪
No.5准备完毕,等待发令枪
No.4准备完毕,等待发令枪
No.3准备完毕,等待发令枪
No.2准备完毕,等待发令枪
发令枪响,比赛开始!
No.1开始跑步了
No.5开始跑步了
No.2开始跑步了
No.3开始跑步了
No.4开始跑步了
No.5跑到终点了
No.3跑到终点了
No.4跑到终点了
No.2跑到终点了
No.1跑到终点了
所有人到达终点,比赛结束

Semaphore

 信号量可以用来限制或管理数量有限的资源的使用情况。
 信号量的作用是维护一个“许可证”的计数器,线程可以“获取”许可证,信号量剩余的许可证就减一,线程也可以“释放”一个许可证,那信号量剩余的许可证就加一,当信号量所拥有的许可证数量为 0,那么一下个还想要获取许可证的线程,就需要等待,直到有另外的线程释放了许可证。

信号量
信号量
信号量
信号量
信号量
信号量
信号量

信号量使用流程

1. 初始化 Semaphore 并指定许可证的数量
2. 在需要被执行的代码前加 acquire() 或者 acquireUninterruptibly() 
3. 在任务执行结束后,调用 release() 来释放许可证

信号量主要方法

* new Semaphore(int permits,boolean fair)	这里可以设置是否要使用公平策略,如果传入 true,那么 Semaphore 会把之前等待的线程放到 FIFO 的队列里,以便于当有了新的许可证,可以分发给之前等了最长时间的线程。
* acquire()		可以响应中断
* acquireUninterruptibly()	不响应中断
* tryAcquire()	尝试去拿信号量,并且根据是否拿到返回一个 布尔值。如果没有获取到也不必陷入阻塞,可以去做别的事,过一会儿再来查看许可证的空闲情况。
* tryAcquire(timeout)	类似tryAcquire()一样,但是多了一个超时时间,比如“在3秒内获取不到许可证”,就去退出。
* release()		释放许可证

注意点

* 获取和释放许可证的数量尽可能一致,否则会造成许可证丢失,导致获取许可证的线程阻塞长时间阻塞。
* 一般设置信号量为 true 更为合理,因为信号量一般用于`慢任务`会导致线程堆积,如果是非公平会导致有线程插队,引发饥饿。
* 并不是必须由获取许可证线程的那个线程释放许可证,A线程获取,B线程释放也可以,只要逻辑合理即可。 许可证的获取和释放可以`跨线程、跨线程池`。
* 信号量的作用,除了控制临界区最多同时有 N 个线程访问外,另外一个作用是可以实现 “条件等待”,例如:线程 A需要线程 B完成准备工作后才能开始工作,那么就线程A acquire(),而线程B release(),这样的话,相当于轻量级的 CountDownLatch。

示例:演示 Semaphore 用法

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * 描述:     演示Semaphore用法
 */
public class SemaphoreDemo {

    static Semaphore semaphore = new Semaphore(5, true);

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(50);
        for (int i = 0; i < 100; i++) {
            service.submit(new Task());
        }
        service.shutdown();
    }

    static class Task implements Runnable {

        @Override
        public void run() {
            try {
                semaphore.acquire(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "拿到了许可证");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "释放了许可证");
            semaphore.release(5);
        }
    }
}

Condition (Interface)

 当线程1需要等待某个条件的时候,它就去执行 condition.await() 方法,一旦执行 await() 方法,线程就会进入阻塞状态。
 通常会有另外一个线程,假设线程2,去执行对应的条件,直到这个条件达成的时候,线程2就会去执行 condition.signal() 方法, 这时 JVM 机会从被阻塞的线程里找,找到那些等待该 condition 的线程,当线程1就会收到可执行信号的时候,它的线程状态就会变成 Runnable 可以执行状态。

Condition

Condition 常用方法

* condition.await() 	阻塞于当前 condition
* condition.signal()	“公平”,只会唤醒等待时间最长的线程
* condition.signalAll()	唤醒所有正在等待的线程

Condition 注意点

* Lock用来代替 synchonrized,Conidtion 相当于代替了 synchonrized的 Object.wait() / notify(),用法上几乎一样
* await() 方法会自动释放持有的 Lock 锁,和 Object.wait 一样,不需要自己手动释放锁
* 调用 await() 的时候,必须持有锁,否则会抛出异常,和Object.wait 一样。
* 一个 Lock 锁可以生成 多个 Condition 条件。

示例:演示Condition的基本用法

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 描述:     演示Condition的基本用法
 */
public class ConditionDemo1 {
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    void method1() throws InterruptedException {
        lock.lock();
        try{
            System.out.println("条件不满足,开始await");
            condition.await();
            System.out.println("条件满足了,开始执行后续的任务");
        }finally {
            lock.unlock();
        }
    }

    void method2() {
        lock.lock();
        try{
            System.out.println("准备工作完成,唤醒其他的线程");
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo1 conditionDemo1 = new ConditionDemo1();
        //method1()
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    conditionDemo1.method1();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        /method2()
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    conditionDemo1.method2();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

示例:演示用Condition实现生产者消费者模式

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 描述:     演示用Condition实现生产者消费者模式
 */
public class ConditionDemo2 {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
    private Lock lock = new ReentrantLock();

    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionDemo2 conditionDemo2 = new ConditionDemo2();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("队列空,等待数据");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Producer extends Thread {

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("队列满,等待有空余");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signalAll();
                    System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

CyclicBarrier

 循环栅栏和 CountDownLatch 很类似,都能阻塞一组线程。
 当有大量线程相互配合,分别计算不同任务,并且需要最后统一汇总的时候,我们可以使用 CyclicBarrier。CyclicBarrier 可以构造一个集结点,当某个线程执行完毕,它就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就被撤销,所有线程再统一发出,继续执行剩下的任务。
 CyclicBarrier 可以重用

CyclicBarrier 和 CountDownLatch的区别

  • 作用不同:CyclicBarrier 要等固定数量的线程都到达了栅栏位置才能继续执行,而 CountDownLatch 只需等待数字到 0,也就是说,CountDownLatch 用于事件但是 CyclicBarrier 是用于线程的。
  • CountDownLatch 不同重用,CyclicBarrier 可以重用。
  • CyclicBarrier 在构造 CyclicBarrier 实例的时候可以设置集结成功之后触发的动作。

示例:演示CyclicBarrier

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * 描述:    演示CyclicBarrier
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        //同行者数量,集合成功执行的任务
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到场了, 大家统一出发!");
            }
        });
	//CyclicBarrier 可以重用
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(i, cyclicBarrier)).start();
        }
    }

    static class Task implements Runnable{
        private int id;
        private CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程" + id + "现在前往集合地点");
            try {
                Thread.sleep((long) (Math.random()*10000));
                System.out.println("线程"+id+"到了集合地点,开始等待其他人到达");
                cyclicBarrier.await();//抵达 CyclicBarrier 之后,开始等待其他线程
                System.out.println("线程"+id+"出发了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}
线程0现在前往集合地点
线程4现在前往集合地点
线程5现在前往集合地点
线程3现在前往集合地点
线程2现在前往集合地点
线程1现在前往集合地点
线程7现在前往集合地点
线程6现在前往集合地点
线程8现在前往集合地点
线程9现在前往集合地点
线程3到了集合地点,开始等待其他人到达
线程2到了集合地点,开始等待其他人到达
线程9到了集合地点,开始等待其他人到达
线程1到了集合地点,开始等待其他人到达
线程7到了集合地点,开始等待其他人到达
所有人都到场了, 大家统一出发!
线程7出发了
线程3出发了
线程2出发了
线程9出发了
线程1出发了
线程4到了集合地点,开始等待其他人到达
线程5到了集合地点,开始等待其他人到达
线程8到了集合地点,开始等待其他人到达
线程0到了集合地点,开始等待其他人到达
线程6到了集合地点,开始等待其他人到达
所有人都到场了, 大家统一出发!
线程6出发了
线程4出发了
线程5出发了
线程8出发了
线程0出发了

作者:Soulboy