http://img11.360buyimg.com//n0/jfs/t1/9457/10/1147/146908/5bcd3149Ee45f979e/e5b85ad450838567.jpg
https://item.jd.com/12458866.html
实战Java高并发程序设计 - 图1
代码位置 spthymeleaf->com.study.book.gym

1.2 几个概念

1.2.2 并发(Concurrency)和并行(Parallelism)

并行和并发,都可以表示两个或者多个任务一起执行,但是侧重点有所不同。并发侧重于多个任务交替执行,而多个任务之间有可能是串行的,而并行是真正意义上的“同时执行”

1.5.4 哪些指令不能重排:Happen-Before

以下这些原则是指令重排不可违背的。

  • 程序顺序原则:一个线程内保证语义的串行性。
  • volatile规则:volatile变量的写先于读发生,这保证了volatile变量的可见性。
  • 锁规则:解锁必然发生在随后的加锁前
  • 传递性:A先于B,B先于C,那么A必然先于C
  • 线程的start方法必然先于它的每一个动作
  • 线程的所有操作先于线程的线程的终结(Thread.join)
  • 线程的中断(interrupt)先于被中断的代码

3 JDK并发包

3.1 多线程团队协作:同步控制

3.1.1 关键字synchronized的功能扩展:重入锁

重入锁可以完全替代关键字synchronized。在JDK5.0的早期版本中,重入锁的性能远远优于关键字synchronized,但从JDK6.0开始,JDK在关键字synchronized上做了大量的优化,使得两者的性能差距并不大。

  1. import java.util.concurrent.locks.ReentrantLock;
  2. public class ReenterLock implements Runnable {
  3. private static final ReentrantLock lock = new ReentrantLock();
  4. public static int i = 0;
  5. @Override
  6. public void run() {
  7. for (int j = 0; j < 1_000_000; j++) {
  8. lock.lock();
  9. try {
  10. i++;
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. } finally {
  14. lock.unlock();
  15. }
  16. }
  17. }
  18. public static void main(String[] args) throws InterruptedException {
  19. final ReenterLock tl = new ReenterLock();
  20. final Thread t1 = new Thread(tl);
  21. final Thread t2 = new Thread(tl);
  22. t1.start();
  23. t2.start();
  24. t1.join();
  25. t2.join();
  26. System.out.println(i);
  27. }
  28. }

除了使用上的灵活性以外,重入锁还提供了一些高级功能。比如重入锁可以提供中断处理的能力。

  1. 中断响应
  1. import java.util.concurrent.TimeUnit;
  2. import java.util.concurrent.locks.ReentrantLock;
  3. public class IntLock implements Runnable {
  4. private static final ReentrantLock lock1 = new ReentrantLock();
  5. private static final ReentrantLock lock2 = new ReentrantLock();
  6. int lock;
  7. /**
  8. * 控制加锁顺序 ,方便构造死锁
  9. *
  10. * @param lock
  11. */
  12. public IntLock(int lock) {
  13. this.lock = lock;
  14. }
  15. @Override
  16. public void run() {
  17. try {
  18. if (lock == 1) {
  19. lock1.lockInterruptibly();
  20. TimeUnit.SECONDS.sleep(1);
  21. lock2.lockInterruptibly();
  22. } else {
  23. lock2.lockInterruptibly();
  24. TimeUnit.SECONDS.sleep(2);
  25. lock1.lockInterruptibly();
  26. }
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. System.out.println(Thread.currentThread().getName() + ":线程中断");
  30. } finally {
  31. if (lock1.isHeldByCurrentThread()) {
  32. lock1.unlock();
  33. }
  34. if (lock2.isHeldByCurrentThread()) {
  35. lock2.unlock();
  36. }
  37. System.out.println(Thread.currentThread().getName() + ":线程退出");
  38. }
  39. }
  40. public static void main(String[] args) throws InterruptedException {
  41. final IntLock r1 = new IntLock(1);
  42. final IntLock r2 = new IntLock(2);
  43. final Thread t1 = new Thread(r1);
  44. final Thread t2 = new Thread(r2);
  45. t1.start();
  46. t2.start();
  47. TimeUnit.MILLISECONDS.sleep(500);
  48. //中断其中一个
  49. t2.interrupt();
  50. }
  51. }
  1. 锁申请等待限时

