分享

Linux工作队列实现机制

 WUCANADA 2012-11-13

工作项、工作队列和工作者线程

推后执行的任务叫做工作(work),描述它的数据结构为work_struct ,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct ,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events

工作队列(work queue)是另外一种将工作推后执行的形式。工作队列可以把工作推后,交由一个内核线程去执行—这个下半部分总是会在进程上下文执行,但由于是内核线程,其不能访问用户空间。最重要特点的就是工作队列允许重新调度甚至是睡眠。

通常,在工作队列和软中断/tasklet中作出选择非常容易。可使用以下规则
如果推后执行的任务需要睡眠,那么只能选择工作队列;
如果推后执行的任务需要延时指定的时间再触发,那么使用工作队列,因为其可以利用timer延时;
如果推后执行的任务需要在一个tick之内处理,则使用软中断或tasklet,因为其可以抢占普通进程和内核线程;
如果推后执行的任务对延迟的时间没有任何要求,则使用工作队列,此时通常为无关紧要的任务。

实际上,工作队列的本质就是将工作交给内核线程处理,因此其可以用内核线程替换。但是内核线程的创建和销毁对编程者的要求较高,而工作队列实现了内核线程的封装,不易出错,所以我们也推荐使用工作队列。

工作队列使用

相关文件:

kernel/include/linux/workqueue.h

Kernel/kernel/workqueue.c

工作队列的创建

要使用工作队列,需要先创建工作项,有两种方式:

1)静态创建:

DECLARE_WORK(name,function); 定义正常执行的工作项

DECLARE_DELAYED_WORK(name,function); 定义延后执行的工作项

2)动态创建,运行时创建:

通常在probe()函数中执行下面的操作来初始化工作项:

INIT_WORK(&work, new_ts_work);

INIT_DELAYED_WORK(&led_work,s0340_ledtime_scanf);

工作队列待执行的函数原型是:

typedef void(*work_func_t)(structwork_struct *work);

这个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,允许响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的是,尽管该函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。

创建了工作项之后,在适当的时候可以通过下面的两种方式来提交工作项给工作者线程,通常我们使用的工作队列和工作者线程都是系统初始化时候默认创建的。

工作队列的调度运行

schedule_work(&work)

&work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行

schedule_delayed_work(&delay_work,delay);

&delay_work指向的delay_work直到delay指定的时钟节拍用完以后才会执行。

eg:

schedule_delayed_work(&kpd_backlight_work,msecs_to_jiffies(300));

默认工作队列和工作者线程创建过程

系统默认的工作队列名称是:keventd_wq,默认的工作者线程叫:events/n这里的n是处理器的编号,每个处理器对应一个线程。比如,单处理器的系统只有events/0这样一个线程。而双处理器的系统就会多一个events/1线程。

默认的工作者线程会从多个地方得到被推后的工作。许多内核驱动程序都把它们的下半部交给默认的工作者线程去做。除非一个驱动程序或者子系统必须建立一个属于它自己的内核线程,否则最好使用默认线程。不过并不存在什么东西能够阻止代码创建属于自己的工作者线程。如果你需要在工作者线程中执行大量的处理操作,这样做或许会带来好处。处理器密集型和性能要求严格的任务会因为拥有自己的工作者线程而获得好处。

默认的工作队列keventd_wq只有一个,但是其工作者线程在每一个cpu上都有。而标记为singlethread的工作者线程最存在于一个cpu上。

关于默认工作队列keventd_wq和工作者线程events/n的建立在文件Kernel/kernel/workqueue.c中实现。

Start_kernel()-->rest_init(),该函数中创建了两个内核线程kernel_initkthreadd,这两个线程都和本文描述的部分有关系,先说说kernel_init

kernel_init()-->do_basic_setup()-->init_workqueues(),该函数中创建了上面提到的默认工作队列和工作者线程。

init_workqueues()-->

-->hotcpu_notifier(workqueue_cpu_callback,0);

-->keventd_wq=create_workqueue("events");

注册的cpu通知链cpu_chain上的回调函数是workqueue_cpu_callback(),raw_notifier_call_chain()函数用来调用cpu_chain上的所有回调函数。

这里主要关注的是函数:create_workqueue("events");

@kernel/include/linux/workqueue.h

#define __create_workqueue(name,singlethread,freezeable,rt)/

__create_workqueue_key((name),(singlethread),(freezeable),(rt),/NULL,NULL)

#define  create_workqueue(name)   __create_workqueue((name),0,0,0)

#define  create_rt_workqueue(name)   __create_workqueue((name),0,0,1)

#define  create_freezeable_workqueue(name)  __create_workqueue((name),1,1,0)

#define  create_singlethread_workqueue(name)  __create_workqueue((name),1,0,0)

从宏__create_workqueue的参数可以看出,可以通过传递不同的参数:是否单cpu线程,是否可冻结,是否实时来创建不同类型的工作队列和工作者线程。

work_struct工作项结构体定义:@kernel/include/linux/workqueue.h

工作队列workqueue_struct结构体:@kernel/kernel/workqueue.c

分析1

关键函数__create_workqueue_key()分析:

struct workqueue_struct *__create_workqueue_key(const char *name,

int singlethread,

int freezeable,

int rt,

struct lock_class_key *key,

const char *lock_name)

{

struct workqueue_struct *wq;

struct cpu_workqueue_struct *cwq;

int err 0, cpu;

wq kzalloc(sizeof(*wq), GFP_KERNEL);

if (!wq)

return NULL;

wq->cpu_wq alloc_percpu(struct cpu_workqueue_struct);

if (!wq->cpu_wq) {

kfree(wq);

return NULL;

}

wq->name name;

lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);

