分享

ActiveMQ使用笔记

 开心豆豆2010 2014-03-28

(一)ActiveMQ的安装

安装要求:

部署需要jdk1.5及以上,编译需要jdk1.5(java5)及以上

Java的环境变量(JAVA_HOME)必须设置,即jdk安装的目录,比如c:\Program Files\jsdk.1.6

下载ActiveMQ:http://activemq./download.html

解压,如图:

目录结构

运行bin文件夹下的activemq.bat,出现如下图所示:

运行

验证是否运行成功:

在浏览器中输入:http://localhost:8161/admin/,出现如下图所示表示成功:

控制台

此时,ActiveMQ已经安装完成了,接下来配置登录监视控制台的用户名和密码。

打开conf文件夹下的jetty.xml,找到

1
2
3
4
5
6
7
8
9
    <beanid="securityConstraint">
        <property name="name"value="BASIC"/>
        <property name="roles"value="admin"/>
        <property name="authenticate"value="false"/>
    </bean>

把authenticate属性的值改成true即可,重启activemq.bat,再登录监视控制台,就需要输入密码了,默认的用户名和密码是admin/admin。roles属性指的是登录的用户角色,这些登录的用户在jetty-realm.properties配置。

修改web的访问端口,在jetty.xml找到一下配置,修改8161即可。

1
2
3
4
5
6
7
        <property name="connectors">
           <list>
               <beanid="Connector"class="org.eclipse.jetty.server.nio.SelectChannelConnector">
                   <propertyname="port"value="8161"/>
               </bean>
           </list>
        </property>

ActiveMQ的运行日志存放在data文件夹下的activemq.log中。

Linux和Aix系统下的安装:

解压:tar zxvf activemq-x.x.x.tar.gz,进入bin文件夹,运行:./activemq start &,也可以只运行:./activemq console。

验证方式和安全性配置和windows下的配置一样。

 

(二)ActiveMQ消息持久化(1)

在broker中设置属性persistent=”true”(默认是true),同时发送的消息也应该是persitent类型的。ActiveMQ消息持久化有三种方式:AMQ、KahaDB、JDBC。

1、AMQ

AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32兆,如果一条消息的大小超过了32兆,那么这个值必须设置大点。当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。默认配置如下:

Java
1
2
3
    <persistenceAdapter>
     <amqPersistenceAdapterdirectory="activemq-data"maxFileLength="32mb"/>
    </persistenceAdapter>

AMQ的属性:

 

属性名称 默认值 描述
directory activemq-data 消息文件和日志的存储目录
useNIO true 使用NIO协议存储消息
syncOnWrite false 同步写到磁盘,这个选项对性能影响非常大
maxFileLength 32mb 一个消息文件的大小
persistentIndex true 消息索引的持久化,如果为false,那么索引保存在内存中
maxCheckpointMessageAddSize 4kb 一个事务允许的最大消息量
cleanupInterval 30000 清除操作周期,单位ms
indexBinSize 1024 索引文件缓存页面数,缺省为1024,当amq扩充或者缩减存储时,会锁定整个broker,导致一定时间的阻塞,所以这个值应该调整到比较大,但是代码中实现会动态伸缩,调整效果并不理想。
indexKeySize 96 索引key的大小,key是消息ID
indexPageSize 16kb 索引的页大小
directoryArchive archive 存储被归档的消息文件目录
archiveDataLogs false 当为true时,归档的消息文件被移到directoryArchive,而不是直接删除

(二)ActiveMQ消息持久化(2)

2、KahaDB

KahaDB是基于文件的本地数据库储存形式,虽然没有AMQ的速度快,但是它具有强扩展性,恢复的时间比AMQ短,从5.4版本之后KahaDB做为默认的持久化方式。默认配置如下:

Java
1
2
3
    <persistenceAdapter>
       <kahaDBdirectory="activemq-data"journalMaxFileLength="32mb"/>
    </persistenceAdapter>

KahaDB的属性:

 

