
Async operations with conflicts / ordering / mutual exclusion

  • sdgfsdh · 6 years ago

    F# makes it easy to build asynchronous computations with the async builder. You can write a whole program and then pass it to Async.RunSynchronously.

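    The problem is that some async actions must not overlap: they have to wait for other async actions to complete. This is a bit like a mutex. However, I don't want to simply run everything one after another, because that would be inefficient.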

    Concrete example: a download cache

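    Suppose I want to fetch some remote files using a local file cache. My application calls fetchFile : string -> Async<string> in many places. If I call fetchFile on the same URL at the same time, multiple writes will corrupt the cache. Instead, fetchFile should behave like this: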

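    • If there is no cache entry, download the file to the cache, then read the cached contents
    • If the cache entry is currently being written, wait for the write to finish, then read the contents
    • If the cache entry exists and is complete, just read the cached contents
    • fetchFile calls on two different URLs should run in parallel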

    I'm imagining some kind of stateful DownloadManager class that requests can be sent to and queued inside.

    How do F# programmers usually implement something like this with async?


    Hypothetical usage:

    let dm = new DownloadManager()
    
    let urls = [
      "https://www.google.com"; 
      "https://www.google.com"; 
      "https://www.wikipedia.org"; 
      "https://www.google.com"; 
      "https://www.bing.com"; 
    ]
    
    let results = 
      urls
      |> Seq.map dm.Download
      |> Async.Parallel
      |> Async.RunSynchronously
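
    The DownloadManager class exists only in the question's imagination. Here is a minimal sketch of such a class. It assumes the MailboxProcessor approach used in the answers and a hypothetical fakeDownload function in place of a real network call. The class keeps one in-flight Async<string> per URL and hands it to every caller that asks for that URL:

```fsharp
open System.Collections.Generic

// Message sent to the agent: a URL plus a channel for the (shared) download workflow.
type DownloadMessage =
    | Download of string * AsyncReplyChannel<Async<string>>

// Hypothetical DownloadManager; `download` is whatever actually fetches a URL.
type DownloadManager(download: string -> Async<string>) =
    let agent =
        MailboxProcessor.Start(fun inbox ->
            async {
                // Only the agent touches this dictionary, so it needs no lock.
                let inFlight = Dictionary<string, Async<string>>()
                while true do
                    let! (Download (url, reply)) = inbox.Receive()
                    match inFlight.TryGetValue url with
                    | true, pending -> reply.Reply pending
                    | false, _ ->
                        // Start the download in the background; `pending` waits for its result.
                        let! pending = Async.StartChild(download url)
                        inFlight.[url] <- pending
                        reply.Reply pending
            })

    // Ask the agent for the workflow, then wait for the download it represents.
    member this.Download(url: string) : Async<string> =
        async {
            let! pending = agent.PostAndAsyncReply(fun ch -> Download(url, ch))
            return! pending
        }

// Hypothetical fake download that records how many times it really runs.
let calls = ref 0
let fakeDownload (url: string) =
    async {
        lock calls (fun () -> calls.Value <- calls.Value + 1)
        do! Async.Sleep 50
        return "contents of " + url
    }

let dm = DownloadManager(fakeDownload)

let urls = [
    "https://www.google.com"
    "https://www.google.com"
    "https://www.wikipedia.org"
    "https://www.google.com"
    "https://www.bing.com" ]

let results =
    urls
    |> Seq.map dm.Download
    |> Async.Parallel
    |> Async.RunSynchronously
```

    The list contains three distinct URLs, so fakeDownload runs three times. The repeated google.com requests all wait on the first google.com download.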
    

    I asked this question earlier about how to run async operations in a semi-parallel way, but I now realize that approach is hard to compose.

    3 answers | last activity 6 years ago
  • Answer 1 · Tomas Petricek · 6 years ago

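    I agree with @AMieres that a mailbox processor is a good approach. My version of the code is a bit less generic. It uses the mailbox processor directly for this one purpose, so it may be a little simpler.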

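    Our mailbox processor handles only one kind of message. You ask it to download a URL, and it gives you back an async workflow that you can wait on to get the result: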

    type DownloadMessage = 
      | Download of string * AsyncReplyChannel<Async<string>>
    

    We need a helper function that downloads a URL asynchronously:

    let asyncDownload url = async {
      // `use` disposes the WebClient once the download has completed
      use wc = new System.Net.WebClient()
      printfn "Downloading: %s" url
      return! wc.AsyncDownloadString(System.Uri(url)) }
    

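    The mailbox processor keeps a mutable cache of downloads. This is safe because the mailbox processor handles messages one at a time. When we receive a download request, we check whether the cache already contains that download. If not, we start the download as a child async using Async.StartChild and store the resulting workflow in the cache. Either way, we reply with the cached workflow: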

    let downloadCache = MailboxProcessor.Start(fun inbox -> async {
      let cache = System.Collections.Generic.Dictionary<_, _>()
      while true do
        let! (Download(url, repl)) = inbox.Receive()
        if not (cache.ContainsKey url) then 
          let! proc = asyncDownload url |> Async.StartChild
          cache.Add(url, proc)
        repl.Reply(cache.[url]) })
    

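    To actually download using the cache, we send a request to the mailbox processor and then wait for the result of the workflow it returns. Several requests for the same URL may share that workflow: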

    let downloadUsingCache url = async {
      let! res = downloadCache.PostAndAsyncReply(fun ch -> Download(url, ch))
      return! res }
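
    Sharing one workflow between several requests relies on a property of Async.StartChild. The Async it returns can be awaited more than once, while the child computation itself runs only once. The following check demonstrates that property, using a hypothetical slowWork computation in place of a download:

```fsharp
// Counts how often the child computation actually runs.
let runs = ref 0

// Hypothetical stand-in for a download: takes a while, then returns 42.
let slowWork =
    async {
        lock runs (fun () -> runs.Value <- runs.Value + 1)
        do! Async.Sleep 50
        return 42
    }

let results =
    async {
        // Start the child once; `pending` is a handle that waits for its result.
        let! pending = Async.StartChild slowWork
        // Await that same handle from three parallel workflows.
        return! Async.Parallel [ pending; pending; pending ]
    }
    |> Async.RunSynchronously

// prints: results = [|42; 42; 42|], runs = 1
printfn "results = %A, runs = %d" results runs.Value
```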
    
  • Answer 2 · AMieres · 6 years ago

    Update

    Following Tomas Petricek's suggestion, I now start the download with Async.StartChild and changed lazyDownload to asyncDownload.


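    You can use a MailboxProcessor as a download manager that handles the cache. MailboxProcessor is an F# construct that processes a queue of messages one at a time, which guarantees there are no conflicts.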

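    let stateFull hndl initState =
        MailboxProcessor.Start(fun inbox ->
            let rec loop state : Async<unit> = async {
                let! f = inbox.Receive()
                // Catch errors per message: a `return! loop` inside try/with is not
                // a tail call, so a long-running loop written that way keeps growing.
                let! newState = async {
                    try         return! f state
                    with e ->   return hndl e state
                }
                return! loop newState
            }
            loop initState
        )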
    

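    The first parameter is an error handler. The second is the initial state, which in this case is an empty Map<string, Async<string>> used as the cache. This gives us our downloadManager: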

    let downloadManager = 
        stateFull (fun e s -> printfn "%A" e ; s) (Map.empty : Map<string, _>)
    

    To send it requests and get answers back, we use a helper built on PostAndReply:

    let applyReplyS f (agent: MailboxProcessor<'a->Async<'a>>) = 
        agent.PostAndReply(fun (reply:AsyncReplyChannel<'r>) -> 
            fun v -> async {
                let st, r = f v
                reply.Reply r
                return st 
            })
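
    stateFull and applyReplyS are not specific to downloads. As an illustration, here is a hypothetical counter whose folder function reports how many times a key has been seen. In this copy, stateFull catches errors around f state only, so that the recursive return! loop call sits outside any try block:

```fsharp
// Copy of stateFull: errors are handled per message, so `return! loop newState`
// is not wrapped in try/with and stays a proper tail call.
let stateFull hndl initState =
    MailboxProcessor.Start(fun inbox ->
        let rec loop state : Async<unit> =
            async {
                let! f = inbox.Receive()
                let! newState =
                    async {
                        try
                            return! f state
                        with e ->
                            return hndl e state
                    }
                return! loop newState
            }
        loop initState)

// Copy of applyReplyS: sends a state transition and waits for its reply.
let applyReplyS f (agent: MailboxProcessor<'a -> Async<'a>>) =
    agent.PostAndReply(fun (reply: AsyncReplyChannel<'r>) ->
        fun v ->
            async {
                let st, r = f v
                reply.Reply r
                return st
            })

// Hypothetical folder function: current state in, (new state, reply) out.
let folderCount (key: string) (counts: Map<string, int>) =
    let n = (counts |> Map.tryFind key |> Option.defaultValue 0) + 1
    counts |> Map.add key n, n

let counter = stateFull (fun e s -> printfn "%A" e; s) (Map.empty: Map<string, int>)

// The agent processes each call in order.
let seen = [ "a"; "b"; "a"; "a" ] |> List.map (fun k -> counter |> applyReplyS (folderCount k))
// prints [1; 1; 2; 3]
printfn "%A" seen
```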
    

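    This function takes a folder function, which receives the current state and returns the new state together with the reply. Here the folder checks the cache and, if necessary, adds an Async<string> to it.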

    First, asyncDownload:

    let asyncDownload url = 
        async { 
            let started = System.DateTime.UtcNow.Ticks
            do! Async.Sleep 30
            let finished = System.DateTime.UtcNow.Ticks
            let r = sprintf "Downloaded  %A it took: %dms %s" (started / 10000L) ((finished - started) / 10000L) url
            printfn "%s" r
            return r
        }
    

    This is just a dummy function that returns a string with timing information.

    Now the folder function that checks the cache:

    let folderCache url cache  =
        cache 
        |> Map.tryFind url
        |> Option.map(fun ld -> cache, ld)
        |> Option.defaultWith (fun () -> 
            let ld = asyncDownload url |> Async.StartChild |> Async.RunSynchronously
            cache |> Map.add url ld, ld
        )
    

    Finally, our download function:

    let downloadUrl url =
        downloadManager 
        |> applyReplyS (folderCache url)
    
    // val downloadUrl: url: string -> Async<string>
    

    Test:

    let s = System.DateTime.UtcNow.Ticks
    printfn "started %A" (s / 10000L)
    let res = 
        List.init 50 (fun i -> i, downloadUrl (string <| i % 5) )
        |> List.groupBy (snd >> Async.RunSynchronously)
        |> List.map (fun (t, ts) -> sprintf "%s - %A" t (ts |> List.map fst ) )
    
    let f = System.DateTime.UtcNow.Ticks
    printfn "finish  %A" (f / 10000L)
    
    printfn "elapsed %dms" ((f - s) / 10000L)
    
    res |> printfn "Result: \n%A"
    

    This produces the following output:

    started 63676683215256L
    Downloaded  63676683215292L it took: 37ms "2"
    Downloaded  63676683215292L it took: 36ms "3"
    Downloaded  63676683215292L it took: 36ms "1"
    Downloaded  63676683215291L it took: 38ms "0"
    Downloaded  63676683215292L it took: 36ms "4"
    finish  63676683215362L
    elapsed 106ms
    Result: 
    ["Downloaded  63676683215291L it took: 38ms "0" - [0; 5; 10; 15; 20; 25; 30; 35; 40; 45]";
     "Downloaded  63676683215292L it took: 36ms "1" - [1; 6; 11; 16; 21; 26; 31; 36; 41; 46]";
     "Downloaded  63676683215292L it took: 37ms "2" - [2; 7; 12; 17; 22; 27; 32; 37; 42; 47]";
     "Downloaded  63676683215292L it took: 36ms "3" - [3; 8; 13; 18; 23; 28; 33; 38; 43; 48]";
     "Downloaded  63676683215292L it took: 36ms "4" - [4; 9; 14; 19; 24; 29; 34; 39; 44; 49]"]
    
  • Answer 3 · AMieres · 6 years ago

    Here is a simplified version based on @Tomas Petricek's answer.


    Suppose we have a download function that, given a URL, returns an Async<string>. Here is a dummy version:

    let asyncDownload url = 
        async { 
            let started = System.DateTime.UtcNow.Ticks
            do! Async.Sleep 30
            let finished = System.DateTime.UtcNow.Ticks
            let r = sprintf "Downloaded  %A it took: %dms %s" (started / 10000L) ((finished - started) / 10000L) url
            printfn "%s" r
            return r
        }
    

    Here are some simple, generic Mailbox helpers:

    module Mailbox =
        let iterA hndl f =
            MailboxProcessor.Start(fun inbox ->
                async {
                    while true do
                        try       let!   msg = inbox.Receive()
                                  do!  f msg
                        with e -> hndl e
                }
            )
        let callA hndl f = iterA hndl (fun ((replyChannel: AsyncReplyChannel<_>), msg) -> async {
            let! r = f msg
            replyChannel.Reply r
        })
        let call hndl f = callA hndl (fun msg -> async { return f msg } )
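
    To show how the helpers work outside the download scenario, here is a sketch of a hypothetical squarer agent built with Mailbox.call. The module is repeated so the block runs on its own:

```fsharp
// The answer's Mailbox helpers, repeated so this block is self-contained.
module Mailbox =
    let iterA hndl f =
        MailboxProcessor.Start(fun inbox ->
            async {
                while true do
                    try
                        let! msg = inbox.Receive()
                        do! f msg
                    with e ->
                        hndl e
            })

    let callA hndl f =
        iterA hndl (fun ((replyChannel: AsyncReplyChannel<_>), msg) ->
            async {
                let! r = f msg
                replyChannel.Reply r
            })

    let call hndl f = callA hndl (fun msg -> async { return f msg })

// Hypothetical agent that squares numbers, one request at a time.
let squarer: MailboxProcessor<AsyncReplyChannel<int> * int> =
    Mailbox.call (printfn "%A") (fun (n: int) -> n * n)

let squares = [ 1 .. 5 ] |> List.map (fun n -> squarer.PostAndReply(fun reply -> (reply, n)))
// prints [1; 4; 9; 16; 25]
printfn "%A" squares
```

    This design has one caveat. If f throws, hndl is called but no reply is ever sent, so a caller blocked in PostAndReply waits indefinitely. PostAndReply also accepts an optional timeout in milliseconds, after which it raises a TimeoutException.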
    

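    The purpose of this "library" is to simplify working with MailboxProcessor. Although it may look complicated and hard to follow, what matters is what the functions do and how to use them. Mailbox.call returns a mailbox agent that can return values. Its signature is: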

    val call:
        hndl: (exn -> unit) ->
        f: ('a -> 'b) ->
            MailboxProcessor<AsyncReplyChannel<'b> * 'a>
    

    The first parameter is the exception handler, and the second is the function that computes the return value. Here is how we define downloadManager:

    let downloadManager = 
        let dict = new System.Collections.Generic.Dictionary<string, _>()
        Mailbox.call (printfn "%A") (fun url ->         
            if dict.ContainsKey url then dict.[url] else
            let result = asyncDownload url |> Async.StartChild |> Async.RunSynchronously
            dict.Add(url, result)
            result
        )
    

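    Our storage is a Dictionary. If the URL is not in it yet, we call asyncDownload and start it with Async.StartChild. That way we don't have to wait for the download to finish; we just return an async that waits for it to complete.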
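
    Async.StartChild lets the agent reply before the download finishes. The following sketch checks this. It assumes a hypothetical slowDownload that sleeps for 500 ms and then sets a finished flag, and it writes the agent directly against MailboxProcessor rather than through the Mailbox helpers:

```fsharp
open System.Collections.Generic

// Set to true once the hypothetical download has completed.
let finished = ref false

// Hypothetical slow download: waits 500 ms before producing its result.
let slowDownload (url: string) =
    async {
        do! Async.Sleep 500
        finished.Value <- true
        return "contents of " + url
    }

// Agent in the style of this answer, written directly against MailboxProcessor.
let manager =
    MailboxProcessor.Start(fun (inbox: MailboxProcessor<AsyncReplyChannel<Async<string>> * string>) ->
        async {
            let dict = Dictionary<string, Async<string>>()
            while true do
                let! (reply, url) = inbox.Receive()
                if not (dict.ContainsKey url) then
                    // StartChild launches the download and immediately yields a handle to it.
                    let! pending = Async.StartChild(slowDownload url)
                    dict.Add(url, pending)
                reply.Reply(dict.[url])
        })

// The reply arrives right away, well before the 500 ms download completes.
let pending = manager.PostAndReply(fun reply -> (reply, "https://www.bing.com"))
let doneBeforeAwait = finished.Value
let content = Async.RunSynchronously pending
let doneAfterAwait = finished.Value
printfn "done before await: %b, after: %b, content: %s" doneBeforeAwait doneAfterAwait content
```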

    To call the manager we use downloadManager.PostAndReply:

    let downloadUrl url = downloadManager.PostAndReply(fun reply -> reply, url)
    

    let s = System.DateTime.UtcNow.Ticks
    printfn "started %A" (s / 10000L)
    let res = 
        List.init 50 (fun i -> i, downloadUrl (string <| i % 5) )
        |> List.groupBy (snd >> Async.RunSynchronously)
        |> List.map (fun (t, ts) -> sprintf "%s - %A" t (ts |> List.map fst ) )
    
    let f = System.DateTime.UtcNow.Ticks
    printfn "finish  %A" (f / 10000L)
    
    printfn "elapsed %dms" ((f - s) / 10000L)
    
    res |> printfn "Result: \n%A"
    

    which produces:

    started 63676682503885L
    Downloaded  63676682503911L it took: 34ms 1
    Downloaded  63676682503912L it took: 33ms 2
    Downloaded  63676682503911L it took: 37ms 0
    Downloaded  63676682503912L it took: 33ms 3
    Downloaded  63676682503912L it took: 33ms 4
    finish  63676682503994L
    elapsed 109ms
    Result: 
    ["Downloaded  63676682503911L it took: 37ms 0 - [0; 5; 10; 15; 20; 25; 30; 35; 40; 45]";
     "Downloaded  63676682503911L it took: 34ms 1 - [1; 6; 11; 16; 21; 26; 31; 36; 41; 46]";
     "Downloaded  63676682503912L it took: 33ms 2 - [2; 7; 12; 17; 22; 27; 32; 37; 42; 47]";
     "Downloaded  63676682503912L it took: 33ms 3 - [3; 8; 13; 18; 23; 28; 33; 38; 43; 48]";
     "Downloaded  63676682503912L it took: 33ms 4 - [4; 9; 14; 19; 24; 29; 34; 39; 44; 49]"]