本文主要讲zeromq的消息体系zmq_msg_t及其相关的操作函数。 因为tcp是一种字节流类型的协议,木有边界,所以把该消息边界的制定留给了应用层。通常有两种方式实现: 1. 在传输的数据中添加分隔符。 2. 在每条消息中添加size字段。 而zeromq可以说选择了第二种方式。 先来看看zmq_msg_t的基本数据结构
-
-
-
- typedef struct
- {
- void *content;
- unsigned char flags;
- unsigned char vsm_size;
- unsigned char vsm_data [ZMQ_MAX_VSM_SIZE];
- } zmq_msg_t
其中content指针指向的是zmq::msg_content_t的结构, flags表示消息的一些flags
这边vsm_size和vsm_data是针对very small message的小消息做一些优化,直接在stack上分配内存了,可以看后面的消息函数的具体操作。
-
-
-
-
-
-
-
-
- struct msg_content_t
- {
- void *data;
- size_t size;
- zmq_free_fn *ffn;
- void *hint;
- zmq::atomic_counter_t refcnt;
- };
其中data指向真正的消息数据,size表示消息数据的字节大小,zmq_free_fn *ffn指向释放函数,refcnt表示消息的引用计数,hint目前未知。
下面我们看一下基本的消息操作函数:
- int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
- {
- if (size_ <= ZMQ_MAX_VSM_SIZE) {
- msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
- msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
- msg_->vsm_size = (uint8_t) size_;
- }
- else {
- msg_->content =
- (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_);
- if (!msg_->content) {
- errno = ENOMEM;
- return -1;
- }
- msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
-
- zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
- content->data = (void*) (content + 1);
- content->size = size_;
- content->ffn = NULL;
- content->hint = NULL;
- new (&content->refcnt) zmq::atomic_counter_t ();
- }
- return 0;
- }
对于小消息来说, 我们设置content指向地址ZMQ_VSM,这个值为32,相当于一个魔法数字,用来表示小消息。
对于相对来说的大消息来说,我们malloc (sizeof (zmq::msg_content_t) + size_)的空间,其中size_就是可变长的消息大小,然后设置content->data = (void *) (content + 1),指向这块可变长的内存区域。 这边有一个初始化msg的flags的操作: - msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
flags的定义:-
-
-
- #define ZMQ_MSG_MORE 1 // 00000001
- #define ZMQ_MSG_SHARED 128 // 10000000
- #define ZMQ_MSG_MASK 129 /* Merges all the flags */ // 10000001
我们简单分析下这样就可以msg的flags的初始值就是01111110,即可以| ZMQ_MSG_MORE或者| ZMQ_MSG_SHARED。 尼玛。。。感觉在说废话。。。
我们再来看看另外两个消息初始化函数: - int zmq_msg_init (zmq_msg_t *msg_)
- {
- msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
- msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
- msg_->vsm_size = 0;
- return 0;
- }
初始化一个msg消息,设置成小消息类型,初始化flags,并且将小消息的大小初始化为0.- int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
- zmq_free_fn *ffn_, void *hint_)
- {
- msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
- alloc_assert (msg_->content);
- msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
- zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
- content->data = data_;
- content->size = size_;
- content->ffn = ffn_;
- content->hint = hint_;
- new (&content->refcnt) zmq::atomic_counter_t ();
- return 0;
- }
该函数根据已有的data和相应的size,和销毁的函数,以及hint来初始化消息。这边消息指向data指针所指的内存区域。
接下来我们来看看获取数据的地址的函数,这样你就能操作消息的数据内存区域了。 - void *zmq_msg_data (zmq_msg_t *msg_)
- {
- zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);
-
- if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
- return msg_->vsm_data;
- if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
- return NULL;
-
- return ((zmq::msg_content_t*) msg_->content)->data;
- }
如果是小消息那么就是stack上的vsm_data地址,大消息的就是heap上的content->data。如果是delimiter的话,返回的是空指针。 我们后面会讲述delimiter的用处。
而zmq_msg_size(1)函数也是类似的处理方式,只不过返回的是消息大小。 - size_t zmq_msg_size (zmq_msg_t *msg_)
- {
- zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);
-
- if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
- return msg_->vsm_size;
- if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
- return 0;
-
- return ((zmq::msg_content_t*) msg_->content)->size;
- }
接下来我们来看zmq_msg_close(1)函数, 看看如何销毁消息的。- int zmq_msg_close (zmq_msg_t *msg_)
- {
-
- if (unlikely (msg_->flags | ZMQ_MSG_MASK) != 0xff) {
- errno = EFAULT;
- return -1;
- }
-
-
- if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
- msg_->content != (zmq::msg_content_t*) ZMQ_VSM) {
-
-
-
- zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
- if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) {
-
-
-
- content->refcnt.~atomic_counter_t ();
-
- if (content->ffn)
- content->ffn (content->data, content->hint);
- free (content);
- }
- }
-
-
- msg_->flags = 0;
-
- return 0;
- }
消息的flag检测和前面类似,该函数末尾除还会移除初始化的flags,将其置成0,表明该消息已经废了。对于小消息,消息数据是分配在stack上的,因此不需要手动销毁。 对于大消息,消息数据是分配在heap上的,因此我们查看消息flags是否采用ZMQ_MSG_SHARED共享模式,如果没有使用就直接销毁。假如使用了共享模式,那么我们递减消息的引用计数,一旦引用计数为0,我们就销毁消息内容。 销毁过程如下:
1. 销毁引用计数,因为在创建引用计数的时候我们使用的是placement new所以这边我们需要调用它的析构函数。 2. 如果有注册相应的自定义的销毁函数,就调用该函数。这种情况主要是用于使用zmq_msg_init_data(),自己来管理data所指向空间。
3. 调用free(1)释放heap的空间, 注意 如果是使用zmq_msg_init_size()来初始化消息的,就会释放掉data的空间,因为这块空间就是分配时多分配的size大小(在content下方)的那块。
至于zmq_msg_move和zmq_msg_copy主要是消息的move和copy。 - int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
- {
- #if 0
-
- if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff ||
- (src_->flags | ZMQ_MSG_MASK) != 0xff)) {
- errno = EFAULT;
- return -1;
- }
- #endif
- zmq_msg_close (dest_);
- *dest_ = *src_;
- zmq_msg_init (src_);
- return 0;
- }
-
- int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
- {
-
- if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff ||
- (src_->flags | ZMQ_MSG_MASK) != 0xff)) {
- errno = EFAULT;
- return -1;
- }
-
- zmq_msg_close (dest_);
-
-
- if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
- src_->content != (zmq::msg_content_t*) ZMQ_VSM) {
-
-
-
- zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
- if (src_->flags & ZMQ_MSG_SHARED)
- content->refcnt.add (1);
- else {
- src_->flags |= ZMQ_MSG_SHARED;
- content->refcnt.set (2);
- }
- }
-
- *dest_ = *src_;
- return 0;
- }
zmq_msg_move(1)将source的消息重置成初始化的空消息。 而zmq_msg_copy(1)dst和src共享消息content的引用,主要注意引用计数的变更。如果已经是SHARED模式则增加引用计数,否则则设置成SHARED模式,并将引用技术设置成2。
有了这些知识,下面我们来看一下基本的消息操作的示例: -
- static char *
- s_recv (void *socket) {
- zmq_msg_t message;
- zmq_msg_init (&message);
- zmq_recv (socket, &message, 0);
- int size = zmq_msg_size (&message);
- char *string = malloc (size + 1);
- memcpy (string, zmq_msg_data (&message), size);
- zmq_msg_close (&message);
- string [size] = 0;
- return (string);
- }
-
-
- static int
- s_send (void *socket, char *string) {
- int rc;
- zmq_msg_t message;
- zmq_msg_init_size (&message, strlen (string));
- memcpy (zmq_msg_data (&message), string, strlen (string));
- rc = zmq_send (socket, &message, 0);
- assert (!rc);
- zmq_msg_close (&message);
- return (rc);
- }
从上面的注释可以看出收发消息的流程。zeromq有相关的库(czmq)封装了这些操作,当然你也可以自己封装。
ZMQ_DELIMITER:
刚才上文曾经提到过delimiter类型的消息。这种类型的消息类似于终结者的意思,主要在收发的管道中使用。因为zeromq会将消息先发送到管道中,然后poller运行在另外一个线程,将管道中的数据读出来发往socket的缓冲区,所以可以发送一个delimiter类型的消息去终结管道,销毁它。在以后我们分析管道的过程中,你能看到做这个工作的代码。
总结:
本文简单地介绍了zeromq中定义的消息的数据结构以及相关的操作,基本上zeromq的消息的数据结构就采用了可变长的数据结构来存放data,还会对小消息进行内存分配上面的优化,直接使用stack分配,而不是使用heap动态分配,消息content拥有引用计数可以共享之。 这边我们看到消息flags的时候我们发觉漏讲了一个#define ZMQ_MSG_MORE 1的标志,这个标志是multipart message使用的。下一次我们就会分析这块内容,并且会看看消息发送到管道后,poller将其取出来发送给socket缓冲区的具体细节以及反过来接收消息的过程,敬请期待! 希望有兴趣的朋友可以和我联系,一起学习。 kaka11.chen@gmail.com
|