property name default value Comments
directory activemq-data 消息文件和日志的存储目录
indexWriteBatchSize 1000 一批索引的大小,当要更新的索引量到达这个值时,更新到消息文件中
indexCacheSize 10000 内存中,索引的页大小
enableIndexWriteAsync false 索引是否异步写到消息文件中
journalMaxFileLength 32mb 一个消息文件的大小
enableJournalDiskSyncs true 是否讲非事务的消息同步写入到磁盘
cleanupInterval 30000 清除操作周期,单位ms
checkpointInterval 5000 索引写入到消息文件的周期,单位ms
ignoreMissingJournalfiles false 忽略丢失的消息文件,false,当丢失了消息文件,启动异常
checkForCorruptJournalFiles false 检查消息文件是否损坏,true,检查发现损坏会尝试修复
checksumJournalFiles false 产生一个checksum,以便能够检测journal文件是否损坏。
5.4版本之后有效的属性:    
archiveDataLogs false 当为true时,归档的消息文件被移到directoryArchive,而不是直接删除
directoryArchive null 存储被归档的消息文件目录
databaseLockedWaitDelay 10000 在使用负载时,等待获得文件锁的延迟时间,单位ms
maxAsyncJobs 10000 同个生产者产生等待写入的异步消息最大量
concurrentStoreAndDispatchTopics false 当写入消息的时候,是否转发主题消息
concurrentStoreAndDispatchQueues true 当写入消息的时候,是否转发队列消息
5.6版本之后有效的属性:    
archiveCorruptedIndex false 是否归档错误的索引

从5.6版本之后,有可能发布通过多个kahadb持久适配器来实现分布式目标队列存储。什么时候用呢?如果有一个快速的生产者和消费者,当某一个时刻生产者发生了不规范的消费,那么有可能产生一条消息被存储在两个消息文件中,同时,有些目标队列是危险的并且要求访问磁盘。在这种情况下,你应该用通配符来使用mKahaDB。如果目标队列是分布的,事务是可以跨越多个消息文件的。

每个KahaDB的实例都可以配置单独的适配器,如果没有目标队列提交给filteredKahaDB,那么意味着对所有的队列有效。如果一个队列没有对应的适配器,那么将会抛出一个异常。配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<persistenceAdapter>
 <mKahaDBdirectory="${activemq.base}/data/kahadb">
    <filteredPersistenceAdapters>
     <!--matchallqueues-->
      <filteredKahaDB queue=">">
       <persistenceAdapter>
          <kahaDB journalMaxFileLength="32mb"/>
       </persistenceAdapter>
      </filteredKahaDB>
     
      <!--matchalldestinations-->
     <filteredKahaDB>
        <persistenceAdapter>
         <kahaDBenableJournalDiskSyncs="false"/>
        </persistenceAdapter>
     </filteredKahaDB>
    </filteredPersistenceAdapters>
 </mKahaDB>
</persistenceAdapter>

如果filteredKahaDB的perDestination属性设置为true,那么匹配的目标队列将会得到自己对应的KahaDB实例。配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
<persistenceAdapter>
 <mKahaDBdirectory="${activemq.base}/data/kahadb">
    <filteredPersistenceAdapters>
     <!--kahaDBperdestinations-->
      <filteredKahaDB perDestination="true">
       <persistenceAdapter>
          <kahaDB journalMaxFileLength="32mb"/>
       </persistenceAdapter>
      </filteredKahaDB>
   </filteredPersistenceAdapters>
  </mKahaDB>
</persistenceAdapter>

(二)ActiveMQ消息持久化(3)

3、JDBC

配置JDBC适配器:

1
2
3
    <persistenceAdapter>
       <jdbcPersistenceAdapterdataSource="#mysql-ds"createTablesOnStartup="false"/>
    </persistenceAdapter>

dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。

MYSQL持久化bean

1
2
3
4
5
6
7
  <beanid="mysql-ds"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close">
   <propertyname="driverClassName"value="com.mysql.jdbc.Driver"/>
    <propertyname="url"value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
   <propertyname="username"value="activemq"/>
    <propertyname="password"value="activemq"/>
   <propertyname="poolPreparedStatements"value="true"/>
  </bean>

SQL Server持久化bean

1
2
3
4
5
6
7
<beanid="mssql-ds"class="net.sourceforge.jtds.jdbcx.JtdsDataSource"destroy-method="close">
  <propertyname="serverName"value="SERVERNAME"/>
   <propertyname="portNumber"value="PORTNUMBER"/>
  <propertyname="databaseName"value="DATABASENAME"/>
   <propertyname="user"value="USER"/>
  <propertyname="password"value="PASSWORD"/>
</bean>

Oracle持久化bean

