一、ConcurrentHashMap

1.1 Java7 中的实现

ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock(可重入锁)。不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel (Segment 数组数量)的线程并发。每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment。

  • 数据结构图示

image.png

  • 成员变量
  1. //定义的常量
  2. //初始时默认容量
  3. static final int DEFAULT_INITIAL_CAPACITY = 16;
  4. //负载因子
  5. static final float DEFAULT_LOAD_FACTOR = 0.75f;
  6. //默认的并发等级,
  7. static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  8. //最大容量
  9. static final int MAXIMUM_CAPACITY = 1 << 30;
  10. //最小每个Segment持有table数量,必须是2的倍数
  11. static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
  12. //Segment 数组最大容量 65536
  13. static final int MAX_SEGMENTS = 1 << 16;
  14. //不加锁进行检索的数量
  15. static final int RETRIES_BEFORE_LOCK = 2;
  16. //Segment 数组, 数组中的每个元素都持有HashEntry 桶
  17. final Segment<K,V>[] segments;
  18. transient Set<K> keySet;
  19. transient Set<Map.Entry<K,V>> entrySet;
  20. transient Collection<V> values;
  • Segment 的源码实现
  1. static final class Segment<K,V> extends ReentrantLock implements Serializable {
  2. private static final long serialVersionUID = 2249069246763182397L;
  3. static final int MAX_SCAN_RETRIES =
  4. Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
  5. //存放数据的hash 桶
  6. transient volatile HashEntry<K,V>[] table;
  7. transient int count;
  8. transient int modCount;
  9. transient int threshold;
  10. final float loadFactor;
  11. Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
  12. this.loadFactor = lf;
  13. this.threshold = threshold;
  14. this.table = tab;
  15. }
  16. }
  • Entry 实现

    1. static final class HashEntry<K,V> {
    2. final int hash; //hahs值
    3. final K key; //键
    4. volatile V value; //值
    5. volatile HashEntry<K,V> next;  //后继指针
    6.   HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
    7. this.hash = hash;
    8. this.key = key;
    9. this.value = value;
    10. this.next = next;
    11. }
    12. }
  • 构造函数

    1. public ConcurrentHashMap(int initialCapacity,
    2. float loadFactor, int concurrencyLevel) {
    3. if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
    4. throw new IllegalArgumentException();
    5. if (concurrencyLevel > MAX_SEGMENTS)
    6. concurrencyLevel = MAX_SEGMENTS;
    7. // Find power-of-two sizes best matching arguments
    8. int sshift = 0; //sshift等于ssize从1向左移位的次数
    9. int ssize = 1; //Segment 数组的大小
    10. //为了能通过按位与的散列算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方
    11. //(power-of-two size),所以必须计算出一个大于或等于concurrencyLevel的最小的2的N次方值
    12. //来作为segments数组的长度。
    13. while (ssize < concurrencyLevel) {
    14. ++sshift;
    15. ssize <<= 1;
    16. }
    17. //segmentShift用于定位参与散列运算的位数
    18. this.segmentShift = 32 - sshift;
    19. //segmentMask是散列运算的掩码,等于ssize减1,即15,掩码的二进制各个位的值都是1
    20. this.segmentMask = ssize - 1;
    21. if (initialCapacity > MAXIMUM_CAPACITY)
    22. initialCapacity = MAXIMUM_CAPACITY;
    23. int c = initialCapacity / ssize;
    24. if (c * ssize < initialCapacity)
    25. ++c;
    26. int cap = MIN_SEGMENT_TABLE_CAPACITY;
    27. while (cap < c)
    28. cap <<= 1;
    29. //创建 Segment,并放入Segment数组
    30. Segment<K,V> s0 =
    31. new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
    32. (HashEntry<K,V>[])new HashEntry[cap]);
    33. Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    34. UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    35. this.segments = ss;
    36. }
  • get 方法

    1. public V get(Object key) {
    2. Segment<K,V> s; // manually integrate access methods to reduce overhead
    3. HashEntry<K,V>[] tab;
    4. //对key 进行散列,得到hash值
    5. int h = hash(key);
    6. //计算出key 所在的segments数组下标
    7. long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    8. if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
    9. (tab = s.table) != null) {
    10. //遍历桶中的元素,找到key对应的的元素
    11. for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
    12. (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
    13. e != null; e = e.next) {
    14. K k;
    15. if ((k = e.key) == key || (e.hash == h && key.equals(k)))
    16. return e.value;
    17. }
    18. }
    19. return null;
    20. }

