配色: 字号:
activemq_使用教程
2017-03-31 | 阅:  转:  |  分享 
  
activemq目前最新版本是5.2.0,很多开源项目都使用activemq作为message的收发.?下载网址是:http://activemq.apache.org/download.html?一下是网上收集的资料1JMS???在介绍ActiveMQ之前,首先简要介绍一下JMS规范。1.1JMS的基本构件1.1.1连接工厂???连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。1.1.2连接???JMSConnection封装了客户与JMS提供者之间的一个虚拟的连接。1.1.3会话???JMSSession是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。1.1.4目的地???目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种消息传递域:点对点(PTP)消息传递域和发布/订阅消息传递域。点对点消息传递域的特点如下:?每个消息只能有一个消费者。?消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。??发布/订阅消息传递域的特点如下:?每个消息可以有多个消费者。?生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。?在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。1.1.5消息生产者???消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。1.1.6消息消费者???消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种方法之一:?同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。?异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。1.1.7消息???JMS消息由以下三部分组成:?消息头。每个消息头字段都有相应的getter和setter方法。?消息属性。如果需要除消息头字段以外的值,那么可以使用消息属性。?消息体。JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。1.2JMS的可靠性机制1.2.1确认???JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。???在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgementmode)。该参数有以下三个可选值:?Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。?Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。?Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝第确认消息的提交。如果JMSprovider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMSprovider必须把消息头的JMSRedelivered字段设置为true。1.2.2持久性???JMS支持以下两种消息提交模式:?PERSISTENT。指示JMSprovider持久保存消息,以保证消息不会因为JMSprovider的失败而丢失。?NON_PERSISTENT。不要求JMSprovider持久保存消息。1.2.3优先级???可以使用消息优先级来指示JMSprovider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要注意的是,JMSprovider并不一定保证按照优先级的顺序提交消息。1.2.4消息过期???可以设置消息在一定时间后过期,默认是永不过期。1.2.5临时目的地???可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。1.2.6持久订阅???首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的createDurableSubscriber方法来创建一个持久订阅,该方法的第一个参数必须是一个topic。第二个参数是订阅的名称。???JMSprovider会存储发布到持久订阅对应的topic上的消息。如果最初创建持久订阅的客户或者任何其它客户使用相同的连接工厂和连接的客户ID、相同的主题和相同的订阅名再次调用会话上的createDurableSubscriber方法,那么该持久订阅就会被激活。JMSprovider会象客户发送客户处于非激活状态时所发布的消息。???持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一直保留,直到应用程序调用会话上的unsubscribe方法。1.2.7本地事务???在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMSSession接口提供了commit和rollback方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。???事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发送操作才会真正执行。???需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。1.3JMS规范的变迁???JMS的最新版本的是1.1。它和同1.0.2版本之间最大的差别是,JMS1.1通过统一的消息传递域简化了消息传递。这不仅简化了JMSAPI,也有利于开发人员灵活选择消息传递域,同时也有助于程序的重用和维护。以下是不同消息传递域的相应接口:JMS公共点对点域发布/订阅域ConnectionFactoryQueueConnectionFactoryTopicConnectionFactoryConnectionQueueConnectionTopicConnectionDestinationQueueTopicSessionQueueSessionTopicSessionMessageProducerQueueSenderTopicPublisherMessageConsumerQueueReceiverTopicSubscriber2ActiveMQ2.1Broker2.1.1RunningBroker???ActiveMQ5.0的二进制发布包中bin目录中包含一个名为activemq的脚本,直接运行这个脚本就可以启动一个broker。???此外也可以通过BrokerConfigurationURI或BrokerXBeanURI对broker进行配置,以下是一些命令行参数的例子:ExampleDescriptionactivemqRunsabrokerusingthedefault''xbean:activemq.xml''asthebrokerconfigurationfile.activemqxbean:myconfig.xmlRunsabrokerusingthefilemyconfig.xmlasthebrokerconfigurationfilethatislocatedintheclasspath.activemqxbean:file:./conf/broker1.xmlRunsabrokerusingthefilebroker1.xmlasthebrokerconfigurationfilethatislocatedintherelativefilepath./conf/broker1.xmlactivemqxbean:file:C:/ActiveMQ/conf/broker2.xmlRunsabrokerusingthefilebroker2.xmlasthebrokerconfigurationfilethatislocatedintheabsolutefilepathC:/ActiveMQ/conf/broker2.xmlactivemqbroker:(tcp://localhost:61616,tcp://localhost:5000)?useJmx=trueRunsabrokerwithtwotransportconnectorsandJMXenabled.activemqbroker:(tcp://localhost:61616,network:tcp://localhost:5000)?persistent=falseRunsabrokerwith1transportconnectorand1networkconnectorwithpersistencedisabled.2.1.2EmbeddedBroker???可以通过在应用程序中以编码的方式启动broker,例如:Java代码?1.BrokerServicebroker=newBrokerService();?2.broker.addConnector("tcp://localhost:61616");?3.broker.start();????如果需要启动多个broker,那么需要为broker设置一个名字。例如:Java代码?1.BrokerServicebroker=newBrokerService();?2.broker.setName("fred");?3.broker.addConnector("tcp://localhost:61616");?4.broker.start();????如果希望在同一个JVM内访问这个broker,那么可以使用VMTransport,URI是:vm://brokerName。关于更多的broker属性,可以参考Apache的官方文档。???此外,也可以通过BrokerFactory来创建broker,例如:Java代码?1.BrokerServicebroker=BrokerFactory.createBroker(newURI(someURI));????someURI的可选值如下:URIschemeExampleDescriptionxbean:xbean:activemq.xmlSearchestheclasspathforanXMLdocumentwiththegivenURI(activemq.xmlinthiscase)whichwillthenbeusedastheXmlConfigurationfile:file:foo/bar/activemq.xmlLoadsthegivenfile(inthisexamplefoo/bar/activemq.xml)astheXmlConfigurationbroker:broker:tcp://localhost:61616UsestheBrokerConfigurationURItoconfigurethebroker??当使用XBean的配置方式的时候,需要指定一个xml配置文件,例如:Java代码?1.BrokerServicebroker=BrokerFactory.createBroker(newURI("xbean:com/test/activemq.xml"));????使用Spring的配置方式如下:Xml代码?1.?2.??3.??4.?2.1.3MonitoringBroker2.1.3.1JMX???在使用JMX监控broker之前,首先要启用broker的JMX监控功能,例如在配置文件中设置useJmx="true",如下:Xml代码?1.?2.??3.?????4.??5.?...?6.????接下来运行JDK自带的jconsole。在运行了jconsole后,它会弹出对话框来选择需要连接到的agent。如果是在启动broker的主机上运行jconsole,那么ActiveMQbroker会出现在jconsole的Local标签中。如果要连接到远程的broker,那么可以在Advanced标签中指定JMXURL,以下是一个连接到本机的JMXURL:???service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi??在jconsole的MBeans标签中,可以查看详细信息,也可以执行相应的operation。需要注意的是,在jconsole连接到broker的时候,并不需要输入用户名和密码,如果这存在潜在的安全问题,那么就需要为JMXConnector配置密码保护(需要使用1.5以上版本的JDK)。????首先要禁止ActiveMQ创建自己的connector,例如:Xml代码?1.?2.??3.?????4.??5.????然后在ActiveMQ的conf目录下创建一个访问控制文件和密码文件,如下:conf/jmx.access:#The"monitorRole"rolehasreadonlyaccess.#The"controlRole"rolehasreadwriteaccess.monitorRolereadonlycontrolRolereadwriteconf/jmx.password:#The"monitorRole"rolehaspassword"abc123".#The"controlRole"rolehaspassword"abcd1234".monitorRoleabc123controlRoleabcd1234??然后修改ActiveMQ的bin目录下activemq的启动脚本,查找包含"SUNJMX="的一行如下:REMsetSUNJMX=-Dcom.sun.management.jmxremote.port=1616-Dcom.sun.management.jmxremote.authenticate=false-Dcom.sun.management.jmxremote.ssl=false???把它替换成setSUNJMX=-Dcom.sun.management.jmxremote.port=1616-Dcom.sun.management.jmxremote.authenticate=true-Dcom.sun.management.jmxremote.ssl=false-Dcom.sun.management.jmxremote.password.file=%ACTIVEMQ_BASE%/conf/jmx.password-Dcom.sun.management.jmxremote.access.file=%ACTIVEMQ_BASE%/conf/jmx.access???最后重启ActiveMQ和jconsole,这时候需要强制login。如果在启动activemq的过程中出现以下错误,那么需要为这个文件增加访问控制。Windows平台上的具体解决方法请参考如下网址:http://java.sun.com/j2se/1.5.0/docs/guide/management/security-windows.htmlError:Passwordfilereadaccessmustberestricted:D:\apache-activemq-5.0.0\bin\../conf/jmx.password2.1.3.2WebConsole???WebConsole被集成到了ActiveMQ的二进制发布包中,因此缺省访问http://localhost:8161/admin即可访问WebConsole。???在配置文件中,可以通过修改nioConnector的port属性来修改Webconsole的缺省端口:Xml代码?1.?2.??3.????4.??5.?...?6.????出于安全性或者可靠性的考虑,WebConsole可以被部署到不同于ActiveMQ的进程中。例如把activemq-web-console.war部署到一个单独的web容器中(Tomcat,Jetty等)。在ActiveMQ5.0的二进制发布包中不包含activemq-web-console.war,因此需要下载ActiveMQ的源码,然后进入到${activemq.base}/src/activemq-web-console目录中执行mvninstanll。如果一切正常,那么缺省会在${activemq.base}/src/activemq-web-console/target目录中生成activemq-web-console-5.0.0.war。然后将activemq-web-console-5.0.0.war拷贝到Tomcat的webapps目录中,并重命名成activemq-web-console.war。??需要注意的是,要将activemq-all-5.0.0.jar拷贝到WEB-INF\lib目录中(可能还需要拷贝jms.jar)。还要为Tomcat设置以下五个系统属性(修改catalina.bat文件):setJAVA_OPTS=%JAVA_OPTS%-Dwebconsole.type="properties"setJAVA_OPTS=%JAVA_OPTS%-Dwebconsole.jms.url="tcp://localhost:61616"setJAVA_OPTS=%JAVA_OPTS%-Dwebconsole.jmx.url="service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"setJAVA_OPTS=%JAVA_OPTS%-Dwebconsole.jmx.role=""setJAVA_OPTS=%JAVA_OPTS%-Dwebconsole.jmx.password=""??如果JMX没有配置密码保护,那么webconsole.jmx.role和webconsole.jmx.password设置成""即可。如果broker被配置成了Master/Slave模式,那么可以配置成使用failovertransport,例如:-Dwebconsole.jms.url=failover:(tcp://serverA:61616,tcp://serverB:61616)??顺便说一下,由于webconsole.type属性是properties,因此实际上起作用的WebConsole的配置文件是WEB-INF/webconsole-properties.xml。最后启动被监控的ActiveMQ,访问http://localhost:8080/activemq-web-console/,查看显示是否正常。2.1.3.3AdvisoryMessage???ActiveMQ支持AdvisoryMessages,它允许你通过标准的JMS消息来监控系统。目前的AdvisoryMessages支持:?consumers,producersandconnectionsstartingandstopping?temporarydestinationsbeingcreatedanddestroyed?messagesexpiringontopicsandqueues?brokerssendingmessagestodestinationswithnoconsumers.?connectionsstartingandstopping?AdvisoryMessages可以被想象成某种的管理通道,通过它你可以得到关于JMSprovider、producers、consumers和destinations的信息。Advisorytopics都使用ActiveMQ.Advisory.这个前缀,以下是目前支持的topics:????ClientbasedadvisoriesAdvisoryTopicsDescriptionActiveMQ.Advisory.ConnectionConnectionstart&stopmessagesActiveMQ.Advisory.Producer.QueueProducerstart&stopmessagesonaQueueActiveMQ.Advisory.Producer.TopicProducerstart&stopmessagesonaTopicActiveMQ.Advisory.Consumer.QueueConsumerstart&stopmessagesonaQueueActiveMQ.Advisory.Consumer.TopicConsumerstart&stopmessagesonaTopic????在消费者启动/停止的AdvisoryMessages的消息头中有个consumerCount属性,他用来指明目前desination上活跃的consumer的数量。????DestinationandMessagebasedadvisoriesAdvisoryTopicsDescriptionActiveMQ.Advisory.QueueQueuecreate&destroyActiveMQ.Advisory.TopicTopiccreate&destroyActiveMQ.Advisory.TempQueueTemporaryQueuecreate&destroyActiveMQ.Advisory.TempTopicTemporaryTopiccreate&destroyActiveMQ.Advisory.Expired.QueueExpiredmessagesonaQueueActiveMQ.Advisory.Expired.TopicExpiredmessagesonaTopicActiveMQ.Advisory.NoConsumer.QueueNoconsumerisavailabletoprocessmessagesbeingsentonaQueueActiveMQ.Advisory.NoConsumer.TopicNoconsumerisavailabletoprocessmessagesbeingsentonaTopic??以上的这些destnations都可以用来作为前缀,在其后面追加其它的重要信息,例如topic、queue、clientID、producderID和consumerID等。这令你可以利用Wildcards和Selectors来过滤AdvisoryMessages(关于Wildcard和Selector会在稍后介绍)。??例如,如果你希望订阅FOO.BAR这个queue上Consumer的start/stop的消息,那么可以订阅ActiveMQ.Advisory.Consumer.Queue.FOO.BAR;如果希望订阅所有queue上的start/stop消息,那么可以订阅ActiveMQ.Advisory.Consumer.Queue.>;如果希望订阅所有queue或者topic上的start/stop消息,那么可以订阅ActiveMQ.Advisory.Consumer.>。???org.apache.activemq.advisory.AdvisorySupport类上有如下的helpermethods,用来在程序中得到advisorydestinationobjects。Java代码?1.AdvisorySupport.getConsumerAdvisoryTopic()?2.AdvisorySupport.getProducerAdvisoryTopic()?3.AdvisorySupport.getDestinationAdvisoryTopic()?4.AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()?5.AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()?6.AdvisorySupport.getNoTopicConsumersAdvisoryTopic()?7.AdvisorySupport.getNoQueueConsumersAdvisoryTopic()???以下是段使用AdvisoryMessages的程序代码:Java代码?1.DestinationadvisoryDestination=AdvisorySupport.getProducerAdvisoryTopic(destination)?2.MessageConsumerconsumer=session.createConsumer(advisoryDestination);?3.consumer.setMessageListener(this);?4....?5.publicvoidonMessage(Messagemsg){?6.???if(msginstanceofActiveMQMessage){?7.???????try{?8.????????????ActiveMQMessageaMsg=?(ActiveMQMessage)msg;?9.????????????ProducerInfoprod=(ProducerInfo)aMsg.getDataStructure();?10.???????}catch(JMSExceptione){?11.???????????log.error("Failedtoprocessmessage:"+msg);?12.???????}?13.???}?14.}?2.1.3.4CommandAgent???在介绍CommandAgent前首先简要介绍一下XMPP(Jabber)协议,XMPP是一种基于XML的即时通信协议,它由Jabber软件基金会开发。在配置文件中通过增加transportConnector来支持XMPP协议:Xml代码?1.?2.??3.????...?4.?????5.??6.????ActiveMQ提供了ActiveMQmessages和XMPP之间的双向桥接:?如果客户加入了一个聊天室,那么这个聊天室的名字会被映射到一个JMStopic。?尝试在聊天室内发送消息会导致一个JMS消息被发送到这个topic。?呆在一个聊天室中意味着这将保持一个对相应JMStopic的订阅。因此发送到这个topic的JMS消息也会被发送到聊天室。??推荐XMPP客户端Spark(http://www.igniterealtime.org/)。??从4.2版本起,ActiveMQ支持CommandAgent。在配置文件中,通过设置commandAgent来启用CommandAgent:Xml代码?1.?2.??3.???...?4.??5.??6.????启用了CommandAgent的broker上会有一个来自CommandAgent的连接,它同时订阅topic:ActiveMQ.Agent。在你启动XMPP客户端,加入到ActiveMQ.Agent聊天室后,就可以同broker进行交谈了。通过在XMPP客户端中键入help,可以得到帮助信息。???需要注意的是,ActiveMQ5.0版本有个小bug,如果broker没有采用缺省的用户名和密码,那么CommandAgent便无法正常启动。Apache官方文档说,此bug已经被修正,预定在5.2.0版本上体现。修改方式如下:Xml代码?1.?2.1.3.5Visualizationplugin???ActiveMQ支持以broker插件的形式生成DOT文件(可以用agrviewer来查看),以图表的方式描述connections、sessions、producers、consumers、destinations等信息。配置方式如下:?Xml代码?1.?2.?????...?3.??????4.??????????5.??????????6.??????7.???需要注意的是,笔者认为ActiveMQ5.0版本的VisualizationPlugin尚不稳定,存在诸多问题。例如:如果使用connectionDotFilePlugin,那么brokerName必须是localhost;如果使用destinationDotFilePlugin可能会导致ArrayStoreException。2.2Transport???ActiveMQ目前支持的transport有:VMTransport、TCPTransport、SSLTransport、PeerTransport、UDPTransport、MulticastTransport、HTTPandHTTPSTransport、FailoverTransport、FanoutTransport、DiscoveryTransport、ZeroConfTransport等。以下简单介绍其中的几种,更多请参考Apache官方文档。2.2.1VMTransport???VMtransport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连接不是socket连接,而是直接地方法调用。第一个创建VM连接的客户会启动一个embedVMbroker,接下来所有使用相同的brokername的VM连接都会使用这个broker。当这个broker上所有的连接都关闭的时候,这个broker也会自动关闭。???以下是配置语法:??vm://brokerName?transportOptions??例如:vm://broker1?marshal=false&broker.persistent=false??TransportOptions的可选值如下:OptionNameDefaultValueDescriptionMarshalfalseIftrue,forceseachcommandsentoverthetransporttobemarshlledandunmarshlledusingaWireFormatwireFormatdefaultThenameoftheWireFormattousewireFormat.AllthepropertieswiththisprefixareusedtoconfigurethewireFormatcreatetrueIfthebrokershouldbecreatedondemandifitdoesnotallreadyexist.OnlysupportedinActiveMQ4.1broker.Allthepropertieswiththisprefixareusedtoconfigurethebroker.SeeConfiguringWireFormatsformoreinformation??以下是高级配置语法:??vm:(broker:(tcp://localhost)?brokerOptions)?transportOptions??vm:broker:(tcp://localhost)?brokerOptions???例如:vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false???TransportOptions的可选值如下:OptionNameDefaultValueDescriptionmarshalfalseIftrue,forceseachcommandsentoverthetransporttobemarshlledandunmarshlledusingaWireFormatwireFormatdefaultThenameoftheWireFormattousewireFormat.AllthepropertieswiththisprefixareusedtoconfigurethewireFormat??使用配置文件的配置语法:?????vm://localhost?brokerConfig=xbean:activemq.xml???例如:vm://localhost?brokerConfig=xbean:com/test/activemq.xml??使用Spring的配置:Xml代码?1.?2.??3.??4.?5.?6.?7.??8.???如果persistent是true,那么ActiveMQ会在当前目录下创建一个缺省值是activemq-data的目录用于持久化保存数据。需要注意的是,如果程序中启动了多个不同名字的VMbroker,那么可能会有如下警告:Failedtostartjmxconnector:CannotbindtoURL[rmi://localhost:1099/jmxrmi]:javax.naming.NameAlreadyBoundException…可以通过在transportOptions中追加broker.useJmx=false来禁用JMX来避免这个警告。2.2.2TCPTransport???TCPtransport允许客户端通过TCPsocket连接到远程的broker。以下是配置语法:???tcp://hostname:port?transportOptions???TransportOptions的可选值如下:OptionNameDefaultValueDescriptionminmumWireFormatVersion0TheminimumversionwireformatthatisallowedtracefalseCausesallcommandsthataresentoverthetransporttobeloggeduseLocalHosttrueWhentrue,itcausesthelocalmachinesnametoresolveto"localhost".socketBufferSize641024SetsthesocketbuffersizeinbytessoTimeout0setsthesockettimeoutinmillisecondsconnectionTimeout30000Anon-zerovaluespecifiestheconnectiontimeoutinmilliseconds.Azerovaluemeanswaitforeverfortheconnectiontobeestablished.Negativevaluesareignored.wireFormatdefaultThenameoftheWireFormattousewireFormat.AllthepropertieswiththisprefixareusedtoconfigurethewireFormat.SeeConfiguringWireFormatsformoreinformation??例如:tcp://localhost:61616?trace=false2.2.3FailoverTransport???FailoverTransport是一种重新连接的机制,它工作于其它transport的上层,用于建立可靠的传输。它的配置语法允许制定任意多个复合的URI。Failovertransport会自动选择其中的一个URI来尝试建立连接。如果没有成功,那么会选择一个其它的URI来建立一个新的连接。以下是配置语法:???failover:(uri1,...,uriN)?transportOptions???failover:uri1,...,uriN???TransportOptions的可选值如下:OptionNameDefaultValueDescriptioninitialReconnectDelay10Howlongtowaitbeforethefirstreconnectattempt(inms)maxReconnectDelay30000Themaximumamountoftimeweeverwaitbetweenreconnectattempts(inms)useExponentialBackOfftrueShouldanexponentialbackoffbeusedbetweenreconnectattemptsbackOffMultiplier2TheexponentusedintheexponentialbackoffattemptsmaxReconnectAttempts0Ifnot0,thenthisisthemaximumnumberofreconnectattemptsbeforeanerrorissentbacktotheclientrandomizetrueusearandomalgorithmtochoosetheURItouseforreconnectfromthelistprovidedbackupfalseinitializeandholdasecondtransportconnection-toenablefastfailover??例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=1002.2.4Discoverytransport???Discoverytransport是可靠的tranport。它使用Discoverytransport来定位用来连接的URI列表。以下是配置语法:???discovery:(discoveryAgentURI)?transportOptions???discovery:discoveryAgentURI???TransportOptions的可选值如下:OptionNameDefaultValueDescriptioninitialReconnectDelay10HowlongtowaitbeforethefirstreconnectattemptmaxReconnectDelay30000ThemaximumamountoftimeweeverwaitbetweenreconnectattemptsuseExponentialBackOfftrueShouldanexponentialbackoffbeusedbtweenreconnectattemptsbackOffMultiplier2TheexponentusedintheexponentialbackoffattemptsmaxReconnectAttempts0Ifnot0,thenthisisthemaximumnumberofreconnectattemptsbeforeanerrorissentbacktotheclient??例如:discovery:(multicast://default)?initialReconnectDelay=100?????为了使用Discovery来发现broker,需要为broker启用discoveryagent。以下是XML配置文件中的一个例子:Xml代码?1.?2.???3.??????4.????5.???...?6.???在使用FailoverTransport或Discoverytransport等能够自动重连的transport的时候,需要注意的是:设想有两个broker,它们都启用AMQMessageStore作为持久化存储,有一个producer和一个consumer连接到某个queue。当因其中一个broker失效时而切换到另一个broker的时候,如果失效的broker的queue中还有未被consumer消费的消息,那么这个queue里的消息仍然滞留在失效broker的中,直到失效的broker被修复并重新切换回这个被修复的broker后,之前被保留的消息才会被consumer消费掉。如果被处理的消息有时序限制,那么应用程序就需要处理这个问题。另外也可以通过ActiveMQ集群来解决这个问题。??在transport重连的时候,可以在connection上注册TransportListener来获得回调,例如:Java代码?1.(ActiveMQConnection)connection).addTransportListener(newTransportListener(){?2.???publicvoidonCommand(Objectcmd){?3.???}?4.?5.???publicvoidonException(IOExceptionexp){?6.???}?7.?8.???publicvoidtransportInterupted(){?9.???????//Thetransporthassufferedaninterruptionfromwhichithopestorecover.?10.???}?11.?12.???publicvoidtransportResumed(){?13.???????//Thetransporthasresumedafteraninterruption.?14.???}?15.});?2.3Persistence2.3.1AMQMessageStore???AMQMessageStore是ActiveMQ5.0缺省的持久化存储。Messagecommands被保存到transactionaljournal(由rollingdatalogs组成)。Messages被保存到datalogs中,同时被referencestore进行索引以提高存取速度。Datelogs由一些单独的datalog文件组成,缺省的文件大小是32M,如果某个消息的大小超过了datalog文件的大小,那么可以修改配置以增加datalog文件的大小。如果某个datalog文件中所有的消息都被成功消费了,那么这个datalog文件将会被标记,以便在下一轮的清理中被删除或者归档。以下是其配置的一个例子:Xml代码?1.?2.??3.?????4.??5.?PropertynameDefaultvalueCommentsdirectoryactivemq-datathepathtothedirectorytousetostorethemessagestoredataandlogfilesuseNIOtrueuseNIOtowritemessagestothedatalogssyncOnWritefalsesynceverywritetodiskmaxFileLength32mbahinttosetthemaximumsizeofthemessagedatalogspersistentIndextrueuseapersistentindexforthemessagelogs.Ifthisisfalse,anin-memorystructureismaintainedmaxCheckpointMessageAddSize4kbthemaximumnumberofmessagestokeepinatransactionbeforeautomaticallycommittingcleanupInterval30000time(ms)beforecheckingforadiscarding/movingmessagedatalogsthatarenolongerusedindexBinSize1024defaultnumberofbinsusedbytheindex.Thebiggerthebinsize-thebettertherelativeperformanceoftheindexindexKeySize96thesizeoftheindexkey-thekeyisthemessageidindexPageSize16kbthesizeoftheindexpage-thebiggerthepage-thebetterthewriteperformanceoftheindexdirectoryArchivearchivethepathtothedirectorytousetostorediscardeddatalogsarchiveDataLogsfalseiftruedatalogsaremovedtothearchivedirectoryinsteadofbeingdeleted2.3.2KahaPersistence???KahaPersistence是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到datalogs中。当不再需要log文件中的数据的时候,log文件会被丢弃。以下是其配置的一个例子:Xml代码?1.?2.??????3.????????4.??????5.?2.3.3JDBCPersistence???目前支持的数据库有ApacheDerby,Axion,DB2,HSQL,Informix,MaxDB,MySQL,Oracle,Postgresql,SQLServer,Sybase。???如果你使用的数据库不被支持,那么可以调整StatementProvider来保证使用正确的SQL方言(flavourofSQL)。通常绝大多数数据库支持以下adaptor:?org.activemq.store.jdbc.adapter.BlobJDBCAdapter?org.activemq.store.jdbc.adapter.BytesJDBCAdapter?org.activemq.store.jdbc.adapter.DefaultJDBCAdapter?org.activemq.store.jdbc.adapter.ImageJDBCAdapter??也可以在配置文件中直接指定JDBCadaptor,例如:Xml代码?1.????以下是其配置的一个例子:Xml代码?1.?2.??????3.?4.?5.?6.??????7.??????8.??????9.??????10.??????11.????需要注意的是,如果使用MySQL,那么需要设置relaxAutoCommit标志为true。2.3.4DisablePersistence???以下是其配置的一个例子:Xml代码?1.?2.??2.4Security???ActiveMQ支持可插拔的安全机制,用以在不同的provider之间切换。2.4.1SimpleAuthenticationPlugin???SimpleAuthenticationPlugin适用于简单的认证需求,或者用于建立测试环境。它允许在XML配置文件中指定用户、用户组和密码等信息。以下是ActiveMQ配置的一个例子:Xml代码?1.?2.?...?3.??4.????5.??????6.??????7.??????8.????9.??10.?2.4.2JAASAuthenticationPlugin???JAASAuthenticationPlugin依赖标准的JAAS机制来实现认证。通常情况下,你需要通过设置java.security.auth.login.config系统属性来配置loginmodules的配置文件。如果没有指定这个系统属性,那么JAASAuthenticationPlugin会缺省使用login.config作为文件名。以下是一个login.config文件的例子:activemq-domain{???org.apache.activemq.jaas.PropertiesLoginModulerequireddebug=true????????org.apache.activemq.jaas.properties.user="users.properties"????????org.apache.activemq.jaas.properties.group="groups.properties";};???这个login.config文件中设置了两个属性:org.apache.activemq.jaas.properties.user和org.apache.activemq.jaas.properties.group分别用来指向user.properties和group.properties文件。需要注意的是,PropertiesLoginModule使用本地文件的查找方式,而且查找时采用的basedirectory是login.config文件所在的目录。因此这个login.config说明user.properties和group.properties文件存放在跟login.config文件相同的目录里。???以下是ActiveMQ配置的一个例子:Xml代码?1.?2.?...?3.??4.????基于以上的配置,在JAAS的LoginContext中会使用activemq-domain中配置的PropertiesLoginModule来进行登陆。???ActiveMQJAAS还支持LDAPLoginModule、CertificateLoginModule、TextFileCertificateLoginModule等loginmodule。2.4.3CustomAuthenticationImplementation???可以通过编码的方式为ActiveMQ增加认证功能。例如编写一个类继承自XBeanBrokerService。Java代码?1.packagecom.yourpackage;?2.?3.importjava.net.URI;?4.importjava.util.HashMap;?5.importjava.util.Map;?6.?7.importorg.apache.activemq.broker.Broker;?8.importorg.apache.activemq.broker.BrokerFactory;?9.importorg.apache.activemq.broker.BrokerService;?10.importorg.apache.activemq.security.SimpleAuthenticationBroker;?11.importorg.apache.activemq.xbean.XBeanBrokerService;?12.?13.publicclassSimpleAuthBrokerextendsXBeanBrokerService{?14.???//?15.???privateStringuser;?16.???privateStringpassword;?17.?????18.???@SuppressWarnings("unchecked")?19.???protectedBrokeraddInterceptors(Brokerbroker)throwsException{?20.???????broker=super.addInterceptors(broker);?21.???????Mappasswords=newHashMap();?22.???????passwords.put(getUser(),getPassword());?23.???????broker=newSimpleAuthenticationBroker(broker,passwords,newHashMap());?24.???????returnbroker;?25.???}?26.?????27.???publicStringgetUser(){?28.???????returnuser;?29.???}?30.?31.???publicvoidsetUser(Stringuser){?32.???????this.user=user;?33.???}?34.?35.???publicStringgetPassword(){?36.???????returnpassword;?37.???}?38.?39.???publicvoidsetPassword(Stringpassword){?40.???????this.password=password;?41.???}?42.}????以下是ActiveMQ配置文件的一个例子:Xml代码?1.?2.?…?3.??6.???7.????8.??????9.????10.??11.?…?12.???在这个配置文件中增加了一个namespaceauth,用于指向之前编写的哪个类。同时为SimpleAuthBroker注入了两个属性值user和password,因此在被SimpleAuthBroker改写的addInterceptors方法里,可以使用这两个属性进行认证了。ActiveMQ提供的SimpleAuthenticationBroker类继承自BrokerFilter(可以简单的看成是Broker的Adaptor),它的构造函数中的两个Map分别是userPasswords和userGroups。SimpleAuthenticationBroker在addConnection方法中使用userPasswords进行认证,同时会把userGroups的信息保存到ConnectionContext中。2.4.4AuthorizationPlugin???可以通过AuthorizationPlugin为认证后的用户授权,以下ActiveMQ配置文件的一个例子:Xml代码?1.?2.??3.???4.??5.????6.??????7.????????8.?????????"read="admins"write="admins"admin="admins"/>?9.?????????"read="users"write="users"admin="users"/>?10.?????????"read="guests"write="guests,users"admin="guests,users"/>?11.???????????12.?????????"read="admins"write="admins"admin="admins"/>?13.?????????"read="users"write="users"admin="users"/>?14.?????????"read="guests"write="guests,users"admin="guests,users"/>?15.?16.?????????"read="guests,users"write="guests,users"admin="guests,users"/>?17.????????18.??????19.????20.??21.??2.5Clustering???ActiveMQ从多种不同的方面提供了集群的支持。2.5.1Queueconsumerclusters???ActiveMQ支持订阅同一个queue的consumers上的集群。如果一个consumer失效,那么所有未被确认(unacknowledged)的消息都会被发送到这个queue上其它的consumers。如果某个consumer的处理速度比其它consumers更快,那么这个consumer就会消费更多的消息。???需要注意的是,笔者发现AcitveMQ5.0版本的Queueconsumerclusters存在一个bug:采用AMQMessageStore,运行一个producer,两个consumer,并采用如下的配置文件:Xml代码?1.?2.??3.???4.????5.??????6.????7.?????8.????9.??????10.????11.??????12.??13.???那么经过一段时间后可能会报出如下错误:ERROR[ActiveMQTransport:tcp:///127.0.0.1:1843-RecoveryListenerAdapter.java:58-RecoveryListenerAdapter]MessageidID:versus-1837-1203915536609-0:2:1:1:419couldnotberecoveredfromthedatastore!???Apache官方文档说,此bug已经被修正,预定在5.1.0版本上体现。2.5.2Brokerclusters???一个常见的场景是有多个JMSbroker,有一个客户连接到其中一个broker。如果这个broker失效,那么客户会自动重新连接到其它的broker。在ActiveMQ中使用failover://协议来实现这个功能。ActiveMQ3.x版本的reliable://协议已经变更为failover://。???如果某个网络上有多个brokers而且客户使用静态发现(使用StaticTransport或FailoverTransport)或动态发现(使用DiscoveryTransport),那么客户可以容易地在某个broker失效的情况下切换到其它的brokers。然而,standalonebrokers并不了解其它brokers上的consumers,也就是说如果某个broker上没有consumers,那么这个broker上的消息可能会因得不到处理而积压起来。目前的解决方案是使用Networkofbrokers,以便在broker之间存储转发消息。ActiveMQ在未来会有更好的特性,用来在客户端处理这个问题。???从ActiveMQ1.1版本起,ActiveMQ支持networksofbrokers。它支持分布式的queues和topics。一个broker会相同对待所有的订阅(subscription):不管他们是来自本地的客户连接,还是来自远程broker,它都会递送有关的消息拷贝到每个订阅。远程broker得到这个消息拷贝后,会依次把它递送到其内部的本地连接上。有两种方式配置Networkofbrokers,一种是使用statictransport,如下:Xml代码?1.?2.??3.????4.??5.??6.????7.??8.?…?9.????另外一种是使用multicastdiscovery,如下:Xml代码?1.?2.??3.????4.??5.??6.????7.??8.?...?9.????NetworkConnector有以下属性:PropertyDefaultValueDescriptionnamebridgenameofthenetwork-formorethanonenetworkconnectorbetweenthesametwobrokers-usedifferentnamesdynamicOnlyfalseiftrue,onlyforwardmessagesifaconsumerisactiveontheconnectedbrokerdecreaseNetworkConsumerPriorityfalsedecreasethepriorityfordispatchingtoaQueueconsumerthefurtherawayitis(innetworkhops)fromtheproducernetworkTTL1thenumberofbrokersinthenetworkthatmessagesandsubscriptionscanpassthroughconduitSubscriptionstruemultipleconsumerssubscribingtothesamedestinationaretreatedasoneconsumerbythenetworkexcludedDestinationsemptydestinationsmatchingthislistwon''tbeforwardedacrossthenetworkdynamicallyIncludedDestinationsemptydestinationsthatmatchthislistwillbeforwardedacrossthenetworkn.b.anemptylistmeansalldestinationsnotintheexcludedlistwillbeforwardedstaticallyIncludedDestinationsemptydestinationsthatmatchwillalwaysbepassedacrossthenetwork-evenifnoconsumershaveeverregisteredaninterestduplexfalseiftrue,anetworkconnectionwillbeusedtobothproduceANDConsumemessages.Thisisusefulforhubandspokescenarioswhenthehubisbehindafirewalletc.???关于conduitSubscriptions属性,这里稍稍说明一下。设想有两个brokers,分别是brokerA和brokerB,它们之间用forwardingbridge连接。有一个consumer连接到brokerA并订阅queue:Q.TEST。有两个consumers连接到brokerB,也是订阅queue:Q.TEST。这三个consumers有相同的优先级。然后启动一个producer,它发送了30条消息到brokerA。如果conduitSubscriptions=true,那么brokerA上的consumer会得到15条消息,另外15条消息会发送给brokerB。此时负载并不均衡,因为此时brokerA将brokerB上的两个consumers视为一个;如果conduitSubscriptions=false,那么每个consumer上都会收到10条消息。以下是关于NetworkConnector属性的一个例子:Xml代码?1.?2.??5.?????6.???????7.???????8.?????9.?????10.???????11.???????12.?????13.?????14.???????15.???????16.?????17.??18.?2.5.3MasterSlave???在一个网络内运行多个brokers或者standalonebrokers时存在一个问题,这就是消息在物理上只被一个broker持有,因此当某个broker失效,那么你只能等待直到它重启后,这个broker上的消息才能够被继续发送(如果没有设置持久化,那么在这种情况下,消息将会丢失)。MasterSlave背后的想法是,消息被复制到slavebroker,因此即使masterbroker遇到了像硬件故障之类的错误,你也可以立即切换到slavebroker而不丢失任何消息。???MasterSlave是目前ActiveMQ推荐的高可靠性和容错的解决方案。以下是几种不同的类型:MasterSlaveTypeRequirementsProsConsPureMasterSlaveNoneNocentralpointoffailureRequiresmanualrestarttobringbackafailedmasterandcanonlysupport1slaveSharedFileSystemMasterSlaveASharedFilesystemsuchasaSANRunasmanyslavesasrequired.AutomaticrecoveryofoldmastersRequiressharedfilesystemJDBCMasterSlaveAShareddatabaseRunasmanyslavesasrequired.AutomaticrecoveryofoldmastersRequiresashareddatabase.Alsorelativelyslowasitcannotusethehighperformancejournal2.5.3.1PureMasterSlave???PureMasterSlave的工作方式如下:?Slavebroker消费masterbroker上所有的消息状态,例如消息、确认和事务状态等。只要slavebroker连接到了masterbroker,它不会(也不被允许)启动任何networkconnectors或者transportconnectors,所以唯一的目的就是复制masterbroker的状态。?Masterbroker只有在消息成功被复制到slavebroker之后才会响应客户。例如,客户的commit请求只有在masterbroker和slavebroker都处理完毕commit请求之后才会结束。?当masterbroker失效的时候,slavebroker有两种选择,一种是slavebroker启动所有的networkconnectors和transportconnectors,这允许客户端切换到slavebroker;另外一种是slavebroker停止。这种情况下,slavebroker只是复制了masterbroker的状态。?客户应该使用failovertransport并且应该首先尝试连接masterbroker。例如:failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false设置randomize为false就可以让客户总是首先尝试连接masterbroker(slavebroker并不会接受任何连接,直到它成为了masterbroker)。??PureMasterSlave具有以下限制:?只能有一个slavebroker连接到masterbroker。?在因masterbroker失效而导致slavebroker成为master之后,之前的masterbroker只有在当前的masterbroker(原slavebroker)停止后才能重新生效。?Masterbroker失效后而切换到slavebroker后,最安全的恢复masterbroker的方式是人工处理。首先要停止slavebroker(这意味着所有的客户也要停止)。然后把slavebroker的数据目录中所有的数据拷贝到masterbroker的数据目录中。然后重启masterbroker和slavebroker。??Masterbroker不需要特殊的配置。Slavebroker需要进行以下配置Xml代码?1.?2.???...?3.????4.??????5.???6.????其中的masterConnectorURI用于指向masterbroker,shutdownOnMasterFailure用于指定slavebroker在masterbroker失效的时候是否需要停止。此外,也可以使用如下配置:Xml代码?1.?2.?...?3.??4.????5.??6.???需要注意的是,笔者认为ActiveMQ5.0版本的PureMasterSlave仍然不够稳定。2.5.3.2SharedFileSystemMasterSlave???如果你使用SAN或者共享文件系统,那么你可以使用SharedFileSystemMasterSlave。基本上,你可以运行多个broker,这些broker共享数据目录。当第一个broker得到文件上的排他锁之后,其它的broker便会在循环中等待获得这把锁。客户端使用failovertransport来连接到可用的broker。当masterbroker失效的时候会释放这把锁,这时候其中一个slavebroker会得到这把锁从而成为masterbroker。以下是ActiveMQ配置的一个例子:Xml代码?1.?2.???3.??????4.???5.??…?6.?2.5.3.3JDBCMasterSlave???JDBCMasterSlave的工作原理跟SharedFileSystemMasterSlave类似,只是采用了数据库作为持久化存储。以下是ActiveMQ配置的一个例子:Xml代码?1.?2.??3.???...?4.????5.??????6.????7.?????8.??9.???10.??11.????12.????13.????14.????15.????16.???17.???需要注意的是,如果你使用MySQL数据库,需要首先执行以下三条语句:(Apache官方文档说,此bug已经被修正,预定在5.1.0版本上体现)Sql代码?1.ALTERTABLEactivemq_acksENGINE=InnoDB;?2.ALTERTABLEactivemq_lockENGINE=InnoDB;?3.ALTERTABLEactivemq_msgsENGINE=InnoDB;?2.6Features???ActiveMQ包含了很多功能强大的特性,下面简要介绍其中的几个。2.6.1ExclusiveConsumer???Queue中的消息是按照顺序被分发到consumers的。然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证。因为这些消息是被多个线程并发的处理。有的时候,保证消息按照顺序处理是很重要的。例如,你可能不希望在插入订单操作结束之前执行更新这个订单的操作。???ActiveMQ从4.x版本起开始支持ExclusiveConsumer(或者说ExclusiveQueues)。Broker会从多个consumers中挑选一个consumer来处理queue中所有的消息,从而保证了消息的有序处理。如果这个consumer失效,那么broker会自动切换到其它的consumer。???可以通过DestinationOptions来创建一个ExclusiveConsumer,如下:Java代码?1.queue=newActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");?2.consumer=session.createConsumer(queue);????顺便说一下,可以给consumer设置优先级,以便针对网络情况(如networkhops)进行优化,如下:Java代码?1.queue=newActiveMQQueue("TEST.QUEUE?consumer.exclusive=true&consumer.priority=10");?2.6.2MessageGroups???用Apache官方文档的话说,MessageGroupsrock!它是ExclusiveConsumer功能的增强。逻辑上,MessageGroups可以看成是一种并发的ExclusiveConsumer。跟所有的消息都由唯一的consumer处理不同,JMS消息属性JMSXGroupID被用来区分messagegroup。MessageGroups特性保证所有具有相同JMSXGroupID的消息会被分发到相同的consumer(只要这个consumer保持active)。另外一方面,MessageGroups特性也是一种负载均衡的机制。???在一个消息被分发到consumer之前,broker首先检查消息JMSXGroupID属性。如果存在,那么broker会检查是否有某个consumer拥有这个messagegroup。如果没有,那么broker会选择一个consumer,并将它关联到这个messagegroup。此后,这个consumer会接收这个messagegroup的所有消息,直到:?Consumer被关闭。?Messagegroup被关闭。通过发送一个消息,并设置这个消息的JMSXGroupSeq为0。??从4.1版本开始,ActiveMQ支持一个布尔字段JMSXGroupFirstForConsumer。当某个messagegroup的第一个消息被发送到consumer的时候,这个字段被设置。如果客户使用failovertransport连接到broker。在由于网络问题等造成客户重新连接到broker的时候,相同messagegroup的消息可能会被分发到不同与之前的consumer,因此JMSXGroupFirstForConsumer字段也会被重新设置。???以下是使用messagegroups的例子:Java代码?1.Mesasgemessage=session.createTextMessage("hey");?2.message.setStringProperty("JMSXGroupID","IBM_NASDAQ_20/4/05");?3....?4.producer.send(message);?2.6.3JMSSelectors???JMSSelectors用于在订阅中,基于消息属性对进行消息的过滤。JMSSelectors由SQL92语法定义。以下是个Selectors的例子:Java代码?1.consumer=session.createConsumer(destination,"JMSType=''car''ANDweight>2500");?????在JMSSelectors表达式中,可以使用IN、NOTIN、LIKE等,例如:???LIKE''12%3''(''123''true,''12993''true,''1234''false)???LIKE''l_se''(''lose''true,''loose''false)???LIKE''\_%''ESCAPE''\''(''_foo''true,''foo''false)???需要注意的是,JMSSelectors表达式中的日期和时间需要使用标准的long型毫秒值。另外表达式中的属性不会自动进行类型转换,例如:Java代码?1.myMessage.setStringProperty("NumberOfOrders","2");????"NumberOfOrders>1"求值结果是false。关于JMSSelectors的详细文档请参考javax.jms.Message的javadoc。???上一小节介绍的MessageGroups虽然可以保证具有相同messagegroup的消息被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。在某些情况下,MessageGroups可以和JMSSelector一起工作,例如:???设想有三个consumers分别是A、B和C。你可以在producer中为消息设置三个messagegroups分别是"A"、"B"和"C"。然后令consumerA使用"JMXGroupID=''A''"作为selector。B和C也同理。这样就可以保证messagegroupA的消息只被consumerA处理。需要注意的是,这种做法有以下缺点:?producer必须知道当前正在运行的consumers,也就是说producer和consumer被耦合到一起。?如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上。2.6.4PendingMessageLimitStrategy???首先简要介绍一下prefetch机制。ActiveMQ通过prefetch机制来提高性能,这意味这客户端的内存里可能会缓存一定数量的消息。缓存消息的数量由prefetchlimit来控制。当某个consumer的prefetchbuffer已经达到上限,那么broker不会再向consumer分发消息,直到consumer向broker发送消息的确认。可以通过在ActiveMQConnectionFactory或者ActiveMQConnection上设置ActiveMQPrefetchPolicy对象来配置prefetchpolicy。也可以通过connectionoptions或者destinationoptions来配置。例如:???tcp://localhost:61616?jms.prefetchPolicy.all=50???tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1???queue=newActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");???prefetchsize的缺省值如下:?persistentqueues(defaultvalue:1000)?non-persistentqueues(defaultvalue:1000)?persistenttopics(defaultvalue:100)?non-persistenttopics(defaultvalue:Short.MAX_VALUE-1)???慢消费者会在非持久的topics上导致问题:一旦消息积压起来,会导致broker把大量消息保存在内存中,broker也会因此而变慢。未来ActiveMQ可能会实现磁盘缓存,但是这也还是会存在性能问题。目前ActiveMQ使用PendingMessageLimitStrategy来解决这个问题。除了prefetchbuffer之外,你还要配置缓存消息的上限,超过这个上限后,新消息到来时会丢弃旧消息。通过在配置文件的destinationmap中配置PendingMessageLimitStrategy,可以为不用的topicnamespace配置不同的策略。目前有以下两种:?ConstantPendingMessageLimitStrategy。这个策略使用常量限制。例如:?PrefetchRatePendingMessageLimitStrategy。这个策略使用prefetchsize的倍数限制。例如:??在以上两种方式中,如果设置0意味着除了prefetch之外不再缓存消息;如果设置-1意味着禁止丢弃消息。????此外,你还可以配置消息的丢弃策略,目前有以下两种:?oldestMessageEvictionStrategy。这个策略丢弃最旧的消息。?oldestMessageWithLowestPriorityEvictionStrategy。这个策略丢弃最旧的,而且具有最低优先级的消息。??以下是个ActiveMQ配置文件的例子:Xml代码?1.?2.????3.??????4.????????5.?????????">?6.????????????7.????????????8.??????????????9.????????????10.?????????????11.????????????12.????????????13.??????????????14.????????????15.??????????16.????????17.??????18.????19.???...?20.?2.6.5CompositeDestinations???从1.1版本起,ActiveMQ支持compositedestinations。它允许用一个虚拟的destination代表多个destinations。例如你可以通过compositedestinations在一个操作中同时向12个queue发送消息。在compositedestinations中,多个destination之间采用","分割。例如:Java代码?1.Queuequeue=newActiveMQQueue("FOO.A,FOO.B,FOO.C");???如果你希望使用不同类型的destination,那么需要加上前缀如queue://或topic://,例如:?Java代码?1.Queuequeue=newActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");????以下是ActiveMQ配置文件进行配置的一个例子:Xml代码?1.?2.??3.????4.??????5.????????6.??????????7.??????????8.????????9.??????10.????11.??12.???可以在转发前,先通过JMSSelector判断一个消息是否需要转发,例如:Xml代码?1.?2.??3.????4.??????5.????????6.??????????7.??????????8.????????9.??????10.????11.??12.?2.6.6MirroredQueues???每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用VirtualDestinations来建立一个virtualqueue来把消息转发到多个queues中。但是为系统中每个queue都进行如此的配置可能会很麻烦。???ActiveMQ支持MirroredQueues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序可以订阅这个mirroredqueuetopic。为了启用MirroredQueues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirrortopic的前缀,缺省是"VirtualTopic.Mirror."。以下是ActiveMQ配置文件的一个例子:Xml代码?1.?2.?3.??4.????5.??6.???7.??8.??????9.??10.?...?11.?假如某个producer向名为Foo.Bar的queue中发送消息,那么你可以通过订阅名为Mirror.Topic.Foo.Bar的topic来获得发送到Foo.Bar中的所有消息。2.6.7Wildcards???Wildcards用来支持联合的名字分层体系(federatednamehierarchies)。它不是JMS规范的一部分,而是ActiveMQ的扩展。ActiveMQ支持以下三种wildcards:?"."用于作为路径上名字间的分隔符。?""用于匹配路径上的任何名字。?">"用于递归地匹配任何以这个名字开始的destination。??作为一种组织事件和订阅感兴趣那部分信息的一种方法,这个概念在金融市场领域已经流行了一段时间了。设想你有以下两个destination:?PRICE.STOCK.NASDAQ.IBM(IBM在NASDAQ的股价)?



献花(0)
+1
(本文系关平藏书首藏)