1
2
3
4
5
6
7
8
  <beanid="oracle-ds"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close">
   <propertyname="driverClassName"value="oracle.jdbc.driver.OracleDriver"/>
    <propertyname="url"value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/>
   <propertyname="username"value="activemq"/>
    <propertyname="password"value="activemq"/>
   <propertyname="maxActive"value="200"/>
    <propertyname="poolPreparedStatements"value="true"/>
 </bean>

DB2持久化bean

1
2
3
4
5
6
7
8
  <beanid="db2-ds"class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
     <propertyname="driverClassName"value="com.ibm.db2.jcc.DB2Driver"/>
      <property name="url"value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/>
     <propertyname="username"value="activemq"/>
      <property name="password"value="activemq"/>
     <propertyname="maxActive"value="200"/>
      <property name="poolPreparedStatements"value="true"/>
 </bean>

(三)ActiveMQ消息发送与接收

配置完了持久化之后,我们就可以使用代码来发送和接收ActiveMQ中的消息了,我这里配置的持久化是KahaDB。
需要导入的jar包:
activemq所需的jar包
一段发送消息的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    publicstaticvoidsend(){
   try{
            // 创建一个连接工厂
           Stringurl="tcp://localhost:61616";
            ActiveMQConnectionFactory connectionFactory=newActiveMQConnectionFactory(url);
           // 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置
            connectionFactory.setUserName("system");
           connectionFactory.setPassword("manager");
            // 创建连接
           Connectionconnection=connectionFactory.createConnection();
           connection.start();
           // 创建Session,参数解释:
           // 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。
           // 第二个参数消息的确认模式:
           // AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。
           // CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息)
           // DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)。
           Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
           // 创建目标,就创建主题也可以创建队列
           Destinationdestination=session.createQueue("test");
           // 创建消息生产者
           MessageProducerproducer=session.createProducer(destination);
           // 设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT
           producer.setDeliveryMode(DeliveryMode.PERSISTENT);
           // 创建消息
           Stringtext="Hello ActiveMQ!";
           TextMessagemessage=session.createTextMessage(text);
           // 发送消息到ActiveMQ
           producer.send(message);
           System.out.println("Message is sent!");
           // 关闭资源
           session.close();
           connection.close();
       }
        catch (Exceptione){
           e.printStackTrace();
        }
   }

执行了上面的发送方法之后,在ActiveMQ的监视控制可以看到有一个test队列,并且有一条消息,如图:

点击队列名test,然后点击消息ID即可查看消息内容,如图:

如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失。
一段接收消息的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
publicstaticvoidget(){
   try{
        String url="tcp://localhost:61616";
       ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(url);
        // 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置
       connectionFactory.setUserName("system");
        connectionFactory.setPassword("manager");
       // 创建连接
        Connection connection=connectionFactory.createConnection();
       connection.start();
        // 创建Session
       Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 创建目标,就创建主题也可以创建队列
       Destinationdestination=session.createQueue("test");
        // 创建消息消费者
       MessageConsumerconsumer=session.createConsumer(destination);
        // 接收消息,参数:接收消息的超时时间,为0的话则不超时,receive返回下一个消息,但是超时了或者消费者被关闭,返回null
       Messagemessage=consumer.receive(1000);
        if (messageinstanceofTextMessage){
           TextMessagetextMessage=(TextMessage)message;
           Stringtext=textMessage.getText();
           System.out.println("Received: "+ text);
        } else{
           System.out.println("Received: "+ message);
        }
       consumer.close();
        session.close();
       connection.close();
    } catch(Exceptione){
       e.printStackTrace();
    }
}

执行了上面的接收方法之后,在ActiveMQ的监视控制可以看到test队列的消息已经被消费了,如图:

这里的代码只是测试用,在正式开发中一般与Spring结合使用jmsTemplate来发送消息,现实JMS的MessageListener来监听消息。

 

(四)ActiveMQ配置安全性

监视ActiveMQ的方式有多种,在第一部分中已经说到了Web监视控制台,设置登录用户名和密码,这里再说一下JMX监控。运行了ActiveMQ之后,再运行jdk自带的jconsole即可以看到ActiveMQ的进程,如图:点击连接之后就可以看到ActiveMQ的运行情况。默认情况下是不需要用户名和口令的,修改activemq.bat,找到

1
2
3
SUNJMX=-Dcom.sun.management.jmxremote.port=1099
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false

修改成

