分享

activeMQ 推送之mqtt客户端

 quasiceo 2016-07-20

使用activeMQ进行android推送

activeMQ下载地址:http://activemq./download.html

下载后是一个压缩包:apache-activemq-5.9.0-bin.zip

启动方式:

解压缩,进入apache-activemq-5.9.0-bin\apache-activemq-5.9.0\bin,双击activemq.bat,即可启动activeMQ服务

 启动之后:

 android客户端推送采用mqtt(paho-mqtt-client-1.0.1.jar),依赖包见附件

 

但是为了测试,我写了一个swing图形界面,充当手机客户端,依赖的jar包仍然是paho-mqtt-client-1.0.1.jar.界面如下:

使用方法:点击[启动]按钮,开始接收推送消息
 对应的主类是:
MqttSwing,用于接收推送消息.

 

我还写了一个发送推送消息的swing图形界面,充当推送后管系统,界面如下:

使用方法:点击[连接]按钮,才可以发送推送消息
 对应的主类:PusherApp,用于发送推送消息.

核心代码介绍如下.

客户端连接activeMQ,建立连接(只有建立连接,才能接收到推送消息)

方法名:connect,做了两件事:(1)建立连接;(2)订阅主题(topic)

 

Java代码  收藏代码
  1. /*** 
  2.      * 客户端和activeMQ服务器建立连接 
  3.      * @param BROKER_URL 
  4.      * @param clientId : 用于标识客户端,相当于ios中的device token 
  5.      * @param TOPIC 
  6.      * @param isCleanSession :false--可以接受离线消息; 
  7.      * @return 是否启动成功  
  8.      */  
  9.     private boolean connect(String BROKER_URL,String clientId,String TOPIC,boolean isCleanSession){  
  10.         try {  
  11.             ComponentUtil.appendResult(resultTextPane, "connect time:"+TimeHWUtil.getCurrentMiniuteSecond(), true);  
  12.             mqttClient = new MqttClient(BROKER_URL, clientId, new MemoryPersistence());  
  13.             MqttConnectOptions options= new MqttConnectOptions();  
  14.             options.setCleanSession(isCleanSession);//mqtt receive offline message  
  15.             ComponentUtil.appendResult(resultTextPane, "isCleanSession:"+isCleanSession, true);  
  16.             options.setKeepAliveInterval(30);  
  17.             //推送回调类,在此类中处理消息,用于消息监听  
  18.             mqttClient.setCallback(new MyCallBack(MqttSwing.this));  
  19.             boolean isSuccess=false;  
  20.             try {  
  21.                 mqttClient.connect(options);//CLIENT ID CAN NOT BE SAME  
  22.                 isSuccess=true;  
  23.             } catch (Exception e) {  
  24.                 if(isPrintException){  
  25.                     e.printStackTrace();  
  26.                 }  
  27.             }  
  28.             if(!isSuccess){  
  29.                 String message="连接失败,请检查client id是否重复了 或者activeMQ是否启动";  
  30.                 ComponentUtil.appendResult(resultTextPane, message, true);  
  31.                 GUIUtil23.warningDialog(message);  
  32.                 return false;  
  33.             }else{  
  34.             //Subscribe to topics   
  35.                 mqttClient.subscribe(new String[]{TOPIC,clientId});  
  36.                  
  37.                 System.out.println("topic:"+TOPIC+",  "+(clientId));  
  38.                 ComponentUtil.appendResult(resultTextPane, "TOPIC:"+TOPIC+",  "+(clientId), true);  
  39.             }  
  40.   
  41.         } catch (MqttException e) {  
  42.             if(isPrintException){  
  43.             e.printStackTrace();}  
  44.             GUIUtil23.errorDialog(e.getMessage());  
  45.             return false;  
  46.         }  
  47.         return true;  
  48.     }  

 

 

推送消息到来时的回调类:MyCallBack

 

Java代码  收藏代码
  1. package com.mqtt.hw.callback;  
  2.   
  3. import org.apache.commons.lang.StringEscapeUtils;  
  4. import org.eclipse.paho.client.mqttv3.MqttCallback;  
  5. import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;  
  6. import org.eclipse.paho.client.mqttv3.MqttMessage;  
  7. import org.eclipse.paho.client.mqttv3.MqttTopic;  
  8.   
  9. import com.mqtt.hw.MqttSwing;  
  10. import com.time.util.TimeHWUtil;  
  11.   
  12. public class MyCallBack implements MqttCallback {  
  13.     private MqttSwing mqttSwing;  
  14.       
  15.       
  16.       
  17.     public MyCallBack(MqttSwing mqttSwing) {  
  18.         super();  
  19.         this.mqttSwing = mqttSwing;  
  20.     }  
  21.   
  22.     @Override  
  23.     public void connectionLost(Throwable cause) {  
  24.           
  25.     }  
  26.   
  27.     @Override  
  28.     public void messageArrived(MqttTopic topic, MqttMessage message)  
  29.             throws Exception {  
  30.         System.out.println("messageArrived...."+TimeHWUtil.getCurrentMiniuteSecond());  
  31.         String messageStr=StringEscapeUtils.unescapeHtml(new String(message.getPayload()));  
  32.         System.out.println("message:"+messageStr);  
  33.         this.mqttSwing.receiveMessage(messageStr);  
  34.         //使窗口处于激活状态  
  35.   
  36.     }  
  37.   
  38.     @Override  
  39.     public void deliveryComplete(MqttDeliveryToken token) {  
  40.           
  41.     }  
  42.   
  43. }  

 

 