get操作的高效之处在于整个get过程不需要加锁

  • put 方法了解
  1. public V put(K key, V value) {
  2. Segment<K,V> s;
  3. if (value == null)
  4. throw new NullPointerException();
  5. //对key 进行散列
  6. int hash = hash(key);
  7. //计算存放到哪个Segment
  8. int j = (hash >>> segmentShift) & segmentMask;
  9. if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
  10. (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
  11. s = ensureSegment(j);
  12. return s.put(key, hash, value, false);
  13. }
  1. //如果不存在,创建Segment,并返回
  2. private Segment<K,V> ensureSegment(int k) {
  3. final Segment<K,V>[] ss = this.segments;
  4. long u = (k << SSHIFT) + SBASE; // raw offset
  5. Segment<K,V> seg;
  6. if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
  7. Segment<K,V> proto = ss[0]; // use segment 0 as prototype
  8. int cap = proto.table.length;
  9. float lf = proto.loadFactor;
  10. int threshold = (int)(cap * lf);
  11. HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
  12. if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
  13. == null) { // recheck
  14. Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
  15. while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
  16. == null) {
  17. if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
  18. break;
  19. }
  20. }
  21. }
  22. return seg;
  23. }

找到对应的Segment,执行put 方法

  1. final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  2. HashEntry<K,V> node = tryLock() ? null : //1
  3. scanAndLockForPut(key, hash, value); //2
  4. V oldValue;
  5. try {
  6. HashEntry<K,V>[] tab = table;
  7. int index = (tab.length - 1) & hash;
  8. HashEntry<K,V> first = entryAt(tab, index); //3
  9. for (HashEntry<K,V> e = first;;) {
  10. if (e != null) {
  11. K k;
  12. if ((k = e.key) == key ||
  13. (e.hash == hash && key.equals(k))) {//4
  14. oldValue = e.value;
  15. if (!onlyIfAbsent) {
  16. e.value = value;
  17. ++modCount;
  18. }
  19. break;
  20. }
  21. e = e.next;
  22. }
  23. else {//5
  24. if (node != null)
  25. node.setNext(first);
  26. else
  27. node = new HashEntry<K,V>(hash, key, value, first);
  28. int c = count + 1;
  29. if (c > threshold && tab.length < MAXIMUM_CAPACITY)
  30. rehash(node);
  31. else
  32. setEntryAt(tab, index, node);
  33. ++modCount;
  34. count = c;
  35. oldValue = null;
  36. break;
  37. }
  38. }
  39. } finally {
  40. unlock(); //6
  41. }
  42. return oldValue;
  43. }
  1. 首先第一步的时候会尝试获取锁: tryLock()
  2. 如果获取失败肯定就有其他线程存在竞争,则利用 scanAndLockForPut() 自旋获取锁。
  3. 将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry。
  4. 遍历该 HashEntry,如果不为空则判断传入的 key 和当前遍历的 key 是否相等,相等则覆盖旧的 value。
  5. 不为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断是否需要扩容。
  6. 最后会解除在 1 中所获取当前 Segment 的锁。
  • scanAndLockForPut方法
    1. private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    2. HashEntry<K,V> first = entryForHash(this, hash);
    3. HashEntry<K,V> e = first;
    4. HashEntry<K,V> node = null;
    5. int retries = -1; // negative while locating node
    6. while (!tryLock()) { //1
    7. HashEntry<K,V> f; // to recheck first below
    8. if (retries < 0) {
    9. if (e == null) {
    10. if (node == null) // speculatively create node
    11. node = new HashEntry<K,V>(hash, key, value, null);
    12. retries = 0;
    13. }
    14. else if (key.equals(e.key))
    15. retries = 0;
    16. else
    17. e = e.next;
    18. }
    19. else if (++retries > MAX_SCAN_RETRIES) {//2
    20. lock();
    21. break;
    22. }
    23. else if ((retries & 1) == 0 &&
    24. (f = entryForHash(this, hash)) != first) {
    25. e = first = f; // re-traverse if entry changed
    26. retries = -1;
    27. }
    28. }
    29. return node;
    30. }
  1. 尝试自旋获取锁。
  2. 如果重试的次数达到了 MAX_SCAN_RETRIES 则改为阻塞锁获取,保证能获取成功。
  • rehash方法
  1. private void rehash(HashEntry<K,V> node) {
  2. HashEntry<K,V>[] oldTable = table;
  3. int oldCapacity = oldTable.length;
  4. int newCapacity = oldCapacity << 1; //1
  5. threshold = (int)(newCapacity * loadFactor);
  6. HashEntry<K,V>[] newTable =
  7. (HashEntry<K,V>[]) new HashEntry[newCapacity];
  8. int sizeMask = newCapacity - 1;
  9. for (int i = 0; i < oldCapacity ; i++) {
  10. HashEntry<K,V> e = oldTable[i];
  11. if (e != null) {
  12. HashEntry<K,V> next = e.next;
  13. int idx = e.hash & sizeMask;
  14. if (next == null) // Single node on list //2
  15. newTable[idx] = e;
  16. else { // Reuse consecutive sequence at same slot //3
  17. HashEntry<K,V> lastRun = e;
  18. int lastIdx = idx;
  19. for (HashEntry<K,V> last = next;
  20. last != null;
  21. last = last.next) {
  22. int k = last.hash & sizeMask;
  23. if (k != lastIdx) {
  24. lastIdx = k;
  25. lastRun = last;
  26. }
  27. }
  28. newTable[lastIdx] = lastRun;
  29. // Clone remaining nodes
  30. for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
  31. V v = p.value;
  32. int h = p.hash;
  33. int k = h & sizeMask;
  34. HashEntry<K,V> n = newTable[k];
  35. newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
  36. }
  37. }
  38. }
  39. }
  40. int nodeIndex = node.hash & sizeMask; // add the new node
  41. node.setNext(newTable[nodeIndex]);
  42. newTable[nodeIndex] = node;
  43. table = newTable;
  44. }
  1. 计算新的容量为旧容量的2倍
  2. 遍历旧HashEntry桶,如果当前HashEntry只用一个节点,直接放到新的HashEntry桶中
  3. 如果当前HashEntry是链表,则遍历链表,重新计算下标放到新的HashEntry桶中

