Why is Kafka's throughput so high?

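Everything works in batches: producers send in batches, the broker stores batches (optionally compressed, appended to disk sequentially through FileChannel), and consumers fetch in batches (served zero-copy via sendfile).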
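As a rough illustration of why compressing a whole batch pays off, the sketch below compares compressing 30 small messages one by one against compressing them as a single batch. It is not Kafka code: the message payload is made up, and `zlib` from the Python standard library stands in for the LZ4 codec that appears in the dump below.

```python
import json
import zlib

def compressed_sizes(messages):
    """Return (total bytes when each message is compressed alone,
    bytes when all messages are compressed together as one batch)."""
    per_message = sum(len(zlib.compress(m)) for m in messages)
    as_batch = len(zlib.compress(b"".join(messages)))
    return per_message, as_batch

# Hypothetical payloads: 30 small, similar JSON messages.
messages = [
    json.dumps({"user": "zhang", "seq": i, "event": "click"}).encode()
    for i in range(30)
]

per_message, as_batch = compressed_sizes(messages)
print("compressed one by one:", per_message, "bytes")
print("compressed as a batch:", as_batch, "bytes")
```

Similar messages share most of their bytes, so the compressor only finds that redundancy when it sees them together.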

Dump the log file:

kafka-run-class.bat kafka.tools.DumpLogSegments --files D:/tmp/kafka-11-logs/zhang-0/00000000000000000000.log --verify-index-only

Output:

[2020-05-01 17:42:35,655] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
Dumping D:\tmp\kafka-11-logs\zhang-0\00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 29 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 0 CreateTime: 1588321516701 isvalid: true size: 331 magic: 2 compresscodec: LZ4 crc: 2597043246
baseOffset: 30 lastOffset: 49 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 331 CreateTime: 1588321516732 isvalid: true size: 260 magic: 2 compresscodec: LZ4 crc: 2529901070
baseOffset: 50 lastOffset: 88 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 591 CreateTime: 1588322691852 isvalid: true size: 402 magic: 2 compresscodec: LZ4 crc: 1161470610
baseOffset: 89 lastOffset: 99 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 993 CreateTime: 1588322691852 isvalid: true size: 190 magic: 2 compresscodec: LZ4 crc: 4233930008
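The dump can be checked with a little arithmetic: each batch holds `lastOffset - baseOffset + 1` records, and each batch starts exactly where the previous one ends (`position + size`), which is what a sequential append looks like. A small sketch, using only the four relevant fields copied from the lines above (the parsing helper is illustrative, not part of Kafka):

```python
import re

# Fields copied from the dump above; the other fields are omitted for brevity.
DUMP = """\
baseOffset: 0 lastOffset: 29 position: 0 size: 331
baseOffset: 30 lastOffset: 49 position: 331 size: 260
baseOffset: 50 lastOffset: 88 position: 591 size: 402
baseOffset: 89 lastOffset: 99 position: 993 size: 190
"""

FIELDS = ("baseOffset", "lastOffset", "position", "size")

def parse_batches(text):
    """Parse 'key: value' pairs from each batch line into a dict of ints."""
    batches = []
    for line in text.splitlines():
        pairs = dict(re.findall(r"(\w+): (\S+)", line))
        if "baseOffset" in pairs:
            batches.append({k: int(pairs[k]) for k in FIELDS})
    return batches

batches = parse_batches(DUMP)
counts = [b["lastOffset"] - b["baseOffset"] + 1 for b in batches]

# A batch must begin at the byte where the previous batch ended.
contiguous = all(
    prev["position"] + prev["size"] == cur["position"]
    for prev, cur in zip(batches, batches[1:])
)

print("records per batch:", counts)
print("batches are contiguous on disk:", contiguous)
```

For this segment that gives 30, 20, 39 and 11 records, 100 in total, stored in 1183 bytes.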

Dump the offset index:

kafka-run-class.bat kafka.tools.DumpLogSegments --files D:/tmp/kafka-11-logs/zhang-0/00000000000000000000.index

[2020-05-01 18:31:04,806] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
Dumping D:\tmp\kafka-11-logs\zhang-0\00000000000000000000.index
offset: 885 position: 8243
offset: 1388 position: 12521
offset: 1872 position: 17070
offset: 2552 position: 22983
offset: 3236 position: 28925
offset: 3920 position: 34857
offset: 4604 position: 40783
offset: 5288 position: 46722
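The index is sparse: it has one entry per few kilobytes of log rather than one per message (the broker setting `index.interval.bytes`, 4096 by default, controls the spacing). Finding a message therefore means locating the last index entry at or before the target offset and scanning the log forward from that byte position. A minimal sketch of that lookup over the entries above; the function name is made up, and the real broker binary-searches a memory-mapped file rather than a Python list:

```python
from bisect import bisect_right

# (offset, position) pairs copied from the index dump above.
INDEX = [
    (885, 8243), (1388, 12521), (1872, 17070), (2552, 22983),
    (3236, 28925), (3920, 34857), (4604, 40783), (5288, 46722),
]

def scan_start(index, target_offset, base_offset=0):
    """Return the (offset, position) from which a log scan should begin.

    Picks the last index entry whose offset is <= target_offset. If the
    target lies before the first entry, the scan starts at the beginning
    of the segment (base_offset, position 0).
    """
    offsets = [offset for offset, _ in index]
    i = bisect_right(offsets, target_offset) - 1
    return index[i] if i >= 0 else (base_offset, 0)

print(scan_start(INDEX, 2000))  # last entry at or before offset 2000
print(scan_start(INDEX, 10))    # before the first entry: start of segment
```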

Dump the time index:

kafka-run-class.bat kafka.tools.DumpLogSegments --files D:/tmp/kafka-11-logs/zhang-0/00000000000000000000.timeindex

[2020-05-01 18:33:14,915] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
Dumping D:\tmp\kafka-11-logs\zhang-0\00000000000000000000.timeindex
timestamp: 1588327438584 offset: 263
timestamp: 1588328919650 offset: 1387
timestamp: 1588328919666 offset: 1505
timestamp: 1588328919728 offset: 1871
timestamp: 1588328919759 offset: 2893
timestamp: 1588328919775 offset: 3235
timestamp: 1588328919791 offset: 3919
timestamp: 1588328919806 offset: 4603
timestamp: 1588328919822 offset: 5287
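The time index maps a timestamp to an offset in the same sparse way, which is what makes seeking by time possible. As I understand it, an entry (timestamp, offset) says that any message with a larger timestamp comes after that offset, so a lookup takes the last entry whose timestamp is at or before the target and continues from its offset. A sketch under that assumption, with a made-up function name:

```python
from bisect import bisect_right

# (timestamp_ms, offset) pairs copied from the timeindex dump above.
TIME_INDEX = [
    (1588327438584, 263), (1588328919650, 1387), (1588328919666, 1505),
    (1588328919728, 1871), (1588328919759, 2893), (1588328919775, 3235),
    (1588328919791, 3919), (1588328919806, 4603), (1588328919822, 5287),
]

def offset_for_timestamp(time_index, target_ms, base_offset=0):
    """Return the offset from which to search for target_ms.

    Uses the last entry whose timestamp is <= target_ms, or the segment's
    base offset when the target is older than every entry.
    """
    timestamps = [ts for ts, _ in time_index]
    i = bisect_right(timestamps, target_ms) - 1
    return time_index[i][1] if i >= 0 else base_offset

print(offset_for_timestamp(TIME_INDEX, 1588328919700))
```

The returned offset can then be resolved to a byte position through the offset index.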

Reset the offset of partition 0 of topic zhang to 10 for consumer group zhang-what:

kafka-consumer-groups.bat --bootstrap-server localhost:9092 --reset-offsets --execute --group zhang-what --topic zhang:0 --to-offset 10

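When the consumer then fetches from offset 10, the broker returns the whole batch covering offsets [0, 29], because a batch is the smallest unit it reads from the log. The consumer discards the records below offset 10 locally, in: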

org.apache.kafka.clients.consumer.internals.Fetcher.PartitionRecords#nextFetchedRecord
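The division of labour can be modelled in a few lines. This is a simplified sketch, not the actual Fetcher code: batches are represented only by their (baseOffset, lastOffset) pairs from the log dump above, fetch size limits are ignored, and both function names are invented for the example.

```python
# (baseOffset, lastOffset) of each batch, copied from the log dump above.
BATCHES = [(0, 29), (30, 49), (50, 88), (89, 99)]

def broker_fetch(batches, fetch_offset):
    """Broker side: return whole batches, starting with the one that
    contains fetch_offset. Batches are never split or trimmed."""
    return [b for b in batches if b[1] >= fetch_offset]

def consumer_records(fetched, fetch_offset):
    """Consumer side: walk every record of every returned batch and
    skip the ones below the requested offset."""
    for base, last in fetched:
        for offset in range(base, last + 1):
            if offset >= fetch_offset:
                yield offset

fetched = broker_fetch(BATCHES, 10)
delivered = list(consumer_records(fetched, 10))

print("first batch returned by broker:", fetched[0])
print("first offset delivered to the application:", delivered[0])
```

Offsets 0 to 9 travel over the network but never reach the application.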
