分享

Spring集成RabbitMQ并实现延迟队列

 WindySky 2018-02-26

一、说明

在实际业务场景中可能会用到延时消息发送,例如异步回调失败时的重发机制。 RabbitMQ本身不具有延时消息队列的功能,但是可以通过rabbitmq-delayed-message-exchange来实现(也可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现,我们主要讲解通过延迟插件来实现的方法)。利用RabbitMQ的这种特性,应该可以实现很多现实中的业务,我们可以发挥想象。 

二、安装插件

RabbitMQ的安装请参考我的文章“RabbitMQ安装与使用”,这里我们重点讲插件的安装。

首先到http://www./community-plugins.html网页下载适合的“rabbitmq_delayed_message_exchange插件”。下载完成后将它放到RabbitMQ插件安装目录({rabbitmq-server}/plugins/),然后执行命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange启用插件,执行命令rabbitmq-plugins disable rabbitmq_delayed_message_exchange也可以关闭插件。具体过程可以查看参考文档2。

三、Spring集成RabbitMQ

1、maven配置

  1. <dependency>  
  2.     <groupId>org.springframework.amqp</groupId>  
  3.     <artifactId>spring-amqp</artifactId>  
  4.     <version>1.6.6.RELEASE</version>  
  5.     <exclusions>  
  6.         <exclusion>  
  7.             <groupId>org.springframework</groupId>  
  8.             <artifactId>spring-core</artifactId>  
  9.             <version>4.1.6.RELEASE</version>  
  10.         </exclusion>  
  11.     </exclusions>  
  12. </dependency>  
  13. <dependency>  
  14.     <groupId>org.springframework.amqp</groupId>  
  15.     <artifactId>spring-rabbit</artifactId>  
  16.     <version>1.6.6.RELEASE</version>  
  17.     <exclusions>  
  18.         <exclusion>  
  19.             <groupId>org.springframework</groupId>  
  20.             <artifactId>spring-core</artifactId>  
  21.             <version>4.1.6.RELEASE</version>  
  22.         </exclusion>  
  23.         <exclusion>  
  24.             <groupId>org.springframework</groupId>  
  25.             <artifactId>spring-messaging</artifactId>  
  26.             <version>4.1.6.RELEASE</version>  
  27.         </exclusion>  
  28.         <exclusion>  
  29.             <groupId>org.springframework</groupId>  
  30.             <artifactId>spring-tx</artifactId>  
  31.             <version>4.1.6.RELEASE</version>  
  32.         </exclusion>  
  33.         <exclusion>  
  34.             <groupId>org.springframework</groupId>  
  35.             <artifactId>spring-context</artifactId>  
  36.             <version>4.1.6.RELEASE</version>  
  37.         </exclusion>  
  38.     </exclusions>  
  39. </dependency>  
说明:实现延迟队列需要Spring在4.1以上,spring-amqp在1.6以上。

2、xml配置

  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www./schema/beans"  
  3.     xmlns:xsi="http://www./2001/XMLSchema-instance" xmlns:tx="http://www./schema/tx"  
  4.     xmlns:util="http://www./schema/util" xmlns:context="http://www./schema/context"  
  5.     xmlns:rabbit="http://www./schema/rabbit"  
  6.     xsi:schemaLocation="http://www./schema/beans http://www./schema/beans/spring-beans-3.1.xsd  
  7.                             http://www./schema/context   
  8.                             http://www./schema/context/spring-context-3.1.xsd  
  9.                             http://www./schema/tx  
  10.                             http://www./schema/tx/spring-tx.xsd   
  11.                             http://www./schema/aop  
  12.                             http://www./schema/aop/spring-aop.xsd  
  13.                             http://www./schema/util http://www./schema/util/spring-util-3.1.xsd   
  14.                             http://www./schema/rabbit http://www./schema/rabbit/spring-rabbit-1.6.xsd">  
  15.     <context:property-placeholder location="classpath:rmq-config.properties" ignore-unresolvable="true"/>  
  16.   
  17.     <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">  
  18.         <property name="host" value="${rabbitmq.host}" />  
  19.         <property name="port" value="${rabbitmq.port}" />  
  20.         <property name="username" value="${rabbitmq.username}" />  
  21.         <property name="password" value="${rabbitmq.password}" />  
  22.         <property name="channelCacheSize" value="${rabbitmq.channel.cacheSize}" />  
  23.     </bean>  
  24.   
  25.     <bean id="orderConsumer" class="com.xxx.rmq.OrderConsumer"></bean>  
  26.     <bean id="messageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter" />  
  27.     <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />  
  28.   
  29.     <rabbit:admin connection-factory="connectionFactory" />  
  30.       
  31.     <!-- 延迟消息start -->  
  32.     <rabbit:topic-exchange name="delay_exchange" delayed="true">  
  33.         <rabbit:bindings>  
  34.             <rabbit:binding queue="delay_queue" pattern="order.delay.notify" />  
  35.         </rabbit:bindings>  
  36.     </rabbit:topic-exchange>  
  37.       
  38.     <rabbit:queue name="delay_queue" durable="true" auto-declare="true" auto-delete="false" />  
  39.       
  40.     <rabbit:template id="delayMsgTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" exchange="delay_exchange" />  
  41.       
  42.     <rabbit:listener-container connection-factory="connectionFactory" channel-transacted="false" acknowledge="auto" message-converter="jsonMessageConverter">  
  43.         <rabbit:listener queues="delay_queue" ref="orderConsumer" method="delayMsg" />  
  44.     </rabbit:listener-container>  
  45.     <!-- 延迟消息end -->  
  46.   
  47. </beans>  