除了等待外部通知之外,要避免死锁还有另外一种方法,那就是限时等待。
可以使用tryLock方法进行一次限时等待。

  1. public class TimeLock implements Runnable {
  2. private static final ReentrantLock lock = new ReentrantLock();
  3. @Override
  4. public void run() {
  5. try {
  6. if (lock.tryLock(5, TimeUnit.SECONDS)) {
  7. System.out.println("获得了锁:" + Thread.currentThread().getName());
  8. TimeUnit.SECONDS.sleep(6);
  9. } else {
  10. System.out.println(Thread.currentThread().getName() + " get lock failed");
  11. }
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. } finally {
  15. if (lock.isHeldByCurrentThread()) {
  16. //没有获取锁,解锁会提示 IllegalMonitorStateException
  17. lock.unlock();
  18. }
  19. }
  20. }
  21. public static void main(String[] args) {
  22. final TimeLock tl = new TimeLock();
  23. final Thread t1 = new Thread(tl);
  24. final Thread t2 = new Thread(tl);
  25. t1.start();
  26. t2.start();
  27. }
  28. }
  1. 公平锁

大多数情况下,锁的申请都是非公平的。公平锁的一大特点是:它不会产生饥饿现象,只要排队,最终还是可以得到资源的。如果使用synchronized关键值进行锁控制,那么产生的锁就是非公平的。
公平锁看起来很优美,但是要实现公平锁必然要求维护一个有序队列,因此公平锁的实现成本比较高,新能却非常低下,因此,在默认的情况下,锁是非公平的。如果没有特别的需求,则不需要使用公平锁。公平锁和非公平锁在现场调度表现上也是非常不一样的。

3.1.2 重入锁的好搭档:Condition

利用Condition对象,我们可以让线程在合适的时间等待,或者在某一个特定的时刻得到通知,继续执行。
Condition接口提供的基本方法如下

  1. public class ReenterLockCondition implements Runnable {
  2. public static final ReentrantLock lock = new ReentrantLock();
  3. public static final Condition condition = lock.newCondition();
  4. @Override
  5. public void run() {
  6. try {
  7. lock.lock();
  8. condition.await();
  9. System.out.println("Thread is going on ");
  10. } catch (Exception e) {
  11. e.printStackTrace();
  12. } finally {
  13. lock.unlock();
  14. }
  15. }
  16. public static void main(String[] args) throws InterruptedException {
  17. final ReenterLockCondition tl = new ReenterLockCondition();
  18. final Thread t1 = new Thread(tl);
  19. t1.start();
  20. TimeUnit.SECONDS.sleep(2);
  21. /**
  22. *使用Condition.await方法时,要求线程持有相关重入锁。在调用Condition.await方法时,会释放这把锁
  23. * 在调用condition.signal() ,也要求线程先获得相关的锁,在调用signal后,也要释放相关的锁
  24. *
  25. */
  26. lock.lock();
  27. //IllegalMonitorStateException
  28. condition.signal();
  29. lock.unlock();
  30. }
  31. }

3.1.3 允许多个线程同时访问:信号量(Semaphore)

信号量为多线程协作提供了更为强大的控制方法。从广义上说,信号量是对锁的扩展。无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemapDemo implements Runnable {
    //构造方法 Boolean是为了表示是否是公平锁
    final Semaphore semp = new Semaphore(5, true);
    @Override
    public void run() {
        try {
            semp.acquire();
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getId() + " is Done!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semp.release();
        }
    }
    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(20);

        final SemapDemo demo = new SemapDemo();
        for (int i = 0; i < 20; i++) {
            exec.submit(demo);
        }
    }
}

new Semaphore(5) 意味着同时可以有5个线程进入执行。

3.1.5 倒计数器:CountDownLatch

这个工具类通常用来控制线程等待,它可以让某一线程等待直到倒计数结束,在开始执行。

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class CountDownLatchDemo implements Runnable {
    static final CountDownLatch end = new CountDownLatch(10);
    static final CountDownLatchDemo demo = new CountDownLatchDemo();

    private final static Random random = new Random(System.currentTimeMillis());


    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(random.nextInt(10));
            System.out.println("check complete");
            end.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            exec.submit(demo);
        }
        //等待检查
        end.await();
        System.out.println("Fire");
        exec.shutdown();
    }
}

3.1.6 循环栅栏:CyclicBarrier

