Kafka 生产者 自定义序列化
Kafka生产者会先将消息序列化为二进制字节数组,再推送给Broker。下面是一个自定义序列化的示例,用于序列化一个User对象;
首先,引入jackson-mapper-asl
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.12</version>
</dependency>
然后定义需要被序列化的实体类:
package cn.org.fubin;
/**
 * Plain JavaBean describing a user; instances are the message payload in this example.
 *
 * <p>The no-argument constructor and the getter/setter pairs are kept so that bean-style
 * mappers can read and populate instances.
 */
public class User {
    private String firstName;
    private String lastName;
    private int age;
    private String address;

    /** Creates an empty user; all reference fields are {@code null} and {@code age} is 0. */
    public User() {
    }

    /**
     * Creates a fully populated user.
     *
     * @param firstName the user's first name
     * @param lastName  the user's last name
     * @param age       the user's age
     * @param address   the user's address
     */
    public User(String firstName, String lastName, int age, String address) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
        this.address = address;
    }

    /** @return the first name */
    public String getFirstName() {
        return firstName;
    }

    /** @param firstName the first name to set */
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    /** @return the last name */
    public String getLastName() {
        return lastName;
    }

    /** @param lastName the last name to set */
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    /** @return the age */
    public int getAge() {
        return age;
    }

    /** @param age the age to set */
    public void setAge(int age) {
        this.age = age;
    }

    /** @return the address */
    public String getAddress() {
        return address;
    }

    /** @param address the address to set */
    public void setAddress(String address) {
        this.address = address;
    }

    /**
     * Renders all fields for debugging, e.g.
     * {@code User{firstName='a', lastName='b', age=23, address='china'}}.
     */
    @Override
    public String toString() {
        // Plain ASCII quotes: the typographic quotes in the original text were not valid Java.
        return "User{" +
                "firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                ", age=" + age +
                ", address='" + address + '\'' +
                '}';
    }
}
// Next, create the serializer class, which implements the Serializer interface provided by the Kafka client:
import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.Map;
public class UserSerializer implements Serializer {
private ObjectMapper objectMapper;
public void configure(Map configs, boolean isKey) {
objectMapper = new ObjectMapper();
}
public byte[] serialize(String topic, Object data) {
byte[] ret = null;
try {
ret = objectMapper.writeValueAsString(data).getBytes("utf-8");
} catch (IOException e) {
System.out.println("序列化失败");
e.printStackTrace();
}
return ret;
}
public void close() {
}
}Kafka默认提供如下实现:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.RetriableException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
*
* 可重试异常
* 1. 分区副本不可用
* 2. Controller当前不可用
* 3. 网络瞬时故障
*
* 可自行恢复,超过重试次数也需要自行处理
*
*
* 不可重试异常
* 1. 发送消息尺寸过大
* 2. 序列化失败异常
* 3. 其他类型异常
*
*
*/
public class KafkaProducerDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "cn.org.fubin.UserSerializer");
properties.put("acks", "-1");
System.out.println(ProducerConfig.ACKS_CONFIG);
properties.put("retries", "3");
properties.put("batch.size", 1048576);
properties.put("linger.ms", 10);
properties.put("buffer.memory", "33554432");
System.out.println(ProducerConfig.BUFFER_MEMORY_CONFIG);
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
properties.put("max.block.ms", "3000");
String topic = "test-topic";
Producer<String,User> producer = new KafkaProducer<String, User>(properties);
User user = new User("a","b",23,"china");
ProducerRecord<String ,User> record = new ProducerRecord<String, User>(topic,user);
producer.send(record).get();
producer.close();
}
}然后在主类中指定声明好的序列化类,并发送一个User实体:
相关推荐
Lzs 2020-10-23
sweetgirl0 2020-06-28
聚合室 2020-11-16
零 2020-09-18
Justhavefun 2020-10-22
ChaITSimpleLove 2020-10-06
周游列国之仕子 2020-09-15
afanti 2020-09-16
88234852 2020-09-15
YClimb 2020-09-15
风雨断肠人 2020-09-04
卖口粥湛蓝的天空 2020-09-15
stulen 2020-09-15
pythonxuexi 2020-09-06
abfdada 2020-08-26