代码之家  ›  专栏  ›  技术社区  ›  EMP

等待带有超时的单个rabbitmq消息

  •  7
  • EMP  · 技术社区  · 14 年前

    我想向rabbitmq服务器发送一条消息,然后等待一条回复消息(在“回复”队列上)。当然,我不想永远等待,以防应用程序处理这些消息时停机——需要有一个超时。这听起来是一项非常基本的任务,但我找不到一种方法来完成。我现在碰到了这两个问题 py-amqplib 以及 RabbitMQ .NET client .

    到目前为止,我得到的最佳解决方案是使用 basic_get 具有 sleep 但这很难看:

    def _wait_for_message_with_timeout(channel, queue_name, timeout):
        slept = 0
        sleep_interval = 0.1
    
        while slept < timeout:
            reply = channel.basic_get(queue_name)
            if reply is not None:
                return reply
    
            time.sleep(sleep_interval)
            slept += sleep_interval
    
        raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)
    

    肯定有更好的方法吗?

    5 回复  |  直到 14 年前
        1
  •  8
  •   asksol    14 年前

    我刚为添加了超时支持 amqplib 在里面 carrot .

    这是 amqplib.client0_8.Connection :

    http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

    wait_multi 是的版本 channel.wait 能够接收任意数字 个频道。

    我想这可能会在上游某个地方被合并。

        2
  •  8
  •   EMP    14 年前

    以下是我在.NET客户端中所做的:

    protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs)
    {
        var consumer = new QueueingBasicConsumer(Channel);
        var tag = Channel.BasicConsume(queueName, true, null, consumer);
        try
        {
            object result;
            if (!consumer.Queue.Dequeue(timeoutMs, out result))
                throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs / 1000.0));
    
            return ((BasicDeliverEventArgs)result).Body;
        }
        finally
        {
            Channel.BasicCancel(tag);
        }
    }
    

    不幸的是,我不能对py amqplib做同样的操作,因为它 basic_consume 方法不调用回调,除非调用 channel.wait() 频道。等待() 不支持超时!这个愚蠢的限制(我不断遇到)意味着如果你再也没有收到另一条消息,你的线程将永远冻结。

        3
  •  2
  •   Alex Martelli    14 年前

    有一个例子 here 使用 qpid 用一个 msg = q.get(timeout=1) 你想怎么做就怎么做。抱歉,我不知道其他AMQP客户机库实现什么超时(特别是我不知道您提到的两个具体的超时)。

        4
  •  1
  •   duffymo    14 年前

    这似乎打破了异步处理的整个概念,但如果必须这样做,我认为正确的方法是使用 RpcClient .

        5
  •  1
  •   Deadly    8 年前

    Rabbit现在允许您添加超时事件。只需将代码包装在try catch中,然后在超时和断开连接处理程序中引发异常:

    try{
        using (IModel channel = rabbitConnection.connection.CreateModel())
        {
            client = new SimpleRpcClient(channel, "", "", queue);
            client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity
            client.TimedOut += RpcTimedOutHandler;
            client.Disconnected += RpcDisconnectedHandler;
            byte[] replyMessageBytes = client.Call(message);
            return replyMessageBytes;
        }
    }
    catch (Exception){
        //Handle timeout and disconnect here
    }
    private void RpcDisconnectedHandler(object sender, EventArgs e)
    {
         throw new Exception("RPC disconnect exception occured.");
    }
    
    private void RpcTimedOutHandler(object sender, EventArgs e)
    {
         throw new Exception("RPC timeout exception occured.");
    }
    
    推荐文章