代码之家  ›  专栏  ›  技术社区  ›  Nick Strupat

批评我的并发队列[已关闭]

  •  2
  • Nick Strupat  · 技术社区  · 14 年前

    这是我编写的并发队列,我计划在编写的线程池中使用它。我想知道我是否可以改进性能。 atomic_counter 如果你好奇的话,贴在下面!

    #ifndef NS_CONCURRENT_QUEUE_HPP_INCLUDED
    #define NS_CONCURRENT_QUEUE_HPP_INCLUDED
    
    #include <ns/atomic_counter.hpp>
    #include <boost/noncopyable.hpp>
    #include <boost/smart_ptr/detail/spinlock.hpp>
    #include <cassert>
    #include <cstddef>
    
    namespace ns {
        template<typename T,
                 typename mutex_type = boost::detail::spinlock,
                 typename scoped_lock_type = typename mutex_type::scoped_lock>
        class concurrent_queue : boost::noncopyable {
            struct node {
                node * link;
                T const value;
                explicit node(T const & source) : link(0), value(source) { }
            };
            node * m_front;
            node * m_back;
            atomic_counter m_counter;
            mutex_type m_mutex;
        public:
            // types
            typedef T value_type;
    
            // construction
            concurrent_queue() : m_front(0), m_mutex() { }
            ~concurrent_queue() { clear(); }
    
            // capacity
            std::size_t size() const { return m_counter; }
            bool empty() const { return (m_counter == 0); }
    
            // modifiers
            void push(T const & source);
            bool try_pop(T & destination);
            void clear();
        };
    
        template<typename T, typename mutex_type, typename scoped_lock_type>
        void concurrent_queue<T, mutex_type, scoped_lock_type>::push(T const & source) {
            node * hold = new node(source);
            scoped_lock_type lock(m_mutex);
            if (empty())
                m_front = hold;
            else
                m_back->link = hold;
            m_back = hold;
            ++m_counter;
        }
    
        template<typename T, typename mutex_type, typename scoped_lock_type>
        bool concurrent_queue<T, mutex_type, scoped_lock_type>::try_pop(T & destination) {
            node const * hold;
            {
                scoped_lock_type lock(m_mutex);
                if (empty())
                    return false;
                hold = m_front;
                if (m_front == m_back)
                    m_front = m_back = 0;
                else
                    m_front = m_front->link;
                --m_counter;
            }
            destination = hold->value;
            delete hold;
            return true;
        }
    
        template<typename T, typename mutex_type, typename scoped_lock_type>
        void concurrent_queue<T, mutex_type, scoped_lock_type>::clear() {
            node * hold;
            {
                scoped_lock_type lock(m_mutex);
                hold = m_front;
                m_front = 0;
                m_back = 0;
                m_counter = 0;
            }
            if (hold == 0)
                return;
            node * it;
            while (hold != 0) {
                it = hold;
                hold = hold->link;
                delete it;
            }
        }
    }
    
    #endif
    

    原子计数器.hpp

    #ifndef NS_ATOMIC_COUNTER_HPP_INCLUDED
    #define NS_ATOMIC_COUNTER_HPP_INCLUDED
    
    #include <boost/interprocess/detail/atomic.hpp>
    #include <boost/noncopyable.hpp>
    
    namespace ns {
        class atomic_counter : boost::noncopyable {
            volatile boost::uint32_t m_count;
        public:
            explicit atomic_counter(boost::uint32_t value = 0) : m_count(value) { }
    
            operator boost::uint32_t() const {
                return boost::interprocess::detail::atomic_read32(const_cast<volatile boost::uint32_t *>(&m_count));
            }
    
            void operator=(boost::uint32_t value) {
                boost::interprocess::detail::atomic_write32(&m_count, value);
            }
    
            void operator++() {
                boost::interprocess::detail::atomic_inc32(&m_count);
            }
    
            void operator--() {
                boost::interprocess::detail::atomic_dec32(&m_count);
            }
        };
    }
    
    #endif
    
    2 回复  |  直到 13 年前
        1
  •  2
  •   Omnifarious    14 年前

    我认为在这种情况下,由于调用 new 对于每个新节点。这不仅仅是因为调用动态内存分配器很慢。这是因为调用它经常会带来大量并发开销,因为自由存储必须在多线程环境中保持一致。

    我将使用一个向量,当它太小而不能容纳队列时,您可以调整它的大小。我永远不会把它改小。

    我会安排前面和后面的值,这样向量就是一个环缓冲区。这需要在调整大小时移动元素。但这应该是一个相当罕见的事件,可以通过在构建时给出一个建议的向量大小在某种程度上加以缓解。

    或者,可以保留链接列表结构,但不销毁节点。只需不断地将其添加到一个空闲节点队列中。不幸的是,空闲节点的队列需要锁定才能正确地管理,我不确定您是否真的处于一个比一直调用delete和new更好的位置。

    您还可以通过向量得到更好的引用位置。但我不确定这将如何与必须在CPU之间来回切换的缓存线进行交互。

    有些人建议 ::std::deque 我不认为这是个坏主意,但我怀疑环缓冲向量是个更好的主意。

        2
  •  1
  •   Matthieu M.    14 年前

    Herb Sutter提出了一个无锁队列的实现,它肯定比您的更好:)

    主要思想是使用缓冲环,在队列运行期间完全放弃内存的动态分配。这意味着队列可以是满的(因此您可能需要等待放入元素),这在您的情况下可能是不可接受的。

    正如Omnifarious所指出的,最好不要使用链接列表(用于缓存位置),除非您为池分配。我会尝试使用 std::deque 作为一个后端,它对内存更友好,并保证不会有任何重新分配,只要您只弹出和推(在前面和后面),这是一个队列的正常情况。