一、同步类容器

 为了方便编写出线程安全的程序,Java里面提供了一些线程安全类和并发工具,比如:同步容器、并发容器、阻塞队列、Synchronizer(比如CountDownLatch)。今天我们就来讨论下同步容器和并发类容器。

1. 为什么会出现同步容器?

  在Java的集合容器框架中,主要有四大类别:List、Set、Queue、Map。
  List、Set、Queue接口分别继承了Collection接口,Map本身是一个接口。
  注意Collection和Map是一个顶层接口,而List、Set、Queue则继承了Collection接口,分别代表数组、集合和队列这三大类容器。
  像ArrayList、LinkedList都是实现了List接口,HashSet实现了Set接口,而Deque(双向队列,允许在队首、队尾进行入队和出队操作)继承了Queue接口,PriorityQueue实现了Queue接口。另外LinkedList(实际上是双向链表)实现了了Deque接口。
  像ArrayList、LinkedList、HashMap这些容器都是非线程安全的。
  如果有多个线程并发地访问这些容器时,就会出现问题。
  因此,在编写程序时,必须要求程序员手动地在任何访问到这些容器的地方进行同步处理,这样导致在使用这些容器的时候非常地不方便。
  所以,Java提供了同步容器供用户使用。

2. Java中的同步容器类

  在Java中,同步容器主要包括2类:
  1)Vector、Stack、HashTable
  2)Collections类中提供的静态工厂方法创建的类
  Vector矢量队列实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是Vector中的方法都是synchronized方法,即进行了同步措施。
  Stack栈也是一个同步容器,它的方法也用synchronized进行了同步,它实际上是继承于Vector类。
  HashTable实现了Map接口,它和HashMap很相似,但是HashTable进行了同步处理,而HashMap没有。
  Collections类是一个工具提供类,注意,它和Collection不同,Collection是一个顶层的接口。在Collections类中提供了大量的方法,比如对集合或者容器进行排序、查找等操作。最重要的是,在它里面提供了几个静态工厂方法来创建同步容器类,如下图所示:
JAVA的并发编程(五): 同步类容器和并发类容器 - 图1image.gif

3. 同步类容器的缺陷

  1. ** 首先**,从同步容器的具体实现源码可知,同步容器中的方法采用了synchronized进行了同步,显然这必然会影响到执行性能。举个例子,进行同样多的插入操作,Vector的耗时是ArrayList的两倍。另外,由于Vector中的add方法和get方法都进行了同步,因此,在有多个线程进行访问时,如果多个线程都只是进行读取操作,那么每个时刻就只能有一个线程进行读取,其他线程便只能等待,这些线程必须竞争同一把锁。<br /> **其次**,同步类容器并不是所有操作都是线程安全的。同样以Vectory为例,他只能包装单个方法的操作是同步的,但在长度发生变化时,很容易抛数组下标越界异常ArrayIndexOutOfBoundsException。举个例子:
  1. //线程1
  2. for(int i=0;i<vector.size();i++)
  3. vector.get(i);
  4. //线程2
  5. for(int i=0;i<vector.size();i++)
  6. vector.remove(i);

image.gif
假设vectory的大小为10,由于仅仅对vectory的方法做了同步,
那么在i=9时,线程1在获取size之后释放锁
线程2刚好也循环到了i=9的时候,它拿到锁,删除了index=9的数据并释放锁
那么线程1中的get方法拿到锁,要删除index=9的数据,然而这个时候数组最大的index为8,抛出异常。

二、并发类容器

  很明显,由于同步类容器的状态都是串行化的,虽然实现了线程安全,但是严重降低了并发性,在多线程环境时,会严重影响应用程序的吞吐量,并不能满足我们当今互联网时代高并发的需求。
在Java 1.5中给我们提供了多种并发容器来替代同步类容器从而改善性能,位于java.util.concurrent目录下。
使用ConcurrentHashMap来代替基于散列的传统HashTable,而且添加了一些常见符合操作的支持。
使用了CopyOnWriteArrayList代替了Voctor
并发的CopyOnWriteArraySet
并发队列ConcurrentLinkedQueue和LinkedBlockingQueue,前者是高性能队列,后者是阻塞队列,还有更多其他的队列等。
相比同步类容器,并发类容器主要解决了两个问题:
(1) 根据具体场景进行设计,尽量避免synchronized,提供并发性。
(2) 定义了一些并发安全的复合操作,并且保证并发环境下的迭代操作不会出错。
下面我详细讲解下其中的几个类

1. ConcurrentHashMap

ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类Hash Table的结构,Segment内部维护了一个链表数组,我们用下面这一幅图来看下ConcurrentHashMap的内部结构:
JAVA的并发编程(五): 同步类容器和并发类容器 - 图4image.gif
 从上面的结构我们可以了解到,ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部,因此,这一种结构的带来的副作用是Hash的过程要比普通的HashMap要长,但是带来的好处是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment,这样,在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(刚好这些写操作都非常平均地分布在所有的Segment上),所以,通过这一种结构,ConcurrentHashMap的并发能力可以大大的提高。
