我使用Hazelcast Jet为我的DAG设计新的处理器。其中一些处理器可能会失败,抛出异常--如果不以某种方式处理--将导致整个作业失败并停止。
因此,我试图设计一种机制,在我的处理器代码中引入一些可选的容错策略。为了给出一个想法,我想通过配置以下策略之一来处理错误:
一旦发生,unhandled)
但是,考虑到处理器接口的设计方式,没有办法通过通用装饰处理器来提供这种行为,除非它们是内部设计来支持这种容错策略的。
事实上,我希望能够用这样的方法来装饰它们:
Processors.faultTolerantP(ProcessorMetaSupplier supplier, FaultTolerancePolicy policy)其中FaultTolerancePolicy是对上述策略的描述。
到目前为止,我唯一能做的就是设计我的“容错处理器”来实现允许将策略注入处理器的IFaultTolerant接口。然后处理器代码必须“手动”处理容错策略。
interface IFaultTolerant{
void setFaultTolerancePolicy(FaultTolerancePolicy policy);
}
class MyProcessor extends AbstractProcessor implements IFaultTolerant{
public void setFaultTolerancePolicy(FaultTolerancePolicy policy){
// stores the policy and behaves as specified by the policy when errors occur
}
}
class MyProcessors{
public static ProcessorMetaSupplier faultTolerantP(ProcessorMetaSupplier supplier, FaultTolerancePolicy policy) {
return new WrappingProcessorMetaSupplier(supplier, p -> faultTolerantP(p, policy));
}
private static Processor faultTolerantP(Processor p, FaultTolerancePolicy policy) {
if (p instanceof IFaultTolerant) {
((IFaultTolerant)p).setFaultTolerancePolicy(policy);
}
return p;
}
}你对此有什么建议吗?是否有可能拦截更高级别的故障,从而使任何处理器都可以在不需要为此设计的情况下实现容错?
发布于 2020-05-12 20:58:55
我不确定是否有可能在处理器之外以通用的方式处理这个问题。处理器必须支持重试。例如,处理器可以从收件箱中获取一个项并继续处理它,但是在处理它时它会失败。process方法的调用者将捕获异常并重试--但该项已不在收件箱中。
但是假设您知道处理器的设计是为了能够重试或忽略调用,您可以这样做:
首先,创建一个处理器包装器,捕获并处理异常:
public static class FaultTolerantProcessorWrapper implements Processor {
private final Processor delegate;
private final FaultTolerancePolicy policy;
protected FaultTolerantProcessorWrapper(FaultTolerancePolicy policy, Processor delegate) {
this.policy = policy;
this.delegate = delegate;
}
@Override
public void process(int ordinal, @Nonnull Inbox inbox) {
try {
delegate.process(ordinal, inbox);
} catch (Exception e) {
policy.handle(e);
if (policy.isIgnore()) {
// will ignore the entire batch of items, not just the item that failed
inbox.clear();
}
}
}
// repeat for other methods such as `tryProcessWatermark`, `complete` etc.
}然后像这样使用它:
// if your custom processor uses a ProcessorMetaSupplier
Vertex v = dag.newVertex("v", new WrappingProcessorMetaSupplier(
YourProcessor.metaSupplier(),
p -> new FaultTolerantProcessorWrapper(faultTolerancePolicy, p)));
// if your custom processor uses a ProcessorSupplier
Vertex v = dag.newVertex("v", new WrappingProcessorSupplier(
YourProcessor.supplier(),
p -> new FaultTolerantProcessorWrapper(faultTolerancePolicy, p)));
// if your custom processor uses a SupplierEx<Processor>
Vertex v = dag.newVertex("v",
() -> new FaultTolerantProcessorWrapper(faultTolerancePolicy, new YourProcessor()));但我个人不会使用这种方法。在我看来它是脆弱的。我也不建议“增强”jet的内置处理器。批处理源很可能产生不正确的结果。流式处理器通常已经内置了容错功能。
https://stackoverflow.com/questions/61753348
复制相似问题