代码之家  ›  专栏  ›  技术社区  ›  TheGeneral

系统反应性节流异步方法

  •  -1
  • TheGeneral  · 技术社区  · 6 年前

    我已经推迟使用反应式扩展这么长时间了,我认为这将是一个很好的用途。很简单,我有一个方法,可以在不同的代码路径上以各种原因调用它

    private async Task GetProductAsync(string blah) {...}
    

    我需要能够 节气门 这种方法。也就是说,我要停止呼叫流,直到不再呼叫(在指定的时间段内)。或者更清楚地说,如果在某个特定的时间段内有10个对此方法的调用,我希望在进行最后一个调用时将其限制(限制)为仅1个调用(在一个时间段之后)。

    我可以看到一个使用 IEnumerable 这是有道理的

    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents() 
    { ... }
    
    ...
    
    var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
    var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));
    
    using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
    {
        Console.WriteLine("Press any key to unsubscribe");
        Console.ReadKey();
    }
    
    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
    

    然而,(这一直是我对RX的主要问题,永远),我如何创建一个 Observable 从一个简单的 async 方法。

    更新

    我设法找到了一种替代方法 反应性

    Barcode = new ReactiveProperty<string>();
    Barcode.Select(text => Observable.FromAsync(async () => await GetProductAsync(text)))
           .Throttle(TimeSpan.FromMilliseconds(1000))
           .Switch()
           .ToReactiveProperty(); 
    

    前提是我在文本属性中捕获它 Barcode 但它也有自己的缺点,如 ReactiveProperty 处理通知,并且我不能像已经管理的那样静默地更新支持字段。

    所以,再次,如果无论如何都不能帮助我转换 异步的 方法调用observable,这样我就可以使用throttle方法了。

    2 回复  |  直到 6 年前
        1
  •  1
  •   Sentinel    6 年前

    有两种将任务转换为可观测的关键方法,它们之间有着重要的区别。

    Observable.FromAsync(()=>GetProductAsync("test"));
    

    GetProductAsync("test").ToObservable();
    

    在您订阅任务之前,第一个任务不会启动。 第二个将创建(并启动)任务,根据任务的速度,结果将立即或稍后出现在可观察的中。

    不过,从总体上看,你似乎想停下来。 呼叫流。 您不希望限制结果流,这将导致不必要的计算和损失。

    如果这是您的目标,则可以将getProductAsync视为 观察者 对于调用事件,getProductAsync应该限制这些调用。实现这一目标的一种方法是

    public event Action<string> GetProduct;
    

    和使用

      var callStream= Observable.FromEvent<string>( 
                 handler =>  GetProduct+= handler , 
                 handler => GetProduct-= handler);
    

    然后问题就变成了如何返回结果,以及当您的“呼叫者”呼叫被抑制和丢弃时应该发生什么。

    其中一种方法是声明一个类型“getProductCall”,该类型将输入字符串和输出结果作为属性。

    然后您可以进行如下设置:

    var callStream= Observable.FromEvent<GetProductCall>( 
                 handler =>  GetProduct+= handler , 
                 handler => GetProduct-= handler)
                .Throttle(...)
                .Select(r=>async r.Result= await GetProductCall(r.Input).ToObservable().FirstAsync());
    

    (代码没有测试,只是说明性的)

    另一种方法可能包括合并(n)重载,它限制了并发可观测数据的最大数量。

        2
  •  1
  •   Shlomo    6 年前

    与你的问题无关,但可能有帮助:Rx Throttle 接线员真的是一个脱钩的接线员。最接近节流运算符的是 Sample 。这是区别(假设你想节流或放松到一个项目/3秒):

    items   : --1-23----4-56-7----8----9-
    throttle: --1--3-----4--6--7--8-----9
    debounce: --1-------4--6------8----9-
    

    样品 /节流阀会将在敏感时间到达的项目捆在一起,并在下一个采样勾号上发出最后一个项目。Debounce丢弃在敏感时间到达的项目,然后重新启动时钟:项目发出的唯一方法是在其前面加上静默的时间范围。

    Rx.net的 节气门 接线员做什么 debounce 以上描述。 样品 做什么? throttle 以上描述。

    如果你想要一些不同的东西,描述一下你想要如何节流。