Author: 刘金
Link: https://www.zhihu.com/question/276783606/answer/523437827
Source: Zhihu
Copyright belongs to the author. For commercial reproduction, please contact the author for authorization; for non-commercial reproduction, please credit the source.

Today I happened to set up a data pipeline of filebeat => kafka => dual Logstash => dual Elasticsearch, where each business's logs are written to a different Kafka topic and the Elasticsearch index name is the topic name plus the date (year.month.day). The Logstash config file is as follows:

input {

kafka {
    bootstrap_servers => ["xxxxx:9092"]
    topics_pattern => ".*"
    auto_offset_reset => "earliest"
    decorate_events => true
    codec => json {
        charset => "UTF-8"
    }
}

}
output {

elasticsearch {
    hosts => ["xxxx:9200"]
    index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
}

}

The key setting is decorate_events => true in the input block: it is what makes %{[@metadata][kafka][topic]} resolve in the output. Without it, you would have to use the mutate filter instead. The official documentation says:

Metadata fields

The following metadata from Kafka broker are added under the [@metadata] field:

- @metadata[topic]: Original Kafka topic from where the message was consumed.
- @metadata[consumer_group]: Consumer group
- @metadata[partition]: Partition info for this message.
- @metadata[offset]: Original record offset for this message.
- @metadata[key]: Record key, if any.
- @metadata[timestamp]: Timestamp when this message was received by the Kafka broker.

Please note that @metadata fields are not part of any of your events at output time. If you need this information to be inserted into your original event, you'll have to use the mutate filter to manually copy the required fields into your event.
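As the docs note, @metadata is dropped at output time, so if you also want the topic name stored inside the indexed document itself, a mutate filter can copy it into a regular field. A minimal sketch (the target field name kafka_topic is my own choice, not from the original):

filter {
    mutate {
        # Copy the Kafka topic out of the transient @metadata tree into a
        # regular event field so it is kept in the stored document.
        add_field => { "kafka_topic" => "%{[@metadata][kafka][topic]}" }
    }
}

This goes between the input and output blocks; the index naming in the output above still reads the metadata field directly and does not need this.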

Last modified: July 10, 2020, 6:34 PM