grokparsefailure with multiple if [type] conditionals - Logstash config

Stack Overflow user
Asked 2015-11-13 13:31:02
Answers: 1 · Views: 140 · Followers: 0 · Votes: 0

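OK, I have been beating my head against this config file for days with little to no success (I am very new to Logstash and the ELK stack). The problem is that when I put two Logstash configs in the same directory, I get a grok error on the second one: 001 works fine and 002 produces the error. If I run Logstash with only one config (it doesn't matter which), everything runs fine. When they are combined, one works and the other fails. I have merged both conf files into a single conf file, but the same problem persists. Below are the combined version of the config and a sample of the syslog. Any help would be greatly appreciated!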

input {
  file {
    path => ["/var/log/pantraffic.log"]
    #start_position => "beginning"
    type => "pantraffic"
  }
   file {
    path => ["/var/log/panthreat.log"]
    #start_position => "beginning"
    type => "panthreat"
  } 
}

filter {
  if [type] == "pantraffic" {
    grok {
      #patterns_dir => "/opt/logstash/patterns"
      match => [ "message_traffic", "%{TIMESTAMP_ISO8601:@timestamp} %       {HOSTNAME:syslog_host} %{GREEDYDATA:traffic_message}"]
    }
    syslog_pri { }
  }
   csv {
      source => "traffic_message"
columns => [ "PaloAltoDomain","ReceiveTime","SerialNum","Type","Threat-   ContentType","ConfigVersion","GenerateTime","SourceAddress","DestinationAddress","NATSourceIP","NATDestinationIP","Rule","SourceUser","DestinationUser","Application","VirtualSystem","SourceZone","DestinationZone","InboundInterface","OutboundInterface","LogAction","TimeLogged","SessionID","RepeatCount","SourcePort","DestinationPort","NATSourcePort","NATDestinationPort","Flags","IPProtocol","Action","Bytes","BytesSent","BytesReceived","Packets","StartTime","ElapsedTimeInSec","Category","Padding","seqno","actionflags","SourceCountry","DestinationCountry","cpadding","pkts_sent","pkts_received","sessionEndReason" ]
}

 date {
      #timezone => "America/Chicago"
      match => [ "GenerateTime", "YYYY/MM/dd HH:mm:ss" ]    
}

mutate {
      convert => [ "Bytes", "integer" ]
      convert => [ "BytesReceived", "integer" ]
      convert => [ "BytesSent", "integer" ]
      convert => [ "ElapsedTimeInSec", "integer" ]
      convert => [ "geoip.area_code", "integer" ]
      convert => [ "geoip.dma_code", "integer" ]
      convert => [ "geoip.latitude", "float" ]
      convert => [ "geoip.longitude", "float" ]
      convert => [ "NATDestinationPort", "integer" ]
      convert => [ "NATSourcePort", "integer" ]
      convert => [ "Packets", "integer" ]
      convert => [ "pkts_received", "integer" ]
      convert => [ "pkts_sent", "integer" ]
      convert => [ "seqno", "integer" ]
      gsub => [ "Rule", " ", "_",
                "Application", "( |-)", "_" ]
      remove_field => [ "message_traffic", "traffic_message" ]
    }

if [SourceAddress] and [SourceAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-   9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
           database => "/opt/logstash/GeoLiteCity.dat"
           source => "SourceAddress"
           target => "SourceGeo"
      }
      if ([SourceGeo.location] and [SourceGeo.location] =~ "0,0") {
        mutate {
          replace => [ "SourceGeo.location", "" ]
        }
      }
    }
if [DestinationAddress] and [DestinationAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
           database => "/opt/logstash/GeoLiteCity.dat"
           source => "DestinationAddress"
           target => "DestinationGeo"
      }

      if ([DestinationGeo.location] and [DestinationGeo.location] =~ "0,0") {
        mutate {
          replace => [ "DestinationAddress.location", "" ]
        }
      }
    }

  if [SourceAddress] and [DestinationAddress] {
    fingerprint {
      concatenate_sources => true
      method => "SHA1"
      key => "logstash"
      source => [ "SourceAddress", "SourcePort", "DestinationAddress",  "DestinationPort", "IPProtocol" ]
    }
  }
###########################################################################
if [type] == "panthreat" {
    grok {

      match => [ "message", "%{TIMESTAMP_ISO8601:@timestamp} % {HOSTNAME:syslog_host} %{GREEDYDATA:threat_message}"]
    }
        syslog_pri { }
  }
   csv {
      source => "threat_message"
columns => [ "Domain","ReceiveTime","Serial","Type","ThreatContentType","ConfigVersion","GenerateTime","SourceAddress","DestinationAddress","NATSourceIP","NATDestinationIP","Rule","SourceUser","DestinationUser","Application","VirtualSystem","SourceZone","DestinationZone","InboundInterface","OutboundInterface","LogAction","TimeLogged","SessionID","RepeatCount","SourcePort","DestinationPort","NATSourcePort","NATDestinationPort","Flags","IPProtocol","Action","URL","ThreatContentName","Category","Severity","Direction","seqno","actionflags","SourceCountry","DestinationCountry","cpadding","contenttype","pcap_id","filedigest","cloud","url_idx","user_agent","filetype","xff","referer","sender","subject","recipient","reportid" ]
}

date {
      #timezone => "America/Chicago"
      match => [ "GenerateTime", "YYYY/MM/dd HH:mm:ss" ]
}

mutate {
      #convert => [ "Bytes", "integer" ]
      #convert => [ "BytesReceived", "integer" ]
      #convert => [ "BytesSent", "integer" ]
      #convert => [ "ElapsedTimeInSec", "integer" ]
      convert => [ "geoip.area_code", "integer" ]
      convert => [ "geoip.dma_code", "integer" ]
      convert => [ "geoip.latitude", "float" ]
      convert => [ "geoip.longitude", "float" ]
      convert => [ "NATDestinationPort", "integer" ]
      convert => [ "NATSourcePort", "integer" ]
      #convert => [ "Packets", "integer" ]
      #convert => [ "pkts_received", "integer" ]
      #convert => [ "pkts_sent", "integer" ]
      #convert => [ "seqno", "integer" ]
      gsub => [ "Rule", " ", "_",
                "Application", "( |-)", "_" ]
      remove_field => [ "message", "threat_message" ]
    }

if [SourceAddress] and [SourceAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6- 9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
           database => "/opt/logstash/GeoLiteCity.dat"
           source => "SourceAddress"
           target => "SourceGeo"
      }
      if ([SourceGeo.location] and [SourceGeo.location] =~ "0,0") {
        mutate {
          replace => [ "SourceGeo.location", "" ]
        }
      }
    }
 if [DestinationAddress] and [DestinationAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
           database => "/opt/logstash/GeoLiteCity.dat"
           source => "DestinationAddress"
           target => "DestinationGeo"
      }

      if ([DestinationGeo.location] and [DestinationGeo.location] =~ "0,0") {
        mutate {
          replace => [ "DestinationAddress.location", "" ]
         }
       }
    }

  if [SourceAddress] and [DestinationAddress] {
    fingerprint {
      concatenate_sources => true
      method => "SHA1"
      key => "logstash"
      source => [ "SourceAddress", "SourcePort", "DestinationAddress",  "DestinationPort", "IPProtocol" ]
    }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

Sample logs:

panthreat log:
015-11-13T04:53:28-06:00 PA-200 1,2015/11/13    04:53:28,0011122223333,THREAT,vulnerability,1,2015/11/13 04:53:28,73.222.111.1,4.4.4.4,0.0.0.0,0.0.0.0,rule1,,,dns,vsys1,trust,untrust,ethernet1/2,ethernet1/1,Default_Forwarder,2015/11/13 04:53:28,3602,1,34830,53,0,0,0x0,udp,drop-all-packets,"",Test(41000),0,any,high,client-to-server,37,0x0,US,US,0,,0,,,0,,,,,,,

pantraffic log:
2015-11-13T07:34:22-06:00 PA-200 1,2015/11/13 07:34:21,001112223334,TRAFFIC,end,1,2015/11/13 07:34:21,73.22.111.1,4.3.2.1,0.0.0.0,0.0.0.0,rule1,,,facebook-base,vsys1,trust,untrust,ethernet1/2,ethernet1/1,Default_Forwarder,2015/11/13 07:34:21,6385,1,63121,443,0,0,0x53,tcp,allow,6063,2285,3778,29,2015/11/13 07:34:05,2,social-networking,0,15951,0x0,US,IE,0,17,12,tcp-fin

1 Answer

Stack Overflow user

Answered 2015-11-19 14:33:11

I think you have misplaced your closing braces. For example, look at this block (your first if):

  if [type] == "pantraffic" {
    grok {
      #patterns_dir => "/opt/logstash/patterns"
      match => [ "message_traffic", "%{TIMESTAMP_ISO8601:@timestamp} %       {HOSTNAME:syslog_host} %{GREEDYDATA:traffic_message}"]
    }
    syslog_pri { }
  }

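The last closing brace here is probably wrong. You don't want to close the if block at this point, but right before the "panthreat" block starts. As written, the csv, date, mutate, geoip and fingerprint filters that follow sit outside the conditional, so they run on every event regardless of its type. The "panthreat" if block has the same problem.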
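As a rough sketch of the layout described above (not the asker's verified final config): Logstash concatenates every file in a config directory into one pipeline, so the `if [type]` conditionals are the only thing that scopes a filter to one log type. The skeleton below keeps each type's filters inside its own conditional. It makes two assumptions that are not in the original post: that the raw line arrives in the default `message` field for both types, and that the grok pattern is written without the stray spaces after `%`. The column lists and the remaining filters are left as comments and should be taken from the question's config.

```
filter {
  if [type] == "pantraffic" {
    grok {
      # Assumption: parse the default "message" field, no spaces inside %{...}
      match => [ "message", "%{TIMESTAMP_ISO8601:@timestamp} %{HOSTNAME:syslog_host} %{GREEDYDATA:traffic_message}" ]
    }
    syslog_pri { }
    csv {
      source => "traffic_message"
      # columns => [ the traffic column list from the question ]
    }
    # The date, mutate, geoip and fingerprint filters for traffic events
    # go here, still inside this conditional.
  } # the pantraffic block closes here, just before the panthreat block

  if [type] == "panthreat" {
    grok {
      match => [ "message", "%{TIMESTAMP_ISO8601:@timestamp} %{HOSTNAME:syslog_host} %{GREEDYDATA:threat_message}" ]
    }
    syslog_pri { }
    csv {
      source => "threat_message"
      # columns => [ the threat column list from the question ]
    }
    # The date, mutate, geoip and fingerprint filters for threat events
    # go here, still inside this conditional.
  }
}
```

With this layout a threat event never passes through the traffic csv and mutate filters, and a traffic event never passes through the threat ones. The geoip and fingerprint blocks are identical for both types, so they could also be placed once after both conditionals.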

Votes: 0
Original page content provided by Stack Overflow. Translation support provided by Tencent Cloud's Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/33686503
