J.U.C - AQS

java.util.concurrent(J.U.C)大大提高了并发性能,AQS 被认为是 J.U.C 的核心。

1. CountDownLatch

用来控制一个或者多个线程等待多个线程。

维护了一个计数器 cnt,每次调用 countDown() 方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await() 方法而在等待的线程就会被唤醒。

image.png

  1. public class CountdownLatchExample {
  2. public static void main(String[] args) throws InterruptedException {
  3. final int totalThread = 10;
  4. CountDownLatch countDownLatch = new CountDownLatch(totalThread);
  5. ExecutorService executorService = Executors.newCachedThreadPool();
  6. for (int i = 0; i < totalThread; i++) {
  7. executorService.execute(() -> {
  8. System.out.print("run..");
  9. countDownLatch.countDown();
  10. });
  11. }
  12. countDownLatch.await();
  13. System.out.println("end");
  14. executorService.shutdown();
  15. }
  16. }
  1. run..run..run..run..run..run..run..run..run..run..end

2. CyclicBarrier

用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。

和 CountdownLatch 相似,都是通过维护计数器来实现的。线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。

CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环屏障。

CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会执行一次。

  1. public CyclicBarrier(int parties, Runnable barrierAction) {
  2. if (parties <= 0) throw new IllegalArgumentException();
  3. this.parties = parties;
  4. this.count = parties;
  5. this.barrierCommand = barrierAction;
  6. }
  7. public CyclicBarrier(int parties) {
  8. this(parties, null);
  9. }

image.png

  1. public class CyclicBarrierExample {
  2. public static void main(String[] args) {
  3. final int totalThread = 10;
  4. CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
  5. ExecutorService executorService = Executors.newCachedThreadPool();
  6. for (int i = 0; i < totalThread; i++) {
  7. executorService.execute(() -> {
  8. System.out.print("before..");
  9. try {
  10. cyclicBarrier.await();
  11. } catch (InterruptedException | BrokenBarrierException e) {
  12. e.printStackTrace();
  13. }
  14. System.out.print("after..");
  15. });
  16. }
  17. executorService.shutdown();
  18. }
  19. }
  1. before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..

3. Semaphore

Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。

以下代码模拟了对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10。

  1. public class SemaphoreExample {
  2. public static void main(String[] args) {
  3. final int threadNum=1;
  4. final int totalThread=10;
  5. Semaphore semaphore=new Semaphore(threadNum);
  6. ExecutorService service= Executors.newCachedThreadPool();
  7. for(int i=0;i<totalThread;i++){
  8. final int num=i;
  9. service.execute(new Runnable() {
  10. @Override
  11. public void run() {
  12. try {
  13. semaphore.acquire();
  14. test(num);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }finally {
  18. semaphore.release();
  19. }
  20. }
  21. });
  22. }
  23. service.shutdown();
  24. }
  25. private static void test(int i) throws InterruptedException {
  26. System.out.println("Thread: "+i);
  27. Thread.sleep(1000);
  28. }
  29. }
  1. // 每隔一秒钟出现一次
  2. Thread: 0
  3. Thread: 1
  4. Thread: 2
  5. Thread: 3
  6. Thread: 4
  7. Thread: 5
  8. Thread: 7
  9. Thread: 6
  10. Thread: 8
  11. Thread: 9

4. CountDownLatch 和 CyclicBarrier 比较

  • 循环使用
    CountDownLatch 只能用一次;
    CyclicBarrier 通过 reset() 可以循环使用
  • 计数方式
    CountDownLatch 是减计数方式,计数为 0 时释放所有等待的线程;
    CyclicBarrier 是加计数方式,计数达到构造方法中参数指定的值释放所有等待的线程
  • 应用场景
    CountDownLatch 主要应用于主/从任务模式。一个任务(主任务)等待多个任务(从任务)执行完后才能执行;
    CyclicBarrier 主要应用于队友模式。一组 N 个线程(N 个队友)相互等待,任意个线程(某个队友)没有完成任务,所有线程都等着,直到这一组所有线程的任务完成,这组中每个线程才能继续往下运行。
  • 底层原理
    CountDownLatch 底层是共享锁;
    CyclicBarrier 底层是独占锁。

J.U.C - 其它组件

1. FutureTask

在介绍 Callable 时我们知道它可以有返回值,返回值通过 Future 进行封装。FutureTask 实现了 RunnableFuture 接口,该接口继承自 Runnable 和 Future 接口,这使得 FutureTask 既可以当做一个任务执行,也可以有返回值。

  1. public class FutureTask<V> implements RunnableFuture<V>Copy to clipboardErrorCopied
  2. public interface RunnableFuture<V> extends Runnable, Future<V>Copy to clipboardErrorCopied