1.2 Java8 中的实现

  • 数据结构图示

抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性。结构上也引入了红黑树,防止查询效率退化为O(N)
image.png

  • Node类与Java7 HashEntry类似

    1. static class Node<K,V> implements Map.Entry<K,V> {
    2. final int hash;
    3. final K key;
    4. volatile V val; //volatile保证可见性
    5. volatile Node<K,V> next;
    6. Node(int hash, K key, V val, Node<K,V> next) {
    7. this.hash = hash;
    8. this.key = key;
    9. this.val = val;
    10. this.next = next;
    11. }
    12. }
  • get方法

  1. public V get(Object key) {
  2. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3. int h = spread(key.hashCode());
  4. if ((tab = table) != null && (n = tab.length) > 0 &&
  5. (e = tabAt(tab, (n - 1) & h)) != null) {
  6. //根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
  7. if ((eh = e.hash) == h) {
  8. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  9. return e.val;
  10. }
  11. else if (eh < 0)//如果是红黑树那就按照树的方式获取值。
  12. return (p = e.find(h, key)) != null ? p.val : null;
  13. while ((e = e.next) != null) { 就不满足那就按照链表的方式遍历获取值。
  14. if (e.hash == h &&
  15. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  16. return e.val;
  17. }
  18. }
  19. return null;
  20. }
  • put 方法
    1. final V putVal(K key, V value, boolean onlyIfAbsent) {
    2. if (key == null || value == null) throw new NullPointerException();
    3. int hash = spread(key.hashCode());
    4. int binCount = 0;
    5. for (Node<K,V>[] tab = table;;) {
    6. Node<K,V> f; int n, i, fh;
    7. //如果桶为空,初始化
    8. if (tab == null || (n = tab.length) == 0)
    9. tab = initTable();
    10. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    11. //采用CAS无锁put入新的元素,成功返回
    12. //失败自旋
    13. if (casTabAt(tab, i, null,
    14. new Node<K,V>(hash, key, value, null)))
    15. break; // no lock when adding to empty bin
    16. }
    17. //如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。
    18. else if ((fh = f.hash) == MOVED)
    19. tab = helpTransfer(tab, f);
    20. else {
    21. //如果都不满足,则利用 synchronized 锁写入数据。
    22. V oldVal = null;
    23. synchronized (f) {
    24. if (tabAt(tab, i) == f) {
    25. if (fh >= 0) {
    26. binCount = 1;
    27. for (Node<K,V> e = f;; ++binCount) {
    28. K ek;
    29. if (e.hash == hash &&
    30. ((ek = e.key) == key ||
    31. (ek != null && key.equals(ek)))) {
    32. oldVal = e.val;
    33. if (!onlyIfAbsent)
    34. e.val = value;
    35. break;
    36. }
    37. Node<K,V> pred = e;
    38. if ((e = e.next) == null) {
    39. pred.next = new Node<K,V>(hash, key,
    40. value, null);
    41. break;
    42. }
    43. }
    44. }
    45. else if (f instanceof TreeBin) {
    46. Node<K,V> p;
    47. binCount = 2;
    48. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
    49. value)) != null) {
    50. oldVal = p.val;
    51. if (!onlyIfAbsent)
    52. p.val = value;
    53. }
    54. }
    55. }
    56. }
    57. if (binCount != 0) {
    58. //如果达到需要转换为红黑树的阀值 TREEIFY_THRESHOLD = 8
    59. if (binCount >= TREEIFY_THRESHOLD)
    60. treeifyBin(tab, i);//将链表转换为红黑树
    61. if (oldVal != null)
    62. return oldVal;
    63. break;
    64. }
    65. }
    66. }
    67. addCount(1L, binCount);
    68. return null;
    69. }

