一、简介
Java 中实现同步除了有关键字synchronized 外,还有JDK5发布并发包下的锁。并发包下的锁其实也是采用了CAS算法和队列实现的,下面让我们了解一下具体的实现。
二、Lock 接口
2.1 介绍
锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能的,而Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。
2.2 Lock 接口的定义
public interface Lock {
/**
* 获取锁,调用该方法当前线程会获取锁
*/
void lock();
/**
*可中断获取锁,即在获取锁的过程中,可以中断当前线程
*/
void lockInterruptibly() throws InterruptedException;
/**
* 尝试非阻塞的获取锁,调用该方法后立即返回,返回值为true 则获取到锁,false则没有
*/
boolean tryLock();
/**
* 超时尝试获得锁:
* 有以下三种情况
* 1. 当前线程在给超时时间内获取到锁
* 2. 当前线程在超时时间内被中断
* 3. 超时时间结束,返回false
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 释放锁
*/
void unlock();
/**
*获取等待通知组件,该组件与当前锁绑定,线程只有获取到了锁后,才能调用Conditon 中的 await(), signal()等等待通知方法
* 与Object 对象中的wait,notify 类似
*/
Condition newCondition();
}
2.3 Lock 接口比关键字 synchronized 的优势
- 可以尝试非阻塞的获取锁
- 能超时获取锁
- 能被中断地获取锁
三、队列同步器(AQS, AbstractQueuedSynchronizer)
3.1 介绍
- 队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
- 同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。
3.2 基本结构图示
3.3 AQS 供子类重写的方法
// 互斥模式下使用:尝试获取锁
protected boolean tryAcquire(int arg)
// 互斥模式下使用:尝试释放锁
protected boolean tryRelease(int arg)
//共享模式下使用:尝试获取锁
protected int tryAcquireShared(int arg)
// 共享模式下使用:尝试释放锁
protected boolean tryReleaseShared(int arg)
// 如果当前线程独占着锁,返回true
protected boolean isHeldExclusively()
3.4 AQS 中的模板方法
//独占式获取锁
public final void acquire(int arg) {
//1.tryAcquire(arg)尝试获取锁失败
//2. 则进入队列中acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
//3. 没有能进入队列,则打断线程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//可响应中断地获取锁
public final void acquireInterruptibly(int arg)
//在acquireInterruptibly方法的基础上加上超时限制,如果在超时时间内没有获取锁,则返回false
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
//共享式获取锁
public final void acquireShared(int arg)
//可打断,共享式获取锁
public final void acquireSharedInterruptibly(int arg)
//可打断,超时机制,共享式获取锁
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
//独占式释放锁
public final boolean release(int arg)
//共享式释放锁
public final boolean releaseShared(int arg)
3.5 AQS 内部节点类
static final class Node {
/** 标记该节点是共享模式 waiting */
static final Node SHARED = new Node();
/** 标识一个节点是互斥模式 waiting*/
static final Node EXCLUSIVE = null;
/** waitStatus 等待状态的值 1标识该线程已被取消*/
static final int CANCELLED = 1;
/** 标识后继节点需要唤醒*/
static final int SIGNAL = -1;
/** 标识线程等待在一个条件上 */
static final int CONDITION = -2;
/**
* 标识后面的共享锁需要无条件的传播(共享锁需要连续唤醒读的线程)
*/
static final int PROPAGATE = -3;
/**
* 当前节点保存的线程对应的等待状态
*/
volatile int waitStatus;
/**
* aqs同步队列节点前驱
*/
volatile Node prev;
/**
* aqs同步队列节点后继
*/
volatile Node next;
/**
* 当前节点保存的线程;
*/
volatile Thread thread;
/**
* 下一个等待在条件上的节点(Condition等待队列指针)
*/
Node nextWaiter;
/**
* 判断下一个等待在条件上的节点是否是共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 获取前一个节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
3.6 AQS 操作图示
- 添加节点到AQS 的过程(没有获取到锁,加入队列)
同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。
- 成功获取锁,首节点设置的过程(公平锁)
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。
设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可
- 独占式获取锁的流程图
- 独占式超时获取锁
四、LockSupport工具
4.1 方法(以下方法底层都采用sun.misc.Unsafe 类实现)
#阻塞当前线程
public static void park()
#超时阻塞当前线程
public static void parkNanos(Object blocker, long nanos)
#阻塞当前线程,直到deadline
public static void parkUntil(Object blocker, long deadline)
#唤醒处于阻塞的线程
public static void unpark(Thread thread)
4.2 简单使用
Thread thread = new Thread(() -> {
long a = System.currentTimeMillis();
//阻塞线程
LockSupport.park();
System.out.println("线程完成! 时间: " + ((System.currentTimeMillis() - a) / 1000) + "秒");
});
thread.start();
Thread.sleep(1000);
//唤醒线程
//也可以使用打断线程来唤醒
//thread.interrupt();
LockSupport.unpark(thread);
//输出:
线程完成! 时间: 1秒
4.3 比较LockSupport.park(),Object.wait() 等方法
public class TestLockSupport {
private static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
testObjectWait();
testLockSupport();
}
public static void testLockSupport() throws InterruptedException {
Thread thread = new Thread(() -> {
long a = System.currentTimeMillis();
//阻塞线程
//1. 可以在任意地方调用
//2. 不需要捕获异常
//3.不会释放锁资源,它只是单纯阻塞线程
LockSupport.park();
System.out.println("testLockSupport 线程完成! 时间: " + ((System.currentTimeMillis() - a) / 1000) + "秒");
});
thread.start();
Thread.sleep(1000);
//主线程唤醒thread线程
//1. 可以唤醒指定线程
//2. 可以在任意地方调用
LockSupport.unpark(thread);
}
public static void testObjectWait() throws InterruptedException {
Thread thread = new Thread(() -> {
long a = System.currentTimeMillis();
//阻塞线程
synchronized (lock) {
try {
//方法Object.wait调用前,需要先获取对象监视器,即在同步方法内或同步代码块内调用
//Object.wait会抛出InterruptedException异常
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("testObjectWait 线程完成! 时间: " + ((System.currentTimeMillis() - a) / 1000) + "秒");
});
thread.start();
Thread.sleep(1000);
//主线程唤醒thread线程
//1. 随机唤醒被阻塞的一个线程
//2. 需要先获取对象监视器,即在同步方法内或同步代码块内调用
//3. 不能在Object.wait()方法前调用,否则抛出IllegalMonitorStateException异常
synchronized (lock) {
lock.notify();
}
}
}
总结:
- Object.wait()方法,需要先获取对象监视器,即在同步方法内或同步代码块内调用
- Object.wait() 会抛出InterruptedException异常
- LockSupport.park() 可以在任意地方调用
- LockSupport.park() 不需要捕获异常
- LockSupport.park() 不会释放锁资源,它只是单纯阻塞线程
- Object.notify() 随机唤醒被阻塞的一个线程
- Object.notify() 需要先获取对象监视器,即在同步方法内或同步代码块内调用
- Object.notify() 不能在Object.wait()方法前调用,否则抛出IllegalMonitorStateException异常
- LockSupport.unpark(Thread thread) 可以唤醒指定线程
- LockSupport.unpark(Thread thread) 可以在任意地方调用
五、Condition接口
5.1 接口定义
public interface Condition {
/**
* 阻塞,直到被唤醒或被中断
*/
void await() throws InterruptedException;
/**
*阻塞,直到被唤醒,不能被中断
*/
void awaitUninterruptibly();
/**
* 超时阻塞
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* 超时阻塞
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* 阻塞
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
*唤醒
*/
void signal();
/**
* 唤醒全部
*/
void signalAll();
}
5.2 Condition 实现的等待队列的基本结构
5.3 同步队列与等待队列图示
Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。
5.4 利用Condition的等待通知过程图示
5.5 唤醒图示
在等待队列中的唤醒节点,需要考虑并发问题,需要采用了cas 重新入同步队列,保证是队列的尾节点
六、实例
在动手实现的锁中,我们自己实现的同步队列。 在这里我们使用AQS来实现一个公平、可重入锁。
public class MyLock {
private Sync sync = new Sync();
private class Sync extends AbstractQueuedSynchronizer {
public Sync() {
}
@Override
protected boolean tryAcquire(int state) {
final Thread current = Thread.currentThread();
//获取当前状态
int c = getState();
//判断是否持有锁
if (c == 0) {
//这里实现的是公平锁
//判断当前线程是否是同步队列头指针指向的线程节点
//cas修改状态
if (!hasQueuedPredecessors() && compareAndSetState(0, state)) {
//获取到锁,标识当前线程持有锁
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) { //c>0 已持有锁,处理可重入锁操作
//修改状态
int nextc = c + state;
if (nextc < 0)
//防止超出int 值范围
throw new Error("超出锁可重入次数");
//设置状态
setState(nextc);
return true;
}
return false;
}
@Override
protected boolean tryRelease(int state) {
//修改状态
int c = getState() - state;
//判断释放锁的线程是否是持有锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//如果状态值为0, 则线程释放锁
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
}
- 测试
public class MyLockTest {
static MyLock lock=new MyLock();
public static void test(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+ " 进入test "+ System.currentTimeMillis());
test2();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() +" 释放锁"+ System.currentTimeMillis());
}
}
public static void test2(){
lock.lock();
try {
System.out.println("进入test2"+ System.currentTimeMillis());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
System.out.println("test2释放锁"+ System.currentTimeMillis());
}
}
public static void main(String[] args) {
//测试是否可重入
//test锁没有释放,也可以进入方法test2
test();
//公平锁测试
//按顺序输出
// new Thread(()->{test();},"Thread-B").start();
// new Thread(()->{test();},"Thread-A").start();
// new Thread(()->{test();},"Thread-C").start();
}
}
在并发包下,实现锁的基础原理知识了解完毕。以上自定义锁的实现,就是摘自ReentrantLock 锁的公平锁实现,接下来该了解一下,并发包下锁的实现细节,如公平锁和非公平锁实现区别、共享锁和非共享锁实现区别、如何加入超时机制、如何加入中断机制等等。
参考
- 《Java并发编程的艺术》
- 死磕 java同步系列之AQS起篇