函数式变成可以参考https://www.yuque.com/u21082124/awctwe

五、增强的Future:CompletableFuture

CompletableFuture是Java8新增的一个大类,实现了Future和CompletionStage接口。

1、完成了就通知我

CompletableFuture除了可以像Future一样作为函数调用的契约,还可以手动设置CompletableFuture的完成状态

  1. public class AskThread implements Runnable {
  2. CompletableFuture<Integer> re = null;
  3. public AskThread(CompletableFuture<Integer> re) {
  4. this.re = re;
  5. }
  6. @Override
  7. public void run() {
  8. int myRe = 0;
  9. try {
  10. myRe = re.get() * re.get();
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. }
  14. System.out.println(myRe);
  15. }
  16. public static void main(String[] args) throws InterruptedException {
  17. CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
  18. new Thread(new AskThread(completableFuture)).start();
  19. Thread.sleep(1000); // 模拟计算过程
  20. completableFuture.complete(60); // 当计算完成后,将最终数据(60)载入其中,并标记为完成状态
  21. }
  22. }

2、异步执行任务

  1. public static Integer calc(Integer para) {
  2. try {
  3. Thread.sleep(1000);
  4. } catch (InterruptedException ignored) {
  5. }
  6. return para * para;
  7. }
  8. public static void main(String[] args) {
  9. final CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> calc(50)); // 异步执行,内部使用线程池实现,会立即返回
  10. try {
  11. System.out.println(integerCompletableFuture.get()); // 获取不到结果会阻塞
  12. } catch (InterruptedException | ExecutionException e) {
  13. e.printStackTrace();
  14. }
  15. }

类似的工厂方法:
image.png
其中supplyAsync()方法用于需要返回值的场景,而runAsync()则是用在没有返回值的场景
两种方法都可以接收一个Executor参数,如果不指定,则在默认的系统公共的ForkJoinPool.common线程池中执行

ForkJoinPool.commonPool()是java8新增的方法,其中所有的线程都是daemon的。

3、流式调用

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> calc(50))
  3. .thenApply((i) -> Integer.toString(i)) // 同步方法,将上一函数的结果做处理
  4. .thenApply((str) -> "\"" + str + "\"")
  5. .thenAccept(System.out::println); // 消费上一函数返回的方法,但没有返回值
  6. voidCompletableFuture.get(); // 阻塞等待处理结果,不然会随着main线程退出而退出
  7. }

4、异常处理

CompletableFuture提供了一个异常处理的方法exceptionally()

  1. public static void main(String[] args) {
  2. CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> calc(50))
  3. .exceptionally(ex -> {
  4. System.out.println(ex.toString());
  5. return 0;
  6. })
  7. .thenApply(integer -> Integer.toString(integer))
  8. .thenApply((str) -> "\"" + str + "\"")
  9. .thenAccept(System.out::println);
  10. try {
  11. voidCompletableFuture.get();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. } catch (ExecutionException e) {
  15. e.printStackTrace();
  16. }
  17. }

如上面第4行代码的处理。

5、组合多个CompletableFuture

CompletableFuture允许我们将多个CompletableFuture进行整合,一种方法是thenCompose。
方法签名如下:

  1. public <U> CompletableFuture<U> thenCompose(
  2. Function<? super T, ? extends CompletionStage<U>> fn)

一个CompletableFuture可以在执行完后,将执行结果通过Function接口传递给下一个CompletionStage实例进行处理

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> calc(50))
  3. .thenCompose((i)->CompletableFuture.supplyAsync(()->calc(i)))
  4. .thenApply((i) -> Integer.toString(i))
  5. .thenApply((str) -> "\"" + str + "\"")
  6. .thenAccept(System.out::println);
  7. voidCompletableFuture.get();
  8. }

另一种组合方法是:thenCombine()方法,签名如下:

  1. public <U,V> CompletableFuture<V> thenCombine(
  2. CompletionStage<? extends U> other,
  3. BiFunction<? super T,? super U,? extends V> fn)

它首先要完成CompletableFuture和other的执行,将其执行结果传递给BiFunction。

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> calc(50))
  3. .thenCombine(CompletableFuture.supplyAsync(() -> calc(25)), Integer::sum)
  4. .thenApply((i) -> Integer.toString(i))
  5. .thenApply((str) -> "\"" + str + "\"")
  6. .thenAccept(System.out::println);
  7. voidCompletableFuture.get();
  8. }

6、支持timeout的CompletableFuture

JDK9以后增加了timeout功能

详细请见P306

