一、简介
1.1 介绍
Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。
1.2 Lock接口实现的锁 比synchronized关键字有优势:
- 可以尝试非阻塞的获取锁
- 能超时获取锁
- 能中断地获取锁
1.3 在Java 中的锁的相关概念:
- 乐观锁 VS 悲观锁
- 自旋锁 VS 适应性自旋锁 VS 排队自旋锁
- 无锁 VS 偏向锁 VS 轻量级锁 VS 重量级锁
- 公平锁 VS 非公平锁
- 可重入锁 VS 非可重入锁
- 独享锁 VS 共享锁
- MCS锁、CLH锁
具体介绍请看美团技术团队“不可不说的Java“锁”事”
为了深刻了解JDK5新增的Lock 接口,这里我们动手实现一个公平、非可重入的锁
二、分析实现锁的条件
2.1 如何判断锁的状态?
对于多个线程同时访问一个共享资源,怎么判断已经有线程占用了呢?对于关键字synchronized是更改对象头中的MarkWord,标记为已加锁或未加锁。那么我们需要做的是模仿对象头中的MarkWord,定义个变量来标记是否已加锁,定义变量state>0 占用锁。
2.2 如何处理未获取到锁的线程?
对于关键字synchronized实现的锁,如果已经有线程获取到了锁,那么其他线程都不能访问synchronized方法或synchronized代码块,从而进入一个等待队列。所以,我们需要一个队列来保存等待获取锁的线程。
2.3 如何再次争取锁?
如果获取到锁的线程处理完,释放了锁。那么被放到队列中线程,如何感知到这个情况的发生呢? 这里就涉及到线程间的通信问题,上面我们定义的变量来判断锁的状态,因为这个变量需要多个线程读,所以需要加上volatile关键字,以便感知。又因为不知道获取到锁那个线程什么是否结束,只能不断的循环判断和阻塞,这就需要CAS算法和unsafe类阻塞唤醒线程(这里也可以用原子类)。
unsafe 的使用请看死磕 java魔法类之Unsafe解析
2.4 图示(来自彤哥读源码)
三、实现
3.1 定义变量
/**
* 锁状态
*/
private volatile int state;
3.2 定义队列节点(这里采用双向链表)
/**
* 头指针
*/
private volatile Node head;
/**
* 尾指针
*/
private volatile Node tail;
/**
* 定义队列节点
*/
private class Node {
//数据域
Thread data;
//前驱(方便判断节点的前驱是否为头节点)
Node pre;
//后继
Node next;
public Node() {
}
public Node(Thread thread, Node prev) {
this.data = thread;
this.pre = prev;
}
}
/**
* 入队
*
* @return
*/
private Node enqueue() {
for (; ; ) {
// 获取尾节点
Node t = tail;
// 构造新节点
Node node = new Node(Thread.currentThread(), t);
// 不断尝试原子更新尾节点
if (compareAndSetTail(t, node)) {
// 更新尾节点成功了,让原尾节点的next指针指向当前节点
t.next = node;
return node;
}
}
}
3.3 使用unsafe类
private static Unsafe unsafe;
/**
* 变量state在内存中的偏移量
*/
private static final long stateOffset;
/**
* 变量tail在内存中的偏移量
*/
private static final long tailOffset;
static {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
unsafe = (Unsafe) f.get(null);
stateOffset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("state"));
tailOffset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("tail"));
} catch (Throwable ex) {
throw new Error(ex);
}
}
/**
* CAS更新锁的状态
*
* @param expect
* @param update
* @return
*/
private boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
/**
* 用于入队列时,保证线程安全
*
* @param expect
* @param update
* @return
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
3.4 加锁操作
/**
* 获取锁
*/
public final void acquire() {
// 尝试更新state字段,更新成功说明占有了锁
if (compareAndSetState(0, 1)) {
return;
}
//没有获取到锁,则进入队列
Node node = enqueue();
Node pre = node.pre;
//阻塞等待再次获取锁
// 再次尝试获取锁,需要检测上一个节点是不是head,按入队顺序加锁
while (node.pre != head || !compareAndSetState(0, 1)) {
// 未获取到锁,阻塞
unsafe.park(false, 0L);
}
//获取到锁,出队列
head = node;
// 清空当前节点的内容,协助GC
node.data = null;
// 将上一个节点从链表中剔除,协助GC
node.pre = null;
pre.next = null;
}
3.5 解锁操作
/**
* 释放锁
*
* @return
*/
public final boolean release() {
// 把state更新成0,这里不需要原子更新,因为同时只有一个线程访问到这里
state = 0;
// 下一个待唤醒的节点
Node next = head.next;
// 下一个节点不为空,就唤醒它
if (next != null) {
unsafe.unpark(next.data);
return true;
}
return false;
}
3.6 测试
public class Main {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
MyLock lock = new MyLock();
//开1000个线程
CountDownLatch countDownLatch = new CountDownLatch(1000);
IntStream.range(0, 1000).forEach(i -> new Thread(() -> {
lock.lock();
try {
//每个线程计算1万
IntStream.range(0, 10000).forEach(j -> {
count++;
});
} finally {
lock.unlock();
}
countDownLatch.countDown();
}, "Thread-" + i).start());
countDownLatch.await();
System.out.println(count);
}
}
//输出结果为10000000,则我们定义的锁是可用的
完整代码
/**
* @author hdj
* @version 1.0
* @date 11/28/19 4:41 PM
* @description: 线程队列--采用双向链表
*/
public class MyAQS {
/**
* 锁状态
*/
private volatile int state;
/**
* 头指针
*/
private volatile Node head;
/**
* 尾指针
*/
private volatile Node tail;
private static Unsafe unsafe;
/**
* 变量state在内存中的偏移量
*/
private static final long stateOffset;
/**
* 变量tail在内存中的偏移量
*/
private static final long tailOffset;
static {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
unsafe = (Unsafe) f.get(null);
stateOffset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("state"));
tailOffset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("tail"));
} catch (Throwable ex) {
throw new Error(ex);
}
}
public MyAQS() {
this.head = this.tail = new Node();
}
/**
* CAS更新锁的状态
*
* @param expect
* @param update
* @return
*/
private boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
/**
* 用于如队列时,保证线程安全
*
* @param expect
* @param update
* @return
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* 定义队列节点
*/
private class Node {
//数据域
Thread data;
//前驱
Node pre;
//后继
Node next;
public Node() {
}
public Node(Thread thread, Node prev) {
this.data = thread;
this.pre = prev;
}
}
/**
* 入队
*
* @return
*/
private Node enqueue() {
//这里的循环采用for(;;)比采用while(true)快,不用解析判断变量
for (; ; ) {
// 获取尾节点
Node t = tail;
// 构造新节点
Node node = new Node(Thread.currentThread(), t);
// 不断尝试原子更新尾节点
if (compareAndSetTail(t, node)) {
// 更新尾节点成功了,让原尾节点的next指针指向当前节点
t.next = node;
return node;
}
}
}
/**
* 获取锁
*/
public final void acquire() {
// 尝试更新state字段,更新成功说明占有了锁
if (compareAndSetState(0, 1)) {
return;
}
//没有获取到锁,则进入队列
Node node = enqueue();
Node pre = node.pre;
//阻塞等待再次获取锁
// 再次尝试获取锁,需要检测上一个节点是不是head,按入队顺序加锁
while (node.pre != head || !compareAndSetState(0, 1)) {
// 未获取到锁,阻塞
unsafe.park(false, 0L);
}
//获取到锁,出队列
head = node;
// 清空当前节点的内容,协助GC
node.data = null;
// 将上一个节点从链表中剔除,协助GC
node.pre = null;
pre.next = null;
}
/**
* 释放锁
*
* @return
*/
public final boolean release() {
// 把state更新成0,这里不需要原子更新,因为同时只有一个线程访问到这里
state = 0;
// 下一个待唤醒的节点
Node next = head.next;
// 下一个节点不为空,就唤醒它
if (next != null) {
unsafe.unpark(next.data);
return true;
}
return false;
}
}
public class MyLock {
private MyAQS aqs = new MyAQS();
/**
* 锁
*/
public void lock() {
aqs.acquire();
}
/**
* 解锁
*/
public void unlock() {
aqs.release();
}
}