分享

基于MQTT的消息推送

 WindySky 2018-03-20

这段时间学习了推送技术,对xmpp和mqtt 协议做了下比较。

xmpp基于xml信息传递,所以传输信息量比较大,在保持长链接情况下功耗会比较大。

可能还是比较适合用来做聊天之类的通讯应用,而对于智能和物联低功耗设备的推送来说,感觉比较笨重。

而mqtt协议就是针对网络带宽低,高延时,通信不稳定的环境设计的,特别适合物联设备。低通讯量连接保持,简约轻便。

  • 提供了发布/订阅模式,只要订阅了,即使发布时客户端离线,等再次上线时还能收到消息。
  • 提供了发布反馈,客户端收到消息反馈等机制。
  • 提供了发布质量,比如至多一次,至少一次,只有一次。可以根据不同业务要求进行选择。
  • 提供了心跳机制,可自行设置心跳
  • 提供了鉴权机制

可见,提供的功能已经很完整了。


mqtt 详细协议可见:

http://docs./mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

中文版的:

http://wenku.baidu.com/link?url=eyG24CVjmmA_J5KbR_Y8xzDOkq5YsrChshcHxcZQzav2OcVE9KNxukXr1uix1ACPTw36QIKwHRTmLqjM1Bm7qqPwVQYG9BzIv_BdJvPoU8_


现在有很多基于mqtt的开源实现,包含客户端和服务器端(或者说中间件)。

我做了一个demo,客户选用paho 提供client-mqtt 和 android server库,中间件是apollo。

apollo 下载地址:

http://activemq./apollo/download.html

下载完解压按照里面的readme 建立和运行自己的broker,依赖java运行环境。


client-mqtt 和 android server库地址:

可以直接把jar文件导入工程,但是我选择的是把源码放入,因为这样可以根据自己的需要对源码做修改。
android studio上:
1.把jar放入很简单:放到model libs目录下,右键 As libraries就可以了。
2. 源码放入,把sources.jar 分别解压,放到对应java目录下。



