CountDownLatch(线程计数器)

countDown()计数器递减

  1. public void countDown() {
  2. sync.releaseShared(1);
  3. }
  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

唤醒共享锁队列的线程

  1. private void doReleaseShared() {
  2. /*
  3. * Ensure that a release propagates, even if there are other
  4. * in-progress acquires/releases. This proceeds in the usual
  5. * way of trying to unparkSuccessor of head if it needs
  6. * signal. But if it does not, status is set to PROPAGATE to
  7. * ensure that upon release, propagation continues.
  8. * Additionally, we must loop in case a new node is added
  9. * while we are doing this. Also, unlike other uses of
  10. * unparkSuccessor, we need to know if CAS to reset status
  11. * fails, if so rechecking.
  12. */
  13. for (;;) {
  14. Node h = head;
  15. if (h != null && h != tail) {
  16. int ws = h.waitStatus;
  17. //如果节点状态为 SIGNAL 表示可以被唤醒
  18. if (ws == Node.SIGNAL) {
  19. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  20. continue; // loop to recheck cases
  21. unparkSuccessor(h);
  22. }
  23. else if (ws == 0 &&
  24. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  25. continue; // loop on failed CAS
  26. }
  27. if (h == head) // loop if head changed
  28. break;
  29. }
  30. }

await()阻塞等待 CountDownLatch变成0在释放执行后面逻辑

  1. public final void acquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. //state 不为 0 的时候 进行阻塞
  6. if (tryAcquireShared(arg) < 0)
  7. doAcquireSharedInterruptibly(arg);
  8. }
  1. protected int tryAcquireShared(int acquires) {
  2. return (getState() == 0) ? 1 : -1;
  3. }
  1. private void doAcquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. final Node node = addWaiter(Node.SHARED);
  4. boolean failed = true;
  5. try {
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head) {
  9. //根据判断结果获取锁
  10. int r = tryAcquireShared(arg);
  11. //这里不会走 因为 外面判断了 r<0
  12. if (r >= 0) {
  13. setHeadAndPropagate(node, r);
  14. p.next = null; // help GC
  15. failed = false;
  16. return;
  17. }
  18. }
  19. if (shouldParkAfterFailedAcquire(p, node) &&
  20. //阻塞等待
  21. parkAndCheckInterrupt())
  22. throw new InterruptedException();
  23. }
  24. } finally {
  25. if (failed)
  26. cancelAcquire(node);
  27. }
  28. }

Semaphore(信号量)

假设初始时 是5 每次调用 release()方法 都针对 state进行递减 因此当 state令牌==5的时候 意味着所有的令牌都被使用完了 后续调用的线程都会以共享类型加入到 CLH队列中 而当 state<5时 说明有其他线程释放了令牌 可以从clh队列中唤醒头部的线程

限制对某个资源同时访问的线程数

acquire()获取令牌

获取指定数量的令牌 如果不足会阻塞

release()释放令牌

释放指定数量的令牌

tryAcquire()

尝试获取指定数量的令牌 此过程是非阻塞的 如果令牌数不够 则返回false否则返回true

drainPermits()

当前线程获得剩下的所有可用令牌

hasQueuedThreads()

判断当前 Semaphore实例是否存在正在等待令牌的线程

CyclicBarrier

当四个线程都执行结束 唤醒main方法的线程继续执行

基本使用

  1. public static void main(String[] args) {
  2. int parties =4;
  3. CyclicBarrier cyclicBarrier = new CyclicBarrier(parties,() ->{
  4. System.out.println("所有线程执行完开始执行");
  5. });
  6. for (int i = 0; i <parties ; i++) {
  7. new ImportDataTask(cyclicBarrier).start();
  8. }
  9. }
  10. static class ImportDataTask extends Thread {
  11. private CyclicBarrier cyclicBarrier;
  12. public ImportDataTask(CyclicBarrier cyclicBarrier) {
  13. this.cyclicBarrier = cyclicBarrier;
  14. }
  15. @SneakyThrows
  16. @Override
  17. public void run() {
  18. Thread.sleep(1000);
  19. System.out.println(Thread.currentThread().getName()+"线程执行完毕");
  20. cyclicBarrier.await();
  21. }
  22. }
  23. }

原理

  1. public int await() throws InterruptedException, BrokenBarrierException {
  2. try {
  3. return dowait(false, 0L);
  4. } catch (TimeoutException toe) {
  5. throw new Error(toe); // cannot happen
  6. }
  7. }
  1. private int dowait(boolean timed, long nanos)
  2. throws InterruptedException, BrokenBarrierException,
  3. TimeoutException {
  4. final ReentrantLock lock = this.lock;
  5. lock.lock(); //获取锁
  6. try {
  7. final Generation g = generation;
  8. if (g.broken) //确认当前的 genration的barrier是否失效
  9. throw new BrokenBarrierException();
  10. //线程是否中断
  11. if (Thread.interrupted()) {
  12. breakBarrier();
  13. throw new InterruptedException();
  14. }
  15. int index = --count;
  16. //如果到达了屏障 需要去释放被阻塞的线程
  17. if (index == 0) { // tripped
  18. boolean ranAction = false;
  19. try {
  20. final Runnable command = barrierCommand;
  21. if (command != null)
  22. //如果 CylicBarrier回调不为空 直接触发回调
  23. command.run();
  24. ranAction = true;
  25. //进入下一个屏障周期
  26. nextGeneration();
  27. return 0;
  28. } finally {
  29. //回调任务失败
  30. if (!ranAction)
  31. breakBarrier();
  32. }
  33. }
  34. // loop until tripped, broken, interrupted, or timed out
  35. //自旋 等待到达屏障点或线程中断 等待超时等
  36. for (;;) {
  37. try {
  38. //是否有超时时间 如果没有直接阻塞
  39. if (!timed)
  40. trip.await();
  41. else if (nanos > 0L)
  42. //采用超时等待机制
  43. nanos = trip.awaitNanos(nanos);
  44. } catch (InterruptedException ie) {
  45. if (g == generation && ! g.broken) {
  46. breakBarrier();
  47. throw ie;
  48. } else {
  49. // We're about to finish waiting even if we had not
  50. // been interrupted, so this interrupt is deemed to
  51. // "belong" to subsequent execution.
  52. Thread.currentThread().interrupt();
  53. }
  54. }
  55. if (g.broken)
  56. throw new BrokenBarrierException();
  57. if (g != generation)
  58. return index;
  59. if (timed && nanos <= 0L) {
  60. breakBarrier();
  61. throw new TimeoutException();
  62. }
  63. }
  64. } finally {
  65. lock.unlock();
  66. }
  67. }