(我有一个先前的问题,https://stackoverflow.com/questions/49097325/clojure-strng-concat-with-group-by-in-sequences-of-maps,并假设我不会有问题进入core.async)。
给定这样的输入数据:
(require '[clojure.core.async :as a])
(def input-data
[{:itm_na 1 :seq_no 1 :doc_img "this is a very long "}
{:itm_na 1 :seq_no 2 :doc_img "sentence from a mainframe "}
{:itm_na 1 :seq_no 3 :doc_img "system that was built before i was "}
{:itm_na 1 :seq_no 4 :doc_img "born."}
{:itm_na 2 :seq_no 1 :doc_img "this is a another very long "}
{:itm_na 2 :seq_no 2 :doc_img "sentence from the same mainframe "}
{:itm_na 3 :seq_no 1 :doc_img "Ok here we are again. "}
{:itm_na 3 :seq_no 2 :doc_img "The mainframe only had 40 char per field so"}
{:itm_na 3 :seq_no 3 :doc_img "they broke it into multiple rows "}
{:itm_na 3 :seq_no 4 :doc_img "which seems to be common"}
{:itm_na 3 :seq_no 5 :doc_img " for the time. "}
{:itm_na 3 :seq_no 6 :doc_img "thanks for your help."}])partition-by (如预期的)将数据集中到seq中(以便稍后崩溃):
(count (partition-by :itm_na input-data ))
;;=> 3然而,当我试图用core.async管道来做这件事时,出于某种原因,它似乎不这么做……在异步管道中,如何获得partition-by的状态转换器部分实际保持状态?
(let
[source-chan (a/to-chan input-data)
target-chan (a/chan 100)
xf (comp (partition-by :itm_na))
]
(a/pipeline 1
target-chan
xf
source-chan)
(count (<!! (a/into [] target-chan))))
;;=>12这应该是3?
奇怪的是,当我像下面这样将xf绑定到通道时,我就得到了预期的结果。我不知道为什么a/pipeline的行为会有所不同。
(let [xf (comp (partition-by :itm_na))
ch (a/chan 1 xf)]
(a/onto-chan ch input-data)
(count (<!! (a/into [] ch))))
=>3医生的..。提到有状态比特:
(clojure.repl/doc partition-by)
-------------------------
clojure.core/partition-by
([f] [f coll])
Applies f to each value in coll, splitting it each time f returns a
new value. Returns a lazy seq of partitions. Returns a stateful
transducer when no collection is provided.发布于 2018-03-07 09:25:25
这个特殊的例子是Rich在他的演讲中的简略突出:您不能在有状态的传感器上使用pipeline,这主要是因为pipeline的并行性。
https://stackoverflow.com/questions/49146778
复制相似问题