CyclicBarrier是另外一种多线程并发控制工具。和CountDownLatch非常类似,它可以以实现线程间的计数等待,但它的功能比CountDownLatch更加复杂且强大。
CyclicBarrier的使用场景也很丰富。比如,司令下达命令,要求10个士兵一起去完成一项任务,这是就会要求10个士兵先集合报道,接着,一起雄赳赳,气昂昂地去执行任务,当10个士兵把自己手上的任务都执行完成了,那么司令才能对外宣布,任务完成。

3.1.7 线程阻塞工具类:LockSupport

LockSuppore是一个非常实用的线程阻塞工具,它可以在任意位置让线程阻塞,与Thread.suspend方法相比,弥补了由于resume方法发生导致线程无法继续执行的情况。,与Object.wait方法相比,它不需要先获得某个对象的锁,也不会抛出InterruptedException异常。
LockSupport的静态方法park()可以阻塞当前线程,类似的还有parkNanos()、parkUntil()等方法。他们可以实现一个限时的等待。

import java.util.concurrent.locks.LockSupport;
public class LockSupportDemo {
    public static Object u = new Object();

    public static class ChangeObjectThread extends Thread {
        public ChangeObjectThread(String name) {
            super(name);
        }

        @Override
        public void run() {
            synchronized (u) {
                System.out.println("in " + getName());
                LockSupport.park();
            }
        }
    }

    static ChangeObjectThread t1 = new ChangeObjectThread("t1");
    static ChangeObjectThread t2 = new ChangeObjectThread("t2");

    public static void main(String[] args) throws InterruptedException {
        t1.start();
        Thread.sleep(100);
        t2.start();
        LockSupport.unpark(t1);
        LockSupport.unpark(t2);
        t1.join();
        t2.join();
    }
}

这里将原来的suspend()方法和resume()方法用park方法和unpark方法做了替换。当然,我们无法保证unpark方法发生在park方法之后,但是执行这段代码可以发现,它自始至终都可以正常地结束,不会因为park方法而导致线程永久挂起。

即使unpark方法操作发生在park方法之前,它可以使下一次park方法操作立即返回。

同时,处于park方法挂起挂起状态的线程不会想suspend方法那样还出一个令人费解的Runnable状态。它会给出一个WAITTING状态,甚至还会标注是park方法引起的。
如果使用park(Object)函数,那么还可以为当前的线程设置一个阻塞对象。这个阻塞对象会出现在线程的Dump中。这样分析问题时,就更加方便了。
除了定时阻塞的功能,LockSupport.park方法还支持中断影响。但是和其他接受中断的函数恨不一样,LockSupport.park方法不会抛出InterruptedException异常,它只会默默返回,但是可以从Thread.interrupted等方法中获得中断标记。

3.1.8 Guava和RateLimiter限流

任何应用和模块组件都有一定的访问速率上限,如果请求速率突破了这个上限,不但多余的请求无法处理,甚至会压垮系统所有的请求均无法有效处理。
一种简单的限流算法就是给出一个单位时间,然后使用一个计数器counter统计单位时间内收到的请求数量,当请求数量超过门限时,余下的请求丢弃或者等待,但这种简单的算法有一个严重的问题,就是很难控制边界时间上的请求。假设请求时间单位是1秒,每秒请求不超过10个,如果在一秒的前半秒没有请求,而后半秒有10个请求,下一秒的前半秒又有10个请求,那么在中间的一秒内,就会合理处理20个请求,这明显违反了限流的基本需求。
一般化的限流算法有两种:漏桶算法和令牌算法
漏桶算法:利用一个缓存区,当有请求进入系统时,无论请求的速率如何,都先在缓存区内保存,然后以固定的流速流出缓存区进行处理。
令牌算法:一种反向的漏桶算法。在令牌桶算法中。桶中存放的不再是请求,而是令牌。处理程序只有拿到令牌后,才能对请求进行处理。如果没有令牌,那么处理程序要么丢弃请求,要么等待可用的令牌。为了限制流速,该算法在单个时间产生一定量的令牌存入桶中。
RateLimiter正是采用了令牌桶算法。

import com.google.common.util.concurrent.RateLimiter;
import org.joda.time.DateTime;

public class RateLimiterDemo {
    public static final String FORMAT_Date = "yyyy-MM-dd HH:mm:ss:SSS";
    //限制每秒处理2个请求
    static RateLimiter limiter = RateLimiter.create(2);

