I'm collecting logs generated by an agent. It produces one large JSON output that I need to break up into smaller JSON documents and write to Kafka using sarama, because of Kafka's maximum message size limit. I'm having trouble splitting the output into individual JSON documents, and any suggestions would be appreciated. Apart from a date/time field indicating when the log activity occurred, the log messages have no fixed fields or data types.
Example #1
[{"date":1596206786.847531,"rand_value":11885153394315023285},{"date":1596206787.847446,"rand_value":6208802038498064748},{"date":1596206788.847526,"rand_value":932964293334035461},{"date":1596206789.847568,"rand_value":13217490172547025909}]示例2
[{"date":1596206786.847743,"cpu_p":0,"user_p":0,"system_p":0,"cpu0.p_cpu":0,"cpu0.p_user":0,"cpu0.p_system":0,"cpu1.p_cpu":0,"cpu1.p_user":0,"cpu1.p_system":0,"cpu2.p_cpu":0,"cpu2.p_user":0,"cpu2.p_system":0,"cpu3.p_cpu":0,"cpu3.p_user":0,"cpu3.p_system":0,"cpu4.p_cpu":0,"cpu4.p_user":0,"cpu4.p_system":0,"cpu5.p_cpu":0,"cpu5.p_user":0,"cpu5.p_system":0,"cpu6.p_cpu":0,"cpu6.p_user":0,"cpu6.p_system":0,"cpu7.p_cpu":0,"cpu7.p_user":0,"cpu7.p_system":0},{"date":1596206787.847689,"cpu_p":1.25,"user_p":0.75,"system_p":0.5,"cpu0.p_cpu":2,"cpu0.p_user":1,"cpu0.p_system":1,"cpu1.p_cpu":1,"cpu1.p_user":0,"cpu1.p_system":1,"cpu2.p_cpu":2,"cpu2.p_user":1,"cpu2.p_system":1,"cpu3.p_cpu":3,"cpu3.p_user":2,"cpu3.p_system":1,"cpu4.p_cpu":1,"cpu4.p_user":0,"cpu4.p_system":1,"cpu5.p_cpu":1,"cpu5.p_user":1,"cpu5.p_system":0,"cpu6.p_cpu":2,"cpu6.p_user":2,"cpu6.p_system":0,"cpu7.p_cpu":0,"cpu7.p_user":0,"cpu7.p_system":0},{"date":1596206788.847754,"cpu_p":0.75,"user_p":0.5,"system_p":0.25,"cpu0.p_cpu":0,"cpu0.p_user":0,"cpu0.p_system":0,"cpu1.p_cpu":1,"cpu1.p_user":0,"cpu1.p_system":1,"cpu2.p_cpu":2,"cpu2.p_user":1,"cpu2.p_system":1,"cpu3.p_cpu":0,"cpu3.p_user":0,"cpu3.p_system":0,"cpu4.p_cpu":0,"cpu4.p_user":0,"cpu4.p_system":0,"cpu5.p_cpu":1,"cpu5.p_user":1,"cpu5.p_system":0,"cpu6.p_cpu":1,"cpu6.p_user":0,"cpu6.p_system":1,"cpu7.p_cpu":1,"cpu7.p_user":0,"cpu7.p_system":1},{"date":1596206789.847805,"cpu_p":0.8750000000000001,"user_p":0.5,"system_p":0.375,"cpu0.p_cpu":1,"cpu0.p_user":0,"cpu0.p_system":1,"cpu1.p_cpu":1,"cpu1.p_user":1,"cpu1.p_system":0,"cpu2.p_cpu":2,"cpu2.p_user":1,"cpu2.p_system":1,"cpu3.p_cpu":0,"cpu3.p_user":0,"cpu3.p_system":0,"cpu4.p_cpu":1,"cpu4.p_user":1,"cpu4.p_system":0,"cpu5.p_cpu":0,"cpu5.p_user":0,"cpu5.p_system":0,"cpu6.p_cpu":2,"cpu6.p_user":2,"cpu6.p_system":0,"cpu7.p_cpu":0,"cpu7.p_user":0,"cpu7.p_system":0}]package main
import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
)

func main() {
	ibytes, err := ioutil.ReadFile("hello.json")
	if err != nil {
		fmt.Println(err)
		os.Exit(-1)
	}

	var msgs []map[string]interface{}
	err = json.Unmarshal(ibytes, &msgs)
	if err != nil {
		fmt.Println("Deserialization Error", err)
		os.Exit(-1)
	}

	// Each element of the top-level array is one log message.
	for _, msg := range msgs {
		fmt.Printf("%v\n", msg)
	}
}

I am able to iterate over the individual messages, but I can't write them to Kafka in a usable format.
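One way to get each element into a Kafka-friendly form is to marshal it back to bytes and wrap it in a sarama.ProducerMessage, which is essentially what the answer below ends up doing. A minimal, self-contained sketch; the helper name buildMessages, the topic "agent-logs", and the use of ByteEncoder are assumptions for illustration, not from the original post:

package main

import (
	"encoding/json"
	"io/ioutil"
	"log"

	"github.com/Shopify/sarama"
)

// buildMessages is a hypothetical helper (not from the original post): it
// splits the large JSON array in `path` into individual documents and wraps
// each one as a separate Kafka message.
func buildMessages(path, topic string) ([]*sarama.ProducerMessage, error) {
	raw, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}

	// Decode the top-level array; each element is one log entry with
	// arbitrary fields, so map[string]interface{} is the only safe shape.
	var docs []map[string]interface{}
	if err := json.Unmarshal(raw, &docs); err != nil {
		return nil, err
	}

	msgs := make([]*sarama.ProducerMessage, 0, len(docs))
	for _, doc := range docs {
		// Re-marshal the element so it becomes a standalone JSON document.
		event, err := json.Marshal(doc)
		if err != nil {
			log.Printf("skipping element: %v", err)
			continue
		}
		msgs = append(msgs, &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.ByteEncoder(event),
		})
	}
	return msgs, nil
}

func main() {
	// "hello.json" matches the question; "agent-logs" is a placeholder topic.
	msgs, err := buildMessages("hello.json", "agent-logs")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("prepared %d Kafka messages", len(msgs))
}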
Posted on 2020-08-11 12:45:55
I managed to find a solution myself with the code below:
var PlaceHolder []interface{}
err = json.Unmarshal(dbytes, &PlaceHolder)
if err != nil {
	return fmt.Errorf("error during JSON unmarshalling (%s)", err)
}

for _, doc := range PlaceHolder {
	// Marshal each element back into its own JSON document.
	event, err := json.Marshal(doc)
	if err != nil {
		log.Printf("Skipping: error during JSON marshalling (%s)", err)
		continue
	}
	KafkaMessage := &sarama.ProducerMessage{
		Topic: this.Topic,
		Value: sarama.StringEncoder(event),
	}
	msgs = append(msgs, KafkaMessage)
}

https://stackoverflow.com/questions/63194229
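To actually publish the msgs slice built in the answer above, it can be handed to a sarama SyncProducer; SendMessages ships the whole batch, with each element arriving as its own Kafka record. A minimal sketch with an assumed broker address and MaxMessageBytes tuning (none of these settings come from the original post):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func sendBatch(brokers []string, msgs []*sarama.ProducerMessage) error {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // required by the SyncProducer
	// Raise the per-message limit if single documents are still large;
	// it must stay below the broker's message.max.bytes.
	cfg.Producer.MaxMessageBytes = 1000000

	producer, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		return err
	}
	defer producer.Close()

	// Each ProducerMessage becomes one Kafka record on its topic.
	return producer.SendMessages(msgs)
}

func main() {
	// msgs would be the slice assembled by the loop in the answer above;
	// "localhost:9092" is a placeholder broker address.
	var msgs []*sarama.ProducerMessage
	if err := sendBatch([]string{"localhost:9092"}, msgs); err != nil {
		log.Fatal(err)
	}
}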