具体细节分析大家有兴趣可以看:ConcurrentHashMap之实现细节

引申1:锁的粒度

上面说到这种策略就是很典型的通过减小锁的粒度来提升并发性能,也可以称之为锁分离。可能有人对锁的粒度这个词不是很了解,我在这里给大家做个形象的比喻。
为什么要加锁?加锁是为了防止不同的线程访问同一共享资源造成混乱 打个比方:人是不同的线程,卫生间是共享资源
你在上洗手间的时候肯定要把门锁上吧,这就是加锁,只要你在里面,这个卫生间就被锁了,只有你出来之后别人才能用。的的
什么是加锁粒度呢?所谓加锁粒度就是你要锁住的范围是多大。
比如你在家上卫生间,你只要锁住卫生间就可以了吧,不需要将整个家都锁起来不让家人进门吧,卫生间就是你的加锁粒度。
怎样才算合理的加锁粒度呢?
其实卫生间并不只是用来上厕所的,还可以洗澡,洗手。这里就涉及到优化加锁粒度的问题。
你在卫生间里洗澡,其实别人也可以同时去里面洗手,只要做到隔离起来就可以,如果马桶,浴缸,洗漱台都是隔开相对独立的,实际上卫生间可以同时给三个人使用,
当然三个人做的事儿不能一样。这样就细化了加锁粒度,你在洗澡的时候只要关上浴室的门,别人还是可以进去洗手的。如果当初设计卫生间的时候没有将不同的功能区域划分隔离开,就不能实现卫生间资源的最大化使用。这就是设计架构的重要性。

引申2:用ConcurrentHashMap实现本地缓存

基于ConcurrentHashMap的特性,我们可以用它来实现java本地缓存,来缓存类似黑名单,配置,城市列表,网站协议等等读多写少的数据,减少对数据库的访问。

  1. public final class Cache {
  2. /**
  3. * 预缓存信息
  4. */
  5. private static final Map<String, Object> CACHE_MAP = new ConcurrentHashMap<String, Object>();
  6. /**
  7. * 每个缓存生效时间12小时
  8. */
  9. public static final long CACHE_HOLD_TIME_12H = 12 * 60 * 60 * 1000L;
  10. /**
  11. * 每个缓存生效时间24小时
  12. */
  13. public static final long CACHE_HOLD_TIME_24H = 24 * 60 * 60 * 1000L;
  14. /**
  15. * 存放一个缓存对象,默认保存时间12小时
  16. * @param cacheName
  17. * @param obj
  18. */
  19. public static void put(String cacheName, Object obj) {
  20. put(cacheName, obj, CACHE_HOLD_TIME_12H);
  21. }
  22. /**
  23. * 存放一个缓存对象,保存时间为holdTime
  24. * @param cacheName
  25. * @param obj
  26. * @param holdTime
  27. */
  28. public static void put(String cacheName, Object obj, long holdTime) {
  29. CACHE_MAP.put(cacheName, obj);
  30. CACHE_MAP.put(cacheName + "_HoldTime", System.currentTimeMillis() + holdTime);//缓存失效时间
  31. }
  32. /**
  33. * 取出一个缓存对象
  34. * @param cacheName
  35. * @return
  36. */
  37. public static Object get(String cacheName) {
  38. if (checkCacheName(cacheName)) {
  39. return CACHE_MAP.get(cacheName);
  40. }
  41. return null;
  42. }
  43. /**
  44. * 删除所有缓存
  45. */
  46. public static void removeAll() {
  47. CACHE_MAP.clear();
  48. }
  49. /**
  50. * 删除某个缓存
  51. * @param cacheName
  52. */
  53. public static void remove(String cacheName) {
  54. CACHE_MAP.remove(cacheName);
  55. CACHE_MAP.remove(cacheName + "_HoldTime");
  56. }
  57. /**
  58. * 检查缓存对象是否存在,
  59. * 若不存在,则返回false
  60. * 若存在,检查其是否已过有效期,如果已经过了则删除该缓存并返回false
  61. * @param cacheName
  62. * @return
  63. */
  64. public static boolean checkCacheName(String cacheName) {
  65. Long cacheHoldTime = (Long) CACHE_MAP.get(cacheName + "_HoldTime");
  66. if (cacheHoldTime == null || cacheHoldTime == 0L) {
  67. return false;
  68. }
  69. if (cacheHoldTime < System.currentTimeMillis()) {
  70. remove(cacheName);
  71. return false;
  72. }
  73. return true;
  74. }
  75. }

image.gif
但是,如果要用它来代替mamecache或者redis或者tair来实现java本地缓存的话,还缺少一个缓存策略。最经典的缓存策略就是LRU(Least Recently Used )最近较少使用的缓存淘汰策略。可以使用ConcurrentHashMap与LinkedHashMap协同实现,或者直接使用Apache下的LRUMap来实现,具体实现方法我在之后的博客中会进行整理说明,这里暂不阐述。

