我们从应用程序中的avro文件中读取时间戳信息。我正在测试从Spark 2.3.1升级到Spark 2.4的过程,其中包括新内置的spark-avro集成。然而,我不知道如何告诉avro模式,我希望时间戳具有"timestamp-millis“的logicalType,而不是默认的"timestamp-micros”。
从使用Databricks spark-avro 4.0.0包查看Spark 2.3.1下的测试avro文件来看,我们有以下字段/模式:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}其中的searchTime是从纪元存储为长整型以来的毫秒数。一切都很好。
当我升级到Spark 2.4和内置的spark-avro 2.4.0包时,我有了这些新的字段/模式:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}可以看到,底层类型仍然是long,但现在使用“logicalType -micros”来增强它。这正是the release notes所说的将会发生的事情,然而,我找不到一种方法来指定使用'timestamp-millis‘选项的模式。
这就成了一个问题,当我向avro文件写入一个时间戳对象时,该对象被初始化为纪元后10,000秒,它将被读回10,000,000秒。在2.3.1/databricks-avro下,它只是一个没有相关信息的long,所以它就是这样出来的。
我们目前通过在感兴趣的对象上进行反射来构建模式,如下所示:
val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]我尝试通过创建一个修改后的模式来增强这一点,该模式尝试替换与searchTime条目对应的StructField,如下所示:
val modSearchSchema = StructType(searchSchema.fields.map {
case StructField(name, _, nullable, metadata) if name == "searchTime" =>
StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
case f => f
})但是,在spark.sql.types中定义的StructField对象没有logicalType的概念来扩充其中的dataType。
case class StructField(
name: String,
dataType: DataType,
nullable: Boolean = true,
metadata: Metadata = Metadata.empty) 我还尝试通过两种方式从JSON表示创建模式:
val schemaJSONrepr = """{
| "name" : "id",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchQuery",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchTime",
| "type" : "long",
| "logicalType" : "timestamp-millis",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "score",
| "type" : "double",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "searchType",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }""".stripMargin第一次尝试只是简单地从它创建一个DataType
// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
.schema(schema)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)此操作失败,因为它无法为searchTime节点创建StructType,因为其中包含"logicalType“。第二次尝试是通过传入原始JSON字符串来简单地创建模式。
spark.read
.schema(schemaJSONrepr)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)这不符合以下说法:
mismatched input '{' expecting {'SELECT', 'FROM', ...
== SQL ==
{
^^^我发现在spark-avro API中有一种方法可以从模式中获取logicalType,但不知道如何设置。
正如您在上面看到的失败的尝试,我尝试使用Schema.Parser创建一个avro schema对象,但是spark.read.schema中唯一接受的类型是String和StructType。
如果有人能提供关于如何更改/指定此logicalType的见解,我将不胜感激。谢谢
发布于 2019-02-07 03:20:46
好吧,我想我已经回答了我自己的问题。当我修改以编程方式构建的模式以使用显式的时间戳类型时
val modSearchSchema = StructType(searchSchema.fields.map {
case StructField(name, _, nullable, metadata) if name == "searchTime" =>
StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
case f => f
})当我们有一个读回的Row对象时,我没有改变逻辑。最初我们会读取一个长的,并将其转换为时间戳,这就是事情出错的地方,因为它读回了长达微秒的数据,这将使它比我们预期的大1000倍。将我们的读取更改为直接读取时间戳对象,让底层逻辑考虑到这一点,使其脱离我们(我的)手中。所以:
// searchTime = new Timestamp(row.getAs[Long]("searchTime")) BROKEN
searchTime = row.getAs[Timestamp]("searchTime") // SUCCESShttps://stackoverflow.com/questions/54560068
复制相似问题