rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统。他遵循Mozilla Public License开源协议。采用 Erlang 实现的工业级的消息队列(MQ)服务器。 RabbitMQ的官方站:http://www./ AMQP当中有四个概念非常重要
一个虚拟主机持有一组交换机、队列和绑定。 为什么需要多个虚拟主机呢?因为RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机 何谓虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding) 队列(Queues)是你的消息(messages)的终点,可以理解成装消息的容器。消息就一直在里面,直到有客户端(也就是消费者,Consumer)连接到这个队列并且将其取走为止。不过,也可以将一个队列配置成这样的:一旦消息进入这个队列,此消息就被删除。 队列是由消费者(Consumer)通过程序建立的,不是通过配置文件或者命令行工具。这没什么问题,如果一个消费者试图创建一个已经存在的队列,RabbitMQ会直接忽略这个请求。因此我们可以将消息队列的配置写在应用程序的代码里面。 而要把一个消息放进队列前,需要有一个交换机(Exchange)。 交换机(Exchange)可以理解成具有路由表的路由程序。每个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串。交换机当中有一系列的绑定(binding),即路由规则(routes)。(例如,指明具有路由键 “X” 的消息要到名为timbuku的队列当中去。) 消费者程序(Consumer)要负责创建你的交换机。交换机可以存在多个,每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个 进程,可以充分利用服务器上的CPU核以便达到更高的效率。例如,在一个8核的服务器上,可以创建5个交换机来用5个核,另外3个核留下来做消息处理。类 似的,在RabbitMQ的集群当中,你可以用类似的思路来扩展交换机一边获取更高的吞吐量。 交换机如何判断要把消息送到哪个队列?你需要路由规则,即绑定(binding)。一个绑定就是一个类似这样的规则:将交换机“desert(沙 漠)”当中具有路由键“阿里巴巴”的消息送到队列“hideout(山洞)”里面去。换句话说,一个绑定就是一个基于路由键将交换机和队列连接起来的路由 规则。例如,具有路由键“audit”的消息需要被送到两个队列,“log-forever”和“alert-the-big-dude”。要做到这个, 就需要创建两个绑定,每个都连接一个交换机和一个队列,两者都是由“audit”路由键触发。在这种情况下,交换机会复制一份消息并且把它们分别发送到两 个队列当中。交换机不过就是一个由绑定构成的路由表。 交换机有多种类型。他们都是做路由的,但是它们接受不同类型的绑定。为什么不创建一种交换机来处理所有类型的路由规则呢?因为每种规则用来做匹配分 子的CPU开销是不同的。例如,一个“topic”类型的交换机试图将消息的路由键与类似“dogs.*”的模式进行匹配。匹配这种末端的通配符比直接将 路由键与“dogs”比较(“direct”类型的交换机)要消耗更多的CPU。如果你不需要“topic”类型的交换机带来的灵活性,你可以通过使用 “direct”类型的交换机获取更高的处理效率。那么有哪些类型,他们又是怎么处理的呢? Exchange
持久化你花了大量的时间来创建队列、交换机和绑定,然后,服务器程序挂了。你的队列、交换机和绑定怎么样了?还有,放在队列里面但是尚未处理的消息们呢? 如果你是用默认参数构造的这一切的话,那么,他们都灰飞烟灭了。RabbitMQ重启之后会干净的像个新生儿。你必须重做所有的一切,亡羊补牢,如何避免将来再度发生此类杯具? 队列和交换机有一个创建时候指定的标志durable。durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。那么如何才能做到不只是队列和交换机,还有消息都是持久的呢? 但是首先需要考虑的问题是:是否真的需要消息的持久化?如果需要重启后消息可以回复,那么它需要被写入磁盘。但即使是最简单的磁盘操作也是要消耗时间的。所以需要衡量判断。 当你将消息发布到交换机的时候,可以指定一个标志“Delivery Mode”(投递模式)。根据你使用的AMQP的库不同,指定这个标志的方法可能不太一样。简单的说,就是将Delivery Mode设置成2,也就是持久的(persistent)即可。一般的AMQP库都是将Delivery Mode设置成1,也就是非持久的。所以要持久化消息的步骤如下:
绑定(Bindings)怎么办?绑定无法在创建的时候设置成durable。没问题,如果你绑定了一个durable的队列和一个durable 的交换机,RabbitMQ会自动保留这个绑定。类似的,如果删除了某个队列或交换机(无论是不是durable),依赖它的绑定都会自动删除。 注意:
在Windows上安装Rabbit MQ 指南,最好的是这篇《Rabbit MQ Windows Installation guide》,其中还包括了使用.NET RabbitMQ.Client Nuget 包访问Rabbit MQ的示例代码。 安装Rabbit MQ Rabbit MQ 是建立在强大的Erlang OTP平台上,因此安装Rabbit MQ的前提是安装Erlang。通过下面两个连接下载安装3.2.3 版本:
默认安装的Rabbit MQ 监听端口是5672 激活Rabbit MQ's Management Plugin 使用Rabbit MQ 管理插件,可以更好的可视化方式查看Rabbit MQ 服务器实例的状态,你可以在命令行中使用下面的命令激活:
下面我们使用rabbitmqctl控制台命令(位于C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>)来创建用户,密码,绑定权限等。 Microsoft Windows [版本 6.3.9600] c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin 的目录 2014/11/01 15:04 <DIR> . c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba 使用浏览器打开 在.NET上使用Rabbit MQ 通过Nuget 获取Rabbit MQ NET client bindings from NuGet:
private readonly ConnectionFactory rabbitMqFactory =
new ConnectionFactory { HostName = "Geffzhang-NB",
UserName="geffzhang", Password ="zsy@2014", VirtualHost ="/" }; 下面对上面代码进行说明: 2. 定义交换方式 ,创建了Direct Exchange和Durable Queue,并使用QueueName作为routing key ,可以把消息直接投递到某个队列。rabbitmq交换方式分为三种,分别是: 运行上述代码,可以在Rabbit MQ的管理控制台上看到test.exchange Exchange 绑定到 创建的队列 test.queue 第二步就是发布持久化消息到队列 Exchange和Queue建立好以后,就可以发送消息到队列了。RabbitMq 可以接受byte[]的数据,字符串采用utf-8编码的字节数组。确保消息可持久化的,需要设置PersistMode为true,参看下面的代码: var props = channel.CreateBasicProperties(); BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true); var msgBody = Encoding.UTF8.GetString(msgResponse.Body); NoAck:true 告诉RabbitMQ立即从队列中删除消息,另一个非常受欢迎的方式是从队列中删除已经确认接收的消息,可以通过单独调用BasicAck 进行确认: BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false); 另一种方法是通过基于推送的事件订阅。您可以使用内置的 QueueingBasicConsumer 提供简化的编程模型,通过允许您在共享队列上阻塞,直到收到一条消息,例如 var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(QueueName, noAck: true, consumer: consumer); var msgResponse = consumer.Queue.Dequeue(); //blocking var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
|
|
来自: land_zhj > 《rabbitmq》