我正在尝试用Clojure重写Spark结构化流媒体示例。
示例使用Scala编写,如下所示:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
(ns flambo-example.streaming-example
(:import [org.apache.spark.sql Encoders SparkSession Dataset Row]
[org.apache.spark.sql.functions]
))
(def spark
(->
(SparkSession/builder)
(.appName "sample")
(.master "local[*]")
.getOrCreate)
)
(def lines
(-> spark
.readStream
(.format "socket")
(.option "host" "localhost")
(.option "port" 9999)
.load
)
)
(def words
(-> lines
(.as (Encoders/STRING))
(.flatMap #(clojure.string/split % #" " ))
))上面的代码导致了以下异常。
;;由java.lang.IllegalArgumentException引起;;找不到匹配的方法:类的flatMap ;;org.apache.spark.sql.Dataset
我如何避免这个错误?
发布于 2017-10-10 20:46:37
你得跟着签名走。Java Dataset API提供了两种Dataset.flatMap实现,一种采用scala.Function1
def flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U] 第二个获取了斯帕克自己的o.a.s.api.java.function.FlatMapFunction
def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] 前者对您来说相当无用,但您应该能够使用后者。对于可以通过flambo.api/fn访问的RDD API flambo uses macros to create Spark friendly adapters -我不确定它们是否可以直接与Datasets一起使用,但如果需要,您应该能够调整它们。
由于您不能依赖隐式Encoders,因此还必须提供与返回类型匹配的显式编码器。
总体而言,你需要一些东西:
(def words
(-> lines
(.as (Encoders/STRING))
(.flatMap f e)
))其中f实现了FlatMapFunction,而e是一个Encoder。一个示例实现:
(def words
(-> lines
(.as (Encoders/STRING))
(.flatMap
(proxy [FlatMapFunction] []
(call [s] (.iterator (clojure.string/split s #" "))))
(Encoders/STRING))))但我想有可能找到一个更好的。
在实践中,我会尽量避免输入Dataset,而专注于DataFrame (Dataset[Row])。
https://stackoverflow.com/questions/46657192
复制相似问题