
How to integrate Wazuh with HELK

Stack Overflow user
Asked on 2019-04-27 17:09:29
2 answers · 835 views · 0 followers · 0 votes

I want to integrate my Wazuh server with HELK, but I cannot get it to work: Logstash receives no Wazuh alerts from Kafka, and therefore sends no Wazuh alerts to Elasticsearch. I created a Kafka topic named "wazuh-alerts" and added my settings to the Logstash configuration files. What am I doing wrong?

In HELK, the Logstash configuration lives in the pipeline directory, split across the following files:

0002-kafka-input.conf                                       1534-winevent-application-filter.conf        8801-meta-command_line-enrichment_and_additions-filter.conf
0003-attack-input.conf                                      1535-winevent-wmiactivity-filter.conf        8802-meta-powershell-enrichment_and_additions-filter.conf
0004-beats-input.conf                                       1541-winevent-process-name-split.conf        8901-fingerprints-command_line-filter.conf
0005-nxlog-winevent-syslog-tcp-input.conf                   1542-winevent-process-ids-conversions.conf   8902-fingerprints-powershell.conf
0098-all-filter.conf                                        1543-winevent-user-ids-conversions.conf      9950-winevent-sysmon-output.conf
0099-all-fingerprint-hash-filter.conf                       1544-winevent-cleanup-other.conf             9951-winevent-security-output.conf
0301-nxlog-winevent-to-json.conf                            1545-winevent-security-conversions.conf      9952-winevent-system-output.conf
1010-winevent-winlogbeats-filter.conf                       9953-winevent-application-output.conf
1050-nxlog-winevent-to-winlogbeats-merge-filter.conf        2511-winevent-powershell-filter.conf         9954-winevent-powershell-output.conf
1216-attack-filter.conf                                     2512-winevent-security-schtasks-filter.conf  9955-winevent-wmiactivity-output.conf
1500-winevent-cleanup-no-dashes-only-values-filter.conf     8012-dst-ip-cleanups-filter.conf             9956-attack-output.conf
1521-winevent-conversions-ip-conversions-basic-filter.conf  8013-src-ip-cleanups-filter.conf             9957-winevent-sysmon-join-output.conf
1522-winevent-cleanup-lowercasing-windows-filter.conf       8014-dst-nat-ip-cleanups-filter.conf         9958-osquery-output.conf
1523-winevent-process-name-filter.conf                      8015-src-nat-ip-cleanups-filter.conf         9959-winevent-codeintegrity-output.conf
1524-winevent-process-ids-filter.conf                       8112-dst-ip-filter.conf                      9960-winevent-bits-output.conf
1531-winevent-sysmon-filter.conf                            8113-src-ip-filter.conf                      9961-winevent-dns-client-output.conf
1532-winevent-security-filter.conf                          8114-dst-nat-ip-filter.conf                  9962-winevent-firewall-advanced-output.conf
1533-winevent-system-filter.conf                            8115-src-nat-ip-filter.conf                  

I changed the 0002-kafka-input.conf file from this:

input {
  kafka {
    bootstrap_servers => "helk-kafka-broker:9092"
    topics => ["winlogbeat", "SYSMON_JOIN","filebeat"]
    decorate_events => true
    codec => "json"
    auto_offset_reset => "latest"
    ############################# HELK Optimizing Throughput & Latency #############################
    fetch_min_bytes => "1"
    request_timeout_ms => "40000"
    ############################# HELK Optimizing Durability #############################
    enable_auto_commit => "false"
    ############################# HELK Optimizing Availability #############################
    connections_max_idle_ms => "540000"
    session_timeout_ms => "30000"
    max_poll_interval_ms => "300000"
    #############################
    max_poll_records => "500"
  }
}

to the following, with a new Kafka input added for the Wazuh alerts topic:

input {
  kafka {
    bootstrap_servers => "helk-kafka-broker:9092"
    topics => ["winlogbeat", "SYSMON_JOIN","filebeat"]
    decorate_events => true
    codec => "json"
    auto_offset_reset => "latest"
    tags => [ "winlog-sysmon" ]
    ############################# HELK Optimizing Throughput & Latency #############################
    fetch_min_bytes => "1"
    request_timeout_ms => "40000"
    ############################# HELK Optimizing Durability #############################
    enable_auto_commit => "false"
    ############################# HELK Optimizing Availability #############################
    connections_max_idle_ms => "540000"
    session_timeout_ms => "30000"
    max_poll_interval_ms => "300000"
    #############################
    max_poll_records => "500"
  }
  kafka {
    bootstrap_servers => "helk-kafka-broker:9092"
    topics => ["wazuh-alerts"]
    decorate_events => true
    codec => "json_lines"
    tags => [ "wazuh-alerts" ]
    auto_offset_reset => "latest"
  }
}
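
One thing worth double-checking in the new input is the codec. The json_lines codec is intended for newline-delimited streams, while Kafka already delivers one message per record; if Filebeat writes a single JSON document per record, the plain json codec (as in the winlogbeat input above) is likely the better fit, since json_lines may sit waiting for a newline that never arrives. A sketch of the Wazuh input with that change (same broker and topic names as above):

```conf
  kafka {
    bootstrap_servers => "helk-kafka-broker:9092"
    topics => ["wazuh-alerts"]
    decorate_events => true
    # one JSON document per Kafka record, so decode with the json codec
    codec => "json"
    tags => [ "wazuh-alerts" ]
    auto_offset_reset => "latest"
  }
```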

I created a 1546-wazuh-alerts-filter.conf file with the following filter:

filter {
  if "wazuh-alerts" in [tags]{
    if [data][srcip] {
      mutate {
        add_field => [ "@src_ip", "%{[data][srcip]}" ]
    }
    }
    if [data][aws][sourceIPAddress] {
      mutate {
        add_field => [ "@src_ip", "%{[data][aws][sourceIPAddress]}" ]
      }
    }
    geoip {
      source => "@src_ip"
      target => "GeoLocation"
      fields => ["city_name", "country_name", "region_name", "location"]
    }
    date {
      match => ["timestamp", "ISO8601"]
      target => "@timestamp"
    }
    mutate {
      remove_field => [ "timestamp", "beat", "input_type", "tags", "count", "@version", "log", "offset", "type", "@src_ip", "host"]
    }
  }
}

and a 9963-wazuh-alerts-output.conf file for the Logstash output configuration:

output {
  if "wazuh-alerts" in [tags]{
    elasticsearch {
      hosts => ["helk-elasticsearch:9200"]
      index => "mitre-attack-%{+YYYY.MM.dd}"
      user => 'elastic'
      #password => 'elasticpassword'
    }
  }
}

2 Answers

Stack Overflow user

Answered on 2019-04-27 22:04:03

Your output has a conditional on the tags field, but you remove that field in your filter block, so the conditional can never match: by the time the event reaches the output block, the field no longer exists.

Keep the tags field on the message and try again.
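
For example, the filter's final mutate could keep tags by dropping it from remove_field (a minimal sketch reusing the field names from the question's 1546 filter file):

```conf
filter {
  if "wazuh-alerts" in [tags] {
    mutate {
      # "tags" is intentionally NOT listed here, so the output's
      # conditional `if "wazuh-alerts" in [tags]` can still match
      remove_field => [ "timestamp", "beat", "input_type", "count", "@version", "log", "offset", "type", "@src_ip", "host" ]
    }
  }
}
```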

0 votes

Stack Overflow user

Answered on 2019-04-28 16:40:36

I changed my Logstash configuration to:

input {
  kafka {
    bootstrap_servers => "helk-kafka-broker:9092"
    topics => ["winlogbeat", "SYSMON_JOIN","filebeat"]
    decorate_events => true
    codec => "json"
    auto_offset_reset => "latest"
    ############################# HELK Optimizing Throughput & Latency #############################
    fetch_min_bytes => "1"
    request_timeout_ms => "40000"
    ############################# HELK Optimizing Durability #############################
    enable_auto_commit => "false"
    ############################# HELK Optimizing Availability #############################
    connections_max_idle_ms => "540000"
    session_timeout_ms => "30000"
    max_poll_interval_ms => "300000"
    #############################
    max_poll_records => "500"
  }
  kafka {
    bootstrap_servers => "helk-kafka-broker:9092"
    topics => ["wazuh-alerts"]
    decorate_events => true
    codec => "json_lines"
    auto_offset_reset => "latest"
  }
}
filter {
  if [@metadata][kafka][topic] == "wazuh-alerts" {
    if [data][srcip] {
      mutate {
        add_field => [ "@src_ip", "%{[data][srcip]}" ]
    }
    }
    if [data][aws][sourceIPAddress] {
      mutate {
        add_field => [ "@src_ip", "%{[data][aws][sourceIPAddress]}" ]
      }
    }
    geoip {
      source => "@src_ip"
      target => "GeoLocation"
      fields => ["city_name", "country_name", "region_name", "location"]
    }
    date {
      match => ["timestamp", "ISO8601"]
      target => "@timestamp"
    }
    mutate {
      remove_field => [ "timestamp", "beat", "input_type", "tags", "count", "@version", "log", "offset", "type", "@src_ip", "host"]
    }
  }
}
output {
  if [@metadata][kafka][topic] == "wazuh-alerts" {
    file {
      path => "/var/log/greatlog.log"
    }
  }
}
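
Once events show up in that debug file, the same [@metadata][kafka][topic] conditional can route them to Elasticsearch instead (a sketch only; the index name here is an assumption and the credentials are placeholders). The metadata condition works in the output because decorate_events => true is set on the Kafka input, and @metadata is not affected by the filter's remove_field:

```conf
output {
  if [@metadata][kafka][topic] == "wazuh-alerts" {
    elasticsearch {
      hosts => ["helk-elasticsearch:9200"]
      # hypothetical index name; pick whatever naming scheme fits
      index => "wazuh-alerts-%{+YYYY.MM.dd}"
      user => "elastic"
      #password => "elasticpassword"
    }
  }
}
```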

If possible, see the HELK configuration here and the Wazuh Logstash configuration here. I have to use a Kafka topic to send the Wazuh alert logs to HELK, since HELK ingests through Kafka (I ship the Wazuh alerts log to Kafka with Filebeat). Now, how should I change the HELK configuration to make this work? Thanks.

0 votes
Original content provided by Stack Overflow: https://stackoverflow.com/questions/55878952