2014年9月11日RabbitMQtutorial-Routing
http://adamlu.net/rabbitmq/tutorial-four-python1/4
从哪里获得帮助
如果你在这篇教程中遇到了困难,你可以通过
讨论列表或者直接联系我们
路由(Routing)
(使用pika0.9.5Python客户端)
在前面的教程中,我们实现了一个简单的日志系统。可以把日志
消息广播给多个接收者。
本篇教程中我们打算新增一个功能——使得它能够只订阅消息的
一个字集。例如,我们只需要把严重的错误日志信息写入日志文
件(存储到磁盘),但同时仍然把所有的日志信息输出到控制台
中
绑定(Bindings)
前面的例子,我们已经创建过绑定(bindings),代码如下:
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
绑定(binding)是指交换器(exchange)和队列(queue)的关系。可以简单理解为:这个队列(queue)对这
个交换器(exchange)的消息感兴趣。
绑定的时候可以带上一个额外的routing_key参数。为了避免与basic_publish的参数混淆,我们把它叫
做bindingkey。以下是如何创建一个带bindingkey的绑定。
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key=''black'')
bindingkey的含义取决于交换器(exchange)的类型。我们之前使用过的fanout类型会忽略这个值。
Direct类型的交换器(exchange)
我们的日志系统广播所有的消息给所有的消费者(consumers)。我们打算扩展它,使其可以能够精确的过滤消
息。例如我们也许值是希望当接收到一个严重的错误的时候才把消息写入磁盘,以免浪费磁盘空间。
我们使用的fanout类型的交换器(exchange)扩展性不够——它能做的仅仅是广播。
我们将会使用direct类型的交换器(exchange)来代替。路由的算法很简单——交换器将会对bindingkey
和routingkey进行精确匹配,从而确定消息该分发到哪个队列。
下图能够很好的描述这个场景:
2014年9月11日RabbitMQtutorial-Routing
http://adamlu.net/rabbitmq/tutorial-four-python2/4
在这个场景中,我们可以看到directexchangeX和两个队列绑定了。第一个队列使用orange作为binding
key,第二个队列有两个绑定,一个使用black作为bindingkey,另外一个是green。
这样以来,当routingkey为orange的消息发布到交换器(exchange),就会被路由到队列Q1。routingkey
为black或者green的消息就会路由到Q2。其他的所有消息都将会被丢弃。
多个绑定(Multiplebindings)
多个队列使用相同的bindingkey是合法的。我们的这个例子,我们可以添加一个X和Q1之间的绑定,使
用blackbindingkey。这样一来,direct交换器就和fanout交换器的行为一样,将会广播消息到所有匹配的队
列。带有routingkey为black的消息都会发送到Q1和Q2。
Emittinglogs
我们将会发送消息到一个directexchange,把日志级别作为routingkey。这样子负责处理接收的脚本就可
以选择它要处理的日志级别。我们先看看触发日志。
我们需要创建一个交换器(exchange):
channel.exchange_declare(exchange=''direct_logs'',
type=''direct'')
然后我们发送一则消息:
channel.basic_publish(exchange=''direct_logs'',
routing_key=severity,
body=message)
我们先假设“severity”的值是info、warning、error中的一个。
订阅(Subscribing)
2014年9月11日RabbitMQtutorial-Routing
http://adamlu.net/rabbitmq/tutorial-four-python3/4
处理接收消息的方式和之前差不多,但是我们为每一个日志级别创建了一个新的绑定。
result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
forseverityinseverities:
channel.queue_bind(exchange=''direct_logs'',
queue=queue_name,
routing_key=severity)
整合
emit_log_direct.py的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/envpython
importpika
importsys
connection=pika.BlockingConnection(pika.ConnectionParameters(
host=''localhost''))
channel=connection.channel()
channel.exchange_declare(exchange=''direct_logs'',
type=''direct'')
severity=sys.argv[1]iflen(sys.argv)>1else''info''
message=''''.join(sys.argv[2:])or''HelloWorld!''
channel.basic_publish(exchange=''direct_logs'',
routing_key=severity,
body=message)
print"[x]Sent%r:%r"%(severity,message)
connection.close()
receive_logs_direct.py的代码:
1
2
3
4
5
6
7
#!/usr/bin/envpython
importpika
importsys
connection=pika.BlockingConnection(pika.ConnectionParameters(
host=''localhost''))
channel=connection.channel()
2014年9月11日RabbitMQtutorial-Routing
http://adamlu.net/rabbitmq/tutorial-four-python4/4
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
channel.exchange_declare(exchange=''direct_logs'',
type=''direct'')
result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
severities=sys.argv[1:]
ifnotseverities:
print>>sys.stderr,"Usage:%s[info][warning][error]"%\
(sys.argv[0],)
sys.exit(1)
forseverityinseverities:
channel.queue_bind(exchange=''direct_logs'',
queue=queue_name,
routing_key=severity)
print''[]Waitingforlogs.ToexitpressCTRL+C''
defcallback(ch,method,properties,body):
print"[x]%r:%r"%(method.routing_key,body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
如果你希望只是保存warning和error级别的日志到磁盘,只需要打开控制台并输入:
$pythonreceive_logs_direct.pywarningerror>logs_from_rabbit.log
如果你希望所有的日志信息都输出到屏幕中,打开一个新的终端,然后输入:
$pythonreceive_logs_direct.pyinfowarningerror
[]Waitingforlogs.ToexitpressCTRL+C
如果要触发一个error级别的日志,只需要输入:
$pythonemit_log_direct.pyerror"Run.Run.Oritwillexplode."
[x]Sent''error'':''Run.Run.Oritwillexplode.''
这里是完整的代码:(emit_log_direct.py和receive_logs_direct.py)
教程5展示了如果通过一个模式来监听消息。
|
|