kafka java连接操作

Java连接kafka操作,单机版kafka

代码记录如下

1、maven添加依赖配置:

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.10</artifactId>

<version>0.10.2.0</version>

</dependency>

2、Java Code实现:

package com.sam.project.kafka;

import java.util.Iterator;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.security.JaasUtils;

import kafka.admin.AdminUtils;

import kafka.admin.RackAwareMode;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.consumer.Whitelist;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;

import kafka.server.ConfigType;

import kafka.utils.ZkUtils;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import kafka.serializer.StringEncoder;

/**

* Test kafka

*/

public class App {

private final static String URL = "192.168.1.114:2181";

private final static String NAME = "test_topic";

// 创建主题

private static void createTopic() {

ZkUtils zkUtils = ZkUtils.apply(URL, 30000, 30000, JaasUtils.isZkSecurityEnabled());

// 创建一个单分区单副本名为t1的topic

AdminUtils.createTopic(zkUtils, NAME, 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);

zkUtils.close();

System.out.println("创建成功!");

}

// 删除主题(未彻底删除)

private static void deleteTopic() {

ZkUtils zkUtils = ZkUtils.apply(URL, 30000, 30000, JaasUtils.isZkSecurityEnabled());

// 删除topic 't1'

AdminUtils.deleteTopic(zkUtils, NAME);

zkUtils.close();

System.out.println("删除成功!");

}

// 修改主题

private static void editTopic() {

ZkUtils zkUtils = ZkUtils.apply(URL, 30000, 30000, JaasUtils.isZkSecurityEnabled());

Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), NAME);

// 增加topic级别属性

props.put("min.cleanable.dirty.ratio", "0.3");

// 删除topic级别属性

props.remove("max.message.bytes");

// 修改topic 'test'的属性

AdminUtils.changeTopicConfig(zkUtils, NAME, props);

zkUtils.close();

}

// 主题读取

private static void queryTopic() {

ZkUtils zkUtils = ZkUtils.apply(URL, 30000, 30000, JaasUtils.isZkSecurityEnabled());

// 获取topic 'test'的topic属性属性

Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), NAME);

// 查询topic-level属性

Iterator it = props.entrySet().iterator();

while (it.hasNext()) {

Map.Entry entry = (Map.Entry) it.next();

Object key = entry.getKey();

Object value = entry.getValue();

System.out.println(key + " = " + value);

}

zkUtils.close();

}

/**

* @Description: 生产者

*/

private static void producer() {

Properties properties = new Properties();

properties.put("zookeeper.connect", "192.168.1.114:2181");//声明zk

properties.put("serializer.class", StringEncoder.class.getName());

properties.put("metadata.broker.list", "192.168.1.114:9092");// 声明kafka broker

Producer producer = new Producer<Integer, String>(new ProducerConfig(properties));

int i=0;

while(true){

producer.send(new KeyedMessage<Integer, String>(NAME, "message: " + i++));

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

/**

* @Description: 消费者

*/

private static void customer() {

try {

Properties properties = new Properties();

properties.put("zookeeper.connect", URL);

properties.put("auto.commit.enable", "true");

properties.put("auto.commit.interval.ms", "60000");

properties.put("group.id", "test_topic");

ConsumerConfig consumerConfig = new ConsumerConfig(properties);

ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

// topic的过滤器

Whitelist whitelist = new Whitelist("test_topic");

List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector

.createMessageStreamsByFilter(whitelist);

if (partitions == null) {

System.out.println("empty!");

TimeUnit.SECONDS.sleep(1);

}

System.out.println("partitions:"+partitions.size());

// 消费消息

for (KafkaStream<byte[], byte[]> partition : partitions) {

ConsumerIterator<byte[], byte[]> iterator = partition.iterator();

while (iterator.hasNext()) {

MessageAndMetadata<byte[], byte[]> next = iterator.next();

System.out.println("partiton:" + next.partition());

System.out.println("offset:" + next.offset());

System.out.println("接收到message:" + new String(next.message(), "utf-8"));

}

}

} catch (Exception e) {

// TODO: handle exception

}

}

public static void main(String[] args) {

createTopic();

// deleteTopic();

// editTopic();

// queryTopic();

//producer();

//customer();

}

}

以上代码实现java连接kafka主题topic操作,简单实现了生产者与消费者。

kafka java连接操作

最美赏花季 桃花映濠河惹人醉 

相关推荐