在上一篇中我们基本上完成了 ELK 和 Kafka 环境的安装,并且也通过几个简单的例子入门。现在我们就把搭建好的架构中加入 Kakfa 作为缓冲区。再来说一下,首先 Logstash 从日志源读取日志并且存储到 Kafka,然后 Logstash 再从 Kafka 中读取日志存储到 Elasticsearch。所以我们需要两步骤。
Logstash -> Kafka
Logstash 会直接把日志发送给 Elasticsearch,再由 Kibana 进行展示。因为因为 Logstash 会同步把日志传输到 Elasticsearch ,一旦 ElasticSearch 挂掉数据就有可能会丢失。于是,我们考虑利用 Kafka 作为缓冲区,让 Logstash 不受 Elasticsearch 的影响,第一步就是让 Logstash 把日志发送到 Kafka,这里 Logstash 相当于 Producer。直接来看看 Logstash 的配置文件:
input { file { path => ["/var/log/laravel/storage/logs/*.log"] }}filter { grok { match => { "message" => "\[%{TIMESTAMP_ISO8601:logtime}\] %{WORD:env}\.%{LOGLEVEL:level}\: %{GREEDYDATA:msg}" } }}output { kafka { bootstrap_servers => "kafka:9092" topic_id => "laravellog" }}
这里是用来读取 Laravel 项目的日志文件,我们在 input 和 output 中间加入了一个 filter,这是 Logstash的插件,用户格式化读取进来的数据。一般情况下,Laravel的日志文件大概是这样:
[2017-12-05 17:45:07] production.ERROR: 报错接口 {"api":"/admin/sales"}
分为几个部分,分别是日志的记录时间,产生日志的环境,日志的级别,日志的信息以及额外数据。所以我们进行了一个格式化,最后可以让他以 JSON 的形式存储到 Elasticsearch,默认没有 filter 的情况是直接一行存储进去。格式化后的数据就是这样的(部分):
{ "msg": "接口参数 {\"params\":[]} ", "path": "/var/log/fenyong/storage/logs/laravel-2017-12-05.log", "level": "ERROR", "env": "local", "logtime": "2017-12-05 17:54:50" }
Kafka -> Elasticsearch
利用 Logstash 从 Kafka 读取数据然后存储到 Elasticsearch,这里 Logstash 作为 Consumer,唯一需要注意的地方是要保证 Topic 的名称一致。
input { kafka { bootstrap_servers => "kafka:9092" topics => ["laravellog"] }}output { elasticsearch { hosts => "elasticsearch:9200" index => "laravellog" user => "elastic" password => "changeme" }}
这样我们就完成了从 Logstash 到 Kafka 再到 Elasticsearch 的日志存储,接下来就可以用 Kibana 来展示数据了。
至此,我们就成功把 Kafka 加入到日志分析平台的架构中。