代码之家  ›  专栏  ›  技术社区  ›  jebentier

AMQP多任务处理

  •  1
  • jebentier  · 技术社区  · 11 年前

    我有一个有趣的情况需要解决。我需要有一个EventMachine循环,它在AMQP队列中等待消息,然后中断该循环,以便定期向单独的AMQP队列发送消息。我是EventMachine的新手,这就是我到目前为止所拥有的,只是EventMachine循环没有发送必要的消息。

    现在我做了两个proc:

        listen_loop = Proc.new {
            AMQP.start(connection_config) do |connection|
                AMQP::Channel.new(connection) do |channel|
                    channel.queue("queue1", :exclusive => false, :durable => true) do |requests_queue|
                        requests_queue.once_declared do
                            consumer = AMQP::Consumer.new(channel, requests_queue).consume
                            consumer.on_delivery do |metadata, payload|
                                puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..."
                                response = "responding"
                                channel.default_exchange.publish(response,
                                    :routing_key    => metadata.reply_to,
                                    :correlation_id => metadata.message_id,
                                    :mandatory      => true)
                                metadata.ack
                            end
                        end
                    end
                end
            end
            Signal.trap("INT")  { AMQP.stop { EM.stop } }
            Signal.trap("TERM") { AMQP.stop { EM.stop } }
        }
    
        send_message = Proc.new {
            AMQP.start(connection_config) do |connection|
                channel = AMQP::Channel.new(connection)
                queue   = channel.queue('queue2')
    
                channel.default_exchange.publish("hello world", :routing_key => queue.name)
                EM.add_timer(0.5) do
                    connection.close do
                        EM.stop{ exit }
                    end
                end
            end
        }
    

    然后我有了EventMachine循环:

        EM.run do 
            EM.add_periodic_timer(5) { send_message.call }
            listen_loop.call
        end
    

    我可以在侦听循环中接收消息,但无法在常规间隔内发送任何消息。

    1 回复  |  直到 11 年前
        1
  •  0
  •   jebentier    11 年前

    弄清楚我做错了什么。消息循环无法打开与RabbitMQ服务器的新连接,因为该服务器已连接。将所有内容合并到一个EventMachine循环中,并重用连接,它就可以工作了。

    对于那些好奇的人来说,它看起来是这样的:

    EM.run do
    
        AMQP.start(connection_config) do |connection|
            channel = AMQP::Channel.new(connection)
    
            EM.add_periodic_timer(5) { channel.default_exchange.publish("foo", :routing_key => 'queue2') }
    
            queue = channel.queue("queue1", :exclusive => false, :durable => true)
            channel.prefetch(1)
            queue.subscribe(:ack => true) do |metadata, payload|
                puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..."
                response = "bar"
                channel.default_exchange.publish(response,
                    :routing_key    => metadata.reply_to,
                    :correlation_id => metadata.message_id,
                    :mandatory      => true)
                metadata.ack
            end
        end
        Signal.trap("INT")  { AMQP.stop { EM.stop } }
        Signal.trap("TERM") { AMQP.stop { EM.stop } }
    
    end