线程池
线程池
单个请求处理的时间很短,海量请求的情况下,频繁的创建线程,销毁线程所带来的系统开销是巨大的。
- 降低频繁创建、销毁线程的开销、线程的创建和销毁需要 JVM 进行大量的辅助操作。(内存的分配与回收、还会给垃圾回收器带来压力)
- “池”的概念可以很好的防止资源不足。过多线程会占用大量内存,导致OOM。
- 加快响应速度(复用池中的线程)
- 合理利用CPU和内存。
- 统一管理资源。
线程池适用的场景
- 服务器接受到大量请求时,使用线程池技术是非常适合的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率。
- 5个以上的线程就可以使用线程池来管理。
不使用线程池(海量任务)
- 线程的创建和销毁需要 JVM 进行大量的辅助操作。(内存的分配与回收、还会给垃圾回收器带来压力)
- 在Java语言中每创建一个线程直接对应操作系统中的一个线程。在操作系统中频繁创建、销毁大量线程会造成很大的系统开销。操作系统支持创建的线程数是有上限的。(线程数量无法与未知的任务数量一一对应)
- 无法作用于C10K场景,会引发OOM异常。
public class EveryTaskOneThread {
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
new Thread(new Task(),"Thread-" + i).start();
}
}
static class Task implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行了任务");
}
}
}
...
Thread-924 执行了任务
Thread-926 执行了任务
Thread-925 执行了任务
...
增减线程的时机
线程池构造函数的参数
corePoolSize、maxPoolSize
添加线程规则
corePoolsize -> workQueue -> maxPoolSize -> deny
1. 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务。
2. 如果线程数等于(或大于)corePoolSize 但少于 maximumPoolSize,则将任务放入队列。(队列未满,队列已经满,看当前线程是否超过maxPoolSize)
3. 如果队列已满,并且线程数小于maxPoolSize,则创建一个新线程来运行任务。
4. 如果队列已满,并且线程数大于或等于 maxPoolSize,则拒绝该任务。
corePoolsize -> workQueue -> maxPoolSize -> deny
增减线程的特点
- 通过corePoolSize、maximumPoolSize相当, 可以创建固定大小的线程池。
- 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它。
- 通过设置maximumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
- 如果使用误解队列(例如:LinkedBlockingQueue),那么线程数将永远不会超过corePoolSize。
keepAliveTime 和线程工厂
keepAliveTime
&emps;如果线程池当前的线程多余corePoolSize,那么如果多余的线程空闲时间超过keepAliveTime,他们就会被终止。
ThreadFactory
新的线程是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory(),创建出来的线程都在同一线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。
如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等。
workQueue
- 直接交换:SynchronousQueue (直接交换不存储,所以maximumPoolSize要设置的大一些)
- 无界队列:LinkedBlockingQueue(如果处理的速度跟不上任务提交的速度,那么队列大量积压导致OOM)
- 有界队列:ArrayBlockingQueue(可以设置队列大小)
线程池创建方式
手动创建更好,因为这样可以让我们更加明确线程池的运行规则,避免资源耗尽的风险。
根据不同的业务场景,自己设置线程池参数,比如内存、自定义线程名等…
自动创建线程池(JDK封装好的构造函数):演示内存溢出的情况
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 描述: 演示newFixedThreadPool
* VM options: -Xmx8m -Xms8m 把内存调整小一些,模拟OOM发生
*/
public class FixThreadPoolTest {
public static void main(String[] args) {
//底层使用的LinkedBlockingQueue,存在队列堆积OOM的风险。
//new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executorService.execute(new Task());
}
}
}
class Task implements Runnable {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(100000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at com.xdclass.couponapp.test.threadpool.FixThreadPoolTest.main(FixThreadPoolTest.java:16)
线程池对比
核心参数对比
Parameter | FixedThreadPool | CachedThreadPool | ScheduledThreadPool | SingleThreadPool |
---|---|---|---|---|
corePoolSize | constructor-arg | 0 | constructor-arg | 1 |
maxPoolSize | same as corePoolSize | Integer.MAX_VALUE | Integer.MAX_VALUE | 1 |
keepAliveTime | 0 seconds | 60 seconds | 0 | 0 seconds |
使用队列差异
FixedThreadPool | CachedThreadPool | ScheduledThreadPool | SingleThreadPool | |
---|---|---|---|---|
QueueType | LinkedBlockingQueue | SynchronousQueue | DelayedWorkQueue | LinkedBlockingQueue |
newFixedThreadPool
自定义线程数量,底层使用的LinkedBlockingQueue,存在队列堆积OOM的风险。
ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
newSingleThreadExecutor
只有一个线程,底层使用的LinkedBlockingQueue,存在队列堆积OOM的风险。
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
newCachedThreadPool
可缓存线程池,具有自动回收多余线程的功能。
直接交换线程池(每次请求都要新建线程,相当于没有存储队列),
maximumPoolSize=Integer.MAX_VALUE 相当于无界线程池。
keepAliveTime=60 seconds,具有自动回收多余线程的功能。
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
newScheduledThreadPool
延迟执行、周期执行相关的线程池。
maximumPoolSize=Integer.MAX_VALUE 有OOM的风险。
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue())
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
//延迟5秒
//threadPool.schedule(new Task(), 5, TimeUnit.SECONDS);
//第一次延迟1秒,然后以周期每3秒钟运行一次
threadPool.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);
newScheduledThreadPool
这个线程池和之前的有很大不同,适合产生子任务的线程(遍历二叉树)。子线程窃取任务(放在子线程自己的队列中,层层窃取,导致无法保证任务的执行顺序。)
JDK1.8加入。
线程池里的线程数量设置为多少比较合适?
CPU密集型(加密、计算hash等):最佳线程数为 CPU 核心数的 1~2 倍左右。
耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考 Brain Goetz 推荐的计算法:
线程数 = CPU核心数 * (1 + 平均等待时间/平均工作时间)
停止线程池的正确方法
- shutdown()
- isShutdown()
- isTerminated()
- awaitTermination()
- shutdownNow()
初始化停止线程池的动作,线程池虽然不一定会立刻停止,但是会拒绝接受新的任务。
package com.xdclass.couponapp.test.threadpool;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 描述: 演示关闭线程池
*/
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
//立刻关闭线程池(暴力)
//interrupt正在执行的线程,将队列中的任务以List的形式返回。
//返回的任务可以交给新的线程池执行
//List<Runnable> runnableList = executorService.shutdownNow();
//Thread.sleep(1000);
//System.out.println(runnableList.size());
//让线程池进入停止状态(尝试停止,需要过程)
//executorService.shutdown();
//线程池还在执行,提交新任务会拒绝,报异常
//executorService.execute(new ShutDownTask());
//isShutdown()可以用于判断是否已经进入停止状态了(并不是完全停止所有线程)
//System.out.println(executorService.isShutdown());
//isTerminated()线程池是否已经完全停止
//Thread.sleep(10000);
//System.out.println(executorService.isTerminated());
//awaitTermination(time,unit)用来测试在一段时间内线程是否真正的停止
//主要作用是检测(如果等待过程中被打断,会抛出InterruptException)
//boolean b = executorService.awaitTermination(7L, TimeUnit.SECONDS);
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}
拒绝新任务
线程池拒绝任务的时机
- 当 Executor 处于关闭状态时,提交新任务会被拒绝。
- 当 Executor 对最大线程和工作对了容量使用有限边界并且已经饱和时。
拒绝策略
- AbortPolicy(堕胎):抛出异常
RejectedExecutionException
- DiscardPolicy(丢弃):如果线程池无法处理,则会默默丢弃任务,并且不会发出通知。
- DiscardOldestPolicy(弃老):丢弃队列中最老的任务,以便腾出空间存放新提交的任务。
- CallerRunsPolicy(调用者运行):如果线程池无法处理,哪个线程提交的新任务,就由该线程代替执行。
钩子函数:让线程池拥有暂停和恢复能力
- 每个任务执行前后
- 日志、统计
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 描述: 演示每个任务执行前后放钩子函数
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
private final ReentrantLock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();
private boolean isPaused;
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
}
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
public void resume() {
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("线程池被恢复了");
}
}
线程池的实现原理
线程池组成部分
- 线程池管理器
- 工作线程
- 任务队列
- 任务接口(Task)
ThreadPoolExecutor、ExecutorService、Executor、Executors 之间的关系
一般把 ExecutorService 看作是线程池。
- Executor:顶层接口,只有一个方法
void execute(Runnable command);
- ExecutorService:继承 Executor 接口,并且增加了一些新的方法。
- ThreadPoolExecutor:子类(底层实现类)
- Executors:是一个工具类
线程池实现任务复用的原理
- 相同线程执行不同任务(线程池对线程做了包装,不需要重复的启动线程,只启动哪些corePoolSize的线程,后续的任务复用已有线程)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; //从阻塞队列拿到任务
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { //只要任务不为空,循环判断
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); //运行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
线程池状态
- RUNNING:接受新任务并处理排队任务。
- SHUTDOWN:不接受新任务,但处理排队任务。
- STOP:不接受新任务,也不处理排队任务,并中断正在进行的任务。
- TIDYING:中文是整洁,所有任务都已终止, workerCount 为零时,线程会转到 TIDYING 状态,并将运行 terminate() 钩子方法。
- TERMINATED:terminate() 运行完成。
线程池的注意点
- 避免任务堆积:无解的队列会导致OOM
- 避免线程数过渡增加:cachedThreadPool,每来一个任务就要创建一个线程。
- 排查线程泄漏:线程已经执行完毕,却不能被回收。