ReentrantLock(重入锁)

基本使用

  1. Lock lock = new ReentrantLock();
  2. lock.lock();
  3. lock.unlock();

原理

  1. Sync extends AbstractQueuedSynchronizer{
  2. }
  3. class FairSync(公平锁) extends Sync{
  4. }
  5. NonfairSync(非公平锁) extends Sync{
  6. }

AbstractQueuedSynchronizer提供了两种锁
1.独占锁
2.共享锁
state = 0 表示当前资源空闲
state>0 表示当前资源抢占到锁还没释放

Dingtalk_20211027154318.jpg
1.线程A 判断 state 状态为 0 获得锁 并且将 state 设置成了1 并且将自己设置为获得锁的线程
2.线程B 在去判断state状态为 1 获取不到锁 然后会构造一个阻塞队列 双向链表结构 从尾部插入 如果没有头节点则构建一个头节点 然后将当前节点的pred 指向头节点 将头节点的 next 指向当前节点 并进行自旋 尝试去获取锁 如果通过一次自旋获取不到锁 则进行阻塞 线程C
3.线程C也去判断state 状态是1 获取不到锁 然后将自己从尾部插入阻塞队列 将C节点的前一个pred 指向B节点 B节点的next指向C节点 并自旋
4.线程A释放锁 并将state-1 当state 成0 将 当前获取锁的线程设置为null 然后会去唤醒处于阻塞队列中头节点的下一个节点 此时如果是公平锁的话 B节点会获取到锁 但是如果是非公平锁的情况下 如果此时有线程D抢占到锁 则线程B还是依然会在阻塞队列中等待

加锁

  1. public void lock() {
  2. sync.lock();
  3. }
  1. final void lock() {
  2. //cas 原子操作修改state 的值 如果能修改成功则把0变成1 然后记录当前线程id
  3. if (compareAndSetState(0, 1))
  4. setExclusiveOwnerThread(Thread.currentThread());
  5. else
  6. //抢占锁逻辑
  7. acquire(1);
  8. }
  1. public final void acquire(int arg) {
  2. //尝试获取独占锁
  3. if (!tryAcquire(arg) &&
  4. //如果失败则假如aqs 队列中
  5. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  6. selfInterrupt();
  7. }

非公平
seata +1

  1. final boolean nonfairTryAcquire(int acquires) {
  2. //获取当前线程
  3. final Thread current = Thread.currentThread();、
  4. //拿到 State 值
  5. int c = getState();
  6. //如果是0 表示可以去获得锁
  7. if (c == 0) {
  8. //cas 原子操作修改state 的值 如果能修改成功则把0变成1 然后记录当前线程id
  9. if (compareAndSetState(0, acquires)) {
  10. setExclusiveOwnerThread(current);
  11. return true;
  12. }
  13. }
  14. //如果当前线程就是获得锁的线程
  15. else if (current == getExclusiveOwnerThread()) {
  16. //增加重入次数
  17. int nextc = c + acquires;
  18. if (nextc < 0) // overflow
  19. throw new Error("Maximum lock count exceeded");
  20. setState(nextc);
  21. return true;
  22. }
  23. return false;
  24. }

构建链表

  1. private Node addWaiter(Node mode) {
  2. //构建一个node
  3. Node node = new Node(Thread.currentThread(), mode);
  4. // Try the fast path of enq; backup to full enq on failure
  5. // tail = 尾节点 默认是null
  6. Node pred = tail;
  7. if (pred != null) {
  8. //如果尾节点不等于空 把当前节点当成尾节点 然后把prev指针指向上一个节点 把新进来的节点改成尾节点
  9. node.prev = pred;
  10. if (compareAndSetTail(pred, node)) {
  11. //把上一个节点的next 指针指向刚进来的节点
  12. pred.next = node;
  13. return node;
  14. }
  15. }
  16. enq(node);
  17. return node;
  18. }
  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. if (t == null) { // Must initialize
  5. //如果尾节点 = = null 用cas 构建一个节点
  6. if (compareAndSetHead(new Node()))
  7. //把头节点赋值给尾节点
  8. tail = head;
  9. } else {
  10. //如果尾节点不等于空 把当前节点当成尾节点 然后把prev指针指向上一个节点 把新进来的节点改成尾节点
  11. node.prev = t;
  12. if (compareAndSetTail(t, node)) {
  13. //把上一个节点的next 指针指向刚进来的节点
  14. t.next = node;
  15. return t;
  16. }
  17. }
  18. }
  19. }

