我使用以下代码创建了一个数据帧
val SomeCsv = spark.read.option("header", "true").
csv(conf.getString("data.path.Somecsv")).toDF()我有一个函数(到目前为止什么也没做)看起来像这样。
def cleanUp(data: sql.DataFrame): sql.DataFrame = {
data.map({
doc =>
(
doc
)
})
}它在编译时中断,并显示以下错误:
找不到存储在数据集中的类型的编码器。通过导入spark.implicits._支持基元类型(Int、String等)和产品类型(case类)
我已经按照其他帖子的建议设置了import语句。
val spark = SparkSession.builder...etc
import spark.implicits._IntelliJ将import语句标记为未使用
我的猜测是
1.)csv加载代码使用的是某个编码器,它是一个对象而不是原语。
2.)和/或我需要在函数语句中指定dataframe的数据类型,就像您对RDD所做的那样?我在Spark文档中找不到任何关于这方面的信息。
编辑
如果我改用
val SomeOtherCsv = SomeCsv.map(t => t(0) + "foobar")import语句被触发,一切都可以很好地编译。我现在的问题是,相同数据上的方法版本(上面)仍然中断。
EDIT2
这是MCVE
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._/*statement unused*/
import com.typesafe.config.ConfigFactory
object main {
def main(args: Array[String]) = {
/*load spark conf*/
val sparkConf = new SparkConf().setAppName("main")
val sc = new SparkContext(sparkConf)
/*load configure tool*/
val conf = ConfigFactory.load()
/*load spark session*/
val spark = SparkSession.builder.
master("local")
.appName("tester")
.getOrCreate()
import spark.implicits._/* is used for val ProcessedGenomeCsv but not testFunction*/
/*load genome csv as dataframe, conf.getString points to application.conf which contains a local directory for the csv file*/
val GenomeCsv = spark.read.option("header", "true").
csv(conf.getString("data.path.genomecsv")).toDF()
/*cleans up segment names in csv so the can be matched to amino data*/
def testFunctionOne(data: sql.DataFrame): sql.DataFrame = {/* breaks with import spark.implicits._ error, error points to next line "data.map"*/
data.map({
doc =>
(
doc
)
})
}
val ProcessedGenomeCsv = GenomeCsv.map(t => t(12) + "foobar")/* breaks when adding sqlContext and sqlContext.implicits._, is fine otherwise*/
val FunctionProcessedGenomCsv = testFunctionOne(GenomeCsv)
ProcessedGenomeCsv.take(1).foreach(println)
FunctionProcessedGenomCsv.take(1).foreach(println)
}
}发布于 2016-09-29 06:38:09
你想要sqlContext.implicits._
您希望在创建sqlContext (它已经在spark-shell中为您创建,但不是在spark-submit中)之后声明它。
您希望它看起来像这样:
object Driver {
def main(args: Array[String]):Unit = {
val spark_conf =
new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setAppName("Spark Tika HDFS")
val sc = new SparkContext(spark_conf)
import sqlContext.implicits._
val df = ....
}
}https://stackoverflow.com/questions/39757898
复制相似问题