RabbitMQ spring 使用总结

rabbitMQ相关概念不在本文介绍范围,rabbitMQ官网和其他博客都有大量介绍。

本文重点内容是spring和rabbit环境搭建以及使用中注意事项总结。

1.1rabbitMQ服务器搭建

下载安装官网最新版本服务器

1.2rabbitMQ开启服务管理

启动 RabbitMQ 服务(例如 Windows 下执行 `rabbitmq-service start`),并执行 `rabbitmq-plugins enable rabbitmq_management` 开启管理插件。

1.3 Spring pom 配置

<!-- Version property for the spring-rabbit dependency below; Maven properties are declared inside the pom's <properties> element. -->
<!-- NOTE(review): ${rabbitmq-client.version} is referenced below but not defined in this snippet; confirm it is declared alongside this property. -->
<spring-rabbit.version>1.3.9.RELEASE</spring-rabbit.version>

<!-- RabbitMQ message queue dependencies -->

<dependency>

<groupId>com.rabbitmq</groupId>

<artifactId>amqp-client</artifactId>

<version>${rabbitmq-client.version}</version>

</dependency>

<dependency>

<groupId>org.springframework.amqp</groupId>

<artifactId>spring-rabbit</artifactId>

<version>${spring-rabbit.version}</version>

</dependency>

1.4 Spring config 配置

在D:\workspace\sps\src\main\resources\spring-rabbitmq.xml

配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <mvc:annotation-driven/>

    <!-- Broker connection; host, port and credentials come from property placeholders. -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${rabbitmq.master.ip}"
                               port="${rabbitmq.master.port}"
                               username="${rabbitmq.master.username}"
                               password="${rabbitmq.master.password}"/>

    <!-- Template used by producers; publishes to order_topic_exchange by default. -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                     exchange="order_topic_exchange" message-converter="gsonConverter"/>

    <!-- Declares the queues, exchanges and bindings below on the broker. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:queue name="orderQueue" durable="true"/>

    <!-- Holding queue: messages expire after x-message-ttl (milliseconds) and are
         then dead-lettered to pay_delay_exchange. -->
    <rabbit:queue name="orderPayQueryQueue" durable="true" auto-delete="false" exclusive="false">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <value type="java.lang.Long">600000</value>
            </entry>
            <entry key="x-dead-letter-exchange" value="pay_delay_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- Receives the expired (delayed) messages routed through pay_delay_exchange. -->
    <rabbit:queue name="orderPayDelayQueryQueue" durable="true"/>

    <rabbit:topic-exchange name="pay_delay_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="orderPayDelayQueryQueue" pattern="orderPay.#"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <rabbit:topic-exchange name="order_topic_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="orderQueue" pattern="sps.#"/>
            <rabbit:binding queue="orderPayQueryQueue" pattern="orderPay.#"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- Manual acknowledgement: the listener itself must ack/nack every delivery. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" concurrency="10">
        <rabbit:listener queues="orderQueue" ref="orderQueueListener"/>
    </rabbit:listener-container>

    <bean id="orderQueueListener" class="com.supuy.sps.services.queue.OrderQueueListener"/>

    <bean id="gsonConverter" class="com.supuy.core.mq.Gson2JsonMessageConverter"/>

</beans>

1.5延迟消息队列

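有时候,因为各种原因,我们想实现延迟消费的目的,但是 RabbitMQ 本身并没有直接提供延迟队列功能,这时候可以通过 x-message-ttl 和 x-dead-letter-exchange 组合实现:消息先进入一个不设消费者的队列(这里是 orderPayQueryQueue),停留超过 x-message-ttl(这里是 600000 毫秒,即 10 分钟)后过期成为死信,被转发到 x-dead-letter-exchange 指定的交换机 pay_delay_exchange,再按绑定路由到 orderPayDelayQueryQueue,由监听该队列的消费者处理,从而达到延迟消费的效果。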

<!-- Holding queue for delayed delivery: messages expire after x-message-ttl
     (milliseconds) and are then dead-lettered to pay_delay_exchange. -->
<rabbit:queue name="orderPayQueryQueue" durable="true" auto-delete="false" exclusive="false">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
            <value type="java.lang.Long">600000</value>
        </entry>
        <entry key="x-dead-letter-exchange" value="pay_delay_exchange"/>
    </rabbit:queue-arguments>
</rabbit:queue>

1.6生产者

@Override

publicvoidorderBuilder(inttype,StringorderCode){

Stringkey="tps."+orderCode;

orderCode=type+"."+orderCode;

amqpMaster.convertAndSend(key,orderCode);

logger.info("订单加入消息队列,订单编码:{}",key);

}

1.7消费者

packagecom.supuy.tps.service.queue;

importcom.alibaba.fastjson.JSON;

importcom.rabbitmq.client.Channel;

importcom.supuy.tps.common.mq.Gson2JsonMessageConverter;

importcom.supuy.tps.dto.bean.WmsOrderParam;

importcom.supuy.tps.service.IOrderShopService;

importorg.slf4j.Logger;

importorg.slf4j.LoggerFactory;

importorg.springframework.amqp.core.Message;

importorg.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

importorg.springframework.beans.factory.annotation.Autowired;

/**

*Createdbybillon2016/5/31.

*/

publicclassOrderSendQueueListenerimplementsChannelAwareMessageListener{

privatestaticLoggerlogger=LoggerFactory.getLogger(OrderSendQueueListener.class);

@Autowired

privateGson2JsonMessageConvertermessageConverter;

@Autowired

privateIOrderShopServiceorderShopService;

@Override

publicvoidonMessage(Messagemessage,Channelchannel)throwsException{

channel.basicQos(100);

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

Stringdata=(String)messageConverter.fromMessage(message);

if(data!=null){

WmsOrderParamwmsOrderParam=JSON.parseObject(data,WmsOrderParam.class);

if(wmsOrderParam!=null){

wmsOrderParam.setOrderCode(wmsOrderParam.getOrderCode().substring(1));

orderShopService.pushOrderLogInfo(wmsOrderParam);

}

}

}

}

附加类 Gson2JsonMessageConverter 实现如下:

packagecom.supuy.tps.common.mq;

importcom.google.gson.Gson;

importorg.apache.commons.logging.Log;

importorg.apache.commons.logging.LogFactory;

importorg.springframework.amqp.core.Message;

importorg.springframework.amqp.core.MessageProperties;

importorg.springframework.amqp.support.converter.AbstractJsonMessageConverter;

importorg.springframework.amqp.support.converter.ClassMapper;

importorg.springframework.amqp.support.converter.DefaultClassMapper;

importorg.springframework.amqp.support.converter.MessageConversionException;

importjava.io.IOException;

importjava.io.UnsupportedEncodingException;

publicclassGson2JsonMessageConverterextendsAbstractJsonMessageConverter{

privatestaticLoglog=LogFactory.getLog(Gson2JsonMessageConverter.class);

privatestaticClassMapperclassMapper=newDefaultClassMapper();

privatestaticGsongson=newGson();

publicGson2JsonMessageConverter(){

super();

}

@Override

protectedMessagecreateMessage(Objectobject,

MessagePropertiesmessageProperties){

byte[]bytes=null;

try{

StringjsonString=gson.toJson(object);

bytes=jsonString.getBytes(getDefaultCharset());

}

catch(IOExceptione){

thrownewMessageConversionException(

"FailedtoconvertMessagecontent",e);

}

messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);

messageProperties.setContentEncoding(getDefaultCharset());

if(bytes!=null){

messageProperties.setContentLength(bytes.length);

}

classMapper.fromClass(object.getClass(),messageProperties);

returnnewMessage(bytes,messageProperties);

}

@Override

publicObjectfromMessage(Messagemessage)

throwsMessageConversionException{

Objectcontent=null;

MessagePropertiesproperties=message.getMessageProperties();

if(properties!=null){

StringcontentType=properties.getContentType();

if(contentType!=null&&contentType.contains("json")){

Stringencoding=properties.getContentEncoding();

if(encoding==null){

encoding=getDefaultCharset();

}

try{

Class<?>targetClass=getClassMapper().toClass(

message.getMessageProperties());

content=convertBytesToObject(message.getBody(),

encoding,targetClass);

}

catch(IOExceptione){

thrownewMessageConversionException(

"FailedtoconvertMessagecontent",e);

}

}

else{

log.warn("Couldnotconvertincomingmessagewithcontent-type["

+contentType+"]");

}

}

if(content==null){

content=message.getBody();

}

returncontent;

}

privateObjectconvertBytesToObject(byte[]body,Stringencoding,

Class<?>clazz)throwsUnsupportedEncodingException{

StringcontentAsString=newString(body,encoding);

returngson.fromJson(contentAsString,clazz);

}

@Override

publicClassMappergetClassMapper(){

returnnewDefaultClassMapper();

}

}

1.8Q&A

1. 队列的 x-message-ttl 等参数在队列创建后不可修改:如果修改了 TTL 后重新声明同名队列,会因参数与已存在的队列不一致而报错(PRECONDITION_FAILED)。这时候,需要先删除该队列,再重启项目重新创建。

2. 在手动确认(acknowledge="manual")模式下,接收消息之后如果处理出错而没有确认,该消息会一直处于未确认(unacked)状态被当前消费者占有,无法被再次消费。所以,要活用消息的 ack、nack、reject。

相关推荐