wq->singlethread singlethread;

wq->freezeable freezeable;

wq->rt rt;

INIT_LIST_HEAD(&wq->list);

if (singlethread) {// 创建单模块线程

cwq init_cpu_workqueue(wq, singlethread_cpu);  note -1

//初始化cpu_workqueue_struct结构体 cwq

// singlethread_cpu -- the first cpu in cpumask

err create_workqueue_thread(cwq, singlethread_cpu);  note 0

// kthread_create(worker_thread, cwq, fmt, wq->name, cpu);  note 1

// trace_workqueue_creation(cwq->thread, cpu); note 2

start_workqueue_thread(cwq, -1);  // run this thread

else {

cpu_maps_update_begin();

 

spin_lock(&workqueue_lock);

list_add(&wq->list, &workqueues);

spin_unlock(&workqueue_lock);

 

for_each_possible_cpu(cpu) {// 为每个cpu都建立一个对应的线程

cwq init_cpu_workqueue(wq, cpu);

if (err || !cpu_online(cpu))

continue;

err create_workqueue_thread(cwq, cpu);

start_workqueue_thread(cwq, cpu);

}

cpu_maps_update_done();

}

if (err) {

destroy_workqueue(wq);

wq NULL;

}

return wq;

}


Note -1: kernel/kernel/workqueue.c

static struct cpu_workqueue_struct *

init_cpu_workqueue(struct workqueue_struct *wq, int cpu)

{

struct cpu_workqueue_struct *cwq per_cpu_ptr(wq->cpu_wq, cpu);

cwq->wq wq;

spin_lock_init(&cwq->lock);

INIT_LIST_HEAD(&cwq->worklist);// 初始化工作项列表使用时提交的工作项都是挂接在这个链表上的

init_waitqueue_head(&cwq->more_work);

// 初始化等待队列头

return cwq;

}


Note 0: kernel/kernel/workqueue.c

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)

{

struct sched_param param .sched_priority MAX_RT_PRIO-1 };

struct workqueue_struct *wq cwq->wq;

const char *fmt is_wq_single_threaded(wq) "%s" "%s/%d";

struct task_struct *p;

kthread_create(worker_threadcwqfmt, wq->name, cpu);

// fmt 线程命名格式cpu -- cpu number; cwq -- 传递的参数

// 线程函数worker_thread()

 

if (IS_ERR(p))

return PTR_ERR(p);

if (cwq->wq->rt)

sched_setscheduler_nocheck(p, SCHED_FIFO, ?m); 

// 是否需要设置实时属性

cwq->thread p;

 // cpu_workqueue_struct.thread 中记录返回线程的task_struct结构体

trace_workqueue_creation(cwq->thread, cpu);

return 0;

}


Note kernel/kernel/kthread.c 这里会牵扯到内核线程的创建机制,可以扩展一下

static DEFINE_SPINLOCK(kthread_create_lock);

static LIST_HEAD(kthread_create_list);

struct kthread_create_info

{

 int (*threadfn)(void *data);

void *data;

 struct task_struct *result;

struct completion done;

struct list_head list;

};

 

struct task_struct *kthread_create(int (*threadfn)(void *data),

   void *data,

   const char namefmt[],

   ...)

{

struct kthread_create_info create;

create.threadfn threadfn;

create.data data;

init_completion(&create.done); // 初始化完成量

spin_lock(&kthread_create_lock);

list_add_tail(&create.list, &kthread_create_list); 

// 将新建的kthread挂接到全局的线程链表kthread_create_list

spin_unlock(&kthread_create_lock);

wake_up_process(kthreadd_task);

// kthreadd_task find_task_by_pid_ns(pid, &init_pid_ns); kernel/init/main.c // rest_init()中初始化,该指针保存的是线程kthreaddtask_struct结构体指针。

// 唤醒线程kthreadd

wait_for_completion(&create.done);

// 等待完成量,我们这里转到kthreadd线程的执行函数中去看一下,这个完成量的唤醒应该是在kthreadd线程中做的,kthreadd线程应该是根据kthread_create_list上挂接的kthread_create_info结构体来创建特定线程。

// 这部分关于内核线程创建的机制请阅读分析文档:内核线程创建 目录中的相关文件和内核源码。这里不再详细分析。

// 新线程创建ok后,进入了睡眠,然后唤醒了对应的完成量create.done,这边继续执行

if (!IS_ERR(create.result)) 

// create.result中保存的是新创建内核线程的task_struct结构体指针

struct sched_param param .sched_priority };

va_list args;

va_start(args, namefmt);

vsnprintf(create.result->comm, sizeof(create.result->comm),

  namefmt, args);// 设置当前线程的名字

  // 名字的格式来源于函数上层上层调用函数,这里是来源于工作队列创建函数create_workqueue_thread()中:kernel/kernel/workqueue.c

// const char *fmt is_wq_single_threaded(wq) "%s" "%s/%d";

va_end(args);

 

sched_setscheduler_nocheck(create.result, SCHED_NORMAL, ?m); 

// 调度策略设置

set_cpus_allowed_ptr(create.result, cpu_all_mask);

}

return create.result;// 返回的是新建内核线程的task_struct结构体指针

}

kthread_create()函数通过专门创建线程的内核线程kthreadd创建了公用线程kthread,而在该kthread线程函数中调用其参数传递进来的回调函数threadfn(),这个threadfn()函数就是我们调用kthread_create()函数时传递进来的第一个参数,第二个参数则是执行回调函数时的参数。该函数原形如下:

