Logstash v7.3 处理基于json filter的日志处理与转换

利用Logstash 提供的filter机制,可以方便地按照用户要求解析日志信息。如果需要抽取、分析、计算可能蕴含的元数据信息,可以利用Logstash 中的filter 机制完成相应的功能。

Logstash v7.3的安装,可以参考如下文章:

Logstash 7.3 概述、原理、安装、配置以及实际应用演示

Logstash v7.3 处理基于json filter的日志处理与转换

一、基于json filter的日志处理与转换

如果日志数据源是json格式,则利用json filter,可以解析json 数据。

Logstash v7.3 处理基于json filter的日志处理与转换

如下是基于json filter的Logstash 配置文件,文件名为 filter-demo.conf。

input {

stdin{}

}

filter {

json {

source=>"message"

}

}

output {

stdout { codec => "rubydebug"}

}

在input、output 中间的filter 部分嵌入了对json 数据的解析方法,其中的source => 是指让 json filter 插件解析指定字段的json 格式数据。

sournce=>"message" 是让 json filter 解析message 字段的数据。

启动logstash,并指定配置文件:

Logstash v7.3 处理基于json filter的日志处理与转换

当我们输入如下json 数据后,Logstash的filter 会对输入信息进行加工处理,经过处理后返回的结果如下:

{"name":"rickie", "age":16}

{

"host" => "Thinkpad-T460P",

"message" => "{\"name\":\"rickie\", \"age\":16}\r",

"@timestamp" => 2019-10-04T05:48:21.122Z,

"@version" => "1",

"name" => "rickie",

"age" => 16

}

其中,name和age是解析出来的结构化数据。

对比一下,没有配置json filter 插件的情况。

如下是没有基于json filter的Logstash 配置文件,文件名为 no-filter-demo.conf。

input {

stdin{}

}

output {

stdout { codec => "rubydebug"}

}

启动logstash,并指定配置文件:

Logstash v7.3 处理基于json filter的日志处理与转换

当我们输入如下json 数据后,Logstash 处理后返回的结果如下:

{"name":"rickie", "age":16}

{

"@version" => "1",

"@timestamp" => 2019-10-04T05:53:26.708Z,

"host" => "Thinkpad-T460P",

"message" => "{\"name\":\"rickie\", \"age\":16}\r"

}

没有前面 json filter 处理后的name、age节点信息。

二、将处理后的日志通过TCP输出

前面的输出通过stdout{} 标准输出,这里演示Logstash 将处理后的日志从TCP Socket 中输出。

Logstash 配置文件,输出中添加tcp 输出,文件名为 tcp-output.conf:

input {

stdin{}

}

filter {

json {

source=>"message"

}

}

output {

stdout { codec => "rubydebug"}

tcp {

host=>"127.0.0.1"

port=>6000

mode=>"client"

}

}

设计一个Java应用程序作为TCP Server端,接收来自Logstash output输出的日志信息。

package com.rickie.Output;

import java.io.IOException;

import java.io.InputStream;

import java.net.ServerSocket;

import java.net.Socket;

public class TCPServer_output {

public static void main(String[] args) throws IOException {

ServerSocket serverSocket = new ServerSocket(6000);

// 阻塞等待消息

Socket socket = serverSocket.accept();

InputStream inputStream = socket.getInputStream();

while (true) {

byte[] buf = new byte[1024];

int len = inputStream.read(buf);

System.out.println(new String(buf, 0, len));

}

}

}

TCP 端口号和Logstash 中配送的port 需要保持一致。

依次启动Java应用程序、Logstash:

(1)启动Java应用程序(TCP Server)

Logstash v7.3 处理基于json filter的日志处理与转换

(2)启动Logstash,并设置配置文件

bin\logstash -f ..\logstash-conf\tcp-output.conf

Logstash v7.3 处理基于json filter的日志处理与转换

输入:{"name":"rickie", "age":16}

在TCP Server端输出的内容:

Logstash v7.3 处理基于json filter的日志处理与转换

相关推荐