目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

Concurrent Programming

并发编程三要素

原子性
一个不可再被分割的颗粒,原子性指的是一个或多个操作要么全部执行成功要么全部执行失败
期间不能被中断,也不存在上下文切换,线程切换会带来原子性的问题。

有序性
程序执行的顺序按照代码的先后顺序执行,因为处理器可能会对指令进行重排序。

可见性
一个线程A对共享变量的修改,另一个线程B能够立刻看到

锁分类

悲观锁、乐观锁

  • 悲观锁适合写操作多的场景
  • 乐观锁适合读操作多的场景
  • 乐观锁的吞吐量会比悲观锁多

悲观锁
当线程去操作数据的时候,总认为别的线程会去修改数据,所以它每次拿数据的时候都会上锁,别的线程去拿候就会阻塞,比如synchronized、ReentrantLock

乐观锁

  • 每次去拿数据的时候都认为别人不会修改,更新的时候会判断别人是否回去更新数据
  • 通过版本判断如果数据被修改了就拒绝更新,比如CAS是乐观锁,但严格来说并不是锁,通过原子性来保证数据的问步。
    比如说数据库的乐观锁,通过版本控制来实现,乐观的认为在数据更新期间没有其他线程影响

公平锁、非公平锁

公平锁

  • 指多个线程按照申请锁的顺序来获取锁,简单来说如果一个线程组里,能保证每个线程都能拿到锁。比如ReentrantLock(底层是同步队列FIFO:First Input First Output来实现)

非公平锁

  • 获取锁的方式是随机获取的,保证不了每个线程都能拿到锁,也就是存在有线程饿死,一直拿不到锁。比如synchronized、ReentrantLock

独享锁、共享锁

独享锁(互斥锁)

  • 也叫排它锁/写锁/独占锁/独享锁/该锁:每一次只能被一个线程所持有,加锁后任何线程试图再次加锁的线程会被阻塞
  • 直到当前线程解锁。例子:如果 线程A 对 data1 加上排他锁后,则其他线程不能再对 data1 加任何类型的锁
  • 获得互斥锁的线程即能读数据又能修改数据

共享锁

  • 也叫S锁/读锁,能查看但无法修改和删除的一种数据锁,加锁后其它用户可以并发读取、查询数据
  • 但不能修改,增加,删除数据,该锁可被多个线程所持有,用于资源数据共享

可重入锁、不可重入锁

可重入锁能一定程度的避免死锁,synchronized、ReentrantLock重入锁

可重入锁

  • 也叫递归锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁

不可重入锁

  • 若当前线程执行某个方法已经获取了该锁,那么在方法中尝试再次获取锁时,就会获取不到被阻塞

自旋锁、分段锁、死锁

自旋锁
不会发生线程状态的切换,二直处于用户态,减少了线程上下文切换的消耗,缺点是循环会消耗CPU

  • 一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待
  • 然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环,任何时刻最多只能有一个执行单元获得锁

分段锁
并不是具体的一种锁,只是一种锁的设计,将数据分段上锁,把锁进一步细粒度化,可以提升并发量
当操作不需要更新整个数组的时候,就仅针对数组中的一项进行加锁操作
CurrentHashMap底层就用了分段锁

死锁
两个或两个以上的线程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法让程序进行下去

ReentrantLock公平锁和非公平锁

ReentrantLock是公平锁还是非公平锁?

ReentrantLock可以通过参数控制,可以是公平锁,也可以是非公平锁。

ReentrantLock里面有一个内部类Sync,Sync继承AQS AbstractQueuedsynchronizer

Sync又两个子类,FairSync(公平锁)和NonfairSync(非公平锁),ReentrantLock默认使用非公平锁

公平锁与非公平锁的lock()方法唯一的区别是多了一个限制条件:hasQueuedPredecessors()
image.png

非公平锁会带来【锁饥饿】,什么是【锁饥饿】?

  • 当有几个线程同时来抢占锁时,其中有的线程一直抢到锁
  • 但一些线程由于优先级太低,一直得不到 CPU 调度执行,导致其他线程一直抢不到锁,这个就是出现了锁饥饿。

ReentrantLock为啥要设计有公平锁和非公平锁,他们有什么优缺点?

在Java 语言中,锁的默认实现都是非公平锁,原因是非公平锁的效率更高。
非公平锁注重的是性能,而公平锁注重的是锁资源的平均分配。
各有优缺点,业务需要根据实际业务场景。

  • 公平锁获取锁和释放锁后的相关操作,相关线程也会从休眠和恢复之间变化,这个就涉及到用户态内核态互相转变。
  • 非公平锁获取锁不用遵循先到先得的规则,没有了阻塞和恢复执行的步骤,避免了线程休眠和恢复的操作。所以非公平锁性能高于公平锁,更能重复利用CPU的时间
  • 如果是为了各个线程都可以获取锁资源,则推荐采用公平锁,等待锁的线程不会饿死。
  • 如果是为了业务有更大的吞吐量,则推荐采用非公平锁

synchronized对象锁和类锁的性能差异

阿里巴巴编码手册有一个规范【能锁区块,就不要锁整个方法体;能用对象锁,就不要用类锁】

对象锁( synchronized method{} )

  • 也叫实例锁,对应synchronized关键字,当多个线程访问多个实例时,它们互不干扰,每个对象都拥有自己的锁。
  • 但如果是多个线程访问同个对象的sychronized块,是同步加锁,访问不同对象的话就是不同步的
  • 同步代码块synchronized(object){ } 的效果和在实例方法上加锁一样,不同的是可以在()里添加不同的对象

image.png

类锁( static sychronized method{ } )

  • 关键字是 static sychronized,是一个全局锁,不管创建多少个对象都共享同一个锁。
  • 保障同一个时刻多个线程同时访问同一个synchronized块,当一个线程在访问时,其他的线程等待
  • 静态方法使用synchronized关键字后,无论是多线程访问单个对象还是多个对象的sychronized块,都是同步
  • synchronized(xxx.class)的效果和在静态方法方法上加锁一样,不同的是可以在()里添加不同的类对象

image.png

类锁 VS 对象锁
类锁和对象锁是两种不同的锁,可以同时使用

同步代码块
同步代码块的效果和在实例方法上加锁一样,不同的是可以在()里添加不同的对象

同步代码块()中变量的含义

变量锁范围
this本类对象的锁
实例传入对象的锁
类名.class类锁

示例代码

image.png

示例代码:对象锁

Computer

