分享

工作队列

 点点阅 2017-07-24


工作队列
    工作队列是另一种将工作推后执行的形式,它可以把工作交给一个内核线程去执行,这个下半部是在进程上下文中执行的,因此,它可以重新调度还有睡眠。
    
区分使用软中断/tasklet还是工作队列比较简单,如果推后的工作不需要睡眠,那么就选择软中断或tasklet,但如果需要一个可以重新调度,可以睡眠,可以获取内存,可以获取信号量,可以执行阻塞式I/O操作时,那么,请选择工作队列吧!
    
在老的内核当中(2.6.36之前)工作队列是内核创建一个专门的工作者线程,它有一条任务链表,当设备或者内核某进程有部分任务需要推后处理的时候就把任务挂载在工作者线程的任务链表上,然后会在未来的某个时刻,工作者线程被调度,它就会去处理挂载在任务链表上的任务了。
    
内核有一个缺省的工作者线程叫events/nn是处理器的编号:每个处理器对应一个线程。如单处理器只有一个events/0,而双处理器会多出一个events/1。当然,我们也可以创建我们自己的工作者线程。假如有某个任务会被频繁的触发,那么便可以为它创建一个专门的工作者线程,比如触摸屏CTP
    
然而在2.6.36之后的内核当中对工作队列子系统作了改变,采用的机制改变为并发管理工作队列机制(Concurrency Managed Workqueue (cmwq))。在原来的机制当中,当kernel需要创建一个workqueuecreate_workqueue()方式)的时候,它会在每一个cpu上创建一个work_thread,为每一个cpu分配一个struct cpu_workqueue_struct,随着Kernel创建越来越多的workqueue,这将占用大量的的内存资源,并且加重了进程调度的任务量。而在新的工作队列机制中它不再在每次create_workqueue时都为workqueue创建一个work thread,而是在系统启动的时候给每个cpu创建一个work thread,当有任务项work_struct需要处理时,系统会将任务项work_struct交给某个处理器的work thread上去处理。


    首先来了解一下一个重要的数据结构:并发管理工作队列的后端 gcwq
    struct global_cwq {


spinlock_t lock; /* the gcwq lock */

struct list_head worklist; /* L: list of pending works 需要处理的work_struct都挂载在这个链表上

unsigned int cpu; /* I: the associated cpu 与gcwq相关联的cpu

unsigned int flags; /* L: GCWQ_* flags */


int nr_workers; /* L: total number of workers 总的工作者数量

int nr_idle; /* L: currently idle ones 当前空闲的工作者数量


/* workers are chained either in the idle_list or busy_hash */

struct list_head idle_list; /* X: list of idle workers 空闲的工作者链表

struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];正在执行的工作者线程的哈希表

/* L: hash of busy workers */


struct timer_list idle_timer; /* L: worker idle timeout */

struct timer_list mayday_timer; /* L: SOS timer for dworkers */


struct ida worker_ida; /* L: for worker IDs */


struct task_struct *trustee; /* L: for gcwq shutdown */

unsigned int trustee_state; /* L: trustee state */

wait_queue_head_t trustee_wait; /* trustee wait */

struct worker *first_idle; /* L: first idle worker */

} ____cacheline_aligned_in_smp;
    这个结构每个CPU都有一个,通常我们可以根据CPU在系统中的编号来获取到这个结构,并且 gcwq会将与它关联的CPU编号保存在其成员cpu当中。Gcwqs几乎会处理所有的工作队列的任务。
   
在系统启动过程中,会对gcwqs做一个初始化工作,在workqueue.cinit_workqueues(void)中:


{

unsigned int cpu;

int i;


cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);

cpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);/*主要用于支持热插拔CPU系统,注册CPU热插拔事件,这样可以在CPU上线离线的时候创建或者销毁工作线程


/* initialize gcwqs */

for_each_gcwq_cpu(cpu) {//遍历每个CPU,初始化每个CPUgcwq

struct global_cwq *gcwq = get_gcwq(cpu);根据CPU编号获取它的gcwq结构


spin_lock_init(&gcwq->lock);

INIT_LIST_HEAD(&gcwq->worklist);//初始化挂载未处理工作项的链表

gcwq->cpu = cpu;//保存CPU编号

gcwq->flags |= GCWQ_DISASSOCIATED;//设置当前gcwq暂时不可用


INIT_LIST_HEAD(&gcwq->idle_list);初始化空闲工作者线程链表

for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)

INIT_HLIST_HEAD(&gcwq->busy_hash[i]);

/*初始化定时器*/

init_timer_deferrable(&gcwq->idle_timer);

gcwq->idle_timer.function = idle_worker_timeout;

gcwq->idle_timer.data = (unsigned long)gcwq;


setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,

    (unsigned long)gcwq);


ida_init(&gcwq->worker_ida);


