My CSV contains:
date,name,department
2020-2-4,sachith,{dep_name:computer,location:2323,3434}
2020-2-5,nalaka,{dep_name:engineering,location:3343,5454}
The final CSV should be:
date,name,dep_name,lat,lot
2020-2-4,sachith,computer,2323,3434
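2020-2-5,nalaka,engineering,3343,5454
Here, lat and lot are extracted from the location:3343,5454 data.
To do this, I tried the UpdateRecord processor with an expression like ${field.value:join(','):substringAfter('dep_name:')}
but that does not work. How can I do this with Apache NiFi?
Posted on 2020-02-06 12:29:55
A plain Groovy script, tested in groovyConsole: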
import groovy.json.*

def parser = new JsonSlurper().setType(JsonParserType.LAX) // LAX to accept strings without double quotes
def w = System.out
new StringReader('''date,name,department
2020-2-4,sachith,{"dep_name":"computer","location":"2323,3434"}
2020-2-5,nalaka,{"dep_name":"engineering","location":"3343,5454"}''').withReader{r->
    r.eachLine{line, lineNum->
        if(lineNum==1){
            w<<line<<',lon,lat'<<'\n' // for the first line just add some headers
        }else{
            def row=line.split(',') // split the line by comma
            def json=row[2..-1].join(',') // join back to a string starting from the 3rd element
            json = parser.parseText(json)
            w<<"${row[0]},${row[1]},${json.dep_name},${json.location}"<<'\n'
        }
    }
}
Now the same script, modified for the NiFi ExecuteGroovyScript processor:
import groovy.json.*

def ff=session.get()
if(!ff)return
def parser = new JsonSlurper().setType(JsonParserType.LAX)
ff.write{streamIn,streamOut->
    streamIn.withReader('UTF-8'){r-> // convert the input stream to a reader
        streamOut.withWriter('UTF-8'){w-> // convert the output stream to a writer
            // go line by line
            r.eachLine{line, lineNum->
                if(lineNum==1){
                    w<<line<<',lon,lat'<<'\n' // for the first line just add some headers
                }else{
                    def row=line.split(',') // split the line by comma
                    def json=row[2..-1].join(',') // join back to a string starting from the 3rd element
                    json = parser.parseText(json)
                    w<<"${row[0]},${row[1]},${json.dep_name},${json.location}"<<'\n'
                }
            }
        }
    }
}
REL_SUCCESS<<ff
https://stackoverflow.com/questions/60092622
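The scripts above keep the original header and append ',lon,lat' to it, so the header does not exactly match the one requested in the question (date,name,dep_name,lat,lot). A minimal sketch of a variant that writes that header and splits location into two explicit fields could look like the following. The helper name flattenLine is hypothetical, and the sketch assumes the department column is quoted JSON as in the sample data used in the scripts above; the unquoted form from the question may not parse even in LAX mode.

```groovy
import groovy.json.JsonSlurper
import groovy.json.JsonParserType

// Hypothetical helper (not part of the original answer): flattens one data line.
// Assumes the third column is quoted JSON with "dep_name" and "location" keys.
def flattenLine(String line, JsonSlurper parser) {
    def row = line.split(',', 3)                 // date, name, and the remaining JSON column
    def dep = parser.parseText(row[2])
    def (lat, lot) = dep.location.toString().split(',')  // "2323,3434" -> two fields
    [row[0], row[1], dep.dep_name, lat, lot].join(',')
}

def parser = new JsonSlurper().setType(JsonParserType.LAX)
def input = '''date,name,department
2020-2-4,sachith,{"dep_name":"computer","location":"2323,3434"}
2020-2-5,nalaka,{"dep_name":"engineering","location":"3343,5454"}'''

// Replace the header line, then flatten every remaining line.
def lines = input.readLines()
def output = ['date,name,dep_name,lat,lot'] + lines.drop(1).collect { flattenLine(it, parser) }
println output.join('\n')
```

For the sample input this should print the three lines shown as the desired CSV in the question. The same flattenLine logic could be called from inside the r.eachLine loop of the ExecuteGroovyScript version.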