配色: 字号:
ActiveMQ与Tomcat整合教程
2017-03-31 | 阅:  转:  |  分享 
  
说明:Tomcat示例版本6.0.14,其它版本在配置上可能有一些差异1、准备jar包:将ActiveMQlib目录下的5个jar包复制到
Tomcatlib目录下:activemq-core-5.1.0.jaractivemq-web-5.1.0.jargeroni
mo-j2ee-management_1.0_spec-1.0.jargeronimo-jms_1.1_spec-1.1.1.ja
rgeronimo-jta_1.0.1B_spec-1.0.1.jar2、修改配置文件:2.1修改Tomcat的conf/con
text.xml文件:在节点中添加以下内容:?iloverConnectionFactory"auth="Container"type="org.apache.active
mq.ActiveMQConnectionFactory"description="JMSConnectionFactory
"factory="org.apache.activemq.jndi.JNDIReferenceFactory"brokerU
RL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&am
p;maxReconnectAttempts=5"brokerName="localhost"useEmbeddedBroke
r="false"/>?cription="JMSConnectionFactory"factory="org.apache.activemq.jn
di.JNDIReferenceFactory"brokerURL="tcp://localhost:61616"broker
Name="localhost"useEmbeddedBroker="false"/>?topic/MyTopic"auth="Container"type="org.apache.activemq.command
.ActiveMQTopic"factory="org.apache.activemq.jndi.JNDIReferenceFa
ctory"physicalName="MY.TEST.FOO"/>ueue"auth="Container"type="org.apache.activemq.command.ActiveMQ
Queue"factory="org.apache.activemq.jndi.JNDIReferenceFactory"ph
ysicalName="MY.TEST.FOO.QUEUE"/>配置说明:以JNDI的方式定义了ActiveMQ的broker连
接url、Topic和Queue。?此处需加以注意的是Listener端的borkerURL使用了failover传输方式:fa
ilover:(tcp://localhost:61616)?initialReconnectDelay=100&maxR
econnectAttempts=5客户端使用普通传输方式:tcp://localhost:61616failovertrans
port是一种重新连接机制,用于建立可靠的传输。此处配置的是一旦ActiveMQbroker中断,Listener端将每隔100
ms自动尝试连接,直至成功连接或重试5次连接失败为止。failover还支持多个borker同时提供服务,实现负载均衡的同时可增加
系统容错性,格式:failover:(uri1,...,uriN)?transportOptions2.2新建web应用(web
apps/jms-test),修改WEB-INF/web.xml文件:增加一个自启动Servlet,该Servlet实现了Mes
sageListener接口,作为Topic消息的Listener端。jms-l
istener
com.flvcd.servlet.JMSListe
ner
1t>?2.3修改activemq.xml文件:为了支持持久化消息,需修改ActiveMQ的配置文件如下,使用默认的AMQMe
ssageStore方式(索引文件方式)存储消息,据官网介绍是快速、稳定的。数据库存储方式可参照官网相关文档。mlns="http://activemq.apache.org/schema/core"brokerName="localho
st"persistent="true"useShutdownHook="false">>32mb"/>3、Listener端(JMSListener.java
)完整实现:packagecom.flvcd.servlet;importjava.io.;importjavax.ser
vlet.;importjavax.servlet.http.;importjavax.naming.;importj
avax.jms.;importorg.apache.activemq.ActiveMQConnectionFactory;p
ublicclassJMSListenerextendsHttpServletimplementsMessageLis
tener{/初始化jms连接,创建topic监听器/publicvoidinit(ServletConfig
config)throwsServletException{try{InitialContextinitCtx=n
ewInitialContext();ContextenvContext=(Context)initCtx.looku
p("java:comp/env");ConnectionFactoryconnectionFactory=(Connec
tionFactory)envContext.lookup("jms/FailoverConnectionFactory");
Connectionconnection=connectionFactory.createConnection();con
nection.setClientID("MyClient");SessionjmsSession=connection.
createSession(false,Session.AUTO_ACKNOWLEDGE);?//普通消息订阅者,无法接收持久
消息//MessageConsumerconsumer=jmsSession.createConsumer((Destin
ation)envContext.lookup("jms/topic/MyTopic"));//基于Topic创建持久的消息订
阅者,前提:Connection必须指定一个唯一的clientId,当前为MyClientTopicSubscribercon
sumer=jmsSession.createDurableSubscriber((Topic)envContext.loo
kup("jms/topic/MyTopic"),"MySub");consumer.setMessageListener(t
his);connection.start();}catch(NamingExceptione){e.printSt
ackTrace();}catch(JMSExceptione){e.printStackTrace();}}/
接收消息,做对应处理/publicvoidonMessage(Messagemessage){if(che
ckText(message,"RefreshArticleId")!=null){StringarticleId=
checkText(message,"RefreshArticleId");System.out.println("接收刷新
文章消息,开始刷新文章ID="+articleId);}elseif(checkText(message,"Ref
reshThreadId")!=null){StringthreadId=checkText(message,"R
efreshThreadId");System.out.println("接收刷新论坛帖子消息,开始刷新帖子ID="+thr
eadId);}else{System.out.println("接收普通消息,不做任何处理!");}}privat
estaticStringcheckText(Messagem,Strings){try{returnm.g
etStringProperty(s);}catch(JMSExceptione){e.printStackTrace
(System.out);returnnull;}}}编译JMSListener.java至classes目录:javac
-cp.;D:\apache-tomcat-6.0.14\lib\servlet-api.jar;D:\apache-tomc
at-6.0.14\lib\geronimo-jms_1.1_spec-1.1.1.jar;D:\apache-tomcat-6.
0.14\lib\activemq-core-5.1.0.jar-d.JMSListener.java注:D:\apache
-tomcat-6.0.14请替换成本地对应目录。4、Publisher端(publish.jsp)实现:在jms-test目录下
新建publish.jsp文件:?<%@pagelanguage="java"import="javax.jms."pa
geEncoding="GBK"%><%@pagelanguage="java"import="javax.naming.
"%><%@pagelanguage="java"import="org.apache.activemq.ActiveMQC
onnectionFactory"%>?<%try{InitialContextinitCtx=newInitial
Context();ContextenvContext=(Context)initCtx.lookup("java:co
mp/env");ConnectionFactoryconnectionFactory=(ConnectionFactor
y)envContext.lookup("jms/NormalConnectionFactory");Connectionc
onnection=connectionFactory.createConnection();SessionjmsSess
ion=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageProducerproducer=jmsSession.createProducer((Destination
)envContext.lookup("jms/topic/MyTopic"));//设置持久方式producer.set
DeliveryMode(DeliveryMode.PERSISTENT);??MessagetestMessage=jms
Session.createMessage();//发布刷新文章消息testMessage.setStringProperty
("RefreshArticleId","2046");producer.send(testMessage);//发布刷新帖
子消息testMessage.clearProperties();testMessage.setStringProperty(
"RefreshThreadId","331");producer.send(testMessage);}catch(N
amingExceptione){e.printStackTrace();}catch(JMSExceptione)
{e.printStackTrace();}%>?Publisher和Listner之间通过Message的setStrin
gProperty和getStringProperty方法,实现对应的业务逻辑。上述示例代码中,RefreshArticleId代
表刷新某篇文章,RefreshThreadId代表刷新某个帖子,property值保持对应的ID。当然用户可根据实际需求灵活地使用
。5、运行Demo:5.1启动ActiveMQ服务器5.2启动Tomcat服务器:JMSListener将自动连接Active
MQbroker,日志信息:Successfullyconnectedtotcp://localhost:616165.
3访问http://localhost:8080/jms-test/publish.jspTomcat服务器日志将提示:接收刷新
文章消息,开始刷新文章ID=2046接收刷新论坛帖子消息,开始刷新帖子ID=3315.4访问http://localhost:8
161/admin/topics.jsphttp://localhost:8161/admin/topics.jsp查看http:
//localhost:8161/admin/send.jsp?JMSDestination=MY.TEST.FOO&JMSDestinationType=topicMY.TEST.FOO的消息日志,分别发送和接收2条。至此,已成功完成ActiveMQ与Tomcat的基本整合!Publisher和Listener完全可以独立部署到不同的Web服务器上,并通过ActiveMQ来进行消息传递,实现用户所需的业务逻辑。?测试持久消息的具体步骤:l启动Publisher所在Web服务器l启动ActiveMQl访问publish.jsp发送消息,此时Listener还未启动,消息将保存在ActiveMQ的bin\activemq-data目录下,查看日志可以看到发送2条,接收0条l启动Listener所在Web服务器,将自动接收到ActiveMQ的持久消息并处理,查看日志:发送2条,接收2条,表明持久消息应用成功!
献花(0)
+1
(本文系关平藏书首藏)