struct task_struct *kthread_create(int (*threadfn)(void *data),

   void *data,

   const char namefmt[],

   ...)

调用示例:

kthread_create(worker_thread, cwq, fmt, wq->name, cpu);

这个线程创建ok之后,会在线程kthread中调用函数worker_thread(cwq);

worker_thread()函数如下,是每一个工作者线程的共用的线程函数。其实工作队列对应的数据结构是workqueue_struct,而该结构体中包含一个对应cpu的数据结构cpu_workqueue_struct,这个数据结构中包含了工作项链表worklist。而所有的工作者线程,只是名字不一样而已,所跑的线程函数都是一样:worker_thread

static int worker_thread(void *__cwq)

{

struct cpu_workqueue_struct *cwq __cwq;

DEFINE_WAIT(wait);// 定义一个等待队列项wait 

// kernel/include/linux/wait.h

if (cwq->wq->freezeable)

set_freezable();// current->flags &= ~PF_NOFREEZE;

for (;;) {

prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);

// 可中断睡眠

// 准备进入睡眠等待,wait加入cwq->more_work等待队列头中,

// 设置非独占进程标志

// 和可中断睡眠标志 kernel/kernel/wait.c

if (!freezing(current) && 

    !kthread_should_stop() &&

    list_empty(&cwq->worklist))// 

    // 当前进程是非冻结状态,当前线程没停止,同时工作项列表为空

    // 的时候进入睡眠让出cpu

schedule();

finish_wait(&cwq->more_work, &wait);// 当前线程被唤醒后马上要做的事情

try_to_freeze();

if (kthread_should_stop())// 检查当前线程是否被要求stop

break;

run_workqueue(cwq);// 运行工作项中对应的函数

}

return 0;

}




+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

在分析函数run_workqueue()之前,我们先来看一下,提交工作项的时候发生了什么事情。还是从函数int schedule_work(struct work_struct *work);开始说起吧!

kernel/kernel/workqueue.c

int schedule_work(struct work_struct *work)

{

return queue_work(keventd_wqwork);

}

keventd_wq工作队列是在函数init_workqueues()中创建的(参看前文),所有这里在提交工作项的时候就用上了。

int queue_work(struct workqueue_struct *wq, struct work_struct *work)

{

int ret;

ret queue_work_on(get_cpu(), wq, work);

put_cpu();

return ret;

}

该函数将work工作项提交到当前做该项提交的cpu上的工作队列wq上,如果这个cpu被标记为die,那么可以提交到别的cpu上去执行。返回0,表示该项工作已经提交过,还没执行。非0表示提交成功。

int  queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)

{

int ret 0;

if (!test_and_set_bit(WORK_STRUCT_PENDINGwork_data_bits(work))) {

BUG_ON(!list_empty(&work->entry));

__queue_work(wq_per_cpu(wq, cpu), work);

ret 1;

}

return ret;

}


kernel/include/linux/workqueue.h

#define work_data_bits(work) ((unsigned long *)(&(work)->data))

work_struct结构体的第一个word中保留该标识,宏也在该结构体中定义。

test_and_set_bit(int nr, volatile void *addr)  *addr的第nr位设置为1,并返回它的原值。

Linux内核的原子操作


工作项在初始化的时候会调用WORK_DATA_INIT()宏来将work_structdata域初始化成0,所有这里!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))结果为1

static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work)

{

unsigned long flags;

spin_lock_irqsave(&cwq->lock, flags);

insert_work(cwq, work, &cwq->worklist);

spin_unlock_irqrestore(&cwq->lock, flags);

}

static void insert_work(struct cpu_workqueue_struct *cwq,

struct work_struct *work, struct list_head *head)

{

trace_workqueue_insertion(cwq->thread, work);

set_wq_data(work, cwq);// 设置work_structpending未决标志

smp_wmb();// 多处理器的相关动作

list_add_tail(&work->entry, head);// 工作项加入链表

wake_up(&cwq->more_work);// 唤醒等待在该等待队列头上的所有等待队列项

}

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

static void run_workqueue(struct cpu_workqueue_struct *cwq)

{

spin_lock_irq(&cwq->lock);

while (!list_empty(&cwq->worklist)) {

struct work_struct *work list_entry(cwq->worklist.next,

struct work_structentry);

work_func_t work->func;// 取出工作项函数

#ifdef CONFIG_LOCKDEP

 

struct lockdep_map lockdep_map work->lockdep_map;

#endif

trace_workqueue_execution(cwq->thread, work);

cwq->current_work work;

list_del_init(cwq->worklist.next);// 从链表中删除工作项节点

spin_unlock_irq(&cwq->lock);

BUG_ON(get_wq_data(work) != cwq);

work_clear_pending(work);

lock_map_acquire(&cwq->wq->lockdep_map);

lock_map_acquire(&lockdep_map);

f(work);

// 执行对应的工作项函数,将work_struct结构体指针作为参数传递进去

lock_map_release(&lockdep_map);

lock_map_release(&cwq->wq->lockdep_map);

if (unlikely(in_atomic() || lockdep_depth(current) 0)) {

printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "

"%s/0xx/%d/n",

current->comm, preempt_count(),

       task_pid_nr(current));

printk(KERN_ERR    last function: ");

print_symbol("%s/n", (unsigned long)f);

debug_show_held_locks(current);

dump_stack();

}

spin_lock_irq(&cwq->lock);

cwq->current_work NULL;

}

spin_unlock_irq(&cwq->lock);

}

