现在,我正在做一个使用Kafka Streams来处理消息的项目。我们的消息由两个标识符组成,一个是用户的标识符,第二个是用户所属的消息列表的标识符。
这个应用程序的故事是,一个客户端想要通过我的应用程序向他们的用户列表发送推送消息。因此,客户端在向用户发送推送消息之前从外部源加载用户列表,然后开始发送。但是客户端也希望在他想要的时候停止正在进行的发送。
我的问题在这一点上出现了。我想停止发送推送消息,而不是通过kafka的流api消费所有消息。我在谷歌上搜索了最新的流源偏移量的变化,但找不到合适的方法来完成它。
我如何才能做到这一点?谢谢。
发布于 2017-12-17 02:29:11
Kafka Streams不支持此功能。如果您确实需要这样做,您可以停止Kafka Streams并使用bin/kafka-consumer-groups.sh手动查找Kafka Streams应用程序的端到端(注: application.id == group.id)。之后,您可以重启Kafka Streams。
顺便说一下:在即将发布的1.1版本中,bin/kafka-streams-application-reset.sh还将允许操作偏移量。我希望我们还能设法使内部使用的类StreamsResetter成为公共API --这将允许您在我们的应用程序中以编程方式执行此操作,而无需退回到命令行。
https://stackoverflow.com/questions/47827722
复制相似问题