一、简介

Java 中实现同步除了有关键字synchronized 外,还有JDK5发布并发包下的锁。并发包下的锁其实也是采用了CAS算法和队列实现的,下面让我们了解一下具体的实现。

二、Lock 接口

2.1 介绍

锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能的,而Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。

2.2 Lock 接口的定义

  1. public interface Lock {
  2. /**
  3. * 获取锁,调用该方法当前线程会获取锁
  4. */
  5. void lock();
  6. /**
  7. *可中断获取锁,即在获取锁的过程中,可以中断当前线程
  8. */
  9. void lockInterruptibly() throws InterruptedException;
  10. /**
  11. * 尝试非阻塞的获取锁,调用该方法后立即返回,返回值为true 则获取到锁,false则没有
  12. */
  13. boolean tryLock();
  14. /**
  15. * 超时尝试获得锁:
  16. * 有以下三种情况
  17. * 1. 当前线程在给超时时间内获取到锁
  18. * 2. 当前线程在超时时间内被中断
  19. * 3. 超时时间结束,返回false
  20. */
  21. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  22. /**
  23. * 释放锁
  24. */
  25. void unlock();
  26. /**
  27. *获取等待通知组件,该组件与当前锁绑定,线程只有获取到了锁后,才能调用Conditon 中的 await(), signal()等等待通知方法
  28. * 与Object 对象中的wait,notify 类似
  29. */
  30. Condition newCondition();
  31. }

2.3 Lock 接口比关键字 synchronized 的优势

  • 可以尝试非阻塞的获取锁
  • 能超时获取锁
  • 能被中断地获取锁

三、队列同步器(AQS, AbstractQueuedSynchronizer)

3.1 介绍

  • 队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
  • 同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。

3.2 基本结构图示

image.png

3.3 AQS 供子类重写的方法

  1. // 互斥模式下使用:尝试获取锁
  2. protected boolean tryAcquire(int arg)
  3. // 互斥模式下使用:尝试释放锁
  4. protected boolean tryRelease(int arg)
  5. //共享模式下使用:尝试获取锁
  6. protected int tryAcquireShared(int arg)
  7. // 共享模式下使用:尝试释放锁
  8. protected boolean tryReleaseShared(int arg)
  9. // 如果当前线程独占着锁,返回true
  10. protected boolean isHeldExclusively()

