在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack。那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理。 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care。他只是将自己的状态publish给上层,由上层的逻辑去处理。如果Message没有被正确处理,可能会导致某些状态丢失。但是由于提供了其他强制刷新全部状态的机制,因此这种异常情况的影响也就可以忽略不计了。 对于某些异步操作,比如客户端需要创建一个FileSystem,这个可能需要比较长的时间,甚至要数秒钟。这时候通过RPC可以解决这个问题。因此也就不存在Publisher端的确认机制了。 那么,有没有一种机制能保证Publisher能够感知它的Message有没有被处理的?答案肯定的。在这里感谢笑天居士同学:他在我的《RabbitMQ消息队列(三):任务分发机制》文后留言一起讨论了问题,而且也查找了一些资料。在这里我整理了一下他转载和一篇文章和原创的一篇文章。衔接已经附后。
1. 事务机制 VS Publisher Confirm如果采用标准的 AMQP 协议,则唯一能够保证消息不会丢失的方式是利用事务机制 — 令 channel 处于 transactional 模式、向其 publish 消息、执行 commit 动作。在这种方式下,事务机制会带来大量的多余开销,并会导致吞吐量下降 250% 。为了补救事务带来的问题,引入了 confirmation 机制(即 Publisher Confirm)。 为了使能 confirm 机制,client 首先要发送 confirm.select 方法帧。取决于是否设置了 no-wait 属性,broker 会相应的判定是否以 confirm.select-ok 进行应答。一旦在 channel 上使用 confirm.select方法,channel 就将处于 confirm 模式。处于 transactional 模式的 channel 不能再被设置成 confirm 模式,反之亦然。 在异常情况中,broker 将无法成功处理相应的消息,此时 broker 将发送 basic.nack 来代替 basic.ack 。在这个情形下,basic.nack 中各域值的含义与 basic.ack 中相应各域含义是相同的,同时 requeue 域的值应该被忽略。通过 nack 一或多条消息,broker 表明自身无法对相应消息完成处理,并拒绝为这些消息的处理负责。在这种情况下,client 可以选择将消息 re-publish 。 在 channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被 nack 一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm 又被 nack 。
2. 消息在什么时候确认broker 将在下面的情况中对消息进行 confirm :
broker 会丢失持久化消息,如果 broker 在将上述消息写入磁盘前异常。在一定条件下,这种情况会导致 broker 以一种奇怪的方式运行。例如,考虑下述情景: 1. 一个 client 将持久消息 publish 到持久 queue 中 在上述情景下,client 有理由认为消息需要被(broker)重新 deliver 。但这并非事实:重启(有可能)会令 broker 丢失消息。为了确保持久性,client 应该使用 confirm 机制。如果 publisher 使用的 channel 被设置为 confirm 模式,publisher 将不会收到已丢失消息的 ack(这是因为 consumer 没有对消息进行 ack ,同时该消息也未被写入磁盘)。 3. 编程实现首先要区别AMQP协议mandatory和immediate标志位的作用。 mandatory和immediate是AMQP协议中basic.pulish方法中的两个标志位,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。具体区别在于: 具体的代码参考请参考参考资料1.
参考资料: 1. http://blog.csdn.net/jiao_fuyou/article/details/21594205 2. http://blog.csdn.net/jiao_fuyou/article/details/21594947 3. http://my.oschina.net/moooofly/blog/142095 |
|
来自: Aske5rqhg680oe > 《待分类》