介绍:

会换栅栏,实现一组线程等待某个状态,再同时执行。CyclicBarrier可以被重用
1648472462(1).png
构造方法

  1. // parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
  2. public CyclicBarrier(int parties)
  3. // 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
  4. public CyclicBarrier(int parties, Runnable barrierAction)

重要方法

  1. //屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
  2. // BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
  3. public int await() throws InterruptedException, BrokenBarrierException
  4. public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
  5. //循环 通过reset()方法可以进行重置
  6. public void reset()
  7. /** The lock for guarding barrier entry */
  8. private final ReentrantLock lock = new ReentrantLock();
  9. /** Condition to wait on until tripped */
  10. private final Condition trip = lock.newCondition();

CyclicBarrier应用场景
CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。

  1. public class CyclicBarrierTest2 {
  2. //保存每个学生的平均成绩
  3. private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();
  4. private ExecutorService threadPool= Executors.newFixedThreadPool(3);
  5. private CyclicBarrier cb=new CyclicBarrier(3,()->{
  6. int result=0;
  7. Set<String> set = map.keySet();
  8. for(String s:set){
  9. result+=map.get(s);
  10. }
  11. System.out.println("三人平均成绩为:"+(result/3)+"分");
  12. });
  13. public void count(){
  14. for(int i=0;i<3;i++){
  15. threadPool.execute(new Runnable(){
  16. @Override
  17. public void run() {
  18. //获取学生平均成绩
  19. int score=(int)(Math.random()*40+60);
  20. map.put(Thread.currentThread().getName(), score);
  21. System.out.println(Thread.currentThread().getName()
  22. +"同学的平均成绩为:"+score);
  23. try {
  24. //执行完运行await(),等待所有学生平均成绩都计算完毕
  25. cb.await();
  26. } catch (InterruptedException | BrokenBarrierException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. });
  31. }
  32. }
  33. public static void main(String[] args) {
  34. CyclicBarrierTest2 cb=new CyclicBarrierTest2();
  35. cb.count();
  36. }
  37. }

利用CyclicBarrier的计数器能够重置,屏障可以重复使用的特性,可以支持类似“人满发车”的场景

  1. public class CyclicBarrierTest3 {
  2. public static void main(String[] args) {
  3. AtomicInteger counter = new AtomicInteger();
  4. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
  5. 5, 5, 1000, TimeUnit.SECONDS,
  6. new ArrayBlockingQueue<>(100),
  7. (r) -> new Thread(r, counter.addAndGet(1) + " 号 "),
  8. new ThreadPoolExecutor.AbortPolicy());
  9. CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
  10. () -> System.out.println("裁判:比赛开始~~"));
  11. for (int i = 0; i < 10; i++) {
  12. threadPoolExecutor.submit(new Runner(cyclicBarrier));
  13. }
  14. }
  15. static class Runner extends Thread{
  16. private CyclicBarrier cyclicBarrier;
  17. public Runner (CyclicBarrier cyclicBarrier) {
  18. this.cyclicBarrier = cyclicBarrier;
  19. }
  20. @Override
  21. public void run() {
  22. try {
  23. int sleepMills = ThreadLocalRandom.current().nextInt(1000);
  24. Thread.sleep(sleepMills);
  25. System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
  26. cyclicBarrier.await();
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }catch(BrokenBarrierException e){
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. }

CyclicBarrier与CountDownLatch的区别

  1. CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次
  2. CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、isBroken(用来知道阻塞的线程是否被中断)等方法。
  3. CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
  4. CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
  5. CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
  6. CyclicBarrier是通过ReentrantLock的”独占锁”和Conditon来实现一组线程的阻塞唤醒的,而CountDownLatch则是通过AQS的“共享锁”实现

核心就在 doawit中队列入队
判断条件在唤醒线程,再释放锁,条件队列转到同步队列,再从同步队列中唤醒线程的设计

CyclicBarrier源码分析

  1. // 代码模式
  2. lock.lock();
  3. try{
  4. //和兴
  5. // condition的await;,
  6. // 阻塞线程,释放锁
  7. //进入条件队列
  8. // 可以在释放锁的时候唤醒head后续节点的所有线程
  9. // 被唤醒的线程获取锁,如果失败会阻塞 ,这个是独占锁,双向队列
  10. await();
  11. }finally{
  12. lock.unLock()
  13. }
  14. private void breakBarrier() {
  15. generation.broken = true;
  16. count = parties;
  17. trip.signalAll();// 会唤醒,将条件队列转为同步队列
  18. }

image.png
AQS之CyclicBarrier源码分析.png

死锁

优化:可以调整时间,调整顺序

  1. public class DeadLockTest {
  2. private static String a = "a";
  3. private static String b = "b";
  4. public static void main(String[] args) {
  5. Thread threadA = new Thread(()->{
  6. synchronized (a) {
  7. log.debug("threadA进入a同步块,执行中...");
  8. try {
  9. //Thread.sleep(2000); 条件队列作用: 打破死锁的循环
  10. a.wait(5000);
  11. synchronized (b) {
  12. log.debug("threadA进入b同步块,执行中...");
  13. }
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. },"threadA");
  19. Thread threadB = new Thread(()->{
  20. synchronized (b) {
  21. log.debug("threadB进入b同步块,执行中...");
  22. try {
  23. //b.wait(5000);
  24. Thread.sleep(2000);
  25. synchronized (a) {
  26. log.debug("threadB进入a同步块,执行中...");
  27. }
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. },"threadB");
  33. threadA.start();
  34. threadB.start();
  35. }
  36. }
  1. /**
  2. * @author Fox
  3. * 哲学家就餐问题
  4. */
  5. public class PhilosopherEatTest {
  6. public static void main(String[] args) {
  7. //初始化五根筷子
  8. Chopstick c1 = new Chopstick(1);
  9. Chopstick c2 = new Chopstick(2);
  10. Chopstick c3 = new Chopstick(3);
  11. Chopstick c4 = new Chopstick(4);
  12. Chopstick c5 = new Chopstick(5);
  13. // 思考: 如何打破循环, 调整获取锁的顺序
  14. new Philosopher("苏格拉底", c1, c2).start();
  15. new Philosopher("柏拉图", c2, c3).start();
  16. new Philosopher("亚里士多德", c3, c4).start();
  17. new Philosopher("赫拉克利特", c4, c5).start();
  18. new Philosopher("阿基米德", c1,c5).start();
  19. }
  20. }