Spring+ActiveMQ+Mysql 配置JMS 一、准备一个可以运行的Spring环境 二、下载ActiveMQ (下载地址) 2.1 先确保ActiveMQ运行正常,直接运行 安装目录\bin\activemq.bat即可, 注意:如果要以服务方式运行的话,可以使用ActiveMQ 提供的工具 安装目录\bin\win32\InstallService.bat 确保以管理员方式运行 可以打开链接, (http://localhost:8161/admin)查看是否安装成功 三、试用 单独编写消息发送者和消息接受以测试相应 消息发送 public static void main(String[] args) throws JMSException { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); // JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一个发送或接收消息的线程 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. // 获取session注意参数值Queue.Name是Query的名字 Destination destination = session.createQueue("[color=red]Queue.Name[/color]"); // MessageProducer:消息生产者 MessageProducer producer = session.createProducer(destination); // 设置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 发送一条消息 sendMsg(session, producer); session.commit(); connection.close(); } /** * 在指定的会话上,通过指定的消息生产者发出一条消息 * * @param session 消息会话 * @param producer 消息生产者 */ public static void sendMsg(Session session, MessageProducer producer) throws JMSException { // 创建一条文本消息 TextMessage message = session.createTextMessage("Hello ActiveMQ!"); // 通过消息生产者发出消息 producer.send(message); System.out.println(""); } 消息接收 public static void main(String[] args) throws JMSException { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); // JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一个发送或接收消息的线程 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 Destination destination = session.createQueue("Queue.Name"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); while(true) { TextMessage message = (TextMessage) consumer.receive(1000); if(null != message) System.out.println("收到消息:" + message.getText()); else break; } session.close(); connection.close(); } 开启你的ActiveMQ服务器,测试一下吧。发送一个消息,然后看看接收到的成果 四、 Spring 注入 spring application.xml 文件配置 <!-- 配置JMS消息发送 --> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616</value> </property> </bean> </property> </bean> <!-- Spring JMS Template --> <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <ref local="jmsFactory" /> </property> </bean> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0"> <value>Queue.Name</value> </constructor-arg> </bean> <bean id="sender" class="demo.JmsQueueSender"> <property name="jmsTemplate" ref="myJmsTemplate"></property> </bean> <bean id="receive" class="demo.JmsQueueReceiver"></bean> <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsFactory"></property> <property name="messageListener" ref="receive"></property> <property name="destination" ref="destination" /> </bean> <!-- 配置JMS消息发送完成 --> 注意这里需要几个包 ,activeio-core-3.1.2.jar,activemq-all-5.5.0.jar,activemq-pool-5.5.0.jar,commons-pool-1.5.6.jar 剩下的就是在你的程序里面添加相应的消息发送和接收程序了 sender @Component public class JmsQueueSender { private JmsTemplate jmsTemplate; public void setConnectionFactory(ConnectionFactory cf) { this.jmsTemplate = new JmsTemplate(cf); } public void simpleSend() { jmsTemplate.convertAndSend("Queue.Name", "test!!!"); } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } } receiver @Component public class JmsQueueReceiver implements MessageListener { @Override public void onMessage(Message message) { if(message instanceof TextMessage) { final TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); } catch(final JMSException e) { e.printStackTrace(); } } } } 五、配置ActiveMQ以数据库的方式存储消息 ActiveMQ安装目录\conf\activemq.xml 找到 <broker>标签中的内容 <persistenceAdapter> <kahaDB directory="${activemq.base}/data/kahadb"/> </persistenceAdapter> 注释掉以上内容,添加自己的数据库配置 <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> 配置以Mysql的方式保存消息 在<broker>标签以外的地方添加数据源 <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="poolPreparedStatements" value="true"/> </bean> 将Mysql的包加到ActiveMQ的启动Lib下 在Mysql数据中新建数据库 activemq ,ActiveMQ在启动的时候会自动建表。 OK 。。。。 重新启动服务。 这样消息的发送者的消息将被保存到Mysql数据库,同时消息消耗者每读取一条消息。数据库中的消息也会相应的删除。
|
|