经过上面两篇博客已经对ZMQ和如何编译ZMQ有所了解了,在第一篇文章中介绍了ZMQ有三种模式,这篇博客就写一个关于发布订阅模式的Demo,希望对读者能够起到学习的作用,那要是在深入学习的话,可以参照zmq的官方和它自身带的实例代码学习。 在写DEMO之前需要将上篇博客编译之后的lib文件配置到环境变量中, C:\Users\CJQ\Desktop\zeromq-2.2.0\lib;C:\Users\CJQ\Desktop\zeromq-2.2.0\zeromq-jzmq\lib 也就是将zmq的lib文件夹和jzmq的文件夹放到环境变量中。 之后我们来写发布端代码, - package com.wh.mq.demo2;
-
- import org.zeromq.ZMQ;
- import org.zeromq.ZMQ.Context;
- import org.zeromq.ZMQ.Socket;
-
- public class Sync_PUB {
-
- public static void main(String[] args) throws InterruptedException {
- Context context = ZMQ.context(1);
- Socket publisher = context.socket(ZMQ.PUB);
- publisher.bind("tcp://*:5561");
- //zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布
- Thread.sleep(1000);
-
- int update_nbr;
-
- for (update_nbr = 20; update_nbr < 40; update_nbr++) {
- Stringa="{\"magicNum\":\"CHINSOFT\",\"varName\":\"ZJ_YD_1\",\"varType\":\"5\",\"varValue\":"+update_nbr+",\"varQuality\":\"1111\",\"varTime\":"+System.currentTimeMillis()/1000+"}";
- publisher.send(a.getBytes(), ZMQ.NOBLOCK);
- System.out.println(update_nbr);
- Thread.sleep(1000);
- }
-
-
- publisher.close();
- context.term();
- }
- }
发布端需要通过context.socket(ZMQ.PUB)表示为发布端,通过bind方法来创建发布端连接,等待订阅者连接。 之后通过send方法将数据发送到出去。 之后来写订阅端代码 - public classSync_SUB1 {
-
- publicstaticvoidmain(String[] args) {
- Context context = ZMQ.context(1);
- Socket subscriber = context.socket(ZMQ.SUB);
- subscriber.connect("tcp://localhost:5561");
-
- //设置订阅条件"setsockopt"
- subscriber.subscribe("".getBytes());
- int update_nbr = 0;
- while (true) {
- byte[] stringValue = subscriber.recv(0);
-
- String string = new String(stringValue);
-
- update_nbr++;
- System.out.println("Received " + update_nbr + " updates. :"+ string);
- }
- }
- }
客户端通过connect进行连接,之后通过recv来进行数据接收。 到此为止发布订阅Demo就写完了,通过这三篇博客能够对ZMQ有了初步的认识和简单实用,希望这三篇博客对学习zmq的读者有所帮助。
|