场景:
Stream create [StreamName] --definition " Kafka -zkconnect=10.10.10.1:2181 --topic=<topic name> | MyCompositeModule " --deploy我们在分布式模式下运行这个流,而redis是传输总线。据我所知,kafka source通过streamname-kafka-offsets主题维护MyCompositeModule使用的消息的偏移量(这是一个接收器,因为它是通过'module compose‘过程创建的一个模块)。这是不可读的,如果有一种方法可以从这个主题中读取数据,我将不胜感激。
此外,当我从kafka源推送消息时,消息在redis传输中排队,然后模块从这个队列中获取它们。
如果kafka消费者模块开始消费kafka redis队列中的1000条消息-复合模块在收到10条消息或随机处理10 messages.So后失败,如何识别剩余的990条1000 (已消费)- 10 (已处理)= 990条未处理的消息。即使我们检查kafka偏移量,它也会显示消耗的消息计数。例如:-kafka.offsets -它在我们的过程中是不可读的。
因此,当我们在SpringXD中使用Redis时,所有未处理的消息都将在Redis队列中。因此,谁能帮我找出如何识别未处理的消息,并将其发送到复合模块进行处理。
基本上,我正在寻找关于健壮交付的优雅解决方案的建议,当从kafka源码消费时,在spring xd流中添加故障处理功能。
发布于 2015-12-18 02:15:10
如果消息被有效地从Kafka消费并移动到总线,那么从偏移管理器的角度来看,它们将被确认为消费。
您可以尝试为Redis消息总线启用重试和死字标记,如下所述:http://docs.spring.io/spring-xd/docs/current/reference/html/#error-handling-message-delivery-failures。
干杯,马吕斯
https://stackoverflow.com/questions/34339900
复制相似问题