首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何正确合并多个FlowFile文件?

如何正确合并多个FlowFile文件?
EN

Stack Overflow用户
提问于 2017-08-31 13:39:40
回答 2查看 2.7K关注 0票数 1

我使用MergeContent 1.3.0来合并来自两个源的FlowFiles : 1)来自ListenHTTP,2)来自QueryElasticsearchHTTP

问题是,合并结果是一个JSON字符串列表。如何将它们转换为单个JSON字符串?

代码语言:javascript
复制
{"event-date":"2017-08-08T00:00:00"}{"event-date":"2017-02-23T00:00:00"}{"eid":1,"zid":1,"latitude":38.3,"longitude":2.4} 

我会得到这样的结果:

代码语言:javascript
复制
{"event-date":["2017-08-08T00:00:00","2017-02-23T00:00:00"],"eid":1,"zid":1,"latitude":38.3,"longitude":2.4} 

有可能吗?

更新:

在更改了弹性数据结构之后,我得到了下面的MergeContent输出结果。现在,我在两个JSON字符串中都有了一个公共字段eid。为了获得一个JSON文件,我想通过eid合并这些字符串。我应该用哪个接线员?

代码语言:javascript
复制
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}

我需要得到以下输出:

代码语言:javascript
复制
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4,"dates":{"event-date":["2017-08-08","2017-02-23"]}}

建议使用ExecuteScript合并文件。不过,我想不出该怎么做。这就是我试过的:

代码语言:javascript
复制
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    obj = json.loads(text)
    newObj = {
          "eid": obj['eid'],
          "zid": obj['zid'],
          ...
        }
    outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))

flowFile1 = session.get()
flowFile2 = session.get()
if (flowFile1 != None && flowFile2 != None):
  # WHAT SHOULD I PUT HERE??
  flowFile = session.write(flowFile, ModJSON())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
session.transfer(flowFile, REL_SUCCESS)
session.commit()
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-09-01 08:44:33

如何使用筛选从传入队列读取多个文件的示例

假设您有多对具有以下内容的流文件:

{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}

{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}

eid字段的相同值提供了对之间的链接。

在合并之前,我们必须提取eid字段的值,并将其放入流文件的na属性中,以便进行快速筛选。

使用具有属性的EvaluateJsonPath处理器:

代码语言:javascript
复制
Destination :  flowfile-attribute 
eid         :  $.eid

在此之后,您将拥有流文件的新eid属性。

然后结合groovy语言和下面的代码使用ExecuteScript处理器:

代码语言:javascript
复制
import org.apache.nifi.processor.FlowFileFilter;
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder

//get first flow file
def ff0 = session.get()
if(!ff0)return

def eid = ff0.getAttribute('eid')

//try to find files with same attribute in the incoming queue
def ffList = session.get(new FlowFileFilter(){
    public FlowFileFilterResult filter(FlowFile ff) {
        if( eid == ff.getAttribute('eid') )return FlowFileFilterResult.ACCEPT_AND_CONTINUE
        return FlowFileFilterResult.REJECT_AND_CONTINUE
    }
})

//let's assume you require two additional files in queue with the same attribute

if( !ffList || ffList.size()<1 ){
    //if less than required
    //rollback current session with penalize retrieved files so they will go to the end of the incoming queue
    //with pre-configured penalty delay (default 30sec)
    session.rollback(true)
    return
}

//let's put all in one list to simplify later iterations
ffList.add(ff0)

if( ffList.size()>2 ){
    //for example unexpected situation. you have more files then expected
    //redirect all of them to failure
    session.transfer(ffList, REL_FAILURE)
    return
}

//create empty map (aka json object)
def json = [:]
//iterate through files parse and merge attributes
ffList.each{ff->
    session.read(ff).withStream{rawIn->
        def fjson = new JsonSlurper().parse(rawIn)
        json.putAll(fjson)
    }
}
//create new flow file and write merged json as a content
def ffOut = session.create()
ffOut = session.write(ffOut,{rawOut->
    rawOut.withWriter("UTF-8"){writer->
        new JsonBuilder(json).writeTo(writer)
    }
} as OutputStreamCallback )
//set mime-type
ffOut = session.putAttribute(ffOut, "mime.type", "application/json")

session.remove(ffList)
session.transfer(ffOut, REL_SUCCESS)
票数 4
EN

Stack Overflow用户

发布于 2017-08-31 13:54:50

将两种不同类型的数据连接起来并不是MergeContent真正要做的事情。

您需要编写一个自定义处理器或自定义脚本来理解传入的数据格式并创建新的输出。

如果您已经将ListenHttp连接到QueryElasticSearchHttp,这意味着您正在根据来自ListenHttp的流文件触发查询,那么您可能希望创建一个自定义的QueryElasticSearchHttp版本,该版本接受传入流文件的内容,并将其与任何传出结果连接起来。

以下是当前将查询结果写入流文件的位置:

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java#L360

另一种选择是使用ExecuteScript并编写一个脚本,该脚本可以接受多个流文件并以您所描述的方式合并到一起。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45982908

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档