代码之家  ›  专栏  ›  技术社区  ›  stop-cran SimpleOne

假脱机由Observable.FromEvent生成的正在进行的项目

  •  0
  • stop-cran SimpleOne  · 技术社区  · 6 年前

    我的目标是将所有项目/通知从 IObservable<T> 对于未来的订户。

    例如,如果有人订阅了一个消息流,他首先会收到订阅之前收到的所有消息。然后他开始接收新的信息,只要有。这应该是无缝的,不会在新旧消息之间的“边界”上重复和丢失。

    我提出了以下扩展方法:

    public static IObservable<T> WithHistory<T>(this IObservable<T> source)
    {
        var accumulator = new BlockingCollection<T>();
    
        source.Subscribe(accumulator.Add);
    
        return accumulator
            .GetConsumingEnumerable()
            .ToObservable()
            .SubscribeOn(ThreadPoolScheduler.Instance);
    }
    

    就我所测试的而言,它是有效的:

    class Generator<T>
    {
        event Action<T> onPush;
    
        public IObservable<T> Items =>
            Observable.FromEvent<T>(d => onPush += d, d => onPush -= d);
    
        public void Push(T item) => onPush?.Invoke(item);
    }
    
    ...
    
    private static void Main()
    {
        var g = new Generator<int>();
        var ongoingItems = g.Items;
        var allItems = g.Items.WithHistory();
    
        g.Push(1);
        g.Push(2);
    
        ongoingItems.Subscribe(x => Console.WriteLine($"Ongoing: got {x}"));
        allItems.Subscribe(x => Console.WriteLine($"WithHistory: got {x}"));
    
        g.Push(3);
        g.Push(4);
        g.Push(5);
    
        Console.ReadLine();
    }
    

    结果是:

    Ongoing: got 3
    Ongoing: got 4
    Ongoing: got 5
    WithHistory: got 1
    WithHistory: got 2
    WithHistory: got 3
    WithHistory: got 4
    WithHistory: got 5
    

    但是,使用 BlockingCollection<T> 似乎是杀戮过度。另外,上面的方法不支持完成、错误处理,并且会导致死锁 .SubscribeOn(ThreadPoolScheduler.Instance) .

    有没有更好的方法来实现它,而没有所描述的缺陷?

    1 回复  |  直到 6 年前
        1
  •  2
  •   Shlomo    6 年前

    最好的方法是 .Replay()

    void Main()
    {
        var g = new Generator<int>();
        var ongoingItems = g.Items;
        var allItems = g.Items.Replay().RefCount();
    
        using(var tempSubscriber = allItems.Subscribe())
        {
            g.Push(1);
            g.Push(2);
    
            ongoingItems.Subscribe(x => Console.WriteLine($"Ongoing: got {x}"));
            allItems.Subscribe(x => Console.WriteLine($"WithHistory: got {x}"));
    
            g.Push(3);
            g.Push(4);
            g.Push(5);
    
            Console.ReadLine();
        }
    }
    

    .Replay().RefCount() 生成一个observable,它将保留一个内部队列以便重播,只要有订户。但是,如果您有一个持久订户(就像您的解决方案在 WithHistory 方法),内存泄漏。解决这一问题的最佳方法是拥有一个临时订户,当您不再对历史感兴趣时,它会自动断开连接。