二、 CopyOnWriteArrayList

基于Java8 了解源码实现
原理: 采用读写分离的思想实现并发访问, 而且保证读读之间在任何时候都不会被阻塞。
缺点:

  • 内存占用问题
  • 数据一致性问题

适合场景: 用于读多写少的并发场景

2.1 内部成员

  1. //可重入锁
  2. final transient ReentrantLock lock = new ReentrantLock();
  3. //数组 volatile:保证可见性, 
  4. private transient volatile Object[] array;
  5. //弱一致性的迭代器
  6. static final class COWIterator<E> implements ListIterator<E>
  7. // 反射机制 Unsafe类
  8. private static final sun.misc.Unsafe UNSAFE;
  9. // lock域的内存偏移量
  10. private static final long lockOffset;
  11. static {
  12. try {
  13. //实例化Unsafe类
  14. UNSAFE = sun.misc.Unsafe.getUnsafe();
  15. Class<?> k = CopyOnWriteArrayList.class;
  16. //锁的偏移量
  17. lockOffset = UNSAFE.objectFieldOffset
  18. (k.getDeclaredField("lock"));
  19. } catch (Exception e) {
  20. throw new Error(e);
  21. }
  22. }

2.2 构造方法

  1. public CopyOnWriteArrayList() {
  2. setArray(new Object[0]); //初始化长度为0 的数组
  3. }
  4. public CopyOnWriteArrayList(Collection<? extends E> c) {
  5. Object[] elements;
  6. if (c.getClass() == CopyOnWriteArrayList.class)
  7. elements = ((CopyOnWriteArrayList<?>)c).getArray();
  8. else {
  9. elements = c.toArray();
  10. // c.toArray might (incorrectly) not return Object[] (see 6260652)
  11. if (elements.getClass() != Object[].class)
  12. elements = Arrays.copyOf(elements, elements.length, Object[].class);
  13. }
  14. setArray(elements);
  15. }
  16. public CopyOnWriteArrayList(E[] toCopyIn) {
  17. setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
  18. }

