
What is wrong with this boost::asio and boost::coroutine usage pattern?

  •  6
  • bobeff  · Tech Community  · 9 years ago

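    In this question I described a boost::asio and boost::coroutine usage pattern that causes random crashes of my application, and I posted an extract of my code together with valgrind and GDB output.

    To investigate the problem further, I created a smaller proof-of-concept application that applies the same pattern. The same problem shows up in this smaller program, whose source I am posting here.

    The code starts a few threads and creates a connection pool with a few dummy connections (both counts are supplied by the user). The remaining arguments are unsigned integers that play the role of fake requests. The dummy implementation of the sendRequest function just starts an asynchronous timer that waits for a number of seconds equal to the input number, and yields from the function.

    Can anyone see the problem with this code and propose a fix for it?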

    #include "asiocoroutineutils.h"
    #include "concurrentqueue.h"
    
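    #include <algorithm>
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <thread>
    #include <vector>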
    
    #include <boost/lexical_cast.hpp>
    
    using namespace std;
    using namespace boost;
    using namespace utils;
    
    #define id this_thread::get_id() << ": "
    
    // ---------------------------------------------------------------------------
    
    /*!
     * \brief This is a fake Connection class
     */
    class Connection
    {
    public:
        Connection(unsigned connectionId)
            : _id(connectionId)
        {
        }
    
        unsigned getId() const
        {
            return _id;
        }
    
        void sendRequest(asio::io_service& ioService,
                         unsigned seconds,
                         AsioCoroutineJoinerProxy,
                         asio::yield_context yield)
        {
            cout << id << "Connection " << getId()
                 << " Start sending: " << seconds << endl;
    
            // waiting on this timer is a placeholder for any asynchronous operation
            asio::steady_timer timer(ioService);
            timer.expires_from_now(chrono::seconds(seconds));
            coroutineAsyncWait(timer, yield);
    
            cout << id << "Connection " << getId()
                 << " Received response: " << seconds << endl;
        }
    
    private:
        unsigned _id;
    };
    
    typedef std::unique_ptr<Connection> ConnectionPtr;
    typedef std::shared_ptr<asio::steady_timer> TimerPtr;
    
    // ---------------------------------------------------------------------------
    
    class ConnectionPool
    {
    public:
        ConnectionPool(size_t connectionsCount)
        {
            for(size_t i = 0; i < connectionsCount; ++i)
            {
                cout << "Creating connection: " << i << endl;
                _connections.emplace_back(new Connection(i));
            }
        }
    
        ConnectionPtr getConnection(TimerPtr timer,
                                    asio::yield_context& yield)
        {
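            // unique_lock tracks ownership, so its destructor does not unlock
            // an already released mutex if coroutineAsyncWait throws
            unique_lock<mutex> lock(_mutex);
    
            while(_connections.empty())
            {
                cout << id << "There is no free connection." << endl;
    
                _timers.emplace_back(timer);
                timer->expires_from_now(
                    asio::steady_timer::clock_type::duration::max());
    
                // do not hold the pool mutex while the coroutine is suspended
                lock.unlock();
                coroutineAsyncWait(*timer, yield);
                lock.lock();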
    
                cout << id << "Connection was freed." << endl;
            }
    
            cout << id << "Getting connection: "
                 << _connections.front()->getId() << endl;
    
            ConnectionPtr connection = std::move(_connections.front());
            _connections.pop_front();
            return connection;
        }
    
        void addConnection(ConnectionPtr connection)
        {
            lock_guard<mutex> lock(_mutex);
    
            cout << id << "Returning connection " << connection->getId()
                 << " to the pool." << endl;
    
            _connections.emplace_back(std::move(connection));
    
            if(_timers.empty())
                return;
    
            auto timer = _timers.back();
            _timers.pop_back();
            auto& ioService = timer->get_io_service();
    
            ioService.post([timer]()
            {
                cout << id << "Wake up waiting getConnection." << endl;
                timer->cancel();
            });
        }
    
    private:
        mutex _mutex;
        deque<ConnectionPtr> _connections;
        deque<TimerPtr> _timers;
    };
    
    typedef unique_ptr<ConnectionPool> ConnectionPoolPtr;
    
    // ---------------------------------------------------------------------------
    
    class ScopedConnection
    {
    public:
        ScopedConnection(ConnectionPool& pool,
                         asio::io_service& ioService,
                         asio::yield_context& yield)
            : _pool(pool)
        {
            auto timer = make_shared<asio::steady_timer>(ioService);
            _connection = _pool.getConnection(timer, yield);
        }
    
        Connection& get()
        {
            return *_connection;
        }
    
        ~ScopedConnection()
        {
            _pool.addConnection(std::move(_connection));
        }
    
    private:
        ConnectionPool& _pool;
        ConnectionPtr _connection;
    };
    
    // ---------------------------------------------------------------------------
    
    void sendRequest(asio::io_service& ioService,
                     ConnectionPool& pool,
                     unsigned seconds,
                     asio::yield_context yield)
    {
        cout << id << "Constructing request ..." << endl;
    
        AsioCoroutineJoiner joiner(ioService);
    
        ScopedConnection connection(pool, ioService, yield);
    
        asio::spawn(ioService, bind(&Connection::sendRequest,
                                    connection.get(),
                                    std::ref(ioService),
                                    seconds,
                                    AsioCoroutineJoinerProxy(joiner),
                                    placeholders::_1));
    
        joiner.join(yield);
    
        cout << id << "Processing response ..." << endl;
    }
    
    // ---------------------------------------------------------------------------
    
    void threadFunc(ConnectionPool& pool,
                    ConcurrentQueue<unsigned>& requests)
    {
        try
        {
            asio::io_service ioService;
    
            while(true)
            {
                unsigned request;
                if(!requests.tryPop(request))
                    break;
    
                cout << id << "Scheduling request: " << request << endl;
    
                asio::spawn(ioService, bind(sendRequest,
                                            std::ref(ioService),
                                            std::ref(pool),
                                            request,
                                            placeholders::_1));
            }
    
            ioService.run();
        }
        catch(const std::exception& e)
        {
            cerr << id << "Error: " << e.what() << endl;
        }
    }
    
    // ---------------------------------------------------------------------------
    
    int main(int argc, char* argv[])
    {
        if(argc < 3)
        {
            cout << "Usage: ./async_request poolSize threadsCount r0 r1 ..."
                 << endl;
            return -1;
        }
    
        try
        {
            auto poolSize = lexical_cast<size_t>(argv[1]);
            auto threadsCount = lexical_cast<size_t>(argv[2]);
    
            ConcurrentQueue<unsigned> requests;
            for(int i = 3; i < argc; ++i)
            {
                auto request = lexical_cast<unsigned>(argv[i]);
                requests.tryPush(request);
            }
    
            ConnectionPoolPtr pool(new ConnectionPool(poolSize));
    
            vector<unique_ptr<thread>> threads;
            for(size_t i = 0; i < threadsCount; ++i)
            {
                threads.emplace_back(
                    new thread(threadFunc, std::ref(*pool), std::ref(requests)));
            }
    
            for_each(threads.begin(), threads.end(), mem_fn(&thread::join));
        }
        catch(const std::exception& e)
        {
            cerr << "Error: " << e.what() << endl;
        }
    
        return 0;
    }
    

    Here are some helper utilities used by the code above:

    #pragma once
    
    #include <cassert>
    #include <cstddef>
    #include <stdexcept>
    
    #include <boost/asio/steady_timer.hpp>
    #include <boost/asio/spawn.hpp>
    
    namespace utils
    {
    
    inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
                                   boost::asio::yield_context& yield)
    {
        boost::system::error_code ec;
        timer.async_wait(yield[ec]);
        if(ec && ec != boost::asio::error::operation_aborted)
            throw std::runtime_error(ec.message());
    }
    
    class AsioCoroutineJoiner
    {
    public:
        explicit AsioCoroutineJoiner(boost::asio::io_service& io)
            : _timer(io), _count(0) {}
    
        void join(boost::asio::yield_context yield)
        {
            assert(_count > 0);
            _timer.expires_from_now(
                boost::asio::steady_timer::clock_type::duration::max());
            coroutineAsyncWait(_timer, yield);
        }
    
        void inc()
        {
            ++_count;
        }
    
        void dec()
        {
            assert(_count > 0);
            --_count;
            if(0 == _count)
                _timer.cancel();
        }
    
    private:
        boost::asio::steady_timer _timer;
        std::size_t _count;
    
    }; // AsioCoroutineJoiner class
    
    class AsioCoroutineJoinerProxy
    {
    public:
        AsioCoroutineJoinerProxy(AsioCoroutineJoiner& joiner)
            : _joiner(joiner)
        {
            _joiner.inc();
        }
    
        AsioCoroutineJoinerProxy(const AsioCoroutineJoinerProxy& joinerProxy)
            : _joiner(joinerProxy._joiner)
        {
            _joiner.inc();
        }
    
        ~AsioCoroutineJoinerProxy()
        {
            _joiner.dec();
        }
    
    private:
        AsioCoroutineJoiner& _joiner;
    
    }; // AsioCoroutineJoinerProxy class
    
    } // utils namespace
    

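    For completeness, the last missing part is the ConcurrentQueue class. It is too long to paste here, but if you want you can find it here .

    Sample usage of the application:

    ./connectionpooltest 3 3 5 7 8 1 0 9 2 4 3 6

    Here the first number 3 is the count of fake connections and the second number 3 is the number of threads used. The numbers after them are the fake requests.

    The valgrind and GDB output is the same as in the question mentioned above.

    The Boost version used is 1.57. The compiler is GCC 4.8.3. The operating system is CentOS Linux release 7.1.1503.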

    1 Answer  |  last active 7 years ago
        1
  •  1
  •   Community CDub    7 years ago

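    It seems that all of the valgrind errors occur because the BOOST_USE_VALGRIND macro is not defined, as Tanner Sansbury points out in a comment on this question. Apart from that, the program appears to be correct.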