在我的生产者中,我在标题中添加了一些自定义键。使用logstash,我需要获得有效负载,另外,我还需要读取我的键的值。
生产者代码:
kafkaTemplate
.send(
MessageBuilder.withPayload(myPayload)
.setHeader(KafkaHeaders.TOPIC, myTopic)
.setHeader("my-key1", "test-key1") // custom keys
.setHeader("my-key2", "test-key2") // custom keys
.build())Logstash conf
input{
kafka{
//kafka connection details
decorate_events => true
}
}
filter{
mutate => { add_field => { "kafka-topic" => "%{[@metadata][kafka][topic]}" } // this is working
mutate => { add_field => { "custom-key" => "%{[my-key1]}" } // this is not working
}
output{
// store the payload on ES index, _id = my-key1
}我看不懂我的定制钥匙。我错过了配置中的任何东西吗?
发布于 2022-09-13 07:05:25
在最近的Logstash发行版中,您可以设置decorate_events => "extended" (true意味着basic),然后使用以下方法获取头部:
%{[@metadata][kafka][headers][my-key1]}https://stackoverflow.com/questions/73697637
复制相似问题