kafka学习总结004 --- 生产者ISR

前言

要了解生产者ISR,需要提前了解下生产者重试机制和应答机制

生产者重试机制

创建生产者时,可以指定retries参数,如果向broker发送消息时抛出异常,并且异常是可重试异常RetriableException,那么此时就会按照指定的次数进行重试

1、哪些情况下可以重试

(1)没有到delivery超时时间

(2)剩余重试次数大于0

(3)异常类型为RetriableException或者使用事务管理器时允许重试

private boolean canRetry(ProducerBatch batch, PartitionResponse response, long now) {
        boolean var10000;
        label29: {
            if (!batch.hasReachedDeliveryTimeout(this.accumulator.getDeliveryTimeoutMs(), now) && batch.attempts() < this.retries && !batch.isDone()) {
                if (this.transactionManager == null) {
                    if (response.error.exception() instanceof RetriableException) {
                        break label29;
                    }
                } else if (this.transactionManager.canRetry(response, batch)) {
                    break label29;
                }
            }

            var10000 = false;
            return var10000;
        }

        var10000 = true;
        return var10000;
    }

2、KafkaProducer的send方法遇到的问题

kafka学习总结004 --- 生产者ISR

如图,如果不添加关闭producer的方法,那么这时消息没有写到任何分区中(只针对本代码,其他的使用场景可能有延迟处理能够保证消息被写到分区),当时很懵,这是为啥呢?

看了下kafka生产消息的源码, 才恍然大悟,总结如下:

(1)KafkaProducer的send方法并不会直接把消息发给broker,而是将消息发到内存中(Accumulator)

(2)后台存在一个IOThread(创建生产者的时候后台创建的),会一直扫描这块内存,也就是这个线程真正的把消息发给broker

(3)send方法是异步的

(4)close方法会一直阻塞,直到所有的发送请求完成;并且官网有说明:生产者创建后要关闭,避免资源泄露、数据丢失

到这就比较明朗了,send方法一定是异步的不要怀疑某种场景下是同步的(这是因为我在debug的时候,发现相同的代码,消息居然写到分区中了。。,所以怀疑debug时是同步发送的消息);

我遇到这个问题的根因是:主线程退出后,IOThread还没有来得及把消息发给broker,因此导致了数据丢失

应答机制

为了保证producer发送的数据能够可靠的写入分区(前面的重试机制是一种可靠机制),topic的每个分区在收到数据时需要回复ack给生产者,而且回复ack的时机很大程度上决定了kafka的可靠性;

创建生产者时可以通过设定acks参数决定回复ack的时机,有如下几个时机:

1、ack为0

broker收到消息时,就回复ack,而不管消息是否写入磁盘;这种场景会大概率丢失数据

2、ack为1

broker收到消息,并且落盘到Leader分区,回复ack;这种场景在Leader故障时会造成数据丢失

3、ack为-1或者all

broker收到消息,消息落盘的Leader分区,并且所有的follower同步Leader数据完成,回复ack;这种场景会大概率保证数据一致性;但是存在下面的问题:

如果在follower同步Leader数据时,有一个follower因为某种原因一直不能同步完成,导致一直不能回复ack给producer,这又该如何处理呢?由此引出了生产者ISR

生产者ISR

kafka的数据时多副本的,每个topic的分区下都有一个Leader和多个follower,那么kafka怎么保证每个副本的数据一致性呢?
ISR是一个副本的列表,列表中存储的是和Leader数据一致的副本(当然包括Leader),怎么确定一个副本是否在ISR中?

有两个条件:

(1)副本和Leader交互的时间差大于指定数值,就将此副本从ISR剔除;时间差由rerplica.lag.time.max.ms参数指定

(2)副本和Leader数据条数差值大于指定值,将此副本从ISR剔除;该差值由rerplica.lag.max.messages参数指定

最重要的:ISR的副本添加or删除是通过周期调度管理

回到上一节说道的问题,有了ISR,只要ISR中的副本全部同步Leader完成(除了Leader),就会回复producer ack,很大程度上降低了无法回复ack的风险

也可以使用min.insync.replicas参数指定有多少个副本同步完成Leader就可以回复ack

相关推荐