我正在从事一个个人项目,并且在学习JSON、NiFi、SQL等方面非常新,所以请原谅这里使用的任何令人困惑的语言或可能非常明显的解决方案。我可以根据需要澄清。
我需要从网站的API调用中获取JSON输出,并将其插入我设置的MariaDB本地服务器中的一个表中。问题是JSON数据是嵌套的,我需要插入的两个关键数据片段被用作变量键对象,而不是值,因此我不知道如何提取它并将其放入数据库表中。本质上,我认为我需要识别JSON表达式的不同部分,并将它们作为值插入,但我不知道如何这样做。
我特别使用过EvaluateJSON、SplitJSON和FlattenJSON处理器,但我无法使它工作。我所能做的就是得到整个表达的结果,而不是它的每一部分。
{"5381":{"wind_speed":4.0,"tm_st_snp":26.0,"tm_off_snp":74.0,"tm_def_snp":63.0,"temperature":58.0,"st_snp":8.0,"punts":4.0,"punt_yds":178.0,"punt_lng":55.0,"punt_in_20":1.0,"punt_avg":44.5,"humidity":47.0,"gp":1.0,"gms_active":1.0},
"1023":{"wind_speed":4.0,"tm_st_snp":26.0,"tm_off_snp":82.0,"tm_def_snp":56.0,"temperature":74.0,"off_snp":82.0,"humidity":66.0,"gs":1.0,"gp":1.0,"gms_active":1.0},
"5300":{"wind_speed":17.0,"tm_st_snp":27.0,"tm_off_snp":80.0,"tm_def_snp":64.0,"temperature":64.0,"st_snp":21.0,"pts_std":9.0,"pts_ppr":9.0,"pts_half_ppr":9.0,"idp_tkl_solo":4.0,"idp_tkl_loss":1.0,"idp_tkl":4.0,"idp_sack":1.0,"idp_qb_hit":2.0,"humidity":100.0,"gp":1.0,"gms_active":1.0,"def_snp":23.0},
"608":{"wind_speed":6.0,"tm_st_snp":20.0,"tm_off_snp":53.0,"tm_def_snp":79.0,"temperature":88.0,"st_snp":4.0,"pts_std":5.5,"pts_ppr":5.5,"pts_half_ppr":5.5,"idp_tkl_solo":4.0,"idp_tkl_loss":1.0,"idp_tkl_ast":1.0,"idp_tkl":5.0,"humidity":78.0,"gs":1.0,"gp":1.0,"gms_active":1.0,"def_snp":56.0},
"3396":{"wind_speed":6.0,"tm_st_snp":20.0,"tm_off_snp":60.0,"tm_def_snp":70.0,"temperature":63.0,"st_snp":19.0,"off_snp":13.0,"humidity":100.0,"gp":1.0,"gms_active":1.0}}这是带有几千行的输出的快照。上面看到的每个数字键(5381、1023、5300等)都是用于下列统计的玩家ID。我有一个由三列组成的表:Player ID、Stat ID和Stat Value。例如,我需要将第一个片段插入到表中,如下所示:
Player ID Stat ID Stat Value
5381 wind_speed 4.0
5381 tm_st_snp 26.0
5381 tm_off_snp 74.0以此类推,对于每一块数据。但我不知道如何让NiFi选择要插入正确列中的正确数据片段。
发布于 2019-08-08 21:17:56
我相信,可以将json转换为一种格式:
[
{"playerId":"5381", "statId":"wind_speed", "statValue": 0.123},
{"playerId":"5381", "statId":"tm_st_snp", "statValue": 0.456},
...
] 然后将PutDatabaseRecord与json读取器结合使用。
另一种方法是使用ExecuteGroovyScript处理器。
添加名为SQL.mydb的新参数,并将其链接到DBCP控制器服务。

并使用以下脚本作为Script Body参数:
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder
def ff=session.get()
if(!ff)return
//read flow file content and parse it
def body = ff.read().withReader("UTF-8"){reader->
new JsonSlurper().parse(reader)
}
def results = []
//use defined sql connection to create a batch
SQL.mydb.withTransaction{
def cmd = 'insert into mytable(playerId, statId, statValue) values(?,?,?)'
results = SQL.mydb.withBatch(100, cmd){statement->
//run through all keys/subkeys in flow file body
body.each{pid,keys->
keys.each{k,v->
statement.addBatch(pid,k,v)
}
}
}
}
//write results as a new flow file content
ff.write("UTF-8"){writer->
new JsonBuilder(results).writeTo(writer)
}
//transfer to success
REL_SUCCESS << ffhttps://stackoverflow.com/questions/57380000
复制相似问题