首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将FlowFile的特定字段替换为来自另一个FlowFile的字段?

如何将FlowFile的特定字段替换为来自另一个FlowFile的字段?
EN

Stack Overflow用户
提问于 2017-09-07 11:47:32
回答 1查看 546关注 0票数 2

我有ExecuteScript的输出FlowFile #1和另一个ExecuteScript的输出FlowFile #2。

FlowFile #1

代码语言:javascript
复制
{
 "field1": "val",
 "field2": "val"
}

FlowFile #2

代码语言:javascript
复制
{
  "field2": "abc"
}

我应该使用哪个处理器来将field1在FlowFile #1中的值替换为FlowFile #2中的field2abc

我不想使用MergeContent,因为我需要的只是替换值。

更新:

UpdateAttribute中,我将属性filename设置为${UUID()}。然后,在命名为ExecuteScriptMerge inputs into single FlowFile中,我使用了下面所示的代码。这些文件没有合并,而是排队。

Replacetext的输出类似于FlowFile #2,UpdateAttribute的输出类似于FlowFile #1。

代码语言:javascript
复制
import org.apache.nifi.processor.FlowFileFilter;
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder

//get first flow file
def ff0 = session.get()
if(!ff0)return

def filename = ff0.getAttribute('filename')

//try to find files with same attribute in the incoming queue
def ffList = session.get(new FlowFileFilter(){
    public FlowFileFilterResult filter(FlowFile ff) {
        if( filename == ff.getAttribute('filename') )return FlowFileFilterResult.ACCEPT_AND_CONTINUE
        return FlowFileFilterResult.REJECT_AND_CONTINUE
    }
})

//let's assume you require two additional files in queue with the same attribute

if( !ffList || ffList.size()<1 ){
    session.rollback(true)
    return
}

//let's put all in one list to simplify later iterations
ffList.add(ff0)

if( ffList.size()>2 ){
    session.transfer(ffList, REL_FAILURE)
    return
}

//create empty map (aka json object)
def json = [:]
//iterate through files parse and merge attributes
ffList.each{ff->
    session.read(ff).withStream{rawIn->
        def fjson = new JsonSlurper().parse(rawIn)
        json.putAll(fjson)
    }
}
//create new flow file and write merged json as a content
def ffOut = session.create()
ffOut = session.write(ffOut,{rawOut->
    rawOut.withWriter("UTF-8"){writer->
        new JsonBuilder(json).writeTo(writer)
    }
} as OutputStreamCallback )
//set mime-type
ffOut = session.putAttribute(ffOut, "mime.type", "application/json")

session.remove(ffList)
session.transfer(ffOut, REL_SUCCESS)

UDPATE #2:

EN

回答 1

Stack Overflow用户

发布于 2018-12-18 12:18:05

完全的细节可以在聊天中找到,谢谢@dagget,但是为了确保这个问题看起来没有答案,我会在这里发布要点。

  1. 确保在流发散之前将id添加到流中,这样您就可以再次将相关的消息对放在一起。
  2. 禁用回滚(惩罚),以避免等待消息的两部分再次可用太长时间
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46095557

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档