Kafka与Spring的集成
producer
public class KafkaServiceImpl implements KafkaService {
private Producer<byte[], byte[]> inner;
private Properties properties;
/**
 * Injects the Kafka producer that {@link #sendMessage(String, byte[])} sends through.
 * Note that {@link #init()} assigns a newly constructed producer to the same field.
 *
 * @param inner producer to use for outgoing messages
 */
public void setInner(final Producer<byte[], byte[]> inner) {
    this.inner = inner;
}
/**
 * Injects the producer settings that {@link #init()} turns into a {@code ProducerConfig}.
 *
 * @param properties Kafka producer configuration
 */
public void setProperties(final Properties properties) {
    this.properties = properties;
}
/**
 * Builds the producer from the injected properties and stores it in {@code inner},
 * replacing whatever producer was set before.
 *
 * @throws IOException declared by the signature; no statement in this body throws it explicitly
 */
public void init() throws IOException {
    this.inner = new Producer<byte[], byte[]>(new ProducerConfig(this.properties));
}
/**
 * Sends a raw message to the given topic with an empty key.
 * The call is silently ignored when either argument is {@code null}.
 *
 * @param topicName destination topic
 * @param message   message payload bytes
 */
@Override
public void sendMessage(String topicName, byte[] message) {
    final boolean sendable = topicName != null && message != null;
    if (sendable) {
        // A fresh empty key is created on every call, as before.
        // NOTE(review): do not hoist this into a shared constant without checking how the
        // configured partitioner treats byte[] keys - confirm first.
        final byte[] emptyKey = "".getBytes();
        inner.send(new KeyedMessage<byte[], byte[]>(topicName, emptyKey, message));
    }
}
}
配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-4.0.xsd">
<!-- Loads config/properties/producer.properties into a java.util.Properties bean. -->
<bean id="producerConfig" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="location" value="config/properties/producer.properties"/>
</bean>
<!-- Kafka producer service: receives the producer properties, then init() is run as the init-method. -->
<bean id="kafkaService" class="com.aiyou.gamecloud.kafka.KafkaServiceImpl" init-method="init">
    <property name="properties">
        <ref bean="producerConfig"/>
    </property>
