本文首发于泊浮目的专栏:https://segmentfault.com/blog/camile

版本 日期 备注
1.0 2017.12.10 文章首发
1.1 2020.9.6 根据缺陷提出一些可参考的解决方案
1.2 2021.7.13 增加示意图
1.3 2021.8.10 更新改进方案

前言

在ZStack中,最基本的执行单位不仅仅是一个函数,也可以是一个任务(Task。其本质实现了Java的Callable接口)。通过大小合理的线程池调度来并行的消费这些任务,使ZStack这个Iaas软件有条不紊运行在大型的数据中心里。

对线程池不太了解的同学可以先看我的一篇博客:Java多线程笔记(三):线程池

ZStack源码剖析之核心库鉴赏——ThreadFacade - 图1

演示代码

在这里,将以ZStack中ThreadFacade最常用的方法为例进行演示。

syncSubmit

提交同步任务,线程将会等结果完成后才继续下一个任务。

这里先参考ZStack中ApiMediatorImpl ,其中有一段用于API消息调度的逻辑。

  1. @Override
  2. public void handleMessage(final Message msg) {
  3. thdf.syncSubmit(new SyncTask<Object>() {
  4. @Override
  5. public String getSyncSignature() {
  6. return "api.worker";
  7. }
  8. @Override
  9. public int getSyncLevel() {
  10. return apiWorkerNum;
  11. }
  12. @Override
  13. public String getName() {
  14. return "api.worker";
  15. }
  16. @MessageSafe
  17. public void handleMessage(Message msg) {
  18. if (msg instanceof APIIsReadyToGoMsg) {
  19. handle((APIIsReadyToGoMsg) msg);
  20. } else if (msg instanceof APIGetVersionMsg) {
  21. handle((APIGetVersionMsg) msg);
  22. } else if (msg instanceof APIGetCurrentTimeMsg) {
  23. handle((APIGetCurrentTimeMsg) msg);
  24. } else if (msg instanceof APIMessage) {
  25. dispatchMessage((APIMessage) msg);
  26. } else {
  27. logger.debug("Not an APIMessage.Message ID is " + msg.getId());
  28. }
  29. }
  30. @Override
  31. public Object call() throws Exception {
  32. handleMessage(msg);
  33. return null;
  34. }
  35. });
  36. }

每个API消息都会被一个线程消费,同时最大并发量为5(apiWorkerNum=5)。每个线程都会等着API消息的回复,等到回复后便给用户。

chainSubmit

提交异步任务,这里的任务执行后将会执行队列中的下一个任务,不会等待结果。

参考VmInstanceBase关于虚拟机启动、重启、暂停相关的代码:

  1. //暂停虚拟机
  2. protected void handle(final APIStopVmInstanceMsg msg) {
  3. thdf.chainSubmit(new ChainTask(msg) {
  4. @Override
  5. public String getName() {
  6. return String.format("stop-vm-%s", self.getUuid());
  7. }
  8. @Override
  9. public String getSyncSignature() {
  10. return syncThreadName;
  11. }
  12. @Override
  13. public void run(SyncTaskChain chain) {
  14. stopVm(msg, chain);
  15. }
  16. });
  17. }
  18. //重启虚拟机
  19. protected void handle(final APIRebootVmInstanceMsg msg) {
  20. thdf.chainSubmit(new ChainTask(msg) {
  21. @Override
  22. public String getName() {
  23. return String.format("reboot-vm-%s", self.getUuid());
  24. }
  25. @Override
  26. public String getSyncSignature() {
  27. return syncThreadName;
  28. }
  29. @Override
  30. public void run(SyncTaskChain chain) {
  31. rebootVm(msg, chain);
  32. }
  33. });
  34. }
  35. //启动虚拟机
  36. protected void handle(final APIStartVmInstanceMsg msg) {
  37. thdf.chainSubmit(new ChainTask(msg) {
  38. @Override
  39. public String getName() {
  40. return String.format("start-vm-%s", self.getUuid());
  41. }
  42. @Override
  43. public String getSyncSignature() {
  44. return syncThreadName;
  45. }
  46. @Override
  47. public void run(SyncTaskChain chain) {
  48. startVm(msg, chain);
  49. }
  50. });
  51. }

通用特性

getSyncSignature则指定了其队列的key,这个任务队列本质一个Map。根据相同的k,将任务作为v按照顺序放入map执行。单从这里的业务逻辑来看,可以有效避免虚拟机的状态混乱。