gcwq->trustee_state = TRUSTEE_DONE;

init_waitqueue_head(&gcwq->trustee_wait);

}


/* create the initial worker 为当前活动的CPU创建工作者线程*/

for_each_online_gcwq_cpu(cpu) {

struct global_cwq *gcwq = get_gcwq(cpu);

struct worker *worker;

/*遍历每个CPU,根据CPU编号获取对应的gcwq,然后为每个CPU创建一个工作者线程,并利用gcwq及参数true将工作者线程与CPU绑定起来*/

if (cpu != WORK_CPU_UNBOUND)

gcwq->flags &= ~GCWQ_DISASSOCIATED;

worker = create_worker(gcwq, true);

BUG_ON(!worker);

spin_lock_irq(&gcwq->lock);

start_worker(worker);/*启动工作者线程,开始处理工作项,因为工作项是挂载在gcwq链表上的,因此会对gcwq的数据进行改动,所以这里需要锁来对gcwq作保护*/

spin_unlock_irq(&gcwq->lock);

}


/*通过传递不同的参数来创建用途不同的工作队列:

events:普通工作队列,大部分的延迟处理函数都在这个工作队列上运行,要求任务执行时间短,避免互相影响。

events_long:需要长时间运行的工作项可在这个工作队列上运行。

events_nrt:该工作队列上的任务是不可重入的,也就是该工作队列上的任务在某处理上执行的时候,它不会在其他处理上再执行这些任务。

events_unbound:该工作队列上的任务不会绑定在特定CPU上,只要某处理器空闲,就可以处理该工作队列上的任务。

events_freezable:类似于events,但工作项都是被挂起的

events_nrt_freezable:类似events_nrt。

system_wq = alloc_workqueue("events", 0, 0);

system_long_wq = alloc_workqueue("events_long", 0, 0);

system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);

system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,

    WQ_UNBOUND_MAX_ACTIVE);

system_freezable_wq = alloc_workqueue("events_freezable",

      WQ_FREEZABLE, 0);

system_nrt_freezable_wq = alloc_workqueue("events_nrt_freezable",

WQ_NON_REENTRANT | WQ_FREEZABLE, 0);

BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq ||

       !system_unbound_wq || !system_freezable_wq ||

!system_nrt_freezable_wq);

return 0;

}

    在上面我们看到了,系统在启动的时候会为每个gcwq创建一个工作者线程管理者worker,这个worker就是用来管理gcwq上挂载的工作项的:

                struct worker {

                /* on idle list while idle, on busy hash table while busy */

                union {

                struct list_head entry; /* L: while idle */

                struct hlist_node hentry; /* L: while busy */

                };

//与工作者线程的状态有关,当工作者线程空闲时,使用entry,但忙碌时,使用哈希节点hentry,它们分别对应gcwqidle_listbusy_hash[]

                struct work_struct *current_work; /* L: work being processed 当前正在处理的工作项

                struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq worker所对应的cpu_workqueue_struct

                struct list_head scheduled; /* L: scheduled works 被调度的工作项,只有进入该链表,工作项才能被工作队列处理。

                struct task_struct *task; /* I: worker task 被内核调度的线程实体,通过它参与到内核调度当中去,

                struct global_cwq *gcwq; /* I: the associated gcwq 相关联的gcwq

                /* 64 bytes boundary on 64bit, 32 on 32bit */

                unsigned long last_active; /* L: last active timestamp 最后的活动时间,用于决定该worker的生命周期

                unsigned int flags; /* X: flags */

                int id; /* I: worker id */

                struct work_struct rebind_work; /* L: rebind worker to cpu */

                };

在初始化gcwq过程当中会创建一个worker,而且worker中又有一个成员task_struct,因此我们可以想像的到在创建worker时会发生什么:

static struct worker *create_worker(struct global_cwq *gcwq, bool bind)

{

.........

worker = alloc_worker();//为一个worker分配内存并完成一些初始化工作

if (!worker)

goto fail;


worker->gcwq = gcwq;//绑定gcwq

worker->id = id;

//以下会根据workergcwqcpu标示是否on_unbound来创建worker的线程,使worker能参与到进程调度当中去。

if (!on_unbound_cpu)

worker->task = kthread_create_on_node(worker_thread,

      worker,

      cpu_to_node(gcwq->cpu),

      "kworker/%u:%d", gcwq->cpu, id);

else

worker->task = kthread_create(worker_thread, worker,

      "kworker/u:%d", id);

if (IS_ERR(worker->task))

goto fail;

.........

}

worker创建的工作者线程会在创建好后启动,开始处理工作项。工作者线程的主要工作在worker_thread中完成:

static int worker_thread(void *__worker)

