代码之家  ›  专栏  ›  技术社区  ›  NotAgain

从订阅方法返回可观察的

  •  1
  • NotAgain  · 技术社区  · 6 年前

    我有一个关于可见光的问题( which I posted 关于这本书的出版商分论坛,但我仍在等待任何回应)。

    我使用标准实践中提供的辅助方法,而不是手工制作观察对象。然而,出于学术兴趣,我确实看到了手工制作一个可观察到的东西需要什么。

    我在一本书中看到了一个实现,在subscribe方法disposable.empty的末尾返回了该实现。 代码如下所示。

    public class MyObservable : IObservable<int>
    {
        public IDisposable Subscribe(IObserver<int> observer)
        {
            for (int i = 0; i < 5; i++)
            {
                Thread.Sleep(1000);
                observer.OnNext(i);
            }
            observer.OnCompleted();
            return Disposable.Empty;
        }
    }
    

    如果我想退回一个合适的一次性物品,当调用Dispose时,这实际上会导致取消订阅,应该怎么做?

    我用这个 Observable 这是为了 Observer

    我必须介绍一个订阅处理程序

    public class SubscriptionHandler : IDisposable
    {
        private readonly List<IObserver<int>> _listOfObservers;
        private readonly IObserver<int> _currentObserver;
    
        public SubscriptionHandler(List<IObserver<int>> currentListOfObservers, IObserver<int> currentObserver)
        {
            _listOfObservers = currentListOfObservers;
            _currentObserver = currentObserver;
        }
    
        public void Dispose()
        {
            if (_currentObserver != null && _listOfObservers.Contains(_currentObserver))
            {
                _listOfObservers.Remove(_currentObserver);
            }
        }
    }
    

    这是可观察的代码

    public class MyObservable : IObservable<int>
    {
        private List<IObserver<int>> _listOfSubscribedObservers = new List<IObserver<int>>();
    
        public IDisposable Subscribe(IObserver<int> observer)
        {
            if (!_listOfSubscribedObservers.Contains(observer))
            {
                _listOfSubscribedObservers.Add(observer);
            }
    
            Task.Run(() =>
            {
                for (int i = 0; i < 5; i++)
                {
                    Thread.Sleep(1000);
                    observer.OnNext(i);
                }
    
                observer.OnCompleted();
            });
    
            return new SubscriptionHandler(_listOfSubscribedObservers, observer);
        }
    }
    

    我觉得我错过了什么。必须有一个内置的方法来返回一个有意义的一次性手工制作的Observable,或者这是只与Observable Create Helper方法一起提供的东西?

    附言:如果投反对票,请在评论中说明原因,这样我就可以避免在提问时重复出现错误。

    1 回复  |  直到 6 年前
        1
  •  2
  •   Shlomo    6 年前

    我应该清楚地表明,所有这些都是RX设计内部的演示。你可以看一下课程 AnonymousObservable<T> , AnonymousObserver<T> AnonymousDisposable 框架就是这样做的。挺直的。但是,您几乎不应该使用任何代码,而应该使用 Disposable.Create Observable.Create 。如果你正在实施 IObservable 你肯定做错了。

    这是一个基本的想法:可观测的需要产生一个 IDisposable 将相关观察者从观察者的内部观察者列表中删除。您的代码正在(错误地)删除 全部的 内部名单的观察员。

    这里有一个基本的一次性,使它很容易创建功能。有了这个代码, GenericDisposable.Create Disposable.Create(Action a)

    public class GenericDisposable : IDisposable
    {
        public static IDisposable Create(Action disposeAction)
        {
            return new GenericDisposable(disposeAction);
        }
    
        private readonly Action _disposeAction;
        public GenericDisposable(Action disposeAction)
        {
            _disposeAction = disposeAction;
        }
        public void Dispose()
        {
            _disposeAction();
        }
    }
    

    …下面是一个可观察的实现示例:

    public class SendIntMessages : IObservable<int>
    {
        private readonly HashSet<IObserver<int>> _observers = new HashSet<IObserver<int>>();
    
        protected void OnNext(int i)
        {
            foreach (var o in _observers)
                o.OnNext(i);
        }
    
        protected void OnError(Exception e)
        {
            foreach (var o in _observers)
                o.OnError(e);
        }
    
        protected void OnCompleted()
        {
            foreach (var o in _observers)
                o.OnCompleted();
        }
    
        public void SendIntMessage(int i)
        {
            OnNext(i);
        }
    
        public void EndStream()
        {
            OnCompleted();
        }
    
        public void SendError(Exception e)
        {
            OnError(e);
        }
    
        public IDisposable Subscribe(IObserver<int> observer)
        {
            _observers.Add(observer);
            return GenericDisposable.Create(() => _observers.Remove(observer));
        }
    }
    

    这是一个长期的,热观测。它跟踪观察者,一次性取消订阅。

    相比之下,考虑到这一可观察到的:

    public class CountTo5 : IObservable<int>
    {
        public IDisposable Subscribe(IObserver<int> observer)
        {
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnNext(3);
            observer.OnNext(4);
            observer.OnNext(5);
    
            return GenericDisposable.Create(() => {});
        }
    }
    

    这是一种可以立即观察到的“冷”现象。中间没有退订的方法:当你拿到一次性物品时,观察者已经得出结论。

    Disposable.Empty 是一个简单的 DisposableCreate(() => {}) .