chainTask的默认并发度为1,这意味着它是同步的。在稍后的源码解析中我们将会看到。

它的实现

先从接口ThreadFacade了解一下方法签名:

  1. public interface ThreadFacade extends Component {
  2. <T> Future<T> submit(Task<T> task);//提交一个任务
  3. <T> Future<T> syncSubmit(SyncTask<T> task); //提交一个有返回值的任务
  4. Future<Void> chainSubmit(ChainTask task); //提交一个没有返回值的任务
  5. Future<Void> submitPeriodicTask(PeriodicTask task, long delay); //提交一个周期性任务,将在一定时间后执行
  6. Future<Void> submitPeriodicTask(PeriodicTask task); //提交一个周期性任务
  7. Future<Void> submitCancelablePeriodicTask(CancelablePeriodicTask task); //提交一个可以取消的周期性任务
  8. Future<Void> submitCancelablePeriodicTask(CancelablePeriodicTask task, long delay); //提交一个可以取消的周期性任务,将在一定时间后执行
  9. void registerHook(ThreadAroundHook hook); //注册钩子
  10. void unregisterHook(ThreadAroundHook hook); //取消钩子
  11. ThreadFacadeImpl.TimeoutTaskReceipt submitTimeoutTask(Runnable task, TimeUnit unit, long delay); //提交一个过了一定时间就算超时的任务
  12. void submitTimerTask(TimerTask task, TimeUnit unit, long delay); //提交一个timer任务
  13. }

以及几个方法逻辑实现类DispatchQueueImpl中的几个成员变量。

  1. private static final CLogger logger = Utils.getLogger(DispatchQueueImpl.class);
  2. @Autowired
  3. ThreadFacade _threadFacade;
  4. private final HashMap<String, SyncTaskQueueWrapper> syncTasks = new HashMap<String, SyncTaskQueueWrapper>();
  5. private final HashMap<String, ChainTaskQueueWrapper> chainTasks = new HashMap<String, ChainTaskQueueWrapper>();
  6. private static final CLogger _logger = CLoggerImpl.getLogger(DispatchQueueImpl.class);
  7. public static final String DUMP_TASK_DEBUG_SINGAL = "DumpTaskQueue";

关键就是syncTasks(同步队列)和chainTasks(异步队列) ,用于存储两种类型的任务队列。

因此当我们提交chainTask时,要注意记得显示的调用next方法,避免后面的任务调度不到。

接着,我们从最常用的几个方法开始看它的代码

chainSubmit方法

从ThreadFacadeImpl作为入口

  1. @Override
  2. public Future<Void> chainSubmit(ChainTask task) {
  3. return dpq.chainSubmit(task);
  4. }

DispatchQueue中的逻辑

  1. //公有方法,即入口之一
  2. @Override
  3. public Future<Void> chainSubmit(ChainTask task) {
  4. return doChainSyncSubmit(task);
  5. }
  1. //内部逻辑
  2. private <T> Future<T> doChainSyncSubmit(final ChainTask task) {
  3. assert task.getSyncSignature() != null : "How can you submit a chain task without sync signature ???";
  4. DebugUtils.Assert(task.getSyncLevel() >= 1, String.format("getSyncLevel() must return 1 at least "));
  5. synchronized (chainTasks) {
  6. final String signature = task.getSyncSignature();
  7. ChainTaskQueueWrapper wrapper = chainTasks.get(signature);
  8. if (wrapper == null) {
  9. wrapper = new ChainTaskQueueWrapper();
  10. chainTasks.put(signature, wrapper);
  11. }
  12. ChainFuture cf = new ChainFuture(task);
  13. wrapper.addTask(cf);
  14. wrapper.startThreadIfNeeded();
  15. return cf;
  16. }
  17. }

