首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使Jet处理器具有容错性

如何使Jet处理器具有容错性
EN

Stack Overflow用户
提问于 2020-05-12 13:45:13
回答 1查看 95关注 0票数 2

我使用Hazelcast Jet为我的DAG设计新的处理器。其中一些处理器可能会失败,抛出异常--如果不以某种方式处理--将导致整个作业失败并停止。

因此,我试图设计一种机制,在我的处理器代码中引入一些可选的容错策略。为了给出一个想法,我想通过配置以下策略之一来处理错误:

一旦发生,unhandled)

  • retrying

  • 就会失败(当异常在failing

  • executing之前几次为时,当前的行为会导致一些通常会捕获异常的回退代码,审计错误,然后继续。

但是,考虑到处理器接口的设计方式,没有办法通过通用装饰处理器来提供这种行为,除非它们是内部设计来支持这种容错策略的。

事实上,我希望能够用这样的方法来装饰它们:

代码语言:javascript
复制
Processors.faultTolerantP(ProcessorMetaSupplier supplier, FaultTolerancePolicy policy)

其中FaultTolerancePolicy是对上述策略的描述。

到目前为止,我唯一能做的就是设计我的“容错处理器”来实现允许将策略注入处理器的IFaultTolerant接口。然后处理器代码必须“手动”处理容错策略。

代码语言:javascript
复制
interface IFaultTolerant{
   void setFaultTolerancePolicy(FaultTolerancePolicy policy);
}

class MyProcessor extends AbstractProcessor implements IFaultTolerant{
   public void setFaultTolerancePolicy(FaultTolerancePolicy policy){
      // stores the policy and behaves as specified by the policy when errors occur
   }
}

class MyProcessors{
    public static ProcessorMetaSupplier faultTolerantP(ProcessorMetaSupplier supplier, FaultTolerancePolicy policy) {
        return new WrappingProcessorMetaSupplier(supplier, p -> faultTolerantP(p, policy));
    }

    private static Processor faultTolerantP(Processor p, FaultTolerancePolicy policy) {
        if (p instanceof IFaultTolerant) {
            ((IFaultTolerant)p).setFaultTolerancePolicy(policy);
        }
        return p;
    }
}

你对此有什么建议吗?是否有可能拦截更高级别的故障,从而使任何处理器都可以在不需要为此设计的情况下实现容错?

EN

回答 1

Stack Overflow用户

发布于 2020-05-12 20:58:55

我不确定是否有可能在处理器之外以通用的方式处理这个问题。处理器必须支持重试。例如,处理器可以从收件箱中获取一个项并继续处理它,但是在处理它时它会失败。process方法的调用者将捕获异常并重试--但该项已不在收件箱中。

但是假设您知道处理器的设计是为了能够重试或忽略调用,您可以这样做:

首先,创建一个处理器包装器,捕获并处理异常:

代码语言:javascript
复制
public static class FaultTolerantProcessorWrapper implements Processor {

    private final Processor delegate;
    private final FaultTolerancePolicy policy;

    protected FaultTolerantProcessorWrapper(FaultTolerancePolicy policy, Processor delegate) {
        this.policy = policy;
        this.delegate = delegate;
    }

    @Override
    public void process(int ordinal, @Nonnull Inbox inbox) {
        try {
            delegate.process(ordinal, inbox);
        } catch (Exception e) {
            policy.handle(e);
            if (policy.isIgnore()) {
                // will ignore the entire batch of items, not just the item that failed
                inbox.clear();
            }
        }
    }

    // repeat for other methods such as `tryProcessWatermark`, `complete` etc.
}

然后像这样使用它:

代码语言:javascript
复制
// if your custom processor uses a ProcessorMetaSupplier
Vertex v = dag.newVertex("v", new WrappingProcessorMetaSupplier(
        YourProcessor.metaSupplier(),
        p -> new FaultTolerantProcessorWrapper(faultTolerancePolicy, p)));

// if your custom processor uses a ProcessorSupplier
Vertex v = dag.newVertex("v", new WrappingProcessorSupplier(
        YourProcessor.supplier(),
        p -> new FaultTolerantProcessorWrapper(faultTolerancePolicy, p)));

// if your custom processor uses a SupplierEx<Processor>
Vertex v = dag.newVertex("v",
        () -> new FaultTolerantProcessorWrapper(faultTolerancePolicy, new YourProcessor()));

但我个人不会使用这种方法。在我看来它是脆弱的。我也不建议“增强”jet的内置处理器。批处理源很可能产生不正确的结果。流式处理器通常已经内置了容错功能。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61753348

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档