JDK 提供的这些容器大部分在 java.util.concurrent 包中。
- ConcurrentHashMap : 线程安全的 HashMap
- CopyOnWriteArrayList : 线程安全的 List,在读多写少的场合性能非常好,远远好于 Vector。
- ConcurrentLinkedQueue : 高效的并发队列,使用链表实现。可以看做一个线程安全的 LinkedList,这是一个非阻塞队列。
- BlockingQueue : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道。
- ConcurrentSkipListMap : 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找。
一、ConcurrentHashMap
1.1 ConcurrentHashMap的结构
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁,在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。
一个ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获取它对应的Segment锁。1.2 ConcurrentHashMap的初始化
1. 初始化segments数组
segments数组的长度ssize是通过con过concurrencyLevel计算得到的,为了能通过按位与的散列算法来定位segments数组的索引,必须保证segments数组的长度时2的N次方,所以必须计算出一个大于或者等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。if(concurrencyLevel>MAX_SEGMENTS)
concurrencyLevel=MAX_SEGMENTS;
int sshift=0;
// segments数组的长度
int sszie=1;
while(ssize<concurrencyLevel){
++sshift;
ssize<<=1;
}
segmentShift=32-sshift;
segmentMask=ssize-1;
this.segments=Segment.newArray(ssize);
concurrencyLevel的最大值是65535,这就意味着segments数组的最大长度ssize是65536,对应的二进制是16位。
2. 初始化segmentShift和segementMask
这两个全局变量需要在定位segment时的散列算法时使用,sshift
=ssize
从1向左移位的次数,在默认情况下concurrencyLevel
等于16(1需要向左移动4次),所以sshift默认等于4;
segmentShift
用于定位参与散列运算的位数,sgmentShift=32-sshift
,所以默认segmentShift为28,这里之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位。segmentMask
是散列运算的掩码,等于ssize-1,即默认值为15,掩码的二进制位数都是1。
因为ssize的最大长度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,对应的二进制是16位。
3. 初始化每个segment
输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子,在构造方法里需要通过这两个参数来初始化数组的每个segment。
if(initialCapacity>MAXIMUM_CAPACITY)
initialCapacity=MAXIMUM_CAPACITY;
int c=initialCapacity/ssize;
if(c*ssize<initialCapacity)
++c;
int cap=1;
while(cap<c)
cap<<=1;
for(int i=0;i<this.segments.length;++i)
this.segments[i]=new Segment<K,V>(cap,loadFactor);
cap
就是segment里HashEntry数组的长度,它等于initialCapacity
除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1就是2的N次方。segment的容量threshold=(int)cap*loadFactor,默认情况下initialCapacity
等于16,loadFactor=0.75,cap=1,threshold=0。
1.3 定位Segment
ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过散列算法定位到Segment。ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法来对元素的hashCode进行一次再散列。之所以进行再散列目的是减少散列冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。
private static int hash(int h){
h += (h<<15) ^ 0xffffcd7d;
h ^= (h>>>10);
h += (h<<3);
h ^= (h>>>6);
h += (h<<2) + (h<<14);
return h ^ (h>>>16);
}
ConcurrentHashMap通过以下散列算法定位segment
final Segment<K,V> segmentFor(int hash){
return segments[(hash >>> segmentShift) & segmentMask];
}
默认情况下segmentShift为28,segmentMask为15,再散列后的数最大是32位二进制数据,向右无符号移动28位,就是让高4位参与到散列运算中。
1.4 ConcurrentHashMap的操作
1. get操作
Segment的get操作需要先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素。
public V get(Object key){
int hash=hash(key.hashCode());
return segementFor(hash).get(key,hash);
}
- 整个get过程不需要加锁,除非读到的值是空才会加锁重读。因为get方法里将要使用的共享变量都定义成volatile类型。例如用于统计当前Segment大小的count字段和用于存储值的HashEntry的value。
- 不会读到过期的值是因为根据Java内存模型的happen-before原则,对volatile字段的写入操作优先于读操作。
在定位元素的代码中可以看出,定位HashEntry和定位Segment的散列算法虽然一样,都是与数组的长度减去1再想与,但是相与的值不一样。定位Segment使用的是元素的hashcode通过再散列后得到的值的高位,而定位HashEntry直接使用的是再散列后的值。
2. put操作
由于put方法需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。
put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经过两个步骤,第一步是判断是否需要对Segment里的HashEntry数组进行扩容,第二步是定位添加元素的位置,然后将其放在HashEntry数组里。
是否需要扩容:
在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阈值,则对数组进行扩容。
如何扩容:
在扩容的时候,首先会创建一个容量是原来容量两倍的数组,将原数组里的元素进行再散列后插入到新的数组中。为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。
3. size操作
统计整个ConcurrentHashMap里元素的大小,就要统计所有Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量。但是在多线程场景下,简单地把所有Segment的count相加会造成线程安全问题。
又因为在累加count操作的过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化(使用modCount变量,在put、remove和clean方法里操作元素前都会将变量modCount进行加1,在统计size前后比较modCount是否发生变化来判断容器的大小是否发生变化),则在采用加锁的方法来统计所有Segment的大小。
二、CopyOnWriteArrayList
2.1 CopyOnWriteArrayList简介
public class CopyOnWriteArrayList<E>
extends Object
implements List<E>, RandomAccess, Cloneable, Serializable
在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读取操作是安全的。
这和我们之前在多线程章节讲过 ReentrantReadWriteLock 读写锁的思想非常类似,也就是读读共享、写写互斥、读写互斥、写读互斥。JDK 中提供了 CopyOnWriteArrayList 类比相比于在读写锁的思想又更进一步。为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,并且更厉害的是:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来,读操作的性能就会大幅度提升。那它是怎么做的呢?
2.2 CopyOnWriteArrayList实现原理
CopyOnWriteArrayList 类的所有可变操作(add,set 等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。
从 CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是满足 CopyOnWrite 的。所谓 CopyOnWrite 也就是说:在计算机,如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉了。
2.3 CopyOnWriteArrayList的操作
1. 读取操作
读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
public E get(int index) {
return get(getArray(), index);
}
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}
final Object[] getArray() {
return array;
}
2. 写入操作
CopyOnWriteArrayList 写入操作 add()方法在添加集合的时候加了锁,保证了同步,避免了多线程写的时候会 copy 出多个副本出来
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);//拷贝新数组
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();//释放锁
}
}
三、ConcurrentLinkedQueue
在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:
- 使用阻塞算法:用一个锁或者两个锁。
-
3.1 ConcurrentLinkedQueue的结构
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序。ConcurrentLinkedQueue由head节点和tail节点组成,每个节点由节点元素和指向下一个节点的引用组成。默认head节点存储的元素为空,tail节点等于head节点。
3.2 入队列
1. 入队列过程
入队列就是将入队节点添加到队列的尾部。
将入队节点设置成当前队列尾节点的下一个节点
- 更新tail节点。
- 如果tail节点的next节点不为空,则将入队节点设置成tail节点。
- 如果tail节点的next节点为空,则将入队节点设置成tail的next节点。
所以tail节点不总是尾节点。
但是从多线程的角度分析,如果一个线程正在入队,那么就必须先获取尾节点,然后设置尾节点的下一个节点为入队节点。但这是有可能有另一个线程插队。那么队列的尾节点就会发生变化。
public boolean offer(E e){
if(e == null) throw new NullPointerException();
// 入队前,创建一个入队节点
Node<E> n = new Node<E> e;
retry:
// 死循环,入队不成功反复入队
for(;;){
// 创建一个指向tail节点的引用
Node<E> t = tail;
// p用来表示队列的尾节点,默认情况下等于tail节点
Node<E> p = t;
for(int hops=0;;hops++){
// 获得p节点的下一个节点
Node<E> next = succ(p);
//next节点不为空,说明p不是尾节点,需要更新p后再将它指向next节点。
if(next!=null){
// 循环了两次及其以上,并且当前节点还不是尾节点
if(hops > HOPS && t!= tail)
continue retry;
p = next;
}
//如果p是尾节点,则设置p节点的next节点为入队节点
else if(p.casNext(null,n)){
//如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点
//更新失败表示有其他线程成功更新了tail节点
if(hops>=HOPS)
//更新tail节点,允许失败
casTail(t,n);
return true;
}
// p有next节点,表示p的next节点是尾节点,则重新设置p节点
else{
p=succ(p);
}
}
}
}
从源码的角度看,整个入队过程就主要做两件事:
- 定位出尾节点
- 使用CAS算法将入队节点设置成尾节点的next节点,如果不成功则重试
2. 定位尾节点
tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点。尾节点可能是tail节点,也可能是tail节点的next节点。代码中循环体中的第一个if判断就是判断tail是否有next节点,有则表示next节点有可能是尾节点。获取tail节点的next节点需要注意的是p节点等于p的next节点的情况:只有一种可能就是p节点和p的next节点都是空,表示这个队列刚初始化,正准备添加节点,所以需要返回head节点。final Node<E> succ(Node<E> p){
Node<E> next=p.getNext();
return (p == next) ? head : next;
}
3. 设置队列节点为尾节点
p.casNext(null,n)方法用于将入队节点设置成为当前队列尾节点的next节点,如果p是null,表示p是当前队列的尾节点,如果不为null,表示其他线程更新了尾节点,则需要重新获取当前队列的尾节点。4. HOPS的设计意图
如果将入队代码写成如下的样子:
但是这么做有个缺点就是,每次都需要使用循环CAS更新tail节点,所以hops变量的作用就凸显了,使用hops变量来控制减少tail节点的更新频率,并不是每次节点入队后都将tail节点更新成尾节点,而是当tail节点和尾节点的距离大于等于常量HOPS(默认为1)才更新tail节点,tail和尾节点的距离越长,使用CAS更新tail节点的次数就会越少,但是长距离带来的负面效果就是每次入队定位尾节点的时间越长,但是这样仍能提高效率,因为本质上看它是通过增加volatile变量的读来减少对volatile变量的写。public boolean offer(E e){
if(e == null)
throw new NullPointerException();
Node<E> n= new Node<E>(e);
for(;;){
Node<E> t=tail;
if(t.casNext(null,n) && casTail(t,n)){
return true;
}
}
}
3.3 出队列
出队列就是从队列里返回一个节点元素,并清空该节点对元素的引用。
- 当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。
- 当head节点里没有元素时,出队操作才会更新head节点。
public E poll() {
Node<E> h = head;
// p表示头节点,需要出队的节点
Node<E> p = h;
for (int hops=0;;hops++) {
// 获取p节点的元素
E item = p.getItem();
// 如果p节点的元素不为空使用CAS设置p节点引用的元素为null
// 如果成功则返回p节点的元素
if (item != null && p.casItem(item, null)) {
if (hops>=HOPS) {
// 将p节点下一个节点设置成head节点
Node<E> q= p.getNext();
updateHead(h, ((q = p.next) != null) ? q : p);
}
return item;
}
// 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改。
// 那么获取p节点的下一个节点
Node<E> next=succ(p);
// 如果p的下一个节点也为空,说明这个队列已经空了
if(next==null){
// 更新头节点
updateHead(h,p);
break;
}
// 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
p=next;
}
return null;
}
- 首先获取头节点的元素。
- 然后再判断头节点元素是否为空。
- 如果为空,表示另一个线程已经进行了一次出队操作将该节点的元素取走。
- 如果不为空,则使用CAS的方式将头节点的引用设置成null。
- 如果CAS成功,则直接返回头节点的元素
- 如果CAS不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。
四、Java中的阻塞队列——BlockingQueue
当队列容器已满,生产者线程会被阻塞,直到队列未满;
当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。
4.1 ArrayBlockingQueue
ArrayBlockingQueue是一个用数组来实现的有界阻塞队列,此队列按照FIFO的原则对元素进行排序。
public class ArrayBlockingQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, Serializable{}
ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock ,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。
ArrayBlockingQueue 默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue。而非公平性则是指访问 ArrayBlockingQueue 的顺序不是遵守严格的时间顺序,有可能存在,当 ArrayBlockingQueue 可以被访问时,长时间阻塞的线程依然无法访问到 ArrayBlockingQueue。如果保证公平性,通常会降低吞吐量。如果需要获得公平性的 ArrayBlockingQueue,可采用如下代码:
private static ArrayBlockingQueue<Integer> blockingQueue =
new ArrayBlockingQueue<Integer>(10,true);
4.2 LinkedBlockingQueue
LinkedBlockingQueue 底层基于单向链表实现的有界阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足 FIFO 的特性,与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE 。
/**
*某种意义上的无界队列
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
*有界队列
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
4.3 PriorityBlockingQueue
PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。
PriorityBlockingQueue 并发控制采用的是可重入锁 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。
简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。
4.4 DelayQueue
DelayQueue是一个支持延迟获取元素的无界阻塞队列。队列使用PriorityQueue实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有延迟期满时才能从队列中提取元素。
4.5 SynchronousQueue
SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。它支持公平访问队列,默认情况下线程采用非公平性策略访问队列。
4.6 LinkedTransferQueue
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列,相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法
1. transfer()
如果当前由消费者正在等待接收元素,transfer方法可以把生产者传入的元素立刻transfer给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费后才返回。
2. tryTransfer()
tryTransfer方法使用来试探生产者传入的元素是否能直接传给消费者,如果没有消费者等待接收元素,直接返回false。
4.7 LinkedBlockingDeque
LinkedBlockingDeque是一个由链表结构构成的双向阻塞队列。
五、AQS
AQS 的全称为 AbstractQueuedSynchronizer ,翻译过来的意思就是抽象队列同步器。这个类在 java.util.concurrent.locks 包下面。AQS 就是一个抽象类,主要用来构建锁和同步器。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
}
AQS 为构建锁和同步器提供了一些通用功能的是实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLock,Semaphore,其他的诸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask(jdk1.7) 等等皆是基于 AQS 的。
5.1 AQS原理
AQS 核心思想是:
- 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。
- 如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。
看个 AQS(AbstractQueuedSynchronizer)原理图:AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。
状态信息通过 protected 类型的getState(),setState(),compareAndSetState() 进行操作
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
5.2 AQS对资源的共享方式
1. Exclusive(独占)
只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁,ReentrantLock 同时支持两种锁,下面以 ReentrantLock 对这两种锁的定义做介绍:
- 公平锁 :按照线程在队列中的排队顺序,先到者先拿到锁
- 非公平锁 :当线程要获取锁时,先通过两次 CAS 操作去抢锁,如果没抢到,当前线程再加入到队列中等待唤醒。
说明:下面这部分关于 ReentrantLock 源代码内容节选自:https://www.javadoop.com/post/AbstractQueuedSynchronizer-2 ,这是一篇很不错文章,推荐阅读。
下面来看 ReentrantLock 中相关的源代码:
ReentrantLock 默认采用非公平锁,因为考虑获得更好的性能,通过 boolean 来决定是否用公平锁(传入 true 用公平锁)。
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
public ReentrantLock() {
// 默认非公平锁
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock 中公平锁的 lock 方法
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
// AbstractQueuedSynchronizer.acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
非公平锁的 lock 方法:
static final class NonfairSync extends Sync {
final void lock() {
// 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// AbstractQueuedSynchronizer.acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 这里没有对阻塞队列进行判断
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
总结:公平锁和非公平锁只有两处不同:
- 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
- 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。
公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。
相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。
2.Share(共享)
多个线程可同时执行,如 Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。
ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在上层已经帮我们实现好了。
5.3 AQS 底层使用了模板方法模式
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
- 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放)
- 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用,下面简单的给大家介绍一下模板方法模式,模板方法模式是一个很容易理解的设计模式之一。
模板方法模式是基于”继承“的,主要是为了在不改变模板结构的前提下在子类中重新定义模板中的内容以实现复用代码。 举个很简单的例子假如我们要去一个地方的步骤是:购票 buyTicket()->安检 securityCheck()->乘坐某某工具回家 ride() ->到达目的地 arrive()。我们可能乘坐不同的交通工具回家比如飞机或者火车,所以除了ride()方法,其他方法的实现几乎相同。我们可以定义一个包含了这些方法的抽象类,然后用户根据自己的需要继承该抽象类然后修改 ride()方法。
AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS 类中的其他方法都是 final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。
以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock() 时,会调用 tryAcquire()独占该锁并将 state+1。此后,其他线程再 tryAcquire() 时就会失败,直到 A 线程 unlock()到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state 是能回到零态的。
再以 CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(也可以不初始化为 N,不初始化为 N,state 减到 0 也会从 await()返回)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown() 一次,state 会 CAS(Compare and Swap)减 1。等到 state=0,会 unpark() 主调用线程,然后主调用线程就会从 await() 函数返回,继续后余动作。
所以 CountDownLatch 可以做倒计数器,减到 0 后唤醒的线程可以对线程池进行处理,比如关闭线程池。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
推荐两篇 AQS 原理和相关源码分析的文章:
- Java 并发之 AQS 详解
-
六、Semaphore(信号量)
synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。
示例代码如下:/**
*
* @author Snailclimb
* @date 2018年9月30日
* @Description: 需要一次性拿一个许可的情况
*/
public class SemaphoreExample1 {
// 请求的数量
private static final int threadCount = 550;
public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
// 一次只能允许执行的线程数量。
final Semaphore semaphore = new Semaphore(20);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {// Lambda 表达式的运用
try {
semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20
test(threadnum);
semaphore.release();// 释放一个许可
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模拟请求的耗时操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模拟请求的耗时操作
}
}
执行 acquire() 方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个 release 方法增加一个许可证,这可能会释放一个阻塞的 acquire() 方法。然而,其实并没有实际的许可证这个对象,Semaphore 只是维持了一个可获得许可证的数量。 Semaphore 经常用于限制获取某种资源的线程数量。
当然一次也可以一次拿取和释放多个许可,不过一般没有必要这样做:semaphore.acquire(5);// 获取5个许可,所以可运行线程数量为20/5=4
test(threadnum);
semaphore.release(5);// 释放5个许可
除了 acquire() 方法之外,另一个比较常用的与之对应的方法是 tryAcquire() 方法,该方法如果获取不到许可就立即返回 false。
Semaphore 有两种模式,公平模式和非公平模式。 公平模式: 调用 acquire() 方法的顺序就是获取许可证的顺序,遵循 FIFO;
- 非公平模式: 抢占式的。
Semaphore 对应的两个构造方法如下:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。
Semaphore 与 CountDownLatch 一样,也是共享锁的一种实现。它默认构造 AQS 的 state 为 permits。当执行任务的线程数量超出 permits,那么多余的线程将会被放入阻塞队列 Park,并自旋判断 state 是否大于 0。只有当 state 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 release() 方法,release() 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。 如此,每次只有最多不超过 permits 数量的线程能自旋成功,便限制了执行任务线程的数量。
七、CountDownLatch (倒计时器)
CountDownLatch 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown() 方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当调用 await() 方法的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 方法就会一直阻塞,也就是说 await() 方法之后的语句不会被执行。然后,CountDownLatch 会自旋 CAS 判断 state == 0,如果 state == 0 的话,就会释放所有等待的线程,await() 方法之后的语句得到执行。7.1 CountDownLatch 的两种典型用法
1. 某一线程在开始运行前等待 n 个线程执行完毕。
将 CountDownLatch 的计数器初始化为 n (new CountDownLatch(n)),每当一个任务线程执行完毕,就将计数器减 1 (countdownlatch.countDown()),当计数器的值变为 0 时,在 CountDownLatch 上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
2. 实现多个线程开始执行任务的最大并行性。
注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 (new CountDownLatch(1)),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为 0,多个线程同时被唤醒。
7.2 CountDownLatch 的使用示例
/**
*
* @author SnailClimb
* @date 2018年10月1日
* @Description: CountDownLatch 使用方法示例
*/
public class CountDownLatchExample1 {
// 请求的数量
private static final int threadCount = 550;
public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {// Lambda 表达式的运用
try {
test(threadnum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
countDownLatch.countDown();// 表示一个请求已经被完成
}
});
}
countDownLatch.await();
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模拟请求的耗时操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模拟请求的耗时操作
}
}
上面的代码中,我们定义了请求的数量为 550,当这 550 个请求被处理完成之后,才会执行System.out.println(“finish”);。
与 CountDownLatch 的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用 CountDownLatch.await() 方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
其他 N 个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的 count 值就减 1。所以当 N 个线程都调 用了这个方法,count 的值等于 0,然后主线程就能通过 await()方法,恢复执行自己的任务。
再插一嘴:CountDownLatch 的 await() 方法使用不当很容易产生死锁,比如我们上面代码中的 for 循环改为:for (int i = 0; i < threadCount-1; i++) {
.......
}
这样就导致 count 的值没办法等于 0,然后就会导致一直等待。
7.3 CountDownLatch 的不足
CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。
7.4 CountDownLatch 相常见面试题
CountDownLatch 怎么用?应用场景是什么?
- CountDownLatch 和 CyclicBarrier 的不同之处?
- CountDownLatch 类中主要的方法?
八、CyclicBarrier(循环栅栏)
CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。
CountDownLatch 的实现是基于 AQS 的,而 CycliBarrier 是基于 ReentrantLock(ReentrantLock 也属于 AQS 同步器)和 Condition 的。
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await() 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
再来看一下它的构造函数: ```java public CyclicBarrier(int parties) { this(parties, null); }
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
其中,parties 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。
<a name="ZWWvn"></a>
## 8.1 CyclicBarrier 的应用场景
CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。
<a name="p2v47"></a>
## 8.2 CyclicBarrier 源码分析
当调用 CyclicBarrier 对象调用 await() 方法时,实际上调用的是 dowait(false, 0L)方法。 await() 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 parties 的值时,栅栏才会打开,线程才得以通过执行。
```java
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
dowait(false, 0L):
// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。
private int count;
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 锁住
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
// 如果线程中断了,抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// cout减1
int index = --count;
// 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行await 方法之后的条件
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 将 count 重置为 parties 属性的初始化值
// 唤醒之前等待的线程
// 下一波执行开始
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
总结:CyclicBarrier 内部通过一个 count 变量作为计数器,count 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减一。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。
8.3 CyclicBarrier 和 CountDownLatch 的区别
下面这个是国外一个大佬的回答:
CountDownLatch 是计数器,只能使用一次,而 CyclicBarrier 的计数器提供 reset 功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。我们来从 jdk 作者设计的目的来看,javadoc 是这么描述它们的:
CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;) CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)
对于 CountDownLatch 来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。
CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。