分享

消息队列内核源码分析

 昵称28511038 2015-10-23
 
二. Linux IPC消息队列源码分析
A. 文件目录结构
include/linux/msg.h
ipc/msg.c
ipc/msgutil.c
 
B. 主要数据结构分析
msg_msg,消息的基本数据结构,每个msg_msg将占据一个page的内容,其中一个page除了存储该结构体,空余的部分直接用来存储消息的内容。其中记录了一条消息的type和size,next指针用于指向msg_msgseg结构。
在消息队列的设计中,若消息内容大于一个page,则会使用一个类似链表的结构,但是后面的节点不需要再标记type和size等数据,后面的节点用msg_msgseg表示。
 
/* one msg_msg structure for each message */
struct msg_msg
{
    struct list_head m_list;
    long  m_type;
    int m_ts; /* message text size */
    struct msg_msgseg* next;
    void *security;    /* the actual message follows immediately */
};

//msg_msgseg只需要存储指向下一链表块的指针就行了,每个msg_msgseg也将占据一个page的空间,其中一个page除了存储该结构体,剩下的部分都将用来存储message的数据。
 
struct msg_msgseg
{
    struct msg_msgseg* next;    /* the next part of the message follows immediately */
};

//msg_msg链表的结构,每个page下方空余的部分都将用来存massage的text。

//消息队列的数据结构,其中包含message、receiver、sender队列的链表指针,同时还包含其他一些相关数据。
 
/* one msq_queue structure for each present queue on the system */
struct msg_queue
{
    struct kern_ipc_perm q_perm;
    time_t q_stime;         /* last msgsnd time */
    time_t q_rtime;         /* last msgrcv time */
    time_t q_ctime;         /* last change time */
    unsigned long q_cbytes;     /* current number of bytes on queue */
    unsigned long q_qnum;       /* number of messages in queue */
    unsigned long q_qbytes;     /* max number of bytes on queue */
    pid_t q_lspid;          /* pid of last msgsnd */
    pid_t q_lrpid;          /* last receive pid */
    struct list_head q_messages;
    struct list_head q_receivers;
    struct list_head q_senders;
};

msqid_ds是msg_queue上msg_message队列的数据结构

struct msqid_ds
{
    struct ipc_perm msg_perm;
    struct msg *msg_first; /* first message on queue,unused  */   
    struct msg *msg_last;  /* last message in queue,unused */
    __kernel_time_t msg_stime;  /* last msgsnd time */
    __kernel_time_t msg_rtime;  /* last msgrcv time */
    __kernel_time_t msg_ctime;  /* last change time */
    unsigned long  msg_lcbytes; /* Reuse junk fields for 32 bit */
    unsigned long  msg_lqbytes; /* ditto */   
    unsigned short msg_cbytes;  /* current number of bytes on queue */
    unsigned short msg_qnum;    /* number of messages in queue */
    unsigned short msg_qbytes;  /* max number of bytes on queue */
    __kernel_ipc_pid_t msg_lspid;   /* pid of last msgsnd */
    __kernel_ipc_pid_t msg_lrpid;   /* last receive pid */
};

//在队列上睡眠的receiver数据结构

/* one msg_receiver structure for each sleeping receiver */
struct msg_receiver
{
    struct list_head    r_list;
    struct task_struct  *r_tsk; //指向该进程
    int         r_mode;   
    long            r_msgtype;
    long            r_maxsize;
    struct msg_msg      *volatile r_msg; //最后将用于存储取得的消息
};

//在队列上睡眠的sender数据结构
/* one msg_sender for each sleeping sender */
struct msg_sender
{
    struct list_head    list;
    struct task_struct  *tsk; //指向该进程
};
 

C. 工具函数分析
load_msg,发送消息时使用
将函数简化后分析如下
struct msg_msg *load_msg(const void __user *src, int len)
{
    struct msg_msg *msg;
    struct msg_msgseg **pseg;
    int alen;
 