1
2
3
4
5
SUNJMX=-Dcom.sun.management.jmxremote.port=1616
-Dcom.sun.management.jmxremote.authenticate=true
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.password.file=%ACTIVEMQ_BASE%/conf/jmx.password
-Dcom.sun.management.jmxremote.access.file=%ACTIVEMQ_BASE%/conf/jmx.access

Linux下的找到:

1
2
3
4
5
#ACTIVEMQ_SUNJMX_START="-Dcom.sun.management.jmxremote.port=11099 "
#ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.password.file=${ACTIVEMQ_CONFIG_DIR}/jmx.password"
#ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.access.file=${ACTIVEMQ_CONFIG_DIR}/jmx.access"
#ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.ssl=false"
ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote"

去掉注释即可。
重启ActiveMQ之后,在用jconsole连接就需要输入用户名和密码,jmx.access文件配置用户的访问权限readonly和readwrite,admin readwrite表示用户admin具有读写权限。Jmx.password文件配置用户的密码,admin activemq 表示admin用户的密码是activemq。

除了监视台可以设置用户名和密码之后,ActiveMQ也可以对各个主题和队列设置用户名和密码,配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<plugins>
 <!--Configureauthentication;Username,passwordsandgroups-->
  <simpleAuthenticationPlugin>
     <users>
          <authenticationUser username="system"password="manager"groups="users,admins"/>
         <authenticationUserusername="user"password="password"groups="users"/>
          <authenticationUser username="guest"password="password"groups="guests"/>
         <authenticationUserusername="testUser"password="123456"groups="testGroup"/>
      </users>
 </simpleAuthenticationPlugin>
 <!-- Letsconfigureadestinationbasedauthorizationmechanism-->
  <authorizationPlugin>
   <map>
      <authorizationMap>
       <authorizationEntries>
          <authorizationEntry queue="queue.group.uum"read="users"write="users"admin="users"/>
         <authorizationEntryqueue=">"read="admins"write="admins"admin="admins"/>
          <authorizationEntry queue="USERS.>"read="users"write="users"admin="users"/>
         <authorizationEntryqueue="GUEST.>"read="guests"write="guests,users"admin="guests,users"/>
         <authorizationEntryqueue="TEST.Q"read="guests"write="guests"/>
          <authorizationEntry queue="test"read=" testGroup "write=" testGroup "/>
          <authorizationEntry topic=">"read="admins"write="admins"admin="admins"/>
         <authorizationEntrytopic="USERS.>"read="users"write="users"admin="users"/>
          <authorizationEntry topic="GUEST.>"read="guests"write="guests,users"admin="guests,users"/>
          <authorizationEntry topic="ActiveMQ.Advisory.>"read="guests,users ,testGroup"write="guests,users ,testGroup "admin="guests,users ,testGroup "/>
       </authorizationEntries>
      </authorizationMap>
   </map>
  </authorizationPlugin>
</plugins>

simpleAuthenticationPlugin中设置用户名、密码和群组,authorizationPlugin设置主题和队列的访问群组,“>”表示所有的主题或者队列。上面的配置中添加了一个testUser,属于群组testGroup,同时设置test这个队列的访问读写权限为testGroup,当然admins也可以访问的,因为admins是对所有的队列都有访问权限。将第三部分代码中的设置用户名和密码改成刚刚添加的用户testUser,如果密码不正确,将会抛出User name or password is invalid.异常,如果testUser所属的群组不能访问test队列,那么会抛出User guest is not authorized to write to: queue://test异常。需要注意的是所有的群组都需要对以ActiveMQ.Advisory为前缀的主题具有访问权限。

 

(五)ActiveMQ负载均衡

ActiveMQ可以实现多个mq之间进行路由,假设有两个mq,分别为brokerA和brokerB,当有一条消息发送到brokerA的队列test中,有一个客户端连接到brokerB上,并且要求获取test队列的消息时,brokerA中队列test的消息就会路由到brokerB上,反之brokerB的消息也会路由到brokerA。
静态路由配置,brokerA不需要特别的配置,brokerB需要配置networkConnectors节点,具体配置如下:

1
2
3
        <networkConnectors>
           <networkConnectoruri="static:(tcp://localhost:61616)"duplex="true"/>
        </networkConnectors>

静态路由支持failover,如:static:failover://(tcp://host1:61616,tcp://host2:61616)。
动态路由配置,每个mq都需要配置如下:

1
2
3
4
5
6
7
<networkConnectors>
   <networkConnectoruri="multicast://default"/>
