问题导读

    1.java常用集合类有哪些并发对象?
    2.LinkedBlockingQueue原理是怎样的?
    3.怎样用多个线程同时操作并且遍历queue?



    Java并发容器大纲
    我将JUC包中的集合类划分为3部分来进行说明。在简单的了解JUC包中集合类的框架之后,后面的章节再逐步对各个类进行介绍。

    List和Set

    JUC(java.util.concurrent)集合包中的List和Set实现类包括: CopyOnWriteArrayList CopyOnWriteArraySet ConcurrentSkipListSet ConcurrentSkipListSet稍后在说明Map时再说明, CopyOnWriteArrayList和CopyOnWriteArraySet的框架如下图所示:
    image.png
    CopyOnWriteArrayList相当于线程安全的ArrayList,它实现了List接口。CopyOnWriteArrayList是支持高并发的。 CopyOnWriteArraySet相当于线程安全的HashSet,它继承于AbstractSet类。

    CopyOnWriteArraySet内部包含一个CopyOnWriteArrayList对象,它是通过CopyOnWriteArrayList实现的。

    Map

    JUC集合包中Map的实现类包括: ConcurrentHashMap和ConcurrentSkipListMap。它们的框架如下图所示:
    image.png

    • ConcurrentHashMap是线程安全的哈希表(相当于线程安全的HashMap);它继承于AbstractMap类,并且实现ConcurrentMap接口。ConcurrentHashMap是通过“锁分段”来实现的,它支持并发。

    • ConcurrentSkipListMap是线程安全的有序的哈希表(相当于线程安全的TreeMap); 它继承于AbstractMap类,并且实现ConcurrentNavigableMap接口。ConcurrentSkipListMap是通过“跳表”来实现的,它支持并发。

    • ConcurrentSkipListSet是线程安全的有序的集合(相当于线程安全的TreeSet);它继承于AbstractSet,并实现了NavigableSet接口。ConcurrentSkipListSet是通过ConcurrentSkipListMap实现的,它也支持并发。

    Queue

    JUC集合包中Queue的实现类包括: ArrayBlockingQueue, LinkedBlockingQueue, LinkedBlockingDeque, ConcurrentLinkedQueue和ConcurrentLinkedDeque。它们的框架如下图所示:
    image.png

    • ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。

    • LinkedBlockingQueue是单向链表实现的(指定大小)阻塞队列,该队列按 FIFO(先进先出)排序元素。

    • LinkedBlockingDeque是双向链表实现的(指定大小)双向并发阻塞队列,该阻塞队列同时支持FIFO和FILO两种操作方式。

    • ConcurrentLinkedQueue是单向链表实现的无界队列,该队列按 FIFO(先进先出)排序元素。

    • ConcurrentLinkedDeque是双向链表实现的无界队列,该队列同时支持FIFO和FILO两种操作方式。

    LinkedBlockingQueue介绍

    LinkedBlockingQueue是一个单向链表实现的阻塞队列。该队列按 FIFO(先进先出)排序元素,新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。

    此外,LinkedBlockingQueue还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。

    LinkedBlockingQueue原理和数据结构

    LinkedBlockingQueue的数据结构,如下图所示:
    image.png
    说明:

    • LinkedBlockingQueue继承于AbstractQueue,它本质上是一个FIFO(先进先出)的队列。
    • LinkedBlockingQueue实现了BlockingQueue接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。
    • LinkedBlockingQueue是通过单链表实现的: (01) head是链表的表头。取出数据时,都是从表头head处插入。 (02) last是链表的表尾。新增数据时,都是从表尾last处插入。(03) count是链表的实际大小,即当前链表中包含的节点个数。(04) capacity是列表的容量,它是在创建链表时指定的。 (05) putLock是插入锁,takeLock是取出锁;notEmpty是“非空条件”,notFull是“未满条件”。通过它们对链表进行并发控制。 LinkedBlockingQueue在实现“多线程对竞争资源的互斥访问”时,对于“插入”和“取出(删除)”操作分别使用了不同的锁。对于插入操作,通过“插入锁putLock”进行同步;对于取出操作,通过“取出锁takeLock”进行同步。 此外,插入锁putLock和“非满条件notFull”相关联,取出锁takeLock和“非空条件notEmpty”相关联。通过notFull和notEmpty更细腻的控制锁。

    若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock。

    若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock。

    LinkedBlockingQueue函数列表

    1. // 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue。
    2. LinkedBlockingQueue()
    3. // 创建一个容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加。
    4. LinkedBlockingQueue(Collection<? extends E> c)
    5. // 创建一个具有给定(固定)容量的 LinkedBlockingQueue。
    6. LinkedBlockingQueue(int capacity)
    7. // 从队列彻底移除所有元素。
    8. void clear()
    9. // 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
    10. int drainTo(Collection<? super E> c)
    11. // 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
    12. int drainTo(Collection<? super E> c, int maxElements)
    13. // 返回在队列中的元素上按适当顺序进行迭代的迭代器。
    14. Iterator<E> iterator()
    15. // 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
    16. boolean offer(E e)
    17. // 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用。
    18. boolean offer(E e, long timeout, TimeUnit unit)
    19. // 获取但不移除此队列的头;如果此队列为空,则返回 null。
    20. E peek()
    21. // 获取并移除此队列的头,如果此队列为空,则返回 null。
    22. E poll()
    23. // 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
    24. E poll(long timeout, TimeUnit unit)
    25. // 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
    26. void put(E e)
    27. // 返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。
    28. int remainingCapacity()
    29. // 从此队列移除指定元素的单个实例(如果存在)。
    30. boolean remove(Object o)
    31. // 返回队列中的元素个数。
    32. int size()
    33. // 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
    34. E take()
    35. // 返回按适当顺序包含此队列中所有元素的数组。
    36. Object[] toArray()
    37. // 返回按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
    38. <T> T[] toArray(T[] a)
    39. // 返回此 collection 的字符串表示形式。
    40. String toString()

    LinkedBlockingQueue源码分析

    下面从LinkedBlockingQueue的创建,添加,删除,遍历这几个方面对它进行分析。

    1. 创建

    下面以LinkedBlockingQueue(int capacity)来进行说明。

    1. public LinkedBlockingQueue(int capacity) {
    2. if (capacity <= 0) throw new IllegalArgumentException();
    3. this.capacity = capacity;
    4. last = head = new Node<E>(null);
    5. }

    说明: (01) capacity是“链式阻塞队列”的容量。 (02) head和last是“链式阻塞队列”的首节点和尾节点。它们在LinkedBlockingQueue中的声明如下:

    1. // 容量
    2. private final int capacity;
    3. // 当前数量
    4. private final AtomicInteger count = new AtomicInteger(0);
    5. private transient Node<E> head; // 链表的表头
    6. private transient Node<E> last; // 链表的表尾
    7. // 用于控制“删除元素”的互斥锁takeLock 和 锁对应的“非空条件”notEmpty
    8. private final ReentrantLock takeLock = new ReentrantLock();
    9. private final Condition notEmpty = takeLock.newCondition();
    10. // 用于控制“添加元素”的互斥锁putLock 和 锁对应的“非满条件”notFull
    11. private final ReentrantLock putLock = new ReentrantLock();
    12. private final Condition notFull = putLock.newCondition();

    链表的节点定义如下:

    1. static class Node<E> {
    2. E item; // 数据
    3. Node<E> next; // 下一个节点的指针
    4. Node(E x) { item = x; }
    5. }

    2. 添加

    下面以offer(E e)为例,对LinkedBlockingQueue的添加方法进行说明。

    1. public boolean offer(E e) {
    2. if (e == null) throw new NullPointerException();
    3. // 如果“队列已满”,则返回false,表示插入失败。
    4. final AtomicInteger count = this.count;
    5. if (count.get() == capacity)
    6. return false;
    7. int c = -1;
    8. // 新建“节点e”
    9. Node<E> node = new Node(e);
    10. final ReentrantLock putLock = this.putLock;
    11. // 获取“插入锁putLock”
    12. putLock.lock();
    13. try {
    14. // 再次对“队列是不是满”的进行判断。
    15. // 若“队列未满”,则插入节点。
    16. if (count.get() < capacity) {
    17. // 插入节点
    18. enqueue(node);
    19. // 将“当前节点数量”+1,并返回“原始的数量”
    20. c = count.getAndIncrement();
    21. // 如果在插入元素之后,队列仍然未满,则唤醒notFull上的等待线程。
    22. if (c + 1 < capacity)
    23. notFull.signal();
    24. }
    25. } finally {
    26. // 释放“插入锁putLock”
    27. putLock.unlock();
    28. }
    29. // 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty上的等待线程
    30. if (c == 0)
    31. signalNotEmpty();
    32. return c >= 0;
    33. }

    说明:offer()的作用很简单,就是将元素E添加到队列的末尾。 enqueue()的源码如下:

    1. private void enqueue(Node<E> node) {
    2. // assert putLock.isHeldByCurrentThread();
    3. // assert last.next == null;
    4. last = last.next = node;
    5. }

    enqueue()的作用是将node添加到队列末尾,并设置node为新的尾节点! signalNotEmpty()的源码如下:

    1. private void signalNotEmpty() {
    2. final ReentrantLock takeLock = this.takeLock;
    3. takeLock.lock();
    4. try {
    5. notEmpty.signal();
    6. } finally {
    7. takeLock.unlock();
    8. }
    9. }

    signalNotEmpty()的作用是唤醒notEmpty上的等待线程。

    3. 取出

    下面以take()为例,对LinkedBlockingQueue的取出方法进行说明。

    1. public E take() throws InterruptedException {
    2. E x;
    3. int c = -1;
    4. final AtomicInteger count = this.count;
    5. final ReentrantLock takeLock = this.takeLock;
    6. // 获取“取出锁”,若当前线程是中断状态,则抛出InterruptedException异常
    7. takeLock.lockInterruptibly();
    8. try {
    9. // 若“队列为空”,则一直等待。
    10. while (count.get() == 0) {
    11. notEmpty.await();
    12. }
    13. // 取出元素
    14. x = dequeue();
    15. // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。
    16. c = count.getAndDecrement();
    17. if (c > 1)
    18. notEmpty.signal();
    19. } finally {
    20. // 释放“取出锁”
    21. takeLock.unlock();
    22. }
    23. // 如果在“取出元素之前”,队列是满的;则在取出元素之后,唤醒notFull上的等待线程。
    24. if (c == capacity)
    25. signalNotFull();
    26. return x;
    27. }

    说明:take()的作用是取出并返回队列的头。若队列为空,则一直等待。 dequeue()的源码如下:

    1. private E dequeue() {
    2. // assert takeLock.isHeldByCurrentThread();
    3. // assert head.item == null;
    4. Node<E> h = head;
    5. Node<E> first = h.next;
    6. h.next = h; // help GC
    7. head = first;
    8. E x = first.item;
    9. first.item = null;
    10. return x;
    11. }

    dequeue()的作用就是删除队列的头节点,并将表头指向“原头节点的下一个节点”。 signalNotFull()的源码如下:

    1. private void signalNotFull() {
    2. final ReentrantLock putLock = this.putLock;
    3. putLock.lock();
    4. try {
    5. notFull.signal();
    6. } finally {
    7. putLock.unlock();
    8. }
    9. }

    signalNotFull()的作用就是唤醒notFull上的等待线程。

    4. 遍历

    下面对LinkedBlockingQueue的遍历方法进行说明。

    1. public Iterator<E> iterator() {
    2. return new Itr();
    3. }

    iterator()实际上是返回一个Iter对象。 Itr类的定义如下:

    1. private class Itr implements Iterator<E> {
    2. // 当前节点
    3. private Node<E> current;
    4. // 上一次返回的节点
    5. private Node<E> lastRet;
    6. // 当前节点对应的值
    7. private E currentElement;
    8. Itr() {
    9. // 同时获取“插入锁putLock” 和 “取出锁takeLock”
    10. fullyLock();
    11. try {
    12. // 设置“当前元素”为“队列表头的下一节点”,即为队列的第一个有效节点
    13. current = head.next;
    14. if (current != null)
    15. currentElement = current.item;
    16. } finally {
    17. // 释放“插入锁putLock” 和 “取出锁takeLock”
    18. fullyUnlock();
    19. }
    20. }
    21. // 返回“下一个节点是否为null”
    22. public boolean hasNext() {
    23. return current != null;
    24. }
    25. private Node<E> nextNode(Node<E> p) {
    26. for (;;) {
    27. Node<E> s = p.next;
    28. if (s == p)
    29. return head.next;
    30. if (s == null || s.item != null)
    31. return s;
    32. p = s;
    33. }
    34. }
    35. // 返回下一个节点
    36. public E next() {
    37. fullyLock();
    38. try {
    39. if (current == null)
    40. throw new NoSuchElementException();
    41. E x = currentElement;
    42. lastRet = current;
    43. current = nextNode(current);
    44. currentElement = (current == null) ? null : current.item;
    45. return x;
    46. } finally {
    47. fullyUnlock();
    48. }
    49. }
    50. // 删除下一个节点
    51. public void remove() {
    52. if (lastRet == null)
    53. throw new IllegalStateException();
    54. fullyLock();
    55. try {
    56. Node<E> node = lastRet;
    57. lastRet = null;
    58. for (Node<E> trail = head, p = trail.next;
    59. p != null;
    60. trail = p, p = p.next) {
    61. if (p == node) {
    62. unlink(p, trail);
    63. break;
    64. }
    65. }
    66. } finally {
    67. fullyUnlock();
    68. }
    69. }
    70. }

    LinkedBlockingQueue示例

    1. import java.util.*;
    2. import java.util.concurrent.*;
    3. /*
    4. * LinkedBlockingQueue是“线程安全”的队列,而LinkedList是非线程安全的。
    5. *
    6. * 下面是“多个线程同时操作并且遍历queue”的示例
    7. * (01) 当queue是LinkedBlockingQueue对象时,程序能正常运行。
    8. * (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
    9. *
    10. */
    11. public class LinkedBlockingQueueDemo1 {
    12. // TODO: queue是LinkedList对象时,程序会出错。
    13. //private static Queue<String> queue = new LinkedList<String>();
    14. private static Queue<String> queue = new LinkedBlockingQueue<String>();
    15. public static void main(String[] args) {
    16. // 同时启动两个线程对queue进行操作!
    17. new MyThread("ta").start();
    18. new MyThread("tb").start();
    19. }
    20. private static void printAll() {
    21. String value;
    22. Iterator iter = queue.iterator();
    23. while(iter.hasNext()) {
    24. value = (String)iter.next();
    25. System.out.print(value+", ");
    26. }
    27. System.out.println();
    28. }
    29. private static class MyThread extends Thread {
    30. MyThread(String name) {
    31. super(name);
    32. }
    33. @Override
    34. public void run() {
    35. int i = 0;
    36. while (i++ < 6) {
    37. // “线程名” + "-" + "序号"
    38. String val = Thread.currentThread().getName()+i;
    39. queue.add(val);
    40. // 通过“Iterator”遍历queue。
    41. printAll();
    42. }
    43. }
    44. }
    45. }

    其中一次运行结果:

    1. tb1, ta1,
    2. tb1, ta1, ta2,
    3. tb1, ta1, ta2, ta3,
    4. tb1, ta1, ta2, ta3, ta4,
    5. tb1, ta1, tb1, ta2, ta1, ta3, ta2, ta4, ta3, ta5,
    6. ta4, tb1, ta5, ta1, ta6,
    7. ta2, tb1, ta3, ta1, ta4, ta2, ta5, ta3, ta6, ta4, tb2,
    8. ta5, ta6, tb2,
    9. tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3,
    10. tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4,
    11. tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5,
    12. tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, tb6,

    结果说明: 示例程序中,启动两个线程(线程ta和线程tb)分别对LinkedBlockingQueue进行操作。以线程ta而言,它会先获取“线程名”+“序号”,然后将该字符串添加到LinkedBlockingQueue中;接着,遍历并输出LinkedBlockingQueue中的全部元素。 线程tb的操作和线程ta一样,只不过线程tb的名字和线程ta的名字不同。 当queue是LinkedBlockingQueue对象时,程序能正常运行。如果将queue改为LinkedList时,程序会产生ConcurrentModificationException异常。