package com.soulboy.thread;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class Computer {

    public static void main(String[] args) {
        Computer computer = new Computer();
        new Thread(() -> {
            computer.watchTv();
        }, "看电视(对象锁)").start();

        new Thread(() -> {
            computer.listenMusic();
        }, "听音乐(对象锁)").start();

        new Thread(() -> {
            computer.studyIt();
        }, "学技术(无锁)").start();
    }
    
    public SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public synchronized void watchTv() {
        try {
            String name = Thread.currentThread().getName();
            System.out.println(name + ": 持有对象锁——" + sdf.format(new Date()));
            System.out.println(name + ": 看电影");
            Thread.sleep(2000);
            System.out.println(name + ": 释放对象锁——"+sdf.format(new Date()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public synchronized void listenMusic() {
        try {
            String name = Thread.currentThread().getName();
            System.out.println(name + ": 持有对象锁——" + sdf.format(new Date()));
            System.out.println(name + "听音乐");
            Thread.sleep(2000);
            System.out.println(name + ": 释放对象锁——" + sdf.format(new Date()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
    public void studyIt() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "学习技术");
    }
}

输出结果

学技术(无锁)学习技术
看电视(对象锁): 持有对象锁——2024-07-20 20:33:35
看电视(对象锁): 看电影
看电视(对象锁): 释放对象锁——2024-07-20 20:33:37
听音乐(对象锁): 持有对象锁——2024-07-20 20:33:37
听音乐(对象锁)听音乐
听音乐(对象锁): 释放对象锁——2024-07-20 20:33:39

示例代码:类锁

ComputerStatic

package com.soulboy.thread;

import java.text.SimpleDateFormat;
import java.util.Date;

public class ComputerStatic {

    public static void main(String[] args) {
        ComputerStatic computerStatic = new ComputerStatic();
        new Thread(() -> {
            ComputerStatic.watchTv();
        }, "看电视(类锁)").start();

        new Thread(() -> {
            ComputerStatic.listenMusic();
        }, "听音乐(类锁)").start();

        new Thread(() -> {
            ComputerStatic.studyIt();
        }, "学技术(无锁)").start();
        new Thread(() -> {
            ComputerStatic.studyIt();
        }, "打篮球(对象锁)").start();

    }

    public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static synchronized void watchTv() {
        try {
            String name = Thread.currentThread().getName();
            System.out.println(name + ": 持有类锁——" + sdf.format(new Date()));
            System.out.println(name + "正在看电影");
            Thread.sleep(2000);
            System.out.println(name + ": 释放类锁——" + sdf.format(new Date()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    public static synchronized void listenMusic() {
        try {
            String name = Thread.currentThread().getName();
            System.out.println(name + ": 持有类锁——" + sdf.format(new Date()));
            System.out.println(name + "正在听音乐");
            Thread.sleep(2000);
            System.out.println(name + ": 释放类锁——" + sdf.format(new Date()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void studyIt() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "学习技术");
    }

    public synchronized void playBasketball() {
        String name = Thread.currentThread().getName();
        System.out.println(name + ": 持有对象锁——" + sdf.format(new Date()));
        System.out.println(name + ": 打篮球");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name + ": 释放对象锁——" + sdf.format(new Date()));
    }

}

输出结果

学技术(无锁)学习技术
打篮球(对象锁)学习技术
看电视(类锁): 持有类锁——2024-07-20 21:30:09
看电视(类锁)正在看电影
看电视(类锁): 释放类锁——2024-07-20 21:30:11
听音乐(类锁): 持有类锁——2024-07-20 21:30:11
听音乐(类锁)正在听音乐
听音乐(类锁): 释放类锁——2024-07-20 21:30:13

synchronized底层锁原理分析实战

image.png

  • synchronized是解决线程安全的问题,常用在同步普通方法、静态方法、同步代码块中
  • 每个对象有一个锁和一个等待队列,锁只能被一个线程持有,其他需要锁的线程需要阻塞等待。锁被释放后,对象会从队列中取出一个并唤醒,唤醒哪个线程是不确定的,不保证公平性
  • synchronized是非公平可重入锁(递归锁)

synchronized加锁的方式

同步方法(隐式同步)

  • 生成的字节码文件中会多一个 ACC SYNCHRONIZED 标志位
  • 当一个线程访问方法时,会去检查是否存在ACCSYNCHRONIZED标识
  • 如果存在,执行线程将先获取monitor,获取成功之后才能执行方法体,方法执行完后再释放monitor
  • 在方法执行期间,其他任何线程都无法再获得同一个monitor对象,也叫

image.png

同步代码块(显式同步)

  • 加了 synchronized 关键字的代码段,生成的字节码文件会多出 monitorenter 和 monitorexit 两条指令
  • 每个monitor维护着一个记录着拥有次数的计数器,未被拥有的monitor的该计数器为0
  • 当一个线程获执行monitorenter后,该计数器自增1
  • 当同一个线程执行monitorexit指令的时候,计数器再自减1
  • 当计数器为0的时候,monitor将被释放,也叫显式同步

image.png

查看字节码变化

Java源码

public class SyncTest {

    public synchronized void test1() {
        System.out.println("test1...");
    }

    public void test2() {
        synchronized (this) {
            System.out.println("test2...");
        }
    }
}

编译成字节码输出

D:\Project\redlock\src\main\java\com\soulboy\thread>javac SyncTest.java

D:\Project\redlock\src\main\java\com\soulboy\thread>javap -v SyncTest.class
Classfile /D:/Project/redlock/src/main/java/com/soulboy/thread/SyncTest.class
  Last modified 2024年7月21日; size 592 bytes
  SHA-256 checksum c8641fb2c4ab604bba3cacc7ae95d09f680eda5f923622868fa330cdb38644f3
  Compiled from "SyncTest.java"
public class com.soulboy.thread.SyncTest
  minor version: 0
  major version: 61
  flags: (0x0021) ACC_PUBLIC, ACC_SUPER
  this_class: #23                         // com/soulboy/thread/SyncTest
  super_class: #2                         // java/lang/Object
  interfaces: 0, fields: 0, methods: 3, attributes: 1
Constant pool:
   #1 = Methodref          #2.#3          // java/lang/Object."<init>":()V
   #2 = Class              #4             // java/lang/Object
   #3 = NameAndType        #5:#6          // "<init>":()V
   #4 = Utf8               java/lang/Object
   #5 = Utf8               <init>
   #6 = Utf8               ()V
   #7 = Fieldref           #8.#9          // java/lang/System.out:Ljava/io/PrintStream;
   #8 = Class              #10            // java/lang/System
   #9 = NameAndType        #11:#12        // out:Ljava/io/PrintStream;
  #10 = Utf8               java/lang/System
  #11 = Utf8               out
  #12 = Utf8               Ljava/io/PrintStream;
  #13 = String             #14            // test1...
  #14 = Utf8               test1...
  #15 = Methodref          #16.#17        // java/io/PrintStream.println:(Ljava/lang/String;)V
  #16 = Class              #18            // java/io/PrintStream
  #17 = NameAndType        #19:#20        // println:(Ljava/lang/String;)V
  #18 = Utf8               java/io/PrintStream
  #19 = Utf8               println
  #20 = Utf8               (Ljava/lang/String;)V
  #21 = String             #22            // test2...
  #22 = Utf8               test2...
  #23 = Class              #24            // com/soulboy/thread/SyncTest
  #24 = Utf8               com/soulboy/thread/SyncTest
  #25 = Utf8               Code
  #26 = Utf8               LineNumberTable
  #27 = Utf8               test1
  #28 = Utf8               test2
  #29 = Utf8               StackMapTable
  #30 = Class              #31            // java/lang/Throwable
  #31 = Utf8               java/lang/Throwable
  #32 = Utf8               SourceFile
  #33 = Utf8               SyncTest.java
{
  public com.soulboy.thread.SyncTest();
    descriptor: ()V
    flags: (0x0001) ACC_PUBLIC
    Code:
      stack=1, locals=1, args_size=1
         0: aload_0
         1: invokespecial #1                  // Method java/lang/Object."<init>":()V
         4: return
      LineNumberTable:
        line 3: 0

  public synchronized void test1();
    descriptor: ()V
    flags: (0x0021) ACC_PUBLIC, ACC_SYNCHRONIZED
    Code:
      stack=2, locals=1, args_size=1
         0: getstatic     #7                  // Field java/lang/System.out:Ljava/io/PrintStream;
         3: ldc           #13                 // String test1...
         5: invokevirtual #15                 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
         8: return
      LineNumberTable:
        line 6: 0
        line 7: 8

  public void test2();
    descriptor: ()V
    flags: (0x0001) ACC_PUBLIC
    Code:
      stack=2, locals=3, args_size=1
         0: aload_0
         1: dup
         2: astore_1
         3: monitorenter
         4: getstatic     #7                  // Field java/lang/System.out:Ljava/io/PrintStream;
         7: ldc           #21                 // String test2...
         9: invokevirtual #15                 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
        12: aload_1
        13: monitorexit
        14: goto          22
        17: astore_2
        18: aload_1
        19: monitorexit
        20: aload_2
        21: athrow
        22: return
      Exception table:
         from    to  target type
             4    14    17   any
            17    20    17   any
      LineNumberTable:
        line 10: 0
        line 11: 4
        line 12: 12
        line 13: 22
      StackMapTable: number_of_entries = 2
        frame_type = 255 /* full_frame */
          offset_delta = 17
          locals = [ class com/soulboy/thread/SyncTest, class java/lang/Object ]
          stack = [ class java/lang/Throwable ]
        frame_type = 250 /* chop */
          offset_delta = 4
}
SourceFile: "SyncTest.java"

死锁

  • 两个或两个以上的线程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象
  • 若无外力作用它们都将无法让程序进行下去

示例代码:死锁
程序无法结束

package com.soulboy.thread;

public class DeadLock {
    private static Object locka = new Object();
    private static Object lockb = new Object();

    public static void main(String[] args) {
        System.out.println("manin thread start");
        DeadLock deadLock = new DeadLock();

        new Thread(() -> {
            deadLock.methodA();
        },"方法A").start();
        new Thread(() -> {
            deadLock.methodB();
        },"方法B").start();
    }

    public void methodA() {
        synchronized (locka) {
            System.out.println("Method A acquired lockA——" + Thread.currentThread().getName());
            //睡眠,不释放锁
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            synchronized (lockb) {
                System.out.println("Method A acquired lockB" + Thread.currentThread().getName());
            }
        }
    }

    public void methodB(){
        synchronized (lockb) {
            System.out.println("Method B acquired lockB——" + Thread.currentThread().getName());
            //睡眠,不释放锁
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            synchronized (locka) {
                System.out.println("Method B acquired lockA" + Thread.currentThread().getName());
            }
        }
    }
}

输出结果

manin thread start
Method B acquired lockB——方法B
Method A acquired lockA——方法A

死锁产生的必要条件和解决方案

死锁形成的4个必要条件

  • 互斥条件资源不能共享,只能由一个线程使用
  • 请求与保持条件线程已经获得一些资源,但因请求其他资源
  • 不可抢占有些资源是不可强占的,当某个线程获得这个资源后,系统不能强行回收,只能由线程使用完自己释放
  • 循环等待条件多个线程形成环形链,每个都占用对方申请的下个资源

死锁常见的解决办法
打破死锁形成的4个必要条件之一即可

  • 调整申请锁的范围
  • 调整申请锁的顺序
  • 做好代码review

死锁排查方法

方式一:Jps+jstack

首先通过Jps找到进程id

image.png

image.png

根据进程pid,通过jstack查看指定进程的堆栈

D:\Project\redlock\src\main\java\com\soulboy\thread>jstack 4852

image.png

image.png

方式二:jconsole

D:\Project\redlock\src\main\java\com\soulboy\thread>jconsole

image.png

image.png

image.png

ThreadLocal

全称线程局部变量(Thread Local Variable),使用场合主要解决多线程中数据因并发产生不一致问题

  • ThreadLocal为每一个线程都提供了变量的副本,使得每个线程在某时间访问到的并不是同一个对象
  • 这样就隔离了多个线程对数据的数据共享,这样的结果是耗费了内存
  • 但大大减少了线程同步所带来性能消耗,也减少了线程并发控制的复杂度。
  • 同个线程共享数据(可以横跨Controller层、Service层、DAO层)

image.png

应用场景
ThreadLocal用作每个线程内需要独立保存信息,方便同个线程的其他方法获取该信息的场景。

  • 每个线程获取到的信息可能都是不一样的,前面执行的方法保存了信息后,后续方法可以通过ThreadLocal直接获取到
  • 避免了传参,类似于全局变量的概念
    比如用户登录令牌解密后的信息传递(用户权限信息、从用户系统获取到的用户名、用户ID)

image.png

实例代码:ThreadLocal
线程级别隔离,同一线程内共享

package com.soulboy.thread;

public class PokerGame {

    ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);

    public static void main(String[] args) {
        PokerGame pokerGame = new PokerGame();

        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    pokerGame.makeMoney();
                }
                pokerGame.showMoney();
            }finally {
                //线程结束后删除ThreadLocal
                pokerGame.threadLocal.remove();
            }
        },"soulboy").start();

        new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    pokerGame.makeMoney();
                }
            }finally {
                //线程结束后删除ThreadLocal
                pokerGame.showMoney();
            }

        },"Leon").start();

        new Thread(() -> {
            try {
                for (int i = 0; i < 14; i++) {
                    pokerGame.makeMoney();
                }
            }finally {
                //线程结束后删除ThreadLocal
                pokerGame.showMoney();
            }
        },"Jerry").start();
    }

    public void makeMoney(){
        String name = Thread.currentThread().getName();
        Integer amount = threadLocal.get();
        ++amount;
        threadLocal.set(amount);
        System.out.println(name + " all in 挣钱");
    }

    public void showMoney() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "总共挣钱: " + threadLocal.get());
    }

}

