filebeat+kafka+logstash+elasticsearch+kibana

Environment

This walkthrough uses the fully distributed Hadoop cluster with three nodes:

192.168.2.241 hadoop01 
192.168.2.242 hadoop02
192.168.2.243 hadoop03

All components are already installed (see the earlier posts). Collecting nginx logs is used as the example.

1. Filebeat config: /opt/bigdata/filebeat/current/filebeat-nginx-to-kafka.yml
filebeat.inputs:
# Tail all nginx logs and tag each event with its target Kafka topic.
- type: log
  enabled: true
  paths:
    - /var/log/nginx/*.log
  fields:
    log_topic: nginxlogs

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 1

# Name of this shipper (defaults to the hostname).
name: "hadoop01"

setup.kibana:

output.kafka:
  enabled: true
  hosts: ["hadoop01:9092", "hadoop02:9092", "hadoop03:9092"]
  version: "0.10"
  # Route each event to the topic recorded in fields.log_topic above.
  topic: '%{[fields][log_topic]}'
  # Publish only the raw log line, not the full JSON event.
  codec.format.string: '%{[message]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  # The brokers' message.max.bytes must allow messages this large.
  max_message_bytes: 10000000

# Drop Beats metadata that is not needed downstream.
processors:
- drop_fields:
    fields: ["input", "host", "agent.type", "agent.ephemeral_id", "agent.id", "agent.version", "ecs"]

logging.level: info
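
Because codec.format.string ships the raw line unchanged, the nginx log format must match the grok pattern used later in Logstash: six fields joined by |~|. A minimal log_format sketch that would produce such lines is shown below; $arg_media_id and $arg_nginx_id are placeholder assumptions, substitute whatever your application actually logs for media_id and nginx_id.

# Hypothetical nginx log_format matching the |~|-delimited grok pattern.
log_format pipe_tilde '$time_iso8601|~|$remote_addr|~|$http_user_agent'
                      '|~|$http_referer|~|$arg_media_id|~|$arg_nginx_id';
access_log /var/log/nginx/access.log pipe_tilde;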

Start

cd /opt/bigdata/filebeat/current/
nohup ./filebeat -e -c filebeat-nginx-to-kafka.yml &
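
To confirm events are actually arriving in Kafka, tail the topic with the console consumer (the Kafka path below is an assumption based on the directory layout above; adjust to your install):

cd /opt/bigdata/kafka/current
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic nginxlogs --from-beginning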

2. Logstash config: /opt/bigdata/logstash/current/kafka_nginx_into_es.conf
input {
  kafka {
    bootstrap_servers => "hadoop01:9092,hadoop02:9092,hadoop03:9092"
    topics => ["nginxlogs"]
    # Tag events so the filter/output blocks can match on this pipeline.
    add_field => { "[@metadata][myid]" => "nginxlogs" }
  }
}

filter {
  if [@metadata][myid] == "nginxlogs" {
    # Split the |~|-delimited nginx line into named fields.
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:localtime}\|\~\|%{IPORHOST:clientip}\|\~\|(%{GREEDYDATA:http_user_agent})\|\~\|(%{DATA:http_referer})\|\~\|%{GREEDYDATA:media_id}\|\~\|%{GREEDYDATA:nginx_id}" }
    }
    # Use the log's own timestamp as the event @timestamp.
    date {
      match => ["localtime", "yyyy-MM-dd'T'HH:mm:ssZZ"]
      target => "@timestamp"
    }
    # Drop intermediate fields once they have been consumed.
    mutate {
      remove_field => ["@version", "message", "localtime"]
    }
  }
}

output {
  if [@metadata][myid] == "nginxlogs" {
    elasticsearch {
      hosts => ["hadoop01:9200","hadoop02:9200","hadoop03:9200"]
      # One index per day, named after the event's @timestamp.
      index => "nginxlogs-%{+YYYY.MM.dd}"
    }
  }
}
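
To illustrate the filter chain: for a hypothetical input line (all values invented), grok splits on |~|, date parses localtime into @timestamp (converted to UTC), and mutate removes the leftovers, yielding roughly:

# Hypothetical input line:
2024-05-01T10:15:30+08:00|~|192.168.2.50|~|Mozilla/5.0|~|http://example.com/|~|m123|~|n01

# Approximate document indexed into nginxlogs-2024.05.01:
{
  "@timestamp": "2024-05-01T02:15:30.000Z",
  "clientip": "192.168.2.50",
  "http_user_agent": "Mozilla/5.0",
  "http_referer": "http://example.com/",
  "media_id": "m123",
  "nginx_id": "n01"
}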

Start

cd /opt/bigdata/logstash/current/
nohup bin/logstash -f kafka_nginx_into_es.conf &
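
Once Logstash is running, check from any node that the daily index exists and is receiving documents:

curl -s 'http://hadoop01:9200/_cat/indices/nginxlogs-*?v'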

Verify

Use Kibana: create an index pattern matching the new indices (nginxlogs-*); the wizard takes two steps (define the pattern, then choose @timestamp as the time field). Then browse the incoming events on the Discover page.
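
To double-check the parsed fields without Kibana, fetch a document straight from Elasticsearch:

curl -s 'http://hadoop01:9200/nginxlogs-*/_search?size=1&pretty'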