</networkConnectors>
<transportConnectors>
   <transportConnectorname="openwire"uri="tcp://0.0.0.0:61618"discoveryUri="multicast://default"/>
</transportConnectors>

注意:networkConnectors需要配置在persistenceAdapter之前。
重启ActiveMQ,可以看到brokerA的日志如图:
networkConnector的属性请参照:http://activemq./networks-of-brokers.html

 

(六)ActiveMQ主备配置

ActiveMQ的主备有三种方式:纯Master/Slave、文件共享方式、数据库共享方式。
1、纯Master/Slave
这种方式的主备不需要对Master Broker做特殊的配置,只要在Slave Broker中指定他的Master就可以了,指定Master有两种方式,最简单的配置就是在broker节点中添加masterConnectorURI=”tcp://localhost:61616″即可,还有一种方式就是添加一个services节点,可以指定连接的用户名和密码,配置如下:

1
2
3
<services>
 <masterConnectorremoteURI="tcp://localhost:61616"userName="system"password="manager"/>
</services>

纯Master/Slave只允许一个Slave连接到Master上面,也就是说只能有2台MQ做集群,同时当Master挂了之后需要停止Slave来恢复负载。
2、数据库共享方式
这种方式的主备采用数据库做消息的持久化,支持多个Slave,所有broker持久化数据源配置成同一个数据源,当一个broker获取的数据库锁之后,其他的broker都成为slave并且等待获取锁,当master挂了之后,其中的一个slave将会立刻获得数据库锁成为master,重启之前挂掉的master之后,这个master也就成了slave,不需要停止slave来恢复。由于采用的是数据库做为持久化,它的性能是有限的。
3、文件共享方式
这种方式的主备具有和数据库共享方式的负载一样的特性,不同的是broker的持久化采用的是文件(我这里用KahaDB),slave等待获取的锁是文件锁,它具有更高的性能,但是需要文件共享系统的支持。
Window下共享KahaDB持久化的目录,配置如下:

1
2
3
<persistenceAdapter>
   <kahaDBdirectory="//172.16.1.202/mqdata/kahadb"/>
</persistenceAdapter>

Linux下需要开启NFS服务,具体操作如下:
创建共享目录(192.168.0.1):
1、 修改etc/exports,添加需要共享的目录:/opt/mq/data *(rw,no_root_squash)
2、 启动NFS服务 service nfs start/restart
3、 查看共享 showmount –e
4、 NFS服务自启动 chkconfig –level 35 nfs on

挂载共享目录(192.168.0.2):
1、 挂载:mount –t nfs 192.168.0.1:/opt/mq/data /opt/mq/data
2、 启动自动挂载:在etc/fstab文件添加10.175.40.244:/opt/mq/data /opt/mq/data nfs defaults 0 0
然后指定KahaDB的持久化目录为/opt/mq/data即可。

AIX系统的文件共享和Linux类似,也是启动NFS服务。
注意:如果Master服务器宕机了,Slave是不会获得文件锁而启动,直到Master服务器重启。
Window下Master上有Slave连接时如图:

