1. 常用的并发工具类
CountDownLatch减法计数器
通常用来控制线程等待,它可以让某一个线程或多个线程等待其他一组线程完成操作,再继续执行。
原理
- 通过一个计数器来实现的,计数器的初始值为需要等待线程的数量。
- 线程调用CountDownLatch的await()方法会阻塞当前线程,直到计数器的值为0。
- 当一个工作线程完成了自己的任务后,调用CountDownLatch的countDown()方法,计数器的值就会减1。
- 当计数器值为0时,说明所有的工作线程都执行完了,此时,在闭锁上等待的主线程就可以恢复执行任务。
使用场景
倒数计时器: 火箭发射。 火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检查。 只有等所有检查完毕后,引擎才能点火。
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {// 计数器CountDownLatch countDownLatch = new CountDownLatch(6);for (int i = 1; i <= 6; i++) {new Thread(()->{try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+" 走");countDownLatch.countDown(); // 计数器减1},String.valueOf(i)).start();}countDownLatch.await(); // 如果计数器大于0一直阻塞System.out.println("done");}}
public class CountDownLatchTest1 {public static void main(String[] args) throws Exception {sixCountry();}private static void sixCountry() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(6);for (int i = 1; i <= 6; i++) {new Thread(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "国,灭亡");countDownLatch.countDown();}, MyEnum.forEach(i).getName()).start();}countDownLatch.await();System.out.println("秦统一");}}public enum MyEnum {ONE(1, "齐"), TWO(2, "楚"),THREE(3, "燕"),FOUR(4, "赵"),FIVE(5, "魏"),SIX(6, "韩");private Integer code;private String name;public Integer getCode() {return code;}public void setCode(Integer code) {this.code = code;}public String getName() {return name;}public void setName(String name) {this.name = name;}MyEnum(Integer code, String name) {this.code = code;this.name = name;}public static MyEnum forEach(int index) {MyEnum[] countryEnums = MyEnum.values();for (MyEnum countryEnum : countryEnums) {if (index == countryEnum.getCode()) {return countryEnum;}}return null;}}
CyclicBarrier累计计数器


原理
- 通过一个计数器来实现的,计数器的初始值为需要等待线程的数量。
- 每个线程调用CyclicBarrier的await()方法,使自己进入等待状态。
- 当所有的线程都调用了CyclicBarrier的await()方法后,所有的线程停止等待,继续运行。
CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的。可以用于多线程计算数据,最后合并计算结果的场景。
package com.lymn.juc;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) {
// 召唤龙珠的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功!");
});
for (int i = 1; i <=7 ; i++) {
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集"+temp+"龙珠");
try {
//每次执行CyclicBarrier一次障碍数会加一,如果达到了目标障碍数,才会执行cyclicBarrier.await()之后的语句
cyclicBarrier.await(); // 等待
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
Semaphore信号量 限流
信号量为多线程协作提供了更为强大的控制方法 。 无论是内部锁synchronized还是重入锁ReentranLock,一次都只允许一个线程范根同一个资源,而信号量却可以指定多个线程,同时访问某一资源。
信号量主要用于多个共享资源的互斥以及并发线程数的控制。
原理
- 通过一个计数器(记录许可证的数量)来实现的,计数器的初始值为需要等待线程的数量。
- 线程通过acquire()方法获取许可证(计数器的值减1),只有获取到许可证才可以继续执行下去,否则阻塞当前线程。
- 线程通过release()方法归还许可证(计数器的值加1)。
使用场景
流量控制,特别是公用资源有限的应用场景比如数据库连接。
package com.lymn.juc;
import java.util.ArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest {
public static void main(String[] args) {
ArrayList<String> car = new ArrayList<>();
car.add("宝马");
car.add("奔驰");
car.add("奥迪");
car.add("兰博基尼");
car.add("玛莎拉蒂");
car.add("劳斯莱斯");
// 线程数量:停车位! 限流!
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i <car.size() ; i++) {
new Thread(()->{
try {
// acquire方法获得许可证,如果已经满了,等待
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//release方法释放许可
semaphore.release();
}
},car.get(i)).start();
}
}
}
不变模式
异步模式
ForkJoin
ForkJoin在JDK1.7,并行执行任务,大数据量!
package com.czp.forkjoin;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) {
// test1(1L, 200_0000_0000L);//执行时间为10570毫秒 sum = -2914184820805067776
test2(1L, 200_0000_0000L);//执行时间为202979毫秒 sum = -2935156330807653376
// test3(1L, 200_0000_0000L);//执行时间为15894毫秒 sum = -2914184800805067776
}
public static void test1(long start, long end) {
Instant instant = Instant.now();
long sum = 0;
for (long i = start; i < end; i++) {
sum += i;
}
Instant instant1 = Instant.now();
Duration duration = Duration.between(instant, instant1);
System.out.println("执行时间为" + duration.toMillis() + "毫秒");
System.out.println("sum = " + sum);
}
public static void test2(long start, long end) {
Instant instant = Instant.now();
ForkJoinPool joinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(start, end);
ForkJoinTask<Long> result = joinPool.submit(task);//提交任务
Long sum = null;
try {
sum = result.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
Instant instant1 = Instant.now();
Duration duration = Duration.between(instant, instant1);
System.out.println("执行时间为" + duration.toMillis() + "毫秒");
System.out.println("sum = " + sum);
}
// stream 并行流
public static void test3(Long start, Long end) {
Instant instant = Instant.now();
//range() 开区间 rangeClosed() 闭区间左开右闭
long sum = LongStream.rangeClosed(start, end).parallel().reduce(0, Long::sum);
Instant instant1 = Instant.now();
Duration duration = Duration.between(instant, instant1);
System.out.println("执行时间为" + duration.toMillis() + "毫秒");
System.out.println("sum = " + sum);
}
}
/**
* 求和计算的任务
* 1. forkjoinpool 通过它来执行
* 2. 计算任务forkJoinPool, execute(forkjoinTask task)
* 3. 计算类要继承自forkjointask
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private long start; // 1
private long end; // 20_0000_0000
private long temp = 1_0000L;
public ForkJoinDemo(long start, long end) {
this.start = start;
this.end = end;
}
//计算方法
@Override
protected Long compute() {
if (end - start < temp) {
Long sum = 0L;
for (Long i = start; i < end; i++) {
sum += i;
}
return sum;
} else {
//分支合并计算
long middle = (end + start) / 2; //中间值
ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
task1.fork(); // 拆分任务,把任务压入线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
task2.fork();// 拆分任务,把任务压入线程队列
return task1.join() + task2.join();
}
}
}
