目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

原子类

原子类

 原子是不可分割的最小单位,故原子类可以认为其操作都是不可分割。
 一个操作是不可中断的,即便是在多线程情况下也可以保证。
java.uti.concurrent.aomic包下有很多具有原子特性的类。

原子类 VS 锁

  • 粒度更细:原子变量可以把竞争范围缩小到变量级别,这是我们可以获得的最细粒度的情况,锁的粒度通常大于原子类。
  • 效率更高:通常,使用原子类的效率会比使用锁的效率更高,除了高度竞争的情况下。

6类原子类纵览

6类原子类纵览

基本类型原子类

 以 AtomicInteger 为例,本质是对 Integer 的封装,提供原子的访问和更新操作,其本质是基于 CAS 技术。

AtomicInteger 常用方法

* get() 	获取当前的值
* getAndSet()	获取当前的值,并设置新的值
* getAndIncrement()	获取当前的值,并自增
* getAndDecrement()	获取当前的值,并自减
* getAndAdd()	获取当前的值,并加上预期的值
* compareAndSet()	如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)

示例:原子类的用法

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 描述:     演示AtomicInteger的基本用法,对比非原子类的线程安全问题,使用了原子类之后,不需要加锁,也可以保证线程安全。
 */
public class AtomicIntegerDemo1 implements Runnable {

    private static final AtomicInteger atomicInteger = new AtomicInteger();

    private static volatile int basicCount = 0;

    public void incrementAtomic() {
        atomicInteger.getAndAdd(-90);
	//atomicInteger.getAndIncrement();
    }

    public void incrementBasic() {
        basicCount++;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicIntegerDemo1 r = new AtomicIntegerDemo1();
        Thread t1 = new Thread(r);
        Thread t2 = new Thread(r);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("原子类的结果:" + atomicInteger.get());
        System.out.println("普通变量的结果:" + basicCount);
    }

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            incrementAtomic();
            incrementBasic();
        }
    }
}
原子类的结果:-1800000
普通变量的结果:19663

数组类型原子类

 演示原子数组的使用方法

import java.util.concurrent.atomic.AtomicIntegerArray;

/**
 * 描述:     演示原子数组的使用方法
 */
public class AtomicArrayDemo {

    public static void main(String[] args) {
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(1000);
        Incrementer incrementer = new Incrementer(atomicIntegerArray);
        Decrementer decrementer = new Decrementer(atomicIntegerArray);
        Thread[] threadsIncrementer = new Thread[100];
        Thread[] threadsDecrementer = new Thread[100];
        for (int i = 0; i < 100; i++) {
            threadsDecrementer[i] = new Thread(decrementer);
            threadsIncrementer[i] = new Thread(incrementer);
            threadsDecrementer[i].start();
            threadsIncrementer[i].start();
        }

//        Thread.sleep(10000);
        for (int i = 0; i < 100; i++) {
            try {
                threadsDecrementer[i].join();
                threadsIncrementer[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < atomicIntegerArray.length(); i++) {
		//开始是 0,最后也还是 0,找 0 印证是否执行成功
//            if (atomicIntegerArray.get(i)!=0) {
//                System.out.println("发现了错误"+i);
//            }
            System.out.println(atomicIntegerArray.get(i));
        }
        System.out.println("运行结束");
    }
}

class Decrementer implements Runnable {

    private AtomicIntegerArray array;

    public Decrementer(AtomicIntegerArray array) {
        this.array = array;
    }

    @Override
    public void run() {
        for (int i = 0; i < array.length(); i++) {
            array.getAndDecrement(i);
        }
    }
}

class Incrementer implements Runnable {

    private AtomicIntegerArray array;

    public Incrementer(AtomicIntegerArray array) {
        this.array = array;
    }

    @Override
    public void run() {
        for (int i = 0; i < array.length(); i++) {
            array.getAndIncrement(i);
        }
    }
}

引用类型原子类

 AtomicReference 可以让一个对象保证原子性,功能比基本类型原子类强一些,因为一个对象中可以包含多个属性。

示例:引用类可以实现自旋锁

/**
 * 描述:     自旋锁
 */
public class SpinLock {
    //原子引用类(具备CAS能力)
    private AtomicReference<Thread> sign = new AtomicReference<>();

    /**
     * 加锁
     */
    public void lock() {
        Thread current = Thread.currentThread();
        //期待没有人持有锁(null)、让当前线程持有锁(current)
        //直到原子引用被赋值为当前的线程之后,才会停止。
        while (!sign.compareAndSet(null, current)) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "自旋获取失败,再次尝试");
        }
    }

    /**
     * 解锁
     */
    public void unlock() {
        Thread current = Thread.currentThread();
        //期待持有锁的人是current线程,然后清除锁
        sign.compareAndSet(current, null);
    }

    public static void main(String[] args) throws InterruptedException {
        SpinLock spinLock = new SpinLock();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "开始尝试获取自旋锁");
                spinLock.lock();
                System.out.println(Thread.currentThread().getName() + "获取到了自旋锁");
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    spinLock.unlock();
                    System.out.println(Thread.currentThread().getName() + "释放了自旋锁");
                }
            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        Thread.sleep(50);
        thread2.start();
    }
}

