RabbitMQ Getting Started Guide (Java)

 时间要去哪 2014-06-03

RabbitMQ is a popular message broker, typically used to integrate applications, or different components of the same application, through messages. This post is a very basic introduction to getting started with RabbitMQ and assumes you have already set up the RabbitMQ server.

RabbitMQ is written in Erlang and has drivers/clients available for most major languages. We are using Java for this post, so we will first get hold of the Java client. The Maven dependency for the Java client is given below.

<dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.0.4</version>
</dependency>


While message brokers such as RabbitMQ can be used to model a variety of schemes, such as one-to-one message delivery or publisher/subscriber, our application will be simple and have two basic components: a single producer that produces messages and a single consumer that consumes them.

In our example, the producer will produce a large number of messages, each carrying a sequence number, while the consumer will consume the messages in a separate thread.
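
Before bringing in the broker, the single-producer/single-consumer shape described above can be sketched entirely in memory. The following is only an analogy built on java.util.concurrent.BlockingQueue, not RabbitMQ code: the class name InMemoryQueueDemo and the STOP marker are hypothetical choices made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory stand-in for the broker: one producer, one consumer thread. */
class InMemoryQueueDemo {

    /** Marker telling the consumer to stop (hypothetical convention, not a RabbitMQ feature). */
    private static final int STOP = -1;

    /** Sends sequence numbers 0..count-1 through a queue and returns what the consumer received. */
    static List<Integer> run(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<Integer> received = new ArrayList<>();

        // Consumer: runs in its own thread and blocks until a message is available.
        Thread consumer = new Thread(() -> {
            try {
                for (int msg = queue.take(); msg != STOP; msg = queue.take()) {
                    received.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producer: the calling thread publishes numbered messages, then the stop marker.
        for (int i = 0; i < count; i++) {
            queue.put(i);
        }
        queue.put(STOP);

        // join() waits for the consumer and makes its writes to 'received' visible here.
        consumer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Received: " + run(5)); // prints "Received: [0, 1, 2, 3, 4]"
    }
}
```

With RabbitMQ, the queue lives on the server instead of inside the JVM, so the producer and consumer can run in different processes or on different machines.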

The EndPoint Abstract class:

Let’s first write a class that generalizes both producers and consumers as ‘endpoints’ of a queue. Whether you are a producer or a consumer, the code to connect to a queue remains the same, so we can factor it out into this class.

package co.syntx.examples.rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Represents a connection with a queue
 * @author syntx
 *
 */
public abstract class EndPoint {

    protected Channel channel;
    protected Connection connection;
    protected String endPointName;

    public EndPoint(String endpointName) throws IOException {
        this.endPointName = endpointName;

        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();

        // Hostname of your RabbitMQ server
        factory.setHost("localhost");

        // Get a connection
        connection = factory.newConnection();

        // Create a channel
        channel = connection.createChannel();

        // Declare a queue for this channel. If the queue does not exist,
        // it will be created on the server.
        channel.queueDeclare(endpointName, false, false, false, null);
    }

    /**
     * Close channel and connection. Closing the channel explicitly is not
     * strictly necessary, since closing the connection closes its channels too.
     * @throws IOException
     */
    public void close() throws IOException {
        this.channel.close();
        this.connection.close();
    }
}

The Producer:

The producer class is responsible for writing a message onto a queue. We are using Apache Commons Lang to convert a Serializable Java object to a byte array. The Maven dependency for Commons Lang is:

<dependency>
	<groupId>commons-lang</groupId>
	<artifactId>commons-lang</artifactId>
	<version>2.6</version>
</dependency>

package co.syntx.examples.rabbitmq;

import java.io.IOException;
import java.io.Serializable;

import org.apache.commons.lang.SerializationUtils;

/**
 * The producer endpoint that writes to the queue.
 * @author syntx
 *
 */
public class Producer extends EndPoint {

    public Producer(String endPointName) throws IOException {
        super(endPointName);
    }

    public void sendMessage(Serializable object) throws IOException {
        // Publish to the default exchange (""), using the queue name as the routing key
        channel.basicPublish("", endPointName, null, SerializationUtils.serialize(object));
    }
}
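
SerializationUtils relies on standard Java serialization, so the byte-array conversion can be tried out without Commons Lang or a broker. The sketch below is a rough stand-alone equivalent using only java.io; the class name SerializationRoundTrip and its two helper methods are hypothetical names for this illustration, not part of any library.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

/** Round-trips a Serializable object through a byte array with plain java.io. */
class SerializationRoundTrip {

    /** Turns a Serializable object into the bytes that would form the message body. */
    static byte[] serialize(Serializable object) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        }
        return bytes.toByteArray();
    }

    /** Rebuilds the object from a message body produced by serialize(). */
    static Object deserialize(byte[] body) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, Integer> message = new HashMap<>();
        message.put("message number", 42);

        byte[] body = serialize(message);   // what the producer would publish
        Object copy = deserialize(body);    // what the consumer would rebuild

        System.out.println(copy.equals(message)); // prints "true"
    }
}
```

Java serialization ties both ends to the same classes on the classpath, which is acceptable for a demo like this one where producer and consumer share a code base.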

The Consumer:

The consumer, which can be run as a thread, has callback functions for various events, the most important of which is the arrival of a new message.

package co.syntx.examples.rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * The endpoint that consumes messages off of the queue. Happens to be runnable.
 * @author syntx
 *
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer {

    public QueueConsumer(String endPointName) throws IOException {
        super(endPointName);
    }

    public void run() {
        try {
            // Start consuming messages. Auto acknowledge messages.
            channel.basicConsume(endPointName, true, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Called when consumer is registered.
     */
    public void handleConsumeOk(String consumerTag) {
        System.out.println("Consumer " + consumerTag + " registered");
    }

    /**
     * Called when new message is available.
     */
    public void handleDelivery(String consumerTag, Envelope env,
            BasicProperties props, byte[] body) throws IOException {
        Map map = (HashMap) SerializationUtils.deserialize(body);
        System.out.println("Message Number " + map.get("message number") + " received.");
    }

    public void handleCancel(String consumerTag) {}
    public void handleCancelOk(String consumerTag) {}
    public void handleRecoverOk(String consumerTag) {}
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
}
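
The callback style used by the consumer can be illustrated without a broker. In the sketch below, Handler and FakeChannel are hypothetical, heavily cut-down stand-ins invented for this example; they only mimic the idea that registering a consumer triggers one registration callback and that each later message triggers a delivery callback. They are not the RabbitMQ client API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Shows push-style delivery: the "channel" calls back into the registered handler. */
class CallbackDemo {

    /** Hypothetical, cut-down analogue of a consumer callback interface. */
    interface Handler {
        void handleConsumeOk(String consumerTag);
        void handleDelivery(String consumerTag, byte[] body);
    }

    /** Hypothetical stand-in for a channel: holds one handler and pushes messages to it. */
    static class FakeChannel {
        private Handler handler;
        private String consumerTag;

        void basicConsume(String consumerTag, Handler handler) {
            this.consumerTag = consumerTag;
            this.handler = handler;
            handler.handleConsumeOk(consumerTag); // registration callback fires once
        }

        void deliver(String text) {
            if (handler != null) {
                handler.handleDelivery(consumerTag, text.getBytes(StandardCharsets.UTF_8));
            }
        }
    }

    /** Registers a handler, delivers the given messages, and returns the event log. */
    static List<String> run(String... messages) {
        List<String> log = new ArrayList<>();
        FakeChannel channel = new FakeChannel();
        channel.basicConsume("demo-tag", new Handler() {
            public void handleConsumeOk(String consumerTag) {
                log.add("registered " + consumerTag);
            }
            public void handleDelivery(String consumerTag, byte[] body) {
                log.add("received " + new String(body, StandardCharsets.UTF_8));
            }
        });
        for (String message : messages) {
            channel.deliver(message);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("a", "b")); // prints "[registered demo-tag, received a, received b]"
    }
}
```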

Putting it together:

In our driver class, we start a consumer thread and then proceed to generate a large number of messages that will be consumed by the consumer.

package co.syntx.examples.rabbitmq;

import java.util.HashMap;

public class Main {
    public Main() throws Exception {

        // Register the consumer on its own thread
        QueueConsumer consumer = new QueueConsumer("queue");
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();

        Producer producer = new Producer("queue");

        // Publish numbered messages to the same queue
        for (int i = 0; i < 100000; i++) {
            HashMap<String, Integer> message = new HashMap<String, Integer>();
            message.put("message number", i);
            producer.sendMessage(message);
            System.out.println("Message Number " + i + " sent.");
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        new Main();
    }
}