分享

RocketMQ源码解析:手把手教老婆看懂consumer接收到的MessageExt

 小虚竹 2021-11-30
技术活,该赏
关注+一键三连(点赞,评论,收藏)再看,养成好习惯

RocketMQ使用教程相关系列 目录



源代码说明

MessageExt位于此包下:

package org.apache.rocketmq.common.message;

MessageExt的源码解析

参数说明在代码里

public class MessageExt extends Message {
    private static final long serialVersionUID = 5720810158625748049L;

    //记录MessageQueue编号,消息在Topic下对应的MessageQueue中被拉取
    private int queueId;

    //记录消息在Broker存盘大小
    private int storeSize;

    //记录在ConsumeQueue中的偏移
    private long queueOffset;

    //记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
    private int sysFlag;

    //消息创建时间,在Producer发送消息时设置
    private long bornTimestamp;

    //记录发送该消息的producer地址
    private SocketAddress bornHost;

    //消息存储时间
    private long storeTimestamp;

    //记录存储该消息的Broker地址
    private SocketAddress storeHost;

    //消息Id
    private String msgId;

    //记录消息在Broker中存储偏移
    private long commitLogOffset;

    //消息内容CRC校验值
    private int bodyCRC;

    //消息重试消费次数
    private int reconsumeTimes;

    //这个参数没看懂,知道的大佬分享下
    private long preparedTransactionOffset;

//省略get set

   
    @Override
    public String toString() {
        return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset
            + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost
            + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId
            + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="
            + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset
            + ", toString()=" + super.toString() + "]";
    }
}

继承于Message,Message源码解析

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主题
    private String topic;

    //网络通信层标记
    private int flag;

    /**
     *MIN_OFFSET:最小偏移
     * MAX_OFFSET:最大偏移
     * CONSUME_START_TIME:消费拉取时间
     *UNIQ_KEY:
     * CLUSTER:集群
     * WAIT:
     * TAGS:消息标签
     *DELAY:延时级别
     **/
    private Map<String, String> properties;

    //Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
    private byte[] body;

    //事务消息相关的事务编号
    private String transactionId;

   //得到延时级别
    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }

        return 0;
    }

//设置延时级别
    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

//省略 get set

    
    @Override
    public String toString() {
        return "Message{" +
            "topic='" + topic + '\'' +
            ", flag=" + flag +
            ", properties=" + properties +
            ", body=" + Arrays.toString(body) +
            ", transactionId='" + transactionId + '\'' +
            '}';
    }
}

喝点毒鸡汤

每天读点源码,进步一小步,成长一大步。

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多