聊聊rocketmq的maxReconsumeTimes
序
本文主要研究一下rocketmq的maxReconsumeTimes
maxReconsumeTimes
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { private final InternalLogger log = ClientLogger.getLog(); //...... /** * Max re-consume times. -1 means 16 times. * </p> * * If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion * queue waiting. */ private int maxReconsumeTimes = -1; //...... public int getMaxReconsumeTimes() { return maxReconsumeTimes; } public void setMaxReconsumeTimes(final int maxReconsumeTimes) { this.maxReconsumeTimes = maxReconsumeTimes; } //...... }
- DefaultMQPushConsumer定义了maxReconsumeTimes属性,默认为-1
sendMessageBack
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
public class DefaultMQPushConsumerImpl implements MQConsumerInner { //...... public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); String originMsgId = MessageAccessor.getOriginMessageId(msg); MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); newMsg.setFlag(msg.getFlag()); MessageAccessor.setProperties(newMsg, msg.getProperties()); MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); this.mQClientFactory.getDefaultMQProducer().send(newMsg); } finally { msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace())); } } private int getMaxReconsumeTimes() { // default reconsume times: 16 if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) { return 16; } else { return this.defaultMQPushConsumer.getMaxReconsumeTimes(); } } //...... }
- DefaultMQPushConsumerImpl的sendMessageBack方法会对mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack进行异常捕获,出现异常时会使用MessageAccessor.setReconsumeTime更新newMsg的reconsumeTime,以及调用getMaxReconsumeTimes方法设置newMsg的maxReconsumeTimes,最后使用mQClientFactory.getDefaultMQProducer().send(newMsg)发送消息
handleRetryAndDLQ
rocketmq/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { //...... private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request, MessageExt msg, TopicConfig topicConfig) { String newTopic = requestHeader.getTopic(); if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark( "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); return false; } int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); if (reconsumeTimes >= maxReconsumeTimes) { newTopic = MixAll.getDLQTopic(groupName); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 ); msg.setTopic(newTopic); msg.setQueueId(queueIdInt); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); return false; } } } int sysFlag = requestHeader.getSysFlag(); if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; } msg.setSysFlag(sysFlag); return true; } //...... }
- SendMessageProcessor的handleRetryAndDLQ方法会判断如果topic是RETRY_GROUP_TOPIC_PREFIX(
%RETRY%
)开头的,会先从subscriptionGroupConfig.getRetryMaxTimes()获取maxReconsumeTimes,对于mq版本大于等于MQVersion.Version.V3_4_9.ordinal()的则会从request的header中读取maxReconsumeTimes;之后从request的header读取reconsumeTimes,如果该值大于等于maxReconsumeTimes则更新newTopic为MixAll.getDLQTopic(groupName)
小结
DefaultMQPushConsumer定义了maxReconsumeTimes属性,默认为-1;DefaultMQPushConsumerImpl的sendMessageBack方法会对mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack进行异常捕获,出现异常时会使用MessageAccessor.setReconsumeTime更新newMsg的reconsumeTime,以及调用getMaxReconsumeTimes方法设置newMsg的maxReconsumeTimes,最后使用mQClientFactory.getDefaultMQProducer().send(newMsg)发送消息;broker端会判断reconsumeTimes如果大于等于maxReconsumeTimes则会将其topic改为MixAll.getDLQTopic(groupName)