代码之家  ›  专栏  ›  技术社区  ›  cdiggins

如何在.NET中最有效地利用多核进行短期计算?

  •  8
  • cdiggins  · 技术社区  · 14 年前

    programming language called Heron

    我面临的最大挑战之一是,每当遇到可并行化操作时,如何有效地将evaluator所做的工作分布到不同的内核中。这可能是短期或长期操作,很难提前确定。

    我不必担心的一件事是同步数据:明确不允许并行操作修改数据。

    • 什么是跨线程分配工作的最有效方法,这样我就可以保证计算机将工作分配到两个核心上?

    • 一个操作大概需要多长时间才能开始克服将工作分离到另一个线程的开销?
    3 回复  |  直到 14 年前
        1
  •  15
  •   Nick Craver    14 年前

    如果您想大量使用并行操作,您需要从.NET4.0开始。这是我的建议 Parallel Programming for .Net documentation . 你会想的 start here though

    for(int i = 0; i < 30000; i++)
    {
      doSomething(i);
    }
    

    新的.Net 4.0并行方法:

    Parallel.For(0, 30000, (i) => doSomething(i));
    

    Parallel.For 方法会自动跨可用的核心数进行缩放,您可以看到您可以多快地开始利用这一点。该框架中有几十个新库,支持与您的示例一样的完整线程/任务管理(包括用于同步、取消等的所有管道)。

    这里有图书馆供学生使用 Parallel LINQ (PLINQ) Task Factories , Task Schedulers go ahead and grab the free beta 2 ( RC coming soon (不,我不为微软工作……但我很少看到即将发布的版本能如此完美地满足需求,所以我强烈推荐您使用.Net 4.0)

        2
  •  5
  •   cdiggins    14 年前

    因为我不想用VS2010开发,我发现 ThreadPool 在跨内核分配工作时没有最佳性能(我认为这是因为它启动/停止了太多线程),我最终运行了自己的线程。希望其他人认为这很有用:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    
    namespace HeronEngine
    {
        /// <summary>
        /// Represents a work item.
        /// </summary>
        public delegate void Task();
    
        /// <summary>
        /// This class is intended to efficiently distribute work 
        /// across the number of cores. 
        /// </summary>
        public static class Parallelizer 
        {
            /// <summary>
            /// List of tasks that haven't been yet acquired by a thread 
            /// </summary>
            static List<Task> allTasks = new List<Task>();
    
            /// <summary>
            /// List of threads. Should be one per core. 
            /// </summary>
            static List<Thread> threads = new List<Thread>();
    
            /// <summary>
            /// When set signals that there is more work to be done
            /// </summary>
            static ManualResetEvent signal = new ManualResetEvent(false);
    
            /// <summary>
            /// Used to tell threads to stop working.
            /// </summary>
            static bool shuttingDown = false;
    
            /// <summary>
            /// Creates a number of high-priority threads for performing 
            /// work. The hope is that the OS will assign each thread to 
            /// a separate core.
            /// </summary>
            /// <param name="cores"></param>
            public static void Initialize(int cores)
            {
                for (int i = 0; i < cores; ++i)
                {
                    Thread t = new Thread(ThreadMain);
                    // This system is not designed to play well with others
                    t.Priority = ThreadPriority.Highest;
                    threads.Add(t);
                    t.Start();
                }
            }
    
            /// <summary>
            /// Indicates to all threads that there is work
            /// to be done.
            /// </summary>
            public static void ReleaseThreads()
            {
                signal.Set();
            }
    
            /// <summary>
            /// Used to indicate that there is no more work 
            /// to be done, by unsetting the signal. Note: 
            /// will not work if shutting down.
            /// </summary>
            public static void BlockThreads()
            {
                if (!shuttingDown)
                    signal.Reset();
            }
    
            /// <summary>
            /// Returns any tasks queued up to perform, 
            /// or NULL if there is no work. It will reset
            /// the global signal effectively blocking all threads
            /// if there is no more work to be done.
            /// </summary>
            /// <returns></returns>
            public static Task GetTask()
            {
                lock (allTasks)
                {
                    if (allTasks.Count == 0)
                    {
                        BlockThreads();
                        return null;
                    }
                    Task t = allTasks.Peek();
                    allTasks.Pop();
                    return t;
                }
            }
    
            /// <summary>
            /// Primary function for each thread
            /// </summary>
            public static void ThreadMain()
            {
                while (!shuttingDown)
                {
                    // Wait until work is available
                    signal.WaitOne();
    
                    // Get an available task
                    Task task = GetTask();
    
                    // Note a task might still be null becaue
                    // another thread might have gotten to it first
                    while (task != null)
                    {
                        // Do the work
                        task();
    
                        // Get the next task
                        task = GetTask();
                    }
                }
            }
    
            /// <summary>
            /// Distributes work across a number of threads equivalent to the number 
            /// of cores. All tasks will be run on the available cores. 
            /// </summary>
            /// <param name="localTasks"></param>
            public static void DistributeWork(List<Task> localTasks)
            {
                // Create a list of handles indicating what the main thread should wait for
                WaitHandle[] handles = new WaitHandle[localTasks.Count];
    
                lock (allTasks)
                {
                    // Iterate over the list of localTasks, creating a new task that 
                    // will signal when it is done.
                    for (int i = 0; i < localTasks.Count; ++i)
                    {
                        Task t = localTasks[i];
    
                        // Create an event used to signal that the task is complete
                        ManualResetEvent e = new ManualResetEvent(false);
    
                        // Create a new signaling task and add it to the list
                        Task signalingTask = () => { t(); e.Set(); };
                        allTasks.Add(signalingTask);
    
                        // Set the corresponding wait handler 
                        handles[i] = e;
                    }
                }
    
                // Signal to waiting threads that there is work
                ReleaseThreads();
    
                // Wait until all of the designated work items are completed.
                Semaphore.WaitAll(handles);
            }
    
            /// <summary>
            /// Indicate to the system that the threads should terminate
            /// and unblock them.
            /// </summary>
            public static void CleanUp()
            {
                shuttingDown = true;
                ReleaseThreads();
            }
        }    
    }
    
        3
  •  2
  •   Mitzi    14 年前