分享

springboot如何集成mqtt消息推送

 贾朋亮博客 2019-07-19
1.需求分析
    近期笔者项目需要用到mqtt实现消息推送,笔者选择emq作为mqtt服务器载体,上篇笔者讲解了如何在linux中安装mqtt服务,安装链接:https://blog.csdn.net/zhangxing52077/article/details/80567270,接下来笔者将讲解如何在springboot中集成mqtt

2.实现方案
①pom依赖

<!--mqtt-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-mqtt</artifactId>
</dependency>
②yml中配置mqtt(自定义配置)

#mq配置
com:
  mqtt:
    host: tcp://ip:1883
    clientid: mqttjs_e8022a4d0b
    topic: good,test,yes
    username: zhangxing
    password: zxp52077
    timeout: 10
    keepalive: 20
③创建mqtt消息属性配置类

@Component
@ConfigurationProperties(prefix = "com.mqtt")
@Setter
@Getter
public class MqttConfiguration {

    private String host;

    private String clientid;

    private String topic;

    private String username;

    private String password;

    private int timeout;

    private int keepalive;

}
④创建mqtt消息推送实体

@Slf4j
@Setter
@Getter
public class PushPayload {
    //推送类型
    private String type;
    //推送对象
    private String mobile;
    //标题
    private String title;
    //内容
    private String content;
    //数量
    private Integer badge = 1;
    //铃声
    private String sound = "default";


    public PushPayload(String type, String mobile, String title, String content, Integer badge , String sound){
        this.type = type;
        this.mobile = mobile;
        this.title = title;
        this.content = content;
        this.badge = badge;
        this.sound = sound;
    }

    public static class Builder{
        //推送类型
        private String type;
        //推送对象
        private String mobile;
        //标题
        private String title;
        //内容
        private String content;
        //数量
        private Integer badge = 1;
        //铃声
        private String sound = "default";

        public Builder setType(String type) {
            this.type = type;
            return this;
        }

        public Builder setMobile(String mobile) {
            this.mobile = mobile;
            return this;
        }

        public Builder setTitle(String title) {
            this.title = title;
            return this;
        }

        public Builder setContent(String content) {
            this.content = content;
            return this;
        }

        public Builder setBadge(Integer badge) {
            this.badge = badge;
            return this;
        }

        public Builder setSound(String sound) {
            this.sound = sound;
            return this;
        }

        public PushPayload bulid(){
           return new PushPayload(type,mobile,title,content,badge,sound);
        }
    }


    public static Builder getPushPayloadBuider(){
        return new Builder();
    }


    @Override
    public String toString() {
        return JSON.toJSONString(this, SerializerFeature.DisableCircularReferenceDetect);
    }

  
}
⑤创建mqtt消息推送或订阅客户端

@Slf4j
public class MqttPushClient {

    private MqttClient client;

    private static volatile MqttPushClient mqttPushClient = null;

    public static MqttPushClient getInstance(){

        if(null == mqttPushClient){
            synchronized (MqttPushClient.class){
                if(null == mqttPushClient){
                    mqttPushClient = new MqttPushClient();
                }
            }

        }
        return mqttPushClient;

    }

    private MqttPushClient() {
        connect();
    }

    private void connect(){
        try {
            client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENTID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            options.setUserName(PropertiesUtil.MQTT_USER_NAME);
            options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());
            options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
            options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
            try {
                client.setCallback(new PushCallback());
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 发布,默认qos为0,非持久化
     * @param topic
     * @param pushMessage
     */
    public void publish(String topic,PushPayload pushMessage){
        publish(0, false, topic, pushMessage);
    }

    /**
     * 发布
     * @param qos
     * @param retained
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos,boolean retained,String topic,PushPayload pushMessage){
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.toString().getBytes());
        MqttTopic mTopic = client.getTopic(topic);
        if(null == mTopic){
            log.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 订阅某个主题,qos默认为0
     * @param topic
     */
    public void subscribe(String topic){
        subscribe(topic,0);
    }

    /**
     * 订阅某个主题
     * @param topic
     * @param qos
     */
    public void subscribe(String topic,int qos){
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws Exception {
        String kdTopic = "good";
        PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("15345715326")
                 .setContent("designModel")
                .bulid();
        MqttPushClient.getInstance().publish(0, false, kdTopic, pushMessage);
    }
}
⑥配置获取类的编写

public class PropertiesUtil {

   public static String MQTT_HOST;
   public static String MQTT_CLIENTID;
   public static String MQTT_USER_NAME;
   public static String MQTT_PASSWORD;
   public static int MQTT_TIMEOUT;
   public static int MQTT_KEEP_ALIVE;


   public static final String ELASTIC_SEARCH_HOST;

   public static final int ELASTIC_SEARCH_PORT;

   public static final String ELASTIC_SEARCH_CLUSTER_NAME;

   static {
      MQTT_HOST = loadMqttProperties().getProperty("MQTT_HOST");
      MQTT_CLIENTID = loadMqttProperties().getProperty("MQTT_CLIENTID");
      MQTT_USER_NAME = loadMqttProperties().getProperty("MQTT_USER_NAME");
      MQTT_PASSWORD = loadMqttProperties().getProperty("MQTT_PASSWORD");
      MQTT_TIMEOUT = Integer.valueOf(loadMqttProperties().getProperty("MQTT_TIMEOUT"));
      MQTT_KEEP_ALIVE = Integer.valueOf(loadMqttProperties().getProperty("MQTT_KEEP_ALIVE"));

   }

   static {
      ELASTIC_SEARCH_HOST = loadEsProperties().getProperty("ES_HOST");
      ELASTIC_SEARCH_PORT = Integer.valueOf(loadEsProperties().getProperty("ES_PORT"));
      ELASTIC_SEARCH_CLUSTER_NAME = loadEsProperties().getProperty("ES_CLUSTER_NAME");
   }

   private static Properties loadMqttProperties() {
      InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/mqtt.yml");
      Properties properties = new Properties();
      try {
         properties.load(inputstream);
         return properties;
      } catch (IOException e) {
         throw new RuntimeException(e);
      } finally {
         try {
            if (inputstream != null) {
               inputstream.close();
            }
         } catch (IOException e) {
            throw new RuntimeException(e);
         }
      }
   }

   private static Properties loadEsProperties() {
      InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/elasticsearch.properties");
      Properties properties = new Properties();
      try {
         properties.load(inputstream);
         return properties;
      } catch (IOException e) {
         throw new RuntimeException(e);
      } finally {
         try {
            if (inputstream != null) {
               inputstream.close();
            }
         } catch (IOException e) {
            throw new RuntimeException(e);
         }
      }
   }


}
⑦mqtt推送回调类

/**
 * @auther zx
 * @date 2018/5/28 9:20
 */
public class PushCallback implements MqttCallback {

    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息内容 : " + new String(message.getPayload()));
    }
}
3.效果测试
@Test
public void test() {

   PushPayload pushPayload = PushPayload.getPushPayloadBuider().setContent("test")
         .setMobile("119")
         .setType("2018")
         .bulid();
   mqttClientComponent.push("yes",pushPayload);

}
mqtt客户端效果显示

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多