Semaphore使用及应用场景例子

Semaphore 是什么?

Semaphore 字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,是在生产当中比较常用的一个工具类。

怎么使用 Semaphore?

构造方法

public Semaphore(int permits) public Semaphore(int permits, boolean fair)

  • permits 表示许可线程的数量
  • fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程

    重要方法

    public void acquire() throws InterruptedException public void release() tryAcquire(int args,long timeout, TimeUnit unit)

  • acquire() 表示阻塞并获取许可

  • release() 表示释放许可

    基本使用

    需求场景

    资源访问,服务限流(Hystrix里限流就有基于信号量方式)。

代码实现

  1. public class SemaphoreRunner {
  2. public static void main(String[] args) {
  3. Semaphore semaphore = new Semaphore(2);
  4. for (int i=0;i<5;i++){
  5. new Thread(new Task(semaphore,"yangguo+"+i)).start();
  6. }
  7. }
  8. static class Task extends Thread{
  9. Semaphore semaphore;
  10. public Task(Semaphore semaphore,String tname){
  11. this.semaphore = semaphore;
  12. this.setName(tname);
  13. }
  14. public void run() {
  15. try {
  16. semaphore.acquire();
  17. System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
  18. Thread.sleep(1000);
  19. semaphore.release();
  20. System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }
  26. }

结果:

Thread-0:aquire() at time:1647149230703 Thread-1:aquire() at time:1647149230704 Thread-2:aquire() at time:1647149235713 Thread-3:aquire() at time:1647149235713 Thread-4:aquire() at time:1647149240721 Thread-5:aquire() at time:1647149240721 Thread-6:aquire() at time:1647149245723 Thread-7:aquire() at time:1647149245723 Thread-8:aquire() at time:1647149250729 Thread-9:aquire() at time:1647149250729

从打印结果可以看出,一次只有两个线程执行 acquire(),只有线程进行 release() 方法后才会有别的线程执行 acquire()。

CountDownLatch使用及应用场景例子

CountDownLatch是什么?

CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
使用场景:

Zookeeper分布式锁,Jmeter模拟高并发等

CountDownLatch如何工作?

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

API

CountDownLatch.countDown() CountDownLatch.await();

CountDownLatch应用场景例子

比如陪媳妇去看病。
医院里边排队的人很多,如果一个人的话,要先看大夫,看完大夫再去排队交钱取药。
现在我们是双核,可以同时做这两个事(多线程)。
假设看大夫花3秒钟,排队交费取药花5秒钟。我们同时搞的话,5秒钟我们就能完成,然后一起回家(回到主线程)。

  1. /**
  2. * 看大夫任务
  3. */
  4. public class SeeDoctorTask implements Runnable {
  5. private CountDownLatch countDownLatch;
  6. public SeeDoctorTask(CountDownLatch countDownLatch){
  7. this.countDownLatch = countDownLatch;
  8. }
  9. public void run() {
  10. try {
  11. System.out.println("开始看医生");
  12. Thread.sleep(3000);
  13. System.out.println("看医生结束,准备离开病房");
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }finally {
  17. if (countDownLatch != null)
  18. countDownLatch.countDown();
  19. }
  20. }
  21. }
  22. /**
  23. * 排队的任务
  24. */
  25. public class QueueTask implements Runnable {
  26. private CountDownLatch countDownLatch;
  27. public QueueTask(CountDownLatch countDownLatch){
  28. this.countDownLatch = countDownLatch;
  29. }
  30. public void run() {
  31. try {
  32. System.out.println("开始在医院药房排队买药....");
  33. Thread.sleep(5000);
  34. System.out.println("排队成功,可以开始缴费买药");
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }finally {
  38. if (countDownLatch != null)
  39. countDownLatch.countDown();
  40. }
  41. }
  42. }
  43. /**
  44. * 配媳妇去看病,轮到媳妇看大夫时
  45. * 我就开始去排队准备交钱了。
  46. */
  47. public class CountDownLaunchRunner {
  48. public static void main(String[] args) throws InterruptedException {
  49. long now = System.currentTimeMillis();
  50. CountDownLatch countDownLatch = new CountDownLatch(2);
  51. new Thread(new SeeDoctorTask(countDownLatch)).start();
  52. new Thread(new QueueTask(countDownLatch)).start();
  53. //等待线程池中的2个任务执行完毕,否则一直
  54. countDownLatch.await();
  55. System.out.println("over,回家 cost:"+(System.currentTimeMillis()-now));
  56. }
  57. }

CyclicBarrier应用场景例子

CyclicBarrier是什么?

栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

API

cyclicBarrier.await();

应用场景

可以用于多线程计算数据,最后合并计算结果的场景。例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
示例代码:

  1. public class CyclicBarrierRunner implements Runnable {
  2. private CyclicBarrier cyclicBarrier;
  3. private int index ;
  4. public CyclicBarrierTest(CyclicBarrier cyclicBarrier, int index) {
  5. this.cyclicBarrier = cyclicBarrier;
  6. this.index = index;
  7. }
  8. public void run() {
  9. try {
  10. System.out.println("index: " + index);
  11. index--;
  12. cyclicBarrier.await();
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. public static void main(String[] args) throws Exception {
  18. CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
  19. public void run() {
  20. System.out.println("所有特工到达屏障,准备开始执行秘密任务");
  21. }
  22. });
  23. for (int i = 0; i < 10; i++) {
  24. new Thread(new CyclicBarrierTest(cyclicBarrier, i)).start();
  25. }
  26. cyclicBarrier.await();
  27. System.out.println("全部到达屏障....");
  28. }
  29. }