    //#define DATALEN_MSG (PAGE_SIZE-sizeof(struct msg_msg))
    //DATALEN_MSG是一个page减去msg_msg结构体的大小,如前面所说,就是一个page可用 来存储数据部分的大小    
   //这里首先处理了第一个page的问题  
 
   alen = len;
    if (alen > DATALEN_MSG)
        alen = DATALEN_MSG;
 
    //kmalloc为msg分配空间
    msg = kmalloc(sizeof(*msg) + alen, GFP_KERNEL);
    //该函数底层用汇编代码实现,主要工作是将msg的内容从用户态空间转移到内核态空间,显然msg+1返回的地址就是page余下用来存储msg内容的基地址
    copy_from_user(msg + 1, src, alen)
    //下面考虑如果msg的长度大于一个page的容纳量,将增加更多的page,不过增加的page的头部是msg_msgseg结构体
    len -= alen;
    src = ((char __user *)src) + alen;
    pseg = &msg->next;
    //while循环,保证足够多的page去存储所有的消息text
    while (len > 0)
    {
        struct msg_msgseg *seg;//这边的工作其实和之前一样,只是分配的结构体是msg_msgseg
        alen = len;
        if (alen > DATALEN_SEG)
            alen = DATALEN_SEG;
        seg = kmalloc(sizeof(*seg) + alen, GFP_KERNEL);
        *pseg = seg;
        seg->next = NULL;
        //同样是copy_from_user的操作
        copy_from_user(seg + 1, src, alen)
        pseg = &seg->next;
        len -= alen;
        src = ((char __user *)src) + alen;   
 }
 //返回第一个msg_msg的地址
 //如果之前发生了error(相关代码已经省去),将会执行free_msg(msg)的工作进行内存释放,最后返回错误信息
    return msg;
}

//store_msg,接收消息时使用,主要逻辑类似load_msg
int store_msg(void __user *dest, struct msg_msg *msg, int len)
{
    int alen;
    struct msg_msgseg *seg;
   
    alen = len;
    if (alen > DATALEN_MSG)
        alen = DATALEN_MSG;
    //对应于copy_from_user,这里copy_to_user的工作就是将消息从内核态空间调整到用户态,底层也是汇编实现的
    copy_to_user(dest, msg + 1, alen)
    len -= alen;
    dest = ((char __user *)dest) + alen;
    seg = msg->next;
    while (len > 0)
    {
        alen = len;
        if (alen > DATALEN_SEG)
            alen = DATALEN_SEG;
        copy_to_user(dest, seg + 1, alen)
        len -= alen;
        dest = ((char __user *)dest) + alen;
        seg = seg->next;
   }
   return 0;
}

//free_msg,之前再讨论load_msg的时候有提到free_msg的工作,用于在发生错误时,释放已经为msg结构分配的pages

void free_msg(struct msg_msg *msg)
{
    struct msg_msgseg *seg;
   
 seg = msg->next;
 //kfree是kmalloc的对应操作,在这里从msg的第一个节点逐个free,直到最后next指针指向的是NULL   
 kfree(msg);
    while (seg != NULL)
 {
        struct msg_msgseg *tmp = seg->next;
  kfree(seg);
  seg = tmp;
 }
}

//pipeline_send,发送消息时会调用,条件是在发送进程进行发送的时候恰巧接收进程同时也准备好接收了

static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
{
    struct list_head *tmp;
 //轮询每一个receiver队列中的receiver
    tmp = msq->q_receivers.next;
    while (tmp != &msq->q_receivers)
 {
        struct msg_receiver *msr;
  //获取msg_receiver队列的地址
        msr = list_entry(tmp, struct msg_receiver, r_list);
        tmp = tmp->next;
  //testmsg通过消息的type和mode等信息判断该消息是否是接收进程需要的消息
        if (testmsg(msg, msr->r_msgtype, msr->r_mode) &&
   !security_msg_queue_msgrcv(msq, msg, msr->r_tsk,                          
   msr->r_msgtype, msr->r_mode))
  {
   //符合条件则取出该receiver
            list_del(&msr->r_list);
   //消息太大放不下时会报错,算是消息接收失败
   if (msr->r_maxsize < msg->m_ts)
   {
                msr->r_msg = NULL;
                wake_up_process(msr->r_tsk);
    //多处理器相关,避免编译器优化代码
                smp_mb();
                msr->r_msg = ERR_PTR(-E2BIG);
   }
   else
   { 
    //成功取得消息
                msr->r_msg = NULL;
    //msg_queue的一些变量设置
    //设置last receieved pid
                msq->q_lrpid = task_pid_vnr(msr->r_tsk);
                //设置last msgrcv time
    msq->q_rtime = get_seconds();
    //唤醒接收进程
                wake_up_process(msr->r_tsk);
                smp_mb();
    //获取消息内容
                msr->r_msg = msg;
                return 1;
   }
  }
 }
    return 0;
}

