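Can I insert data into Kafka when the records use this Avro schema?
I want to select records from a topic and then filter the flights. For example, suppose two records have the same flight number: we only need to pick the latest one, based on the timestamp field in the Avro schema.
I want to remove duplicates that share the same flight number. How can I do that?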
{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "oldsomething random" }
The output stream should look like this:
{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
builder.stream(inputTopic, Consumed.with(Serdes.String(), flightDataSerde))
    .map((k, v) -> new KeyValue<>((String) v.getFlightStatus(), (Integer) v.getFlightNumber()))
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
    // Apply COUNT method
    .count()
    // Write to stream specified by outputTopic
    .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
Avro:
{
"namespace": "io.confluent.developer.avro",
"type": "record",
"name": "FlightData",
"fields": [
{"name": "FlightNumber", "type": "int"},
{"name": "OriginAirport", "type": "string"},
{"name": "DestinationAirport", "type": "string"},
{"name": "OriginDate", "type": "string"},
{"name": "OriginTime", "type": "string"},
{"name": "DestinationDate", "type": "string"},
{"name": "DestinationTime", "type": "string"},
{"name": "FlightStatus", "type": "string"},
{"name": "GateOut", "type": "string"},
{"name": "GateIn", "type": "string"},
{"name": "RecordDateTime", "type": "string"}
]
}
Posted on 2021-01-04 06:02:34
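The main question you need to answer is how long you want to wait before emitting a result record. When you receive the first record, you do not know whether you can emit it immediately or whether a duplicate (with a larger or smaller timestamp) will arrive later.
So you need to define a window and use an aggregation that keeps only one record per key and per window. Inside that aggregation you can compare the timestamps and keep only the record you want.
After the aggregation, you can use suppress() to emit a single final result record only when the window closes.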
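The windowing idea in this answer can be sketched in plain Java, without a Kafka dependency, to show the logic on its own. The `Flight` class, the one-minute window size, and the assumption that `RecordDateTime` has already been parsed to epoch milliseconds are illustrative choices, not part of the question. In a Kafka Streams topology keyed by flight number, a comparison like `latest` would typically be passed as the reducer to `groupByKey().windowedBy(...).reduce(...)`, followed by `suppress(Suppressed.untilWindowCloses(...))`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowedLatest {

    /** Hypothetical, trimmed-down stand-in for the Avro FlightData record. */
    static final class Flight {
        final int flightNumber;
        final String status;
        final long timestampMs; // assumed: RecordDateTime already parsed to epoch millis

        Flight(int flightNumber, String status, long timestampMs) {
            this.flightNumber = flightNumber;
            this.status = status;
            this.timestampMs = timestampMs;
        }
    }

    /** Keeps the record with the later timestamp; on a tie the newer arrival wins. */
    static Flight latest(Flight current, Flight incoming) {
        return incoming.timestampMs >= current.timestampMs ? incoming : current;
    }

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Returns one record per (window, flight number), in first-seen order. */
    static List<Flight> dedupe(List<Flight> input, long windowSizeMs) {
        Map<String, Flight> latestPerWindowAndKey = new LinkedHashMap<>();
        for (Flight f : input) {
            String key = windowStart(f.timestampMs, windowSizeMs) + "|" + f.flightNumber;
            // merge() calls latest(stored, incoming) when the key is already present
            latestPerWindowAndKey.merge(key, f, WindowedLatest::latest);
        }
        return new ArrayList<>(latestPerWindowAndKey.values());
    }

    public static void main(String[] args) {
        List<Flight> input = new ArrayList<>();
        input.add(new Flight(1, "Delayed", 5_000L));
        input.add(new Flight(2, "Delayed", 6_000L));
        input.add(new Flight(1, "Scheduled", 1_000L)); // older duplicate arriving late
        for (Flight f : dedupe(input, 60_000L)) {
            System.out.println(f.flightNumber + " " + f.status + " " + f.timestampMs);
        }
    }
}
```

The window size is the trade-off the answer describes: a longer window catches more late duplicates but delays every result by the same amount.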
Posted on 2020-07-27 22:03:49
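"by considering the timestamp mentioned in the Avro schema"
That is what the TimestampExtractor interface is for. Otherwise, you can adjust the upstream producer so that this timestamp becomes the actual record timestamp.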
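As a rough illustration of what such an extractor has to do, the sketch below parses the `RecordDateTime` string into epoch milliseconds and falls back to a caller-supplied value when parsing fails. The ISO-8601 format, the UTC interpretation, and the `RecordTimeParser` and `extractMillis` names are assumptions, since the question does not show real timestamp values. In Kafka Streams, this logic would sit inside an implementation of `TimestampExtractor.extract(ConsumerRecord<Object, Object> record, long partitionTime)`, with `partitionTime` as the natural fallback.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;

public class RecordTimeParser {

    /**
     * Parses a RecordDateTime value (assumed ISO-8601, e.g. "2020-07-26T11:00:00",
     * interpreted as UTC) into epoch milliseconds. Returns fallbackMillis when the
     * value is missing or cannot be parsed.
     */
    static long extractMillis(CharSequence recordDateTime, long fallbackMillis) {
        if (recordDateTime == null) {
            return fallbackMillis;
        }
        try {
            return LocalDateTime.parse(recordDateTime)
                    .toInstant(ZoneOffset.UTC)
                    .toEpochMilli();
        } catch (DateTimeParseException e) {
            // Unparseable values such as "qwer" fall back instead of failing the stream
            return fallbackMillis;
        }
    }

    public static void main(String[] args) {
        System.out.println(extractMillis("2020-07-26T11:00:00", -1L));
        System.out.println(extractMillis("qwer", -1L));
    }
}
```

The method takes a `CharSequence` because Avro-generated getters for string fields may return a `Utf8` instance rather than a `java.lang.String`.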
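"two records have the same flight number. We only need to pick the latest one."
That is the default behavior for ordered records with the same key arriving at the source topic. You do need logic to handle late-arriving data, though, so that you skip any record when one with a later timestamp has already been seen. That is easier to do with the Processor API than with the Streams DSL, and you would need the Processor API anyway to inspect the table contents.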
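The late-arrival check described here can be sketched in plain Java. A `HashMap` stands in for the state store that a Processor API implementation would use, and the `Update` class and `process` method are hypothetical names introduced only for this illustration. Unlike a windowed aggregation, this variant forwards each accepted update immediately and simply drops records that are older than the one already stored.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class LatestPerFlight {

    /** Hypothetical, trimmed-down stand-in for the Avro FlightData record. */
    static final class Update {
        final int flightNumber;
        final String status;
        final long timestampMs; // assumed: RecordDateTime already parsed to epoch millis

        Update(int flightNumber, String status, long timestampMs) {
            this.flightNumber = flightNumber;
            this.status = status;
            this.timestampMs = timestampMs;
        }
    }

    // Stand-in for a key-value state store keyed by flight number
    private final Map<Integer, Update> store = new HashMap<>();

    /**
     * Returns the record to forward downstream, or empty when the incoming
     * record is older than the one already stored for the same flight.
     */
    Optional<Update> process(Update incoming) {
        Update stored = store.get(incoming.flightNumber);
        if (stored != null && stored.timestampMs > incoming.timestampMs) {
            return Optional.empty(); // late arrival: a newer record was already seen
        }
        store.put(incoming.flightNumber, incoming);
        return Optional.of(incoming);
    }

    public static void main(String[] args) {
        LatestPerFlight dedupe = new LatestPerFlight();
        System.out.println(dedupe.process(new Update(1, "Delayed", 200L)).isPresent());
        System.out.println(dedupe.process(new Update(1, "Scheduled", 100L)).isPresent());
    }
}
```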
https://stackoverflow.com/questions/63115244