构造器

  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException("count < 0");
  3. this.sync = new Sync(count);
  4. }

CountDownLatch初始化的时候会创建一个state大于等于0的同步器

await()阻塞方法

  1. public void await() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);//同步器的共享方法
  3. }
  1. public final void acquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. if (Thread.interrupted())//中断标识
  4. throw new InterruptedException();
  5. if (tryAcquireShared(arg) < 0)//获取锁,小于0,说明共享锁已经用完了
  6. doAcquireSharedInterruptibly(arg);//加入aqs队列
  7. }
  1. protected int tryAcquireShared(int acquires) {
  2. return (getState() == 0) ? 1 : -1;
  3. }

源码可知,线程进来会判断state的值,如果等于0,则直接运行线程,否则加入aqs队列,自旋检查,或者挂起

  1. private void doAcquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. final Node node = addWaiter(Node.SHARED);//创建等待节点,并将节点加入aqs队列
  4. boolean failed = true;
  5. try {
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head) {//前节点是head节点
  9. int r = tryAcquireShared(arg);//去抢占资源
  10. if (r >= 0) {//抢占成功
  11. setHeadAndPropagate(node, r);//传递依赖
  12. p.next = null; // help GC
  13. failed = false;
  14. return;
  15. }
  16. }
  17. if (shouldParkAfterFailedAcquire(p, node) &&
  18. parkAndCheckInterrupt())//挂起
  19. throw new InterruptedException();
  20. }
  21. } finally {
  22. if (failed)
  23. cancelAcquire(node);
  24. }
  25. }

线程从挂起处被唤醒后setHeadAndPropagate(node, r)去传递唤醒

  1. private void setHeadAndPropagate(Node node, int propagate) {
  2. Node h = head; // Record old head for check below
  3. setHead(node);//当前被唤醒的线程设为head
  4. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  5. (h = head) == null || h.waitStatus < 0) {
  6. Node s = node.next;
  7. if (s == null || s.isShared())
  8. doReleaseShared();//传递唤醒线程
  9. }
  10. }

countDown()唤醒线程

  1. public void countDown() {
  2. sync.releaseShared(1);
  3. }
  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {//释放共享锁
  3. doReleaseShared();//唤醒aqs队列中等待的队列
  4. return true;
  5. }
  6. return false;
  7. }
  1. protected boolean tryReleaseShared(int releases) {
  2. // Decrement count; signal when transition to zero
  3. for (;;) {
  4. int c = getState();
  5. if (c == 0)
  6. return false;
  7. int nextc = c-1;
  8. if (compareAndSetState(c, nextc))//cas去减去1
  9. return nextc == 0;//减去完成之后等于0,返回true
  10. }
  11. }
  1. private void doReleaseShared() {
  2. for (;;) {
  3. Node h = head;
  4. if (h != null && h != tail) {
  5. int ws = h.waitStatus;
  6. if (ws == Node.SIGNAL) {
  7. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  8. continue; // loop to recheck cases
  9. unparkSuccessor(h);//唤醒排在第一的线程
  10. }
  11. else if (ws == 0 &&
  12. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//PROPAGATE:-3表示下一次共享式同步状态获取将会无条件地被传播下去
  13. continue; // loop on failed CAS
  14. }
  15. if (h == head) // loop if head changed
  16. break;
  17. }
  18. }

调用countDown()时,cas去state-1,当state为0时候。唤醒aqs队列里的排在第一的队列