首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从clojure.lang.LazySeq转换为org.apache.spark.api.java.JavaRDD类型

从clojure.lang.LazySeq转换为org.apache.spark.api.java.JavaRDD类型
EN

Stack Overflow用户
提问于 2015-08-25 14:49:06
回答 1查看 232关注 0票数 1

我在clojure中开发了一个函数,用于从最后一个非空值中填充一个空列,假设这是可行的。

代码语言:javascript
复制
(:require [flambo.api :as f])

(defn replicate-val
  [ rdd input ]
  (let [{:keys [ col ]} input
    result (reductions (fn [a b]
                         (if (empty? (nth b col))
                           (assoc b col (nth a col))
                           b)) rdd )]
(println "Result type is: "(type result))))

得到了这个:

代码语言:javascript
复制
;=> "Result type is:  clojure.lang.LazySeq"

问题是如何使用flambo (火花包装)将其转换回JavaRDD类型。

我在let表单中尝试将let类型转换为JavaRDD类型

我犯了这个错误

代码语言:javascript
复制
"No matching method found: map for class clojure.lang.LazySeq"

这是预期的,因为结果是clojure.lang.LazySeq类型。

问题是如何进行此转换,或如何重构代码以容纳此转换。

下面是一个示例输入rdd:

代码语言:javascript
复制
(type rdd) ;=> "org.apache.spark.api.java.JavaRDD"

但看起来:

代码语言:javascript
复制
[["04" "2" "3"] ["04" "" "5"] ["5" "16" ""] ["07" "" "36"] ["07" "" "34"] ["07" "25" "34"]]

所需产出是:

代码语言:javascript
复制
[["04" "2" "3"] ["04" "2" "5"] ["5" "16" ""] ["07" "16" "36"] ["07" "16" "34"] ["07" "25" "34"]]

谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-08-25 18:57:53

首先,RDD是不可迭代的(不要实现ISeq),因此不能使用reductions。忽略访问先前记录的整个想法是相当棘手的。首先,您不能直接从另一个分区访问值。此外,只有不需要洗牌的转换才能保持秩序。

这里最简单的方法是使用具有显式顺序的数据帧和窗口函数,但据我所知,Flambo并不实现所需的方法。始终可以使用原始SQL或访问Java/Scala,但如果要避免这种情况,可以尝试使用以下管道。

首先,让我们用每个分区的最后一个值创建一个广播变量:

代码语言:javascript
复制
(require '[flambo.broadcast :as bd])
(import org.apache.spark.TaskContext)

(def last-per-part (f/fn [it]
  (let [context (TaskContext/get) xs (iterator-seq it)]
  [[(.partitionId context) (last xs)]])))

(def last-vals-bd
 (bd/broadcast sc
   (into {} (-> rdd (f/map-partitions last-per-part) (f/collect)))))

接下来是实际工作的助手:

代码语言:javascript
复制
(defn fill-pair [col]
  (fn [x] (let [[a b] x] (if (empty? (nth b col)) (assoc b col (nth a col)) b))))

(def fill-pairs
  (f/fn [it] (let [part-id (.partitionId (TaskContext/get)) ;; Get partion ID
                   xs (iterator-seq it) ;; Convert input to seq
                   prev (if (zero? part-id) ;; Find previous element
                     (first xs) ((bd/value last-vals-bd) part-id))        
                   ;; Create seq of pairs (prev, current)
                   pairs (partition 2 1 (cons prev xs))
                   ;; Same as before
                   {:keys [ col ]} input
                   ;; Prepare mapping function
                   mapper (fill-pair col)]
               (map mapper pairs))))

最后,您可以使用fill-pairs来实现map-partitions

代码语言:javascript
复制
(-> rdd (f/map-partitions fill-pairs) (f/collect))

这里隐藏的一个假设是分区的顺序遵循值的顺序。在一般情况下,它可能是也可能不是,但是如果没有明确的排序,它可能是您所能得到的最好的。

另一种方法是使用zipWithIndex,交换值的顺序并使用偏移量执行连接。

代码语言:javascript
复制
(require '[flambo.tuple :as tp])

(def rdd-idx (f/map-to-pair (.zipWithIndex rdd) #(.swap %)))

(def rdd-idx-offset
  (f/map-to-pair rdd-idx
    (fn [t] (let [p (f/untuple t)] (tp/tuple (dec' (first p)) (second p))))))

(f/map (f/values (.rightOuterJoin rdd-idx-offset rdd-idx)) f/untuple)

接下来,您可以使用类似的方法进行映射。

编辑

快速记录on using atoms。问题是,缺乏引用透明度,而且您正在利用给定实现的附带属性,而不是契约。map语义中没有任何东西需要按给定的顺序处理元素。如果内部实现发生更改,则可能不再有效。使用Clojure

代码语言:javascript
复制
(defn foo [x] (let [aa @a] (swap! a (fn [&args] x)) aa))

(def a (atom 0))
(map foo (range 1 20))

与之相比:

代码语言:javascript
复制
(def a (atom 0))
(pmap foo (range 1 20))
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/32207234

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档