2.3 get方法

  1. //volatile 修饰数组引用,保证可见性
  2. private transient volatile Object[] array;
  3. final Object[] getArray() {
  4. return array;
  5. }
  6. public E get(int index) {
  7. return get(getArray(), index);
  8. }
  9. @SuppressWarnings("unchecked")
  10. private E get(Object[] a, int index) {
  11. return (E) a[index]; //通过下标获取数组元素, 没有做任何加锁操作
  12. }

2.4 add 方法

  1. public boolean add(E e) {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock(); //获取锁
  4. try {
  5. //获取原数组
  6. Object[] elements = getArray();
  7. int len = elements.length;
  8. //拷贝原数组到新的新数组
  9. Object[] newElements = Arrays.copyOf(elements, len + 1);
  10. //操作新的数组
  11. newElements[len] = e;
  12. //将旧数组引用指向新的数组
  13. setArray(newElements);
  14. return true;
  15. } finally {
  16. lock.unlock(); //释放锁
  17. }
  18. }

三、阻塞队列BlockingQueue

原理:采用等待通知机制实现, 底层采用可重入锁和Condition实现

3.1 JDK 中的阻塞队列实现

  • ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue 一个支持优先级排序的无界优先级阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界延迟阻塞队列。
  • SynchronousQueue 一个不存储元素的阻塞队列。
  • LinkedTransferQueue 一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列。

3.2 对ArrayBlockingQueue 进行分析(Java8)

3.2.1 成员变量
  1. //队列容器数组
  2. final Object[] items;
  3. /** items index for next take, poll, peek or remove */
  4. int takeIndex;
  5. /** items index for next put, offer, or add */
  6. int putIndex;
  7. //队列元素个数
  8. int count;
  9. //可重入锁
  10. final ReentrantLock lock;
  11. //队列条件锁(队列挂起出队列线程)
  12. private final Condition notEmpty;
  13. //队列条件锁(队列挂起入队列线程)
  14. private final Condition notFull;

3.2.2 构造函数
  1. //capacity 容量
  2. //fair 是否公平访问队列:()
  3. //true 在插入和删除操作中会阻塞线程,按照FIFO的顺序执行访问,
  4. //false 线程访问顺序不确定
  5. public ArrayBlockingQueue(int capacity, boolean fair) {
  6. if (capacity <= 0)
  7. throw new IllegalArgumentException();
  8. this.items = new Object[capacity];
  9. lock = new ReentrantLock(fair);
  10. notEmpty = lock.newCondition();
  11. notFull = lock.newCondition();
  12. }
  13. public ArrayBlockingQueue(int capacity, boolean fair,
  14. Collection<? extends E> c) {
  15. this(capacity, fair);
  16. final ReentrantLock lock = this.lock;
  17. lock.lock(); // Lock only for visibility, not mutual exclusion
  18. try {
  19. int i = 0;
  20. try {
  21. for (E e : c) {
  22. checkNotNull(e);
  23. items[i++] = e;
  24. }
  25. } catch (ArrayIndexOutOfBoundsException ex) {
  26. throw new IllegalArgumentException();
  27. }
  28. count = i;
  29. putIndex = (i == capacity) ? 0 : i;
  30. } finally {
  31. lock.unlock();
  32. }
  33. }

