首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >等待多F# Akka.net可观察事件的Akka.net F#有状态参与者

等待多F# Akka.net可观察事件的Akka.net F#有状态参与者
EN

Stack Overflow用户
提问于 2015-12-16 10:33:54
回答 1查看 483关注 0票数 1

我对F#和Akka.Net都很陌生,并试图用它们实现以下目标:

我想要创建一个参与者(尾巴),它接收一个文件位置,然后使用FileSystemWatcher和一些可观察到的值侦听该位置的事件,并将它们作为消息转发给其他参与者进行处理。

我遇到的问题是,侦听事件的代码一次只接收一个事件,而忽略所有其他事件。例如,如果我将20个文件复制到正在监视的目录中,它似乎只发送其中一个文件的事件。

这是我的演员代码:

代码语言:javascript
复制
module Tail

open Akka
open Akka.FSharp
open Akka.Actor
open System
open Model
open ObserveFiles
open ConsoleWriteActor

let handleTailMessages tm =
    match tm with
        | StartTail (f,r) ->
            observeFile f consoleWriteActor |!> consoleWriteActor

    |> ignore

let spawnTail =
    fun (a : Actor<IMessage> )  -> 
    let rec l (count : int) = actor{

        let! m = a.Receive()
        handleTailMessages m
        return! l (count + 1)
    } 
    l(0) 

下面是监听事件的代码:

代码语言:javascript
复制
module ObserveFiles
open System
open System.IO
open System.Threading
open Model
open Utils
open Akka
open Akka.FSharp
open Akka.Actor



let rec observeFile (absolutePath : string) (a : IActorRef )  = async{

    let fsw = new FileSystemWatcher(
                        Path = Path.GetDirectoryName(absolutePath), 
                        Filter = "*.*",
                        EnableRaisingEvents = true, 
                        NotifyFilter = (NotifyFilters.FileName ||| NotifyFilters.LastWrite ||| NotifyFilters.LastAccess ||| NotifyFilters.CreationTime ||| NotifyFilters.DirectoryName)
                        )

    let prepareMessage  (args: EventArgs) =
        let text = 
            match box args with
            | :? FileSystemEventArgs as fsa ->
                match fsa.ChangeType with
                | WatcherChangeTypes.Changed -> "Changed " + fsa.Name
                | WatcherChangeTypes.Created ->  "Created " + fsa.Name
                | WatcherChangeTypes.Deleted -> "Deleted " + fsa.Name
                | WatcherChangeTypes.Renamed -> "Renamed " + fsa.Name
                | _ -> "Some other change " + fsa.ChangeType.ToString()
            | :? ErrorEventArgs as ea -> "Error: " + ea.GetException().Message
            | o -> "some other unexpected event occurd" + o.GetType().ToString()
        WriteMessage text 


    let sendMessage x = async{  async.Return(prepareMessage x) |!> a
                                return! observeFile absolutePath a }

    let! occurance  = 
        [
        fsw.Changed |> Observable.map(fun x -> sendMessage (x :> EventArgs));
        fsw.Created |> Observable.map(fun x -> sendMessage (x :> EventArgs));
        fsw.Deleted |> Observable.map(fun x -> sendMessage (x :> EventArgs));
        fsw.Renamed |> Observable.map(fun x -> sendMessage (x :> EventArgs));
        fsw.Error |> Observable.map(fun x -> sendMessage (x :> EventArgs));
        ] 
        |> List.reduce Observable.merge
        |> Async.AwaitObservable

    return! occurance
}

花了相当多的技巧才能做到这一点,任何关于我如何改变它的建议都是非常感谢的,这样它就能在演员运行时收集和处理所有的事件。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-12-16 12:13:54

在设计这样的任务时,我们可以将其分成以下组件:

  1. 创建负责接收所有消息的管理器-它的主要角色是响应传入的目录侦听请求。一旦请求传入,它就会创建一个子角色,负责在这个特定目录下侦听。
  2. 子参与者负责管理特定路径的FileSystemWatcher。它应该订阅传入的事件,并将它们作为消息重定向到负责接收更改事件的参与者。当它关闭时,它也应该免费使用可支配资源。
  3. 负责接收更改事件的参与者--在我们的示例中,通过在控制台上显示更改事件。

示例代码:

代码语言:javascript
复制
open Akka.FSharp
open System
open System.IO

let system = System.create "observer-system" <| Configuration.defaultConfig()

let observer filePath consoleWriter (mailbox: Actor<_>) =    
    let fsw = new FileSystemWatcher(
                        Path = filePath, 
                        Filter = "*.*",
                        EnableRaisingEvents = true, 
                        NotifyFilter = (NotifyFilters.FileName ||| NotifyFilters.LastWrite ||| NotifyFilters.LastAccess ||| NotifyFilters.CreationTime ||| NotifyFilters.DirectoryName)
                        )
    // subscribe to all incoming events - send them to consoleWriter
    let subscription = 
        [fsw.Changed |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString());
         fsw.Created |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString());
         fsw.Deleted |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString());
         fsw.Renamed |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString());]
             |> List.reduce Observable.merge
             |> Observable.subscribe(fun x -> consoleWriter <! x)

    // don't forget to free resources at the end
    mailbox.Defer <| fun () -> 
        subscription.Dispose()
        fsw.Dispose()

    let rec loop () = actor {
        let! msg = mailbox.Receive()
        return! loop()
    }
    loop ()

// create actor responsible for printing messages
let writer = spawn system "console-writer" <| actorOf (printfn "%A")

// create manager responsible for serving listeners for provided paths
let manager = spawn system "manager" <| actorOf2 (fun mailbox filePath ->
    spawn mailbox ("observer-" + Uri.EscapeDataString(filePath)) (observer filePath writer) |> ignore)

manager <! "testDir"
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34309671

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档