配色: 字号:
java简单的实现RabbitMQ
2017-01-14 | 阅:  转:  |  分享 
  
java简单的实现RabbitMQ

前言:在这里我将用java来简单的实现rabbitMQ。下面我们带着下面问题来一步步的了解和学习rabbitMQ。



1:如果消费者连接中断,这期间我们应该怎么办



2:如何做到负载均衡



3:如何有效的将数据发送到相关的接收者?就是怎么样过滤



4:如何保证消费者收到完整正确的数据



5:如何让优先级高的接收者先收到数据



一:"HelloRabbitMQ"



下面有一幅图,其中P表示生产者,C表示消费者,红色部分为消息队列







二:项目开始



2.1:首先引入rabbitMQjar包





com.rabbitmq

amqp-client

3.6.5



2.2:创建消费者Producer



复制代码

/

消息生成者

/

publicclassProducer{

publicfinalstaticStringQUEUE_NAME="rabbitMQ.test";



publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{

//创建连接工厂

ConnectionFactoryfactory=newConnectionFactory();

//设置RabbitMQ相关信息

factory.setHost("localhost");

//factory.setUsername("lp");

//factory.setPassword("");

//factory.setPort(2088);

//创建一个新的连接

Connectionconnection=factory.newConnection();

//创建一个通道

Channelchannel=connection.createChannel();

//声明一个队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);

Stringmessage="HelloRabbitMQ";

//发送消息到队列中

channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));

System.out.println("ProducerSend+''"+message+"''");

//关闭通道和连接

channel.close();

connection.close();

}

}

复制代码

注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数



注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体



2.3:创建消费者







复制代码

publicclassCustomer{

privatefinalstaticStringQUEUE_NAME="rabbitMQ.test";



publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{

//创建连接工厂

ConnectionFactoryfactory=newConnectionFactory();

//设置RabbitMQ地址

factory.setHost("localhost");

//创建一个新的连接

Connectionconnection=factory.newConnection();

//创建一个通道

Channelchannel=connection.createChannel();

//声明要关注的队列

channel.queueDeclare(QUEUE_NAME,false,false,true,null);

System.out.println("CustomerWaitingReceivedmessages");

//DefaultConsumer类实现了Consumer接口,通过传入一个频道,

//告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery

Consumerconsumer=newDefaultConsumer(channel){

@Override

publicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,

AMQP.BasicPropertiesproperties,byte[]body)

throwsIOException{

Stringmessage=newString(body,"UTF-8");

System.out.println("CustomerReceived''"+message+"''");

}

};

//自动回复队列应答--RabbitMQ中的消息确认机制

channel.basicConsume(QUEUE_NAME,true,consumer);

}

复制代码

前面代码我们可以看出和生成者一样的,后面的是获取生产者发送的信息,其中envelope主要存放生产者相关信息(比如交换机、路由key等)body是消息实体。



三:实现任务分发



工作队列





一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们就需要更多的工作者来处理,这里就要采用分布机制了。



我们新创建一个生产者NewTask



复制代码

publicclassNewTask{

privatestaticfinalStringTASK_QUEUE_NAME="task_queue";

publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{

ConnectionFactoryfactory=newConnectionFactory();

factory.setHost("locawww.tt951.comlhost");

Connectionconnection=factory.newConnection();

Channelchannel=connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);

//分发信息

for(inti=0;i<10;i++){

Stringmessage="HelloRabbitMQ"+i;

channel.basicPublish("",TASK_QUEUE_NAME,

MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

System.out.println("NewTasksend''"+message+"''");

}

channel.close();

connection.close();

}

}

复制代码

然后创建2个工作者Work1和Work2代码一样



复制代码

publicclassWork1{

privatestaticfinalStringTASK_QUEUE_NAME="task_queue";



publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{

finalConnectionFactoryfactory=newConnectionFactory();

factory.setwww.baiyuewang.netHost("localhost");

Connectionconnection=factory.newConnection();

finalChannelchannel=connection.createChannel();



channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);

System.out.println("Worker1Waitingformessages");



//每次从队列获取的数量

channel.basicQos(1);



finalConsumerconsumer=newDefaultConsumer(channel){

@Override

publicvoidhandleDelivery(StringconsumerTag,

Envelopeenvelope,

AMQP.BasicPropertiesproperties,

byte[]body)throwsIOException{

Stringmessage=newString(body,"UTF-8");

System.out.println("Worker1Received''"+message+"''");

try{

thrownewException();

//doWork(message);

}catch(Exceptione){

channel.abort();

}finally{

System.out.println("Worker1Done");

channel.basicAck(envelope.getDeliveryTag(),false);

}

}

};

booleanautoAck=false;

//消息消费完成确认

channel.basicConsume(TASK_QUEUE_NAME,autoAck,consumer);

}

privatestaticvoiddoWork(Stringtask){

try{

Thread.sleep(1000);//暂停1秒钟

}catch(InterruptedException_ignored){

Thread.currentThread().interrupt();

}

}

}

复制代码

注:channel.basicQos(1);保证一次只分发一个。autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出,那么就无法获取数据,我们当然是不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知生成者。最后从队列中删除这条信息。如果消费者异常退出,如果还有其他消费者,那么就会把队列中的消息发送给其他消费者,如果没有,等消费者启动时候再次发送。

献花(0)
+1
(本文系thedust79首藏)