分享

使用RocketMQTemplate发送各种消息

 观审美2 2023-12-08 发布于北京

引言

RocketMQTemplate是spring boot为RocketMQ提供的模板类,发送各种消息更方便,提供了许多重载的方法发送各种消息,本文只演示部分方法的使用,做抛砖引玉。

项目依赖

pom.xml

复制代码
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven./POM/4.0.0" xmlns:xsi="http://www./2001/XMLSchema-instance" xsi:schemaLocation="http://maven./POM/4.0.0 https://maven./xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.neo</groupId> <artifactId>user-center</artifactId> <version>0.0.1-SNAPSHOT</version> <name>user-center</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>tk.mybatis</groupId> <artifactId>mapper-spring-boot-starter</artifactId> <version>2.1.5</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <!--整合 spring cloud--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.SR3</version> <type>pom</type> <scope>import</scope> </dependency> <!--整合 spring cloud alibaba--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.2.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <plugin> <groupId>org.mybatis.generator</groupId> <artifactId>mybatis-generator-maven-plugin</artifactId> <version>1.3.6</version> <configuration> <configurationFile> ${basedir}/src/main/resources/generator/generatorConfig.xml </configurationFile> <overwrite>true</overwrite> <verbose>true</verbose> </configuration> <dependencies> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.20</version> </dependency> <dependency> <groupId>tk.mybatis</groupId> <artifactId>mapper</artifactId> <version>4.1.5</version> </dependency> </dependencies> </plugin> </plugins> </build> </project>

消息生产者

复制代码
package com.carnation.rocketmq; import com.alibaba.fastjson.JSON; import com.carnation.domain.dto.SysUserDto; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import java.util.List; import java.util.UUID; /** * @author zy * @date 2020/6/10 * @since 1.0.0 */ @Slf4j @Service public class RocketMQTemplateProducer { @Autowired private RocketMQTemplate template; /** * 发送普通消息 * * @param topic topic * @param message 消息体 */ public void sendMessage(String topic, Object message) { this.template.convertAndSend(topic, message); log.info("普通消息发送完成:message = {}", message); } /** * 发送同步消息 * * @param topic topic * @param message 消息体 */ public void syncSendMessage(String topic, Object message) { SendResult sendResult = this.template.syncSend(topic, message); log.info("同步发送消息完成:message = {}, sendResult = {}", message, sendResult); } /** * 发送异步消息 * * @param topic topic * @param message 消息体 */ public void asyncSendMessage(String topic, Object message) { this.template.asyncSend(topic, message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("异步消息发送成功,message = {}, SendStatus = {}", message, sendResult.getSendStatus()); } @Override public void onException(Throwable e) { log.info("异步消息发送异常,exception = {}", e.getMessage()); } }); } /** * 发送单向消息 * * @param topic topic * @param message 消息体 */ public void sendOneWayMessage(String topic, Object message) { this.template.sendOneWay(topic, message); log.info("单向发送消息完成:message = {}", message); } /** * 同步发送批量消息 * * @param topic topic * @param messageList 消息集合 * @param timeout 超时时间(毫秒) */ public void syncSendMessages(String topic, List<Message<?>> messageList, long timeout) { this.template.syncSend(topic, messageList, timeout); log.info("同步发送批量消息完成:message = {}", JSON.toJSONString(messageList)); } /** * 发送事务消息 * * @param topic topic * @param message 消息对象 */ public void sendMessageInTransaction(String topic, SysUserDto message) { String transactionId = UUID.randomUUID().toString(); TransactionSendResult result = this.template.sendMessageInTransaction(topic, MessageBuilder.withPayload(message) .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) .build(), message); log.info("发送事务消息(半消息)完成:result = {}", result); } /** * 发送携带 tag 的消息(过滤消息) * * @param topic topic,RocketMQTemplate将 topic 和 tag 合二为一了,底层会进行 * 拆分再组装。只要在指定 topic 时跟上 {:tags} 就可以指定tag * 例如 test-topic:tagA * @param message 消息体 */ public void syncSendMessageWithTag(String topic, Object message) { this.template.syncSend(topic, message); log.info("发送带 tag 的消息完成:message = {}", message); } /** * 同步发送延时消息 * * @param topic topic * @param message 消息体 * @param timeout 超时 * @param delayLevel 延时等级:现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级, * 从1s到2h分别对应着等级 1 到 18,消息消费失败会进入延时消息队列 * "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; */ public void syncSendDelay(String topic, Object message, long timeout, int delayLevel) { this.template.syncSend(topic, MessageBuilder.withPayload(message).build(), timeout, delayLevel); log.info("已同步发送延时消息 message = {}", message); } /** * 异步发送延时消息 * * @param topic topic * @param message 消息对象 * @param timeout 超时时间 * @param delayLevel 延时等级 */ public void asyncSendDelay(String topic, Object message, long timeout, int delayLevel) { this.template.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("异步发送延时消息成功,message = {}", message); } @Override public void onException(Throwable throwable) { log.error("异步发送延时消息发生异常,exception = {}", throwable.getMessage()); } }, timeout, delayLevel); log.info("已异步发送延时消息 message = {}", message); } /** * 发送单向顺序消息 * * @param topic topic */ public void sendOneWayOrderly(String topic) { for (int i = 0; i < 30; i++) { this.template.sendOneWayOrderly(topic, MessageBuilder.withPayload("message - " + i).build(), "topic"); log.info("单向顺序发送消息完成:message = {}", "message - " + i); } } /** * 同步发送顺序消息 * * @param topic topic */ public void syncSendOrderly(String topic) { for (int i = 0; i < 30; i++) { SendResult sendResult = this.template.syncSendOrderly(topic, MessageBuilder.withPayload("message - " + i).build(), "syncOrderlyKey"); log.info("同步顺序发送消息完成:message = {}, sendResult = {}", "message - " + i, sendResult); } } }
  • 发送批量消息,在早期版本有个BUG, 即使构造了批量消息体,也调用不到发送批量消息的方法,依赖rocketmq-spring-boot-starter升级到2.0.4以上即可正常调用了。
  • 发送事务消息的方法在rocketmq-spring-boot-starter2.0.4开始有了变化,参数少了,据说是为了解决一个多线程场景下的安全问题,新版本不管工程里发送多少事务消息,都只能有一个本地事务监听器类,监听本地事务执行情况,然后再决定消息是否发送并可被消费,老版本每个事务消息对应一个本地事务监听器类

