I want to feed the MySQL general_log into Logstash. I have already managed to get MySQL to write its log in CSV format, so with the csv filter there should be nothing easier to do. These are my general_log entries:
"2015-08-15 11:52:57","mrr[mrr] @ localhost []",4703,0,"Query","SET NAMES utf8"
"2015-08-15 11:52:57","mrr[mrr] @ localhost []",4703,0,"Query","SELECT @@SESSION.sql_mode"
"2015-08-15 11:52:57","mrr[mrr] @ localhost []",4703,0,"Query","SET SESSION sql_mode='NO_ENGINE_SUBSTITUTION'"
"2015-08-15 11:52:57","mrr[mrr] @ localhost []",4703,0,"Init DB","mrr"这是我的logstash.conf:
input {
  lumberjack {
    port => 5000
    type => "logs"
    ssl_certificate => "/etc/pki/tls/certs/logstash_forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash_forwarder.key"
  }
}
filter {
  if [type] == "nginx-access" {
    grok {
      match => { 'message' => '%{IPORHOST:clientip} %{NGUSER:indent} %{NGUSER:agent} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request}(?: HTTP/%{NUMBER:httpversion})?|)\" %{NUMBER:answer} (?:%{NUMBER:byte}|-) (?:\"(?:%{URI:referrer}|-))\" (?:%{QS:referree}) %{QS:agent}' }
    }
    geoip {
      source => "clientip"
      target => "geoip"
      database => "/etc/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }
  if [type] == "mysql-general" {
    csv {
      columns => [ "@timestamp(6)", "user_host", "thready_id", "server_id", "ctype", "query" ]
      separator => ","
    }
    grok {
      match => { "user_host", "%{WORD:remoteuser}\[%{WORD:localuser}\] \@ %{IPORHOST:dbhost} \[(?:%{IPORHOST:qhost}|-)\]" }
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    host => "172.17.0.5"
    cluster => "z0z0.tk-1.5"
    flush_size => 2000
  }
}

However, the user_host column has the format "mrr[mrr] @ localhost []", and I would like to split it into at least two separate values: one for the user and one for the host.
I have already run this configuration through Logstash, and because of the grok parsing the events end up with a _grokparsefailure.
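(Side note: grok tags every event it cannot parse with "_grokparsefailure", so a conditional on that tag makes the failing events easy to inspect. A minimal sketch, with the stdout output chosen purely for illustration:)

output {
  # print only the events that grok failed to parse
  if "_grokparsefailure" in [tags] {
    stdout { codec => rubydebug }
  }
}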
When I run the --configtest option against the configuration file, I get the following output:
Error: Expected one of #, => at line 36, column 26 (byte 1058) after filter {
if [type] == "nginx-access" {
grok {
match => { 'message' => '%{IPORHOST:clientip} %{NGUSER:indent} %{NGUSER:agent} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request}(?: HTTP/%{NUMBER:httpversion})?|)\" %{NUMBER:answer} (?:%{NUMBER:byte}|-) (?:\"(?:%{URI:referrer}|-))\" (?:%{QS:referree}) %{QS:agent}' }
}
geoip {
source => "clientip"
target => "geoip"
database => "/etc/logstash/GeoLiteCity.dat"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
convert => [ "[geoip][coordinates]", "float" ]
}
}
if [type] == "mysql-general" {
csv {
columns => [ "@timestamp(6)", "user_host", "thready_id", "server_id", "ctype", "query" ]
separator => ","
}
grok {
match => { "user_host"你能告诉我出什么事了吗?
Posted on 2015-08-16 15:00:37
It is working now. The error was actually in the grok pattern: the first user and the last host are sometimes empty, so grok failed to parse those lines, and I had to add alternation groups that accept an empty string. The current logstash.conf looks like this:
input {
  lumberjack {
    port => 5000
    type => "logs"
    ssl_certificate => "/etc/pki/tls/certs/logstash_forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash_forwarder.key"
  }
}
filter {
  if [type] == "nginx-access" {
    grok {
      match => { 'message' => '%{IPORHOST:clientip} %{NGUSER:indent} %{NGUSER:agent} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request}(?: HTTP/%{NUMBER:httpversion})?|)\" %{NUMBER:answer} (?:%{NUMBER:byte}|-) (?:\"(?:%{URI:referrer}|-))\" (?:%{QS:referree}) %{QS:agent}' }
    }
    geoip {
      source => "clientip"
      target => "geoip"
      database => "/etc/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }
  if [type] == "mysql-general" {
    csv {
      columns => [ "@timestamp(6)", "user_host", "thready_id", "server_id", "ctype", "query" ]
      separator => ","
    }
    grok {
match => { "user_host", "(?:%{WORD:remoteuser}|)\[%{WORD:localuser}\] \@ %{IPORHOST:dbhost} \[(?:%{IPORHOST:qhost}|)\]" }
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    host => "172.17.0.5"
    cluster => "clustername"
    flush_size => 2000
  }
}

Thanks for the help and the suggestions.
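One detail the working config still leaves open is the event time: the csv filter only puts the log's timestamp into a plain field, so @timestamp stays the ingest time. A minimal sketch of a date filter that would map it, assuming the first CSV column is renamed to an ordinary field name such as event_time (both the rename and the format string are assumptions based on the sample entries above, not part of the original config):

filter {
  if [type] == "mysql-general" {
    csv {
      # assumed rename: "event_time" instead of "@timestamp(6)"
      columns => [ "event_time", "user_host", "thready_id", "server_id", "ctype", "query" ]
      separator => ","
    }
    date {
      # parse e.g. "2015-08-15 11:52:57" into the event's @timestamp
      match => [ "event_time", "yyyy-MM-dd HH:mm:ss" ]
    }
  }
}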
Posted on 2015-08-16 02:26:49
The csv{} filter just parses, well, comma-separated values. If you want to parse a field that is in some other format, run grok{} on the user_host column after the csv{} filter has created it.
EDIT: To be more explicit.
Run the csv filter:
csv {
  columns => [ "@timestamp(6)", "user_host", "thready_id", "server_id", "ctype", "query" ]
  separator => ","
}

This will create a field for you called "user_host".
You can then run that field through a grok filter, like this (untested) one:
grok {
  match => [ "user_host", "%{WORD:myUser}\[%{WORD}\] @ %{WORD:myHost} \[\]" ]
}

This will create two more fields for you: myUser and myHost.
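Since both the bracketed user and the trailing host can be empty in the general_log (which is exactly what tripped the asker's pattern), a hedged variant of the sketch above that also accepts empty values could look like this; myUser and myHost remain purely illustrative names:

grok {
  # the (?:...|) alternations let each part match an empty string as well
  match => [ "user_host", "(?:%{WORD:myUser}|)\[(?:%{WORD}|)\] @ (?:%{WORD:myHost}|) \[(?:%{IPORHOST}|)\]" ]
}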
https://stackoverflow.com/questions/32028197