下面是我的流程:
GetFile > ExecuteSparkInteractive > PutFile我希望在GetFile处理器中读取ExecuteSparkInteractive处理器中的文件,应用一些转换并将其放在某个位置。下面是我的流量

我在火花处理器的code部分下编写了code:
val sc1=sc.textFile("local_path")
sc1.foreach(println)在水流中什么也没有发生。那么,如何使用GetFile处理器读取火花处理器中的文件呢?
第二部分:
为了练习,我尝试了下面的流程:
ExecuteScript > PutFile > LogMessage我在执行文件处理器中提到了下面的代码:
readFile = open("/home/cloudera/Desktop/sample/data","r")
for line in readFile:
lines = line.strip()
finalline = re.sub(pattern='((?<=[0-9])[0-9]|(?<=\.)[0-9])',repl='X',string=lines)
readFile = open("/home/cloudera/Desktop/sample/data","w")
readFile.write(finalline) 代码工作正常,但它不会将格式化的数据写入目标文件夹。我哪里出问题了。此外,我还在本地机器上安装了熊猫,并从执行文件处理器运行熊猫代码,但nifi不读取熊猫模块。为什么会这样呢?我尽了最大努力。而且,我找不到任何相关的链接,在这里我可以得到基本的流程。
发布于 2019-04-10 13:35:02
这不是真正的工作方式。GetFile正在拾取NiFi节点本地的文件,并将它们带入NiFi流中进行处理。ExecuteSparkInteractive启动远程星火集群上的火花作业,它不会将数据传输到Spark。所以你可能想把这些数据放在星火可以访问的地方,比如GetFile -> PutHDFS -> ExecuteSparkInteractive。
https://stackoverflow.com/questions/55606681
复制相似问题