目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

线程池

线程池

 单个请求处理的时间很短,海量请求的情况下,频繁的创建线程,销毁线程所带来的系统开销是巨大的

  • 降低频繁创建、销毁线程的开销、线程的创建和销毁需要 JVM 进行大量的辅助操作。(内存的分配与回收、还会给垃圾回收器带来压力)
  • “池”的概念可以很好的防止资源不足。过多线程会占用大量内存,导致 OOM。
  • 加快响应速度(复用池中的线程)
  • 合理利用 CPU 和内存。
  • 统一管理资源。

线程池适用的场景

  • 服务器接受到大量请求时,使用线程池技术是非常适合的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率。
  • 5 个以上的线程就可以使用线程池来管理。

不使用线程池(海量任务)

  • 线程的创建和销毁需要 JVM 进行大量的辅助操作。(内存的分配与回收、还会给垃圾回收器带来压力)
  • 在 Java 语言中每创建一个线程直接对应操作系统中的一个线程。在操作系统中频繁创建、销毁大量线程会造成很大的系统开销。操作系统支持创建的线程数是有上限的。(线程数量无法与未知的任务数量一一对应)
  • 无法作用于 C10K 场景,会引发 OOM 异常。
 1public class EveryTaskOneThread {
 2    public static void main(String[] args) {
 3        for (int i = 0; i < 1000; i++) {
 4            new Thread(new Task(),"Thread-" +  i).start();
 5        }
 6    }
 7
 8    static class Task implements Runnable {
 9        @Override
10        public void run() {
11            System.out.println(Thread.currentThread().getName() + " 执行了任务");
12        }
13    }
14}
1...
2Thread-924 执行了任务
3Thread-926 执行了任务
4Thread-925 执行了任务
5...

增减线程的时机

线程池构造函数的参数
线程池构造函数的参数
 corePoolSize、maxPoolSize

添加线程规则
添加线程流程
corePoolSize、maxPoolSize
corePoolsize -> workQueue -> maxPoolSize -> deny

11. 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务。
22. 如果线程数等于(或大于)corePoolSize 但少于 maximumPoolSize,则将任务放入队列。(队列未满,队列已经满,看当前线程是否超过maxPoolSize)
33. 如果队列已满,并且线程数小于maxPoolSize,则创建一个新线程来运行任务。
44. 如果队列已满,并且线程数大于或等于 maxPoolSize,则拒绝该任务。
5
6corePoolsize -> workQueue -> maxPoolSize -> deny

增减线程的特点

  • 通过 corePoolSize、maximumPoolSize 相当, 可以创建固定大小的线程池。
  • 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它。
  • 通过设置 maximumPoolSize 为很高的值,例如 Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
  • 如果使用误解队列(例如:LinkedBlockingQueue),那么线程数将永远不会超过 corePoolSize。

keepAliveTime 和线程工厂

keepAliveTime
&emps;如果线程池当前的线程多余 corePoolSize,那么如果多余的线程空闲时间超过 keepAliveTime,他们就会被终止。

ThreadFactory
 新的线程是由 ThreadFactory 创建的,默认使用 Executors.defaultThreadFactory(),创建出来的线程都在同一线程组,拥有同样的 NORM_PRIORITY 优先级并且都不是守护线程。
 如果自己指定 ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等。

workQueue

  • 直接交换:SynchronousQueue (直接交换不存储,所以 maximumPoolSize 要设置的大一些)
  • 无界队列:LinkedBlockingQueue(如果处理的速度跟不上任务提交的速度,那么队列大量积压导致 OOM)
  • 有界队列:ArrayBlockingQueue(可以设置队列大小)

线程池创建方式

 手动创建更好,因为这样可以让我们更加明确线程池的运行规则,避免资源耗尽的风险。
 根据不同的业务场景,自己设置线程池参数,比如内存、自定义线程名等…

