首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >带有错误处理和状态存储回滚的处理器拓扑

带有错误处理和状态存储回滚的处理器拓扑
EN

Stack Overflow用户
提问于 2020-02-27 22:30:33
回答 1查看 146关注 0票数 0

我给出了从主题、处理器和接收器到其他主题的源拓扑。

代码语言:javascript
复制
StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
              Stores.persistentKeyValueStore("store"),
              Serdes.String(),
              Serdes.String());
Topology topology = new Topology();
topology.addSource("incoming", Serdes.String().deserializer(), Serdes.String().deserializer(), "topic");
topology.addProcessor("incoming_first", () -> new MyProcessor(), "incoming");
topology.addStateStore(storeBuilder, "incoming_first");
topology.addSink("sink", "sink", "incoming_first"),
代码语言:javascript
复制
public class MyProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, String> stateStore;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.stateStore = (KeyValueStore<String, String>) context.getStateStore("store");
    }

    @Override
    public void process(String key, String value) {
        stateStore.put(key, value);
        ....
        throw new RuntimeException();
        ....
        context.forward(); //forward to sink
    }

    @Override
    public void close() {
    }
}

我的问题是如何处理在写入状态存储后处理器中发生异常的情况。Kafka是否有一些带有状态存储回滚的错误处理机制来重新处理消息,还是将其转发到错误主题?

目前,没有任何处理,我的应用程序完全死亡,我需要重新启动它。此外,如果我添加一些尝试-捕捉消息,标识为ok,并且我的状态存储被更新,消息被发送到changelog主题。

我需要一些状态存储的回滚机制吗?

https://issues.apache.org/jira/browse/KAFKA-7192 KIP说,如果出现异常,状态存储不应使用EOS处理,但这仅适用于我的整个应用程序死亡时的情况。

提前感谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-03-10 04:07:32

对于从Processor抛出的任何异常,相应的线程总是会死掉的。防止这种情况的唯一方法是捕获所有异常并相应地处理它们(无论对您的应用程序如何处理都是正确的)。

如果线程死了,并且重新启动应用程序以恢复线程,则取决于您的配置是否回滚存储。默认情况下,不会回滚存储。只有在通过设置配置参数processing.guarantees="exactly_once"来启用精确一次语义的情况下,存储才会在重新启动时回滚。

如果捕捉到Processor代码中的任何异常,而业务逻辑需要回滚存储,则需要自己实现这一点,方法是首先从存储中获取旧值,更新存储,并导致异常--将旧值放回存储中以覆盖/撤消所有写入。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60442677

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档