Flink job restarts frequently because transactions are invalidated when writing to Kafka

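In production we need to send some data to Kafka with EXACTLY_ONCE semantics. The Kafka version is 1.1.0 and the Flink version is 1.8.0. Committing transactions fails very often, and sometimes the failure even causes the job to restart.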

The Kafka producer is configured as follows:

def getKafkaProducer(kafkaAddr: String, targetTopicName: String, kafkaProducersPoolSize: Int): FlinkKafkaProducer[String] = {
    val properties = new Properties()
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddr)
    // 6000 * 6 = 36,000 ms (36 s)
    properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 6000 * 6 + "")
    // With retries set, Flink does not restart when a Kafka partition leader changes; the producer retries up to 5 times instead:
    properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
    properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(1048576 * 5))
    val serial = new KeyedSerializationSchemaWrapper(new SimpleStringSchema())
    //val producer = new FlinkKafkaProducer011[String](targetTopicName, serial, properties, Optional.of(new KafkaProducerPartitioner[String]()), Semantic.EXACTLY_ONCE, kafkaProducersPoolSize)
    val producer = new FlinkKafkaProducer[String](targetTopicName, serial, properties, Optional.of(new KafkaProducerPartitioner[String]()), FlinkKafkaProducer.Semantic.EXACTLY_ONCE, kafkaProducersPoolSize)
    producer.setWriteTimestampToKafka(true)
    producer
}

The Flink environment is set up as follows:

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Checkpoint every 60 s
env.enableCheckpointing(60 * 1000 * 1, CheckpointingMode.EXACTLY_ONCE)
val config = env.getCheckpointConfig
// RETAIN_ON_CANCELLATION keeps the externalized checkpoint state when the job is cancelled
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// Minimum time the checkpoint coordinator waits after one checkpoint completes before it may trigger the next; when this is set, maxConcurrentCheckpoints is effectively 1
config.setMinPauseBetweenCheckpoints(3000)
// Maximum number of checkpoints that may be in progress at once; values above 1 have no effect when minPauseBetweenCheckpoints is set
config.setMaxConcurrentCheckpoints(1)
// Checkpoint timeout in milliseconds; a checkpoint that has not completed by then is aborted
config.setCheckpointTimeout(30000)
// Whether the task should fail when an error occurs during checkpointing. The default is true; with false the task declines the checkpoint and keeps running
// https://issues.apache.org/jira/browse/FLINK-11662
config.setFailOnCheckpointingErrors(false)

Transactions were then invalidated frequently. The errors took several forms, roughly as follows:

java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
    ... 5 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1002)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:619)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:97)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
    at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:278)
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:263)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:804)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:105)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:650)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:97)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Checkpoint failed: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.


Checkpoint failed: Could not complete snapshot 11 for operator Sink: data_Sink (2/2).

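These errors essentially all involve two-phase commit, transactions, and checkpoints.

After reading the Kafka documentation and studying the ProducerConfig class, I found that a Kafka producer needs some additional configuration when it is used with EXACTLY_ONCE: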

the transaction timeout must be larger than the checkpoint interval, but smaller than the broker transaction.max.timeout.ms.
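
The rule above can be sketched as a small standalone check. This is only an illustration: `TimeoutCheck` and `isValid` are hypothetical names, not part of Flink or Kafka, and the 900,000 ms limit is the broker default for transaction.max.timeout.ms, so the value actually configured on a given cluster may differ.

```scala
// Hypothetical helper, not part of Flink or Kafka. It checks the ordering
// checkpoint interval < transaction.timeout.ms < transaction.max.timeout.ms.
object TimeoutCheck {
  // Broker default for transaction.max.timeout.ms (15 minutes); an assumption
  // about the cluster, not something read from it.
  val DefaultBrokerMaxTimeoutMs: Long = 900000L

  def isValid(checkpointIntervalMs: Long,
              transactionTimeoutMs: Long,
              brokerMaxTimeoutMs: Long = DefaultBrokerMaxTimeoutMs): Boolean =
    checkpointIntervalMs < transactionTimeoutMs &&
      transactionTimeoutMs < brokerMaxTimeoutMs
}

object TimeoutCheckDemo extends App {
  val checkpointIntervalMs = 60 * 1000 * 1L

  // 36 s timeout against a 60 s checkpoint interval: the rule is violated
  println(TimeoutCheck.isValid(checkpointIntervalMs, 6000 * 6L))

  // 3 min timeout against a 60 s checkpoint interval: the rule holds
  println(TimeoutCheck.isValid(checkpointIntervalMs, 1000 * 60 * 3L))
}
```

The check only covers the static ordering. A transaction also has to survive the time the checkpoint itself takes to complete, so it is probably wise to leave some margin above the interval.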

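The original configuration violated the transaction timeout rule: the timeout was 36 s (6000 * 6 ms), shorter than the 60 s checkpoint interval, so the broker could expire a transaction before the checkpoint that commits it had completed. That matches the ProducerFencedException message. After adding the following configuration to getKafkaProducer, the original errors no longer appeared: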

    // checkpoint interval < TRANSACTION_TIMEOUT_CONFIG < Kafka transaction.max.timeout.ms (default 900 s)
    properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 3 + "")
    properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
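
One way to keep these values from drifting apart is to derive the transaction timeout from the checkpoint interval instead of hard-coding both. The sketch below is an illustration under stated assumptions: `TransactionalProps`, `transactionalProps`, and the factor of 3 are made up for this example (the factor simply reproduces the 3-minute value for a 60 s interval), and it uses the plain Kafka config key strings rather than the ProducerConfig constants so that it runs without a Kafka dependency.

```scala
import java.util.Properties

// Hypothetical helper, not part of Flink or Kafka.
object TransactionalProps {
  // Broker default for transaction.max.timeout.ms (15 minutes); an assumption,
  // check the value actually configured on the brokers.
  val BrokerMaxTimeoutMs: Long = 900000L

  // Derives transaction.timeout.ms as a multiple of the checkpoint interval
  // and fails fast if the result breaks the required ordering.
  def transactionalProps(checkpointIntervalMs: Long, factor: Int = 3): Properties = {
    val timeoutMs = checkpointIntervalMs * factor
    require(timeoutMs > checkpointIntervalMs,
      s"transaction timeout $timeoutMs ms must exceed checkpoint interval $checkpointIntervalMs ms")
    require(timeoutMs < BrokerMaxTimeoutMs,
      s"transaction timeout $timeoutMs ms must stay below broker limit $BrokerMaxTimeoutMs ms")

    val props = new Properties()
    props.setProperty("transaction.timeout.ms", timeoutMs.toString)
    props.setProperty("max.in.flight.requests.per.connection", "1")
    props.setProperty("enable.idempotence", "true")
    props
  }
}
```

With the 60 s checkpoint interval used here, `TransactionalProps.transactionalProps(60 * 1000L)` sets transaction.timeout.ms to 180000, the same 3-minute value as the configuration above. The resulting entries could then be copied into the Properties object passed to FlinkKafkaProducer.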

With that, the problem was solved.
