一、简介
1.1 介绍
Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。
1.2 Lock接口实现的锁 比synchronized关键字有优势:
- 可以尝试非阻塞的获取锁
- 能超时获取锁
- 能中断地获取锁
1.3 在Java 中的锁的相关概念:
- 乐观锁 VS 悲观锁
- 自旋锁 VS 适应性自旋锁 VS 排队自旋锁
- 无锁 VS 偏向锁 VS 轻量级锁 VS 重量级锁
- 公平锁 VS 非公平锁
- 可重入锁 VS 非可重入锁
- 独享锁 VS 共享锁
- MCS锁、CLH锁
具体介绍请看美团技术团队“不可不说的Java“锁”事”
为了深刻了解JDK5新增的Lock 接口,这里我们动手实现一个公平、非可重入的锁
二、分析实现锁的条件
2.1 如何判断锁的状态?
对于多个线程同时访问一个共享资源,怎么判断已经有线程占用了呢?对于关键字synchronized是更改对象头中的MarkWord,标记为已加锁或未加锁。那么我们需要做的是模仿对象头中的MarkWord,定义个变量来标记是否已加锁,定义变量state>0 占用锁。
2.2 如何处理未获取到锁的线程?
对于关键字synchronized实现的锁,如果已经有线程获取到了锁,那么其他线程都不能访问synchronized方法或synchronized代码块,从而进入一个等待队列。所以,我们需要一个队列来保存等待获取锁的线程。
2.3 如何再次争取锁?
如果获取到锁的线程处理完,释放了锁。那么被放到队列中线程,如何感知到这个情况的发生呢? 这里就涉及到线程间的通信问题,上面我们定义的变量来判断锁的状态,因为这个变量需要多个线程读,所以需要加上volatile关键字,以便感知。又因为不知道获取到锁那个线程什么是否结束,只能不断的循环判断和阻塞,这就需要CAS算法和unsafe类阻塞唤醒线程(这里也可以用原子类)。
unsafe 的使用请看死磕 java魔法类之Unsafe解析
2.4 图示(来自彤哥读源码)
三、实现
3.1 定义变量
/*** 锁状态*/private volatile int state;
3.2 定义队列节点(这里采用双向链表)
/*** 头指针*/private volatile Node head;/*** 尾指针*/private volatile Node tail;/*** 定义队列节点*/private class Node {//数据域Thread data;//前驱(方便判断节点的前驱是否为头节点)Node pre;//后继Node next;public Node() {}public Node(Thread thread, Node prev) {this.data = thread;this.pre = prev;}}/*** 入队** @return*/private Node enqueue() {for (; ; ) {// 获取尾节点Node t = tail;// 构造新节点Node node = new Node(Thread.currentThread(), t);// 不断尝试原子更新尾节点if (compareAndSetTail(t, node)) {// 更新尾节点成功了,让原尾节点的next指针指向当前节点t.next = node;return node;}}}
3.3 使用unsafe类
private static Unsafe unsafe;/*** 变量state在内存中的偏移量*/private static final long stateOffset;/*** 变量tail在内存中的偏移量*/private static final long tailOffset;static {try {Field f = Unsafe.class.getDeclaredField("theUnsafe");f.setAccessible(true);unsafe = (Unsafe) f.get(null);stateOffset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("state"));tailOffset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("tail"));} catch (Throwable ex) {throw new Error(ex);}}/*** CAS更新锁的状态** @param expect* @param update* @return*/private boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}/*** 用于入队列时,保证线程安全** @param expect* @param update* @return*/private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update);}
3.4 加锁操作
/*** 获取锁*/public final void acquire() {// 尝试更新state字段,更新成功说明占有了锁if (compareAndSetState(0, 1)) {return;}//没有获取到锁,则进入队列Node node = enqueue();Node pre = node.pre;//阻塞等待再次获取锁// 再次尝试获取锁,需要检测上一个节点是不是head,按入队顺序加锁while (node.pre != head || !compareAndSetState(0, 1)) {// 未获取到锁,阻塞unsafe.park(false, 0L);}//获取到锁,出队列head = node;// 清空当前节点的内容,协助GCnode.data = null;// 将上一个节点从链表中剔除,协助GCnode.pre = null;pre.next = null;}
3.5 解锁操作
/*** 释放锁** @return*/public final boolean release() {// 把state更新成0,这里不需要原子更新,因为同时只有一个线程访问到这里state = 0;// 下一个待唤醒的节点Node next = head.next;// 下一个节点不为空,就唤醒它if (next != null) {unsafe.unpark(next.data);return true;}return false;}
3.6 测试
public class Main {private static int count = 0;public static void main(String[] args) throws InterruptedException {MyLock lock = new MyLock();//开1000个线程CountDownLatch countDownLatch = new CountDownLatch(1000);IntStream.range(0, 1000).forEach(i -> new Thread(() -> {lock.lock();try {//每个线程计算1万IntStream.range(0, 10000).forEach(j -> {count++;});} finally {lock.unlock();}countDownLatch.countDown();}, "Thread-" + i).start());countDownLatch.await();System.out.println(count);}}//输出结果为10000000,则我们定义的锁是可用的
完整代码
/*** @author hdj* @version 1.0* @date 11/28/19 4:41 PM* @description: 线程队列--采用双向链表*/public class MyAQS {/*** 锁状态*/private volatile int state;/*** 头指针*/private volatile Node head;/*** 尾指针*/private volatile Node tail;private static Unsafe unsafe;/*** 变量state在内存中的偏移量*/private static final long stateOffset;/*** 变量tail在内存中的偏移量*/private static final long tailOffset;static {try {Field f = Unsafe.class.getDeclaredField("theUnsafe");f.setAccessible(true);unsafe = (Unsafe) f.get(null);stateOffset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("state"));tailOffset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("tail"));} catch (Throwable ex) {throw new Error(ex);}}public MyAQS() {this.head = this.tail = new Node();}/*** CAS更新锁的状态** @param expect* @param update* @return*/private boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}/*** 用于如队列时,保证线程安全** @param expect* @param update* @return*/private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update);}/*** 定义队列节点*/private class Node {//数据域Thread data;//前驱Node pre;//后继Node next;public Node() {}public Node(Thread thread, Node prev) {this.data = thread;this.pre = prev;}}/*** 入队** @return*/private Node enqueue() {//这里的循环采用for(;;)比采用while(true)快,不用解析判断变量for (; ; ) {// 获取尾节点Node t = tail;// 构造新节点Node node = new Node(Thread.currentThread(), t);// 不断尝试原子更新尾节点if (compareAndSetTail(t, node)) {// 更新尾节点成功了,让原尾节点的next指针指向当前节点t.next = node;return node;}}}/*** 获取锁*/public final void acquire() {// 尝试更新state字段,更新成功说明占有了锁if (compareAndSetState(0, 1)) {return;}//没有获取到锁,则进入队列Node node = enqueue();Node pre = node.pre;//阻塞等待再次获取锁// 再次尝试获取锁,需要检测上一个节点是不是head,按入队顺序加锁while (node.pre != head || !compareAndSetState(0, 1)) {// 未获取到锁,阻塞unsafe.park(false, 0L);}//获取到锁,出队列head = node;// 清空当前节点的内容,协助GCnode.data = null;// 将上一个节点从链表中剔除,协助GCnode.pre = null;pre.next = null;}/*** 释放锁** @return*/public final boolean release() {// 把state更新成0,这里不需要原子更新,因为同时只有一个线程访问到这里state = 0;// 下一个待唤醒的节点Node next = head.next;// 下一个节点不为空,就唤醒它if (next != null) {unsafe.unpark(next.data);return true;}return false;}}public class MyLock {private MyAQS aqs = new MyAQS();/*** 锁*/public void lock() {aqs.acquire();}/*** 解锁*/public void unlock() {aqs.release();}}