说明:spring-rabbit-1.6.xsd必须是1.6及以上版本,否则会报“元素 'rabbit:topic-exchange' 中不允许出现属性 'delayed'”错误。具体请查看参考文档3。

四、延迟队列的使用

1、发送消息Producer

  1. import net.sf.json.JSONObject;  
  2.   
  3. import org.apache.commons.lang.StringUtils;  
  4. import org.springframework.amqp.AmqpException;  
  5. import org.springframework.amqp.core.AmqpTemplate;  
  6. import org.springframework.amqp.core.Message;  
  7. import org.springframework.amqp.core.MessagePostProcessor;  
  8. import org.springframework.beans.factory.annotation.Autowired;  
  9. import org.springframework.stereotype.Service;  
  10. /**  
  11.  *  
  12.  * @author Horace  
  13.  * @version 创建时间:2016年10月26日 下午6:34:31  
  14.  */  
  15. @Service  
  16. public class MessageProducerServiceImpl implements MessageProducerService{  
  17.     @Autowired  
  18.     private AmqpTemplate delayMsgTemplate;  
  19.     @Override  
  20.     public void delayMsg(JSONObject msg,int delay) {  
  21.         // TODO Auto-generated method stub   
  22.         final int xdelay= delay*1000;   
  23.         delayMsgTemplate.convertAndSend("order.delay.notify", (Object) msg,  
  24.                 new MessagePostProcessor() {  
  25.   
  26.                     @Override  
  27.                     public Message postProcessMessage(Message message)  
  28.                             throws AmqpException {  
  29.                         // TODO Auto-generated method stub  
  30.                         message.getMessageProperties().setDelay(xdelay);  
  31.                         return message;  
  32.                     }  
  33.                 });  
  34.     }  
  35. }  

2、异步接收消息Consumer

  1. import net.sf.json.JSONObject;  
  2. import org.apache.commons.lang.StringUtils;  
  3. import org.slf4j.Logger;  
  4. import org.slf4j.LoggerFactory;  
  5. import org.springframework.beans.factory.annotation.Autowired;  
  6.   
  7. /**  
  8.  * 
  9.  * @author Horace  
  10.  * @version 创建时间:2016年10月26日 下午2:48:14  
  11.  */  
  12. public class OrderConsumer {  
  13.       
  14.     private static Logger logger = LoggerFactory.getLogger(OrderConsumer.class);  
  15.       
  16.     @Autowired  
  17.     private MessageProducerService messageProducerService;  
  18.       
  19.       
  20.     public void delayMsg(Object obj) {  
  21.         logger.info("[延时消息]" + obj);  
  22.         if (obj != null) {  
  23.             JSONObject notifyJson = JSONObject.fromObject(obj);  
  24.             String notifyUrl = notifyJson.getString("notifyUrl");  
  25.             String notifyContent = notifyJson.getString("notifyContent");  
  26.             String result = HttpUtil.postMessage(notifyUrl, notifyContent);  
  27.             if (StringUtils.isBlank(result)) { // 通知失败 进入重发机制  
  28.                 int newNotifyCount = notifyJson.getInt("notifyCount") + 1; //已经通知的次数  
  29.                 if (newNotifyCount < 5) {  
  30.                     notifyJson.put("notifyCount", newNotifyCount);  
  31.                     int spacingInterval = getSpacingInterval(newNotifyCount);  
  32.                     messageProducerService  
  33.                             .delayMsg(notifyJson, spacingInterval);  
  34.                 } else {  
  35.                     logger.info("通知5次都失败,等待后台手工处理!");  
  36.                 }  
  37.             }  
  38.         }  
  39.     }  
  40.       
  41.     /** 
  42.      * 重复通知间隔时间(单位为秒) 
  43.      * @param notifyCount 已经通知的次数 
  44.      * @return 
  45.      */  
  46.     private int getSpacingInterval(int notifyCount) {  
  47.         // TODO Auto-generated method stub  
  48.         int spacingInterval = 0;  
  49.         switch (notifyCount) {  
  50.         case 1:  
  51.             spacingInterval = 10;  
  52.             break;  
  53.         case 2:  
  54.             spacingInterval = 20;  
  55.             break;  
  56.         case 3:  
  57.             spacingInterval = 30;  
  58.             break;  
  59.         case 4:  
  60.             spacingInterval = 60;  
  61.             break;  
  62.         case 5:  
  63.             spacingInterval = 90;  
  64.             break;  
  65.         default:  
  66.             break;  
  67.         }  
  68.         return spacingInterval;  
  69.     }  
  70.       
  71. }  


参考文档:

1、http://blog.csdn.net/tongdao/article/details/51638066 RabbitMQ安装与使用

2、http://blog.csdn.net/u014308482/article/details/53036770  rabbitmq 实现延迟队列的两种方式

3、http://docs./spring-amqp/docs/1.6.0.RELEASE/reference/html/_reference.html#delayed-message-exchange

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约