分享

JMS应用的简单例子

 我不认识你 2008-02-26

JMS应用的简单例子

September 16, 2006 – 9:54 pm No Tags

J2EE应用客户端通常是用来访问安装在服务器端的J2EE组件。客户端是一个类文件,是一个简单的、独立的、运行在服务器外的程序。它描述了JMS应用必须完成的基本工作:

     

  • 创建连接和会话
  • 创建消息生产者和消费者
  • 发送和接收消息

在J2EE应用中,以上工作全部或者部分由EJB容器完成。

本文涵盖以下主题:

每一个例子都包含两个程序:一个发送消息,另一个接收消息。可以在两个窗口中运行。

1  配置运行应用的环境

在运行例子前,必须确定运行环境已经配置好了。Table 1显示如何配置环境变量。Before you can run the examples, you need to make sure your environment is set appropriately. Table 1 shows how to set the environment variables needed to run J2EE applications on Windows and UNIX platforms.

Platform
Variable Name
Values
Windows
%JAVA_HOME%
Directory where the JavaTM 2 SDK, Standard Edition, version 1.3.1 is installed
%J2EE_HOME%
Directory where the J2EE 1.3 SDK is installed, usually C:\j2sdkee1.3
%CLASSPATH%
Include the following:
.;%J2EE_HOME%\lib\j2ee.jar;%J2EE_HOME%\lib\locale
%PATH%
Include %J2EE_HOME%\bin
UNIX
$JAVA_HOME
Directory where the Java 2 SDK, Standard Edition, version 1.3.1 is installed
$J2EE_HOME
Directory where the J2EE 1.3 SDK is installed, usually $HOME/j2sdkee1.3
$CLASSPATH
Include the following:
.:$J2EE_HOME/lib/j2ee.jar:$J2EE_HOME/lib/locale
$PATH
Include $J2EE_HOME/bin

2  一个简单的PTP例子

本节描述了PTP客户端程序怎样发送、接收消息

步骤:

2.1  编写 PTP客户端程序

消息发送程序SimpleQueueSender.java完成以下任务:

     

  1. 用JNDI查找队列连接工厂(QueueConnectionFactory) 和消息队列(Queue)
  2. 创建连接(connection)和会话(session)
  3. 创建消息发送者(QueueSender)
  4. 创建消息(TextMessage)
  5. 发送消息到队列
  6. 发送控制消息表明消息末尾

     

  1. 在finally代码块中关闭连接(connection),关闭连接则自动关闭会话和消息发送

