我应该清楚地表明,所有这些都是RX设计内部的演示。你可以看一下课程
AnonymousObservable<T>
,
AnonymousObserver<T>
和
AnonymousDisposable
框架就是这样做的。挺直的。但是,您几乎不应该使用任何代码,而应该使用
Disposable.Create
和
Observable.Create
。如果你正在实施
IObservable
你肯定做错了。
这是一个基本的想法:可观测的需要产生一个
IDisposable
将相关观察者从观察者的内部观察者列表中删除。您的代码正在(错误地)删除
全部的
内部名单的观察员。
这里有一个基本的一次性,使它很容易创建功能。有了这个代码,
GenericDisposable.Create
与
Disposable.Create(Action a)
public class GenericDisposable : IDisposable
{
public static IDisposable Create(Action disposeAction)
{
return new GenericDisposable(disposeAction);
}
private readonly Action _disposeAction;
public GenericDisposable(Action disposeAction)
{
_disposeAction = disposeAction;
}
public void Dispose()
{
_disposeAction();
}
}
…下面是一个可观察的实现示例:
public class SendIntMessages : IObservable<int>
{
private readonly HashSet<IObserver<int>> _observers = new HashSet<IObserver<int>>();
protected void OnNext(int i)
{
foreach (var o in _observers)
o.OnNext(i);
}
protected void OnError(Exception e)
{
foreach (var o in _observers)
o.OnError(e);
}
protected void OnCompleted()
{
foreach (var o in _observers)
o.OnCompleted();
}
public void SendIntMessage(int i)
{
OnNext(i);
}
public void EndStream()
{
OnCompleted();
}
public void SendError(Exception e)
{
OnError(e);
}
public IDisposable Subscribe(IObserver<int> observer)
{
_observers.Add(observer);
return GenericDisposable.Create(() => _observers.Remove(observer));
}
}
这是一个长期的,热观测。它跟踪观察者,一次性取消订阅。
相比之下,考虑到这一可观察到的:
public class CountTo5 : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> observer)
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnNext(4);
observer.OnNext(5);
return GenericDisposable.Create(() => {});
}
}
这是一种可以立即观察到的“冷”现象。中间没有退订的方法:当你拿到一次性物品时,观察者已经得出结论。
Disposable.Empty
是一个简单的
DisposableCreate(() => {})
.