Collections提供了一系列的方法来个给容器加锁,返回一个新的容器:
image.png

同步类容器的状态都是串行化的,他们虽然实现了线程安全,但是严重的降低了并发性,在多线程环境中,严重降低了程序的吞吐量。而并发类容器是专门针对并发设计的,比如使用 ConcurrentHashMap 代替基于散列的 HashMap ;使用 CopyOnWriteArrayList 代替 Voctor ,并发的 CopyOnWriteArraySet ,以及并发的 Queue 等等。

1. ConcurrentMap

Concurrentmap 接口下有俩个重要的实现

  • Concurrenthashmap
  • ConcurrentSkipListMap(支持并发排序功能,弥补 ConcurrentHashMap)

ConcurrentHashMap 内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的 Hashtable,Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 segment。只要多个修改操作发生在不同的段上,它们就可以并发进行。把一个整体分成了 16 个段(Segment)。也就是最高支持 16 个线程的并发修改操作,这也是在多线程场景时减小锁的粒度从而降低锁竞争的一种方案。并且代码中大多共享变量使用 volatile 关键字声明,目的是第一时间获取修改的内容,性能非常好。

:ConcurrentSkipListMap 相当于 TreeMap(可以排序);

image.png

  1. ConcurrentHashMap<String, Object> chm = new ConcurrentHashMap<String, Object>();
  2. chm.put("k1", "v1");
  3. chm.put("k2", "v2");
  4. chm.put("k3", "v3");
  5. chm.putIfAbsent("k3", "v4"); // 如果k3存在,则不加入value
  6. for (Map.Entry<String, Object> me : chm.entrySet()) {
  7. System.out.println("key:" + me.getKey() + ",value:" + me.getValue());
  8. }

Java 8 引入了红黑树:
image.png

2. Copy-On-Write 容器

简称 COW,是一种程序设计中的优化策略。

Coplon Write 容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器,原容器进行垃圾回收。这样做的好处是我们可以对 Copywrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以 Copyonwrite 容器也是一种读写分离的思想,读和写不同的容器。

  • JDK 里的 COW 容器有两种: CopyonwritearrayListCopyonwritearraySet
  • 适用场景:读多写少。(原因是copy容器的时候耗时)

3. Queue

在并发队列上 JDK 提供了两套实现,无论哪种都继承自 Queue。

  1. ConcurrentLinkedQueue 为代表的高性能队列
  2. BlockingQueue 接口为代表的阻塞队列

image.png

3.1 ConcurrentLinkedQueue

ConcurrentLinkedQueue :是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常 concurrentLinkedQueue 性能好于 BlockingQueue 。它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的,该队列不允许 null 元素。

ConcurrentLinkedQueue 重要方法:

  • 加入元素: add()offer() 在 Concurrentlinkedqueue 中,这俩个方法没有任何区别)
  • 取元素: poll()peek() ,两者都是取头元素节点,区别在于前者会除元素,后者不会。
  1. //高性能无阻塞无界队
  2. Concurrentlinkedqueue <String> q =new Concurrentlinkedqueuestring> ();
  3. q.add("a");
  4. q.offer("b");
  5. q.offer("c");
  6. q.offer("d");
  7. q.offer("e");
  8. system.out.println(q.poll()); // a 从头部出元素,并把元素从队列删除
  9. System.out.println(q.size()); // 4
  10. System.out.println(q.peek()); // b
  11. System.out.println(q.size()); // 4

3.2 BlockingQueue 接口

3.2.1 ArrayBlockingQueue

基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没实现读写分离,也就意味着生产和消费不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列,在很多场合非常适合使用。

  1. ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);
  2. array.add("a");
  3. array.add("b");
  4. array.add("c");
  5. array.put("d");
  6. array.offer("e");
  7. System.out.println(array.offer("f", 2, TimeUnit.SECONDS)); // 2秒之内能加入返回true,否则返回false

3.2.2 LinkedblockingQueue

基于链表的阻塞队列,同 ArayBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成), Linkedblockingqueue 之所以能够高效的处理并发数据,是因为其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行。他是一个无界队列

  1. LinkedBlockingQueue<String> arr = new LinkedBlockingQueue<>(); // 可以加数字参数定容也可以不加
  2. arr.add("a");
  3. arr.add("b");
  4. arr.add("c");
  5. arr.offer("d");
  6. List<String> list = new ArrayList<String>();
  7. System.out.println(arr.drainTo(list, 3)); // 把arr中的前三个值放入list
  8. System.out.println(list.size());
  9. for (String string : list) {
  10. System.out.println(string);
  11. }