//D. 一些其他的工具函数
//创建一个新的msg_queue,主要执行一些初始化操作
static int newque(struct ipc_namespace *ns, struct ipc_params *params)
 
//在sender队列里面添加一个sender
static inline void ss_add(struct msg_queue *msq, struct msg_sender *mss)
 
//删除sender,观察sender结构体可以发现mss->list就是该sender在list上的节点
static inline void ss_del(struct msg_sender *mss)
 
//唤醒sender
static void ss_wakeup(struct list_head *h, int kill)
 
//唤醒receiver
static void expunge_all(struct msg_queue *msq, int res)
 
//清楚msg_queue,回收队列的空间,主要是把msg,sender,receiver三个队列没一个节点free掉
static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 
//接收消息时调用,一边是message队列中的消息,一边是receivers队列中的接收进程,testmsg通过消息的type和mode等信息来判断时候接收
static int testmsg(struct msg_msg *msg, long type, int mode)
 
//设置消息匹配的mode,在msgrcv中调用,获得的mode在testmsg中会用到
static inline int convert_mode(long *msgtyp, int msgflg)
 
//E. 主要的系统调用
//msgget消息队列的获取或创建

SYSCALL_DEFINE2(msgget, key_t, key, int, msgflg)
{
    struct ipc_namespace *ns;
    struct ipc_ops msg_ops;
 struct ipc_params msg_params;
 //一些初始化操作
    ns = current->nsproxy->ipc_ns;
    msg_ops.getnew = newque;
    msg_ops.associate = msg_security;
    msg_ops.more_checks = NULL;
 msg_params.key = key;
 msg_params.flg = msgflg;
 //调用ipcget函数,ipc模块统一处理
    return ipcget(ns, &msg_ids(ns), &msg_ops, &msg_params);
}

//msgsnd发送消息
//函数执行流程:
//获得当前进程ipc namespace
//load_msg构造一个msg结构,存放sender的消息内容,消息从用户态转入内核态
//初始化msg的type和size
//ipc权限检查
//判断msg_queue是否有足够空间装下新的msg,装的下直接跳到pipeline_send
//queue满的时候需要等待(比较size),查看flag是否是IPC_NOWAIT,是的话报错
//调用ss_add,将这个sender添加到sender队列
//调用schedule()调度等待进程
//设置last msgsnd time、last msgsndpid,然后调用pipeline_send直接向等待进程发消息
//如果直接发送失败,则将创建的msg放入消息等待队列(msg->q_messages),并修改msg_queue相应变量
//最后free_msg,释放空间
//简化后的代码:

//实际调用的函数
long do_msgsnd(int msqid, long mtype, void __user *mtext, size_t msgsz, int msgflg)
{
    struct msg_queue *msq;
    struct msg_msg *msg;
    struct ipc_namespace *ns;
    ns = current->nsproxy->ipc_ns;
 //关键操作
 load_msg msg = load_msg(mtext, msgsz);
    msg->m_type = mtype;
    msg->m_ts = msgsz;
    msq->q_lspid = task_tgid_vnr(current);
    msq->q_stime = get_seconds();
    if (!pipelined_send(msq, msg))
 {
        /* noone is waiting for this message, enqueue it */
        list_add_tail(&msg->m_list, &msq->q_messages);
        msq->q_cbytes += msgsz;
        msq->q_qnum++;
        atomic_add(msgsz, &ns->msg_bytes);
        atomic_inc(&ns->msg_hdrs);
 }

    msg = NULL;
    if (msg != NULL)
        free_msg(msg);
 return 0;
}

