环境信息
使用的 hadoop 完全分布式集群
1 | 192.168.2.241 hadoop01 |
简介
Logstash 实现的功能主要分为接收数据、解析过滤并转换数据、输出数据三个部分。这三个部分对应的插件依次是
- input 插件,必选
- filter 插件,可选
- output 插件,必选
配置文件实例
1 | input { |
logstash 安装
官网 https://www.elastic.co/downloads/logstash
所有节点 root 用户1
2
3
4
5
6wget https://artifacts.elastic.co/downloads/logstash/logstash-8.2.0-linux-x86_64.tar.gz
mkdir -p /opt/bigdata/logstash
tar -zxf logstash-8.2.0-linux-x86_64.tar.gz -C /opt/bigdata/logstash
cd /opt/bigdata/logstash/
ln -s logstash-8.2.0 current
简单测试
1 | $ cd /opt/bigdata/logstash/current |
- -e : 执行的意思;
- input : 选择 stdin
- output : 选择 stdout, codec 是个插件, 表示输出的格式;rubydebug 是专门用来做测试的格式,一般用来在终端输出 JSON 格式。
也可以写入到配置文件,上述案例
/opt/bigdata/logstash/current/logstash-simple.conf1
2
3
4
5
6input {
stdin { }
}
output {
stdout { codec => rubydebug }
}
启动
1 | $ bin/logstash -f logstash-simple.conf |
input 插件
配置示例
Logstash 从文件读取数据
1
2
3
4
5
6
7
8
9
10
11
12input {
file {
path => ["/var/log/secure"]
type => "system"
start_position => "beginning"
}
}
output {
stdout{
codec=>rubydebug
}
}从标准输入读取数据
1
2
3
4
5
6
7
8
9
10
11
12input{
stdin{
add_field=>{"key"=>"ok"}
tags=>["add field"]
type=>"mytype"
}
}
output {
stdout{
codec=>rubydebug
}
}
输入 hello world
结果1
2
3
4
5
6
7
8
9
10
11
12
13{
"host" =>{
"hostname" => "hadoop02"
},
"message" => "hello world",
"type" => "mytype",
"tags" => [
[0] "add field"
],
"key" => "ok",
"@timestamp" => 2022-05-22T11:25:56.419Z,
"@version" => "1"
}
- Logstash 从网络读取 TCP 数据
修改 /opt/bigdata/logstash/current/logstash-tcp.conf1
2
3
4
5
6
7
8
9
10
11
12
13
14
15input {
tcp {
port => "5044"
}
}
filter {
grok {
match => { "message" => "%{SYSLOGLINE}" }
}
}
output {
stdout{
codec=>rubydebug
}
}
启动1
bin/logstash -f logstash-tcp.conf
测试,另一个终端输入1
nc 192.168.2.241 5044 < /var/log/secure
结果节选1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19{
"event" => {
"original" => "May 22 07:49:28 hadoop01 sshd[3106]: Disconnected from 192.168.2.243 port 41710"
},
"timestamp" => "May 22 07:49:28",
"process" => {
"pid" => 3106,
"name" => "sshd"
},
"@version" => "1",
"host" => {
"hostname" => "hadoop01"
},
"message" => [
[0] "May 22 07:49:28 hadoop01 sshd[3106]: Disconnected from 192.168.2.243 port 41710",
[1] "Disconnected from 192.168.2.243 port 41710"
],
"@timestamp" => 2022-05-23T01:55:22.020103Z
}
编码插件 (Codec)
编码插件(Codec) 可以在 Logstash 中输入或输出时处理不同类型的数据,eg 前文的 rubydebug
Logstash 不只是一个 input → filter → output 的数据流,而是一个 input → decode → filter → encode → output 的数据流。
Codec 支持的编码格式常见的有 plain、json、json_lines 等
简单测试:
- plain
1
2
3
4
5
6
7
8
9input{
stdin {
}
}
output{
stdout {
codec => "plain"
}
}
结果1
2hello world # 输入
2022-05-23T02:03:30.140800Z {hostname=hadoop01} hello world # 输入
- json
1
2
3
4
5
6
7
8
9input {
stdin {
}
}
output {
stdout {
codec => json
}
}
结果1
2hello world # 输入
{"message":"hello world # 输入","@version":"1","host":{"hostname":"hadoop02"},"event":{"original":"hello world # 输入"},"@timestamp":"2022-05-23T02:04:49.242024Z"}
过滤器插件(Filter)
丰富的过滤器插件是 Logstash 功能强大的重要因素。名为过滤器,其实它提供的不单单是过滤器的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的事件到后续流程中。
需要时可以自行查找相关资料,当前用 这三个 插件处理数据
假设的日志格式1
192.168.2.241 [16/Jun/2021:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
- Grok 正则捕获 : 删除掉 message 字段
- 时间处理 (Date)
- 数据修改(Mutate)
配置示例,1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24input {
stdin {}
}
filter {
grok {
match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" } # %{语法: 语义},以以上格式收集数据
remove_field => [ "message", "event" ] # 删除掉 message 字段
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] # 收集然后转存到 @timestamp 字段里
}
mutate {
rename => { "response" => "response_new" } # 重命名字段
convert => [ "response","float" ] # 将 response 字段类型修改为 float
gsub => ["referrer","\"",""] # 将 referrer 字段中所有 "\" 字符替换为 ""。
remove_field => ["timestamp"] # 删掉 input 接收数据的时间,而不是日志时间
split => ["clientip", "."] # 将 ip 以 . 分为列表
}
}
output {
stdout {
codec => "rubydebug"
}
}
测试,转化后的日志
1 | { |
输出插件
一些常用的输出包括
- stout 一般只用来调试
- file,表示将日志数据写入磁盘上的文件;
- elasticsearch,表示将日志数据发送给 Elasticsearch,它可以高效方便和易于查询的保存数据。
- Logstash 还支持输出到 Nagios、HDFS、Email(发送邮件)和 Exec(调用命令执行)。
- 输出到 stout
1
2
3
4
5
6
7
8
9input{
stdin {
}
}
output {
stdout {
codec => rubydebug
}
}
上面基本都用的这个模式
- 输出到文件
1
2
3
4
5
6
7
8
9input{
stdin {
}
}
output {
file {
path => "/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
}
}
启动后输入 hello world
查看1
2$ cat /data/log/2022-05-23/\{\"hostname\"\:\"hadoop03\"\}_02.log
{"host":{"hostname":"hadoop03"},"message":"hello world","@version":"1","@timestamp":"2022-05-23T02:20:57.849569Z","event":{"original":"hello world"}}
- 输出到 elasticsearch
参考 elasticsearch