首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在spark-avro 2.4模式中设置logicalType?

如何在spark-avro 2.4模式中设置logicalType?
EN

Stack Overflow用户
提问于 2019-02-07 02:14:57
回答 1查看 1.1K关注 0票数 2

我们从应用程序中的avro文件中读取时间戳信息。我正在测试从Spark 2.3.1升级到Spark 2.4的过程,其中包括新内置的spark-avro集成。然而,我不知道如何告诉avro模式,我希望时间戳具有"timestamp-millis“的logicalType,而不是默认的"timestamp-micros”。

从使用Databricks spark-avro 4.0.0包查看Spark 2.3.1下的测试avro文件来看,我们有以下字段/模式:

代码语言:javascript
复制
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}

其中的searchTime是从纪元存储为长整型以来的毫秒数。一切都很好。

当我升级到Spark 2.4和内置的spark-avro 2.4.0包时,我有了这些新的字段/模式:

代码语言:javascript
复制
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}

可以看到,底层类型仍然是long,但现在使用“logicalType -micros”来增强它。这正是the release notes所说的将会发生的事情,然而,我找不到一种方法来指定使用'timestamp-millis‘选项的模式。

这就成了一个问题,当我向avro文件写入一个时间戳对象时,该对象被初始化为纪元后10,000秒,它将被读回10,000,000秒。在2.3.1/databricks-avro下,它只是一个没有相关信息的long,所以它就是这样出来的。

我们目前通过在感兴趣的对象上进行反射来构建模式,如下所示:

代码语言:javascript
复制
val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]

我尝试通过创建一个修改后的模式来增强这一点,该模式尝试替换与searchTime条目对应的StructField,如下所示:

代码语言:javascript
复制
    val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })

但是,在spark.sql.types中定义的StructField对象没有logicalType的概念来扩充其中的dataType。

代码语言:javascript
复制
case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty) 

我还尝试通过两种方式从JSON表示创建模式:

代码语言:javascript
复制
val schemaJSONrepr = """{
          |          "name" : "id",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchQuery",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchTime",
          |          "type" : "long",
          |          "logicalType" : "timestamp-millis",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "score",
          |          "type" : "double",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchType",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }""".stripMargin

第一次尝试只是简单地从它创建一个DataType

代码语言:javascript
复制
// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
     .schema(schema)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)

此操作失败,因为它无法为searchTime节点创建StructType,因为其中包含"logicalType“。第二次尝试是通过传入原始JSON字符串来简单地创建模式。

代码语言:javascript
复制
spark.read
     .schema(schemaJSONrepr)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)

这不符合以下说法:

代码语言:javascript
复制
mismatched input '{' expecting {'SELECT', 'FROM', ...

== SQL ==

{
^^^

我发现在spark-avro API中有一种方法可以从模式中获取logicalType,但不知道如何设置。

正如您在上面看到的失败的尝试,我尝试使用Schema.Parser创建一个avro schema对象,但是spark.read.schema中唯一接受的类型是String和StructType。

如果有人能提供关于如何更改/指定此logicalType的见解,我将不胜感激。谢谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-02-07 03:20:46

好吧,我想我已经回答了我自己的问题。当我修改以编程方式构建的模式以使用显式的时间戳类型时

代码语言:javascript
复制
val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })

当我们有一个读回的Row对象时,我没有改变逻辑。最初我们会读取一个长的,并将其转换为时间戳,这就是事情出错的地方,因为它读回了长达微秒的数据,这将使它比我们预期的大1000倍。将我们的读取更改为直接读取时间戳对象,让底层逻辑考虑到这一点,使其脱离我们(我的)手中。所以:

代码语言:javascript
复制
// searchTime = new Timestamp(row.getAs[Long]("searchTime")) BROKEN

searchTime = row.getAs[Timestamp]("searchTime") // SUCCESS
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54560068

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档