JUC

CountDownLatch

CountDownLatch是目前使用比较多的类,CountDownLatch初始化时会给定一个计数,然后每次调用countDown() 计数减1,
当计数未到达0之前调用await() 方法会阻塞直到计数减到0;
使用场景:多用于划分任务由多个线程执行,例如:最近写个豆瓣爬虫,需要爬取每个电影的前五页短评,可以划分成五个线程来处理数据。通过latch.await()保证全部完成再返回。

  1. public void latch() throws InterruptedException {
  2. int count= 5;
  3. CountDownLatch latch = new CountDownLatch(count);
  4. for (int x=0;x<count;x++){
  5. new Worker(x*20,latch).start();
  6. }
  7. latch.await();
  8. System.out.println("全部执行完毕");
  9. }
  10. class Worker extends Thread{
  11. Integer start;
  12. CountDownLatch latch;
  13. public Worker(Integer start,CountDownLatch latch){
  14. this.start=start;
  15. this.latch=latch;
  16. } @Override
  17. public void run() {
  18. System.out.println(start+" 已执行");
  19. latch.countDown();
  20. }
  21. }

输出如下:

  1. 20 已执行
  2. 0 已执行
  3. 40 已执行
  4. 60 已执行
  5. 80 已执行
  6. 全部执行完毕

CyclicBarrier

它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)也就是阻塞在调用cyclicBarrier.await()的地方。
看上去CyclicBarrierCountDownLatch 功能上类似,在官方doc上CountDownLatch的描述上就说明了,CountDownLatch 的计数无法被重置,
如果需要重置计数,请考虑使用CyclicBarrier
CyclicBarrier初始时还可添加一个Runnable的参数, 此RunnableCyclicBarrier的数目达到后,所有其它线程被唤醒前被最后一个进入 CyclicBarrier 的线程执行
使用场景:类似CyclicBarrier,但是 CyclicBarrier提供了几个CountDownLatch没有的方法以应付更复杂的场景,例如:
getNumberWaiting() 获取阻塞线程数量,
isBroken() 用来知道阻塞的线程是否被中断等方法。
reset() 将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException

  1. public void latch() throws InterruptedException {
  2. int count = 5;
  3. CyclicBarrier cb = new CyclicBarrier(count, new Runnable() {
  4. @Override
  5. public void run() {
  6. System.out.println("全部执行完毕");
  7. }
  8. });
  9. ExecutorService executorService = Executors.newFixedThreadPool(count);
  10. while (true){
  11. for (int x=0;x<count;x++){
  12. executorService.execute(new Worker(x,cb));
  13. }
  14. }
  15. }
  16. class Worker extends Thread {
  17. Integer start;
  18. CyclicBarrier cyclicBarrier; public Worker(Integer start, CyclicBarrier cyclicBarrier) {
  19. this.start = start;
  20. this.cyclicBarrier = cyclicBarrier;
  21. } @Override
  22. public void run() {
  23. System.out.println(start + " 已执行");
  24. try {
  25. cyclicBarrier.await();
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. } catch (BrokenBarrierException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }

输出如下:

  1. 0 已执行
  2. 3 已执行
  3. 4 已执行
  4. 2 已执行
  5. 1 已执行
  6. 全部执行完毕
  7. 0 已执行
  8. 1 已执行
  9. 2 已执行
  10. 3 已执行
  11. 4 已执行
  12. 全部执行完毕

Semaphore

Semaphore 信号量维护了一个许可集,每次使用时执行acquire()Semaphore获取许可,如果没有则会阻塞,每次使用完执行release()释放许可。
使用场景:Semaphore对用于对资源的控制,比如数据连接有限,使用Semaphore限制访问数据库的线程数。

  1. public void latch() throws InterruptedException, IOException {
  2. int count = 5;
  3. Semaphore semaphore = new Semaphore(1);
  4. ExecutorService executorService = Executors.newFixedThreadPool(count);
  5. for (int x=0;x<count;x++){
  6. executorService.execute(new Worker(x,semaphore));
  7. }
  8. System.in.read();
  9. }
  10. class Worker extends Thread {
  11. Integer start;
  12. Semaphore semaphore; public Worker(Integer start, Semaphore semaphore) {
  13. this.start = start;
  14. this.semaphore = semaphore;
  15. } @Override
  16. public void run() throws IllegalArgumentException {
  17. try {
  18. System.out.println(start + " 准备执行");
  19. TimeUnit.SECONDS.sleep(1);
  20. semaphore.acquire();
  21. System.out.println(start + " 已经执行");
  22. semaphore.release();
  23. System.out.println(start + " 已经释放");
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. } }
  27. }

输出如下:

  1. 0 准备执行
  2. 2 准备执行
  3. 1 准备执行
  4. 3 准备执行
  5. 4 准备执行
  6. 2 已经执行
  7. 2 已经释放
  8. 4 已经执行
  9. 4 已经释放
  10. 1 已经执行
  11. 1 已经释放
  12. 0 已经执行
  13. 0 已经释放
  14. 3 已经执行
  15. 3 已经释放

Exchanger

Exchanger 用于两个线程间的数据交换,它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。
使用场景:两个线程相互等待处理结果并进行数据传递。

  1. public void latch() throws InterruptedException, IOException {
  2. int count = 5;
  3. Exchanger<String> exchanger = new Exchanger<>();
  4. ExecutorService executorService = Executors.newFixedThreadPool(count);
  5. for (int x=0;x<count;x++){
  6. executorService.execute(new Worker(x,exchanger));
  7. }
  8. System.in.read();
  9. }
  10. class Worker extends Thread {
  11. Integer start;
  12. Exchanger<String> exchanger; public Worker(Integer start, Exchanger<String> exchanger) {
  13. this.start = start;
  14. this.exchanger = exchanger;
  15. } @Override
  16. public void run() throws IllegalArgumentException {
  17. try {
  18. System.out.println(Thread.currentThread().getName() + " 准备执行");
  19. TimeUnit.SECONDS.sleep(start);
  20. System.out.println(Thread.currentThread().getName() + " 等待交换");
  21. String value = exchanger.exchange(Thread.currentThread().getName());
  22. System.out.println(Thread.currentThread().getName() + " 交换得到数据为:"+value);
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. } }
  26. }

输出如下:

  1. pool-1-thread-1 准备执行
  2. pool-1-thread-1 等待交换
  3. pool-1-thread-3 准备执行
  4. pool-1-thread-2 准备执行
  5. pool-1-thread-5 准备执行
  6. pool-1-thread-4 准备执行
  7. pool-1-thread-2 等待交换
  8. pool-1-thread-1 交换得到数据为:pool-1-thread-2
  9. pool-1-thread-2 交换得到数据为:pool-1-thread-1
  10. pool-1-thread-3 等待交换
  11. pool-1-thread-4 等待交换
  12. pool-1-thread-4 交换得到数据为:pool-1-thread-3
  13. pool-1-thread-3 交换得到数据为:pool-1-thread-4
  14. pool-1-thread-5 等待交换

Exchanger必须成对出现,否则会像上面代码执行结果那样,pool-1-thread-5一直阻塞等待与其交换数据的线程,为了避免这一现象,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长。