我现在正在玩MailboxProcessor。因此,我组成了几个代理程序,它们可以在计算机和所有子目录中爬行一个目录,然后打印每个目录中的文件:
let fileCollector =
MailboxProcessor.Start(fun self ->
let rec loop() =
async { let! file = self.Receive()
printfn "%s" file
return! loop() }
loop())
let folderCollector =
MailboxProcessor.Start(fun self ->
let rec loop() =
async { let! dir = self.Receive()
do! Async.StartChild(
async { let! files = Directory.AsyncGetFiles dir
for z in files do fileCollector.Post z }) |> Async.Ignore
return! loop() }
loop())
let crawler =
MailboxProcessor.Start(fun self ->
let rec loop() =
async { let! dir = self.Receive()
folderCollector.Post dir
do! Async.StartChild(
async { let! dirs = Directory.AsyncGetDirectories dir
for z in dirs do self.Post z }) |> Async.Ignore
return! loop() }
loop())
crawler.Post @"C:\Projects"
printfn "Done" // Message getting fired right away, due to the async stuff.现在,我如何知道什么时候完成了folderCollector、fileCollector和crawler,以便在爬虫成功地爬行所有子目录并打印所有文件之后调用最后的printfn语句?
Update:通过使用Tomas在http://tomasp.net/blog/parallel-extra-image-pipeline.aspx中展示的技术,我成功地编写了以下代码:
let folders = new BlockingQueueAgent<string>(100)
let files = new BlockingQueueAgent<string>(100)
let rec folderCollector path =
async { do! folders.AsyncAdd(path)
do! Async.StartChild(
async { let! dirs = Directory.AsyncGetDirectories path
for z in dirs do
do! folderCollector z }) |> Async.Ignore }
let fileCollector =
async { while true do
let! dir = folders.AsyncGet()
do! Async.StartChild(
async { let! fs = Directory.AsyncGetFiles dir
for z in fs do
do! files.AsyncAdd z }) |> Async.Ignore }
let rec printFiles() =
async { let! file = files.AsyncTryGet(75)
match file with
| Some s ->
printfn "%s" s
return! displayFiles()
| None -> () }
let cts = new CancellationTokenSource()
Async.Start(folderCollector @"C:\Projects", cts.Token)
Async.Start(fileCollector, cts.Token)
Async.RunSynchronously(printFiles(), cancellationToken = cts.Token)
printfn "DONE!"更新:更新:好的,我混淆了以下代码:
let folders = new BlockingQueueAgent<string option>(10)
let files = new BlockingQueueAgent<string option>(10)
let folderCollector path =
async { let rec loop path =
async { do! folders.AsyncAdd(Some path)
let! dirs = Directory.AsyncGetDirectories path
do! [ for z in dirs -> loop z ] |> Async.Parallel |> Async.Ignore }
do! loop path
do! folders.AsyncAdd(None) }
let rec fileCollector() =
async { let! dir = folders.AsyncGet 125
match dir with
| Some s ->
let fs = Directory.GetFiles s
do! [ for z in fs -> printfn "%s" z; files.AsyncAdd(Some z) ] |> Async.Parallel |> Async.Ignore // <-- Fails silence if files are full
do! fileCollector() // <-- unreachable
| None -> printfn "Done!"; ()}看上去不错吧?由于某些原因,在do! fileCollector()行的fileCollector()函数中,如果files BlockingQueueAgent已满,则不会执行。相反,它失败了沉默。
然而,如果我这样做了:
let folderCollector path =
async { let rec loop path =
async { do! folders.AsyncAdd(Some path)
let! dirs = Directory.AsyncGetDirectories path
do! [ for z in dirs -> loop z ] |> Async.Parallel |> Async.Ignore }
do! loop path
do! folders.AsyncAdd(None) }
let rec fileCollector() =
async { let! dir = folders.AsyncGet 75
match dir with
| Some s ->
let fs = Directory.GetFiles s
do! Async.StartChild(async { do! [ for z in fs -> printfn "%s" z; files.AsyncAdd(Some z) ]
|> Async.Parallel |> Async.Ignore } ) |> Async.Ignore
do! fileCollector()
| None -> printfn "Done!"; ()}效果很好。但是,现在我无法跟踪fileCollector是什么时候完成的,因为它正在运行大量异步计算,因此即使在队列中“无”,它也可能还有一些工作要做。到底怎么回事?
更新:--我已经将fileCollector修改为与folderCollector相同的“样式”,但问题仍然存在。修改后的版本:
let fileCollector() =
async { let rec loop() =
async { let! dir = folders.AsyncGet 750
match dir with
| Some s ->
let! fs = Directory.AsyncGetFiles s
do! [ for z in fs -> printfn "%A" z; files.AsyncAdd(Some z) ]
|> Async.Parallel |> Async.Ignore
return! loop()
| None -> printfn "Done!"; () }
do! loop()
printfn "after" // Never gets this far...
do! files.AsyncAdd(None) }发布于 2011-08-09 22:38:53
要回答关于基于管道的更新版本的第二个问题(来自评论)--我认为您可以在生成所有文件时使用BlockingQueueAgent<option<string>>并使用值None (然后,None值将在管道中传播,当它们得到None时,您可以结束所有工作流)。
要做到这一点,您需要修改folderCollector以实际检测它何时完成迭代。它没有经过测试,但以下内容应该有效(关键是您需要等待递归调用的完成):
let rec folderCollector path =
let rec loop path =
async { do! folders.AsyncAdd(Some path)
let! dirs = Directory.AsyncGetDirectories path
do! [ for z in dirs do -> folderCollector z ]
|> Async.Parallel |> Async.Ignore }
async { do! loop path
do! folders.AsyncAdd(None) }所有工作流都有可能获得None,这是AsyncGet的结果。当发生这种情况时,他们应该将None发送给正在执行中的下一个工作人员。最后一个可以在接收到None时终止
let rec printFiles() =
async { let! file = files.AsyncGet(75) // Note - now we use just AsyncGet
match file with
| Some s ->
printfn "%s" s
return! displayFiles()
| None -> () } // Completed processing all files发布于 2011-08-09 12:48:35
不支持在F#代理完成时通知您。这其实是很难说出来的。即使队列为空,代理仍未完成,因为它仍然可以接收来自其他代理的消息并重新开始工作。
在您的示例中,当所有三个代理的队列都为空时,就会完成这项工作。这可以使用CurrentQueueLength进行检查。这不是很好的解决方案,但它会奏效的:
crawler.Post @"C:\Temp"
// Busy waiting until all queues are empty
while crawler.CurrentQueueLength <> 0 || folderCollector.CurrentQueueLength <> 0 ||
fileCollector.CurrentQueueLength <> 0 do
System.Threading.Thread.Sleep(10)
printfn "Done"我认为更好的方法是以不同的方式构造代码--您实际上不需要使用代理来递归处理目录树。在您的版本中,目录的遍历(crawler代理)是与在文件夹(folderCollector)中查找文件和处理结果(fileCollector)并行进行的,因此实际上您正在实现一个三步管道。
您可以更容易地使用async实现管道,使用阻塞队列来存储处理的即时结果。这个本文给出了一个图像处理的例子。。我认为同样的方法也适用于你。检测管道处理何时结束应该更容易(在发送所有输入之后,您可以发送一条指示完成的特殊消息,当消息到达管道的末尾时,您就完成了)。
另一种选择是使用异步序列,这可能是解决此类问题的一个很好的模式(但目前还没有好的在线示例)。
https://stackoverflow.com/questions/6995313
复制相似问题