加入阻塞队列

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. //获取当前节点的前一个节点
  7. final Node p = node.predecessor();
  8. //如果当前节点的前一个结点是头节点 则说明有资格去争夺锁
  9. if (p == head && tryAcquire(arg)) {
  10. //把当前节点设置成头节点
  11. setHead(node);
  12. p.next = null; // help GC
  13. failed = false;
  14. return interrupted;
  15. }
  16. if (shouldParkAfterFailedAcquire(p, node) &&
  17. parkAndCheckInterrupt())
  18. interrupted = true;
  19. }
  20. } finally {
  21. if (failed)
  22. cancelAcquire(node);
  23. }
  24. }
  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. //前置节点的 waitStatus
  3. int ws = pred.waitStatus;
  4. //等待其他前直接点的线程被释放
  5. if (ws == Node.SIGNAL)
  6. /*
  7. * This node has already set status asking a release
  8. * to signal it, so it can safely park.
  9. */
  10. //说明可以挂起
  11. return true;
  12. // 大于0 说明prev节点取消了排队 直接移除这个节点
  13. if (ws > 0) {
  14. /*
  15. * Predecessor was cancelled. Skip over predecessors and
  16. * indicate retry.
  17. */
  18. do {
  19. node.prev = pred = pred.prev;
  20. } while (pred.waitStatus > 0);
  21. pred.next = node;
  22. } else {
  23. /*
  24. * waitStatus must be 0 or PROPAGATE. Indicate that we
  25. * need a signal, but don't park yet. Caller will need to
  26. * retry to make sure it cannot acquire before parking.
  27. */
  28. 设置prev 节点状态为-1
  29. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  30. }
  31. return false;
  32. }

