是否可以在@OnStopped注释上发送流文件?
基本上,我想编写自定义处理器,它可以在处理器停止时在flowFile中发送属性。
有什么建议吗?
我试着做以下工作:
ProcessSession session;
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
flowFile = session.create();
}
flowFile = session.putAttribute(flowFile, "ATTRIBUTE_SIGNAL", "start");
session.transfer(flowFile, success);
}
@OnStopped
public void sendStop() {
FlowFile flowFile = session.get();
flowFile = session.create();
flowFile = session.putAttribute(flowFile, "ATTRIBUTE_SIGNAL", "stop");
session.transfer(flowFile, success);
}但它失败了
2018-04-30 20:44:25,540 ERROR [StandardProcessScheduler Thread-3] org.apache.nifi.util.ReflectionUtils Failed while invoking annotated method 'public void com.kotak.nifi.processors.streaming.SignalGenerator.sendStop()' with arguments '[]'.
java.lang.reflect.InvocationTargetException: null.发布于 2018-04-30 14:08:01
诸如OnScheduled/OnStopped/etc这样的生命周期方法实际上并不意味着生成流文件,这就是为什么您不能访问ProcessSession,只有onTrigger才能这样做。
处理器通常被认为是松散耦合的,当一个处理器并不真正了解/关心其他处理器时,它只是从队列中取出一个流文件并处理它们。
从技术上讲,您可以通过在处理器成员变量中存储在ProcessSession中获得的对onTrigger的引用来实现您想要的结果,以便以后可以在OnStopped中使用它。
https://stackoverflow.com/questions/50099097
复制相似问题