我正在编写一个UDP服务器,它当前从UDP接收数据,并将其包装在一个对象中,然后将它们放入一个并发队列中。并发队列是此处提供的实现:
http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
工作线程池从队列中拉出数据进行处理。
static concurrent_queue<boost::shared_ptr<Msg> > g_work_queue_;
现在我遇到的问题是,如果我只是编写一个函数来生成数据并将其插入队列,然后创建一些使用者线程来将其拉出,那么它就可以正常工作了。
但是当我添加基于UDP的生产者时,工作线程就不再收到队列中数据到达的通知。
我已经跟踪到了并发队列中push函数的末尾。
特别是行:the_condition_variable.notify_one();
所以问题与我编写网络代码的方式有关。
这是它的样子。
enum
{
MAX_LENGTH = 1500
};
class Msg
{
public:
Msg()
{
static int i = 0;
i_ = i++;
printf("Construct ObbsMsg: %d\n", i_);
}
~Msg()
{
printf("Destruct ObbsMsg: %d\n", i_);
}
const char* toString() { return data_; }
private:
friend class server;
udp::endpoint sender_endpoint_;
char data_[MAX_LENGTH];
int i_;
};
class server
{
public:
server::server(boost::asio::io_service& io_service)
: io_service_(io_service),
socket_(io_service, udp::endpoint(udp::v4(), PORT))
{
waitForNextMessage();
}
void server::waitForNextMessage()
{
printf("Waiting for next msg\n");
next_msg_.reset(new Msg());
socket_.async_receive_from(
boost::asio::buffer(next_msg_->data_, MAX_LENGTH), sender_endpoint_,
boost::bind(&server::handleReceiveFrom, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void server::handleReceiveFrom(const boost::system::error_code& error, size_t bytes_recvd)
{
if (!error && bytes_recvd > 0) {
printf("got data: %s. Adding to work queue\n", next_msg_->toString());
g_work_queue.push(next_msg_); // Add received msg to work queue
waitForNextMessage();
} else {
waitForNextMessage();
}
}
private:
boost::asio::io_service& io_service_;
udp::socket socket_;
udp::endpoint sender_endpoint_;
boost::shared_ptr<Msg> next_msg_;
}
int main(int argc, char* argv[])
{
try{
boost::asio::io_service io_service;
server s(io_service);
io_service.run();
catch(std::exception& e){
std::err << "Exception: " << e.what() << std::endl;
}
return 0;
}
现在我发现,如果handle\u receive\u from能够返回,那么在并发的\u队列返回中通知\u one()。我想这是因为我有一个递归循环。
那么,开始监听新数据的正确方法是什么呢?异步udp服务器的例子是否有缺陷,因为我是基于他们已经在做的事情。
这里我没有提到的是,我有一个类叫做processor。
处理器如下所示:
class processor
{
public:
processor::processor(int thread_pool_size) :
thread_pool_size_(thread_pool_size) { }
void start()
{
boost::thread_group threads;
for (std::size_t i = 0; i < thread_pool_size_; ++i){
threads.create_thread(boost::bind(&ObbsServer::worker, this));
}
}
void worker()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
private:
int thread_pool_size_;
};
现在看来,如果我自己提取worker函数并启动线程
从main。真管用!有人能解释为什么一个线程在类外会像我期望的那样工作,但在类内却有副作用吗?
EDIT2:现在情况变得更奇怪了
我提取了两个函数(完全相同)。
即
void worker()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
printf("waiting for msg\n");
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
void consumer()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
printf("waiting for msg\n");
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
另一方面,worker位于processor.cpp文件中。
void consumer();
void worker();
int main(int argc, char* argv[])
{
try {
boost::asio::io_service io_service;
server net(io_service);
//processor s(7);
boost::thread_group threads;
for (std::size_t i = 0; i < 7; ++i){
threads.create_thread(worker); // this doesn't work
// threads.create_thread(consumer); // THIS WORKS!?!?!?
}
// s.start();
printf("Server Started...\n");
boost::asio::io_service::work work(io_service);
io_service.run();
printf("exiting...\n");
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
为什么消费者能够接收排队的项目,而工人却不能。
它们是具有不同名称的相同实现。
这毫无意义。有什么想法吗?
下面是接收txt“Hello World”时的示例输出:
输出1:不工作。调用辅助函数或使用处理器类时。
Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1
输出2:在调用与worker函数相同的consumer函数时工作。
Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1
Got msg: hello world <----- this is what I've been wanting to see!
Destruct ObbsMsg: 0
waiting for msg