分享

zmq_msg_t的结构和相关函数

 赵静Library 2013-12-30

本文主要讲zeromq的消息体系zmq_msg_t及其相关的操作函数。

因为tcp是一种字节流类型的协议,木有边界,所以把该消息边界的制定留给了应用层。通常有两种方式实现:

1. 在传输的数据中添加分隔符。

2. 在每条消息中添加size字段。

而zeromq可以说选择了第二种方式。

先来看看zmq_msg_t的基本数据结构

  1. /*  A message. Note that 'content' is not a pointer to the raw data.          */  
  2. /*  Rather it is pointer to zmq::msg_content_t structure                      */  
  3. /*  (see src/msg_content.hpp for its definition).                             */  
  4. typedef struct  
  5. {     
  6.     void *content;  
  7.     unsigned char flags;  
  8.     unsigned char vsm_size;// 小消息的大小  
  9.     unsigned char vsm_data [ZMQ_MAX_VSM_SIZE];// 小消息的内存区域  
  10. } zmq_msg_t  
其中content指针指向的是zmq::msg_content_t的结构, flags表示消息的一些flags

这边vsm_size和vsm_data是针对very small message的小消息做一些优化,直接在stack上分配内存了,可以看后面的消息函数的具体操作。

  1. //  Shared message buffer. Message data are either allocated in one  
  2.     //  continuous block along with this structure - thus avoiding one  
  3.     //  malloc/free pair or they are stored in used-supplied memory.  
  4.     //  In the latter case, ffn member stores pointer to the function to be  
  5.     //  used to deallocate the data. If the buffer is actually shared (there  
  6.     //  are at least 2 references to it) refcount member contains number of  
  7.     //  references.  
  8.       
  9.     struct msg_content_t  
  10.     {  
  11.         void *data;  
  12.         size_t size;  
  13.         zmq_free_fn *ffn;  
  14.         void *hint;  
  15.         zmq::atomic_counter_t refcnt;  
  16.     };  

其中data指向真正的消息数据,size表示消息数据的字节大小,zmq_free_fn *ffn指向释放函数,refcnt表示消息的引用计数,hint目前未知。


下面我们看一下基本的消息操作函数:

  1. int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)  
  2. {  
  3.     if (size_ <= ZMQ_MAX_VSM_SIZE) {  
  4.         msg_->content = (zmq::msg_content_t*) ZMQ_VSM;  
  5.         msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;  
  6.         msg_->vsm_size = (uint8_t) size_;  
  7.     }  
  8.     else {  
  9.         msg_->content =  
  10.             (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_);  
  11.         if (!msg_->content) {  
  12.             errno = ENOMEM;  
  13.             return -1;  
  14.         }  
  15.         msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;  
  16.       
  17.         zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;  
  18.         content->data = (void*) (content + 1);  
  19.         content->size = size_;  
  20.         content->ffn = NULL;  
  21.         content->hint = NULL;  
  22.         new (&content->refcnt) zmq::atomic_counter_t (); // 设置引用计数  
  23.     }  
  24.     return 0;  
  25. }     

对于小消息来说, 我们设置content指向地址ZMQ_VSM,这个值为32,相当于一个魔法数字,用来表示小消息。

对于相对来说的大消息来说,我们malloc (sizeof (zmq::msg_content_t) + size_)的空间,其中size_就是可变长的消息大小,然后设置content->data = (void *) (content + 1),指向这块可变长的内存区域。

这边有一个初始化msg的flags的操作:

  1. msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;  
flags的定义:

  1. /*  Message flags. ZMQ_MSG_SHARED is strictly speaking not a message flag     */  
  2. /*  (it has no equivalent in the wire format), however, making  it a flag     */  
  3. /*  allows us to pack the stucture tigher and thus improve performance.       */  
  4. #define ZMQ_MSG_MORE 1 // 00000001  
  5. #define ZMQ_MSG_SHARED 128 // 10000000  
  6. #define ZMQ_MSG_MASK 129 /* Merges all the flags */ // 10000001  

我们简单分析下这样就可以msg的flags的初始值就是01111110,即可以| ZMQ_MSG_MORE或者| ZMQ_MSG_SHARED。

尼玛。。。感觉在说废话。。。


我们再来看看另外两个消息初始化函数:

  1. int zmq_msg_init (zmq_msg_t *msg_)  
  2. {  
  3.     msg_->content = (zmq::msg_content_t*) ZMQ_VSM;  
  4.     msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;  
  5.     msg_->vsm_size = 0;  
  6.     return 0;  
  7. }  
