在上一讲使用异步方式提高了高并发场景的性能,但是在异步问题中还有一个致命的问题 —— OOM。
一、问题成因
两个处理步骤速度不一样导致的,处理慢的那个步骤任务就会在队列中堆积,直到内存耗尽 OOM。
二、解决思路
对上游输出给下游的速度做流量控制。
**
如何进行流量控制?
- 严格控制上游发送速度
- 可行但低效,下游的处理速度会变化,但无法实时的调整上游的发送速度
- 反向压力
- 优雅且高效,可实时调整上游发送速度
三、反向压力
反压原理
在反向压力的方案中,上游能够根据下游的处理能力,动态地调整输出速度。
当下游的消息订阅者,从上游的消息发布者接收消息前,会先通知消息发布者自己能够接收多少消息。然后消息发布者就按照这个数量,向下游的消息订阅者发送消息。
实现反压
要实现反向压力的功能,只需要从两个方面来进行控制:
- 执行器的任务队列,它的容量必须是有限的。
- 当执行器的任务队列已经满了时,就阻止上游继续提交新的任务,直到任务队列,重新有新的空间可用为止。
代码实现:
具备反向压力能力的 ExecutorService 的具体实现
private final List<ExecutorService> executors;
private final Partitioner partitioner;
private Long rejectSleepMills = 1L;
public BackPressureExecutor(String name, int executorNumber, int coreSize, int maxSize, int capacity, long rejectSleepMills) {
this.rejectSleepMills = rejectSleepMills;
this.executors = new ArrayList<>(executorNumber);
for (int i = 0; i < executorNumber; i++) {
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(capacity);
this.executors.add(new ThreadPoolExecutor(
coreSize, maxSize, 0L, TimeUnit.MILLISECONDS,
queue,
new ThreadFactoryBuilder().setNameFormat(name + "-" + i + "-%d").build(),
new ThreadPoolExecutor.AbortPolicy()));
}
this.partitioner = new RoundRobinPartitionSelector(executorNumber);
}
@Override
public void execute(Runnable command) {
boolean rejected;
do {
try {
rejected = false;
executors.get(partitioner.getPartition()).execute(command);
} catch (RejectedExecutionException e) {
rejected = true;
try {
TimeUnit.MILLISECONDS.sleep(rejectSleepMills);
} catch (InterruptedException e1) {
logger.warn("Reject sleep has been interrupted.", e1);
}
}
} while (rejected);
}
BackPressureExecutor 类在初始化时,新建一个或多个 ThreadPoolExecutor 对象,作为执行任务的线程池。这里面的关键点有两个。
- 第一个是,在创建 ThreadPoolExecutor 对象时,采用 ArrayBlockingQueue
- 这是一个容量有限的阻塞队列。因此,当任务队列已经满了时,就会停止继续往队列里添加新的任务,从而避免内存无限大,造成 OOM 问题。
- 这是一个容量有限的阻塞队列。因此,当任务队列已经满了时,就会停止继续往队列里添加新的任务,从而避免内存无限大,造成 OOM 问题。
- 第二个是,将 ThreadPoolExecutor 拒绝任务时,采用的策略设置为 AbortPolicy
- 这就意味着,在任务队列已经满了的时候,如果再向任务队列提交任务,就会抛出 RejectedExecutionException 异常。
- 之后,再通过一个 while 循环,在循环体内,捕获 RejectedExecutionException 异常,并不断尝试,重新提交任务,直到成功为止。
这样,经过上面的改造,当下游的步骤执行较慢时,它的任务队列就会占满。这个时候,如果上游继续往下游提交任务,它就会不停重试。这样,自然而然地降低了上游步骤的处理速度,从而起到了流量控制的作用。
【疑问】虽然解决了下游输入队列 OOM 的问题,但上游任务的队列不就堆积越来越多了么?同时这种情况传递下去,最源头的那个任务还是会 OOM 的吧?