Remove duplicates (keyed by flight number) and keep only the latest record w.r.t. timestamp

Stack Overflow user
Asked 2020-07-27 20:21:27
Answers: 2 | Views: 43 | Followers: 0 | Votes: 0

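I have data being inserted into Kafka that follows an Avro schema.

I want to consume records from the topic and then filter the flights. For example, suppose two records have the same flight number: we need to pick only the latest one, based on the timestamp in the Avro schema (the RecordDateTime field).

How can I remove the duplicates that share a flight number?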

{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "oldsomething random" }

The output stream should look like this:

{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }

Current code:

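// Note: this topology counts records per FlightStatus; it does not de-duplicate by FlightNumber.
builder.stream(inputTopic, Consumed.with(Serdes.String(), flightDataSerde))
        // Re-key each record by flight status, keeping the flight number as the value.
        // toString() is used because Avro-generated getters may return CharSequence rather than String.
        .map((k, v) -> new KeyValue<>(v.getFlightStatus().toString(), v.getFlightNumber()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
        // Apply COUNT method
        .count()
        // Write to stream specified by outputTopic
        .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));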

Avro:

{
  "namespace": "io.confluent.developer.avro",
  "type": "record",
  "name": "FlightData",
  "fields": [
    {"name": "FlightNumber", "type": "int"},
    {"name": "OriginAirport", "type": "string"},
    {"name": "DestinationAirport", "type": "string"},
    {"name": "OriginDate", "type": "string"},
    {"name": "OriginTime", "type": "string"},
    {"name": "DestinationDate", "type": "string"},
    {"name": "DestinationTime", "type": "string"},
    {"name": "FlightStatus", "type": "string"},
    {"name": "GateOut", "type": "string"},
    {"name": "GateIn", "type": "string"},
    {"name": "RecordDateTime", "type": "string"}
  ]
}

2 Answers

Stack Overflow user

Answered 2021-01-04 06:02:34

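The main problem you need to solve is how long you want to wait before emitting a result record. When you receive the first record, you don't know whether you can emit it right away or whether a duplicate (with a larger or smaller timestamp) will arrive later.

So you need to define some window and use an aggregation that keeps only one record per key and per window. In this aggregation, you can compare the timestamps and keep only the record you want.

After the aggregation, you can use suppress() to emit only a single final result record when the window closes.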
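
The aggregation described above can be modelled without Kafka to make the logic concrete. The following is a minimal plain-Java sketch, not a Kafka Streams topology: the `Flight` class is a hypothetical, simplified stand-in for the Avro-generated `FlightData`, the timestamp is assumed to be already parsed to epoch milliseconds, and the 60-second tumbling window is an arbitrary choice. In Kafka Streams, the `newer` function would roughly play the role of the reducer passed to a windowed `reduce(...)`, with `suppress()` holding back output until the window closes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "one record per key per window, keep the newest".
final class LatestPerWindow {

    // Hypothetical, simplified stand-in for the Avro-generated FlightData class.
    static final class Flight {
        final int flightNumber;
        final String status;
        final long timestampMs; // assumption: RecordDateTime already parsed to epoch millis

        Flight(int flightNumber, String status, long timestampMs) {
            this.flightNumber = flightNumber;
            this.status = status;
            this.timestampMs = timestampMs;
        }
    }

    // Aggregation step: of two records for the same key, keep the newer one.
    // On equal timestamps the incoming record wins.
    static Flight newer(Flight current, Flight incoming) {
        return incoming.timestampMs >= current.timestampMs ? incoming : current;
    }

    // Assigns each record to a tumbling window of windowMs (non-negative timestamps
    // assumed) and keeps the newest record per (window start, flight number).
    static Map<Long, Map<Integer, Flight>> aggregate(List<Flight> input, long windowMs) {
        Map<Long, Map<Integer, Flight>> windows = new LinkedHashMap<>();
        for (Flight f : input) {
            long windowStart = f.timestampMs - (f.timestampMs % windowMs);
            windows.computeIfAbsent(windowStart, w -> new LinkedHashMap<>())
                   .merge(f.flightNumber, f, LatestPerWindow::newer);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Flight> input = new ArrayList<>();
        input.add(new Flight(1, "Delayed", 5_000L));
        input.add(new Flight(2, "Delayed", 6_000L));
        input.add(new Flight(1, "Scheduled", 1_000L)); // older duplicate that arrives last

        // Emit one final record per key once each window is considered closed.
        for (Map<Integer, Flight> perKey : aggregate(input, 60_000L).values()) {
            for (Flight f : perKey.values()) {
                System.out.println(f.flightNumber + " " + f.status + " " + f.timestampMs);
            }
        }
    }
}
```

The window size is the trade-off the answer points at: a longer window catches more late duplicates but delays every result by the same amount.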

Votes: 0

Stack Overflow user

Answered 2020-07-27 22:03:49

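"by considering the timestamp mentioned in the Avro schema"

That is what the TimestampExtractor interface is for. Otherwise, you can adjust the upstream producer so that this timestamp becomes the actual record timestamp.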
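
A custom timestamp extractor mostly comes down to turning the `RecordDateTime` string into epoch milliseconds. The helper below is a sketch of that parsing step only, in plain Java. The question does not state the format of `RecordDateTime` (the sample values are placeholders), so ISO-8601 local date-time interpreted as UTC is purely an assumption here, and `RecordTimestamps` and `toEpochMillis` are hypothetical names. A `TimestampExtractor.extract(record, partitionTime)` implementation could call such a helper and pass `partitionTime` as the fallback.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;

// Hypothetical helper: converts the RecordDateTime field to epoch milliseconds.
final class RecordTimestamps {

    // Assumption: values look like "2020-07-26T11:00:00" and are in UTC.
    // Returns fallbackMs when the value is missing or cannot be parsed.
    static long toEpochMillis(String recordDateTime, long fallbackMs) {
        if (recordDateTime == null) {
            return fallbackMs;
        }
        try {
            return LocalDateTime.parse(recordDateTime)
                    .toInstant(ZoneOffset.UTC)
                    .toEpochMilli();
        } catch (DateTimeParseException e) {
            return fallbackMs;
        }
    }

    public static void main(String[] args) {
        long fallback = -1L;
        System.out.println(toEpochMillis("2020-07-26T11:00:00", fallback));
        System.out.println(toEpochMillis("qwer", fallback)); // unparsable, prints the fallback
    }
}
```

Falling back instead of throwing keeps a single malformed record from stopping the application, at the cost of silently assigning it a less meaningful timestamp.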

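"two records have the same flight number. We only need to pick the latest one."

That is the default behavior for ordered records with the same key arriving at a source topic. You do need to consider logic to handle late-arriving data, though, and skip over any records with a later timestamp. This is easier to do with the Processor API than with the Streams DSL, and you would need the Processor API anyway to be able to inspect the table contents.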
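
One way to read the late-data handling above is a per-key check against the newest timestamp seen so far. The sketch below models that in plain Java, under the assumption that a "late" record is one whose timestamp is not newer than a record already seen for the same flight number. The `HashMap` stands in for a key-value state store, and `LatestWinsFilter` and `accept` are hypothetical names; in a Processor API implementation the same comparison would run against a real state store.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a "latest wins" filter backed by per-key state.
final class LatestWinsFilter {

    // Stand-in for a key-value state store: flight number -> newest timestamp seen.
    private final Map<Integer, Long> newestSeen = new HashMap<>();

    // Returns true if the record should be forwarded downstream,
    // false if it is a late or duplicate record that should be skipped.
    boolean accept(int flightNumber, long timestampMs) {
        Long stored = newestSeen.get(flightNumber);
        if (stored != null && timestampMs <= stored) {
            return false; // not newer than what was already forwarded for this key
        }
        newestSeen.put(flightNumber, timestampMs);
        return true;
    }

    public static void main(String[] args) {
        LatestWinsFilter filter = new LatestWinsFilter();
        System.out.println(filter.accept(1, 5_000L)); // first record for flight 1
        System.out.println(filter.accept(1, 1_000L)); // older record arriving late
        System.out.println(filter.accept(1, 6_000L)); // newer update for flight 1
    }
}
```

Unlike a windowed aggregation, this forwards every newer update immediately, so downstream consumers may see several records per flight number over time rather than one final record.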

Votes: -1
Original page content provided by Stack Overflow. Translation support provided by Tencent Cloud's Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/63115244
