logstash 部署

环境信息

使用的 hadoop 完全分布式集群

1
2
3
192.168.2.241 hadoop01 
192.168.2.242 hadoop02
192.168.2.243 hadoop03

简介

Logstash 实现的功能主要分为接收数据、解析过滤并转换数据、输出数据三个部分。这三个部分对应的插件依次是

  1. input 插件,必选
  2. filter 插件,可选
  3. output 插件,必选

配置文件实例

1
2
3
4
5
6
7
8
9
input {
输入插件
}
filter {
过滤匹配插件
}
output {
输出插件
}

logstash 安装

官网 https://www.elastic.co/downloads/logstash

所有节点 root 用户

1
2
3
4
5
6
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.2.0-linux-x86_64.tar.gz

mkdir -p /opt/bigdata/logstash
tar -zxf logstash-8.2.0-linux-x86_64.tar.gz -C /opt/bigdata/logstash
cd /opt/bigdata/logstash/
ln -s logstash-8.2.0 current

简单测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ cd /opt/bigdata/logstash/current
$ bin/logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'

123 # 输入
{
"message" => "123 # 输入",
"host" => {
"hostname" => "hadoop01"
},
"event" => {
"original" => "123 # 输入"
},
"@timestamp" => 2022-05-22T11:14:26.267984Z,
"@version" => "1"
}
  1. -e : 执行的意思;
  2. input : 选择 stdin
  3. output : 选择 stdout, codec 是个插件, 表示输出的格式;rubydebug 是专门用来做测试的格式,一般用来在终端输出 JSON 格式。

也可以写入到配置文件,上述案例
/opt/bigdata/logstash/current/logstash-simple.conf

1
2
3
4
5
6
input { 
stdin { }
}
output {
stdout { codec => rubydebug }
}

启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ bin/logstash -f logstash-simple.conf

123 # 输入
{
"message" => "123 # 输入",
"@version" => "1",
"@timestamp" => 2022-05-22T11:22:06.980497Z,
"event" => {
"original" => "123 # 输入"
},
"host" => {
"hostname" => "hadoop02"
}
}

input 插件

配置示例

  1. Logstash 从文件读取数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    input {
    file {
    path => ["/var/log/secure"]
    type => "system"
    start_position => "beginning"
    }
    }
    output {
    stdout{
    codec=>rubydebug
    }
    }
  2. 从标准输入读取数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    input{
    stdin{
    add_field=>{"key"=>"ok"}
    tags=>["add field"]
    type=>"mytype"
    }
    }
    output {
    stdout{
    codec=>rubydebug
    }
    }

输入 hello world

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"host" =>{
"hostname" => "hadoop02"
},
"message" => "hello world",
"type" => "mytype",
"tags" => [
[0] "add field"
],
"key" => "ok",
"@timestamp" => 2022-05-22T11:25:56.419Z,
"@version" => "1"
}

  1. Logstash 从网络读取 TCP 数据
    修改 /opt/bigdata/logstash/current/logstash-tcp.conf
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    input {
    tcp {
    port => "5044"
    }
    }
    filter {
    grok {
    match => { "message" => "%{SYSLOGLINE}" }
    }
    }
    output {
    stdout{
    codec=>rubydebug
    }
    }

启动

1
bin/logstash -f logstash-tcp.conf

测试,另一个终端输入

1
nc 192.168.2.241  5044 < /var/log/secure

结果节选

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"event" => {
"original" => "May 22 07:49:28 hadoop01 sshd[3106]: Disconnected from 192.168.2.243 port 41710"
},
"timestamp" => "May 22 07:49:28",
"process" => {
"pid" => 3106,
"name" => "sshd"
},
"@version" => "1",
"host" => {
"hostname" => "hadoop01"
},
"message" => [
[0] "May 22 07:49:28 hadoop01 sshd[3106]: Disconnected from 192.168.2.243 port 41710",
[1] "Disconnected from 192.168.2.243 port 41710"
],
"@timestamp" => 2022-05-23T01:55:22.020103Z
}

