我给出了从主题、处理器和接收器到其他主题的源拓扑。
StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("store"),
Serdes.String(),
Serdes.String());
Topology topology = new Topology();
topology.addSource("incoming", Serdes.String().deserializer(), Serdes.String().deserializer(), "topic");
topology.addProcessor("incoming_first", () -> new MyProcessor(), "incoming");
topology.addStateStore(storeBuilder, "incoming_first");
topology.addSink("sink", "sink", "incoming_first"),public class MyProcessor implements Processor<String, String> {
private ProcessorContext context;
private KeyValueStore<String, String> stateStore;
@Override
public void init(ProcessorContext context) {
this.context = context;
this.stateStore = (KeyValueStore<String, String>) context.getStateStore("store");
}
@Override
public void process(String key, String value) {
stateStore.put(key, value);
....
throw new RuntimeException();
....
context.forward(); //forward to sink
}
@Override
public void close() {
}
}我的问题是如何处理在写入状态存储后处理器中发生异常的情况。Kafka是否有一些带有状态存储回滚的错误处理机制来重新处理消息,还是将其转发到错误主题?
目前,没有任何处理,我的应用程序完全死亡,我需要重新启动它。此外,如果我添加一些尝试-捕捉消息,标识为ok,并且我的状态存储被更新,消息被发送到changelog主题。
我需要一些状态存储的回滚机制吗?
https://issues.apache.org/jira/browse/KAFKA-7192 KIP说,如果出现异常,状态存储不应使用EOS处理,但这仅适用于我的整个应用程序死亡时的情况。
提前感谢!
发布于 2020-03-10 04:07:32
对于从Processor抛出的任何异常,相应的线程总是会死掉的。防止这种情况的唯一方法是捕获所有异常并相应地处理它们(无论对您的应用程序如何处理都是正确的)。
如果线程死了,并且重新启动应用程序以恢复线程,则取决于您的配置是否回滚存储。默认情况下,不会回滚存储。只有在通过设置配置参数processing.guarantees="exactly_once"来启用精确一次语义的情况下,存储才会在重新启动时回滚。
如果捕捉到Processor代码中的任何异常,而业务逻辑需要回滚存储,则需要自己实现这一点,方法是首先从存储中获取旧值,更新存储,并导致异常--将旧值放回存储中以覆盖/撤消所有写入。
https://stackoverflow.com/questions/60442677
复制相似问题