代码之家  ›  专栏  ›  技术社区  ›  PPP

C++和Python ZeroMQ 4。x发布/订阅示例不起作用

  •  11
  • PPP  · 技术社区  · 7 年前

    我只能找到旧的C++源代码示例。不管怎样,我是根据他们做的。这是我的python出版商:

    import zmq
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5563")
    
    while True:
        msg = "hello"
        socket.send_string(msg)
        print("sent "+ msg)
        sleep(5)
    

    这是C++中的订阅服务器:

    void * ctx = zmq_ctx_new();
    void * subscriber = zmq_socket(ctx, ZMQ_SUB);
    // zmq_connect(subscriber, "tcp://*:5563");
       zmq_connect(subscriber, "tcp://localhost:5563");
    // zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", sizeof(""));
    
    while (true) {
        zmq_msg_t msg;
        int rc;
    
        rc = zmq_msg_init( & msg);
        assert(rc == 0);
        std::cout << "waiting for message..." << std::endl;
        rc = zmq_msg_recv( & msg, subscriber, 0);
        assert(rc == 1);
    
        std::cout << "received: " << (char * ) zmq_msg_data( & msg) << std::endl;
        zmq_msg_close( & msg);
    }
    

    起初,我试过 zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", sizeof("") ); 但我想如果我不设置这个,我应该得到一切,对吗?所以我评论了一下。

    当我运行代码时,我看到“正在等待消息…”永远

    我尝试使用 tcpdump . 结果是,当发布服务器打开时,我在 5563 端口,当我关闭发布服务器时,它们会停止。当我试着 PUSH/PULL scheme,我可以在 命令 . (我试着用nodejs推,用c++拉,结果成功了)。

    我可能做错了什么?

    我尝试了不同的组合 .bind() , .connect() , localhost , 127.0.0.1 ,但它们也不起作用。

    更新 :我刚读到我必须订阅一些东西,所以我订阅了 zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, NULL, 0 ); 订阅所有内容,但我仍然没有收到任何内容

    PyZMQ版本为17.0.0。b3并具有ZeroMQ 4.2.3

    C++有ZeroMQ 4.2.2

    更新2:

    将两者都更新到4.2.3,也不起作用。

    4 回复  |  直到 7 年前
        1
  •  5
  •   user3666197    7 年前

    “我想如果我不设置这个,我应该得到一切,对吗? "

    不,这不是一个正确的假设。你可能会喜欢 collection of my other ZeroMQ posts here, about a { plain-string | unicode | serialisation }-issues and the { performance- | traffic- }-impacts actual policy ( SUB -side topic-filter processing on early ZeroMQ versions, and/or the PUB -side processing for more recent ones ) one may encounter in heterogeneous distributed-systems' design, using ZeroMQ.

    (任何其他可扩展的正式通信原型模式,如 PUSH/PULL ,不处理订阅策略,因此将独立于针对设置的主题筛选器列表的订阅匹配处理。)


    步骤0:首先测试发送部分RTO,如果 .send() -什么都可以:

    让我们模拟一个快速的pythonic接收器,看看发送者是否真的发送了任何东西:

    import zmq
    
    aContext = zmq.Context()                                          # .new Context
    aSUB     = aContext.socket( zmq.SUB )                             # .new Socket
    aSUB.connect( "tcp://127.0.0.1:5563" )                            # .connect
    aSUB.setsockopt( zmq.LINGER, 0 )                                  # .set ALWAYS!
    aSUB.setsockopt( zmq.SUBSCRIBE, "" )                              # .set T-filter
    
    MASK = "INF: .recv()-ed this:[{0:}]\n:     waited {1: > 7d} [us]"
    aClk = zmq.Stopwatch(); 
    
    while True:
          try:
               aClk.start(); print MASK.format( aSUB.recv(),
                                                aClk.stop()
                                                )
          except ( KeyboardInterrupt, SystemExit ):
               pass
               break
    pass
    aSUB.close()                                                     # .close ALWAYS!
    aContext.term()                                                  # .term  ALWAYS!
    

    这应该是报告 酒吧 -发件人实际上是 .发送() -以及实际的消息到达时间(in [us] ,很高兴ZeroMQ包含了这个用于调试和性能/延迟调整的工具)。

    如果你看到直播时确认 INF: -消息实际上在屏幕上滴答作响,让它继续运行,现在可以继续下一步了。


    步骤1:接下来测试接收部件代码:

    #include <zmq.h>
    void *aContext = zmq_ctx_new();        
    void *aSUB     = zmq_socket(     aContext, ZMQ_SUB );               std::cout << "INF: .. zmq_ctx_new() done" << std::endl;
                     zmq_connect(    aSUB,    "tcp://127.0.0.1:5563" ); std::cout << "INF: .. zmq_connect() done" << std::endl;
                     zmq_setsockopt( aSUB,     ZMQ_SUBSCRIBE, "", 0 );  std::cout << "INF: .. zmq_setsockopt( ZMQ_SUBSCRIBE, ... ) done" << std::endl;
                     zmq_setsockopt( aSUB,     ZMQ_LINGER,        0 );  std::cout << "INF: .. zmq_setsockopt( ZMQ_LINGER, ...    ) done" << std::endl;
    int rc;
    while (true) {
             zmq_msg_t       msg;                            /*       Create an empty ØMQ message */
        rc = zmq_msg_init  (&msg);          assert (rc ==  0 && "EXC: in zmq_msg_init() call" );
                                                   std::cout << "INF: .. zmq_msg_init() done" << std::endl;
        rc = zmq_msg_recv  (&msg, aSUB, 0); assert (rc != -1 && "EXC: in zmq_msg_recv() call" );
                                                   std::cout << "INF: .. zmq_msg_recv() done: received [" << (char * ) zmq_msg_data( &msg ) << "]" << std::endl;
             zmq_msg_close (&msg);                           /*       Release message */
                                                   std::cout << "INF: .. zmq_msg_close()'d" << std::endl;
    }
    zmq_close(    aSUB );                          std::cout << "INF: .. aSUB was zmq_close()'d" << std::endl;
    zmq_ctx_term( aContext );                      std::cout << "INF: .. aCTX was zmq_ctx_term()'d" << std::endl;
    
        2
  •  2
  •   user3666197    7 年前

    返回值是多少 zmq_setsockopt() ?

    那么你应该使用 "" 而不是 NULL ,它们是不同的。

    zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", 0 );
    

    按照API的定义:

    返回值

    这个 zmq\u setsockopt() 如果成功,函数应返回零。否则应返回-1并设置 errno 到以下定义的值之一。
    ...

        3
  •  2
  •   Dorian B.    7 年前

    运行发布/订阅模式(无论语言如何)的正确方法是:

    酒吧

    1. socket(zmq.PUB)
    2. bind("tcp://127.0.0.1:5555")
    3. 编码(通常只对字符串进行encode(),但也可以对对象进行compress()或dumps(),甚至两者都可以) encoded_topic = topic.encode() encoded_msg = msg.encode()
    4. send_multipart([encoded_topic, encoded_msg])

    附属的

    1. socket(zmq.SUB)
    2. setsockopt(zmq.SUBSCRIBE, topic.encode())
    3. connect("tcp://127.0.0.1:5555")
    4. answer = recv_multipart()
    5. 解码答案 enc_topic, enc_msg = answer topic = enc_topic.decode() msg = enc_msg.decode()

    一般来说,步骤Pub-2/Sub-3(即绑定/连接)和Pub-3/Sub- 5(即编码/解码或转储/加载)需要相互补充才能正常工作。

        4
  •  0
  •   PPP    7 年前

    是我,那个提出问题的人。

    我设法通过 交换 socket.bind("tcp://*:5563") socket.connect("tcp://dns_address_of_my_dcker_container:5564") 在python中 ,

    和交换 zmq_connect(subscriber, "tcp://localhost:5563") zmq_bind(subscriber, "tcp://*:5563") 在C中++

    我在网上找到的例子说我应该使用 bind 为出版商和 connect 对于订阅者来说,但这对我来说根本不起作用。有人知道为什么吗?

    ZeroMQ文档说明如下:

    函数的作用是:将套接字绑定到本地端点,然后 接受该端点上的传入连接。

    函数的作用是:将套接字连接到端点,然后 接受该端点上的传入连接。

    我不清楚是什么改变了,但它起了作用。