输出结果

Leon all in 挣钱
Jerry all in 挣钱
soulboy all in 挣钱
Jerry all in 挣钱
Leon all in 挣钱
Jerry all in 挣钱
soulboy all in 挣钱
Jerry all in 挣钱
Leon all in 挣钱
Jerry all in 挣钱
soulboy all in 挣钱
Jerry all in 挣钱
Leon all in 挣钱
Jerry all in 挣钱
soulboy all in 挣钱
Leon all in 挣钱
soulboy all in 挣钱
Jerry all in 挣钱
soulboy all in 挣钱
Jerry all in 挣钱
soulboy all in 挣钱
Jerry all in 挣钱
soulboy all in 挣钱
Jerry all in 挣钱
soulboy all in 挣钱
Jerry all in 挣钱
soulboy all in 挣钱
Jerry all in 挣钱
Jerry all in 挣钱

Leon总共挣钱: 5
Jerry总共挣钱: 14
soulboy总共挣钱: 10

ThreadLocal核心代码分析

层次关系

  • Thread类,里面有一个ThreadLocalMap类型的变量,变量名字叫threadLocals,是ThreadLocalMap类型
  • ThreadLocal类,里面有一个ThreadLocalMap静态内部类
    提供一系列方法操作ThreadLocalMap,比如get/set/remove
    隔离Thread和ThreadLocalMap,防止直接创建ThreadLocalMap
    自身的get/set内部会判断当前线程是否已经绑定一个ThreadLocalMap,有就继续使用,没有就为其自身绑定
  • ThreadLocalMap 就是保存ThreadLocal的map结构,key就是ThreadLocal本身
    所以一个线程只能存储一个值,可以理解为JVM内部维护的一个Map<Thread,Object>
    当线程需要用到Object时,就用【当前线程】去Map里面获取对应的Object

操作方法

  • Thread类里面有一个ThreadLocalMap类型的变量,不能直接操作这个ThreadLocalMap
  • 需要通过工具箱ThreadLocal才可以操作ThreadLocalMap
  • 一个Thread只能有一个ThreadLocalMap
  • ThreadLocalMap以ThreadLocal为键(key)存储数据

总结

  • ThreadLocal本身并不存储值(是一个壳子)它只是自己作为一个key来让线程从ThreadLocalMap获取value
  • 因为这个原理,所以ThreadLocal能够实现“每个线程之间的数据隔离”,获取当前线程的局部变量值,不受其他线程影响

image.png

JVM四大引用类型

image.png

引用类型被垃圾回收时刻用途生存时间
强引用从来不会对象的一般状态JVM停止运行时终止
软引用在内存不足时对象简单,缓存,文件缓存,图片缓存内存不足时终止
弱引用在gc垃圾回收时对象简单,缓存,文件缓存,图片缓存gc运行后终止
虚引用任何时候都可能被垃圾回收器回收基本不写,虚拟机使用,用来跟踪对象被垃圾回收器回收的活动未知

强引用

  • 强引用是使用最普遍的引用,当一个对象被强引用关联后,它就不会被垃圾回收器回收。
    比如String str=“abc”,变量str就是字符串“abc”的强引用
  • 即使在【内存不足】的情况下,JM宁愿抛出OutOfMemoryError,也不会回收这种对象

