工作队列
工作队列是另一种将工作推后执行的形式,它可以把工作交给一个内核线程去执行,这个下半部是在进程上下文中执行的,因此,它可以重新调度还有睡眠。区分使用软中断/tasklet还是工作队列比较简单,如果推后的工作不需要睡眠,那么就选择软中断或tasklet,但如果需要一个可以重新调度,可以睡眠,可以获取内存,可以获取信号量,可以执行阻塞式I/O操作时,那么,请选择工作队列吧! 在老的内核当中(2.6.36之前)工作队列是内核创建一个专门的工作者线程,它有一条任务链表,当设备或者内核某进程有部分任务需要推后处理的时候就把任务挂载在工作者线程的任务链表上,然后会在未来的某个时刻,工作者线程被调度,它就会去处理挂载在任务链表上的任务了。 内核有一个缺省的工作者线程叫events/n,n是处理器的编号:每个处理器对应一个线程。如单处理器只有一个events/0,而双处理器会多出一个events/1。当然,我们也可以创建我们自己的工作者线程。假如有某个任务会被频繁的触发,那么便可以为它创建一个专门的工作者线程,比如触摸屏CTP。 然而在2.6.36之后的内核当中对工作队列子系统作了改变,采用的机制改变为并发管理工作队列机制(Concurrency Managed Workqueue (cmwq))。在原来的机制当中,当kernel需要创建一个workqueue(create_workqueue()方式)的时候,它会在每一个cpu上创建一个work_thread,为每一个cpu分配一个struct cpu_workqueue_struct,随着Kernel创建越来越多的workqueue,这将占用大量的的内存资源,并且加重了进程调度的任务量。而在新的工作队列机制中它不再在每次create_workqueue时都为workqueue创建一个work thread,而是在系统启动的时候给每个cpu创建一个work thread,当有任务项work_struct需要处理时,系统会将任务项work_struct交给某个处理器的work thread上去处理。
首先来了解一下一个重要的数据结构:并发管理工作队列的后端 gcwq
spinlock_t lock; /* the gcwq lock */ struct list_head worklist; /* L: list of pending works 需要处理的work_struct都挂载在这个链表上 unsigned int cpu; /* I: the associated cpu 与gcwq相关联的cpu unsigned int flags; /* L: GCWQ_* flags */
int nr_workers; /* L: total number of workers 总的工作者数量 int nr_idle; /* L: currently idle ones 当前空闲的工作者数量
/* workers are chained either in the idle_list or busy_hash */ struct list_head idle_list; /* X: list of idle workers 空闲的工作者链表 struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];正在执行的工作者线程的哈希表 /* L: hash of busy workers */
struct timer_list idle_timer; /* L: worker idle timeout */ struct timer_list mayday_timer; /* L: SOS timer for dworkers */
struct ida worker_ida; /* L: for worker IDs */
struct task_struct *trustee; /* L: for gcwq shutdown */ unsigned int trustee_state; /* L: trustee state */ wait_queue_head_t trustee_wait; /* trustee wait */ struct worker *first_idle; /* L: first idle worker */
} ____cacheline_aligned_in_smp;
{ unsigned int cpu; int i;
cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP); cpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);/*主要用于支持热插拔CPU系统,注册CPU热插拔事件,这样可以在CPU上线离线的时候创建或者销毁工作线程
/* initialize gcwqs */ for_each_gcwq_cpu(cpu) {//遍历每个CPU,初始化每个CPU的gcwq struct global_cwq *gcwq = get_gcwq(cpu);根据CPU编号获取它的gcwq结构
spin_lock_init(&gcwq->lock); INIT_LIST_HEAD(&gcwq->worklist);//初始化挂载未处理工作项的链表 gcwq->cpu = cpu;//保存CPU编号 gcwq->flags |= GCWQ_DISASSOCIATED;//设置当前gcwq暂时不可用
INIT_LIST_HEAD(&gcwq->idle_list);初始化空闲工作者线程链表 for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) INIT_HLIST_HEAD(&gcwq->busy_hash[i]); /*初始化定时器*/ init_timer_deferrable(&gcwq->idle_timer); gcwq->idle_timer.function = idle_worker_timeout; gcwq->idle_timer.data = (unsigned long)gcwq;
setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout, (unsigned long)gcwq);
ida_init(&gcwq->worker_ida);
gcwq->trustee_state = TRUSTEE_DONE; init_waitqueue_head(&gcwq->trustee_wait); }
/* create the initial worker 为当前活动的CPU创建工作者线程*/ for_each_online_gcwq_cpu(cpu) { struct global_cwq *gcwq = get_gcwq(cpu); struct worker *worker; /*遍历每个CPU,根据CPU编号获取对应的gcwq,然后为每个CPU创建一个工作者线程,并利用gcwq及参数true将工作者线程与CPU绑定起来*/ if (cpu != WORK_CPU_UNBOUND) gcwq->flags &= ~GCWQ_DISASSOCIATED; worker = create_worker(gcwq, true); BUG_ON(!worker); spin_lock_irq(&gcwq->lock); start_worker(worker);/*启动工作者线程,开始处理工作项,因为工作项是挂载在gcwq链表上的,因此会对gcwq的数据进行改动,所以这里需要锁来对gcwq作保护*/ spin_unlock_irq(&gcwq->lock); }
/*通过传递不同的参数来创建用途不同的工作队列: events:普通工作队列,大部分的延迟处理函数都在这个工作队列上运行,要求任务执行时间短,避免互相影响。 events_long:需要长时间运行的工作项可在这个工作队列上运行。 events_nrt:该工作队列上的任务是不可重入的,也就是该工作队列上的任务在某处理上执行的时候,它不会在其他处理上再执行这些任务。 events_unbound:该工作队列上的任务不会绑定在特定CPU上,只要某处理器空闲,就可以处理该工作队列上的任务。 events_freezable:类似于events,但工作项都是被挂起的 events_nrt_freezable:类似events_nrt。 system_wq = alloc_workqueue("events", 0, 0); system_long_wq = alloc_workqueue("events_long", 0, 0); system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0); system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, WQ_UNBOUND_MAX_ACTIVE); system_freezable_wq = alloc_workqueue("events_freezable", WQ_FREEZABLE, 0); system_nrt_freezable_wq = alloc_workqueue("events_nrt_freezable", WQ_NON_REENTRANT | WQ_FREEZABLE, 0); BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq || !system_unbound_wq || !system_freezable_wq || !system_nrt_freezable_wq); return 0; } 在上面我们看到了,系统在启动的时候会为每个gcwq创建一个工作者线程管理者worker,这个worker就是用来管理gcwq上挂载的工作项的: struct worker { /* on idle list while idle, on busy hash table while busy */ union { struct list_head entry; /* L: while idle */ struct hlist_node hentry; /* L: while busy */ }; //与工作者线程的状态有关,当工作者线程空闲时,使用entry,但忙碌时,使用哈希节点hentry,它们分别对应gcwq的idle_list和busy_hash[]。 struct work_struct *current_work; /* L: work being processed 当前正在处理的工作项 struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq worker所对应的cpu_workqueue_struct。 struct list_head scheduled; /* L: scheduled works 被调度的工作项,只有进入该链表,工作项才能被工作队列处理。 struct task_struct *task; /* I: worker task 被内核调度的线程实体,通过它参与到内核调度当中去, struct global_cwq *gcwq; /* I: the associated gcwq 相关联的gcwq /* 64 bytes boundary on 64bit, 32 on 32bit */ unsigned long last_active; /* L: last active timestamp 最后的活动时间,用于决定该worker的生命周期 unsigned int flags; /* X: flags */ int id; /* I: worker id */ struct work_struct rebind_work; /* L: rebind worker to cpu */ }; 在初始化gcwq过程当中会创建一个worker,而且worker中又有一个成员task_struct,因此我们可以想像的到在创建worker时会发生什么: static struct worker *create_worker(struct global_cwq *gcwq, bool bind) { ......... worker = alloc_worker();//为一个worker分配内存并完成一些初始化工作 if (!worker) goto fail;
worker->gcwq = gcwq;//绑定gcwq worker->id = id; //以下会根据worker的gcwq的cpu标示是否on_unbound来创建worker的线程,使worker能参与到进程调度当中去。 if (!on_unbound_cpu) worker->task = kthread_create_on_node(worker_thread, worker, cpu_to_node(gcwq->cpu), "kworker/%u:%d", gcwq->cpu, id); else worker->task = kthread_create(worker_thread, worker, "kworker/u:%d", id); if (IS_ERR(worker->task)) goto fail; ......... } 为worker创建的工作者线程会在创建好后启动,开始处理工作项。工作者线程的主要工作在worker_thread中完成: static int worker_thread(void *__worker) { struct worker *worker = __worker; struct global_cwq *gcwq = worker->gcwq;
/* tell the scheduler that this is a workqueue worker */ worker->task->flags |= PF_WQ_WORKER;//告诉调度器这是一个工作者线程 woke_up: spin_lock_irq(&gcwq->lock);//锁住gcwq,每个cpu只有一个gcwq结构,该CPU的其他线程同样有可能对它进行改变,因此这里需要加锁保护
/* DIE can be set only while we're idle, checking here is enough */ if (worker->flags & WORKER_DIE) { spin_unlock_irq(&gcwq->lock); worker->task->flags &= ~PF_WQ_WORKER; return 0; } //首先让工作者线程的管理者从空闲状态退出,因为它现在要work了,这里的主要工作是清除worker的空闲状态标志,减少gcwq的nr_idle数量,并将worker的entry删除并重新初始化。 worker_leave_idle(worker); recheck: /* no more worker necessary? */ if (!need_more_worker(gcwq))//这里会对gcwq进行检查:gcwq上的worklist上是否挂载有未处理的工作项,如果没有,说明当前工作者线程无事可做,睡眠。检查当前CPU的gcwq的worker如果有更高优先级的工作要处理,且系统的全局队列中已经没有空闲的worker了,那么这时应该需要一个新的worker。 goto sleep; if (unlikely(!may_start_working(gcwq)) && manage_workers(worker)) goto recheck; BUG_ON(!list_empty(&worker->scheduled)); worker_clr_flags(worker, WORKER_PREP); //这里做一些处理工作项work_struct的准备工作如查看gcwq的空闲工作者数量,提示worker上不能有准备调度的工作项,清除和设置worker的一些标志。 do { struct work_struct *work = list_first_entry(&gcwq->worklist, struct work_struct, entry);//从gcwq的worklist链表上获取未处理的工作项work_struct if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) { /* optimization path, not strictly necessary */ process_one_work(worker, work); if (unlikely(!list_empty(&worker->scheduled))) process_scheduled_works(worker); } else { move_linked_works(work, &worker->scheduled, NULL); process_scheduled_works(worker); } } while (keep_working(gcwq)); 在这个do while()循环里处理挂载在gcwq上的未处理的工作项。一般情况下,会在do while()循环里把gcwq的worklist的所有未处理完的工作项都处理完后终止。在循环中,它通过process_one_work(worker, work)逐个的将工作项处理掉,process_scheduled_works(worker)内部同样是调用process_one_work(worker, work): static void process_one_work(struct worker *worker, struct work_struct *work) __releases(&gcwq->lock) __acquires(&gcwq->lock) { ........ work_func_t f = work->func; ........ trace_workqueue_execute_start(work); f(work); trace_workqueue_execute_end(work); ......... } 因此,process_one_work()的最终目的是执行work_struct的func函数,也就是每个工作项需要完成的工作。
工作队列的使用: 工作项: 当然首先我们先来了解一下工作项: struct work_struct { atomic_long_t data;//work_struct 的参数 struct list_head entry;//work_struct 的连接链表 work_func_t func;//工作项处理函数 #ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map; #endif
};
do { __init_work((_work), _onstack); (_work)->data = (atomic_long_t) WORK_DATA_INIT(); INIT_LIST_HEAD(&(_work)->entry); PREPARE_WORK((_work), (_func));
} while (0);
Schedule_work(work); { return queue_work(system_wq, work);
}
Create_workqueue(const char *name); 创建一个名字为name的工作队列; #define create_workqueue(name) alloc_workqueue((name), WQ_MEM_RECLAIM, 1) //在初始化gcwqs时使用同样的方式创建了几个默认的系统工作队列。 创建工作队列的主要工作在struct workqueue_struct *__alloc_workqueue_key(const char *fmt, unsigned int flags, int max_active, struct lock_class_key *key, const char *lock_name, ...) { ......... wq = kzalloc(sizeof(*wq) + namelen, GFP_KERNEL); if (!wq) goto err; //为一个workqueue_struct分配内存。 vsnprintf(wq->name, namelen, fmt, args1);//初始化wq的name /* init wq */ wq->flags = flags; wq->saved_max_active = max_active; mutex_init(&wq->flush_mutex); atomic_set(&wq->nr_cwqs_to_flush, 0); INIT_LIST_HEAD(&wq->flusher_queue); INIT_LIST_HEAD(&wq->flusher_overflow);
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); INIT_LIST_HEAD(&wq->list);//初始化wq的主要成员 ...... for_each_cwq_cpu(cpu, wq) { struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); struct global_cwq *gcwq = get_gcwq(cpu);
BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK); cwq->gcwq = gcwq; cwq->wq = wq; cwq->flush_color = -1; cwq->max_active = max_active; INIT_LIST_HEAD(&cwq->delayed_works); } //用per-cpu workqueue将wq和gcwq关联起来。Cpu_workqueque_struct中保存了与CPU关联的gcwq,在这里,又关联了wq。 ...... list_add(&wq->list, &workqueues);//最后把wq放到一个workqueues的链表上。 ......... Return wq; }
它会返回一个初始化好的工作队列wq。
Queue_work()->queue_work_on()->__queue_work() { struct global_cwq *gcwq; struct cpu_workqueue_struct *cwq; struct list_head *worklist; unsigned int work_flags; unsigned long flags;
debug_work_activate(work);
/* if dying, only works from the same workqueue are allowed */ if (unlikely(wq->flags & WQ_DRAINING) && WARN_ON_ONCE(!is_chained_work(wq))) return;
/* determine gcwq to use */ if (!(wq->flags & WQ_UNBOUND)) { struct global_cwq *last_gcwq;
if (unlikely(cpu == WORK_CPU_UNBOUND)) cpu = raw_smp_processor_id();
/* * It's multi cpu. If @wq is non-reentrant and @work * was previously on a different cpu, it might still * be running there, in which case the work needs to * be queued on that cpu to guarantee non-reentrance. */ gcwq = get_gcwq(cpu); if (wq->flags & WQ_NON_REENTRANT && (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) { struct worker *worker;
spin_lock_irqsave(&last_gcwq->lock, flags);
worker = find_worker_executing_work(last_gcwq, work);
if (worker && worker->current_cwq->wq == wq) gcwq = last_gcwq; else { /* meh... not running there, queue here */ spin_unlock_irqrestore(&last_gcwq->lock, flags); spin_lock_irqsave(&gcwq->lock, flags); } } else spin_lock_irqsave(&gcwq->lock, flags); } else { gcwq = get_gcwq(WORK_CPU_UNBOUND);// spin_lock_irqsave(&gcwq->lock, flags); } //前面这部分代码的主要功能是根据wq的标志位来获取合适的gcwq。 /* gcwq determined, get cwq and queue */ cwq = get_cwq(gcwq->cpu, wq); trace_workqueue_queue_work(cpu, cwq, work);
if (WARN_ON(!list_empty(&work->entry))) { spin_unlock_irqrestore(&gcwq->lock, flags); return; }
cwq->nr_in_flight[cwq->work_color]++; work_flags = work_color_to_flags(cwq->work_color);
if (likely(cwq->nr_active < cwq->max_active)) { trace_workqueue_activate_work(work); cwq->nr_active++; worklist = gcwq_determine_ins_pos(gcwq, cwq);//在gcwq的worklist上(也就是未执行工作项链表)找到一个合适的位置 } else { work_flags |= WORK_STRUCT_DELAYED; worklist = &cwq->delayed_works; }
insert_work(cwq, work, worklist, work_flags);//将工作项插入到对应gcwq的worklist的合适位置上。
spin_unlock_irqrestore(&gcwq->lock, flags); } 因此,用一句简单的句可以概括queue_work(),就是把工作项work放到与wq关联的gcwq的未执行工作项链表上去。前面有分析,gcwq对应的工作者线程在被调度的时候会把gcwq上的未执行的工作项都执行。 |
|