Environment
The setup uses a fully distributed Hadoop cluster; ZooKeeper is already installed.

```
192.168.2.241 hadoop01
192.168.2.242 hadoop02
192.168.2.243 hadoop03
```
Kafka installation
Download from the official site: https://kafka.apache.org/downloads
Run on all machines:

```shell
wget --no-check-certificate https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
useradd kafka
mkdir -p /opt/bigdata/kafka
tar -zxf kafka_2.13-3.2.0.tgz -C /opt/bigdata/kafka
cd /opt/bigdata/kafka/
ln -s kafka_2.13-3.2.0 current
chown -R kafka:kafka /opt/bigdata/kafka/
```
Configure the Kafka cluster (with ZooKeeper)

/opt/bigdata/kafka/current/config/server.properties:

```properties
# different on each node, e.g. 1, 2, 3
broker.id=1
# hadoop02:9092 and hadoop03:9092 on the other nodes
listeners=PLAINTEXT://hadoop01:9092
log.dirs=/opt/bigdata/kafka/current/logs
num.partitions=6
log.retention.hours=60
log.segment.bytes=1073741824
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
auto.create.topics.enable=true
delete.topic.enable=true
```
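Only two keys differ per node: `broker.id` and the listener host. A minimal sketch of rendering the per-node files from one template — the `__ID__`/`__HOST__` placeholders, template name, and output file names are made up for illustration, not part of the Kafka distribution:

```shell
# Sketch: render per-node server.properties variants from one template.
# Hostnames and IDs match the cluster above; only the differing keys are shown.
set -e
cat > server.properties.tmpl <<'EOF'
broker.id=__ID__
listeners=PLAINTEXT://__HOST__:9092
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
EOF
id=1
for host in hadoop01 hadoop02 hadoop03; do
  # substitute this node's broker id and listener host
  sed -e "s/__ID__/$id/" -e "s/__HOST__/$host/" server.properties.tmpl \
    > "server.properties.$host"
  id=$((id + 1))
done
```

Each generated file can then be copied to its node and merged with the shared settings.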
Start the brokers one by one:

```shell
$ cd /opt/bigdata/kafka/current
$ nohup bin/kafka-server-start.sh config/server.properties &
$ jps
21840 Kafka
```
Configure the Kafka cluster (without ZooKeeper, i.e. KRaft mode) — choose one of the two setups

/opt/bigdata/kafka/current/config/kraft/server.properties:

```properties
process.roles=broker,controller
# different on each node, e.g. 1, 2, 3
node.id=1
controller.quorum.voters=1@hadoop01:19091,2@hadoop02:19091,3@hadoop03:19091
listeners=PLAINTEXT://:9092,CONTROLLER://:19091
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/bigdata/kafka/current/logs/kraft-combined-logs
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=3000
```
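`controller.quorum.voters` follows the pattern `id@host:port`, one entry per controller, and each `id` must equal that node's `node.id`. A small sketch that builds the string from the host list used above:

```shell
# Build a controller.quorum.voters string from an ordered host list.
# Hosts and port 19091 are taken from the config above.
voters=""
id=1
for host in hadoop01 hadoop02 hadoop03; do
  # append ",": only when voters is already non-empty
  voters="${voters:+$voters,}${id}@${host}:19091"
  id=$((id + 1))
done
echo "$voters"   # prints 1@hadoop01:19091,2@hadoop02:19091,3@hadoop03:19091
```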
Start Kafka:

```shell
$ cd /opt/bigdata/kafka/current
$ ./bin/kafka-storage.sh random-uuid   # generate a cluster ID
xtzWWN4bTjitpL3kfd9s5g
$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c ./config/kraft/server.properties   # format the storage directory; run on all nodes
$ nohup ./bin/kafka-server-start.sh ./config/kraft/server.properties &   # start Kafka; run on all nodes
$ jps
55535 Kafka
```
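For context, the cluster ID printed by `kafka-storage.sh random-uuid` is a 128-bit value encoded as 22 characters of URL-safe base64, as in `xtzWWN4bTjitpL3kfd9s5g` above. A rough shell illustration of that shape — use the real tool when actually formatting:

```shell
# Illustration only: 16 random bytes as 22 chars of URL-safe base64,
# mimicking the shape of `kafka-storage.sh random-uuid` output.
cid=$(head -c 16 /dev/urandom | base64 | tr '+/' '-_' | cut -c1-22)
echo "$cid"
```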
Verification
Create topic testtopic with 3 replicas and 3 partitions:

```shell
$ ./kafka-topics.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --create --replication-factor 3 --partitions 3 --topic testtopic
Created topic testtopic.
```
List topics:

```shell
$ ./kafka-topics.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --list
testtopic
```
Show the details of topic testtopic:

```shell
$ ./kafka-topics.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --describe --topic testtopic
Topic: testtopic  TopicId: 1o25WxrxTtiswG0nkNf6gw  PartitionCount: 3  ReplicationFactor: 3  Configs: segment.bytes=1073741824
  Topic: testtopic  Partition: 0  Leader: 1  Replicas: 1,3,2  Isr: 1,3,2
  Topic: testtopic  Partition: 1  Leader: 2  Replicas: 2,1,3  Isr: 2,1,3
  Topic: testtopic  Partition: 2  Leader: 3  Replicas: 3,2,1  Isr: 3,2,1
```
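A useful health check on such `--describe` output is that every partition's Isr list equals its Replicas list, i.e. no replica is lagging. A small awk sketch over an inlined copy of the output above (inlined only to keep the demo self-contained):

```shell
# Count partitions whose ISR differs from the replica list (0 = fully in sync).
describe='Topic: testtopic  Partition: 0  Leader: 1  Replicas: 1,3,2  Isr: 1,3,2
Topic: testtopic  Partition: 1  Leader: 2  Replicas: 2,1,3  Isr: 2,1,3
Topic: testtopic  Partition: 2  Leader: 3  Replicas: 3,2,1  Isr: 3,2,1'
# field 8 is the Replicas value, field 10 the Isr value
out_of_sync=$(printf '%s\n' "$describe" \
  | awk '$3 == "Partition:" && $8 != $10 { n++ } END { print n + 0 }')
echo "$out_of_sync"   # prints 0
```

Piping the live `--describe` output through the same awk filter works identically.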
Produce messages:

```shell
$ ./kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testtopic
### don't type anything yet; enter the messages after the consumer has started
>hello world
>test kafka
>end kafka
```
Consume messages:

```shell
$ ./kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testtopic
hello world
test kafka
end kafka
```
Delete the topic:

```shell
$ ./kafka-topics.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --delete --topic testtopic
$ ./kafka-topics.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --list
__consumer_offsets
```
Syncing data between Kafka clusters
Environment
A new Kafka cluster plus one client node. The client is best run standalone (it could also be placed on the new Kafka cluster); here it runs on its own node.
On all nodes:

```shell
$ cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.2.171 kafka01
192.168.2.173 kafka03
192.168.2.174 kafka04
192.168.2.241 hadoop01
192.168.2.242 hadoop02
192.168.2.243 hadoop03
192.168.2.86  kafkaclient
```
Here kafka01/03/04 form the standby cluster and hadoop01/02/03 the primary cluster; both clusters are already running.
MirrorMaker configuration
/opt/bigdata/kafka/current/config/consumer.properties:

```properties
bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092
group.id=hadoop
enable.auto.commit=false
request.timeout.ms=180000
heartbeat.interval.ms=1000
session.timeout.ms=120000
max.poll.interval.ms=600000
max.poll.records=120000
auto.offset.reset=earliest
```

/opt/bigdata/kafka/current/config/producer.properties:

```properties
bootstrap.servers=kafka01:9092,kafka03:9092,kafka04:9092
acks=all
batch.size=16348
linger.ms=1
max.block.ms=9223372036854775807
compression.type=gzip
request.timeout.ms=90000
```
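The consumer settings above keep `heartbeat.interval.ms` far below `session.timeout.ms`; the Kafka consumer documentation recommends no more than one third of it. A quick self-check that pulls both values out of such a properties file (the demo file name is invented; point the `sed` lines at the real consumer.properties):

```shell
# Verify heartbeat.interval.ms <= session.timeout.ms / 3 in a properties file.
cat > consumer.properties.demo <<'EOF'
heartbeat.interval.ms=1000
session.timeout.ms=120000
EOF
hb=$(sed -n 's/^heartbeat.interval.ms=//p' consumer.properties.demo)
sess=$(sed -n 's/^session.timeout.ms=//p' consumer.properties.demo)
if [ "$hb" -le $((sess / 3)) ]; then echo OK; else echo "heartbeat too large"; fi
```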
Testing
- Start the MirrorMaker service
On kafkaclient:

```shell
cd /opt/bigdata/kafka/current
nohup bin/kafka-mirror-maker.sh --consumer.config ./config/consumer.properties --num.streams 16 --producer.config ./config/producer.properties --whitelist="mglogs*" &
```
--whitelist: a regular expression selecting the topics to mirror
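Note that the whitelist is a Java regular expression matched against the whole topic name, so `mglogs*` means "mglog followed by zero or more s", not "anything starting with mglogs". This can be approximated locally with `grep -Ex` (anchored extended regex):

```shell
# Which topic names does the whole-name regex 'mglogs*' select?
printf '%s\n' mglogs mglog mglogstream my_test test_topic | grep -Ex 'mglogs*'
# prints: mglogs and mglog; mglogstream does not match
```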
- Verify

```shell
[kafka@host86 bin]$ ./kafka-consumer-groups.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --describe --group hadoop
GROUP   TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                     HOST           CLIENT-ID
hadoop  mglogs  5          -               0               -    hadoop-13-8744d13f-d252-459a-b1fb-223f5007d41a  /192.168.2.86  hadoop-13
hadoop  mglogs  1          1               1               0    hadoop-1-b2ff1495-7c4c-4a49-b4a8-6a36b2873fa2   /192.168.2.86  hadoop-1
hadoop  mglogs  3          -               0               -    hadoop-11-76351435-b37f-4596-81ed-e8ce1a730715  /192.168.2.86  hadoop-11
hadoop  mglogs  0          2               2               0    hadoop-0-c70aa0b9-a2f1-417c-a4c6-dfbd621752f0   /192.168.2.86  hadoop-0
hadoop  mglogs  2          -               0               -    hadoop-10-7128d38d-1d31-45d9-9845-acc4831d2384  /192.168.2.86  hadoop-10
hadoop  mglogs  4          -               0               -    hadoop-12-11bac4a8-be82-4c1b-be3a-8e0231008c9c  /192.168.2.86  hadoop-12
[kafka@host86 bin]$ ./kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic mglogs --from-beginning
May 22 21:51:49 Installed: 2:nmap-ncat-6.40-19.el7.x86_64
May 22 21:51:49 Installed: 14:libpcap-1.5.3-13.el7_9.x86_64
hello world
^CProcessed a total of 3 messages
[kafka@host86 bin]$ ./kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka03:9092,kafka04:9092 --topic mglogs --from-beginning
hello world
May 22 21:51:49 Installed: 14:libpcap-1.5.3-13.el7_9.x86_64
May 22 21:51:49 Installed: 2:nmap-ncat-6.40-19.el7.x86_64
^CProcessed a total of 3 messages
[kafka@host86 bin]$ ./kafka-topics.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --list
__consumer_offsets
mglogs
my_test
test_topic
[kafka@host86 bin]$ ./kafka-topics.sh --bootstrap-server kafka01:9092,kafka03:9092,kafka04:9092 --list
__consumer_offsets
mglogs
[kafka@host86 bin]$ # only the mglogs topic was mirrored
```
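In the consumer-group output above, a LAG of 0 (or `-` for partitions that never received data) means the mirror has caught up. An awk sketch that sums the numeric LAG column over an inlined copy of that output (inlined only so the demo is self-contained):

```shell
# Sum the LAG column (field 6); '-' entries are skipped, i.e. count as zero.
groups_out='GROUP  TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
hadoop mglogs 5 - 0 -
hadoop mglogs 1 1 1 0
hadoop mglogs 3 - 0 -
hadoop mglogs 0 2 2 0
hadoop mglogs 2 - 0 -
hadoop mglogs 4 - 0 -'
total_lag=$(printf '%s\n' "$groups_out" \
  | awk 'NR > 1 && $6 ~ /^[0-9]+$/ { lag += $6 } END { print lag + 0 }')
echo "$total_lag"   # prints 0
```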
Notes
Data collection uses Filebeat shipping into Kafka; see the Filebeat chapter for details. That configuration begins with:

```yaml
filebeat.inputs:
```