3.2.3 SynchronousQueue

一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并消费。此队列不能添加元素。

  1. final SynchronousQueue queue = new SynchronousQueue();
  2. new Thread(() -> {
  3. try {
  4. System.out.println(queue.take());
  5. } catch (InterruptedException e) {
  6. e.printStackTrace();
  7. }
  8. }).start();
  9. new Thread(() -> {
  10. queue.add("zcq");
  11. }).start();

注意:此队列的add方法并不是不能用,但是必须现有take,再有add。元素并不是加到SynchronizeQueue队列里(没有容积),而是直接扔给了take取值的线程。
**

3.2.4 PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Computor 对象来决定,也就是说传入队列的对象必须实现 Comparable 接口),在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列

public class UsePriorityBlockingQueue {
    public static void main(String[] args) throws InterruptedException {
    PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<Task>();
    Task t1 = new Task(3, "id3");
    Task t2 = new Task(1, "id1");
    Task t3 = new Task(2, "id2");
    queue.add(t1);
    queue.add(t2);
    queue.add(t3);

    System.out.println(queue);
    System.out.println(queue.take().getName());
    System.out.println(queue);
    System.out.println(queue.take().getName());
    }
}


@Data
class Task implements Comparable<Task> {
    private int id;
    private String name;

    public Task(int id, String name) {
        super();
        this.id = id;
        this.name = name;
    }

    @Override
    public int compareTo(Task task) {
        return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);
    }
}

PriorityBlockingQueuetake() 方法执行的时候才进行排序(选择排序法)。

3.2.5 DelayQueue

带有延迟时间的 Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。Delayqueue 中的元素必须实现 Delayed 接口,DelayQueue,是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等。

例子:网民上网,按照不同时间依次下机

  1. 创建网民对象
@Data
public class WangMin implements Delayed {
    private String name;
    private String id;
    private long endTime;
    private TimeUnit timeUnit = TimeUnit.SECONDS;

    public WangMin(String name, String id, long endTime) {
        this.name = name;
        this.id = id;
        this.endTime = endTime;
    }

    /**
     * 判断是否到了截止时间
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return endTime - System.currentTimeMillis();
    }

    /**
     * 相互比较排序
     * @return
     */
    @Override
    public int compareTo(Delayed delayed) {
        WangMin w = (WangMin) delayed;
        return this.getDelay(timeUnit) - w.getDelay(timeUnit) > 0 ? 1 : 0;
    }
}
  1. 创建网吧对象

    public class WangBa implements Runnable {
     private DelayQueue<WangMin> queue = new DelayQueue();
     private boolean yingye = true;
    
     public void shangji(String name, String id, int money) {
         WangMin man = new WangMin(name, id, 1000 * money + System.currentTimeMillis());
         System.out.println("网民:" + name + ";身份证:" + id + ",交了" + money + "元钱。开始上机。。。");
         queue.add(man);
     }
    
     public void xiaji(WangMin man) {
         System.out.println("网民:" + man.getName() + ";身份证:" + man.getId() + ",时间到了下机。。。");
     }
    
     @Override
     public void run() {
         while (yingye) {
             try {
                 WangMin man = queue.take();
                 xiaji(man);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
     }
    }
    
  2. main 方法

System.out.println("网吧开始营业 =======");
WangBa wangba = new WangBa();
Thread shangwang = new Thread(wangba);
shangwang.start();

wangba.shangji("路人甲", "123", 1);
wangba.shangji("路人乙", "1234", 10);
wangba.shangji("路人丙", "12345", 5);
  1. 结果

    网吧开始营业 ======= 网民:路人甲;身份证:123,交了1元钱。开始上机。。。 网民:路人乙;身份证:1234,交了10元钱。开始上机。。。 网民:路人丙;身份证:12345,交了5元钱。开始上机。。。 网民:路人甲;身份证:123,时间到了下机。。。 网民:路人丙;身份证:12345,时间到了下机。。。 网民:路人乙;身份证:1234,时间到了下机。。。