2014年9月11日RabbitMQtutorial-"Helloworld!"
http://adamlu.net/rabbitmq/tutorial-one-python1/5
从哪里获得帮助
如果你在这篇教程中遇到了困难,你可以通过
讨论列表或者直接联系我们
介绍
RabbitMQ是一个消息代理。它的核心原理非常简单:接收和发送
消息。你可以把它想像成一个邮局:你把信件放入邮箱,邮递员
就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一
个邮箱、邮局、邮递员。
RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存
储和发送二进制的数据——消息。
一般提到RabbitMQ和消息,都用到一些专有名词。
生产(Producing)意思就是发送。发送消息的程序就是一个生产者(producer)。我们一般用"P"来表示:
队列(queue)就是邮箱的名称。消息通过你的应用程序和RabbitMQ进行传输,它们能够只存储在一个队列
(queue)中。队列(queue)没有任何限制,你要存储多少消息都可以——基本上是一个无限的缓冲。多
个生产者(producers)能够把消息发送给同一个队列,同样,多个消费者(consumers)也能攻从一个队
列(queue)中获取数据。队列可以化城这样(图上是队列的名称):
消费(Consuming)和获取消息是一样的意思。一个消费者(consumer)就是一个等待获取消息的程序。
我们把它画作"C":
HelloWorld!
(使用pika0.9.5Python客户端)
我们的“Helloworld”不会很复杂——仅仅发送一个消息,然后获取它并输出到屏幕。这样以来我们需要两个程序,
一个用作发送消息,另一个接受消息并打印消息内容
我们大体的设计是这样的:
生产者(Producer)把消息发送到一个名为“hello”的队列中。消费者(consumer)从这个队列中获取消息。
2014年9月11日RabbitMQtutorial-"Helloworld!"
http://adamlu.net/rabbitmq/tutorial-one-python2/5
RabbitMQ库
RabbitMQ使用的是AMQP协议。要使用她你就必须需要一个使用同样协议的库。几乎所有的编程语言都有
可选择的库。python也是一样,可以从以下几个库中选择:
py-amqplib
txAMQP
pika
在这一系列教程中,我们打算使用pika。要安装pika,你可以使用pip这个包管理工具:
$sudopipinstallpika==0.9.5
安装过程依赖于pip和git-core两个包,你需要先安装它们。
Ubuntu平台
$sudoapt-getinstallpython-pipgit-core
Debian平台
$sudoapt-getinstallpython-setuptoolsgit-core
$sudoeasy_installpip
Windows平台运行easy_install的安装程序setuptools即可,安装后运行以下命令
>easy_installpip
>pipinstallpika==0.9.5
发送消息
我们第一个程序send.py会发送一个消息到队列中。首先要做的事情就是建立一个到RabbitMQ服务器的连接。
#!/usr/bin/envpython
importpika
connection=pika.BlockingConnection(pika.ConnectionParameters(
''localhost''))
channel=connection.channel()
现在我们已经连接上服务器了,那么,在发送消息之前我们需要确认队列是存在的。如果我们把消息发送到一个不
存在的队列,RabbitMQ会丢弃这条消息。我门先创建一个名为hello的队列,然后把消息发送到这个队列中。
2014年9月11日RabbitMQtutorial-"Helloworld!"
http://adamlu.net/rabbitmq/tutorial-one-python3/5
channel.queue_declare(queue=''hello'')
这时候我们就可以发送消息了,我们第一条消息只包含了HelloWorld!字符串,我们打算把它发送到我们的hello
队列。
在RabbitMQ中,消息是不能直接发送到队列,它需要发送到交换器(exchange)中。我们不打算在这里深入讨
论它——你可以通过教程的第三部分了解更多。现在我们所需要了解的是如何使用默认的交换器(exchange),
它使用一个空字符串来标识。交换器允许我们指定某条消息需要投递到哪个队列,routing_key参数必须指定为
队列的名称:
channel.basic_publish(exchange='''',
routing_key=''hello'',
body=''HelloWorld!'')
print"[x]Sent''HelloWorld!''"
在退出程序之前,我们需要确认网络缓冲已经被刷写、消息已经投递到RabbitMQ。完成这些事情(正确的关闭连
接)是很简单的。
connection.close()
获取数据
我们的第二个程序receive.py,将会从队列中获取消息并打印消息。
这次我们还是先要连接到RabbitMQ服务器。连接服务器的代码和之前是一样的。
下一步也和之前一样,我们需要确认队列是存在的。使用queue_declare创建一个队列——我们可以运行这个命
令很多次,但是只有一个队列会创建。
channel.queue_declare(queue=''hello'')
你也许要问为什么重复声明了队列——我们已经在前面的代码中声明了它。如果我们确定了队列是已经存在的,那
么我们可以不这么做。比如先运行send.py程序。可是我们并不确定哪个程序先运行,这种情况的话再程序中重复
声明是好的做法。
列出所有队列
你也许希望查看RabbitMQ由哪些队列、有多少消息在队列中。你可以使用rabbitmqctl工具(使用有权限
的用户):
$sudorabbitmqctllist_queues
Listingqueues...
hello0
...done.
2014年9月11日RabbitMQtutorial-"Helloworld!"
http://adamlu.net/rabbitmq/tutorial-one-python4/5
(omitsudoonWindows)
(在Windows中不需要sudo命令)
从队列中获取消息相对来说稍显复杂。需要为队列定义一个回调(callback)函数。当我们获取到消息的时候,
Pika库就会调用这个回调(callback)函数。我们的这个回调函数将会但因消息的内容到屏幕上。
defcallback(ch,method,properties,body):
print"[x]Received%r"%(body,)
下一步,我们需要告诉RabbitMQ这个回调函数将会从hello队列中接收消息:
channel.basic_consume(callback,
queue=''hello'',
no_ack=True)
要成功运行这些命令,我们必须保证队列是存在的,我们已经能够保证——我们之前已经使用创建了一个队
列queue_declare。
no_ack参数稍后会介绍。
最后,我们输入一个无限循环来等待消息数据并确运行回调函数。
print''[]Waitingformessages.ToexitpressCTRL+C''
channel.start_consuming()
整合
send.py的全部代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/envpython
importpika
connection=pika.BlockingConnection(pika.ConnectionParameters(
host=''localhost''))
channel=connection.channel()
channel.queue_declare(queue=''hello'')
channel.basic_publish(exchange='''',
routing_key=''hello'',
body=''HelloWorld!'')
print"[x]Sent''HelloWorld!''"
connection.close()
(send.py源码)
receive.py的全部代码:
1
2
#!/usr/bin/envpython
importpika
2014年9月11日RabbitMQtutorial-"Helloworld!"
http://adamlu.net/rabbitmq/tutorial-one-python5/5
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
connection=pika.BlockingConnection(pika.ConnectionParameters(
host=''localhost''))
channel=connection.channel()
channel.queue_declare(queue=''hello'')
print''[]Waitingformessages.ToexitpressCTRL+C''
defcallback(ch,method,properties,body):
print"[x]Received%r"%(body,)
channel.basic_consume(callback,
queue=''hello'',
no_ack=True)
channel.start_consuming()
(receive.py源码)
现在就可以在终端中运行我们的程序了。首先,用send.py重续发送一条消息:
$pythonsend.py
[x]Sent''HelloWorld!''
生产者(producer)程序send.py每次运行之后就会停止。现在我们就来接收消息:
$pythonreceive.py
[]Waitingformessages.ToexitpressCTRL+C
[x]Received''HelloWorld!''
成功了!我们已经通过RabbitMQ发送第一条消息。你也许已经注意到了,receive.py程序并没有退出。它一直
在准备获取消息,你可以通过Ctrl-C来终端它。
试下在新的终端中再次运行send.py。
我们已经学会如何发送消息到一个已知队列中并接收消息。是时候移步到第二部分了,我们将会建立一个简单的工
作队列(workqueue)。
|
|