
ZMQ_CONFLATE socket option results in no messages being received

  •  2
  • fferri  · Tech Community  · 6 years ago

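    I am trying to figure out why enabling the ZMQ_CONFLATE option results in no messages being received.

    I reproduced the problem with this minimal test case (my application uses an XPUB/XSUB proxy, but that does not seem to change the outcome of the test):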

    #include <atomic>
    #include <boost/lexical_cast.hpp>
    #include <boost/thread.hpp>
    #include <cstdlib>   // exit
    #include <cstring>   // memcpy
    #include <iostream>
    #include <string>
    #include <zmq.hpp>
    
    #define USE_PROXY
    
    std::atomic<bool> stop{false};
    
    void pub_thread(zmq::context_t &context)
    {
        zmq::socket_t pub(context, zmq::socket_type::pub);
    #ifdef USE_PROXY
        pub.connect("tcp://localhost:38922");
    #else
        pub.bind("tcp://*:38923");
    #endif
        long i = 0;
        for(;;)
        {
            if(stop) break;
            std::string m = boost::lexical_cast<std::string>(i);
            zmq::message_t hdr(6);
            memcpy(hdr.data(), "topic1", 6);
            zmq::message_t msg(m.size());
            memcpy(msg.data(), m.data(), m.size());
            std::cout << "send: " << m << std::endl;
            if(!pub.send(hdr, ZMQ_SNDMORE) || !pub.send(msg))
                std::cout << "send error" << std::endl;
            i++;
            boost::this_thread::sleep_for(boost::chrono::milliseconds{20});
        }
    }
    
    void sub_thread(zmq::context_t &context)
    {
        zmq::socket_t sub(context, zmq::socket_type::sub);
        const int v_true = 1;
        sub.setsockopt(ZMQ_CONFLATE, &v_true, sizeof(v_true));
    #ifdef USE_PROXY
        sub.connect("tcp://localhost:38921");
    #else
        sub.connect("tcp://localhost:38923");
    #endif
        sub.setsockopt(ZMQ_SUBSCRIBE, "topic1", 6);
        for(;;)
        {
            if(stop) break;
            zmq::message_t hdr, msg;
            if(!sub.recv(&hdr) || !hdr.more() || !sub.recv(&msg))
                std::cout << "recv error" << std::endl;
            std::string m(reinterpret_cast<const char*>(msg.data()), msg.size());
            std::cout << "                recv: " << m << std::endl;
            boost::this_thread::sleep_for(boost::chrono::milliseconds{250});
        }
    }
    
    void proxy_thread(zmq::context_t &context)
    {
    #ifdef USE_PROXY
        zmq::socket_t xpub(context, zmq::socket_type::xpub);
        xpub.bind("tcp://*:38921");
        zmq::socket_t xsub(context, zmq::socket_type::xsub);
        xsub.bind("tcp://*:38922");
        std::cout << "starting xpub/xsub proxy" << std::endl;
        zmq::proxy(xpub, xsub, nullptr);
        std::cout << "xpub/xsub proxy terminated" << std::endl;
    #endif
    }
    
    void timeout_thread()
    {
        boost::this_thread::sleep_for(boost::chrono::seconds{4});
        stop = true;
        boost::this_thread::sleep_for(boost::chrono::seconds{1});
        exit(0);
    }
    
    int main(int argc, char **argv)
    {
        zmq::context_t context(1);
        boost::thread t0(&timeout_thread);
        boost::thread t1(&proxy_thread, boost::ref(context));
        boost::this_thread::sleep_for(boost::chrono::seconds{1});
        boost::thread t2(&sub_thread, boost::ref(context));
        boost::this_thread::sleep_for(boost::chrono::seconds{1});
        boost::thread t3(&pub_thread, boost::ref(context));
        t0.join();
    }
    

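    Quick description: there are 4 threads:

    • pub thread: writes the value of an incrementing counter to the PUB socket every 20 ms
    • sub thread: reads a value from the SUB socket every 250 ms (messages would normally queue up, but with the conflate option everything except the most recent message should be discarded)
    • proxy thread: runs the XPUB/XSUB proxy (if USE_PROXY is defined)
    • timeout thread: stops everything after 4 seconds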
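
    The "keep only the most recent message" behaviour that the sub thread is expected to see can be sketched without ZeroMQ as a single-slot mailbox. This is only an illustration of the semantics described above; the name `LatestSlot` is hypothetical, and this is not how libzmq implements ZMQ_CONFLATE.

```cpp
#include <mutex>
#include <string>
#include <utility>

// Hypothetical single-slot mailbox: a newer value overwrites an older one
// that has not been read yet. Illustrates the expected "last message only"
// semantics; it is not libzmq's implementation of ZMQ_CONFLATE.
template <typename T>
class LatestSlot
{
public:
    // Store a value, discarding whatever was still waiting to be read.
    void put(T value)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        value_ = std::move(value);
        full_ = true;
    }

    // Move the most recent value into `out` and empty the slot.
    // Returns false if nothing arrived since the last successful take().
    bool take(T &out)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if(!full_) return false;
        out = std::move(value_);
        full_ = false;
        return true;
    }

private:
    std::mutex mutex_;
    T value_{};
    bool full_ = false;
};
```

    With a fast producer calling `put()` every 20 ms and a slow consumer calling `take()` every 250 ms, the consumer would see only the latest counter value each time, which is the pattern shown in the expected output further down.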

    The output I observe is the following:

    starting xpub/xsub proxy
    send: 0
    send: 1
    send: 2
    send: 3
    send: 4
    send: 5
    send: 6
    send: 7
    send: 8
    send: 9
    send: 10
    send: 11
    send: 12
    send: 13
    send: 14
    send: 15
    send: 16
    send: 17
    send: 18
    send: 19
    send: 20
    send: 21
    send: 22
    send: 23
    send: 24
    send: 25
    send: 26
    send: 27
    send: 28
    send: 29
    send: 30
    send: 31
    send: 32
    send: 33
    send: 34
    send: 35
    send: 36
    send: 37
    send: 38
    send: 39
    send: 40
    send: 41
    send: 42
    send: 43
    send: 44
    send: 45
    send: 46
    send: 47
    send: 48
    send: 49
    send: 50
    send: 51
    send: 52
    send: 53
    send: 54
    send: 55
    send: 56
    send: 57
    send: 58
    send: 59
    send: 60
    send: 61
    send: 62
    send: 63
    send: 64
    send: 65
    send: 66
    send: 67
    send: 68
    send: 69
    send: 70
    send: 71
    send: 72
    send: 73
    send: 74
    send: 75
    send: 76
    send: 77
    send: 78
    send: 79
    send: 80
    send: 81
    send: 82
    send: 83
    send: 84
    send: 85
    send: 86
    send: 87
    

    That is, no message is ever received.

    The expected output would be something like this:

    starting xpub/xsub proxy
    send: 0
    send: 1
                    recv: 1
    send: 2
    send: 3
    send: 4
    send: 5
    send: 6
    send: 7
    send: 8
    send: 9
    send: 10
    send: 11
                    recv: 11
    send: 12
    send: 13
    send: 14
    send: 15
    send: 16
    send: 17
    send: 18
    send: 19
    send: 20
    send: 21
    send: 22
                    recv: 21
    send: 23
    send: 24
    send: 25
    send: 26
    send: 27
    send: 28
    send: 29
    send: 30
    send: 31
    send: 32
    send: 33
    send: 34
                    recv: 33
    send: 35
    send: 36
    send: 37
    send: 38
    send: 39
    send: 40
    send: 41
    send: 42
    send: 43
    send: 44
    send: 45
                    recv: 45
    send: 46
    send: 47
    send: 48
    send: 49
    send: 50
    send: 51
    send: 52
    send: 53
    send: 54
    send: 55
    send: 56
                    recv: 55
    send: 57
    send: 58
    send: 59
    send: 60
    send: 61
    send: 62
    send: 63
    send: 64
    send: 65
    send: 66
    send: 67
                    recv: 66
    send: 68
    send: 69
    send: 70
    send: 71
    send: 72
    send: 73
    send: 74
    send: 75
    send: 76
    send: 77
    send: 78
                    recv: 77
    send: 79
    send: 80
    send: 81
    send: 82
    send: 83
    send: 84
    send: 85
    send: 86
    send: 87
    

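    I also tried moving sub.setsockopt(ZMQ_CONFLATE, ...) to after sub.connect(...), but in that case the option has no effect, and the output is the same as with the ZMQ_CONFLATE line removed: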

    starting xpub/xsub proxy
    send: 0
    send: 1
                    recv: 1
    send: 2
    send: 3
    send: 4
    send: 5
    send: 6
    send: 7
    send: 8
    send: 9
    send: 10
    send: 11
                    recv: 2
    send: 12
    send: 13
    send: 14
    send: 15
    send: 16
    send: 17
    send: 18
    send: 19
    send: 20
    send: 21
    send: 22
                    recv: 3
    send: 23
    send: 24
    send: 25
    send: 26
    send: 27
    send: 28
    send: 29
    send: 30
    send: 31
    send: 32
    send: 33
    send: 34
                    recv: 4
    send: 35
    send: 36
    send: 37
    send: 38
    send: 39
    send: 40
    send: 41
    send: 42
    send: 43
    send: 44
    send: 45
                    recv: 5
    send: 46
    send: 47
    send: 48
    send: 49
    send: 50
    send: 51
    send: 52
    send: 53
    send: 54
    send: 55
    send: 56
                    recv: 6
    send: 57
    send: 58
    send: 59
    send: 60
    send: 61
    send: 62
    send: 63
    send: 64
    send: 65
    send: 66
    send: 67
                    recv: 7
    send: 68
    send: 69
    send: 70
    send: 71
    send: 72
    send: 73
    send: 74
    send: 75
    send: 76
    send: 77
    send: 78
                    recv: 8
    send: 79
    send: 80
    send: 81
    send: 82
    send: 83
    send: 84
    send: 85
    send: 86
    send: 87
    

    ZMQ version: 4.2.5

    2 answers  |  latest 6 years ago
        1
  •  3
  •   James Harvey    6 years ago

    ZMQ_CONFLATE

    sub.setsockopt(ZMQ_SUBSCRIBE, "", 0);
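
    For context, the libzmq documentation for ZMQ_CONFLATE notes that the option does not support multi-part messages (only one part is kept in the socket's internal queue), whereas the test case in the question sends the topic and the counter as two frames. One possible workaround, offered here as an assumption and not as this answer's own code, is to pack topic and payload into a single frame. The helpers `pack_frame` and `unpack_frame` and the space separator are hypothetical.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: join topic and payload into one frame, e.g. "topic1 42".
// Assumes the topic itself contains no space.
std::string pack_frame(const std::string &topic, const std::string &payload)
{
    assert(topic.find(' ') == std::string::npos);
    return topic + ' ' + payload;
}

// Hypothetical helper: split a frame produced by pack_frame().
// Returns false if the separator is missing.
bool unpack_frame(const std::string &frame, std::string &topic, std::string &payload)
{
    const std::string::size_type sep = frame.find(' ');
    if(sep == std::string::npos) return false;
    topic = frame.substr(0, sep);
    payload = frame.substr(sep + 1);
    return true;
}
```

    In the test case, the publisher would then copy `pack_frame("topic1", m)` into one `zmq::message_t` and send it with a single `pub.send(msg)` call and no `ZMQ_SNDMORE`, and the subscriber would call `sub.recv(&msg)` once per iteration and pass the bytes to `unpack_frame`. ZMQ_SUBSCRIBE filters match on the leading bytes of a message, so a "topic1" filter would normally be expected to match such frames; the snippet above subscribes to everything with an empty filter instead.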

        2
  •  0
  •   fferri    6 years ago