一、简介

1.1 介绍

Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。

1.2 Lock接口实现的锁 比synchronized关键字有优势:

  • 可以尝试非阻塞的获取锁
  • 能超时获取锁
  • 能中断地获取锁

1.3 在Java 中的锁的相关概念:

  • 乐观锁 VS 悲观锁
  • 自旋锁 VS 适应性自旋锁 VS 排队自旋锁
  • 无锁 VS 偏向锁 VS 轻量级锁 VS 重量级锁
  • 公平锁 VS 非公平锁
  • 可重入锁 VS 非可重入锁
  • 独享锁 VS 共享锁
  • MCS锁、CLH锁

具体介绍请看美团技术团队“不可不说的Java“锁”事

为了深刻了解JDK5新增的Lock 接口,这里我们动手实现一个公平、非可重入的锁

二、分析实现锁的条件

2.1 如何判断锁的状态?

对于多个线程同时访问一个共享资源,怎么判断已经有线程占用了呢?对于关键字synchronized是更改对象头中的MarkWord,标记为已加锁或未加锁。那么我们需要做的是模仿对象头中的MarkWord,定义个变量来标记是否已加锁,定义变量state>0 占用锁。

2.2 如何处理未获取到锁的线程?

对于关键字synchronized实现的锁,如果已经有线程获取到了锁,那么其他线程都不能访问synchronized方法或synchronized代码块,从而进入一个等待队列。所以,我们需要一个队列来保存等待获取锁的线程。

2.3 如何再次争取锁?

如果获取到锁的线程处理完,释放了锁。那么被放到队列中线程,如何感知到这个情况的发生呢? 这里就涉及到线程间的通信问题,上面我们定义的变量来判断锁的状态,因为这个变量需要多个线程读,所以需要加上volatile关键字,以便感知。又因为不知道获取到锁那个线程什么是否结束,只能不断的循环判断和阻塞,这就需要CAS算法和unsafe类阻塞唤醒线程(这里也可以用原子类)。

unsafe 的使用请看死磕 java魔法类之Unsafe解析

2.4 图示(来自彤哥读源码)

image.png

三、实现

3.1 定义变量

  1. /**
  2. * 锁状态
  3. */
  4. private volatile int state;

3.2 定义队列节点(这里采用双向链表)

  1. /**
  2. * 头指针
  3. */
  4. private volatile Node head;
  5. /**
  6. * 尾指针
  7. */
  8. private volatile Node tail;
  9. /**
  10. * 定义队列节点
  11. */
  12. private class Node {
  13. //数据域
  14. Thread data;
  15. //前驱(方便判断节点的前驱是否为头节点)
  16. Node pre;
  17. //后继
  18. Node next;
  19. public Node() {
  20. }
  21. public Node(Thread thread, Node prev) {
  22. this.data = thread;
  23. this.pre = prev;
  24. }
  25. }
  26. /**
  27. * 入队
  28. *
  29. * @return
  30. */
  31. private Node enqueue() {
  32. for (; ; ) {
  33. // 获取尾节点
  34. Node t = tail;
  35. // 构造新节点
  36. Node node = new Node(Thread.currentThread(), t);
  37. // 不断尝试原子更新尾节点
  38. if (compareAndSetTail(t, node)) {
  39. // 更新尾节点成功了,让原尾节点的next指针指向当前节点
  40. t.next = node;
  41. return node;
  42. }
  43. }
  44. }

3.3 使用unsafe类

  1. private static Unsafe unsafe;
  2. /**
  3. * 变量state在内存中的偏移量
  4. */
  5. private static final long stateOffset;
  6. /**
  7. * 变量tail在内存中的偏移量
  8. */
  9. private static final long tailOffset;
  10. static {
  11. try {
  12. Field f = Unsafe.class.getDeclaredField("theUnsafe");
  13. f.setAccessible(true);
  14. unsafe = (Unsafe) f.get(null);
  15. stateOffset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("state"));
  16. tailOffset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("tail"));
  17. } catch (Throwable ex) {
  18. throw new Error(ex);
  19. }
  20. }
  21. /**
  22. * CAS更新锁的状态
  23. *
  24. * @param expect
  25. * @param update
  26. * @return
  27. */
  28. private boolean compareAndSetState(int expect, int update) {
  29. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  30. }
  31. /**
  32. * 用于入队列时,保证线程安全
  33. *
  34. * @param expect
  35. * @param update
  36. * @return
  37. */
  38. private final boolean compareAndSetTail(Node expect, Node update) {
  39. return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
  40. }

3.4 加锁操作

  1. /**
  2. * 获取锁
  3. */
  4. public final void acquire() {
  5. // 尝试更新state字段,更新成功说明占有了锁
  6. if (compareAndSetState(0, 1)) {
  7. return;
  8. }
  9. //没有获取到锁,则进入队列
  10. Node node = enqueue();
  11. Node pre = node.pre;
  12. //阻塞等待再次获取锁
  13. // 再次尝试获取锁,需要检测上一个节点是不是head,按入队顺序加锁
  14. while (node.pre != head || !compareAndSetState(0, 1)) {
  15. // 未获取到锁,阻塞
  16. unsafe.park(false, 0L);
  17. }
  18. //获取到锁,出队列
  19. head = node;
  20. // 清空当前节点的内容,协助GC
  21. node.data = null;
  22. // 将上一个节点从链表中剔除,协助GC
  23. node.pre = null;
  24. pre.next = null;
  25. }

