Common Kafka Commands

List topics

kafka-topics.sh --bootstrap-server xxx:9092 --list

Create a topic

kafka-topics.sh --create --bootstrap-server xxx:9092 --replication-factor 3 --partitions 3 --topic <topic-name>

Describe a specific topic

kafka-topics.sh --bootstrap-server xxx:9092 --describe --topic <topic-name>

Delete a specific topic

kafka-topics.sh --bootstrap-server xxx:9092 --delete --topic <topic-name>

Increase the number of partitions of a topic (the partition count can only be increased, never decreased)

kafka-topics.sh --bootstrap-server xxx:9092 --topic <topic-name> --alter --partitions 30

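Change a topic's retention time

retention.ms is in milliseconds; the value used here, 8640000, is 2.4 hours (one day would be 86400000). On newer Kafka versions, where ZooKeeper access from this tool is deprecated or unavailable, pass --bootstrap-server xxx:9092 instead of --zookeeper xxx:2181.

./kafka-configs.sh --zookeeper xxx:2181 --alter --entity-name <topic-name> --entity-type topics --add-config retention.ms=8640000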
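
Since retention.ms takes milliseconds, it is easy to be off by a factor of ten when typing the value by hand. A minimal sketch of the conversion, assuming a helper named days_to_ms (hypothetical, not part of Kafka):

```shell
# Hypothetical helper: convert a whole number of days to milliseconds,
# suitable as a retention.ms value.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

days_to_ms 1   # prints 86400000
days_to_ms 7   # prints 604800000
```

The result can be substituted directly, for example --add-config retention.ms=$(days_to_ms 1).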

Increase the replica count of each partition of a topic

Write the desired replica assignment to a JSON file:

vim xxx.json
{"version":1,
"partitions":[
{"topic":"<topic-name>","partition":0,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":1,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":2,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":3,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":4,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":5,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":6,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":7,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":8,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":9,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":10,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":11,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":12,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":13,"replicas":[0,1,2]},
{"topic":"<topic-name>","partition":14,"replicas":[0,1,2]}
]}
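
Then apply it (on newer Kafka versions that no longer accept --zookeeper, use --bootstrap-server xxx:9092 instead):

kafka-reassign-partitions.sh --zookeeper xxx:2181 --reassignment-json-file xxx.json --execute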
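
Writing one line per partition by hand gets tedious for larger topics. The following is a rough sketch of generating the same JSON layout; gen_reassignment is a hypothetical helper name, and the topic name, partition count, and broker ids in the example call are illustrative assumptions:

```shell
# Hypothetical helper: print a reassignment JSON that gives every partition
# of a topic the same replica list.
#   $1 = topic name, $2 = number of partitions, $3 = broker ids, e.g. "0,1,2"
gen_reassignment() {
  topic=$1; partitions=$2; replicas=$3
  printf '{"version":1,\n"partitions":[\n'
  p=0
  while [ "$p" -lt "$partitions" ]; do
    # Every entry except the last one is followed by a comma.
    sep=","
    [ "$p" -eq $((partitions - 1)) ] && sep=""
    printf '{"topic":"%s","partition":%d,"replicas":[%s]}%s\n' \
      "$topic" "$p" "$replicas" "$sep"
    p=$((p + 1))
  done
  printf ']}\n'
}

# Example: three partitions of an illustrative topic on brokers 0, 1, 2.
gen_reassignment my-topic 3 0,1,2
```

Redirect the output to a file (for example > xxx.json) and pass it to --reassignment-json-file. Note that the first broker in each replica list is the preferred leader, so using the same order for every partition concentrates preferred leadership on one broker; rotating the order per partition is a common alternative. Running the tool with --verify and the same file reports the progress of the reassignment.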

View topic data size

# Method 1 (grep -P needs GNU grep)
kafka-log-dirs.sh \
--bootstrap-server xxx:9092 \
--topic-list <topic-name> \
--describe \
| grep -oP '(?<=size":)\d+' \
| awk '{ sum += $1 } END { print sum }'

# Result, in bytes
648

# Method 2, requires jq
kafka-log-dirs.sh \
--bootstrap-server xxx:9092 \
--topic-list <topic-name> \
--describe \
| grep '^{' \
| jq '[ ..|.size? | numbers ] | add'

# Result, in bytes
648
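
Raw byte counts become hard to read for large topics. A small sketch for printing the number in binary units, assuming a hypothetical helper named human_bytes (not part of Kafka):

```shell
# Hypothetical helper: format a byte count using binary units (B, KiB, MiB, ...).
human_bytes() {
  awk -v b="$1" 'BEGIN {
    split("B KiB MiB GiB TiB", unit, " ")
    i = 1
    while (b >= 1024 && i < 5) { b /= 1024; i++ }
    printf "%.2f %s\n", b, unit[i]
  }'
}

human_bytes 648       # prints 648.00 B
human_bytes 1048576   # prints 1.00 MiB
```

The output of either method above can be fed in via command substitution, for example human_bytes "$(... | awk '{ sum += $1 } END { print sum }')".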

List all consumer groups

kafka-consumer-groups.sh --bootstrap-server xxx:9092 --list

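Describe a specific consumer group

The output has the following columns:

  • GROUP: consumer group
  • TOPIC: topic name
  • PARTITION: partition id
  • CURRENT-OFFSET: the group's current offset (messages consumed so far)
  • LOG-END-OFFSET: end offset of the partition (total messages)
  • LAG: messages not yet consumed (LOG-END-OFFSET minus CURRENT-OFFSET)
  • CONSUMER-ID: consumer id
  • HOST: consumer IP address
  • CLIENT-ID: client id

kafka-consumer-groups.sh --bootstrap-server xxx:9092 --describe --group <group-name>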
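
To get a single backlog figure for a group, the LAG column can be summed over all partitions. This is a sketch only: total_lag is a hypothetical helper, and the sample rows below are made-up values for illustration, not real output.

```shell
# Hypothetical helper: read `kafka-consumer-groups.sh --describe` output on
# stdin and print the total LAG. The column is located by its header name,
# and rows whose LAG is not numeric (for example "-") are skipped.
total_lag() {
  awk '
    col == 0 { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; next }
    $col ~ /^[0-9]+$/ { sum += $col }
    END { print sum + 0 }
  '
}

# Made-up sample rows standing in for real tool output:
printf '%s\n' \
  'GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID' \
  'g1 t1 0 10 15 5 - - -' \
  'g1 t1 1 20 22 2 - - -' \
  | total_lag   # prints 7
```

Against a live cluster the helper would be used as kafka-consumer-groups.sh --bootstrap-server xxx:9092 --describe --group <group-name> | total_lag.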

Delete a specific consumer group

kafka-consumer-groups.sh --bootstrap-server xxx:9092 --delete --group <group-name>

Consume messages

Consume from the beginning

kafka-console-consumer.sh --bootstrap-server xxx:9092 --topic <topic-name> --from-beginning

# Example output
this is my topic

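Consume from the end

--offset must be combined with --partition, and --partition takes a single partition id. To read only new messages from all partitions, omit both options.

kafka-console-consumer.sh --bootstrap-server xxx:9092 --topic <topic-name> --offset latest --partition 0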

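Reset a consumer group's offsets (the group must have no active consumers; replace --execute with --dry-run to preview the result first)

kafka-consumer-groups.sh --bootstrap-server xxx:9092 --group my-consumer-group --reset-offsets --execute --to-offset 3 --topic transaction-topic-msg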

https://www.starsfox.com/posts/fcfd194a.html
Author: Flycat
Published: August 8, 2023