这段逻辑大致为:

  • 断言syncSignature不为空,并且必须并行度必须大于等于1。因为1会被做成队列,由一个线程完成这些任务。而1以上则指定了可以有几个线程来完成同一个signature的任务。
  • 加锁HashMap<String, ChainTaskQueueWrapper> chainTasks ,尝试取出相同signature的队列。如果没有则新建一个相关signature的队列,并初始化这个队列的线程数量和它的signature。无论如何,要将这个任务放置队列。
  • 接下来就是startThreadIfNeeded。所谓ifNeeded就是指给这个队列的线程数尚有空余。然后提交一个任务到线程池中,这个任务的内容是:从等待队列中取出一个Feture,如果等待队列为空,则删除这个等待队列的Map。
  1. private class ChainTaskQueueWrapper {
  2. LinkedList pendingQueue = new LinkedList();
  3. final LinkedList runningQueue = new LinkedList();
  4. AtomicInteger counter = new AtomicInteger(0);
  5. int maxThreadNum = -1;
  6. String syncSignature;
  7. void addTask(ChainFuture task) {
  8. pendingQueue.offer(task);
  9. if (maxThreadNum == -1) {
  10. maxThreadNum = task.getSyncLevel();
  11. }
  12. if (syncSignature == null) {
  13. syncSignature = task.getSyncSignature();
  14. }
  15. }
  16. void startThreadIfNeeded() {
  17. //如果运行线程数量已经大于等于限制,不start
  18. if (counter.get() >= maxThreadNum) {
  19. return;
  20. }
  21. counter.incrementAndGet();
  22. _threadFacade.submit(new Task<Void>() {
  23. @Override
  24. public String getName() {
  25. return "sync-chain-thread";
  26. }
  27. // start a new thread every time to avoid stack overflow
  28. @AsyncThread
  29. private void runQueue() {
  30. ChainFuture cf;
  31. synchronized (chainTasks) {
  32. // remove from pending queue and add to running queue later
  33. cf = (ChainFuture) pendingQueue.poll();
  34. if (cf == null) {
  35. if (counter.decrementAndGet() == 0) {
  36. //并且线程只有一个(跑完就没了),则将相关的signature队列移除,避免占用内存
  37. chainTasks.remove(syncSignature);
  38. }
  39. //如果为空,则没有任务,返回
  40. return;
  41. }
  42. }
  43. synchronized (runningQueue) {
  44. // add to running queue
  45. runningQueue.offer(cf);
  46. }
  47. //完成以后将任务挪出运行队列
  48. cf.run(new SyncTaskChain() {
  49. @Override
  50. public void next() {
  51. synchronized (runningQueue) {
  52. runningQueue.remove(cf);
  53. }
  54. runQueue();
  55. }
  56. });
  57. }
  58. //这个方法将会被线程池调用,作为入口
  59. @Override
  60. public Void call() throws Exception {
  61. runQueue();
  62. return null;
  63. }
  64. });
  65. }
  66. }

syncSubmit方法

syncSubmit的内部逻辑与我们之前分析的chainSubmit极为相似,只是放入了不同的队列中。

同样,也是从ThreadFacadeImpl作为入口

  1. @Override
  2. public <T> Future<T> syncSubmit(SyncTask<T> task) {
  3. return dpq.syncSubmit(task);
  4. }

然后是DispatchQueue中的实现

  1. @Override
  2. public <T> Future<T> syncSubmit(SyncTask<T> task) {
  3. if (task.getSyncLevel() <= 0) {
  4. return _threadFacade.submit(task);
  5. } else {
  6. return doSyncSubmit(task);
  7. }
  8. }

内部逻辑-私有方法

  1. private <T> Future<T> doSyncSubmit(final SyncTask<T> syncTask) {
  2. assert syncTask.getSyncSignature() != null : "How can you submit a sync task without sync signature ???";
  3. SyncTaskFuture f;
  4. synchronized (syncTasks) {
  5. SyncTaskQueueWrapper wrapper = syncTasks.get(syncTask.getSyncSignature());
  6. if (wrapper == null) {
  7. wrapper = new SyncTaskQueueWrapper();
  8. //放入syncTasks队列。
  9. syncTasks.put(syncTask.getSyncSignature(), wrapper);
  10. }
  11. f = new SyncTaskFuture(syncTask);
  12. wrapper.addTask(f);
  13. wrapper.startThreadIfNeeded();
  14. }
  15. return f;
  16. }

submitPeriodicTask

提交一个定时任务本质上是通过了线程池的scheduleAtFixedRate来实现。这个方法用于对任务进行周期性调度,任务调度的频率是一定的,它以上一个任务开始执行时间为起点,之后的period时间后调度下一次任务。如果任务的执行时间大于调度时间,那么任务就会在上一个任务结束后,立即被调用。

