代码之家  ›  专栏  ›  技术社区  ›  jkff

如何收回rabbitmq中的消息?

  •  13
  • jkff  · 技术社区  · 14 年前

    我有一个类似于rabbitmq上的作业队列的东西,在请求取消作业时,我想收回尚未开始处理的任务(它们的消息尚未确认),这相当于从它们被路由到的队列中收回这些消息。

    我在amqp或rabbitmq api中没有找到这个功能;也许我搜索得不够好?或者我必须使用一个变通方法(这并不困难,但仍然如此)?

    4 回复  |  直到 11 年前
        1
  •  8
  •   cdeszaq Sudhir N    11 年前

    我将通过让工人检查某种权威数据源来解决这个问题,以确定该作业是否应该继续。例如,工作人员将检查数据库中作业的状态,以查看作业是否已被取消。

    对于处理作业的速度可能比可以更新和读取权威存储的速度快的情况,一个不太有保障的数据存储(将速度用于其他特性)可能有用。

    其中一个例子是使用redis作为存储来取消对消息的处理,而不是像mysql这样的关系数据库。redis速度非常快,但对它所保存的数据的保证较少,而mysql速度慢得多,但对它所保存的数据提供了更多的保证。

    最后,检查另一个源是否处理消息的概念是相同的,但实现的方式取决于特定的场景。

        2
  •  4
  •   Tony Garnock-Jones    13 年前

    rabbitmq不允许您在消息排队后修改或删除它们。为此,您需要某种数据库来保存 状态 并使用rabbitmq通知相关方该状态的更改。

    对于较低的卷,您可以将其与每个作业的队列组合在一起。创建队列,将作业描述发布到队列,将队列的名称通知工作人员。如果在处理作业之前需要取消作业,请删除作业队列;当工作人员来提取作业描述时,他们会注意到队列已消失。

    更轻,通常更好的方法是使用redis或另一个键/值存储来保持作业状态(删除或不存在的记录表示已取消或不存在的作业),并使用rabbitmq来通知键/值存储中新的/删除的/更改的记录。

        3
  •  2
  •   Sergei Zimakov    14 年前

    至少有两种实现目标的方法:

        4
  •  1
  •   Michael Dillon    13 年前

    您需要订阅已将消息路由到的所有队列,并使用ACK使用它们。

    例如,如果您以“test”作为路由密钥发布到主题交换,并且有3个持久队列订阅“test”,则需要使用这三个队列。最好添加另一个队列,您的使用者进程也会监听该队列,并告诉他们忽略这些消息。

    另一种选择是,因为您使用的是rabbitmq,编写一个自定义的交换插件,它将接受一些带外指令来清除所有队列。例如,您可能让Exchange读取一个特殊的消息头,告诉它清除该消息要发往的所有队列。这确实需要编写Erlang代码,但是实现了4种不同的交换类型,因此您只需要复制最相似的类型,并为新的Bahaviours编写代码。如果您对此只使用自定义头,那么消息的主体对于消费者来说可能是一条普通的消息。

    总结如下:

    1)发布者需要使用消息本身 2)发布者可以在特殊队列中发送特殊消息,通知消费者忽略该消息。 3)发布者可以向自定义交换发送一条特殊消息,在将此特殊消息发送给消费者之前,该交换将清除队列中的任何现有消息。