Kafka Introduction and Deployment

Environment

This uses the fully distributed Hadoop cluster from earlier; ZooKeeper is already installed on it.

192.168.2.241 hadoop01 
192.168.2.242 hadoop02
192.168.2.243 hadoop03

Kafka Installation

Official downloads page: https://kafka.apache.org/downloads

Run on all machines:

# download Kafka 3.2.0 built for Scala 2.13
wget --no-check-certificate https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz

# dedicated user and install directory
useradd kafka
mkdir -p /opt/bigdata/kafka
tar -zxf kafka_2.13-3.2.0.tgz -C /opt/bigdata/kafka
cd /opt/bigdata/kafka/
# the "current" symlink makes a future upgrade a one-line switch
ln -s kafka_2.13-3.2.0 current

chown -R kafka:kafka /opt/bigdata/kafka/
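To avoid repeating these steps by hand, a small loop can push the tarball to the other nodes and rerun the install there. A minimal sketch, assuming passwordless SSH as root to hadoop02 and hadoop03:

for host in hadoop02 hadoop03; do
  scp kafka_2.13-3.2.0.tgz ${host}:/root/
  ssh ${host} 'useradd kafka; mkdir -p /opt/bigdata/kafka;
    tar -zxf /root/kafka_2.13-3.2.0.tgz -C /opt/bigdata/kafka;
    cd /opt/bigdata/kafka && ln -s kafka_2.13-3.2.0 current;
    chown -R kafka:kafka /opt/bigdata/kafka/'
done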

Configure the Kafka cluster (with ZooKeeper; choose either this or the KRaft setup below)
/opt/bigdata/kafka/current/config/server.properties

# different on each node, e.g. 1, 2, 3
broker.id=1
# hadoop02:9092 / hadoop03:9092 on the other nodes; the note sits on its own
# line because inline "#" comments are not valid in a properties file
listeners=PLAINTEXT://hadoop01:9092
log.dirs=/opt/bigdata/kafka/current/logs
num.partitions=6
log.retention.hours=60
log.segment.bytes=1073741824
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
auto.create.topics.enable=true
delete.topic.enable=true
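Only broker.id and listeners differ between nodes, so both can be derived from the hostname. A sketch, assuming the hadoop01/02/03 naming used here:

CONF=/opt/bigdata/kafka/current/config/server.properties
ID=$(hostname | sed 's/hadoop0//')    # hadoop01 -> 1, hadoop02 -> 2, hadoop03 -> 3
sed -i "s/^broker.id=.*/broker.id=${ID}/" ${CONF}
sed -i "s#^listeners=.*#listeners=PLAINTEXT://$(hostname):9092#" ${CONF}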

Start each broker in turn

$ cd /opt/bigdata/kafka/current
$ nohup bin/kafka-server-start.sh config/server.properties &
$ jps
21840 Kafka
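Once all three brokers are up, their registrations can be checked with the ZooKeeper shell that ships with Kafka:

$ bin/zookeeper-shell.sh hadoop01:2181 ls /brokers/ids
# prints the registered broker ids, e.g. [1, 2, 3]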

Configure the Kafka cluster (without ZooKeeper, KRaft mode; choose either this or the ZooKeeper setup above)
/opt/bigdata/kafka/current/config/kraft/server.properties

process.roles=broker,controller
# different on each node, e.g. 1, 2, 3; must match the ids in controller.quorum.voters
node.id=1
controller.quorum.voters=1@hadoop01:19091,2@hadoop02:19091,3@hadoop03:19091
listeners=PLAINTEXT://:9092,CONTROLLER://:19091
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/bigdata/kafka/current/logs/kraft-combined-logs
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
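As in the ZooKeeper variant, node.id is the only per-node value, and it must match the id assigned to that host in controller.quorum.voters. The same hostname trick works here (a sketch under the hadoop01/02/03 naming):

ID=$(hostname | sed 's/hadoop0//')
sed -i "s/^node.id=.*/node.id=${ID}/" /opt/bigdata/kafka/current/config/kraft/server.properties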

Start Kafka

$ cd /opt/bigdata/kafka/current
$ ./bin/kafka-storage.sh random-uuid # generate a cluster ID
xtzWWN4bTjitpL3kfd9s5g
$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c ./config/kraft/server.properties # format the storage directory; run on every node

$ nohup ./bin/kafka-server-start.sh ./config/kraft/server.properties & # start Kafka; run on every node
$ jps
55535 Kafka
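On 3.2.0 the simplest health check is jps plus the server log. Kafka 3.3 and later also ship a dedicated quorum-inspection tool; shown here only as a pointer, since it is not in the 3.2.0 tarball:

$ ./bin/kafka-metadata-quorum.sh --bootstrap-server hadoop01:9092 describe --status   # Kafka 3.3+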

Verification

Create a topic testtopic with 3 replicas and 3 partitions

$ ./kafka-topics.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --create --replication-factor 3 --partitions 3 --topic testtopic

Created topic testtopic.

List topics

$ ./kafka-topics.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --list
testtopic

View details of topic testtopic

$  ./kafka-topics.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --describe --topic testtopic

Topic: testtopic TopicId: 1o25WxrxTtiswG0nkNf6gw PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: testtopic Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: testtopic Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: testtopic Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
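The describe output is also handy for watching fault tolerance: stop one broker and the Leader and Isr columns change accordingly. A sketch using the stop script that ships with Kafka:

$ ssh hadoop03 '/opt/bigdata/kafka/current/bin/kafka-server-stop.sh'
$ ./kafka-topics.sh --bootstrap-server hadoop01:9092,hadoop02:9092 --describe --topic testtopic
# partitions previously led by broker 3 get a new Leader and an Isr without 3;
# restart the broker and it rejoins the Isr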

Produce messages

./kafka-console-producer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testtopic # --bootstrap-server replaces the deprecated --broker-list

### don't type anything yet; enter the lines below once the consumer is running
>hello world
>test kafka
>end kafka

Consume messages

./kafka-console-consumer.sh  --bootstrap-server  hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testtopic

hello world
test kafka
end kafka
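The console consumer only receives messages produced after it starts, which is why the producer above waits for it. To replay everything already in the topic, add --from-beginning, optionally with a named group:

$ ./kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testtopic --from-beginning --group testgroup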

Delete the topic

$ ./kafka-topics.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --delete --topic testtopic
$ ./kafka-topics.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --list

__consumer_offsets

Syncing Data Between Kafka Clusters

Environment

Set up a new Kafka cluster plus a client node (ideally a standalone machine; it could also live inside the new cluster, but here it runs on its own node).

On all nodes:

$ cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.2.171 kafka01
192.168.2.173 kafka03
192.168.2.174 kafka04
192.168.2.241 hadoop01
192.168.2.242 hadoop02
192.168.2.243 hadoop03
192.168.2.86 kafkaclient

kafka01, kafka03 and kafka04 form the standby cluster, hadoop01/02/03 the primary cluster; both clusters are already running.

MirrorMaker Configuration

  1. /opt/bigdata/kafka/current/config/consumer.properties

    bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092
    group.id=hadoop
    enable.auto.commit=false
    request.timeout.ms=180000
    heartbeat.interval.ms=1000
    session.timeout.ms=120000
    max.poll.interval.ms=600000
    max.poll.records=120000
    auto.offset.reset=earliest
  2. /opt/bigdata/kafka/current/config/producer.properties

    bootstrap.servers=kafka01:9092,kafka03:9092,kafka04:9092
    acks=all
    batch.size=16384
    linger.ms=1
    max.block.ms=9223372036854775807
    compression.type=gzip
    request.timeout.ms=90000
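Before starting MirrorMaker, it is worth confirming from kafkaclient that both clusters are reachable:

$ ./kafka-topics.sh --bootstrap-server hadoop01:9092 --list   # primary cluster
$ ./kafka-topics.sh --bootstrap-server kafka01:9092 --list    # standby cluster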

Test

  1. Start the MirrorMaker service

On kafkaclient:

cd /opt/bigdata/kafka/current
nohup bin/kafka-mirror-maker.sh --consumer.config ./config/consumer.properties --num.streams 16 --producer.config ./config/producer.properties --whitelist="mglogs*" &

--whitelist: a Java regular expression selecting the topics to mirror. As a regex, "mglogs*" actually matches "mglog" followed by any number of trailing "s" characters; "mglogs.*" (or simply "mglogs") is the more precise pattern.
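nohup is fine for a quick test, but for a long-lived mirror a service unit is more robust. A hypothetical systemd unit (the path /etc/systemd/system/kafka-mirror-maker.service and the unit itself are assumptions, not part of the Kafka distribution):

[Unit]
Description=Kafka MirrorMaker (hadoop -> standby cluster)
After=network.target

[Service]
User=kafka
ExecStart=/opt/bigdata/kafka/current/bin/kafka-mirror-maker.sh \
  --consumer.config /opt/bigdata/kafka/current/config/consumer.properties \
  --producer.config /opt/bigdata/kafka/current/config/producer.properties \
  --num.streams 16 --whitelist "mglogs.*"
Restart=on-failure

[Install]
WantedBy=multi-user.target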

  2. Verification
    [kafka@host86 bin]$ ./kafka-consumer-groups.sh  --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --describe --group hadoop

    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    hadoop mglogs 5 - 0 - hadoop-13-8744d13f-d252-459a-b1fb-223f5007d41a /192.168.2.86 hadoop-13
    hadoop mglogs 1 1 1 0 hadoop-1-b2ff1495-7c4c-4a49-b4a8-6a36b2873fa2 /192.168.2.86 hadoop-1
    hadoop mglogs 3 - 0 - hadoop-11-76351435-b37f-4596-81ed-e8ce1a730715 /192.168.2.86 hadoop-11
    hadoop mglogs 0 2 2 0 hadoop-0-c70aa0b9-a2f1-417c-a4c6-dfbd621752f0 /192.168.2.86 hadoop-0
    hadoop mglogs 2 - 0 - hadoop-10-7128d38d-1d31-45d9-9845-acc4831d2384 /192.168.2.86 hadoop-10
    hadoop mglogs 4 - 0 - hadoop-12-11bac4a8-be82-4c1b-be3a-8e0231008c9c /192.168.2.86 hadoop-12
    [kafka@host86 bin]$ ./kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic mglogs --from-beginning
    May 22 21:51:49 Installed: 2:nmap-ncat-6.40-19.el7.x86_64
    May 22 21:51:49 Installed: 14:libpcap-1.5.3-13.el7_9.x86_64
    hello world
    ^CProcessed a total of 3 messages
    [kafka@host86 bin]$ ./kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka03:9092,kafka04:9092 --topic mglogs --from-beginning
    hello world
    May 22 21:51:49 Installed: 14:libpcap-1.5.3-13.el7_9.x86_64
    May 22 21:51:49 Installed: 2:nmap-ncat-6.40-19.el7.x86_64
    ^CProcessed a total of 3 messages
    [kafka@host86 bin]$ ./kafka-topics.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --list
    __consumer_offsets
    mglogs
    my_test
    test_topic

    [kafka@host86 bin]$ ./kafka-topics.sh --bootstrap-server kafka01:9092,kafka03:9092,kafka04:9092 --list
    __consumer_offsets
    mglogs
    [kafka@host86 bin]$ # only mglogs was mirrored

Notes
Log collection uses Filebeat shipping logs into Kafka; see the Filebeat chapter for details. The configuration used:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
  fields:
    log_topic: mglogs

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

name: "appserver1"

output.kafka:
  enabled: true
  hosts: ["hadoop01:9092", "hadoop02:9092", "hadoop03:9092"]
  version: "0.10"
  topic: '%{[fields][log_topic]}'
  codec.format.string: '%{[message]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000

processors:
- drop_fields:
    fields: ["input", "host", "agent.type", "agent.ephemeral_id", "agent.id", "agent.version", "ecs"]

logging.level: info
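Before starting Filebeat, its built-in self-tests can validate the configuration and the Kafka output (assuming the default package path /etc/filebeat/filebeat.yml):

$ filebeat test config -c /etc/filebeat/filebeat.yml
$ filebeat test output -c /etc/filebeat/filebeat.yml   # verifies the Kafka brokers are reachable
$ systemctl restart filebeat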