首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何同时将记录转发给多个Kafka Stream子处理器?

如何同时将记录转发给多个Kafka Stream子处理器?
EN

Stack Overflow用户
提问于 2019-05-01 17:39:12
回答 2查看 1.5K关注 0票数 3

在Kafka中,是否可以一次将多个记录转发给不同的子处理器?举个例子,假设我们有一个名为处理器-父处理器的父处理器和两个子处理器,子处理器-1,子处理器-2。

当处理器-父程序接收到要处理的记录时,我想做以下操作.

代码语言:javascript
复制
new_record = create_new_record(current_record)
context.forward(new_record, To(Child-1))
context.forward(old_record, To(Child-2))

这是一个良好的做法转发这样的记录吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-05-02 10:37:34

这取决于你的要求:

  • 如果您的逻辑是直接的,您甚至可以使用Kafka流DSL。
  • 如果它稍微复杂一些,并且需要Procesor API,但是您希望将相同的记录传递给两个处理器,那么您可以像@所提到的那样这样做。
代码语言:javascript
复制
builder = new TopologyBuilder();
    builder.addSource(SOURCE, kafkaTopic)
.addProcessor("child1", () -> new child1(), SOURCE)
.addProcessor("child2", () -> new child2(), SOURCE);
  • 如果它更复杂,并且取决于处理器中的某些逻辑,您想要将消息传递给不同的处理器节点,您可以这样做。
代码语言:javascript
复制
builder = new TopologyBuilder();
    builder.addSource(SOURCE, kafkaTopic)
.addProcessor("InputProcessor", () -> new InputProcessor(), SOURCE)
.addProcessor("child1", () -> new child1(), "InputProcessor")
.addProcessor("child2", () -> new child2(), "InputProcessor");
代码语言:javascript
复制
public class InputProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(String key, String value) {
        try {
            context().forward(key, Integer.parseInt(value), To.child("child1"));
            context().forward(key, value, To.child("child2"));
        }
        catch (NumberFormatException nfe) {
            context().forward(key, value, To.child("child2"));
        }
    }
}
票数 3
EN

Stack Overflow用户

发布于 2019-05-02 09:31:52

这不是最佳做法。相反,使用一个父处理器和多个子处理器创建您的应用程序。

代码语言:javascript
复制
builder = new TopologyBuilder();
    builder.addSource(SOURCE, kafkaTopic)
.addProcessor("child1", () -> new child1(),SOURCE)
.addProcessor("child2", () -> new child2(),SOURCE);

通过这种方式,kafka流可以确保源上的每一条消息都流向两个子处理器。

票数 -2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55940292

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档