Concurrent Container
Vector、Hashtable
Vector 特点
类似一个线程安全的 ArrayList
* 大量容器操作方法都采用同步方法
Vector<String> vector = new Vector<>();
vector.add("test");
System.out.println(vector.get(0));
Hashtable 特点
类似一个线程安全的 HashMap
* 大量容器操作方法都采用同步方法
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("a", "80%");
System.out.println(hashtable.get("a"));
Collections.synchronizedXXX()
虽然这两个类不是线程安全的,但是可以用 Collections.synchronizedList(new ArrayList<Integer>())
、Conllections.synchronizedMap(new HashMap<>())
让其变成线程安全的。
* 大量使用同步代码块,并没有性能提升
// ArrayList 升级
List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>());
// HashMap 升级
Map<Object, Object> objectObjectMap = Collections.synchronizedMap(new HashMap<>());
ConcurrentHashMap、CopyOnWriteArrayList
取代同步的 HashMap 和同步的 ArrayList。
绝大多数并发情况下,CocurrentHashMap、CopyOnWriteArrayList 的性能都更好。
Java Map Hierarchy
Map 体系下的所有集合类型都要求传入的** key 必须为不可变对象**,即对象的 hash 值不会改变。
Time Complexity
Map(Interface)
HashMap
当我们给 put 方法传递键和值时,会对键调用 hashCode()方法,返回的 hashCode 用于找到 bucket 位置来存储键对象和值对象,作为 Map.Entry.
* 允许 key 为 null
* 线程不安全
* 迭代时不允许修改内容
* 只读的并发是安全的
为什么 HashMap 是线程不安全的?
* 同时 put 碰撞(hash)导致数据丢失
* 同时 put 扩容导致数据丢失
* 死循环造成的 CPU 100%(多个线程同时扩容,会造成链表的自循环,A指向B,B指向A。 HashMap本身就不支持并发,所以也不是个问题)
JDK 1.7 采用拉链法
JDK 1.8 引入数组 + 链表 + 红黑树
数组的加入提高了并发性,红黑树的加入使得查找的时间复杂度从 O(N) 变为 O(logN)。
红黑树优点
* 对二叉查找树 BST 的一种平衡策略,** O(logN)** vs O(N)
* 会自动平衡,防止极端不平衡从而影响查找效率的情况发生
红黑树特点
* 每个节点要么是红色,要么是黑色,但根节点永远是黑色
* 红色节点不能连续(也即是,红色节点的孩子和父亲都不能是红色)
* 从任一节点到其子树中每个叶子节点的路径都包含相同数量的黑色节点
* 所有的叶子节点都是黑色的
Hashtable
* 不允许 key 为 null
* 采用大量 synchonrized 同步方法对集合进行操作,性能较差。
* 线程安全
LinkedHashMap
* 保留插入元素的顺序
TreeMap
* 由于实现 SortedMap 接口,默认会对元素进行排序。
ConcurrentHashMap
JDK 1.7 的实现
Java 7 中的 ConcurrentHashMap 最外层是多个 segment,每个 segment 的底层数据结构与 HashMap 类似,仍然是数组和链表组成的拉链法。
每个 segment 独立上 ReentrantLock 锁,每个 segment 之间互不影响,提高了并发效率。segment 类继承 ReentrantLock 来保证线程安全。
ConcurrentHashMap 默认有 16 个 segment,所以最多可以同时支持 16 个线程并发写(操作分别分布在不同的 Segment 上)。这个默认值可以在初始化的时候设置为其他值,但是一旦初始化以后,是不可以扩容的。
JDK 1.8 引入 CAS + synchonrized
ConcurrnetHashMap 在 JDK 1.8 中完全借鉴了 JDK1.8 中的 HashMap 的实现。
取消了 segment 分段锁,引入 Node数组
提高了并发性(锁粒度更细)的同时也减少了并发冲突几率,链表
触发 红黑树
的加入使得查找的时间复杂度从 O(N) 变为 O(logN)。
保证并发安全性:CAS(读) + synchronized(写)
# putValue 流程
1. 判断 key value 不为空
2. 计算 hash 值
3. 根据对应位置节点的类型,来赋值,(helpTransger | 增长链表 | 给红黑树增加节点)
4. 检查满足阈值(8)并且数组大于阈值(64)就 “红黑树化”,如果数组阈值小于64,则优先扩容数组。
5. 返回oldValue
# get 流程
1. 计算 hash 值
2. 找到对应的位置,根据情况进行:
3. 直接取值
4. 红黑树里找值
5. 遍历链表取值
6. 返回找到的结果
为什么超过8要转为红黑树?
正常情况下几乎不会碰到树化的情况,概率只有千万分之一,正常情况下链表的长度不会达到8。(如果真的达到8说明hash算法出了问题,为了保证在这种极端情况下让链表查询依然有效率,在这种情况下就会触发 “树化”)。
作者进行了泊松分布概率函数
的计算如下:
* 0: 0.60653066
* 1: 0.30326533
* 2: 0.07581633
* 3: 0.01263606
* 4: 0.00157952
* 5: 0.00015795
* 6: 0.00001316
* 7: 0.00000094
* 8: 0.00000006
* more: less than 1 in ten million
ConcurrentHashMap 负载因子为什么是 0.75 ?
这样的默认值其实就是时间和空间成本上的一种折中:
在 ConcurrentHashMap 中的注释中提到:一个节点在 Bucket 中出现的概率是符合泊松分布的,使用 0.75 为负载因子,可以降低节点在某一个特定桶中出现的概率,降低了 hash 冲突,也就降低了查询时间(而查询是最频繁的操作(HashMap 的 get()与 put()方法都要用到查询)),同时也不会因为负载因子过小而导致 hash 表过大,占用过多的内存空间。
* 负载因子越小,降低 hash 冲突,提高查询效率,增加内存占用率
* 负载因子越大,提升 hash 冲突,降低查询效率,减少内存占用率
HashMap 中定义了几个常量
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
static final int MAXIMUM_CAPACITY = 1 << 30;
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
`DEFAULT_INITIAL_CAPACITY`: 初始容量,也就是默认会创建 16 个箱子,箱子的个数不能太多或太少。如果太少,很容易触发扩容,如果太多,遍历哈希表会比较慢。
`MAXIMUM_CAPACITY`: 哈希表最大容量,一般情况下只要内存够用,哈希表不会出现问题。
`DEFAULT_LOAD_FACTOR`: 默认的负载因子。因此初始情况下,当键值对的数量大于 `16 * 0.75 = 12` 时,就会触发扩容。
`TREEIFY_THRESHOLD`: 上文说过,如果哈希函数不合理,即使扩容也无法减少箱子中链表的长度,因此 Java 的处理方案是当链表太长时,转换成红黑树。这个值表示当某个箱子中,链表长度大于 8 时,有可能会转化成树。
`UNTREEIFY_THRESHOLD`: 在哈希表扩容时,如果发现链表长度小于 6,则会由树重新退化为链表。
`MIN_TREEIFY_CAPACITY`: 在转变成树之前,还会有一次判断,只有键值对数量大于 64 才会发生转换。这是为了避免在哈希表建立初期,多个键值对恰好被放入了同一个链表中而导致不必要的树化。
错误的使用 ConcurrentHashMap 也会导致线程不安全
一个工具类本身是线程安全的,但并不意味着多个组合使用工具类的操作也是线程安全的,不恰当的使用工具类,也会带来线程安全的问题。
repalce:组合操作
import java.util.concurrent.ConcurrentHashMap;
/**
* 描述: 组合操作并不保证线程安全
*/
public class OptionsNotSafe implements Runnable {
private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<String, Integer>();
public static void main(String[] args) throws InterruptedException {
scores.put("小明", 0);
Thread t1 = new Thread(new OptionsNotSafe());
Thread t2 = new Thread(new OptionsNotSafe());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(scores);
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
while (true) {
Integer score = scores.get("小明");
Integer newScore = score + 1;
//false:代表值已经被其他线程修改过
//true:代表本线程修改成功
//repalce() 方法是一个组合操作, 检查预期值是否为 score ,如果是则尝试替换成 newscore,否则重新进入while()循环获取最新的 score
boolean b = scores.replace("小明", score, newScore);
//成功修改的时候退出
if (b) {
break;
}
}
}
}
}
putIfAbsent:组合操作(等价于如下代码)
// 存在就取出,不存在就put进去。
if(!map.containsKey(key))
return map.put(key,value)'
else
return map.get(key);
CopyOnWirteArrayList
代替 Vector 和 synchronizedList 的高性能并发版的 ArrayList。
* Vector 和 synchronizedList 锁的粒度大,大量使用同步方法导致并发性能较差。
* 迭代的时候无法编辑,导致用法和功能都不够强大
Copy-On-Wirte 并发容器还包括 CopyOnWriteArraySet,用来替代同步 Set。
CopyOnWirteArrayList 适用场景
* 读多,写少 :黑名单、每日更新;监听器:迭代操作远多于修改操作。
* 读操作可以尽可能的快,而写即使慢一些也没有太大关系
CopyOnWirteArrayList 读写规则
* 读写锁的规则升级:读取时完全不用加锁的,并且写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。
* 也就是说,只有写入和写入之间才会有锁。
示例:CopyOnWriteArrayList可以在迭代的过程中修改数组内容,但是ArrayList不行。
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* 描述:演示CopyOnWriteArrayList可以在迭代的过程中修改数组内容,但是ArrayList不行,对比
*/
public class CopyOnWriteArrayListDemo1 {
public static void main(String[] args) {
// ArrayList<String> list = new ArrayList<>();
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
Iterator<String> iterator = list.iterator();
while (iterator.hasNext()) {
System.out.println("list is" + list);
String next = iterator.next();
System.out.println(next);
if (next.equals("2")) {
list.remove("5");
}
if (next.equals("3")) {
list.add("3 found");
}
}
}
}
list is[1, 2, 3, 4, 5]
1
list is[1, 2, 3, 4, 5]
2
list is[1, 2, 3, 4]
3
list is[1, 2, 3, 4, 3 found]
4
list is[1, 2, 3, 4, 3 found]
5
CopyOnWirteArrayList 实现原理
当发生写时,复制副本,基于副本操作,容器引用指向新的副本。
这样读就可以不受限制,读可以继续在原始容器中读,写在副本中进行,可以同时进行。虽然会有一定程度上的过期,但是提高了并发效率。
适用于更新频率很低的情况,在这样的情况下不必担心会读取到过期数据的问题,因为本身对数据的实时性要求就不是特别高。
创建新副本,读写分离的思想。
“不可变”原理:旧的容器没有人去修改,所以旧的容器是不可变的,既然不会变,完全就是线程安全的,完全可以并发的读。
迭代的时候:读取依然使用旧的容器,所以不会报错。
CopyOnWirteArrayList 的缺点
* CopyOnWirte 容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的数据,马上可以读到,请不要使用 CopyOnWrite 容器。
* 内存占用问题: 因为 CopyOnWrite 的写是复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存。
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* 描述: 对比两个迭代器
*/
public class CopyOnWriteArrayListDemo2 {
public static void main(String[] args) throws InterruptedException {
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(new Integer[]{1, 2, 3});
System.out.println(list);
Iterator<Integer> itr1 = list.iterator();
list.remove(2);
Thread.sleep(1000);
System.out.println(list);
Iterator<Integer> itr2 = list.iterator();
itr1.forEachRemaining(System.out::println);// 1 2 3 取决于诞生时间
itr2.forEachRemaining(System.out::println);// 1 2 取决于诞生时间
}
}
[1, 2, 3]
[1, 2]
1
2
3
1
2
Concurrent Queue
各并发队列关系图
为什么要使用队列?
* 使用队列可以在线程间传递数据:生产者消费者模式、银行转账
* 考虑锁等线程安全问题的重任从 “你” 转移到了 “队列” 上。(变为阻塞队列的使用者)
BlockingQueue
阻塞队列是一个具有阻塞功能的队列。
通常,阻塞队列的一端是给生产者放数据用,另一端给消费者拿数据用。阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的。
是否有界(容量多大):这是一个非常重要的属性,无界队列意味着里面可以容纳非常多(Integer.MAX_VALUE,约为2的31次,是非常大的一个数,可以近似认为是无限容量)
阻塞队列和线程池的关系:阻塞队列是线程池的重要组成部分。
# 阻塞队列常用方法
* take() 获取并移除队列头结点,如果执行take()的时候,队列里无数据,则阻塞,直到队列里有数据。
* put() 插入元素,如果队列已满,则无法继续插入,则阻塞,直到队列里有了空闲空间。
* remove() 删除队列头节点,如果队列中为空,则抛出异常。
* add() 插入元素,如果队列已满,则抛出异常。
* element() 返回队列头元素,如果队列中为空,则抛出异常。
* offer 添加,满则返回 false,添加成功返回 true
* poll 取,如果队列为空,则返回null
* peek 取出的同时会删除,如果队列为空,则返回null
ArrayBlockingQueue
* 有界
* 初始化需要指定容量
* 公平:公平指的是等待了最长的线程会被优先处理,不过这会地带来一定的性能损耗。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 描述: TODO
*/
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
Interviewee r1 = new Interviewee(queue);
Interviewer r2 = new Interviewer(queue);
new Thread(r1).start();
new Thread(r2).start();
}
}
class Interviewee implements Runnable {
BlockingQueue<String> queue;
public Interviewee(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("10个候选人都来啦");
for (int i = 0; i < 10; i++) {
String candidate = "Candidate" + i;
try {
queue.put(candidate);
System.out.println("安排好了" + candidate);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
queue.put("stop");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Interviewer implements Runnable {
BlockingQueue<String> queue;
public Interviewer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String msg;
try {
while(!(msg = queue.take()).equals("stop")){
System.out.println(msg + " 面试结束");
}
System.out.println("所有候选人都结束了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
10个候选人都来啦
安排好了Candidate0
安排好了Candidate1
安排好了Candidate2
Candidate0 面试结束
Candidate1 面试结束
Candidate2 面试结束
Candidate3 面试结束
安排好了Candidate3
安排好了Candidate4
安排好了Candidate5
安排好了Candidate6
Candidate4 面试结束
安排好了Candidate7
Candidate5 面试结束
安排好了Candidate8
Candidate6 面试结束
安排好了Candidate9
Candidate7 面试结束
Candidate8 面试结束
Candidate9 面试结束
所有候选人都结束了
LinkedBlockingQueue
* 无界
* 内部结构:Node、两把锁(读锁、写锁)。分析 put 方法
* ReentranLock
PriorityBlockingQueue
* 无界队列(put不会阻塞,take时有可能会阻塞)
* 支持优先级
* 自然排序(不是先进先出)
* PriorityQueue 的线程安全版,采用ReentranLock
SynchronousQueue
* 队列容量为0,不存储任何元素
* 直接交换,不做存储,所以效率很高
* 没有peek()方法
* 是一个极好的用来直接传递的并发数据结构
* SynchronousQueue 是线程池 Executors.newCachedThreadPool() 使用的阻塞队列
DelayQueue
* 延迟队列,根据延迟时间排序
* 元素需要实现 Delayed 接口,规定排序规则
Non-Blocking Queue
ConcurrentLinkedQueue
并发包中的非阻塞队列只有 ConcurrentLinkedQueue 这一种
* 链表作为其数据结构
* 使用 CAS 费阻塞算法来实现线程安全(不具备阻塞功能)
* 适合用在队性能要求比较高的并发场景,出场频率相对比较少
选择适合的队列
- 边界
- 容量
- 吞吐量(LinkedBlockingQueue 优于 ArrayBlockingQueue)