1、并发容器概览

  • ConcurrentHashMap 线程安全的HashMap
  • CopyOnWriteArrayList 线程安全的List
  • BlockingQueue 阻塞队列接口,非常适合数据共享的通道
  • ConcurrentLinkedQueue 高效的非阻塞并发队列,可以看作线程安全的LinkedList
  • ConcurrentSkipListMap 使用跳表的数据结构进行快速查找的map

2、集合类的历史

2.1、Vector和HashTable

方法syncronied修饰,并发性能差

  1. Vector:
  2. public synchronized boolean add(E e) {
  3. modCount++;
  4. ensureCapacityHelper(elementCount + 1);
  5. elementData[elementCount++] = e;
  6. return true;
  7. }
  8. HashTable:
  9. public synchronized V put(K key, V value) {
  10. // Make sure the value is not null
  11. if (value == null) {
  12. throw new NullPointerException();
  13. }
  14. // Makes sure the key is not already in the hashtable.
  15. Entry<?,?> tab[] = table;
  16. int hash = key.hashCode();
  17. int index = (hash & 0x7FFFFFFF) % tab.length;
  18. @SuppressWarnings("unchecked")
  19. Entry<K,V> entry = (Entry<K,V>)tab[index];
  20. for(; entry != null ; entry = entry.next) {
  21. if ((entry.hash == hash) && entry.key.equals(key)) {
  22. V old = entry.value;
  23. entry.value = value;
  24. return old;
  25. }
  26. }
  27. addEntry(hash, key, value, index);
  28. return null;
  29. }

2.2、 Collections工具类同步HashMap和ArrayList

方法类代码块同步锁,并发性能差

  1. List<String> syncList = Collections.synchronizedList(new ArrayList<String>());
  2. Map<String,String> hashMap = Collections.synchronizedMap(new HashMap<String,String>());
  3. List:
  4. public E get(int index) {
  5. synchronized (mutex) {return list.get(index);}
  6. }
  7. public E set(int index, E element) {
  8. synchronized (mutex) {return list.set(index, element);}
  9. }
  10. public void add(int index, E element) {
  11. synchronized (mutex) {list.add(index, element);}
  12. }
  13. public E remove(int index) {
  14. synchronized (mutex) {return list.remove(index);}
  15. }
  16. Map:
  17. public boolean containsKey(Object key) {
  18. synchronized (mutex) {return m.containsKey(key);}
  19. }
  20. public boolean containsValue(Object value) {
  21. synchronized (mutex) {return m.containsValue(value);}
  22. }
  23. public V get(Object key) {
  24. synchronized (mutex) {return m.get(key);}
  25. }
  26. public V put(K key, V value) {
  27. synchronized (mutex) {return m.put(key, value);}
  28. }
  29. public V remove(Object key) {
  30. synchronized (mutex) {return m.remove(key);}
  31. }
  32. public void putAll(Map<? extends K, ? extends V> map) {
  33. synchronized (mutex) {m.putAll(map);}
  34. }
  35. public void clear() {
  36. synchronized (mutex) {m.clear();}
  37. }

2.3、ConcurrentHashMap 和 CopyOnWriteArrayList

取代同步的ArrayList和HashMap

绝大多数情况下,ConcurrentHashMap 和 CopyOnWriteArrayList 性能优于同步的ArrayList和HashMap(如果一个List经常被修改,那么同步的ArrayList性能优于CopyOnWriteArrayList ,CopyOnWriteArrayList 适合读多写少的场景,每次写入都会复制完整的列表,比较消耗资源)

3、ConcurrentHashMap

3.1、Map简介

image.png

3.2、 为什么HashMap是线程不安全的?

  • 同时put碰撞导致数据丢失
  • 同时put扩容导致数据丢失
  • 死循环造成cpu100%(jdk7之前) 扩容的时候链表死循环

3.3、ConcurrentHashMap 结构

3.3.1、Jdk1.7中的ConcurrentHashMap 结构

image.png

