我一直在尝试使用reactive-kafka,我在条件处理方面遇到了一个问题,我没有找到一个令人满意的答案。
基本上,我试图消费一个包含大量消息(每天大约100亿条消息)的kafka主题,然后根据消息的某些属性只处理其中的几条消息(每天几千条),然后将我的消息的处理版本推送到另一个主题,我正在努力做到这一点。
我的第一次尝试是这样的:
// This is pseudo code.
Source(ProducerSettings(...))
.filter(isProcessable(_))
.map(process(_))
.via(Producer.flow(producerSettings))
.map(_.commitScalaDsl())
.runWith(Sink.ignore)这种方法的问题是,我只在读取我能够处理的消息时才提交,这显然不酷,因为如果我必须停止并重新启动我的程序,那么我必须重新读取一堆无用的消息,而且由于它们太多,我负担不起这样做。
然后,我尝试通过执行以下内容来使用GraphDSL:
in ~> broadcast ~> isProcessable ~> process ~> producer ~> merge ~> commit
~> broadcast ~> isNotProcessable ~> merge这种解决方案显然也不是很好,因为我无法处理的消息会通过图的第二个分支,并在可处理的消息被真正推送到它们的目的地之前提交,这比第一个消息更糟糕,因为它甚至不能保证至少交付一次。
有谁知道我该如何解决这个问题?
发布于 2018-02-16 21:55:35
我以前解决类似问题的一种方法是利用序列号来保证排序。
例如,您可以构建一个流,如您所描述的保存提交:
in ~> broadcast ~> isProcessable ~> process ~> producer ~> merge ~> out
~> broadcast ~> isNotProcessable ~> merge然后将它包装成一个顺序保持流,就像这样(取自我们公司开发的一个库):OrderPreservingFlow。然后,可以将生成的流发送到提交者接收器。
如果您的处理阶段保证了排序,那么您甚至可以通过将逻辑直接嵌入到图中来更有效地避免任何缓冲:
in ~> injectSeqNr ~> broadcast ~> isProcessable ~> process ~> producer ~> mergeNextSeqNr ~> commit
~> broadcast ~> isNotProcessable ~> mergeNextSeqNr在这里,您的mergeNextSeqNr只是一个修改的合并阶段,如果输入在端口1上可用,那么如果它的序列号是预期的,则立即发出它,否则只需等待数据在另一个端口上可用。
最终结果应该与使用上面的流包装完全相同,但如果嵌入它,可能会更容易使其适应您的需要。
https://stackoverflow.com/questions/48817195
复制相似问题