kafka增加了在连接器中使用regex的新功能,但是似乎连接器启动后新添加的主题数据直到连接器重新启动时才会被消耗。我们需要动态添加新的主题,并让连接器根据连接器属性中定义的正则表达式使用该主题。怎样才能实现呢?例如: regex: topic-.* topic: topic-1,topic-2如果我引入了新的topic topic-3,那么如何让连接器在不重新启动的情况下使用主题数据?
发布于 2018-08-01 04:40:32
遵循其他人已经在评论中给出的想法,基本上你需要做的是建立一个机制,识别一个新的主题已经被引入,并且connecter需要干净地重新启动。
我会做这样的事,
1>在已经连接的主题中发送特定类型的消息(例如topic-1),如果接收到这样的消息,代码应该保持所有新的消息轮询,并等待所有偏移量提交完成。
然后,2>从轮询循环中中断并从您的消费者中删除订阅(consumer.unsubscribe())。
在订阅正则表达式主题的通常流程之后,需要遵循3>,因为新主题现在将成为正则表达式的一部分。
记住提交是很重要的,如果你匆忙重启连接器,你可能会得到重复的东西。同样明显的是,不要更改group.id并将auto.offset.reset保持为“最新”。
发布于 2019-02-19 14:27:27
Kafka消费者有一个选项metadata.max.age.ms -消费者刷新主题元数据的时间间隔。如果你不需要真正的实时,它可能会有所帮助。另请参阅:kafka consumer to dynamically detect topics added
在/etc/kafka-connect/kafka-connect.properties中,您应该将consumer.metadata.max.age.ms=1000指定为1秒。
https://stackoverflow.com/questions/51599243
复制相似问题