自动创建线程池(JDK 封装好的构造函数):演示内存溢出的情况

 1import java.util.concurrent.ExecutorService;
 2import java.util.concurrent.Executors;
 3
 4/**
 5 * 描述:  演示newFixedThreadPool
 6 * VM options:    -Xmx8m -Xms8m   把内存调整小一些,模拟OOM发生
 7 */
 8public class FixThreadPoolTest {
 9    public static void main(String[] args) {
10        //底层使用的LinkedBlockingQueue,存在队列堆积OOM的风险。
11        //new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
12        ExecutorService executorService = Executors.newFixedThreadPool(2);
13        for (int i = 0; i < Integer.MAX_VALUE; i++) {
14            executorService.execute(new Task());
15        }
16    }
17
18}
19class Task implements Runnable {
20    @Override
21    public void run() {
22        try {
23            System.out.println(Thread.currentThread().getName());
24            Thread.sleep(100000000);
25        } catch (InterruptedException e) {
26            e.printStackTrace();
27        }
28        System.out.println(Thread.currentThread().getName());
29    }
30}
1pool-1-thread-1
2pool-1-thread-2
3Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
4	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
5	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
6	at com.xdclass.couponapp.test.threadpool.FixThreadPoolTest.main(FixThreadPoolTest.java:16)

线程池对比

核心参数对比

Parameter FixedThreadPool CachedThreadPool ScheduledThreadPool SingleThreadPool
corePoolSize constructor-arg 0 constructor-arg 1
maxPoolSize same as corePoolSize Integer.MAX_VALUE Integer.MAX_VALUE 1
keepAliveTime 0 seconds 60 seconds 0 0 seconds

使用队列差异

FixedThreadPool CachedThreadPool ScheduledThreadPool SingleThreadPool
QueueType LinkedBlockingQueue SynchronousQueue DelayedWorkQueue LinkedBlockingQueue

newFixedThreadPool
 自定义线程数量,底层使用的 LinkedBlockingQueue,存在队列堆积 OOM 的风险。
ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

1        ExecutorService executorService = Executors.newFixedThreadPool(4);
2        for (int i = 0; i < 1000; i++) {
3            executorService.execute(new Task());
4        }

newSingleThreadExecutor
 只有一个线程,底层使用的 LinkedBlockingQueue,存在队列堆积 OOM 的风险。
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));

1        ExecutorService executorService = Executors.newSingleThreadExecutor();
2        for (int i = 0; i < 1000; i++) {
3            executorService.execute(new Task());
4        }

newCachedThreadPool
 可缓存线程池,具有自动回收多余线程的功能。
 直接交换线程池(每次请求都要新建线程,相当于没有存储队列),
 maximumPoolSize=Integer.MAX_VALUE 相当于无界线程池。
 keepAliveTime=60 seconds,具有自动回收多余线程的功能。
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

1        ExecutorService executorService = Executors.newCachedThreadPool();
2        for (int i = 0; i < 1000; i++) {
3            executorService.execute(new Task());
4        }

newScheduledThreadPool
 延迟执行、周期执行相关的线程池。
 maximumPoolSize=Integer.MAX_VALUE 有 OOM 的风险。
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue())

1        ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
2        //延迟5秒
3        //threadPool.schedule(new Task(), 5, TimeUnit.SECONDS);
4        //第一次延迟1秒,然后以周期每3秒钟运行一次
5        threadPool.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);

newScheduledThreadPool
 这个线程池和之前的有很大不同,适合产生子任务的线程(遍历二叉树)。子线程窃取任务(放在子线程自己的队列中,层层窃取,导致无法保证任务的执行顺序。)
 JDK1.8 加入。

线程池里的线程数量设置为多少比较合适?

 CPU 密集型(加密、计算 hash 等):最佳线程数为 CPU 核心数的 1~2 倍左右。
 耗时 IO 型(读写数据库、文件、网络读写等):最佳线程数一般会大于 CPU 核心数很多倍,以 JVM 线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考 Brain Goetz 推荐的计算法:
线程数 = CPU核心数 * (1 + 平均等待时间/平均工作时间)

