java5之后,并发包提供了Lock接口来实现锁的功能,他提供了与synchronized类似的的功能,只是在使用时需要显示获取释放锁。
lock接口提供synchronized不具备的特性:

  • 尝试非阻塞获取锁
  • 能被中断获取锁
  • 超时获取锁

    接口

    1. public interface Lock {
    2. void lock();//获取锁
    3. void lockInterruptibly() throws InterruptedException;//会响应中断的获取锁
    4. boolean tryLock();//尝试获取锁
    5. boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //超时内获取锁。可被中断
    6. void unlock();//释放锁
    7. Condition newCondition();//获取等待通知组件
    8. }

    构造器

    1. public ReentrantLock() {
    2. sync = new NonfairSync(); //默认时非公平锁
    3. }
    4. public ReentrantLock(boolean fair) {
    5. sync = fair ? new FairSync() : new NonfairSync();
    6. }

    获取锁

    1. final void lock() {
    2. if (compareAndSetState(0, 1)) //抢占互斥资源
    3. setExclusiveOwnerThread(Thread.currentThread()); //抢到锁,保存当前线程
    4. else
    5. acquire(1); //没有抢到锁
    6. }

    compareAndSetState 是由AQS实现的:

    1. protected final boolean compareAndSetState(int expect, int update) { //乐观锁
    2. // See below for intrinsics setup to support this
    3. return unsafe.compareAndSwapInt(this, stateOffset, expect, update); //stateOffset 内存中的偏移量 与 expect(预期值比较) 相同就更新 为 update
    4. }

    这里使用的是调用unfafe类的CAS乐观锁,stateOffset 是锁状态的偏移量。state为0 时,没有线程获取锁。与预期值 expect比较。如果相等说明无锁状态,将state该为1.当前线程获取锁成功,返回true。否则不改变state的值。返回false
    如果获取锁失败,则线进入acquire()方法。

    1. public final void acquire(int arg) {
    2. if (!tryAcquire(arg) && //重入判断
    3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4. selfInterrupt();
    5. }

    该方法调了分四步:

    tryAcquire(arg)重入判断

    这里使用了策略模式,公平锁和非公平锁的实现不一致。默认是使用公平锁(详见构造器)

    NonfairSync非公平锁

    1. protected final boolean tryAcquire(int acquires) {
    2. return nonfairTryAcquire(acquires);
    3. }
    1. final boolean nonfairTryAcquire(int acquires) {
    2. final Thread current = Thread.currentThread();//当前线程
    3. int c = getState();//当锁的状态
    4. if (c == 0) {//等于0,说明锁已经释放,
    5. if (compareAndSetState(0, acquires)) {//cas抢占锁 。这是非公平锁的特性
    6. setExclusiveOwnerThread(current);//抢锁成功
    7. return true;
    8. }
    9. }
    10. else if (current == getExclusiveOwnerThread()) {//如果持有锁的线程就是当前线程,
    11. int nextc = c + acquires; //state+1 增加重入次数
    12. if (nextc < 0) // overflow 溢出
    13. throw new Error("Maximum lock count exceeded");
    14. setState(nextc); //设置重入次数
    15. return true;
    16. }
    17. return false;
    18. }

FairSync公平锁

  1. protected final boolean tryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();//当前线程
  3. int c = getState(); //state
  4. if (c == 0) {//无锁
  5. if (!hasQueuedPredecessors() && //是否有等等队列
  6. compareAndSetState(0, acquires)) {//cas占有锁
  7. setExclusiveOwnerThread(current);
  8. return true;
  9. }
  10. }
  11. else if (current == getExclusiveOwnerThread()) {//判断重入
  12. int nextc = c + acquires;//重入次数+1
  13. if (nextc < 0)
  14. throw new Error("Maximum lock count exceeded");
  15. setState(nextc);
  16. return true;
  17. }
  18. return false;
  19. }

