前记:目前学习还比较杂乱,还未找到系统化地学习ActiveMq的方法。在网上看到消息持久化的demo,了解了一下,在此记录。
一、目前ActiveMq支持的持久化方法 url:http://activemq./persistence.html 3、KahaDB 4、JDBC 配合其自带的 high performance journal;根据官方说法,它内置的高性能journal的工作类似于在缓存层工作,消息会优先写入到journal,后台的定时任务会每隔一段时间间隔去 查看需要写入到jdbc的消息。 二、配置activemq.xml 1、修改persistenceAdapter <persistenceAdapter> <!-- <kahaDB directory="${activemq.data}/kahadb"/>--> <jdbcPersistenceAdapter dataSource="#my-ds"/> </persistenceAdapter> 上面我们注释了默认的kahadb,添加了jdbc数据源。 2、增加数据源 <bean id="my-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <property name="url" value="jdbc:mysql://192.168.2.140:3306/activemq?characterEncoding=utf-8" /> <property name="username" value="root" /> <property name="password" value="123456" /> <property name="initialSize" value="5" /> <property name="maxTotal" value="100" /> <property name="maxIdle" value="30" /> <property name="maxWaitMillis" value="10000" /> <property name="minIdle" value="1" /> </bean> 这边有几个注意点: 2.1:上面配置的数据源类对应于dbcp2,如果1.x系列的jar包会报错,classNotFound异常。 2.2:在主安装目录的lib目录下,将mysql的jar包、dbcp的jar包放到该路径下 /usr/local/apache-activemq-5.14.4/lib 2.3:建议用bin/activemq console方式启动,可以及时查看错误信息。我在启动时,报以下错误: WARN | Failure Details: Table 'xckk_star_act.ACTIVEMQ_ACKS' doesn't exist com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'xckk_star_act.ACTIVEMQ_ACKS' doesn't exist at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)[:1.8.0_121] at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)[:1.8.0_121] at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)[:1.8.0_121] at java.lang.reflect.Constructor.newInstance(Constructor.java:423)[:1.8.0_121] at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)[mysql-connector-java-5.1.38.jar:5.1.38] at com.mysql.jdbc.Util.getInstance(Util.java:387)[mysql-connector-java-5.1.38.jar:5.1.38] at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:939)[mysql-connector-java-5.1.38.jar:5.1.38] at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3878)[mysql-connector-java-5.1.38.jar:5.1.38] at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3814)[mysql-connector-java-5.1.38.jar:5.1.38] at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2478)[mysql-connector-java-5.1.38.jar:5.1.38] at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2625)[mysql-connector-java-5.1.38.jar:5.1.38] at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2551)[mysql-connector-java-5.1.38.jar:5.1.38] at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861)[mysql-connector-java-5.1.38.jar:5.1.38] at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2073)[mysql-connector-java-5.1.38.jar:5.1.38] at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2009)[mysql-connector-java-5.1.38.jar:5.1.38] at com.mysql.jdbc.PreparedStatement.executeLargeUpdate(PreparedStatement.java:5094)[mysql-connector-java-5.1.38.jar:5.1.38] at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:1994)[mysql-connector-java-5.1.38.jar:5.1.38] at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98)[commons-dbcp2-2.1.1.jar:2.1.1] at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98)[commons-dbcp2-2.1.1.jar:2.1.1] at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doDeleteOldMessages(DefaultJDBCAdapter.java:832)[activemq-jdbc-store-5.14.4.jar:5.14.4] at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.cleanup(JDBCPersistenceAdapter.java:349)[activemq-jdbc-store-5.14.4.jar:5.14.4] at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter$3.run(JDBCPersistenceAdapter.java:327)[activemq-jdbc-store-5.14.4.jar:5.14.4] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)[:1.8.0_121] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)[:1.8.0_121] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)[:1.8.0_121] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)[:1.8.0_121] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121] at java.lang.Thread.run(Thread.java:745)[:1.8.0_121] 在网上查询结果,原来是数据库需要字符集设置为latin1.
于是,新建了一个数据库,字符集设为Latin1.然后启动ActiveMq,查看数据库,发现多了三张表: 在这边将结构导出来,供手动建表: /* SQLyog v10.2 MySQL - 5.1.71 : Database - activemq ********************************************************************* */ /*!40101 SET NAMES utf8 */; /*!40101 SET SQL_MODE=''*/; /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */; /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; CREATE DATABASE /*!32312 IF NOT EXISTS*/`activemq` /*!40100 DEFAULT CHARACTER SET latin1 COLLATE latin1_bin */; USE `activemq`; /*Table structure for table `ACTIVEMQ_ACKS` */ DROP TABLE IF EXISTS `ACTIVEMQ_ACKS`; CREATE TABLE `ACTIVEMQ_ACKS` ( `CONTAINER` varchar(250) COLLATE latin1_bin NOT NULL, `SUB_DEST` varchar(250) COLLATE latin1_bin DEFAULT NULL, `CLIENT_ID` varchar(250) COLLATE latin1_bin NOT NULL, `SUB_NAME` varchar(250) COLLATE latin1_bin NOT NULL, `SELECTOR` varchar(250) COLLATE latin1_bin DEFAULT NULL, `LAST_ACKED_ID` bigint(20) DEFAULT NULL, `PRIORITY` bigint(20) NOT NULL DEFAULT '5', `XID` varchar(250) COLLATE latin1_bin DEFAULT NULL, PRIMARY KEY (`CONTAINER`,`CLIENT_ID`,`SUB_NAME`,`PRIORITY`), KEY `ACTIVEMQ_ACKS_XIDX` (`XID`) ) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin; /*Table structure for table `ACTIVEMQ_LOCK` */ DROP TABLE IF EXISTS `ACTIVEMQ_LOCK`; CREATE TABLE `ACTIVEMQ_LOCK` ( `ID` bigint(20) NOT NULL, `TIME` bigint(20) DEFAULT NULL, `BROKER_NAME` varchar(250) COLLATE latin1_bin DEFAULT NULL, PRIMARY KEY (`ID`) ) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin; /*Table structure for table `ACTIVEMQ_MSGS` */ DROP TABLE IF EXISTS `ACTIVEMQ_MSGS`; CREATE TABLE `ACTIVEMQ_MSGS` ( `ID` bigint(20) NOT NULL, `CONTAINER` varchar(250) COLLATE latin1_bin DEFAULT NULL, `MSGID_PROD` varchar(250) COLLATE latin1_bin DEFAULT NULL, `MSGID_SEQ` bigint(20) DEFAULT NULL, `EXPIRATION` bigint(20) DEFAULT NULL, `MSG` longblob, `PRIORITY` bigint(20) DEFAULT NULL, `XID` varchar(250) COLLATE latin1_bin DEFAULT NULL, PRIMARY KEY (`ID`), KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`), KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`), KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`), KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`), KEY `ACTIVEMQ_MSGS_XIDX` (`XID`) ) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin; /*!40101 SET SQL_MODE=@OLD_SQL_MODE */; /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */; /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */; /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
三、测试消息持久化 package com.ckl.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class HelloActiveMQ { public static void main(String[] args) throws Exception { HelloWorldProducer producer = new HelloWorldProducer(); HelloWorldConsumer consumer = new HelloWorldConsumer(); Thread threadProducer = new Thread(producer); threadProducer.start(); //注释掉消费者,不然的话,马上消费者马上把消息消费了,来不及持久化 // Thread threadConsumer = new Thread(consumer); // threadConsumer.start(); } public static class HelloWorldProducer implements Runnable { public void run() { try { // Create a ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.2.140:61616"); // Create a Connection Connection connection = connectionFactory.createConnection(); connection.start(); // Create a Session Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) Destination destination = session.createQueue("DemoQueue"); // Create a MessageProducer from the Session to the Topic or Queue MessageProducer producer = session.createProducer(destination); // producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); producer.setDeliveryMode(DeliveryMode.PERSISTENT);//消息需要持久化 // Create a messages String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); TextMessage message = session.createTextMessage(text); // Tell the producer to send the message System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName()); producer.send(message); // Clean up session.close(); connection.close(); } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } } } public static class HelloWorldConsumer implements Runnable, ExceptionListener { public void run() { try { // Create a ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://192.168.2.140:61616"); // Create a Connection Connection connection = connectionFactory.createConnection(); connection.start(); connection.setExceptionListener(this); // Create a Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) Destination destination = session.createQueue("DemoQueue"); // Create a MessageConsumer from the Session to the Topic or // Queue MessageConsumer consumer = session.createConsumer(destination); // Wait for a message Message message = consumer.receive(1000); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Received: " + text); } else { System.out.println("Received: " + message); } consumer.close(); session.close(); connection.close(); } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } } public synchronized void onException(JMSException ex) { System.out.println("JMS Exception occured. Shutting down client."); } } } 运行后,效果如下: 四、单独启动消费者,查看数据库 注释掉上面的生产者,单独启动消费者后,发现数据库中的消息已被消费。 五、备注 Although the JDBC Store does not offer the best performance, it makes fairly simple to create a simple Master-Slave robust broker setup. 看到一段资料补充下,涉及到ActiveMq配置为主从模式时,都会去尝试连接该数据源,为了防止资源争用出现的问题,加了ACTIVEMQ_LOCK表。
|
|
来自: WindySky > 《ActiveMQ》