1. 常用的并发工具类

CountDownLatch减法计数器

通常用来控制线程等待,它可以让某一个线程或多个线程等待其他一组线程完成操作,再继续执行。
原理

  • 通过一个计数器来实现的,计数器的初始值为需要等待线程的数量。
  • 线程调用CountDownLatch的await()方法会阻塞当前线程,直到计数器的值为0。
  • 当一个工作线程完成了自己的任务后,调用CountDownLatch的countDown()方法,计数器的值就会减1。
  • 当计数器值为0时,说明所有的工作线程都执行完了,此时,在闭锁上等待的主线程就可以恢复执行任务。

使用场景
倒数计时器: 火箭发射。 火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检查。 只有等所有检查完毕后,引擎才能点火。

  1. public class CountDownLatchTest {
  2. public static void main(String[] args) throws InterruptedException {
  3. // 计数器
  4. CountDownLatch countDownLatch = new CountDownLatch(6);
  5. for (int i = 1; i <= 6; i++) {
  6. new Thread(()->{
  7. try {
  8. TimeUnit.SECONDS.sleep(3);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. System.out.println(Thread.currentThread().getName()+" 走");
  13. countDownLatch.countDown(); // 计数器减1
  14. },String.valueOf(i)).start();
  15. }
  16. countDownLatch.await(); // 如果计数器大于0一直阻塞
  17. System.out.println("done");
  18. }
  19. }
  1. public class CountDownLatchTest1 {
  2. public static void main(String[] args) throws Exception {
  3. sixCountry();
  4. }
  5. private static void sixCountry() throws InterruptedException {
  6. CountDownLatch countDownLatch = new CountDownLatch(6);
  7. for (int i = 1; i <= 6; i++) {
  8. new Thread(() -> {
  9. System.out.println(Thread.currentThread().getName() + "\t" + "国,灭亡");
  10. countDownLatch.countDown();
  11. }, MyEnum.forEach(i).getName()).start();
  12. }
  13. countDownLatch.await();
  14. System.out.println("秦统一");
  15. }
  16. }
  17. public enum MyEnum {
  18. ONE(1, "齐"), TWO(2, "楚"),THREE(3, "燕"),FOUR(4, "赵"),FIVE(5, "魏"),SIX(6, "韩");
  19. private Integer code;
  20. private String name;
  21. public Integer getCode() {
  22. return code;
  23. }
  24. public void setCode(Integer code) {
  25. this.code = code;
  26. }
  27. public String getName() {
  28. return name;
  29. }
  30. public void setName(String name) {
  31. this.name = name;
  32. }
  33. MyEnum(Integer code, String name) {
  34. this.code = code;
  35. this.name = name;
  36. }
  37. public static MyEnum forEach(int index) {
  38. MyEnum[] countryEnums = MyEnum.values();
  39. for (MyEnum countryEnum : countryEnums) {
  40. if (index == countryEnum.getCode()) {
  41. return countryEnum;
  42. }
  43. }
  44. return null;
  45. }
  46. }

CyclicBarrier累计计数器

image.png
image.png
原理

  • 通过一个计数器来实现的,计数器的初始值为需要等待线程的数量。
  • 每个线程调用CyclicBarrier的await()方法,使自己进入等待状态。
  • 当所有的线程都调用了CyclicBarrier的await()方法后,所有的线程停止等待,继续运行。

CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的。可以用于多线程计算数据,最后合并计算结果的场景。

package com.lymn.juc;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    public static void main(String[] args) {
        // 召唤龙珠的线程
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("召唤神龙成功!");
        });
        for (int i = 1; i <=7 ; i++) {
            final int temp = i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"收集"+temp+"龙珠");
                try {
                    //每次执行CyclicBarrier一次障碍数会加一,如果达到了目标障碍数,才会执行cyclicBarrier.await()之后的语句
                    cyclicBarrier.await(); // 等待
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Semaphore信号量 限流

信号量为多线程协作提供了更为强大的控制方法 。 无论是内部锁synchronized还是重入锁ReentranLock,一次都只允许一个线程范根同一个资源,而信号量却可以指定多个线程,同时访问某一资源。
信号量主要用于多个共享资源的互斥以及并发线程数的控制。
原理

  • 通过一个计数器(记录许可证的数量)来实现的,计数器的初始值为需要等待线程的数量。
  • 线程通过acquire()方法获取许可证(计数器的值减1),只有获取到许可证才可以继续执行下去,否则阻塞当前线程。
  • 线程通过release()方法归还许可证(计数器的值加1)。

使用场景
流量控制,特别是公用资源有限的应用场景比如数据库连接。

package com.lymn.juc;

import java.util.ArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {
    public static void main(String[] args) {
        ArrayList<String> car = new ArrayList<>();
        car.add("宝马");
        car.add("奔驰");
        car.add("奥迪");
        car.add("兰博基尼");
        car.add("玛莎拉蒂");
        car.add("劳斯莱斯");
        // 线程数量:停车位! 限流!
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i <car.size() ; i++) {
            new Thread(()->{

                try {
                    // acquire方法获得许可证,如果已经满了,等待
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"抢到车位");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName()+"离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //release方法释放许可
                    semaphore.release();
                }
            },car.get(i)).start();
        }
    }
}

