代码之家  ›  专栏  ›  技术社区  ›  CmdrSharp

发布前请确保存在AMQP exchange绑定

  •  0
  • CmdrSharp  · 技术社区  · 6 年前

    系统布局

    我们有三个系统:

    1. API端点(发布者和使用者)
    2. RabbitMQ服务器
    3. 主要应用程序/处理器(发布者和消费者)

    系统1和3都使用Laravel,并使用PHPAMQPLIB与RabbitMQ交互。

    消息的路径

    System 1(API端点)向RabbitMQ服务器发送序列化作业,供System 3处理。然后它立即声明一个新的随机命名队列,用相关ID将一个交换绑定到该队列,并开始侦听消息。

    同时,系统3完成了作业,一旦完成,就用相关ID将该作业的详细信息返回给exchange上的RabbitMQ。

    问题和我的努力

    我经常发现这个过程失败了。作业被发送和接收,响应也被发送-但system 1从未读取此响应,我也没有看到它在RabbitMQ中发布。

    我已经对此进行了大量调试,但没有找到根本原因。我目前的理论是,系统3返回响应的速度非常快,以至于新的队列和交换绑定还没有从系统1中声明出来。这意味着系统3的响应无处可去,结果消失了。这一理论主要基于这样一个事实,即如果我在System 3上设置作业以较低的频率处理,系统会变得更可靠。工作流程越快,就越不可靠。

    问题是:我该如何防止这种情况?还是我还遗漏了什么?我当然希望这些工作能够快速高效地处理,而不破坏请求/响应模式。

    我已经记录了两个系统的输出-它们都使用相同的相关ID工作,并且系统3在发布时得到ACK-而系统1有一个声明队列,没有消息,最终只是超时。

    代码示例1:发布消息

    /**
     * Helper method to publish a message to RabbitMQ
     *
     * @param $exchange
     * @param $message
     * @param $correlation_id
     * @return bool
     */
    public static function publishAMQPRouteMessage($exchange, $message, $correlation_id)
    {
        try {
            $connection = new AMQPStreamConnection(
                env('RABBITMQ_HOST'),
                env('RABBITMQ_PORT'),
                env('RABBITMQ_LOGIN'),
                env('RABBITMQ_PASSWORD'),
                env('RABBITMQ_VHOST')
            );
            $channel = $connection->channel();
    
            $channel->set_ack_handler(function (AMQPMessage $message) {
                Log::info('[AMQPLib::publishAMQPRouteMessage()] - Message ACK');
            });
    
            $channel->set_nack_handler(function (AMQPMessage $message) {
                Log::error('[AMQPLib::publishAMQPRouteMessage()] - Message NACK');
            });
    
            $channel->confirm_select();
    
            $channel->exchange_declare(
                $exchange,
                'direct',
                false,
                false,
                false
            );
    
            $msg = new AMQPMessage($message);
            $channel->basic_publish($msg, $exchange, $correlation_id);
    
            $channel->wait_for_pending_acks();
    
            $channel->close();
            $connection->close();
    
            return true;
        } catch (Exception $e) {
            return false;
        }
    }
    

    代码示例2:等待消息响应

    /**
     * Helper method to fetch messages from RabbitMQ.
     *
     * @param $exchange
     * @param $correlation_id
     * @return mixed
     */
    public static function readAMQPRouteMessage($exchange, $correlation_id)
    {
        $connection = new AMQPStreamConnection(
            env('RABBITMQ_HOST'),
            env('RABBITMQ_PORT'),
            env('RABBITMQ_LOGIN'),
            env('RABBITMQ_PASSWORD'),
            env('RABBITMQ_VHOST')
        );
        $channel = $connection->channel();
    
        $channel->exchange_declare(
            $exchange,
            'direct',
            false,
            false,
            false
        );
    
        list($queue_name, ,) = $channel->queue_declare(
            '',
            false,
            false,
            true,
            false
        );
    
        $channel->queue_bind($queue_name, $exchange, $correlation_id);
    
        $callback = function ($msg) {
            return self::$rfcResponse = $msg->body;
        };
    
        $channel->basic_consume(
            $queue_name,
            '',
            false,
            true,
            false,
            false,
            $callback
        );
    
        if (!count($channel->callbacks)) {
            Log::error('[AMQPLib::readAMQPRouteMessage()] - No callbacks registered!');
        }
    
        while (self::$rfcResponse === null && count($channel->callbacks)) {
            $channel->wait();
        }
    
        $channel->close();
        $connection->close();
    
        return self::$rfcResponse;
    }
    

    感谢您提供的任何建议!

    1 回复  |  直到 6 年前
        1
  •  1
  •   IMSoP    6 年前

    我可能遗漏了什么,但当我读到这篇文章时:

    System 1(API端点)向RabbitMQ服务器发送序列化作业,供System 3处理。然后它立即声明一个新的随机命名队列,用相关ID将一个交换绑定到该队列,并开始侦听消息。

    我的第一个想法是:“为什么要等到消息发送后再声明返回队列?”

    事实上,我们在这里有一系列单独的步骤:

    1. 生成相关ID
    2. 将包含该ID的消息发布到exchange以在其他地方进行处理
    3. 声明新队列以接收响应
    4. 使用相关ID将队列绑定到exchange
    5. 将回调绑定到新队列
    6. 正在等待响应

    在第2步之后才能做出响应,因此我们希望尽可能晚地做出响应。在这之前唯一不能完成的步骤是步骤6,但在代码中把步骤5和6放在一起可能比较方便。因此,我会重新安排代码:

    1. 生成相关ID
    2. 声明新队列以接收响应
    3. 使用相关ID将队列绑定到exchange
    4. 将包含相关ID的消息发布到exchange以在其他地方进行处理
    5. 将回调绑定到新队列
    6. 正在等待响应

    这样,无论响应发布的速度有多快,它都会被步骤2中声明的队列拾取,并且一旦绑定回调并开始等待,就会对其进行处理。

    请注意,没有什么 readAMQPRouteMessage 知道这一点 publishAMQPRouteMessage 不会,因此您可以在它们之间自由移动代码。当您想从响应队列中使用时,只需要它的名称,您可以将其保存到变量中并传递,也可以自己生成,而不是让RabbitMQ命名。例如,您可以用它正在侦听的相关ID来命名它,这样您就可以通过简单的字符串操作来确定它是什么,例如。 "job_response.{$correlation_id}"