代码之家  ›  专栏  ›  技术社区  ›  hookenz

全局静态变量的副作用

  •  0
  • hookenz  · 技术社区  · 14 年前

    我正在编写一个UDP服务器,它当前从UDP接收数据,并将其包装在一个对象中,然后将它们放入一个并发队列中。并发队列是此处提供的实现: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

    工作线程池从队列中拉出数据进行处理。

    static concurrent_queue<boost::shared_ptr<Msg> > g_work_queue_;
    

    现在我遇到的问题是,如果我只是编写一个函数来生成数据并将其插入队列,然后创建一些使用者线程来将其拉出,那么它就可以正常工作了。 但是当我添加基于UDP的生产者时,工作线程就不再收到队列中数据到达的通知。

    我已经跟踪到了并发队列中push函数的末尾。 特别是行:the_condition_variable.notify_one();

    所以问题与我编写网络代码的方式有关。

    这是它的样子。

    enum
    {
        MAX_LENGTH = 1500
    };
    
    
    class Msg
    {
      public:
        Msg()
        {
           static int i = 0;
           i_ = i++;
           printf("Construct ObbsMsg: %d\n", i_);
        }
    
        ~Msg()
        {
           printf("Destruct ObbsMsg: %d\n", i_);
        }
    
        const char* toString() { return data_; }
    
      private:
        friend class server;
    
        udp::endpoint sender_endpoint_;
        char data_[MAX_LENGTH];
        int i_;
    };
    
    class server
    {
    public:
      server::server(boost::asio::io_service& io_service)
        : io_service_(io_service),
          socket_(io_service, udp::endpoint(udp::v4(), PORT))
      {
        waitForNextMessage();
      }  
    
      void server::waitForNextMessage()
      {
        printf("Waiting for next msg\n");
    
        next_msg_.reset(new Msg());
    
        socket_.async_receive_from(
            boost::asio::buffer(next_msg_->data_, MAX_LENGTH), sender_endpoint_,
            boost::bind(&server::handleReceiveFrom, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
      }
    
      void server::handleReceiveFrom(const boost::system::error_code& error, size_t bytes_recvd)
      {
        if (!error && bytes_recvd > 0) {
            printf("got data: %s. Adding to work queue\n", next_msg_->toString());
            g_work_queue.push(next_msg_); // Add received msg to work queue
            waitForNextMessage();
        } else {
            waitForNextMessage();
        } 
      }
    
    private:
      boost::asio::io_service& io_service_;
      udp::socket socket_;
    
      udp::endpoint sender_endpoint_;
      boost::shared_ptr<Msg> next_msg_;
    }
    
    int main(int argc, char* argv[])
    {
        try{
          boost::asio::io_service io_service;
          server s(io_service);
          io_service.run();
        catch(std::exception& e){
          std::err << "Exception: " << e.what() << std::endl;
        }
        return 0;
    }
    

    现在我发现,如果handle\u receive\u from能够返回,那么在并发的\u队列返回中通知\u one()。我想这是因为我有一个递归循环。 那么,开始监听新数据的正确方法是什么呢?异步udp服务器的例子是否有缺陷,因为我是基于他们已经在做的事情。

    这里我没有提到的是,我有一个类叫做processor。 处理器如下所示:

    class processor
    {
    public:
       processor::processor(int thread_pool_size) :
          thread_pool_size_(thread_pool_size) { }
    
      void start()
      {
        boost::thread_group threads;
        for (std::size_t i = 0; i < thread_pool_size_; ++i){
            threads.create_thread(boost::bind(&ObbsServer::worker, this));
        }
      }
    
      void worker()
      {
        while (true){
            boost::shared_ptr<ObbsMsg> msg;
            g_work_queue.wait_and_pop(msg);
            printf("Got msg: %s\n", msg->toString());
        }
      }
    
    private:
      int thread_pool_size_;
    };
    

    现在看来,如果我自己提取worker函数并启动线程 从main。真管用!有人能解释为什么一个线程在类外会像我期望的那样工作,但在类内却有副作用吗?

    EDIT2:现在情况变得更奇怪了

    我提取了两个函数(完全相同)。

    void worker()
    {
        while (true){
            boost::shared_ptr<ObbsMsg> msg;
            printf("waiting for msg\n");
            g_work_queue.wait_and_pop(msg);
            printf("Got msg: %s\n", msg->toString());
        }
    }
    
    void consumer()
    {
        while (true){
            boost::shared_ptr<ObbsMsg> msg;
            printf("waiting for msg\n");
            g_work_queue.wait_and_pop(msg);
            printf("Got msg: %s\n", msg->toString());
        }
    }
    

    另一方面,worker位于processor.cpp文件中。

    void consumer();
    void worker();
    
    int main(int argc, char* argv[])
    {
        try {
            boost::asio::io_service io_service;
            server net(io_service);
            //processor s(7);
    
            boost::thread_group threads;
            for (std::size_t i = 0; i < 7; ++i){
                threads.create_thread(worker); // this doesn't work
                // threads.create_thread(consumer); // THIS WORKS!?!?!?
            }
    
    //        s.start();
    
            printf("Server Started...\n");
            boost::asio::io_service::work work(io_service);
            io_service.run();
    
            printf("exiting...\n");
        } catch (std::exception& e) {
            std::cerr << "Exception: " << e.what() << "\n";
        }
    
        return 0;
    }
    

    为什么消费者能够接收排队的项目,而工人却不能。 它们是具有不同名称的相同实现。

    这毫无意义。有什么想法吗?

    下面是接收txt“Hello World”时的示例输出:

    输出1:不工作。调用辅助函数或使用处理器类时。

    Construct ObbsMsg: 0
    waiting for msg
    waiting for msg
    waiting for msg
    waiting for msg
    waiting for msg
    waiting for msg
    Server Started...
    waiting for msg
    got data: hello world. Adding to work queue
    Construct ObbsMsg: 1
    

    输出2:在调用与worker函数相同的consumer函数时工作。

    Construct ObbsMsg: 0
    waiting for msg
    waiting for msg
    waiting for msg
    waiting for msg
    waiting for msg
    waiting for msg
    Server Started...
    waiting for msg
    got data: hello world. Adding to work queue
    Construct ObbsMsg: 1
    Got msg: hello world <----- this is what I've been wanting to see!
    Destruct ObbsMsg: 0
    waiting for msg
    
    1 回复  |  直到 14 年前
        1
  •  1
  •   hookenz    14 年前

    回答我自己的问题。

    在头文件中声明为:static concurrent\u queue<boost::共享\u ptr>工作队列;

    看来声明它是静态的并不是我想做的。 显然,这为每个编译的.o文件创建了一个单独的队列对象,显然

    与消费者和生产者在同一个文件中,它的工作。

    所以我重新声明了工作队列。

    -- workqueue.h --
    extern concurrent_queue< boost::shared_ptr<Msg> > g_work_queue;
    
    -- workqueue.cpp --
    #include "workqueue.h"
    concurrent_queue< boost::shared_ptr<Msg> > g_work_queue;
    

    这样做可以解决问题。