2. Copy-On-Write

CopyOnWrite 容器又称COW,即写时复制容器,原理就是当我们往一个容器添加元素时,不直接往当前容器添加,而是先将当前容器进行Copy,复制出新的容器,然后往新的容器进行添加元素,添加好之后,再将原来的容器引用指向新的容器。
这样的好处就是,我们可以对CopyOnWrite容器进行并发的读,也不需要加锁,因为当前容器不会添加任何新的元素。
同时,代码底层通过加锁的方式,解决了多个写进程之间的同步问题。

  1. public boolean add(E e) {
  2. final ReentrantLock lock = this.lock;//重入锁
  3. lock.lock();
  4. try {
  5. Object[] elements = getArray();
  6. int len = elements.length;
  7. Object[] newElements = Arrays.copyOf(elements, len + 1);
  8. newElements[len] = e;
  9. setArray(newElements);
  10. return true;
  11. } finally {
  12. lock.unlock();
  13. }
  14. }

image.gif
CopyOnWrite并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景,假如我们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单每天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,如果在,则提示不能搜索。实现代码如下:

  1. public class BlackListServiceImpl {
  2. private static CopyOnWriteMap<String, Boolean> blackListMap = new CopyOnWriteMap<String, Boolean>(
  3. 1000);
  4. public static boolean isBlackList(String id) {
  5. return blackListMap.get(id) == null ? false : true;
  6. }
  7. public static void addBlackList(String id) {
  8. blackListMap.put(id, Boolean.TRUE);
  9. }
  10. /**
  11. * 批量添加黑名单
  12. *
  13. * @param ids
  14. */
  15. public static void addBlackList(Map<String,Boolean> ids) {
  16. blackListMap.putAll(ids);
  17. }
  18. }

image.gif
 代码很简单,但是使用CopyOnWriteMap需要注意两件事情:
  1. 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。
  2. 使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。如使用上面代码里的addBlackList方法。
CopyOnWrite的缺点:
  内存占用问题。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。
  针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap
  数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。

3. 并发Queue

在并发队列上,JDK提供了两套实现,两者都实现或者说继承了Queue接口

1)ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能。通常来说,他都性能会优于BlockingQueue。
ConcurrentLinkedQueue是一个机遇链接节点的无界线程安全队列。该队列的元素准讯先进先出的原则,头是最先加入的,尾是最近加入的,不允许有null元素,结构如下
JAVA的并发编程(五): 同步类容器和并发类容器 - 图9image.gif
ConcurrentLinkedQueue由head节点和tair节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tair节点初始化等于head节点。
入队列
将入队节点设置成当前队列尾节点的下一个节点。第二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点,理解这一点对于我们研究源码会非常有帮助。
JAVA的并发编程(五): 同步类容器和并发类容器 - 图11image.gif
上面的分析让我们从单线程入队的角度来理解入队过程,但是多个线程同时进行入队情况就变得更加复杂,因为可能会出现其他线程插队的情况。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点。让我们再通过源码来详细分析下它是如何使用CAS算法来入队的。
出队列
将出队节点从队列移除
JAVA的并发编程(五): 同步类容器和并发类容器 - 图13image.gif
从上图可知,并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。这种做法也是通过hops变量来减少使用CAS更新head节点的消耗,从而提高出队效率。
由于博主本人暂时不了解CAS执行过程,有兴趣的朋友可以移步去看原文:
聊聊并发(六)ConcurrentLinkedQueue的实现原理分析

2)BlockingQueue

  • ArrayBlockingQueue

    基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

  • LinkedBlockingQueue

    基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

  • SynchronousQueue

    SynchronousQueue是无界的,是一种无缓冲的等待队列,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加;可以认为SynchronousQueue是一个缓存值为1的阻塞队列。

阻塞队列中的常用方法:
put方法用来向队尾存入元素,如果队列满,则等待;
  take方法用来从队首取元素,如果队列为空,则等待;
  offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
  poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;
应用场景:
在一个高并发的环境下,推荐使用ArrayBlockingQueue来限制队列中的请求,防止无界队列可能导致的内存耗尽问题。
当并发不是非常严重时,可以使用LinkedBlockingQueue,提高请求处理的实时性
当并发量很低是,可以使用SynchronousQueue,请求直接提交到生产者(基本很少发生)

  • PriorityBlockingQueue

    基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定,必须实现Comparable接口),基于对象的compareTo方法判断优先级(-1,0,1),越小越先出队。此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),不支持null值。

  • DelayQueue

    DelayQueue 是一个支持延时获取元素的无界阻塞队列,队列使用PriorityQueue来实现,队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素,只有在延时期满时才能从队列中提取元素。 由于使用PriorityQueue来实现,传入的对象必须实现Comparable接口。

使用场景参考下图
JAVA的并发编程(五): 同步类容器和并发类容器 - 图15image.gif