    public static class Task implements Runnable {

        @Override
        public void run() {
            System.out.println(new DateTime().toString(FORMAT_Date));
        }
    }

    public static void main(String[] args) { 
        for (int i = 0; i < 50; i++) {
            //控制流量
            // limiter.acquire();

            //在某些场景中,如果系统无法处理请求,为了保证服务质量,
            //更倾向于直接丢弃过载请求,从而避免可能的奔溃
            if (!limiter.tryAcquire()) {
                continue;
            }
            new Thread(new Task()).start();
        }
    }
}

3.2 线程复用:线程池

3.2.3 刨根究底:核心线程池的内部实现

参数

  • corePoolSize 指令了线程池中的线程数量
  • maximumPoolSize 指定了线程池中的最大线程数量
  • keepAliveTime 当线程池数量超过corePoolSize时,多于的空闲线程的存活时间,超过corePooSize的空闲线程,在多长时间内会被销毁。
  • unit keepAliveTime 时间单位
  • workQueue 任务队列,被提交单尚未被执行的任务
  • threadFactory 线程工厂,用于创建线程
  • handler 拒绝策略,单任务太多来不及处理时,如何拒绝任务。

参数任务队列指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象,根据功能分类,在ThreadPoolExector类的构造函数中可使用以下几个BlockingQueue接口。

  • 直接提交的队列:该功能由SynchronousQueue对象提供,SynchronousQueue是一个特殊的BlockingQueue,SynchronousQueue没有容量,每一个插入操作都要等待一个相应的删除操作,反之。每一个删除操作都要等待对应的插入操作。

3.2.4 超负载了怎么办:拒绝策略

ThreadPoolExecutor类的最后一个参数指定了拒绝策略。也就是当任务数量超过系统实际承受能力时,就要用到拒绝策略了。拒绝策略可以说是系统超负荷运行时的补救措施,通常由于大力太大而引起的。也就是线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列中也已经排满了,再也放不下新任务了。

JDK内置的拒绝策略如下:

  • AbortPolicy:该策略会直接抛出异常,阻止系统正常工作
  • CallerRunsPolicy 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务。
  • DiscardOldestPolicy :将丢弃最老的一个请求,也就是基金被执行的一个任务,并尝试再次提交当前任务
  • DiscardPolicy 该策略默默地丢弃无法处理的任务,不予任何处理。如果运行丢弃任务,这可能是最好的一种方案了吧。

    3.2.5 自定义线程创建:ThreadFactory

    自定义线程池可以帮助我们做不少事。比如我们可以跟踪线程池究竟在合适创建了多少线程,也可以自定义线程的名称,组以及优先级等消息,甚至可以任性地将所有的线程设置为守护线程。
    下面设置自定义的ThreadFactory,一方面记录线程的创建,另一个方面将所有的线程都设置为守护线程,这样,当主线程退出后,将会强制销毁线程池。

    public class ThreadPoolDemo {
      public static class MyTask implements Runnable {
          @Override
          public void run() {
              System.out.println(System.currentTimeMillis()
                      + ":Thread ID:" + Thread.currentThread().getId()
              );
    
              try {
                  TimeUnit.SECONDS.sleep(1);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
    
      public static void main(String[] args) throws InterruptedException {
          final MyTask myTask = new MyTask();
          final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                  new SynchronousQueue<Runnable>(),
                  r -> {
                      final Thread thread = new Thread(r);
                      thread.setDaemon(true);
                      System.out.println("ctreate " + thread);
                      return thread;
                  }
          );
    
          for (int i = 0; i < 5; i++) {
              threadPoolExecutor.submit(myTask);
          }
          TimeUnit.SECONDS.sleep(2);
      }
    }
    

3.2.6 扩展线程池

对线程池做一些扩展,比如监控每个任务执行的开始时间和结束时间,或者自定义其他一些的增强功能。

public class ExtThreadPool {
    public static class MyTask implements Runnable {
        private String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在执行 Thread ID:" + Thread.currentThread().getId()
                    + " ,Task name=" + name);
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingDeque<Runnable>()) {
                @Override
                protected void beforeExecute(Thread t, Runnable r) {
                    System.out.println("准备执行" + ((MyTask) r).name);
                }

                @Override
                protected void afterExecute(Runnable r, Throwable t) {
                    System.out.println("执行完成" + ((MyTask) r).name);
                }

                @Override
                protected void terminated() {
                    System.out.println("线程退出");
                }
            };
            for (int i = 0; i < 5; i++) {
                MyTask task = new MyTask("Task-GEYM-" + i);
                poolExecutor.execute(task);
                TimeUnit.MILLISECONDS.sleep(100);
            }
            poolExecutor.shutdown();
        }
    }
}

