1. Lock

1.1 存在的理由

  1. // 支持中断的 API
  2. void lockInterruptibly() throws InterruptedException;
  3. // 支持超时的 API
  4. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  5. // 支持非阻塞获取锁的 API
  6. boolean tryLock();

以上是Lock接口中的三个方法,提供了synchronized所不具备的能力

  1. 响应中断 使用synchronized时,假设一个线程持有锁A, 又需要获取锁B,一旦锁B被占用,那这个线程就会进入堵塞状态,一旦发生死锁,就没有任何机会来唤醒线程了,但是如果堵塞状态的线程能够响应中断信号被唤醒,就有可能可以释放锁A,避免死锁的出现。
  2. 支持超时 如果线程在指定时间内无法获取锁,不是进入堵塞状态,而是返回一个异常
  3. 非堵塞地获取锁 尝试获取锁,不进入堵塞状态,而是返回结果

java线程状态

  1. 初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
  2. 运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。 线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。
  3. 阻塞(BLOCKED):表示线程阻塞于锁。
  4. 等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。
  5. 超时等待(TIMED_WAITING):该状态不同于WAITING,它可以在指定的时间后自行返回。
  6. 终止(TERMINATED):表示该线程已经执行完毕。

摘自 https://blog.csdn.net/pange1991/article/details/53860651

1.2 实现原理

Java中 多线程的可见性是通过 Happens-Before 规则保证的,synchronized有一条规则synchronized解锁Happens-Before于后续对这个锁的加锁。而ReentrantLock内部也持有一个volatile的成员变量state,加锁和解锁时都会读写state值,从而来保证可见性。

Happens-Before规则 1.程序的顺序性规则:程序前面对某个变量的修改对后续操作是可见的 2.volatile变量规则:对volatile变量的写操作对于后续此变量的读操作是可见的 3.传递性:如果A Happens-Before B, 且B Happens-Before C, 那么 A Happens-Before C 4.锁的规则:对一个锁的解锁Happens-Before后续对于这个锁的加锁 5.线程start()规则:主线程A启动子线程B后,子线程B能够看到主线程启动子线程B前的操作 6.线程join()规则:主线程A等待子线程B完成(主线程A通过调用子线程B的join()方法实现),当子线程B完成后(主线程A中join()方法返回),主线程能够看到子线程的操作。

示例

  1. class SampleLock {
  2. volatile int state;
  3. // 加锁
  4. lock() {
  5. // 省略许多代码
  6. state = 1;
  7. }
  8. // 解锁
  9. unlock() {
  10. // 省略许多代码
  11. state = 0;
  12. }
  13. }

1.3 公平锁与非公平锁

ReentrantLock有两个构造函数,一个是无参构造函数,一个是传入fair参数的构造函数,fair参数代表的是锁的公平策略,如果传入true就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。

  1. public ReentrantLock() {
  2. sync = new NonfairSync();
  3. }
  4. public ReentrantLock(boolean fair) {
  5. sync = fair ? new FairSync() : new NonfairSync();
  6. }

image.png

图来自Java并发编程实战 https://time.geekbang.org/column/article/87779

公平和非公平锁的队列都基于锁内部维护的一个双向链表,表结点Node的值就是每一个请求当前锁的线程。在公平锁中,如果有另一个线程持有锁或者有其他线程在等待队列中等待这个所,那么新发出的请求的线程将被放入到队列中。而非公平锁上,只有当锁被某个线程持有时,新发出请求的线程才会被放入队列中(此时和公平锁是一样的)。所以,它们的差别在于非公平锁会有更多的机会去抢占锁。

1.3.1 公平锁

公平锁的实现原理在于每次有线程来抢占锁的时候,当前线程就会执行如下步骤

  1. //公平锁策略
  2. static final class FairSync extends Sync {
  3. private static final long serialVersionUID = -3000897897090466540L;
  4. final void lock() {
  5. acquire(1);
  6. }
  7. protected final boolean tryAcquire(int acquires) {
  8. final Thread current = Thread.currentThread();
  9. int c = getState();
  10. if (c == 0) {
  11. if (!hasQueuedPredecessors() &&
  12. compareAndSetState(0, acquires)) {
  13. setExclusiveOwnerThread(current);
  14. return true;
  15. }
  16. }
  17. else if (current == getExclusiveOwnerThread()) {
  18. int nextc = c + acquires;
  19. if (nextc < 0)
  20. throw new Error("Maximum lock count exceeded");
  21. setState(nextc);
  22. return true;
  23. }
  24. return false;
  25. }
  26. }
  27. //查看是否有等待队列节点
  28. public final boolean hasQueuedPredecessors() {
  29. // The correctness of this depends on head being initialized
  30. // before tail and on head.next being accurate if the current
  31. // thread is first in queue.
  32. Node t = tail; // Read fields in reverse initialization order
  33. Node h = head;
  34. Node s;
  35. return h != t &&
  36. ((s = h.next) == null || s.thread != Thread.currentThread());
  37. }

1.3.2 非公平锁

