我读取了一个拼图文件,并使用Flambo api以RDD格式获取数据。我们应用列名称的zipmap并创建一个hash map/ Clojure map
假设我的地图具有以下值
[{:a 1 :b2}
{:a 2 :b 2}]
(:require [flambo.api :as f])core.clj
我正在使用
(f/map rdd-records (f/fn[each-rdd]
(perform-calcs each-red)))在perform calcs函数中,我们根据map的输入进行额外的计算,如下所示
cals.clj
(defn perform-calcs
[r]
(merge r {:c (+ (:a r) (:b r))}))我们有了一个新的要求,那就是根据另一个文件中的另一个DataFrame执行另一个计算。我们不想为每个记录加载文件,所以保留了加载DataFrame外部计算的代码,并在commons文件中定义。此DataFrame作为应用程序的一部分加载,并且可以跨应用程序访问。
commons.clj
(def another-csv-df
(load-file->df "file-name"))calcs.clj
(defn df-lookup
[r df]
{:d (->
df (.filter (format "a = %d and b = %d" (:a r) (:b r) )
(.select (into [] (map #(Column. %) ["d"] )))
(first)
(.getString(0))})通过将其包含在perform calcs函数中,将按如下方式进行更改。
(defn perform-calcs
[r]
(-> r
(merge {:c (+ (:a r) (:b r))})
(df-lookup commons/another-csv-df))在现实中,我看到数据框上的值...代码运行了很长一段时间,没有DF的外部调用,代码运行了很长时间。并且永远不会完成这个过程
发布于 2018-12-26 08:28:11
像这样的嵌套转换在Spark中是完全不允许的。您将不得不重新考虑您的方法,可能是通过将RDD转换为Dataset并在两者之间执行join。
https://stackoverflow.com/questions/53926333
复制相似问题