四种创建线程的方式:

  1. 继承Thread类(是一个Runnable接口的实现类)
  2. 实现Runnable接口(常用,使用lambda表达式扔到Thread类中执行)
  3. 实现callable接口(有返回值和范型,使用FutureTask类,再扔到Thread类中执行)
  4. 使用Executors来创建线程池再创建线程(使用ThreadPool)

    lambda与内部类

    ```java public class TestLambda { // 2.静态内部类 static class Like2 implements Like{

    1. @Override
    2. public void lambda() {
    3. System.out.println("l2");
    4. }

    }

    public static void main(String[] args) {

    1. // 1.传统方法
    2. Like l1 = new ILike();
    3. l1.lambda();
    4. // 2.静态内部类
    5. Like2 l2 = new Like2();
    6. l2.lambda();
    7. // 3.局部内部类
    8. class Like3 implements Like{
    9. @Override
    10. public void lambda() {
    11. System.out.println("l3");
    12. }
    13. }
    14. Like3 l3 = new Like3();
    15. System.out.println("l3");
    16. // 4. 匿名内部类
    17. Like l4 = new Like(){
    18. @Override
    19. public void lambda() {
    20. System.out.println("l4");
    21. }
    22. };
    23. l4.lambda();
    24. // 5. lambda
    25. Like l5 = ()->System.out.println("l5");
    26. l5.lambda();

    }

} // 定义一个接口 interface Like{ void lambda(); }

// 定义接口实现类 class ILike implements Like{

  1. @Override
  2. public void lambda() {
  3. System.out.println("l1");
  4. }

}

  1. <a name="GJUMu"></a>
  2. ### Runnable + Synchronized锁 + wait、notifyAll线程通信方式
  3. - Synchronized锁,普通方法锁的是实例对象,静态方法锁的是class类模板。
  4. - Synchronized锁有两种使用方式
  5. 1. 锁方法
  6. public Synchronized void add(){}
  7. 2. 锁对象 (一般锁有共享访问属性的对象)
  8. Synchronized(obj){}
  9. - 生产者消费者模式:判断 -> 不满足等待(wait) -> 满足就执行并唤醒(notifyAll)
  10. ```java
  11. /**
  12. * synchronized方法版
  13. * 使用OOP类封装属性和同步方法,再去调用Thread线程,传入实例对象的方法开启多线程
  14. */
  15. public class TestJucSyn {
  16. public static void main(String[] args) {
  17. // 实例一个需要多线程操作的对象
  18. Fun fun = new Fun();
  19. // 使用lambda表达式简化
  20. // 将对象需要多线程操作的方法用lambda重写run()方法
  21. // 再传给Thread接口开启多线程操作
  22. // 模板 new Thread(()->{}, "Name").start();
  23. // 开启4个线程
  24. new Thread(()->{
  25. // @Override
  26. // public void run(){
  27. for (int i = 0; i < 10; i++) {
  28. try {
  29. fun.increament();
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }, "A").start();
  35. new Thread(()->{
  36. for (int i = 0; i < 10; i++) {
  37. try {
  38. fun.decreament();
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. }, "B").start();
  44. new Thread(()->{
  45. for (int i = 0; i < 10; i++) {
  46. try {
  47. fun.increament();
  48. } catch (InterruptedException e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. }, "C").start();
  53. new Thread(()->{
  54. for (int i = 0; i < 10; i++) {
  55. try {
  56. fun.decreament();
  57. } catch (InterruptedException e) {
  58. e.printStackTrace();
  59. }
  60. }
  61. }, "D").start();
  62. }
  63. }
  64. class Fun {
  65. private int num = 0;
  66. public synchronized void increament() throws InterruptedException {
  67. // 使用while 防止多线程唤醒失败
  68. while (num == 1) {
  69. this.wait();
  70. }
  71. num++;
  72. System.out.println(Thread.currentThread().getName() +"=>"+ num);
  73. this.notifyAll();
  74. }
  75. public synchronized void decreament() throws InterruptedException {
  76. while (num == 0) {
  77. this.wait();
  78. }
  79. num--;
  80. System.out.println(Thread.currentThread().getName() +"=>"+ num);
  81. this.notifyAll();
  82. }
  83. }
  84. //输出
  85. /*
  86. A=>1
  87. B=>0
  88. A=>1
  89. B=>0
  90. A=>1
  91. B=>0
  92. A=>1
  93. B=>0
  94. A=>1
  95. B=>0
  96. A=>1
  97. B=>0
  98. A=>1
  99. B=>0
  100. A=>1
  101. B=>0
  102. A=>1
  103. B=>0
  104. A=>1
  105. B=>0
  106. C=>1
  107. D=>0
  108. C=>1
  109. D=>0
  110. C=>1
  111. D=>0
  112. C=>1
  113. D=>0
  114. C=>1
  115. D=>0
  116. C=>1
  117. D=>0
  118. C=>1
  119. D=>0
  120. C=>1
  121. D=>0
  122. C=>1
  123. D=>0
  124. C=>1
  125. D=>0
  126. 进程已结束,退出代码为 0
  127. */

Runnable + Lock锁 + condition线程通信方式

  • 使用Lock锁需要先 Lock lock = new ReentrantLock();
  • Lock的线程通信需要先 Condition condition = lock.newCondition(); ,try 语句块加锁 lock.lock(), finally语句块解锁 lock.unlock; ```java import sun.jvm.hotspot.memory.ContiguousSpace;

import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;

/**

  • Lock方法版
  • 使用OOP类封装属性和同步方法,再去调用Thread线程,传入实例对象的方法开启多线程 */ public class TestJucLock { public static void main(String[] args) {

    1. // 实例一个需要多线程操作的对象
    2. Funs fun = new Funs();
    3. // 将对象需要多线程操作的方法 传给Thread接口开启多线程操作
    4. new Thread(()->{
    5. for (int i = 0; i < 10; i++) {
    6. try {
    7. fun.increament();
    8. } catch (InterruptedException e) {
    9. e.printStackTrace();
    10. }
    11. }
    12. }, "A").start();
    13. new Thread(()->{
    14. for (int i = 0; i < 10; i++) {
    15. try {
    16. fun.decreament();
    17. } catch (InterruptedException e) {
    18. e.printStackTrace();
    19. }
    20. }
    21. }, "B").start();
    22. new Thread(()->{
    23. for (int i = 0; i < 10; i++) {
    24. try {
    25. fun.increament();
    26. } catch (InterruptedException e) {
    27. e.printStackTrace();
    28. }
    29. }
    30. }, "C").start();
    31. new Thread(()->{
    32. for (int i = 0; i < 10; i++) {
    33. try {
    34. fun.decreament();
    35. } catch (InterruptedException e) {
    36. e.printStackTrace();
    37. }
    38. }
    39. }, "D").start();

    } }

class Funs { private int num = 0;

  1. // 使用Lock 需要先在类的作用域定义,不要定义到了方法作用域里面了,
  2. // 不然每次线程调用方法,都会新建一个锁,就锁不住了
  3. Lock lock = new ReentrantLock();
  4. Condition condition = lock.newCondition();
  5. public void increament() throws InterruptedException {
  6. try {
  7. lock.lock();
  8. // 使用while 防止多线程唤醒失败
  9. while (num == 1) {
  10. condition.await();
  11. }
  12. num++;
  13. System.out.println(Thread.currentThread().getName() +"=>"+ num);
  14. condition.signalAll();
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. } finally {
  18. lock.unlock();
  19. }
  20. }
  21. public void decreament() throws InterruptedException {
  22. try {
  23. lock.lock();
  24. // 使用while 防止多线程唤醒失败
  25. while (num == 0) {
  26. condition.await();
  27. }
  28. num--;
  29. System.out.println(Thread.currentThread().getName() +"=>"+ num);
  30. condition.signalAll();
  31. } catch (Exception e) {
  32. e.printStackTrace();
  33. } finally {
  34. lock.unlock();
  35. }
  36. }

}

// 输出 // 运行结果与上面类似

  1. <a name="AWYUq"></a>
  2. ### Callbale
  3. ```java
  4. import java.util.concurrent.Callable;
  5. import java.util.concurrent.ExecutionException;
  6. import java.util.concurrent.FutureTask;
  7. public class TestJucCallable {
  8. public static void main(String[] args) throws ExecutionException, InterruptedException {
  9. // new Thread(new Runnable()).start()
  10. // FutureTask是Runnable的实现类
  11. // new Thread(new FutureTask<V>(Callable)).start()
  12. MythreadCall call = new MythreadCall();
  13. FutureTask futureTask = new FutureTask(call);
  14. new Thread(futureTask, "A").start();
  15. new Thread(futureTask, "B").start(); // 不会打印call,A的结果会被缓存
  16. Integer o = (Integer)futureTask.get(); // 返回值
  17. System.out.println(o);
  18. }
  19. }
  20. class MythreadCall implements Callable<Integer> {
  21. @Override
  22. public Integer call() throws Exception {
  23. System.out.println("call");
  24. return 123;
  25. }
  26. }
  27. // 输出
  28. /*
  29. call
  30. 123
  31. 进程已结束,退出代码为 0
  32. */

线程池

  • 线程池接口:ExecutorService 。 ExecutorService接口继承自Executor接口
  • 线程池的工厂类:Executors(不推荐使用)
  • ThreadPoolExecutor(推荐)

    1. ExecutorService service = new ThreadPoolExecutor();

    Executors返回的线程池对象的弊端:

  1. FixedThreadPool 和 SingleThreadPolol:

允许的请求队列长度为 Integer.MAX_VALUE(21亿),可能会堆积大量的请求,从而导致OOM。

  1. CachedThreadPool 和 ScheduleThreadPool:

允许的创建线程数量为Integer.MAX_VALUE,可能会堆积大量的线程,从而导致OOM。

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class TestThreadPool{
  4. public static void main(String[] args) {
  5. // 创建线程池
  6. ExecutorService service = Executors.newFixedThreadPool(5);
  7. for (int i = 1; i <= 10; i++) {
  8. service.execute(()->{
  9. System.out.println(Thread.currentThread().getName());
  10. });
  11. }
  12. // 线程池关闭
  13. service.shutdown();
  14. }
  15. }
  16. // 输出,可以看出最多只有5个线程
  17. /*
  18. pool-1-thread-1
  19. pool-1-thread-2
  20. pool-1-thread-3
  21. pool-1-thread-4
  22. pool-1-thread-5
  23. pool-1-thread-1
  24. pool-1-thread-2
  25. pool-1-thread-3
  26. pool-1-thread-4
  27. pool-1-thread-3
  28. 进程已结束,退出代码为 0
  29. */

JUC并发

集合

  • 普通集合都是线程不安全的
  • List、Map、Set集合都可以使用Collections.synchronizedXXX()进行加锁实现线程安全
  • 直接使用JUC下的线程安全集合 CopyOnWriteArrayList()、CopyOnWriteArraySet()、ConcurrentHashMap()、ConcurrentLinkedDeque()、ConcurrentLinkedQueue()

ConcurrentHashMap()为例:

  1. import java.util.Map;
  2. import java.util.UUID;
  3. import java.util.concurrent.ConcurrentHashMap;
  4. public class TestConcurrentHashMap {
  5. public static void main(String[] args) {
  6. // HashMap默认初始值: new HashMap<>(16, 0.75) 16的容量,0.75的装填因子
  7. Map<String, String> map = new ConcurrentHashMap<>();
  8. for (int i = 0; i < 30; i++) {
  9. new Thread(()->{
  10. map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
  11. System.out.println(map);
  12. }, String.valueOf(i)).start();
  13. }
  14. }
  15. }

常用辅助类

CountDownLatch(线程减法计数器)

允许一个或多个线程等待,直到在其他线程中执行的一组操作完成同步辅助(说白了就是一个线程减法计数器)

  1. import java.util.concurrent.CountDownLatch;
  2. public class CountDownLatchDemo {
  3. // 这里Main函数不应该抛出异常的,但是这里目的是为了简化演示代码就不写try catch了
  4. public static void main(String[] args) Throws InterruptedException{
  5. // 线程中的辅助计数器
  6. CountDownLatch countDownLatch = new CountDownLatch(6);
  7. for (int i = 1; i <= 6; i++) {
  8. new Thread(()->{
  9. System.out.println(Thread.currentThread().getName()+" Go out");
  10. countDownLatch.countDown(); // 计数器-1
  11. }, String.valueOf(i)).start();
  12. }
  13. countDownLatch.await(); // 等待线程计数器归零,才会继续往下执行代码
  14. System.out.println("Close Door");
  15. }
  16. }
  17. // 输出
  18. /*
  19. 1 Go out
  20. 2 Go out
  21. 3 Go out
  22. 4 Go out
  23. 5 Go out
  24. 6 Go out
  25. Close Door
  26. 进程已结束,退出代码为 0
  27. */

CyclicBarrier(线程加法计数器)

  1. import java.util.concurrent.BrokenBarrierException;
  2. import java.util.concurrent.CyclicBarrier;
  3. public class CyclicBarrierDemo {
  4. public static void main(String[] args) {
  5. /**
  6. * 用多线程实现 集齐7个龙珠召唤神龙
  7. */
  8. // 第一个参数是计数器激活值,第二参数是到达激活值运行的一个Runnable
  9. CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
  10. System.out.println("召唤神龙");
  11. });
  12. for (int i = 1; i <= 7; i++) {
  13. // lambda由于是匿名内部类的进化版,是拿不到外部的for循环的i值的
  14. // 需要将i值进行finnal处理为一个常量
  15. final int temp = i;
  16. new Thread(()->{
  17. System.out.println(Thread.currentThread().getName()+ "==>收集第" + temp + "个龙珠");
  18. try {
  19. // 线程等待,直到满足线程计数设定值才会往下继续执行
  20. // 返回值是当前线程计数值,每个新线程会自动加一
  21. int await = cyclicBarrier.await();
  22. // 召唤神龙之后才会打印,多线程打印,随机顺序
  23. System.out.println(await);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. } catch (BrokenBarrierException e) {
  27. e.printStackTrace();
  28. }
  29. }).start();
  30. }
  31. }
  32. }
  33. // 输出
  34. /*
  35. Thread-0==>收集第1个龙珠
  36. Thread-1==>收集第2个龙珠
  37. Thread-2==>收集第3个龙珠
  38. Thread-3==>收集第4个龙珠
  39. Thread-4==>收集第5个龙珠
  40. Thread-5==>收集第6个龙珠
  41. Thread-6==>收集第7个龙珠
  42. 召唤神龙
  43. 0
  44. 4
  45. 3
  46. 2
  47. 5
  48. 6
  49. 1
  50. 进程已结束,退出代码为 0
  51. */

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch是减法计数器,CyclicBarrier是加法计数器。
  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。CountDownLatch和CyclicBarrier都有让多个线程等待同步然后再开始下一步动作的意思,但是CountDownLatch的下一步的动作实施者是主线程,具有不可重复性;而CyclicBarrier的下一步动作实施者还是“其他线程”本身,具有往复多次实施动作的特点。
  • CyclicBarrier还提供其他有用的方法,getNumberWaiting()方法可以获得阻塞线程的数量;isBroken()方法可以了解阻塞的线程是否被中断。

    Semaphore(规定线程数量,限流作用)

    有点wait()、notifyAll()的味道,但是是自动计算阈值的,并且控制的是线程

  • acquire() 获取,也就是激活一个线程,直到满足设定的线程最大数量时进入等待状态

  • release() 释放,释放当前的线程,线程数量+1,然后唤醒等待线程

作用

  1. 多个共享资源互斥的使用
  2. 并发限流,控制最大线程数 ```java import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;

public class SemaphoreDemo { /*

  1. * 完成停车位功能
  2. * @param args
  3. */
  4. public static void main(String[] args) {
  5. // 参数就是信号量,也就是限制线程数量
  6. Semaphore semaphore = new Semaphore(3);
  7. for (int i = 1; i <=6 ; i++) {
  8. // 只会运行3个线程运行
  9. new Thread(()->{
  10. try {
  11. // 获取,也就是激活一个线程
  12. semaphore.acquire();
  13. System.out.println(Thread.currentThread().getName()+"==>进入车位");
  14. // 线程等待2s
  15. TimeUnit.SECONDS.sleep(1);
  16. System.out.println(Thread.currentThread().getName()+"==>离开车位");
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. finally {
  21. semaphore.release(); // 释放线程
  22. }
  23. }).start();
  24. }
  25. }

}

/* 输出:

Thread-0==>进入车位 Thread-2==>进入车位 Thread-1==>进入车位 Thread-1==>离开车位 Thread-2==>离开车位 Thread-3==>进入车位 Thread-0==>离开车位 Thread-4==>进入车位 Thread-5==>进入车位 Thread-5==>离开车位 Thread-3==>离开车位 Thread-4==>离开车位

进程已结束,退出代码为 0

*/

  1. <a name="AARPh"></a>
  2. ### 读写锁
  3. <a name="Pv4zV"></a>
  4. #### ReadWriteLock
  5. 作用:
  6. - 比ReentrantLock有更小的细粒度,可以对读写分别加锁,并且写入锁是**排他锁**,读取锁是**共享锁**
  7. - 在需要加锁的类的属性域中定义:**ReadWriteLock readWriteLock = new ReentrantReadWriteLock();**
  8. 在加锁方法中**try块**进行加锁**readWriteLock.writeLock().lock();**<br />**finally块**进行解锁 **readWriteLock.writeLock().unlock();**
  9. ```java
  10. import java.util.HashMap;
  11. import java.util.Map;
  12. import java.util.concurrent.locks.ReadWriteLock;
  13. import java.util.concurrent.locks.ReentrantReadWriteLock;
  14. public class ReadWriteLockDemo {
  15. public static void main(String[] args) {
  16. MyCacheLock cache = new MyCacheLock();
  17. // 写入
  18. for (int i = 1; i <= 5; i++) {
  19. final int temp = i;
  20. new Thread(()->{
  21. cache.put(temp+"",temp);
  22. }).start();
  23. }
  24. System.out.println("========="); // main主线程,只会执行一次,而且与其他线程抢夺资源
  25. // 读取
  26. for (int i = 1; i <= 5; i++) {
  27. final int temp = i;
  28. new Thread(()->{
  29. cache.get(temp+"");
  30. }).start();
  31. }
  32. System.out.println("=========");
  33. }
  34. }
  35. // 设计一个缓存
  36. /*
  37. 这里提一句LRU缓存,使用HashMap + 双向链表实现(其实可以直接继承LinkedHashMap),
  38. get和set都将缓存对象提升到链表头,当缓存满了的时候,就删掉链表尾部的缓存
  39. */
  40. class MyCacheLock{
  41. private volatile Map<String, Integer> map = new HashMap<>();
  42. private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  43. // 存入
  44. public void put(String key, int value) {
  45. try {
  46. readWriteLock.writeLock().lock();
  47. System.out.println(Thread.currentThread().getName()+ "==>写入" + key);
  48. map.put(key, value);
  49. System.out.println(Thread.currentThread().getName()+ "==>写入完毕,ok");
  50. } catch (Exception e) {
  51. e.printStackTrace();
  52. } finally {
  53. readWriteLock.writeLock().unlock();
  54. }
  55. }
  56. // 读取
  57. public void get(String key) {
  58. try {
  59. readWriteLock.readLock().lock();
  60. System.out.println(Thread.currentThread().getName()+ "==>读取" + key);
  61. Integer i = map.get(key);
  62. System.out.println(Thread.currentThread().getName()+ "==>读取完毕,ok");
  63. } catch (Exception e) {
  64. e.printStackTrace();
  65. } finally {
  66. readWriteLock.readLock().unlock();
  67. }
  68. }
  69. }
  70. // 输出
  71. /*
  72. Thread-0==>写入1
  73. Thread-0==>写入完毕,ok
  74. Thread-1==>写入2
  75. Thread-1==>写入完毕,ok
  76. Thread-2==>写入3
  77. Thread-2==>写入完毕,ok
  78. Thread-3==>写入4
  79. Thread-3==>写入完毕,ok
  80. Thread-4==>写入5
  81. Thread-4==>写入完毕,ok
  82. =========
  83. Thread-5==>读取1
  84. Thread-5==>读取完毕,ok
  85. Thread-6==>读取2
  86. Thread-6==>读取完毕,ok
  87. Thread-7==>读取3
  88. Thread-7==>读取完毕,ok
  89. Thread-8==>读取4
  90. Thread-8==>读取完毕,ok
  91. =========
  92. Thread-9==>读取5
  93. Thread-9==>读取完毕,ok
  94. 进程已结束,退出代码为 0
  95. */

阻塞队列

BlockingQueue(实现类:ArrayBlockingQueue、LinkedBlockingQueue、同步队列SynchronousQueue)

image.png
四组API:
image.png

SynchronousQueue

同步队列,put了一个元素就必须等待take取出来,不然不能再put任何元素!

  1. import java.util.concurrent.BlockingDeque;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.SynchronousQueue;
  4. import java.util.concurrent.TimeUnit;
  5. public class SynchronousQueueDemo {
  6. public static void main(String[] args) {
  7. BlockingQueue<String> synQueue = new SynchronousQueue<>();
  8. new Thread(()->{
  9. try {
  10. System.out.println(Thread.currentThread().getName() + " put 1");
  11. synQueue.put("1");
  12. System.out.println(Thread.currentThread().getName() + " put 2");
  13. synQueue.put("2");
  14. System.out.println(Thread.currentThread().getName() + " put 3");
  15. synQueue.put("3");
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }, "T1").start();
  20. new Thread(()->{
  21. try {
  22. TimeUnit.SECONDS.sleep(1);
  23. System.out.println(Thread.currentThread().getName() + " take " + synQueue.take());
  24. TimeUnit.SECONDS.sleep(1);
  25. System.out.println(Thread.currentThread().getName() + " take " + synQueue.take());
  26. TimeUnit.SECONDS.sleep(1);
  27. System.out.println(Thread.currentThread().getName() + " take " + synQueue.take());
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }, "T2").start();
  32. }
  33. }
  34. // 输出
  35. /*
  36. T1 put 1
  37. T2 take 1
  38. T1 put 2
  39. T2 take 2
  40. T1 put 3
  41. T2 take 3
  42. 进程已结束,退出代码为 0
  43. */

线程池(池化技术)

ExecutorService service = new ThreadPoolExecutor();

线程的 3大方法,7大参数。

3大方法:(都是ThreadPoolExecutor()的不同参数,不推荐使用;推荐直接使用ThreadPoolExecutor())

Executors.newFixedThreadPool(int nThreads); // 创建一个固定的线程池大小
Executors.newCachedThreadPool(); // 可伸缩的线程池
Executors.newSingleThreadExecutor(); // 单个线程
源码:

  1. // newFixedThreadPool
  2. public static ExecutorService newFixedThreadPool(int nThreads) {
  3. return new ThreadPoolExecutor(nThreads, nThreads,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>());
  6. }
  7. // newCachedThreadPool
  8. public static ExecutorService newCachedThreadPool() {
  9. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  10. 60L, TimeUnit.SECONDS,
  11. new SynchronousQueue<Runnable>());
  12. }
  13. // newSingleThreadExecutor
  14. public static ExecutorService newSingleThreadExecutor() {
  15. return new FinalizableDelegatedExecutorService
  16. (new ThreadPoolExecutor(1, 1,
  17. 0L, TimeUnit.MILLISECONDS,
  18. new LinkedBlockingQueue<Runnable>()));
  19. }

通过源码可以看出,3大方法的本质就是调用了 ThreadPoolExecutor(),

7大参数 就是指的ThreadPoolExecutor的参数
  1. public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小
  2. int maximumPoolSize, // 最大核心线程池大小
  3. long keepAliveTime, // 无调用,超时释放
  4. TimeUnit unit, // 超时单位
  5. BlockingQueue<Runnable> workQueue, // 阻塞队列
  6. ThreadFactory threadFactory, // 线程工厂
  7. RejectedExecutionHandler handler) // 拒绝策略

image.png
四种拒绝策略 最大线程定义准则(CPU密集型、IO密集型)

  1. import java.util.concurrent.*;
  2. /**
  3. * 实现银行业务办理
  4. */
  5. public class ThreadPoolExecutorDemo {
  6. public static void main(String[] args) {
  7. // 最大线程定义准则:
  8. // 1、CPU密集型
  9. // 2、IO密集型: 定义为程序中非常消耗IO的线程 的两倍
  10. // 获取CPU核数
  11. int i1 = Runtime.getRuntime().availableProcessors();
  12. ExecutorService threadPool = new ThreadPoolExecutor(
  13. 2, // 核心业务窗口数量
  14. i1, // 最大核心业务窗口数量
  15. 3, // 超时释放时间
  16. TimeUnit.SECONDS, // 超时单位
  17. new LinkedBlockingDeque<>(3), // 阻塞队列大小
  18. Executors.defaultThreadFactory(), // 默认工厂类
  19. new ThreadPoolExecutor.AbortPolicy() // 队列满了还有元素进来,不处理并且抛出异常
  20. //new ThreadPoolExecutor.CallerRunsPolicy() // 回自到己的线程去
  21. //new ThreadPoolExecutor.DiscardPolicy() // 队列满了,丢掉任务,不抛出异常
  22. //new ThreadPoolExecutor.DiscardOldestPolicy() // 队列满了,尝试和最早的竞争,不会抛出异常
  23. );
  24. try {
  25. for (int i = 1; i <= 9; i++) {
  26. threadPool.execute(() -> {
  27. System.out.println(Thread.currentThread().getName());
  28. });
  29. }
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. } finally {
  33. threadPool.shutdown();
  34. }
  35. }
  36. }

四大函数式接口

Function、Predict、Consumer、Supplier

  • Function 函数型接口:范型第一个参数是输入类型,第二个参数是输出类型

image.png

  1. Function<String, String> function = (str)->{return str;};
  2. System.out.println(function.apply("aaa")) // out: "aaa"
  • Predict 断定型接口:一个输入参数,返回只能是 布尔值

image.png

  1. Predict<String> predict= (str)->{return str.isEmpty();};
  2. System.out.println(predict.test("")) // out: true
  • Consumer 消费型接口:只有输入,没有返回值

image.png

  1. Consumer<String> consumer = (str)->{System.out.println(str);};
  2. consumer.accept("asdasda"); // out: asdasda
  • Supplier 供给型接口:没有参数,只有返回值

image.png

  1. Supplier<> supplier = ()->{return 1024;};
  2. System.out.println(supplier.get()); // out: 1024

Stream流式计算

image.png

执行机制:

image.png

  1. users.stream()
  2. .filter(u -> u.getId()%2==0) // 过滤器
  3. .map(u -> u.getAge()+1) // 映射器
  4. //.count(); // 计数器
  5. //.sorted() // 排序
  6. //.min() // 求最小值
  7. //.limit() // 分页
  8. //.collect(Collectors.toList()) // 将流转换成集合和聚合元素。Collectors 可用于返回列表或字符串
  9. .forEach(System.out::println);

ForkJoin

  • 将大任务拆成小任务

image.png

  • 计算类继承ForkJoinTask类,一般继承下面的两个子类(RecursiveAction、RecursiveTask)

image.png

  • 计算类代码 ```java package stream;

import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo extends RecursiveTask {

  1. private Long start;
  2. private Long end;
  3. // 临界值
  4. private Long temp = 10000L;
  5. public ForkJoinDemo(Long start, Long end) {
  6. this.start = start;
  7. this.end = end;
  8. }
  9. @Override
  10. protected Long compute() {
  11. if ((end - start) < temp) {
  12. Long sum = 0L;
  13. for (Long i = start; i <= end; ++i) {
  14. sum += i;
  15. }
  16. return sum;
  17. }
  18. // ForkJoin 递归计算
  19. // 任务拆分
  20. Long mid = (start + end) / 2; // 中间值
  21. // 任务1
  22. ForkJoinDemo task1 = new ForkJoinDemo(start, mid);
  23. task1.fork(); // 把任务压入线程队列中
  24. // 任务2
  25. ForkJoinDemo task2 = new ForkJoinDemo(mid+1, end);
  26. task2.fork(); // 把任务压入线程队列中
  27. // 结果合并
  28. return task1.join() + task2.join();
  29. }

}

  1. - 测试类代码
  2. ```java
  3. package stream;
  4. import java.util.concurrent.ExecutionException;
  5. import java.util.concurrent.ForkJoinPool;
  6. import java.util.concurrent.ForkJoinTask;
  7. import java.util.stream.LongStream;
  8. public class TestForkJoin {
  9. public static void main(String[] args) throws ExecutionException, InterruptedException {
  10. //test1(); // 执行时间8122
  11. test2(); // 执行时间7944
  12. //test3(); // 执行时间499
  13. }
  14. public static void test1() {
  15. Long sum =0L;
  16. long start = System.currentTimeMillis();
  17. for (Long i = 1L; i <= 10_0000_0000L; ++i)
  18. sum += i;
  19. long end = System.currentTimeMillis();
  20. System.out.println("sum="+ sum +" 执行时间"+(end - start)); // 8276
  21. }
  22. // ForkJoin
  23. public static void test2() throws ExecutionException, InterruptedException {
  24. long start = System.currentTimeMillis();
  25. ForkJoinPool forkJoinPool = new ForkJoinPool();
  26. ForkJoinTask<Long> task = new ForkJoinDemo(1L, 10_0000_0000L);
  27. ForkJoinTask<Long> submit = forkJoinPool.submit(task);
  28. Long sum = submit.get();
  29. long end = System.currentTimeMillis();
  30. System.out.println("sum="+ sum +" 执行时间"+(end - start)); // 130
  31. }
  32. // Stram并行流
  33. public static void test3() {
  34. long start = System.currentTimeMillis();
  35. long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
  36. long end = System.currentTimeMillis();
  37. System.out.println("sum="+ sum +" 执行时间"+(end - start)); // 483
  38. }
  39. }

异步回调

使用Future的实现类CompletableFuture

  1. import java.util.concurrent.CompletableFuture;
  2. import java.util.concurrent.ExecutionException;
  3. import java.util.concurrent.TimeUnit;
  4. /**
  5. * 异步调用: CompletableFuture
  6. * 异步执行
  7. * 成功回调
  8. * 失败回调
  9. */
  10. public class AsynDemo {
  11. public static void main(String[] args) throws Exception {
  12. // // 异步执行,没有返回值
  13. // CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(()->{
  14. // try {
  15. // TimeUnit.SECONDS.sleep(1);
  16. // } catch (InterruptedException e) {
  17. // e.printStackTrace();
  18. // }
  19. //
  20. // System.out.println(Thread.currentThread().getName()+"runAsync => Void");
  21. // });
  22. //
  23. // // 不管异步线程,继续执行Main线程
  24. // System.out.println("111");
  25. //
  26. // voidCompletableFuture.get(); // 获取异步执行结果
  27. // 有返回值调
  28. CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
  29. try {
  30. TimeUnit.SECONDS.sleep(1);
  31. System.out.println(Thread.currentThread().getName()+ "supplyAsync=>Integer");
  32. //int i = 1/0;
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. return 1024;
  37. });
  38. System.out.println("111");
  39. System.out.println(completableFuture.whenComplete((t, u) -> {
  40. System.out.println("t=> " + t); // 第一个参数:成功 异步回调调返回值, 失败 null
  41. System.out.println("u=> " + u); // 第二个参数:成功nul,失败 错误返回值
  42. }).exceptionally((e) -> {
  43. System.out.println(e.getMessage());
  44. return 233;
  45. }).get()); // 成功:1024,失败:233
  46. }
  47. }

JMM

彻底明白单例模式