六、读写锁的改进:StampedLock

读写锁虽然分离了读和写功能,但是读和写之间依然是冲突的。而StampedLock则提供了一种乐观读策略。这种乐观锁非常类似于无锁的操作,使得乐观锁完全不会阻塞线程。

1、StampedLock使用示例

  1. public class Point {
  2. private double x, y;
  3. private final StampedLock sl = new StampedLock();
  4. void move(double deltaX, double deltaY) {
  5. long stamp = sl.writeLock(); // 写锁
  6. try {
  7. x += deltaX;
  8. y += deltaY;
  9. } finally {
  10. sl.unlockWrite(stamp);
  11. }
  12. }
  13. /**
  14. * 只读方法
  15. * @return
  16. */
  17. double distanceFromOrigin() {
  18. // 尝试一次乐观读,返回一个类似于时间戳的邮戳整数stamp,可以作为这一次获得锁的凭证
  19. long stamp = sl.tryOptimisticRead();
  20. double currentX = x, currentY = y;
  21. // 判断这个stamp是否有被修改过
  22. if (!sl.validate(stamp)) {
  23. stamp = sl.readLock(); // 升级锁的级别将其升级为悲观锁,进一步读取数据
  24. try {
  25. currentX = x;
  26. currentY = y;
  27. } finally {
  28. sl.unlockRead(stamp);
  29. }
  30. }
  31. return Math.sqrt(currentX * currentX + currentY * currentY);
  32. }
  33. }

2、StampedLock小陷阱

在StampedLock挂起线程时,使用的是Unsafe.park()函数。而这个函数遇到中断时,会直接返回。所以在StampedLock中,如果阻塞在park()方法上的线程被中断后,会再次进入循环。疯狂占用我们的CPU。
示例代码请见:P309

3、有关StampedLock的实现思想

StampedLock的实现是基于CLH锁的。它是一种自旋锁,他保证没有饥饿发生,并且可以保证FIFO的服务顺序

CLH锁的基本思想:
锁维护一个等待线程队列,所有申请锁但是没有成功的线程都记录在这个队列中。每一个节点(一个节点代表一个线程)保存一个标志位(locked),用于判断当前线程是否已经释放锁。
当一个线程试图获得锁时,取得当前等待队列的尾部节点作为前序节点,并使用while循环来判断是否已经成功释放锁。
如果前序节点没有释放锁,则表示当前线程还不能继续执行,会自旋等待。反之,如果已经释放,则当前线程可以继续执行。
释放锁时,也是这个逻辑。如果自身节点locked标记为FALSE,那么后续等待的线程就可以继续执行了。
image.png

具体实现需要自行阅读源码或阅读相关资料。

4、原子类的增强

Java 8 引入了LongAdder类,也使用了CAS指令。

1、更快原子类

之前章节讲到的原子类,都是在一个死循环内,不断尝试修改目标值。如果竞争激烈,会进行很多次循环尝试,导致性能受到影响。

有一种基本方案就是使用热点分离,将竞争数据进行分解。比如可以仿造ConcurrentHashMap,将AtomicInteger的内部核心数据value分离成一个数组。
LongAdder并不会一开始就进行数组处理,而是将变量存在一个称为base的变量中。如果在多线程中,发生修改冲突,就会初始化cell数组,使用心的策略。如果使用cell数组更新后,发现在某个cell上更新依然冲突,那么系统就会尝试新的cell,或者将cell的数量加倍。

LongAdder使用了@sun.misc.Contended注释来避免伪共享。当然我们在自己的代码中也可以使用此注解。但是需要加虚拟机参数:-XX:-RestrictContended

2、LongAdder的增强版:LongAccumulator

LongAccumulator与LongAdder有共同的父类Striped64。因此其内部优化方式与LongAdder是一样的。但是LongAccumulator是对LongAdder的功能扩展。LongAdder只是对给定的整数进行一次加法,而LongAccumulator则可以实现任意函数操作。

构造:
public LongAccumulator_(_LongBinaryOperator accumulatorFunction,long identity_) _
第一个参数accumulatorFunction就需要一个二元函数(接收两个long型参数并返回long),第二个参数是初始值。

大致使用方法为:

  1. LongAccumulator accumulator = new LongAccumulator(Long::max, Long.MIN_VALUE);
  2. // 过滤最大值
  3. for(......){
  4. new Thread(()->{
  5. Random random = new Random();
  6. long value = random.nextLong();
  7. accumulator.accumulate(value); // 通过此方法传入accumulator内部
  8. })
  9. }