1. task-instance queued但不执行
def queue_command(self,
task_instance: TaskInstance,
command: CommandType,
priority: int = 1,
queue: Optional[str] = None, ):
"""Queues command to task"""
if task_instance.key not in self.queued_tasks and task_instance.key not in self.running:
self.log.info("Adding to queue: %s", command)
self.queued_tasks[task_instance.key] = (command, priority, queue, task_instance)
else:
self.log.error("could not queue task %s", task_instance.key)
- task提交过程会按照下面逻辑进行调度
- scheduler发送当前taskInstance的请求进行对应的queue
- 触发
queue_command
判断 taskInstance 状态- 判断当前 taskInstance 的状态,是否在
set=(queued_task, running)
状态中 - 如果在上面状态中,进入
queued
状态,准备执行 - 如果步在上述状态,报错
"cound not queue task taskInstance"
,但是当前状态下 webserver 的上对应 taskInstance 的状态也是queued
的状态
- 判断当前 taskInstance 的状态,是否在
- 正常状态的 taskInstance 在Pool和queue并发没有问题的状态下会正常执行
2. celeryExecutor执行任务异常
表现:通过celery调度的Executor接收到task之后不能正常的执行,一直处于queued状态。web-UI上表现为任务pool中存在可用的slot,但是还有任务会卡在queued_slot中
原因:celery的预取机制导致的问题
- celery的预取机制通过
worker_multiplier_prefetch
参数控制,代表的是Executor上每个celery进程可以预取的任务数。- 例如,
worker_concurrency=4 AND worker_multiplier_prefetch = 2
代表每个Executor除了可以同时执行4个任务外,还可以预测4个任务在Executor上进行等待执行
- 例如,
解决方法:取消预取机制
worker_multiplier_prefetch = 1
task_act_late = True