Kafka 消费者API

消费者 API,自动提交 offset

/**
 * Minimal Kafka consumer example that uses automatic offset commits.
 *
 * <p>Connects to {@code hadoop102:9092}, joins consumer group {@code gc}, subscribes to
 * topic {@code first} and prints every received record as {@code key-value} to standard
 * output in an endless loop.
 */
public class MyConsumer {

    /**
     * Configures the consumer, subscribes to the topic and polls forever.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        Properties props = new Properties();
        // Cluster to connect to
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Enable automatic commit of consumed offsets
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Interval between automatic commits, in milliseconds
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Deserializer classes for record keys and values
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "gc");

        // try-with-resources: the loop below never exits normally, so this guarantees
        // close() is still called if poll() or the loop body throws.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
            // Subscribe to the topic
            kafkaConsumer.subscribe(Collections.singletonList("first"));

            while (true) {
                // Fetch the next batch of records.
                // NOTE(review): poll(long) is deprecated in newer Kafka clients in favour of
                // poll(Duration) -- switch once the client version in use is confirmed.
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
                // Print each record as "key-value"
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key() + "-" + consumerRecord.value());
                }
            }
        }

    }
}

手动提交offset,同步提交

/**
 * Kafka consumer example that commits offsets manually and synchronously.
 *
 * <p>Connects to {@code hadoop102:9092}, joins consumer group {@code gc1}, subscribes to
 * topic {@code first}, prints every received record as {@code key-value} to standard
 * output and calls {@code commitSync()} after each polled batch has been processed.
 */
public class ConsumerOffsetSync {

    /**
     * Configures the consumer, subscribes to the topic and runs the poll/commit loop forever.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        Properties props = new Properties();
        // Cluster to connect to
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Disable automatic commit of consumed offsets; this class commits explicitly
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Deserializer classes for record keys and values
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "gc1");

        // Offset reset policy.
        // earliest: consume from the beginning; it applies when (1) the consumer group is new,
        //           or (2) the stored offset points at data that no longer exists.
        // latest:   the default; consume only the newest data.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // try-with-resources: the loop below never exits normally, so this guarantees
        // close() is still called if poll(), the loop body or commitSync() throws.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
            // Subscribe to the topic
            kafkaConsumer.subscribe(Collections.singletonList("first"));

            while (true) {
                // Fetch the next batch of records.
                // NOTE(review): poll(long) is deprecated in newer Kafka clients in favour of
                // poll(Duration) -- switch once the client version in use is confirmed.
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
                // Print each record as "key-value"
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key() + "-" + consumerRecord.value());
                }

                // Synchronous commit: the current thread blocks until the offset commit succeeds
                kafkaConsumer.commitSync();
            }
        }

    }
}

手动提交offset,异步提交

// Asynchronous commit; the callback receives the offsets and, on failure, the exception.
kafkaConsumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // Separate the prefix from the offsets and report the cause,
        // otherwise the reason for the failed commit is lost.
        System.err.println("Commit failed for " + offsets + ": " + exception);
    }
});

相关推荐