一、同步类容器
为了方便编写出线程安全的程序,Java里面提供了一些线程安全类和并发工具,比如:同步容器、并发容器、阻塞队列、Synchronizer(比如CountDownLatch)。今天我们就来讨论下同步容器和并发类容器。
1. 为什么会出现同步容器?
在Java的集合容器框架中,主要有四大类别:List、Set、Queue、Map。
List、Set、Queue接口分别继承了Collection接口,Map本身是一个接口。
注意Collection和Map是一个顶层接口,而List、Set、Queue则继承了Collection接口,分别代表数组、集合和队列这三大类容器。
像ArrayList、LinkedList都是实现了List接口,HashSet实现了Set接口,而Deque(双向队列,允许在队首、队尾进行入队和出队操作)继承了Queue接口,PriorityQueue实现了Queue接口。另外LinkedList(实际上是双向链表)实现了了Deque接口。
像ArrayList、LinkedList、HashMap这些容器都是非线程安全的。
如果有多个线程并发地访问这些容器时,就会出现问题。
因此,在编写程序时,必须要求程序员手动地在任何访问到这些容器的地方进行同步处理,这样导致在使用这些容器的时候非常地不方便。
所以,Java提供了同步容器供用户使用。
2. Java中的同步容器类
在Java中,同步容器主要包括2类:
1)Vector、Stack、HashTable
2)Collections类中提供的静态工厂方法创建的类
Vector矢量队列实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是Vector中的方法都是synchronized方法,即进行了同步措施。
Stack栈也是一个同步容器,它的方法也用synchronized进行了同步,它实际上是继承于Vector类。
HashTable实现了Map接口,它和HashMap很相似,但是HashTable进行了同步处理,而HashMap没有。
Collections类是一个工具提供类,注意,它和Collection不同,Collection是一个顶层的接口。在Collections类中提供了大量的方法,比如对集合或者容器进行排序、查找等操作。最重要的是,在它里面提供了几个静态工厂方法来创建同步容器类,如下图所示:
3. 同步类容器的缺陷
** 首先**,从同步容器的具体实现源码可知,同步容器中的方法采用了synchronized进行了同步,显然这必然会影响到执行性能。举个例子,进行同样多的插入操作,Vector的耗时是ArrayList的两倍。另外,由于Vector中的add方法和get方法都进行了同步,因此,在有多个线程进行访问时,如果多个线程都只是进行读取操作,那么每个时刻就只能有一个线程进行读取,其他线程便只能等待,这些线程必须竞争同一把锁。<br /> **其次**,同步类容器并不是所有操作都是线程安全的。同样以Vectory为例,他只能包装单个方法的操作是同步的,但在长度发生变化时,很容易抛数组下标越界异常ArrayIndexOutOfBoundsException。举个例子:
//线程1
for(int i=0;i<vector.size();i++)
vector.get(i);
//线程2
for(int i=0;i<vector.size();i++)
vector.remove(i);
假设vectory的大小为10,由于仅仅对vectory的方法做了同步,
那么在i=9时,线程1在获取size之后释放锁
线程2刚好也循环到了i=9的时候,它拿到锁,删除了index=9的数据并释放锁
那么线程1中的get方法拿到锁,要删除index=9的数据,然而这个时候数组最大的index为8,抛出异常。
二、并发类容器
很明显,由于同步类容器的状态都是串行化的,虽然实现了线程安全,但是严重降低了并发性,在多线程环境时,会严重影响应用程序的吞吐量,并不能满足我们当今互联网时代高并发的需求。
在Java 1.5中给我们提供了多种并发容器来替代同步类容器从而改善性能,位于java.util.concurrent目录下。
使用ConcurrentHashMap来代替基于散列的传统HashTable,而且添加了一些常见符合操作的支持。
使用了CopyOnWriteArrayList代替了Voctor
并发的CopyOnWriteArraySet
并发队列ConcurrentLinkedQueue和LinkedBlockingQueue,前者是高性能队列,后者是阻塞队列,还有更多其他的队列等。
相比同步类容器,并发类容器主要解决了两个问题:
(1) 根据具体场景进行设计,尽量避免synchronized,提供并发性。
(2) 定义了一些并发安全的复合操作,并且保证并发环境下的迭代操作不会出错。
下面我详细讲解下其中的几个类
1. ConcurrentHashMap
ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类Hash Table的结构,Segment内部维护了一个链表数组,我们用下面这一幅图来看下ConcurrentHashMap的内部结构:
从上面的结构我们可以了解到,ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部,因此,这一种结构的带来的副作用是Hash的过程要比普通的HashMap要长,但是带来的好处是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment,这样,在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(刚好这些写操作都非常平均地分布在所有的Segment上),所以,通过这一种结构,ConcurrentHashMap的并发能力可以大大的提高。
具体细节分析大家有兴趣可以看:ConcurrentHashMap之实现细节
引申1:锁的粒度
上面说到这种策略就是很典型的通过减小锁的粒度来提升并发性能,也可以称之为锁分离。可能有人对锁的粒度这个词不是很了解,我在这里给大家做个形象的比喻。
为什么要加锁?加锁是为了防止不同的线程访问同一共享资源造成混乱 打个比方:人是不同的线程,卫生间是共享资源
你在上洗手间的时候肯定要把门锁上吧,这就是加锁,只要你在里面,这个卫生间就被锁了,只有你出来之后别人才能用。的的
什么是加锁粒度呢?所谓加锁粒度就是你要锁住的范围是多大。
比如你在家上卫生间,你只要锁住卫生间就可以了吧,不需要将整个家都锁起来不让家人进门吧,卫生间就是你的加锁粒度。
怎样才算合理的加锁粒度呢?
其实卫生间并不只是用来上厕所的,还可以洗澡,洗手。这里就涉及到优化加锁粒度的问题。
你在卫生间里洗澡,其实别人也可以同时去里面洗手,只要做到隔离起来就可以,如果马桶,浴缸,洗漱台都是隔开相对独立的,实际上卫生间可以同时给三个人使用,
当然三个人做的事儿不能一样。这样就细化了加锁粒度,你在洗澡的时候只要关上浴室的门,别人还是可以进去洗手的。如果当初设计卫生间的时候没有将不同的功能区域划分隔离开,就不能实现卫生间资源的最大化使用。这就是设计架构的重要性。
引申2:用ConcurrentHashMap实现本地缓存
基于ConcurrentHashMap的特性,我们可以用它来实现java本地缓存,来缓存类似黑名单,配置,城市列表,网站协议等等读多写少的数据,减少对数据库的访问。
public final class Cache {
/**
* 预缓存信息
*/
private static final Map<String, Object> CACHE_MAP = new ConcurrentHashMap<String, Object>();
/**
* 每个缓存生效时间12小时
*/
public static final long CACHE_HOLD_TIME_12H = 12 * 60 * 60 * 1000L;
/**
* 每个缓存生效时间24小时
*/
public static final long CACHE_HOLD_TIME_24H = 24 * 60 * 60 * 1000L;
/**
* 存放一个缓存对象,默认保存时间12小时
* @param cacheName
* @param obj
*/
public static void put(String cacheName, Object obj) {
put(cacheName, obj, CACHE_HOLD_TIME_12H);
}
/**
* 存放一个缓存对象,保存时间为holdTime
* @param cacheName
* @param obj
* @param holdTime
*/
public static void put(String cacheName, Object obj, long holdTime) {
CACHE_MAP.put(cacheName, obj);
CACHE_MAP.put(cacheName + "_HoldTime", System.currentTimeMillis() + holdTime);//缓存失效时间
}
/**
* 取出一个缓存对象
* @param cacheName
* @return
*/
public static Object get(String cacheName) {
if (checkCacheName(cacheName)) {
return CACHE_MAP.get(cacheName);
}
return null;
}
/**
* 删除所有缓存
*/
public static void removeAll() {
CACHE_MAP.clear();
}
/**
* 删除某个缓存
* @param cacheName
*/
public static void remove(String cacheName) {
CACHE_MAP.remove(cacheName);
CACHE_MAP.remove(cacheName + "_HoldTime");
}
/**
* 检查缓存对象是否存在,
* 若不存在,则返回false
* 若存在,检查其是否已过有效期,如果已经过了则删除该缓存并返回false
* @param cacheName
* @return
*/
public static boolean checkCacheName(String cacheName) {
Long cacheHoldTime = (Long) CACHE_MAP.get(cacheName + "_HoldTime");
if (cacheHoldTime == null || cacheHoldTime == 0L) {
return false;
}
if (cacheHoldTime < System.currentTimeMillis()) {
remove(cacheName);
return false;
}
return true;
}
}
但是,如果要用它来代替mamecache或者redis或者tair来实现java本地缓存的话,还缺少一个缓存策略。最经典的缓存策略就是LRU(Least Recently Used
)最近较少使用的缓存淘汰策略。可以使用ConcurrentHashMap与LinkedHashMap协同实现,或者直接使用Apache下的LRUMap来实现,具体实现方法我在之后的博客中会进行整理说明,这里暂不阐述。
2. Copy-On-Write
CopyOnWrite 容器又称COW,即写时复制容器,原理就是当我们往一个容器添加元素时,不直接往当前容器添加,而是先将当前容器进行Copy,复制出新的容器,然后往新的容器进行添加元素,添加好之后,再将原来的容器引用指向新的容器。
这样的好处就是,我们可以对CopyOnWrite容器进行并发的读,也不需要加锁,因为当前容器不会添加任何新的元素。
同时,代码底层通过加锁的方式,解决了多个写进程之间的同步问题。
public boolean add(E e) {
final ReentrantLock lock = this.lock;//重入锁
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
CopyOnWrite并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景,假如我们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单每天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,如果在,则提示不能搜索。实现代码如下:
public class BlackListServiceImpl {
private static CopyOnWriteMap<String, Boolean> blackListMap = new CopyOnWriteMap<String, Boolean>(
1000);
public static boolean isBlackList(String id) {
return blackListMap.get(id) == null ? false : true;
}
public static void addBlackList(String id) {
blackListMap.put(id, Boolean.TRUE);
}
/**
* 批量添加黑名单
*
* @param ids
*/
public static void addBlackList(Map<String,Boolean> ids) {
blackListMap.putAll(ids);
}
}
代码很简单,但是使用CopyOnWriteMap需要注意两件事情:
1. 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。
2. 使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。如使用上面代码里的addBlackList方法。
CopyOnWrite的缺点:
内存占用问题。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。
针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。
数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。
3. 并发Queue
在并发队列上,JDK提供了两套实现,两者都实现或者说继承了Queue接口
1)ConcurrentLinkedQueue
ConcurrentLinkedQueue是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能。通常来说,他都性能会优于BlockingQueue。
ConcurrentLinkedQueue是一个机遇链接节点的无界线程安全队列。该队列的元素准讯先进先出的原则,头是最先加入的,尾是最近加入的,不允许有null元素,结构如下
ConcurrentLinkedQueue由head节点和tair节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tair节点初始化等于head节点。
入队列
将入队节点设置成当前队列尾节点的下一个节点。第二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点,理解这一点对于我们研究源码会非常有帮助。
上面的分析让我们从单线程入队的角度来理解入队过程,但是多个线程同时进行入队情况就变得更加复杂,因为可能会出现其他线程插队的情况。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点。让我们再通过源码来详细分析下它是如何使用CAS算法来入队的。
出队列
将出队节点从队列移除
从上图可知,并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。这种做法也是通过hops变量来减少使用CAS更新head节点的消耗,从而提高出队效率。
由于博主本人暂时不了解CAS执行过程,有兴趣的朋友可以移步去看原文:
聊聊并发(六)ConcurrentLinkedQueue的实现原理分析
2)BlockingQueue
ArrayBlockingQueue
基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
LinkedBlockingQueue
基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
SynchronousQueue
SynchronousQueue是无界的,是一种无缓冲的等待队列,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加;可以认为SynchronousQueue是一个缓存值为1的阻塞队列。
阻塞队列中的常用方法:
put方法用来向队尾存入元素,如果队列满,则等待;
take方法用来从队首取元素,如果队列为空,则等待;
offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;
应用场景:
在一个高并发的环境下,推荐使用ArrayBlockingQueue来限制队列中的请求,防止无界队列可能导致的内存耗尽问题。
当并发不是非常严重时,可以使用LinkedBlockingQueue,提高请求处理的实时性
当并发量很低是,可以使用SynchronousQueue,请求直接提交到生产者(基本很少发生)
PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定,必须实现Comparable接口),基于对象的compareTo方法判断优先级(-1,0,1),越小越先出队。此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),不支持null值。
DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列,队列使用PriorityQueue来实现,队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素,只有在延时期满时才能从队列中提取元素。 由于使用PriorityQueue来实现,传入的对象必须实现Comparable接口。
使用场景参考下图