我们在新建工作项的时候,需要将工作函数的参数设置成work_struct 结构体指针,例如:

static void sitronix_ts_work(struct work_struct *work)

INIT_WORK(&priv->work, sitronix_ts_work);

XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

虽然以上内容是通过创建系统默认的工作队列keventd_wq和工作者线程events/n来分析了其创建过程,提交工作项过程和提交工作后唤醒工作者线程之后的所做的动作。

其实我们自己也可以使用这些接口来创建独立的工作队列和工作者线程来专门为特定的任务服务,例如在Androidlinux的睡眠和唤醒架构中就使用这种方式,kernel/kernel/power/wakelock.c

core_initcall(wakelocks_init)wakelocks_init()函数中有创建两个工作队列和其对于的工作者线程:

sys_sync_work_queue create_singlethread_workqueue("fs_sync");

suspend_work_queue create_singlethread_workqueue("suspend");

early suspend的时候调用:kernel/kernel/power/earlysuspend.c

static DECLARE_WORK(early_sys_sync_work, early_sys_sync);

queue_work(sys_sync_work_queue, &early_sys_sync_work);

static DECLARE_WORK(early_suspend_work, early_suspend);

queue_work(suspend_work_queue, &early_suspend_work);

suspend的时候调用:kernel/kernel/power/wakelock.c

static DECLARE_WORK(suspend_work, suspend);

queue_work(suspend_work_queue, &suspend_work);

下面来看一看延时执行的工作项是如何提交的,这里和上面共同的部分不讨论,只讨论如何实现的延时执行,其余部分是相同的。

delayed_work结构体的定义:kernel/include/linux/workqueue.h

struct delayed_work {

struct work_struct work;

struct timer_list timer;

// work_struct结构体进行了封装,添加了一个timer_list结构体

};

#define DECLARE_DELAYED_WORK(n, f)/

struct delayed_work __DELAYED_WORK_INITIALIZER(n, f)

#define __DELAYED_WORK_INITIALIZER(n, f) {/

.work __WORK_INITIALIZER((n).work, (f)),/

.timer TIMER_INITIALIZER(NULL, 0, 0),/

// 初始化work_struct结构体和前文方式一样,这里需要多初始化timer域。

kernel/include/linux/timer.h

#define TIMER_INITIALIZER(_function, _expires, _data) {/

.entry .prev TIMER_ENTRY_STATIC },/

.function (_function),/

.expires (_expires),/

.data (_data),/

.base &boot_tvec_bases,/

__TIMER_LOCKDEP_MAP_INITIALIZER(/

__FILE__ ":" __stringify(__LINE__))/

}

通常情况下使用的定义一个定时器也是调用该宏来初始化:
#define DEFINE_TIMER(_name, _function, _expires, _data)
/

struct timer_list _name =/

TIMER_INITIALIZER(_function, _expires, _data)

提交一个延时执行的工作项使用函数:

int schedule_delayed_work(struct delayed_work *dwork,  unsigned long delay)

{

return queue_delayed_work(keventd_wqdwork, delay);

// delay 单位是jiffies,或者传递0的话,就是立即执行和schedule_work()一样了

     // kernel/kernel/timer.c文件中有实现一些time to jiffies的函数:

     // msecs_to_jiffied() 、 usecs_to_jiffies()

int queue_delayed_work(struct workqueue_struct *wq,

  struct delayed_work *dwork, unsigned long delay)

{

if (delay == 0)// 如果传递进来的delay0,那么走立即执行的通路

return queue_work(wq, &dwork->work);

return queue_delayed_work_on(-1, wq, dwork, delay);

}

int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,

struct delayed_work *dwork, unsigned long delay)

{

int ret 0;

struct timer_list *timer &dwork->timer;

struct work_struct *work &dwork->work;

// test_and_set_bit()设置特定位并传回该位原来的值

// 如果未决位为0,设置pending未决位后返回0

if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {

BUG_ON(timer_pending(timer));

BUG_ON(!list_empty(&work->entry));

timer_stats_timer_set_start_info(&dwork->timer);

 

set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));

timer->expires jiffies delay;// 到时时间阀值

timer->data (unsigned long)dwork;// 向定时执行函数传递的参数

timer->function delayed_work_timer_fn;// 定时执行函数

if (unlikely(cpu >= 0))

add_timer_on(timer, cpu);

else

add_timer(timer);// 向系统添加一个timer

ret 1;

}

return ret;

}

static void delayed_work_timer_fn(unsigned long __data)

{

struct delayed_work *dwork (struct delayed_work *)__data;

struct cpu_workqueue_struct *cwq get_wq_data(&dwork->work);

struct workqueue_struct *wq cwq->wq;

__queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);

}

看到函数__queue_work()是不是觉得很眼熟呢?没错,延时执行的工作项走的提交路线和正常提交工作项在该函数之前不一样,后面后市一样了。换句话说,提交延时工作项,只是延时提交了而已,并不是立即提交给工作者线程,让其工作者线程延时来执行

 

其余函数介绍:

void flush_workqueue(struct workqueue_struct *wq);

此函数刷新指定工作队列,他会一直等待,知道该工作队列中所有工作项都已完成。

void flush_scheduled_work(void)

和上面函数类似,只是刷新默认工作队列:keventd_wq

void flush_delayed_work(struct delayed_work *dwork)

等待一个delayed_work执行完。

int flush_work(struct work_struct *work)

等待一个work执行完。

如何取消提交的延时工作项?

cancel_work_sync(struct work_struct *work);

