Semaphore介绍

Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于AbstractQueuedSynchronizer实现的。

Semaphore 常用方法

  • acquire() 表示阻塞并获取许可
  • tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
  • release() 表示释放许可

    应用场景

    可以用于做流量控制,特别是公用资源有限的应用场景

    1. public class Test {
    2. //5个限流
    3. private static Semaphore semaphore = new Semaphore(50);
    4. //自定义一个线程池
    5. private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));
    6. public static void main(String[] args) throws InterruptedException {
    7. for (; ; ) {
    8. Thread.sleep(100);
    9. executor.execute(() -> {
    10. exec();
    11. });
    12. }
    13. }
    14. public static void exec() {
    15. try {
    16. semaphore.acquire(1);
    17. System.out.println("执行exec");
    18. Thread.sleep(2000);
    19. } catch (Exception e) {
    20. e.printStackTrace();
    21. } finally {
    22. semaphore.release(1);
    23. }
    24. }
    25. }

    Semaphore源码分析

    关注点:

  1. Semaphore的加锁解锁(共享锁)逻辑实现
  2. 线程竞争锁失败入队阻塞逻辑和获取锁的线程释放锁唤醒阻塞线程竞争锁的逻辑实现

image.png

CountDownLatch介绍

CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。
CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值(count)由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。这是一个一次性现象—— count不会被重置。如果你需要一个重置count的版本,那么请考虑使用CyclicBarrier。

CountDownLatch的使用

  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException("count < 0");
  3. this.sync = new Sync(count);
  4. }

image.png

CountDownLatch应用场景

CountDownLatch一般用作多线程倒计时计数器,强制它们等待其他一组(CountDownLatch 的初始化决定)任务执行完成。
CountDownLatch的两种使用场景:

  • 场景1:让多个线程等待
  • 场景2:让单个线程等待。

场景1 让多个线程等待:模拟并发,让并发线程一起执行

  1. public class Test {
  2. public static void main(String[] args) throws InterruptedException {
  3. CountDownLatch countDownLatch = new CountDownLatch(1);
  4. for (int i = 0; i < 5; i++) {
  5. new Thread(() -> {
  6. try {
  7. countDownLatch.await();
  8. System.out.println(Thread.currentThread().getName() + ":冲。。。。");
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }).start();
  13. }
  14. Thread.sleep(2000);
  15. countDownLatch.countDown();
  16. }
  17. }

场景2 让单个线程等待:多个线程(任务)完成后,进行汇总合并

  1. public class Test {
  2. public static void main(String[] args) throws InterruptedException {
  3. CountDownLatch countDownLatch = new CountDownLatch(5);
  4. for (int i = 0; i < 5; i++) {
  5. new Thread(() -> {
  6. try {
  7. Thread.sleep(2000);
  8. countDownLatch.countDown();
  9. System.out.println(Thread.currentThread().getName() + ":我好了");
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }).start();
  14. }
  15. countDownLatch.await();
  16. System.out.println("冲。。。");
  17. }
  18. }

CountDownLatch实现原理

底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark阻塞线程;这一步是由最后一个执行countdown方法的线程执行的。
而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。

CountDownLatch与Thread.join的区别

  • CountDownLatch的作用就是允许一个或多个线程等待其他线程完成操作,看起来有点类似join() 方法,但其提供了比 join() 更加灵活的API。
  • CountDownLatch可以手动控制在n个线程里调用n次countDown()方法使计数器进行减一操作,也可以在一个线程里调用n次执行减一操作。
  • 而 join() 的实现原理是不停检查join线程是否存活,如果 join 线程存活则让当前线程永远等待。所以两者之间相对来说还是CountDownLatch使用起来较为灵活。

    CountDownLatch与CyclicBarrier的区别

    CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
  1. CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次
  2. CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、isBroken(用来知道阻塞的线程是否被中断)等方法。
  3. CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
  4. CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执 行。
  5. CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
  6. CyclicBarrier是通过ReentrantLock的”独占锁”和Conditon来实现一组线程的阻塞唤 醒的,而CountDownLatch则是通过AQS的“共享锁”实现

    CyclicBarrier介绍

    字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。

    CyclicBarrier的使用

    构造方法
    image.png
    重要方法
    image.pngimage.pngimage.png

    CyclicBarrier应用场景

    CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。

    1. public class Test {
    2. private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();
    3. private ExecutorService threadPool = Executors.newFixedThreadPool(3);
    4. private CyclicBarrier cb = new CyclicBarrier(3, () -> {
    5. int result = 0;
    6. for (String s : map.keySet()) {
    7. result += map.get(s);
    8. }
    9. System.out.println("平均成绩:" + result / map.size() + "分");
    10. });
    11. public void count() {
    12. for (int i = 0; i < 6; i++) {
    13. threadPool.execute(() -> {
    14. map.put(Thread.currentThread().getName(), (int) (Math.random() * 100));
    15. System.out.println(Thread.currentThread().getName() + " 同学成绩为:" + map.get(Thread.currentThread().getName()));
    16. try {
    17. cb.await();
    18. } catch (BrokenBarrierException | InterruptedException e) {
    19. e.printStackTrace();
    20. }
    21. });
    22. }
    23. }
    24. public static void main(String[] args) {
    25. Test test = new Test();
    26. test.count();
    27. }
    28. }

    利用CyclicBarrier的计数器能够重置,屏障可以重复使用的特性,可以支持类似“人满发车”的场景

    1. public class Test {
    2. public static void main(String[] args) {
    3. AtomicInteger counter = new AtomicInteger();
    4. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 1000, TimeUnit.SECONDS,
    5. new ArrayBlockingQueue<>(100),
    6. (r) -> new Thread(r, counter.getAndAdd(1) + "号"), new ThreadPoolExecutor.AbortPolicy());
    7. CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("冲。。。。"));
    8. for (int i = 0; i < 10; i++) {
    9. threadPoolExecutor.submit(new Runner(cyclicBarrier));
    10. }
    11. }
    12. static class Runner implements Runnable {
    13. private CyclicBarrier cyclicBarrier;
    14. public Runner(CyclicBarrier cyclicBarrier) {
    15. this.cyclicBarrier = cyclicBarrier;
    16. }
    17. @Override
    18. public void run() {
    19. try {
    20. long sleepMills = ThreadLocalRandom.current().nextInt(1000);
    21. Thread.sleep(sleepMills);
    22. System.out.println(Thread.currentThread().getName() + "选手就位,用时" + sleepMills + "ms");
    23. cyclicBarrier.await();
    24. } catch (InterruptedException e) {
    25. e.printStackTrace();
    26. } catch (BrokenBarrierException e) {
    27. e.printStackTrace();
    28. }
    29. }
    30. }
    31. }

    CyclicBarrier源码分析

    image.png