代码之家  ›  专栏  ›  技术社区  ›  Jamiec

并行长时间运行任务的时间优化

  •  6
  • Jamiec  · 技术社区  · 6 年前

    简介

    我正在处理一个复杂的外部库,在这个库中,我试图在一个大的项目列表上执行它的功能。这个库没有公开一个好的异步接口,所以我只能使用一些非常老式的代码。

    我的目标是优化完成一批处理所需的时间,并在不包括实际第三方库的情况下演示该问题,我创建了以下问题的近似值

    问题

    对于非异步操作,您可以提前知道操作的“大小”(即复杂性):

    public interface IAction
    {
        int Size { get; }
        void Execute();
    }
    

    考虑到这一行动有三种变体:

    public class LongAction : IAction
    {
        public int Size => 10000;
        public void Execute()
        {
            Thread.Sleep(10000);
        }
    }
    
    public class MediumAction : IAction
    {
    
        public int Size => 1000;
        public void Execute()
        {
            Thread.Sleep(1000);
        }
    }
    
    public class ShortAction : IAction
    {
        public int Size => 100;
        public void Execute()
        {
            Thread.Sleep(100);
        }
    }
    

    如何优化这些操作的长列表,以便在以某种并行方式运行时,整个批处理尽可能快地完成?

    天真的,你可以把所有的东西都扔到 Parallel.ForEach ,具有相当高的并行性,这当然是可行的——但必须有一种方法来优化调度它们,以便首先启动一些最大的。

    为了进一步说明这个问题,如果我们用一个超级简化的例子

    • 1个10号任务
    • 5个2号任务
    • 10个1号任务

    和2个可用线程。我可以想出两种(很多)方法来安排这些任务(黑条是死期-没有什么可安排的):

    enter image description here

    显然,第一个比第二个完成得早。

    最小完整和可验证代码

    如果有人想要一个bash,那么整个测试代码(试着让它比我下面的幼稚实现更快):

    class Program
    {
        static void Main(string[] args)
        {
            MainAsync().GetAwaiter().GetResult();
            Console.ReadLine();
        }
    
        static async Task MainAsync()
        {
            var list = new List<IAction>();
            for (var i = 0; i < 200; i++) list.Add(new LongAction());
            for (var i = 0; i < 200; i++) list.Add(new MediumAction());
            for (var i = 0; i < 200; i++) list.Add(new ShortAction());
    
    
            var swSync = Stopwatch.StartNew();
            Parallel.ForEach(list, new ParallelOptions { MaxDegreeOfParallelism = 20 }, action =>
            {
                Console.WriteLine($"{DateTime.Now:HH:mm:ss}: Starting action {action.GetType().Name} on thread {Thread.CurrentThread.ManagedThreadId}");
                var sw = Stopwatch.StartNew();
                action.Execute();
                sw.Stop();
                Console.WriteLine($"{DateTime.Now:HH:mm:ss}: Finished action {action.GetType().Name} in {sw.ElapsedMilliseconds}ms on thread {Thread.CurrentThread.ManagedThreadId}");
            });
            swSync.Stop();
            Console.WriteLine($"Done in {swSync.ElapsedMilliseconds}ms");
        }
    }
    
    
    public interface IAction
    {
        int Size { get; }
        void Execute();
    }
    
    public class LongAction : IAction
    {
        public int Size => 10000;
        public void Execute()
        {
            Thread.Sleep(10000);
        }
    }
    
    public class MediumAction : IAction
    {
    
        public int Size => 1000;
        public void Execute()
        {
            Thread.Sleep(1000);
        }
    }
    
    public class ShortAction : IAction
    {
        public int Size => 100;
        public void Execute()
        {
            Thread.Sleep(100);
        }
    }
    
    4 回复  |  直到 6 年前
        1
  •  1
  •   Panagiotis Kanavos    6 年前

    相对快速和肮脏的解决方案是使用 a load-balancing partitioner 在按大小递减排序的操作列表顶部

    var sorted = list.OrderByDescending(a => a.Size).ToArray();
    var partitioner=Partitioner.Create(sorted, loadBalance:true);
    
    Parallel.ForEach(partitioner, options, action =>...);
    

    仅使用这两条线,表演者就可以提高大约30%,就像其他答案一样。

    plinq对数据进行分区,并使用单独的任务一次处理整个分区。当输入大小已知时,就像IList派生的数组和列表一样,输入被划分成大小相等的块,并送入每个工作任务。

    当大小未知时,例如迭代器方法、LINQ查询等,PLINQ使用块分区。一次检索一大块数据并将其发送给工作任务。

    另一个我忘记了的选择是 负载均衡 在上面的块分区。这将使用小块对数组和ilist派生的输入应用块分区。负载平衡 Partitioner.Create 重载返回OrderablePartitioner实例,因此保留了IAction项的顺序

    同样可以通过 IEnumerable<T> 通过指定 EnumerablePartitionerOptions.NoBuffering 选项:

    var sorted = list.OrderByDescending(a => a.Size);
    var partitioner=Partitioner.Create(sorted,EnumerablePartitionerOptions.NoBuffering);
    

    这将创建一个使用区块编码的OrderablePartitioner

        2
  •  1
  •   pid    6 年前

    我马上就说,问题如下。

    你有一个整数列表和有限数量的夏天。 您需要一个将整数求和到Summers中的算法,以便Summers的最大值是可能的最小值。

    例如。:

    list = 1, 4, 10, 2, 3, 4
    summers = 3
    
    summer(1): 10
    summer(2): 4 + 4
    summer(3): 3 + 2 + 1
    

    如您所见,边界因子是运行时间最长的任务。较短的容易同时送达或在较短的时间内送达。它类似于背包,但归根结底归结为一个非常简单的“最长优先”的任务顺序。

    伪代码(与我的 发明 类)将是:

    while (taskManager.HasTasks())
    {
        task = taskManager.GetLongestTask();
        thread = threadManager.GetFreeThread(); // blocks if no thread available
        thread.Run(task);
    }
    

    这只是伪代码,不是并行/异步和块。我希望你能从中得到一些有用的东西。

        3
  •  1
  •   GPW    6 年前

    嗯,要看情况。在我的硬件上,如果我简单地将循环更改为首先运行所有长任务,那么您的人为示例(修改后的睡眠时间为1000100和10毫秒,因为我没有一整天的睡眠时间)将快30%(~15秒对~22秒):

    Parallel.ForEach(list.OrderByDescending(l=>l.Size), action => ...
    

    但这当然取决于 完全地 这些任务的负载是多少。如果两个不同的任务大量使用同一个资源(例如共享数据库),那么并行运行这两个任务可能会获得非常有限的收益,因为它们最终只会在某个级别上锁定彼此。

    我建议您需要更深入的分析,然后以某种方式根据任务的“可并行性”对它们进行分组,并尽量确保您使用尽可能多的“兼容”任务运行尽可能多的并行线程…当然,如果一个特定的任务似乎总是花费所有其他任务的时间,那么确保首先开始一个任务……

    很难用这里给出的细节给出更好的建议。

        4
  •  1
  •   JonyVol    6 年前

    按任务大小按降序排序,然后使用taskfactory在不同的任务中执行每个任务,从而节省了大量的运行时间。平行度水平保持在20。 结果是:原始样本中有114676ms对193713ms。(约40%改善)

    编辑:在您的特定示例中,列表仍然是从get go排序的,但是parallel.foreach不保留输入顺序。

    static async Task MainAsync()
    {
        var list = new List<IAction>();
        for (var i = 0; i < 200; i++) list.Add(new LongAction());
        for (var i = 0; i < 200; i++) list.Add(new MediumAction());
        for (var i = 0; i < 200; i++) list.Add(new ShortAction());
    
        Console.WriteLine("Sorting...");
        list.Sort((x, y) => y.Size.CompareTo(x.Size));
        int totalTasks = 0;
    
        int degreeOfParallelism = 20;
        var swSync = Stopwatch.StartNew();
        using (SemaphoreSlim semaphore = new SemaphoreSlim(degreeOfParallelism))
        {
            foreach (IAction action in list)
            {
                semaphore.Wait();
                Task.Factory.StartNew(() =>
                {
                    try
                    {
                        Console.WriteLine($"{DateTime.Now:HH:mm:ss}: Starting action {action.GetType().Name} on thread {Thread.CurrentThread.ManagedThreadId}");
                        var sw = Stopwatch.StartNew();
                        action.Execute();
                        sw.Stop();
                        Console.WriteLine($"{DateTime.Now:HH:mm:ss}: Finished action {action.GetType().Name} in {sw.ElapsedMilliseconds}ms on thread {Thread.CurrentThread.ManagedThreadId}");
                    }
                    finally
                    {
                        totalTasks++;
                        semaphore.Release();
                    }
                });
            }
    
            // Wait for remaining tasks....
            while (semaphore.CurrentCount < 20)
            { }
    
            swSync.Stop();
            Console.WriteLine($"Done in {swSync.ElapsedMilliseconds}ms");
            Console.WriteLine("Performed total tasks: " + totalTasks);
        }
    }