示例代码:Reference

  • 创建User类,覆盖finalize()函数,在finalize()中输出打印信息,方便追踪,来完成“非内存资源”的清理工作
  • finalize()是Object基类的一个方法,在JVM回收内存时执行的,是GC前对待回收的对象进行标记,标记成功后会回调此函数
  • JVM并不保证在回收内存时一定会调用finalize(),可以用这个System.gc(),手动提醒GC,可以进行标记并回收垃圾

User

package com.soulboy.thread;

public class User {
    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        System.out.println("User对象被标记为垃圾,invoke调用");
    }
}

ReferenceTest

package com.soulboy.thread;

import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ReferenceTest {

	public static void main(String[] args) {
        // 强引用
        testReference();
	}

    /**
     * 强引用
     */
    public static void testReference() {
        User user = new User();
        System.out.println("GC前: " + user);
        //会触发finalize()方法
        user = null;
        System.gc();
        System.out.println("GC后: " + user);
    }
}

输出

GC前: com.soulboy.thread.User@2aae9190
User对象被标记为垃圾,invoke调用
GC后: null

软引用

  • 软引用是用来描述一些还有用但非必需的对象,当系统内存资源不足时,垃圾回收器会回收这些对象
  • 只有当内存不足时,才会回收软引用关联的对象;当内存资源充足时,不会回收软引用关联的对象,直接调用GC也不回收
  • 一般在高速缓存中会使用,内存不够时则回收相关对象释放内存
  • 使用SoftReference<>包装对象就可以转换为软引用

示例代码:SoftReference
添加VM选项:-Xms100M -Xmx100M

image.png

User

package com.soulboy.thread;

public class User {
    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        System.out.println("User对象被标记为垃圾,invoke调用");
    }
}

ReferenceTest

package com.soulboy.thread;

import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ReferenceTest {

	public static void main(String[] args) {
        // 软引用
        testSoftReference();
	}

    /**
     * 软引用
     * 只能当内存不足时,才会回收【软引用】
     * 当内存资源充足时,不会回收软引用关联的对象,不用手工调用GC
     * -Xms100m -Xmx100m
     */
    public static void testSoftReference() {
        //将User对象转换成软引用
        SoftReference<User> userSoftReference = new SoftReference<>(new User());
        System.out.println("内存资源充足,GC前= " + userSoftReference.get());
        try {
            //睡眠1秒钟
            TimeUnit.SECONDS.sleep(1);
            //消耗100M内存
            consumeMemory(2000);
        } catch (InterruptedException e) {
           e.printStackTrace();
        } finally {
            System.out.println("内存不够用,GC后=" + userSoftReference.get());
        }
    }

    /**
     * 消耗内存
     */
    private static void consumeMemory(int total) {
        List<Byte[]> list = new ArrayList<>();
        for (int i = 0; total < 99; i++) {
            //1MB
            Byte[] bytes = new Byte[1 * 1024 * 1024];
            list.add(bytes);
        }
    }

}

输出

GC前: com.soulboy.thread.User@2aae9190
User对象被标记为垃圾,invoke调用
GC后: null

弱引用

  • 弱引用也是用来描述非必需对象,但是它的强度比软引用更弱一些,只能生存到下一次垃圾收集发生之前
  • 只要垃圾回收器工作时,无论内存是否充足,都会回收被弱引用关联的对象
  • 使用了WeakReference类来实现弱引用

示例代码:WeakReference

package com.soulboy.thread;

import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ReferenceTest {

	public static void main(String[] args) {
        // 弱引用
        testWeakReference();
	}

    /**
     * 弱引用
     */
    public static void testWeakReference() {
        WeakReference<User> userWeakReference = new WeakReference<>(new User());
        System.out.println("内存资源充足,GC前= " + userWeakReference.get());
        try {
            System.gc();
            //睡眠1秒钟
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("内存不够用,GC后=" + userWeakReference.get());
        }
    }

}

输出

内存资源充足,GC前= com.soulboy.thread.User@2aae9190
User对象被标记为垃圾,invoke调用
内存不够用,GC后=null

虚引用

  • 最弱的一种引用关系,一个对象是否有虚引用的存在,完全不会对其生存时间构成影响,也无法通过虚引用来取得一个对象实例
  • 一个对象设置虚引用关联,目的就是能在这个对象被收集器回收时收到一个系统通知
  • 使用 PhantomReference类来实现虚引用,必需要组合使用一个引用队列ReferenceQueue
  • 当垃圾回收器要回收一个对象时,如果发现它还有虚引用,会在回收对象的内存之前,把这个虚引用加入到关联的引用队列中
    在虚引用对象传到它的引用队列之前会调用对象的finalize方法

示例代码:PhantomReference

package com.soulboy.thread;

import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ReferenceTest {

	public static void main(String[] args) {
        // 虚引用
        PhantomReference();
	}
    /**
     * 虚引用
     * 弱引用和虚引用指向的对象在发生GC时一定会被回收,虚引用是得不到引用的对象实例
     */
    public static void PhantomReference() {
        //引用队列
        ReferenceQueue referenceQueue = new ReferenceQueue<>();
        //创建虚引用
        PhantomReference<User> userPhantomReference = new PhantomReference<>(new User(), referenceQueue);
        //内存够用,GC前= null    不能直接get,这里虽然是null,但是其实是存在的
        System.out.println("内存够用,GC前= " + userPhantomReference.get());
        //User对象被标记为垃圾,invoke调用
        System.gc();
        //第二次gc则没有触发invoke调用
        System.gc();
    }

}

输出

内存够用,GC前= null
User对象被标记为垃圾,invoke调用

ThreadLocal内存泄漏思考

ThreadLocal为啥需要设计成弱引用?为啥ThreadLocal用完需要remove呢?

  • ThreadLocal中的一个内部类ThreadLocalMap,这个类没有实现map接口,是一个普通的ava类,但是实现的类似map的功能
  • 每个数据用Entry保存,继承WeakReference 指向ThreadLocal(所以是弱引用)键值对存储,键为ThreadLocal的自身引用
  • 每个线程持有一个ThreadLocalMap对象,每一个新的线程Thread都会实例化一个ThreadLocalMap
  • 并赋值给成员变量threadLocals,使用时若已经存在threadLocals,则直接使用已经存在的对象

假如 ThreadLocal强引用丢失,线程还存活那 key是强引用,划没法被回收,造成key内存泄露,所以需要弱引用key是弱引用被回收了,但是value也被占据了,所以需要调用remove 方法

Key回收问题

  • 如果ThreadLocal的引用丢失,ThreadLocalMap的Key如果是强引用,则没法被回收,造成泄露
  • 所以设计成弱引用,这个时候触发GC时,Key必定会被回收
  • 这个操作是ThreadLocal自身设计进行了解决

Value回收问题

  • 由于Key是弱引用被回收了,然后key是null,但是value是强引用对象没法被回收和访问,就导致内存泄露
  • 所以用完需要remove相关的value
  • 这个操作需要开发人员进行操作

  • 常规使用的线程,如果线程对象结束被回收,则上面的key和value都可以被回收
  • 但是业务里面多数是使用线程池,就导致线程不能被回收,从而如果没remove对应的值,则会导致OOM
  • 常规set/get方法里面也会清除key为null的entry对象的方法,但实际开发还是需要直接调用remove方法删除

image.png
image.png

如果ThreadLocal涉及为强引用,ThreadLocalMap的Key则没有办法被回收
如果ThreadLocal涉及为强弱用,内存不够时才会被回收,会出现Stop-The-World(STW)

JVM中垃圾收集的Stop-The-World现象及其影响

在JVM中,垃圾收集(Garbage Collection, GC)的Stop-The-World(STW)现象是指在执行垃圾回收时,所有的Java应用线程都被暂停,只有垃圾收集线程在运行。这是必要的,因为垃圾收集器需要在一致的状态下来检查哪些对象是可回收的,而这个状态只能在整个应用暂停时才能保证。

