代码之家  ›  专栏  ›  技术社区  ›  Tim Mahy

这个代码可以通过使用反应式框架来重构吗?

  •  0
  • Tim Mahy  · 技术社区  · 14 年前

    class Program
    {
        static void Main(string[] args)
        {
            var enumerator = new QueuedEnumerator<long>();
            var listenerWaitHandle = Listener(enumerator);
    
            Publisher(enumerator);
            listenerWaitHandle.WaitOne();
        }
    
        private static AutoResetEvent Listener(IEnumerator<long> items)
        {
            var @event = new AutoResetEvent(false);
            ThreadPool.QueueUserWorkItem((o) =>
            {
                while (items.MoveNext())
                {
                    Console.WriteLine("Received : " + items.Current);
                    Thread.Sleep(2 * 1000);
                }
                (o as AutoResetEvent).Set();
            }, @event);
            return @event;
        }
    
        private static void Publisher(QueuedEnumerator<long> enumerator)
        {
            for (int i = 0; i < 10; i++)
            {
                enumerator.Set(i);
                Console.WriteLine("Sended : " + i);
                Thread.Sleep(1 * 1000);
            }
            enumerator.Finish();
        }
    
        class QueuedEnumerator<T> : IEnumerator<T>
        {
            private Queue _internal = Queue.Synchronized(new Queue());
            private T _current;
            private bool _finished;
            private AutoResetEvent _setted = new AutoResetEvent(false);
    
            public void Finish()
            {
                _finished = true;
                _setted.Set();
            }
    
            public void Set(T item)
            {
                if (_internal.Count > 3)
                {
                    Console.WriteLine("I'm full, give the listener some slack !");
                    Thread.Sleep(3 * 1000);
                    Set(item);
                }
                else
                {
                    _internal.Enqueue(item);
                    _setted.Set();
                }
            }
    
            public T Current
            {
                get { return _current; }
            }
    
            public void Dispose()
            {
            }
    
    
            object System.Collections.IEnumerator.Current
            {
                get { return _current; }
            }
    
            public bool MoveNext()
            {
                if (_finished && _internal.Count == 0)
                    return false;
                else if (_internal.Count > 0)
                {
                    _current = (T)_internal.Dequeue();
                    return true;
                }
                else
                {
                    _setted.WaitOne();
                    return MoveNext();
                }
            }
    
            public void Reset()
            {
            }
        }
    }
    

    线程可以一次提供一个实例并调用Set方法 B线程希望接收一系列实例(由线程a提供)

    所以从字面上来说转换一个Add(item),Add(item)。。不同线程之间的IEnumerable

    1 回复  |  直到 13 年前
        1
  •  1
  •   Ana Betts    14 年前

    当然-此代码可能不是 最好的 很好,但我的初衷是:

    Subject<Item> toAddObservable;
    ListObservable<Item> buffer;
    
    void Init()
    {
        // Subjects are an IObservable we can trigger by-hand, they're the 
        // mutable variables of Rx
        toAddObservable = new Subject(Scheduler.TaskPool);
    
        // ListObservable will hold all our items until someone asks for them
        // It will yield exactly *one* item, but only when toAddObservable
        // is completed.
        buffer = new ListObservable<Item>(toAddObservable);
    }
    
    void Add(Item to_add)
    {
        lock (this) {
            // Subjects themselves are thread-safe, but we still need the lock
            // to protect against the reset in FetchResults
            ToAddOnAnotherThread.OnNext(to_add);
        }
    }
    
    IEnumerable<Item> FetchResults()
    {
        IEnumerable<Item> ret = null;
        buffer.Subscribe(x => ret = x);
    
        lock (this) {
            toAddObservable.OnCompleted();
            Init();     // Recreate everything
        }
    
        return ret;
    }