停止线程池的正确方法

  • shutdown()
  • isShutdown()
  • isTerminated()
  • awaitTermination()
  • shutdownNow()

 初始化停止线程池的动作,线程池虽然不一定会立刻停止,但是会拒绝接受新的任务。

 1package com.xdclass.couponapp.test.threadpool;
 2
 3import java.util.List;
 4import java.util.concurrent.ExecutorService;
 5import java.util.concurrent.Executors;
 6import java.util.concurrent.TimeUnit;
 7
 8/**
 9 * 描述:     演示关闭线程池
10 */
11public class ShutDown {
12
13    public static void main(String[] args) throws InterruptedException {
14        ExecutorService executorService = Executors.newFixedThreadPool(10);
15        for (int i = 0; i < 100; i++) {
16            executorService.execute(new ShutDownTask());
17        }
18        Thread.sleep(1500);
19
20        //立刻关闭线程池(暴力)
21        //interrupt正在执行的线程,将队列中的任务以List的形式返回。
22        //返回的任务可以交给新的线程池执行
23        //List<Runnable> runnableList = executorService.shutdownNow();
24        //Thread.sleep(1000);
25        //System.out.println(runnableList.size());
26
27        //让线程池进入停止状态(尝试停止,需要过程)
28        //executorService.shutdown();
29
30        //线程池还在执行,提交新任务会拒绝,报异常
31        //executorService.execute(new ShutDownTask());
32
33        //isShutdown()可以用于判断是否已经进入停止状态了(并不是完全停止所有线程)
34        //System.out.println(executorService.isShutdown());
35
36        //isTerminated()线程池是否已经完全停止
37        //Thread.sleep(10000);
38        //System.out.println(executorService.isTerminated());
39
40        //awaitTermination(time,unit)用来测试在一段时间内线程是否真正的停止
41        //主要作用是检测(如果等待过程中被打断,会抛出InterruptException)
42        //boolean b = executorService.awaitTermination(7L, TimeUnit.SECONDS);
43    }
44}
45
46class ShutDownTask implements Runnable {
47    
48    @Override
49    public void run() {
50        try {
51            Thread.sleep(500);
52            System.out.println(Thread.currentThread().getName());
53        } catch (InterruptedException e) {
54            System.out.println(Thread.currentThread().getName() + "被中断了");
55        }
56    }
57}

拒绝新任务

线程池拒绝任务的时机

  • 当 Executor 处于关闭状态时,提交新任务会被拒绝。
  • 当 Executor 对最大线程和工作对了容量使用有限边界并且已经饱和时。

线程池拒绝任务

拒绝策略

  • AbortPolicy(堕胎):抛出异常 RejectedExecutionException
  • DiscardPolicy(丢弃):如果线程池无法处理,则会默默丢弃任务,并且不会发出通知。
  • DiscardOldestPolicy(弃老):丢弃队列中最老的任务,以便腾出空间存放新提交的任务。
  • CallerRunsPolicy(调用者运行):如果线程池无法处理,哪个线程提交的新任务,就由该线程代替执行。

钩子函数:让线程池拥有暂停和恢复能力

  • 每个任务执行前后
  • 日志、统计
  1import java.util.concurrent.BlockingQueue;
  2import java.util.concurrent.LinkedBlockingQueue;
  3import java.util.concurrent.RejectedExecutionHandler;
  4import java.util.concurrent.ThreadFactory;
  5import java.util.concurrent.ThreadPoolExecutor;
  6import java.util.concurrent.TimeUnit;
  7import java.util.concurrent.locks.Condition;
  8import java.util.concurrent.locks.ReentrantLock;
  9
 10/**
 11 * 描述:     演示每个任务执行前后放钩子函数
 12 */
 13public class PauseableThreadPool extends ThreadPoolExecutor {
 14
 15    private final ReentrantLock lock = new ReentrantLock();
 16    private Condition unpaused = lock.newCondition();
 17    private boolean isPaused;
 18
 19
 20    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
 21            TimeUnit unit,
 22            BlockingQueue<Runnable> workQueue) {
 23        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
 24    }
 25
 26    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
 27            TimeUnit unit, BlockingQueue<Runnable> workQueue,
 28            ThreadFactory threadFactory) {
 29        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
 30    }
 31
 32    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
 33            TimeUnit unit, BlockingQueue<Runnable> workQueue,
 34            RejectedExecutionHandler handler) {
 35        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
 36    }
 37
 38    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
 39            TimeUnit unit, BlockingQueue<Runnable> workQueue,
 40            ThreadFactory threadFactory, RejectedExecutionHandler handler) {
 41        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
 42                handler);
 43    }
 44
 45    @Override
 46    protected void beforeExecute(Thread t, Runnable r) {
 47        super.beforeExecute(t, r);
 48        lock.lock();
 49        try {
 50            while (isPaused) {
 51                unpaused.await();
 52            }
 53        } catch (InterruptedException e) {
 54            e.printStackTrace();
 55        } finally {
 56            lock.unlock();
 57        }
 58    }
 59
 60    @Override
 61    protected void afterExecute(Runnable r, Throwable t) {
 62        super.afterExecute(r, t);
 63    }
 64
 65    private void pause() {
 66        lock.lock();
 67        try {
 68            isPaused = true;
 69        } finally {
 70            lock.unlock();
 71        }
 72    }
 73
 74    public void resume() {
 75        lock.lock();
 76        try {
 77            isPaused = false;
 78            unpaused.signalAll();
 79        } finally {
 80            lock.unlock();
 81        }
 82    }
 83
 84    public static void main(String[] args) throws InterruptedException {
 85        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l,
 86                TimeUnit.SECONDS, new LinkedBlockingQueue<>());
 87        Runnable runnable = new Runnable() {
 88            @Override
 89            public void run() {
 90                System.out.println("我被执行");
 91                try {
 92                    Thread.sleep(10);
 93                } catch (InterruptedException e) {
 94                    e.printStackTrace();
 95                }
 96            }
 97        };
 98
 99        for (int i = 0; i < 10000; i++) {
100            pauseableThreadPool.execute(runnable);
101        }
102
103        Thread.sleep(1500);
104        pauseableThreadPool.pause();
105        System.out.println("线程池被暂停了");
106        Thread.sleep(1500);
107        pauseableThreadPool.resume();
108        System.out.println("线程池被恢复了");
109
110    }
111}

