在Spring 2.0之前,Spring的JMS的作用局限于产生消息。这个功能(封装在 JmsTemplate 类中)当然是很好的,
但是,它没有描述完整的JMS堆栈,比如像消息的 异步 产生和消耗。JMS堆栈缺少的这一部分已经被添加,Spring 2.0现在提供对消息异步消耗的完整支持。 让我们从一个例子开始。 首先我们打开ActiveMQ。从ActiveMQ的安装路径上的bin目录,那里有一个ActiveMQ.bat,双击执行即可。不过要注意必须先设置java_home环境变量。ActiveMQ默认的服务端口是61616。 然后我们开始配置Spring配置文件。我起名为spring-jms.xml
- 首先要配置一个ConnectionFactory代码如下
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean>
这里用到的ConnectionFactory是ActiveMQ提供的工厂,为了能使用这个工厂,我们必须在项目中添加以下几个jar文件: geronimo-jms_1.1_spec-1.0.jar, activeio-core-3.0-beta3.jar, activemq-core-4.0.1.jar, backport-util-concurrent-2.1.jar, commons-logging-1.0.4.jar, geronimo-j2ee-management_1.0_spec-1.0.jar 以上这些Jar文件都存在于ActiveMQ安装目录的lib目录下,这些可是我一个一个试验出来的,累个半死。。
- 然后应该配置一个Queue(我使用的是点对点方式),不过ActiveMQ只要提供一个名字就可以自动创建队列,因此这一步省了,呵呵
- 下
面就轮到Spring的支持类了,首先是JmsTemplate。这个类提供了大量的方法简化我们对JMS的操作。常用的有两个,
org.springframework.jms.core.JmsTemplate102和
org.springframework.jms.core.JmsTemplate,这两个类分别支持JMS的1.02版本和1.1版本。现在比较常用
的还是1.02版本。配置如下
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate102"> <property name="connectionFactory" ref="connectionFactory"/> <property name="timeToLive" value="86400000"/> <property name="defaultDestinationName" value="cmpp" /> <property name="messageConverter" ref="messageConverter" /> <property name="receiveTimeout" value="30000" /> </bean>
上
面的配置中用到了第一步配置的connectionFactory以及一个消息转换的类messageConverter,这个类实现了
org.springframework.jms.support.converter.MessageConverter接口,可以在消息发送之前和接
受之后进行消息类型转换。具体的看最后的实例代码。配置代码如下:
<!-- Spring JMS SimpleConverter --> <bean id="simpleConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" /> <!-- Message Converter --> <bean id="messageConverter" class="com.liangj.apmgt.jms.ApmgtMessageConverter"> <property name="converter"> <ref local="simpleConverter" /> </property> </bean>
这里还配置了发送的消息的存在时间timeToLive,目标Queue的名字defaultDestinationName,接受消息超时时间receiveTimeout
- 配置发送代码
<!-- Message porducer --> <bean id="producer" class="com.liangj.apmgt.jms.DefaultApmgtMessageProducer"> <property name="jmsTemplate" ref="jmsTemplate" /> </bean>
- 接着配置监听器,这是Spring2.0新增的功能,配置如下:
<!-- this is the Message Driven POJO (MDP) --> <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg> <bean class="com.liangj.apmgt.jms.DefaultApmgtMessageListener" /> </constructor-arg> <property name="defaultListenerMethod" value="onMessage" /> <property name="messageConverter" ref="messageConverter" /> </bean>
<!-- and this is the attendant message listener container --> <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destinationName" value="${jms.destinationName.cmpp}" /> <property name="messageSelector" value="${jms.messageSelector}" /> <property name="messageListener" ref="messageListener" /> </bean>
Spring配置监听器有很多种选择,在这里我选择这回种MessageListenerAdapter方法主要是因为这个方法比较灵活。实现他只要一个
很普通的java类即可,和JMS以及Spring的耦合度最低。其中方法onMessage可以随便修改方法名,只要在配置文件中对应的修改就好了。 MessageListenerAdapter还有一个功能就是如果处理方法(我这里是onMessage)返回一个非空值,它将自动返回一个响应消息。这个消息会返回给JMS Reply-To属性定义的目的地(如果存在),或者是MessageListenerAdapter设置(如果配置了)的缺省目的地;如果没有定义目的地,那么将产生一个InvalidDestinationException异常(此异常将不会只被捕获而不处理,它将沿着调用堆栈上传)。 这样我们的配置就都完成了。接下来我们来实现对应的Java文件 先是接口文件发送消息接口IApmgtMessageProducer.java
public interface IApmgtMessageProducer { public abstract void sendMessage(ApmgtMessageData messageData); }
接受消息接口IApmgtMessageListener.java
public interface IApmgtMessageListener { public void onMessage(ApmgtMessageData message); }
发消息的文件DefaultApmgtMessageProducer.java
public class DefaultApmgtMessageProducer implements IApmgtMessageProducer { private JmsTemplate jmsTemplate;
public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; }
public void sendMessage(ApmgtMessageData messageData) { this.jmsTemplate.convertAndSend(messageData); } }
收消息文件DefaultApmgtMessageListener.java
public class DefaultApmgtMessageListener implements IApmgtMessageListener { public void onMessage(ApmgtMessageData message) { System.out.println("监听到消息:"+message); } }
消息转换类ApmgtMessageConverter.java
public class ApmgtMessageConverter implements MessageConverter {
private Log log = LogFactory.getLog(ApmgtMessageConverter.class);
private SimpleMessageConverter converter;
public void setConverter(SimpleMessageConverter converter) { this.converter = converter; }
public Object fromMessage(Message message) throws JMSException, MessageConversionException { if (message instanceof ObjectMessage) { ObjectMessage o_message = (ObjectMessage)message; MessageHeader header = new MessageHeader(); header.setId(message.getLongProperty("id")); header.setReceiver(message.getIntProperty("receiver")); header.setSender(message.getIntProperty("sender")); header.setSendPerson(message.getStringProperty("sendPerson")); header.setType(message.getIntProperty("type")); Serializable messageContent = o_message.getObject(); ApmgtMessageData<Serializable> messageData = new ApmgtMessageData<Serializable>(); messageData.setMessageContent(messageContent); messageData.setMessageHeader(header); return messageData; } return null; }
public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException { if (object instanceof ApmgtMessageData) { ApmgtMessageData data = (ApmgtMessageData) object; Message message = converter.toMessage(data.getMessageContent(), session); message.setLongProperty("id", data.getMessageHeader().getId()); message.setIntProperty("receiver", data.getMessageHeader().getReceiver()); message.setIntProperty("sender", data.getMessageHeader().getSender()); message.setIntProperty("type", data.getMessageHeader().getType()); message.setStringProperty("sendPerson", data.getMessageHeader().getSendPerson()); log.info("发送消息[MessageSender]:\n" + message); return message; } else { return null; } }
}
消息类文件 消息父类:ApmgtMessageData.java
public class ApmgtMessageData<T extends Serializable>{
protected T messageContent; protected MessageHeader messageHeader;
public T getMessageContent() { return this.messageContent; }
public MessageHeader getMessageHeader() { return this.messageHeader; }
public void setMessageContent(T messageContent) { this.messageContent = messageContent; } public void setMessageHeader(MessageHeader messageHeader) { this.messageHeader = messageHeader; }
}
消息属性的一个类MessageHeader.java
public class MessageHeader {
/** * 消息ID */ private long id;
/** * 消息类型 */ private int type;
/** * 消息发送方,发送消息的模块 */ private int sender;
/** * 消息接收方,接收消息的模块 */ private int receiver;
/** * 消息发送者,具体的用户 */ private String sendPerson;
public MessageHeader(){ this.id = System.currentTimeMillis() ; } public long getId() { return id; }
public void setId(long id) { this.id = id; }
public String getSendPerson() { return sendPerson; }
public void setSendPerson(String sendPerson) { this.sendPerson = sendPerson; }
public int getReceiver() { return receiver; }
public void setReceiver(int receiver) { this.receiver = receiver; }
public int getSender() { return sender; }
public void setSender(int sender) { this.sender = sender; }
public int getType() { return type; }
public void setType(int type) { this.type = type; }
}
消息体的类ModPasswordRequest.java
public class ModPasswordRequest implements Serializable{
private static final long serialVersionUID = 1L;
/** * 旧密码 */ private String oldPassword; /** * 新密码 */ private String newPassword;
public String getNewPassword() { return newPassword; }
public void setNewPassword(String newPassword) { this.newPassword = newPassword; }
public String getOldPassword() { return oldPassword; }
public void setOldPassword(String oldPassword) { this.oldPassword = oldPassword; }
}
消息类:ApmgtModPasswordRequest.java
public class ApmgtModPasswordRequest extends ApmgtMessageData<ModPasswordRequest> { private static final int REQ_MODPASSWORD = 0; private static final int INTF = 1; private static final int APMGT = 2;
public void init(){ messageHeader = new MessageHeader(); messageContent = new ModPasswordRequest(); messageHeader.setType(REQ_MODPASSWORD); messageHeader.setSender(INTF); messageHeader.setReceiver(APMGT); messageContent.setNewPassword("123456"); messageContent.setOldPassword("654321"); } }
最后是测试类Main.java
public class Main {
public static void main(final String[] args) throws Exception { PropertyConfigurator.configure("log4j.properties"); AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(new String[] { "spring-jms.xml" }); // ctx.registerShutdownHook(); IApmgtMessageProducer producer = (IApmgtMessageProducer)ctx.getBean("producer"); ApmgtModPasswordRequest messageData = new ApmgtModPasswordRequest(); messageData.setMessageHeader(new MessageHeader()); messageData.setMessageContent(new ModPasswordRequest()); messageData.init(); producer.sendMessage(messageData); } }
还有两个配置文件,第一个spring-jms.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="[url]http://www./schema/beans"[/url] xmlns:xsi="[url]http://www./2001/XMLSchema-instance"[/url]
xsi:schemaLocation="[url]http://www./schema/beans[/url]
[url]http://www./schema/beans/spring-beans.xsd">[/url]
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>apmgt.properties</value> </list> </property> </bean>
<!-- ####################################### --> <!-- JMS Spring Beans --> <!-- ####################################### -->
<!-- Jms ConnectionFactory --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${jms.brokerURL}" /> </bean>
<!-- Spring JMS SimpleConverter --> <bean id="simpleConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
<!-- JMS Queue Template --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate102"> <property name="connectionFactory" ref="connectionFactory"/> <property name="timeToLive" value="${jms.timeToLive}"/> <property name="defaultDestinationName" value="${jms.destinationName.cmpp}" /> <property name="messageConverter" ref="messageConverter" /> <property name="receiveTimeout" value="${jms.receiveTimeout}" /> </bean>
<!-- Message Converter --> <bean id="messageConverter" class="com.liangj.apmgt.jms.ApmgtMessageConverter"> <property name="converter"> <ref local="simpleConverter" /> </property> </bean>
<!-- Message porducer --> <bean id="producer" class="com.liangj.apmgt.jms.DefaultApmgtMessageProducer"> <property name="jmsTemplate" ref="jmsTemplate" /> </bean>
<!-- this is the Message Driven POJO (MDP) --> <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg> <bean class="com.liangj.apmgt.jms.DefaultApmgtMessageListener" /> </constructor-arg> <property name="defaultListenerMethod" value="onMessage" /> <property name="messageConverter" ref="messageConverter" /> </bean>
<!-- and this is the attendant message listener container --> <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destinationName" value="${jms.destinationName.cmpp}" /> <property name="messageSelector" value="${jms.messageSelector}" /> <property name="messageListener" ref="messageListener" /> </bean> </beans>
apmgt.properties
#jms properties jms.brokerURL=tcp://localhost:61616 jms.receiveTimeout=3000 jms.destinationName.cmpp=cmpp jms.messageSelector=receiver=2 #one day is 86400000 ms. 0 is means that it lives forever. jms.timeToLive=86400000
最后后还有源代码,希望对大家有帮助,写了2个小时,真累 ActiveMQ和Tomcat结合使用,网上有很多文章了,这里就不说了,over
附件:
test.zip (2 K)
附件:
test.zip (2 K)
附件:
midlist.html (33 K)
附件:
逐个显示图片.zip (161 K)
附件:
lightbox2.02.zip (71 K)
附件:
iecn_code.html (2 K)
附件:
iecn_code.html (1 K)
附件:
apmgt.zip (23 K)
|