代码之家  ›  专栏  ›  技术社区  ›  Edward

如何避免与“asio::ip::tcp::iostream”的数据竞争?

  •  7
  • Edward  · 技术社区  · 7 年前

    我的问题

    使用两个线程通过 asio::ip::tcp::iostream ?

    设计

    我正在编写一个使用 asio::ip::tcp::iostream 用于输入和输出。程序通过端口5555接受(远程)用户的命令,并通过相同的TCP连接向用户发送消息。因为这些事件(从用户接收的命令或发送给用户的消息)是异步发生的,所以我有单独的传输线程和接收线程。

    在这个玩具版本中,命令是“一”、“二”和“退出”。当然,“退出”会退出程序。其他命令不起任何作用,任何无法识别的命令都会导致服务器关闭TCP连接。

    传输的消息是简单的序列号消息,每秒发送一次。

    在这个玩具版本和我试图编写的真实代码中,发送和接收进程都使用阻塞IO,因此似乎没有一种好的方法来使用 std::mutex 或其他同步机制。(在我的尝试中,一个进程将获取互斥锁,然后阻塞,这对这不起作用。)

    构建和测试

    为了构建和测试它,我在64位Linux机器上使用gcc版本7.2.1和valgrind 3.13。构建:

    g++ -DASIO_STANDALONE -Wall -Wextra -pedantic -std=c++14 concurrent.cpp -o concurrent -lpthread
    

    为了进行测试,我使用以下命令运行服务器:

    valgrind --tool=helgrind --log-file=helgrind.txt ./concurrent 
    

    然后我使用 telnet 127.0.0.1 5555 在另一个窗口中创建到服务器的连接。什么 helgrind 正确地指出,存在数据竞争,因为 runTx runRx 正在尝试异步访问同一流:

    ===16188==线程#1在0x1ffeff1cc读取大小1期间可能存在数据争用

    ==16188==锁定:无

    ... 删除了更多行

    同时发生的cpp公司

    #include <asio.hpp>
    #include <iostream>
    #include <fstream>
    #include <thread>
    #include <array>
    #include <chrono>
    
    class Console {
    public:
        Console() :
            want_quit{false},
            want_reset{false}
        {}
        bool getQuitValue() const { return want_quit; }
        int run(std::istream *in, std::ostream *out);
        bool wantReset() const { return want_reset; }
    private:
        int runTx(std::istream *in);
        int runRx(std::ostream *out);
        bool want_quit;
        bool want_reset;
    };
    
    int Console::runTx(std::istream *in) {
        static const std::array<std::string, 3> cmds{
            "quit", "one", "two", 
        };
        std::string command;
        while (!want_quit && !want_reset && *in >> command) {
            if (command == cmds.front()) {
                want_quit = true;
            }
            if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
                want_reset = true;
                std::cout << "unknown command [" << command << "]\n";
            } else {
                std::cout << command << '\n';
            }
        }
        return 0;
    }
    
    int Console::runRx(std::ostream *out) {
        for (int i=0; !(want_reset || want_quit); ++i) {
            (*out) << "This is message number " << i << '\n';
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            out->flush();
        }
        return 0;
    }
    
    int Console::run(std::istream *in, std::ostream *out) {
        want_reset = false;
        std::thread t1{&Console::runRx, this, out};
        int status = runTx(in);
        t1.join();
        return status;
    }
    
    int main()
    {
        Console con;
        asio::io_service ios;
        // IPv4 address, port 5555
        asio::ip::tcp::acceptor acceptor(ios, 
                asio::ip::tcp::endpoint{asio::ip::tcp::v4(), 5555});
        while (!con.getQuitValue()) {
            asio::ip::tcp::iostream stream;
            acceptor.accept(*stream.rdbuf());
            con.run(&stream, &stream);
            if (con.wantReset()) {
                std::cout << "resetting\n";
            }
        }
    }
    
    1 回复  |  直到 7 年前
        1
  •  2
  •   sehe    7 年前

    是的,你正在共享流下的套接字,没有同步

    旁注,与布尔标志相同,可以通过更改以下内容轻松“修复”:

    std::atomic_bool want_quit;
    std::atomic_bool want_reset;
    

    如何解决

    老实说,我认为没有好的解决办法。您自己也说过:这些操作是异步的,所以如果您尝试同步执行它们,您会遇到麻烦。

    你可以试着想想黑客。如果我们基于相同的底层套接字(filedescriptor)创建一个单独的流对象,会怎么样。不会的 非常 如此简单的溪流不是Asio的一部分。

    但我们可以使用Boost Iostreams破解一个:

    #define BOOST_IOSTREAMS_USE_DEPRECATED
    #include <boost/iostreams/device/file_descriptor.hpp>
    #include <boost/iostreams/stream.hpp>
    
    // .... later:
    
        // HACK: procure a _separate `ostream` to prevent the race, using the same fd
        namespace bio = boost::iostreams;
        bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
        bio::stream<bio::file_descriptor_sink> hack_ostream(fds);
    
        con.run(stream, hack_ostream);
    

    实际上,这在没有竞争的情况下运行(在同一套接字上同时读写 are fine ,只要不共享包装它们的非线程安全Asio对象)。

    我的建议是:

    不要那样做 . 这是一个难题。您正在使事情复杂化,显然是为了避免使用异步代码。我会咬紧牙关的。

    将IO机制从服务逻辑中分离出来并不需要太多的工作。您最终将摆脱随机限制(您可以考虑处理多个客户端,您可以不使用任何线程 完全 等等)。

    如果您想了解一些中间立场,请查看堆叠式协同路由( http://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/spawn.html )

    表册

    仅供参考

    注:我进行了重构,以消除对指针的需要。您没有转移所有权,所以可以使用引用。如果您不知道如何将引用传递给 bind / std::thread 构造函数,诀窍在于 std::ref 你会看到的。

    [对于压力测试,我已经大大减少了延迟。]

    Live On Coliru

    #include <boost/asio.hpp>
    #include <iostream>
    #include <fstream>
    #include <thread>
    #include <array>
    #include <chrono>
    
    class Console {
    public:
        Console() :
            want_quit{false},
            want_reset{false}
        {}
        bool getQuitValue() const { return want_quit; }
        int run(std::istream &in, std::ostream &out);
        bool wantReset() const { return want_reset; }
    private:
        int runTx(std::istream &in);
        int runRx(std::ostream &out);
        std::atomic_bool want_quit;
        std::atomic_bool want_reset;
    };
    
    int Console::runTx(std::istream &in) {
        static const std::array<std::string, 3> cmds{
            {"quit", "one", "two"}, 
        };
        std::string command;
        while (!want_quit && !want_reset && in >> command) {
            if (command == cmds.front()) {
                want_quit = true;
            }
            if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
                want_reset = true;
                std::cout << "unknown command [" << command << "]\n";
            } else {
                std::cout << command << '\n';
            }
        }
        return 0;
    }
    
    int Console::runRx(std::ostream &out) {
        for (int i=0; !(want_reset || want_quit); ++i) {
            out << "This is message number " << i << '\n';
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            out.flush();
        }
        return 0;
    }
    
    int Console::run(std::istream &in, std::ostream &out) {
        want_reset = false;
        std::thread t1{&Console::runRx, this, std::ref(out)};
        int status = runTx(in);
        t1.join();
        return status;
    }
    
    #define BOOST_IOSTREAMS_USE_DEPRECATED
    #include <boost/iostreams/device/file_descriptor.hpp>
    #include <boost/iostreams/stream.hpp>
    
    int main()
    {
        Console con;
        boost::asio::io_service ios;
    
        // IPv4 address, port 5555
        boost::asio::ip::tcp::acceptor acceptor(ios, boost::asio::ip::tcp::endpoint{boost::asio::ip::tcp::v4(), 5555});
    
        while (!con.getQuitValue()) {
            boost::asio::ip::tcp::iostream stream;
            acceptor.accept(*stream.rdbuf());
    
            {
                // HACK: procure a _separate `ostream` to prevent the race, using the same fd
                namespace bio = boost::iostreams;
                bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
                bio::stream<bio::file_descriptor_sink> hack_ostream(fds);
    
                con.run(stream, hack_ostream);
            }
    
            if (con.wantReset()) {
                std::cout << "resetting\n";
            }
        }
    }
    

    测试:

    netcat localhost 5555 <<<quit
    This is message number 0
    This is message number 1
    This is message number 2
    

    commands=( one two one two one two one two one two one two one two three )
    while sleep 0.1; do echo ${commands[$(($RANDOM%${#commands}))]}; done | (while netcat localhost 5555; do sleep 1; done)
    

    无限期运行,偶尔重置连接(当发送命令“三”时)。