3.2.3 入队列
  1. //入队列,队列已满返回false
  2. public boolean offer(E e) {
  3. checkNotNull(e); //检查元素是否为空
  4. final ReentrantLock lock = this.lock;
  5. lock.lock();//获取锁
  6. try {
  7. if (count == items.length) //如果队列已满,返回false
  8. return false;
  9. else {
  10. enqueue(e);//添加元素到队列尾部
  11. return true;
  12. }
  13. } finally {
  14. lock.unlock();//释放锁
  15. }
  16. }
  17. //入队列,已满的挂起线程等待
  18. public void put(E e) throws InterruptedException {
  19. checkNotNull(e);
  20. final ReentrantLock lock = this.lock;
  21. lock.lockInterruptibly(); //获取锁,可打断
  22. try {
  23. while (count == items.length) //如果队列已满,挂起线程,释放锁
  24. notFull.await();
  25. enqueue(e);//入队列
  26. } finally {
  27. lock.unlock();//释放锁
  28. }
  29. }
  30. //入队列,可设置超时
  31. public boolean offer(E e, long timeout, TimeUnit unit)
  32. throws InterruptedException {
  33. checkNotNull(e);
  34. long nanos = unit.toNanos(timeout);//超时时间
  35. final ReentrantLock lock = this.lock;
  36. lock.lockInterruptibly();//获取锁,可打断
  37. try {
  38. while (count == items.length) {
  39. if (nanos <= 0)
  40. return false;
  41. nanos = notFull.awaitNanos(nanos);等待超时挂起线程, 释放锁
  42. }
  43. enqueue(e);
  44. return true;
  45. } finally {
  46. lock.unlock();
  47. }
  48. }
  49. //入队列
  50. private void enqueue(E x) {
  51. // assert lock.getHoldCount() == 1;
  52. // assert items[putIndex] == null;
  53. final Object[] items = this.items;
  54. items[putIndex] = x;
  55. if (++putIndex == items.length)
  56. putIndex = 0;
  57. count++;
  58. notEmpty.signal(); //唤醒takes线程
  59. }

3.2.4 出队列
  1. //出队列
  2. public E poll() {
  3. final ReentrantLock lock = this.lock;
  4. lock.lock();
  5. try {
  6. return (count == 0) ? null : dequeue();
  7. } finally {
  8. lock.unlock();
  9. }
  10. }
  11. public E take() throws InterruptedException {
  12. final ReentrantLock lock = this.lock;
  13. lock.lockInterruptibly();
  14. try {
  15. while (count == 0)
  16. notEmpty.await(); //队列为空,挂起线程
  17. return dequeue();
  18. } finally {
  19. lock.unlock();
  20. }
  21. }
  22. //超时出队列
  23. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  24. long nanos = unit.toNanos(timeout);
  25. final ReentrantLock lock = this.lock;
  26. lock.lockInterruptibly();
  27. try {
  28. while (count == 0) {
  29. if (nanos <= 0)
  30. return null;
  31. nanos = notEmpty.awaitNanos(nanos);//队列为空,超时挂起线程
  32. }
  33. return dequeue();
  34. } finally {
  35. lock.unlock();
  36. }
  37. }
  38. //出队列具体逻辑
  39. private E dequeue() {
  40. // assert lock.getHoldCount() == 1;
  41. // assert items[takeIndex] != null;
  42. final Object[] items = this.items;
  43. @SuppressWarnings("unchecked")
  44. E x = (E) items[takeIndex];//获取元素
  45. items[takeIndex] = null; //置空,帮助GC
  46. if (++takeIndex == items.length)
  47. takeIndex = 0;
  48. count--;
  49. if (itrs != null)
  50. itrs.elementDequeued(); //通知迭代器更新状态
  51. notFull.signal();//唤醒put线程
  52. return x;
  53. }

四、高效读写队列 ConcurrentLinkedQueue(未完成)

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现,该算法在Michael&Scott算法上进行了一些修改。
注意:

  • 该队列不支持存储空值

4.1 成员变量

  1. //头节点
  2. private transient volatile Node<E> head;
  3. //尾节点
  4. private transient volatile Node<E> tail;
  5. //节点类
  6. private static class Node<E> {
  7. volatile E item;
  8. volatile Node<E> next;
  9. // Unsafe mechanics
  10. private static final sun.misc.Unsafe UNSAFE;
  11. private static final long itemOffset;
  12. private static final long nextOffset;
  13. static {
  14. try {
  15. UNSAFE = sun.misc.Unsafe.getUnsafe();
  16. Class<?> k = Node.class;
  17. itemOffset = UNSAFE.objectFieldOffset
  18. (k.getDeclaredField("item"));
  19. nextOffset = UNSAFE.objectFieldOffset
  20. (k.getDeclaredField("next"));
  21. } catch (Exception e) {
  22. throw new Error(e);
  23. }
  24. }
  25. }