公平锁和非公平锁最大差异在于判断是否是重入前,发现持有锁的线程已经释放锁。非公平锁会cas去插队回去锁,公平锁者会判断有没有等待获取锁的队列,如果没有才去cas抢占锁。

addWaiter(Node.EXCLUSIVE)加入等待队列

  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode); //将线程包装成一个node
  3. // Try the fast path of enq; backup to full enq on failure 试试enq的快速路径;失败时备份到完整的enq
  4. Node pred = tail; //tail尾部节点
  5. if (pred != null) { //enq的快速路径
  6. node.prev = pred;//将当前节点的pre指向tail
  7. if (compareAndSetTail(pred, node)) { //cas 将tail节点指向 当前节点
  8. pred.next = node;//如果cas成功,将原本原来最后一个节点的next指向当前节点。
  9. return node;
  10. }
  11. }
  12. enq(node);//完整enq
  13. return node;
  14. }

先尝试快enq,失败的话,完整enq:

  1. private Node enq(final Node node) {
  2. for (;;) { //自旋
  3. Node t = tail;
  4. if (t == null) { // Must initialize 必须先初始化,初始化结束进行下个自旋
  5. if (compareAndSetHead(new Node()))//cas为head创建一个新的节点
  6. tail = head; //tail也指向这个新节点
  7. } else {
  8. node.prev = t;//当前节点的prev指向tail
  9. if (compareAndSetTail(t, node)) {//cas让tail指向单前节点
  10. t.next = node;//cas成功,让原本指向tail的节点指向当前节点,否则下次自旋尝试
  11. return t;
  12. }
  13. }
  14. }
  15. }

以自旋的方式保证节点插入等待队列成功,并返回当前节点:

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))自旋检查直到挂起来

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;//中断标识
  5. for (;;) {//自旋
  6. final Node p = node.predecessor();//获取pre节点,没找到抛出异常
  7. if (p == head && tryAcquire(arg)) { //p == head ,说明当前节点是排在队列中排第一个 检查锁已经被释放,并去cas获取锁
  8. setHead(node);//重新设置head
  9. p.next = null; // help GC 方便垃圾回收
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&//是否需要挂起线程判断
  14. parkAndCheckInterrupt()) //挂起 当前线程阻塞在这里,等待被唤醒
  15. interrupted = true;//中断标识
  16. }
  17. } finally {
  18. if (failed) //线程被取消
  19. cancelAcquire(node);//回收节点
  20. }
  21. }

是否挂起判断逻辑

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus;//前个节点的状态
  3. if (ws == Node.SIGNAL) //signal值为-1 运行被唤醒的状态
  4. /*
  5. * This node has already set status asking a release
  6. * to signal it, so it can safely park. 这个节点已经设置了状态,要求释放信号给它,这样它就可以安全挂起了。
  7. */
  8. return true;
  9. if (ws > 0) { //CANCELLED 取消的=1 移除掉
  10. /*
  11. * Predecessor was cancelled. Skip over predecessors and
  12. * indicate retry. 前任被取消了。跳过前辈和显示重试。
  13. */
  14. do {
  15. node.prev = pred = pred.prev;
  16. } while (pred.waitStatus > 0);
  17. pred.next = node;
  18. } else {//小于0
  19. /*
  20. * waitStatus must be 0 or PROPAGATE. Indicate that we
  21. * need a signal, but don't park yet. Caller will need to
  22. * retry to make sure it cannot acquire before parking.
  23. */
  24. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//cas将当前节点变成SIGNAL
  25. }
  26. return false;
  27. }
  28. private final boolean parkAndCheckInterrupt() {
  29. LockSupport.park(this); //线程会阻塞在这里
  30. return Thread.interrupted(); //唤醒后。重置线程中断标识
  31. }

