代码之家  ›  专栏  ›  技术社区  ›  majocha

以偶数间隔推送缓冲事件的方法

  •  12
  • majocha  · 技术社区  · 14 年前

    这样地:

    -oo-ooo-oo------------------oooo-oo-o-------------->
    
    -o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->
    

    因为我对 接收

    更新:

    Richard Szalay 为了指出 排水管 接线员,我发现另一个 example by James Miles 排水管操作员的使用。下面是我如何让它在WPF应用程序中工作的:

        .Drain(x => {
            Process(x);
            return Observable.Return(new Unit())
                .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
        }).Subscribe();
    

    我有一些乐趣,因为省略scheduler参数会导致应用程序在调试模式下崩溃,而不会出现任何异常(我需要学习如何处理Rx中的异常)。

    更新:

    与此同时,我一直在尝试ISubject,下面的类实现了我想要的——它及时地释放缓冲的T:

    public class StepSubject<T> : ISubject<T>
    {
        IObserver<T> subscriber;
        Queue<T> queue = new Queue<T>();
        MutableDisposable cancel = new MutableDisposable();
        TimeSpan interval;
        IScheduler scheduler;
        bool idle = true;
    
        public StepSubject(TimeSpan interval, IScheduler scheduler)
        {
            this.interval = interval;
            this.scheduler = scheduler;
        }
    
        void Step()
        {
            T next;
            lock (queue)
            {
                idle = queue.Count == 0;
                if (!idle)
                    next = queue.Dequeue();
            }
    
            if (!idle)
            {
                cancel.Disposable = scheduler.Schedule(Step, interval);
                subscriber.OnNext(next);
            }
        }
    
        public void OnNext(T value)
        {
            lock (queue)
                queue.Enqueue(value);
    
            if (idle)
                cancel.Disposable = scheduler.Schedule(Step);
        }
    
        public IDisposable Subscribe(IObserver<T> observer)
        {
            subscriber = observer;
            return cancel;
        }
    }
    

    3 回复  |  直到 7 年前
        1
  •  11
  •   Richard Szalay    14 年前

    实际上比听起来更诡异。

    使用 Delay

    使用 Interval 无论哪一种 CombineLatest Zip

    我觉得新的 Drain added in 1.0.2787.0 延迟 应该这样做:

    source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));
    

    排水管 SelectMany ,但在使用下一个值调用选择器之前,请等待上一个输出完成。 还是没有 你在追求什么(块中的第一个值也将延迟),但它很接近: 上面的用法现在与大理石图匹配。

    显然是 在框架中不能像 选择多 . 我会在官方论坛上征求一些建议。同时,这里有一个Drain的实现,它可以实现您想要的功能:

    修复了实现中的错误并更新了用法以匹配您请求的大理石图。

    public static class ObservableDrainExtensions
    {
        public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
            Func<TSource, IObservable<TOut>> selector)
        {
            return Observable.Defer(() =>
            {
                BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
    
                return source
                    .Zip(queue, (v, q) => v)
                    .SelectMany(v => selector(v)
                        .Do(_ => { }, () => queue.OnNext(new Unit()))
                    );
            });
        }
    }
    
        2
  •  2
  •   blueling    14 年前

    为了完整起见,这里是Richard建议的Drain()方法的另一个版本(更紧凑):

    public static IObservable<T2> SelectManySequential<T1, T2>(
        this IObservable<T1> source, 
        Func<T1, IObservable<T2>> selector
    )
    {
        return source
            .Select(x => Observable.Defer<T2>(() => selector(x)))
            .Concat();
    }
    

    见线 Drain + SelectMany = ? 在Rx论坛上。

    我意识到我使用的Concat()重载是我的个人Rx扩展之一,它(还没有)是框架的一部分。我为这个错误道歉。。当然,这使我的解决方案没有我想象的那么优雅。

    public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
    {
        return Observable.CreateWithDisposable<T>(o =>
        {
            var lockCookie = new Object();
            bool completed = false;
            bool subscribed = false;
            var waiting = new Queue<IObservable<T>>();
            var pendingSubscription = new MutableDisposable();
    
            Action<Exception> errorHandler = e =>
            {
                o.OnError(e);
                pendingSubscription.Dispose();
            };
    
            Func<IObservable<T>, IDisposable> subscribe = null;
            subscribe = (ob) =>
            {
                subscribed = true;
                return ob.Subscribe(
                    o.OnNext,
                    errorHandler,
                    () =>
                    {
                        lock (lockCookie)
                        {
                            if (waiting.Count > 0)
                                pendingSubscription.Disposable = subscribe(waiting.Dequeue());
                            else if (completed)
                                o.OnCompleted();
                            else
                                subscribed = false;
                        }
                    }
                );
            };
    
            return new CompositeDisposable(pendingSubscription,
                source.Subscribe(
                    n =>
                    {
                        lock (lockCookie)
                        {
                            if (!subscribed)
                                pendingSubscription.Disposable = subscribe(n);
                            else
                                waiting.Enqueue(n);
                        }
    
                    },
                    errorHandler
                    , () =>
                    {
                        lock (lockCookie)
                        {
                            completed = true;
                            if (!subscribed)
                                o.OnCompleted();
                        }
                    }
                )
            );
        });
    }
    

    现在用自己的武器打自己: 同样的Concat()方法可以用Richard Szalay的绝妙方式编写得更加优雅:

    public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => 
                    v.Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
    

    所以荣誉属于理查德。:-)

        3
  •  2
  •   Ana Betts    14 年前

    我是这样做的,只是使用了一个显式队列(ReactiveCollection只是WPF的observateCollection-ReactiveCollection.item s added OnNext's的一个奇特版本,您可以想象得到,对于添加的每个项):

    https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ReactiveCollection.cs#L309

    public static ReactiveCollection<T> CreateCollection<T>(this IObservable<T> FromObservable, TimeSpan? WithDelay = null)
    {
        var ret = new ReactiveCollection<T>();
        if (WithDelay == null) {
            FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add);
            return ret;
        }
    
        // On a timer, dequeue items from queue if they are available
        var queue = new Queue<T>();
        var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value)
            .ObserveOn(RxApp.DeferredScheduler).Subscribe(_ => {
                if (queue.Count > 0) { 
                    ret.Add(queue.Dequeue());
                }
            });
    
        // When new items come in from the observable, stuff them in the queue.
        // Using the DeferredScheduler guarantees we'll always access the queue
        // from the same thread.
        FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue);
    
        // This is a bit clever - keep a running count of the items actually 
        // added and compare them to the final count of items provided by the
        // Observable. Combine the two values, and when they're equal, 
        // disconnect the timer
        ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1), 
            (l,r) => (l == r)).Where(x => x != false).Subscribe(_ => disconnect.Dispose());
    
        return ret;
    }