我试着为Rx做一个SerialGroupBy操作符。网操作员的目的是像GroupBy一样工作,但每次创建一个新组时,前者就完成了。因此,一次开放的团体永远不会超过一个。
我目前的“最佳”实现是这样的:
public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
this IObservable<TSource> stream, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector) =>
stream.Publish(shared =>
shared.GroupByUntil(keySelector, elementSelector, group =>
shared.DistinctUntilChanged(keySelector)));
我希望在下一个小组开始之前让小组结束,就像这里测试的那样:
[Fact]
public void SerialGroupBy()
{
var scheduler = new TestScheduler();
var stream = scheduler.CreateHotObservable(
OnNext(201, "First group"),
OnNext(202, "Second group"),
OnNext(203, "Second group"));
var observer = scheduler.CreateObserver<string>();
stream.SerialGroupBy(x => x.Length, x => x)
.Select(x => x.Subscribe(observer))
.Subscribe();
scheduler.Start();
observer.Messages.ShouldBeLike(
OnNext(201, "First group"),
OnCompleted<string>(202),
OnNext(202, "Second group"),
OnNext(203, "Second group"));
}
但第一组的完成太晚了,比如:
OnNext(201, "First group"),
OnNext(202, "Second group"),
OnCompleted<string>(202),
OnNext(203, "Second group"));
我能理解为什么(根据GroupByTill的实现,在结束观察者之前通知开始观察者),但我如何实现它,以使组不重叠?
我尝试过几种不同的方法,但最终总是会出现同一问题的变体。