服务事务消息的本地事务监听器

复制代码
package com.carnation.rocketmq; import com.carnation.dao.rocketmq.RocketmqTransactionLogMapper; import com.carnation.domain.dto.SysUserDto; import com.carnation.domain.entity.RocketmqTransactionLog; import com.carnation.domain.entity.SysUser; import com.carnation.service.SysUserService; import com.carnation.util.BeanUtils; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; /** * @author zy * @version 1.0.0 * @date 2020/6/17 */ @Slf4j @RocketMQTransactionListener public class TestMessageTransactionListener implements RocketMQLocalTransactionListener { private final SysUserService sysUserService; private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper; public TestMessageTransactionListener(SysUserService sysUserService, RocketmqTransactionLogMapper rocketmqTransactionLogMapper) { this.sysUserService = sysUserService; this.rocketmqTransactionLogMapper = rocketmqTransactionLogMapper; } @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { MessageHeaders headers = message.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); SysUserDto dto = (SysUserDto) o; SysUser user = new SysUser(); BeanUtils.copyProperties(dto, user); try { //执行本地事务 this.sysUserService.insertWithRocketLog(user, transactionId); //返回本地事务执行状态为提交,发送事务消息 log.info("本地事务正常,消息可以被发送了.."); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { e.printStackTrace(); return RocketMQLocalTransactionState.ROLLBACK; } } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne( RocketmqTransactionLog.builder() .transactionId(transactionId) .build()); if (transactionLog != null) { return RocketMQLocalTransactionState.COMMIT; } return RocketMQLocalTransactionState.ROLLBACK; } }

测试端点

复制代码
package com.carnation.controller.rocketmq; import com.alibaba.fastjson.JSON; import com.carnation.domain.dto.SysUserDto; import com.carnation.rocketmq.RocketMQTemplateProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.ArrayList; import java.util.List; /** * 使用 RocketMQTemplate 发送各种消息 * * @author zy * @date 2020/6/9 */ @Slf4j @RestController @RequestMapping("/producer/template") public class RocketTemplateController { @Autowired private RocketMQTemplateProducer producer; /** * 发送普通消息 */ @GetMapping("/ordinary") public String sendMessage() { SysUserDto sysUserDto = SysUserDto.builder() .accountName("chen") .userName("hua") .password("123456") .build(); this.producer.sendMessage("ordinary-message", sysUserDto); return "success:消息已发送:message = " + JSON.toJSONString(sysUserDto); } /** * 同步发送消息 * * @return 反馈信息 */ @GetMapping("/sync") public String syncSendMessage() { SysUserDto sysUserDto = SysUserDto.builder() .accountName("sync") .userName("message") .password("12ds456") .build(); this.producer.syncSendMessage("sync-message", sysUserDto); return "success:消息已同步发送:message = " + JSON.toJSONString(sysUserDto); } /** * 异步发送消息 * * @return 反馈信息 */ @GetMapping("/async") public String asyncSendMessage() { SysUserDto sysUserDto = SysUserDto.builder() .accountName("async") .userName("message") .password("8752ert") .build(); this.producer.asyncSendMessage("async-message", sysUserDto); return "success:消息已异步发送:message = " + JSON.toJSONString(sysUserDto); } /** * 发送批量消息 * * @return 反馈信息 */ @GetMapping("/syncMessages") public String asyncSendMessages() { List<Message<?>> messageList = new ArrayList<>(); for (int i = 0; i < 10; i++) { messageList.add(MessageBuilder.withPayload( SysUserDto.builder() .id(String.valueOf(i)) .accountName("accountName-" + i) .userName("userName-" + i) .password("password-" + i) .email("email-" + i) .build() ).build()); } this.producer.syncSendMessages("sync-messages", messageList, 5000); return "success:已异步批量发送消息"; } /** * 发送事务消息 * * @return 反馈信息 */ @GetMapping("/transactionMessage") public String sendMessageInTransactionMessage() { SysUserDto sysUserDto = SysUserDto.builder() .id(String.valueOf(Math.random())) .accountName("transaction") .userName("message") .password("transaction") .email("transaction@gmail.com") .build(); this.producer.sendMessageInTransaction("transaction-message", sysUserDto); return "success:已发送事务消息:message = " + JSON.toJSONString(sysUserDto); } /** * 单向发送消息 * * @return 反馈信息 */ @GetMapping("/oneWay") public String oneWaySendMessage() { SysUserDto sysUserDto = SysUserDto.builder() .accountName("oneWay") .userName("message") .password("asdew123") .build(); this.producer.sendOneWayMessage("oneWay-message", sysUserDto); return "success:消息已单向发送:message = " + JSON.toJSONString(sysUserDto); } /** * 单向发送顺序消息 * * @return 反馈信息 */ @GetMapping("/oneWayOrderly") public String sendOneWayOrderlyMessage() { this.producer.sendOneWayOrderly("oneWay-order-message"); return "success:已单向发送有序消息.. "; } /** * 同步发送顺序消息 * * @return 反馈信息 */ @GetMapping("/syncOrderly") public String syncSendOrderlyMessage() { this.producer.syncSendOrderly("sync-order-message"); return "success:已同步发送有序消息.. "; } /** * 同步发送延时消息 * * @return 反馈信息 */ @GetMapping("/syncDelay") public String syncSendDelayMessage() { SysUserDto userDto = SysUserDto.builder() .userName("sync") .accountName("delay") .email("delay.5788@gmail.com") .mobile("17898097654") .build(); this.producer.syncSendDelay("sync-delay-message", userDto, 10000, 4); return "success:已同步发送延时消息.. "; } /** * 发送携带tag的消息,以便进行消息过滤 * * @return 反馈信息 */ @GetMapping("/withTag") public String syncSendWithTagMessage() { SysUserDto userDto = SysUserDto.builder() .userName("tag") .accountName("message") .email("tag.5788@gmail.com") .mobile("17898097654") .build(); this.producer.syncSendMessageWithTag("tag-message:testTag", userDto); return "success:已同步发送带 tag 的消息消息.. "; } }

消息消费者

  • 消费同步发送的消息
复制代码
/** * 监听消费同步发送的消息 * * @author zy * @date 2020/6/12 * @since 1.0.0 */ @Slf4j @Component @RocketMQMessageListener(consumerGroup = "sync-consumer", topic = "sync-message") public class SyncMessageListener implements RocketMQListener<SysUserDto> { @Override public void onMessage(SysUserDto sysUserDto) { log.info("消费到了同步消息: message = {}", JSON.toJSONString(sysUserDto)); } }
  • 消费同步批量发送的消息
复制代码
/** * 监听消费同步批量发送的消息。 * * TIPS: 指定 @RocketMQMessageListener 的属性 consumeMode = ConsumeMode.ORDERLY * 可以在消费消息时保证消费顺序,与生产消息集合时添加到集合中的顺序一致 * 代价是只会创建一个消费者线程消费消息,效率偏低 * * @author zy * @date 2020/6/16 * @since 1.0.0 */ @Slf4j @Component @RocketMQMessageListener(consumerGroup = "syncs-consumer", topic = "sync-messages") public class SyncMessagesListener implements RocketMQListener<SysUserDto> { @Override public void onMessage(SysUserDto sysUserDto) { log.info("消费到了同步批量消息之: message = {}", JSON.toJSONString(sysUserDto)); } }
  • 消费异步发送的消息
复制代码
/** * 监听消费异步发送的消息 * * @author zy * @date 2020/6/10 * @since 1.0.0 */ @Slf4j @Service @RocketMQMessageListener(consumerGroup = "async-consumer", topic = "async-message") public class AsyncMessageListener implements RocketMQListener<SysUserDto> { @Override public void onMessage(SysUserDto sysUserDto) { log.info("消费到了异步消息: message = {}", JSON.toJSONString(sysUserDto)); } }
  • 消费单向发送的消息
复制代码
/** * 监听消费单向发送的消息 * * @author zy * @date 2020/6/10 * @since 1.0.0 */ @Slf4j @Component @RocketMQMessageListener(consumerGroup = "oneWay-consumer", topic = "oneWay-message") public class OneWayMessageListener implements RocketMQListener<SysUserDto> { @Override public void onMessage(SysUserDto sysUserDto) { log.info("消费到了单向消息: message = {}", JSON.toJSONString(sysUserDto)); } }
  • 消费单向有序消息
复制代码
/** * 消费单向有序消息 * TIPS: @RocketMQMessageListener 必须指定属性 * consumeMode = ConsumeMode.ORDERLY 才能顺序消费消息 * * @author zy * @date 2020/6/10 * @since 1.0.0 */ @Service @Slf4j @RocketMQMessageListener(consumerGroup = "oneWay-order-consumer", topic = "oneWay-order-message", consumeMode = ConsumeMode.ORDERLY ) public class OneWayOrderlyMessageListener implements RocketMQListener<String> { @Override public void onMessage(String s) { log.info("消费到了单向有序消息: message = {}", s); } }
  • 消费同步发送的延时消息
复制代码
/** * 监听消费同步发送的延时消息 * * @author zy * @date 2020/6/12 * @since 1.0.0 */ @Slf4j @Component @RocketMQMessageListener(consumerGroup = "sync-delay-consumer", topic = "sync-delay-message") public class SyncDelayMessageListener implements RocketMQListener<SysUserDto> { @Override public void onMessage(SysUserDto sysUserDto) { log.info("消费到了同步延时消息: message = {}", JSON.toJSONString(sysUserDto)); } }
  • 消费同步有序消息
复制代码
/** * 监听消费同步有序消息 * TIPS: @RocketMQMessageListener必须配置属性 * consumeMode = ConsumeMode.ORDERLY 才能顺序消费消息 * * @author zy * @date 2020/6/12 * @since 1.0.0 */ @Service @Slf4j @RocketMQMessageListener(consumerGroup = "sync-orderly-consumer", topic = "sync-order-message", consumeMode = ConsumeMode.ORDERLY ) public class SyncOrderlyMessageListener implements RocketMQListener<String> { @Override public void onMessage(String s) { log.info("消费到了同步有序消息: message = {}", s); } }
  • 消费某个 topic 下指定 tag 的消息
复制代码
/** * 监听消费某个 topic 下指定 tag 的消息 * tips: @RocketMQMessageListener 默认 selectorExpression = "*",表示会 * 消费指定 topic 下的所有消息,修改属性 selectorExpression = "tagName" * 可以让该消息监听器消费某个 topic 下指定 tag 的消息,达到消息过滤的效果。 * * @author zy * @date 2020/6/15 * @since 1.0.0 */ @Slf4j @Component @RocketMQMessageListener(consumerGroup = "tag-consumer", topic = "tag-message", selectorExpression = "testTag", selectorType = SelectorType.TAG ) public class TagMessageListener implements RocketMQListener<SysUserDto> { @Override public void onMessage(SysUserDto sysUserDto) { log.info("消费到了带 tag 的消息: tag = {}, message = {}", "testTag", JSON.toJSONString(sysUserDto)); } }
  • 消费事务消息
复制代码
/** * 监听消费事务消息 * * @author zhaoWenCai * @date 2020/6/17 * @since 1.0.0 */ @Slf4j @Component @RocketMQMessageListener(consumerGroup = "transaction-consumer", topic = "transaction-message") public class TransactionMessageListener implements RocketMQListener<SysUserDto> { @Override public void onMessage(SysUserDto sysUserDto) { log.info("消费到了事务消息: message = {}", JSON.toJSONString(sysUserDto)); } }

RocketMQ更多信息参考 github.com/apache/rock…

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多