FutureTask 可用于异步获取执行结果或取消执行任务的场景。当一个计算任务需要执行很长时间,那么就可以用 FutureTask 来封装这个任务,主线程在完成自己的任务之后再去获取结果。

  1. public class FutureTaskExample {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. FutureTask<Integer> futureTask = new FutureTask<Integer>(
  4. new Callable<Integer>() {
  5. @Override
  6. public Integer call() throws Exception {
  7. int result = 0;
  8. for (int i = 0; i < 100; i++) {
  9. Thread.sleep(10);
  10. result += i;
  11. }
  12. return result;
  13. }
  14. });
  15. Thread computeThread = new Thread(futureTask);
  16. computeThread.start();
  17. Thread otherThread = new Thread(() -> {
  18. System.out.println("other task is running...");
  19. try {
  20. Thread.sleep(1000);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. });
  25. otherThread.start();
  26. System.out.println(futureTask.get());
  27. }
  28. }
  1. other task is running...
  2. 4950

2. BlockingQueue

java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:

  • FIFO 队列 :LinkedBlockingQueue、ArrayBlockingQueue(固定长度)
  • 优先级队列 :PriorityBlockingQueue

阻塞队列是一个自持两个附加操作的队列:

  • 支持阻塞的插入方法(put):当队列满时,会阻塞插入元素的线程,直到队列中的元素不满为止。
  • 支持阻塞的移除方法(take):队列为空时,会阻塞获取元素的线程,直到队列中的元素不空为止。

使用 BlockingQueue 实现生产者消费者问题

  1. public class ProducerConsumer {
  2. private static BlockingQueue<String> queue=new LinkedBlockingQueue<>();
  3. private static class Producer extends Thread{
  4. @Override
  5. public void run() {
  6. try {
  7. queue.put("product");
  8. System.out.println("produce...");
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. }
  14. private static class Consumer extends Thread{
  15. @Override
  16. public void run() {
  17. try {
  18. queue.take();
  19. System.out.println("consume...");
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }
  25. public static void main(String[] args) {
  26. for(int i=0;i<2;i++){
  27. Producer p=new Producer();
  28. p.start();
  29. }
  30. for(int i=0;i<5;i++){
  31. Consumer c=new Consumer();
  32. c.start();
  33. }
  34. for(int i=0;i<3;i++){
  35. Producer p=new Producer();
  36. p.start();
  37. }
  38. }
  39. }
  1. produce...
  2. produce...
  3. consume...
  4. consume...
  5. produce...
  6. produce...
  7. consume...
  8. consume...
  9. produce...
  10. consume...

3. ForkJoin

主要用于并行计算中,和 MapReduce 原理类似,都是把大的计算任务拆分成多个小任务并行计算。

  1. public class ForkJoinExample extends RecursiveTask<Integer> {
  2. private final int threshold = 5;
  3. private int first;
  4. private int last;
  5. public ForkJoinExample(int first, int last) {
  6. this.first = first;
  7. this.last = last;
  8. }
  9. @Override
  10. protected Integer compute() {
  11. int result = 0;
  12. if (last - first <= threshold) {
  13. // 任务足够小则直接计算
  14. for (int i = first; i <= last; i++) {
  15. result += i;
  16. }
  17. } else {
  18. // 拆分成小任务
  19. int middle = first + (last - first) / 2;
  20. ForkJoinExample leftTask = new ForkJoinExample(first, middle);
  21. ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
  22. leftTask.fork();
  23. rightTask.fork();
  24. result = leftTask.join() + rightTask.join();
  25. }
  26. return result;
  27. }
  28. }Copy to clipboardErrorCopied
  29. public static void main(String[] args) throws ExecutionException, InterruptedException {
  30. ForkJoinExample example = new ForkJoinExample(1, 10000);
  31. ForkJoinPool forkJoinPool = new ForkJoinPool();
  32. Future result = forkJoinPool.submit(example);
  33. System.out.println(result.get());
  34. }Copy to clipboardErrorCopied

ForkJoin 使用 ForkJoinPool 来启动,它是一个特殊的线程池,线程数量取决于 CPU 核数。

  1. public class ForkJoinPool extends AbstractExecutorServiceCopy to clipboardErrorCopied

ForkJoinPool 实现了工作窃取算法来提高 CPU 的利用率。

每个线程都维护了一个双端队列,用来存储需要执行的任务。

工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务,避免和队列所属线程发生竞争。

例如下图中,Thread2 从 Thread1 的队列中拿出最晚的 Task1 任务,Thread1 会拿出 Task2 来执行,这样就避免发生竞争。但是如果队列中只有一个任务时还是会发生竞争。

image.png
上一篇

并发容器

下一篇

线程池