在Clojure的异步库中,我很难理解一个我认为是相当简单的概念。我实际上是用管道创建了两个通道,其中输出通道是使用输入通道的take函数创建的。
根据我的理解,take的目的是在通道关闭之前限制通道将接收的项的数量(如果输入通道在此时还没有关闭)。然而,我一直使用的代码示例并没有产生我期望的结果。
以下面的代码为例:
(def in (chan 1))
(def out (async/take 5 in 1))
(doseq [i (range 10)]
(go (>! in i)))
(pipeline 4 out (filter even?) in)
(go-loop []
(when-some [val (<! out)]
(println val)
(recur))))我所期望发生的是,管道会过滤掉奇数,并且只将偶数传递给“out”通道,当out通道接收到5个偶数时,它将关闭。然而,我看到的是打印到REPL的奇数和偶数,如下所示:
2 7 4 0 8 6
在这一点上,out通道仍然没有关闭,在最终关闭之前,第二次运行doseq将打印一些其他值。
我对这里发生的事情感到难以置信的困惑,当使用take而不是管道时,它的工作方式很有魅力,当不使用take但仍然使用管道时,它也可以工作,使用两者的组合似乎是一个完全不同的故事。我是不是漏掉了什么明显的东西?如果这是一个简单的错误,很抱歉,这是我第一次(尽管很天真)尝试使用core.async。
发布于 2018-09-09 12:24:49
您已经将take和pipeline放在了竞争中。它们都从in中获取项目并将其添加到out中。替换out的定义
(def out (async/chan 3))例如,并获得预期的结果
0
2
4
6
8如果你真的想使用async/take,你可以这样做:
(def first (async/chan 1))
(def second (async/chan 3))
(pipeline 4 second (filter even?) first)
(def third (async/take 3 second))
(defn run []
(go
(doseq [i (range 10)]
(>! first i)))
(go (loop []
(when-some [val (<! third)]
(println val)
(recur)))))with结果:
0
2
4https://stackoverflow.com/questions/52240331
复制相似问题