我试图使用带有水印的dropDuplicate函数对流数据进行去复制。我目前面临的问题是,对于给定的记录,我必须有两个时间戳。
副本是在中间阶段引入的,因此对于给定的记录复制,事件时间戳是相同的,但是传输时间戳是不同的。
对于水印,我喜欢使用传输时间戳,因为我知道重复不能发生超过3分钟的传输间隔。但是我不能在dropDuplicate中使用它,因为它不会捕获副本,因为副本具有不同的传输时间戳。
以下是一个例子,
Event 1:{ "EventString":"example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:05:00.00" }
Event 2 (duplicate): {"EventString":"example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:08:00.00"}在这种情况下,复制是在从原始事件转移到3分钟之后创建的。
我的代码如下,
streamDataset.
.withWatermark("transferTimestamp", "4 minutes")
.dropDuplicates("eventstring","transferTimestamp");上面的代码不会删除重复项,因为对于事件及其副本,transferTimestamp是唯一的。但目前,这是唯一的方式,因为火花迫使我包括水印列在dropDuplicates函数时,水印设置。
我真的很想看到一个像下面这样的dropDuplicate实现,这对于任何至少一次语义流来说都是有效的,在这种情况下,我不必在dropDuplicates中使用水印字段,但是基于水印的状态驱逐仍然是值得尊重的。但目前情况并非如此。
streamDataset.
.withWatermark("transferTimestamp", "4 minutes")
.dropDuplicates("eventstring");我不能使用事件时间戳,因为它没有排序,时间范围变化很大(延迟事件和垃圾事件)。
如果有人有替代的解决方案或想法,在这种情况下,请让我知道。
发布于 2020-09-28 21:13:17
对于您的用例,您不能直接使用dropDuplicates API。您必须使用一些星星之火API (如平面flatmapgroupwithstate )对同一状态使用任意有状态操作。
https://stackoverflow.com/questions/53622632
复制相似问题