1. 阻塞队列

image.png

1.1 类图

截屏2021-07-10 下午5.23.13.png

1.2 7个实现类

  • ArrayBlockQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列
    • 有界,但是界限非常大,相当于无界,可以当成无界
  • PriorityBlockQueue:支持优先级排序的无界阻塞队列
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
    • 生产一个,消费一个,不存储元素,不消费不生产
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

    1.3 阻塞队列的方法

    image.png
    image.png

    1.4 ArrayBlockQueue

    1.5 应用-消费者生产问题

  • 传统版本的生产者和消费者问题(不使用阻塞队列)

生产一个,消费一个。

  1. public class ProductConsumerTradition {
  2. public static void main(String[] args) {
  3. ShareData shareData = new ShareData();
  4. new Thread(()->{
  5. for (int i = 0; i < 5; i++) {
  6. try {
  7. shareData.increment();
  8. } catch (Exception e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. }, "A").start();
  13. new Thread(()->{
  14. for (int i = 0; i < 5; i++) {
  15. try {
  16. shareData.decrement();
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }, "B").start();
  22. }
  23. }
  24. // 资源类
  25. class ShareData{
  26. private int number = 0;
  27. private final Lock lock = new ReentrantLock();
  28. private Condition condition = lock.newCondition();
  29. public void increment() throws Exception{
  30. lock.lock();
  31. try {
  32. // 判断,防止虚假唤醒,用while
  33. while (number > 0){
  34. // 等待,不能生产
  35. condition.await();
  36. }
  37. // 生产者生产
  38. number ++;
  39. System.out.println(Thread.currentThread().getName() + "生产了。" + number);
  40. // 唤醒消费者
  41. condition.signalAll();
  42. }finally {
  43. lock.unlock();
  44. }
  45. }
  46. public void decrement() throws Exception{
  47. lock.lock();
  48. try {
  49. // 判断,防止虚假唤醒,用while
  50. while (number == 0){
  51. // 等待,不能消费
  52. condition.await();
  53. }
  54. // 消费者消费
  55. number --;
  56. System.out.println(Thread.currentThread().getName() + "消费了。" + number);
  57. // 唤醒生产者
  58. condition.signalAll();
  59. }finally {
  60. lock.unlock();
  61. }
  62. }

截屏2021-07-10 下午6.43.55.png

2. 线程池

2.1 线程池概念

池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。特点:线程复用、控制最大并发数、管理线程
这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

    2.2 自定义线程池

    image.png
    上图就是一个线程池的实现,先初始化线程池、阻塞队列大小,然后开几个线程通过线程池对象调用方法执行任务,线程池中的线程会执行任务,如果任务过多,会添加到阻塞队列中,执行完任务再从阻塞队列中取值继续执行。当执行的线程数大于线程池和阻塞队列的大小,我们可以定义拒绝策略,类似 jdk 线程池那样。 ```java @Slf4j(topic = “c.TestPool”) public class TestPool {

    public static void main(String[] args) { ThreadPool pool=new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10); for (int i = 0; i < 5; i++) {

    1. int j = i;
    2. pool.execute(()->{
    3. log.debug("{}", j);
    4. });

    } } }

@Slf4j(topic = “ThreadPool”) class ThreadPool { // 任务队列 private BlockingQueue taskQueue;

  1. // 线程集合
  2. private HashSet<Worker> workers = new HashSet<>();
  3. // 核心线程数
  4. private int coreSize;
  5. // 任务超时时间
  6. private long timeOut;
  7. private TimeUnit timeUnit;
  8. public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int queueCapacity) {
  9. this.coreSize = coreSize;
  10. this.timeOut = timeOut;
  11. this.timeUnit = timeUnit;
  12. this.taskQueue = new BlockingQueue<>(queueCapacity);
  13. }
  14. // 执行任务
  15. public void execute(Runnable task){
  16. // 当任务数没有超过coreSize时,直接交给worker对象执行
  17. // 如果任务数超过coreSize,加入任务队列暂存
  18. synchronized (workers){
  19. if (workers.size()< coreSize){
  20. Worker worker = new Worker(task);
  21. log.debug("新增worker{},{}", worker,task);
  22. workers.add(worker);
  23. worker.start();
  24. } else {
  25. log.debug("加入任务队列{}", task);
  26. taskQueue.put(task);
  27. }
  28. }
  29. }
  30. class Worker extends Thread{
  31. private Runnable task;
  32. public Worker(Runnable task) {
  33. this.task = task;
  34. }
  35. @Override
  36. public void run() {
  37. // 执行任务
  38. // 1 当task不为空,执行任务
  39. // 2 task执行完毕 任务队列中获取任务并执行
  40. // 无超时
  41. //while (task != null || (task = taskQueue.take()) != null){
  42. // 有超时
  43. while (task != null || (task = taskQueue.poll(timeOut, timeUnit))!= null){
  44. try {
  45. log.debug("正在执行{}", task);
  46. task.run();
  47. }catch (Exception e){
  48. e.printStackTrace();
  49. }finally {
  50. task = null;
  51. }
  52. }
  53. synchronized (workers){
  54. log.debug("worker被移除{}", this);
  55. workers.remove(this);
  56. }
  57. }
  58. }

}

class BlockingQueue {

  1. // 任务队列
  2. private Deque<T> queue = new ArrayDeque<>();
  3. // 锁
  4. private ReentrantLock lock = new ReentrantLock();
  5. // 生产者条件变量
  6. private Condition fullWaitSet = lock.newCondition();
  7. // 消费者条件变量
  8. private Condition emptyWaitSet = lock.newCondition();
  9. // 容量
  10. private int capacity;
  11. public BlockingQueue(int capacity) {
  12. this.capacity = capacity;
  13. }
  14. // 带超时的阻塞获取
  15. public T poll(long timeout, TimeUnit unit) {
  16. lock.lock();
  17. try {
  18. // 将超时时间同意转换为 纳秒
  19. long nanos = unit.toNanos(timeout);
  20. while (queue.isEmpty()) {
  21. try {
  22. if (nanos <= 0) {
  23. return null;
  24. }
  25. //awaitNanos方法返回的就是剩余时间
  26. nanos = emptyWaitSet.awaitNanos(nanos);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. T t = queue.removeFirst();
  32. fullWaitSet.signal();
  33. return t;
  34. } finally {
  35. lock.unlock();
  36. }
  37. }
  38. // 阻塞获取
  39. public T take() {
  40. lock.lock();
  41. try {
  42. while (queue.isEmpty()) {
  43. try {
  44. emptyWaitSet.await();
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. T t = queue.removeFirst();
  50. fullWaitSet.signal();
  51. return t;
  52. } finally {
  53. lock.unlock();
  54. }
  55. }
  56. // 阻塞添加
  57. public void put(T e) {
  58. lock.lock();
  59. try {
  60. while (queue.size() == capacity) {
  61. try {
  62. fullWaitSet.await();
  63. } catch (InterruptedException interruptedException) {
  64. interruptedException.printStackTrace();
  65. }
  66. }
  67. queue.addLast(e);
  68. emptyWaitSet.signal();
  69. } finally {
  70. lock.unlock();
  71. }
  72. }
  73. // 获取大小
  74. public int getSize() {
  75. lock.lock();
  76. try {
  77. return queue.size();
  78. } finally {
  79. lock.unlock();
  80. }
  81. }
  1. 无超时等待:(可以一直等下去)
  2. ```java
  3. 14:52:01.161 [main] DEBUG ThreadPool - 新增workerThread[Thread-0,5,main],com.ll.ch7.TestPool$$Lambda$1/431687835@2ff4f00f
  4. 14:52:01.167 [main] DEBUG ThreadPool - 新增workerThread[Thread-1,5,main],com.ll.ch7.TestPool$$Lambda$1/431687835@1b0375b3
  5. 14:52:01.167 [Thread-0] DEBUG ThreadPool - 正在执行com.ll.ch7.TestPool$$Lambda$1/431687835@2ff4f00f
  6. 14:52:01.168 [Thread-0] DEBUG c.TestPool - 0
  7. 14:52:01.168 [main] DEBUG ThreadPool - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@2d209079
  8. 14:52:01.168 [Thread-1] DEBUG ThreadPool - 正在执行com.ll.ch7.TestPool$$Lambda$1/431687835@1b0375b3
  9. 14:52:01.168 [Thread-1] DEBUG c.TestPool - 1
  10. 14:52:01.168 [Thread-1] DEBUG ThreadPool - 正在执行com.ll.ch7.TestPool$$Lambda$1/431687835@2d209079
  11. 14:52:01.168 [Thread-1] DEBUG c.TestPool - 2
  12. 14:52:01.168 [main] DEBUG ThreadPool - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@6bdf28bb
  13. 14:52:01.168 [main] DEBUG ThreadPool - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@6b71769e
  14. 14:52:01.169 [Thread-0] DEBUG ThreadPool - 正在执行com.ll.ch7.TestPool$$Lambda$1/431687835@6bdf28bb
  15. 14:52:01.169 [Thread-1] DEBUG ThreadPool - 正在执行com.ll.ch7.TestPool$$Lambda$1/431687835@6b71769e
  16. 14:52:01.169 [Thread-0] DEBUG c.TestPool - 3
  17. 14:52:01.169 [Thread-1] DEBUG c.TestPool - 4

有超时等待:

  1. 14:56:28.720 [main] DEBUG ThreadPool - 新增workerThread[Thread-0,5,main],com.ll.ch7.TestPool$$Lambda$1/431687835@2ff4f00f
  2. 14:56:28.725 [main] DEBUG ThreadPool - 新增workerThread[Thread-1,5,main],com.ll.ch7.TestPool$$Lambda$1/431687835@1b0375b3
  3. 14:56:28.725 [Thread-0] DEBUG ThreadPool - 正在执行com.ll.ch7.TestPool$$Lambda$1/431687835@2ff4f00f
  4. 14:56:28.725 [main] DEBUG ThreadPool - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@2d209079
  5. 14:56:28.725 [Thread-0] DEBUG c.TestPool - 0
  6. 14:56:28.725 [Thread-1] DEBUG ThreadPool - 正在执行com.ll.ch7.TestPool$$Lambda$1/431687835@1b0375b3
  7. 14:56:28.725 [main] DEBUG ThreadPool - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@6bdf28bb
  8. 14:56:28.725 [Thread-0] DEBUG ThreadPool - 正在执行com.ll.ch7.TestPool$$Lambda$1/431687835@2d209079
  9. 14:56:28.725 [Thread-1] DEBUG c.TestPool - 1
  10. 14:56:28.726 [main] DEBUG ThreadPool - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@6b71769e
  11. 14:56:28.726 [Thread-1] DEBUG ThreadPool - 正在执行com.ll.ch7.TestPool$$Lambda$1/431687835@6bdf28bb
  12. 14:56:28.726 [Thread-1] DEBUG c.TestPool - 3
  13. 14:56:28.726 [Thread-0] DEBUG c.TestPool - 2
  14. 14:56:28.726 [Thread-1] DEBUG ThreadPool - 正在执行com.ll.ch7.TestPool$$Lambda$1/431687835@6b71769e
  15. 14:56:28.726 [Thread-1] DEBUG c.TestPool - 4
  16. 14:56:29.731 [Thread-0] DEBUG ThreadPool - worker被移除Thread[Thread-0,5,main]
  17. 14:56:29.731 [Thread-1] DEBUG ThreadPool - worker被移除Thread[Thread-1,5,main]
  • 当任务队列满时 ```java @Slf4j(topic = “c.TestPool”) public class TestPool {

    public static void main(String[] args) {

    1. ThreadPool pool=new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
    2. for (int i = 0; i < 15; i++) {
    3. int j = i;
    4. pool.execute(()->{
    5. try {
    6. Thread.sleep(100000);
    7. } catch (InterruptedException e) {
    8. e.printStackTrace();
    9. }
    10. log.debug("{}", j);
    11. });
    12. }

    } }

@Slf4j(topic = “ThreadPool”) class ThreadPool { // 任务队列 private BlockingQueue taskQueue;

  1. // 线程集合
  2. private HashSet<Worker> workers = new HashSet<>();
  3. // 核心线程数
  4. private int coreSize;
  5. // 任务超时时间
  6. private long timeOut;
  7. private TimeUnit timeUnit;
  8. public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int queueCapacity) {
  9. this.coreSize = coreSize;
  10. this.timeOut = timeOut;
  11. this.timeUnit = timeUnit;
  12. this.taskQueue = new BlockingQueue<>(queueCapacity);
  13. }
  14. // 执行任务
  15. public void execute(Runnable task){
  16. // 当任务数没有超过coreSize时,直接交给worker对象执行
  17. // 如果任务数超过coreSize,加入任务队列暂存
  18. synchronized (workers){
  19. if (workers.size()< coreSize){
  20. Worker worker = new Worker(task);
  21. log.debug("新增worker{},{}", worker,task);
  22. workers.add(worker);
  23. worker.start();
  24. } else {
  25. taskQueue.put(task);
  26. }
  27. }
  28. }
  29. class Worker extends Thread{
  30. private Runnable task;
  31. public Worker(Runnable task) {
  32. this.task = task;
  33. }
  34. @Override
  35. public void run() {
  36. // 执行任务
  37. // 1 当task不为空,执行任务
  38. // 2 task执行完毕 任务队列中获取任务并执行
  39. // 无超时
  40. //while (task != null || (task = taskQueue.take()) != null){
  41. // 有超时
  42. while (task != null || (task = taskQueue.poll(timeOut, timeUnit))!= null){
  43. try {
  44. log.debug("正在执行{}", task);
  45. task.run();
  46. }catch (Exception e){
  47. e.printStackTrace();
  48. }finally {
  49. task = null;
  50. }
  51. }
  52. synchronized (workers){
  53. log.debug("worker被移除{}", this);
  54. workers.remove(this);
  55. }
  56. }
  57. }

}

@Slf4j(topic = “c.BlockingQueue”) class BlockingQueue {

  1. // 任务队列
  2. private Deque<T> queue = new ArrayDeque<>();
  3. // 锁
  4. private ReentrantLock lock = new ReentrantLock();
  5. // 生产者条件变量
  6. private Condition fullWaitSet = lock.newCondition();
  7. // 消费者条件变量
  8. private Condition emptyWaitSet = lock.newCondition();
  9. // 容量
  10. private int capacity;
  11. public BlockingQueue(int capacity) {
  12. this.capacity = capacity;
  13. }
  14. // 带超时的阻塞获取
  15. public T poll(long timeout, TimeUnit unit) {
  16. lock.lock();
  17. try {
  18. // 将超时时间同意转换为 纳秒
  19. long nanos = unit.toNanos(timeout);
  20. while (queue.isEmpty()) {
  21. try {
  22. if (nanos <= 0) {
  23. return null;
  24. }
  25. //awaitNanos方法返回的就是剩余时间
  26. nanos = emptyWaitSet.awaitNanos(nanos);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. T t = queue.removeFirst();
  32. fullWaitSet.signal();
  33. return t;
  34. } finally {
  35. lock.unlock();
  36. }
  37. }
  38. // 阻塞获取
  39. public T take() {
  40. lock.lock();
  41. try {
  42. while (queue.isEmpty()) {
  43. try {
  44. emptyWaitSet.await();
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. T t = queue.removeFirst();
  50. fullWaitSet.signal();
  51. return t;
  52. } finally {
  53. lock.unlock();
  54. }
  55. }
  56. // 阻塞添加
  57. public void put(T e) {
  58. lock.lock();
  59. try {
  60. while (queue.size() == capacity) {
  61. try {
  62. log.debug("等待加入任务队列:{}", e);
  63. fullWaitSet.await();
  64. } catch (InterruptedException interruptedException) {
  65. interruptedException.printStackTrace();
  66. }
  67. }
  68. log.debug("加入任务队列{}", e);
  69. queue.addLast(e);
  70. emptyWaitSet.signal();
  71. } finally {
  72. lock.unlock();
  73. }
  74. }
  75. // 获取大小
  76. public int getSize() {
  77. lock.lock();
  78. try {
  79. return queue.size();
  80. } finally {
  81. lock.unlock();
  82. }
  83. }

}

  1. ```java
  2. 14:59:24.108 [main] DEBUG ThreadPool - 新增workerThread[Thread-0,5,main],com.ll.ch7.TestPool$$Lambda$1/431687835@2ff4f00f
  3. 14:59:24.113 [main] DEBUG ThreadPool - 新增workerThread[Thread-1,5,main],com.ll.ch7.TestPool$$Lambda$1/431687835@1b0375b3
  4. 14:59:24.113 [Thread-0] DEBUG ThreadPool - 正在执行com.ll.ch7.TestPool$$Lambda$1/431687835@2ff4f00f
  5. 14:59:24.113 [main] DEBUG c.BlockingQueue - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@2d209079
  6. 14:59:24.113 [Thread-1] DEBUG ThreadPool - 正在执行com.ll.ch7.TestPool$$Lambda$1/431687835@1b0375b3
  7. 14:59:24.113 [main] DEBUG c.BlockingQueue - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@6bdf28bb
  8. 14:59:24.113 [main] DEBUG c.BlockingQueue - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@6b71769e
  9. 14:59:24.113 [main] DEBUG c.BlockingQueue - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@2752f6e2
  10. 14:59:24.114 [main] DEBUG c.BlockingQueue - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@e580929
  11. 14:59:24.114 [main] DEBUG c.BlockingQueue - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@1cd072a9
  12. 14:59:24.114 [main] DEBUG c.BlockingQueue - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@7c75222b
  13. 14:59:24.114 [main] DEBUG c.BlockingQueue - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@4c203ea1
  14. 14:59:24.114 [main] DEBUG c.BlockingQueue - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@27f674d
  15. 14:59:24.114 [main] DEBUG c.BlockingQueue - 加入任务队列com.ll.ch7.TestPool$$Lambda$1/431687835@1d251891
  16. 14:59:24.114 [main] DEBUG c.BlockingQueue - 等待加入任务队列:com.ll.ch7.TestPool$$Lambda$1/431687835@48140564
  • 给主线程加入各种拒绝机制 ```java @Slf4j(topic = “c.TestPool”) public class TestPool {

    public static void main(String[] args) {

    1. ThreadPool pool=new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{
    2. // 1.死等
    3. // queue.put(task);
    4. // 2 带超时等待
    5. // queue.offer(task, 500, TimeUnit.MILLISECONDS);
    6. // 3 放弃任务执行
    7. // log.debug("放弃任务{}", task);
    8. // 4 让调用者抛出异常
    9. // throw new RuntimeException("任务执行失败" + task);
    10. // 5 让调用者自己执行任务
    11. task.run();
    12. });
    13. for (int i = 0; i < 3; i++) {
    14. int j = i;
    15. pool.execute(()->{
    16. try {
    17. Thread.sleep(1000);
    18. } catch (InterruptedException e) {
    19. e.printStackTrace();
    20. }
    21. log.debug("{}", j);
    22. });
    23. }

    } }

// 策略模式 // 拒绝策略 @FunctionalInterface interface RejectPolicy{ void reject(BlockingQueue queue, T task); }

@Slf4j(topic = “ThreadPool”) class ThreadPool { // 任务队列 private BlockingQueue taskQueue;

  1. // 线程集合
  2. private HashSet<Worker> workers = new HashSet<>();
  3. // 核心线程数
  4. private int coreSize;
  5. // 任务超时时间
  6. private long timeOut;
  7. private TimeUnit timeUnit;
  8. // 拒绝策略
  9. private RejectPolicy<Runnable> rejectPolicy;
  10. public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
  11. this.coreSize = coreSize;
  12. this.timeOut = timeOut;
  13. this.timeUnit = timeUnit;
  14. this.rejectPolicy = rejectPolicy;
  15. this.taskQueue = new BlockingQueue<>(queueCapacity);
  16. }
  17. // 执行任务
  18. public void execute(Runnable task){
  19. // 当任务数没有超过coreSize时,直接交给worker对象执行
  20. // 如果任务数超过coreSize,加入任务队列暂存
  21. synchronized (workers){
  22. if (workers.size()< coreSize){
  23. Worker worker = new Worker(task);
  24. log.debug("新增worker{},{}", worker,task);
  25. workers.add(worker);
  26. worker.start();
  27. } else {
  28. //taskQueue.put(task);
  29. // 1 死等
  30. // 2 带超时等待
  31. // 3 放弃任务执行
  32. // 4 让调用者抛出异常
  33. // 5 让调用者自己执行任务
  34. taskQueue.trtPut(rejectPolicy, task);
  35. }
  36. }
  37. }
  38. class Worker extends Thread{
  39. private Runnable task;
  40. public Worker(Runnable task) {
  41. this.task = task;
  42. }
  43. @Override
  44. public void run() {
  45. // 执行任务
  46. // 1 当task不为空,执行任务
  47. // 2 task执行完毕 任务队列中获取任务并执行
  48. // 无超时
  49. //while (task != null || (task = taskQueue.take()) != null){
  50. // 有超时
  51. while (task != null || (task = taskQueue.poll(timeOut, timeUnit))!= null){
  52. try {
  53. log.debug("正在执行{}", task);
  54. task.run();
  55. }catch (Exception e){
  56. e.printStackTrace();
  57. }finally {
  58. task = null;
  59. }
  60. }
  61. synchronized (workers){
  62. log.debug("worker被移除{}", this);
  63. workers.remove(this);
  64. }
  65. }
  66. }

}

@Slf4j(topic = “c.BlockingQueue”) class BlockingQueue {

  1. // 任务队列
  2. private Deque<T> queue = new ArrayDeque<>();
  3. // 锁
  4. private ReentrantLock lock = new ReentrantLock();
  5. // 生产者条件变量
  6. private Condition fullWaitSet = lock.newCondition();
  7. // 消费者条件变量
  8. private Condition emptyWaitSet = lock.newCondition();
  9. // 容量
  10. private int capacity;
  11. public BlockingQueue(int capacity) {
  12. this.capacity = capacity;
  13. }
  14. // 带超时的阻塞获取
  15. public T poll(long timeout, TimeUnit unit) {
  16. lock.lock();
  17. try {
  18. // 将超时时间同意转换为 纳秒
  19. long nanos = unit.toNanos(timeout);
  20. while (queue.isEmpty()) {
  21. try {
  22. if (nanos <= 0) {
  23. return null;
  24. }
  25. //awaitNanos方法返回的就是剩余时间
  26. nanos = emptyWaitSet.awaitNanos(nanos);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. T t = queue.removeFirst();
  32. fullWaitSet.signal();
  33. return t;
  34. } finally {
  35. lock.unlock();
  36. }
  37. }
  38. // 阻塞获取
  39. public T take() {
  40. lock.lock();
  41. try {
  42. while (queue.isEmpty()) {
  43. try {
  44. emptyWaitSet.await();
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. T t = queue.removeFirst();
  50. fullWaitSet.signal();
  51. return t;
  52. } finally {
  53. lock.unlock();
  54. }
  55. }
  56. // 阻塞添加
  57. public void put(T e) {
  58. lock.lock();
  59. try {
  60. while (queue.size() == capacity) {
  61. try {
  62. log.debug("等待加入任务队列:{}", e);
  63. fullWaitSet.await();
  64. } catch (InterruptedException interruptedException) {
  65. interruptedException.printStackTrace();
  66. }
  67. }
  68. log.debug("加入任务队列{}", e);
  69. queue.addLast(e);
  70. emptyWaitSet.signal();
  71. } finally {
  72. lock.unlock();
  73. }
  74. }
  75. // 带超时时间阻塞添加
  76. public boolean offer(T task, long timeOut, TimeUnit timeUnit){
  77. lock.lock();
  78. try {
  79. long nanos = timeUnit.toNanos(timeOut);
  80. while (queue.size() == capacity) {
  81. try {
  82. log.debug("等待加入任务队列:{}", task);
  83. if (nanos <=0){
  84. return false;
  85. }
  86. fullWaitSet.awaitNanos(nanos);
  87. } catch (InterruptedException interruptedException) {
  88. interruptedException.printStackTrace();
  89. }
  90. }
  91. log.debug("加入任务队列{}", task);
  92. queue.addLast(task);
  93. emptyWaitSet.signal();
  94. return true;
  95. } finally {
  96. lock.unlock();
  97. }
  98. }
  99. // 获取大小
  100. public int getSize() {
  101. lock.lock();
  102. try {
  103. return queue.size();
  104. } finally {
  105. lock.unlock();
  106. }
  107. }
  108. public void trtPut(RejectPolicy<T> rejectPolicy, T task) {
  109. lock.lock();
  110. try {
  111. // 判断队列是否已满
  112. if (queue.size() == capacity){
  113. rejectPolicy.reject(this, task);
  114. }else {
  115. // 有空闲
  116. log.debug("加入任务队列{}", task);
  117. queue.addLast(task);
  118. emptyWaitSet.signal();
  119. }
  120. }finally {
  121. lock.unlock();
  122. }
  123. }
  1. <a name="I2g66"></a>
  2. ## 2.3 ThreadPoolExecutor
  3. ![截屏2021-05-12 下午3.17.16.png](https://cdn.nlark.com/yuque/0/2021/png/12943861/1620803840674-71d50592-4994-4d63-aa2c-9e7b2616552a.png#clientId=u38eb6d6c-1811-4&from=drop&id=u3e8f3164&margin=%5Bobject%20Object%5D&name=%E6%88%AA%E5%B1%8F2021-05-12%20%E4%B8%8B%E5%8D%883.17.16.png&originHeight=304&originWidth=644&originalType=binary&ratio=1&size=83757&status=done&style=none&taskId=ue1de37c0-178e-4253-aa06-4757c67e605)
  4. <a name="dFYUr"></a>
  5. ### 2.3.1 线程池状态
  6. ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量,<br />ThreadPoolExecutor 类中的线程状态变量如下:
  7. ```java
  8. // Integer.SIZE 值为 32
  9. private static final int COUNT_BITS = Integer.SIZE - 3;
  10. // runState is stored in the high-order bits
  11. private static final int RUNNING = -1 << COUNT_BITS;
  12. private static final int SHUTDOWN = 0 << COUNT_BITS;
  13. private static final int STOP = 1 << COUNT_BITS;
  14. private static final int TIDYING = 2 << COUNT_BITS;
  15. private static final int TERMINATED = 3 << COUNT_BITS;
状态名称 高3位的值 描述
RUNNING 111 接收新任务,同时处理任务队列中的任务
SHUTDOWN 000 不接受新任务,但是处理任务队列中的任务
STOP 001 中断正在执行的任务,同时抛弃阻塞队列中的任务
TIDYING 010 任务执行完毕,活动线程为0时,即将进入终结阶段
TERMINATED 011 终结状态

线程池状态和线程池中线程的数量由一个原子整型ctl来共同表示,使用一个数来表示两个值的主要原因是:可以通过一次CAS同时更改两个属性的值

  1. // 原子整数,前 3 位保存了线程池的状态,剩余位保存的是线程数量
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3. // 并不是所有平台的 int 都是 32 位。
  4. // 去掉前三位保存线程状态的位数,剩下的用于保存线程数量
  5. // 高3位为0,剩余位数全为1
  6. private static final int COUNT_BITS = Integer.SIZE - 3;
  7. // 2^COUNT_BITS次方,表示可以保存的最大线程数
  8. // CAPACITY 的高3位为 0
  9. private static final int CAPACITY = (1 << COUNT_BITS) - 1;

获取线程池状态、线程数量以及合并两个值的操作

  1. // Packing and unpacking ctl
  2. // 获取运行状态
  3. // 该操作会让除高3位以外的数全部变为0
  4. private static int runStateOf(int c) { return c & ~CAPACITY; }
  5. // 获取运行线程数
  6. // 该操作会让高3位为0
  7. private static int workerCountOf(int c) { return c & CAPACITY; }
  8. // 计算ctl新值
  9. private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池属性:

  1. // 工作线程,内部封装了Thread
  2. private final class Worker
  3. extends AbstractQueuedSynchronizer
  4. implements Runnable {
  5. ...
  6. }
  7. // 阻塞队列,用于存放来不及被核心线程执行的任务
  8. private final BlockingQueue<Runnable> workQueue;
  9. // 锁
  10. private final ReentrantLock mainLock = new ReentrantLock();
  11. // 用于存放核心线程的容器,只有当持有锁时才能够获取其中的元素(核心线程)
  12. private final HashSet<Worker> workers = new HashSet<Worker>();

2.3.2 构造方法

ThreadPoolExecutor 类参数最多、最全的有参构造方法。

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)

2.3.3 七大参数

  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数(≥1)
    • maximumPoolSize - corePoolSize = 救急(空闲)线程数
  • keepAliveTime:救急线程空闲时的最大生存时间
  • unit:时间单位
  • workQueue:阻塞队列(存放任务)
    • 有界阻塞队列 ArrayBlockingQueue
    • 无界阻塞队列 LinkedBlockingQueue
    • 最多只有一个同步元素的队列 SynchronousQueue
  • 优先队列 PriorityBlockingQueue
  • threadFactory:线程工厂(给线程取名字)
  • handler:拒绝策略。表示当队列满了并且工作线程大于线程池的最大线程数(maximumPoolSize)时,如何来拒绝请求执行的Runnable的策略。以下所有拒绝策略都实现了RejectedExecutionHandler接口
    • AbortPolicy:默认,直接抛出RejectedExcutionException异常,阻止系统正常运行
    • DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常,如果运行任务丢失,这是一种好方案
    • CallerRunsPolicy:该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者
    • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务

2.3.4 三种线程池

  • newFixedThreadPool:执行长期任务,性能好
  • newCachedThreadPool:执行很多短期异步的小程序或者负载较轻的服务器
  • newSingleThreadExecutor:一个任务一个任务执行的场景

image.png

2.3.5 底层原理

image.png

  1. 在创建了线程池后,等待提交过来的任务请求
  2. 当调用execute()方法添加一个请求任务时,线程池会做出如下判断
    1. 如果正在运行的线程池数量小于corePoolSize,那么马上创建线程运行这个任务
    2. 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列
    3. 如果这时候队列满了,并且正在运行的线程数量还小于maximumPoolSize,那么还是创建非核心线程来运行这个任务;
    4. 如果队列满了并且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行
  4. 当一个线程无事可做操作一定的时间(keepAliveTime)时,线程池会判断:
    1. 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉
    2. 所以线程池的所有任务完成后,它会最终收缩到corePoolSize的大小

      2.3.6 使用策略

      提供的一个都不用。使用自定义的。
  • 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程
    • 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题,如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题
  • 线程池不允许使用Executors去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

    • Executors返回的线程池对象弊端如下:
      • FixedThreadPool和SingleThreadPool:
        • 运行的请求队列长度为:Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
      • CacheThreadPool和ScheduledThreadPool
        • 允许创建的线程数量为:Integer.MAX_VALUE,可能会创建大量的线程,从而大导致oom

          参数如何配置

  • CPU密集型(需要大量运算,没有阻塞)一般公式:CPU核数 + 1个线程数

  • IO密集型
    • 由于IO密集型任务线程并不是一直在执行任务,则可能多的线程,如 CPU核数 * 2
    • 在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力花费在等待上。所以IO密集型任务中使用多线程可以大大的加速程序的运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。IO密集时,大部分线程都被阻塞,故需要多配置线程数:CPU核数 / (1 - 阻塞系数) 阻塞系数在0.8 ~ 0.9左右