在某些业务场景下,可能会用到按需分发消息。
对于AMQ他内置了很多的分发策略可供我们选择(DispatchPolicy的实现类),如:PriorityDispatchPolicy, PriorityNetworkDispatchPolicy, RoundRobinDispatchPolicy, SimpleDispatchPolicy, StrictOrderDispatchPolicy。
那我们也可以自己去实现DispatchPolicy接口,做一个适合特定业务场景的分发策略。
首先我们要去http://svn./repos/asf/activemq这里下载对应版本的源码;
本例中为方便起见,我们拷贝了一段SimpleDispatchPolicy类的代码(路径:org.apache.activemq.broker.region.policy),当做我们的自定义类的内容,如:
- public class TestDispatchPolicy implements DispatchPolicy {
-
- public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {
-
- ActiveMQDestination _destination = node.getMessage().getDestination();
-
- // 取得Topic name 和前缀,如:topic://Topic.foo
- System.out.println("-------->_destination.getQualifiedName:"+ _destination.getQualifiedName());
- // 取得Topic name,如:Topic.foo
- System.out.println("-------->_destination.getPhysicalName:"+ _destination.getPhysicalName());
- synchronized (consumers) {
- int count = 0;
- for (Iterator iter = consumers.iterator(); iter.hasNext();) {
- Subscription sub = (Subscription)iter.next();
- // 取得消费者的clientId,如:connection.setClientID("YourClientID");
- System.out.println("-------->sub.getContext().getClientId:"+ sub.getContext().getClientId());
- // Only dispatch to interested subscriptions
- if (!sub.matches(node, msgContext)) {
- sub.unmatched(node);
- continue;
- }
-
- sub.add(node);
- count++;
- }
- return count > 0;
- }
- }
-
- }
ok,接着我们做如下几个步骤:
1、将这个类放到activemq project/activemq-broker的相应路径下,在activemq-project/目录下执行mvn package;
2、待maven执行完成后,将activemq-broker-version.jar和activemq-spring-version.jar放入到apache-activemq-version-bin/lib/目录下,替换对应的文件;
3、修改activemq.xml,加入元素dispatchPolicy,完成。
以Apache ActiveMQ单点基本配置 activemq.xml为基础,修改的内容如下:
- <destinationPolicy>
- <policyMap>
- <policyEntries>
- <policyEntry topic=">" producerFlowControl="false" memoryLimit="10mb">
-
- <dispatchPolicy>
- <! -- 新增的分发策略 -->
- <testDispatchPolicy/>
- </dispatchPolicy>
- <pendingMessageLimitStrategy>
- <constantPendingMessageLimitStrategy limit="1000"/>
- </pendingMessageLimitStrategy>
- </policyEntry>
-
- </policyEntries>
- </policyMap>
- </destinationPolicy>
至此我们就实现了Activemq的自定义分发策略功能,启动activemq,查看一下控制台。
参考资源:
http://activemq./dispatch-policies.html
http://activemq./maven/5.9.0/apidocs/index.html?deprecated-list.html
|