Fork me on GitHub

Kafka 命令行操作

发送消息

1
2
3
4
5
6
[liucw@hadoop102 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 \
--topic first
>hello world
>hello liucw

消费消息

1
2
3
4
5
6
[liucw@hadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 \
--from-beginning \
--topic first
--from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

查看某个Topic的详情

1
2
3
4
[liucw@hadoop102 kafka]$ bin/kafka-topics.sh \
--zookeeper hadoop102:2181 \
--describe \
--topic first

查看.log或者.index文件内容

1
2
3
4
5
6
7
8
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
--files /tmp/kafka-logs/test3-0/00000000000000000000.log \
--print-data-log
# 验证日志片段00000.log索引文件的正确性
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
--files /tmp/kafka-logs/test3-0/00000000000000000000.log,/tmp/kafka-logs/test3-0/00000000000000000000.index \
--index-sanity-check

Topic操作

创建topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[liucw@hadoop102 kafka]$ bin/kafka-topics.sh \
--zookeeper hadoop102:2181 \
--create \
--topic first \
--replication-factor 3 \
--partitions 1
选项说明:
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
[liucw@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--create \
--topic second \
--replication-factor 3 \
--partitions 3

增加分区

1
2
3
4
5
6
7
8
[liucw@hadoop102 kafka]$ bin/kafka-topics.sh \
--zookeeper hadoop102:2181 \
--alter \
--topic first \
--partitions 3 # 将分区数量增加到3
这里使用--if-exists参数,主题不存在的错误就会被忽略,不建议使用

减少分区数量

  我们无法减少主题的分区数量。
  因为如果删除了分区,分区里的数据也一并被删除,导致数据不一致,我们也无法将这些数据分配给其他分区,因为这样做很难,而且会出现消息乱序。
  所以,如果一定要减少分区数量,只能删除整个主题,然后重新创建它

删除topic

1
2
3
4
5
6
7
8
[liucw@hadoop102 kafka]$ bin/kafka-topics.sh \
--zookeeper hadoop102:2181 \
--delete \
--topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
删除主题会丢弃主题里的所有数据。这是一个不可逆的操作,所以在执行时要十分小心。

查看当前服务器中的所有topic

1
2
3
[cw@hadoop102 kafka]$ bin/kafka-topics.sh \
--zookeeper hadoop102:2181 \
--list

列出主题详情信息

1
2
3
4
5
6
7
8
9
10
11
12
13
[cw@hadoop102 kafka]$ bin/kafka-topics.sh \
--zookeeper hadoop102:2181 \
--describe
describe命令还提供了一些参数,用于过滤输出结果
--topic:指定特定的主题
下面两个参数可找出问题的分区:
--under-replicated-partitions: 列出所有包含不同步副本的分区
--unavailable-partitions:列出所有没有首领的分区,这些分区已经处于离线状态,对于生产者和消费者来说是必不可用的。
这些参数无法与list命令一起使用

消费者群组

保存消费者群组信息的地方
  对旧版本的消费者来说,它们的信息保存在Zookeeper上;
  对新版本的消费者来说,它们的信息保存在broker上

列出群组

1
2
3
4
5
6
7
8
9
10
11
# 列出旧版本的消费都群组
$ bin/kafka-consumer-groups.sh \
--zookeeper hadoop102:2181 \
--list
# 列出新版本的消费都群组
$ bin/kafka-consumer-groups.sh \
--new-consumer \
--bootstrap-server hadoop102:2181/kafka-cluster \
--list

描述群组

1
2
3
4
5
6
7
8
使用--describe代替--list
--group 指定特定的群组
如,获取旧版本消费者群组信息
$ bin/kafka-consumer-groups.sh \
--zookeeper hadoop102:2181/kafka-cluster \
--describe \
--group testgroup

输出结果中的字段
  group: 消费者群组的名字
  topic: 正在被读取的主题名字
  partition: 正在被读取的分区ID
  current-offset: 消费者群组最近提交的偏移盏, 也就是消费者在分区里读取的当前位置
  log-end-offset: 当前高水位偏移量, 也就是最近一个被读取消息的偏移量,同时也是最近一个被提 交到集群的偏移是
  lag: 消费者的 CURRENT-OFFSET 和 broker 的 LOG-END-OFFSET 之间的 差距
  owner: 消费者群组里正在读取该分区的消费者。这是一个消费者的 田, 不一定包含消费 者的主机名

删除群组

  只有旧版本的消费者客户端才支持删除群组操作。在进行删除操作之前,需要先关闭消费者,或者不要让它们读取即将被删除的主题

-----------------本文结束,感谢您的阅读-----------------