该函数取消已排在工作队列中的未决work,返回true。如果workcallback已经在运行了,那么该函数将会阻塞到其执行完毕。

static inline int __cancel_delayed_work(struct delayed_work *work)

{

int ret;

ret del_timer(&work->timer);

if (ret)

work_clear_pending(&work->work);

return ret;

}

// if it returns the timer function may be running and the queueing is in progress.

static inline int cancel_delayed_work(struct delayed_work *work)

{

int ret;

ret del_timer_sync(&work->timer); // 阻塞直到定时函数执行完

if (ret)

work_clear_pending(&work->work);

return ret;

}

// 同上

三、工作队列新老版本比较

http://liaowb1234.blog.163.com/blog/static/77155547200911296838120/

这篇网文已有详细的说明,请参考。


博客分析2

工作队列(work queue)是另外一种将工作推后执行的形式,它和tasklet有所不同。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行。这样,通过工作队列执行的代码能占尽进程上下文的所有优势。最重要的就是工作队列允许被重新调度甚至是睡眠。

那么,什么情况下使用工作队列,什么情况下使用tasklet。如果推后执行的任务需要睡眠,那么就选择工作队列。如果推后执行的任务不需要睡眠,那么就选择tasklet。另外,如果需要用一个可以重新调度的实体来执行你的下半部处理,也应该使用工作队列。它是唯一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在需要获得大量的内存时、在需要获取信号量时,在需要执行阻塞式的I/O操作时,它都会非常有用。如果不需要用一个内核线程来推后执行工作,那么就考虑使用tasklet

  1. 工作、工作队列和工作者线程

如前所述,我们把推后执行的任务叫做工作(work ),描述它的数据结构为work_struct ,这些工作以队列结构组织成工作队列(workqueue ),其数据结构为workqueue_struct,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events, 自己也可以创建自己的工作者线程。

  1. 表示工作的数据结构

工作用<linux/workqueue.h> 中定义的work_struct 结构表示:

structwork_struct{

        unsigned longpending;

        structlist_head entry;

        void(*func) (void*);

        void*data;

        void*wq_data;

        struct timer_listtimer;

};

这些结构被连接成链表。当一个工作者线程被唤醒时,它会执行它的链表上的所有工作。工作被执行完毕,它就将相应的work_struct对象从链表上移去。当链表上不再有对象的时候,它就会继续休眠。

3. 创建推后的工作

要使用工作队列,首先要做的是创建一些需要推后完成的工作。可以通过DECLARE_WORK 在编译时静态地建该结构:

DECLARE_WORK(name, void (*func) (void *), void *data);

这样就会静态地创建一个名为name ,待执行函数为func ,参数为data work_struct 结构。

同样,也可以在运行时通过指针创建一个工作:

INIT_WORK(struct work_struct *work, woid(*func) (void*), void *data);

这会动态地初始化一个由work 指向的工作。

4. 工作队列中待执行的函数

工作队列待执行的函数原型是:

void work_handler(void *data)

这 个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,允许响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的是, 尽管该函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间的进程运 行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。

5. 对工作进行调度

现在工作已经被创建,我们可以调度它了。想要把给定工作的待处理函数提交给缺省的events 工作线程,只需调用

schedule_work(&work)

work 马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。

有时候并不希望工作马上就被执行,而是希望它经过一段延迟以后再执行。在这种情况下,可以调度它在指定的时间执行:

schedule_delayed_work(&work, delay);

这时,&work 指向的work_struct 直到delay 指定的时钟节拍用完以后才会执行。

6. 工作队列的简单应用

#include<linux/module.h>
#include<linux/init.h>
#include<linux/workqueue.h>

staticstructworkqueue_struct *queue=NULL;
staticstructwork_struct work;

staticvoidwork_handler(structwork_struct *data)
{
        printk
(KERN_ALERT "workhandler function./n" );
}

staticint__init test_init(void)
{
        
queue=create_singlethread_workqueue("helloworld");
        
if(!queue)
                
gotoerr;

        INIT_WORK
(&work,work_handler);
        schedule_work
(&work);

        
return0;
err
:
        
return-1;
}

staticvoid__exit test_exit(void)
{
        destroy_workqueue
(queue);
}
MODULE_LICENSE
("GPL");
module_init
(test_init);
module_exit
(test_exit);



&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&

CMWQ

以上博文是比较老的工作队列机制,看了2.6.39的kernel workqueue.txt,采用新的cmwq机制:Concurrency Managed Workqueue并发管现工作队列

1. Introduction

There are many cases where an asynchronous process execution context
is needed and the workqueue (wq) API is the most commonly used
mechanism for such cases.

When such an asynchronous execution context is needed, a work item
describing which function to execute is put on a queue.  An
independent thread serves as the asynchronous execution context.  The
queue is called workqueue and the thread is called worker.

While there are work items on the workqueue the worker executes the
functions associated with the work items one after the other.  When
there is no work item left on the workqueue the worker becomes idle.
When a new work item gets queued, the worker begins executing again.


2. Why cmwq?

In the original wq implementation, a multi threaded (MT) wq had one
worker thread per CPU and a single threaded (ST) wq had one worker
thread system-wide.  A single MT wq needed to keep around the same
number of workers as the number of CPUs.  The kernel grew a lot of MT
wq users over the years and with the number of CPU cores continuously
rising, some systems saturated the default 32k PID space just booting
up.
 传统的wq的实现机制是:MT wq在每一个CPU上都有一个工作线程,ST wq只有一个系统默认的工作线程.一个单一的MT wq需要保持 与CPU数量一致的工作.随着CPU core不断增长的,kernel增加了大量的MT wq使用,在起动时用了32K PID space就让一些系统饱和了.

