在上一讲使用异步方式提高了高并发场景的性能,但是在异步问题中还有一个致命的问题 —— OOM

一、问题成因

两个处理步骤速度不一样导致的,处理慢的那个步骤任务就会在队列中堆积,直到内存耗尽 OOM。
image.png

二、解决思路

对上游输出给下游的速度做流量控制。
**
如何进行流量控制?

  • 严格控制上游发送速度
    • 可行但低效,下游的处理速度会变化,但无法实时的调整上游的发送速度
  • 反向压力
    • 优雅且高效,可实时调整上游发送速度

三、反向压力

反压原理

在反向压力的方案中,上游能够根据下游的处理能力,动态地调整输出速度。
image.png
当下游的消息订阅者,从上游的消息发布者接收消息前,会先通知消息发布者自己能够接收多少消息。然后消息发布者就按照这个数量,向下游的消息订阅者发送消息。

实现反压

要实现反向压力的功能,只需要从两个方面来进行控制:

  • 执行器的任务队列,它的容量必须是有限的。
  • 当执行器的任务队列已经满了时,就阻止上游继续提交新的任务,直到任务队列,重新有新的空间可用为止。

image.png
代码实现:
具备反向压力能力的 ExecutorService 的具体实现

  1. private final List<ExecutorService> executors;
  2. private final Partitioner partitioner;
  3. private Long rejectSleepMills = 1L;
  4. public BackPressureExecutor(String name, int executorNumber, int coreSize, int maxSize, int capacity, long rejectSleepMills) {
  5. this.rejectSleepMills = rejectSleepMills;
  6. this.executors = new ArrayList<>(executorNumber);
  7. for (int i = 0; i < executorNumber; i++) {
  8. ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(capacity);
  9. this.executors.add(new ThreadPoolExecutor(
  10. coreSize, maxSize, 0L, TimeUnit.MILLISECONDS,
  11. queue,
  12. new ThreadFactoryBuilder().setNameFormat(name + "-" + i + "-%d").build(),
  13. new ThreadPoolExecutor.AbortPolicy()));
  14. }
  15. this.partitioner = new RoundRobinPartitionSelector(executorNumber);
  16. }
  17. @Override
  18. public void execute(Runnable command) {
  19. boolean rejected;
  20. do {
  21. try {
  22. rejected = false;
  23. executors.get(partitioner.getPartition()).execute(command);
  24. } catch (RejectedExecutionException e) {
  25. rejected = true;
  26. try {
  27. TimeUnit.MILLISECONDS.sleep(rejectSleepMills);
  28. } catch (InterruptedException e1) {
  29. logger.warn("Reject sleep has been interrupted.", e1);
  30. }
  31. }
  32. } while (rejected);
  33. }

BackPressureExecutor 类在初始化时,新建一个或多个 ThreadPoolExecutor 对象,作为执行任务的线程池。这里面的关键点有两个。

  • 第一个是,在创建 ThreadPoolExecutor 对象时,采用 ArrayBlockingQueue
    • 这是一个容量有限的阻塞队列。因此,当任务队列已经满了时,就会停止继续往队列里添加新的任务,从而避免内存无限大,造成 OOM 问题。
  • 第二个是,将 ThreadPoolExecutor 拒绝任务时,采用的策略设置为 AbortPolicy
    • 这就意味着,在任务队列已经满了的时候,如果再向任务队列提交任务,就会抛出 RejectedExecutionException 异常。
    • 之后,再通过一个 while 循环,在循环体内,捕获 RejectedExecutionException 异常,并不断尝试,重新提交任务,直到成功为止。

这样,经过上面的改造,当下游的步骤执行较慢时,它的任务队列就会占满。这个时候,如果上游继续往下游提交任务,它就会不停重试。这样,自然而然地降低了上游步骤的处理速度,从而起到了流量控制的作用。

【疑问】虽然解决了下游输入队列 OOM 的问题,但上游任务的队列不就堆积越来越多了么?同时这种情况传递下去,最源头的那个任务还是会 OOM 的吧?