首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >导入spark.implicits._未使用

导入spark.implicits._未使用
EN

Stack Overflow用户
提问于 2016-09-29 05:23:00
回答 1查看 4.5K关注 0票数 1

我使用以下代码创建了一个数据帧

代码语言:javascript
复制
val SomeCsv = spark.read.option("header", "true").
  csv(conf.getString("data.path.Somecsv")).toDF()

我有一个函数(到目前为止什么也没做)看起来像这样。

代码语言:javascript
复制
def cleanUp(data: sql.DataFrame): sql.DataFrame = {
  data.map({
    doc =>
      (
        doc

        )
  })
}

它在编译时中断,并显示以下错误:

找不到存储在数据集中的类型的编码器。通过导入spark.implicits._支持基元类型(Int、String等)和产品类型(case类)

我已经按照其他帖子的建议设置了import语句。

代码语言:javascript
复制
val spark = SparkSession.builder...etc
import spark.implicits._

IntelliJ将import语句标记为未使用

我的猜测是

1.)csv加载代码使用的是某个编码器,它是一个对象而不是原语。

2.)和/或我需要在函数语句中指定dataframe的数据类型,就像您对RDD所做的那样?我在Spark文档中找不到任何关于这方面的信息。

编辑

如果我改用

代码语言:javascript
复制
val SomeOtherCsv = SomeCsv.map(t => t(0) + "foobar")

import语句被触发,一切都可以很好地编译。我现在的问题是,相同数据上的方法版本(上面)仍然中断。

EDIT2

这是MCVE

代码语言:javascript
复制
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._/*statement unused*/
import com.typesafe.config.ConfigFactory

object main {
  def main(args: Array[String]) = {
    /*load spark conf*/
    val sparkConf = new SparkConf().setAppName("main")
    val sc = new SparkContext(sparkConf)
    /*load configure tool*/
    val conf = ConfigFactory.load()
    /*load spark session*/
    val spark = SparkSession.builder.
      master("local")
      .appName("tester")
      .getOrCreate()
    import spark.implicits._/* is used for val ProcessedGenomeCsv but not testFunction*/
    /*load genome csv as dataframe, conf.getString points to application.conf which contains a local directory for the csv file*/
    val GenomeCsv = spark.read.option("header", "true").
      csv(conf.getString("data.path.genomecsv")).toDF()
    /*cleans up segment names in csv so the can be matched to amino data*/
    def testFunctionOne(data: sql.DataFrame): sql.DataFrame = {/* breaks with import spark.implicits._ error, error points to next line "data.map"*/
      data.map({
        doc =>
          (
            doc

            )
      })
    }
    val ProcessedGenomeCsv = GenomeCsv.map(t => t(12) + "foobar")/* breaks when adding sqlContext and sqlContext.implicits._, is fine otherwise*/
    val FunctionProcessedGenomCsv = testFunctionOne(GenomeCsv)
    ProcessedGenomeCsv.take(1).foreach(println)
    FunctionProcessedGenomCsv.take(1).foreach(println)
  }
}
EN

回答 1

Stack Overflow用户

发布于 2016-09-29 06:38:09

你想要sqlContext.implicits._

您希望在创建sqlContext (它已经在spark-shell中为您创建,但不是在spark-submit中)之后声明它。

您希望它看起来像这样:

代码语言:javascript
复制
object Driver {
    def main(args: Array[String]):Unit = {
        val spark_conf =
          new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .setAppName("Spark Tika HDFS")
        val sc = new SparkContext(spark_conf)

        import sqlContext.implicits._

        val df = ....

    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39757898

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档