/* prevent userland from meddling with cpumask of workqueue workers */ worker->task->flags |= PF_NO_SETAFFINITY;
// (2.5) 将worker和worker_pool绑定 /* successful, attach the worker to the pool */ worker_attach_to_pool(worker, pool);
// (2.6) 将worker初始状态设置成idle, // wake_up_process以后,worker自动leave idle状态 /* start the newly created worker */ spin_lock_irq(&pool->lock); worker->pool->nr_workers++; worker_enter_idle(worker); wake_up_process(worker->task); spin_unlock_irq(&pool->lock);
return worker;
fail: if (id >= 0) ida_simple_remove(&pool->worker_ida, id); kfree(worker); returnNULL; } || → staticvoidworker_attach_to_pool(struct worker *worker, struct worker_pool *pool) { mutex_lock(&pool->attach_mutex);
// (2.5.1) 将worker线程和cpu绑定 /* * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any * online CPUs. It'll be re-applied when any of the CPUs come up. */ set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
/* * The pool->attach_mutex ensures %POOL_DISASSOCIATED remains * stable across this function. See the comments above the * flag definition for details. */ if (pool->flags & POOL_DISASSOCIATED) worker->flags |= WORKER_UNBOUND;
// (1) 初始化normal和high nice对应的unbound attrs /* create default unbound and ordered wq attrs */ for (i = 0; i < NR_STD_WORKER_POOLS; i++) { structworkqueue_attrs *attrs;
// (2) unbound_std_wq_attrs BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL))); attrs->nice = std_nice[i]; unbound_std_wq_attrs[i] = attrs; /* * An ordered wq should have only one pwq as ordering is * guaranteed by max_active which is enforced by pwqs. * Turn off NUMA so that dfl_pwq is used for all nodes. */ // (3) ordered_wq_attrs,no_numa = true; BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL))); attrs->nice = std_nice[i]; attrs->no_numa = true; ordered_wq_attrs[i] = attrs; }
// (4) 如果是WQ_MEM_RECLAIM类型的workqueue // 创建对应的rescuer_thread()内核进程 /* * Workqueues which may be used during memory reclaim should * have a rescuer to guarantee forward progress. */ if (flags & WQ_MEM_RECLAIM) { structworker *rescuer;
rescuer = alloc_worker(NUMA_NO_NODE); if (!rescuer) goto err_destroy;
/* * wq_pool_mutex protects global freeze state and workqueues list. * Grab it, adjust max_active and add the new @wq to workqueues * list. */ mutex_lock(&wq_pool_mutex);
mutex_lock(&wq->mutex); // 将pool_workqueue和workqueue链接起来 link_pwq(pwq); mutex_unlock(&wq->mutex); } return0; } elseif (wq->flags & __WQ_ORDERED) { // (3.2) unbound ordered_wq workqueue // pool_workqueue链接workqueue和worker_pool的过程 ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]); /* there should only be single pwq for ordering guarantee */ WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node || wq->pwqs.prev != &wq->dfl_pwq->pwqs_node), 'ordering guarantee broken for workqueue %s\n', wq->name); return ret; } else { // (3.3) unbound unbound_std_wq workqueue // pool_workqueue链接workqueue和worker_pool的过程 return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]); } } || → intapply_workqueue_attrs(struct workqueue_struct *wq, const struct workqueue_attrs *attrs) {
// (3.2.1) 根据的ubound的ordered_wq_attrs/unbound_std_wq_attrs // 创建对应的pool_workqueue和worker_pool // 其中worker_pool不是默认创建好的,是需要动态创建的,对应的worker内核进程也要重新创建 // 创建好的pool_workqueue赋值给pwq_tbl[node] /* * If something goes wrong during CPU up/down, we'll fall back to * the default pwq covering whole @att- kernel/workqueue.c: - __alloc_workqueue_key() -> alloc_and_link_pwqs() -> apply_workqueue_attrs() -> alloc_unbound_pwq()/numa_pwq_tbl_install()rs->cpumask. Always create * it even if we don't use it immediately. */ dfl_pwq = alloc_unbound_pwq(wq, new_attrs); if (!dfl_pwq) goto enomem_pwq;
/* save the previous pwq and install the new one */ // (3.2.2) 将临时pwq_tbl[node]赋值给wq->numa_pwq_tbl[node] for_each_node(node) pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
/* tell the scheduler that this is a workqueue worker */ worker->task->flags |= PF_WQ_WORKER; woke_up: spin_lock_irq(&pool->lock);
// (1) 是否die /* am I supposed to die? */ if (unlikely(worker->flags & WORKER_DIE)) { spin_unlock_irq(&pool->lock); WARN_ON_ONCE(!list_empty(&worker->entry)); worker->task->flags &= ~PF_WQ_WORKER;
// (3) 如果需要本worker继续执行则继续,否则进入idle状态 // need more worker的条件: (pool->worklist != 0) && (pool->nr_running == 0) // worklist上有work需要执行,并且现在没有处于running的work /* no more worker necessary? */ if (!need_more_worker(pool)) goto sleep;
// (4) 如果(pool->nr_idle == 0),则启动创建更多的worker // 说明idle队列中已经没有备用worker了,先创建 一些worker备用 /* do we need to manage? */ if (unlikely(!may_start_working(pool)) && manage_workers(worker)) goto recheck;
/* * ->scheduled list can only be filled while a worker is * preparing to process a work or actually processing it. * Make sure nobody diddled with it while I was sleeping. */ WARN_ON_ONCE(!list_empty(&worker->scheduled));
/* * Finish PREP stage. We're guaranteed to have at least one idle * worker or that someone else has already assumed the manager * role. This is where @worker starts participating in concurrency * management if applicable and concurrency management is restored * after being rebound. See rebind_workers() for details. */ worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
worker_set_flags(worker, WORKER_PREP);supposed sleep: // (9) worker进入idle状态 /* * pool->lock is held and there's no work to process and no need to * manage, sleep. Workers are woken up only while holding * pool->lock or from local cpu, so setting the current state * before releasing pool->lock is enough to prevent losing any * event. */ worker_enter_idle(worker); __set_current_state(TASK_INTERRUPTIBLE); spin_unlock_irq(&pool->lock); schedule(); goto woke_up; } | → staticvoidprocess_one_work(struct worker *worker, struct work_struct *work) __releases(&pool->lock) __acquires(&pool->lock) { structpool_workqueue *pwq = get_work_pwq(work); structworker_pool *pool = worker->pool; bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE; int work_color; structworker *collision; #ifdef CONFIG_LOCKDEP /* * It is permissible to free the struct work_struct from * inside the function that is called from it, this we need to * take into account for lockdep too. To avoid bogus 'held * lock freed' warnings as well as problems when looking into * work->lockdep_map, make a copy and use that here. */ structlockdep_maplockdep_map;
lockdep_copy_map(&lockdep_map, &work->lockdep_map); #endif /* ensure we're on the correct CPU */ WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) && raw_smp_processor_id() != pool->cpu);
// (8.1) 如果work已经在worker_pool的其他worker上执行, // 将work放入对应worker的scheduled队列中延后执行 /* * A single work shouldn't be executed concurrently by * multiple workers on a single cpu. Check whether anyone is * already processing the work. If so, defer the work to the * currently executing one. */ collision = find_worker_executing_work(pool, work); if (unlikely(collision)) { move_linked_works(work, &collision->scheduled, NULL); return; }
// (8.3) 如果work所在的wq是cpu密集型的WQ_CPU_INTENSIVE // 则当前work的执行脱离worker_pool的动态调度,成为一个独立的线程 /* * CPU intensive works don't participate in concurrency management. * They're the scheduler's responsibility. This takes @worker out * of concurrency management and the next code block will chain * execution of the pending work items. */ if (unlikely(cpu_intensive)) worker_set_flags(worker, WORKER_CPU_INTENSIVE);
// (8.4) 在UNBOUND或者CPU_INTENSIVE work中判断是否需要唤醒idle worker // 普通work不会执行这个操作 /* * Wake up another worker if necessary. The condition is always * false for normal per-cpu workers since nr_running would always * be >= 1 at this point. This is used to chain execution of the * pending work items for WORKER_NOT_RUNNING workers such as the * UNBOUND and CPU_INTENSIVE ones. */ if (need_more_worker(pool)) wake_up_worker(pool);
/* * Record the last pool and clear PENDING which should be the last * update to @work. Also, do this inside @pool->lock so that * PENDING and queued state changes happen together while IRQ is * disabled. */ set_work_pool_and_clear_pending(work, pool->id);
spin_unlock_irq(&pool->lock);
lock_map_acquire_read(&pwq->wq->lockdep_map); lock_map_acquire(&lockdep_map); trace_workqueue_execute_start(work); // (8.5) 执行work函数 worker->current_func(work); /* * While we must be careful to not use 'work' after this, the trace * point will only record its address. */ trace_workqueue_execute_end(work); lock_map_release(&lockdep_map); lock_map_release(&pwq->wq->lockdep_map);
if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { pr_err('BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n' ' last function: %pf\n', current->comm, preempt_count(), task_pid_nr(current), worker->current_func); debug_show_held_locks(current); dump_stack(); }
/* * The following prevents a kworker from hogging CPU on !PREEMPT * kernels, where a requeueing work item waiting for something to * happen could deadlock with stop_machine as such work item could * indefinitely requeue itself while all other CPUs are trapped in * stop_machine. At the same time, report a quiescent RCU state so * the same condition doesn't freeze RCU. */ cond_resched_rcu_qs();
spin_lock_irq(&pool->lock);
/* clear cpu intensive status */ if (unlikely(cpu_intensive)) worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
/* * Rescuers, which may not have all the fields set up like normal * workers, also reach here, let's not access anything before * checking NOT_RUNNING. */ if (worker->flags & WORKER_NOT_RUNNING) returnNULL;
pool = worker->pool;
/* this can only happen on the local cpu */ if (WARN_ON_ONCE(cpu != raw_smp_processor_id() || pool->cpu != cpu)) returnNULL;
/* * The counterpart of the following dec_and_test, implied mb, * worklist not empty test sequence is in insert_work(). * Please read comment there. * * NOT_RUNNING is clear. This means that we're bound to and * running on the local cpu w/ rq lock held and preemption * disabled, which in turn means that none else could be * manipulating idle_list, so dereferencing idle_list without pool * lock is safe. */ // 减少worker_pool中running的worker数量 // 如果worklist还有work需要处理,唤醒第一个idle worker进行处理 if (atomic_dec_and_test(&pool->nr_running) && !list_empty(&pool->worklist)) to_wakeup = first_idle_worker(pool); return to_wakeup ? to_wakeup->task : NULL; }
// (1) 设置当前worker的WORKER_CPU_INTENSIVE标志 // nr_running会被减1 // 对worker_pool来说,当前worker相当于进入了suspend状态 /* * CPU intensive works don't participate in concurrency management. * They're the scheduler's responsibility. This takes @worker out * of concurrency management and the next code block will chain * execution of the pending work items. */ if (unlikely(cpu_intensive)) worker_set_flags(worker, WORKER_CPU_INTENSIVE);
// (2) 接上一步,判断是否需要唤醒新的worker来处理work /* * Wake up another worker if necessary. The condition is always * false for normal per-cpu workers since nr_running would always * be >= 1 at this point. This is used to chain execution of the * pending work items for WORKER_NOT_RUNNING workers such as the * UNBOUND and CPU_INTENSIVE ones. */ if (need_more_worker(pool)) wake_up_worker(pool);
// (3) 执行work worker->current_func(work);
// (4) 执行完,清理当前worker的WORKER_CPU_INTENSIVE标志 // 当前worker重新进入running状态 /* clear cpu intensive status */ if (unlikely(cpu_intensive)) worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
/* * If transitioning out of NOT_RUNNING, increment nr_running. Note * that the nested NOT_RUNNING is not a noop. NOT_RUNNING is mask * of multiple flags, not a single flag. */ if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING)) if (!(worker->flags & WORKER_NOT_RUNNING)) atomic_inc(&pool->nr_running); }
/*
 * NOTE(review): this text is extraction-garbled — tokens such as
 * "staticintworkqueue_cpu_down_callback" and "unsignedlong" are fused,
 * and "| →" is article-markup residue, not C. Compare against the real
 * kernel/workqueue.c before trusting any of it. Also note that a "//"
 * comment appearing mid-line comments out the rest of that physical
 * line, so some statements below are dead as written.
 *
 * CPU-hotplug notifier for CPU teardown: on CPU_DOWN_PREPARE it queues
 * unbind work on the dying CPU (via system_highpri_wq) to detach that
 * CPU's per-cpu workers, refreshes NUMA affinity of unbound workqueues,
 * then waits for the unbind work to finish. Returns NOTIFY_OK.
 */
} | → staticintworkqueue_cpu_down_callback(struct notifier_block *nfb, unsignedlong action, void *hcpu) { int cpu = (unsignedlong)hcpu; structwork_structunbind_work; structworkqueue_struct *wq;
switch (action & ~CPU_TASKS_FROZEN) { case CPU_DOWN_PREPARE: /* unbinding per-cpu workers should happen on the local CPU */ INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn); // (1) CPU down_prepare: stop the workers of the normal worker_pools bound to this CPU; // as the CPU goes down these workers get migrated to other CPUs queue_work_on(cpu, system_highpri_wq, &unbind_work);
// (2) update unbound workqueues for the CPU change /* update NUMA affinity of unbound workqueues */ mutex_lock(&wq_pool_mutex); list_for_each_entry(wq, &workqueues, list) wq_update_unbound_numa(wq, cpu, false); mutex_unlock(&wq_pool_mutex);
/* wait for per-cpu unbinding to finish */ flush_work(&unbind_work); destroy_work_on_stack(&unbind_work); break; } return NOTIFY_OK; } | → staticintworkqueue_cpu_up_callback(struct notifier_block *nfb, unsignedlong action, void *hcpu) { int cpu = (unsignedlong)hcpu; structworker_pool *pool; structworkqueue_struct *wq; int pi;
switch (action & ~CPU_TASKS_FROZEN) { case CPU_UP_PREPARE: for_each_cpu_worker_pool(pool, cpu) { if (pool->nr_workers) continue; if (!create_worker(pool)) return NOTIFY_BAD; } break;
case CPU_DOWN_FAILED: case CPU_ONLINE: mutex_lock(&wq_pool_mutex);
// (3) cpu up for_each_pool(pool, pi) { mutex_lock(&pool->attach_mutex);
/* * While a work item is PENDING && off queue, a task trying to * steal the PENDING will busy-loop waiting for it to either get * queued or lose PENDING. Grabbing PENDING and queueing should * happen with IRQ disabled. */ WARN_ON_ONCE(!irqs_disabled());
debug_work_activate(work);
/* if draining, only works from the same workqueue are allowed */ if (unlikely(wq->flags & __WQ_DRAINING) && WARN_ON_ONCE(!is_chained_work(wq))) return; retry: // (1) 如果没有指定cpu,则使用当前cpu if (req_cpu == WORK_CPU_UNBOUND) cpu = raw_smp_processor_id();
/* pwq which will be used unless @work is executing elsewhere */ if (!(wq->flags & WQ_UNBOUND)) // (2) 对于normal wq,使用当前cpu对应的normal worker_pool pwq = per_cpu_ptr(wq->cpu_pwqs, cpu); else // (3) 对于unbound wq,使用当前cpu对应node的worker_pool pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
// (4) 如果work在其他worker上正在被执行,把work压到对应的worker上去 // 避免work出现重入的问题 /* * If @work was previously on a different pool, it might still be * running there, in which case the work needs to be queued on that * pool to guarantee non-reentrancy. */ last_pool = get_work_pool(work); if (last_pool && last_pool != pwq->pool) { structworker *worker;
if (worker && worker->current_pwq->wq == wq) { pwq = worker->current_pwq; } else { /* meh... not running there, queue here */ spin_unlock(&last_pool->lock); spin_lock(&pwq->pool->lock); } } else { spin_lock(&pwq->pool->lock); }
/* * pwq is determined and locked. For unbound pools, we could have * raced with pwq release and it could already be dead. If its * refcnt is zero, repeat pwq selection. Note that pwqs never die * without another pwq replacing it in the numa_pwq_tbl or while * work items are executing on it, so the retrying is guaranteed to * make forward-progress. */ if (unlikely(!pwq->refcnt)) { if (wq->flags & WQ_UNBOUND) { spin_unlock(&pwq->pool->lock); cpu_relax(); goto retry; } /* oops */ WARN_ONCE(true, 'workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt', wq->name, cpu); }
/** * flush_work - wait for a work to finish executing the last queueing instance * @work: the work to flush * * Wait until @work has finished execution. @work is guaranteed to be idle * on return if it hasn't been requeued since flush started. * * Return: * %true if flush_work() waited for the work to finish execution, * %false if it was already idle. */ boolflush_work(struct work_struct *work) { structwq_barrierbarr;
// (1) 如果work所在worker_pool为NULL,说明work已经执行完 local_irq_disable(); pool = get_work_pool(work); if (!pool) { local_irq_enable(); returnfalse; }
spin_lock(&pool->lock); /* see the comment in try_to_grab_pending() with the same code */ pwq = get_work_pwq(work); if (pwq) { // (2) 如果work所在pwq指向的worker_pool不等于上一步得到的worker_pool,说明work已经执行完 if (unlikely(pwq->pool != pool)) goto already_gone; } else { // (3) 如果work所在pwq为NULL,并且也没有在当前执行的work中,说明work已经执行完 worker = find_worker_executing_work(pool, work); if (!worker) goto already_gone; pwq = worker->current_pwq; }
// (4) 如果work没有执行完,向work的后面插入barr work insert_wq_barrier(pwq, barr, work, worker); spin_unlock_irq(&pool->lock);
/* * If @max_active is 1 or rescuer is in use, flushing another work * item on the same workqueue may lead to deadlock. Make sure the * flusher is not running on the same workqueue by verifying write * access. */ if (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer) lock_map_acquire(&pwq->wq->lockdep_map); else lock_map_acquire_read(&pwq->wq->lockdep_map); lock_map_release(&pwq->wq->lockdep_map);
/* * debugobject calls are safe here even with pool->lock locked * as we know for sure that this will not trigger any of the * checks and call back into the fixup functions where we * might deadlock. */ // (4.1) barr work的执行函数wq_barrier_func() INIT_WORK_ONSTACK(&barr->work, wq_barrier_func); __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work)); init_completion(&barr->done);
/* * If @target is currently being executed, schedule the * barrier to the worker; otherwise, put it after @target. */ // (4.2) 如果work当前在worker中执行,则barr work插入scheduled队列 if (worker) head = worker->scheduled.next; // 否则,则barr work插入正常的worklist队列中,插入位置在目标work后面 // 并且置上WORK_STRUCT_LINKED标志 else { unsignedlong *bits = work_data_bits(target);
head = target->entry.next; /* there can already be other linked works, inherit and set */ linked = *bits & WORK_STRUCT_LINKED; __set_bit(WORK_STRUCT_LINKED_BIT, bits); }
/* * If @delay is 0, queue @dwork->work immediately. This is for * both optimization and correctness. The earliest @timer can * expire is on the closest next tick and delayed_work users depend * on that there's no such delay when @delay is 0. */ if (!delay) { __queue_work(cpu, wq, &dwork->work); return; }