函数式变成可以参考https://www.yuque.com/u21082124/awctwe
五、增强的Future:CompletableFuture
CompletableFuture是Java8新增的一个大类,实现了Future和CompletionStage接口。
1、完成了就通知我
CompletableFuture除了可以像Future一样作为函数调用的契约,还可以手动设置CompletableFuture的完成状态
public class AskThread implements Runnable {CompletableFuture<Integer> re = null;public AskThread(CompletableFuture<Integer> re) {this.re = re;}@Overridepublic void run() {int myRe = 0;try {myRe = re.get() * re.get();} catch (Exception e) {e.printStackTrace();}System.out.println(myRe);}public static void main(String[] args) throws InterruptedException {CompletableFuture<Integer> completableFuture = new CompletableFuture<>();new Thread(new AskThread(completableFuture)).start();Thread.sleep(1000); // 模拟计算过程completableFuture.complete(60); // 当计算完成后,将最终数据(60)载入其中,并标记为完成状态}}
2、异步执行任务
public static Integer calc(Integer para) {try {Thread.sleep(1000);} catch (InterruptedException ignored) {}return para * para;}public static void main(String[] args) {final CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> calc(50)); // 异步执行,内部使用线程池实现,会立即返回try {System.out.println(integerCompletableFuture.get()); // 获取不到结果会阻塞} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}
类似的工厂方法:
其中supplyAsync()方法用于需要返回值的场景,而runAsync()则是用在没有返回值的场景
两种方法都可以接收一个Executor参数,如果不指定,则在默认的系统公共的ForkJoinPool.common线程池中执行
ForkJoinPool.commonPool()是java8新增的方法,其中所有的线程都是daemon的。
3、流式调用
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> calc(50)).thenApply((i) -> Integer.toString(i)) // 同步方法,将上一函数的结果做处理.thenApply((str) -> "\"" + str + "\"").thenAccept(System.out::println); // 消费上一函数返回的方法,但没有返回值voidCompletableFuture.get(); // 阻塞等待处理结果,不然会随着main线程退出而退出}
4、异常处理
CompletableFuture提供了一个异常处理的方法exceptionally()
public static void main(String[] args) {CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> calc(50)).exceptionally(ex -> {System.out.println(ex.toString());return 0;}).thenApply(integer -> Integer.toString(integer)).thenApply((str) -> "\"" + str + "\"").thenAccept(System.out::println);try {voidCompletableFuture.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
如上面第4行代码的处理。
5、组合多个CompletableFuture
CompletableFuture允许我们将多个CompletableFuture进行整合,一种方法是thenCompose。
方法签名如下:
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
一个CompletableFuture可以在执行完后,将执行结果通过Function接口传递给下一个CompletionStage实例进行处理
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> calc(50)).thenCompose((i)->CompletableFuture.supplyAsync(()->calc(i))).thenApply((i) -> Integer.toString(i)).thenApply((str) -> "\"" + str + "\"").thenAccept(System.out::println);voidCompletableFuture.get();}
另一种组合方法是:thenCombine()方法,签名如下:
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
它首先要完成CompletableFuture和other的执行,将其执行结果传递给BiFunction。
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> calc(50)).thenCombine(CompletableFuture.supplyAsync(() -> calc(25)), Integer::sum).thenApply((i) -> Integer.toString(i)).thenApply((str) -> "\"" + str + "\"").thenAccept(System.out::println);voidCompletableFuture.get();}
6、支持timeout的CompletableFuture
在JDK9以后增加了timeout功能
详细请见P306
六、读写锁的改进:StampedLock
读写锁虽然分离了读和写功能,但是读和写之间依然是冲突的。而StampedLock则提供了一种乐观读策略。这种乐观锁非常类似于无锁的操作,使得乐观锁完全不会阻塞线程。
1、StampedLock使用示例
public class Point {private double x, y;private final StampedLock sl = new StampedLock();void move(double deltaX, double deltaY) {long stamp = sl.writeLock(); // 写锁try {x += deltaX;y += deltaY;} finally {sl.unlockWrite(stamp);}}/*** 只读方法* @return*/double distanceFromOrigin() {// 尝试一次乐观读,返回一个类似于时间戳的邮戳整数stamp,可以作为这一次获得锁的凭证long stamp = sl.tryOptimisticRead();double currentX = x, currentY = y;// 判断这个stamp是否有被修改过if (!sl.validate(stamp)) {stamp = sl.readLock(); // 升级锁的级别将其升级为悲观锁,进一步读取数据try {currentX = x;currentY = y;} finally {sl.unlockRead(stamp);}}return Math.sqrt(currentX * currentX + currentY * currentY);}}
2、StampedLock小陷阱
在StampedLock挂起线程时,使用的是Unsafe.park()函数。而这个函数遇到中断时,会直接返回。所以在StampedLock中,如果阻塞在park()方法上的线程被中断后,会再次进入循环。疯狂占用我们的CPU。
示例代码请见:P309
3、有关StampedLock的实现思想
StampedLock的实现是基于CLH锁的。它是一种自旋锁,他保证没有饥饿发生,并且可以保证FIFO的服务顺序
CLH锁的基本思想:
锁维护一个等待线程队列,所有申请锁但是没有成功的线程都记录在这个队列中。每一个节点(一个节点代表一个线程)保存一个标志位(locked),用于判断当前线程是否已经释放锁。
当一个线程试图获得锁时,取得当前等待队列的尾部节点作为前序节点,并使用while循环来判断是否已经成功释放锁。
如果前序节点没有释放锁,则表示当前线程还不能继续执行,会自旋等待。反之,如果已经释放,则当前线程可以继续执行。
释放锁时,也是这个逻辑。如果自身节点locked标记为FALSE,那么后续等待的线程就可以继续执行了。
具体实现需要自行阅读源码或阅读相关资料。
4、原子类的增强
Java 8 引入了LongAdder类,也使用了CAS指令。
1、更快原子类
之前章节讲到的原子类,都是在一个死循环内,不断尝试修改目标值。如果竞争激烈,会进行很多次循环尝试,导致性能受到影响。
有一种基本方案就是使用热点分离,将竞争数据进行分解。比如可以仿造ConcurrentHashMap,将AtomicInteger的内部核心数据value分离成一个数组。
LongAdder并不会一开始就进行数组处理,而是将变量存在一个称为base的变量中。如果在多线程中,发生修改冲突,就会初始化cell数组,使用心的策略。如果使用cell数组更新后,发现在某个cell上更新依然冲突,那么系统就会尝试新的cell,或者将cell的数量加倍。
LongAdder使用了@sun.misc.Contended注释来避免伪共享。当然我们在自己的代码中也可以使用此注解。但是需要加虚拟机参数:-XX:-RestrictContended
2、LongAdder的增强版:LongAccumulator
LongAccumulator与LongAdder有共同的父类Striped64。因此其内部优化方式与LongAdder是一样的。但是LongAccumulator是对LongAdder的功能扩展。LongAdder只是对给定的整数进行一次加法,而LongAccumulator则可以实现任意函数操作。
构造:public LongAccumulator_(_LongBinaryOperator accumulatorFunction,long identity_) _
第一个参数accumulatorFunction就需要一个二元函数(接收两个long型参数并返回long),第二个参数是初始值。
大致使用方法为:
LongAccumulator accumulator = new LongAccumulator(Long::max, Long.MIN_VALUE);// 过滤最大值for(......){new Thread(()->{Random random = new Random();long value = random.nextLong();accumulator.accumulate(value); // 通过此方法传入accumulator内部})}
