rabbitmq批量处理

我们通过spring-amqp操作rabbitmq是极其简单的,消息的生产者和消费者只需要如下配置:

客户端(生产者):connectionFactory、queue、exchange、messageConverter、RabbitTemplate。

服务端(消费者):connectionFactory、queue、exchange、messageConverter、listenerContainer。

如果消息堆积严重,我们可以通过两种方式来处理消息,一种是在服务端开启监听多线程服务(concurrency="10"),另一种是让消息批量出队列。

开启多线程的配置示例如下:

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" requeue-rejected="false"   
	      concurrency="10"   message-converter="jsonMessageConverter" > 
    	    <rabbit:listener ref="tradeListener" method="listen"  queues="queue_trade_repay" />
	</rabbit:listener-container>

批量出队列的示例如下:

客户端(消息生产者)

import java.math.BigDecimal;

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.rd.account.domain.AccountLog;
import com.rd.ifaes.mq.producer.RabbitProducer;
import com.rd.ifaes.web.BaseTest;

/**
 * 消息生产者
 * @author lihua
 * @since 2018-04-08
 *
 */
public class Producer  extends BaseTest{
	
//	@Autowired
//	private RabbitTemplate rabbitTemplate;
	
	//这里对rabbitTemplate做了简单的封装,您可以直接使用rabbitTemplate
	@Autowired
	private RabbitProducer rabbitProducer;
	
	private static final String queueName = "ACCOUNT_LOG_BATCH"; //MqConstant.ROUTING_KEY_ACCOUNT_LOG_BATCH;

	@Test
	public void main() {
		for (int i = 0; i < 512; i++) {
			AccountLog log = new AccountLog("001", "001", "asdf", BigDecimal.valueOf(i), "remark"+i);
			rabbitProducer.send(queueName, log);
//			rabbitTemplate.convertAndSend(queueName, "hello" + i);
		}
		
	}
	
}

服务端(消息消费者)

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 消息消费者
 * @author lihua
 * @since 2018-04-08
 *
 */
public class Consumer extends BaseTest{
	
	private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
	
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Autowired
	private AccountLogService accountLogService;
	private static final String queueName = "ACCOUNT_LOG_BATCH"; //MqConstant.ROUTING_KEY_ACCOUNT_LOG_BATCH;
	private static final int BATCH_SIZE = 100;
	
	@Test
	public void consumer() {
    	while (true) {
    		rabbitTemplate.execute(new ChannelCallback<String>() {
    			@Override
    			public String doInRabbit(Channel channel) throws Exception {
    				SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    		        try {
    		            final AMQP.Queue.DeclareOk ok = channel.queueDeclare(queueName, true, false, false, null);
    		            int messageCount = ok.getMessageCount();
    		            LOGGER.info("run consumer {}, msg count {}", sdf.format(new Date()), messageCount);
    		            if (messageCount == 0) {
    		                return null;
    		            }
    		            List<AccountLog> list = new ArrayList<>();
    		            channel.basicQos(BATCH_SIZE);
    		            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    		            LOGGER.info("channel id {}", Integer.toHexString(System.identityHashCode(channel)));
    		            final String inConsumerTag = "test consumer" + sdf.format(new Date());
    		            channel.basicConsume(queueName, false, inConsumerTag, queueingConsumer);
    		            long messageId = -1;
    		            int dealedCount = 0;
    		            int i = BATCH_SIZE;
    		            while (i-- > 0) {
    		                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(BATCH_SIZE);
    		                if (delivery == null) {
    		                    break;
    		                }
    		                String msg = new String(delivery.getBody());
    		                AccountLog log = JSONObject.parseObject(msg, AccountLog.class);
    		                list.add(log);
    		                messageId = delivery.getEnvelope().getDeliveryTag();
    		                LOGGER.info("get message {} delivery id {}", msg, messageId);
    		                dealedCount++;
    		                if (dealedCount % 5 == 0) {
    		                    channel.basicAck(messageId, true);
    		                    LOGGER.info("batch ack message id =>{}", messageId);
    		                    messageId = -1;
    		                }
    		            }
    		            if (messageId > 0) {
    		                channel.basicAck(messageId, true);
    		                LOGGER.info("last to ack message id =>{}", messageId);
    		            }
    		            
    		            // 日志入库
    		            accountLogService.saveBatch(list);
    		            
    		        } finally {
    		            LOGGER.info("consumer done {}", sdf.format(new Date()));
    		        }
    		        channel.abort();
    				return null;
    			}
    		});
			
    		try {
    			Thread.sleep(5000);
    		} catch (InterruptedException e) {
    			
    		}
		}
	}

}
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:spring-context.xml"})
public abstract class BaseTest {

}

补一个服务端真实案例:

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;


@Component
@Lazy(value=false)
public class AccountLogBatchListener {
	
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Autowired
	private AccountLogService accountLogService;
	
	private static final Logger LOGGER = LoggerFactory.getLogger(AccountLogBatchListener.class);
	private static final String QUEUE_NAME = MqConstant.ROUTING_KEY_ACCOUNT_LOG_BATCH;
	private static final ExecutorService executor = Executors.newFixedThreadPool(1);
	private static final int BATCH_SIZE = 100;
	
	@PostConstruct
	public void init(){		
		executor.submit(new Callable<String>() {
			@Override
			public String call() throws Exception {
				execute();
				return null;
			}			
		});
	}
	
	private void execute(){		
		while (true) {
			rabbitTemplate.execute(new ChannelCallback<String>() {
				@Override
				public String doInRabbit(Channel channel) throws Exception {
					SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
					try {		        	
						final AMQP.Queue.DeclareOk ok = channel.queueDeclare(QUEUE_NAME, true, false, false, null);
						int messageCount = ok.getMessageCount();
						LOGGER.debug("accountLogBatchListener {}, msg count {}", sdf.format(new Date()), messageCount);
						if (messageCount == 0) {
							return null;
						}
						List<AccountLog> list = new ArrayList<>();
						channel.basicQos(BATCH_SIZE);
						QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
						LOGGER.debug("channel id {}", Integer.toHexString(System.identityHashCode(channel)));
						final String inConsumerTag = "accountLogBatchListener {}" + sdf.format(new Date());
						channel.basicConsume(QUEUE_NAME, false, inConsumerTag, queueingConsumer);
						long messageId = -1;
						int dealedCount = 0;
						int i = BATCH_SIZE;
						while (i-- > 0) {
							QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(BATCH_SIZE);
							if (delivery == null) {
								break;
							}
							String msg = new String(delivery.getBody());
							AccountLog log = JSONObject.parseObject(msg, AccountLog.class);
							list.add(log);
							messageId = delivery.getEnvelope().getDeliveryTag();
							LOGGER.info(" userId {}, delivery id {}", log.getUserId(), messageId);
							dealedCount++;
							if (dealedCount % 5 == 0) {
								channel.basicAck(messageId, true);
								LOGGER.debug("batch ack message id =>{}", messageId);
								messageId = -1;
							}
						}
						if (messageId > 0) {
							channel.basicAck(messageId, true);
							LOGGER.debug("last to ack message id =>{}", messageId);
						}
						
						// 日志入库
						accountLogService.saveBatch(list);
						
						
					} finally {
						LOGGER.info("accountLogBatchListener done {}", sdf.format(new Date()));
					}
					channel.abort();
					return null;
				}
			});
			try {
				Thread.sleep(10000);
			} catch (InterruptedException e) {
			}
		}
	}

}

相关推荐