我正在用java构建一个apache光束管道,它做了一系列的事情,比如读取文件,创建审计,并将其加载到bigquery。如果我的管道在任何步骤失败,我希望将文件移动到其他文件夹。所以基本上,如果我的流水线在任何步骤由于任何原因而失败,我想在下面的if条件下处理它。目前,如果流水线成功运行,那么它将进入else部分并完成打印,但如果流水线失败,它将显示退出代码1,但不会打印失败。有没有办法像这样处理我的管道故障。
PipelineResult.State state = pipeline.run().waitUntilFinish();
if(state == PipelineResult.State.FAILED){
System.out.println("failed");
}else if(state==PipelineResult.State.DONE){
System.out.println("Done");
}任何帮助都将不胜感激。
发布于 2021-03-19 05:45:59
我建议看看如何在管道的每个步骤中处理错误,而不是试图处理管道失败。
例如,WriteToBigQuery接收器的默认重试策略是always,因此即使错误不是可以成功写入BigQuery的格式,也将始终重试。您可以考虑实现死信模式,以捕获错误并将其写入不同的源。
我只使用Kafka和PubSub作为管道的输入,所以我不确定在读取文件时如何处理错误,以及您可能会遇到什么问题。
在创建审核或读取和写入数据之间的任何其他处理步骤时,您可以使用标记的输出并尝试捕获,从一个步骤为成功和失败的输出创建两个不同的输出,然后将它们写入您喜欢的任何位置
https://stackoverflow.com/questions/66673419
复制相似问题