3.2.8 堆栈去哪里了:在线程池中寻找堆栈

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DivTask implements Runnable {
    int a, b;

    public DivTask(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public void run() {
        double re = a / b;
        System.out.println(re);
    }

    public static void main(String[] args) {
        final ThreadPoolExecutor pools = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS
                , new SynchronousQueue<>());
        for (int i = 0; i < 5; i++) {
            //只有4个输出
            pools.submit(new DivTask(100, i));
        }
    }
}

上面最简单的方法就是放弃**submit**方法,改用execute()方法。

3.2.10 Guava中对线程池的扩展

3.3 不要重复发明轮子:JDK的并发容器

3.3.1 并发集合

  • ConcurrentHashMap:一个高效的并发HashMap。可以理解为一个线程安全的HashMap
  • CopyOnWriteArrayList
  • ConcurrentLinkedQueue 高效的并发队列,使用链表实现,可以看做是一个线程安全的LinkedList
  • BlockingQueue 一个接口,JDK内部通过链表,数组等方式实现了这个接口,表示阻塞队列,非常使用作为你数据共享的通道。
  • ConcurrentSkipListMap 跳表的实现,一个Map。使用跳表的数据结构进行快速查找。

4.3 人手一笔:ThreadLocal

4.3.1 ThreadLocal的简单使用

这是一个线程局部变量。

4.3.2 ThreadLocal实现原理

6 Java8/9/10与并发

6.1 Java8的函数式编程简介

6.1.1 函数作为一等公民

6.2 函数式编程基础

6.2.1 FunctionalInterface

Java8提出了函数式接口的概念。所谓函数式接口,简单地说,就是值定义了单一抽象方法的接口。比如下面的定义:

@FunctionalInterface
public interface IntHandler {
    void handle(int i);
}

注释FunctionalInterface用于表明IntHandler接口是一个函数式接口,该接口被定义为只包含一个抽象方法handle(),因为它符合函数式接口的定义。如果一个函数满足函数式接口的定义,那么及时不标注@FunctionalInterface,编译器依然会把它看做函数式接口。

需要注意的是,函数式接口只能有一个抽象方法,而不是只能有一个方法。这分亮点来说明:首先,Java8中,接口运行存在实例方法,其次,任何被java.lang.Object实现的方法,都不能视为抽象方法,因此NonFunc接口不是函数式接口,因为equals方法已经在java.lang.Object中已经实现

interface NonFunc {
    boolean equals(Object obj);
}

同理,下面实现的IntHandler接口韩式符合函数式接口要求,它虽然看起来不像函数式接口,但实际上确是一个完全符合规范的函数式接口

@FunctionalInterface
public interface IntHandler {
    void handle(int i);
    boolean equals(Object obj);
}

6.2.2 接口默认方法

从Java8开始,接口也可以包含若干个实例方法,这一改进使得Java8拥有了类似于多继承的能力。

public interface IHorse {
    void eat();
    default void run() {
        System.out.println("hourse run ");
    }
}

在Java8中,使用default关键字可以在接口内定义实例方法。注意,这个方法并非抽象方法,而是拥有特定逻辑的具体实现方法。

public interface IAnimal {
    default void breath() {
        System.out.println("breath");
    }
}
public class Mule implements IHorse, IAnimal {
    @Override
    public void eat() {
        System.out.println("Mule.eat");
    }
    public static void main(String[] args) {
        final Mule mule = new Mule();
        mule.run();
        mule.eat();
    }
}

Mule实例同时拥有来自不同接口的实现方法,在这Java8之前是做不到的。从某种程度上,这种模式可以弥补Java单一继承的一些不便。但同时也要知道,它也将遇到和多继承相同的问题。

public interface IDonkey {
    void eat();
    default void run() {
        System.out.println("IDonkey.run");
    }
}

public class Mule implements IHorse,IDonkey, IAnimal {
    @Override
    public void eat() {
        System.out.println("Mule.eat");
    }

