我有一些用Java实现的Kafka用户,我正在实现一个独立的应用程序来检查记录和墓碑。希望卡夫卡将删除州商店,因为它压缩主题。
现在..。我对卡夫卡创建的不同类型的商店感到有点困惑。对于每一种类型的商店,我想知道:
卡夫卡删除相应主题中的旧记录时,是否删除了
我看到的商店类型如下:
对于使用聚合函数的流拓扑,我们已经有了一个墓碑策略,它应该覆盖#1类型的存储。我们向流应用程序发送一个空消息&它作为聚合结果返回它。
对于#3类型的商店,我将在相应的ktable上运行一个tombstoning应用程序。我预计变化量会缩小。
然而,对于#2和#4类型的商店,我不知道它们是如何被清理的。它们对应于我的使用者拓扑中的selectKey()+leftJoin()函数。但是,它们与以流为中心的拓扑结构有关,因此我不知道该如何清理它们。有什么建议不涉及阻止经纪人吗?
发布于 2020-03-07 21:08:09
#2和#4不是商店--它们是由Kafka流创建的内部主题。
没有办法显式地清理这些重新分区主题,但是因为这些主题(#2和#4)是由使用retention.ms的Kafka流创建的,所以kafka将根据卡夫卡服务器的cleanup.policy = delete配置自动清除这些主题(如果没有明确指定,这是默认的7天)。
https://stackoverflow.com/questions/60416821
复制相似问题