我试图找出为什么启用zmq_Conflate选项会导致没有收到消息的原因。
我重新创建了这个最小的测试用例(我的应用程序使用xpub/xsub代理,但是,这似乎不会改变测试的结果):
#include <atomic>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <zmq.hpp>
#define USE_PROXY
std::atomic<bool> stop{false};
void pub_thread(zmq::context_t &context)
{
zmq::socket_t pub(context, zmq::socket_type::pub);
#ifdef USE_PROXY
pub.connect("tcp://localhost:38922");
#else
pub.bind("tcp://*:38923");
#endif
long i = 0;
for(;;)
{
if(stop) break;
std::string m = boost::lexical_cast<std::string>(i);
zmq::message_t hdr(6);
memcpy(hdr.data(), "topic1", 6);
zmq::message_t msg(m.size());
memcpy(msg.data(), m.data(), m.size());
std::cout << "send: " << m << std::endl;
if(!pub.send(hdr, ZMQ_SNDMORE) || !pub.send(msg))
std::cout << "send error" << std::endl;
i++;
boost::this_thread::sleep_for(boost::chrono::milliseconds{20});
}
}
void sub_thread(zmq::context_t &context)
{
zmq::socket_t sub(context, zmq::socket_type::sub);
const int v_true = 1;
sub.setsockopt(ZMQ_CONFLATE, &v_true, sizeof(v_true));
#ifdef USE_PROXY
sub.connect("tcp://localhost:38921");
#else
sub.connect("tcp://localhost:38923");
#endif
sub.setsockopt(ZMQ_SUBSCRIBE, "topic1", 6);
for(;;)
{
if(stop) break;
zmq::message_t hdr, msg;
if(!sub.recv(&hdr) || !hdr.more() || !sub.recv(&msg))
std::cout << "recv error" << std::endl;
std::string m(reinterpret_cast<const char*>(msg.data()), msg.size());
std::cout << " recv: " << m << std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds{250});
}
}
void proxy_thread(zmq::context_t &context)
{
#ifdef USE_PROXY
zmq::socket_t xpub(context, zmq::socket_type::xpub);
xpub.bind("tcp://*:38921");
zmq::socket_t xsub(context, zmq::socket_type::xsub);
xsub.bind("tcp://*:38922");
std::cout << "starting xpub/xsub proxy" << std::endl;
zmq::proxy(xpub, xsub, nullptr);
std::cout << "xpub/xsub proxy terminated" << std::endl;
#endif
}
void timeout_thread()
{
boost::this_thread::sleep_for(boost::chrono::seconds{4});
stop = true;
boost::this_thread::sleep_for(boost::chrono::seconds{1});
exit(0);
}
int main(int argc, char **argv)
{
zmq::context_t context(1);
boost::thread t0(&timeout_thread);
boost::thread t1(&proxy_thread, boost::ref(context));
boost::this_thread::sleep_for(boost::chrono::seconds{1});
boost::thread t2(&sub_thread, boost::ref(context));
boost::this_thread::sleep_for(boost::chrono::seconds{1});
boost::thread t3(&pub_thread, boost::ref(context));
t0.join();
}
快速描述:我们有4个线程:
-
pub线程:每20毫秒将递增计数器的值写入pub套接字
-
子线程:每隔250毫秒从子套接字读取一次值(消息应该排队,但是由于conflate选项的原因,除了最近的消息之外,应该丢弃它)
-
代理线程:运行xpub/xsubn代理(如果定义了use_proxy)
-
超时线程:4秒后停止所有操作
我观察到的输出如下:
starting xpub/xsub proxy
send: 0
send: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
send: 10
send: 11
send: 12
send: 13
send: 14
send: 15
send: 16
send: 17
send: 18
send: 19
send: 20
send: 21
send: 22
send: 23
send: 24
send: 25
send: 26
send: 27
send: 28
send: 29
send: 30
send: 31
send: 32
send: 33
send: 34
send: 35
send: 36
send: 37
send: 38
send: 39
send: 40
send: 41
send: 42
send: 43
send: 44
send: 45
send: 46
send: 47
send: 48
send: 49
send: 50
send: 51
send: 52
send: 53
send: 54
send: 55
send: 56
send: 57
send: 58
send: 59
send: 60
send: 61
send: 62
send: 63
send: 64
send: 65
send: 66
send: 67
send: 68
send: 69
send: 70
send: 71
send: 72
send: 73
send: 74
send: 75
send: 76
send: 77
send: 78
send: 79
send: 80
send: 81
send: 82
send: 83
send: 84
send: 85
send: 86
send: 87
也就是说,从未收到任何消息。
预期的输出应该是这样的:
starting xpub/xsub proxy
send: 0
send: 1
recv: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
send: 10
send: 11
recv: 11
send: 12
send: 13
send: 14
send: 15
send: 16
send: 17
send: 18
send: 19
send: 20
send: 21
send: 22
recv: 21
send: 23
send: 24
send: 25
send: 26
send: 27
send: 28
send: 29
send: 30
send: 31
send: 32
send: 33
send: 34
recv: 33
send: 35
send: 36
send: 37
send: 38
send: 39
send: 40
send: 41
send: 42
send: 43
send: 44
send: 45
recv: 45
send: 46
send: 47
send: 48
send: 49
send: 50
send: 51
send: 52
send: 53
send: 54
send: 55
send: 56
recv: 55
send: 57
send: 58
send: 59
send: 60
send: 61
send: 62
send: 63
send: 64
send: 65
send: 66
send: 67
recv: 66
send: 68
send: 69
send: 70
send: 71
send: 72
send: 73
send: 74
send: 75
send: 76
send: 77
send: 78
recv: 77
send: 79
send: 80
send: 81
send: 82
send: 83
send: 84
send: 85
send: 86
send: 87
我也试着搬家
sub.setsockopt(ZMQ_CONFLATE,...
之后
sub.connect(...
但在这种情况下,它没有效果,与移除zmq_合并线的效果相同:
starting xpub/xsub proxy
send: 0
send: 1
recv: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
send: 10
send: 11
recv: 2
send: 12
send: 13
send: 14
send: 15
send: 16
send: 17
send: 18
send: 19
send: 20
send: 21
send: 22
recv: 3
send: 23
send: 24
send: 25
send: 26
send: 27
send: 28
send: 29
send: 30
send: 31
send: 32
send: 33
send: 34
recv: 4
send: 35
send: 36
send: 37
send: 38
send: 39
send: 40
send: 41
send: 42
send: 43
send: 44
send: 45
recv: 5
send: 46
send: 47
send: 48
send: 49
send: 50
send: 51
send: 52
send: 53
send: 54
send: 55
send: 56
recv: 6
send: 57
send: 58
send: 59
send: 60
send: 61
send: 62
send: 63
send: 64
send: 65
send: 66
send: 67
recv: 7
send: 68
send: 69
send: 70
send: 71
send: 72
send: 73
send: 74
send: 75
send: 76
send: 77
send: 78
recv: 8
send: 79
send: 80
send: 81
send: 82
send: 83
send: 84
send: 85
send: 86
send: 87
ZMQ版本:4.2.5