3.4 AQS 中的模板方法

  1. //独占式获取锁
  2. public final void acquire(int arg) {
  3. //1.tryAcquire(arg)尝试获取锁失败
  4. //2. 则进入队列中acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  5. //3. 没有能进入队列,则打断线程
  6. if (!tryAcquire(arg) &&
  7. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  8. selfInterrupt();
  9. }
  10. //可响应中断地获取锁
  11. public final void acquireInterruptibly(int arg)
  12. //在acquireInterruptibly方法的基础上加上超时限制,如果在超时时间内没有获取锁,则返回false
  13. public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  14. //共享式获取锁
  15. public final void acquireShared(int arg)
  16. //可打断,共享式获取锁
  17. public final void acquireSharedInterruptibly(int arg)
  18. //可打断,超时机制,共享式获取锁
  19. public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
  20. //独占式释放锁
  21. public final boolean release(int arg)
  22. //共享式释放锁
  23. public final boolean releaseShared(int arg)

3.5 AQS 内部节点类

  1. static final class Node {
  2. /** 标记该节点是共享模式 waiting */
  3. static final Node SHARED = new Node();
  4. /** 标识一个节点是互斥模式 waiting*/
  5. static final Node EXCLUSIVE = null;
  6. /** waitStatus 等待状态的值 1标识该线程已被取消*/
  7. static final int CANCELLED = 1;
  8. /** 标识后继节点需要唤醒*/
  9. static final int SIGNAL = -1;
  10. /** 标识线程等待在一个条件上 */
  11. static final int CONDITION = -2;
  12. /**
  13. * 标识后面的共享锁需要无条件的传播(共享锁需要连续唤醒读的线程)
  14. */
  15. static final int PROPAGATE = -3;
  16. /**
  17. * 当前节点保存的线程对应的等待状态
  18. */
  19. volatile int waitStatus;
  20. /**
  21. * aqs同步队列节点前驱
  22. */
  23. volatile Node prev;
  24. /**
  25. * aqs同步队列节点后继
  26. */
  27. volatile Node next;
  28. /**
  29. * 当前节点保存的线程;
  30. */
  31. volatile Thread thread;
  32. /**
  33. * 下一个等待在条件上的节点(Condition等待队列指针)
  34. */
  35. Node nextWaiter;
  36. /**
  37. * 判断下一个等待在条件上的节点是否是共享模式
  38. */
  39. final boolean isShared() {
  40. return nextWaiter == SHARED;
  41. }
  42. /**
  43. * 获取前一个节点
  44. */
  45. final Node predecessor() throws NullPointerException {
  46. Node p = prev;
  47. if (p == null)
  48. throw new NullPointerException();
  49. else
  50. return p;
  51. }
  52. Node() { // Used to establish initial head or SHARED marker
  53. }
  54. Node(Thread thread, Node mode) { // Used by addWaiter
  55. this.nextWaiter = mode;
  56. this.thread = thread;
  57. }
  58. Node(Thread thread, int waitStatus) { // Used by Condition
  59. this.waitStatus = waitStatus;
  60. this.thread = thread;
  61. }
  62. }

3.6 AQS 操作图示

  • 添加节点到AQS 的过程(没有获取到锁,加入队列)

同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

image.png

  • 成功获取锁,首节点设置的过程(公平锁)

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。
设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可
image.png

  • 独占式获取锁的流程图

image.png

  • 独占式超时获取锁

image.png

四、LockSupport工具

4.1 方法(以下方法底层都采用sun.misc.Unsafe 类实现)

  1. #阻塞当前线程
  2. public static void park()
  3. #超时阻塞当前线程
  4. public static void parkNanos(Object blocker, long nanos)
  5. #阻塞当前线程,直到deadline
  6. public static void parkUntil(Object blocker, long deadline)
  7. #唤醒处于阻塞的线程
  8. public static void unpark(Thread thread)

4.2 简单使用

  1. Thread thread = new Thread(() -> {
  2. long a = System.currentTimeMillis();
  3. //阻塞线程
  4. LockSupport.park();
  5. System.out.println("线程完成! 时间: " + ((System.currentTimeMillis() - a) / 1000) + "秒");
  6. });
  7. thread.start();
  8. Thread.sleep(1000);
  9. //唤醒线程
  10. //也可以使用打断线程来唤醒
  11. //thread.interrupt();
  12. LockSupport.unpark(thread);
  13. //输出:
  14. 线程完成! 时间: 1

4.3 比较LockSupport.park(),Object.wait() 等方法

  1. public class TestLockSupport {
  2. private static Object lock = new Object();
  3. public static void main(String[] args) throws InterruptedException {
  4. testObjectWait();
  5. testLockSupport();
  6. }
  7. public static void testLockSupport() throws InterruptedException {
  8. Thread thread = new Thread(() -> {
  9. long a = System.currentTimeMillis();
  10. //阻塞线程
  11. //1. 可以在任意地方调用
  12. //2. 不需要捕获异常
  13. //3.不会释放锁资源,它只是单纯阻塞线程
  14. LockSupport.park();
  15. System.out.println("testLockSupport 线程完成! 时间: " + ((System.currentTimeMillis() - a) / 1000) + "秒");
  16. });
  17. thread.start();
  18. Thread.sleep(1000);
  19. //主线程唤醒thread线程
  20. //1. 可以唤醒指定线程
  21. //2. 可以在任意地方调用
  22. LockSupport.unpark(thread);
  23. }
  24. public static void testObjectWait() throws InterruptedException {
  25. Thread thread = new Thread(() -> {
  26. long a = System.currentTimeMillis();
  27. //阻塞线程
  28. synchronized (lock) {
  29. try {
  30. //方法Object.wait调用前,需要先获取对象监视器,即在同步方法内或同步代码块内调用
  31. //Object.wait会抛出InterruptedException异常
  32. lock.wait();
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. System.out.println("testObjectWait 线程完成! 时间: " + ((System.currentTimeMillis() - a) / 1000) + "秒");
  38. });
  39. thread.start();
  40. Thread.sleep(1000);
  41. //主线程唤醒thread线程
  42. //1. 随机唤醒被阻塞的一个线程
  43. //2. 需要先获取对象监视器,即在同步方法内或同步代码块内调用
  44. //3. 不能在Object.wait()方法前调用,否则抛出IllegalMonitorStateException异常
  45. synchronized (lock) {
  46. lock.notify();
  47. }
  48. }
  49. }

总结:

  • Object.wait()方法,需要先获取对象监视器,即在同步方法内或同步代码块内调用
  • Object.wait() 会抛出InterruptedException异常
  • LockSupport.park() 可以在任意地方调用
  • LockSupport.park() 不需要捕获异常
  • LockSupport.park() 不会释放锁资源,它只是单纯阻塞线程
  • Object.notify() 随机唤醒被阻塞的一个线程
  • Object.notify() 需要先获取对象监视器,即在同步方法内或同步代码块内调用
  • Object.notify() 不能在Object.wait()方法前调用,否则抛出IllegalMonitorStateException异常
  • LockSupport.unpark(Thread thread) 可以唤醒指定线程
  • LockSupport.unpark(Thread thread) 可以在任意地方调用

五、Condition接口

5.1 接口定义

  1. public interface Condition {
  2. /**
  3. * 阻塞,直到被唤醒或被中断
  4. */
  5. void await() throws InterruptedException;
  6. /**
  7. *阻塞,直到被唤醒,不能被中断
  8. */
  9. void awaitUninterruptibly();
  10. /**
  11. * 超时阻塞
  12. */
  13. long awaitNanos(long nanosTimeout) throws InterruptedException;
  14. /**
  15. * 超时阻塞
  16. */
  17. boolean await(long time, TimeUnit unit) throws InterruptedException;
  18. /**
  19. * 阻塞
  20. */
  21. boolean awaitUntil(Date deadline) throws InterruptedException;
  22. /**
  23. *唤醒
  24. */
  25. void signal();
  26. /**
  27. * 唤醒全部
  28. */
  29. void signalAll();
  30. }

5.2 Condition 实现的等待队列的基本结构

等待队列中的节点实现,也是AQS的节点结构
image.png

5.3 同步队列与等待队列图示

Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。

image.png

5.4 利用Condition的等待通知过程图示

image.png

5.5 唤醒图示

在等待队列中的唤醒节点,需要考虑并发问题,需要采用了cas 重新入同步队列,保证是队列的尾节点
image.png

六、实例

在动手实现的锁中,我们自己实现的同步队列。 在这里我们使用AQS来实现一个公平、可重入锁。

  1. public class MyLock {
  2. private Sync sync = new Sync();
  3. private class Sync extends AbstractQueuedSynchronizer {
  4. public Sync() {
  5. }
  6. @Override
  7. protected boolean tryAcquire(int state) {
  8. final Thread current = Thread.currentThread();
  9. //获取当前状态
  10. int c = getState();
  11. //判断是否持有锁
  12. if (c == 0) {
  13. //这里实现的是公平锁
  14. //判断当前线程是否是同步队列头指针指向的线程节点
  15. //cas修改状态
  16. if (!hasQueuedPredecessors() && compareAndSetState(0, state)) {
  17. //获取到锁,标识当前线程持有锁
  18. setExclusiveOwnerThread(current);
  19. return true;
  20. }
  21. } else if (current == getExclusiveOwnerThread()) { //c>0 已持有锁,处理可重入锁操作
  22. //修改状态
  23. int nextc = c + state;
  24. if (nextc < 0)
  25. //防止超出int 值范围
  26. throw new Error("超出锁可重入次数");
  27. //设置状态
  28. setState(nextc);
  29. return true;
  30. }
  31. return false;
  32. }
  33. @Override
  34. protected boolean tryRelease(int state) {
  35. //修改状态
  36. int c = getState() - state;
  37. //判断释放锁的线程是否是持有锁
  38. if (Thread.currentThread() != getExclusiveOwnerThread())
  39. throw new IllegalMonitorStateException();
  40. boolean free = false;
  41. if (c == 0) {
  42. //如果状态值为0, 则线程释放锁
  43. free = true;
  44. setExclusiveOwnerThread(null);
  45. }
  46. setState(c);
  47. return free;
  48. }
  49. }
  50. public void lock() {
  51. sync.acquire(1);
  52. }
  53. public void unlock() {
  54. sync.release(1);
  55. }
  56. }
  • 测试
  1. public class MyLockTest {
  2. static MyLock lock=new MyLock();
  3. public static void test(){
  4. lock.lock();
  5. try {
  6. System.out.println(Thread.currentThread().getName()+ " 进入test "+ System.currentTimeMillis());
  7. test2();
  8. Thread.sleep(1000);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }finally {
  12. lock.unlock();
  13. System.out.println(Thread.currentThread().getName() +" 释放锁"+ System.currentTimeMillis());
  14. }
  15. }
  16. public static void test2(){
  17. lock.lock();
  18. try {
  19. System.out.println("进入test2"+ System.currentTimeMillis());
  20. Thread.sleep(1000);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }finally {
  24. lock.unlock();
  25. System.out.println("test2释放锁"+ System.currentTimeMillis());
  26. }
  27. }
  28. public static void main(String[] args) {
  29. //测试是否可重入
  30. //test锁没有释放,也可以进入方法test2
  31. test();
  32. //公平锁测试
  33. //按顺序输出
  34. // new Thread(()->{test();},"Thread-B").start();
  35. // new Thread(()->{test();},"Thread-A").start();
  36. // new Thread(()->{test();},"Thread-C").start();
  37. }
  38. }

在并发包下,实现锁的基础原理知识了解完毕。以上自定义锁的实现,就是摘自ReentrantLock 锁的公平锁实现,接下来该了解一下,并发包下锁的实现细节,如公平锁和非公平锁实现区别、共享锁和非共享锁实现区别、如何加入超时机制、如何加入中断机制等等。

参考