消息接收程序SimpleQueueReceiver.java 完成以下任务:

     

  1. 通过JNDI查找队列连接工厂(QueueConnectionFactory )和队列(queue)
  2. 创建连接(connection )和会话(session)
  3. 创建消息接收者(QueueReceiver
  4. 开始连接,传送消息
  5. 从队列中接收消息,直至消息接受完毕
  6. 在finally代码块中关闭连接,关闭连接则自动关闭消息接收

有几种方式调用receive方法实现消息同步接收。如果没有定义参数或者参数为0,方法将一直处于封锁状态,直至消息到来

Message m = queueReceiver.receive();

Message m = queueReceiver.receive(0);

对于一个简单的客户端程序,完全没有必要用这种方式。但如果不想让程序不必要的消耗系统资源,可以采取以下的一种方式:

     

  • 调用receive(long timeout)方法,超时参数timeout大于0。receive(long timeout)根据指定的超时参数等待一个消息的到来,如果在这个时间内有可用的消息,则返回消息。如果超时后任没有可用的消息,则返回NULL

    Message m = queueReceiver.receive(1); // 1 millisecond

  • 调用reveiveNoWait()方法,如果有可用的消息到达,reveiveNoWait()方法将返回这个消息。

Message m = queueReceiver.receiveNoWait();
SimpleQueueReceiver 在无限循环 while loop中调用带超时参数的receive方法接收消息。调用receiveNoWait 方法也将得到同样的结果。 The SimpleQueueReceiver program uses an indefinite while loop to receive messages, calling receive with a timeout argument. Calling receiveNoWait would have the same effect.

2.2  编译PTP客户端

按以下步骤编译PTP样例:

     

  1. 确定配置好环境变量,参见 Table 4.1, “Environment Settings for Compiling and Running J2EE Applications”.
  2. 在DOS中编译以下两个源文件:

javac SimpleQueueSender.java
javac SimpleQueueReceiver.java

2.3  运行JMS服务

如果使用J2EE 1.3 SDK,在DOS窗口中输入以下命令行,运行J2EE服务器:
j2ee -verbose

一直等待,直至窗口中出现提示message J2EE server startup complete

2.4  创建JMS管理对象Creating the JMS Administered Objects

在编译客户端的窗口中,使用j2eeadmin命令行创建一个名为MyQueue的队列。最后一个参数表示创建的是哪一种消息目的。

j2eeadmin -addJmsDestination MyQueue queue
当消息队列创建之后,输入以下命令行:

j2eeadmin -listJmsDestination
本例中使用由J2EE 1.3 SDK 提供的、缺省的队列连接工厂QueueConnectionFactory 。你可以创建自己的连接工厂。

2.5  运行PTP客户端

步骤:

     

  1. 运行SimpleQueueSender 程序,发送消息,首先要设置jms.properties的值

    在windows系统中,输入以下命令行:

    java -Djms.properties=%J2EE_HOME%configjms_client.properties SimpleQueueSender
                    MyQueue 3

    在UNIX系统下,输入以下命令行:
    java -Djms.properties=$J2EE_HOME/config/jms_client.properties SimpleQueueSender MyQueue 3
    程序输出如下:

    Queue name is MyQueue
                    Sending message: This is message 1
                    Sending message: This is message 2
                    Sending message: This is message 3

  2. 在同一个窗口中,运行SimpleQueueReceiver 程序,指定队列名称??命令行如下:

    Windows:

    java -Djms.properties=%J2EE_HOME%configjms_client.properties
                    SimpleQueueReceiver MyQueue

    UNIX:

    java -Djms.properties=$J2EE_HOME/config/jms_client.properties
                    SimpleQueueReceiver MyQueue

    输出如下:

    Queue name is MyQueue
                    Reading message: This is message 1
                    Reading message: This is message 2
                    Reading message: This is message 3

  3. 如果按相反的顺序运行程序,先运行SimpleQueueReceiver ,则先显示出队列名称,然后等待消息的到来。Now try running the programs in the opposite order. Start the SimpleQueueReceiver program. It displays the queue name and then appears to hang, waiting for messages.
  4. 在不同的窗口中,运行SimpleQueueSender ,当发送消息,SimpleQueueReceiver 接收消息,然后退出。

2.6  删除队列Deleting the Queue

删除创建的队列:

j2eeadmin -removeJmsDestination MyQueue

3  一个简单的发布/订阅消息样例

本节描述了使用消息监听器异步消费消息的例子。

步骤:

3.1 编写Pub/Sub客户端程序

SimpleTopicPublisher.java 完成以下任务:

     

  1. 用JNDI查找主题连接工厂(TopicConnectionFactory 和消息主题(topic)
  2. 创建连接( connection)和会话( session)
  3. 创建消息发布者(TopicPublisher)
  4. 创建消息(TextMessage)
  5. 发送消息到队列
  6. 发布消息给主题
  7. 在finally代码块中关闭连接( connection),关闭连接则自动关闭会话和消息发送

SimpleTopicSubscriber.java完成以下任务:

     

  1. 通过JNDI查找主题连接工厂(TopicConnectionFactory )和主题(topic)
  2. 创建连接( connection )和会话( session)
  3. 创建消息订阅者(TopicSubscriber

  4. 创建类TextListener 实例,注册消息监听器
  5. 开始连接,传送消息
  6. 监听消息主题,当用户输入’q'或者’Q'时停止监听
  7. 在finally代码块中关闭连接,关闭连接则自动关闭消息接收

消息监听器TextListener.java完成的任务如下:

     

  1. 当消息到达时,自动调用onMessage方法
  2. onMessage方法将到达的消息转换成TextMessage类型,并显示消息内容

3.2  编译Pub/Sub客户端Compiling the Pub/Sub Clients

     

  1. 按以下步骤编译pub/sub样例:
    1. 确定配置好环境变量,参见 Table 4.1, “Environment Settings for Compiling and Running J2EE Applications”.
    2. 在DOS中编译以下源文件和监听器:

    javac SimpleTopicPublisher.java
                    javac SimpleTopicSubscriber.java
                    javac TextListener.java

3.3  运行JMS服务器Starting the JMS Provider

如果使用J2EE 1.3 SDK,在DOS窗口中输入以下命令行,运行J2EE服务器:
j2ee -verbose

一直等待,直至窗口中出现提示 message J2EE server startup complete

3.4  创建JMS管理对象

在编译客户端的窗口中,使用j2eeadmin命令行创建一个名为MyTopic的主题。最后一个参数表示创建的是哪一种消息目的。

j2eeadmin -addJmsDestination MyTopic topic

当消息队列创建之后,输入以下命令行:

j2eeadmin -listJmsDestination

本例中使用由J2EE 1.3 SDK 提供的、缺省的队列连接工厂TopicConnectionFactory 。你可以创建自己的连接工厂。

3.5  运行pub/sub客户端

步骤:

     

  1. 运行SimpleTopicSubscriber ,定义主题名称,设置jms.properties的值。

    在windows系统下,输入以下命令:

    java -Djms.properties=%J2EE_HOME%\config\jms_client.properties
    SimpleTopicSubscriber MyTopic
    在UNIX系统下,输入以下命令:
    java -Djms.properties=$J2EE_HOME/config/jms_client.properties
    SimpleTopicSubscriber MyTopic
    显示下列信息,然后等待:
    Topic name is MyTopic
    To end program, enter Q or q, then

  2. 在另一个DOS窗口,运行SimpleTopicPublisher ,发布消息,命令行如下:

    Windows:

    java -Djms.properties=%J2EE_HOME%configjms_client.properties
                    SimpleTopicPublisher MyTopic 3

    UNIX:

    java -Djms.properties=$J2EE_HOME/config/jms_client.properties
                    SimpleTopicPublisher MyTopic 3

    输出信息如下:
    Topic name is MyTopic
    Publishing message: This is message 1
    Publishing message: This is message 2
    Publishing message: This is message 3
    在另一个窗口,显示信息如下:
    Reading message: This is message 1
    Reading message: This is message 2
    Reading message: This is message 3
    输入Q或者q退出程序

3.6  删除主题,停止服务

删除创建的消息主题:

j2eeadmin -removeJmsDestination MyTopic

停止服务:

j2ee -stopSimpleQueueSender.javaSimpleQueueSender.java

SimpleQueueSender.java

/*
*
* Copyright 2001 Sun Microsystems, Inc. All Rights Reserved.
*
* This software is the proprietary information of Sun
* Microsystems, Inc.  Use is subject to license terms.
*
*/
/**
* The SimpleQueueSender class consists only of a main method,
* which sends several messages to a queue.
*
* Run this program in conjunction with SimpleQueueReceiver.
* Specify a queue name on the command line when you run the
* program.  By default, the program sends one message.  Specify
* a number after the queue name to send that number of messages.
*/
import javax.jms.*;
import javax.naming.*;

public class SimpleQueueSender {

/**
* Main method.
*
* @param args     the queue used by the example and,
*                 optionally, the number of messages to send
*/
public static void main(String[] args) {
String                  queueName = null;
Context                 jndiContext = null;
QueueConnectionFactory  queueConnectionFactory = null;
QueueConnection         queueConnection = null;
QueueSession            queueSession = null;
Queue                   queue = null;
QueueSender             queueSender = null;
TextMessage             message = null;
final int               NUM_MSGS;

if ( (args.length < 1) || (args.length > 2) ) {
System.out.println(”Usage: java SimpleQueueSender ” +
[]”);
System.exit(1);
}
queueName = new String(args[0]);
System.out.println(”Queue name is ” + queueName);
if (args.length == 2){
NUM_MSGS = (new Integer(args[1])).intValue();
} else {
NUM_MSGS = 1;
}

/*
* Create a JNDI InitialContext object if none exists
* yet.
*/
try {
jndiContext = new InitialContext();
} catch (NamingException e) {
System.out.println(”Could not create JNDI ” +
“context: ” + e.toString());
System.exit(1);
}

/*
* Look up connection factory and queue.  If either does
* not exist, exit.
*/
try {
queueConnectionFactory = (QueueConnectionFactory)
jndiContext.lookup(”QueueConnectionFactory”);
queue = (Queue) jndiContext.lookup(queueName);
} catch (NamingException e) {
System.out.println(”JNDI lookup failed: ” +
e.toString());
System.exit(1);
}

/*
* Create connection.
* Create session from connection; false means session is
* not transacted.
* Create sender and text message.
* Send messages, varying text slightly.
* Send end-of-messages message.
* Finally, close connection.
*/
try {
queueConnection =
queueConnectionFactory.createQueueConnection();
queueSession =
queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
queueSender = queueSession.createSender(queue);
message = queueSession.createTextMessage();
for (int i = 0; i < NUM_MSGS; i++) {
message.setText("This is message " + (i + 1));
System.out.println("Sending message: " +
message.getText());
queueSender.send(message);
}

/*
* Send a non-text control message indicating end of
* messages.
*/
queueSender.send(queueSession.createMessage());
} catch (JMSException e) {
System.out.println("Exception occurred: " +
e.toString());
} finally {
if (queueConnection != null) {
try {
queueConnection.close();
} catch (JMSException e) {}
}
}
}
}

SimpleQueueReceive.java

/*
*
* Copyright 2001 Sun Microsystems, Inc. All Rights Reserved.
*
* This software is the proprietary information of Sun
* Microsystems, Inc.  Use is subject to license terms.
*
*/
/**
* The SimpleQueueReceiver class consists only of a main method,
* which fetches one or more messages from a queue using
* synchronous message delivery.  Run this program in conjunction
* with SimpleQueueSender.  Specify a queue name on the command
* line when you run the program.
*/
import javax.jms.*;
import javax.naming.*;

public class SimpleQueueReceiver {

/**
* Main method.
*
* @param args     the queue used by the example
*/
public static void main(String[] args) {
String                  queueName = null;
Context                 jndiContext = null;
QueueConnectionFactory  queueConnectionFactory = null;
QueueConnection         queueConnection = null;
QueueSession            queueSession = null;
Queue                   queue = null;
QueueReceiver           queueReceiver = null;
TextMessage             message = null;

/*
* Read queue name from command line and display it.
*/
if (args.length != 1) {
System.out.println(”Usage: java ” +
“SimpleQueueReceiver “);
System.exit(1);
}
queueName = new String(args[0]);
System.out.println(”Queue name is ” + queueName);

/*
* Create a JNDI InitialContext object if none exists
* yet.
*/
try {
jndiContext = new InitialContext();
} catch (NamingException e) {
System.out.println(”Could not create JNDI ” +
“context: ” + e.toString());
System.exit(1);
}

/*
* Look up connection factory and queue.  If either does
* not exist, exit.
*/
try {
queueConnectionFactory = (QueueConnectionFactory)
jndiContext.lookup(”QueueConnectionFactory”);
queue = (Queue) jndiContext.lookup(queueName);
} catch (NamingException e) {
System.out.println(”JNDI lookup failed: ” +
e.toString());
System.exit(1);
}

/*
* Create connection.
* Create session from connection; false means session is
* not transacted.
* Create receiver, then start message delivery.
* Receive all text messages from queue until
* a non-text message is received indicating end of
* message stream.
* Close connection.
*/
try {
queueConnection =
queueConnectionFactory.createQueueConnection();
queueSession =
queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
queueReceiver = queueSession.createReceiver(queue);
queueConnection.start();
while (true) {
Message m = queueReceiver.receive(1);
if (m != null) {
if (m instanceof TextMessage) {
message = (TextMessage) m;
System.out.println(”Reading message: ” +
message.getText());
} else {
break;
}
}
}
} catch (JMSException e) {
System.out.println(”Exception occurred: ” +
e.toString());
} finally {
if (queueConnection != null) {
try {
queueConnection.close();
} catch (JMSException e) {}
}
}
}
}

SimpleTopicPublisher.java

/*
*
* Copyright 2001 Sun Microsystems, Inc. All Rights Reserved.
*
* This software is the proprietary information of Sun
* Microsystems, Inc.  Use is subject to license terms.
*
*/
/**
* The SimpleTopicPublisher class consists only of a main method,
* which publishes several messages to a topic.
*
* Run this program in conjunction with SimpleTopicSubscriber.
* Specify a topic name on the command line when you run the
* program.  By default, the program sends one message.
* Specify a number after the topic name to send that number
* of messages.
*/
import javax.jms.*;
import javax.naming.*;

public class SimpleTopicPublisher {

/**
* Main method.
*
* @param args     the topic used by the example and,
*                 optionally, the number of messages to send
*/
public static void main(String[] args) {
String                  topicName = null;
Context                 jndiContext = null;
TopicConnectionFactory  topicConnectionFactory = null;
TopicConnection         topicConnection = null;
TopicSession            topicSession = null;
Topic                   topic = null;
TopicPublisher          topicPublisher = null;
TextMessage             message = null;
final int               NUM_MSGS;

if ( (args.length < 1) || (args.length > 2) ) {
System.out.println(”Usage: java ” +
“SimpleTopicPublisher ” +
“[]”);
System.exit(1);
}
topicName = new String(args[0]);
System.out.println(”Topic name is ” + topicName);
if (args.length == 2){
NUM_MSGS = (new Integer(args[1])).intValue();
} else {
NUM_MSGS = 1;
}

/*
* Create a JNDI InitialContext object if none exists
* yet.
*/
try {
jndiContext = new InitialContext();
} catch (NamingException e) {
System.out.println(”Could not create JNDI ” +
“context: ” + e.toString());
e.printStackTrace();
System.exit(1);
}

/*
* Look up connection factory and topic.  If either does
* not exist, exit.
*/
try {
topicConnectionFactory = (TopicConnectionFactory)
jndiContext.lookup(”TopicConnectionFactory”);
topic = (Topic) jndiContext.lookup(topicName);
} catch (NamingException e) {
System.out.println(”JNDI lookup failed: ” +
e.toString());
e.printStackTrace();
System.exit(1);
}

/*
* Create connection.
* Create session from connection; false means session is
* not transacted.
* Create publisher and text message.
* Send messages, varying text slightly.
* Finally, close connection.
*/
try {
topicConnection =
topicConnectionFactory.createTopicConnection();
topicSession =
topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
topicPublisher = topicSession.createPublisher(topic);
message = topicSession.createTextMessage();
for (int i = 0; i < NUM_MSGS; i++) {
message.setText("This is message " + (i + 1));
System.out.println("Publishing message: " +
message.getText());
topicPublisher.publish(message);
}
} catch (JMSException e) {
System.out.println("Exception occurred: " +
e.toString());
} finally {
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException e) {}
}
}
}
}

SimpleTopicSubscriber.java

/*
*
* Copyright 2001 Sun Microsystems, Inc. All Rights Reserved.
*
* This software is the proprietary information of Sun
* Microsystems, Inc.  Use is subject to license terms.
*
*/
/**
* The SimpleTopicSubscriber class consists only of a main
* method, which receives one or more messages from a topic using
* asynchronous message delivery.  It uses the message listener
* TextListener.  Run this program in conjunction with
* SimpleTopicPublisher.
*
* Specify a topic name on the command line when you run the
* program.
*
* To end the program, enter Q or q on the command line.
*/
import javax.jms.*;
import javax.naming.*;
import java.io.*;

public class SimpleTopicSubscriber {

/**
* Main method.
*
* @param args     the topic used by the example
*/
public static void main(String[] args) {
String                  topicName = null;
Context                 jndiContext = null;
TopicConnectionFactory  topicConnectionFactory = null;
TopicConnection         topicConnection = null;
TopicSession            topicSession = null;
Topic                   topic = null;
TopicSubscriber         topicSubscriber = null;
TextListener            topicListener = null;
TextMessage             message = null;
InputStreamReader       inputStreamReader = null;
char                    answer = ‘\0′;

/*
* Read topic name from command line and display it.
*/
if (args.length != 1) {
System.out.println(”Usage: java ” +
“SimpleTopicSubscriber “);
System.exit(1);
}
topicName = new String(args[0]);
System.out.println(”Topic name is ” + topicName);

/*
* Create a JNDI InitialContext object if none exists
* yet.
*/
try {
jndiContext = new InitialContext();
} catch (NamingException e) {
System.out.println(”Could not create JNDI ” +
“context: ” + e.toString());
e.printStackTrace();
System.exit(1);
}

/*
* Look up connection factory and topic.  If either does
* not exist, exit.
*/
try {
topicConnectionFactory = (TopicConnectionFactory)
jndiContext.lookup(”TopicConnectionFactory”);
topic = (Topic) jndiContext.lookup(topicName);
} catch (NamingException e) {
System.out.println(”JNDI lookup failed: ” +
e.toString());
e.printStackTrace();
System.exit(1);
}

/*
* Create connection.
* Create session from connection; false means session is
* not transacted.
* Create subscriber.
* Register message listener (TextListener).
* Receive text messages from topic.
* When all messages have been received, enter Q to quit.
* Close connection.
*/
try {
topicConnection =
topicConnectionFactory.createTopicConnection();
topicSession =
topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
topicSubscriber =
topicSession.createSubscriber(topic);
topicListener = new TextListener();
topicSubscriber.setMessageListener(topicListener);
topicConnection.start();
System.out.println(”To end program, enter Q or q, ” +
“then “);
inputStreamReader = new InputStreamReader(System.in);
while (!((answer == ‘q’) || (answer == ‘Q’))) {
try {
answer = (char) inputStreamReader.read();
} catch (IOException e) {
System.out.println(”I/O exception: ”
+ e.toString());
}
}
} catch (JMSException e) {
System.out.println(”Exception occurred: ” +
e.toString());
} finally {
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException e) {}
}
}
}
}

TextListener.java

/*
*
* Copyright 2001 Sun Microsystems, Inc. All Rights Reserved.
*
* This software is the proprietary information of Sun
* Microsystems, Inc.  Use is subject to license terms.
*
*/
/**
* The TextListener class implements the MessageListener
* interface by defining an onMessage method that displays
* the contents of a TextMessage.
*
* This class acts as the listener for the SimpleTopicSubscriber
* class.
*/
import javax.jms.*;

public class TextListener implements MessageListener {

/**
* Casts the message to a TextMessage and displays its text.
*
* @param message     the incoming message
*/
public void onMessage(Message message) {
TextMessage msg = null;

try {
if (message instanceof TextMessage) {
msg = (TextMessage) message;
System.out.println(”Reading message: ” +
msg.getText());
} else {
System.out.println(”Message of wrong type: ” +
message.getClass().getName());
}
} catch (JMSException e) {
System.out.println(”JMSException in onMessage(): ” +
e.toString());
} catch (Throwable t) {
System.out.println(”Exception in onMessage():” +
t.getMessage());
}
}
}

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多