目标
将kafka中的日志写入elasticsearch,并要求支持以下功能:
- 根据日志时间建立index,便于后续对index的管理
- 使用log_id作为document_id,保持写入es的幂等性
- 对建立的多个index, 设置相同的alias (业务方使用alias进行query,对具体index不感知)
- 由于日志时间为epoch_seconds,无法被es自动引用为时间,需要进行字段mapping
软件版本及日志格式
软件版本
kafka: 2.1.0
elasticsearch: 6.6.0
kakfa-connect-elasticsearch: v5.1.1
logstash: 6.6.0
日志格式
日志是以json格式存储在topic中,topic_name为logs
1 | { |
计算方案选型
kafka-connect-elasticsearch
kafka-connect-elasticseach是confluent(kafka团队的母公司)提供的kafka connect的一个plugin, 用于将kakfa的数据导入es
相关地址及参考链接如下:
logstash
logstash 提供kafka input plugin 和 elasticsearch output plugin, 也支持将kafka数据导入es
对比
kafka-connect-elasticsearch是基于kafka connect api实现的plugin,功能强在kafka端,目前该插件对elasticsearch的支持有限,而且迭代也较慢,而且目前部署对devops也不友好,不满足我们需要的功能
logstash是elasticsearch较早推出的收集及分发数据的组件,功能强在elasticsearch端,功能强大且较成熟,并且支持所需的所有功能。
因此,决定使用logstash来满足我们的需求
实现
任务分拆
- 支持按照日志创建index名称: 在output中指定index的命名方式
- 支持使用log_id作为es的document_id
- 支持设置别名
- 支持将日志时间戳mapping 为document的时间戳
要实现上述目标,只需要提供两个配置文件:
1 pipeline.yaml: 用于定义input fliter output等信息
2 index-template.json: 用于定义index模板,将index名与alias动态关联,并且创建mapping
pipeline.yaml
1 | # configuration for transformming events from kafka to elasicsearch |
解释:
- input: 配置了kafka相关的信息
- fliter: 由于index命名时所用的时间为logstash的元数据
@timestamp
, 因此需要在fliter
中使用date
插件,用日志时间覆写原始的@timestamp
(原始的@timestamp
是日志摄入时间,而非日志时间) - output: 配置了elasticsearch的信息,以及使用log_id作为document_id, index的命名、及index使用的模板路径
config/index-template.json
index-template.json
1 | { |
解释:
- index_patterns: 确定了该index模板作用的index范围
- mappings: 配置使用日志的
timestamp
字段,作为es中document的日期字段 - alias: 为新增的index,创建统一的别名
运行
下载logstash(current version = 6.6.0), 并将上述两个文件置于path/config
目录下, 然后运行如下命令:
1 | $ bin/logstash -f config/pipeline.yaml |
部署
logstash已经提供了官方镜像, 因此可以使用docker-compose或者kubernetes来部署logstash,此处不再赘述