调用这个方法时将会把任务放入定时任务队列。当任务出现异常时,将会取消这个Futrue,并且挪出队列。

  1. public Future<Void> submitPeriodicTask(final PeriodicTask task, long delay) {
  2. assert task.getInterval() != 0;
  3. assert task.getTimeUnit() != null;
  4. ScheduledFuture<Void> ret = (ScheduledFuture<Void>) _pool.scheduleAtFixedRate(new Runnable() {
  5. public void run() {
  6. try {
  7. task.run();
  8. } catch (Throwable e) {
  9. _logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e);
  10. final Map<PeriodicTask, ScheduledFuture<?>> periodicTasks = getPeriodicTasks();
  11. final ScheduledFuture<?> ft = periodicTasks.get(task);
  12. if (ft != null) {
  13. ft.cancel(true);
  14. periodicTasks.remove(task);
  15. } else {
  16. _logger.warn("Not found feature for task " + task.getName()
  17. + ", the exception happened too soon, will try to cancel the task next time the exception happens");
  18. }
  19. }
  20. }
  21. }, delay, task.getInterval(), task.getTimeUnit());
  22. _periodicTasks.put(task, ret);
  23. return ret;
  24. }

submitCancelablePeriodicTask

submitCancelablePeriodicTask则是会在执行时检测ScheduledFuture是否被要求cancel,如果有要求则取消。

  1. @Override
  2. public Future<Void> submitCancelablePeriodicTask(final CancelablePeriodicTask task, long delay) {
  3. ScheduledFuture<Void> ret = (ScheduledFuture<Void>) _pool.scheduleAtFixedRate(new Runnable() {
  4. private void cancelTask() {
  5. ScheduledFuture<?> ft = cancelablePeriodicTasks.get(task);
  6. if (ft != null) {
  7. ft.cancel(true);
  8. cancelablePeriodicTasks.remove(task);
  9. } else {
  10. _logger.warn("cannot find feature for task " + task.getName()
  11. + ", the exception happened too soon, will try to cancel the task next time the exception happens");
  12. }
  13. }
  14. public void run() {
  15. try {
  16. boolean cancel = task.run();
  17. if (cancel) {
  18. cancelTask();
  19. }
  20. } catch (Throwable e) {
  21. _logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e);
  22. cancelTask();
  23. }
  24. }
  25. }, delay, task.getInterval(), task.getTimeUnit());
  26. cancelablePeriodicTasks.put(task, ret);
  27. return ret;
  28. }

初始化操作

不同与通常的ZStack组件,它虽然实现了Component接口。但是其start中的逻辑并不全面,初始化逻辑是基于spring bean的生命周期来做的。见ThreadFacade

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
  4. xmlns:tx="http://www.springframework.org/schema/tx" xmlns:zstack="http://zstack.org/schema/zstack"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans
  6. http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  7. http://www.springframework.org/schema/aop
  8. http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
  9. http://www.springframework.org/schema/tx
  10. http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
  11. http://zstack.org/schema/zstack
  12. http://zstack.org/schema/zstack/plugin.xsd"
  13. default-init-method="init" default-destroy-method="destroy">
  14. <bean id="ThreadFacade" class="org.zstack.core.thread.ThreadFacadeImpl">
  15. <property name="totalThreadNum" value="500" />
  16. <!-- don't declare Component extension, it's specially handled -->
  17. </bean>
  18. <bean id="ThreadAspectj" class="org.zstack.core.aspect.ThreadAspect" factory-method="aspectOf" />
  19. </beans>

再让回头看看ThreadFacadeImpl的init与destory操作。

  1. //init 操作
  2. public void init() {
  3. //根据全局配置读入线程池最大线程数量
  4. totalThreadNum = ThreadGlobalProperty.MAX_THREAD_NUM;
  5. if (totalThreadNum < 10) {
  6. _logger.warn(String.format("ThreadFacade.maxThreadNum is configured to %s, which is too small for running zstack. Change it to 10", ThreadGlobalProperty.MAX_THREAD_NUM));
  7. totalThreadNum = 10;
  8. }
  9. // 构建一个支持延时任务的线程池
  10. _pool = new ScheduledThreadPoolExecutorExt(totalThreadNum, this, this);
  11. _logger.debug(String.format("create ThreadFacade with max thread number:%s", totalThreadNum));
  12. //构建一个DispatchQueue
  13. dpq = new DispatchQueueImpl();
  14. jmxf.registerBean("ThreadFacade", this);
  15. }
  1. //destory
  2. public void destroy() {
  3. _pool.shutdownNow();
  4. }

看了这里可能大家会有疑问,这种关闭方式未免关于暴力(执行任务的线程会全部被中断)。在此之前,我们曾提到过,它实现了Component接口。这个接口分别有一个startstop方法,使一个组件的生命周期能够方便的在ZStack中注册相应的钩子。

  1. //stop 方法
  2. @Override
  3. public boolean stop() {
  4. _pool.shutdown();
  5. timerPool.stop();
  6. return true;
  7. }