Stop-The-World现象的影响:

  • 性能延迟
    STW期间,所有的应用线程都会暂停,这意味着在垃圾收集发生时,应用程序将不会处理任何用户线程或执行任何用户代码,这会导致性能延迟。
  • 响应时间
    对于交互式或者需要快速响应的应用,STW现象会导致用户感受到卡顿,影响用户体验。
  • 吞吐量下降
    频繁的STW现象会降低应用程序的整体吞吐量,因为CPU时间被垃圾收集线程占用,用户线程的执行时间减少。
  • 资源竞争
    在多处理器系统中,垃圾收集可能会因为需要与应用程序线程竞争处理器资源而变得更加复杂。
  • 垃圾收集效率
    STW现象的持续时间取决于多个因素,包括堆的大小、垃圾收集器的效率、对象的生命周期、应用程序的内存分配速率等。
  • 用户满意度
    对于实时或近实时的应用,STW现象可能导致用户满意度下降,因为它们对延迟非常敏感。
  • 系统稳定性
    长时间的STW现象可能导致系统看起来像是“冻结”了,这可能会引起用户的恐慌,甚至导致他们错误地重启应用程序或系统。

减少Stop-The-World现象的影响

为了减少STW现象的影响,现代的垃圾收集器采用了多种策略:
通过这些策略,虽然无法完全消除STW现象,但可以显著减少其对应用程序性能的影响。

  • 并发收集
    使用并发垃圾收集器,如G1或CMS,这些收集器尝试在应用程序运行的同时进行大部分的垃圾收集工作。
  • 增量式收集
    将垃圾收集过程分解成多个小步骤,每次只执行一小段时间,以减少单次STW的持续时间。
  • 分代收集
    新生代和老年代使用不同的收集策略,因为新生代的对象通常死亡得更快,可以更频繁地收集,而不需要STW整个堆。
  • 垃圾收集器调优
    通过调整垃圾收集器的参数,如堆大小、Eden区与Survivor区的比例、垃圾收集器的启动阈值等,可以优化垃圾收集的性能。
  • 应用设计
    在应用程序层面,可以通过优化代码来减少内存分配和对象的生命周期,从而减少垃圾收集的频率。
  • 软实时目标
    对于对延迟敏感的应用,可以选择那些以软实时为目标的垃圾收集器,如G1,它们提供了可预测的停顿时间。

CompletableFuture

  • JDK1.5有了Future和Callable的实现,想要异步获取结果,通常会以轮询的方式去获取结果
    image.png
    输出结果

    main线程:在执行
    main线程:现在可以去做其他操作……
    main线程:轮询中……
    pool-1-thread-1线程:任务开始
    pool-1-thread-1线程:任务结束
    main线程:轮询中…… future = hello
    main线程:在执行
    
  • JDK8里面引入的CompletableFuture,帮助我们简化异步编程复杂性,函数式编程让代码更加简洁
    可以在任务完成后做对应的callback回调处理

CompletableFuture核心用途

  • 聚合信息处理类的处理逻辑在项目开发中,由于业务规划逻辑的原因,业务需要从多个不同的地方获取数据,然后汇总处理为最终的结果,再返回给请求的调用方
  • 提升性能如果常用串行请求,则接口响应时间长,合理的利用CompletableFuture提高程序中可以并行化的部分,从而可以大幅度提升程序的性能
  • 多任务编排调度也可以使用CompletableFuture进行完成

image.png

CompletableFuture 核心设计思想和结构

CompletableFuture类实现了FutureCompletionStage接口,相当于一个Task编排工具

  • Future表示异步计算的结果,它提供了检查计算是否完成的方法,以等待计算的完成;计算完成后只能使用 get 方法来获取结果,有cancel、get、isDone、isCancelled等方法
  • CompletionStage:是Java8新增接口,用于异步执行中的阶段处理,CompletableFuture是其中的一个实现类;对任务处理可以构造一条结果传递链,在结果传递过程中任何一个CompletionStage都可以对结果进行处理(包括异常处理、类型转换,可以构造非常简单的传递链也可以构造很复杂的传递链)。几个CompletionStage可以串联起来,一个完成的阶段可以触发下一阶段的执行

当前的Task到底由哪个Thread执行?

  • 使用的不好可能会有性能问题,根据compietableruture的方法命名可以掌握
  • 在没有指定线程池的情况下,使用的是CompletableFuture内部的线程池 ForkJoinPool线程数默认是 CPU 的核心数建议根据业务类型指定专属的外部线程池。
    一般不要所有业务共用一个线程池,避免有任务执行一些很慢的I/O操作,会导致线程池中所有线程都阻塞在I/O操作上,从而造成线程饥饿,影响整个系统的性能。
方法名功能描述
xxxx( )表示该方法将继续在当前执行CompletableFuture的方法线程中执行
xxxxAsync( )表示异步,在线程池中执行

image.png

CompletableFuture常用API

异步方法API(CompletableFuture的静态方法)

方法名描述
image.png无返回值,默认使用ForkJoinPool.commonPool()作为它的线程池执行异步代码
image.png无返回值,可以自定义线程池
image.png有返回值,默认使用ForkJoinPool.commonPool()作为它的线程池执行异步代码
image.png有返回值,可以自定义线程池

获取结果API(CompletableFuture对象)

方法名描述
image.png如果返回值没有返回,一直阻塞
image.png设置等待超时时间
image.png有返回值就返回,线程抛出异常就返回设置的默认值

其他常用API(CompletableFuture对象)

方法名描述
image.png方法无返回值,当前任务正常完成以后执行,当前任务的执行结果可以作为下一任务的输入参数
image.png方法有返回值,当前任务正常完成以后执行,当前任务的执行的结果会作为下一任务的输入参数
image.png不关心上一步的计算结果,执行下一个操作

CompletableFuture核心API示例

  • supplyAsync( )有返回值,默认使用线程池
/**
     * 有返回值,默认使用线程池 ForkJoinPool
     * @throws Exception
     */
    public static void testFuture2() throws Exception {
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName() + "线程:任务开始");
            return "有返回值,可以自定义线程池";
        });

        System.out.println(Thread.currentThread().getName() + "线程拿到: " +  stringCompletableFuture.getNow("因为CompletableFuture未能立刻返回,所以返回默认值"));
        System.out.println(Thread.currentThread().getName() + "线程拿到: " +  stringCompletableFuture.get());
    }

输出结果

main线程拿到: 因为CompletableFuture未能立刻返回,所以返回默认值
ForkJoinPool.commonPool-worker-1线程:任务开始
main线程拿到: 有返回值,可以自定义线程池
  • thenApply( )任务编排,方法有返回值,当前任务正常完成以后执行,当前任务的执行的结果会作为下一个任务的输入参数
/**
     * 任务编排
     * 方法有返回值,当前任务正常完成以后执行,当前任务的执行的结果会作为下一个任务的输入参数
     * @throws Exception
     */
    public static void testFuture3() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "线程:任务一开始执行");
            return "有返回值,可以自定义线程池";
        });

        //当作入参传入,继续执行下一个处理
        CompletableFuture<Object> future2 = future.thenApply((ele) -> {
            System.out.println(Thread.currentThread().getName() + "线程:拿到参入——" + ele + "——开始执行任务二");

            return ele + " ~~~2次处理";
        });
        
        //main线程
        System.out.println(Thread.currentThread().getName() + "线程拿到结果:"+future2.get());
    }

输出结果

ForkJoinPool.commonPool-worker-1线程:任务一开始执行
ForkJoinPool.commonPool-worker-1线程:拿到参入——有返回值,可以自定义线程池——开始执行任务二
main线程拿到结果:有返回值,可以自定义线程池 ~~~2次处理
  • thenAccept( )无返回值,当前任务正常完成以后执行,当前任务的执行的结果会作为下一个任务的输入参数
    任务编排(记录日志)
