分享

Apache ActiveMQ 高级应用

 WindySky 2018-03-21

在某些业务场景下,可能会用到按需分发消息。

对于AMQ他内置了很多的分发策略可供我们选择(DispatchPolicy的实现类),如:PriorityDispatchPolicy, PriorityNetworkDispatchPolicy, RoundRobinDispatchPolicy, SimpleDispatchPolicy, StrictOrderDispatchPolicy。


那我们也可以自己去实现DispatchPolicy接口,做一个适合特定业务场景的分发策略。

首先我们要去http://svn./repos/asf/activemq这里下载对应版本的源码;

本例中为方便起见,我们拷贝了一段SimpleDispatchPolicy类的代码(路径:org.apache.activemq.broker.region.policy),当做我们的自定义类的内容,如:

  1. public class TestDispatchPolicy implements DispatchPolicy {  
  2.   
  3.     public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {  
  4.   
  5.         ActiveMQDestination _destination = node.getMessage().getDestination();  
  6.           
  7.         // 取得Topic name 和前缀,如:topic://Topic.foo  
  8.         System.out.println("-------->_destination.getQualifiedName:"+ _destination.getQualifiedName());  
  9.         // 取得Topic name,如:Topic.foo  
  10.         System.out.println("-------->_destination.getPhysicalName:"+ _destination.getPhysicalName());  
  11.         synchronized (consumers) {  
  12.             int count = 0;  
  13.             for (Iterator iter = consumers.iterator(); iter.hasNext();) {  
  14.                 Subscription sub = (Subscription)iter.next();  
  15.                 // 取得消费者的clientId,如:connection.setClientID("YourClientID");  
  16.                 System.out.println("-------->sub.getContext().getClientId:"+ sub.getContext().getClientId());  
  17.                 // Only dispatch to interested subscriptions  
  18.                 if (!sub.matches(node, msgContext)) {  
  19.                     sub.unmatched(node);  
  20.                     continue;  
  21.                 }  
  22.   
  23.                 sub.add(node);  
  24.                 count++;  
  25.             }  
  26.             return count > 0;  
  27.         }  
  28.     }  
  29.   
  30. }  


ok,接着我们做如下几个步骤:

1、将这个类放到activemq project/activemq-broker的相应路径下,在activemq-project/目录下执行mvn package;

2、待maven执行完成后,将activemq-broker-version.jar和activemq-spring-version.jar放入到apache-activemq-version-bin/lib/目录下,替换对应的文件;

3、修改activemq.xml,加入元素dispatchPolicy,完成。

Apache ActiveMQ单点基本配置 activemq.xml为基础,修改的内容如下:

  1. <destinationPolicy>  
  2.            <policyMap>  
  3.              <policyEntries>  
  4.                <policyEntry topic=">" producerFlowControl="false" memoryLimit="10mb">  
  5.   
  6.         <dispatchPolicy>  
  7.                         <! -- 新增的分发策略 -->  
  8.                         <testDispatchPolicy/>  
  9.         </dispatchPolicy>  
  10.                    <pendingMessageLimitStrategy>  
  11.                         <constantPendingMessageLimitStrategy limit="1000"/>  
  12.                    </pendingMessageLimitStrategy>  
  13.                </policyEntry>  
  14.                  
  15.              </policyEntries>  
  16.            </policyMap>  
  17.        </destinationPolicy>  

至此我们就实现了Activemq的自定义分发策略功能,启动activemq,查看一下控制台。


参考资源:

http://activemq./dispatch-policies.html

http://activemq./maven/5.9.0/apidocs/index.html?deprecated-list.html






    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多