kafka(2): Java实现简单的Producer和Consumer
Maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jj</groupId>
<artifactId>ak02</artifactId>
<version>1.0.0</version>
<properties>
<kafka.version>1.1.0</kafka.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
BasicProducer.java
package com.jj.ak02;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class BasicProducer {
/**
 * Publishes four sample messages to the {@code test} topic of the Kafka
 * cluster at {@code localhost:9092} and reports progress on standard output.
 *
 * <p>{@code Producer.send} is asynchronous, so a delivery failure does not
 * surface as an exception at the call site. Each record is therefore sent
 * with a completion callback that reports failures on standard error.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Step 1. Connection settings for the Kafka cluster.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // where the Kafka cluster is
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serializer for msgKey
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serializer for msgValue

    // Step 3. Name of the topic the messages are published to.
    String topicName = "test";

    // Step 4. The messages to publish, as {msgKey, msgValue} pairs (a null key is allowed).
    String[][] messages = {
        {null, "Hello"},
        {null, "Hello2"},
        {"8703147", "Hello3"},
        {"8703147", "Hello4"},
    };
    int msgCounter = 0;

    // Step 2. Create the producer. try-with-resources guarantees step 5 (closing
    // the connection) on every exit path; close() waits for the records queued
    // below to complete, so their callbacks run before the final message.
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        try {
            System.out.println("Start sending messages ...");
            for (String[] message : messages) {
                sendAndReport(producer, topicName, message[0], message[1]);
                msgCounter++;
            }
            System.out.println("Send " + msgCounter + " messages to Kafka");
        } catch (Exception e) {
            // Synchronous failures raised while handing records to the producer.
            e.printStackTrace();
        }
    }
    System.out.println("Message sending completed!");
}

/**
 * Wraps one key/value pair in a {@code ProducerRecord} and hands it to the
 * producer, reporting an asynchronous delivery failure on standard error.
 */
private static void sendAndReport(Producer<String, String> producer, String topic, String key, String value) {
    // ProducerRecord arguments: #1 topic name, #2 msgKey, #3 msgValue.
    producer.send(new ProducerRecord<>(topic, key, value), (metadata, error) -> {
        if (error != null) {
            System.err.println("Failed to send (" + key + ", " + value + ") to " + topic + ": " + error);
        }
    });
}
}
BasicConsumer.java
package com.wistron.witlab4.ak02;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.record.TimestampType;
import java.util.Arrays;
import java.util.Properties;
/**
 * Minimal Kafka consumer example: subscribes to the {@code test} topic as a
 * member of consumer group {@code my-group} and prints every record it receives.
 */
public class BasicConsumer {
    /**
     * Polls the {@code test} topic in a loop and prints each record as
     * {@code topic-partition-offset : (key, value)} until the JVM is asked to
     * shut down, at which point the consumer is closed.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Step 1. Connection settings for the Kafka cluster.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // where the Kafka cluster is
        props.put("group.id", "my-group"); // <-- this is the ConsumerGroup
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // deserializer for msgKey
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // deserializer for msgValue
        props.put("auto.offset.reset", "earliest"); // offset reset policy used when the group has no valid committed offset

        // Step 2. Create the Kafka consumer instance.
        final Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // Step 3. Name of the topic to subscribe to.
        String topicName = "test";

        // Step 4. Subscribe the consumer to the topic.
        consumer.subscribe(Arrays.asList(topicName));

        // A finally block does not run when the JVM is terminated by a signal, so
        // a shutdown hook interrupts the blocking poll() via wakeup() and then
        // waits for the main thread to finish closing the consumer.
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        // Step 5. Keep pulling records from Kafka.
        try {
            System.out.println("Start listen incoming messages ...");
            while (true) {
                // Ask Kafka for new records.
                ConsumerRecords<String, String> records = consumer.poll(1000);

                // Any records returned by this poll are handled here.
                for (ConsumerRecord<String, String> record : records) {
                    // ** business logic and message handling go here **

                    // Record metadata (record.timestamp() and record.timestampType()
                    // are also available when needed).
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();

                    // msgKey and msgValue.
                    String msgKey = record.key();
                    String msgValue = record.value();

                    // Print the metadata together with msgKey and msgValue.
                    System.out.println(topic + "-" + partition + "-" + offset + " : (" + msgKey + ", " + msgValue + ")");
                }
            }
        } catch (WakeupException ignored) {
            // Expected: raised by poll() after the shutdown hook called wakeup().
        } finally {
            // Step 6. Close the consumer once the loop has been left.
            consumer.close();
            System.out.println("Stop listen incoming messages");
        }
    }
}