我正在尝试实现Filebeat和Logstash之间的Kafka。
当向Kafka发送文件节拍(它以文本格式提供)时,Logstash也是如此。为此,我假设Logstash无法处理输入。
以下是数据来自Kafka和直接从Filebeat到Logstash的不同之处。
来自Kafka:
{
"message" => "nice",
"tags" => [
[0] "kafka-stream"
],
"@timestamp" => 2020-06-30T08:29:29.071Z,
"@version" => "1"
}
{
"message" => "{\"@timestamp\":\"2020-06-30T08:34:28.178Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"7.8.0\"},\"agent\":{\"hostname\":\"Smits-MacBook-Pro.local\",\"ephemeral_id\":\"b9779246-3cc9-408b-83ac-e69eeef3cd28\",\"id\":\"864be1a9-e233-4d41-8624-cf94e916a0b7\",\"name\":\"Smits-MacBook-Pro.local\",\"type\":\"filebeat\",\"version\":\"7.8.0\"},\"log\":{\"offset\":11341,\"file\":{\"path\":\"/Users/Smit/Downloads/chrome/observability/spring_app_log_file.log\"}},\"message\":\"2020-06-30 16:34:20.328 INFO 63741 --- [http-nio-8080-exec-7] c.e.o.controller.HomeController : AUDIT_LOG >> customer id a8703\",\"tags\":[\"observability\",\"audit\"],\"input\":{\"type\":\"log\"},\"ecs\":{\"version\":\"1.5.0\"},\"host\":{\"name\":\"Smits-MacBook-Pro.local\"}}",
"tags" => [
[0] "kafka-stream"
],
"@timestamp" => 2020-06-30T08:34:29.222Z,
"@version" => "1"
}来自Filebeat:
{
"type" => "log",
"@timestamp" => 2020-06-30T04:37:18.935Z,
"@version" => "1",
"log" => {
"file" => {
"path" => "/Users/Smit/Downloads/chrome/observability/spring_app_log_file.log"
},
"offset" => 10846
},
"input" => {
"type" => "log"
},
"ecs" => {
"version" => "1.5.0"
},
"message" => "2020-06-30 12:37:16.900 INFO 63741 --- [http-nio-8080-exec-3] c.e.o.controller.HomeController : AUDIT_LOG >> customer id d6ebe",
"tags" => [
[0] "observability",
[1] "audit",
[2] "beats",
[3] "beats_input_codec_plain_applied"
],
"hostname" => {
"name" => "Smits-MacBook-Pro.local"
},
"agent" => {
"type" => "filebeat",
"version" => "7.8.0",
"name" => "Smits-MacBook-Pro.local",
"hostname" => "Smits-MacBook-Pro.local",
"ephemeral_id" => "1ca4e838-eeaa-4b87-b52a-89fa385865b8",
"id" => "864be1a9-e233-4d41-8624-cf94e916a0b7"
}
}现在,当我在Kibana中可视化数据时:
以下是将log直接从Filebeat发送到Logstash时的输出:

当log从Filebeat到Kafka再到Logstash时,输出如下:

如果你需要更多的信息,请告诉我。
ELK中每个产品的配置也在这里:https://github.com/shah-smit/observability-spring-demo
发布于 2020-07-08 14:39:30
当您直接将日志发送到logstash时,您使用的是beats协议,该协议会向您的事件添加一些“额外”字段:
https://www.elastic.co/guide/en/beats/filebeat/current/exported-fields-beat-common.html
我真的不确定kafka输出使用的是哪种协议,但可以肯定的是,这不会添加节拍额外的字段。
因此,当您将日志发送到kafka,然后从logstash中读取它们时,您拥有的字段会更少。
https://stackoverflow.com/questions/62658134
复制相似问题