分享

IBM MQTT消息推送

 WindySky 2018-03-06

需求
前端(C# .net)订阅主题,后端(JAVA)服务推送。

相关环境

  1. java JDK1.7
  2. maven
  3. ibm mq 1.0
  4. spring
  5. … …

MQ MAVEN相关:

<properties>
<jdk.version>1.7</jdk.version>
<spring.version>4.2.5.RELEASE</spring.version>
<ibm.mq.version>1.0</ibm.mq.version>
... ...
</properties>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
<!-- IBM MQ START -->
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>jms</artifactId>
            <version>${ibm.mq.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mq.allclient</artifactId>
            <version>${ibm.mq.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mq.axis2</artifactId>
            <version>${ibm.mq.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mq.commonservices</artifactId>
            <version>${ibm.mq.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mq.defaultconfig</artifactId>
            <version>${ibm.mq.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mq.headers</artifactId>
            <version>${ibm.mq.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mq</artifactId>
            <version>${ibm.mq.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mqjms</artifactId>
            <version>${ibm.mq.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>fscontext</artifactId>
            <version>${ibm.mq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <!-- IBM MQ END -->
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

后端MQ推送代码

package com.***.mq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.util.ObjectUtils;

import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ibm.mq.jms.MQConnectionFactory;
import com.***.common.exception.RmsException;
import com.***.common.json.JSONUtils;
import com.***.common.util.ApplicationUtils;
import com.***.pojo.PushPojo;
import com.***.util.ConfigUtil;
import com.***.util.enums.EnumTopic;

public class JMSSender {

    private static Logger logger = LogManager.getLogger(JMSSender.class);

    /**
     * 发送主题消息
     * 
     * @date 2016-7-22
     * @param topic
     *            主题信息 EnumTopic中选择
     * @param message
     *            消息内容
     * @param username
     *            用户名
     * @param password
     *            密码
     * @throws JMSException
     */
    public static void jmsSender(String topic, String message, String username,
            String password) {
        if (logger.isDebugEnabled()) {
            logger.debug("mq push start");
            logger.debug("username:" + username);
            logger.debug("password:" + password);
        }
        MQConnectionFactory mqcf = ApplicationUtils.getBean(
                "jmsConnectionFactory", MQConnectionFactory.class);
        try {
            try (
                    Connection conn = mqcf.createConnection(username, password);
                    Session sion = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
                ) {
                logger.debug("topic:" + topic);
                logger.info("push message:" + message);
                Destination dti = sion.createTopic(topic);
                MessageProducer pdc = sion.createProducer(dti);
                TextMessage msg = sion.createTextMessage(message);
                conn.start();
                pdc.send(msg);
                logger.debug("push successfully");
            }
        } catch (JMSException e) {
            if (logger.isErrorEnabled()) {
                logger.error(e);
            }
            RmsException.throwException("mq.rms.jms");
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("mq push end");
            }
        }
    }

    /**
     * 对推送的JSON消息进行二次处理。
     */
    private static void jmsSender(String topic, PushPojo o) {
        String message = JSONUtils.toJSONString(o,SerializerFeature.WriteMapNullValue,SerializerFeature.WriteNullStringAsEmpty,
                 SerializerFeature.WriteNullNumberAsZero,SerializerFeature.WriteDateUseDateFormat);
        jmsSender(topic, message, ConfigUtil.get("ibm.mq.username"),ConfigUtil.get("ibm.mq.password"));
    }

    /**
     * 入口。
     * 
     * @param topic
     *            主题
     * @param o
     *            推送的消息
     */
    public static void jmsSender(EnumTopic topic , PushPojo o) {
        int addCount = !ObjectUtils.isEmpty(o.getAddData()) ? o.getAddData().length : 0;
        int editCount = !ObjectUtils.isEmpty(o.getEditData()) ? o.getEditData().length : 0;
        int delCount = StringUtils.isNotBlank(o.getDelData()) ? o.getDelData().split(",").length : 0;
        StringBuffer s = new StringBuffer("");
        s.append("push addData[" + addCount + "]");
        s.append(",editData[" + editCount + "]");
        s.append(",delData[" + delCount + "]");
        logger.info(s.toString());

        jmsSender(topic.getCode(), o);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108

PushPojo实体代码

package com.***.pojo;

public class PushPojo {
    private String type;
    /** 值机/行李/值机/转盘 */
    private String module;
    private Object[] addData;   // 需要增加的数据
    private Object[] editData;  // 需要修改的数据
    private String delData;     // 需要删除的数据ID

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getModule() {
        return module;
    }

    public void setModule(String module) {
        this.module = module;
    }

    public String getDelData() {
        return delData;
    }

    public void setDelData(String delData) {
        this.delData = delData;
    }

    public Object[] getAddData() {
        return addData;
    }

    public void setAddData(Object[] addData) {
        this.addData = addData;
    }

    public Object[] getEditData() {
        return editData;
    }

    public void setEditData(Object[] editData) {
        this.editData = editData;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

主题枚举

package com.***.util.enums;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public enum EnumTopic {
    Flight("Flight", "航班主题", "topic://flight"), 
    Stand("Stand", "机位模块主题","topic://stand"),
    Gate("Gate", "登机口模块主题","topic://gate"),
    Carouse("Carouse", "行李转盘模块主题","topic://carouse"),
    Counter("Counter", "值机柜台模块主题","topic://counter");

    private String en;
    private String zhCN;
    private String code;

    EnumTopic(String en, String zhCN, String code) {
        this.en = en;
        this.zhCN = zhCN;
        this.code = code;
    }

    public String getEn() {
        return en;
    }

    public String getCode() {
        return code;
    }

    public String getZhCN() {
        return zhCN;
    }

    private static List<EnumTopic> list;
    static {
        list = new ArrayList<EnumTopic>(Arrays.asList(values()));
    }

    public static List<EnumTopic> getList() {
        return list;
    }

    public static boolean isExist(String code){
        return null != get(code) ? true : false;
    }

    public static EnumTopic get(String code){
        EnumTopic r = null;
        List<EnumTopic> list = getList();
        for(EnumTopic l : list){
            if(l.getCode().equals(code)){
                r = l;
                break;
            }
        }
        return r;
    }

    public static String toJson() {
        List<EnumTopic> list = getList();
        StringBuffer s = new StringBuffer("[");
        int i = 0;
        for (EnumTopic l : list) {
            s.append("{");
            s.append("\"en\"").append(":").append("\"" + l.getEn() + "\"");
            s.append(",");
            s.append("\"zhCN\"").append(":").append("\"" + l.getZhCN() + "\"");
            s.append(",");
            s.append("\"code\"").append(":").append("\"" + l.getCode() + "\"");
            s.append("}");
            if (i++ != list.size() - 1) {
                s.append(",");
            }
        }
        return s.append("]").toString();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80

MQ用户名通道等其它配置不用过多解释吧。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约