ActiveMQ讯息策略
ActiveMQ消息策略
ActiveMQ中提供了众多的“策略”(policy),它们可以在broker端为每个通道“定制”消息的管理方式。本文将简单描述主要的几种Policy。
一. DispatchPolcicy: 转发策略(Topic) 此策略表明broker端消息转发给多个Consumer时,消息被发送的顺序性,这个顺序通常指Consumer的顺序,只对Topic有效,它有3种常用的类型: 1) RoundRobinDispatchPolicy: “轮询”,消息将依次发送给每个“订阅者”。“订阅者”列表默认按照订阅的先后顺序排列,在转发消息时,对于匹配消息的第一个订阅者,将会被移动到“订阅者”列表的尾部,这也意味着“下一条”消息,将会较晚的转发给它。 2) StrictOrderDispatchPolicy: 严格有序,消息依次发送给每个订阅者,按照“订阅者”订阅的时间先后。它和RoundRobin最大的区别是,没有移动“订阅者”顺序的操作。 3) PriorityDispatchPolicy: 基于“property”权重对“订阅者”排序。它要求开发者首先需要对每个订阅者指定priority,默认每个consumer的权重都一样。 4) SimpleDispatchPolicy: 默认值,按照当前“订阅者”列表的顺序。其中PriorityDispatchPolicy是其子类。
"轮询"是比较常用的策略。
<policyEntry topic=">"> <dispatchPolicy> <roundRobinDispatchPolicy/> </dispatchPolicy> </policyEntry>
二.SubscriptionRecoveryPolicy: 恢复策略(Topic) 在非耐久订阅者“失效”期间或一个新的Topic,broker可以保留的可追溯的消息量。前提是Topic必须是“retroactive”,我们可以在distination地址中指定此属性,例如:"order.topic?consumer.retroactive=true"。默认情况下,订阅者只能获取“订阅”开始之后的消息,如果Retroactive=true,那么订阅者就可以获取其创建之前的消息列表。此Policy就是用来控制“retroactive”的消息量。
1) FixedSizedSubscriptionRecoveryPolicy: 保存一定size的消息,broker将为此Topic开辟定额的RAM用来保存最新的消息。 <!-- 1K --> <fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/>
2) FixedCountSubscriptionRecoveryPolicy: 保存一定条数的消息。 <!-- 100条 --> <fixedCountSubscriptionRecoveryPolicy maximumSize="100"/>
3) LastImageSubscriptionRecoveryPolicy: 只保留最新的一条数据 4) QueryBasedSubscriptionRecoveryPolicy: 符合置顶selector的消息都将被保存,具体能够“恢复”多少消息,由底层存储机制决定;比如对于非持久化消息,只要内存中还存在,则都可以恢复。 5) TimedSubscriptionRecoveryPolicy: 保留最近一段时间的消息。 <!-- 可追溯最近1分钟的消息--> <timedSubscriptionRecoveryPolicy recoverDuration="60000" />
6) NoSubscriptionRecoveryPolicy: 关闭“恢复机制”。默认值。 <policyEntry topic=">"> <subscriptionRecoveryPolicy> <fixedCountSubscriptionRecoveryPolicy maximumSize="100"/> </subscriptionRecoveryPolicy> </policyEntry
三. DeadLetterStrategy: “死信”策略 Broker将如何管理“死信”。当消息过期后,或者“重发”几次之后仍然不能被正常消息,那么此消息将会被移除到DeadLetter队列中。此后,我们可以通过侦听死信队列,来获取相关通知或者对消息做额外的操作。 1) IndividualDeadLetterStrategy: 把DeadLetter放入各自的死信通道中,对于Queue而言,死信通道的前缀默认为“ActiveMQ.DLQ.Queue.”,Topic为“ActiveMQ.DLQ.Topic.”;比如队列Order,那么它对应的死信通道为“ActiveMQ.DLQ.Queue.Order”。我们使用“queuePrefix”“topicPrefix”来指定上述前缀。 默认情况下,无论是Topic还是Queue,broker将使用Queue来保存DeadLeader,即死信通道通常为Queue;不过开发者也可以指定为Topic。
<policyEntry queue="order"> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="false" /> </deadLetterStrategy> </policyEntry> 上述将队列Order中出现的DeadLetter保存在DLQ.Order中,不过此时DLQ.Order为Topic。individualDeadLetterStrategy还有一个属性为“useQueueForTopicMessages”,此值表示是否将Topic的DeadLetter保存在Queue中,默认为true。
2) SharedDeadLetterStrategy: 将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略。共享队列默认为“ActiveMQ.DLQ”,可以通过“deadLetterQueue”属性来设定。 <deadLetterStrategy> <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/> </deadLetterStrategy> 3) DiscardingDeadLetterStrategy: broker将直接抛弃DeadLeatter。如果开发者不需要关心DeadLetter,可以使用此策略。AcitveMQ提供了一个便捷的插件:DiscardingDLQBrokerPlugin,来抛弃DeadLetter。 <broker> <plugins> <discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" /> </plugins> </broker>
对于上述三种策略,还有2个很重要的可选参数,“processExpired”表示是否将过期消息放入死信队列,默认为true;“processNonPersistent”表示是否将“非持久化”消息放入死信队列,默认为false。
四. PendingMessageLimitStrategy: 消息限制策略(面向Slow Consumer) 此策略只对Topic有效,当通道中有大量的消息积压时,broker可以保留的消息量。 1) ConstantPendingMessageLimitStrategy: 保留固定条数的消息,如果消息量超过limit,将使用“MessageEvictionStrategy”移除消息。 <policyEntry topic="PRICES.>"> <!-- lets force old messages to be discarded for slow consumers --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="50"/> </pendingMessageLimitStrategy> </policyEntry>
2) PrefetchRatePendingMessageLimitStrategy: 保留prefetchSize倍数条消息。 <!-- 如果prefetchSize为100,则保留2.5 * 100条消息 --> <prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
五. MessageEvictionStrategy: 消息剔除策略(面向Slow Consumer) 配合PendingMessageLimitStrategy,只对Topic有效。当PendingMessage的数量超过限制时,broker该如何剔除多余的消息。 1) OldestMessageEvictionStrategy: 移除旧消息,默认策略。 2) OldestMessageWithLowestPriorityEvictionStrategy: 旧数据中权重较低的消息,将会被移除。(message.getPriority()) 3) UniquePropertyMessageEvictionStrategy: 移除具有指定property的旧消息。开发者可以指定property的名称。 <policyEntry topic="SKU.PRICE.>"> <!-- lets force old messages to be discarded for slow consumers --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="10000"/> </pendingMessageLimitStrategy> <messageEvictionStrategy> <uniquePropertyMessageEvictionStrategy propertyName="PRICE" /> </messageEvictionStrategy> </policyEntry> 上述配置,针对SKU.PRICE通道中,只保留10000个最新的消息,如果有新消息继续加入,将会移除具有PRICE属性的旧消息。在每条消息中,封装多个商品的价格列表,我们可以实现“最新商品价格”的功能。
六. SlowConsumerStrategy: 慢速消费者策略 Broker将如何处理慢消费者。Broker将会启动一个后台线程用来检测所有的慢速消费者,并定期关闭关闭它们。 1) AbortSlowConsumerStrategy: 中断慢速消费者,慢速消费将会被关闭。 <slowConsumerStrategy> <abortSlowConsumerStrategy abortConnection="false"/><!-- 不关闭底层链接 --> </slowConsumerStrategy> 2) AbortSlowConsumerStrategy: 如果慢速消费者最后一个ACK距离现在的时间间隔超过阀值,则中断慢速消费者。
<slowConsumerStrategy> <abortSlowConsumerStrategy maxTimeSinceLastAck="30000"/><!-- 30秒滞后 --> </slowConsumerStrategy>
七. PendingQueueMessageStoragePolicy: 待消息消息转存策略 当Queue中有大量的“NON_PERSISTENT”消息时,将会消耗JVM内存,直到OOM或者达到Broker端设定的systemUsage。为了避免这些问题,我们需要将这些消息“转存”到特定的存储器中。
1) vmQueueCursor: 将消息转存到基于内存(JVM linkeList)的存储结构中。默认设置。有OOM风险。 2) storeCursor: 将消息转存到storeEngine中,如果broker使用kahadb/LevelDB,那么消息将会被转存到相应的存储引擎中。这是强烈推荐的策略,也是效率最好的策略。 3) fileQueueCursor: 将消息转存到临时文件中。 <!-- 内存限制为512M,如果超过阀值,则转存 --> <policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb"> <pendingQueuePolicy> <storeCursor /> </pendingQueuePolicy> </policyEntry>
八. PendingSubscriberMessageStoragePolicy:(Topic) 针对“非耐久”订阅者。概念和(七)一样,支持三种策略:storeCursor, vmCursor和fileCursor。
九. PendingDurableSubscriberMessageStoragePolicy: (Topic) 针对“耐久”订阅者,支持三种策略:storeDurableSubscriberCursor, vmDurableCursor和 fileDurableSubscriberCursor。 <policyEntry topic=">" producerFlowControl="false" memoryLimit="10mb"> <pendingSubscriberPolicy> <!-- 对于非耐久的订阅者,非持久化消息: vmCursor,fileCursor --> <fileCursor/> </pendingSubscriberPolicy> <pendingDurableSubscriberPolicy> <!-- 对于耐久的订阅者,非持久化消息 --> <!-- storeDurableSubscriberCursor --> <!-- vmDurableCursor --> <!-- fileDurableSubscriberCursor --> <storeDurableSubscriberCursor/> </pendingDurableSubscriberPolicy> </policyEntry> |
|