首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Scala中的Java时间戳问题-获取的java.lang.IllegalArgumentException错误

Scala中的Java时间戳问题-获取的java.lang.IllegalArgumentException错误
EN

Stack Overflow用户
提问于 2017-01-22 07:19:24
回答 1查看 405关注 0票数 0

晚上好。

我正在对Spark 2.1.0 (使用内置的Scala 2.11.8)中的RDDs、Dataframe和Datasets的性能做一些比较工作。我已经从https://data.london.gov.uk/dataset/smartmeter-energy-use-data-in-london-households下载了一些免费的数据,并在稍后执行了上面显示的脚本。为了给您提供预览,查询的数据如下所示:

代码语言:javascript
复制
LCLid,stdorToU,DateTime,KWH/hh (per half hour) ,Acorn,Acorn_grouped
MAC000002,Std,2012-10-12 00:30:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 01:00:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 01:30:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 02:00:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 02:30:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 03:00:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 03:30:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 04:00:00.0000000, 0 ,ACORN-A,Affluent

为了实现我的比较工作,我在time Spark的不同阶段导入和转换了String、String、Timestamp、Double、String、String这6个变量的上面表达。我已经成功地将数据映射到Dataframe和Dataset中,但在RDD方面不能完全实现同样的效果。每次我尝试将该文件转换为RDD时,都会出现以下错误:

代码语言:javascript
复制
ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]

我非常困惑,因为变量'DateTime‘已经被表示为时间戳格式'yyyy-mm-dd hh:mm:ss.fffffffff’。我读过这样的帖子(Convert Date to Timestamp in ScalaHow to convert unix timestamp to date in SparkSpark SQL: parse timestamp without seconds),但不能满足我的需求。

这更令人困惑,因为我构造的定义类'londonDataSchemaDS‘在我的数据集转换上工作,而不是在我的RDD上工作。

这是我使用的脚本:

代码语言:javascript
复制
import java.io.File
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

val sparkSession = SparkSession.builder.appName("SmartData London").master("local[*]").getOrCreate()

val LCLid = StructField("LCLid", DataTypes.StringType)
val stdorToU = StructField("stdorToU", DataTypes.StringType)
val DateTime = StructField("DateTime", DataTypes.TimestampType)
val KWHhh = StructField("KWH/hh (per half hour) ", DataTypes.DoubleType)
val Acorn = StructField("Acorn", DataTypes.StringType)
val Acorn_grouped = StructField("Acorn_grouped", DataTypes.StringType)

val fields = Array(LCLid,stdorToU,DateTime,KWHhh,Acorn,Acorn_grouped)
val londonDataSchemaDF = StructType(fields)

import sparkSession.implicits._

case class londonDataSchemaDS(LCLid: String, stdorToU: String, DateTime: java.sql.Timestamp, KWHhh: Double, Acorn: String, Acorn_grouped: String)

val t0 = System.nanoTime()

val loadFileRDD=sparkSession.sparkContext.textFile("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv")
.map(_.split(","))
.map(r=>londonDataSchemaDS(r(0), r(1), Timestamp.valueOf(r(2)), r(3).toDouble, r(4), r(5)))

val t1 = System.nanoTime()

val loadFileDF=sparkSession.read.schema(londonDataSchemaDF).option("header", true)
.csv("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv")

val t2=System.nanoTime()

val loadFileDS=sparkSession.read.option("header", "true")
.csv("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv")
.withColumn("DateTime", $"DateTime".cast("timestamp"))
.withColumnRenamed("KWH/hh (per half hour) ", "KWHhh")
.withColumn("KWHhh", $"KWHhh".cast("double"))
.as[londonDataSchemaDS]

val t3 = System.nanoTime()

loadFileRDD.take(10)

loadFileDF.show(10, false)
loadFileDF.printSchema()

loadFileDS.show(10, false)
loadFileDS.printSchema()

println("Time Elapsed to implement RDD: " + (t1 - t0) * 1E-9 + " seconds")
println("Time Elapsed to implement DataFrame: " + (t2 - t1) * 1E-9 + " seconds")
println("Time Elapsed to implement Dataset: " + (t3 - t2) * 1E-9 + " seconds")

在这方面的任何帮助都将是非常感谢的和/或在正确方向上的推动。

非常感谢,

克里斯蒂安

EN

回答 1

Stack Overflow用户

发布于 2017-01-22 23:26:17

我知道我做错了什么。我被DataFrame和Dataset转换所吸引,它有一个内置的函数来跳过头部,以至于我忘记了从RDD转换过程中删除头部。

通过添加以下几行代码,我删除了报头,并成功地将csv转换为RDD (这就解释了我在时间戳中遇到格式错误的原因):

代码语言:javascript
复制
val loadFileRDDwH=sparkSession.sparkContext.textFile("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv").map(_.split(","))

val header=loadFileRDDwH.first()

val loadFileRDD=loadFileRDDwH.filter(_(0) != header(0)).map(r=>londonDataSchemaDS(r(0), r(1), Timestamp.valueOf(r(2)), r(3).split("\\s+").mkString.toDouble, r(4), r(5)))

感谢您的阅读

克里斯蒂安

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41785609

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档