代码之家  ›  专栏  ›  技术社区  ›  Ronnyek

c#每次迭代的速率限制

  •  0
  • Ronnyek  · 技术社区  · 9 年前

    基本上,我试图对列表的迭代执行进行速率限制。

    我真的很喜欢使用RX的想法,因为我可以在它的基础上构建,并有一个更优雅的解决方案,但不必使用RX。

    我在许多比我聪明得多的人的帮助下制定了这个公式。我的问题是我想说一些Collection。RateLimitedForEach(rate,function),并使其最终阻塞,直到我们完成处理……或使其成为异步方法。

    函数下面的演示在控制台应用程序中工作,但如果我在foreach之后关闭,它会立即返回。

    我只是有点不知所措,这是否可以修复,或者我是否应该完全不同地处理它

    public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action)
    {
        list.ToObservable().Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v)
        .Do(action).Subscribe();
    }
    
    //rate limits iteration of foreach... keep in mind this is not the same thing as just sleeping for a second
    //between each iteration, this is saying at the start of the next iteration, if minimum delay time hasnt past, hold until it has
    var maxRequestsPerMinute = 60;
    requests.RateLimitedForeach(60/maxRequestsPerMinute,(request) =>   SendRequest(request));
    
    4 回复  |  直到 9 年前
        1
  •  4
  •   Yacoub Massad    9 年前

    但不必使用RX

    以下是同步执行的方法:

    public static void RateLimitedForEach<T>(
        this List<T> list,
        double minumumDelay,
        Action<T> action)
    {
        foreach (var item in list)
        {
            Stopwatch sw = Stopwatch.StartNew();
    
            action(item);
    
            double left = minumumDelay - sw.Elapsed.TotalSeconds;
    
            if(left > 0)
                Thread.Sleep(TimeSpan.FromSeconds(left));
        }
    }
    

    下面是异步执行的方法(只有潜在的等待是异步的):

    public static async Task RateLimitedForEachAsync<T>(
        this List<T> list,
        double minumumDelay,
        Action<T> action)
    {
        foreach (var item in list)
        {
            Stopwatch sw = Stopwatch.StartNew();
    
            action(item);
    
            double left = minumumDelay - sw.Elapsed.TotalSeconds;
    
            if (left > 0)
                await Task.Delay(TimeSpan.FromSeconds(left));
        }
    }    
    

    请注意,您可以更改异步版本,使操作像这样自异步:

    public static async Task RateLimitedForEachAsync<T>(
        this List<T> list,
        double minumumDelay,
        Func<T,Task> async_task_func)
    {
        foreach (var item in list)
        {
            Stopwatch sw = Stopwatch.StartNew();
    
            await async_task_func(item);
    
            double left = minumumDelay - sw.Elapsed.TotalSeconds;
    
            if (left > 0)
                await Task.Delay(TimeSpan.FromSeconds(left));
    
        }
    }    
    

    如果您需要对每个项目运行的操作是异步的,这将很有帮助。

    最后一个版本可以这样使用:

    List<string> list = new List<string>();
    
    list.Add("1");
    list.Add("2");
    
    var task = list.RateLimitedForEachAsync(1.0, async str =>
    {
        //Do something asynchronous here, e.g.:
        await Task.Delay(500);
        Console.WriteLine(DateTime.Now + ": " + str);
    });
    

    现在你应该等待 task 完成。如果这是 Main 方法,则需要像这样同步等待:

    task.Wait();
    

    另一方面,如果您在异步方法中,则需要异步等待,如下所示:

    await task;
    
        2
  •  0
  •   Enigmativity    9 年前

    你的代码非常完美。

    请尝试以下操作:

    public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action)
    {
        list
            .ToObservable()
            .Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v)
            .Do(action)
            .ToArray()
            .Wait();
    }
    
        3
  •  0
  •   Community c0D3l0g1c    7 年前

    你需要获得交叉的概念是,主线程不在你的 RateLimitedForEach 调用以完成。此外,在您的控制台应用程序上,只要主线程结束,进程就会结束。

    这是什么意思?这意味着,无论观察者是否在场,这一过程都将结束 每个的费率限制 已完成执行。

    注意:用户可能仍会强制完成应用程序的执行, 这是件好事 。如果您想在不挂起UI的情况下等待,可以使用表单应用程序;如果您不希望用户关闭与流程相关的窗口,可以使用服务。


    使用任务是 superios solution 我在下面介绍的内容。

    请注意,在控制台应用程序上使用任务时,您仍然需要等待任务,以防止主线程在 每个的费率限制 完成了工作。仍然建议远离控制台应用程序。


    如果你坚持继续使用你的代码,你可以调整它,让它挂起调用线程直到完成:

    public static void RateLimitedForEach<T>
    (
        this List<T> list,
        double minumumDelay,
        Action<T> action
    )
    {
        using (var waitHandle = new ManualResetEventSlim(false))
        {
    
            var mainObservable = list.ToObservable();
            var intervalObservable = Observable.Interval(TimeSpan.FromSeconds(minumumDelay));  
            var zipObservable = mainObservable .Zip(intervalObservable, (v, _) => v);
            zipObservable.Subscribe
            (
                action,
                error => GC.KeepAlive(error), // Ingoring them, as you already were
                () => waitHandle.Set() // <-- "Done signal"
            );
    
            waitHandle.Wait(); // <--- Waiting on the observer to complete
        }
    }
    
        4
  •  -1
  •   Cleverguy25    9 年前