代码之家  ›  专栏  ›  技术社区  ›  Bruno Reis

parallel.foreach生成方式线程过多

  •  13
  • Bruno Reis  · 技术社区  · 15 年前

    问题

    虽然我将在这里讨论的代码是我用f编写的,但它是基于.NET 4框架,而不是具体取决于f的任何特殊性(至少看起来是这样!).

    我的磁盘上有一些数据,我应该从网络上更新,将最新版本保存到磁盘上:

    type MyData =
        { field1 : int;
          field2 : float }
    
    type MyDataGroup =
        { Data : MyData[];
          Id : int }
    
    // load : int -> MyDataGroup
    let load dataId =
        let data = ... // reads from disk
        { Data = data;
          Id = dataId }
    
    // update : MyDataGroup -> MyDataGroup
    let update dg =
        let newData = ... // reads from the network and process
                          // newData : MyData[]
    
        { dg with Data = dg.Data
                         |> Seq.ofArray
                         |> Seq.append newData
                         |> processDataSomehow
                         |> Seq.toArray }
    
    // save : MyDataGroup -> unit
    let save dg = ... // writes to the disk
    
    let loadAndSaveAndUpdate = load >> update >> save
    

    问题在于 loadAndSaveAndUpdate 所有的数据,我必须执行函数 许多的 时代:

    {1 .. 5000} |> loadAndSaveAndUpdate
    

    每一步都可以

    • 一些磁盘IO,
    • 一些数据处理,
    • 一些网络IO(可能存在大量延迟)
    • 更多的数据处理,
    • 还有一些磁盘IO。

    在某种程度上,并行完成这项工作不是很好吗?不幸的是,我的读取和解析函数都不是“异步工作流就绪”。

    我提出的第一个(不是很好)解决方案

    任务

    我做的第一件事就是 Task[] 然后全部开始:

    let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
    let tasks = {1 .. 5000}
                |> Seq.map createTask
                |> Seq.toArray
    
    tasks |> Array.iter (fun x -> x.Start())
    Task.WaitAll(tasks)
    

    然后按ctrl+esc查看它使用了多少线程。15,17,…,35,…,170,……直到终止应用程序!出了点问题。

    平行

    我做了几乎相同的事情,但使用 Parallel.ForEach(...) 结果是一样的:很多,很多,很多线。

    有效的解决方案…有点

    然后我决定只开始 n 线程, Task.WaitAll(of them) 然后其他 n ,直到没有其他任务可用。

    这是可行的,但问题是当它完成处理后,比如, n-1 任务,它将等待,等待,等待该死的最后一个任务,因为许多网络延迟坚持阻塞。这不好!

    所以, 你将如何解决这个问题 是吗?我很高兴能看到不同的解决方案,包括异步工作流(在本例中是如何调整非异步函数)、并行扩展、奇怪的并行模式等。

    谢谢。

    4 回复  |  直到 15 年前
        1
  •  7
  •   kvb    15 年前

    你确定你的个人任务是及时完成的吗?我相信两者 Parallel.ForEach 以及 Task 类已使用.NET线程池。任务通常应该是短期工作项,在这种情况下,线程池只会生成少量实际线程,但如果任务没有进展,并且有其他任务排队,则使用的线程数将稳步增加到最大值(默认值为 250/processor 在.NET 2.0 SP1中,但在不同版本的框架下是不同的)。同样值得注意的是(至少在.NET 2.0 SP1中),新线程的创建速度被限制为每秒2个新线程,因此达到您看到的线程数表明任务没有在很短的时间内完成(因此,将责任归咎于它可能不是完全准确的) 并行循环 )

    我认为布赖恩的建议 async 工作流是一个很好的工作流程,特别是如果长期任务的来源是IO,因为 异步的 将线程返回到threadpool,直到IO完成。另一种选择是简单地接受您的任务没有快速完成,并且允许产生许多线程(可以通过使用在某种程度上控制这些线程 System.Threading.ThreadPool.SetMaxThreads )-根据您的情况,使用大量线程可能不是什么大问题。

        2
  •  11
  •   Marc Bate    14 年前

    ParallelOptions.MaxDegreeOfParallelism 限制并行方法调用运行的并发操作数

        3
  •  10
  •   Brian    15 年前

    当各种I/O调用处于“出海”状态时,使用“异步”将使您能够在不烧掉线程的情况下完成I/O绑定的工作,因此这将是我的第一个建议。将代码转换为异步应该很简单,通常是沿着

    • 将每个函数体包装在 async{...} ,添加 return 必要时
    • 通过创建库中尚未存在的任何I/O基元的异步版本 Async.FromBeginEnd
    • 切换窗体调用 let r = Foo() let! r = AsyncFoo()
    • 使用 Async.Parallel 将5000个异步对象转换为并行运行的单个异步

    有各种各样的教程可以做到这一点;其中一个这样的网络广播是 here .

        4
  •  0
  •   tster    15 年前

    你可以一直使用 ThreadPool .

    http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx

    基本上:

    1. 创建线程池
    2. 设置最大线程数
    3. 使用将所有任务排队 QueueUserWorkItem(WaitCallback)