Although MT wq wasted a lot of resource, the level of concurrency
provided was unsatisfactory.  The limitation was common to both ST and
MT wq albeit less severe on MT.  Each wq maintained its own separate
worker pool.  A MT wq could provide only one execution context per CPU
while a ST wq one for the whole system.  Work items had to compete for
those very limited execution contexts leading to various problems
including proneness to deadlocks around the single execution context.

MT wq不但浪费资源,而且并发能力也不如人意.这种局限性都出现在ST和MT wq,即使MT比ST稍好点.每个wq维护自己独自的工作池.一个MT wq能在每一个CPU上执行上下文,ST wq是所有系统所共用的执行上下文.工作条目在非常有限的执行上下文竞争,导致了许多严重的问题,比如单一的执行上下文时的都易于死锁


The tension between the provided level of concurrency and resource
usage also forced its users to make unnecessary tradeoffs like libata
choosing to use ST wq for polling PIOs and accepting an unnecessary
limitation that no two polling PIOs can progress at the same time.  As
MT wq don't provide much better concurrency, users which require
higher level of concurrency, like async or fscache, had to implement
their own thread pool.

Concurrency Managed Workqueue (cmwq) is a reimplementation of wq with
focus on the following goals.

* Maintain compatibility with the original workqueue API.
   与传统的工作队列API兼容
* Use per-CPU unified worker pools shared by all wq to provide
  flexible level of concurrency on demand without wasting a lot of
  resource.
  使用
所用wq共享出来的运行在每个CPU上的统一的工作池,使之更灵活的并发级别需求而不浪费资源
* Automatically regulate worker pool and level of concurrency so that
  the API users don't need to worry about such details.
  自动管理工作池和并发级别,api使用都并不需要担心这些细节

3. The Design

In order to ease the asynchronous execution of functions a new
abstraction, the work item, is introduced.
用于异步通信
A work item is a simple struct that holds a pointer to the function
that is to be executed asynchronously.  Whenever a driver or subsystem
wants a function to be executed asynchronously it has to set up a work
item pointing to that function and queue that work item on a
workqueue.
工作条目是一个简单的结构体,它有一个指针指向一个函数,这个函数将被异步地执行.如果想用一个被异步执行的函数,必须建立一个指行这个函数的工作条目并且将这个工作条目排列在工作队列上
Special purpose threads, called worker threads, execute the functions
off of the queue, one after the other.  If no work is queued, the
worker threads become idle.  These worker threads are managed in so
called thread-pools.
特定目的线程叫工作线程,它将一个接着一个地执行队列上的函数(工作条目所指的函数).如果没有工作可执行,工作线程会idle.所有的工作线程被线程池管理着.
The cmwq design differentiates between the user-facing workqueues that
subsystems and drivers queue work items on and the backend mechanism
which manages thread-pool and processes the queued work items.
cmwq设计了与user-facing的工作队列(用于排列工作条目)不同的后端机制(管理工作线程池和处理排列好的工作条目)
The backend is called gcwq.  There is one gcwq for each possible CPU
and one gcwq to serve work items queued on unbound workqueues.
后端叫gcwq,每一个CPU都有一个gcwq,每个gcwq用来服务于末绑定的工作队列上的工作条目
Subsystems and drivers can create and queue work items through special
workqueue API functions as they see fit. They can influence some
aspects of the way the work items are executed by setting flags on the
workqueue they are putting the work item on. These flags include
things like CPU locality, reentrancy, concurrency limits and more. To
get a detailed overview refer to the API description of
alloc_workqueue() below.
通过设置工作队列上的flag,工作条目就以不同的方式被执行,这些flag有,cpu 位置,重入,并发限制等
When a work item is queued to a workqueue, the target gcwq is
determined according to the queue parameters and workqueue attributes
and appended on the shared worklist of the gcwq.  For example, unless
specifically overridden, a work item of a bound workqueue will be
queued on the worklist of exactly that gcwq that is associated to the
CPU the issuer is running on.
当作条目被排入工作队列时,根据队列的参数和工作队列的属性来确定目 标gcwq,并且将之追加到gcwq的共享worklist上.例如,除非特别是overridden,一个绑定的工作队列的工作条目将会被精确地排列到 worklist中去,与CPU相关的gcwq作为发起者开始运行.
For any worker pool implementation, managing the concurrency level
(how many execution contexts are active) is an important issue.  cmwq
tries to keep the concurrency at a minimal but sufficient level.
Minimal to save resources and sufficient in that the system is used at
its full capacity.
任何一个工作池的实现,管理并发级别(同时有多少个执行上下在运行)是一个重要的问题.cmwq尽量保持足够且最小的并发级别,在充分的使用系统时尽量减小保存资源
Each gcwq bound to an actual CPU implements concurrency management by
hooking into the scheduler.  The gcwq is notified whenever an active
worker wakes up or sleeps and keeps track of the number of the
currently runnable workers.  Generally, work items are not expected to
hog a CPU and consume many cycles.  That means maintaining just enough
concurrency to prevent work processing from stalling should be
optimal.  As long as there are one or more runnable workers on the
CPU, the gcwq doesn't start execution of a new work, but, when the
last running worker goes to sleep, it immediately schedules a new
worker so that the CPU doesn't sit idle while there are pending work
items.  This allows using a minimal number of workers without losing
execution bandwidth.
每一个gcwq绑定在特定的CPU上,通过hooking into the scheduler实现并发管理.只要有工作唤醒或者睡眼的活动,都会通知gcwq, gcwq一直追踪运行工作的数量.一般来说,不希望工作条目独占CPU并且多次运行,这就意味道着维护刚刚好的发并发来阻止工作处理拖拉,是最理想的.当 有很多待处理的工作时,只要CPU上有工作在运行,gcwq不会开始执行新的工作,一旦最后一个运行的工作进入睡眠,它就会立即调度新工作,因此CPU就 不会idle.这允许使用最小工作数量而并不损失执行带宽
Keeping idle workers around doesn't cost other than the memory space
for kthreads, so cmwq holds onto idle ones for a while before killing
them.
保持工作线程idle并不消额外内存空间,cmwq将会保留它一会后就kill它
For an unbound wq, the above concurrency management doesn't apply and
the gcwq for the pseudo unbound CPU tries to start executing all work
items as soon as possible.  The responsibility of regulating
concurrency level is on the users.  There is also a flag to mark a
bound wq to ignore the concurrency management.  Please refer to the
API section for details.
对于没有绑定的wq,以上的并发管现并不实用,pseudo绑定CPU的gcwq会试着尽快执行工作条目,管理并发级别的责任在于用户,有一个flag来标记绑定wq来忽略并发管理
Forward progress guarantee relies on that workers can be created when
more execution contexts are necessary, which in turn is guaranteed
through the use of rescue workers.  All work items which might be used
on code paths that handle memory reclaim are required to be queued on
wq's that have a rescue-worker reserved for execution under memory
pressure.  Else it is possible that the thread-pool deadlocks waiting
for execution contexts to free up.
当更多的执行上下文需求时,进一步的工作保证依靠能创建工作,通过用rescue工作反过来保证.所有的工作条目可能被用来内存回收的代码路径都必须排队,专门有一个rescue-worker来执行.否则很有可能线程池会死锁等待执行上下文来释放