客户端连接的brokerURL为failover:(tcp://localhost:61616,tcp://localhost:61617)。用第三部分的代码测试,先向Master Broker发送一个消息,然后关闭master,运行获取消息的方法,即可获取之前发送的消息。

 

(七)ActiveMQ性能优化

1、目标策略

在节点destinationPolicy配置策略,可以对单个或者所有的主题和队列进行设置,使用流量监控,当消息达到memoryLimit的时候,ActiveMQ会减慢消息的产生甚至阻塞,destinationPolicy的配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<destinationPolicy>
   <policyMap>
      <policyEntries>
       <policyEntrytopic=">"producerFlowControl="true"memoryLimit="1mb">
          <pendingSubscriberPolicy>
           <vmCursor/>
          </pendingSubscriberPolicy>
       </policyEntry>
        <policyEntry queue=">"producerFlowControl="true"memoryLimit="1mb">
         <!--UseVMcursor forbetterlatency
              Formoreinformation,see:             
              http://activemq./message-cursors.html             
          <pendingQueuePolicy>
           <vmQueueCursor/>
          </pendingQueuePolicy>
         -->
        </policyEntry>
     </policyEntries>
    </policyMap>
</destinationPolicy>

producerFlowControl表示是否监控流量,默认为true,如果设置为false,消息就会存在磁盘中以防止内存溢出;memoryLimit表示在producerFlowControl=”true”的情况下,消息存储在内存中最大量,当消息达到这个值时,ActiveMQ会减慢消息的产生甚至阻塞。policyEntry的属性参考:http://activemq./per-destination-policies.html

当producer发送的持久化消息到达broker之后,broker首先会把它保存在持久存储中。接下来,如果发现当前有活跃的consumer,如果这个consumer消费消息的速度能跟上producer生产消息的速度,那么ActiveMQ会直接把消息传递给broker内部跟这个consumer关联的dispatch queue;如果当前没有活跃的consumer或者consumer消费消息的速度跟不上producer生产消息的速度,那么ActiveMQ会使用Pending Message Cursors保存对消息的引用。在需要的时候,Pending Message Cursors把消息引用传递给broker内部跟这个consumer关联的dispatch queue。以下是两种Pending Message Cursors:

VM Cursor:在内存中保存消息的引用。

File Cursor:首先在内存中保存消息的引用,如果内存使用量达到上限,那么会把消息引用保存到临时文件中。

在缺省情况下,ActiveMQ 会根据使用的Message Store来决定使用何种类型的Message Cursors,但是你可以根据destination来配置Message Cursors。

对于topic,可以使用的pendingSubscriberPolicy 有vmCursor和fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy有

vmDurableCursor 和 fileDurableSubscriberCursor;对于queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。

Message Cursors的使用参考:http://activemq./message-cursors.html

2、存储设置

设置消息在内存、磁盘中存储的大小,配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
<systemUsage>
 <systemUsage>
      <memoryUsage>
         <memoryUsagelimit="20 mb"/>
      </memoryUsage>
     <storeUsage>
          <storeUsage limit="1 gb"/>
     </storeUsage>
      <tempUsage>
         <tempUsagelimit="100 mb"/>
      </tempUsage>
 </systemUsage>
</systemUsage>

memoryUsage表示ActiveMQ使用的内存,这个值要大于等于destinationPolicy中设置的所有队列的内存之和。

storeUsage表示持久化存储文件的大小。

tempUsage表示非持久化消息存储的临时内存大小。

 

3.  优化ActiveMQ性能
3.1.  一般技术
3.1.1.  Persistent vs Non-Persistent Message
持久化和非持久化传递

1.PERSISTENT(持久性消息)

这是 ActiveMQ 的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。

2.NON_PERSISTENT(非持久性消息)

保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。

此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。

有两种方法指定传送模式:

1.使用setDeliveryMode 方法,这样所有的消息都采用此传送模式;

2.使用send 方法为每一条消息设置传送模式;

方法一:void send(Destination destination, Message message, int deliveryMode, int priority,long timeToLive);

方法二:void send(Message message, int deliveryMode, int priority, longtimeToLive);

其中 deliveryMode 为传送模式,priority 为消息优先级,timeToLive 为消息过期

时间。

方法三:producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

JMS 规范1.1允许消息传递包括Persistent和Non-Persistent。

Non-persistent传递消息比Persistents传递消息速度更快,原因如下:

1)      Non-persistent发送消息是异步的,Producer不需要等待Consumer的receipt消息。如下图:

2)      Persisting 传递消息是需要把消息存储起来。然后在传递,这样很慢 。

3.1.2.  Transactions
事务

以下列子说明了Transaction比Non-transaction的性能高。

Transaction和Non-transaction代码如下:

3.1.3.  超快回应消息
内嵌 broker;如下图:

下面以Co-lcate (合作定位)with a broker为例。

其运行原理如下图:

Java代码如下:

创建一个queue服务:

创建一个queueRequestor:

注意:

设置发送的消息不需要copy。

3.1.4.  Tuning  the OpenWire protocol
跨语言协议

//TODO

3.1.5.  Tuning the TCP Transport
TCP协议是ActiveMQ使用最常见的协议。

有以下两点影响TCP协议性能:

1)      socketBufferSize=缓存,默认是65536。

2)      tcpNoDelay=默认是false,

示例如下:

3.2.  优化消息发送
3.2.1.  Asynchronous Send
在ActiveMQ4.0以上,所有的异步或同步对于Consumer来说是变得可配置了。

默认是在ConnectionFactory、Connection、Connection URI等方面配置对于一个基于Destination 的Consumer来说。