非公平锁在实现的时候多次强调随机抢占

  1. static final class NonfairSync extends Sync {
  2. private static final long serialVersionUID = 7316153563782823691L;
  3. final void lock() {
  4. if (compareAndSetState(0, 1))
  5. setExclusiveOwnerThread(Thread.currentThread());
  6. else
  7. acquire(1);
  8. }
  9. protected final boolean tryAcquire(int acquires) {
  10. return nonfairTryAcquire(acquires);
  11. }
  12. }

1.3.3 示例

  1. public class FairLockTest {
  2. //false表示非公平
  3. private ReentrantLock lock = new ReentrantLock(false);
  4. //true表示公平锁
  5. //private ReentrantLock lock = new ReentrantLock(true);
  6. public void testFair(){
  7. try {
  8. lock.lock();
  9. System.out.println(Thread.currentThread().getName() +"获得了锁");
  10. }finally {
  11. lock.unlock();
  12. }
  13. }
  14. public static void main(String[] args) {
  15. FairLockTest fairLockTest = new FairLockTest();
  16. Runnable runnable = () -> {
  17. System.out.println(Thread.currentThread().getName()+"启动");
  18. fairLockTest.testFair();
  19. };
  20. Thread[] threadArray = new Thread[10];
  21. for (int i=0; i<10; i++) {
  22. threadArray[i] = new Thread(runnable);
  23. }
  24. for (int i=0; i<10; i++) {
  25. threadArray[i].start();
  26. }
  27. }
  28. }
  29. //false 结果
  30. Thread-0启动
  31. Thread-4启动
  32. Thread-5启动
  33. Thread-3启动
  34. Thread-7启动
  35. Thread-2启动
  36. Thread-1启动
  37. Thread-8启动
  38. Thread-6启动
  39. Thread-0获得了锁
  40. Thread-9启动
  41. Thread-4获得了锁
  42. Thread-5获得了锁
  43. Thread-3获得了锁
  44. Thread-7获得了锁
  45. Thread-2获得了锁
  46. Thread-1获得了锁
  47. Thread-8获得了锁
  48. Thread-6获得了锁
  49. Thread-9获得了锁
  50. //true 结果
  51. Thread-0启动
  52. Thread-4启动
  53. Thread-5启动
  54. Thread-6启动
  55. Thread-3启动
  56. Thread-7启动
  57. Thread-8启动
  58. Thread-2启动
  59. Thread-1启动
  60. Thread-9启动
  61. Thread-0获得了锁
  62. Thread-4获得了锁
  63. Thread-5获得了锁
  64. Thread-6获得了锁
  65. Thread-3获得了锁
  66. Thread-7获得了锁
  67. Thread-8获得了锁
  68. Thread-2获得了锁
  69. Thread-1获得了锁
  70. Thread-9获得了锁

1.4 用锁的最佳实践

  • 永远只在更新对象的成员变量时加锁
  • 永远只在访问可变的成员变量时加锁
  • 永远不在调用其他对象的方法时加锁

1.5 Lock和Condition 在Dubbo中的使用

TCP协议本身是异步的,以RPC为例,在TCP协议层面,发送完RPC请求后,线程是不会等待RPC的响应结果的。所以在Dubbo中就做了异步转同步的操作,主要操作就在 DefaultFuture 中, 源码在 https://github.com/apache/incubator-dubbo/blob/da69a9c8db15b6047cb80ed3d2251215784dcb0e/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
这里我简化下,提炼出关键代码

  1. private final Lock lock = new ReentrantLock();
  2. private final Condition done = lock.newCondition();
  3. public Object get(int timeout) throws RemotingException {
  4. if (timeout <= 0) {
  5. timeout = Constants.DEFAULT_TIMEOUT;
  6. }
  7. if (!isDone()) {
  8. long start = System.currentTimeMillis();
  9. lock.lock();
  10. try {
  11. while (!isDone()) {
  12. done.await(timeout, TimeUnit.MILLISECONDS);
  13. if (isDone() || System.currentTimeMillis() - start > timeout) {
  14. break;
  15. }
  16. }
  17. } catch (InterruptedException e) {
  18. throw new RuntimeException(e);
  19. } finally {
  20. lock.unlock();
  21. }
  22. if (!isDone()) {
  23. throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
  24. }
  25. }
  26. return returnFromResponse();
  27. }
  28. //RPC 结果是否已经返回
  29. public boolean isDone() {
  30. return response != null;
  31. }
  32. //RPC 结果返回
  33. private void doReceived(Response res) {
  34. lock.lock();
  35. try {
  36. response = res;
  37. done.signalAll();
  38. } finally {
  39. lock.unlock();
  40. }
  41. if (callback != null) {
  42. invokeCallback(callback);
  43. }
  44. }

调用线程通过使用get()方法等待RPC返回结果,这个方法里面,调用lock()获取锁,在finally里面调用unlock()释放锁,获取锁后,通过在循环中调用await()方法来实现等待。
在RPC结果返回时,调用doReceived(), 这个方法中,调用lock()获取锁,在finally里面调用unlock()释放锁,获取锁后调用signalAll()来通知所有持有这个锁的线程,结果已经返回。