在前一篇文章《大数据学习初级入门教程(三) —— Kafka 2.2.0 单节点的安装、启动和测试》中,简要说明了在一台测试服务器上如何安装单节点 Kafka 并做了消息发送和获取的测试,这篇主要说明一下在一台机器上如何配置多个节点,需要在前一篇文章操作后的基础上,搭建 Kafka 伪分布式集群,这里配置 3 个代理节点。
第一步:创建配置文件
为每个代理节点创建一个配置文件,可以直接复制已有的配置文件,命令如下:
# cp config/server.properties config/server2.properties
# cp config/server.properties config/server3.properties
第二步:编辑配置文件
vi config/server2.properties
修改其中的部分配置,如下:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-2
vi config/server3.properties
修改其中的部分配置,如下:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-3
第三步:启动服务
像启动单节点 Kafka 服务一样,启动各个服务。
# bin/zookeeper-server-start.sh config/zookeeper.properties &
# bin/kafka-server-start.sh config/server.properties &
# bin/kafka-server-start.sh config/server2.properties &
# bin/kafka-server-start.sh config/server3.properties &
第四步:创建一个主题
这次创建一个复制因子为 3 的新主题:
# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic test3
第五步:查看创建的主题
查看 Kafka 中有哪些已创建的主题,可以用以下命令:
# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
输出结果:
test
test3
第六步:查看集群描述信息
通过上面的步骤,目前已经搭建了一个有 3 个代理节点的伪分布式集群,如何知道代理正在做什么呢?可以通过“describe topics”命令查看运行的集群信息:
# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test3
输出信息如下:
Topic:test3 PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: test3 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
行信息给出了所有分区的摘要,每个附加行提供有关一个分区的信息。由于此主题只有一个分区,因此只有一行。
- “leader”是负责给定分区的所有读写的节点。每个节点将成为随机选择的分区部分的领导者。
- “replicas”是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。
- “isr”是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。
第七步:发送消息到服务中
运行生产者,然后键入一些消息,发送到服务器。默认情况下,每行将作为单独的消息发送。
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test3
输入信息如下:
>my test message 1
>
第八步:从服务中获取消息
运行消费者,从服务中获取已有的一些消息。
# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --from-beginning
输出信息:
my test message 1
第九步:测试集群容错性
从第六步中可以看到,第三个代理节点是集群的 Leader,因为第三个代理节点的配置文件中配置的 broker.id=2。这里杀掉该节点的进程:
# ps aux | grep server3.properties
进程信息如下:
root 2651 2.4 20.1 4733556 202656 pts/3 Sl 02:03 0:32 java ...
# kill -9 2651
第十步:查看新 Leader
# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test3
输出信息如下:
Topic:test3 PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: test3 Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0
可以看到集群的 Leader 变为第二个代理节点。也可以再次运行第八步中的消费命令,可以看到数据还是可以正常消费的。
# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --from-beginning
输出信息如下:
my test message 1
到此为止,伪分布式 Kafka 集群的安装、配置、启动及测试完成。