3. client.mqtt3 里包含了java的.properties 文件,它们是针对多语言的,这点和android的设计不一样,所以android studio编译时是不能直接把它们打包到class文件里面的。需要单独把它们打包成jar放到lib目录下,这样在运行过程就不会报找不到它们了。

  打包很简单,新建org\eclipse\paho\client\mqttv3\internal\nls 目录,把它们都放到这个目录下。

 然后在org 目录外执行 jar -cvf  properties.jar ./*  , 之后把properties.jar 放入libs下。


  4. 在androidManifest 里面添加

<service android:name="org.eclipse.paho.android.service.MqttService">
</service>
5. 在自己的activity或者service里面通过调用paho.android.service MqttAndroidClient 类实现发布和订阅。

  1. package com.tww.test;  
  2.   
  3. import android.content.ComponentName;  
  4. import android.content.Context;  
  5. import android.content.ServiceConnection;  
  6. import android.os.Bundle;  
  7. import android.os.IBinder;  
  8. import android.os.PowerManager;  
  9. import android.support.v7.app.AppCompatActivity;  
  10. import android.util.Log;  
  11. import android.view.View;  
  12. import android.widget.Button;  
  13.   
  14. import org.eclipse.paho.android.service.MqttAndroidClient;  
  15. import org.eclipse.paho.android.service.MqttTraceHandler;  
  16. import org.eclipse.paho.client.mqttv3.IMqttActionListener;  
  17. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;  
  18. import org.eclipse.paho.client.mqttv3.IMqttToken;  
  19. import org.eclipse.paho.client.mqttv3.MqttCallback;  
  20. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;  
  21. import org.eclipse.paho.client.mqttv3.MqttException;  
  22. import org.eclipse.paho.client.mqttv3.MqttMessage;  
  23. import org.eclipse.paho.client.mqttv3.MqttSecurityException;  
  24. import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;  
  25.   
  26. import java.io.IOException;  
  27. import java.io.InputStream;  
  28.   
  29. import javax.net.ssl.SSLSocketFactory;  
  30.   
  31. public class MainActivity extends AppCompatActivity  implements View.OnClickListener,ServiceConnection, MqttCallback, IMqttActionListener,MqttTraceHandler {  
  32.     private static final String TAG = "MainActivity";  
  33.     private Button mButton;  
  34.     private MqttAndroidClient mMqttAndroidClient;  
  35.     private String host = "tcp://192.168.43.224:61613";  
  36.     private String userName = "admin";  
  37.     private String passWord = "password";  
  38.     private IMqttToken mConnectToken;  
  39.     private PowerManager.WakeLock mWakeLock;  
  40.     @Override  
  41.     protected void onCreate(Bundle savedInstanceState) {  
  42.         super.onCreate(savedInstanceState);  
  43.         setContentView(R.layout.activity_main);  
  44.         mButton = (Button) findViewById(R.id.button);  
  45.         mButton.setOnClickListener(this);  
  46.         mMqttAndroidClient = new MqttAndroidClient(this,host,"123456789",new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath()));  
  47.         mMqttAndroidClient.setCallback(this);  
  48.         mMqttAndroidClient.setTraceEnabled(true);  
  49.         mMqttAndroidClient.setTraceCallback( this);  
  50.   
  51.         Log.d(TAG,"onCreate");  
  52.         MqttConnectOptions options = new MqttConnectOptions();  
  53.         options.setCleanSession(true);  
  54.         options.setUserName(userName);  
  55.         options.setPassword(passWord.toCharArray());  
  56.         options.setConnectionTimeout(60);  
  57.         options.setKeepAliveInterval(2*60);  
  58.         options.setMqttVersion(3);  
  59.         try {  
  60.             InputStream caInput = getResources().getAssets().open("keystore.bks");  
  61.             if(caInput!=null){  
  62.                 Log.d(TAG,"do setSocketFactory");  
  63.                 SSLSocketFactory sslSocketFactory = mMqttAndroidClient.getSSLSocketFactory(caInput,"password");  
  64.                 options.setSocketFactory(sslSocketFactory);  
  65.             }  
  66.         } catch (IOException e) {  
  67.             e.printStackTrace();  
  68.             Log.e(TAG,"do connect IOException:"+e);  
  69.         }catch (MqttSecurityException e) {  
  70.             e.printStackTrace();  
  71.             Log.e(TAG,"do connect MqttSecurityException:"+e);  
  72.         }  
  73.         try {  
  74.             Log.d(TAG,"do connect");  
  75.             mConnectToken = mMqttAndroidClient.connect(options,this);  
  76.         } catch (MqttException e) {  
  77.             Log.e(TAG,"connect MqttException:"+e);  
  78.             e.printStackTrace();  
  79.         }  
  80.         PowerManager powerManager = (PowerManager)getSystemService(Context.POWER_SERVICE);  
  81.         mWakeLock = powerManager.newWakeLock(PowerManager.FULL_WAKE_LOCK, TAG);  
  82.         mWakeLock.setReferenceCounted(false);  
  83.         mWakeLock.acquire();  
  84.   
  85.     }  
  86.     @Override  
  87.     protected void onDestroy(){  
  88.         super.onDestroy();  
  89.         if(mMqttAndroidClient != null){  
  90.             try {  
  91.                 mMqttAndroidClient.disconnect();  
  92.             } catch (MqttException e) {  
  93.                 e.printStackTrace();  
  94.             }  
  95.             mMqttAndroidClient.close();  
  96.         }  
  97.         mWakeLock.release();  
  98.     }  
  99.   
  100.     @Override  
  101.     public void onClick(View v) {  
  102.         if(v.getId()==R.id.button){  
  103.   
  104.         }  
  105.     }  
  106.   
  107.     @Override  
  108.     public void onServiceConnected(ComponentName name, IBinder service) {  
  109.         Log.d(TAG,"onServiceConnected");  
  110.     }  
  111.   
  112.     @Override  
  113.     public void onServiceDisconnected(ComponentName name) {  
  114.         Log.d(TAG,"onServiceDisconnected");  
  115.     }  
  116.   
  117.     @Override  
  118.     public void connectionLost(Throwable throwable) {  
  119.         Log.d(TAG,"connectionLost");  
  120.     }  
  121.   
  122.     @Override  
  123.     public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {  
  124.         Log.d(TAG,"messageArrived:"+mqttMessage.toString());  
  125.     }  
  126.   
  127.     @Override  
  128.     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {  
  129.         Log.d(TAG,"deliveryComplete");  
  130.     }  
  131.   
  132.     @Override  
  133.     public void onSuccess(IMqttToken iMqttToken) {  
  134.         if(mConnectToken.equals(iMqttToken)){  
  135.             try {  
  136.                 Log.d(TAG,"connect success, do subscribe");  
  137.                 mMqttAndroidClient.subscribe("test",1);  
  138.             } catch (MqttException e) {  
  139.                 e.printStackTrace();  
  140.             }  
  141.         }  
  142.     }  
  143.   
  144.     @Override  
  145.     public void onFailure(IMqttToken iMqttToken, Throwable throwable) {  
  146.         if(mConnectToken.equals(iMqttToken)){  
  147.            Log.d(TAG,"onFailure success:"+throwable.toString());  
  148.   
  149.         }  
  150.     }  
  151.   
  152.     @Override  
  153.     public void traceDebug(String s, String s1) {  
  154.         Log.d(s,s1);  
  155.     }  
  156.   
  157.     @Override  
  158.     public void traceError(String s, String s1) {  
  159.         Log.e(s,s1);  
  160.     }  
  161.   
  162.     @Override  
  163.     public void traceException(String s, String s1, Exception e) {  
  164.         Log.e(s,s1,e);  
  165.     }  
  166. }  


    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多