编码插件 (Codec)

编码插件(Codec) 可以在 Logstash 中输入或输出时处理不同类型的数据,eg 前文的 rubydebug

Logstash 不只是一个 input → filter → output 的数据流,而是一个 input → decode → filter → encode → output 的数据流。

Codec 支持的编码格式常见的有 plain、json、json_lines 等

简单测试:

  1. plain
    1
    2
    3
    4
    5
    6
    7
    8
    9
    input{
    stdin {
    }
    }
    output{
    stdout {
    codec => "plain"
    }
    }

结果

1
2
hello world # 输入
2022-05-23T02:03:30.140800Z {hostname=hadoop01} hello world # 输入

  1. json
    1
    2
    3
    4
    5
    6
    7
    8
    9
    input {
    stdin {
    }
    }
    output {
    stdout {
    codec => json
    }
    }

结果

1
2
hello world # 输入
{"message":"hello world # 输入","@version":"1","host":{"hostname":"hadoop02"},"event":{"original":"hello world # 输入"},"@timestamp":"2022-05-23T02:04:49.242024Z"}

过滤器插件(Filter)

丰富的过滤器插件是 Logstash 功能强大的重要因素。名为过滤器,其实它提供的不单单是过滤器的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的事件到后续流程中。

需要时可以自行查找相关资料,当前用 这三个 插件处理数据

假设的日志格式

1
192.168.2.241 [16/Jun/2021:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

  1. Grok 正则捕获 : 删除掉 message 字段
  2. 时间处理 (Date)
  3. 数据修改(Mutate)

配置示例,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
input {
stdin {}
}
filter {
grok {
match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" } # %{语法: 语义},以以上格式收集数据
remove_field => [ "message", "event" ] # 删除掉 message 字段
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] # 收集然后转存到 @timestamp 字段里
}
mutate {
rename => { "response" => "response_new" } # 重命名字段
convert => [ "response","float" ] # 将 response 字段类型修改为 float
gsub => ["referrer","\"",""] # 将 referrer 字段中所有 "\" 字符替换为 ""。
remove_field => ["timestamp"] # 删掉 input 接收数据的时间,而不是日志时间
split => ["clientip", "."] # 将 ip 以 . 分为列表
}
}
output {
stdout {
codec => "rubydebug"
}
}

测试,转化后的日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"host" => {
"hostname" => "hadoop03"
},
"@timestamp" => 2021-06-16T08:24:19Z,
"referrer" => "GET / HTTP/1.1",
"response_new" => "403",
"clientip" => [
[0] "192",
[1] "168",
[2] "2",
[3] "241"
],
"@version" => "1",
"bytes" => "5039"
}

输出插件

一些常用的输出包括

  1. stout 一般只用来调试
  2. file,表示将日志数据写入磁盘上的文件;
  3. elasticsearch,表示将日志数据发送给 Elasticsearch,它可以高效方便和易于查询的保存数据。
  4. Logstash 还支持输出到 Nagios、HDFS、Email(发送邮件)和 Exec(调用命令执行)。
  1. 输出到 stout
    1
    2
    3
    4
    5
    6
    7
    8
    9
    input{
    stdin {
    }
    }
    output {
    stdout {
    codec => rubydebug
    }
    }

上面基本都用的这个模式

  1. 输出到文件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    input{
    stdin {
    }
    }
    output {
    file {
    path => "/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
    }
    }

启动后输入 hello world

查看

1
2
$ cat /data/log/2022-05-23/\{\"hostname\"\:\"hadoop03\"\}_02.log
{"host":{"hostname":"hadoop03"},"message":"hello world","@version":"1","@timestamp":"2022-05-23T02:20:57.849569Z","event":{"original":"hello world"}}

  1. 输出到 elasticsearch

参考 elasticsearch