分享

linux 内核work工作队列实现

 XeonGate 2015-09-19
    写这篇blog的缘由是因为最近调试模块代码的时候,出现了kernel crash,堆栈如下:
PID: 0      TASK: ffffffff81a8d020  CPU: 0   COMMAND: "swapper"
 #0 [ffff8800282038b0] machine_kexec at ffffffff81035c0b
 #1 [ffff880028203910] crash_kexec at ffffffff810c0dd2
 #2 [ffff8800282039e0] oops_end at ffffffff81511680
 #3 [ffff880028203a10] die at ffffffff8100f19b
 #4 [ffff880028203a40] do_trap at ffffffff81510ee4
 #5 [ffff880028203aa0] do_invalid_op at ffffffff8100cdb5
 #6 [ffff880028203b40] invalid_op at ffffffff8100be5b
    [exception RIP: queue_work_on+73]
    RIP: ffffffff81091379  RSP: ffff880028203bf0  RFLAGS: 00010206
    RAX: ffffc90005dea068  RBX: ffff8801f496e8c8  RCX: 0000000000000000
    RDX: ffffc90005dea060  RSI: ffff88021b230740  RDI: 0000000000000000
    RBP: ffff880028203bf0   R8: ffffc90005dea038   R9: 0000000000000001
    R10: ffff8802163a1900  R11: 0000000000000001  R12: 0000000000000000
    R13: ffff8801f496e8c8  R14: 0000000000003600  R15: 0000000000000000
    ORIG_RAX: ffffffffffffffff  CS: 0010  SS: 0018
 #7 [ffff880028203bf8] queue_work at ffffffff810913cf


最后的log日志则如下:
------------[ cut here ]------------
kernel BUG at kernel/workqueue.c:191!
invalid opcode: 0000 [#1] SMP
last sysfs file: /sys/kernel/mm/ksm/run
CPU 0
Modules linked in: ext3 jbd binlogdev(U) ip6table_filter ip6_tables iptable_filter ip_tables ebtable_nat ebtables bnx2fc fcoe libfcoe libfc scsi_transport_fc scsi_tgt 8021q garp sunrpc bridge stp llc vhost_net macvtap macvlan tun kvm_intel kvm sg serio_raw i2c_i801 i2c_core iTCO_wdt iTCO_vendor_support shpchp memdisk(U) memcon(U) ext4 mbcache jbd2 sd_mod crc_t10dif megaraid_sas e1000e video output ahci dm_mirror dm_region_hash dm_log dm_mod be2iscsi bnx2i cnic uio ipv6 cxgb4i cxgb4 cxgb3i libcxgbi cxgb3 mdio libiscsi_tcp qla4xxx iscsi_boot_sysfs libiscsi scsi_transport_iscsi [last unloaded: scsi_wait_scan]

Pid: 0, comm: swapper Not tainted 2.6.32-358.6.1.el6.x86_64 #1 Supermicro X9SCI/X9SCA/X9SCI/X9SCA
RIP: 0010:[]  [] queue_work_on+0x49/0x60
RSP: 0018:ffff880028203be0  EFLAGS: 00010006
RAX: ffffc9000578d068 RBX: 0000000000000286 RCX: 0000000000000000
RDX: ffffc9000578d060 RSI: ffff88021acddc00 RDI: 0000000000000000
RBP: ffff880028203be0 R08: ffffc9000578d038 R09: 0000000000000001
R10: ffff8802159bee80 R11: 0000000000000001 R12: 0000000000000000
R13: ffff8800c4702558 R14: 0000000000002400 R15: 0000000000000000
FS:  0000000000000000(0000) GS:ffff880028200000(0000) knlGS:0000000000000000
CS:  0010 DS: 0018 ES: 0018 CR0: 000000008005003b
CR2: 000000000246d000 CR3: 00000001f1db3000 CR4: 00000000000427e0
DR0: 0000000000000000 DR1: 0000000000000000 DR2: 0000000000000000
DR3: 0000000000000000 DR6: 00000000ffff0ff0 DR7: 0000000000000400
Process swapper (pid: 0, threadinfo ffffffff81a00000, task ffffffff81a8d020)
Stack:
 ffff880028203bf0 ffffffff810913cf ffff880028203c10 ffffffffa033b642
ffff8800c4702558 ffff8800c4702558 ffff880028203c30 ffffffffa0342b3c
ffffffffa0342b00 ffffffffa0342b00 ffff880028203c60 ffffffffa01b5d7f
Call Trace:
 
 [] queue_work+0x1f/0x30

省去了调用queue_work的流程。

判断程序是在workqueue.c这个文件的第191行挂掉,查看对应源代码(所有的源码为2.6.32-358.2.1.el6,在此说明下,以后就不一一说明代码出处了),如下:
function: queue_work
   163    int queue_work(struct workqueue_struct *wq, struct work_struct *work)
   164    {
   165        int ret;
   166   
   167        ret = queue_work_on(get_cpu(), wq, work);
   168        put_cpu();
   169   
   170        return ret;
   171    }

function: queue_work_on
   174   
   185    int
   186    queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
   187    {
   188        int ret = 0;
   189   
   190        if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
   191            BUG_ON(!list_empty(&work->entry));
   192            __queue_work(wq_per_cpu(wq, cpu), work);
   193            ret = 1;
   194        }
   195        return ret;
   196    }
可见,模块是crash在了第191行,也就是BUG_ON(!list_empty(&work->entry))。

