代码之家  ›  专栏  ›  技术社区  ›  asgerhallas

反应式扩展非重叠、串行分组(或WindowUntilChange)

  •  2
  • asgerhallas  · 技术社区  · 6 年前

    我试着为Rx做一个SerialGroupBy操作符。网操作员的目的是像GroupBy一样工作,但每次创建一个新组时,前者就完成了。因此,一次开放的团体永远不会超过一个。

    我目前的“最佳”实现是这样的:

    public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
        this IObservable<TSource> stream, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector) =>
        stream.Publish(shared => 
            shared.GroupByUntil(keySelector, elementSelector, group => 
                shared.DistinctUntilChanged(keySelector)));
    

    我希望在下一个小组开始之前让小组结束,就像这里测试的那样:

    [Fact]
    public void SerialGroupBy()
    {
        var scheduler = new TestScheduler();
    
        var stream = scheduler.CreateHotObservable(
            OnNext(201, "First group"),
            OnNext(202, "Second group"),
            OnNext(203, "Second group"));
    
        var observer = scheduler.CreateObserver<string>();
    
        stream.SerialGroupBy(x => x.Length, x => x)
            .Select(x => x.Subscribe(observer))
            .Subscribe();
    
        scheduler.Start();
    
        observer.Messages.ShouldBeLike(
            OnNext(201, "First group"),
            OnCompleted<string>(202),
            OnNext(202, "Second group"),
            OnNext(203, "Second group"));
    }
    

    但第一组的完成太晚了,比如:

    OnNext(201, "First group"),
    OnNext(202, "Second group"),
    OnCompleted<string>(202),
    OnNext(203, "Second group"));
    

    我能理解为什么(根据GroupByTill的实现,在结束观察者之前通知开始观察者),但我如何实现它,以使组不重叠?

    我尝试过几种不同的方法,但最终总是会出现同一问题的变体。

    0 回复  |  直到 6 年前