代码之家  ›  专栏  ›  技术社区  ›  cdiggins

此无锁.NET队列线程安全吗?

  •  6
  • cdiggins  · 技术社区  · 15 年前

    我的问题是,下面包含的类是否是单读单写队列类的线程安全类?这种队列称为无锁队列,即使它在队列已满时会阻塞。数据结构的灵感来自 Marc Gravell's implementation of a blocking queue 这里是StackOverflow。

    类似的数据结构在一个示例中描述 article at DDJ by Herb Sutter ,除了实现是在C++中。另一个区别是我使用的是普通的链表,我使用的是数组的链表。

    除了包含一段代码外,我还使用了一个许可的开源许可证(MIT许可证1.0)对整个过程进行了注释,以防有人发现它有用,并希望使用它(按原样或修改)。

    这与有关堆栈溢出的其他问题有关,即如何创建阻塞并发队列(请参阅 Creating a blockinq Queue in .NET Thread-safe blocking queue implementation in .NET ).

    代码如下:

    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Diagnostics;
    
    namespace CollectionSandbox
    {
        /// This is a single reader / singler writer buffered queue implemented
        /// with (almost) no locks. This implementation will block only if filled 
        /// up. The implementation is a linked-list of arrays.
        /// It was inspired by the desire to create a non-blocking version 
        /// of the blocking queue implementation in C# by Marc Gravell
        /// https://stackoverflow.com/questions/530211/creating-a-blocking-queuet-in-net/530228#530228
        class SimpleSharedQueue<T> : IStreamBuffer<T>
        {
            /// Used to signal things are no longer full
            ManualResetEvent canWrite = new ManualResetEvent(true);
    
            /// This is the size of a buffer 
            const int BUFFER_SIZE = 512;
    
            /// This is the maximum number of nodes. 
            const int MAX_NODE_COUNT = 100;
    
            /// This marks the location to write new data to.
            Cursor adder;
    
            /// This marks the location to read new data from.
            Cursor remover;
    
            /// Indicates that no more data is going to be written to the node.
            public bool completed = false;
    
            /// A node is an array of data items, a pointer to the next item,
            /// and in index of the number of occupied items 
            class Node
            {
                /// Where the data is stored.
                public T[] data = new T[BUFFER_SIZE];
    
                /// The number of data items currently stored in the node.
                public Node next;
    
                /// The number of data items currently stored in the node.
                public int count;
    
                /// Default constructor, only used for first node.
                public Node()
                {
                    count = 0;
                }
    
                /// Only ever called by the writer to add new Nodes to the scene
                public Node(T x, Node prev)
                {
                    data[0] = x;
                    count = 1;
    
                    // The previous node has to be safely updated to point to this node.
                    // A reader could looking at the point, while we set it, so this should be 
                    // atomic.
                    Interlocked.Exchange(ref prev.next, this);
                }
            }
    
            /// This is used to point to a location within a single node, and can perform 
            /// reads or writers. One cursor will only ever read, and another cursor will only
            /// ever write.
            class Cursor
            {
                /// Points to the parent Queue
                public SimpleSharedQueue<T> q;
    
                /// The current node
                public Node node;
    
                /// For a writer, this points to the position that the next item will be written to.
                /// For a reader, this points to the position that the next item will be read from.
                public int current = 0;
    
                /// Creates a new cursor, pointing to the node
                public Cursor(SimpleSharedQueue<T> q, Node node)
                {
                    this.q = q;
                    this.node = node;
                }
    
                /// Used to push more data onto the queue
                public void Write(T x)
                {
                    Trace.Assert(current == node.count);
    
                    // Check whether we are at the node limit, and are going to need to allocate a new buffer.
                    if (current == BUFFER_SIZE)
                    {
                        // Check if the queue is full
                        if (q.IsFull())
                        {
                            // Signal the canWrite event to false
                            q.canWrite.Reset();
    
                            // Wait until the canWrite event is signaled 
                            q.canWrite.WaitOne();
                        }
    
                        // create a new node
                        node = new Node(x, node);
                        current = 1;
                    }
                    else
                    {
                        // If the implementation is correct then the reader will never try to access this 
                        // array location while we set it. This is because of the invariant that 
                        // if reader and writer are at the same node: 
                        //    reader.current < node.count 
                        // and 
                        //    writer.current = node.count 
                        node.data[current++] = x;
    
                        // We have to use interlocked, to assure that we incremeent the count 
                        // atomicalluy, because the reader could be reading it.
                        Interlocked.Increment(ref node.count);
                    }
                }
    
                /// Pulls data from the queue, returns false only if 
                /// there 
                public bool Read(ref T x)
                {
                    while (true)
                    {
                        if (current < node.count)
                        {
                            x = node.data[current++];
                            return true;
                        }
                        else if ((current == BUFFER_SIZE) && (node.next != null))
                        {
                            // Move the current node to the next one.
                            // We know it is safe to do so.
                            // The old node will have no more references to it it 
                            // and will be deleted by the garbage collector.
                            node = node.next;
    
                            // If there is a writer thread waiting on the Queue,
                            // then release it.
                            // Conceptually there is a "if (q.IsFull)", but we can't place it 
                            // because that would lead to a Race condition.
                            q.canWrite.Set();
    
                            // point to the first spot                
                            current = 0;
    
                            // One of the invariants is that every node created after the first,
                            // will have at least one item. So the following call is safe
                            x = node.data[current++];
                            return true;
                        }
    
                        // If we get here, we have read the most recently added data.
                        // We then check to see if the writer has finished producing data.
                        if (q.completed)
                            return false;
    
                        // If we get here there is no data waiting, and no flagging of the completed thread.
                        // Wait a millisecond. The system will also context switch. 
                        // This will allow the writing thread some additional resources to pump out 
                        // more data (especially if it iself is multithreaded)
                        Thread.Sleep(1);
                    }
                }
            }
    
            /// Returns the number of nodes currently used.
            private int NodeCount
            {
                get
                {
                    int result = 0;
                    Node cur = null;
                    Interlocked.Exchange<Node>(ref cur, remover.node);
    
                    // Counts all nodes from the remover to the adder
                    // Not efficient, but this is not called often. 
                    while (cur != null)
                    {
                        ++result;
                        Interlocked.Exchange<Node>(ref cur, cur.next);
                    }
                    return result;
                }
            }
    
            /// Construct the queue.
            public SimpleSharedQueue()
            {
                Node root = new Node();
                adder = new Cursor(this, root);
                remover = new Cursor(this, root);
            }
    
            /// Indicate to the reader that no more data is going to be written.
            public void MarkCompleted()
            {
                completed = true;
            }
    
            /// Read the next piece of data. Returns false if there is no more data. 
            public bool Read(ref T x)
            {
                return remover.Read(ref x);
            }
    
            /// Writes more data.
            public void Write(T x)
            {
                adder.Write(x);
            }
    
            /// Tells us if there are too many nodes, and can't add anymore.
            private bool IsFull()
            {
                return NodeCount == MAX_NODE_COUNT;  
            }
        }
    }
    
    6 回复  |  直到 7 年前
        1
  •  7
  •   GregC Benjamin Baumann    15 年前

    Microsoft Research CHESS应该被证明是测试您的实现的好工具。

        2
  •  4
  •   Henk Holterman    15 年前

    Sleep() 使无锁方法完全无用。面对无锁设计的复杂性的唯一原因是需要绝对速度和避免信号量的成本。睡眠的使用完全违背了这个目的。

        3
  •  3
  •   Languard    15 年前

    考虑到我找不到Interlocated.Exchange读取或写入块的任何引用,我认为没有。我还想问,为什么你想去无锁,因为很少有足够的好处来应对它的复杂性。

    微软在2009年GDC上做了一个很好的演示,你可以看到幻灯片 here

        4
  •  2
  •   deepsnore    15 年前

    注意双重检查-单锁模式(如上面引用的链接: http://www.yoda.arachsys.com/csharp/singleton.html )

    Andrei Alexandrescu《现代C++设计》逐字引述

      非常有经验的多线程程序员都知道,即使是双重检查的锁定模式,虽然在纸面上是正确的,但在实践中并不总是正确的。在某些对称的多处理器环境中(即所谓的松弛内存模型),写操作以突发方式提交到主内存,而不是逐个提交。突发事件以地址的递增顺序发生,而不是按时间顺序。由于这种写操作的重新排列,一个处理器一次看到的内存可能看起来像是另一个处理器没有按照正确的顺序执行操作。具体地说,处理器对pInstance_uu的赋值可能发生在Singleton对象完全初始化之前!因此,不幸的是,双重检查锁定模式对于此类系统是有缺陷的
        5
  •  1
  •   Andrew Matthews    15 年前

    两个线程进入 cursor.Write . 第一条线一直延伸到第二条线 node = new Node(x, node); 在真正的一半 if (current == BUFFER_SIZE) 声明(但我们也假设 current == BUFFER_SIZE )所以当1被添加到 current 然后另一个线程将沿着if语句的另一条路径进入。现在,假设线程1丢失了它的时间片,线程2得到了它,并继续输入if语句,错误地认为条件仍然存在。它应该进入另一条路径。

    我也没有运行过这段代码,所以我不确定我的假设在这段代码中是否可行,但它们是否可行(例如,输入cursor.Write时从多个线程执行) 当前==缓冲区大小 ),则很可能会出现并发错误。

        6
  •  1
  •   csharptest.net    15 年前

                    node.data[current++] = x;
    
                    // We have to use interlocked, to assure that we incremeent the count 
                    // atomicalluy, because the reader could be reading it.
                    Interlocked.Increment(ref node.count);
    

    node.data[]的新值已提交到此内存位置,这意味着什么?它不存储在易失性内存地址中,因此如果我理解正确,可以缓存它?这难道不可能导致“肮脏”阅读吗?也许还有其他地方也是如此,但这一个一目了然。

    第二,包含以下内容的多线程代码:

    Thread.Sleep(int);
    

    ... 这从来都不是好兆头。如果它是必需的,那么代码注定会失败,如果它不是必需的,那就是浪费。我真希望他们能完全删除这个API。要意识到这是一个至少要等待那么长时间的请求。由于上下文切换的开销,您几乎肯定要等待更长、更长的时间。

    第三,我完全不理解联锁API在这里的用法。也许我累了,只是没抓住重点;但我无法在两个线程上都找到潜在的线程冲突&写入同一个变量?似乎我能找到的联锁交换的唯一用途是修改node.data[]的内容以修复上面的#1。

    最后,执行似乎有些过于复杂。我是否遗漏了整个游标/节点的要点,或者它基本上是在做与这个类相同的事情(注意:我还没有尝试过,我也不认为这是线程安全的,只是试图把我认为你在做的事情归结起来。)

    class ReaderWriterQueue<T>
    {
        readonly AutoResetEvent _readComplete;
        readonly T[] _buffer;
        readonly int _maxBuffer;
        int _readerPos, _writerPos;
    
        public ReaderWriterQueue(int maxBuffer)
        {
            _readComplete = new AutoResetEvent(true);
            _maxBuffer = maxBuffer;
            _buffer = new T[_maxBuffer];
            _readerPos = _writerPos = 0;
        }
    
        public int Next(int current) { return ++current == _maxBuffer ? 0 : current; }
    
        public bool Read(ref T item)
        {
            if (_readerPos != _writerPos)
            {
                item = _buffer[_readerPos];
                _readerPos = Next(_readerPos);
                return true;
            }
            else
                return false;
        }
    
        public void Write(T item)
        {
            int next = Next(_writerPos);
    
            while (next == _readerPos)
                _readComplete.WaitOne();
    
            _buffer[next] = item;
            _writerPos = next;
        }
    }
    

    我必须承认一件事,我鄙视穿线。我见过最好的开发人员在这方面失败。这篇文章给出了一个很好的例子,说明正确使用线程有多么困难: http://www.yoda.arachsys.com/csharp/singleton.html