代码之家  ›  专栏  ›  技术社区  ›  The Quantum Physicist

boost::beast:多个异步写入调用正在触发断言错误

  •  3
  • The Quantum Physicist  · 技术社区  · 6 年前

    我正在为我的全双工服务器编写测试,当我执行多个(顺序)时 async_write 调用(尽管包含一个字符串),我从 boost::beast 在文件中 boost/beast/websocket/detail/stream_base.hpp :

    // If this assert goes off it means you are attempting to
    // simultaneously initiate more than one of same asynchronous
    // operation, which is not allowed. For example, you must wait
    // for an async_read to complete before performing another
    // async_read.
    //
    BOOST_ASSERT(id_ != T::id);
    

    要在计算机上重现问题: 可以找到复制此问题(MCVE)的完整客户端代码 here . 它在链接中不起作用,因为您需要一个服务器(在您自己的计算机上,很抱歉,无法方便地在线执行此操作,这更客观地表明问题出在客户端,而不是服务器(如果我将其包含在此处)。我用过 websocketd 使用命令创建服务器 ./websocketd --ssl --sslkey /path/to/server.key --sslcert /path/to/server.crt --port=8085 ./prog.py 哪里 ./prog.py 是一个简单的python程序,可以打印和刷新(我从 websocketd home page )

    在客户端中执行写入操作的调用如下所示:

      std::vector<std::vector<std::future<void>>> clients_write_futures(
          clients_count);
      for (int i = 0; i < clients_count; i++) {
        clients_write_futures[i] = std::vector<std::future<void>>(num_of_messages);
        for (int j = 0; j < num_of_messages; j++) {
          clients_write_futures[i][j] =
              clients[i]->write_data_async_future("Hello"); // writing here
        }
      }
    

    注意,在这个例子中我只使用了一个客户机。客户机数组只是为了在测试时给服务器带来更大压力的一种概括。

    我对这个问题的看法是:

    1. 循环是连续的;这不像我在多个线程中这样做
    2. 应该可以以全双工形式进行通信,在这种情况下,向服务器发送的消息数量不定。否则,如何进行全双工通信?
    3. 我正在使用串来包装异步调用,以防止通过io_服务/io_上下文在套接字中发生任何冲突
    4. 使用调试器对此进行研究表明,循环的第二次迭代始终失败,这意味着我做了一些根本错误的事情,但我不知道这是什么。换句话说:这显然是一个确定性问题。

    我在这里做错什么了?如何向websocket服务器写入无限数量的消息?


    编辑:

    Sehe,我想先为代码混乱道歉(没意识到有那么糟糕),并感谢你在这方面所做的努力。我希望你能问我,为什么它的结构是这样(可能)有组织的,同时也是混乱的,答案很简单:主要是一个gtest代码,看看我的通用的,多功能的websocket客户端是否工作,我用它来对我的服务器进行压力测试(它使用了大量的多线程IO服务)。ce对象,我认为它很敏感,需要广泛的测试)。我计划在实际的生产测试中同时用许多客户机轰炸我的服务器。我发这个问题是因为我不明白客户的行为。我在这个文件中所做的是创建一个mcve(人们一致要求这样做)。我花了两个小时剥离代码来创建它,最后我复制了gtest测试fixture代码(这是服务器上的一个fixture),并将其粘贴到主服务器上,验证了问题仍然存在于另一个服务器上,并清理了一点(这显然是不够的)。

    为什么我不捕捉异常呢?因为gtest会抓住他们并认为测试失败。主要的不是生产代码,而是客户机。我从你提到的学到了很多,我不得不说,扔和抓是愚蠢的,但是我不知道STD::MaMaGeExpOuttoSpRE(),所以我找到了我(哑)的方法来达到同样的结果:-)。为什么有太多无用的函数:在这个测试/示例中,它们是无用的,但是通常我可以在以后的其他事情中使用它们,因为这个客户机不仅适用于这个例子。

    现在回到问题上来:我不明白为什么我们要掩盖 异步写 当在主线程的循环中按顺序使用strand时(我错误地表示我只覆盖了处理程序)。我理解为什么要覆盖处理程序,因为套接字不是线程安全的,而且是多线程的 io_service 会在那里创造一场比赛。我们也知道 io_service::post 它本身是线程安全的(这就是为什么我认为不需要包装异步写)。你能解释一下什么是线程不安全的,当我们这样做时,我们需要包装异步写本身吗?我知道你已经知道了,但同样的断言仍然在发射。我们将处理程序和异步队列排序,客户端仍然不喜欢进行多个写调用。还有什么会丢失?

    (顺便说一句,如果你写了,那么就得到了未来,然后读,然后再写,它就工作了。这就是为什么我使用futures来精确定义测试用例和定义测试的时间顺序。我有点多疑。)

    1 回复  |  直到 6 年前
        1
  •  2
  •   sehe    6 年前

    你掩护你的 async_write 一根绳子。但是 你不做这种事 . 你所能看到的就是 在该链中包装完成处理程序 . 但是你在发布异步操作 直接地 .

    更糟糕的是,您是从主线程执行的,而与您的 WSClient 实例,这意味着您同时访问的对象实例不是线程安全的。

    这是一场数据竞赛 Undefined Behaviour .

    一个天真的解决方案可能是:

    std::future<void> write_data_async_future(const std::string &data) {
        // shared_ptr is used to ensure data's survival
        std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
        std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();
    
        post(strand_, [=,self=shared_from_this()] {
            websock.async_write(
                boost::asio::buffer(*data_ptr),
                boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, self,
                                                              std::placeholders::_1, std::placeholders::_2, data_ptr,
                                                              write_promise)));
        });
    
        return write_promise->get_future();
    }
    

    但那是 不够 . 现在您可以确定您的异步操作或其完成不会同时运行,但是您仍然可以在调用第一个异步操作的完成处理程序之前发布下一个异步操作。

    为了解决这个问题,你只需要排队。

    老实说,我不知道你为什么这么关注使用未来的同步。这让我们很难做到这一点。如果你能描述一下你/功能上/想要达到的目标,我可以提出一个可能要短得多的解决方案。

    代码评审说明

    在我意识到代码的意义之前,我花了很多时间阅读你的代码。我不想抢了你一路上的笔记。

    警告:这是一个相当长的代码潜水。我之所以提供它,是因为一些见解可能会帮助您了解如何重新构造代码。

    我开始阅读异步代码链直到 on_handshake (哪个) 集合 这个 started_promise 价值)。

    然后我去了马尔斯特罗姆那是你的 main 功能。你的主要功能是50行代码?!有几个并行容器和重复的手动嵌套循环?

    这是我经过重构后得到的:

    int main() {
        std::vector<actor> actors(1);
    
        for (auto& a : actors) {
            a.client = std::make_shared<WSClient>();
            a.session_start_future = a.client->start("127.0.0.1", "8085");
            a.messages.resize(50);
        }
    
        for (auto& a : actors) { a.session_start_future.get(); }
    
        for (auto& a : actors) { for (auto& m : a.messages) {
            m.write_future = a.client->write_data_async_future("Hello");
        } }
    
        for (auto& a : actors) { for (auto& m : a.messages) {
            m.read_future = a.client->read_data_async_future();
        } }
    
        for (auto& a : actors) { for (auto& m : a.messages) {
            m.write_future.get();
            std::string result = m.read_future.get();
        } }
    }
    

    所有的数据结构都被折叠到小助手中 actor :

    struct actor {
        std::shared_ptr<WSClient> client;
        std::future<void> session_start_future;
    
        struct message {
            std::string message = GenerateRandomString(20);
            std::future<void> write_future;
            std::future<std::string> read_future;
        };
    
        std::vector<message> messages;
    };
    

    我们现在大约一个小时的代码审查,没有任何收获,除了我们现在可以告诉 主要的 正在做,并且有信心循环变量或其他东西不会有任何小错误。

    拾起

    开始写作时: write_data_async_future . 等待。还有 write_data_async write_data_sync . 为什么?你会想看的

    更糟的是, 威斯林特 只是把这些转达给 单一的 会议。为什么有区别 威斯林特 WSClientSession 在这一点上?我说,没有。

    再蒸发30行不太有用的代码,我们仍然有同样的失败,所以这是好的。

    我们去哪了。 写入数据异步未来 . 哦,是的,我们需要非未来版本吗?不,所以,还有40行代码没了。

    现在,说真的: 写入数据异步未来 :

    std::future<void> write_data_async_future(const std::string &data) {
        // shared_ptr is used to ensure data's survival
        std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
        std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();
        websock.async_write(
            boost::asio::buffer(*data_ptr),
            boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, shared_from_this(),
                                                          std::placeholders::_1, std::placeholders::_2, data_ptr,
                                                          write_promise)));
        return write_promise->get_future();
    }
    

    看。。。奥凯什等等,有 on_write_future ?这可能意味着我们需要蒸发更多未使用的代码行。看。。。是的。噗噗,走了。

    现在,diffstat看起来像这样:

      test.cpp | 683 +++++++++++++++++++++++----------------------------------------
      1 file changed, 249 insertions(+), 434 deletions(-)
    

    回到那个函数,让我们看看 论未来 :

    void on_write_future(boost::system::error_code ec, std::size_t bytes_transferred,
                         std::shared_ptr<std::string> data_posted,
                         std::shared_ptr<std::promise<void> > write_promise) {
        boost::ignore_unused(bytes_transferred);
        boost::ignore_unused(data_posted);
    
        if (ec) {
            try {
                throw std::runtime_error("Error thrown while performing async write: " + ec.message());
            } catch (...) {
                write_promise->set_exception(std::current_exception());
            }
            return;
        }
        write_promise->set_value();
    }
    

    一些问题。过去的一切都被忽略了。我知道您传递共享ptr的目的,但也许您应该将它们作为操作对象的一部分传递,以避免有这么多单独的共享ptr。

    抛出异常只是为了抓住它?嗯。我不确定。或许只是设定了一个新的例外:

    if (ec) {
        write_promise->set_exception(
                std::make_exception_ptr(std::system_error(ec, "async write failed")));
    } else {
        write_promise->set_value();
    }
    

    即便如此,现在还是有一个概念上的问题。你自由使用的方式 get() 不插手 主要的 这意味着任何连接中的任何错误都将中止所有操作。如果错误只是中止一个连接/会话/客户机,这将非常有用。在你的代码中,它们都是同义的 io_context thread )

    旁注:您将线程存储为一个成员,但总是分离它。这意味着从那时起这个成员就没用了。

    在这一点上,我暂停了复习,碰巧我的脑波告诉我这个问题。我锻炼的半生不熟的结果是 here . 请注意,您不能使用它,因为它实际上并不能解决问题。但它可能在其他方面有所帮助?