初始化一个msg消息,设置成小消息类型,初始化flags,并且将小消息的大小初始化为0.

  1. int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,  
  2.     zmq_free_fn *ffn_, void *hint_)  
  3. {  
  4.     msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));  
  5.     alloc_assert (msg_->content);  
  6.     msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;  
  7.     zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;  
  8.     content->data = data_;  
  9.     content->size = size_;  
  10.     content->ffn = ffn_;  
  11.     content->hint = hint_;  
  12.     new (&content->refcnt) zmq::atomic_counter_t ();  
  13.     return 0;  
  14. }  

该函数根据已有的data和相应的size,和销毁的函数,以及hint来初始化消息。

这边消息指向data指针所指的内存区域。


接下来我们来看看获取数据的地址的函数,这样你就能操作消息的数据内存区域了。

  1. void *zmq_msg_data (zmq_msg_t *msg_)  
  2. {  
  3.     zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);  
  4.   
  5.     if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)  
  6.         return msg_->vsm_data;  
  7.     if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)  
  8.         return NULL;  
  9.   
  10.     return ((zmq::msg_content_t*) msg_->content)->data;  
  11. }  
如果是小消息那么就是stack上的vsm_data地址,大消息的就是heap上的content->data。

如果是delimiter的话,返回的是空指针。 我们后面会讲述delimiter的用处。


而zmq_msg_size(1)函数也是类似的处理方式,只不过返回的是消息大小。

  1. size_t zmq_msg_size (zmq_msg_t *msg_)  
  2. {  
  3.     zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);  
  4.   
  5.     if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)  
  6.         return msg_->vsm_size;  
  7.     if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)  
  8.         return 0;  
  9.   
  10.     return ((zmq::msg_content_t*) msg_->content)->size;  
  11. }  

接下来我们来看zmq_msg_close(1)函数, 看看如何销毁消息的。

  1. int zmq_msg_close (zmq_msg_t *msg_)  
  2. {  
  3.     //  Check the validity tag.  
  4.     if (unlikely (msg_->flags | ZMQ_MSG_MASK) != 0xff) {  
  5.         errno = EFAULT;  
  6.         return -1;  
  7.     }  
  8.   
  9.     //  For VSMs and delimiters there are no resources to free.  
  10.     if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&  
  11.           msg_->content != (zmq::msg_content_t*) ZMQ_VSM) {  
  12.   
  13.         //  If the content is not shared, or if it is shared and the reference.  
  14.         //  count has dropped to zero, deallocate it.  
  15.         zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;  
  16.         if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) {  
  17.   
  18.             //  We used "placement new" operator to initialize the reference.  
  19.             //  counter so we call its destructor now.  
  20.             content->refcnt.~atomic_counter_t ();  
  21.   
  22.             if (content->ffn)  
  23.                 content->ffn (content->data, content->hint);  
  24.             free (content);  
  25.         }  
  26.     }  
  27.   
  28.     //  Remove the validity tag from the message.  
  29.     msg_->flags = 0;  
  30.   
  31.     return 0;  
  32. }  

消息的flag检测和前面类似,该函数末尾除还会移除初始化的flags,将其置成0,表明该消息已经废了。

对于小消息,消息数据是分配在stack上的,因此不需要手动销毁。

对于大消息,消息数据是分配在heap上的,因此我们查看消息flags是否采用ZMQ_MSG_SHARED共享模式,如果没有使用就直接销毁。假如使用了共享模式,那么我们递减消息的引用计数,一旦引用计数为0,我们就销毁消息内容。

销毁过程如下: 

1. 销毁引用计数,因为在创建引用计数的时候我们使用的是placement new所以这边我们需要调用它的析构函数。

2. 如果有注册相应的自定义的销毁函数,就调用该函数。这种情况主要是用于使用zmq_msg_init_data(),自己来管理data所指向空间。 

3. 调用free(1)释放heap的空间, 注意 如果是使用zmq_msg_init_size()来初始化消息的,就会释放掉data的空间,因为这块空间就是分配时多分配的size大小(在content下方)的那块。