接下来,就说说work工作队列的执行过程。
1. 首先看下创建workqueue_struct,我们引用single thread的创建深入,
wq = create_singlethread_workqueue("testd");
->__create_workqueue((name), 1, 0, 0)
->struct workqueue_struct *__create_workqueue_key(const char *name, int singlethread, int freezeable, int rt, struct lock_class_key *key, const char *lock_name)
->static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
至此,创建workqueue完成,重点是最后执行的 create_workqueue_thread 函数,此函数创建了一个名为wq->name的线程,进入了static int worker_thread(void *__cwq)开始执行线程。

查看线程执行
线程主要为for的死循环,如下:
 320     for (;;) {
 321         prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
 322         if (!freezing(current) &&
 323             !kthread_should_stop() &&
 324             list_empty(&cwq->worklist))
 325             schedule();
 326         finish_wait(&cwq->more_work, &wait);
 327
 328         try_to_freeze();
 329        
 330         if (kthread_should_stop())
 331             break;
 332
 333         run_workqueue(cwq);
 334     }

深入prepare_to_wait查看
 67 void        
 68 prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
 69 {       
 70     unsigned long flags;
 71
 72     wait->flags &= ~WQ_FLAG_EXCLUSIVE;
 73     spin_lock_irqsave(&q->lock, flags);
 74     if (list_empty(&wait->task_list))
 75         __add_wait_queue(q, wait);
 76     set_current_state(state);
 77     spin_unlock_irqrestore(&q->lock, flags);
 78 }
只是将wait加入了q的task_list队列中,修改了线程状态。

调用schedule(),线程被切换,休眠了。
至此整个的创建workqueue的工作结束了。我们也安装模块执行的顺序,依依罗列其过程,所以在此就不在继续下面的函数,而是将焦点切换至唤醒工作队列上来。

2. 唤醒工作队列之前,肯定有工作需要自己注册的线程去做,如下:
INIT_WORK(&__test_work, test_work);
也就是我们需要唤醒线程,来执行test_work这个函数。

3. 于是我们需要调用queue_work(wq, &__test_work);将work加入至workqueue_struct中去。执行过程如下:
queue_work(wq, &__test_work);
-> int queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
-> static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work)
-> static void insert_work(struct cpu_workqueue_struct *cwq, struct work_struct *work, struct list_head *head)
-> void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, void *key)
-> static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, int wake_flags, void *key)
至此queue_work调用结束,下面一一分析各个函数作用。

 185 int
 186 queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
 187 {
 188     int ret = 0;
 189
 190     if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
 191         BUG_ON(!list_empty(&work->entry));
 192         __queue_work(wq_per_cpu(wq, cpu), work);
 193         ret = 1;
 194     }
 195     return ret;
 196 }
此函数,只要的目的是同一个work只能进入,并且被加入wq一次,如果work被加入之后,还没有执行到,则直接退出。通过上面自己的模块也是crash在此函数的第191行,具体原因,暂不明,待查找。

 6226 static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
 6227             int nr_exclusive, int wake_flags, void *key)
 6228 {
 6229     wait_queue_t *curr, *next;
 6230
 6231     list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
 6232         unsigned flags = curr->flags;
 6233
 6234         if (curr->func(curr, mode, wake_flags, key) &&
 6235                 (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
 6236             break;
 6237     }
 6238 }
此函数将执行所有wait_queue->task_list注册的func函数,执行完成,整个的queue_work调用完成。

那么此处的wait_queue->task_list队列何时被添加进wait_queue_head_t呢?
回过头来看workqueue_struct注册时候的线程static int worker_thread(void *__cwq)
查看void prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)函数中的调用,发现是此时将work添加进queu_head的。
122 static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
123 {  
124     list_add(&new->task_list, &head->task_list);
125 }

而这个work则是进入worker_thread函数时定义的,DEFINE_WAIT(wait);
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)
所有queue_work执行到最后,也就是为了执行autoremove_wake_function函数,而autoremove_wake_function函数最后执行,则是为了唤醒curr->private所指向的task。代码如下:
 6210 int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
 6211               void *key)
 6212 { 
 6213     return try_to_wake_up(curr->private, mode, wake_flags);
 6214 }

那么何时赋值这个private呢,见wait定义:
598 #define DEFINE_WAIT_FUNC(name, function)                \
599     wait_queue_t name = {                       \
600         .private    = current,              \
601         .func       = function,             \
602         .task_list  = LIST_HEAD_INIT((name).task_list), \
603     }
对,一切工作就是为了唤醒主动休眠的当前进程,使之能继续执行。

4. 线程被唤醒之后,继续执行。
重点为run_workqueue函数,此函数为while循环执行所有worklist上被加入的work,也就是在调用queue_work在insert_work函数中被加入的work。代码如下:
 128 static void insert_work(struct cpu_workqueue_struct *cwq,
 129             struct work_struct *work, struct list_head *head)
 130 { 
 131     trace_workqueue_insertion(cwq->thread, work);
 132
 133     set_wq_data(work, cwq);
 134       
 138     smp_wmb();
 139     list_add_tail(&work->entry, head);
 140     wake_up(&cwq->more_work);
 141 }        
 142
 143 static void __queue_work(struct cpu_workqueue_struct *cwq,
 144              struct work_struct *work)
 145 {  
 146     unsigned long flags;
 147    
 148     spin_lock_irqsave(&cwq->lock, flags);
 149     insert_work(cwq, work, &cwq->worklist);
 150     spin_unlock_irqrestore(&cwq->lock, flags);
 151 }

将pending标记清除,从队列中删除等操作之后,开始执行f(work),也就是之前work所注册的函数,此处为test_work,一切调用流程结束。          

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多