代码之家  ›  专栏  ›  技术社区  ›  James L

统计实时并发流的数量

  •  0
  • James L  · 技术社区  · 4 年前

    我有一条小溪。我想知道如何将其转化为许多内部流当前处于活动状态的实时计数,即未完成或出错。

    我该如何实现CountLiveStreams?

    var source = new Subject<IObservable<Unit>>();
    IObservable<int> count = source.CountLiveStreams();
    

    谢谢!

    0 回复  |  直到 4 年前
        1
  •  5
  •   Theodor Zoulias    4 年前

    这是一个实现 CountLiveStreams 操作员:

    public static IObservable<int> CountLiveStreams<T>(
        this IObservable<IObservable<T>> streamOfStreams)
    {
        return streamOfStreams
            .Select(x => x.IgnoreElements().Select(_ => 0).Catch(Observable.Empty<int>())
                .Prepend(1).Append(-1))
            .Merge()
            .Scan(0, (accumulator, delta) => accumulator + delta)
            .Prepend(0);
    }
    

    每个发射的流都转换为 IObservable<int> 它发出2个值,开始时为1,结束时为-1。然后,将所有流生成的所有值对合并为一个 IObservable<int> ,最后使用 Scan 操作员。

        2
  •  2
  •   Enigmativity    4 年前

    这对我来说很有效:

    public static IObservable<int> CountLiveStreams<T>(this IObservable<IObservable<T>> source) =>
        source
            .SelectMany(xs =>
                xs
                    .Materialize()
                    .Where(x => x.Kind != NotificationKind.OnNext)
                    .Select(x => -1)
                    .StartWith(1))
            .Scan((x, y) => x + y);
    

    它使用的策略与西奥多的生产答案相同 1 -1 然后使用 .Scan 创建一个连续的统计数据,但我认为打电话给 .Materialize() .