我目前正在学习Lwt。我有兴趣使用异步进程来用OCaml例程替换一些shell例程。
让我们看一看简化的第一次尝试,其中通过组合运行cat的两个线程创建了一个过滤器。
let filter_cat ()=
Lwt_process.pmap_lines ("cat", [| "cat" |])
let filter_t () =
Lwt_io.stdin
|> Lwt_io.read_lines
|> filter_cat ()
|> filter_cat ()
|> Lwt_io.write_lines Lwt_io.stdout
let () =
filter_t ()
|> Lwt_main.run这个过滤器在某种程度上起作用,但是当它的标准输入关闭而不是退出时挂断。如果我移除其中一个filter_cat,它就会像预期的那样工作。
我猜我没有适当地组合这些过滤器,因此不能加入我正在启动的两个线程。合成这些过滤器的正确方法是什么,以便程序在读取EOF on stdin之后终止。
您可以在BSD猫头鹰中找到这个程序和一个Github gist生成文件。
发布于 2016-01-25 08:11:15
这个问题的答案是,Lwt中有一个小错误。有一个内部函数,监控器,它执行管道:
(* Monitor the thread [sender] in the stream [st] so write errors are
reported. *)
let monitor sender st =
let sender = sender >|= fun () -> None in
let state = ref Init in
Lwt_stream.from
(fun () ->
match !state with
| Init ->
let getter = Lwt.apply Lwt_stream.get st in
let result _ =
match Lwt.state sender with
| Lwt.Sleep ->
(* The sender is still sleeping, behave as the
getter. *)
getter
| Lwt.Return _ ->
(* The sender terminated successfully, we are
done monitoring it. *)
state := Done;
getter
| Lwt.Fail _ ->
(* The sender failed, behave as the sender for
this element and save current getter. *)
state := Save getter;
sender
in
Lwt.try_bind (fun () -> Lwt.choose [sender; getter]) result result
| Save t ->
state := Done;
t
| Done ->
Lwt_stream.get st)问题在定义中。
let getter = Lwt.apply Lwt_stream.get st当getter进程满足流的末尾时,它就会被保存,但是sender会丢失,这似乎会阻止完成。这可以通过改进getter的定义来解决,方法是让它在流结束时作为sender运行。
https://stackoverflow.com/questions/29049860
复制相似问题