命令位置
windows 系统命令在 kafka目录\bin\windows
目录下,后缀是 .bat
linux 系统命令在 kafka目录\bin
目录下,后缀是 .sh
查看topic列表
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --list
解释:
--zookeeper localhost:2181
:指定zookeeper(kafka需要zookeeper)--list
:列出所有topic
创建topic
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 3 --topic userlog
解释:
--topic
定义 topic 名--replication-factor
定义副本数,不能大于broker数量--partitions
定义分区数
执行结果:
Created topic userlog.
注意: kafka topic命名规则
再次执行查看topic列表:
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --list
执行结果:
userlog
查看logs文件夹
打开 kafka目录\logs
目录,发现有 userlog-0
文件夹,其中 0
表示 Partition分区号
查看zookeeper节点
启动zookeeper客户端,执行下面命令:
ls /brokers/topics
发现节点下有 userlog/0
说明 topic 保存在zookeeper中
结论
说明创建 topic 后,在
kafka目录\logs
目录下会生成 文件夹,名字为:topic名-分区号
zookeeper的
/brokers/topics
会增加topic名/分区号
的节点
查看topic详情
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --describe --topic userlog
执行结果:
Topic: userlog2 PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: userlog2 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: userlog2 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: userlog2 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
第一行是概览信息:
PartitionCount: 1
分区总数ReplicationFactor: 1
副本数
下面共有3行,表示有一个broker:
Partition: 0
表示编号是0的 Partition,在broker.id=0
(在server.properties中配置)这个节点上Leader: 0
表示编号是0的 Leader,在broker.id=0
(在server.properties中配置)这个节点上Replicas: 0
表示编号是0的 Replicas,在broker.id=0
(在server.properties中配置)这个节点上Isr: 0
副本都已同步的的节点集合
删除topic
执行删除命令
注意: windows下会报错
在 server.properties
配置文件增加下面配置:
auto.create.topics.enable=false
delete.topic.enable=true
执行下面命令
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete --topic userlog
执行结果如下:
Topic userlog is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
导致 kafka服务报错退出,且无法启动
需要在zookeeper中删除该主题节点
亲测可行
启动zookeeper客户端,删除
userlog
节点deleteall /brokers/topics/userlog
关闭kafka服务
删除
kafka目录\logs\userlog-0
目录启动kafka服务
消费者命令
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic userlog
解释:
--bootstrap-server localhost:9092
:指定kafka服务的ip、端口号--topic userlog
:指定topic
生产者命令
需要先启动消费者
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic userlog
解释:
--broker-list localhost:9092
:指定kafka的ip、端口号--topic userlog
:指定topic
执行结果:
>
继续输入:
>test
>aaa
>bbb
>ccc
消费者会显示:
test
aaa
bbb
ccc
后启动消费者,接收之前的消息
再次启动消费者:
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic userlog
将不会收到之前的消息
接收之前的消息,需要加参数 --from-beginning
,如下:
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic userlog --from-beginning
这样启动才会接收之前的消息
__consumer_offsets文件
启动消费者后,在 kafka目录\logs
目录下 出现 __consumer_offsets-0
到 __consumer_offsets-49
共50个文件夹
__consumer_offsets
是 kafka 自行创建的,和普通的 topic 相同。- 在 zookeeper 的
/brokers/topics
节点下 - 这些文件大小约1G
每次消费者消费完一批数据后,需要标记这批数据被消费掉,需要将消费的偏移量即offset提交,也就是将 offset 信息写入 __consumer_offsets