你可以使用
aggregate
过滤以达到你想要的。这个
聚合
filter支持基于公共字段值将多个日志行聚合为一个事件在你的例子中,公共领域是
@timestamp
,
hostname
和
program_name
.
自从
syslog
输入已经正确地解析了syslog行,我们不需要摸索任何东西,因此我们可以利用
聚合
立即过滤。我们根据
SYSLOGBASE2
字段,它将包含最长为冒号字符的所有内容
:
是的。然后我们简单地收集所有消息,最后将这些消息连接成一个字符串事情是这样的:
input {
syslog {
...
}
}
filter {
aggregate {
task_id => "%{SYSLOGBASE2}"
code => "map['message'] ||= []; map['message'].push(event.get('message'));"
push_map_as_event_on_timeout => true
timeout_task_id_field => "user_id"
timeout => 1 # 1 second timeout
timeout_tags => ['_aggregatetimeout']
timeout_code => "event.set('message', map['message'].join(' '))"
}
}
output {
...
}