kafka 偏移量相关接口

kafka 0.9 之前的版本偏移量信息是通过 zookeeper 管理的;为了避免对 zookeeper 的过度依赖,每次从 kafka 上读取 topic 偏移量信息,连接消耗还是比较大的,从 kafka 0.9 开始,kafka 已接管了偏移量信息管理功能,并将各消费组的偏移量写入了 __consumer_offsets 主题(默认50个分区);
api 方式获取某消费组的消费偏移量信息:
通过 ConsumerGroupCommand 作为入口,然后调用 ConsumerGroupService接口,该接口有如下两个实现:

  • KafkaConsumerGroupService: 0.11 之前的api 需要添加参数:--new-consumer,从0.11
    开始,默认先使用的是 kafka 实现;
  • ZkConsumerGroupService:0.11 之前默认实现方式;

该接口如下方法:

  • 查看所有的消费组方法:listGroups
  • 描述消费组消费信息:describeGroup
val listStrZk = "--zookeeper 192.168.xx.xx:2181 --list"
//列出所有的消费组
val listStr = "--bootstrap-server 192.168.xx.xx:9092 --list  --new-consumer"
val listArgs = listStr.split(" ")
val describeStrzk = "--zookeeper 192.168.xx.xx:2181 --describe --group 3"
// 描述消费组信息
val describeStr = "--bootstrap-server 192.168.xx.xx:9092 --describe --group 6 --new-consumer"
val describeArgs = describeStr.split(" ")
val args = Array[String](topic,bootstrap,group,describe)
// ConsumerGroupCommand.main(listArgs)
ConsumerGroupCommand.main(describeArgs)

/* 主题                           分区        消费位移        最高日志位移     消费滞后offset
* TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
   test                           0          2282954         2743178         460224     -                                                 -                              -
   test                           1          1500            860230          858730     -                                                 -                              -
   test                           4          500             860231          859731     -                                                 -                              -
   test                           3          0               860228          860228     -                                                 -                              -
   test                           2          0               860226          860226     -                                                 -                              -
* */
  • LAG:消费滞后
  • LEO:最高日志位移
  • Lag = HW - ConsumerOffset

相关推荐