/**
     * 任务编排(记录日志)
     * 无返回值,当前任务正常完成以后执行,当前任务的执行的结果会作为下一个任务的输入参数
     * @throws Exception
     */
    public static void testFuture4() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "线程:任务一开始执行");
            return "有返回值,可以自定义线程池";
        });

        //当作入参传入,继续执行下一个处理
        CompletableFuture<Void> future2 = future.thenAccept((ele) -> {
            System.out.println(Thread.currentThread().getName() + "线程:拿到参入——" + ele + "——开始执行任务二。 无返回值,并记录日志");
        });

        //main线程
        System.out.println(Thread.currentThread().getName() + "线程拿到结果:"+future2.get());
    }

CompletableFuture嵌套示例:thenCompose解决嵌套,提高代码可读性

  • 日常的任务中,通常定义的方法都会返回CompletableFuture 类型,方便后续操作。
  • 然后将该任务的执行结果Future作为方法入参然后执行指定的方法,返回一个新的CompletableFuture任务
  • 它们之间存在着业务逻辑上的先后顺序(嵌套)

thencompose(避免CompletableFuture嵌套)

  • 推荐:用来连接两个CompletableFuture,是生成一个新的CompletableFuture,用于组合多个CompletableFuture
  • 不推荐:也可以使用 thenApply()方法来描述关系,但返回的结果就会发生CompletableFuture 的嵌套
    CompletableFuture <CompletableFuture<Product>> 这样的情况,需要get两次

image.png

示例代码:商品信息(不同信息分别存储在不同的表上)
分别展示了嵌套和非嵌套

Product(POJO)

package com.soulboy.future;

public class Product {
    private int id;

    private String title;

    private String detail;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getDetail() {
        return detail;
    }

    public void setDetail(String detail) {
        this.detail = detail;
    }

    @Override
    public String toString() {
        return "Product{" +
                "id=" + id +
                ", title='" + title + '\'' +
                ", detail='" + detail + '\'' +
                '}';
    }
}

ProductDetailService (商品详情)

package com.soulboy.future;

import java.util.HashMap;
import java.util.Map;

public class ProductDetailService {
    private static final Map<Integer, String> map = new HashMap<>();

    static{
        map.put(1, "Product 1详情");
        map.put(2, "Product 2详情");
        map.put(3, "Product 3详情");
        map.put(4, "Product 4详情");
        map.put(5, "Product 5详情");
    }

    public String getById(int id) {
        try {
            Thread.sleep(1000); // 模拟耗时操作
            System.out.println("Detail service is processing..." + Thread.currentThread().getName());

        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return map.get(id);
    }

}

ProductDetailService(商品详情信息)

package com.soulboy.future;

import java.util.HashMap;
import java.util.Map;

public class ProductDetailService {
    private static final Map<Integer, String> map = new HashMap<>();

    static{
        map.put(1, "Product 1详情");
        map.put(2, "Product 2详情");
        map.put(3, "Product 3详情");
        map.put(4, "Product 4详情");
        map.put(5, "Product 5详情");
    }

    public String getById(int id) {
        try {
            Thread.sleep(1000); // 模拟耗时操作
            System.out.println("Detail service is processing..." + Thread.currentThread().getName());

        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return map.get(id);
    }

}

testEmbedFuture方法

/**
     *  嵌套(可读性差,编码繁琐)
     * @throws Exception
     */
    public static void testEmbedFuture() throws Exception {
        ProductService productService = new ProductService();
        ProductDetailService productDetailService = new ProductDetailService();
        int id = 1;

        CompletableFuture<CompletableFuture<Product>> future = CompletableFuture.supplyAsync(() -> {
            String title = productService.getById(id);
            Product product = new Product();
            product.setTitle(title);
            product.setId(id);
            return product;
        }).thenApply(new Function<Product, CompletableFuture<Product>>() {
            @Override
            public CompletableFuture<Product> apply(Product product) {
                return CompletableFuture.supplyAsync(() -> {
                    //可以用到上一步的入参
                    String detail = productDetailService.getById(product.getId());
                    product.setDetail(detail);
                    return product;
                });
            }
        });
        System.out.println(future.get().get());
    }

testEmbedFuture方法(不推荐)
嵌套(可读性差,编码繁琐)

/**
     *  嵌套(可读性差,编码繁琐)
     * @throws Exception
     */
    public static void testEmbedFuture() throws Exception {
        ProductService productService = new ProductService();
        ProductDetailService productDetailService = new ProductDetailService();
        int id = 1;

        CompletableFuture<CompletableFuture<Product>> future = CompletableFuture.supplyAsync(() -> {
            String title = productService.getById(id);
            Product product = new Product();
            product.setTitle(title);
            product.setId(id);
            return product;
        }).thenApply(new Function<Product, CompletableFuture<Product>>() {
            @Override
            public CompletableFuture<Product> apply(Product product) {
                return CompletableFuture.supplyAsync(() -> {
                    //可以用到上一步的入参
                    String detail = productDetailService.getById(product.getId());
                    product.setDetail(detail);
                    return product;
                });
            }
        });
        System.out.println(future.get().get());
    }

testEmbedFuture方法(推荐)
非嵌套(可读性好,编码简单)

/**
     *  非嵌套(可读性好,编码简单)
     * @throws Exception
     */
    public static void testThenComposeFuture() throws Exception {
        ProductService productService = new ProductService();
        ProductDetailService productDetailService = new ProductDetailService();
        int id = 1;

        CompletableFuture<Product> future = CompletableFuture.supplyAsync(() -> {
            String title = productService.getById(id);
            Product product = new Product();
            product.setTitle(title);
            product.setId(id);
            return product;
        }).thenCompose(new Function<Product, CompletionStage<Product>>() {
            @Override
            public CompletionStage<Product> apply(Product product) {
                return CompletableFuture.supplyAsync(() -> {
                    //可以用到上一步的入参
                    String detail = productDetailService.getById(product.getId());
                    product.setDetail(detail);
                    return product;
                });
            }
        });

        System.out.println(future.get());
    }

输出结果

Product service is processing...ForkJoinPool.commonPool-worker-1
Detail service is processing...ForkJoinPool.commonPool-worker-1
Product{id=1, title='Product 1', detail='Product 1详情'}

2个CompletableFuture合并(使用频繁)

需要请求两个个接口,然后把对应的CompletableFuture进行合并,返回一个新的CompletableFuture

  • thenCombine:在两个任务都执行完成后,把两个任务的结果合并。

示例代码

/**
     *  在两个任务都执行完成后,把两个任务的结果合并
     * @throws Exception
     */
    public static void testThenCombineFuture() throws Exception {
        ProductService productService = new ProductService();
        ProductDetailService productDetailService = new ProductDetailService();
        int id = 1;

        System.out.println("任务开始"+ LocalDateTime.now());

        //第一个任务
        CompletableFuture<Product> future1 = CompletableFuture.supplyAsync(() -> {
            String title = productService.getById(id);
            Product product = new Product();
            product.setTitle(title);
            product.setId(id);
            return product;
        });

        //第二个任务
        CompletableFuture<Product> future2 = CompletableFuture.supplyAsync(() -> {
            String detail = productDetailService.getById(id);
            Product product = new Product();
            product.setDetail(detail);
            return product;
        });

        //将上面两个任务结果合并,返回新的CompletableFuture<Product>
        CompletableFuture<Product> productCombineFuture = future1.thenCombine(future2, new BiFunction<Product, Product, Product>() {
            @Override
            public Product apply(Product product, Product product2) {
                //第一product有tile
                product.setDetail(product2.getDetail());
                return product;
            }
        });

        //拿到合并结果打印
        System.out.println(productCombineFuture.get());
        System.out.println("任务结束"+ LocalDateTime.now());
    }

输出结果

任务开始2024-07-29T14:18:36.392408200
Product service is processing...ForkJoinPool.commonPool-worker-1
Detail service is processing...ForkJoinPool.commonPool-worker-2
Product{id=1, title='Product 1', detail='Product 1详情'}
任务结束2024-07-29T14:18:37.407372300

多个CompletableFuture合并(使用频繁)

当超过了2个Future,需要对任务进行调度和组合。

方法名功能描述参数类型返回值类型
allOf与(需要等全部Future都拿到结果),因为allOf( )没有返回值,所以通过thenApply,获取每个CompletableFuture 的执行结果变长的CompletableFuture 的集合CompletableFuture< void >类型
anyOf或(任何一个Future拿到结果即可),每个 CompletableFuture 的返回值类型都可能不同,无法判断是什么类型,所以 anyOf的返回值是 CompletableFuture类型变长的CompletableFuture 的集合CompletableFuture< Object >类型

示例代码:allOf

/**
     *  与(需要等全部Future都拿到结果)**,因为allOf( )没有返回值,所以通过thenApply,获取每个CompletableFuture 的执行结果
     * @throws Exception
     */
    public static void testAllOf() throws Exception {
        //任务一
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("future1完成");
            return "future1";
        });

