线程池的继承体系

AbstractExecutorService.png

线程池各参数含义

image.png

corePoolSize&maxPoolSize

corePoolSize:即核心线程的个数。核心线程其实就是预先分配的线程,当线程池创建,就实现分配指定数量的核心线程,一旦有任务提交过来,核心线程立即就能接管开始执行。它的意义在于减少了创建线程的开销,做到了“任务一来线程就可用,无需等待”。
maximumPoolSize:即线程池最大的线程数量。它的作用在于,当核心线程数被任务占满,且其中的任务缓冲队列也满了的时候,让新提交的任务仍旧可以执行,不至于立马就被拒绝。保证线程池可以通过创建新工作线程的方式来接受并执行任务。这是一种“弹性设计”,可以让线程池在应对突发流量时稳稳当当。例如每秒执行10个以内的任务的低并发场景中,是没啥大问题的。然而,如果每个月初月末流量突增,达到每秒100以上的并发时。因为有maxPoolSize就可以抵挡住这个压力
image.png

keepAliveTime

超过核心线程池(线程总数小于等于maximumPoolSize)并且处于空闲状态的那些线程的存活时间。这部分线程给他们分配了一个keepAliveTime参数,意义在于,当没有可处理的任务时,先等会儿,看有没有任务来了,如果有就去执行任务,否则就等达到了keepAliveTime之后,对工作线程进行回收。

workQueue

用于保存提交的待执行状态的任务的阻塞队列。队列种类有很多种,包括基于数据的有界队列ArrayBlockingQueue、基于链表的无界队列LinkedBlockingQueue、有且只有一个元素的同步队列SynchronousQueue以及优先级队列PriorityBlockingQueue等等。

RejectHandler

当任务队列满了,并且工作线程数量也达到了最大线程数,且都在处理任务时,如果继续往线程池提交任务,那么就会触发拒绝策略。
image.png
image.png

4种拒绝策略

AbortPolicy

这种拒绝策略在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException,让你感知到任务被拒绝了,于是你便可以根据业务逻辑选择重试或者放弃提交等策略。

DiscardPolicy

当新任务被提交后直接被丢弃掉,也不会给你任何的通知,相对而言存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失.

DiscardOldestPolicy

如果线程池没被关闭且没有能力执行,则会丢弃任务队列中的头结点,通常是存活时间最长的任务,这种策略与第二种不同之处在于它丢弃的不是最新提交的,而是队列中存活时间最长的,这样就可以腾出空间给新提交的任务,但同理它也存在一定的数据丢失风险。

CallerRunsPolicy

当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。

6种线程池

FixedThreadPool

是固定线程数的线程池,这个线程池种corePoolSize和maxPoolSize是一样的。当任务数超过核心数时,就将任务放入队列中阻塞等待。

CachedThreadPool

这个线程池的线程数可以无限增大(Integer.MAX_VALUE,为2^31-1),当线程闲置时可以对线程进行回收(60秒)。这个线程池存在一个阻塞队列为SynchronousQueue并且这个队列的容量是0,实际不存储任务唯一的作用是传递任务。

ScheduledThreadPool

支持定时或周期性的执行任务

  1. ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
  2. //10秒后执行一次任务,执行完后结束
  3. service.schedule(new Task(), 10, TimeUnit.SECONDS);
  4. //第二个参数 initialDelay 表示第一次延时时间,第三个参数 period 表示周期,
  5. //也就是第一次延时后每次延时多长时间执行一次任务。
  6. //以任务开始的时间为时间起点开始计时
  7. service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);
  8. //和方法二一样也是周期性执行任务
  9. //以任务结束的时间为下一次循环的时间起点开始计时。
  10. service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);

SingleThreadExecutor

这个线程池中线程只有一个,当线程异常的时候会重新创建一个线程去执行后面的任务,并且所有任务按顺序执行。适合所有任务需要被顺序提交的场景。前面几种线程池都不可以保证任务的顺序执行,因为它们是多线程并行执行的。

SingleThreadScheduledExecutor

支持定时或周期性的执行任务,唯一的不同是它里面的线程数是1个,相当于是第五种和第四种线程池的结合

ForkJoinPool

在这个线程池中,存在一个workqueue同时每个队列还存在一个属于自己的dequeue(双端队列),当自身线程分裂出子线程时就会将子线程放入dequeue中(以LIFO方式放入)。
还有一个特点就是当其他线程空闲的时候会窃取别的线程中的任务进行执行(以FIFT),使用“work-stealing”算法和双端队列混好的平衡了各线程之前的负载。
image.png

阻塞队列