如果线程被取消执行取消操作

  1. private void cancelAcquire(Node node) {
  2. // Ignore if node doesn't exist
  3. if (node == null) //节点已经可以被回收
  4. return;
  5. node.thread = null; //方便GC回收线程
  6. // Skip cancelled predecessors
  7. Node pred = node.prev;
  8. while (pred.waitStatus > 0)//前面节点也被取消
  9. node.prev = pred = pred.prev;//移除前面节点
  10. // predNext is the apparent node to unsplice. CASes below will
  11. // fail if not, in which case, we lost race vs another cancel
  12. // or signal, so no further action is necessary.
  13. Node predNext = pred.next;//前节点的后节点。(可能已经不是当前节点了)
  14. // Can use unconditional write instead of CAS here.
  15. // After this atomic step, other Nodes can skip past us.
  16. // Before, we are free of interference from other threads.
  17. node.waitStatus = Node.CANCELLED;//将当前节点置为可回收状态
  18. // If we are the tail, remove ourselves.
  19. if (node == tail && compareAndSetTail(node, pred)) {//如果tail指向当前节点,cas让tail指向前节点
  20. compareAndSetNext(pred, predNext, null);//cas当前阶段的next为空
  21. } else {
  22. // If successor needs signal, try to set pred's next-link
  23. // so it will get one. Otherwise wake it up to propagate.
  24. int ws;
  25. if (pred != head &&//前节点非头节点
  26. ((ws = pred.waitStatus) == Node.SIGNAL ||//前节点是可被挂起状态
  27. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//将前节点的wait状态设为signal
  28. pred.thread != null) {//前节点有线程
  29. Node next = node.next;
  30. if (next != null && next.waitStatus <= 0)
  31. compareAndSetNext(pred, predNext, next);//cas前节点的next指向当前节点的next
  32. } else {
  33. unparkSuccessor(node);//如果是头节点的话,唤醒unpark该节点
  34. }
  35. node.next = node; // help GC
  36. }
  37. }

selfInterrupt();

  1. static void selfInterrupt() {
  2. Thread.currentThread().interrupt(); //再次中断
  3. }

释放锁

  1. public void unlock() {
  2. sync.release(1);//同步器释放锁
  3. }
  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) { // 释放锁
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0) //有等待的线程
  5. unparkSuccessor(h);//唤醒头节点的线程
  6. return true;
  7. }
  8. return false;
  9. }

释放锁:

  1. protected final boolean tryRelease(int releases) {
  2. int c = getState() - releases; //一层一层减少重入次数
  3. if (Thread.currentThread() != getExclusiveOwnerThread())//当前线程不是抢占锁线程。抛出异常
  4. throw new IllegalMonitorStateException();
  5. boolean free = false;//释放释放成功标识
  6. if (c == 0) {//stete ==0 说明当前线程已经释放了资源
  7. free = true;
  8. setExclusiveOwnerThread(null);
  9. }
  10. setState(c); //释放锁的最后,写volatile变量state。利用了volatile的内存屏障特性
  11. return free;
  12. }

唤醒等待队列中的线程

  1. private void unparkSuccessor(Node node) {
  2. /*
  3. * If status is negative (i.e., possibly needing signal) try
  4. * to clear in anticipation of signalling. It is OK if this
  5. * fails or if status is changed by waiting thread.
  6. */
  7. int ws = node.waitStatus;
  8. if (ws < 0)//SIGNAL -1 CONDITION -2 PROPAGATE -3
  9. compareAndSetWaitStatus(node, ws, 0); //cas waitStatus 设置成0
  10. /*
  11. * Thread to unpark is held in successor, which is normally
  12. * just the next node. But if cancelled or apparently null,
  13. * traverse backwards from tail to find the actual
  14. * non-cancelled successor.
  15. */
  16. Node s = node.next;//真正要被唤醒的线程
  17. if (s == null || s.waitStatus > 0) { //没有线程,或者线程被取消
  18. s = null;
  19. for (Node t = tail; t != null && t != node; t = t.prev)//从tail开始往前遍历,直到为空,获取当前节点
  20. if (t.waitStatus <= 0)//且线程没有被取消
  21. s = t; //赋给要被唤醒的节点
  22. }
  23. if (s != null)
  24. LockSupport.unpark(s.thread);//唤醒该节点
  25. }