阻塞等待

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);
  3. return Thread.interrupted();
  4. }
  1. private void cancelAcquire(Node node) {
  2. // Ignore if node doesn't exist
  3. if (node == null)
  4. return;
  5. node.thread = null;
  6. // Skip cancelled predecessors
  7. //当前节点的前一个节点
  8. Node pred = node.prev;
  9. //前一个节点的 waitStatus> 0 (结束状态)
  10. while (pred.waitStatus > 0)
  11. node.prev = pred = pred.prev;
  12. // predNext is the apparent node to unsplice. CASes below will
  13. // fail if not, in which case, we lost race vs another cancel
  14. // or signal, so no further action is necessary.
  15. Node predNext = pred.next;
  16. // Can use unconditional write instead of CAS here.
  17. // After this atomic step, other Nodes can skip past us.
  18. // Before, we are free of interference from other threads.
  19. //将 node.waitStatus 设置成 CANCELLED 状态
  20. node.waitStatus = Node.CANCELLED;
  21. //4. 如果node是tail,更新tail为pred,并使pred.next指向null
  22. // If we are the tail, remove ourselves.
  23. if (node == tail && compareAndSetTail(node, pred)) {
  24. compareAndSetNext(pred, predNext, null);
  25. } else {
  26. // If successor needs signal, try to set pred's next-link
  27. // so it will get one. Otherwise wake it up to propagate.
  28. int ws;
  29. //5. 如果node既不是tail,又不是head的后继节点
  30. //则将node的前继节点的waitStatus置为SIGNAL
  31. //并使node的前继节点指向node的后继节点
  32. if (pred != head &&
  33. ((ws = pred.waitStatus) == Node.SIGNAL ||
  34. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  35. pred.thread != null) {
  36. Node next = node.next;
  37. if (next != null && next.waitStatus <= 0)
  38. compareAndSetNext(pred, predNext, next);
  39. } else {
  40. //6. 如果node是head的后继节点,则直接唤醒node的后继节点
  41. unparkSuccessor(node);
  42. }
  43. node.next = node; // help GC
  44. }
  45. }

Node的五种状态

CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
0:新结点入队时的默认状态。

释放锁

  1. public void unlock() {
  2. sync.release(1);
  3. }
  1. public final boolean release(int arg) {
  2. //释放锁成功
  3. if (tryRelease(arg)) {
  4. Node h = head;
  5. //如果头节点不为空 并且状态不为0
  6. if (h != null && h.waitStatus != 0)
  7. //唤醒
  8. unparkSuccessor(h);
  9. return true;
  10. }
  11. return false;
  12. }
  1. protected final boolean tryRelease(int releases) {
  2. //state -1
  3. int c = getState() - releases;
  4. if (Thread.currentThread() != getExclusiveOwnerThread())
  5. throw new IllegalMonitorStateException();
  6. boolean free = false;
  7. if (c == 0) {
  8. //如果c =0 表示当前是无锁状态 把线程iq清空
  9. free = true;
  10. setExclusiveOwnerThread(null);
  11. }
  12. //重新设置 state
  13. setState(c);
  14. return free;
  15. }
  1. private void unparkSuccessor(Node node) {
  2. /*
  3. * If status is negative (i.e., possibly needing signal) try
  4. * to clear in anticipation of signalling. It is OK if this
  5. * fails or if status is changed by waiting thread.
  6. */
  7. int ws = node.waitStatus;
  8. if (ws < 0)
  9. //设置head节点的状态为0
  10. compareAndSetWaitStatus(node, ws, 0);
  11. /*
  12. * Thread to unpark is held in successor, which is normally
  13. * just the next node. But if cancelled or apparently null,
  14. * traverse backwards from tail to find the actual
  15. * non-cancelled successor.
  16. */
  17. //拿到head节点的下一个节点
  18. Node s = node.next;
  19. //如果下一个节点为null 或者 status>0则表示是 CANCELLED 状态
  20. //听过尾部节点开始扫描 找到距离 head最近的一个 waitStatus<=0的节点
  21. if (s == null || s.waitStatus > 0) {
  22. s = null;
  23. for (Node t = tail; t != null && t != node; t = t.prev)
  24. if (t.waitStatus <= 0)
  25. s = t;
  26. }
  27. //如果next 节点不等于空直接唤醒这个线程
  28. if (s != null)
  29. LockSupport.unpark(s.thread);
  30. }

ReentrantReadWriteLock(读写锁)

读写分离减少锁的竞争

  1. ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
  2. //获取读锁
  3. reentrantReadWriteLock.readLock();
  4. //获取写锁
  5. reentrantReadWriteLock.writeLock();

加锁

  1. protected final boolean tryAcquire(int acquires) {
  2. /*
  3. * Walkthrough:
  4. * 1. If read count nonzero or write count nonzero
  5. * and owner is a different thread, fail.
  6. * 2. If count would saturate, fail. (This can only
  7. * happen if count is already nonzero.)
  8. * 3. Otherwise, this thread is eligible for lock if
  9. * it is either a reentrant acquire or
  10. * queue policy allows it. If so, update state
  11. * and set owner.
  12. */
  13. //拿到当前线程
  14. Thread current = Thread.currentThread();
  15. //拿到 State
  16. int c = getState();
  17. //从 State中查找当前获得写锁的线程数量因为写锁是互斥的如果获得锁个代表了重入
  18. int w = exclusiveCount(c);
  19. if (c != 0) {
  20. // (Note: if c != 0 and w == 0 then shared count != 0)
  21. if (w == 0 || current != getExclusiveOwnerThread())
  22. return false;
  23. //如果重入 判断重入次数是否大于65535次
  24. if (w + exclusiveCount(acquires) > MAX_COUNT)
  25. throw new Error("Maximum lock count exceeded");
  26. // Reentrant acquire
  27. setState(c + acquires);
  28. return true;
  29. }
  30. //c==0说明读锁和写锁都没有被线程获取
  31. if (writerShouldBlock() ||
  32. !compareAndSetState(c, c + acquires))
  33. return false;
  34. setExclusiveOwnerThread(current);
  35. return true;
  36. }

state采用高低位分开存储读写和写锁 高16位存储读锁状态 低16位存储写锁状态

释放锁

  1. protected final boolean tryRelease(int releases) {
  2. //是否等于当前线程
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. int nextc = getState() - releases;
  6. //递减锁次数
  7. boolean free = exclusiveCount(nextc) == 0;
  8. if (free)
  9. setExclusiveOwnerThread(null);
  10. //如果是0 释放成功
  11. setState(nextc);
  12. return free;
  13. }

共享锁

  1. protected final int tryAcquireShared(int unused) {
  2. /*
  3. * Walkthrough:
  4. * 1. If write lock held by another thread, fail.
  5. * 2. Otherwise, this thread is eligible for
  6. * lock wrt state, so ask if it should block
  7. * because of queue policy. If not, try
  8. * to grant by CASing state and updating count.
  9. * Note that step does not check for reentrant
  10. * acquires, which is postponed to full version
  11. * to avoid having to check hold count in
  12. * the more typical non-reentrant case.
  13. * 3. If step 2 fails either because thread
  14. * apparently not eligible or CAS fails or count
  15. * saturated, chain to version with full retry loop.
  16. */
  17. //获取当前线程
  18. Thread current = Thread.currentThread();
  19. //state
  20. int c = getState();
  21. //如果是写锁或独占锁的持有者不是当前线程 直接阻塞
  22. if (exclusiveCount(c) != 0 &&
  23. getExclusiveOwnerThread() != current)
  24. return -1;
  25. //获取读锁的数量
  26. int r = sharedCount(c);
  27. //当前读锁不需要等待 并且 读锁数量不大于65535
  28. if (!readerShouldBlock() &&
  29. r < MAX_COUNT &&
  30. 高低位增加读锁的数量
  31. compareAndSetState(c, c + SHARED_UNIT)) {
  32. //如果是第一次枷锁
  33. if (r == 0) {
  34. //第一次获得锁的线程就是当前线程
  35. firstReader = current;
  36. //重入次数是1
  37. firstReaderHoldCount = 1;
  38. } else if (firstReader == current) {
  39. firstReaderHoldCount++;
  40. } else {
  41. HoldCounter rh = cachedHoldCounter;
  42. if (rh == null || rh.tid != getThreadId(current))
  43. cachedHoldCounter = rh = readHolds.get();
  44. else if (rh.count == 0)
  45. readHolds.set(rh);
  46. rh.count++;
  47. }
  48. return 1;
  49. }
  50. //如果cas执行失败 尝试获取共享锁
  51. return fullTryAcquireShared(current);
  52. }

锁降级

从写锁降级为读锁 如果A线程获得了写锁 在写锁没有释放的情况下在去获得读锁是允许的

特性

读/读不互斥
读/写互斥
写/写互斥

StampedLock

  1. StampedLock reentrantReadWriteLock = new StampedLock();
  2. //获取写锁
  3. reentrantReadWriteLock.writeLock();
  4. //获得悲观锁 不阻塞写操作
  5. reentrantReadWriteLock.tryOptimisticRead();
  6. //获得读锁
  7. reentrantReadWriteLock.readLock();

采用一个long 类型的state 变量来表示同步状态 long类型的长度是64位 这里将state拆成3个部分 低7位表述读锁的状态 当一个线程加了读锁时 就在低7位加1 也就是说 最多同时支持127个线程获得读锁 如果线程数超过127则使用readerOverflow进行记录 第八位用来存储写锁的状态 第9-64位存储stamp 记录写锁的状态变化每触发一次写锁 stamped 就会加1
自旋锁根据cpu核心数决定自旋的次数