代码之家  ›  专栏  ›  技术社区  ›  Codebrain

多线程同步列表<T>

  •  5
  • Codebrain  · 技术社区  · 16 年前

    我有一个要求,我需要存储一个简单的项目列表缓存。我使用的是List<T>出于这个目的,但我们现在改变了设计以适应多线程。

    系统的架构是由事件驱动的,因此读写操作很可能会发生冲突。由于绝大多数访问将是只读的,我认为ReaderWriterLockSlim可能是一个不错的选择。缓存只需要在读取时准确无误。

    public class SynchronisedList<T> : IList<T>
    {
        private ReaderWriterLockSlim cacheLock = new ReaderWriterLockSlim();
        private IList<T> innerCache = new List<T>();
    
        private U ReadReturn<U>(Func<U> function)
        {
            cacheLock.EnterReadLock();
            try { return function(); }
            finally { cacheLock.ExitReadLock(); }
        }
    
        private void Read(Action action)
        {
            cacheLock.EnterReadLock();
            try { action(); }
            finally { cacheLock.ExitReadLock(); }
        }
    
        private U WriteReturn<U>(Func<U> function)
        {
            cacheLock.EnterWriteLock();
            try { return function(); }
            finally { cacheLock.ExitWriteLock(); }
        }
    
        private void Write(Action action)
        {
            cacheLock.EnterWriteLock();
            try { action(); }
            finally { cacheLock.ExitWriteLock(); }
        }
    
        public T this[int index]
        {
            get { return ReadReturn(() => innerCache[index]); }
            set { Write(() => innerCache[index] = value); }
        }
    
        public int IndexOf(T item) { return ReadReturn(() => innerCache.IndexOf(item)); }
        public void Insert(int index, T item) { Write(() => innerCache.Insert(index, item)); }
        public void RemoveAt(int index) { Write(() => innerCache.RemoveAt(index)); }
        public void Add(T item) { Write(() => innerCache.Add(item)); }
        public void Clear() { Write(() => innerCache.Clear()); }
        public bool Contains(T item) { return ReadReturn(() => innerCache.Contains(item)); }
        public int Count { get { return ReadReturn(() => innerCache.Count); } }
        public bool IsReadOnly { get { return ReadReturn(() => innerCache.IsReadOnly); } }
        public void CopyTo(T[] array, int arrayIndex) { Read(() => innerCache.CopyTo(array, arrayIndex)); }
        public bool Remove(T item) { return WriteReturn(() => innerCache.Remove(item)); }
        public IEnumerator<T> GetEnumerator() { return ReadReturn(() => innerCache.GetEnumerator()); }
        IEnumerator IEnumerable.GetEnumerator() { return ReadReturn(() => (innerCache as IEnumerable).GetEnumerator()); }
    }
    
    internal class Program
    {
        private static SynchronisedList<int> list = new SynchronisedList<int>();
    
        private static void Main()
        {          
            for (int i = 0; i < 500000; i++)
            {
                var index = i;
                ThreadPool.QueueUserWorkItem((state) =>
                {
                    var threadNum = (int)state;
                    if (threadNum % 10 == 0)
                    {
                        list.Add(threadNum);
                    }
                    else
                    {
                        Console.WriteLine(list.Count);
                    }
                }, index);
            }
            Console.ReadKey();
        }
    }
    
    6 回复  |  直到 9 年前
        1
  •  7
  •   LukeH    16 年前

    你知道内置 SynchronizedCollection<T> 班级?

    它使用标准 Monitor -基于锁定而不是 ReaderWriterLockSlim

        2
  •  3
  •   Mats Fredriksson    16 年前

    这里有几个线程问题。

    1. 我认为GetEnumerator函数在这里暴露了一个线程问题。它们提供了对不受锁控制的innerCache的引用。

    它可能会崩溃的示例是,如果你有一个线程在列表上执行foreach,而另一个线程正在删除或插入元素。

    解决方案是复制列表,并在新克隆的列表上返回一个枚举器。如果列表很长,就会出现内存问题。


    我不认为这是一个完全同步的列表的好主意。编写一个功能有限的定制版本。

        3
  •  2
  •   Michael Petrotta user3140870    13 年前

    此类解决了所有问题,并使您的列表100%线程安全。

    客户端代码

    List<T> unsafeList = ... 
    var threadSafeList = new SyncronisedList(unsafeList);
    using (threadSafeList.EnterReadScope()) {
       // all your sequential read operations are thread-safe
    }
    using (threadSafeList.EnterWriteScope()) {
       // all sequential read/write operations are thread-safe
    }
    

    班级代码

    public class SyncronisedList<T> : IList<T> {
        private readonly ReaderWriterLockSlim _threadLock;
        private readonly IList<T> _internalList;
    
        public SyncronisedList() : this(new List<T>()) {
        }
    
        public SyncronisedList(IList<T> internalList) {
            _internalList = internalList;
            _threadLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
        }
    
    
        private U Read<U>(Func<U> function) {
            using (EnterReadScope())
                return function();
        }
    
        private void Read(Action action) {
            using (EnterReadScope())
                action();
        }
    
        private U Write<U>(Func<U> function) {
            using (EnterWriteScope())
                return function();
        }
    
        private void Write(Action action) {
            using (EnterWriteScope())
                action();
        }
    
        public IDisposable EnterReadScope() {
            return new Scope<T>(this, false);
        }
    
        public IDisposable EnterWriteScope() {
            return new Scope<T>(this, true);
        }
    
        public T this[int index] {
            get { return Read(() => _internalList[index]); }
            set { Write(() => _internalList[index] = value); }
        }
    
        public int IndexOf(T item) { return Read(() => _internalList.IndexOf(item)); }
        public void Insert(int index, T item) { Write(() => _internalList.Insert(index, item)); }
        public void RemoveAt(int index) { Write(() => _internalList.RemoveAt(index)); }
        public void Add(T item) { Write(() => _internalList.Add(item)); }
        public void Clear() { Write(() => _internalList.Clear()); }
        public bool Contains(T item) { return Read(() => _internalList.Contains(item)); }
        public int Count { get { return Read(() => _internalList.Count); } }
        public bool IsReadOnly { get { return Read(() => _internalList.IsReadOnly); } }
        public void CopyTo(T[] array, int arrayIndex) { Read(() => _internalList.CopyTo(array, arrayIndex)); }
        public bool Remove(T item) { return Write(() => _internalList.Remove(item)); }
        public IEnumerator<T> GetEnumerator() { return Read(() => _internalList.GetEnumerator()); }
        IEnumerator IEnumerable.GetEnumerator() { return Read(() => (_internalList as IEnumerable).GetEnumerator()); }
    
    
        private class Scope<U> : IDisposable {
            private readonly SyncronisedList<U> _owner;
            private readonly bool _write;
    
            internal Scope(SyncronisedList<U> owner, bool write) {
                _owner = owner;
                _write = write;
                if (_write)
                    _owner._threadLock.EnterWriteLock();
                else
                    _owner._threadLock.EnterReadLock();
            }
    
            public void Dispose() {
                if (_write)
                    _owner._threadLock.ExitWriteLock();
                else
                    _owner._threadLock.ExitReadLock();
            }
    
        }
    
    }
    
        4
  •  1
  •   Guillaume    16 年前

    您的实现还可以,但您仍然需要关心同步问题:

    给定一个列表{“foo”}

    int index = list.IndexOf("foo");
    Console.WriteLine(list[index]);
    

    现在,如果另一个线程做一个列表怎么办。在这两行之间清除()? 您的读写器锁应该是公开访问的,以处理这些情况。

        5
  •  0
  •   Ian Ringrose    16 年前

    非常难。

    我想你应该看看 您的应用程序需要

    很多问题,这意味着什么 另一个线程正在更改列表?

        6
  •  0
  •   supercat    12 年前

    列表中的某些操作相对于其他操作来说,不能是有意义的线程安全的。例如,一个线程即将写入元素5,另一个线程删除元素3,使得曾经的元素5被移动到4,曾经的元素6被移动到5,第一个线程最终将覆盖一个与预期不同的元素。

    IList<T> 数组也是如此。

    IList<T> :

    该列表将支持添加项目,但不支持插入或删除项目。自从 Add 方法将返回新创建项的索引,无论哪个线程添加了项,都将对其拥有独占控制权。此外 AccessItem 基元将允许两个或多个线程以他们认为合适的任何方式在列表项上使用原子基元(如果 T

    这种类型不应该建立在 List<T> ,而是在一个 T[32][] arr .从设置开始 arr[0] T[16] ;如果添加了第17项,则初始化 arr[1] T[32] ;如果已满,则初始化 arr[2] 到一个 T[64] 等等。即使列表扩展,任何给定的索引也将始终由相同的数组元素表示,因此对现有列表元素的访问不会受到列表扩展的影响。

    仅追加列表可能是一种有用的线程安全类型,但我还不知道在任何版本的.net中有任何此类类型。