我有一个主题,在这个主题中,我从各种设备上获得了一系列事件。有n个设备每秒发出一次天气报告。
问题是这些设备每秒会发出5-10个相同值的记录。因此,如果您在kafka主题中看到单个设备的输出,则如下所示:
对于设备1:- t1,t1(在同一时刻,然后间隔s秒) t2,t2(然后间隔s秒),t3,t3
然而,我想在kafka中删除这些作为突发事件出现的重复记录。我想消费如下:- t1,t2,t3,...
我试图使用Kafka stream API提供的窗口和ktable的概念,但这似乎不太可能。有什么想法吗?
发布于 2018-07-28 03:45:54
你可能想要使用kafka的Log压缩。但是为了使用它,U应该对所有重复的消息使用相同的密钥,而对非重复的消息使用不同的密钥。看看这个。https://kafka.apache.org/documentation/#compaction
发布于 2020-01-16 10:23:24
是否可以使用t作为关键字将主题读取到KTable中。重复的值将被视为upserts,而不是实际上会删除它们的insert。然后将KTable写入另一个主题
https://stackoverflow.com/questions/51224039
复制相似问题