系统布局
我们有三个系统:
-
API端点(发布者和使用者)
-
RabbitMQ服务器
-
主要应用程序/处理器(发布者和消费者)
系统1和3都使用Laravel,并使用PHPAMQPLIB与RabbitMQ交互。
消息的路径
System 1(API端点)向RabbitMQ服务器发送序列化作业,供System 3处理。然后它立即声明一个新的随机命名队列,用相关ID将一个交换绑定到该队列,并开始侦听消息。
同时,系统3完成了作业,一旦完成,就用相关ID将该作业的详细信息返回给exchange上的RabbitMQ。
问题和我的努力
我经常发现这个过程失败了。作业被发送和接收,响应也被发送-但system 1从未读取此响应,我也没有看到它在RabbitMQ中发布。
我已经对此进行了大量调试,但没有找到根本原因。我目前的理论是,系统3返回响应的速度非常快,以至于新的队列和交换绑定还没有从系统1中声明出来。这意味着系统3的响应无处可去,结果消失了。这一理论主要基于这样一个事实,即如果我在System 3上设置作业以较低的频率处理,系统会变得更可靠。工作流程越快,就越不可靠。
问题是:我该如何防止这种情况?还是我还遗漏了什么?我当然希望这些工作能够快速高效地处理,而不破坏请求/响应模式。
我已经记录了两个系统的输出-它们都使用相同的相关ID工作,并且系统3在发布时得到ACK-而系统1有一个声明队列,没有消息,最终只是超时。
代码示例1:发布消息
/**
* Helper method to publish a message to RabbitMQ
*
* @param $exchange
* @param $message
* @param $correlation_id
* @return bool
*/
public static function publishAMQPRouteMessage($exchange, $message, $correlation_id)
{
try {
$connection = new AMQPStreamConnection(
env('RABBITMQ_HOST'),
env('RABBITMQ_PORT'),
env('RABBITMQ_LOGIN'),
env('RABBITMQ_PASSWORD'),
env('RABBITMQ_VHOST')
);
$channel = $connection->channel();
$channel->set_ack_handler(function (AMQPMessage $message) {
Log::info('[AMQPLib::publishAMQPRouteMessage()] - Message ACK');
});
$channel->set_nack_handler(function (AMQPMessage $message) {
Log::error('[AMQPLib::publishAMQPRouteMessage()] - Message NACK');
});
$channel->confirm_select();
$channel->exchange_declare(
$exchange,
'direct',
false,
false,
false
);
$msg = new AMQPMessage($message);
$channel->basic_publish($msg, $exchange, $correlation_id);
$channel->wait_for_pending_acks();
$channel->close();
$connection->close();
return true;
} catch (Exception $e) {
return false;
}
}
代码示例2:等待消息响应
/**
* Helper method to fetch messages from RabbitMQ.
*
* @param $exchange
* @param $correlation_id
* @return mixed
*/
public static function readAMQPRouteMessage($exchange, $correlation_id)
{
$connection = new AMQPStreamConnection(
env('RABBITMQ_HOST'),
env('RABBITMQ_PORT'),
env('RABBITMQ_LOGIN'),
env('RABBITMQ_PASSWORD'),
env('RABBITMQ_VHOST')
);
$channel = $connection->channel();
$channel->exchange_declare(
$exchange,
'direct',
false,
false,
false
);
list($queue_name, ,) = $channel->queue_declare(
'',
false,
false,
true,
false
);
$channel->queue_bind($queue_name, $exchange, $correlation_id);
$callback = function ($msg) {
return self::$rfcResponse = $msg->body;
};
$channel->basic_consume(
$queue_name,
'',
false,
true,
false,
false,
$callback
);
if (!count($channel->callbacks)) {
Log::error('[AMQPLib::readAMQPRouteMessage()] - No callbacks registered!');
}
while (self::$rfcResponse === null && count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
return self::$rfcResponse;
}
感谢您提供的任何建议!