        //任务二
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("future2完成");
            return "future2";
        });

        //任务三
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("future3完成");
            return "future3";
        });

        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2, future3);
        System.out.println("任务开始"+ LocalDateTime.now());
        //阻塞等待全部任务完成
        allOfFuture.join();
        if (allOfFuture.isDone()) {
            System.out.println("全部任务完成");
        }
        System.out.println("任务结束"+ LocalDateTime.now());
    }

输出结果

任务开始2024-07-29T15:18:49.014263600
future1完成
future2完成
future3完成
全部任务完成
任务结束2024-07-29T15:18:50.021853400

示例代码:anyOf (很少用,使用场景比较少)

/**
     *   **或(任何一个Future拿到结果即可),每个 CompletableFuture 的返回值类型都可能不同,无法判断是什么类型,所以 anyOf的返回值是 CompletableFuture类型**
     * @throws Exception
     */
    public static void testAnyOf() throws Exception {
        //任务一
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("future1完成");
            return "future1";
        });

        //任务二
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("future2完成");
            return "future2";
        });

        //任务三
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("future3完成");
            return "future3";
        });

        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);
        System.out.println("任务开始"+ LocalDateTime.now());
        //阻塞等待全部任务完成
        anyOfFuture.join();
        if (anyOfFuture.isDone()) {
            System.out.println("全部任务完成: " + anyOfFuture.get());
        }
        System.out.println("任务结束"+ LocalDateTime.now());
    }

输出结果

任务开始2024-07-29T15:21:55.410113600
future1完成
全部任务完成: future1
任务结束2024-07-29T15:21:56.416740400

CompletableFuture异步编码

微服务架构下,接口单一职责,一个页面打开涉及多个模块需要同时调用
由于需要同时建立多个连接,中间会有性能损耗,部分页面需要使用聚合接口
则可以用CompletableFuture聚合多个响应结果一次性返回

image.png

优点

  • 减少建立连接数量
  • 网关和服务可以处理更多连接

缺点

  • 如果接口性能差异大,则容易性能好的接口被性能差的拖慢
  • 需要开发更多接口,数据量大泽需要更大的带宽

其他场景

  • 爬虫业务多个URL并行爬取和解析处理
  • 商品详情页信息组装:主图信息,详情图信息,SKU信息,评价信息等
  • 业务聚合接口:用户信息(积分,基本信息,权限)

示例代码

EduService

package com.soulboy.future;

public class EduService {
    public String getRank() {
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "rank info";
    }

