分享

ActiveMQ持久化消息的三种方式 【详细配置讲解】

 CevenCheng 2011-10-27

利用消息队列的异步策略,可以从很大程序上缓解程序的压力,但是,如果MQ所在的机器down机了,又如果队列中的数据不是持久的就会发生数据丢失,后果是可想而知的, 所以消息的持久化是不可不讨论的话题。

 

   1) 关于ActiveMQ消息队列的持久化,主要是在ActiveMQ的配置文件中设置(看粗体部分).

 

Java代码  收藏代码
  1. <!--  
  2.     Licensed to the Apache Software Foundation (ASF) under one or more  
  3.     contributor license agreements.  See the NOTICE file distributed with  
  4.     this work for additional information regarding copyright ownership.  
  5.     The ASF licenses this file to You under the Apache License, Version 2.0  
  6.     (the "License"); you may not use this file except in compliance with  
  7.     the License.  You may obtain a copy of the License at  
  8.      
  9.     http://www./licenses/LICENSE-2.0  
  10.      
  11.     Unless required by applicable law or agreed to in writing, software  
  12.     distributed under the License is distributed on an "AS IS" BASIS,  
  13.     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
  14.     See the License for the specific language governing permissions and  
  15.     limitations under the License.  
  16. -->  
  17. <!-- START SNIPPET: example -->  
  18. <beans  
  19.   xmlns="http://www./schema/beans"  
  20.   xmlns:amq="http://activemq./schema/core"  
  21.   xmlns:xsi="http://www./2001/XMLSchema-instance"  
  22.   xsi:schemaLocation="http://www./schema/beans http://www./schema/beans/spring-beans-2.0.xsd  
  23.   http://activemq./schema/core http://activemq./schema/core/activemq-core.xsd     
  24.   http://activemq./camel/schema/spring http://activemq./camel/schema/spring/camel-spring.xsd">  
  25.   
  26.     <!-- Allows us to use system properties as variables in this configuration file -->  
  27.     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
  28.          <property name="locations">  
  29.             <value>file:///${activemq.base}/conf/credentials.properties</value>  
  30.          </property>        
  31.     </bean>  
  32.   
  33.     <broker xmlns="http://activemq./schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data">  
  34.   
  35.         <!-- Destination specific policies using destination names or wildcards -->  
  36.         <destinationPolicy>  
  37.             <policyMap>  
  38.                 <policyEntries>  
  39.                     <policyEntry queue=">" memoryLimit="5mb"/>  
  40.                     <policyEntry topic=">" memoryLimit="5mb">  
  41.                       <!-- you can add other policies too such as these  
  42.                         <dispatchPolicy>  
  43.                             <strictOrderDispatchPolicy/>  
  44.                         </dispatchPolicy>  
  45.                         <subscriptionRecoveryPolicy>  
  46.                             <lastImageSubscriptionRecoveryPolicy/>  
  47.                         </subscriptionRecoveryPolicy>  
  48.                       -->  
  49.                     </policyEntry>  
  50.                 </policyEntries>  
  51.             </policyMap>  
  52.         </destinationPolicy>  
  53.   
  54.         <!-- Use the following to configure how ActiveMQ is exposed in JMX -->  
  55.         <managementContext>  
  56.             <managementContext createConnector="false"/>  
  57.         </managementContext>  
  58.   
  59.         <!-- The store and forward broker networks ActiveMQ will listen to -->  
  60.         <networkConnectors>  
  61.             <!-- by default just auto discover the other brokers -->  
  62.             <networkConnector name="default-nc" uri="multicast://default"/>  
  63.             <!-- Example of a static configuration:  
  64.             <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/>  
  65.             -->  
  66.         </networkConnectors>  
  67.   
  68.         <persistenceAdapter>  
  69.             <amqPersistenceAdapter syncOnWrite="false" directory="${activemq.base}/data" maxFileLength="20 mb"/>  
  70.         </persistenceAdapter>  
  71.   
  72.         <!-- Use the following if you wish to configure the journal with JDBC -->  
  73.         <!--  
  74.         <persistenceAdapter>  
  75.             <journaledJDBC dataDirectory="${activemq.base}/data" dataSource="#postgres-ds"/>  
  76.         </persistenceAdapter>  
  77.         -->  
  78.   
  79.         <!-- Or if you want to use pure JDBC without a journal -->  
  80.           
  81.        <span style="color: #ff6600;"> <span style="color: #000000;"><persistenceAdapter>  
  82.             <jdbcPersistenceAdapter dataSource="#mysql-ds"/>  
  83.         </persistenceAdapter></span></span>  
  84.        
  85.   
  86.         <sslContext>  
  87.             <sslContext keyStore="file:${activemq.base}/conf/broker.ks" keyStorePassword="password" trustStore="file:${activemq.base}/conf/broker.ts" trustStorePassword="password"/>  
  88.         </sslContext>  
  89.           
  90.         <!--  The maximum about of space the broker will use before slowing down producers -->  
  91.         <systemUsage>  
  92.             <systemUsage>  
  93.                 <memoryUsage>  
  94.                     <memoryUsage limit="20 mb"/>  
  95.                 </memoryUsage>  
  96.                 <storeUsage>  
  97.                     <storeUsage limit="1 gb" name="foo"/>  
  98.                 </storeUsage>  
  99.                 <tempUsage>  
  100.                     <tempUsage limit="100 mb"/>  
  101.                 </tempUsage>  
  102.             </systemUsage>  
  103.         </systemUsage>  
  104.   
  105.   
  106.         <!-- The transport connectors ActiveMQ will listen to -->  
  107.         <transportConnectors>  
  108.             <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>  
  109.             <transportConnector name="ssl" uri="ssl://localhost:61617"/>  
  110.             <transportConnector name="stomp" uri="stomp://localhost:61613"/>  
  111.             <transportConnector name="xmpp" uri="xmpp://localhost:61222"/>  
  112.         </transportConnectors>  
  113.   
  114.     </broker>  
  115.   
  116.     <!--  
  117.     ** Lets deploy some Enterprise Integration Patterns inside the ActiveMQ Message Broker  
  118.     ** For more details see  
  119.     **  
  120.     ** http://activemq./enterprise-integration-patterns.html  
  121.     -->  
  122.     <camelContext id="camel" xmlns="http://activemq./camel/schema/spring">  
  123.   
  124.         <!-- You can use a <package> element for each root package to search for Java routes -->  
  125.         <package>org.foo.bar</package>  
  126.   
  127.         <!-- You can use Spring XML syntax to define the routes here using the <route> element -->  
  128.         <route>  
  129.             <from uri="activemq:example.A"/>  
  130.             <to uri="activemq:example.B"/>  
  131.         </route>  
  132.     </camelContext>  
  133.   
  134.     <!--  
  135.     ** Lets configure some Camel endpoints  
  136.     **  
  137.     ** http://activemq./camel/components.html  
  138.     -->  
  139.   
  140.     <!-- configure the camel activemq component to use the current broker -->  
  141.     <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >  
  142.         <property name="connectionFactory">  
  143.           <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
  144.             <property name="brokerURL" value="vm://localhost?create=false&waitForStart=10000" />  
  145.             <property name="userName" value="${activemq.username}"/>  
  146.             <property name="password" value="${activemq.password}"/>  
  147.           </bean>  
  148.         </property>  
  149.     </bean>  
  150.   
  151.   
  152.   
  153.     <!-- Uncomment to create a command agent to respond to message based admin commands on the ActiveMQ.Agent topic -->  
  154.     <!--  
  155.     <commandAgent xmlns="http://activemq./schema/core" brokerUrl="vm://localhost" username="${activemq.username}" password="${activemq.password}"/>  
  156.     -->  
  157.   
  158.   
  159.     <!-- An embedded servlet engine for serving up the Admin console -->  
  160.     <jetty xmlns="http:///schemas/jetty/1.0">  
  161.         <connectors>  
  162.             <nioConnector port="8161"/>  
  163.         </connectors>  
  164.   
  165.         <handlers>  
  166.             <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/>  
  167.             <webAppContext contextPath="/demo" resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/>  
  168.             <webAppContext contextPath="/fileserver" resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/>  
  169.         </handlers>  
  170.     </jetty>  
  171.   
  172.     <!--  This xbean configuration file supports all the standard spring xml configuration options -->  
  173.   
  174.     <!-- Postgres DataSource Sample Setup -->  
  175.     <!--  
  176.     <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource">  
  177.       <property name="serverName" value="localhost"/>  
  178.       <property name="databaseName" value="activemq"/>  
  179.       <property name="portNumber" value="0"/>  
  180.       <property name="user" value="activemq"/>  
  181.       <property name="password" value="activemq"/>  
  182.       <property name="dataSourceName" value="postgres"/>  
  183.       <property name="initialConnections" value="1"/>  
  184.       <property name="maxConnections" value="10"/>  
  185.     </bean>  
  186.     -->  
  187.   
  188.     <!-- MySql DataSource Sample Setup -->  
  189.       
  190. <span style="color: #000000;">   </span><span style="color: #000000;"> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  
  191.       <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  
  192.       <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>  
  193.       <property name="username" value="root"/>  
  194.       <property name="password" value=""/>  
  195.       <property name="maxActive" value="200"/>  
  196.       <property<strong> </strong>name="poolPreparedStatements" value="true"/>  
  197.     </bean></span>  
  198.    
  199.       
  200.     <!-- Oracle DataSource Sample Setup -->  
  201.     <!--  
  202.     <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  
  203.       <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>  
  204.       <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>  
  205.       <property name="username" value="scott"/>  
  206.       <property name="password" value="tiger"/>  
  207.       <property name="maxActive" value="200"/>  
  208.       <property name="poolPreparedStatements" value="true"/>  
  209.     </bean>  
  210.     -->  
  211.   
  212.     <!-- Embedded Derby DataSource Sample Setup -->  
  213.     <!--  
  214.     <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">  
  215.       <property name="databaseName" value="derbydb"/>  
  216.       <property name="createDatabase" value="create"/>  
  217.     </bean>  
  218.     -->  
  219.   
  220. </beans>  
  221. <!-- END SNIPPET: example -->  

 

    改动部分主要是设置了mysql的datasource声明, 还有就是采用mysql作为persistenceAdapter,并声明如下。

 

