3-工作队列使用-Workqueue文章中,我们已经简单的调用以下API来使用工作队列。

  1. /* 1. 自己创建一个workqueue, 中间参数为0,默认配置 */
  2. workqueue_test = alloc_workqueue("workqueue_test", 0, 0);
  3. /* 2. 初始化一个工作项,并添加自己实现的函数 */
  4. INIT_WORK(&work_test, work_test_func);
  5. /* 3. 将自己的工作项添加到指定的工作队列去, 同时唤醒相应线程处理 */
  6. queue_work(workqueue_test, &work_test);

可是为什么这个函数会被调用了? 在什么时候被调用的?这样的实现过程是最优的嘛? 带着上面几个问题,我们来深入分析一下工作队列。

创建工作队列

alloc_workqueue

alloc_workqueue 函数 调用了 __alloc_workqueue_key 来创建一个工作队列

  1. # kernel/workqueue.c
  2. struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
  3. unsigned int flags,
  4. int max_active,
  5. struct lock_class_key *key,
  6. const char *lock_name, ...)
  7. struct workqueue_struct *wq;
  8. struct pool_workqueue *pwq;
  9. ...
  10. /* (1) pwq 最多放到 worker_pool 中的 work 数*/
  11. max_active = max_active ?: WQ_DFL_ACTIVE;
  12. max_active = wq_clamp_max_active(max_active, flags, wq->name);
  13. /*对一系列的链表进行初始化*/
  14. ...
  15. /*给 workqueue 分配对应的 pool_workqueue*/
  16. /*pool_workqueue 将 workqueue 和 worker_pool 链接起来*/
  17. alloc_and_link_pwqs(wq)
  18. /*如果是 WQ_MEM_RECLAIM 类型的 workqueue 需要建立一个线程来保护*/
  19. if (flags & WQ_MEM_RECLAIM)
  20. struct worker *rescuer;
  21. rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
  22. wq->name);
  23. /*将绑定的线程绑定到CPU上*/
  24. kthread_bind_mask(rescuer->task, cpu_possible_mask);
  25. /*需要的话建立 sysfs 文件接口*/
  26. if (wq->flags & WQ_SYSFS)
  27. workqueue_sysfs_register(wq)
  28. /*将新的 workqueue 加入到全局链表 workqueues 中*/
  29. /*list_add_tail_rcu - add a new entry to rcu-protected list*/
  30. list_add_tail_rcu(&wq->list, &workqueues);

上面我们看到了一个全局链表 workqueues,以及提到了workqueue 和 worker_pool的绑定。下面我们来分析以这些变量和操作的作用。

workqueue 和 worker_pool 的绑定

  1. static int alloc_and_link_pwqs(struct workqueue_struct *wq)
  2. /* normal workqueue */
  3. if (!(wq->flags & WQ_UNBOUND))
  4. wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
  5. /*遍历每一个CPU*/
  6. for_each_possible_cpu(cpu)
  7. struct pool_workqueue *pwq =
  8. per_cpu_ptr(wq->cpu_pwqs, cpu);
  9. struct worker_pool *cpu_pools =
  10. per_cpu(cpu_worker_pools, cpu);
  11. init_pwq(pwq, wq, &cpu_pools[highpri]);
  12. /*绑定 struct workqueue_struct 和 struct pool_workqueue 结构体*/
  13. /*workqueue 和 pool_workqueue*/
  14. link_pwq(pwq);

worker

每个 worker 对应一个 worker_thread() 内核线程,一个 worker_pool 对应一个或者多个 worker。多个 worker 从同一个链表中 worker_pool->worklist 获取 work 进行处理。那么问题来了。

  1. worker 怎么处理 work;
  2. worker_pool 怎么动态管理 worker 的数量;


worker 处理 work

上面我们提到一个worker对应一个worker_thread() 内核线程,那么从代码入手

worker_thread

  1. kernel/workqueue.c
  2. static int worker_thread(void *__worker)
  3. struct worker *worker = __worker;
  4. struct worker_pool *pool = worker->pool;
  5. /* tell the scheduler that this is a workqueue worker */
  6. /*我们前面提到过 一个worker_pool 管理多个 worker
  7. 那么worker_pool 需要知道 worker 的状态,就是下面这面标志*/
  8. worker->task->flags |= PF_WQ_WORKER;
  9. ...
  10. do {
  11. /*遍历 pool->worklist 获得工作*/
  12. struct work_struct *work =
  13. list_first_entry(&pool->worklist,
  14. struct work_struct, entry);
  15. pool->watchdog_ts = jiffies;
  16. /*调度工作 work*/
  17. if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
  18. /* optimization path, not strictly necessary */
  19. process_one_work(worker, work);
  20. if (unlikely(!list_empty(&worker->scheduled)))
  21. process_scheduled_works(worker);
  22. } else {
  23. move_linked_works(work, &worker->scheduled, NULL);
  24. process_scheduled_works(worker);
  25. }
  26. } while (keep_working(pool));

上面会调用process_one_work 函数处理单个work.

process_one_work

  1. /**
  2. * process_one_work - process single work
  3. * @worker: self
  4. * @work: work to process
  5. *
  6. * Process @work. This function contains all the logics necessary to
  7. * process a single work including synchronization against and
  8. * interaction with other workers on the same cpu, queueing and
  9. * flushing. As long as context requirement is met, any worker can
  10. * call this function to process a work.
  11. *
  12. * CONTEXT:
  13. * spin_lock_irq(pool->lock) which is released and regrabbed.
  14. */
  15. static void process_one_work(struct worker *worker, struct work_struct *work)
  16. struct pool_workqueue *pwq = get_work_pwq(work);
  17. struct worker_pool *pool = worker->pool;
  18. worker->current_work = work;
  19. worker->current_func = work->func;
  20. worker->current_pwq = pwq;
  21. /*执行我们注册的函数,现在这里就会处理对应的函数了*/
  22. worker->current_func(work);
  23. list_del_init(&work->entry);

worker_thread 函数的创建

前面我们已经知道 worker_thread 线程是按照下面的顺序来处理

  1. worker_thread
  2. process_one_work
  3. 调用我们需要调用的回调函数