把普通变量升级为具有原子性的变量

 AtomicIntegerFieldUpdater 对普通变量进行升级
 其背后原理利用的是反射

* 可以范围(public)
* 不支持 static
* 

 适用场景如下:

  • 变量类型不是由我们创建的,无权修改变量类型
  • 偶尔需要一个原子 get-set 操作,并不是一直都需要原子操作。
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
 * 描述:     演示AtomicIntegerFieldUpdater的用法
 */
public class AtomicIntegerFieldUpdaterDemo implements Runnable{

    static Candidate tom;
    static Candidate peter;

    //原子类升级:需要引用类型和引用类型的具体操作的属性
    public static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater
            .newUpdater(Candidate.class, "score");

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            peter.score++;
            scoreUpdater.getAndIncrement(tom);//升级成原子类,操作对象中的属性自增
        }
    }

    public static void main(String[] args) throws InterruptedException {
        tom=new Candidate();
        peter=new Candidate();
        AtomicIntegerFieldUpdaterDemo r = new AtomicIntegerFieldUpdaterDemo();
        Thread t1 = new Thread(r);
        Thread t2 = new Thread(r);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("普通变量:"+peter.score);
        System.out.println("升级后的结果"+ tom.score);
    }

    public static class Candidate {

        volatile int score;
    }
}

普通变量:17517
升级后的结果20000

Adder 累加器

累加器
 是 Java 8 引入的,相对是比较新的一个类。
 高并发下 LongAdder 比 AtomicLong 效率高(Integer类型也一样),不过本质是空间换时间。
 在竞争激烈的时候,LongAdder 把不同线程对应到不同的 Cell 上进行修改,降低了冲突的概率,是多段锁的理念,提高了并发性。
 LongAdder 引入了分段累加的概念,内部有一个 base 变量和一个 Cell[] 数组共同参与计数:

base变量:竞争不激烈,直接累加到该变量上。
Cell[]数组:竞争激烈,各个线程分散累加到自己的槽 Cell[i] 中。

 sum() 方法不是线程安全的,sum() = base + Cell[]

AtomicLong 和 LongAdder 使用场景

  • 在低竞争的情况下,两者差不多,在高竞争的情况下,LongAdder的预期吞吐量要高的多,但要消耗更多的内存(分段锁)。
  • LongAdder 适合的场景是统计求和的计数的场景,并且 LongAdder 基本只提供了 add 方法。而 AtomicLong 还具有 cas 方法。

**AtomicLong **
 AtomicLong 的实现原理是每一次加法都需要做同步,所以在并发的时候会导致冲突比较多,也就降低了效率。

示例:AtomicLong VS LongAdder

* AtomicLong 的性能在多线程成竞争激烈的情况下,每一次加法,都要 flush 和 refresh,导致很耗费资源。

 AtomicLongDemo
AtomicLong 的弊端

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 描述:     演示高并发场景下,LongAdder比AtomicLong性能好
 */
public class AtomicLongDemo {

    public static void main(String[] args) throws InterruptedException {
        AtomicLong counter = new AtomicLong(0);
        ExecutorService service = Executors.newFixedThreadPool(20);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
            service.submit(new Task(counter));
        }
        service.shutdown();
        while (!service.isTerminated()) {

        }
        long end = System.currentTimeMillis();
        System.out.println(counter.get());
        System.out.println("AtomicLong耗时:" + (end - start));
    }

    private static class Task implements Runnable {

        private AtomicLong counter;

        public Task(AtomicLong counter) {
            this.counter = counter;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                counter.incrementAndGet();
            }
        }
    }
}
100000000
AtomicLong耗时:720

 LongAdderDemo
LongAdder带来的改进

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/**
 * 描述:     演示高并发场景下,LongAdder比AtomicLong性能好
 */
public class LongAdderDemo {

    public static void main(String[] args) throws InterruptedException {
        LongAdder counter = new LongAdder();
        ExecutorService service = Executors.newFixedThreadPool(20);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
            service.submit(new Task(counter));
        }
        service.shutdown();
        while (!service.isTerminated()) {

        }
        long end = System.currentTimeMillis();
        System.out.println(counter.sum());
        System.out.println("LongAdder耗时:" + (end - start));
    }

    private static class Task implements Runnable {

        private LongAdder counter;

        public Task(LongAdder counter) {
            this.counter = counter;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        }
    }
}

100000000
LongAdder耗时:70

Accumulator 累加器

 Accumulator 和 Adder 非常相似,Accumulator 就是一个更通用版本的 Adder。
 它可以更灵活的书写计算逻辑,适用于大量计算,需要并发计算的场景。前提是:

* 计算顺序不会影响最终计算结果

如果计算逻辑有严格的顺序,那么也不能使用此工具类。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.stream.IntStream;

/**
 * 描述:     演示LongAccumulator的用法
 */
public class LongAccumulatorDemo {

    public static void main(String[] args) {
        LongAccumulator accumulator = new LongAccumulator((x, y) -> 2 + x * y, 1);
        ExecutorService executor = Executors.newFixedThreadPool(8);
        IntStream.range(1, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

        executor.shutdown();
        while (!executor.isTerminated()) {

        }
        System.out.println(accumulator.getThenReset());
    }
}

作者:Soulboy