代码之家  ›  专栏  ›  技术社区  ›  gavenant

Filebeat/Logstash多行系统日志解析

  •  0
  • gavenant  · 技术社区  · 6 年前

    我正在将系统日志解析到ELK堆栈中。 系统日志示例 Jul 19 10:47:21 host-abc systemd: Started myservice Jul 19 10:47:29 host-abc systemd: Started service. Jul 19 10:47:29 host-abc systemd: Starting service...

    理想的做法是将第2行和第3行聚合为一条消息,例如返回: Started Service. Starting service... 因此,我希望在合并行之前,时间戳、主机名和程序名匹配。

    1 回复  |  直到 6 年前
        1
  •  1
  •   Val    6 年前

    你可以使用 aggregate 过滤以达到你想要的。这个 聚合 filter支持基于公共字段值将多个日志行聚合为一个事件在你的例子中,公共领域是 @timestamp , hostname program_name .

    自从 syslog 输入已经正确地解析了syslog行,我们不需要摸索任何东西,因此我们可以利用 聚合 立即过滤。我们根据 SYSLOGBASE2 字段,它将包含最长为冒号字符的所有内容 : 是的。然后我们简单地收集所有消息,最后将这些消息连接成一个字符串事情是这样的:

    input {
      syslog {
        ...
      }
    }
    filter {
      aggregate {
        task_id => "%{SYSLOGBASE2}"
        code => "map['message'] ||= []; map['message'].push(event.get('message'));"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "user_id"
        timeout => 1 # 1 second timeout
        timeout_tags => ['_aggregatetimeout']
        timeout_code => "event.set('message', map['message'].join(' '))"
      }
    }
    output {
      ...
    }