不变模式

异步模式

ForkJoin

ForkJoin在JDK1.7,并行执行任务,大数据量!
image.png

package com.czp.forkjoin;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class Test {

    public static void main(String[] args) {
//        test1(1L, 200_0000_0000L);//执行时间为10570毫秒  sum = -2914184820805067776
        test2(1L, 200_0000_0000L);//执行时间为202979毫秒   sum = -2935156330807653376
//        test3(1L, 200_0000_0000L);//执行时间为15894毫秒   sum = -2914184800805067776
    }

    public static void test1(long start, long end) {
        Instant instant = Instant.now();
        long sum = 0;
        for (long i = start; i < end; i++) {
            sum += i;
        }
        Instant instant1 = Instant.now();
        Duration duration = Duration.between(instant, instant1);
        System.out.println("执行时间为" + duration.toMillis() + "毫秒");
        System.out.println("sum = " + sum);

    }

    public static void test2(long start, long end) {
        Instant instant = Instant.now();
        ForkJoinPool joinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinDemo(start, end);
        ForkJoinTask<Long> result = joinPool.submit(task);//提交任务
        Long sum = null;
        try {
            sum = result.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        Instant instant1 = Instant.now();
        Duration duration = Duration.between(instant, instant1);
        System.out.println("执行时间为" + duration.toMillis() + "毫秒");
        System.out.println("sum = " + sum);

    }


    // stream 并行流
    public static void test3(Long start, Long end) {
        Instant instant = Instant.now();
        //range() 开区间   rangeClosed() 闭区间左开右闭
        long sum = LongStream.rangeClosed(start, end).parallel().reduce(0, Long::sum);
        Instant instant1 = Instant.now();
        Duration duration = Duration.between(instant, instant1);
        System.out.println("执行时间为" + duration.toMillis() + "毫秒");
        System.out.println("sum = " + sum);

    }
}


/**
 * 求和计算的任务
 * 1. forkjoinpool 通过它来执行
 * 2. 计算任务forkJoinPool, execute(forkjoinTask task)
 * 3. 计算类要继承自forkjointask
 */
public class ForkJoinDemo extends RecursiveTask<Long> {

    private long start;     // 1
    private long end;       // 20_0000_0000
    private long temp = 1_0000L;

    public ForkJoinDemo(long start, long end) {
        this.start = start;
        this.end = end;
    }

    //计算方法
    @Override
    protected Long compute() {
        if (end - start < temp) {

            Long sum = 0L;
            for (Long i = start; i < end; i++) {
                sum += i;
            }
            return sum;
        } else {
            //分支合并计算
            long middle = (end + start) / 2;  //中间值
            ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
            task1.fork(); // 拆分任务,把任务压入线程队列
            ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
            task2.fork();// 拆分任务,把任务压入线程队列


            return task1.join() + task2.join();
        }
    }
}

参考

Semaphore