线程池的实现原理

线程池组成部分

  • 线程池管理器
  • 工作线程
  • 任务队列
  • 任务接口(Task)
    线程池结构

ThreadPoolExecutor、ExecutorService、Executor、Executors 之间的关系
 一般把 ExecutorService 看作是线程池。
哪个是线程池

  • Executor:顶层接口,只有一个方法 void execute(Runnable command);
  • ExecutorService:继承 Executor 接口,并且增加了一些新的方法。
  • ThreadPoolExecutor:子类(底层实现类)
  • Executors:是一个工具类

线程池实现任务复用的原理

  • 相同线程执行不同任务(线程池对线程做了包装,不需要重复的启动线程,只启动哪些 corePoolSize 的线程,后续的任务复用已有线程)
 1    final void runWorker(Worker w) {
 2        Thread wt = Thread.currentThread();
 3        Runnable task = w.firstTask; //从阻塞队列拿到任务
 4        w.firstTask = null;
 5        w.unlock(); // allow interrupts
 6        boolean completedAbruptly = true;
 7        try {
 8            while (task != null || (task = getTask()) != null) { //只要任务不为空,循环判断
 9                w.lock();
10                // If pool is stopping, ensure thread is interrupted;
11                // if not, ensure thread is not interrupted.  This
12                // requires a recheck in second case to deal with
13                // shutdownNow race while clearing interrupt
14                if ((runStateAtLeast(ctl.get(), STOP) ||
15                     (Thread.interrupted() &&
16                      runStateAtLeast(ctl.get(), STOP))) &&
17                    !wt.isInterrupted())
18                    wt.interrupt();
19                try {
20                    beforeExecute(wt, task);
21                    Throwable thrown = null;
22                    try {
23                        task.run(); //运行任务
24                    } catch (RuntimeException x) {
25                        thrown = x; throw x;
26                    } catch (Error x) {
27                        thrown = x; throw x;
28                    } catch (Throwable x) {
29                        thrown = x; throw new Error(x);
30                    } finally {
31                        afterExecute(task, thrown);
32                    }
33                } finally {
34                    task = null;
35                    w.completedTasks++;
36                    w.unlock();
37                }
38            }
39            completedAbruptly = false;
40        } finally {
41            processWorkerExit(w, completedAbruptly);
42        }
43    }

线程池状态

  • RUNNING:接受新任务并处理排队任务。
  • SHUTDOWN:不接受新任务,但处理排队任务。
  • STOP:不接受新任务,也不处理排队任务,并中断正在进行的任务。
  • TIDYING:中文是整洁,所有任务都已终止, workerCount 为零时,线程会转到 TIDYING 状态,并将运行 terminate() 钩子方法。
  • TERMINATED:terminate() 运行完成。

线程池的注意点

  • 避免任务堆积:无解的队列会导致 OOM
  • 避免线程数过渡增加:cachedThreadPool,每来一个任务就要创建一个线程。
  • 排查线程泄漏:线程已经执行完毕,却不能被回收。

作者:Soulboy