Java代码  收藏代码
  1. <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  
  2.     <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  
  3.     <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>  
  4.     <property name="username" value="root"/>  
  5.     <property name="password" value=""/>  
  6.     <property name="maxActive" value="200"/>  
  7.     <property name="poolPreparedStatements" value="true"/>  
  8.   </bean>  
  9.   
  10.   
  11.   
  12. <persistenceAdapter>  
  13.           <jdbcPersistenceAdapter dataSource="#mysql-ds"/>  
  14. </persistenceAdapter>  

 

  

 

 

 

 2) 把数据库的驱动放入ActiveMQ的lib中,使其能够访问相应的数据库,关于数据库的表结构,ActiveMQ会自动创建,但是前提是当ActiveMQ启动以后,声明的数据库要是存在的。

    测试的时候发现以上条件都满足ActiveMQ还是会抛异常,看了一下异常,是有一张表(activemq_acks)创建的时候出了问题,自己手动创建后好了,把表结构列出来。

 

Java代码  收藏代码
  1. -- phpMyAdmin SQL Dump  
  2. -- version 2.10.2  
  3. -- http://www.phpmyadmin.net  
  4. --   
  5. -- 主机: localhost  
  6. -- 生成日期: 2009 年 11 月 06 日 05:29  
  7. -- 服务器版本: 5.0.45  
  8. -- PHP 版本: 5.2.3  
  9.   
  10. SET SQL_MODE="NO_AUTO_VALUE_ON_ZERO";  
  11.   
  12. --   
  13. -- 数据库: `activemq`  
  14. --   
  15.   
  16. -- --------------------------------------------------------  
  17.   
  18. --   
  19. -- 表的结构 `activemq_acks`  
  20. --   
  21.   
  22. CREATE TABLE `activemq_acks` (  
  23.   `SUB` varchar(250) collate utf8_bin NOT NULL,  
  24.   `CONTAINER` varchar(250) collate utf8_bin NOT NULL,  
  25.   `LAST_ACKED_ID` int(11default NULL,  
  26.   `SE_ID` int(11default NULL,  
  27.   `SE_CLIENT_ID` varchar(250) collate utf8_bin default NULL,  
  28.   `SE_CONSUMER_NAME` varchar(250) collate utf8_bin default NULL,  
  29.   `SE_SELECTOR` varchar(250) collate utf8_bin default NULL  
  30. ) ENGINE=MyISAM DEFAULT CHARSET=utf8 COLLATE=utf8_bin;  
  31.   
  32. --   
  33. -- 导出表中的数据 `activemq_acks`  
  34. --   
  35.   
  36.   
  37. -- --------------------------------------------------------  
  38.   
  39. --   
  40. -- 表的结构 `activemq_lock`  
  41. --   
  42.   
  43. CREATE TABLE `activemq_lock` (  
  44.   `ID` bigint(20) NOT NULL,  
  45.   `TIME` bigint(20default NULL,  
  46.   `BROKER_NAME` varchar(250) collate utf8_bin default NULL,  
  47.   PRIMARY KEY  (`ID`)  
  48. ) ENGINE=MyISAM DEFAULT CHARSET=utf8 COLLATE=utf8_bin;  
  49.   
  50. --   
  51. -- 导出表中的数据 `activemq_lock`  
  52. --   
  53.   
  54. INSERT INTO `activemq_lock` (`ID`, `TIME`, `BROKER_NAME`) VALUES   
  55. (11257485355546, NULL);  
  56.   
  57. -- --------------------------------------------------------  
  58.   
  59. --   
  60. -- 表的结构 `activemq_msgs`  
  61. --   
  62.   
  63. CREATE TABLE `activemq_msgs` (  
  64.   `ID` int(11) NOT NULL,  
  65.   `CONTAINER` varchar(250) collate utf8_bin default NULL,  
  66.   `MSGID_PROD` varchar(250) collate utf8_bin default NULL,  
  67.   `MSGID_SEQ` int(11default NULL,  
  68.   `EXPIRATION` bigint(20default NULL,  
  69.   `MSG` longblob,  
  70.   PRIMARY KEY  (`ID`),  
  71.   KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`),  
  72.   KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`),  
  73.   KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`)  
  74. ) ENGINE=MyISAM DEFAULT CHARSET=utf8 COLLATE=utf8_bin;  
  75.   
  76. --   
  77. -- 导出表中的数据 `activemq_msgs`  
  78. --   

 

当消息发送至ActiveMQ时,数据就被持久化到mysql了,如果消息被消费,数据会自动被删除,down机后重启没影响,有一点不好的是,这个有点拖数据库,我在本地的mysql,一开启ActiveMQ, 数据库就会变得很慢,不过这个只是在本地的机子上,想必实际应用时应该好很多。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约