4. Application Programming Interface (API)

alloc_workqueue() allocates a wq.  The original create_*workqueue()
functions are deprecated and scheduled for removal.  alloc_workqueue()
takes three arguments - @name, @flags and @max_active.  @name is the
name of the wq and also used as the name of the rescuer thread if
there is one.

A wq no longer manages execution resources but serves as a domain for
forward progress guarantee, flush and work item attributes.  @flags
and @max_active control how work items are assigned execution
resources, scheduled and executed

@flags:

  WQ_NON_REENTRANT

    By default, a wq guarantees non-reentrance only on the same
    CPU.  A work item may not be executed concurrently on the same
    CPU by multiple workers but is allowed to be executed
    concurrently on multiple CPUs.  This flag makes sure
    non-reentrance is enforced across all CPUs.  Work items queued
    to a non-reentrant wq are guaranteed to be executed by at most
    one worker system-wide at any given time.

  WQ_UNBOUND

    Work items queued to an unbound wq are served by a special
    gcwq which hosts workers which are not bound to any specific
    CPU.  This makes the wq behave as a simple execution context
    provider without concurrency management.  The unbound gcwq
    tries to start execution of work items as soon as possible.
    Unbound wq sacrifices locality but is useful for the following
    cases.

    * Wide fluctuation in the concurrency level requirement is
      expected and using bound wq may end up creating large number
      of mostly unused workers across different CPUs as the issuer
      hops through different CPUs.

    * Long running CPU intensive workloads which can be better
      managed by the system scheduler.

  WQ_FREEZABLE

    A freezable wq participates in the freeze phase of the system
    suspend operations.  Work items on the wq are drained and no
    new work item starts execution until thawed.

  WQ_MEM_RECLAIM

    All wq which might be used in the memory reclaim paths _MUST_
    have this flag set.  The wq is guaranteed to have at least one
    execution context regardless of memory pressure.

  WQ_HIGHPRI

    Work items of a highpri wq are queued at the head of the
    worklist of the target gcwq and start execution regardless of
    the current concurrency level.  In other words, highpri work
    items will always start execution as soon as execution
    resource is available.

    Ordering among highpri work items is preserved - a highpri
    work item queued after another highpri work item will start
    execution after the earlier highpri work item starts.

    Although highpri work items are not held back by other
    runnable work items, they still contribute to the concurrency
    level.  Highpri work items in runnable state will prevent
    non-highpri work items from starting execution.

    This flag is meaningless for unbound wq.

  WQ_CPU_INTENSIVE

    Work items of a CPU intensive wq do not contribute to the
    concurrency level.  In other words, runnable CPU intensive
    work items will not prevent other work items from starting
    execution.  This is useful for bound work items which are
    expected to hog CPU cycles so that their execution is
    regulated by the system scheduler.

    Although CPU intensive work items don't contribute to the
    concurrency level, start of their executions is still
    regulated by the concurrency management and runnable
    non-CPU-intensive work items can delay execution of CPU
    intensive work items.

    This flag is meaningless for unbound wq.

  WQ_HIGHPRI | WQ_CPU_INTENSIVE

    This combination makes the wq avoid interaction with
    concurrency management completely and behave as a simple
    per-CPU execution context provider.  Work items queued on a
    highpri CPU-intensive wq start execution as soon as resources
    are available and don't affect execution of other work items.

@max_active:

@max_active determines the maximum number of execution contexts per
CPU which can be assigned to the work items of a wq.  For example,
with @max_active of 16, at most 16 work items of the wq can be
executing at the same time per CPU.