</bean>
</beans>
property
metadata.broker.list=192.168.113.181:9092
producer.type=async
compression.codec=0
#serializer.class=kafka.serializer.StringEncoder
#key.serializer.class=kafka.serializer.StringEncoder
编解码部分:如果消息的键或值使用字符串,需要设置 serializer.class 和 key.serializer.class(例如 kafka.serializer.StringEncoder);不设置时默认按字节数组处理
使用异步(async)通信时,消息队列默认的发送时间间隔由 queue.buffering.max.ms 决定(定义在 kafka.producer.async.AsyncProducerConfig 中),默认时间间隔为 5000ms,也就是说异步方式默认每 5s 发送一次消息
Consumer(消费者)
import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.QueueChannel; import org.springframework.messaging.Message; import com.google.protobuf.InvalidProtocolBufferException; import kafka.consumer.Consumer; /** * @project: gate * @Title: KafkaCustomerService.java * @author: chenpeng * @email: [email protected] * @date: 2016年1月14日下午1:40:47 * @description: * @version: */ public class KafkaCustomerService { private static final Logger logger = LoggerFactory.getLogger(KafkaCustomerService.class); private static final String CONFIG = "kafka-customer-config.xml"; private static Random rand = new Random(); public static void main(String[] args) { final ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer.class); ctx.start(); final QueueChannel channel = ctx.getBean("inputFromKafka", QueueChannel.class); Message msg; while ((msg = channel.receive()) != null) { HashMap map = (HashMap) msg.getPayload(); System.out.println("Here in disb ================" + map.size()); Set<Map.Entry> set = map.entrySet(); for (Map.Entry entry : set) { String topic = (String) entry.getKey(); System.out.println("Topic:" + topic); ConcurrentHashMap<Integer, List<byte[]>> messages = (ConcurrentHashMap<Integer, List<byte[]>>) entry .getValue(); Collection<List<byte[]>> values = messages.values(); for (Iterator<List<byte[]>> iterator = values.iterator(); iterator.hasNext();) { List<byte[]> list = iterator.next(); System.out.println("================" + list.size()); for (byte[] bytes : list) { try { BroadcastMessage message = BroadcastMessage.parseFrom(bytes); logger.debug(message.getGameId()); } catch 
(InvalidProtocolBufferException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } try { Thread.sleep(100000); } catch (InterruptedException e) { e.printStackTrace(); } ctx.close(); } }
配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
        http://www.springframework.org/schema/integration/kafka
        http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration/stream
        http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

    <!-- Queue-backed channel that the consumer code reads from by bean name. -->
    <int:channel id="inputFromKafka">
        <int:queue/>
    </int:channel>

    <!-- Polls Kafka every 10 ms (fixed delay) and publishes to inputFromKafka; not started automatically. -->
    <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
        kafka-consumer-context-ref="consumerContext"
        auto-startup="false"
        channel="inputFromKafka">
        <int:poller fixed-delay="10" time-unit="MILLISECONDS" max-messages-per-poll="50"/>
    </int-kafka:inbound-channel-adapter>

    <!-- Consumer group "default" reading topic websocket_01 with a single stream. -->
    <int-kafka:consumer-context id="consumerContext"
        consumer-timeout="10"
        zookeeper-connect="zookeeperConnect">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration group-id="default" max-messages="5000">
                <int-kafka:topic id="websocket_01" streams="1"/>
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>

    <!-- ZooKeeper connection settings used by the consumer context. -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="192.168.113.181:2121"
        zk-connection-timeout="6000"
        zk-session-timeout="6000"
        zk-sync-time="200"/>
</beans:beans>
这里需要注意以下几个参数:
fixed-delay:即从上一个任务完成开始到下一个任务开始的间隔,单位是毫秒。即每次sleep的时间间隔。
fixed-rate:即从上一个任务开始到下一个任务开始的间隔,单位是毫秒。即每次获取消息的时间间隔。
consumer-timeout:如果在指定的时间间隔后,没有发现可用的消息可消费,则抛出一个timeout异常,我的理解是处理完一个消息后等待fixed-delay+consumer-timeout时间间隔,如果还没消息就重连(不知道理解的对不对,不过实验证明将consumer-timeout值修改后会影响接收消息的频率)
结合spring,还可以使用SpringIntegration方式进行配置
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
http://www.springframework.org/schema/integration/kafka
http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration/stream
http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
<!-- Channel (declared without a queue) that hands Kafka batches to the service activator. -->
<int:channel id="inputFromKafka"/>

<!-- Calls disbService.distribute(...) for every message arriving on inputFromKafka. -->
<int:service-activator auto-startup="true"
    input-channel="inputFromKafka"
    ref="disbService"
    method="distribute"/>

<!-- Default poller: fixed rate of 5 ms. -->
<int:poller default="true" id="default" fixed-rate="5" time-unit="MILLISECONDS"/>

<!-- Reads from Kafka through consumerContext and publishes to inputFromKafka. -->
<int-kafka:inbound-channel-adapter
    kafka-consumer-context-ref="consumerContext"
    channel="inputFromKafka"/>

<!-- Consumer group "default"; the topic name is taken from the gateId placeholder. -->
<int-kafka:consumer-context id="consumerContext"
    consumer-timeout="5"
    zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration group-id="default" max-messages="5000">
            <int-kafka:topic id="${gateId}" streams="1"/>
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<!-- ZooKeeper connection settings used by the consumer context. -->
<int-kafka:zookeeper-connect id="zookeeperConnect"
    zk-connect="192.168.113.181:2121"
    zk-connection-timeout="6000"
    zk-session-timeout="6000"
    zk-sync-time="2000"/>
</beans:beans>
service实现
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
@Service("disbService")
public class DisbService {
private static final Logger logger = LoggerFactory.getLogger(DisbService.class);
@Value("#{configProperties['server.requestType']}")
private String requestType = ERequestType.SOCKET.getValue();
@Autowired
private CacheService cacheService;
/**
 * Handles one batch delivered on the {@code inputFromKafka} channel.
 *
 * <p>Every byte array in the batch is parsed as a {@code Message.BroadcastMessage}. A message
 * with no user ids is sent to every channel that {@code cacheService} returns for the topic,
 * game and server; otherwise it is sent to the cached channel of each listed user. When the
 * configured request type is HTTP nothing is sent (caching is not implemented yet).
 *
 * @param map batch keyed by topic name; each value is cast to a map whose values are lists
 *            of raw message bytes
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public void distribute(HashMap map) {
    logger.debug("Here in disb ================{}", map.size());
    // The request type is the same for the whole batch, so evaluate it once.
    final boolean httpMode = ERequestType.HTTP.getValue().equals(requestType);
    Set<Map.Entry> entries = map.entrySet();
    for (Map.Entry entry : entries) {
        String topic = (String) entry.getKey();
        logger.debug("Topic:{}", topic);
        Map<Integer, List<byte[]>> messages = (Map<Integer, List<byte[]>>) entry.getValue();
        for (List<byte[]> list : messages.values()) {
            logger.debug("================{}", list.size());
            for (byte[] bytes : list) {
                if (httpMode) {
                    // TODO: cache the broadcast message for HTTP clients.
                    continue;
                }
                try {
                    // These bytes are a broadcast message.
                    Message.BroadcastMessage message = Message.BroadcastMessage.parseFrom(bytes);
                    if (message.getUserIdsList().isEmpty()) {
                        // No explicit recipients: send to every channel of this game/server.
                        List<ChannelCache> channelList = cacheService.getGameChannelList(topic,
                                message.getGameId(), message.getServerId());
                        for (ChannelCache channelCache : channelList) {
                            sendMessage(channelCache, message);
                        }
                    } else {
                        for (String userId : message.getUserIdsList()) {
                            ChannelCache channelCache = cacheService.getCachedChannel(message.getGameId(),
                                    message.getServerId(), userId);
                            sendMessage(channelCache, message);
                        }
                    }
                } catch (InvalidProtocolBufferException e) {
                    // Skip the malformed message but keep processing the rest of the batch.
                    logger.error("Failed to parse BroadcastMessage from topic {}", topic, e);
                }
            }
        }
    }
}
/**
 * Sends one broadcast message to a cached client channel, framed according to the
 * configured request type. Does nothing when {@code channelCache} is {@code null} or the
 * request type is HTTP.
 *
 * @param channelCache cached channel of the recipient, may be {@code null}
 * @param message      broadcast message whose inner payload is forwarded
 */
private void sendMessage(ChannelCache channelCache, Message.BroadcastMessage message) {
    if (channelCache == null) {
        return;
    }
    switch (ERequestType.parse(requestType)) {
    case SOCKET:
        writeFrame(channelCache, message, false);
        break;
    case WEBSOCKET:
        writeFrame(channelCache, message, true);
        break;
    case HTTP:
    default:
        break;
    }
}

/**
 * Writes the payload as a 4-byte length followed by the payload bytes and waits for the
 * write to complete. An inactive channel is logged out through {@code cacheService} instead.
 *
 * @param channelCache cached channel of the recipient
 * @param message      broadcast message whose inner payload is forwarded
 * @param webSocket    {@code true} to wrap the buffer in a {@link BinaryWebSocketFrame}
 */
private void writeFrame(ChannelCache channelCache, Message.BroadcastMessage message, boolean webSocket) {
    if (!channelCache.getChannel().isActive()) {
        cacheService.userLogout(channelCache.getChannel().hashCode());
        return;
    }
    // Serialize once; the original called toByteArray() twice per message.
    byte[] payload = message.getMessage().toByteArray();
    ByteBuf messageData = Unpooled.buffer(4 + payload.length);
    messageData.writeInt(payload.length);
    messageData.writeBytes(payload);
    Object frame = webSocket ? new BinaryWebSocketFrame(messageData) : messageData;
    try {
        channelCache.getChannel().writeAndFlush(frame).sync();
    } catch (InterruptedException e) {
        // Restore the flag so the calling thread can still observe the interruption.
        Thread.currentThread().interrupt();
        logger.warn("Interrupted while writing broadcast message to channel", e);
    }
}
}
相关推荐
yupi0 2020-10-10
spring 2020-08-18
编程点滴 2020-07-29
幸运小侯子 2020-07-05
itjavashuai 2020-07-04
qingjiuquan 2020-06-29
shushan 2020-06-25
小鱿鱼 2020-06-22
咻pur慢 2020-06-18
melonjj 2020-06-17
qingjiuquan 2020-06-13
neweastsun 2020-06-05
小鱿鱼 2020-06-05
mxcsdn 2020-05-31
吾日五省我身 2020-05-27
牧场SZShepherd 2020-05-27
sweetgirl0 2020-05-14