Linking Site Launches to ELK Log Ingestion

Log processing flow in the ELK architecture:

1. When Filebeat is installed on a business host, it automatically picks up the domain of the site running on that host and its environment, adds channel and env fields, and uses the value of channel as the Kafka topic.

2. When Kafka receives messages from Filebeat for a topic it has not seen before, it auto-creates that topic and holds the messages for Logstash to consume.

3. The Logstash input and output configuration is generated automatically by a script (add_info.py, shown below).

The Kafka topic here must always match Filebeat's channel field.
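The channel/env derivation itself is not shown in this post; below is a minimal illustrative sketch of the kind of logic the Ansible-invoked Python script in step 1 performs. The subnet-to-environment map, the /data1/logs directory layout, and the helper names are assumptions for illustration, not the production script.

#!/usr/bin/env python3
# Illustrative sketch only: derive env/ip/channel the way step 1 describes.
# The subnet map and the /data1/logs layout are assumptions.
import ipaddress
import os
import socket

SUBNET_ENV = {
    "10.12.0.0/16": "prod",   # assumed production subnet
    "10.13.0.0/16": "dev",    # assumed development subnet
}

def detect_env(ip):
    """Map the host IP onto an environment via its subnet."""
    for net, env in SUBNET_ENV.items():
        if ipaddress.ip_address(ip) in ipaddress.ip_network(net):
            return env
    return "unknown"

def detect_channels(site_root="/data1/logs"):
    """Site directories are named after the domain, e.g. cms.prod.tarscorp.com."""
    return sorted(os.listdir(site_root))

ip = socket.gethostbyname(socket.gethostname())  # may need a NIC-specific lookup in practice
env = detect_env(ip)
for channel in detect_channels():
    print(env, ip, channel)  # the channel value doubles as the Kafka topic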

Example:

Filebeat layer:

- type: log
  processors:
  - add_fields:
      fields:
        env: "prod"         ## ansible调用Python根据网段信息自动判断生成
        ip: "10.12.11.27"   ## ansible调用Python根据网段信息自动判断生成
        apptype: "service"  ## ansible调用Python根据域名自动判断生成
        channel: "cms.prod.tarscorp.com"   ##ansible调用Python根据站点目录生成
  enabled: true
  paths:
    - /data1/logs/cms.prod.tarscorp.com/*.log

output.kafka:

  codec.json:
    pretty: true
    escape_html: false

  hosts: ["kafka1.mgt.tarscorp.com:9092", "kafka2.mgt.tarscorp.com:9092", "kafka3.mgt.tarscorp.com:9092"]

  topic: 'cms.prod.tarscorp.com'        ## topic and channel are taken from the same value
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
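Because topic and channel are taken from the same value, a quick sanity check of the rendered config can catch template drift. A sketch using PyYAML; the file path and the filebeat.inputs key assume a standard filebeat.yml layout rather than anything shown in this post:

#!/usr/bin/env python3
# Sketch: assert that output.kafka.topic equals the channel field injected
# by the add_fields processor. Path and key layout are assumptions.
import sys
import yaml

with open("/etc/filebeat/filebeat.yml", encoding="utf-8") as f:
    conf = yaml.safe_load(f)

topic = conf["output.kafka"]["topic"]
for inp in conf.get("filebeat.inputs", []):
    for proc in inp.get("processors", []):
        fields = proc.get("add_fields", {}).get("fields", {})
        channel = fields.get("channel")
        if channel and channel != topic:
            sys.exit("channel %r does not match kafka topic %r" % (channel, topic))
print("channel and topic are consistent")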

Kafka layer: nothing special here; a cluster with automatic topic creation enabled is sufficient.
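For reference, automatic topic creation is a broker-side setting; something like the following in server.properties (the partition and replication values are illustrative, not taken from this setup):

# server.properties (excerpt)
auto.create.topics.enable=true     # the first Filebeat message creates the topic
num.partitions=3                   # illustrative default for auto-created topics
default.replication.factor=2       # illustrative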

Logstash layer:

vim prod-input.conf    ## input configuration (a fragment; it lives inside an input {} wrapper, see the script below)

kafka {
      topics => "cms.prod.tarscorp.com"     ## kafka中的topic信息
      bootstrap_servers => "kafka1.mgt.tarscorp.com:9092,kafka2.mgt.tarscorp.com:9092,kafka3.mgt.tarscorp.com:9092"
      decorate_events => false
      group_id => "logstash-tars"
      # consumer_threads => 5
      client_id => "mgt-elk-logstash1-prod"
      codec => "json"
      add_field => {"topic"=>"cms.prod.tarscorp.com"}   ##这里主要是为了方便logstash做判断
   }

vim prod-javasite.conf    ## output configuration (a fragment; it lives inside an output {} wrapper)
if [topic] == "cms.prod.tarscorp.com" {
    elasticsearch {
        hosts => ["mgt-elk-esmaster1:9200", "mgt-elk-esmaster2:9200", "mgt-elk-esmaster3:9200"]
        manage_template => false
        index => "prod-javasite-%{+YYYY.MM.dd}"
    }
}

Note: our stack is Java Spring, so all Java sites land in the prod-javasite-%{+YYYY.MM.dd} index. Because there are so many configuration steps, a site launch works like this: a distribution host deploys the service first, then Ansible deploys Filebeat. During that run Ansible calls a Python script to detect the server's subnet and site information, and fills in the channel, topic, apptype, env, and ip fields through templates. The values are thus determined automatically, which cuts down the manual steps for operations; a sketch of the Ansible side follows below.
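A minimal sketch of what that Ansible play might look like. The helper script path, variable names, template file, and handler are assumptions; only the ansible.builtin modules are real:

# roles/filebeat/tasks/main.yml (hypothetical)
- name: detect env/ip/apptype/channel on the target host
  ansible.builtin.command: /opt/scripts/site_facts.py   # assumed helper, prints JSON
  register: site_facts

- name: expose the detected values as a fact
  ansible.builtin.set_fact:
    site_info: "{{ site_facts.stdout | from_json }}"

- name: render filebeat.yml from a template using the detected fields
  ansible.builtin.template:
    src: filebeat.yml.j2          # fills in env/ip/apptype/channel and the topic
    dest: /etc/filebeat/filebeat.yml
  notify: restart filebeat        # assumed handler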

Example of generating the configuration with the script:

./add_info.py --env prod --topic cms.prod.tarscorp.com --module javasite

vim add_info.py

#!/usr/bin/env python3

import os
import sys
import argparse

parser = argparse.ArgumentParser(description='Logstash configuration file add tool')

parser.add_argument('--env', type=str, required=True, help='environment name, e.g. prod')
parser.add_argument('--topic', type=str, required=True, help='Kafka topic name')
parser.add_argument('--module', type=str, required=True, help='module name, e.g. javasite')

args = parser.parse_args()

env_info = args.env
topic_name = args.topic
module_info = args.module
date = "%{+YYYY.MM.dd}"

# Kafka input block. The final "}" is the closing brace of the outer
# input {} wrapper created by init_input below.
template_input = '''
   kafka {
      topics => "%s"
      bootstrap_servers => "kafka1.mgt.tarscorp.com:9092,kafka2.mgt.tarscorp.com:9092,kafka3.mgt.tarscorp.com:9092"
      decorate_events => false
      group_id => "logstash-tars"
      # consumer_threads => 5
      client_id => "mgt-elk-logstash1-%s"
      codec => "json"
      add_field => {"topic"=>"%s"}
   }
}
''' % (topic_name, env_info, topic_name)

# Elasticsearch output block. The final "}" closes the outer output {} wrapper.
template_output = '''
    if [topic] == "%s" {
        elasticsearch {
            hosts => ["mgt-elk-esmaster1:9200", "mgt-elk-esmaster2:9200", "mgt-elk-esmaster3:9200"]
            manage_template => false
            index => "%s-%s-%s"
        }
    }
}
''' % (topic_name, env_info, module_info, date)

# Skeletons written on first run: just the opening of the wrapper block.
init_input = '''
input {

'''
init_output = '''
output {

'''

path_home = "/etc/logstash/conf.d/"
input_file = "/etc/logstash/conf.d/%s-input.conf" % env_info
output_file = "/etc/logstash/conf.d/%s-%s.conf" % (env_info, module_info)

if not os.path.exists(path_home):
    print('Run this script on the Logstash host')
    sys.exit(255)

if not os.path.exists(input_file):
    with open(input_file, mode='w', encoding='utf-8') as f:
        f.write(init_input)

if not os.path.exists(output_file):
    with open(output_file, mode='w', encoding='utf-8') as f:
        f.write(init_output)

# Append inside the wrapper: seek back over the last two bytes ("\n\n" right
# after init, "}\n" on every later run), then write the new block, which
# carries its own closing "}\n". Each run therefore replaces the closing
# brace with another block followed by a fresh closing brace.
with open(input_file, mode='rb+') as f:
    f.seek(-2, 2)
    f.write(template_input.encode('utf-8'))

with open(output_file, mode='rb+') as f:
    f.seek(-2, 2)
    f.write(template_output.encode('utf-8'))
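After running the command above once, /etc/logstash/conf.d/prod-input.conf contains the following; a second site in the same environment appends another kafka {} block in place of the closing brace:

input {

   kafka {
      topics => "cms.prod.tarscorp.com"
      bootstrap_servers => "kafka1.mgt.tarscorp.com:9092,kafka2.mgt.tarscorp.com:9092,kafka3.mgt.tarscorp.com:9092"
      decorate_events => false
      group_id => "logstash-tars"
      # consumer_threads => 5
      client_id => "mgt-elk-logstash1-prod"
      codec => "json"
      add_field => {"topic"=>"cms.prod.tarscorp.com"}
   }
}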