//系统调用接口
SYSCALL_DEFINE4(msgsnd, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz, int, msgflg)
{
    long mtype;
 if (get_user(mtype, &msgp->mtype))
        return -EFAULT;
    return do_msgsnd(msqid, mtype, msgp->mtext, msgsz, msgflg);
}

//msgrcv接收消息
//函数执行流程:
//convert_mode设置搜索模式,获取ipc的namespace
//ipc权限检查
//遍历q_messages,使用testmsg检查是否有符合要求的msg存在,按直接设置的搜索模式进行匹配
//找到合适的message,将该msg从list中删除,设置msg_queue相关参数,调用ss_wakeup唤醒sender,跳到store_msg处
//用store_msg将消息复制到用户空间
//free_msg,释放操作
//如果之前遍历q_messages没找到合适的message,判断是否设置了IPC_NOWAIT,若设置了则报错
//把该receiver加入到receivers队列中
//调用schedule()进行调度
//等待直到sender调用msgsnd进行pipeline_send
//简化后的代码:
 
//实际调用的函数
long do_msgrcv(int msqid, long *pmtype, void __user *mtext, size_t msgsz, long msgtyp, int msgflg)
{
    struct msg_queue *msq;
    struct msg_msg *msg;
    int mode;
    struct ipc_namespace *ns;
    mode = convert_mode(&msgtyp, msgflg);
    ns = current->nsproxy->ipc_ns;
    for (;;)
 {
        struct msg_receiver msr_d;
        struct list_head *tmp;
        tmp = msq->q_messages.next;
        while (tmp != &msq->q_messages)
  {
            struct msg_msg *walk_msg;
            walk_msg = list_entry(tmp, struct msg_msg, m_list);
            if (testmsg(walk_msg, msgtyp, mode))
   {
    msg = walk_msg;
                if (mode == SEARCH_LESSEQUAL && walk_msg->m_type != 1)
    {
                    msg = walk_msg;
     msgtyp = walk_msg->m_type - 1;
    }
    else
    {
                    msg = walk_msg;
                    break;
    }
   }
            tmp = tmp->next;
  }

        if (!IS_ERR(msg))
  {
            //找到合适的message的情况
   list_del(&msg->m_list);
            msq->q_qnum--;
            msq->q_rtime = get_seconds();
            msq->q_lrpid = task_tgid_vnr(current);
            msq->q_cbytes -= msg->m_ts;
            atomic_sub(msg->m_ts, &ns->msg_bytes);
            atomic_dec(&ns->msg_hdrs);
            ss_wakeup(&msq->q_senders, 0);
   msg_unlock(msq);
   break;
  }

  //没找到相应的message,receiver将会睡眠等待,该receiver会被加入msg_queue中的receiver队列
        list_add_tail(&msr_d.r_list, &msq->q_receivers);
        msr_d.r_tsk = current;
        msr_d.r_msgtype = msgtyp;
        msr_d.r_mode = mode;
        msr_d.r_maxsize = msgsz;
        current->state = TASK_INTERRUPTIBLE;
        schedule();
  .... //省略众多基本看不懂的和锁相关的操作
        list_del(&msr_d.r_list);
        if (signal_pending(current))
  {
            break;
  }
 }

    msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz;
    *pmtype = msg->m_type;
    //关键操作store_msg
 store_msg(mtext, msg, msgsz)
    free_msg(msg);
    return msgsz;
}

//系统调用接口
SYSCALL_DEFINE5(msgrcv, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz, long, msgtyp, int, msgflg)
{   
 long mtype;
    do_msgrcv(msqid, &mtype, msgp->mtext, msgsz, msgtyp, msgflg);
 if (put_user(mtype, &msgp->mtype))
        return -EFAULT;
 return 0;
}

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多