    @Override
    public void run() {
        IHorse.super.run();
    }

    public static void main(String[] args) {
        final Mule mule = new Mule();
        mule.run();
        mule.eat();
    }
}

上面的Mule将run方法委托给IHorse,当然,也可以自己实现。

6.2.3 lambda表达式

和匿名对象一样,lambda表达式也可以访问外部的局部变量。

final int num = 2;
final Function<Integer, Integer> stringConverter = (from) -> from * num;
System.out.println(stringConverter.apply(3));

上述代码可以编译通过,正常执行并输出,与匿名内部对象一样,在这种情况下,外部的num变量必须声明为final,这样才能保证在lambda表达式中合法的访问它。

6.2.4 方法引用

方法引用是Java8中提出的用来简化lambda表达式的一种手段。它通过类名和方法名来定位一个静态方法或者实例方法。

6.5 增强的Future:CompletableFuture

6.5.1 完成了就通知我

CompletableFuture和Future一样,可以作为函数调用的契约。向CompletableFuture请求一个数据,如果数据还没有准备好,请求线程就会等待,而让人惊喜的是,我们可以手动设置CompletableFuture的完成状态。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @author study
 * @version 1.0
 * @date 2021/5/17 11:38
 */
public class AskThread implements Runnable {
    CompletableFuture<Integer> re = null;

    public AskThread(CompletableFuture<Integer> re) {
        this.re = re;
    }

    @Override
    public void run() {
        int myRe = 0;
        try {
            myRe = re.get() * re.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println(myRe);
    }

    public static void main(String[] args) throws InterruptedException {
        final CompletableFuture<Integer> future = new CompletableFuture<>();
        new Thread(new AskThread(future)).start();
        //模拟长时间的计算过程
        TimeUnit.SECONDS.sleep(1);
        //告知完成的结果
        future.complete(60);
    }
}

6.5.2 异步执行任务

public static Integer calc(Integer para){

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return para*para;
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
    final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> calc(50));
    System.out.println(future.get());
}

6.5.6 支持timeout的CompletableFuture

在JDK9以后增加的timeout功能。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class CompletableFutureTimeOut {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;

        }).orTimeout(1, TimeUnit.SECONDS).exceptionally(e -> {
            System.err.println(e);
            return 0;
        }).thenAccept(System.out::println);
    }
}

6.6 读写锁的改进:StampedLock

StampedLock是Java8中引入的一种新的锁机制,可以认为它是读写锁的一个改进版本。读写锁虽然分离了读和写的功能,使得读与读之间可以完全并发。但是,读和写之间依然是冲突的。读锁是完全阻塞写锁,它使用的依然是悲观的锁策略,如果有大量的读线程,它有可能引起写线程的“饥饿”。
而StampedLock则提供了一个乐观读策略。这种乐观的锁非常类似无锁操作,使得乐观锁完全不会阻塞写线程。

6.6.1 StampedLock使用示例

public class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) {
        final long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaX;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    double distanceFromOrigin() {
        final long stamp = sl.tryOptimisticRead();
        double currentX = x, currentY = y;
        if (!sl.validate(stamp)) {
            try {
                currentX = x;
                currentY = y;
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

6.6.2 StampedLock的小陷阱

StampedLock内部实现时,使用类似于CAS操作的死循环反复尝试的策略。在它挂起线程时,使用的是Unsafe.park()函数,而park函数在遇到线程中断时,也会直接返回(不同意Thread.sleep方法,它不会抛出异常)。而在StampedLock的死循环逻辑中,没有处理有关中断的逻辑。因此,这就会导致阻塞在park方法上的线程被中断后,再次进入循环。而当退出条件得不到满足时,就会发送疯狂占用CPU的情况,这一点是需要值得注意。

6.7 原子类的增强

Java8引入了LongAdder类。

6.7.1 更快的原子类:LongAdder

仿造ConcurrentHashMap,将特点数据分离。比如,可以将AtomicInteger内部核心数据value分离成一个数组。每个线程访问时,通过哈希算法映射到其中一个数字进行计数,而最终的计数结果则为这个数组的求和累加。

6.8 ConcurrentHashMap的增强

Java1.8以后,oncurrentHashMap有了一些API的增强,其中很多增强接口与lambda表达式有关。

6.8.1 foreach操作