是的,你正在共享流下的套接字,没有同步
旁注,与布尔标志相同,可以通过更改以下内容轻松“修复”:
std::atomic_bool want_quit;
std::atomic_bool want_reset;
如何解决
老实说,我认为没有好的解决办法。您自己也说过:这些操作是异步的,所以如果您尝试同步执行它们,您会遇到麻烦。
你可以试着想想黑客。如果我们基于相同的底层套接字(filedescriptor)创建一个单独的流对象,会怎么样。不会的
非常
如此简单的溪流不是Asio的一部分。
但我们可以使用Boost Iostreams破解一个:
#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>
// .... later:
// HACK: procure a _separate `ostream` to prevent the race, using the same fd
namespace bio = boost::iostreams;
bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
bio::stream<bio::file_descriptor_sink> hack_ostream(fds);
con.run(stream, hack_ostream);
实际上,这在没有竞争的情况下运行(在同一套接字上同时读写
are
fine
,只要不共享包装它们的非线程安全Asio对象)。
我的建议是:
不要那样做
. 这是一个难题。您正在使事情复杂化,显然是为了避免使用异步代码。我会咬紧牙关的。
将IO机制从服务逻辑中分离出来并不需要太多的工作。您最终将摆脱随机限制(您可以考虑处理多个客户端,您可以不使用任何线程
完全
等等)。
如果您想了解一些中间立场,请查看堆叠式协同路由(
http://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/spawn.html
)
表册
仅供参考
注:我进行了重构,以消除对指针的需要。您没有转移所有权,所以可以使用引用。如果您不知道如何将引用传递给
bind
/
std::thread
构造函数,诀窍在于
std::ref
你会看到的。
[对于压力测试,我已经大大减少了延迟。]
Live On Coliru
#include <boost/asio.hpp>
#include <iostream>
#include <fstream>
#include <thread>
#include <array>
#include <chrono>
class Console {
public:
Console() :
want_quit{false},
want_reset{false}
{}
bool getQuitValue() const { return want_quit; }
int run(std::istream &in, std::ostream &out);
bool wantReset() const { return want_reset; }
private:
int runTx(std::istream &in);
int runRx(std::ostream &out);
std::atomic_bool want_quit;
std::atomic_bool want_reset;
};
int Console::runTx(std::istream &in) {
static const std::array<std::string, 3> cmds{
{"quit", "one", "two"},
};
std::string command;
while (!want_quit && !want_reset && in >> command) {
if (command == cmds.front()) {
want_quit = true;
}
if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
want_reset = true;
std::cout << "unknown command [" << command << "]\n";
} else {
std::cout << command << '\n';
}
}
return 0;
}
int Console::runRx(std::ostream &out) {
for (int i=0; !(want_reset || want_quit); ++i) {
out << "This is message number " << i << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(1));
out.flush();
}
return 0;
}
int Console::run(std::istream &in, std::ostream &out) {
want_reset = false;
std::thread t1{&Console::runRx, this, std::ref(out)};
int status = runTx(in);
t1.join();
return status;
}
#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>
int main()
{
Console con;
boost::asio::io_service ios;
// IPv4 address, port 5555
boost::asio::ip::tcp::acceptor acceptor(ios, boost::asio::ip::tcp::endpoint{boost::asio::ip::tcp::v4(), 5555});
while (!con.getQuitValue()) {
boost::asio::ip::tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
{
// HACK: procure a _separate `ostream` to prevent the race, using the same fd
namespace bio = boost::iostreams;
bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
bio::stream<bio::file_descriptor_sink> hack_ostream(fds);
con.run(stream, hack_ostream);
}
if (con.wantReset()) {
std::cout << "resetting\n";
}
}
}
测试:
netcat localhost 5555 <<<quit
This is message number 0
This is message number 1
This is message number 2
和
commands=( one two one two one two one two one two one two one two three )
while sleep 0.1; do echo ${commands[$(($RANDOM%${#commands}))]}; done | (while netcat localhost 5555; do sleep 1; done)
无限期运行,偶尔重置连接(当发送命令“三”时)。