在3-工作队列使用-Workqueue文章中,我们已经简单的调用以下API来使用工作队列。
/* 1. 自己创建一个workqueue, 中间参数为0,默认配置 */
workqueue_test = alloc_workqueue("workqueue_test", 0, 0);
/* 2. 初始化一个工作项,并添加自己实现的函数 */
INIT_WORK(&work_test, work_test_func);
/* 3. 将自己的工作项添加到指定的工作队列去, 同时唤醒相应线程处理 */
queue_work(workqueue_test, &work_test);
可是为什么这个函数会被调用了? 在什么时候被调用的?这样的实现过程是最优的嘛? 带着上面几个问题,我们来深入分析一下工作队列。
创建工作队列
alloc_workqueue
alloc_workqueue 函数 调用了 __alloc_workqueue_key 来创建一个工作队列
# kernel/workqueue.c
struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
unsigned int flags,
int max_active,
struct lock_class_key *key,
const char *lock_name, ...)
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
...
/* (1) pwq 最多放到 worker_pool 中的 work 数*/
max_active = max_active ?: WQ_DFL_ACTIVE;
max_active = wq_clamp_max_active(max_active, flags, wq->name);
/*对一系列的链表进行初始化*/
...
/*给 workqueue 分配对应的 pool_workqueue*/
/*pool_workqueue 将 workqueue 和 worker_pool 链接起来*/
alloc_and_link_pwqs(wq)
/*如果是 WQ_MEM_RECLAIM 类型的 workqueue 需要建立一个线程来保护*/
if (flags & WQ_MEM_RECLAIM)
struct worker *rescuer;
rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
wq->name);
/*将绑定的线程绑定到CPU上*/
kthread_bind_mask(rescuer->task, cpu_possible_mask);
/*需要的话建立 sysfs 文件接口*/
if (wq->flags & WQ_SYSFS)
workqueue_sysfs_register(wq)
/*将新的 workqueue 加入到全局链表 workqueues 中*/
/*list_add_tail_rcu - add a new entry to rcu-protected list*/
list_add_tail_rcu(&wq->list, &workqueues);
上面我们看到了一个全局链表 workqueues,以及提到了workqueue 和 worker_pool的绑定。下面我们来分析以这些变量和操作的作用。
workqueue 和 worker_pool 的绑定
static int alloc_and_link_pwqs(struct workqueue_struct *wq)
/* normal workqueue */
if (!(wq->flags & WQ_UNBOUND))
wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
/*遍历每一个CPU*/
for_each_possible_cpu(cpu)
struct pool_workqueue *pwq =
per_cpu_ptr(wq->cpu_pwqs, cpu);
struct worker_pool *cpu_pools =
per_cpu(cpu_worker_pools, cpu);
init_pwq(pwq, wq, &cpu_pools[highpri]);
/*绑定 struct workqueue_struct 和 struct pool_workqueue 结构体*/
/*workqueue 和 pool_workqueue*/
link_pwq(pwq);
worker
每个 worker 对应一个 worker_thread()
内核线程,一个 worker_pool 对应一个或者多个 worker。多个 worker 从同一个链表中 worker_pool->worklist 获取 work 进行处理。那么问题来了。
- worker 怎么处理 work;
- worker_pool 怎么动态管理 worker 的数量;
worker 处理 work
上面我们提到一个worker对应一个worker_thread()
内核线程,那么从代码入手
worker_thread
kernel/workqueue.c
static int worker_thread(void *__worker)
struct worker *worker = __worker;
struct worker_pool *pool = worker->pool;
/* tell the scheduler that this is a workqueue worker */
/*我们前面提到过 一个worker_pool 管理多个 worker
那么worker_pool 需要知道 worker 的状态,就是下面这面标志*/
worker->task->flags |= PF_WQ_WORKER;
...
do {
/*遍历 pool->worklist 获得工作*/
struct work_struct *work =
list_first_entry(&pool->worklist,
struct work_struct, entry);
pool->watchdog_ts = jiffies;
/*调度工作 work*/
if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
/* optimization path, not strictly necessary */
process_one_work(worker, work);
if (unlikely(!list_empty(&worker->scheduled)))
process_scheduled_works(worker);
} else {
move_linked_works(work, &worker->scheduled, NULL);
process_scheduled_works(worker);
}
} while (keep_working(pool));
上面会调用process_one_work 函数处理单个work.
process_one_work
/**
* process_one_work - process single work
* @worker: self
* @work: work to process
*
* Process @work. This function contains all the logics necessary to
* process a single work including synchronization against and
* interaction with other workers on the same cpu, queueing and
* flushing. As long as context requirement is met, any worker can
* call this function to process a work.
*
* CONTEXT:
* spin_lock_irq(pool->lock) which is released and regrabbed.
*/
static void process_one_work(struct worker *worker, struct work_struct *work)
struct pool_workqueue *pwq = get_work_pwq(work);
struct worker_pool *pool = worker->pool;
worker->current_work = work;
worker->current_func = work->func;
worker->current_pwq = pwq;
/*执行我们注册的函数,现在这里就会处理对应的函数了*/
worker->current_func(work);
list_del_init(&work->entry);
worker_thread 函数的创建
前面我们已经知道 worker_thread 线程是按照下面的顺序来处理
worker_thread
process_one_work
调用我们需要调用的回调函数