首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >带分区的core.async -通过状态传感器不保持状态?

带分区的core.async -通过状态传感器不保持状态?
EN

Stack Overflow用户
提问于 2018-03-07 07:56:20
回答 1查看 268关注 0票数 1

(我有一个先前的问题,https://stackoverflow.com/questions/49097325/clojure-strng-concat-with-group-by-in-sequences-of-maps,并假设我不会有问题进入core.async)。

给定这样的输入数据:

代码语言:javascript
复制
(require '[clojure.core.async :as a])

(def input-data
  [{:itm_na 1 :seq_no 1  :doc_img "this is a very long "}
   {:itm_na 1 :seq_no 2  :doc_img "sentence from a mainframe "}
   {:itm_na 1 :seq_no 3  :doc_img "system that was built before i was "}
   {:itm_na 1 :seq_no 4  :doc_img "born."}
   {:itm_na 2 :seq_no 1  :doc_img "this is a another very long "}
   {:itm_na 2 :seq_no 2  :doc_img "sentence from the same mainframe "}
   {:itm_na 3 :seq_no 1  :doc_img "Ok here we are again. "}
   {:itm_na 3 :seq_no 2  :doc_img "The mainframe only had 40 char per field so"}
   {:itm_na 3 :seq_no 3  :doc_img "they broke it into multiple rows "}
   {:itm_na 3 :seq_no 4  :doc_img "which seems to be common"}
   {:itm_na 3 :seq_no 5  :doc_img " for the time. "}
   {:itm_na 3 :seq_no 6  :doc_img "thanks for your help."}])

partition-by (如预期的)将数据集中到seq中(以便稍后崩溃):

代码语言:javascript
复制
(count (partition-by :itm_na input-data ))
;;=> 3

然而,当我试图用core.async管道来做这件事时,出于某种原因,它似乎不这么做……在异步管道中,如何获得partition-by的状态转换器部分实际保持状态?

代码语言:javascript
复制
(let
    [source-chan (a/to-chan input-data)
     target-chan (a/chan 100)
     xf (comp (partition-by :itm_na))
     ]
  (a/pipeline 1
              target-chan
              xf
              source-chan)
  (count (<!! (a/into [] target-chan))))

;;=>12

这应该是3?

奇怪的是,当我像下面这样将xf绑定到通道时,我就得到了预期的结果。我不知道为什么a/pipeline的行为会有所不同。

代码语言:javascript
复制
(let [xf (comp (partition-by :itm_na))
      ch (a/chan 1 xf)]
  (a/onto-chan ch input-data)
  (count (<!! (a/into [] ch))))
=>3

医生的..。提到有状态比特:

代码语言:javascript
复制
(clojure.repl/doc partition-by) 
-------------------------
clojure.core/partition-by
([f] [f coll])
  Applies f to each value in coll, splitting it each time f returns a
   new value.  Returns a lazy seq of partitions.  Returns a stateful
   transducer when no collection is provided.
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-03-07 09:25:25

这个特殊的例子是Rich在他的演讲中的简略突出:您不能在有状态的传感器上使用pipeline,这主要是因为pipeline的并行性。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49146778

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档