Kafka basic commands and API usage

I. Kafka 0.11

Reference documentation
(1) https://kafka.apache.org/0110/documentation.html

II. Kafka 0.8
1. Command-line operations
(1) Create a topic

bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic msg_format_v0

(2) Send messages

bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic msg_format_v0

2. API usage
(1) Maven dependency (pom.xml)

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>

(2) Using the producer API

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class TestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // topic: the target topic; key: the message key (for example a sequence number); value: the message content to write
        producer.send(new ProducerRecord<>("msg_format_v0", "key", "value"));

        // When no key needs to be specified, use the form below instead
        //Producer<Object, String> producer2 = new KafkaProducer<>(props);
        //producer2.send(new ProducerRecord<>("msg_format_v1", "value"));

        producer.close();

    }
}
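
The settings used in TestProducer can also be collected in a small helper, so that several producers share one configuration. The following is a minimal sketch that relies only on java.util.Properties. The class name ProducerPropsSketch and the method build are hypothetical names introduced here for illustration and are not part of the Kafka API. The property keys and values are copied from the example above.

```java
import java.util.Properties;

public class ProducerPropsSketch {

    // Hypothetical helper (not part of Kafka): returns the same settings
    // that TestProducer sets inline, for the given broker list.
    static Properties build(String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("hadoop1:9092,hadoop2:9092,hadoop3:9092");
        // Print each setting so the configuration can be checked before use.
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

The returned Properties object can then be passed to new KafkaProducer<>(props) in the same way as in TestProducer.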

Reference documentation
(1) https://kafka.apache.org/082/documentation.html#producerapi