推送者与activeMQ建立连接:

 

Java代码  收藏代码
  1. /** 
  2.      * 初始化connection和session 
  3.      *  
  4.      * @throws Exception 
  5.      */  
  6.     private void init(/* String mqIp,boolean transacted */throws Exception {  
  7.         if (!DialogUtil.verifyTFEmpty(serverIpTextField, "服务器ip")) {  
  8.             return;  
  9.         }  
  10.         String transactedStr = transactedTextField.getText();  
  11.         boolean transacted = false;  
  12.         if (ValueWidget.isNullOrEmpty(transactedStr)) {  
  13.             transacted = false;  
  14.         } else {  
  15.             transacted = Boolean.parseBoolean(transactedStr);  
  16.         }  
  17.         String message = "transacted:" + transacted;  
  18.         ComponentUtil.appendResult(resultTextArea, message, true);  
  19.         System.out.println(message);  
  20.         String brokerUrl = String.format(BROKER_URL,  
  21.                 serverIpTextField.getText());  
  22.         // 创建链接工厂  
  23.         TopicConnectionFactory factory = new ActiveMQConnectionFactory(  
  24.                 ActiveMQConnection.DEFAULT_USER,  
  25.                 ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl);  
  26.         ComponentUtil.appendResult(resultTextArea, "url:" + brokerUrl, true);  
  27.         // 通过工厂创建一个连接  
  28.           
  29.         connection = factory.createTopicConnection();  
  30.         // 启动连接  
  31.         connection.start();  
  32.         ComponentUtil.appendResult(resultTextArea, "启动connection 成功"true);  
  33.         // 创建一个session会话 transacted  
  34.         session = connection.createTopicSession(  
  35.                 transacted /* Boolean.FALSE */, Session.AUTO_ACKNOWLEDGE);  
  36.   
  37.     }  

 

 

项目源代码见附件mqtt_swing.zip

手机android客户端(测试推送)见附件android-mqtt-push-master.zip

也可以从 https://github.com/tokudu/AndroidPushNotificationsDemo

 

  下载

详细配置参阅附件mqtt推送详解.zip

 依赖的jar包

io0007-find_progess-0.0.8.4-SNAPSHOT.jar

io0007-find_progess-0.0.8.4-SNAPSHOT-sources.jar

2
0
分享到:
参考知识库
Linux知识库 562  关注 | 579  收录
HTML5知识库 1710  关注 | 145  收录
OpenCV知识库 508  关注 | 1023  收录
软件测试知识库 571  关注 | 244  收录
评论
14 楼 a406979945 2016-01-12  
你好,这个swing的消息推送与接收都能连接到服务器  但是消息发送出来 后 并不能接收到,
13 楼 zhaoke2011 2015-08-08  
运行你的那个20150621 的,提示错误: 找不到或无法加载主类 com.mqtt.hw.MqttClientSwing,新手 求教  qq  502727023 谢谢
11 楼 hw1287789687 2015-06-21  
最新的源代码 下载地址:http://pan.baidu.com/s/1sjLYibb
10 楼 hw1287789687 2015-06-21  
jackyx 写道
import com.swing.component.AssistPopupTextPane
没找到这个包呢,能否发一份最新完整的给我

这个类在附件的io0007-find_progess-0.0.8.4-SNAPSHOT.jar
9 楼 c1007857613 2015-06-05  
引用
    
8 楼 hw1287789687 2014-09-30  
jackyx 写道
你好,请问有IOS和推送后台(服务器端)的资料吗?

ios 推送资料下载地址:http://pan.baidu.com/s/1i35kRch
7 楼 hw1287789687 2014-09-30  
import com.swing.component.AssistPopupTextPane 所在的jar包下载地址:
http://pan.baidu.com/s/1mghyjUw ,http://pan.baidu.com/s/1c0y90u0
mqtt推送详解.zip  下载地址:http://pan.baidu.com/s/1o6r31Ui
6 楼 hw1287789687 2014-09-30  
jackyx 写道
import com.swing.component.AssistPopupTextPane
没找到这个包呢,能否发一份最新完整的给我

http://pan.baidu.com/s/1hqlAekW
5 楼 jackyx 2014-09-28  
你的jar不全,能否提供下载?
4 楼 jackyx 2014-09-28  
import com.swing.component.AssistPopupTextPane
没找到这个包呢,能否发一份最新完整的给我
3 楼 jackyx 2014-09-28  
你好,请问有IOS和推送后台(服务器端)的资料吗?
2 楼 lwhhuahua 2014-09-11  
mqtt推送详解.zip  这个附件呢?
1 楼 hw1287789687 2014-07-02  
需要什么资料可以给我留言,我还做过ios 推送后台(服务器端)

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多