image.png

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

    BlockingQueue

    BlockingQueue是一个带阻塞功能的队列,当入队列时,若队列已满,则阻塞调用者;当出队列时,若队列为空,则阻塞调用者。
    image.png

    Guarded Suspension

    “保护性暂挂模式”
    Guarded Suspension模式的“等待-通知”机制是一种非常普遍的线程间协作的方式。我们在平时工作中经常看到有人使用“轮询while(true)”的方式来等待某个状态,其实都可以用这个“等待-通知”机制来优化。 ```java public class GuardedQueue { private final Queue sourceList;

    public GuardedQueue() {

    1. this.sourceList = new LinkedBlockingQueue<>();

    }

    public synchronized Integer get() {

      while (sourceList.isEmpty()) {
          try {
              wait();    // <--- 如果队列为null,等待            
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
      return sourceList.peek();
    

    }

    public synchronized void put(Integer e) {

      sourceList.add(e);
      notifyAll();  //<--- 通知,继续执行    }
    

    }

public class App { public static void main(String[] args) { GuardedQueue guardedQueue = new GuardedQueue(); ExecutorService executorService = Executors.newFixedThreadPool(3); executorService.execute(() -> { guardedQueue.get(); } ); Thread.sleep(2000); executorService.execute(() -> { guardedQueue.put(20); } ); executorService.shutdown(); executorService.awaitTermination(30, TimeUnit.SECONDS); } }

<a name="BJh7G"></a>
### BlockingQueue之Guarded Suspension
接口中主要存在以下几个方法<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/2317535/1628564880351-4d30ff2b-5803-4941-9fcc-a4235a5f4d51.png#clientId=udfd12564-f1b1-4&from=paste&height=269&id=P5Zcr&margin=%5Bobject%20Object%5D&name=image.png&originHeight=488&originWidth=1153&originalType=binary&ratio=1&size=128311&status=done&style=none&taskId=ube3dbe1a-e6a6-4167-ac65-7166909ca64&width=636.4931030273438)<br />以ArrayBlockingQueue做一下分析
<a name="NKbqw"></a>
#### 非阻塞式方法
插入 boolean offer(E e) :可以看到非阻塞式没有涉及“保护性暂挂模式”,不涉及线程等待没有什么特点
```java
 public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

阻塞式方法(就是一个典型的“保护性暂挂模式”)

image.png
通过这两个条件去唤醒线程执行take或者put

put(E e):
  • 队列已满,那么新到来的执行put操作的线程将被添加到notFull的条件队列中等待;
  • 队列未满,当别的线程执行了take并执行成功了,就会执行dequeue()方法中的notFull.signal();,也就会将当前put阻塞的线程唤醒 ```java public void put(E e) throws InterruptedException { //检查元素是否为空
      checkNotNull(e);
      final ReentrantLock lock = this.lock;
    
    //该方法意思当别的线程没有持有锁立马返回一个锁 与lock()普通获取锁有差别
      lock.lockInterruptibly();
      try {
          //队列满了,将当前调用线程挂起,添加到notFull条件队列中等待唤醒
          while (count == items.length)
              notFull.await();
          //队列没满就执行入队操作
          enqueue(e);
      } finally {
          lock.unlock();
      }
    
    } //enqueue会唤醒另一个notEmpty线程队列 private void enqueue(E x) {
      // assert lock.getHoldCount() == 1;
      // assert items[putIndex] == null;
      final Object[] items = this.items;
      items[putIndex] = x;
      if (++putIndex == items.length)
          putIndex = 0;
      count++;
      notEmpty.signal();
    
    }
![image.png](https://cdn.nlark.com/yuque/0/2021/png/2317535/1629336789117-7fd78007-a058-44dd-a345-825b586924a9.png#clientId=u9ba749f4-04e5-4&from=paste&height=249&id=u772cc725&margin=%5Bobject%20Object%5D&name=image.png&originHeight=630&originWidth=1219&originalType=binary&ratio=1&size=92254&status=done&style=none&taskId=u37893a7c-ecc8-43d0-96c2-05231a012bd&width=482.49078369140625)
<a name="EBLc6"></a>
##### take():
阻塞式获取元素的方法<br />1.队列为空:以notEmpty的条件将当前线程阻塞住<br />2.队列不为空:当别的线程往里面添加内容了就会执行put()的enqueue方法,这个方法中就正好执行了notEmpty.signal();将当前take()线程唤醒
```java
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //如果队列没有元素就将以notEmpty为条件将当前线程阻塞掉
            while (count == 0)
                notEmpty.await();
            //如果有元素就执行出队操作
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        //将被notFull条件阻塞的线程唤醒,是不是就和上面的put方法对应了
        notFull.signal();
        return x;
    }