本篇文章主要介绍 Kafka 2.2.0 版本的单节点安装、启动及测试步骤,供初学者参考。测试系统为 CentOS 6.9,JDK 版本为 1.8.0_172。
第一步:下载安装包并上传到测试服务器
下载地址为官网地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.0/kafka_2.12-2.2.0.tgz。
下载后上传到测试服务器,这里上传到路径:/root/kafka下。
第二步:解压安装包
# tar -zxvf kafka_2.12-2.2.0.tgz
# cd kafka_2.12-2.2.0
第三步:启动 ZooKeeper 服务
Kafka 使用 ZooKeeper,如果 ZooKeeper 服务未启动,则需要先启动。如果没有 Zookeeper,则可以使用 kafka 安装包中自带的 ZooKeeper,直接使用便捷脚启动单节点的 ZooKeeper 实例即可。
# bin/zookeeper-server-start.sh config/zookeeper.properties
日志如下:
- [2018-10-27 00:44:48,746] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
- [2018-10-27 00:44:48,757] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
- ...
- [2018-10-27 00:44:48,803] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
- [2018-10-27 00:44:48,813] INFO Server environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 00:39 GMT (org.apache.zookeeper.server.ZooKeeperServer)
- ...
- [2018-10-27 00:44:48,900] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
- [2018-10-27 00:45:48,349] INFO Accepted socket connection from /0:0:0:0:0:0:0:1:33778 (org.apache.zookeeper.server.NIOServerCnxnFactory)
- ...
第四步:启动 Kafka 服务
ZooKeeper 服务启动后,可以启动 Kafka 服务了。
# bin/kafka-server-start.sh config/server.properties
日志如下:
- [2018-10-27 00:45:47,376] INFO Registered kafkakafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
- [2018-10-27 00:45:48,272] INFO starting (kafka.server.KafkaServer)
- [2018-10-27 00:45:48,273] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
- [2018-10-27 00:45:48,307] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
- ...
- [2018-10-27 00:45:48,391] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
- [2018-10-27 00:45:48,897] INFO Cluster ID = AHzg5svXSpeK7ktdBvgv2g (kafka.server.KafkaServer)
- ...
- [2018-10-27 00:45:49,135] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
- [2018-10-27 00:45:49,135] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
- [2018-10-27 00:45:49,137] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
- ...
- [2018-10-27 00:45:50,591] INFO Kafka version: 2.2.0 (org.apache.kafka.common.utils.AppInfoParser)
- [2018-10-27 00:45:50,591] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser)
- [2018-10-27 00:45:50,592] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
- [2018-10-27 00:55:50,374] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
第五步:创建一个主题
创建一个名为“test”的主题,只包含一个分区,只有一个副本,命令如下:
# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
第六步:查看创建的主题
查看 Kafka 中有哪些已创建的主题,可以用以下命令:
# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
输出结果:
test
第七步:发送消息到服务中
运行生产者,然后键入一些消息,发送到服务器。默认情况下,每行将作为单独的消息发送。
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
分别输入以下信息:
>This is a message
>This is another message
第八步:从服务中获取消息
运行消费者,从服务中获取已有的一些消息。
# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
输出结果:
This is a message
This is another message
再启动一个不同的终端作为生产者,用第七步的命令发送消息,则再消费者终端可以实时获取数据。
到此为止,单节点 Kafka 安装、启动及测试完成。