我必须知道数据传输作业(流程组内的流程)的状态,它是完成了,失败了,还是正在运行。我想在web应用程序中使用nipyapi来完成这个任务。
我在NiFi中有一个进程组,其中有NiFi流。我正在使用nipyapi调度进程组:
nipyapi.canvas.schedule_process_group(id, True)现在,我希望使用nipyapi监视进程组的状态。根据状态,我特别想知道它是否仍在运行、失败或完成。
发布于 2019-04-29 11:48:50
我想我找到了解决这个问题的好办法。我就是这样解决的。所以我有一个mysql db,它基本上跟踪要传输的所有文件。数据库表将有2列。一个用于文件名(假设是唯一的),并标记文件是否已被传输(真和假)。
我们有3部分处理器。
First: listSFTP and putMySQL Second: getSFTP and putHDFS Third: listHDFS and putHDFS第一部门负责在SFTP中列出文件。它获取所有文件,并将一个行添加到mysql中,该文件名为“X”和“False”,表示尚未传输。
insert into NifiTest.Jobs values('${filename}', 0);
第三节对HDFS也做了同样的事情。如果已经存在具有相同文件名的行,则它将使用Transferred = True插入或更新。
insert into NifiTest.Jobs values('${filename}', 1) on duplicate key update TRANSFERRED = 1;
第二部分除了将文件发送到HDFS之外,什么也不做。
现在检查数据传输工作何时完成。
您将一起启动整个流程组。当查询数据库并得到所有Transferred = 1时,这意味着作业已经完成。它可能觉得有些情况下,它可能会失败,但当你仔细考虑所有的情况,你会看到,它照顾所有的情况。如果我错了,或者这个解决方案可以做一些改进,请告诉我。
发布于 2019-04-24 13:01:04
NiFi实际上没有一个可以检查是否完成的作业的概念。一旦启动了流程组中的所有组件,它们就会无限期地运行,直到有人停止它们。
“完成”或“完成”的概念实际上取决于数据流所做的事情。例如,如果您的第一个处理器是GetFile,那么一旦该处理器运行,它就会监视文件目录,直到有人停止该处理器。当处理器运行时,它无法知道是否会有更多的文件,或者它是否已经看到所有将被丢弃在目录中的文件。只有把文件放在那里的人/任何东西才知道这些知识。
要确定失败,您需要在数据流中做一些事情来捕获故障。大多数处理器都有故障关系,因此您需要将它们路由到某个地方,并采取一些措施来跟踪故障。
发布于 2022-07-13 11:00:42
您可以在没有数据库的情况下使用流程组的注册表API变量来实现这一点。创建一个自定义处理器,它在进程组中设置一个变量,让我们将is_complete = true作为最后一个处理器。然后,您可以使用nipyapi监视这个变量。
https://stackoverflow.com/questions/55828440
复制相似问题