我有ExecuteScript的输出FlowFile #1和另一个ExecuteScript的输出FlowFile #2。
FlowFile #1
{
"field1": "val",
"field2": "val"
}FlowFile #2
{
"field2": "abc"
}我应该使用哪个处理器来将field1在FlowFile #1中的值替换为FlowFile #2中的field2值abc?
我不想使用MergeContent,因为我需要的只是替换值。
更新:
在UpdateAttribute中,我将属性filename设置为${UUID()}。然后,在命名为ExecuteScript的Merge inputs into single FlowFile中,我使用了下面所示的代码。这些文件没有合并,而是排队。
Replacetext的输出类似于FlowFile #2,UpdateAttribute的输出类似于FlowFile #1。

import org.apache.nifi.processor.FlowFileFilter;
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder
//get first flow file
def ff0 = session.get()
if(!ff0)return
def filename = ff0.getAttribute('filename')
//try to find files with same attribute in the incoming queue
def ffList = session.get(new FlowFileFilter(){
public FlowFileFilterResult filter(FlowFile ff) {
if( filename == ff.getAttribute('filename') )return FlowFileFilterResult.ACCEPT_AND_CONTINUE
return FlowFileFilterResult.REJECT_AND_CONTINUE
}
})
//let's assume you require two additional files in queue with the same attribute
if( !ffList || ffList.size()<1 ){
session.rollback(true)
return
}
//let's put all in one list to simplify later iterations
ffList.add(ff0)
if( ffList.size()>2 ){
session.transfer(ffList, REL_FAILURE)
return
}
//create empty map (aka json object)
def json = [:]
//iterate through files parse and merge attributes
ffList.each{ff->
session.read(ff).withStream{rawIn->
def fjson = new JsonSlurper().parse(rawIn)
json.putAll(fjson)
}
}
//create new flow file and write merged json as a content
def ffOut = session.create()
ffOut = session.write(ffOut,{rawOut->
rawOut.withWriter("UTF-8"){writer->
new JsonBuilder(json).writeTo(writer)
}
} as OutputStreamCallback )
//set mime-type
ffOut = session.putAttribute(ffOut, "mime.type", "application/json")
session.remove(ffList)
session.transfer(ffOut, REL_SUCCESS)UDPATE #2:

发布于 2018-12-18 12:18:05
完全的细节可以在聊天中找到,谢谢@dagget,但是为了确保这个问题看起来没有答案,我会在这里发布要点。
https://stackoverflow.com/questions/46095557
复制相似问题