并发工具类
CountDownLatch
创建 CountDownLatch 实例的时候需要传入线程数,await()操作进入等待状态,每个线程执行完毕调用 countDown(),计数器减一,当计数器为 0 的时候处于 WAITING 状态的线程会被唤醒。
应用场景:启动三个线程计算,需要每个线程的计算结果进行累加。
CountDownLatch
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(8);
for (int i = 0; i < 8; i++) {
int finalI = i;
new Thread(()->{
try {
Thread.sleep(finalI * 1000L);
System.out.println(Thread.currentThread().getName()+"到达终点");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//无论是否发生异常,每个线程都会调用countDown(),计数器减1.
countDownLatch.countDown();
}
}).start();
}
new Thread(()->{
try {
//计数器挂起
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("800米比赛结束,准备清空跑道并继续跨栏比赛");
}).start();
}
}
CyclicBarrier--栅栏
允许一组线程相互等待达到一个公共的障碍点,之后再继续执行。
CyclicBarrier 跟 countDownLatch 的区别
- CountDownLatch 一般用于某个线程等待若干个其他线程执行完任务之后,它才执行;不可重复使用
- CountDownLatch 一般用于某个线程等待若干个其他线程执行完任务之后,它才执行;不可重复使用
CyclicBarrier
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(8);
for (int i = 0; i < 8; i++) {
int finalI = i;
new Thread(() -> {
try {
Thread.sleep(finalI * 1000L);
System.out.println(Thread.currentThread().getName() + "准备就绪");
//相当于计数器加1,当所有线程await()数量累加到指定数量,则会被同时唤醒
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("开始比赛");
}).start();
}
}
}
Semaphore--信号量
- 控制并发数量
- 使用场景:接口限流
Semaphore
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
//获取一个信号量,信号量池中减1
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "开始执行");
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//归还一个信号量,信号量池加1,防御信号量池中,以便后续的线程可以获取信号量
semaphore.release();
}
}).start();
}
}
}
Exchanger
用于交换数据。
它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过 exchange 方法交换数据, 如果第一个线程先执行 exchange 方法,它会一直等待第二个线程也执行 exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。因此使用 Exchanger 的重点是成对的线程使用 exchange()方法,当有一对线程达到了同步点,就会进行交换数据。因此该工具类的线程对象是【成对】的。
ExchangerDemo
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> stringExchanger = new Exchanger<>();
String str1 = "test1";
String str2 = "test2";
//线程对象是成对出现的
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "初始值==========>" + str1);
try {
String exchange = stringExchanger.exchange(str1);
System.out.println(Thread.currentThread().getName() + "交換后的数据==========>" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程1").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "初始值==========>" + str2);
try {
String exchange = stringExchanger.exchange(str2);
System.out.println(Thread.currentThread().getName() + "交換后的数据==========>" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程2").start();
//单个出现的线程是无法交换数据的
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "初始值==========>" + str2);
try {
String exchange = stringExchanger.exchange(str2);
System.out.println(Thread.currentThread().getName() + "交換后的数据==========>" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程3").start();
}
}
输出结果
线程1初始值==========>test1
线程2初始值==========>test2
线程1交換后的数据==========>test2
线程2交換后的数据==========>test1
线程3初始值==========>test2