    public String getCategory() {
        try {
            Thread.sleep(2000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "Category info";
    }

    public String getBanner() {
        try {
            Thread.sleep(3000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "Banner info";
    }

    public String getVideoCard() {
        try {
            Thread.sleep(4000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "VideoCard info";
    }

}

FutureTest
聚合服务的最终响应时间,取决于耗时最长的子服务

public class FutureTest {

    public static void main(String[] args) throws Exception {
        System.out.println("开始时间:" + LocalDateTime.now());
        Map<String, Object> homePage = testHomePageApi();
        System.out.println(homePage.toString());
        System.out.println("结束时间:" + LocalDateTime.now());
    }

    //创建线程池
    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            16,
            32,
            100,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    /**
     * 首页聚合服务
     * @return
     * @throws Exception
     */
    public static Map<String,Object> testHomePageApi() throws Exception {
        Map<String, Object> homePageInfo = new HashMap<>();
        EduService eduService = new EduService();

        //获取轮播图  没有返回值用runAsync()
        CompletableFuture<Void> bannerFuture = CompletableFuture.runAsync(() -> {
            String bannerInfo = eduService.getBanner();
            homePageInfo.put("banner", bannerInfo);
        }, executor);

        //排行榜  没有返回值用runAsync()
        CompletableFuture<Void> rankFuture = CompletableFuture.runAsync(() -> {
            String rankInfo = eduService.getRank();
            homePageInfo.put("rank", rankInfo);
        }, executor);

        //排行榜  没有返回值用runAsync()
        CompletableFuture<Void> categoryFuture = CompletableFuture.runAsync(() -> {
            String categoryInfo = eduService.getCategory();
            homePageInfo.put("CategoryInfo", categoryInfo);
        }, executor);

        //视频卡  没有返回值用runAsync()
        CompletableFuture<Void> videoCardInfoFuture = CompletableFuture.runAsync(() -> {
            String videoCardInfo = eduService.getVideoCard();
            homePageInfo.put("VideoCard", videoCardInfo);
        }, executor);

        //get()或join()都可以:等待所有Future全部运行完成
        CompletableFuture.allOf(bannerFuture, rankFuture, categoryFuture, videoCardInfoFuture).join();
        //CompletableFuture.allOf(bannerFuture, rankFuture, categoryFuture, videoCardInfoFuture).get();
        return homePageInfo;
    }
}

输出结果

开始时间:2024-07-29T16:20:40.896652100
{CategoryInfo=Category info, VideoCard=VideoCard info, rank=rank info, banner=Banner info}
结束时间:2024-07-29T16:20:44.902043600

ThreadPool

池化思想

  • 预先分配申请好相关资源,并维护复用好这些资源
  • 提前申请好资源,节省实时分配的时间成本提升性能降低资源损耗

应用场景

  • 内存池(Memory Pooling)预先申请内存,提高申请内存的速度,减少内存碎片
  • 连接池(Connection Pooling)预先申请数据库连接,并保存一定存活的连接,降低系统开销,提高响应速度
  • 对象池(Object Pooling)循环使用对象,减少资源在初始化和销毁时相关资源损耗

线程池ThreadPoolExecutor

是一种基于池化思想管理线程的工具,预先申请线程,重复利用的线程资源

维护多个线程,任务到到达时无需等待线程的创建立刻执行

线程池的优点

  • 预先分配线程,提升系统响应速度
  • 降低线程创建销毁资源损耗
  • 可以对所有线程进行统一管理控制
  • 线程任务分离,提升线程重用性

JDK里面的线程池关键类 ThreadPoolExecutor

image.png

类名功能描述
Executor定义了线程池的标准,包含了一个方法execute,参数为一个Runnable接口引用;调用者只需要提交任务,不用关心任务执行细节
ExecutorService继承Executor,拓展了更大的接口,包括线程池的关闭、状态管理、任务执行
AbstractExecutorService对ExecutorService的接口提供默认实现;Executor接口的execute方法是未被实现,execute方法是整个线程池的核心;所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略
ThreadPoolExecutor主要作用是提供有参构造方法创建线程池,也是围绕着ExecutorService接口的功能做实现;一个可供应用程序开发人员扩展的Executor实现类,主要负责线程池的创建和控制
Executors是一个工厂类,主要用来创建不同类型的线程池,是一个工具包方便开发者创建不同类型的线程池

线程池设计原理

如果生产者端比消费者端能力强,则需要一个队列进行缓冲

image.png

ThreadPoolExecutor线程池核心参数

参数功能描述
corePoolSize线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
maximumPoolSize最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务,直到线程数量达到maximumPoolSize
keepAliveTime当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
TimeUnit时间单位
workQueue缓存队列(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行
threadFactory线程创建的工厂,一般用默认的 Executors.defaultThreadFactory()
handler当pooI已经达到max size的时候,如何处理新任务

ThreadPoolExecutor线程池几个重要参数,什么时候会创建线程

  • 查看核心线程池是否已满,不满就创建一条线程执行任务,否则执行第二步
  • 查看阻塞队列是否已满,不满就将任务存储在阴塞队列中,否则执行第三步
  • 查看线程池是否已满(即是否达到最大线程池数),不满就创建一条线程执行任务,否则就按照策略处理无法执行的任务

阻塞队列不占用CPU资源
如果队列的任务为空,阻塞队列可以保证任务队列中没有任务时,阻塞获取任务的线程,线程进入wait状态,释放cpu资源

阻塞队列自带阻塞和唤醒的功能,无任务执行时线程池利用阻塞队列的take方法挂起

阻塞队列可以维持核心线程的存活且不一直占用cpu资源,而一般的队列只能作为一个有限长度的缓冲,没有其他功能

任务队列满,最大线程数也创建满后,新任务进来会根据拒绝策略进行处理

  • AbortPolicy()该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出ReectedExecutionException异常。
  • CallerRunsPolicy()交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等。待线程池中的线程去执行
  • DiscardPolicy()如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
  • DiscardOldestPolicy()丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列

队列长度是怎样的?

线程池的线程数量要配置多少,如果没有任务的话,线程数量又应该怎么变化?

当队列没有任务的时候,线程进入睡眠还是成为阻塞状态?(如果是睡眠状态需要定时唤醒查看队列有没有任务,如果是阻塞则一直需要监听队列)

Executors

Executors工具类可以方便开发者快速创建线程池

企业项目开发

  • 日常开发项目,不会直接通过newthread方式进行多线程开发,而是通过线程池方式进行创建
  • JUC包下的Executors工具类提供多种线程池
线程池名称说明
newFixedThreadPool一个定长线程池,可控制线程最大并发数
newCachedThreadPool一个可缓存线程池。如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。该线程池会复用空闲的线程,从而减少创建对象和回收对象带来开销。使用 synchronousQueue 队列,不存储元素的阻塞队列,每put一个必须等待take操作,否则不能添加元素
newSingleThreadExecutor一个单线程化的线程池,用唯一的工作线程来执行任务
newScheduledThreadPool一个定长线程池,支持定时/周期性任务执行,线程池支持定时以及周期性执行任务,创建一个corePoolSize为传入参数,使用 DelayedworkQueue 延迟队列,可以指定多久时间后才可以从队列获取元素

这些线程池创建的底层API,就是基于ThreadpoolExecutor 进行创建,上述的线程池相关参数有没什么问题呢?

阿里巴巴编码规范(线程池不允许使用 Executors 去创建)

Executors建的线程池底层也是调用ThreadPoolExecutor,只不过使用不同的参数、队列、拒绝策略等,如果使用不当,会造成资源耗尽问题直接使用ThreadPoolExecutor让使用者更加清楚线程池允许规则,常见参数的使用,避免风险

Executors 返回的线程池对象的弊端如下:

线程池名称弊端
FixedThreadPool 和 SingleThreadPool允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM
CachedThreadPool 和 ScheduledThreadPool允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM

image.png

生产环境事故

案例一(大促活动)
微服务架构下,前端请求后端获取商品详情信息,需要把商品的基础、详情信息、库存、优惠活动等聚合起来,一并的返回给用户,使用了CompletableFuture+自定义线程池;并行请求多个接口,响应时间更快,用户体验更好;基于阿里巴巴编码规范,创建了这样的线程池(32核64G内存的机器)

事故现象:大促活动下,多用户进入商品详情页出现超时,页面刷新不出来

故障分析

  • 高并发业务下,应该最大程度的快速响应用户
  • 案例中队列设置过长maximumPoolSize设置失效,导致请求数量增加时大量任务堆积在队列中
  • 应该不设置过大的队列缓冲并发任务,而是调高corePoolSize和maxPoolSize,快速创建更多线程去处理请求

故障代码

//队列长度10W,大量堆积,导致maximumPoolSize失效,相当于只有2个核心生效
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,
                32,
                3,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(100000),
                new ThreadPoolExecutor.AbortPolicy());

修正代码

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(32,
                128,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1000),
                new ThreadPoolExecutor.AbortPolicy());

案例二(商家管理后台业务报表数据统计)
线下电商全国有多个门店,需要每天凌晨统计相关数据指标,由于涉及到大量数据统计,且希望数据越快出来越好;使用了 CompletableFuture+自定义线程池进行跑批处理;基干阿里巴巴编码规范,创建了这样的线程池(32核64G内存的机器)

事故现象:海量数据处理统计下,经常出现服务端500错误

故障分析

  • 这类场景需要执行大量的任务,但是对比高并发的快速响应,这类业务是不需要那么快及时响应
  • 而是需要充分利用好系统资源,更准确的完成相关统计
  • 应该配置足够多的队列去缓冲更多待计算的任务,调整合适的corePoolSize去处理任务
  • 也需要避免配置过多线程,由于线程的上下文切换也会导致大量资源的浪费

故障代码

//maximumPoolSize设置过大,频繁的CPU上下文切换会导致大量资源浪费
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
                1024,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1000),
                new ThreadPoolExecutor.AbortPolicy());

修正代码

//队列长度10W去缓冲
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(32,
                124,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(100000),
                new ThreadPoolExecutor.AbortPolicy());

关于线程池的底层参数配置

互联网项目可以分10密集还是CPU密集项目,不同项目配置不一样

但并非万能配置,最好根据压测情况看再进一步优化参数,且不同的模块要做好线程池的隔离

任务类型配置策略
CPU密集型建议设置为跟核心数一样或者+1
IO密集型建议设置2倍CPU核心数

线程池状态转换和关闭API的区别

变量 ctl 这个AtomicInteger包含两部分的信息,使用的是位运算的方式,相比于基本运算,速度快很多

  • 运行状态(runState)高3位保存
  • 线程池内有效线程的数量(workercount),低29位保存

image.png

image.png

线程池状态转换

状态迁移:从小到大,-1,0,1,2,3不会逆向迁移

image.png

状态说明
RUNNING运行状态,能接受新提交的任务,并且也能处理阻塞队列中的任务
SHUTDOWN关闭状态,不再接受新提交的任务,可以继续处理阻塞队列中已保存的任务
STOP不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程
TIDYING所有的任务都已终止了,workercount(有效线程数)为0
TERMINATEDterminated()方法执行完后进入该状态

线程池的关闭 shutdown和shutdownNow的区别

shutdown()

  • 可以安全关闭线程池,调用shutdown( )方法之后线程池并不是立刻就被关闭,这个时候线程池不能接受新的任务
  • 线程池中可能还有很多任务正在被执行,或是任务队列中有大量正在等待被执行的任务
  • 用 shutdown( ) 方法后线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭,没有返回值

shutdownNow()

  • 立刻关闭,不推荐使用这一种方式关闭线程池
  • 调用法后线程池不能接受新的任务,会给所有线程池中的线程发送interrupt 中断信号
  • 尝试中断这些任务的执行,会将任务队列中正在等待的所有任务转移到一个List 中并返回
  • 可以根据返回的任务 List 来进行一些补救的操作,返回一个List<Runnable>

作者:Soulboy