4.2 入队列 无锁实现

  1. public boolean offer(E e) {
  2. checkNotNull(e);
  3. final Node<E> newNode = new Node<E>(e);
  4. for (Node<E> t = tail, p = t;;) {//从尾节点插入
  5. Node<E> q = p.next;
  6. if (q == null) { //如果p节点的后继指针为空, 则p为队列的尾节点
  7. // p is last node
  8. if (p.casNext(null, newNode)) {//把新节点加入队列的尾部
  9. // Successful CAS is the linearization point
  10. // for e to become an element of this queue,
  11. // and for newNode to become "live".
  12. if (p != t) // hop two nodes at a time
  13. casTail(t, newNode); // Failure is OK.//设置尾节点
  14. return true;
  15. }
  16. // Lost CAS race to another thread; re-read next
  17. }
  18. else if (p == q)
  19. // We have fallen off list. If tail is unchanged, it
  20. // will also be off-list, in which case we need to
  21. // jump to head, from which all live nodes are always
  22. // reachable. Else the new tail is a better bet.
  23. p = (t != (t = tail)) ? t : head;
  24. else
  25. // Check for tail updates after two hops.
  26. p = (p != t && t != (t = tail)) ? t : q;
  27. }
  28. }

4.3 出队列实现

  1. public E poll() {
  2. restartFromHead:
  3. for (;;) {
  4. for (Node<E> h = head, p = h, q;;) {
  5. E item = p.item;
  6. if (item != null && p.casItem(item, null)) {
  7. // Successful CAS is the linearization point
  8. // for item to be removed from this queue.
  9. if (p != h) // hop two nodes at a time
  10. updateHead(h, ((q = p.next) != null) ? q : p);
  11. return item;
  12. }
  13. else if ((q = p.next) == null) {
  14. updateHead(h, p);
  15. return null;
  16. }
  17. else if (p == q)
  18. continue restartFromHead;
  19. else
  20. p = q;
  21. }
  22. }
  23. }

五、随机数据结构:跳表(SkipList)

5.1 跳跃表了解

跳跃表(skiplist)是一种随机化的数据结构, 在 1989 年由 William Pugh 在论文《Skip lists: a probabilistic alternative to balanced trees》中提出, 跳跃表以有序的方式在层次化的链表中保存元素,搜索、插入和删除的时间复杂度为O(logN)

  • 图示(WiKi)

image.png

5.2 Java 实现简单的SkipList

  • 定义SkipList 的节点 ```java /**
  • 节点类 */ class Node { //数据域 int key; //forward 数组,用于保存不同层级的指针 Node[] forwards; //节点最大等级 int maxLevel = MAX_LEVEL; } ```
  • 使用随机数算法决定新增节点的高度 ```java /**
  • 定义最大层级 / private final static int MAX_LEVEL = 16; /*
  • 随机选择节点作为索引的概率, 这里取50% / private final static float P = 0.5f; /*
  • 随机等级算法 *
  • @return 返回随机层数 */ private int randomLevel() { int level = 1; while (Math.random() < P && level < MAX_LEVEL) {
    1. level++;
    } return level; } ```
  • 跳表的插入实现

基本思路:我们将会从跳表的最高等级开始比较当前节点(一般会定义一个头节点)的下一个节点的key 与将要插入的key

  1. 如果下一个节点的key 小于将要插入节点的key,继续在同一层等级中遍历下一个节点
  2. 如果下一个节点的key 大于将要插入节点的key, 保存当前节点i 到数组update[i] 中,向下移一个等级继续遍历
  • 插入图示(WIKI)

Skip_list_add_element-en.gif

  • Java 代码实现

5.3 并发容器 ConcurrentSkipListMap

参考