3.5 解锁操作

  1. /**
  2. * 释放锁
  3. *
  4. * @return
  5. */
  6. public final boolean release() {
  7. // 把state更新成0,这里不需要原子更新,因为同时只有一个线程访问到这里
  8. state = 0;
  9. // 下一个待唤醒的节点
  10. Node next = head.next;
  11. // 下一个节点不为空,就唤醒它
  12. if (next != null) {
  13. unsafe.unpark(next.data);
  14. return true;
  15. }
  16. return false;
  17. }

3.6 测试

  1. public class Main {
  2. private static int count = 0;
  3. public static void main(String[] args) throws InterruptedException {
  4. MyLock lock = new MyLock();
  5. //开1000个线程
  6. CountDownLatch countDownLatch = new CountDownLatch(1000);
  7. IntStream.range(0, 1000).forEach(i -> new Thread(() -> {
  8. lock.lock();
  9. try {
  10. //每个线程计算1万
  11. IntStream.range(0, 10000).forEach(j -> {
  12. count++;
  13. });
  14. } finally {
  15. lock.unlock();
  16. }
  17. countDownLatch.countDown();
  18. }, "Thread-" + i).start());
  19. countDownLatch.await();
  20. System.out.println(count);
  21. }
  22. }
  23. //输出结果为10000000,则我们定义的锁是可用的

完整代码

  1. /**
  2. * @author hdj
  3. * @version 1.0
  4. * @date 11/28/19 4:41 PM
  5. * @description: 线程队列--采用双向链表
  6. */
  7. public class MyAQS {
  8. /**
  9. * 锁状态
  10. */
  11. private volatile int state;
  12. /**
  13. * 头指针
  14. */
  15. private volatile Node head;
  16. /**
  17. * 尾指针
  18. */
  19. private volatile Node tail;
  20. private static Unsafe unsafe;
  21. /**
  22. * 变量state在内存中的偏移量
  23. */
  24. private static final long stateOffset;
  25. /**
  26. * 变量tail在内存中的偏移量
  27. */
  28. private static final long tailOffset;
  29. static {
  30. try {
  31. Field f = Unsafe.class.getDeclaredField("theUnsafe");
  32. f.setAccessible(true);
  33. unsafe = (Unsafe) f.get(null);
  34. stateOffset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("state"));
  35. tailOffset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("tail"));
  36. } catch (Throwable ex) {
  37. throw new Error(ex);
  38. }
  39. }
  40. public MyAQS() {
  41. this.head = this.tail = new Node();
  42. }
  43. /**
  44. * CAS更新锁的状态
  45. *
  46. * @param expect
  47. * @param update
  48. * @return
  49. */
  50. private boolean compareAndSetState(int expect, int update) {
  51. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  52. }
  53. /**
  54. * 用于如队列时,保证线程安全
  55. *
  56. * @param expect
  57. * @param update
  58. * @return
  59. */
  60. private final boolean compareAndSetTail(Node expect, Node update) {
  61. return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
  62. }
  63. /**
  64. * 定义队列节点
  65. */
  66. private class Node {
  67. //数据域
  68. Thread data;
  69. //前驱
  70. Node pre;
  71. //后继
  72. Node next;
  73. public Node() {
  74. }
  75. public Node(Thread thread, Node prev) {
  76. this.data = thread;
  77. this.pre = prev;
  78. }
  79. }
  80. /**
  81. * 入队
  82. *
  83. * @return
  84. */
  85. private Node enqueue() {
  86. //这里的循环采用for(;;)比采用while(true)快,不用解析判断变量
  87. for (; ; ) {
  88. // 获取尾节点
  89. Node t = tail;
  90. // 构造新节点
  91. Node node = new Node(Thread.currentThread(), t);
  92. // 不断尝试原子更新尾节点
  93. if (compareAndSetTail(t, node)) {
  94. // 更新尾节点成功了,让原尾节点的next指针指向当前节点
  95. t.next = node;
  96. return node;
  97. }
  98. }
  99. }
  100. /**
  101. * 获取锁
  102. */
  103. public final void acquire() {
  104. // 尝试更新state字段,更新成功说明占有了锁
  105. if (compareAndSetState(0, 1)) {
  106. return;
  107. }
  108. //没有获取到锁,则进入队列
  109. Node node = enqueue();
  110. Node pre = node.pre;
  111. //阻塞等待再次获取锁
  112. // 再次尝试获取锁,需要检测上一个节点是不是head,按入队顺序加锁
  113. while (node.pre != head || !compareAndSetState(0, 1)) {
  114. // 未获取到锁,阻塞
  115. unsafe.park(false, 0L);
  116. }
  117. //获取到锁,出队列
  118. head = node;
  119. // 清空当前节点的内容,协助GC
  120. node.data = null;
  121. // 将上一个节点从链表中剔除,协助GC
  122. node.pre = null;
  123. pre.next = null;
  124. }
  125. /**
  126. * 释放锁
  127. *
  128. * @return
  129. */
  130. public final boolean release() {
  131. // 把state更新成0,这里不需要原子更新,因为同时只有一个线程访问到这里
  132. state = 0;
  133. // 下一个待唤醒的节点
  134. Node next = head.next;
  135. // 下一个节点不为空,就唤醒它
  136. if (next != null) {
  137. unsafe.unpark(next.data);
  138. return true;
  139. }
  140. return false;
  141. }
  142. }
  143. public class MyLock {
  144. private MyAQS aqs = new MyAQS();
  145. /**
  146. * 锁
  147. */
  148. public void lock() {
  149. aqs.acquire();
  150. }
  151. /**
  152. * 解锁
  153. */
  154. public void unlock() {
  155. aqs.release();
  156. }
  157. }

参考