jdk1.7中的ConcurrentHashMap 最外层是多个segment,每个segment的底层数据结构于HashMap类似,仍然是数组和链表组成的拉链法。
每个segment设置了独立的ReentrantLock锁,每个segment之间互不影响,提高了并发效率
ConcurrentHashMap 默认有16个segment,所以最多可以支持16个线程并发写(操作分布在不同的segment上。默认值可以在初始化的时候设置,但是一旦初始化后,无法扩容

3.3.2、Jdk1.8中的ConcurrentHashMap 结构

image.png

  • 第一种是最简单的,空着的位置代表当前还没有元素来填充。
  • 第二种就是和 HashMap 非常类似的拉链法结构,在每一个槽中会首先填入第一个节点,但是后续如果计算出相同的 Hash 值,就用链表的形式往后进行延伸。链表长度大于某一个阈值(默认为 8),且同时满足一定的容量要求的时候,ConcurrentHashMap 便会把这个链表从链表的形式转化为红黑树的形式,目的是进一步提高它的查找性能。
  • 第三种结构就是红黑树结构
  1. final V putVal(K key, V value, boolean onlyIfAbsent){
  2. if (key == null || value == null) throw new NullPointerException();
  3. int hash = spread(key.hashCode());
  4. int binCount = 0;
  5. for (Node<K,V>[] tab = table;;) {
  6. Node<K,V> f; int n, i, fh;
  7. if (tab == null || (n = tab.length) == 0)
  8. tab = initTable();
  9. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  10. if (casTabAt(tab, i, null,
  11. new Node<K,V>(hash, key, value, null)))
  12. break; // no lock when adding to empty bin
  13. }
  14. else if ((fh = f.hash) == MOVED)
  15. tab = helpTransfer(tab, f);
  16. else {
  17. V oldVal = null;
  18. synchronized (f) {
  19. if (tabAt(tab, i) == f) {
  20. if (fh >= 0) {
  21. binCount = 1;
  22. for (Node<K,V> e = f;; ++binCount) {
  23. K ek;
  24. if (e.hash == hash &&
  25. ((ek = e.key) == key ||
  26. (ek != null && key.equals(ek)))) {
  27. oldVal = e.val;
  28. if (!onlyIfAbsent)
  29. e.val = value;
  30. break;
  31. }
  32. Node<K,V> pred = e;
  33. if ((e = e.next) == null) {
  34. pred.next = new Node<K,V>(hash, key,
  35. value, null);
  36. break;
  37. }
  38. }
  39. }
  40. else if (f instanceof TreeBin) {
  41. Node<K,V> p;
  42. binCount = 2;
  43. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  44. value)) != null) {
  45. oldVal = p.val;
  46. if (!onlyIfAbsent)
  47. p.val = value;
  48. }
  49. }
  50. }
  51. }
  52. if (binCount != 0) {
  53. if (binCount >= TREEIFY_THRESHOLD)
  54. treeifyBin(tab, i);
  55. if (oldVal != null)
  56. return oldVal;
  57. break;
  58. }
  59. }
  60. }
  61. addCount(1L, binCount);
  62. return null;
  63. }
  1. public V get(Object key) {
  2. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3. int h = spread(key.hashCode());
  4. if ((tab = table) != null && (n = tab.length) > 0 &&
  5. (e = tabAt(tab, (n - 1) & h)) != null) {
  6. if ((eh = e.hash) == h) {
  7. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  8. return e.val;
  9. }
  10. else if (eh < 0)
  11. return (p = e.find(h, key)) != null ? p.val : null;
  12. while ((e = e.next) != null) {
  13. if (e.hash == h &&
  14. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  15. return e.val;
  16. }
  17. }
  18. return null;
  19. }

3.3.3、为什么要把1.7的结构变为1.8的结构?

数据结构 : Java 7 采用 Segment 分段锁来实现,而 Java 8 中的 ConcurrentHashMap 使用数组 + 链表 + 红黑树
hash碰撞: 先使用拉链法,再转红黑树
保证并发安全:ava 7 采用 Segment 分段锁,而 Java 8采用CAS
查询复杂度:链表查询复杂度为O(n),但是java8中一旦转为红黑树,就变为O(logn)

为什么超过8转红黑树?
默认链表,占用内存更少,红黑树占用
达到冲突8次概率只有千万分之一,转红黑树可以确保极端情况下的查询效率

3.3.4、错误操作ConcurrentHashMap 不保证线程安全

读和写两个独立操作组合到一起就不是线程安全的,可以采用cas实现

  1. public class ConcurrenthashMapDemo {
  2. private static ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap();
  3. public static void main(String[] args) throws InterruptedException {
  4. int threadCount = 50;
  5. long start = System.currentTimeMillis();
  6. map.put("count", 0);
  7. Runnable r = () -> {
  8. IntStream.range(0, 100000).forEach(e -> {
  9. //可以实现但是并发效率低
  10. synchronized (ConcurrenthashMapDemo.class){
  11. Integer count = map.get("count");
  12. count ++;
  13. map.put("count",count);
  14. }
  15. });
  16. };
  17. Thread[] threads = new Thread[threadCount];
  18. IntStream.range(0, threadCount).forEach(e -> {
  19. threads[e] = new Thread(r);
  20. });
  21. IntStream.range(0, threadCount).forEach(e -> {
  22. threads[e].start();
  23. try {
  24. threads[e].join();
  25. } catch (InterruptedException interruptedException) {
  26. interruptedException.printStackTrace();
  27. }
  28. });
  29. long end = System.currentTimeMillis();
  30. System.out.println("syncnized方式 count结果:" + map.get("count") + " 耗时:" + (end - start));
  31. long start2 = System.currentTimeMillis();
  32. map.put("count", 0);
  33. Runnable r2 = () -> {
  34. IntStream.range(0, 100000).forEach(e -> {
  35. //并发效率高
  36. while(true){
  37. Integer count = map.get("count");
  38. Integer newCount = count + 1;
  39. boolean result = map.replace("count", count, newCount);
  40. if(result){
  41. break;
  42. }
  43. }
  44. });
  45. };
  46. Thread[] otherThreads = new Thread[threadCount];
  47. IntStream.range(0, threadCount).forEach(e -> {
  48. otherThreads[e] = new Thread(r2);
  49. });
  50. IntStream.range(0, threadCount).forEach(e -> {
  51. otherThreads[e].start();
  52. try {
  53. otherThreads[e].join();
  54. } catch (InterruptedException interruptedException) {
  55. interruptedException.printStackTrace();
  56. }
  57. });
  58. long end2 = System.currentTimeMillis();
  59. System.out.println("CAS方式 count结果:" + map.get("count") + " 耗时:" + (end2 - start2));
  60. }
  61. }

4、CopyOnWriteArrayList

4.1、作用与适合场景

CopyOnWrite 的意思是说,当容器需要被修改的时候,不直接修改当前容器,而是先将当前容器进行 Copy,复制出一个新的容器,然后修改新的容器,完成修改之后,再将原容器的引用指向新的容器。
CopyOnWriteArrayList 的所有修改操作(add,set等)都是通过创建底层数组的新副本来实现的,所以 CopyOnWrite 容器也是一种读写分离的思想体现,读和写使用不同的容器。
作用

代替Vector和SynchronizedList,Vector和SynchronizedList锁粒度太大,并发效率比较低,并且迭代时无法编辑。

适合场景

读操作尽可能的块,而写操作慢一些也没有太大关系。

例如:黑名单

4.2、读写规则

读取完全不用加锁,写入不会阻塞读取操作,只有写入和写入之间需要同步等待。

4.3、实例

ArrayList迭代时无法编辑,CopyOnWriteArrayList迭代时可以修改,但是修改对迭代没有影响,还是会迭代修改之前的数据

  1. public class CopyOnWriteArrayListDemo {
  2. public static void main(String[] args) {
  3. //异常ConcurrentModificationException 不能再迭代时修改
  4. // List<String> list = new ArrayList<>();
  5. List<String> list = new CopyOnWriteArrayList<>();
  6. IntStream.range(1,6).forEach(e -> list.add(e + ""));
  7. Iterator<String> iterator = list.iterator();
  8. while (iterator.hasNext()) {
  9. String next = iterator.next();
  10. if("2".equals(next)){
  11. list.remove("5");
  12. }
  13. if("3".equals(next)){
  14. list.add("6");
  15. }
  16. System.out.println("list: " + list);
  17. System.out.println(next);
  18. }
  19. }
  20. }

ArrayList输出

  1. list: [1, 2, 3, 4, 5]
  2. 1
  3. list: [1, 2, 3, 4]
  4. 2
  5. Exception in thread "main" java.util.ConcurrentModificationException
  6. at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
  7. at java.util.ArrayList$Itr.next(ArrayList.java:859)
  8. at com.imooc.thread_demo.collection.CopyOnWriteArrayListDemo.main(CopyOnWriteArrayListDemo.java:26)

CopyOnWriteArrayList输出

  1. list: [1, 2, 3, 4, 5]
  2. 1
  3. list: [1, 2, 3, 4]
  4. 2
  5. list: [1, 2, 3, 4, 6]
  6. 3
  7. list: [1, 2, 3, 4, 6]
  8. 4
  9. list: [1, 2, 3, 4, 6]
  10. 5

4.4、缺点

数据一致性问题:只能保证数据最终一致,不能保证数据实时一致。
内存占用问题:CopyOnWriteArrayList时创建副本实现写,内存占用大

4.5、源码分析

数据结构

  1. /** The lock protecting all mutators */
  2. final transient ReentrantLock lock = new ReentrantLock();
  3. /** The array, accessed only via getArray/setArray. */
  4. private transient volatile Object[] array;
  1. public boolean add(E e) {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. Object[] elements = getArray();
  6. int len = elements.length;
  7. Object[] newElements = Arrays.copyOf(elements, len + 1);
  8. newElements[len] = e;
  9. setArray(newElements);
  10. return true;
  11. } finally {
  12. lock.unlock();
  13. }
  14. }
  15. public E get(int index) {
  16. return get(getArray(), index);
  17. }

