ActiveMQ部署及发送接收消息迷蝶下载下载地址:http://activemq.apache.org/我这里使用的版本为当前最新5. 8.0。下载版本有Windows和Linux两个版本,且都分为32位和64位。根据自己需要选择下载。安装我这里下载的为window s的32位版本(apache-activemq-5.8.0-bin.zip),下载后直接解压到需要安装的目录或在直接解压到当前目录 也可,解压完安装也完成。解压后目录如上图,里面包含了示例和文档,及所有的jar包。运行进入到bin目录(apache-active mq-5.8.0\bin),双击activemq.bat,就会运行,运行截图如下:此时表示ActiveMQ已经在运行了,当然正常生 产环境下可以设置作为服务在后台运行,并且随系统启动而启动。测试ActiveMQ自带了一套管理系统,访问http://localho st:8161/admin/http://localhost:8161/admin/,会出现需要输入用户名和密码的页面如下:默认用 户名和密码都是admin,进入后则为主界面:在这个界面上,我们可以管理队列及其他的一些功能,为了下面的继续,我们在这里创建一个Qu eue和一个Topic(Queue和Topic的区别见附件一)。点击目录上的Queues进入创建Queue页面,输入Queue名称 ,点击Create后下面就创建了G2Queue的queue队列。这里也可以不用这样手工创建,在发送端指定了一个Queue或Topi c名字后,会自动创建一个队列,如上面的choice.queue和FirstQueue都是我测试程序时,程序里面指定的Queue名称 ,自动创建的。同样的方式创建一个Topic,如下:发送消息创建一个新的项目,我这里是创建的webproject名称为Active MQ,引入ActiveMQ的jar包,整个工程结构如下:此段代码从网上直接copy,只是稍作修改:importjava.util .Random;importjavax.jms.Connection;importjavax.jms.ConnectionFa ctory;importjavax.jms.DeliveryMode;importjavax.jms.Destination; importjavax.jms.JMSException;importjavax.jms.MessageProducer;im portjavax.jms.Session;importjavax.jms.TextMessage;importorg.ap ache.activemq.ActiveMQConnectionFactory;publicclassSendMessage {privatestaticfinalStringurl="tcp://localhost:61616";privat estaticfinalStringQUEUE_NAME="G2Queue";publicvoidsendMess age()throwsJMSException{//JMS客户端到JMSProvider的连接Connection connection=null;try{//连接工厂,JMS用它创建连接//构造ConnectionFactory实例 对象,此处采用ActiveMq的实现jarConnectionFactoryconnectionFactory=newAc tiveMQConnectionFactory(url);connection=(Connection)connection Factory.createConnection();//启动连接connection.start();//Session:发 送或接收消息的线程//获取sessionSessionsession=(Session)connection.creat eSession(false,Session.AUTO_ACKNOWLEDGE);//消息的目的地,消息发送到那个队列Desti nationdestination=session.createQueue(QUEUE_NAME);//MessagePr oducer:消息发送者(生产者)//创建消息发送者MessageProducerproducer=session.cre ateProducer(destination);//设置是否持久化//DeliveryMode.NON_PERSISTENT :不持久化//DeliveryMode.PERSISTENT:持久化producer.setDeliveryMode(Deliv eryMode.PERSISTENT);Stringmsg="";inti=0;do{msg="第"+i+ "次发送的消息:"+newRandom();TextMessagemessage=session.createText Message(msg);Thread.sleep(1000);//发送消息到目的地方producer.send(mess age);System.out.println("发送消息:"+msg);i++;}while(i<1000);} catch(Exceptione){e.printStackTrace();}}publicstaticvoidmai n(String[]args){SendMessagesndMsg=newSendMessage();try{snd Msg.sendMessage();}catch(Exceptionex){System.out.println(ex.t oString());}}}运行结果如下:接收消息packagecn.g2room.mq.test;importjavax.j ms.Connection;importjavax.jms.ConnectionFactory;importjavax.jms .Destination;importjavax.jms.JMSException;importjavax.jms.Messa ge;importjavax.jms.MessageConsumer;importjavax.jms.Session;impo rtjavax.jms.TextMessage;importorg.apache.activemq.ActiveMQConne ctionFactory;/消息接收类@createTime:Apr7,20135:11:11PM @author:迷蝶@version: 0.1@lastVersion:0.1@updateTime:@updateAuthor:mailto:252909344@qq.com">迷蝶@changesSum:/publicclass ReceiveMessage{privatestaticfinalStringurl="tcp://localhos t:61616";privatestaticfinalStringQUEUE_NAME="G2Queue";publi cvoidreceiveMessage(){Connectionconnection=null;try{try{C onnectionFactoryconnectionFactory=newActiveMQConnectionFactor y(url);connection=connectionFactory.createConnection();}catch (Exceptione){System.out.println(e.toString());}connection.start ();Sessionsession=connection.createSession(false,Session.AUTO_ ACKNOWLEDGE);Destinationdestination=session.createQueue(QUEUE_ NAME);//消息接收者,也就是消费者MessageConsumerconsumer=session.createCon sumer(destination);consumeMessagesAndClose(connection,session,c onsumer);}catch(Exceptione){System.out.println(e.toString()); }}/接收和关闭消息,如遇到消息内容为close则,关闭连接@paramconnectionJMS客户端到 JMSProvider的连接@paramsession发送或接收消息的线程@paramconsumer消息接收 对象@throwsJMSException@authercom">迷蝶Apr8,201310:31:55AM/protectedvoidconsumeMes sagesAndClose(Connectionconnection,Sessionsession,MessageConsu merconsumer)throwsJMSException{do{Messagemessage=consumer .receive(1000);if("close".equals(message)){consumer.close();sess ion.close();connection.close();}if(message!=null){onMessage(m essage);}}while(true);}publicvoidonMessage(Messagemessage){ try{if(messageinstanceofTextMessage){TextMessagetxtMsg=(TextMessage)message;Stringmsg=txtMsg.getText();System.out.println("Received:"+msg);}}catch(Exceptione){e.printStackTrace();}}publicstaticvoidmain(Stringargs[]){ReceiveMessagerm=newReceiveMessage();rm.receiveMessage();}}运行结果如下: |
|