首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Clojure中编写Spark结构化流式处理示例时出错

在Clojure中编写Spark结构化流式处理示例时出错
EN

Stack Overflow用户
提问于 2017-10-10 09:07:34
回答 1查看 188关注 0票数 4

我正在尝试用Clojure重写Spark结构化流媒体示例。

示例使用Scala编写,如下所示:

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

代码语言:javascript
复制
(ns flambo-example.streaming-example
  (:import [org.apache.spark.sql Encoders SparkSession Dataset Row]
           [org.apache.spark.sql.functions]
           ))

(def spark
  (->
   (SparkSession/builder)
   (.appName "sample")
   (.master "local[*]")
   .getOrCreate)  
  )


(def lines
  (-> spark
      .readStream
      (.format "socket")
      (.option "host" "localhost")
      (.option "port" 9999)
      .load      
      )
  )

(def words
  (-> lines
      (.as (Encoders/STRING))      
      (.flatMap #(clojure.string/split  % #" " ))      
      ))

上面的代码导致了以下异常。

;;由java.lang.IllegalArgumentException引起;;找不到匹配的方法:类的flatMap ;;org.apache.spark.sql.Dataset

我如何避免这个错误?

EN

回答 1

Stack Overflow用户

发布于 2017-10-10 20:46:37

你得跟着签名走。Java Dataset API提供了两种Dataset.flatMap实现,一种采用scala.Function1

代码语言:javascript
复制
def flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U] 

第二个获取了斯帕克自己的o.a.s.api.java.function.FlatMapFunction

代码语言:javascript
复制
def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] 

前者对您来说相当无用,但您应该能够使用后者。对于可以通过flambo.api/fn访问的RDD API flambo uses macros to create Spark friendly adapters -我不确定它们是否可以直接与Datasets一起使用,但如果需要,您应该能够调整它们。

由于您不能依赖隐式Encoders,因此还必须提供与返回类型匹配的显式编码器。

总体而言,你需要一些东西:

代码语言:javascript
复制
(def words
  (-> lines
    (.as (Encoders/STRING))      
    (.flatMap f e)      
  ))

其中f实现了FlatMapFunction,而e是一个Encoder。一个示例实现:

代码语言:javascript
复制
(def words
  (-> lines
      (.as (Encoders/STRING))      
      (.flatMap
        (proxy [FlatMapFunction] [] 
          (call [s] (.iterator (clojure.string/split s #" ")))) 
        (Encoders/STRING))))

但我想有可能找到一个更好的。

在实践中,我会尽量避免输入Dataset,而专注于DataFrame (Dataset[Row])。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46657192

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档