众所周之,如果你想传递给Slow Consumer那么你可能使用异步的消息传递,但是对于Fast Consumer你可能使用同步发送消息。(这样可以避免同步和上下文切换额外的增加Queue堵塞花费。如果对于一个Slow Consumer,你使用同步发送消息可能出现Producer堵塞等显现。

ActiveMQ默认设置dispatcheAsync=true是最好的性能设置。如果你处理的是Slow Consumer则使用dispatcheAsync=true,反之,那你使用的是Fast Consumer则使用dispatcheAsync=false。

用Connection URI来配置Async如下:

ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");用ConnectionFactory配置Async如下:

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);用Connection配置Async如下:

((ActiveMQConnection)connection).setUseAsyncSend(true);

3.2.2.  Producer Flow Control

这种适合于慢的消费者,大量的消息暂时存储到内存中,然后慢慢的dispatche。

运行原理如图下:

Java代码如下:

Xml配置的策略如下:

Disabled Producer Flow Control运行原理:

1.3.  优化消息消费者
消息消费的内部流程结构如下:

3.3.1.  Prefetch Limit
ActiveMQ默认的prefetch大小不同的:

Queue Consumer 默认大小=1000

Queue Browser Consumer默认大小=500

Persistent Topic Consumer默认大小=100

Non-persistent Topic Consumer默认大小=32766

Prefecth policy设置如下:

设置prefetch policy在 Destinations上:

3.3.2.  Delivery and Acknowledgement of messages
传递和回执消息。

建议使用Session.DUPS_ACKNOWLEDGE。

JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。
    在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:

Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Sessiion.TRANSACTION。用session.commit()回执确认。
Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝第确认消息的提交。当消息到达一定数量后,才开始消费该消息。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true。
优化回执:

3.3.3.  Slow Consumer Handling
慢消费者绑定策略

Slow Consumer将一起一个问题,对于非持久主题上,强迫Broker发送的消息堆积起来,使得Broker对于Producer发送慢了下来,同时Fast Consumer也慢了下来。

目前,我们有一个策略来配置在原有Consumer的预存的大小的基础上增加了一定的缓存大小。因此,这个大小最终一旦满了,则旧消息将会丢弃,新消息则会进入。这将保持了一定的缓存大小。

Pending Message Limit Strategy

等待消息限制策略

   对于Slow Consumer来说,你将配置PendingMessageLimitStrategy策略来处理不同的策略。

以下有两种实现的策略:

ConstantPendingMessageLimitStrategy

Limit可以设置0、>0、-1三种方式:

0表示:不额外的增加其预存大小。

>0表示:在额外的增加其预存大小。

-1表示:不增加预存也不丢弃旧的消息。

<constantPendingMessageLimitStrategy limit="50"/>PrefetchRatePendingMessageLimitStrategy

这种策略是利用Consumer的之前的预存的大小乘以其倍数等于现在的预存大小。

<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>Configuring the Eviction Policy

配置去除策略

ActiveMQ有两种方式,默认配置方式如下:

<oldestMessageEvictionStrategy/>

方式二:

去除旧消息根据它的优先级来判断,如果你有一个较高的优先级的旧消息,则去除低优先级的消息。

<oldestMessageWithLowestPriorityEvictionStrategy/>                   1.3.4.  Destination Options
Destination Options 这种方式提供了扩展了JMS Consumer但并不是扩展了JMS  API。以URL的形式来编码的。

Consumer Options

示例如下:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");

consumer = session.createConsumer(queue);

 

(八)使用过程中出现的问题

1、消息文件越来越多,导致超出了存储空间

报错日志:Usage Manager Store is Full, 100% of 1073741824. Stopping producer (ID:db01-48754-1336034955132-0:5:1:1) to prevent flooding queue://queue.land.group. See http://activemq./producer-flow-control.html for more info (blocking for: 1s) | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///172.24.99.41:44716

这是由于我们在配置文件中设置了storeUsage ,当存储的消息文件(log文件)超过了这值就会报这个异常,在官方网站看到说消息文件不删除是5.3版本的一个bug,在5.5版本的时候已经被解决了,但是我们使用的是5.5.1版本啊,然后在看存储下来的消息文件,文件名不是连续的,那么说明其中还是有被删除的,后来在评论中看到Jeff Genender 说的这个可能是ActiveMQ的线程调度问题,只要不使用线程调度就可以了,在broker中设置属性schedulerSupport=”false” ,这样消息文件就会自动在cleanup阶段删除了。

官方网址:https://issues./jira/browse/AMQ-2736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多