首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >MailboxProcessor -告诉我什么时候停?

MailboxProcessor -告诉我什么时候停?
EN

Stack Overflow用户
提问于 2011-08-09 11:18:04
回答 2查看 2.7K关注 0票数 2

我现在正在玩MailboxProcessor。因此,我组成了几个代理程序,它们可以在计算机和所有子目录中爬行一个目录,然后打印每个目录中的文件:

代码语言:javascript
复制
let fileCollector =
  MailboxProcessor.Start(fun self -> 
    let rec loop() =
      async { let! file = self.Receive()
              printfn "%s" file
              return! loop() }
    loop()) 

let folderCollector = 
  MailboxProcessor.Start(fun self -> 
    let rec loop() =
      async { let! dir = self.Receive()
              do! Async.StartChild(
                    async { let! files = Directory.AsyncGetFiles dir
                            for z in files do fileCollector.Post z }) |> Async.Ignore
              return! loop() }
    loop())

let crawler =
  MailboxProcessor.Start(fun self ->
    let rec loop() =
      async { let! dir = self.Receive()
              folderCollector.Post dir
              do! Async.StartChild(
                    async { let! dirs = Directory.AsyncGetDirectories dir
                            for z in dirs do self.Post z }) |> Async.Ignore
              return! loop() }
    loop())

crawler.Post @"C:\Projects"

printfn "Done" // Message getting fired right away, due to the async stuff.

现在,我如何知道什么时候完成了folderCollectorfileCollectorcrawler,以便在爬虫成功地爬行所有子目录并打印所有文件之后调用最后的printfn语句?

Update:通过使用Tomas在http://tomasp.net/blog/parallel-extra-image-pipeline.aspx中展示的技术,我成功地编写了以下代码:

代码语言:javascript
复制
let folders = new BlockingQueueAgent<string>(100)
let files = new BlockingQueueAgent<string>(100)

let rec folderCollector path =
  async { do! folders.AsyncAdd(path)
          do! Async.StartChild(
                  async { let! dirs = Directory.AsyncGetDirectories path
                          for z in dirs do
                            do! folderCollector z }) |> Async.Ignore }

let fileCollector =
  async { while true do
            let! dir = folders.AsyncGet()
            do! Async.StartChild(
                    async { let! fs = Directory.AsyncGetFiles dir
                            for z in fs do
                              do! files.AsyncAdd z }) |> Async.Ignore }

let rec printFiles() =
  async { let! file = files.AsyncTryGet(75)
          match file with
          | Some s -> 
            printfn "%s" s
            return! displayFiles()
          | None -> () }

let cts = new CancellationTokenSource()
Async.Start(folderCollector @"C:\Projects", cts.Token)
Async.Start(fileCollector, cts.Token)
Async.RunSynchronously(printFiles(), cancellationToken = cts.Token)

printfn "DONE!"

更新:更新:好的,我混淆了以下代码:

代码语言:javascript
复制
let folders = new BlockingQueueAgent<string option>(10)
let files = new BlockingQueueAgent<string option>(10)

let folderCollector path =
  async { let rec loop path = 
            async { do! folders.AsyncAdd(Some path)
                    let! dirs = Directory.AsyncGetDirectories path
                    do! [ for z in dirs -> loop z ] |> Async.Parallel |> Async.Ignore } 
          do! loop path 
          do! folders.AsyncAdd(None) }

let rec fileCollector() =
  async { let! dir = folders.AsyncGet 125
          match dir with
          | Some s -> 
            let fs = Directory.GetFiles s
            do! [ for z in fs -> printfn "%s" z; files.AsyncAdd(Some z) ] |> Async.Parallel |> Async.Ignore // <-- Fails silence if files are full
            do! fileCollector() // <-- unreachable
          | None -> printfn "Done!"; ()}

看上去不错吧?由于某些原因,在do! fileCollector()行的fileCollector()函数中,如果files BlockingQueueAgent已满,则不会执行。相反,它失败了沉默。

然而,如果我这样做了:

代码语言:javascript
复制
let folderCollector path =
  async { let rec loop path = 
            async { do! folders.AsyncAdd(Some path)
                    let! dirs = Directory.AsyncGetDirectories path
                    do! [ for z in dirs -> loop z ] |> Async.Parallel |> Async.Ignore } 
          do! loop path 
          do! folders.AsyncAdd(None) }

