1、并发容器概览
- ConcurrentHashMap 线程安全的HashMap
- CopyOnWriteArrayList 线程安全的List
- BlockingQueue 阻塞队列接口,非常适合数据共享的通道
- ConcurrentLinkedQueue 高效的非阻塞并发队列,可以看作线程安全的LinkedList
- ConcurrentSkipListMap 使用跳表的数据结构进行快速查找的map
2、集合类的历史
2.1、Vector和HashTable
方法syncronied修饰,并发性能差
Vector:
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
HashTable:
public synchronized V put(K key, V value) {
// Make sure the value is not null
if (value == null) {
throw new NullPointerException();
}
// Makes sure the key is not already in the hashtable.
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
@SuppressWarnings("unchecked")
Entry<K,V> entry = (Entry<K,V>)tab[index];
for(; entry != null ; entry = entry.next) {
if ((entry.hash == hash) && entry.key.equals(key)) {
V old = entry.value;
entry.value = value;
return old;
}
}
addEntry(hash, key, value, index);
return null;
}
2.2、 Collections工具类同步HashMap和ArrayList
方法类代码块同步锁,并发性能差
List<String> syncList = Collections.synchronizedList(new ArrayList<String>());
Map<String,String> hashMap = Collections.synchronizedMap(new HashMap<String,String>());
List:
public E get(int index) {
synchronized (mutex) {return list.get(index);}
}
public E set(int index, E element) {
synchronized (mutex) {return list.set(index, element);}
}
public void add(int index, E element) {
synchronized (mutex) {list.add(index, element);}
}
public E remove(int index) {
synchronized (mutex) {return list.remove(index);}
}
Map:
public boolean containsKey(Object key) {
synchronized (mutex) {return m.containsKey(key);}
}
public boolean containsValue(Object value) {
synchronized (mutex) {return m.containsValue(value);}
}
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}
public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
public V remove(Object key) {
synchronized (mutex) {return m.remove(key);}
}
public void putAll(Map<? extends K, ? extends V> map) {
synchronized (mutex) {m.putAll(map);}
}
public void clear() {
synchronized (mutex) {m.clear();}
}
2.3、ConcurrentHashMap 和 CopyOnWriteArrayList
取代同步的ArrayList和HashMap
绝大多数情况下,ConcurrentHashMap 和 CopyOnWriteArrayList 性能优于同步的ArrayList和HashMap(如果一个List经常被修改,那么同步的ArrayList性能优于CopyOnWriteArrayList ,CopyOnWriteArrayList 适合读多写少的场景,每次写入都会复制完整的列表,比较消耗资源)
3、ConcurrentHashMap
3.1、Map简介
3.2、 为什么HashMap是线程不安全的?
- 同时put碰撞导致数据丢失
- 同时put扩容导致数据丢失
- 死循环造成cpu100%(jdk7之前) 扩容的时候链表死循环
3.3、ConcurrentHashMap 结构
3.3.1、Jdk1.7中的ConcurrentHashMap 结构
jdk1.7中的ConcurrentHashMap 最外层是多个segment,每个segment的底层数据结构于HashMap类似,仍然是数组和链表组成的拉链法。
每个segment设置了独立的ReentrantLock锁,每个segment之间互不影响,提高了并发效率
ConcurrentHashMap 默认有16个segment,所以最多可以支持16个线程并发写(操作分布在不同的segment上。默认值可以在初始化的时候设置,但是一旦初始化后,无法扩容
3.3.2、Jdk1.8中的ConcurrentHashMap 结构
- 第一种是最简单的,空着的位置代表当前还没有元素来填充。
- 第二种就是和 HashMap 非常类似的拉链法结构,在每一个槽中会首先填入第一个节点,但是后续如果计算出相同的 Hash 值,就用链表的形式往后进行延伸。链表长度大于某一个阈值(默认为 8),且同时满足一定的容量要求的时候,ConcurrentHashMap 便会把这个链表从链表的形式转化为红黑树的形式,目的是进一步提高它的查找性能。
- 第三种结构就是红黑树结构
final V putVal(K key, V value, boolean onlyIfAbsent){
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
3.3.3、为什么要把1.7的结构变为1.8的结构?
数据结构 : Java 7 采用 Segment 分段锁来实现,而 Java 8 中的 ConcurrentHashMap 使用数组 + 链表 + 红黑树
hash碰撞: 先使用拉链法,再转红黑树
保证并发安全:ava 7 采用 Segment 分段锁,而 Java 8采用CAS
查询复杂度:链表查询复杂度为O(n),但是java8中一旦转为红黑树,就变为O(logn)
为什么超过8转红黑树?
默认链表,占用内存更少,红黑树占用
达到冲突8次概率只有千万分之一,转红黑树可以确保极端情况下的查询效率
3.3.4、错误操作ConcurrentHashMap 不保证线程安全
读和写两个独立操作组合到一起就不是线程安全的,可以采用cas实现
public class ConcurrenthashMapDemo {
private static ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap();
public static void main(String[] args) throws InterruptedException {
int threadCount = 50;
long start = System.currentTimeMillis();
map.put("count", 0);
Runnable r = () -> {
IntStream.range(0, 100000).forEach(e -> {
//可以实现但是并发效率低
synchronized (ConcurrenthashMapDemo.class){
Integer count = map.get("count");
count ++;
map.put("count",count);
}
});
};
Thread[] threads = new Thread[threadCount];
IntStream.range(0, threadCount).forEach(e -> {
threads[e] = new Thread(r);
});
IntStream.range(0, threadCount).forEach(e -> {
threads[e].start();
try {
threads[e].join();
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println("syncnized方式 count结果:" + map.get("count") + " 耗时:" + (end - start));
long start2 = System.currentTimeMillis();
map.put("count", 0);
Runnable r2 = () -> {
IntStream.range(0, 100000).forEach(e -> {
//并发效率高
while(true){
Integer count = map.get("count");
Integer newCount = count + 1;
boolean result = map.replace("count", count, newCount);
if(result){
break;
}
}
});
};
Thread[] otherThreads = new Thread[threadCount];
IntStream.range(0, threadCount).forEach(e -> {
otherThreads[e] = new Thread(r2);
});
IntStream.range(0, threadCount).forEach(e -> {
otherThreads[e].start();
try {
otherThreads[e].join();
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
});
long end2 = System.currentTimeMillis();
System.out.println("CAS方式 count结果:" + map.get("count") + " 耗时:" + (end2 - start2));
}
}
4、CopyOnWriteArrayList
4.1、作用与适合场景
CopyOnWrite 的意思是说,当容器需要被修改的时候,不直接修改当前容器,而是先将当前容器进行 Copy,复制出一个新的容器,然后修改新的容器,完成修改之后,再将原容器的引用指向新的容器。
CopyOnWriteArrayList 的所有修改操作(add,set等)都是通过创建底层数组的新副本来实现的,所以 CopyOnWrite 容器也是一种读写分离的思想体现,读和写使用不同的容器。
作用
代替Vector和SynchronizedList,Vector和SynchronizedList锁粒度太大,并发效率比较低,并且迭代时无法编辑。
适合场景
读操作尽可能的块,而写操作慢一些也没有太大关系。
例如:黑名单
4.2、读写规则
读取完全不用加锁,写入不会阻塞读取操作,只有写入和写入之间需要同步等待。
4.3、实例
ArrayList迭代时无法编辑,CopyOnWriteArrayList迭代时可以修改,但是修改对迭代没有影响,还是会迭代修改之前的数据
public class CopyOnWriteArrayListDemo {
public static void main(String[] args) {
//异常ConcurrentModificationException 不能再迭代时修改
// List<String> list = new ArrayList<>();
List<String> list = new CopyOnWriteArrayList<>();
IntStream.range(1,6).forEach(e -> list.add(e + ""));
Iterator<String> iterator = list.iterator();
while (iterator.hasNext()) {
String next = iterator.next();
if("2".equals(next)){
list.remove("5");
}
if("3".equals(next)){
list.add("6");
}
System.out.println("list: " + list);
System.out.println(next);
}
}
}
ArrayList输出
list: [1, 2, 3, 4, 5]
1
list: [1, 2, 3, 4]
2
Exception in thread "main" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
at com.imooc.thread_demo.collection.CopyOnWriteArrayListDemo.main(CopyOnWriteArrayListDemo.java:26)
CopyOnWriteArrayList输出
list: [1, 2, 3, 4, 5]
1
list: [1, 2, 3, 4]
2
list: [1, 2, 3, 4, 6]
3
list: [1, 2, 3, 4, 6]
4
list: [1, 2, 3, 4, 6]
5
4.4、缺点
数据一致性问题:只能保证数据最终一致,不能保证数据实时一致。
内存占用问题:CopyOnWriteArrayList时创建副本实现写,内存占用大
4.5、源码分析
数据结构
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
public E get(int index) {
return get(getArray(), index);
}
5、并发队列
5.1、阻塞队列
从队列取数据,队列里无数据,则阻塞,往队列放数据,如果队列已满,那么就无法继续插入,则阻塞
5.1.1、阻塞队列的常用方法
- 抛出异常:add、remove、element
- add 方法是往队列里添加一个元素,如果队列满了,就会抛出异常来提示队列已满
- remove 方法的作用是删除元素,如果我们删除的队列是空的,那么 remove 方法就会抛出异常。
- element 方法是返回队列的头部节点,但是并不删除,如果队列是空的,抛出异常。
- 返回结果但不抛出异常:offer、poll、peek
- offer 方法用来插入一个元素,并用返回值来提示插入是否成功,如果添加成功会返回 true,如果队列满了,无法插入,返回false
- 移除并返回队列的头节点,如果队列是空的,返回 null 作为提示
- 返回队列的头元素但并不删除,如果队列里面是空的,它便会返回 null 作为提示
3.阻塞:put、take
- put 队列已满,那么就无法继续插入,则阻塞
- take 从队列取数据,队列里无数据,则阻塞
5.1.2、ArrayBlockingQueue
有界
指定容量
公平:可以指定是否需要保证公平,如果想保证公平,等待最长的线程会被优先处理,不过同时会带来一定的性能损耗
5.1.3、LinkedBlockingQueue
无界
容量 Integer.MAX_VALUE
内部结构:node,两把锁
5.1.4、PriorityBlockingQueue
支持优先级的无界阻塞队列
public static void main(String[] args) {
PriorityBlockingQueue priorityBlockingQueue = new PriorityBlockingQueue(10,
Comparator.comparingInt(o -> ((Student)o).getAge()));
priorityBlockingQueue.add(new Student(13,"张三"));
priorityBlockingQueue.add(new Student(17,"李四"));
priorityBlockingQueue.add(new Student(12,"王五"));
Student take = null;
do{
take = (Student)priorityBlockingQueue.poll();
if(take != null){
System.out.println(take.getName());
}
}while(take != null);
}
5.1.5、SynchronousQueue
SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取
5.1.6、DelayQueue
延迟队列,根据延迟时间排序
5.2、非阻塞并发队列
ConcurrentLinkedQueue
非阻塞队列 ConcurrentLinkedQueue 使用 CAS 非阻塞算法 + 不停重试,来实现线程安全,适合用在不需要阻塞功能,且并发不是特别剧烈的场景