Currently, for a bound wq, the maximum limit for @max_active is 512
and the default value used when 0 is specified is 256.  For an unbound
wq, the limit is higher of 512 and 4 * num_possible_cpus().  These
values are chosen sufficiently high such that they are not the
limiting factor while providing protection in runaway cases.

The number of active work items of a wq is usually regulated by the
users of the wq, more specifically, by how many work items the users
may queue at the same time.  Unless there is a specific need for
throttling the number of active work items, specifying '0' is
recommended.

Some users depend on the strict execution ordering of ST wq.  The
combination of @max_active of 1 and WQ_UNBOUND is used to achieve this
behavior.  Work items on such wq are always queued to the unbound gcwq
and only one work item can be active at any given time thus achieving
the same ordering property as ST wq.


5. Example Execution Scenarios

The following example execution scenarios try to illustrate how cmwq
behave under different configurations.

 Work items w0, w1, w2 are queued to a bound wq q0 on the same CPU.
 w0 burns CPU for 5ms then sleeps for 10ms then burns CPU for 5ms
 again before finishing.  w1 and w2 burn CPU for 5ms then sleep for
 10ms.

Ignoring all other tasks, works and processing overhead, and assuming
simple FIFO scheduling, the following is one highly simplified version
of possible sequences of events with the original wq.

 TIME IN MSECS    EVENT
        w0 starts and burns CPU
        w0 sleeps
 15        w0 wakes up and burns CPU
 20        w0 finishes
 20        w1 starts and burns CPU
 25        w1 sleeps
 35        w1 wakes up and finishes
 35        w2 starts and burns CPU
 40        w2 sleeps
 50        w2 wakes up and finishes

And with cmwq with @max_active >= 3,

 TIME IN MSECS    EVENT
        w0 starts and burns CPU
        w0 sleeps
        w1 starts and burns CPU
 10        w1 sleeps
 10        w2 starts and burns CPU
 15        w2 sleeps
 15        w0 wakes up and burns CPU
 20        w0 finishes
 20        w1 wakes up and finishes
 25        w2 wakes up and finishes

If @max_active == 2,

 TIME IN MSECS    EVENT
        w0 starts and burns CPU
        w0 sleeps
        w1 starts and burns CPU
 10        w1 sleeps
 15        w0 wakes up and burns CPU
 20        w0 finishes
 20        w1 wakes up and finishes
 20        w2 starts and burns CPU
 25        w2 sleeps
 35        w2 wakes up and finishes

Now, let's assume w1 and w2 are queued to a different wq q1 which has
WQ_HIGHPRI set,

 TIME IN MSECS    EVENT
        w1 and w2 start and burn CPU
        w1 sleeps
 10        w2 sleeps
 10        w0 starts and burns CPU
 15        w0 sleeps
 15        w1 wakes up and finishes
 20        w2 wakes up and finishes
 25        w0 wakes up and burns CPU
 30        w0 finishes

If q1 has WQ_CPU_INTENSIVE set,

 TIME IN MSECS    EVENT
        w0 starts and burns CPU
        w0 sleeps
        w1 and w2 start and burn CPU
 10        w1 sleeps
 15        w2 sleeps
 15        w0 wakes up and burns CPU
 20        w0 finishes
 20        w1 wakes up and finishes
 25        w2 wakes up and finishes


6. Guidelines

* Do not forget to use WQ_MEM_RECLAIM if a wq may process work items
  which are used during memory reclaim.  Each wq with WQ_MEM_RECLAIM
  set has an execution context reserved for it.  If there is
  dependency among multiple work items used during memory reclaim,
  they should be queued to separate wq each with WQ_MEM_RECLAIM.

* Unless strict ordering is required, there is no need to use ST wq.

* Unless there is a specific need, using 0 for @max_active is
  recommended.  In most use cases, concurrency level usually stays
  well under the default limit.

* A wq serves as a domain for forward progress guarantee
  (WQ_MEM_RECLAIM, flush and work item attributes.  Work items which
  are not involved in memory reclaim and don't need to be flushed as a
  part of a group of work items, and don't require any special
  attribute, can use one of the system wq.  There is no difference in
  execution characteristics between using a dedicated wq and a system
  wq.

* Unless work items are expected to consume a huge amount of CPU
  cycles, using a bound wq is usually beneficial due to the increased
  level of locality in wq operations and work item execution.


7. Debugging

Because the work functions are executed by generic worker threads
there are a few tricks needed to shed some light on misbehaving
workqueue users.

Worker threads show up in the process list as:

root      5671  0.0  0.0         0 ?          12:07   0:00 [kworker/0:1]
root      5672  0.0  0.0         0 ?          12:07   0:00 [kworker/1:2]
root      5673  0.0  0.0         0 ?          12:12   0:00 [kworker/0:0]
root      5674  0.0  0.0         0 ?          12:13   0:00 [kworker/1:0]

If kworkers are going crazy (using too much cpu), there are two types
of possible problems:

    1. Something beeing scheduled in rapid succession
    2. A single work item that consumes lots of cpu cycles

The first one can be tracked using tracing:

    $ echo workqueue:workqueue_queue_work > /sys/kernel/debug/tracing/set_event
    $ cat /sys/kernel/debug/tracing/trace_pipe > out.txt
    (wait a few secs)
    ^C

If something is busy looping on work queueing, it would be dominating
the output and the offender can be determined with the work item
function.

For the second type of problems it should be possible to just check
the stack trace of the offending worker thread.

    $ cat /proc/THE_OFFENDING_KWORKER/stack

The work item's function should be trivially visible in the stack
trace.

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多