ConcurrentHashMap的实现原理和使用
为什么使用ConcurrentHashMap:
- 线程不安全的HashMap
- 效率低下的HashTable
- ConcurrentHashMap的锁分段技术可有效提升并发访问率
ConcurrentHashMap的初始化
java1.7和java1.8之间的变化

源码分析:
/* ---------------- Constants -------------- *//*** 最大可能的表容量。该值必须正好为 1<<30 以保持在Java数组分配和索引两个表大小的幂的范围内,* 并且进一步要求因为 32 位散列字段的前两位用于控制目的。*/private static final int MAXIMUM_CAPACITY = 1 << 30;/*** 默认初始表容量。必须是 2 * 的幂(即至少为 1)且最多为 MAXIMUM_CAPACITY。*/private static final int DEFAULT_CAPACITY = 16;/*** 最大可能(非二的幂)数组大小。* toArray 和相关方法需要。*/static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;/*** 此表的默认并发级别。* 未使用但为与此类的先前版本兼容而定义.*/private static final int DEFAULT_CONCURRENCY_LEVEL = 16;/*** 此表的负载因子。构造函数中此值的覆盖仅影响初始表容量。* 通常不使用实际浮点值 - 使用 {@code n - (n >>> 2)} 等表达式更简单相关的调整大小阈值。*/private static final float LOAD_FACTOR = 0.75f;/*** bin 使用树而不是列表的 bin 计数阈值。将元素添加到具有至少这么多节点的bin时,* bin 将转换为树。该值必须大于2,并且应该至少为 8,* 以便与树删除中关于在收缩时转换回普通 bin 的假设相吻合。*/static final int TREEIFY_THRESHOLD = 8;/*** 调整大小操作期间 untreeifying(拆分)bin 的 bin 计数阈值。* 应小于 TREEIFY_THRESHOLD,并且最多为 6 以在移除时使用收缩检测进行网格化。*/static final int UNTREEIFY_THRESHOLD = 6;/*** 可对其进行树化的 bin 的最小表容量。(否则,如果 bin 中有太多节点,则调整表的大小。)* 该值应至少为 4 * TREEIFY_THRESHOLD 以避免调整大小和树化阈值之间的冲突。*/static final int MIN_TREEIFY_CAPACITY = 64;/*** 每个传输步骤的最小重组次数。范围细分以允许多个调整大小线程。此值用作下限,* 以避免调整大小时遇到过多的内存争用。该值应至少为DEFAULT_CAPACITY。*/private static final int MIN_TRANSFER_STRIDE = 16;/*** sizeCtl 中用于生成标记的位数。 对于 32 位数组,必须至少为 6。*/private static int RESIZE_STAMP_BITS = 16;/*** 可以帮助调整大小的最大线程数。必须适合 32 - RESIZE_STAMP_BITS 位。*/private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;/*** 在 sizeCtl 中记录大小标记的位移。*/private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;/** 节点哈希字段的编码。参见上面的解释。*/static final int MOVED = -1; // 转发节点的哈希static final int TREEBIN = -2; // 树根的哈希static final int RESERVED = -3; // 临时预订的哈希static final int HASH_BITS = 0x7fffffff; // 正常节点哈希的可用位/** CPU 的数量,以限制某些大小 */static final int NCPU = Runtime.getRuntime().availableProcessors();/** 为了序列化兼容性。 */private static final ObjectStreamField[] serialPersistentFields = {new ObjectStreamField("segments", Segment[].class),new ObjectStreamField("segmentMask", Integer.TYPE),new ObjectStreamField("segmentShift", Integer.TYPE)};/*** bin 数组。在第一次插入时延迟初始化。大小始终是 2 的幂。由迭代器直接访问。*/transient volatile Node<K,V>[] table;/*** 下一个要使用的表;仅在调整大小时为非空。*/private transient volatile Node<K,V>[] nextTable;/*** 基本计数器值,主要在没有争用时使用,但也作为表初始化比赛期间的后备。通过 CAS 更新。*/private transient volatile long baseCount;/*** 表初始化和调整大小控制。如果为负数,则表正在初始化或调整大小:-1 用于初始化,* else -(1 + 活动调整大小线程的数量)。否则,当 table 为 null 时,保存创建时要使用的初始表大小* 或者默认为 0。初始化后,保存下一个元素计数值,根据该值调整表大小。*/private transient volatile int sizeCtl;/*** 调整大小时要拆分的下一个表索引(加一个)。*/private transient volatile int transferIndex;/*** 调整大小和/或创建 CounterCell 时使用自旋锁(通过 CAS 锁定)。*/private transient volatile int cellsBusy;/*** 计数单元表。当非空时,大小是 2 的幂。*/private transient volatile CounterCell[] counterCells;// viewsprivate transient KeySetView<K,V> keySet;private transient ValuesView<K,V> values;private transient EntrySetView<K,V> entrySet;
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;// 当 table 数组为空的时候while ((tab = table) == null || tab.length == 0) {// 当 sizeCtl < 0 时表示被其他线程抢占了初始化的操作if ((sc = sizeCtl) < 0)// 让出自己的 CPU 时间片Thread.yield(); // lost initialization race; just spin// 通过 CAS 操作把 sizeCtl 设置成 -1,标识当前线程抢到了初始化的资格else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if ((tab = table) == null || tab.length == 0) {// 默认的初始化容量为 DEFAULT_CAPACITY = 16int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")// 创建一个长度为 16 的 Node 节点数组,// 或在创建 ConcurrentHashMap 时传入长度Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];// 将创建好的数组赋值给 tabletable = tab = nt;// 计算下次扩容的大小,也就是当前容量的 0.75 倍,// 比如现在容量是 16 * 0.75 = 12sc = n - (n >>> 2);}} finally {// 设置 sizeCtl 为 sc,如果默认是容量 16 的话,现在就是 12sizeCtl = sc;}break;}}return tab;// 初始化数组,只是初始化一个合适大小的数组,// 其中的 sizeCtl 变量是在 Node 数组初始化或者扩容时的一个控制位标识,// 负数代表正在进行初始化或者扩容操作, 当 sizeCtl = 0 时标识这Node数组还未被初始化,// 正数代表初始化或者下一次扩容的大小;// 当 sizeCtl = -1 时,表示正在初始化// 当 sizeCtl = -N 时,表示有 N - 1 个线程正在进行扩容操作,}
/** Implementation for put and putIfAbsent */final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();// 计算 hash 值,高低 16 位异或运算int hash = spread(key.hashCode());// 用来记录链表的长度int binCount = 0;// 当出现锁竞争时就不断地做自旋操作for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 当数组为空时if (tab == null || (n = tab.length) == 0)// 初始化数组tab = initTable();// 通过 hash 值对应的数组的下标得到第一个节点,// 以 volatile 读的方式来读取 table 数组中的元素// 保证每次拿到的数据都是最新的else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 如果该下标返回的节点为空,直接通过 CAS 方式将新的值封装成 Node 插入// 如果 CAS 失败,说明出现锁竞争,进入下一次循环。if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}// 如果对应的节点存在,判断该节点的 hash 是否是等于 MOVED(-1)// 如果相等,说明当前节点是 ForwardingNode,意味着有其他线程正在扩容else if ((fh = f.hash) == MOVED)// 当前直接帮助它进行扩容tab = helpTransfer(tab, f);else {// 进入到这里,说明 f 是当前 node 数组对应位置节点的头结点,并且不为空V oldVal = null;// 给对应的头结点加锁synchronized (f) {// 再次判断对应下标的位置是否是 f 节点if (tabAt(tab, i) == f) {// 当头结点的 hash 值大于 0 时,说明是链表if (fh >= 0) {// 用来记录链表的长度binCount = 1;// 遍历链表for (Node<K,V> e = f;; ++binCount) {K ek;// 如果发现相同的 key 的 hash 值,就判断是否需要进行值的覆盖if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;// 默认情况下,新值换旧值if (!onlyIfAbsent)e.val = value;break;}// 一直遍历到链表的末端Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 如果当前的头节点是红黑树else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;// 调用红黑树的插入方法插入新的值if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {// 如果值已存在,新值换旧值oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}// 当链表的长度不等于 0 的时候,说明上面在做链表的操作if (binCount != 0) {// 如果链表的长度已经达到了阈值 8 就需要尝试把链表转换为红黑树if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);// 如果 value 被替换,返回旧值给调用者if (oldVal != null)return oldVal;break;}}}// 将当前的 ConcurrentHashMap 的元素数量加 1,有可能触发 transfer() 方法操作扩容addCount(1L, binCount);return null;}
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);}● 该方法主要是获取对象中 offset 偏移量地址对应的对象 field 值,类似于 tab[i] 获取。出于性能的考虑,通过 Unsafe 类对 table 进行操作。● 上面不使用 tab[i] 计算的原因○ 使用 Unsafe#getObjectVolatile() 方法获取对象,在该方法中获取的对象是 volatile 关键字修饰的。表示 volatile 写操作 happens-before volatile 读操作,因此其他线程对 table 的修改都会 对 get 可见。○ 虽然 table 数组本身也是增加了 volatile 属性,但是 volatile 的数组只是针对数组的引用基于 volatile 语义,而不是其元素。所以当有其他线程对该数组的元素进行写操作时,那么当前线程读的时候并不一定获取到的是最新值
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;// 判断此时是否还正在执行扩容,当 nextTab == null 时,表示扩容结束if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {// 生成扩容戳int rs = resizeStamp(tab.length);// 说明扩容还没完成的情况下不断地循环来尝试将当前线程加入到扩容操作中while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {// sc >>> RESIZE_STAMP_SHIFT) != rs 表示如果在同一轮扩容中,// sc 右移比较高低位和 rs 的值// 如果不相等,扩容就结束。// sc == rs + 1 表示扩容结束// sc == rs + MAX_RESIZERS 表示扩容线程数是否达到最大扩容数量// transferIndex <= 0 表示所有的 Node 都已经分配了线程if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;// 在低 16 位上增加扩容线程数if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {transfer(tab, nextTab);break;}}return nextTab;}// 返回新的数组return table;}
/*** 将每个 bin 中的节点移动和/或复制到新表**/private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;// 将 (n >>> 3 等价于 n / 8) 然后除以 CPU 的核心数,当得到的结果小于 16 时,就使用 16// 在这里的目的是为了让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象。// 当桶比较少时就默认使用一个 CPU 处理 16 个桶,也就是说长度只有 16 的时候,// 扩容时只会有一个线程来扩容if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range// nextTab 未初始化,使用来扩容 Node 数组的if (nextTab == null) { // initiatingtry {@SuppressWarnings("unchecked")// 新建一个 n << 1 原始 table 大小的 nextTab,也就是 32Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];// 将新建好的 Node 数组赋值给 nextTabnextTab = nt;} catch (Throwable ex) { // try to cope with OOME// 扩容失败,sizeCtl 使用 int 最大值sizeCtl = Integer.MAX_VALUE;return;}// 更新成员变量nextTable = nextTab;// 更新转移下标,表示转移时的下标transferIndex = n;}// 新的 tab 长度,例如 32int nextn = nextTab.length;// 创建一个 fwd 节点,表示一个正在被迁移的 Node,并且它的 hash 值为 MOVED(-1),// 在最开始 put 方法里有一个判断 MOVED 的逻辑。其作用就是用来占位,// 表示原始数组中的位置 i 处的节点// 完成迁移之后会在 i 位置设置一个 fwd 来告诉其他线程这个位置已经处理过了ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);// 首次推进为 true,如果等于 true,说明需要再次推进下一个目标(i--)// 如果是 false 的情况下,就不能推进下标,需要将当前的小标处理完毕才能继续推进。boolean advance = true;// 判断是否已经扩容完成,完成就 return 退出循环boolean finishing = false; // to ensure sweep before committing nextTab// 通过循环处理每个槽位中的链表元素, 默认 advance = true,// 通过 CAS 方式设置 TRANSFERINDEX 属性值// 并初始化 i 和 bound 值,i 指当前处理的槽位序号,bound 指需要处理的槽位边界,// 先处理槽位 15 的节点for (int i = 0, bound = 0;;) {// 这个循环不断地使用 CAS 尝试为当前线程分配任务,// 直到分配成功或者任务队列已经被全部分配完毕// 如果当前线程已经被分配过 bucket 区域,那么会通过 --i 指向下一个待处理 bucket,// 然后退出循环。Node<K,V> f; int fh;while (advance) {int nextIndex, nextBound;// --i 表示下一个待处理的 bucket,当 --i >= bound 时,// 表示当前线程已经分配过 bucket 区域if (--i >= bound || finishing)advance = false;// 表示所有 bucket 已经被分配完毕else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}// 通过 CAS 修改 TRANSFERINDEX 属性的值,为当前线程分配任务// 处理的节点区间为 (nextBound, nextIndex)---> (0, 15)else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}// 当 i < 0 表示已经遍历完旧的数组,// 也就是说当前线程已经处理完所有负责的 bucketif (i < 0 || i >= n || i + n >= nextn) {int sc;//如果完成了扩容if (finishing) {// 删除成员变量nextTable = null;// 更新 table 数组table = nextTab;// 更新阈值 32 * 0.75 = 24sizeCtl = (n << 1) - (n >>> 1);return;}// sizeCtl 在迁移之前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2// 然后每增加一个线程参与迁移就会将 sizeCtl + 1,// 在这里通过 CAS 的方式对 sizeCtl 的低 16 位// 进行减一操作,说明做完了属于自己的任务if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 第一个扩容的线程在执行 transfer() 方法之前会设置// 扩容之前会设置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2// 后续帮助该线程扩容的线程执行 transfer() 方法之前会设置 sizeCtl + 1// 每一个退出 transfer() 方法的线程,退出之前会设置 sizeCtl - 1// 当最后一个线程退出时,sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT + 2 ,也就是// (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT// 当 sc - 2 不等于标识符左移 16 位,如果相等就表示没有线程在帮助它们扩容,// 扩容结束if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;// 如果相等,扩容结束,更新 finishing 变量为 truefinishing = advance = true;// 再次循环检查整张表i = n; // recheck before commit}}// 如果位置 i 是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode 空节点else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);// 这里表示已经完成了数据的迁移,如果 ThreadA 已经处理过该节点,// 那么 ThreadB 处理这个节点时// hash 值为 MOVEDelse if ((fh = f.hash) == MOVED)advance = true; // already processedelse {// ---------------------开始进行数据迁移-----------------------// 对数组该节点进行加锁,开始处理数组该位置的迁移工作synchronized (f) {// 再次做校验if (tabAt(tab, i) == f) {// ln 表示低位,hn 表示高位,接下来把链表拆分成 低位 -- 0 和高位 -- 1//-------------------------可以看后面图---------------------Node<K,V> ln, hn;if (fh >= 0) {int runBit = fh & n;Node<K,V> lastRun = f;// 遍历当前 bucket 的链表,目的是为了尽量重用 Node 链表尾部的一部分for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}// 如果最后更新的 runBit 是 0 就设置低位节点if (runBit == 0) {ln = lastRun;hn = null;}else {// 设置为高位节点hn = lastRun;ln = null;}// 构建高低位链表for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}// 将低位的链表放在 i 位置,不用动setTabAt(nextTab, i, ln);// 将高位链表放在 i + n 位置setTabAt(nextTab, i + n, hn);// 把旧 table 数组的 hash 桶放置在 fwd 节点上,表示该 hash 桶已经被处理setTabAt(tab, i, fwd);advance = true;}// -------------------红黑树扩容--------------------------------else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}}
ConcurrentLinkedQueue
在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现,该算法在Michael&Scott算法上进行了一些修改。
ConcurrentLinkedQueue的入队列操作:
- 入队列就是将入队节点添加到队列的尾部,整个入队过程主要做两件事情:第一是定位出尾节点;第二是使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试
- 定位尾节点:tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点。尾节点可能是tail节点,也可能是tail节点的next节点。代码中循环体中的第一个if就是判断tail是否有next节点,有则表示next节点可能是尾节点。获取tail节点的next节点需要注意的是p节点等于p的next节点的情况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加节点,所以需要返回head节点。
- 设置入队节点为尾节点 : p.casNext(null,n)方法用于将入队节点设置为当前队列尾节点的next节点,如果p是null,表示p是当前队列的尾节点,如果不为null,表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。
- HOPS的设计意图 :让tail节点永远作为队列的尾节点,这样实现代码量非常少,而且逻辑清晰和易懂。但是, 这么做有个缺点,每次都需要使用循环CAS更新tail节点。如果能减少CAS更新tail节点的次 数,就能提高入队的效率,所以doug lea使用hops变量来控制并减少tail节点的更新频率,并不 是每次节点入队后都将tail节点更新成尾节点,而是当tail节点和尾节点的距离大于等于常量HOPS的值(默认等于1)时才更新tail节点,tail和尾节点的距离越长,使用CAS更新tail节点的次数就会越少,但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点,但是这样仍然能提高入队的效率,因为从本质上来看它通过增加对volatile变量的读操作来减少对volatile变量的写操作,而对volatile变量的写操作开销要远远大于读操作,所以入队效率会有所提升。
ConcurrentLinkedQueue的出队列操作:首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。
Java的阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
- 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
在阻塞队列不可用时,这两个附加操作提供了4种处理方式:
- 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException(”Queue full”)异常。当队列空时,从队列里获取元素会抛NoSuchElementException异常。
- 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。
- 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
- 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。
JDK 7提供了7个阻塞队列:
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。 (FIFO)
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。(FIFO)长度Integer.MAX_VALUE
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。 可以自定义类实现compareTo()方法来指定元素排序规则,需要注意的是不能保证同优先级元素的顺序。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。支持延时获取元素,必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。 只有在延迟期满时才能从队列中取出元素
- SynchronousQueue:一个不存储元素的阻塞队列。 每一个put操作必须等待一个take操作, 否则不能继续添加元素
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
阻塞队列的实现原理 :使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生
产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。
Fork/join框架
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。那么,为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。比如A线程负责处理A队列里的任务。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行
工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。
工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。
