kafka教程-topic命令操作(windows)

命令位置

windows 系统命令在 kafka目录\bin\windows 目录下,后缀是 .bat
linux 系统命令在 kafka目录\bin 目录下,后缀是 .sh

查看topic列表

  1. .\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --list

解释:

  • --zookeeper localhost:2181:指定zookeeper(kafka需要zookeeper)
  • --list :列出所有topic

创建topic

  1. .\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 3 --topic userlog

解释:

  • --topic 定义 topic 名
  • --replication-factor 定义副本数,不能大于broker数量
  • --partitions 定义分区数

执行结果:

  1. Created topic userlog.

注意: kafka topic命名规则

再次执行查看topic列表:

  1. .\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --list

执行结果:

  1. userlog

查看logs文件夹

打开 kafka目录\logs 目录,发现有 userlog-0 文件夹,其中 0 表示 Partition分区号

查看zookeeper节点

启动zookeeper客户端,执行下面命令:

  1. ls /brokers/topics

发现节点下有 userlog/0

说明 topic 保存在zookeeper中

结论

  • 说明创建 topic 后,在kafka目录\logs 目录下会生成 文件夹,名字为:topic名-分区号

  • zookeeper的/brokers/topics 会增加 topic名/分区号 的节点

查看topic详情

  1. .\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --describe --topic userlog

执行结果:

  1. Topic: userlog2 PartitionCount: 3 ReplicationFactor: 1 Configs:
  2. Topic: userlog2 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
  3. Topic: userlog2 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
  4. Topic: userlog2 Partition: 2 Leader: 0 Replicas: 0 Isr: 0

第一行是概览信息:

  • PartitionCount: 1分区总数
  • ReplicationFactor: 1 副本数

下面共有3行,表示有一个broker:

  • Partition: 0 表示编号是0的 Partition,在 broker.id=0(在server.properties中配置)这个节点上
  • Leader: 0 表示编号是0的 Leader,在 broker.id=0(在server.properties中配置)这个节点上
  • Replicas: 0 表示编号是0的 Replicas,在 broker.id=0(在server.properties中配置)这个节点上
  • Isr: 0 副本都已同步的的节点集合

删除topic

执行删除命令

注意: windows下会报错

server.properties 配置文件增加下面配置:

  1. auto.create.topics.enable=false
  2. delete.topic.enable=true

执行下面命令

  1. .\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete --topic userlog

执行结果如下:

  1. Topic userlog is marked for deletion.
  2. Note: This will have no impact if delete.topic.enable is not set to true.

导致 kafka服务报错退出,且无法启动

需要在zookeeper中删除该主题节点

亲测可行

  1. 启动zookeeper客户端,删除 userlog 节点

    1. deleteall /brokers/topics/userlog
  2. 关闭kafka服务

  3. 删除 kafka目录\logs\userlog-0 目录

  4. 启动kafka服务

消费者命令

  1. .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic userlog

解释:

  • --bootstrap-server localhost:9092:指定kafka服务的ip、端口号
  • --topic userlog:指定topic

生产者命令

需要先启动消费者

  1. .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic userlog

解释:

  • --broker-list localhost:9092:指定kafka的ip、端口号
  • --topic userlog:指定topic

执行结果:

  1. >

继续输入:

  1. >test
  2. >aaa
  3. >bbb
  4. >ccc

消费者会显示:

  1. test
  2. aaa
  3. bbb
  4. ccc

后启动消费者,接收之前的消息

再次启动消费者:

  1. .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic userlog

将不会收到之前的消息

接收之前的消息,需要加参数 --from-beginning,如下:

  1. .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic userlog --from-beginning

这样启动才会接收之前的消息

__consumer_offsets文件

启动消费者后,在 kafka目录\logs 目录下 出现 __consumer_offsets-0__consumer_offsets-49 共50个文件夹

  • __consumer_offsets 是 kafka 自行创建的,和普通的 topic 相同。
  • 在 zookeeper 的 /brokers/topics 节点下
  • 这些文件大小约1G

每次消费者消费完一批数据后,需要标记这批数据被消费掉,需要将消费的偏移量即offset提交,也就是将 offset 信息写入 __consumer_offsets


原文出处:https://malaoshi.top/show_1IXLCafab5s.html