线程工厂

ThreadFacadeImpl同时也实现了ThreadFactory,可以让线程在创建时做一些操作。

  1. @Override
  2. public Thread newThread(Runnable arg0) {
  3. return new Thread(arg0, "zs-thread-" + String.valueOf(seqNum.getAndIncrement()));
  4. }

在这里可以看到ZStack为每一个新的线程赋予了一个名字。

线程池

ZStack对JDK中的线程池进行了一定的扩展,对一个任务执行前后都有相应的钩子函数,同时也开放注册钩子。

  1. package org.zstack.core.thread;
  2. import org.apache.logging.log4j.ThreadContext;
  3. import org.zstack.utils.logging.CLogger;
  4. import org.zstack.utils.logging.CLoggerImpl;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. import java.util.concurrent.RejectedExecutionHandler;
  8. import java.util.concurrent.ScheduledThreadPoolExecutor;
  9. import java.util.concurrent.ThreadFactory;
  10. public class ScheduledThreadPoolExecutorExt extends ScheduledThreadPoolExecutor {
  11. private static final CLogger _logger =CLoggerImpl.getLogger(ScheduledThreadPoolExecutorExt.class);
  12. List<ThreadAroundHook> _hooks = new ArrayList<ThreadAroundHook>(8);
  13. public ScheduledThreadPoolExecutorExt(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
  14. super(corePoolSize, threadFactory, handler);
  15. this.setMaximumPoolSize(corePoolSize);
  16. }
  17. public void registerHook(ThreadAroundHook hook) {
  18. synchronized (_hooks) {
  19. _hooks.add(hook);
  20. }
  21. }
  22. public void unregisterHook(ThreadAroundHook hook) {
  23. synchronized (_hooks) {
  24. _hooks.remove(hook);
  25. }
  26. }
  27. @Override
  28. protected void beforeExecute(Thread t, Runnable r) {
  29. ThreadContext.clearMap();
  30. ThreadContext.clearStack();
  31. ThreadAroundHook debugHook = null;
  32. List<ThreadAroundHook> tmpHooks;
  33. synchronized (_hooks) {
  34. tmpHooks = new ArrayList<ThreadAroundHook>(_hooks);
  35. }
  36. for (ThreadAroundHook hook : tmpHooks) {
  37. debugHook = hook;
  38. try {
  39. hook.beforeExecute(t, r);
  40. } catch (Exception e) {
  41. _logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e);
  42. }
  43. }
  44. }
  45. @Override
  46. protected void afterExecute(Runnable r, Throwable t) {
  47. ThreadContext.clearMap();
  48. ThreadContext.clearStack();
  49. ThreadAroundHook debugHook = null;
  50. List<ThreadAroundHook> tmpHooks;
  51. synchronized (_hooks) {
  52. tmpHooks = new ArrayList<ThreadAroundHook>(_hooks);
  53. }
  54. for (ThreadAroundHook hook : tmpHooks) {
  55. debugHook = hook;
  56. try {
  57. hook.afterExecute(r, t);
  58. } catch (Exception e) {
  59. _logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e);
  60. }
  61. }
  62. }
  63. }

另外,ScheduledThreadPoolExecutorExt是继承自ScheduledThreadPoolExecutor。本质上是一个任务调度线程池,用的工作队列也是一个延时工作队列。

小结

本文分析了ZStack的久经生产考验的核心组件——线程池。通过线程池,使并行编程变得不再那么复杂。

当然,其中也有一些可以改进的地方:

  • 一些加锁的地方(synchronized),可以通过使用并发容器解决。这样可以有效提升吞吐量,节省因为竞争锁而导致的开销。
  • 在提交大量任务的情况下,HashMap会因为扩容而导致性能耗损。可以考虑用固定大小的map并以hash key到固定entry的形式来保证数据结构不会持续扩容。
  • 队列是无界的。在大量任务请求阻塞时,会对内存造成极大的负担。
  • 任务队列无超时逻辑判断。ZStack中的调用绝大多数都是由MQ完成,每一个msg有着对应的超时时间。但是每一个任务却没有超时判定,这意味着一个任务执行时间过长时,后面的任务有可能进入了超时状态,而却没有挪出队列,配合之前提到的无界队列,就是一场潜在的灾难。针对这个问题,可以参考Zookeeper的SessionBucket 或Kafka的TimingWheel来解决这种问题。