- 2 Java并发包之原子类型详解
- 2.4 AtomicReference详解
- 2.5 AtomicStampedReference详解
- 3 并发包之后工具类详解
- 4 Java并发包之并发容器详解
- 5 Java并发包之ExecutorService详解
- 6 Java Stream详解
https://item.jd.com/70765016986.html#none
2 Java并发包之原子类型详解
2.1.2 AtomicInteger的基本用法
AtomicInteger也是Number类的一个子类,除此之外,AtomicInteger还提供了很多原子性的操作方法。在AtomicInteger的内部有一个被volatile关键字修饰的成员变量value,实际上,AtomicInteger所提供的所有方法主要都是针对该变量value进行的操作。
1 AtomicInteger的创建
- AtomicInteger 创建的AtomicInteger初始值是0;
- public AtomicInteger(_int initialValue):_创建并且指定初始值。
2 AtomicInteger的Incermental操作
- int getAndIncrement() 返回当前int类型的value值,然后对value进行自增运算
- int incrementAndGet() 直接返回自增后的结果,该操作方法能够确保对value的原子性增量操作
3 AtomicInteger的Decrmental操作
- int getAndDecrement() 返回当前int类型的value值,然后对value进行自减运算
int decrementAndGet() 直接返回自减后的结果,该操作方法能够确保对value的原子性减量操作
4 原子性的更新value值
compareAndSet(_int expectedValue, int newValue) _原子性地更新AtomicInteger的的值。其中expect代表当前的AtomicInteger数值,update代表是需要设置的新值,该方法会返回一个boolean的结果,当expectValue和AtomicInteger的当前值不相等时,修改会失败。
- nt getAndAdd(_int delta)_ 原子性的更新AtomicInteger的value值,更新后的value值为value和delta之和,方法的返回值为value的前一个值,该方法实际上是基于自旋+CAS算法实现的(Compare And Swap)原子性操作。
- int addAndGet_(_int delta) 和上述一样,但是该方法会立即返回更新后的值。
2.1.3 AtomicInteger内幕
CAS包含三个操作数:内存值V,旧的预估值A,要修改的值。当且仅当预估值A和内存值V 相等时,将内存值V修改为B,否则什么都不做。
2.4 AtomicReference详解
AtomicReference类提供了对象引用的非阻塞原子性读写操作,并且提供了其他一些高级用法。
2.4.1 AtomicReference的应用场景
设计一个个人银行资金账号变化的场景,该实例有以下几点要求:
- 个人账号被设计为不可变对象,一旦创建就无法进行修改
- 个人账号类只包含两个字段:账号名、现金数字
为了验证方便,约定个人账号的现金只能多不能少。
public class DebitCard {private final String account;private final int amount;public DebitCard(String account, int amount) {this.account = account;this.amount = amount;}public int getAmount() {return amount;}public String getAccount() {return account;}@Overridepublic String toString() {return new ToStringBuilder(this).append("account", account).append("amount", amount).toString();}}
1 多线程下增加账号金额
假设10个人不断地向这个银行账号里打钱,每次都存入10元,因此这个个人账号每次被别人存入钱之后都会比10元多,下面是多线程实现
public class AromicReferenceExample1 {// volatile 关键字修饰,每次对 DebitCard 对象的写入操作都会被其他线程看到static volatile DebitCard debitCard = new DebitCard("Alex", 0);public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {while (true) {//读取全局 DebitCard 对象的引用final DebitCard dc = debitCard;//基于全局的DebitCard 的金额增加10元 并且产生一个新的DebitCardfinal DebitCard newDc = new DebitCard(dc.getAccount(), dc.getAmount() + 10);System.out.println(newDc);//修改全局的 对象引用debitCard = newDc;try {TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(20));} catch (InterruptedException e) {}}}, "T-" + i).start();}}}
DebitCard@cc952db[account=Alex,amount=10]DebitCard@6c0ebd5f[account=Alex,amount=10]DebitCard@624c756f[account=Alex,amount=10]DebitCard@578e1f78[account=Alex,amount=10]DebitCard@77855513[account=Alex,amount=10]……
虽然被volatile关键字修饰的变量每次更改都可以立即被其他线程看到,但是我们针对对象引用的修改其实至少包含了两个步骤:获取该引用和改变该引用(每一个步骤都是原子性的操作,但组合起来就无法保证原子性了)
2 多线程下加锁增加账号资金
synchronized (AromicReferenceExample1.class) {//读取全局 DebitCard 对象的引用final DebitCard dc = debitCard;//基于全局的DebitCard 的金额增加10元 并且产生一个新的DebitCardfinal DebitCard newDc = new DebitCard(dc.getAccount(), dc.getAmount() + 10);System.out.println(newDc);//修改全局的 对象引用debitCard = newDc;}
3 AtomicReference的非阻塞解决方案
public class AromicReferenceExample3 {//定义 AtomicReference ,初始化为 0private static AtomicReference<DebitCard> debitCardRef =new AtomicReference<>(new DebitCard("Alex", 0));public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {while (true) {// 获取 AtomicReference 的当前值final DebitCard dc = debitCardRef.get();final DebitCard newDc = new DebitCard(dc.getAccount(), dc.getAmount() + 10);//基于 CAS 算法更新 AtomicReference 的当前值if (debitCardRef.compareAndSet(dc, newDc)) {System.out.println(newDc);}try {TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(20));} catch (InterruptedException e) {}}}, "T-" + i).start();}}}
DebitCard@43d9754b[account=Alex,amount=100]DebitCard@4b526515[account=Alex,amount=50]DebitCard@469f3c64[account=Alex,amount=10]DebitCard@4965b721[account=Alex,amount=70]
CAS算法在此处要确保接下来的要修改的引用对象是基于当前线程刚获取的对象引用,否则更新直接失败。
输出金额安装10的步长在增长,由于非阻塞的缘故,数值100可能会出现在前面。
2.4.2 AtomicReference的基本用法
- AtomicReference 的构造:AtomicReference 是一个泛型,它的构造与其他原子类型的构造一样,也提供了无参和一个有参数的构造函数。当使用无参构造函数时,需要再次调用set为其内部指定初始值
- compareAndSet_(_V expectedValue, V newValue) :原子性的更新AtomicReference内部的value值,其中expectValue代表当前AtomicReference的value值,newValue则是需要设置的新引用的值。
- getAndSet_(_V newValue):原子性的更新AtomicReference内部的vlaue值,并返回AtomicReference的旧值。
- getAndUpdate(_UnaryOperator<_V_> updateFunction)getAndUpdate(UnaryOperator<_V_> _updateFunction) :原子性的更新value,并且返回AtomicReference的旧值
static AtomicReference<DebitCard> debitCardRef =new AtomicReference<>(new DebitCard("Alex", 0));final DebitCard preDc = debitCardRef.get();final DebitCard result = debitCardRef.getAndUpdate(dc -> new DebitCard(dc.getAccount(), dc.getAmount() + 10));assert preDc == result;assert result != debitCardRef.get();
2.5 AtomicStampedReference详解
原子类型用自旋+CAS的无锁操作保证了共享变量的线程安全性和原子性。
绝大多数情况下,CAS算法并没有什么问题,但是在需要关心变化值的操作中会存在问题,比如一个值原来是A,变成了B,后来又变成了A ,那么CAS检查是会发现值没有发生变化,但是实际上确是发生了变化的。
2.5.1 CAS算法ABA问题
CAS算法在需要关心变化的操作中将会存在ABA问题。
假设linkedStack有两个元素,经过了push B 和push A,操作后,栈数据如下
假设此时线程T1想从栈顶弹出A ,实际上就是用A.next(B)替换top,在线程T1即将执行时,线程T2进入了执行,线程T2对A B 分别进行了弹出操作,然后又执行了D C A元素的push操作,栈中数据如下
B元素此时变成了游离态。
2.5.2 AtomicStampedReference详解
为了避免CAS带来的ABA问题,针对乐观锁在并发情况下,我们通常会增加版本号,比如数据库中关于乐观锁的实现方式,以此解决并发操作带来的ABA,在Java原子包中也提供了这样的实现AtomicStampedReference。
AtomicStampedReference在构建时需要一个类似与版本号的int类型变量stamped,每一次针对共享数据的变化都会导致该stamped增加(自增需要应用程序自身去维护,AtomicStampedReference并不提供),因此可以解决ABA问题。
AtomicStampedReference内部将会将两个变量封装成Pair对象
private static class Pair<T> {final T reference;final int stamp;private Pair(T reference, int stamp) {this.reference = reference;this.stamp = stamp;}static <T> Pair<T> of(T reference, int stamp) {return new Pair<T>(reference, stamp);}}
AtomicStampedReference<String> reference = new AtomicStampedReference<>("hello", 1);//失败assert reference.compareAndSet("hello", "world", 2, 3);assert reference.compareAndSet("hello", "world", 1, 2);
3 并发包之后工具类详解
3.1 CountDownLatch工具详解
CountDownLatch 倒计数门阀。
CountDownLatch是一个同步助手,允许一个或多个线程等待一些列的其他线程执行结束。
3.11 等待所有子任务结束
import org.apache.commons.lang.builder.ToStringBuilder;import java.util.Arrays;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.stream.Collectors;import java.util.stream.IntStream;import static java.util.concurrent.ThreadLocalRandom.current;/*** @author study* @version 1.0* @date 2021/3/8 14:21*/public class CountDownLatchExample1 {public static void main(String[] args) throws InterruptedException {//首先获取商品编号的列表final int[] products = getProductsByCategoryId();//通过Stream 的map 运算 将商品编号转为 ProductPricefinal List<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new).collect(Collectors.toList());// ① 定义countdownlatch 计算器 数量为子任务的 个人final CountDownLatch latch = new CountDownLatch(products.length);list.forEach(pp -> {//② 为每一件商品的计算都开辟对应的线程new Thread(() -> {System.out.println(pp.getProdID() + " start calculate price ");try {//模拟其他的系统调用,使用睡眠代替耗时TimeUnit.SECONDS.sleep(current().nextInt(10));//计算价格if (pp.getProdID() % 2 == 0) {pp.setPrice(pp.prodID * 0.9d);} else {pp.setPrice(pp.prodID * 0.71d);}System.out.println(pp.getProdID() + " -> price calculate completed.");} catch (Exception e) {e.printStackTrace();} finally {//③ 计数器 count down ,子任务执行完成latch.countDown();}}).start();});// ④ 主线程阻塞等待所有子任务结束,如果有一个子任务没有完成则会一直等待。latch.await();System.out.println("all of price calculate finished.");list.forEach(System.out::println);}//根据品类ID获取商品列表private static int[] getProductsByCategoryId() {return IntStream.rangeClosed(1, 10).toArray();}private static class ProductPrice {private final int prodID;private double price;public ProductPrice(int prodID) {this(prodID, -1);}public ProductPrice(int prodID, double price) {this.prodID = prodID;this.price = price;}public int getProdID() {return prodID;}public void setPrice(double price) {this.price = price;}@Overridepublic String toString() {return new ToStringBuilder(this).append("prodID", prodID).append("price", price).toString();}}}
执行countDown方法,计数器减一,表名子任务执行结束,需要注意的是,任务的结束并不一定会代表着正常的结束,可能是运算的过程中出现错误,因为为了能够正确地执行countDown方法,需要将该方法的调用放在finally代码块中,否则会出现主线程await方法永远不会退出阻塞的状态。
3.2 CyclicBarrier工具详解
CyclicBarrier(循环屏障),它也是一个同步助手工具,它允许多个线程在执行完相应的操作之后彼此等待共同达到一个障点(barrier point)。CyclicBarrier也非常适用于某个串行化的任务被拆分成若干个并行执行的子任务,当所有的子任务都执行结束之后在继续接下来的工作。从这一点看,CyclicBarrier和CountDownLatch非常类似,但是它们之间的运行方式以及原理还是存在比较大的差异的,并且CyclicBarrier所能支持的功能CountDownLatch是不具备的。比如CyclicBarrier可以被重复使用,而CountDownLatch当计数器为0的时候就无法再次利用
3.2.1 等待所有子任务结束
import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.TimeUnit;import java.util.stream.Collectors;import java.util.stream.IntStream;import static java.util.concurrent.ThreadLocalRandom.current;/*** @author study* @version 1.0* @date 2021/3/8 14:54*/public class CyclicBarrierExample1 {public static void main(String[] args) throws InterruptedException {//首先获取商品编号的列表final int[] products = getProductsByCategoryId();//通过Stream 的map 运算 将商品编号转为 ProductPricefinal List<CountDownLatchExample1.ProductPrice> list = Arrays.stream(products).mapToObj(CountDownLatchExample1.ProductPrice::new).collect(Collectors.toList());// ① 定义 CyclicBarrier 计算器 数量为子任务的 数量final CyclicBarrier barrier = new CyclicBarrier(products.length);// ② 存放线程任务的listfinal List<Thread> threadList = new ArrayList<>();list.forEach(pp -> {final Thread thread = new Thread(() -> {System.out.println(pp.getProdID() + " start calculate price ");try {//模拟其他的系统调用,使用睡眠代替耗时TimeUnit.SECONDS.sleep(current().nextInt(10));//计算价格if (pp.getProdID() % 2 == 0) {pp.setPrice(pp.prodID * 0.9d);} else {pp.setPrice(pp.prodID * 0.71d);}System.out.println(pp.getProdID() + " -> price calculate completed.");} catch (Exception e) {e.printStackTrace();} finally {//③ 在此等待其他子线程到达 barrier pointertry {barrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}});threadList.add(thread);thread.start();});// ④ 主线程阻塞等待所有子任务结束threadList.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println("all of price calculate finished.");list.forEach(System.out::println);}//根据品类ID获取商品列表private static int[] getProductsByCategoryId() {return IntStream.rangeClosed(1, 10).toArray();}}
和CountDownLatch同样是进行子任务并行化的执行并且等待所有子任务结束,但是它们的执行方式却存在很大的差异。在子线程任务中,当执行结束后调用await方法使当前的子线程进入阻塞状态,知道其他所有的子线程都结束了任务的运行之后,它们才能退出阻塞,下面解释几个关键的地方
- 在注释① 处定义一个CyclicBarrier,虽然要求传入大于0的int数字,但是它所代表的含义是“分片”而不再是计数器,虽然它与计数器几乎类似。
- 在注释②定义了Thread List,用于存放已经被启动的线程,其主要作用是为了后面等待所有的任务结束而准备
- 在注释③ ,子线程运行结束后,调用await方法等待其他子线程也允许结束到达一个共同的barrier point ,该await方法会返回一个int的值,该值所代表的意思是当前任务达到的次序(就是这个线程是第几个运行完相关逻辑单元的)
- 在注释④ ,逐一调用每一个子线程的join方法,使当前线程进入阻塞等待所有的子线程运行结束。
在注释④给出的方案虽然能达到目的。但是这种方式不太优雅。可以多给 CyclicBarrier 定义一个,使其等待
final CyclicBarrier barrier = new CyclicBarrier(products.length+1);
list.forEach(
pp -> {
………………………………
barrier.await()
System.out.println("all of price calculate finished.");
list.forEach(System.out::println);
通过barrier的数据多加一个分片的方式,将主线程也当成子任务线程,这个时候,主线程就可以调用await方法,等待其他的线程运行结束并且到达barrier point,进而退出阻塞进入下一个运算逻辑。
3.2.2 CyclicBarrier的循环特性

CyclicBarrier的另一个很好的特性是可以被循环使用,也就是说当其内部的计数器为0之后还可以在接下来的使用中重置而无需重新定义一个新的。例如上图。
只有所有的旅客都上了大巴之后才能将开车到下一个旅游景点,当大巴达到旅游景点之后,导游会进行人数清点以确认车上没有游客由于睡觉而逗留,车才能开去停车场。所有游客都全部上车和下车后才能统一行动。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.ThreadLocalRandom.current;
/**
* @author study
* @version 1.0
* @date 2021/3/8 16:02
*/
public class CyclicBarrierExample2 {
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
//定义CyclicBarrier 这里的 parties 值为11
final CyclicBarrier cyclicBarrier = new CyclicBarrier(11);
//创建10 个线程
for (int i = 0; i < 10; i++) {
//定义旅客线程,传入旅客编号和barrier
new Thread(new Tourist(i, cyclicBarrier)).start();
}
//主线程池等待所有游客都上了大巴车
cyclicBarrier.await();
System.out.println("Tour guider :all of tourist get on the bus");
//主线程进入阻塞,等待所有旅客下大巴车
cyclicBarrier.await();
System.out.println("Tour guider :all of tourist get off the bus");
}
private static class Tourist implements Runnable {
private final int touristID;
private final CyclicBarrier barrier;
public Tourist(int touristID, CyclicBarrier barrier) {
this.touristID = touristID;
this.barrier = barrier;
}
@Override
public void run() {
System.out.printf("Tourist:%d by bus\n", touristID);
//模拟乘客上车的时间开销
this.spendSeveralSeconds();
//上车后等待其他同伴上车
this.waitAndPrint("Tourist:%d get on the bus,and await other people reached. \n");
//模拟乘客下车的时间开销
this.spendSeveralSeconds();
//下车后等待其他同伴下车
this.waitAndPrint("Tourist:%d get off the bus,and await other people get off \n");
}
private void waitAndPrint(String msg) {
System.out.printf(msg, touristID);
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
}
}
private void spendSeveralSeconds() {
try {
TimeUnit.SECONDS.sleep(current().nextInt(10));
} catch (InterruptedException e) {
}
}
}
}
在上面的程序中,根据之前的描述对旅客上车后同一发车,以及达到目的地下车后的统一控制。自始至终我们都是使用同一个CyclicBarrier来进行控制的。在这里需要注意的是,在主线程总的两次await中间没有对barrier进行reset的操作,那是因为在CyclicBarrier内部维护了一个count。当所有的await调用导致其值为0的时候,reset相关的操作会默认执行。
3.2.3 CyclicBarrier的其他方法以及总结
其他方法和构造方式
- CyclicBarrier_(_int parties, Runnable barrierAction : 构造CyclicBarrier不仅传入parties,而且指定一个Runnable接口,当所有的线程到达barrier point 的时候,该Runnable接口会被调用,有时我们需要在所有执行结束之后执行某个动作,这时就可以使用这种构造方式了。
- getParties :获取CyclicBarrier在构造时的parties,该值一经CyclicBarrier创建将不会被改变
- await :调用该方法后,当前线程将会进入阻塞状态,等待其他线程执行await方法进入barrier point,进而全部退出阻塞状态。当CyclicBarrier内部的count为0时,调用await方法将直接返回而不是进入阻塞状态。
- await_(_long timeout, TimeUnit unit :该方法和无参数的await类似,只不过是增加了超时功能,当其它线程在设定的时间内没有达到barrier point时,当前线程也会退出阻塞。
isBroken() 返回barrier的broken状态,某个线程由于执行await方法而进入阻塞状态,如果该线程执行了中断操作,那么isBroken方法将返回true。
public class CycliDemo { public static void main(String[] args) throws InterruptedException { final CyclicBarrier barrier = new CyclicBarrier(2); final Thread thread = new Thread(() -> { try { //thread 会进入阻塞状态 barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); thread.start(); //两秒之后执行 thread的中断方法 TimeUnit.SECONDS.sleep(2); //调用中断 thread.interrupt(); // 短暂睡眠,确保thread 的执行动作发生在main线程读取broken状态之前 TimeUnit.SECONDS.sleep(2); System.out.println(barrier.isBroken()); } }
3.2.4 CyclicBarrier VS CountDownLatch
它们之间的差别包括但不限于以下:
- CountDownLatch的await方法会等待计数器被 cout down到0,而执行CyclicBarrier的await方法的线程将会等待其他线程达到barrier point。
- CyclicBarrier内部的计数器count是可以被重置的,进而使得CyclicBarrier也可以被重复使用,而CountDownLatch则不能。
- CyclicBarrier是由Lock和condition实现的,而CountDownLatch是由同步控制器AQS(Abstract Queued Synchronizer是)实现的。
- 在构造CyclicBarrier时不允许parties为0,而CountDownLatch则允许为0;
3.2 Exchanger工具详解
Exchanger简化了两个线程之间的数据交互,并且提供了两个线程之间的数据交换点。Exchanger等待两个线程调用其exchange方法,调用次方法时交换机会交换两个线程 提供给对方的数据。
3.3.1 一对线程间的数据交换
Exchanger在某种程度上可以看成是生产者和消费者模式的实现,但是它重点关注的是数据交换,所谓交换就是我给你A,你会给我B,而生产者消费者模式中间使用队列将其解耦。生产者只需要在队列中放入元素,它并不在乎是否有消费者的存在,同样消费者也只是从队列中获取元素,并不关心生产者是否存在。
下面的代码中定义两个线程T1 T2,分别调用Exchanger的exchange方法将各自的数据传递给对方,在这里需要注意的是,每个线程在构造数据时开销是不一样的。因此调用exchange的时机并不是同一时刻,当T1线程在执行exchange方法的时候,若干T2方法没有执行exchange方法,,那么T1线程会进入阻塞状态等待T2线程执行exchange方法,只有当两个线程都执行了exchange之后,它们才会退出。
import java.util.concurrent.Exchanger;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* @author study
* @version 1.0
* @date 2021/3/9 11:13
*/
public class ExchangerExample1 {
public static void main(String[] args) {
//定义exchange类,String类型 标明一对线程交换的数据只能是String
final Exchanger<String> exchanger = new Exchanger<>();
//定义线程1
new Thread(() -> {
System.out.println(Thread.currentThread() + " start.");
//随机睡眠
randomSleep();
//① 执行exchange 方法,将对应的数据传递给线程T2,同时从T2线程获取交换的数据
try {
final String data = exchanger.exchange("T am from T1");
System.out.println(Thread.currentThread() + " received ->" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " end.");
}, "T1").start();
new Thread(() -> {
System.out.println(Thread.currentThread() + " start.");
//随机睡眠
randomSleep();
//① 执行exchange 方法,将对应的数据传递给线程T2,同时从T2线程获取交换的数据
try {
final String data = exchanger.exchange("T am from T2");
System.out.println(Thread.currentThread() + " received ->" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " end.");
}, "T2").start();
}
private static void randomSleep() {
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
} catch (InterruptedException e) {
}
}
}
如果我们只希望一个线程生成数据,另外一个线程处理数据。也就是说其中A线程会用到B线程交换过来的数据,而B线程压根不会用到A线程交换过来的数据。
import java.util.concurrent.Exchanger;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* @author study
* @version 1.0
* @date 2021/3/9 11:37
*/
public class ExchangerExample2 {
public static void main(String[] args) throws InterruptedException {
//定义数据类型为string 的exchange
final Exchanger<String> exchanger = new Exchanger<>();
//定义 StringGenerator 线程
final StringGenerator generator = new StringGenerator(exchanger, "Generator");
//定义 StringConsumer 线程
final StringConsumer consumer = new StringConsumer(exchanger, "Consumer");
//分别启动线程
generator.start();
consumer.start();
//休眠1分钟后关闭 两个线程
TimeUnit.MINUTES.sleep(1);
consumer.close();
generator.close();
}
//定义 Closable 接口
private interface Closable {
//关闭方法
void close();
//判断当前线程是否被关闭
boolean closed();
}
private abstract static class ClosableThread extends Thread implements Closable {
protected final Exchanger<String> exchanger;
private volatile boolean closed = false;
public ClosableThread(Exchanger<String> exchanger, final String name) {
super(name);
this.exchanger = exchanger;
}
@Override
public void run() {
//当前线程未关闭时不断执行 doExchange 方法
while (!closed) {
this.doExchange();
}
}
protected abstract void doExchange();
//关闭当前线程
@Override
public void close() {
System.out.println(Thread.currentThread() + " will be closed.");
this.closed = true;
this.interrupt();
}
@Override
public boolean closed() {
return this.closed || this.isInterrupted();
}
}
private static class StringGenerator extends ClosableThread {
private char initialChar = 'A';
public StringGenerator(Exchanger<String> exchanger, String name) {
super(exchanger, name);
}
@Override
protected void doExchange() {
//模拟数据生成
String str = "";
for (int i = 0; i < 3; i++) {
randomSleep();
str += (initialChar++);
}
//① 如果当前线程未关闭,则执行 exchanger的 exchange方法
if (!closed()) {
try {
exchanger.exchange(str);
} catch (InterruptedException e) {
//如果关闭了 执行close 方法,那么执行终端操作时会捕获到中断信号
System.out.println(currentThread() + " receive the close sinnal .");
}
}
}
private static void randomSleep() {
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
} catch (InterruptedException e) { }
}
}
private static class StringConsumer extends ClosableThread {
public StringConsumer(Exchanger<String> exchanger, String name) {
super(exchanger, name);
}
@Override
protected void doExchange() {
//② 如果线程未关闭,则执行 exchanger 的exchange方法
try {
if (!this.closed()) {
final String data = exchanger.exchange(null);
System.out.println("received the data :" + data);
}
} catch (InterruptedException e) {
System.out.println(currentThread() + " receive the close sinnal .");
}
}
}
}
虽然注释①处,Generator线程进行了数据交换,但是它并不关心另外一个Consumer交换过来的数据。同样在注释②处直接使用null作为exchange的数据对象。
3.4 Semaphore工具详解
Semaphore(信号量)是一个线程同步工具,主要用于在一个时刻运行多个线程对共享资源进行并行操作的场景。使用Semaphore的过程实际上是多个线程获取访问共享资源许可证的过程。以下是Semaphore的内部处理逻辑。
- 如果此时Semaphore内部的计数器大于零,那么线程将可以获得小于该计数器数量的许可证,同时还会导致Semaphore内部的计数器减少锁发生的许可证数量。
- 如果此时Semaphore内部的计数器等于0 ,也就是没有没有可用的许可证,那么当前线程有可能会被阻塞(使用tryAcquire时不会被阻塞)
- 当线程不再使用许可证时,需要立即将其释放以供其他线程使用,所有建议将许可证的获取以及释放动作写在try finally语句中。
Semaphore基本流程如下:
3.4.1 Semaphore限制同时在线的用户数量
模拟系统登录,最多限制给定数量的人同时在线,如果所能申请的许可证不足,告知用户无法登陆。
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
/**
* @author study
* @version 1.0
* @date 2021/3/9 15:29
*/
public class SemaphoreExample1 {
public static void main(String[] args) {
//定义许可证数量,最多允许10个用户同时在线
final int MAX_PERMIT_LOGIN_ACCOUNT = 10;
final LoginService login = new LoginService(MAX_PERMIT_LOGIN_ACCOUNT);
//启动20个线程
IntStream.range(0, 20).forEach(i -> {
new Thread(() -> {
final boolean logins = login.login();
if (!logins) {
System.out.println(Thread.currentThread() + " is refused dule to exceed max online account.");
return;
}
try {
// 模拟登陆成功后的系统操作
simulateWork();
} catch (Exception e) {
} finally {
login.logout();
}
}, "User-" + i).start();
});
}
//随机睡眠
private static void simulateWork() {
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
} catch (InterruptedException e) {
}
}
private static class LoginService {
private final Semaphore semaphore;
public LoginService(int maxPermitLonginAccount) {
this.semaphore = new Semaphore(maxPermitLonginAccount, true);
}
public boolean login() {
//获取许可证,如果获取失败返回false,tryAcquire 不是一个阻塞方法
final boolean login = semaphore.tryAcquire();
if (login) {
System.out.println(Thread.currentThread() + " login success.");
}
return login;
}
//释放许可证
public void logout() {
semaphore.release();
System.out.println(Thread.currentThread() + " login out.");
}
}
}
3.4.2 使用Semaphore定义try lock
借助Semaphore提供的方法实现一个显示锁,该锁的主要作用是try锁,若获取不到锁就立即放回。
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class SemaphoreExample2 {
public static void main(String[] args) {
final TryLock tryLock = new TryLock();
//启动一个线程,尝试获取tryLock 如果获取不成功 则将进行其他的操作,该线程不进入阻塞
new Thread(() -> {
final boolean gotlock = tryLock.tryLock();
if (!gotlock) {
System.out.println(Thread.currentThread() + " can't get the lock ,will do other thing");
return;
}
try {
simulateWork();
} finally {
tryLock.unlock();
}
}).start();
//主线程也参与tryLock 的争抢,如果抢不到,则main 线程不会进去阻塞
final boolean b = tryLock.tryLock();
if (!b) {
System.out.println(Thread.currentThread() + " can't get the lock ,will do other thing");
} else {
try {
simulateWork();
} finally {
tryLock.unlock();
}
}
}
private static void simulateWork() {
try {
System.out.println(Thread.currentThread() + " get lock and do working ");
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
} catch (InterruptedException e) {
}
}
private static class TryLock {
//定义 permit 为1的Semaphore
private final Semaphore semaphore = new Semaphore(1);
public boolean tryLock() {
return semaphore.tryAcquire();
}
public void unlock() {
semaphore.release();
System.out.println(Thread.currentThread() + " release lock ");
}
}
}
3.4.3 Semaphore其他方法
1 Semaphore构造
- Semaphore(_int permits, boolean fair) _定义许可证的同时指定公平或者非公平同步器
2 tryAcquire
该方法尝试想Semaphore获取许可证,如果此时许可证的数量少于申请的数量,则对应的线程会立即返回,结果为false表示申请失败。
3 acquire
也是向Semaphore获取许可证,获取不到会一直阻塞。该方法允许被中断。
4 acquireUninterruptibly
acquireUninterruptibly:不仅在没有可用许可证的情况下执行等待,而且对“别人的劝阻”它还会无视,因此使用这类情况操作时必须小心。
5正确使用release
在一个Semaphore中,许可证的数量可用于控制在同一事件运行多个线程对共享资源进行访问,所以许可证的数量非常珍贵。因此当每一个线程对Semaphore许可证的使用之后立即将其释放,允许其他线程有机会争取许可证。
下面是Semaphore使用不当情况:
public class SemaphoreExample3 {
public static void main(String[] args) throws InterruptedException {
//定义只有一个许可证的semaphore
final Semaphore semaphore = new Semaphore(1, true);
final Thread t1 = new Thread(() -> {
try {
semaphore.acquire();
System.out.println("The thread t1 acquired permit form semaphore");
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
System.out.println("the thread t1 is Interrupted");
} finally {
semaphore.release();
}
});
t1.start();
//为了确保t1 启动 ,在主线程中 休眠 1 秒 等待
TimeUnit.SECONDS.sleep(1);
final Thread t2 = new Thread(() -> {
try {
//阻塞式获取一个许可证
semaphore.acquire();
System.out.println("The thread t2 acquired permit form semaphore");
} catch (InterruptedException e) {
System.out.println("the thread t1 is Interrupted");
} finally {
semaphore.release();
}
});
//启动 t2,休眠2s
t2.start();
TimeUnit.SECONDS.sleep(2);
//对线程t2 中断
t2.interrupt();
//主线程获取许可证
semaphore.acquire();
System.out.println("the main thread acquired permit.");
}
}
运行结果
The thread t1 acquired permit form semaphore
the thread t2 is Interrupted
the main thread acquired permit.
许可证书仅有一个,而且其已经被线程t1获取,为什么主线程还能获取到许可证?
问题是finally块导致的。当线程t2被其他线程中断或者自身原因出现异常的时候,它释放了不属于自己的许可证,导致Semaphore内部的可用许可证计数器增多,其他线程才有机会获取原本不属于它的许可证。
改进如下
final Thread t2 = new Thread(() -> {
try {
//阻塞式获取一个许可证
semaphore.acquire();
} catch (InterruptedException e) {
System.out.println("the thread t2 is Interrupted");
//出现异常则不再进行
return;
}
try {
System.out.println("The thread t2 acquired permit form semaphore");
} catch (Exception e) {
} finally {
semaphore.release();
}
});
3.4.5 Semaphore总结
Semaphore(信号量)是一个非常好的高并发工具类。它允许最多可以有多个线程同时对共享数据进行访问。
虽然Semaphore可以控制多个线程对共享资源进行访问,但是对于共享资源的临界区以及线程安全,Semaphore并不提供任何保证。
3.5 Phaser工具详解
CountDownLatch、CyclicBarrier、Exchanger、Semaphore这几个同步工具都是JDK1.5版本 引入的。而Phaser是JDK1.7版本才加入的。Phaser同样也是一个多线程的同步助手工具,它是一个可被重复使用的同步屏障,功能非常类似CyclicBarrier和CountDownLatch的合集。
3.5.1 Phaser的基本用法
CountDownLatch可以很好地控制等待多个子线程执行完任务,但是有一个缺点,那就是内部的计数器无法重置,也就是说CountDownLatch属于一次性的,使用结束后不能再次使用。CyclicBarrier倒是可以重复使用,但是一旦parties在创建的时候被指定,就无法再改变。Phaser则集合了两者的特点。
1 将Phaser当做CountDownLatch来使用
import java.util.Date;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class PhaserExample1 {
public static void main(String[] args) throws InterruptedException {
//①定义一个phaser ,并未指定分片数量parties,此时Phaser内部分片的数量parties 默认为0
//后面可以通过register 方法动态增加
final Phaser phaser = new Phaser();
for (int i = 0; i < 10; i++) {
//定义10个线程
new Thread(() -> {
//② 首先调用phaser的 register 方法使得phaser内部的parties加一
phaser.register();
try {
//采取随机休眠的方式模拟线程的运行开销
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
// ③ 线程结束 执行arrive方法
phaser.arrive();
System.out.println(new Date() + ":" + Thread.currentThread() + " completed the work.");
} catch (Exception e) {
e.printStackTrace();
}
}, "T" + i).start();
}
TimeUnit.SECONDS.sleep(5);
//④ 主线程也调用注册方法,此时 parties的数量为 11
phaser.register();
// ⑤ 主线程也 arrive,但是它要等待下一个阶段,等待下一个阶段的前提是所有的线程都 arrive,也就是phaser 的 unarrived数量为0;
phaser.arriveAndAwaitAdvance();
assert phaser.getRegisteredParties() == 11 : "total 11 parties is registered ";
System.out.println(new Date() + ": all of sub task completed work.");
}
}
2 将Phaser当作CyclicBarrier来使用
借助CyclicBarrier来完成CyclicBarrier来使用,即所有的子系统共同达到一个barrier point。
import java.util.Date;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* @author study
* @version 1.0
* @date 2021/3/12 10:48
*/
public class PhaserExample2 {
public static void main(String[] args) throws InterruptedException {
//定义一个分片parties为0的Phaser
final Phaser phaser = new Phaser();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
//子线程调用注册方法,当子线程都执行了 都执行了register,parties将尾10
phaser.register();
try {
//采取随机休眠的方式模拟线程的运行开销
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
//调用 arriveAndWaitAdvance 方法等待所有线程 arrive,然后继续前行
phaser.arriveAndAwaitAdvance();
System.out.println(new Date() + ":" + Thread.currentThread() + " completed the work.");
} catch (Exception e) {
e.printStackTrace();
}
}, "T" + i).start();
}
//休眠以确保其子线程顺利调用register方法
TimeUnit.SECONDS.sleep(5);
//主线程调用 register方法,此时 pahser 内部的 parties为11
phaser.register();
phaser.arriveAndAwaitAdvance();
System.out.println(new Date() + ": all of sub task completed work.");
}
}
3.6 Lock&ReentrantLock详解
JDK1.5版本之后增加了对显示锁的支持,显示锁Lock除了能够完成关键字synchronized的语义和功能之外,还提供了很多灵活方便的方法,比如,我们可以通过对显示锁对象提供的方法查看有哪些线程被阻塞,可以创建Condition对象进行线程间的通信,可以中断由于获取锁而被阻塞的线程,设置获取锁的超时时间等一些列synchronized关键字不具备的能力。
3.6.1 Lock及ReentrantLock方法详解
1 Lock接口方法
Lock接口是对锁操作方法的一个基本定义,它提供了synchronized关键字所具备的全部功能方法。另外我们可以借助Lock创建不同的Conditon对象进行多线程间的通信操作。与关键字synchronized进行方法同步代码块同步的方式不同,Lock提供了编程式的锁获取。
2 ReentrantLock扩展方法
在显示锁Lock接口诸多实现中,我们用的最多的就是ReentrantLock。该类完全实现了显示锁Lock接口所定义的接口,也扩展了对使用锁Lock的一些监控方法。
- getHoldCount:查询当前线程在某个lock上的数量,如果当前线程成功获取了Lock,那么该值大于等于1,如果没有获取到Lock的线程调用该方法,则返回值为0。
- isHeldByCurrentThread:判断当前线程是否持有某个Lock。由于Lock的怕他性,因此在某个时刻只有一个线程调用该方法返回true。
- isLocked:判断Lock是否已经被线程持有
- isFair:判断创建的ReentrantLock是否为公平锁
- hasQueuedThreads:在多个线程视图获取Lock的时候,只有一个线程能够正常获得,其他线程可能(如果使用tryLock 方法则不会进入阻塞)会进入阻塞,该方法的作用就是查询是否有线程正在等待获取锁。
- hasQueuedThread(Thread thread):在等待获取的线程中是否包含某个指定的线程。
- getQueueLength:返回当前有多少个线程正在等待获取锁。
3.6.2 正确使用显示锁Lock
无论是Lock接口还是synchronized关键字,主要是帮助我们解决多线程资源的竞争问题,也就是说在同一时刻只能有一个线程对共享资源进行访问,即排他性,另外就是确保若干指令执行的原子性。
1 确保已获取锁的释放
未获取到许可证permit的线程也可以调用Semaphore的release方法,使得当前的可用许可证permit数量增多,但是在lock中不存在这样的情况。
Lock被重入(多次获取),每一次的重入都会在hold计算器原来的数据基础上加一,显示锁lock需要程序员手动控制对锁的释放操作。lock被第二次获取之后只进行了一次unlock操作,这就导致当前线程对该锁的hold数量仍旧非0,因此并未完成对该锁的释放行为,进而导致其他线程无法获取该锁而处于阻塞状态,若程序出现这样的情况是非常危险的。因为匿名线程生命周期结束后,线程本身对象引用还被AQS的exclusiveOwnerThread所持有,但是线程本身已经死亡,这样一来就没有任何线程能够对当前锁进行释放,更谈不上获取了。
2 避免锁的交叉使用引起死锁
public class ReentrantLockExample2 {
private static final Lock lock1 = new ReentrantLock();
private static final Lock lock2 = new ReentrantLock();
private static void m1() {
lock1.lock();
System.out.println(Thread.currentThread() + " get lock1.");
try {
lock2.lock();
System.out.println(Thread.currentThread() + " get lock2.");
try {
} catch (Exception e) {
} finally {
lock2.unlock();
System.out.println(Thread.currentThread() + " release lock2.");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock1.unlock();
System.out.println(Thread.currentThread() + " release lock1.");
}
}
private static void m2() {
lock2.lock();
System.out.println(Thread.currentThread() + " get lock2.");
try {
lock1.lock();
System.out.println(Thread.currentThread() + " get lock1.");
try {
} catch (Exception e) {
} finally {
lock1.unlock();
System.out.println(Thread.currentThread() + " release lock1.");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock2.unlock();
System.out.println(Thread.currentThread() + " release lock2.");
}
}
public static void main(String[] args) {
new Thread(() -> {
while (true) {
m1();
}
}, "").start();
}
}
3 多个原子性方法的组合不能确保原子性
public class ReentrantLockExample3 {
public static void main(String[] args) {
//启动10个线程
final Accumulator accumulator = new Accumulator();
for (int i = 0; i < 10; i++) {
new AccumlatorThread(accumulator).start();
}
}
private static class AccumlatorThread extends Thread {
private final Accumulator accumulator;
public AccumlatorThread(Accumulator accumulator) {
this.accumulator = accumulator;
}
@Override
public void run() {
//不断地调用 addX addY
while (true) {
accumulator.addX();
accumulator.addY();
if (accumulator.getX() != accumulator.getY()) {
System.out.printf("The x:%d not equals y:%d\n", accumulator.getX(), accumulator.getY());
}
}
}
}
private static class Accumulator {
private static final Lock lock = new ReentrantLock();
private int x = 0;
private int y = 0;
void addX() {
lock.lock();
try {
x++;
} finally {
lock.unlock();
}
}
void addY() {
lock.lock();
try {
y++;
} finally {
lock.unlock();
}
}
int getX() {
lock.lock();
try {
return x;
} finally {
lock.unlock();
}
}
int getY() {
lock.lock();
try {
return y;
} finally {
lock.unlock();
}
}
}
}
3.7 ReadWriteLock & ReentrandReadWriteLock
3.7.4 读写锁总结
读写锁提供了非常好的思路和解决方案,旨在提高某个时刻都为读操作的并发吞吐量,但是从基准测试的结果来看性能不尽如人意,因此JDK1.8版本中引入了StampedLock的解决方案。
3.8 Condition详解
Condition对象是由某个显示锁Lock创建的,一个显示锁Lock可以创建多个Condition对象与之间关联,Condition的作用在于控制锁并且判断某个条件(临界值)是否满足,如果不满足,那么使用该锁的线程会被挂起等待另外的线程将其唤醒,与此同时被挂起的线程将会进入阻塞队列中并且释放对显示锁Lock的持有。
3.8.1 Condition
Condition接口提供了与传统线程间通信方式(对象monitor方法)更多的操作方法,Condition不能被创建,只能与某个显示锁Lock进行创建并且与之关联,下面的例子快速地实现一个例子体验Condition的用法
- 本例中将有两个线程分别对数据进行读写
- 当数据发生变化时,读取数据的线程才会对其进行读取和进一步的处理,当数据未发生变化时读数据的线程将会等待
当数据未被读取时,修改数据的线程将会进入阻塞队列,知道给数据被使用过后才会进一步地产生新的数据。 ```java public class ConditionExample { //定义共享数据 private static int shareData = 0; //定义布尔变量标识当前的共享数据是否已经被使用 private static boolean dataUsed = false; //创建显示锁 private static final Lock lock = new ReentrantLock();
//① 使用显示锁创建Condition 对象并且与之关联 private static final Condition condition = lock.newCondition();
//对数据的写操作 private static void change() { //获取锁,如果当前锁被其他线程持有,则当前线程会进入阻塞 lock.lock(); try {
//② 如果当前锁被其他线程持有,则当前线程会进入阻塞 while (!dataUsed) { condition.await(); } //修改数据 ,并且将 dataUsed 状态标识为 fasle、 TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5)); shareData++; dataUsed = false; System.out.println("product the new value :" + shareData); //③ 通知并唤醒 在wait中 的其他线程——数据使用线程 condition.signalAll();} catch (Exception e) {
e.printStackTrace();} finally {
lock.unlock();} }
//对数据进行使用 private static void use() { //获取锁,如果当前锁被其他线程持有,则当前线程会进入阻塞 lock.lock(); try {
// ④ 如果当前数据已经使用,则当前线程将进入wait队列,并且释放 lock while (dataUsed) { condition.await(); } //使用数据 ,并且将 dataUsed 状态标识为 true 、 TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5)); dataUsed = true; System.out.println("the shared data changed :" + shareData); // ⑤ 通知并唤醒wait 队列中的其他线程——修改数据 condition.signalAll();} catch (Exception e) {
e.printStackTrace();} finally {
lock.unlock();} }
public static void main(String[] args) { // 创建并启动两个匿名线程 new Thread(() -> {
for (; ; ) { change(); }}, “Producer”).start(); new Thread(() -> {
for (; ; ) { use(); }}, “Consumer”).start(); } }
上述代码中的关键点:
- shareData和dataUsed标识变量都是我们在该程序中的共享数据(资源),同时dataUsed也是临界值,数据一致性的保护主要是针对这两个变量的。
- 注释① ,创建了显示锁,该锁的作用主要是用于保护数据的一致性,然后使用该显示锁创建与之关联的Condition对象
- 在change方法中,首先获取对共享数据的访问权限,然后判断共享数据是否未被使用,如果还未被使用,那么当前线程将调用condition的await方法进入阻塞队列,以阻塞等待被其他线程唤醒,调用Condition的await方法之后,当前线程会释放对显示锁Lock的持有,由于我们使用两个线程进行操作,因此这里的while完全可以使用if来替代
- 当共享数据已经被使用,change方法会进一步地修改共享数据,然后将状态标识设置为false,并且通知其他线程(主要是数据使用线程)对进行使用
- 在use方法中,同样是首先获取对共享数据的访问权限(获取锁),然后判断共享数据是否已经被使用,如果数据已经被使用,那么当前线程会进入wait队列等待修改共享数据的线程将其唤醒
- 在注释⑤ 出,当正常使用了最新共享数据时,当前线程则会通知数据更新线程可以继续对数据进行修改了。
<a name="a18Ov"></a>
### 3.8.2 Condition接口方法详解
- void await_() _throws InterruptedException: 当前线程调用该方法会进入阻塞状态直到有其他线程对其进行唤醒,或者对当前线程执行中断。当线程执行了await方法进入阻塞时,当前线程会被加入阻塞队列中,并且释放对显示锁的持有,object monitor 的wait方法被执行后同样会加入一个虚拟的容器waitset(线程休息室)中,waitset是一个虚拟的概念,JVM规范并没有强制要求其采用什么样的数据结构,Condition的wait队列是由Java程序实现的FIFO队列。
- void awaitUninterruptibly_() :_该方法和await方法类似,只不过该方法比较固执,它会忽略对它的中断操作,一直等待有其他线程将它唤醒。
- awaitNanos_(_long nanosTimeout_) _throws InterruptedException : 调用该方法同样会使得当前线程进入阻塞状态,但是可以设定阻塞的最大等待时间,如果设定的时间内没有其他线程将其唤醒或被执行中断操作,那么当前线程将会等到设定到的纳秒时间后退出阻塞状态。
- awaitUntil_(_Date deadline_) _throws InterruptedException;
- signal :唤醒Condition阻塞队列中的一个线程,Condition的wait队列采用了Fifo的方式,因此在wait队列中,第一个进入阻塞队列的线程将会被首先唤醒,
- signalAll
<a name="wgzr0"></a>
### 3.8.3 使用Condition之生产者消费者
```java
public class ConditionExample5 {
//定义显示锁
private static final ReentrantLock lock = new ReentrantLock();
//创建与显示锁lock 关联的 Condition对象
private static final Condition condition = lock.newCondition();
//定义long 型的数据链表
private static final LinkedList<Long> list = new LinkedList<>();
//链表的最大容量
private static final int CAPACITY = 100;
//定义数据的初始值为0
private static long i = 0;
//生产者方法
private static void produce() {
//获取锁
lock.lock();
try {
//链表数据大于等于100 为一个临界值,当list中的数据量达到100时,生产者线程将被阻塞加入与 condition 关联的wait队列中
while (list.size() >= CAPACITY) {
condition.await();
}
//链表的数据量不足100时,生产新的数据
i++;
//数据放到列队尾部
list.addLast(i);
System.out.println(Thread.currentThread().getName() + "->" + i);
//① 通知其他线程
condition.signalAll();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
private static void consume() {
lock.lock();
try {
while (list.isEmpty()) {
condition.await();
}
System.out.println(Thread.currentThread().getName() + "->" + list.removeFirst());
//② 通知其他线程
condition.signalAll();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
private static void sleep() {
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
} catch (InterruptedException e) {
}
}
public static void main(String[] args) {
//启动10个生产者
IntStream.range(0, 10).forEach(i -> {
new Thread(() -> {
for (; ; ) {
produce();
sleep();
}
}, "Producer-" + i).start();
});
//启动5个消费者
IntStream.range(0, 5).forEach(i -> {
new Thread(() -> {
for (; ; ) {
consume();
sleep();
}
}, "Consumer-" + i).start();
});
}
}
上面的程序虽然能够正常运行,但是仍然有一些不足之处,比如注释①②,此刻唤醒动作唤醒的是与Condition关联的注释队列中所有的阻塞线程。由于我们使用的是唯一的一个Condition的实例,因此生产者唤醒的有可能是与Condition关联的wait队列中的生产者,假设当前生产者被唤醒后抢到了CPU的调度而获得执行权,但是又发现队列以满再次进入阻塞。这样的上下文开销是没有意义的,甚至会影响性能(多线程下的线程上下文开销其实非常大的性能损耗,一般针对高并发程序的调优就是减少上下文切换发生的概率)。
如何优化?使用两个Condition对象,一个用于队列已满临界值条件的处理,另外一个用于对队列为空的临界值条件的处理,这样一来,在生产者中唤醒的阻塞线程只能是消费者线程,在消费者中唤醒的也只能是生产者线程。
//创建与显示锁lock 关联的 Condition对象
private static final Condition FULL_CONDITION = lock.newCondition();
private static final Condition EMPTY_CONDITION = lock.newCondition();
//生产者方法
private static void produce() {
//获取锁
lock.lock();
try {
//链表数据大于等于100 为一个临界值,当list中的数据量达到100时,生产者线程将被阻塞加入与 condition 关联的wait队列中
while (list.size() >= CAPACITY) {
FULL_CONDITION.await();
}
//链表的数据量不足100时,生产新的数据
i++;
//数据放到列队尾部
list.addLast(i);
System.out.println(Thread.currentThread().getName() + "->" + i);
//① 通知其他线程 |生产者唤醒消费者线程
EMPTY_CONDITION.signalAll();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
private static void consume() {
lock.lock();
try {
while (list.isEmpty()) {
EMPTY_CONDITION.await();
}
System.out.println(Thread.currentThread().getName() + "->" + list.removeFirst());
//② 通知其他线程|消费者唤醒生产者线程
FULL_CONDITION.signalAll();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
3.9 StampedLock详解
3.9.1 读写锁的饥饿写问题
饥饿写:所谓的饥饿写是指在使用读写锁的时候,读线程的数量远远大于写线程的数量,导致锁长时间被读线程霸占,写线程无法获得对数据进行写操作的权限从而进入饥饿状态。
针对这样的问题,JDK1.8版本中引入了StampedLock,该锁是由一个long类型的数据戳(Stamp)和三种模型构成,当获取锁的时候会返回一个long类型的数据戳,该数据戳用于进行稍后的锁释放参数,,如果返回的数据戳为0,则表示获取锁失败,同时StampedLock还提供了一种乐观锁的操作方式。
需要注意的是StampedLock不是可重入的,每一次对StampedLock锁的获取都会生成一个数据戳,即使当前线程在获取了该锁的情况下再次获取也会返回一个全新的数据戳,因此如果使用不当会出现死锁。
3.9.2 StampedLock的使用
StampedLock被JDK1.8版本引入之后,成为了Lock家族的新宠,它几乎具备了ReentrantReadWriteLock和ReentrantLock这两种类型锁的所有功能。
1 替代ReentrantLock
public class StampedLockExample1 {
//共享数据
private static int shareData = 0;
//定义锁
private static final StampedLock lock = new StampedLock();
public static void inc() {
final long stamp = lock.writeLock();
try {
//修改共享数据
shareData++;
} finally {
lock.unlockWrite(stamp);
}
}
public static int get() {
//获取锁并记录数据戳
final long stamp = lock.writeLock();
try {
return shareData;
} finally {
lock.unlockRead(stamp);
}
}
}
在ReentrantLock锁不存在读写分离,因此上面的示例代码中的读写方法都是使用lock.writeLock进行锁的获取。
2 替换ReentrantReadWriteLock
public static int get() {
//获取锁并记录数据戳
final long stamp = lock.readLock();
try {
return shareData;
} finally {
lock.unlockRead(stamp);
}
}
StampedLock也提供了读锁和写锁这两种模式,因此StampedLock天生就支持读写分离锁的使用,上面的代码只是在StampedLockExample1基础上对get稍作修改即可完成。
3 乐观读模式
乐观读模式:使用tryOptimisticRead()方法获取一个非排他锁并且不会进入阻塞状态,与此同时该模式依然会返回一个long型的数据戳用于接下来的验证(该验证主要用来判断共享资源是否有写操作发生)。
public class StampedLockExample3 {
//共享数据
private static int shareData = 0;
//定义锁
private static final StampedLock lock = new StampedLock();
public static void inc() {
final long stamp = lock.writeLock();
try {
//修改共享数据
shareData++;
} finally {
lock.unlockWrite(stamp);
}
}
public static int get() {
//注释 ①
long stamp = lock.tryOptimisticRead();
//注释 ②
if (!lock.validate(stamp)) {
// 注释 ③
stamp = lock.readLock();
try {
return shareData;
} finally {
lock.unlockRead(stamp);
}
}
return shareData;
}
}
- 乐观读模式的使用方法也是非常简单,首先调用tryOptimisticRead(注释① ,该方法会立即返回方法,并不会导致当前进入阻塞等待) 方法进行乐观读操作,同样该方法也会返回一个long型的数据戳,如果获取成功,则数据戳非0,如果失败,则数据戳为0.
- get方法首先进行了一次乐观读的获取并且立即返回一个数据戳,但仅就这样的操作是不足以立即将数据返回的,这会导致数据出现不一致的情况,具体说明如下:
- 假设调用乐观读返回的数据戳为零,则代表其他线程正在对共享资源进行写操作,也就是说其他线程获取了对该共享资源的写权限。
- 假设调用乐观读返回的数据戳为非零,紧接着又有其他线程立即获取了对共享资源的写操作。
基于以上两点,我们还需要对数据戳进行校验后才能决定对共享资源进行阻塞式的读还是将其立即返回,(注释②),使用StampedLock的validate方法可以判断上述两种情况是否发生。
- 如果上述两种情况已经发生,则进行读锁的获取操作,此时若有其他线程对共享线程进行写操作,则当前线程会进入阻塞等待直到获取到读锁。
- 如果注释①处获取的读锁验证通过,则直接返回共享数据(注释④),不进行任何同步操作,这样的话可以对共享数据进行无锁读操作了,即提高了共享资源并发读取的能力。
3.10 Guava之Monitor详解
3.10.1 Monitor及Guard
import com.google.common.util.concurrent.Monitor;
public class MonitorExample1 {
//定义Monitor对象
private static Monitor monitor = new Monitor();
//共享数据
private static int x = 0;
//定义临界值,共享数据的值不能超过MAX_VALUE
public static final int MAX_VALUE = 10;
//定义Guard并实现isSatisfied方法
private static final Monitor.Guard INCR_WHEN_LESS_10 = new Monitor.Guard(monitor) {
//该方法相当于我们在写对象监视器或者 Condition 时的临界值判读逻辑
@Override
public boolean isSatisfied() {
return x < MAX_VALUE;
}
};
// 注释①
public static void main(String[] args) throws InterruptedException {
while (true) {
//注释②
monitor.enterWhen(INCR_WHEN_LESS_10);
try {
x++;
System.out.println(Thread.currentThread().getName() + " :x value is" + x);
} finally {
//释放③
monitor.leave();
}
}
}
}
无论是使用对象监视器的wait notify notifyAll还是Condition的await signal signallAll方法调用,我们首先都会对共享数据的临界值进行判断,当条件满足或不满足才会调用相关方法使得当前线程挂起,或者唤醒wait队列/set中的线程,因此对共享数据临界值的判断非常关键,Guava的Monitor工具提供了一种将临界值判断抽取成Guard的处理方式,可以很方便地定义若干个Guard也就是临界值的判断,以及对临界值判断的重复使用,除此之外Monitor还具备synchronized关键字和Lock的完整语义。
运行上面的代码,,当临界值条件不满足时,当前线程将会进入阻塞状态。
3.11 Guava之RateLimiter详解
RateLimiter,顾名思义就是速率(Rate)限流器(Limiter),事实上它的作用正如名字描述的那样,经常用来进行流量、访问等的限制。ReateLimter关注的是在单位时间里对资源操作速率的概念,可以理解为在单位时间内允许颁发的许可证数量。
3.11.1 ReateLimiter的基本使用
假设我们允许某个方法在单位时间内(1秒)被调用0.5次,也就是说该方法的访问速率为0.5/秒,即2秒内只允许有一次对该方法的访问操作。
import com.google.common.util.concurrent.RateLimiter;
/**
* @author MI
* @version 1.0
* @date 2021/3/13 21:58
*/
public class RateLimiterExample1 {
//定义一个 Rate Limiter
private static RateLimiter rateLimiter = RateLimiter.create(0.5);
public static void main(String[] args) {
for (; ; ) {
testRateLimiter();
}
}
//该方法只能每2秒执行一次
private static void testRateLimiter() {
//在访问时间之前首先要进行 rateLimiter的获取,返回值为实际的获取等待开销时间
final double elapsedSecond = rateLimiter.acquire();
System.out.println(Thread.currentThread().getName() + ": elapsed seconds: " + elapsedSecond);
}
}
上面的程序采用单线程的方式对RateLimiter进行操作。即便是多线程的情况下,也能将方法的访问速率控制到0.5次/秒。
虽然说RateLimiter主要是用于控制速率的,但是在其内部也是有许可证的概念,甚至可以将其理解为在单位时间内颁发的许可证数量,RateLimiter不仅允许每次获取一个许可证的操作,还运行获取操作许可证数量的行为,只不过后者的操作将是的下一次请求为提前的透支付出代价。
public class RateLimiterExample2 {
//定义单位时间(1秒)的速率或者可用的许可证数量
public static final RateLimiter rateLimiter = RateLimiter.create(2.0d);
public static void main(String[] args) {
//第一次申请4个,这样会透支下一次请求的时间
System.out.println(rateLimiter.acquire(4));
System.out.println(rateLimiter.acquire(2));
System.out.println(rateLimiter.acquire(2));
System.out.println(rateLimiter.acquire(2));
}
}
0.0(透支)
1.996958
0.997764
0.999901
3.11.2 RateLimiter的限流操作——漏桶算法

上图是漏桶算法
- 无论漏桶进水速率如何,漏桶的出水速率永远都是固定的。
- 如果漏桶中没有水流,则出水口不会有水流出。
- 漏桶有一定的水容量
- 如果流入水量超过漏桶容量,则水会溢出(降权处理)

import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.lang3.builder.ToStringBuilder;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
/**
* @author MI
* @version 1.0
* @date 2021/3/13 22:44
*/
//限速漏桶
public class RateLimiterBucket {
//定义漏桶的上沿容量
public static final int BUCKET_CAPACITY = 1000;
//漏桶采用 线程安全的容器,
public final ConcurrentLinkedQueue<Request> bucket = new ConcurrentLinkedQueue<>();
//定义漏桶下沿水流速率,每秒均匀放行10个request
private final RateLimiter rateLimiter = RateLimiter.create(10.0D);
//提交请求是需要 的Monitor
private final Monitor requestMonitor = new Monitor();
//处理请求时需要用到的Monitor
private final Monitor handleMonitor = new Monitor();
public void sumbitRequest(int data) {
this.sumbitRequest(new Request(data));
}
public void sumbitRequest(Request request) {
//① 当漏桶容量未溢出时
if (requestMonitor.enterIf(requestMonitor.newGuard(() -> bucket.size() < BUCKET_CAPACITY))) {
try {
//在漏桶中加入 新的 request
final boolean result = bucket.offer(request);
if (result) {
System.out.println(Thread.currentThread().getName() + " submit request " + request.getData() + " successfully!");
} else {
//produce into MQ and will try again later
}
} finally {
requestMonitor.leave();
}
} else {
// 当漏桶溢出的时候做降权处理
System.out.println("the request :" + request.getData() + " will be down-dimensional handle due to bucker is overflow");
//produce into MQ and will try again later
}
}
//该方法主要从漏桶中云速地处理相关请求
public void handleRequest(Consumer<Request> consumer) {
//若漏桶中存在请求,则处理
if (handleMonitor.enterIf(handleMonitor.newGuard(() -> !bucket.isEmpty()))) {
try {
//③ 匀速处理
rateLimiter.acquire();
//处理数据
consumer.accept(bucket.poll());
} finally {
handleMonitor.leave();
}
}
}
//一个简单的请求类
static class Request {
private final int data;
public Request(int data) {
this.data = data;
}
public int getData() {
return data;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("data", data)
.toString();
}
}
}
模拟对该限速漏桶的使用
public class RateLimiterExample3 {
private static final AtomicInteger data = new AtomicInteger(0);
private static final RateLimiterBucket bucket = new RateLimiterBucket();
public static void main(String[] args) {
//启动10个线程
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (true) {
bucket.sumbitRequest(data.getAndIncrement());
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
}
}).start();
}
//启动10个线程 模拟匀速地对漏桶请求处理
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (true) {
bucket.handleRequest(System.out::println);
}
}).start();
}
}
}
3.11.3 令牌桶算法
令牌桶环与漏桶比较类似,漏桶对水流进入的速度不做任何限制,它只对水流出去的速率有严格控制的,令牌环桶,在对某个资源或者方法进行调用之前首先要获取到令牌也就是获取到许可证才能进行相关的操作,否则将不被允许。比如,常见的互联网秒杀抢购等,商品的数量有限,为了防止大量的并发请求流量进入系统后导致普通商品的消费出现影响,我们需要对类似这样的操作增加令牌授权,许可证放行等操作。
- 根据固定的速率向桶里提交请求
- 新加数据时如果超过了桶的容量,则请求将会被拒绝
- 如果令牌不足,则请求也会被拒绝(请求可以再次尝试)
以下是一个模拟商品抢购的方式。
import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* @author MI
* @version 1.0
* @date 2021/3/14 9:15
*/
//令牌环桶 模拟商品抢购
public class RateLimiterTokenBucket {
//当前活动商品数量
public static final int MAX = 100;
//订单编号,订单成功之后会产生一个新的订单
private int orderID;
//单位时间内只允许10个用户能够抢购到商品,也就是说订单服务会被匀速地调用
private final RateLimiter rateLimiter = RateLimiter.create(10.0D);
private Monitor bookOrderMonitor = new Monitor();
//商品售罄的时候抛出异常
static class NoProductionException extends Exception {
public NoProductionException(String message) {
super(message);
}
}
static class OrderFailedException extends Exception {
public OrderFailedException(String message) {
super(message);
}
}
// 前台用户下单,但是只允许云速地进行订单服务调用
public void bookOrder(Consumer<Integer> consumer) throws NoProductionException, OrderFailedException {
//如果当前商品有库存则进行抢购操作
if (bookOrderMonitor.enterIf(bookOrderMonitor.newGuard(() -> orderID < MAX))) {
try {
//抢购商品,最多等待100毫秒
if (!rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {
//如果在100 毫秒抢购失败,抛出订购失败异常,客户端可以重新尝试操作
throw new OrderFailedException("book order failed,please try again later.");
}
//执行订单订购操作
orderID++;
consumer.accept(orderID);
} finally {
bookOrderMonitor.leave();
}
} else {
//当前商品已经没有库存,则抛出没有商品的异常,该异常不会再次进行尝试
throw new NoProductionException("No avaliable production now.");
}
}
}
抢购
public class RateLimiterExample4 {
private static final RateLimiterTokenBucket tokenBucket = new RateLimiterTokenBucket();
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
new Thread(() -> {
while (true) {
//抢购商品
try {
tokenBucket.bookOrder(proID -> System.out.println("user:" + Thread.currentThread().getName() + " book the prod order and prodID:" + proID));
} catch (RateLimiterTokenBucket.NoProductionException e) {
//商品已经售罄,退出抢购
System.out.println("all of production already sold out");
break;
} catch (RateLimiterTokenBucket.OrderFailedException e) {
//抢购失败,重新尝试抢购
System.out.println("user:" + Thread.currentThread().getName() + " book order failed,will try again");
}
}
}).start();
}
}
}
4 Java并发包之并发容器详解
4.1 链表
4.1.1.1 基本的链表
所谓链表,实际上就是线性表的链式存储方式,有别于数据连续式内存空间,链表并不是连续的内存存储结构。在链表的每一个节点中,至少包含两个属性:数据本身和指向下一个节点的引用或者指针。
根据链表节点元素的不同访问形式就可以演化出栈,即最先进入链表结构的元素最后一个被访问(FIFO),还可以演化出队列,即最先进入链表结构的元素第一个被访问,;此外链表元素节点中多增一个指针属性就可以演化出二叉树。上图是单向链表,链表还包含双向链表,循环链表等。
下面实现一个简单的链表结构。
@Getter
public class Node<T> {
//数据属性
private final T value;
//指向下一个节点的引用
private final Node<T> next;
public Node(T value, Node<T> next) {
this.value = value;
this.next = next;
}
}
1 链表的构造
链表中有一个非常重要的元素Head,它代表当前节点元素的引用,当链表被初始化时,当前节点属性指向null。如下图 空链表
//当前节点引用
private Node<E> header;
public MyList() {
//当前元素节点为指向NULL的属性
header = null;
}
2 链表数据的清空以及是否为空的判断
有了当前节点的引用,在确认链表是否为空,或进行链表清空等操作就非常容易了,无需对链表中的整个元素进行判断,只需要针对链表当前节点的引用进行相关的操作即可。
//判断当前链表是否为空
public boolean isEmpty() {
//只需要判断当前节点引用是否为null即可
return header == null;
}
//清空 链表中的所有元素
public void clear(){
//显示设定当前size=0
this.size=0;
//将当前节点引用设置为null即可,由于其他元素ROOT不可达,因此在稍后的垃圾回收中将被回收
this.header=null;
}
3 向头部增加元素
在链表中增加元素,相对于在数组中的操作来说,是非常灵活且简单的(无需进行数组拷贝),只需要更改当前节点元素的引用即可实现。
public void add(E e){
//定义新的节点node,并且将其next引用指向当前节点所引用的header
final Node<E> node = new Node<>(e, header);
this.header=node;
this.size++;
}
4 链表的peekFirst操作
peek操作不会对当前链表产生任何副作用,其只是简单地返回当前链表头部的数据。
public E peekFirst() {
// 如果为空则直接抛出异常
if (isEmpty()) {
throw new IndexOutOfBoundsException("the linked list is empty");
}
//返回当前节点的元素数据
return header.getValue();
}
5 链表元素的弹出操作

public E popFirst(){
if (isEmpty()){
throw new IndexOutOfBoundsException("the linked list is empty");
}
//获取当前节点数据,作为方法的最终返回值
final E value = header.getValue();
//将链表的 当前引用指向当前节点的下一个
this.header=header.getNext();
//元素减一
this.size--;
return value;
}
6 其他方法
//获取当前链表的元素个数
public int getSize() {
return size;
}
@Override
public String toString() {
Node<E> node = this.header;
final StringBuilder builder = new StringBuilder("[");
while (node != null) {
builder.append(node.getValue().toString()).append(",");
node = node.getNext();
}
if (builder.length() > 1) {
builder.deleteCharAt(builder.length() - 1);
}
builder.append("]");
return builder.toString();
}
7 简单测试
public static void main(String[] args) {
final MyList<Integer> list = new MyList<>();
list.add(1);
list.add(2);
list.add(3);
list.add(4);
list.add(5);
//对list进行测试
System.out.println(list);
System.out.println(list.size);
System.out.println(list.peekFirst());
System.out.println("================================");
System.out.println(list.popFirst());
System.out.println(list.size);
System.out.println(list);
System.out.println(list.peekFirst());
System.out.println(list.popFirst());
System.out.println(list.popFirst());
System.out.println(list.popFirst());
System.out.println(list.popFirst());
System.out.println("================================");
System.out.println(list.isEmpty());
System.out.println(list.size);
System.out.println(list);
System.out.println("================================");
}
4.1.2 优先级链表
在某些场景下,我们需要对队列或者栈中的元素根据某些特定的顺序进行排序。
public void add(E e) {
//定义新的节点,其指向下一个节点的引用为null
final Node<E> newNode = new Node<>(e);
//当前链表节点引用
Node<E> currentNode = this.header;
//上一个节点的引用,初始为null,稍后的计算会得到
Node<E> previous = null;
//循环遍历,
while (currentNode != null && e.compareTo(currentNode.getValue()) > 0) {
//前一个节点为当前节点
previous = currentNode;
currentNode = currentNode.getNext();
}
if (previous == null) {
//链表为空,链表的当前节点引用直接作为新构造的节点
this.header = newNode;
} else {
previous.setNext(newNode);
}
//新的下一个节点为current
newNode.setNext(currentNode);
this.size++;
}
//增加泛型约束,每一个被加入都必须实现 Comparable
public class MyPriorityList<E extends Comparable<E>> {
/**
* Node节点的泛型类型同样增加了香瓜约束,并且取缔了value和next字段不可变的特性
*/
private static class Node<T extends Comparable<T>> {
private T value;
private Node next;
public Node(T value, Node next) {
this.value = value;
this.next = next;
}
public Node(T value) {
this(value, null);
}
public T getValue() {
return value;
}
public void setValue(T value) {
this.value = value;
}
public Node getNext() {
return next;
}
public void setNext(Node next) {
this.next = next;
}
}
private Node<E> header;
private int size;
//增加了Comparator接口属性
private final Comparator<E> comparator;
public MyPriorityList(Comparator<E> comparator) {
this.comparator = Objects.requireNonNull(comparator);
this.header = null;
}
public void add(E e) {
//定义新的节点,其指向下一个节点的引用为null
final Node<E> newNode = new Node<>(e);
//当前链表节点引用
Node<E> currentNode = this.header;
//上一个节点的引用,初始为null,稍后的计算会得到
Node<E> previous = null;
//循环遍历,
while (currentNode != null && e.compareTo(currentNode.getValue()) > 0) {
//前一个节点为当前节点
previous = currentNode;
currentNode = currentNode.getNext();
}
if (previous == null) {
//链表为空,链表的当前节点引用直接作为新构造的节点
this.header = newNode;
} else {
previous.setNext(newNode);
}
//新的下一个节点为current
newNode.setNext(currentNode);
this.size++;
}
//判断当前链表是否为空
public boolean isEmpty() {
//只需要判断当前节点引用是否为null即可
return header == null;
}
public E peekFirst() {
// 如果为空则直接抛出异常
if (isEmpty()) {
throw new IndexOutOfBoundsException("the linked list is empty");
}
//返回当前节点的元素数据
return header.getValue();
}
public E popFirst() {
if (isEmpty()) {
throw new IndexOutOfBoundsException("the linked list is empty");
}
//获取当前节点数据,作为方法的最终返回值
final E value = header.getValue();
//将链表的 当前引用指向当前节点的下一个
this.header = header.getNext();
//元素减一
this.size--;
return value;
}
//获取当前链表的元素个数
public int getSize() {
return size;
}
@Override
public String toString() {
Node<E> node = this.header;
final StringBuilder builder = new StringBuilder("[");
while (node != null) {
builder.append(node.getValue().toString()).append(",");
node = node.getNext();
}
if (builder.length() > 1) {
builder.deleteCharAt(builder.length() - 1);
}
builder.append("]");
return builder.toString();
}
public static void main(String[] args) {
final MyPriorityList<Integer> list = new MyPriorityList<>((o1, o2) -> o1 - o2);
list.add(45);
System.out.println(list);
System.out.println("================================");
list.add(456);
list.add(4);
list.add(48);
list.add(500);
System.out.println(list);
System.out.println(list.popFirst());
System.out.println(list);
}
}
4.1.3 跳表
对链表结构改造,增加多个层级进行数据存储和查找,这种以空间换时间的思路能够加快元素的查询速度。
在增加了多个层级的链表中查找元素76这个元素?首先到最高一层的链表中查找并且对比,可以发现100大于76;接下来到第二层发现33小于76,并且发现3的下一个元素100大于76 因此来到下一个层级;对此33的下一个元素就是76,这正是我们需要的。
跳表(skipList)正是受这种多层链表的想法启发而设计出来的,实际上,按照上面生成链表的方式来看,上面每一层链表的节点个数,都会是下面一层节点个数一半左右,这样的查找过程非常类似二分法查找,使得查找的时间复杂度可以降低到 另外,跳表中的元素在插入是已经根据排序规则进行排序,在查找元素时无需再进行排序。
2 跳表(skipList)的实现
跳表中维护的属性
//定义头部节点属性
private Node head;
//定义尾部节点属性
private Node tail;
//元素个数
private int size;
//定义跳表层高
private int height;
//随机数,主要用于随机的方式决定元素应该被放在第几层
private Random random;
public SimpleSkipList() {
//初始化头部和尾部节点
this.head = new Node(null, HEAD_BIT);
this.tail = new Node(null, TAIL_BIT);
//头部节点的右边为尾部节点
this.head.right = tail;
//尾部节点的左边为头部节点
tail.left = head;
random = new Random(System.currentTimeMillis());
}

向跳表中添加元素的方法:首先需要为新元素找到合适的存放位置或者邻近的节点,在跳表中,最低的一层链表存放着全量的元素。因此想要找到合适的位置,是要从最高一层的head节点开始向下一层查询,从而为新的节点找打合适的位置。
private Node find(Integer element) {
//从头节点开始寻找
Node current = head;
for (; ; ) {
//当前节点的右节点不是尾结点,并且当前节点的右节点数据小于element
while (current.right.bit != TAIL_BIT && current.right.value <= element) {
//继续朝右前行
current = current.right;
}
//当 current 节点存在donw节点
if (current.down != null) {
//开始向下一层
current = current.down;
} else {
//到达最高层,终止循环
break;
}
}
return current;
}

假设我们要为新的数据元素23找打一个合适的位置,过程如下:
- 从head节点开始,head.right!=尾结点,并且head.right.vaule(21)<23.
- 在while循环中尝试向右前行,发现21节点的右节点是尾结点,因此退出while,
- 在第四层的节点21 继续下移至第三层。
- 在while循环中,节点21右边的节点虽然不是尾结点,但是21.right.value(211)>23,因此不满足向右迁移的条件
- 在第三层的节点21继续向下移动至第二层
- 第二层的逻辑同第三层
- 在第一层发现21.right.value(54)>23,因此不会继续前行,另外一层中,21.down=null,所以终止for循环。
public void add(Integer element) {
//根据element找到合适它的存储位置,也就是邻近的节点,需要注意的是,此刻节点在整个跳表的第一层
Node nearNode = this.find(element);
//定义一个新的节点
Node newNode = new Node(element);
//新节点的左节点为nearNode
newNode.left = nearNode;
//新节点的右节点为nearNode.right,相当于将新节点插入到了nearNode,和nearNode中间
newNode.right = nearNode.right;
nearNode.right.left = newNode;
nearNode.right = newNode;
//当前层级为0,代表最底层一层
int currentLevel = 0;
//根据随即判断是否将新节点放到新的层级,
while (random.nextDouble() < 0.5d) {
//如果currentLevels大于整个跳表的层高,则需要为跳表增加一层链表
if (currentLevel >= height) {
height++;
//定义新层高 head 和 tail
Node dumyHead = new Node(null, HEAD_BIT);
Node dumyTail = new Node(null, TAIL_BIT);
//指定新层高head和tail 的关系
dumyHead.right = dumyTail;
dumyHead.down = head;
head.up = dumyHead;
dumyTail.left = dumyHead;
dumyTail.down = tail;
tail.up = dumyTail;
head = dumyHead;
tail = dumyTail;
}
//在新的一层中增加 element 节点,同样要维护上下左右的关系
while ((nearNode != null) && nearNode.up == null) {
nearNode = nearNode.left;
}
nearNode = newNode.up;
Node upNode = new Node(element);
upNode.left = nearNode;
upNode.right = nearNode.right;
upNode.down = nearNode;
nearNode.right.left = upNode;
nearNode.right = upNode;
nearNode.up = upNode;
newNode = upNode;
currentLevel++;
}
size++;
}
4.1.4 跳表总结
ConcurrentSkipListMap 和ConcurrentSkipListSet,其内部的主要数据结构就是跳表。
4.2 BlockingQueue(阻塞队列)
所谓的Blocking Queue是指其中的元素数据存在界限,当队列已满时(队列元素数量达到了最大容量的临界值),对队列进行写入操作的线程将被阻塞挂起,当队列为空,对队列进行读操作线程将被挂起。
实际上,BlockingQueue(LinkedTransferQueue除外)的内部实现主要依赖于显示锁Lock以及与之关联的Condition。因此本节所涉及的BlockingQueue的实现都是线程安全的。
4.2.1 ArrayBlockingQueue
ArrayBlockingQueue是一个基于数组结构实现的FIFO阻塞队列,在构造该阻塞队列时需要指定队列中最大元素的数量(容量)。当队列已满时,若再次进行数据写入操作,则线程将会进入阻塞,一直等其他线程对元素进行消费。当队列为空时,对该队列的消费线程将会进入阻塞,直到其他线程写入数据。
1 阻塞式写法
无论是阻塞式写还是非阻塞式写都不允许写入null.
- void put(E e) : 向队列的尾部插入新的数据,当队列已满时调用该方法线程会进入阻塞,直到有其他线程对该线程执行了中断操作,或者队列中的元素被其他线程消费
- boolean offer(E e, long timeout, TimeUnit unit): 向队列尾部写入新的数据,当队列已满时执行该方法的线程在指定的时间单位内将进入阻塞,直到到了指定的超时时间后,或者再此期间有其他线程对队列数据进行了消费。
2 非阻塞式写法
- boolean add(E e) :向队列尾部写入新的数据,当队列已满时不会进入阻塞,但是该方法会抛出队列已满的异常。
- boolean offer(E e):向队列尾部写入新的数据,当队列已满是不会进入阻塞,并立即返回false。
3 阻塞式读方法
- E take():从队列头部获取数据,并且该数据会从队列头部移除,当队列为空时会执行take方法进入阻塞,直到有其他线程写入新的数据。或者当前线程执行了中断操作。
- E poll(long timeout, TimeUnit unit) : 从队列头部获取数据并且该数据会从队列头部移除,如果队列中没有任何元素时执行该方法,当前线程会阻塞指定的时间,直到再此期间有新的数据写入,或者阻塞的当前线程被其他线程中断,当前线程由于超时退出阻塞,返回null。
4 非阻塞式读
- E poll() :从队列头部获取数据并且从队列头部移除,当前队列为空时,该方法不会进入阻塞,而返回null。
- E peek() peek的操作类似于debug操作,它直接从队列头部获取一个数据,但是并不能从队列头部移除数据。当队列为空该方法不会进入阻塞,而是返回null值。
5 生产者模式
//生产者 消费者模式
final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
//启动11个生产数据的线程,想队列尾写入数据
IntStream.rangeClosed(0, 10).boxed()
.map(i -> new Thread("P-Thread-" + i) {
@Override
public void run() {
while (true) {
try {
final String data = String.valueOf(System.currentTimeMillis());
queue.put(data);
System.out.println(Thread.currentThread().getName() + " produce data:" + data);
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
} catch (InterruptedException e) {
System.out.println("reveived the interrupted SINGAL");
break;
}
}
}
}).forEach(Thread::start);
//定义11个消费者,从队列头部移除数据
IntStream.rangeClosed(0, 10).boxed()
.map(i -> new Thread("C-Thread-" + i) {
@Override
public void run() {
while (true) {
try {
final String data = queue.take();
System.out.println(Thread.currentThread().getName() + " consume data:" + data);
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
} catch (InterruptedException e) {
System.out.println("reveived the interrupted SINGAL");
break;
}
}
}
}).forEach(Thread::start);
4.2.2 PriorityBlockingQeueue
PriorityBlockingQueue优先级阻塞队列是一个无边界的阻塞队列,该队列会根据某种规则(Comparator)对插入队列尾部的元素进行排序,因此该队列将不会遵守FIFO的约束。该队列也是线程安全的类,适用于高并发多线程的情况下。
1排序且无边界的队列
只要应用程序的内存足够使用,理论上,PriorityBlockingQueue存放数量的数据是无边界的,在其内部维护了一个Object的数组,随着数量的不断增多,该数组也会进行动态的扩容,
2 不存在阻塞写方法
由于“PriorityBlockingQueue”是无边界的队列,因此将不存在在队列上限临界值的控制。添加数据元素的所有方法都等价于offer方法,从队列的尾部添加数据,但是该数据会根据排序规则对数据进行排序。
3 优先级队列读方法
和ArrayBlockingQueue类似
4.2.3 LinkedBlockingQeque
LinkedBlockingQueue是可选边界,基于链表实现的FIFO队列,
使用上,LinkedBlockingQueue和ArrayBlockingQueue极其相似。
4.2.6 LinkedBlockingDeque
LinkedBlockingDeque是一个基于链表实现双向阻塞队列,双向阻塞队列支持在队尾写入数据,读取移除数据;在对头写入数据,读取移除数据。
4.3 ConcurrentQueue(并发队列)
在绝大多数的BlockingQueue中,为了保护共享数据的一致性,需要对共享数据的操作进行加锁处理(显示锁或者synchronized),为了使得操作线程挂起和唤醒,我们需要借助对象监视器的wait/notify/notifyAll或者与显示锁关联的Condition。
Java自1.5版本后,实现了无锁且线程安全的并发队列。
- ConcurrentLinkedQueue: 无锁的,线程安全的,性能高效的,基于链表结构实现的FFIO单向队列(JDK1.5版本引入)
- ConcurrentLinkedDeque:无锁的,线程安全的,性能高效的,基于链表结构实现的双向队列(JDK1.7版本引入)
4.3.2 并发队列在使用中需要注意的问题
虽然并发队列在高并发多线程的环境中有着优异的性能表现,但是如果对其使用不当不仅对性能没有任何提升反倒会降低整个系统的运行效率。
1 在并发队列中使用size方法不是一个好主意
每一个Collection都提供了size方法用来获取Collection中的元素个数,但是并发队列中不是一个明智的操作
- 首先,并发队列是基于链表的结构实现的,并且在其内部并未提供类似的计数器变量,因此想要获取当前队列中元素个数,需要遍历整个队列计算才能得到(效率低下)。
- 其次并发队列采用无锁的算法实现,因此在执行size获取元素的数量同时,其他线程也可以对该队列进行读写操作,所以返回的数值不会是一个精确值,而是一个近视值。
2ConcurrentLinkedQueue的内存泄漏问题
ConcurrentLinkedQueue在执行remove方法删除元素的时候还会出现性能越来越低下,甚至内存泄漏问题,
4.4 ConcurrentMap(并发映射)
4.4.1 ConcurrentHashMap
ConcurrentHashMap是专门为多线程高并发场景设计的Map,它个get操作基本上是lock-free,同时put方法又将锁的粒度控制在很小范围内,因此它非常适合于多线程的应用程序中。
1 JDK1.8版本以前的ConcurrentHashMap内部结构
在JDK1.6 1.7版本中,ConcurrentHashMap采用的是分段锁机制(可以确保线程安全的同时最小化锁的粒度)实现并发的更新操作,在ConcurrentHashMap中包含两个核心的警惕爱内部类Segment和HashEntry,前者是实现RenntrantLock的显示锁,每一个Segment锁对象均可用于同步每个散列映射表的若干个桶,后者主要用于存储映射表的键值对。与此同时,若干个HashEntry通过链表结构形成了HashBucket,而最终的ConcurrentHashMap则是若干个(默认16个)Segment对象数组构成的。
Segment可用于实现减小锁的粒度,ConcurrentHashMap被分割成若干个Segment,在put的时候只需要锁住一个Segment即可,而在get的时候则干脆不加锁,而是使用volatile属性来保证其他线程同时修改后的可见性。
2 JDK1.8版本ConcurrentHashMap的内部结构
在JDK1.8版本中几乎重构了ConcurrentHashMap的内部结构,摒弃了segment的实现方式,直接使用table数组存储键值对,在JDK1.6中,每个bucket总键值对组织方式都是单向链表,查询复杂度是O(n),JDK1.8中当链表长度超过时,链表转换成红黑树,查询复杂度可以降低到O(logn) 改进了性能。利用CAS+Synchronized可以保证并发更新的安全性,底层采用数组+链表+红黑树(提高检索效率)的存储结构。
4.4.2 oncurrentSkipListMap
ConcurrentSkipListMap提供了一种线程安全的并发访问的排序映射表,内部是SkipList(链表)结构实现,在理论上,其能够在O(log(n))内完成查找,插入,删除操作。调用ConcurrentSkipListMap的size时,由于多个线程可以同时对映射表进行操作,所以需要遍历整个链表才能返回元素个数,这个操作是个O(LOG(n))的操作。
在读取性能上,虽然ConcurrentSkipListMap不能与CconcurrentHashMap相提并论,但是ConcurrentSkipListMap存在着如下两大天生的优势是CconcurrentHashMap不具备的。
- 由于基于跳表的数据结构,因此ConcurrentSkipListMap是有序的。
- ConcurrentSkipListMap 支持更高的并发,在线程数量一定的情况下,并发的线程越多,ConcurrentSkipListMap越能体现优势。
5 Java并发包之ExecutorService详解
建议直接使用JDK提供的线程池解决方案,主要有以下几点原因
- Java自带的线程池结局方案足够优秀,能满足大多数开发者的需求。
- 随着JDK版本的不断升级,相信这些工具也会不断更新优化或者添入更多的特性
5.1 Executor&ExecutorService详解
Executor接口继承至Executor接口,并且提供了更多用于任务提交和管理的一些方法,比如停止任务的执行等。
5.1.1 ThreadPoolExecutor详解
ThreadPoolExecutor是ExecutorService最为重要、最常见的一个实现之一,
5 拒绝策略
当线程池中没有空闲的工作线程,并且任务队列已满时,新的任务被执行拒绝策略。
ThreadPoolExecutor提供了四种拒绝策略
- DiscardPolicy:丢弃策略,任务会直接无视丢弃而等不到执行,因此该策略需要慎用。
- AbortPolicy:中止策略,在线程池中使用该策略,在无法处理任务时会抛出拒绝执行异常
- DiscardOldestPolicy: 丢弃任务队列中最老任务。
- CallerRunsPolicy: 调用者线程执行策略,该策略不会导致新任务的丢失,但是任务会在当前线程中被阻塞的执行,也就是说任务不会由线程池中的工作线程执行。
5.5 CompletableFuture详解
JDK1.8引入的新的Future,常用于异步编程之中,所谓异步编程,简单来说就是:“程序运算与应用程序主线程在不同的线程上完成,并且程序运算的线程能够向主线程通知其进度,以及成功与失败与否的非阻塞式编码方式”,
5.5.1 CompletableFuture的基本用法
CompletableFuture首先是一个Future。
public static void main(String[] args) {
//double 类型的 CompletableFuture
final CompletableFuture<Double> completableFuture = new CompletableFuture();
//提交异步任务
Executors.newCachedThreadPool().submit(() -> {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println("finished");
//执行结束
completableFuture.complete(1234.5D);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//非阻塞获取异步任务的计算结果,和宁县,此刻异步任务未执行结束,那么可以采用默认值的方式
//(该方法也可以被认为是放弃异步任务的执行结果,但不会取消异步任务的执行 )
try {
assert completableFuture.get() == 1234.5D;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
5.5.2 任务的异步运行
CompletableFuture除了具备Future的基本特性之外,还可以直接使用它执行异步任务。通常情况下,任务的类型为Suppiler和Runnable,前者非常类似于Callable接口,可返回指定类型的运算结果,后者仍然只是关注异步任务运行本身。
异步执行Supplier
可以直接调用CompletableFuture的静态方法supplyAsync异步执行Supplier类型的任务。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 354);
// 另一个重载 方法,运行传入 ExecutorService
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 354, Executors.newCachedThreadPool());
异步执行Runnable类型的任务
也可以直接调用CompletableFuture的静态方法runAsync 异步执行Runnable类型的任务。
CompletableFuture.runAsync(() -> {
System.out.println("async task .");
});
5.5.3 异步任务链
CompletableFuture还允许将执行的异步任务结果继续交由下一级任务来执行,下一任务还可以有下一次,以此类推,这样就可以形成一个异步任务链或者任务pipeline。
thenApply :以同步的方式继续处理上一个异步任务的结果
/**
* 以同步的方式继续执行上一个异步任务的结果
* supplyAsync 的 计算结果为 Java
* thenApply 继续处理 “Java” ,返回字符串的长度
* supplyAsync和thenApply 的任务执行是同一个线程
*/
static void thenApply() throws ExecutionException, InterruptedException {
final ExecutorService executor = Executors.newFixedThreadPool(3);
final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " supplyAsync");
return "Java";
}, executor).thenApply(e -> {
System.out.println(Thread.currentThread().getName() + " thenApply");
System.out.println(Thread.currentThread().getName() + " " + e);
return e.length();
});
System.out.println(future.get());
}
thenApplyAsync: 以异步的方式继续处理上一个异步任务的结果
/**
* thenApplyAsync 的计算结果为“Java”
* thenApplyAsync 继续处理“Java”,返回字符串的长度
*/
static void thenApplyAsync() throws ExecutionException, InterruptedException {
final ExecutorService executor = Executors.newFixedThreadPool(3);
final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " supplyAsync");
return "Java";
}, executor).thenApplyAsync(e -> {
System.out.println(Thread.currentThread().getName() + " thenApplyAsync");
System.out.println(Thread.currentThread().getName() + " " + e);
return e.length();
});
System.out.println(future.get());
}
thenAccept : 以同步的方式消费上一个异步任务的结果
/**
* thenAccept : 以同步的方式消费上一个异步任务的结果
*/
static void thenAccept() throws ExecutionException, InterruptedException {
final ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " supplyAsync");
return "Java";
}, executor).thenAccept(e -> {
System.out.println(Thread.currentThread().getName() + " thenAccept");
System.out.println(Thread.currentThread().getName() + " " + e);
});
executor.shutdown();
}
thenAcceptAsync 以异步的方式消费上一个异步任务的结果
/**
* thenAcceptAsync 以异步的方式消费上一个异步任务的结果
*/
static void thenAcceptAsync() throws ExecutionException, InterruptedException {
final ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " supplyAsync");
return "Java";
}, executor).thenAcceptAsync(e -> {
System.out.println(Thread.currentThread().getName() + " thenAcceptAsync");
System.out.println(Thread.currentThread().getName() + " " + e);
});
executor.shutdown();
}
在任务链的末端,如果执行的任务既不想对上一个任务的输出做进一步处理,又不想消费上一个任务的输出结果。那么可以使用thenRun或者thenRunSync方法来执行Runnable任务。
thenRun 以异步的而方式执行Runable任务
static void thenRun() throws ExecutionException, InterruptedException {
final ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " supplyAsync");
return "Java";
}, executor).thenAcceptAsync(e -> {
System.out.println(Thread.currentThread().getName() + " thenAcceptAsync");
System.out.println(Thread.currentThread().getName() + " " + e);
}).thenRunAsync(() -> {
System.out.println("all of task completed." + Thread.currentThread().getName());
});
}
5.5.4合并多个Future
CompletableFuture还允许将若干个Future合并成一个Future的使用方式,可以通过thenCompose方法或者thenCombine方法来实现多个Future的合并
thenCompose 方法示例
void thenCompose() {
//通过thenCompose将两个Future合并成一个Future
final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Java")
//s 为上一个Future的计算结果
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " scala"));
//合并后的Future通过thenApply方法组成任务链
completableFuture.thenApply(String::toUpperCase)
.thenAccept(System.out::println);
}
thenCombine 方法示例
void thenCombine() {
final CompletableFuture<String> thenCombine = CompletableFuture.supplyAsync(() -> "Java")
.thenCombine(CompletableFuture.supplyAsync(() -> " scala"),
//s1为第一个Future计算的结果,s2为第二个Future计算的结果
(s1, s2) -> s1 + s2);
thenCombine.thenApply(String::toUpperCase)
.thenAccept(System.out::println);
}
5.5.5 多Future的并行计算
如果想多个独立CompletableFuture同时并行计算,可以借助allOf来完成,其类似于ExecutorService的invokeAll批量提交异步任务。
/**
* 多Future的并行计算
*/
void allOf() throws ExecutionException, InterruptedException {
//定义三个CompletableFuture
final CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Java");
final CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Parallel");
final CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "Future");
//批量执行 ,返回值是一个 void 类型的CompletableFuture
final CompletableFuture<Void> future = CompletableFuture.allOf(f1, f2, f3).thenRun(() -> {
try {
System.out.println(f1.isDone() + " and result " + f1.get());
System.out.println(f2.isDone() + " and result " + f2.get());
System.out.println(f3.isDone() + " and result " + f3.get());
} catch (Exception e) {
e.printStackTrace();
}
});
//阻塞等待运行结束
future.get();
}
5.5.8 CompletableFuture总结
自JDK1.8以来,CompletableFuture的引入不仅很好地填充了Future的不足之处,还提供了非常遍历的异步编程方式,借助于CompletableFuture,我们可以很轻易地开发出异步运行的代码,甚至不用关心地城线程的维护和管理,只需要关注代码函数本身即可。
6 Java Stream详解
Java8中的Stream不是容器,他并不是用来存储数据的,而是对JDK中的Collection的一个增强,它专注于对集合对象既便利又高效的聚合操作。而不仅支持串行的操作功能,而且还可以借助于JDK1.7中的Fork-Join机制支持并行模式,开发者无需编写任何一行相关的代码,就能高效方便地写出高并发的程序,尤其是在当下多核CPU时代,最大程度地利用CPU的超快计算能力。
6.1 Stream介绍以及基本操作
在JDK1.8中,Stream为容器的使用提供了新的方式,它允许我们通过陈述式的编码风格对容器中的数据进行分组、过滤、计算、排序、聚合循环等操作。
6.2 Collector在Stream中的使用
Collector在Stream中的主要用途大致分为如下三项:
- Reduce和Summarizing Stream中的元素到一个单一的新的输出
- 对Stream中的元素进行分组(Groping)
- 对Stream中的元素进行分区(Partioning)
6.2.1 Collector
public class Production {
private final String name;
private final double price;
public Production(String name, double price) {
this.name = name;
this.price = price;
}
public String getName() {
return name;
}
public double getPrice() {
return price;
}
}
1 Reduce和Summarizing Stream操作
final Stream<Production> stream = Stream.of(
new Production("T-shirt", 43.34d),
new Production("cloth", 99.99d),
new Production("shoe", 123.8d),
new Production("hat", 26.5d),
new Production("cloth", 199.99d),
new Production("shoe", 32.5d));
//过滤,只保留衣服元素并返回一个新的 stream
final Double totalPrice = stream.filter(p -> p.getName().equals("cloth"))
.collect(Collectors.summingDouble(Production::getPrice));
//
final double cloth = stream.filter(p -> p.getName().equals("cloth"))
.mapToDouble(Production::getPrice)
.sum();
2 简单分组操作
final List<Production> list = Arrays.asList(new Production("T-shirt", 43.34d),
new Production("cloth", 99.99d),
new Production("shoe", 123.8d),
new Production("hat", 26.5d),
new Production("cloth", 199.99d),
new Production("shoe", 32.5d));
final Map<String, Double> prodPrice = new HashMap<>();
for (Production p : list) {
final String name = p.getName();
final double price = p.getPrice();
if (prodPrice.containsKey(name)) {
final Double aDouble = prodPrice.get(name);
prodPrice.put(name, aDouble + price);
} else {
prodPrice.put(name, price);
}
}
final Map<String, Double> collect = stream.collect(Collectors.groupingBy(
//分组
Production::getName,
//
Collectors.summingDouble(Production::getPrice)
));