{

struct worker *worker = __worker;

struct global_cwq *gcwq = worker->gcwq;


/* tell the scheduler that this is a workqueue worker */

worker->task->flags |= PF_WQ_WORKER;//告诉调度器这是一个工作者线程

woke_up:

spin_lock_irq(&gcwq->lock);//锁住gcwq,每个cpu只有一个gcwq结构,该CPU的其他线程同样有可能对它进行改变,因此这里需要加锁保护


/* DIE can be set only while we're idle, checking here is enough */

if (worker->flags & WORKER_DIE) {

spin_unlock_irq(&gcwq->lock);

worker->task->flags &= ~PF_WQ_WORKER;

return 0;

}

//首先让工作者线程的管理者从空闲状态退出,因为它现在要work了,这里的主要工作是清除worker的空闲状态标志,减少gcwqnr_idle数量,并将workerentry删除并重新初始化。

worker_leave_idle(worker);

recheck:

/* no more worker necessary? */

if (!need_more_worker(gcwq))//这里会对gcwq进行检查:gcwq上的worklist上是否挂载有未处理的工作项,如果没有,说明当前工作者线程无事可做,睡眠。检查当前CPUgcwqworker如果有更高优先级的工作要处理,且系统的全局队列中已经没有空闲的worker了,那么这时应该需要一个新的worker

goto sleep;

if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))

goto recheck;

BUG_ON(!list_empty(&worker->scheduled));

worker_clr_flags(worker, WORKER_PREP);

//这里做一些处理工作项work_struct的准备工作如查看gcwq的空闲工作者数量,提示worker上不能有准备调度的工作项,清除和设置worker的一些标志。

do {

struct work_struct *work =

list_first_entry(&gcwq->worklist,

 struct work_struct, entry);//gcwqworklist链表上获取未处理的工作项work_struct

if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {

/* optimization path, not strictly necessary */

process_one_work(worker, work);

if (unlikely(!list_empty(&worker->scheduled)))

process_scheduled_works(worker);

} else {

move_linked_works(work, &worker->scheduled, NULL);

process_scheduled_works(worker);

}

} while (keep_working(gcwq));

在这个do while()循环里处理挂载在gcwq上的未处理的工作项。一般情况下,会在do while()循环里把gcwqworklist的所有未处理完的工作项都处理完后终止。在循环中,它通过process_one_work(worker, work)逐个的将工作项处理掉,process_scheduled_works(worker)内部同样是调用process_one_work(worker, work):

static void process_one_work(struct worker *worker, struct work_struct *work)

__releases(&gcwq->lock)

__acquires(&gcwq->lock)

{

........

work_func_t f = work->func;

........

trace_workqueue_execute_start(work);

f(work);

trace_workqueue_execute_end(work);

.........

}

因此,process_one_work()的最终目的是执行work_structfunc函数,也就是每个工作项需要完成的工作。


工作队列的使用:

工作项:

当然首先我们先来了解一下工作项:

struct work_struct {

atomic_long_t data;//work_struct 的参数

struct list_head entry;//work_struct 的连接链表

work_func_t func;//工作项处理函数

#ifdef CONFIG_LOCKDEP

struct lockdep_map lockdep_map;

#endif

};
有两种方法来创建工作项:
静态:DECLARE_WORK(struct work_struct *work,void(*func) (void *));
动态:INIT_WORK(struct work_struct *work,void(*func)(void *));
无论是静态还是动态的方法,它们的主要任务就是初始化好一个work_struct结构:
初始化work_structdataentry值,并且将func指向一个可执行函数:


do {

__init_work((_work), _onstack);

(_work)->data = (atomic_long_t) WORK_DATA_INIT();

INIT_LIST_HEAD(&(_work)->entry);

PREPARE_WORK((_work), (_func));

} while (0);
当一个工作项被初始化好之后,表明它可以进入工作状态了,这时候我们就可以调度它了,如果我们把它交给系统默认线程去执行:


Schedule_work(work);

{

return queue_work(system_wq, work);

}
从代码可知,当调度执行工作项的时候,其实是去queue_work(system_wq,work),而从前面的初始化gcwqs过程中可知,system_wq是在系统启动过程中创建的一个普通的工作队列,也就是说,我们这里初始化的工作项会交给
统启动过程中的普通工作队列的工作者线程去处理。
当然,但我们有足够的理由需要去创建一个新的工作队列的时候我们可以:


Create_workqueue(const char *name);

创建一个名字为name的工作队列;

#define create_workqueue(name)

alloc_workqueue((name), WQ_MEM_RECLAIM, 1) //在初始化gcwqs时使用同样的方式创建了几个默认的系统工作队列。

创建工作队列的主要工作在struct workqueue_struct *__alloc_workqueue_key(const char *fmt,

       unsigned int flags,

       int max_active,

       struct lock_class_key *key,

       const char *lock_name, ...)

