在Kafka中,是否可以一次将多个记录转发给不同的子处理器?举个例子,假设我们有一个名为处理器-父处理器的父处理器和两个子处理器,子处理器-1,子处理器-2。
当处理器-父程序接收到要处理的记录时,我想做以下操作.
new_record = create_new_record(current_record)
context.forward(new_record, To(Child-1))
context.forward(old_record, To(Child-2))这是一个良好的做法转发这样的记录吗?
发布于 2019-05-02 10:37:34
这取决于你的要求:
builder = new TopologyBuilder();
builder.addSource(SOURCE, kafkaTopic)
.addProcessor("child1", () -> new child1(), SOURCE)
.addProcessor("child2", () -> new child2(), SOURCE);builder = new TopologyBuilder();
builder.addSource(SOURCE, kafkaTopic)
.addProcessor("InputProcessor", () -> new InputProcessor(), SOURCE)
.addProcessor("child1", () -> new child1(), "InputProcessor")
.addProcessor("child2", () -> new child2(), "InputProcessor");public class InputProcessor extends AbstractProcessor<String, String> {
@Override
public void process(String key, String value) {
try {
context().forward(key, Integer.parseInt(value), To.child("child1"));
context().forward(key, value, To.child("child2"));
}
catch (NumberFormatException nfe) {
context().forward(key, value, To.child("child2"));
}
}
}发布于 2019-05-02 09:31:52
这不是最佳做法。相反,使用一个父处理器和多个子处理器创建您的应用程序。
builder = new TopologyBuilder();
builder.addSource(SOURCE, kafkaTopic)
.addProcessor("child1", () -> new child1(),SOURCE)
.addProcessor("child2", () -> new child2(),SOURCE);通过这种方式,kafka流可以确保源上的每一条消息都流向两个子处理器。
https://stackoverflow.com/questions/55940292
复制相似问题