代码之家  ›  专栏  ›  技术社区  ›  darkl

ActionBlock Framework 4 rx替代方案

  •  2
  • darkl  · 技术社区  · 10 年前

    我对框架4.0的ActionBlock实现感兴趣,因为框架4.0似乎不支持TPL.Dataflow。 更特别地,我对接收Func<T输入,任务>委托和MaxDegreeOfParallism=1的情况。

    我想过使用反应式扩展来实现它,但我不确定如何实现;T输入>在Post上调用OnNext,并使用SelectMany和任务ToObservable,但我不确定如何使用调度器。这是我想的草稿。

    public class ActionBlock<TInput>
    {
        private readonly TaskCompletionSource<object> mCompletion = new TaskCompletionSource<object>();
        private readonly Subject<TInput> mQueue = new Subject<TInput>();
    
        public ActionBlock(Func<TInput, Task> action)
        {
            var observable =
                from item in mQueue
                from _ in action(item).ToObservable()
                select _;
    
            observable.Subscribe(x => { },
                OnComplete);
        }
    
        private void OnComplete()
        {
            mCompletion.SetResult(null);
        }
    
        public void Post(TInput input)
        {
            mQueue.OnNext(input);
        }
    
        public Task Completion
        {
            get
            {
                return mCompletion.Task;
            }
        }
    
        public void Complete()
        {
            mQueue.OnCompleted();
        }
    }
    

    我想可能使用EventLoopSchedur,但我不确定它是否适合这里,因为这是异步的。

    有什么想法吗?

    1 回复  |  直到 10 年前
        1
  •  3
  •   Brandon    10 年前
    mQueue
        .Select(input => Observable.FromAsync(() => action(input))
        .Merge(maxDegreeOfParallelism)
        .Subscribe(...);
    

    如果确实如此 maxDegreeOfParallelism 始终为1,则只需使用 Concat 而不是 Merge :

    mQueue
        .Select(input => Observable.FromAsync(() => action(input))
        .Concat()
        .Subscribe(...);
    

    这是因为 FromAsync 只是创建一个冷的可观察对象,它在订阅之前不会运行异步操作。然后我们使用 maxConcurrency 的参数 合并 (或者只是 连接两个字符串 )限制并发订阅的数量(从而限制正在运行的异步操作的数量)。

    编辑:

    因为你的目标是 Task 表示流的完成,可以使用 ToTask 而不是直接订阅。 任务 将订阅并返回 任务 具有最终值。因为 任务 如果可观察值不产生值,我们将使用 Count 以保证其产生价值:

    // task to mark completion
    private readonly Task mCompletion;
    
    // ...
    
    this.mCompletion = mQueue
        .Select(input => Observable.FromAsync(() => action(input))
        .Concat()
        .Count()
        .ToTask();