代码之家  ›  专栏  ›  技术社区  ›  Kadir Erdem Demir

最佳进程间消息传递模式,用于定期侦听同一本地主机上的多个进程

  •  0
  • Kadir Erdem Demir  · 技术社区  · 6 年前

    我有一个奇怪的情况,我需要从我的主进程创建许多进程。

    我创建的这些进程将对来自web套接字的一些消息进行排队。

    我的主要流程的最小示例是:

    Socket*[] socketList;
    
    string sRecv( Socket* socket)
    {
        ubyte[256] buffer;
        immutable size = socket.receive(buffer);
        import std.algorithm: min;
        return buffer[0 .. min(size,256)].idup.asString();
    }
    
    void startServer( string servername )
    {
        auto pid = spawnProcess(["/home/erdem/eclipse-workspace/WebSocketDenemesi/websocketdenemesi",
                                  servername, "\n"]);
        auto requester = new Socket(SocketType.req);
        auto allName = "ipc:///tmp/" ~  servername;
        requester.connect(allName);
        socketList ~= requester;
    
    }
    
    void main() {
    
        import std.array : split;
        import std.algorithm : each;
    
        startServer("iotabtc@depth");
        startServer("iotabtc@aggTrade");
        startServer("ethbtc@depth");
    
        int counter = 30;
        while(counter--) {
            foreach ( requester; socketList)
            {
                requester.send("send"); 
            }
    
            foreach ( requester; socketList)
            {
                auto strList = sRecv(requester).split("\n");
                strList.each!( str => writefln("Received [%d]reply [%s]", strList.length,  str) );
    
            }
            sleep(1000.msecs);
        }
        foreach ( requester; socketList)
        {
            requester.send("done"); 
        }
    }
    

    对于我的小流程,我有一个最小的例子:

    WebSocket startSocket( string temp )
    {
        auto ws_url = URL(temp);
        auto ws = connectWebSocket(ws_url);
        if ( !ws.connected )
            return null;    
        return  ws;
    }
    
    void close( WebSocket ws )
    {
        int timeOut = 5;
        while ( ws && ws.connected && timeOut-- )
        {
            vibe.core.concurrency.async( { ws.close(); return true;} ); 
            sleep(5.msecs);
        }   
    }
    
    string sRecv(ref Socket socket)
    {
        ubyte[256] buffer;
        immutable size = socket.tryReceive(buffer)[0];
        import std.algorithm: min;
        return size ? buffer[0 .. min(size,256)].idup.asString() : "";
    }
    
    void main( string[] args ) {
    
        auto responder = Socket(SocketType.rep);
        string seperatorChar = args[2];
        string temp = "ipc:///tmp/" ~ args[1];
        responder.bind(temp);
    
        string socketName =  "wss://stream.binance.com:9443/ws/" ~ args[1];
        auto curSocket = startSocket(socketName);
        string curString;
        while (true) {
            auto result = responder.sRecv();
            if ( result == "send")
            {
                responder.send(curString);      
                curString = "";
            }
            else if ( result == "done" )
            {
                break;
            }
            else 
            {
                if ( curSocket.dataAvailableForRead )
                {
                    auto text = curSocket.receiveText();
                    if ( !curString.empty )
                       curString ~= seperatorChar;
                    curString ~= text;
                }
            }
            sleep(100.msecs);
        }
        writeln( "Shutting down: ", args[1]);
        curSocket.close();
    
    }
    

    REQ/REP 插座。有没有更好的方法来达到我的要求。例如,是否有更好的消息传递模式?例如,是否有一种模式,其中我的小进程不被 responder.receive( buffer ); .

    如果有的话,我就不需要从另一个线程监听websocket了。

    1 回复  |  直到 6 年前
        1
  •  1
  •   user3666197    6 年前

    欢迎使用基于ZeroMQ的

    有没有 更好的消息传递模式 例如?

    这取决于您的流程需要如何进行通信。简而言之,使用 REQ/REP 几乎是菜单上最糟糕的选择。

    • ws.recv() + PUSHer.send() + if PULLer.poll(): PULLer.recv() 流水线事件获取+ PUSH/PULL 传播+条件再处理最符合现实世界的行为。

    • 考虑到处理场的占用空间可能会超出非本地节点的单个本地主机、其他传输类的范围~ { tipc:// | tcp:// | udp:// | pgm:// | epgm:// | norm:// | vmci:// } 可能会加入游戏 ipc:// -当前localhost-ZeroMQ上的链接在处理这种混合时的透明性是进入掌握ZenOfZero的一个很酷的好处。

    • PUB/SUB 可伸缩的正式通信原型模式可能会变得有益,可以选择使用 .setsockopt( zmq.CONFLATE, 1 )