分享

(3)ZeroMQ 消息内核

 dongsibei 2014-04-25
    总在寻找一种高效的消息传递方式中不断徘徊,最近发现了它,它的性能让人眼前一亮。ZeroMQ
熟悉分布式消息总线的朋友,可以不假思索地脱口而出Kafka,ActiveMQ,***MQ等,但是,ZeroMQ的设计另辟了新天地。
它的特性:
1)ZeroMQ位于(TCP/IP协议层次结构)会话层和应用层之间。ZeroMQ 并不是一个对 socket 的封装,不能用它去实现已有的网络协议。
2)无消息服务器缓存数据。它不同于传统消息中间件通过服务器缓存转发消息的方式,而是优化点对点的消息传输。ZeroMQ能缓存消息,但是是在发送端缓存。ZeroMQ里有水位设置的相关接口来控制缓存量。当然,ZeroMQ也支持传统的消息队列(通过zmq_device来实现)。
3)强调消息收发模式。
在点对点的消息传输上ZeroMQ将通信的模式做了归纳,比如常见的订阅模式(一个消息发多个客户),分发模式(N个消息平均分给X个客户)等等。下面是目前支持的消息模式配对,任何一方都可以做为服务端。
  • PUB and SUB
  • REQ and REP
  • REQ and XREP
  • XREQ and REP
  • XREQ and XREP
  • XREQ and XREQ
  • XREP and XREP
  • PUSH and PULL
  • PAIR and PAIR
4)透明地支持多种通信方式。这主要包括:线程间通信,进程间通信,跨主机通信。只需修改连接的url即可,例如,进程间通信使用ipc:///11111, 跨主机通信使用tcp://10.5.0.170:26666, 不需要修改代码。

5)异步传输数据。通过新增一个IO线程发送数据,该IO线程在一次链接之后,不会马上释放,复用点与点之间数据链接,可以很好地提高性能。

ok,它究竟使用了什么trick才使得性能有了如此的提高呢?

ZeroMQ支持四种消息传递模式:

The following transports are defined:

inproc
local in-process (inter-thread) communication transport, see zmq_inproc(7)
ipc
local inter-process communication transport, see zmq_ipc(7)
tcp
unicast transport using TCP, see zmq_tcp(7)
pgm, epgm
reliable multicast transport using PGM, see zmq_pgm(7)

并且在这四种模式之上提供了统一的接口,下面看一个简单的Sample,实现的是ipc。
server端代码:
include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>

int main (void)
{
    void *context = zmq_init (1);

    //  Socket to talk to clients
    void *responder = zmq_socket (context, ZMQ_REP);
  //  zmq_bind (responder, "tcp://*:5555");
     zmq_bind(responder, "ipc:///tmp/feeds/0");

    while (1) {
        //  Wait for next request from client
        zmq_msg_t request;
        zmq_msg_init (&request);
        zmq_recv (responder, &request, 0);
        printf ("Received Hello\n");
        zmq_msg_close (&request);

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq_msg_t reply;
        zmq_msg_init_size (&reply, 5);
        memcpy (zmq_msg_data (&reply), "World", 5);
        zmq_send (responder, &reply, 0);
        zmq_msg_close (&reply);
    }
    //  We never get here but if we did, this would be how we end
    zmq_close (responder);
    zmq_term (context);
    return 0;
}
编译时添加-lzmq
客户端代码:
#include <string.h>
#include <stdio.h>
#include <zmq.h>
int main(int argc, char* argv[]) {
    void * context = zmq_init(10); // there are 10 io_threads,and prepare for 0MQ context.
    void * requester = zmq_socket(context, ZMQ_REQ);
    zmq_msg_t msg;
    zmq_msg_init_size(&msg, 5);
    memcpy(zmq_msg_data(&msg), "Hello", 5);
    int rc = zmq_connect(requester, "ipc:///tmp/feeds/0");
    zmq_send (requester, &msg, 0);
    zmq_msg_close (&msg);
    zmq_msg_t reply;
    zmq_msg_init (&reply);
    zmq_recv (requester, &reply, 0);
    printf ("Received %s\n", (char *)zmq_msg_data(&reply));
    zmq_msg_close (&reply);
    return 0;
}
将这两个文件编译之后,创建 /tmp/feeds/0文件,然后执行即可。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多