let rec fileCollector() =
  async { let! dir = folders.AsyncGet 75
          match dir with
          | Some s -> 
            let fs = Directory.GetFiles s
            do! Async.StartChild(async { do! [ for z in fs -> printfn "%s" z; files.AsyncAdd(Some z) ] 
                                             |> Async.Parallel |> Async.Ignore } ) |> Async.Ignore
            do! fileCollector()
          | None -> printfn "Done!"; ()}

效果很好。但是,现在我无法跟踪fileCollector是什么时候完成的,因为它正在运行大量异步计算,因此即使在队列中“无”,它也可能还有一些工作要做。到底怎么回事?

更新:--我已经将fileCollector修改为与folderCollector相同的“样式”,但问题仍然存在。修改后的版本:

代码语言:javascript
复制
let fileCollector() =
  async { let rec loop() = 
            async { let! dir = folders.AsyncGet 750
                    match dir with
                    | Some s -> 
                      let! fs = Directory.AsyncGetFiles s
                      do! [ for z in fs -> printfn "%A" z; files.AsyncAdd(Some z) ] 
                            |> Async.Parallel |> Async.Ignore 
                      return! loop()
                    | None -> printfn "Done!"; () }
          do! loop()
          printfn "after" // Never gets this far... 
          do! files.AsyncAdd(None) }
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2011-08-09 22:38:53

要回答关于基于管道的更新版本的第二个问题(来自评论)--我认为您可以在生成所有文件时使用BlockingQueueAgent<option<string>>并使用值None (然后,None值将在管道中传播,当它们得到None时,您可以结束所有工作流)。

要做到这一点,您需要修改folderCollector以实际检测它何时完成迭代。它没有经过测试,但以下内容应该有效(关键是您需要等待递归调用的完成):

代码语言:javascript
复制
let rec folderCollector path =
  let rec loop path = 
    async { do! folders.AsyncAdd(Some path)
            let! dirs = Directory.AsyncGetDirectories path
            do! [ for z in dirs do -> folderCollector z ] 
                |> Async.Parallel |> Async.Ignore }
  async { do! loop path
          do! folders.AsyncAdd(None) }

所有工作流都有可能获得None,这是AsyncGet的结果。当发生这种情况时,他们应该将None发送给正在执行中的下一个工作人员。最后一个可以在接收到None时终止

代码语言:javascript
复制
let rec printFiles() =
  async { let! file = files.AsyncGet(75) // Note - now we use just AsyncGet
          match file with
          | Some s -> 
            printfn "%s" s
            return! displayFiles()
          | None -> () } // Completed processing all files
票数 3
EN

Stack Overflow用户

发布于 2011-08-09 12:48:35

不支持在F#代理完成时通知您。这其实是很难说出来的。即使队列为空,代理仍未完成,因为它仍然可以接收来自其他代理的消息并重新开始工作。

在您的示例中,当所有三个代理的队列都为空时,就会完成这项工作。这可以使用CurrentQueueLength进行检查。这不是很好的解决方案,但它会奏效的:

代码语言:javascript
复制
crawler.Post @"C:\Temp"
// Busy waiting until all queues are empty
while crawler.CurrentQueueLength <> 0 || folderCollector.CurrentQueueLength <> 0 ||
      fileCollector.CurrentQueueLength <> 0 do
    System.Threading.Thread.Sleep(10)
printfn "Done"

我认为更好的方法是以不同的方式构造代码--您实际上不需要使用代理来递归处理目录树。在您的版本中,目录的遍历(crawler代理)是与在文件夹(folderCollector)中查找文件和处理结果(fileCollector)并行进行的,因此实际上您正在实现一个三步管道。

您可以更容易地使用async实现管道,使用阻塞队列来存储处理的即时结果。这个本文给出了一个图像处理的例子。。我认为同样的方法也适用于你。检测管道处理何时结束应该更容易(在发送所有输入之后,您可以发送一条指示完成的特殊消息,当消息到达管道的末尾时,您就完成了)。

另一种选择是使用异步序列,这可能是解决此类问题的一个很好的模式(但目前还没有好的在线示例)。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/6995313

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档