5、并发队列

image.png

5.1、阻塞队列

从队列取数据,队列里无数据,则阻塞,往队列放数据,如果队列已满,那么就无法继续插入,则阻塞

5.1.1、阻塞队列的常用方法


  1. 抛出异常:add、remove、element
    • add 方法是往队列里添加一个元素,如果队列满了,就会抛出异常来提示队列已满
    • remove 方法的作用是删除元素,如果我们删除的队列是空的,那么 remove 方法就会抛出异常。
    • element 方法是返回队列的头部节点,但是并不删除,如果队列是空的,抛出异常。
  2. 返回结果但不抛出异常:offer、poll、peek
    • offer 方法用来插入一个元素,并用返回值来提示插入是否成功,如果添加成功会返回 true,如果队列满了,无法插入,返回false
    • 移除并返回队列的头节点,如果队列是空的,返回 null 作为提示
    • 返回队列的头元素但并不删除,如果队列里面是空的,它便会返回 null 作为提示

3.阻塞:put、take

  • put 队列已满,那么就无法继续插入,则阻塞
  • take 从队列取数据,队列里无数据,则阻塞


5.1.2、ArrayBlockingQueue

有界
指定容量
公平:可以指定是否需要保证公平,如果想保证公平,等待最长的线程会被优先处理,不过同时会带来一定的性能损耗

5.1.3、LinkedBlockingQueue

无界
容量 Integer.MAX_VALUE
内部结构:node,两把锁

5.1.4、PriorityBlockingQueue

支持优先级的无界阻塞队列

  1. public static void main(String[] args) {
  2. PriorityBlockingQueue priorityBlockingQueue = new PriorityBlockingQueue(10,
  3. Comparator.comparingInt(o -> ((Student)o).getAge()));
  4. priorityBlockingQueue.add(new Student(13,"张三"));
  5. priorityBlockingQueue.add(new Student(17,"李四"));
  6. priorityBlockingQueue.add(new Student(12,"王五"));
  7. Student take = null;
  8. do{
  9. take = (Student)priorityBlockingQueue.poll();
  10. if(take != null){
  11. System.out.println(take.getName());
  12. }
  13. }while(take != null);
  14. }

5.1.5、SynchronousQueue

SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取

5.1.6、DelayQueue

延迟队列,根据延迟时间排序

5.2、非阻塞并发队列


ConcurrentLinkedQueue

非阻塞队列 ConcurrentLinkedQueue 使用 CAS 非阻塞算法 + 不停重试,来实现线程安全,适合用在不需要阻塞功能,且并发不是特别剧烈的场景