我们已经构建了一个从主题读取并在不同字段上执行groupBy的管道。
input
.groupBy(
(key, value) -> value.getFieldA(),
Grouped.with("TopicName", Serdes.String(), Serdes.Integer()))
.windowedBy(SessionWindows.with(ofMinutes(5)).grace(Duration.ZERO))此步骤创建一个中间app-TopicName-repartition主题。然而,KStream不断地向Kafka发送Delete请求。我们可以看到卡夫卡一侧的原木:
INFO [DENY] Auth request Delete on Topic:app-TopicName-repartition by User test_user (cached) (io.aiven.kafka.auth.AivenAclAuthorizer)我们的代码中没有通过admin进行streams.cleanUp()或手动删除的过程。删除请求仅用于重新分区主题,而不是其他中间主题。这个应用程序运行得非常好。它一直发送后台删除请求,因为我已经将retries设置为Integer.MAX_VALUE。我未能调试这个问题。为什么KStream要为重新分区主题发出删除请求?
更新
就我在KStreams源代码上的跟踪而言,它在TaskManager中调用了KafkaAdminClient.deleteRecords()。这就是我在日志文件中看到Delete的原因吗?在KStreams源代码中没有显式删除主题的其他调用。
发布于 2019-12-12 03:14:38
这是正确的。卡夫卡流从不试图删除一个主题。但是,您需要允许它从重新分区主题中清除数据。请注意,默认情况下,重新分区主题配置为无限保留时间,如果Kafka流无法清除该主题,则会无限制地增长。
有关您需要哪些ACLs的更多详细信息,请查看docs:https://docs.confluent.io/current/streams/developer-guide/security.html#required-acl-setting-for-secure-ak-clusters
https://stackoverflow.com/questions/59284789
复制相似问题