至于zmq_msg_move和zmq_msg_copy主要是消息的move和copy。

  1. int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)  
  2. {  
  3. #if 0  
  4.     //  Check the validity tags.  
  5.     if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff ||  
  6.           (src_->flags | ZMQ_MSG_MASK) != 0xff)) {  
  7.         errno = EFAULT;  
  8.         return -1;  
  9.     }  
  10. #endif  
  11.     zmq_msg_close (dest_);  
  12.     *dest_ = *src_;  
  13.     zmq_msg_init (src_);  
  14.     return 0;  
  15. }  
  16.   
  17. int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)  
  18. {  
  19.     //  Check the validity tags.  
  20.     if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff ||  
  21.           (src_->flags | ZMQ_MSG_MASK) != 0xff)) {  
  22.         errno = EFAULT;  
  23.         return -1;  
  24.     }  
  25.   
  26.     zmq_msg_close (dest_);  
  27.   
  28.     //  VSMs and delimiters require no special handling.  
  29.     if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&  
  30.           src_->content != (zmq::msg_content_t*) ZMQ_VSM) {  
  31.   
  32.         //  One reference is added to shared messages. Non-shared messages  
  33.         //  are turned into shared messages and reference count is set to 2.  
  34.         zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;  
  35.         if (src_->flags & ZMQ_MSG_SHARED)  
  36.             content->refcnt.add (1);  
  37.         else {  
  38.             src_->flags |= ZMQ_MSG_SHARED;  
  39.             content->refcnt.set (2);  
  40.         }  
  41.     }  
  42.   
  43.     *dest_ = *src_;  
  44.     return 0;  
  45. }  

zmq_msg_move(1)将source的消息重置成初始化的空消息。

而zmq_msg_copy(1)dst和src共享消息content的引用,主要注意引用计数的变更。如果已经是SHARED模式则增加引用计数,否则则设置成SHARED模式,并将引用技术设置成2。

有了这些知识,下面我们来看一下基本的消息操作的示例:

  1. //  Receive 0MQ string from socket and convert into C string  
  2. static char *  
  3. s_recv (void *socket) {  
  4.     zmq_msg_t message; // 创建消息结构  
  5.     zmq_msg_init (&message); // 初始化空消息  
  6.     zmq_recv (socket, &message, 0); // 接收消息  
  7.     int size = zmq_msg_size (&message); // 计算消息的大小  
  8.     char *string = malloc (size + 1); // 分配string为指向size + 1大小的heap空间,那个多出来的1字节是'\0'的空间  
  9.     memcpy (string, zmq_msg_data (&message), size); // 通过zmq_msg_data(1)获得消息的data地址,拷贝到字符串中  
  10.     zmq_msg_close (&message); // 释放或销毁消息  
  11.     string [size] = 0; // 设置'\0'  
  12.     return (string);  
  13. }  
  14.   
  15. //  Convert C string to 0MQ string and send to socket  
  16. static int  
  17. s_send (void *socket, char *string) {  
  18.     int rc;  
  19.     zmq_msg_t message; // 创建消息结构  
  20.     zmq_msg_init_size (&message, strlen (string)); // 以字符串长度(不包括'\0')初始化成消息  
  21.     memcpy (zmq_msg_data (&message), string, strlen (string)); // 将字符串的内容(不包括'\0')拷贝给消息  
  22.     rc = zmq_send (socket, &message, 0); // 发送消息  
  23.     assert (!rc);  
  24.     zmq_msg_close (&message); // 释放和销毁消息  
  25.     return (rc);  
  26. }  

从上面的注释可以看出收发消息的流程。zeromq有相关的库(czmq)封装了这些操作,当然你也可以自己封装。


ZMQ_DELIMITER:

刚才上文曾经提到过delimiter类型的消息。这种类型的消息类似于终结者的意思,主要在收发的管道中使用。因为zeromq会将消息先发送到管道中,然后poller运行在另外一个线程,将管道中的数据读出来发往socket的缓冲区,所以可以发送一个delimiter类型的消息去终结管道,销毁它。在以后我们分析管道的过程中,你能看到做这个工作的代码。


总结:

本文简单地介绍了zeromq中定义的消息的数据结构以及相关的操作,基本上zeromq的消息的数据结构就采用了可变长的数据结构来存放data,还会对小消息进行内存分配上面的优化,直接使用stack分配,而不是使用heap动态分配,消息content拥有引用计数可以共享之。

这边我们看到消息flags的时候我们发觉漏讲了一个#define ZMQ_MSG_MORE 1的标志,这个标志是multipart message使用的。下一次我们就会分析这块内容,并且会看看消息发送到管道后,poller将其取出来发送给socket缓冲区的具体细节以及反过来接收消息的过程,敬请期待!

希望有兴趣的朋友可以和我联系,一起学习。 kaka11.chen@gmail.com

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多