{

.........

wq = kzalloc(sizeof(*wq) + namelen, GFP_KERNEL);

if (!wq)

goto err;

//为一个workqueue_struct分配内存。

vsnprintf(wq->name, namelen, fmt, args1);//初始化wqname

/* init wq */

wq->flags = flags;

wq->saved_max_active = max_active;

mutex_init(&wq->flush_mutex);

atomic_set(&wq->nr_cwqs_to_flush, 0);

INIT_LIST_HEAD(&wq->flusher_queue);

INIT_LIST_HEAD(&wq->flusher_overflow);


lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);

INIT_LIST_HEAD(&wq->list);//初始化wq的主要成员

......

for_each_cwq_cpu(cpu, wq) {

struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

struct global_cwq *gcwq = get_gcwq(cpu);


BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);

cwq->gcwq = gcwq;

cwq->wq = wq;

cwq->flush_color = -1;

cwq->max_active = max_active;

INIT_LIST_HEAD(&cwq->delayed_works);

}

//per-cpu workqueuewqgcwq关联起来。Cpu_workqueque_struct中保存了与CPU关联的gcwq,在这里,又关联了wq

......

list_add(&wq->list, &workqueues);//最后把wq放到一个workqueues的链表上。

.........

Return wq;

}

它会返回一个初始化好的工作队列wq
在整个创建workqueue的流程当中我们可以看到,它并没有为新的workqueue去创建一个工作者线程,而是将wqcpu_workqueue_struct关联起来,在这个cpu_workqueue_struct结构中还关联了gcwq,然后把workqueue放到workqueues链表上去。
创建好了我们自己的工作队列之后,我们可以用自己的工作队列去执行工作项:
Queue_work(struct workqueue_struct *wq,struct wori_struct *work);
与上面的使用系统默认的工作队列调度工作项比较,就是使用了自己创建的而不是系统开机启动时创建的工作队列。
现在我们来看看系统到底是怎么去执行工作队列的吧:


Queue_work()->queue_work_on()->__queue_work()

{

struct global_cwq *gcwq;

struct cpu_workqueue_struct *cwq;

struct list_head *worklist;

unsigned int work_flags;

unsigned long flags;


debug_work_activate(work);


/* if dying, only works from the same workqueue are allowed */

if (unlikely(wq->flags & WQ_DRAINING) &&

    WARN_ON_ONCE(!is_chained_work(wq)))

return;


/* determine gcwq to use */

if (!(wq->flags & WQ_UNBOUND)) {

struct global_cwq *last_gcwq;


if (unlikely(cpu == WORK_CPU_UNBOUND))

cpu = raw_smp_processor_id();


/*

 * It's multi cpu.  If @wq is non-reentrant and @work

 * was previously on a different cpu, it might still

 * be running there, in which case the work needs to

 * be queued on that cpu to guarantee non-reentrance.

 */

gcwq = get_gcwq(cpu);

if (wq->flags & WQ_NON_REENTRANT &&

    (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {

struct worker *worker;


spin_lock_irqsave(&last_gcwq->lock, flags);


worker = find_worker_executing_work(last_gcwq, work);


if (worker && worker->current_cwq->wq == wq)

gcwq = last_gcwq;

else {

/* meh... not running there, queue here */

spin_unlock_irqrestore(&last_gcwq->lock, flags);

spin_lock_irqsave(&gcwq->lock, flags);

}

} else

spin_lock_irqsave(&gcwq->lock, flags);

} else {

gcwq = get_gcwq(WORK_CPU_UNBOUND);//

spin_lock_irqsave(&gcwq->lock, flags);

}

//前面这部分代码的主要功能是根据wq的标志位来获取合适的gcwq

/* gcwq determined, get cwq and queue */

cwq = get_cwq(gcwq->cpu, wq);

trace_workqueue_queue_work(cpu, cwq, work);


if (WARN_ON(!list_empty(&work->entry))) {

spin_unlock_irqrestore(&gcwq->lock, flags);

return;

}


cwq->nr_in_flight[cwq->work_color]++;

work_flags = work_color_to_flags(cwq->work_color);


if (likely(cwq->nr_active < cwq->max_active)) {

trace_workqueue_activate_work(work);

cwq->nr_active++;

worklist = gcwq_determine_ins_pos(gcwq, cwq);//gcwqworklist上(也就是未执行工作项链表)找到一个合适的位置

} else {

work_flags |= WORK_STRUCT_DELAYED;

worklist = &cwq->delayed_works;

}


insert_work(cwq, work, worklist, work_flags);//将工作项插入到对应gcwqworklist的合适位置上。


spin_unlock_irqrestore(&gcwq->lock, flags);

}

因此,用一句简单的句可以概括queue_work(),就是把工作项work放到与wq关联的gcwq的未执行工作项链表上去。前面有分析,gcwq对应的工作者线程在被调度的时候会把gcwq上的未执行的工作项都执行。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多