Consumer启动后会立即触发一次「重平衡」操作,给自己分配MessageQueue,对于新分配的MessageQueue会提交拉取请求,开始拉取消息进行消费。应用在消费消息时,返回消费状态CONSUME_SUCCESS或RECONSUME_LATER,如果消息消费失败,消息并不会丢失,Broker会在稍后一段时间重新投递该消息,如果超过16次都消费失败,Broker会认为Consumer已经不具备消费这条消息的能力,会将消息扔到死信队列,这个时候就需要人工介入处理了。
以上是RocketMQ消费者消息重试的大致流程,具体的实现原理是怎样的呢?通过源码分析下吧。
PullMessageService是消息拉取服务,它是一个单独的线程池,Consumer重平衡时,对于新分配MessageQueue会提交消息拉取请求交给它处理。
接收到消息拉取请求后,线程会被唤醒,通过PullAPIWrapper类给Broker发送消息拉取请求。Broker给Consumer投递消息后,PullMessageService会提交消息消费请求给ConsumeMessageService处理。
// 构建消费请求,让消费者去消费消息 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
ConsumeMessageService是消息消费服务,它也是一个单独的线程池,拉取到消息后会调用它的submitConsumeRequest方法提交消费请求到线程池等待被调度执行。它有两个实现类,ConsumeMessageConcurrentlyService用来并发消费消息,ConsumeMessageOrderlyService用来消费顺序消息,RocketMQ可以严格保证消息有序,本文暂且只看前者。
// 创建消费请求 ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { // 提交到线程池执行 this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); }
ConsumeRequest提交到线程池后开始异步执行,它的run方法会调用开发者注册的MessageListener对消息进行消费。只有返回CONSUME_SUCCESS才代表消息成功消费,返回NULL、消费超时、消费抛异常都算消费失败。
ConsumeConcurrentlyStatus status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
随后,调用processConsumeResult方法处理消费结果。
- 相关数据的统计。
- 集群模式下,消费失败的消息回传给Broker。
- 回传失败,提交异步任务5秒后消费重试。
- 上报消费位点,但不包括回传失败的消息。
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest) { // ackIndex前的消息代表成功,之后的代表失败 int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; // 统计数据 switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: // 消费失败,重置ackIndex,整批消息都需要重新处理 ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING:// 广播模式下,直接丢掉 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING:// 集群模式下,回传给Broker // 消息回传失败列表 ListmsgBackFailed = new ArrayList (consumeRequest.getMsgs().size()); // ackIndex之前的消息代表消费成功,之后的消息失败需要发回Broker for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); // 回传给Broker boolean result = this.sendMessageBack(msg, context); if (!result) { // 回传失败,暂存到msgBackFailed,稍后重新消费 msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { // 回传失败的消息从msgs删除,不会上报失败消息的位点 consumeRequest.getMsgs().removeAll(msgBackFailed); // 提交消费请求,5s后消费重试 this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { // 上报消费位点,不包含发回Broker失败的消息 this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
Consumer使用TreeMap存储拉取到的消息,Key是消息的Offset,Value是消息。红黑树结构,因此Key是有序的。上报的消费位点取的是最小的Key,即最小的Offset。
广播模式下,消费失败消息会丢弃。集群模式下,消费失败的消息需要回传给Broker,对应的方法是sendMessageBack。由于Consumer恢复需要一定的时间,如果Broker立马重新投递该消息,大概率还是会消费失败,所以Broker会将消息发送到延时队列中,隔一段时间再重新投递。延时多久,通过delayLevelWhenNextConsume属性来设置:-1代表不重试,直接扔到死信队列;0代表由Broker控制重试频率;>0代表由客户端控制重试频率。
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) { // 消息重试的延时级别 int delayLevel = context.getDelayLevelWhenNextConsume(); msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic())); try { this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName()); return true; } catch (Exception e) { log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e); } return false; }
消息如果回传失败,会再重试一次。第一次是正常的消息回传,发送CONSUMER_SEND_MSG_BACK命令,Broker根据消息Offset从CommitLog中读取消息,再重设Topic为重试队列,延时级别为3+重试次数,将消息重新写入CommitLog。
第一次回传如果失败,RocketMQ会进行一次重试,第二次会构建一个新的Message对象,拷贝原Message的内容和属性,重设Topic为重试队列,重设延时级别,发送一个新的消息给Broker。两者的逻辑是类似的。
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); // 第一次 消息回传 this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); // 回传失败,再试一次,发送延时消息 Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); String originMsgId = MessageAccessor.getOriginMessageId(msg); MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); newMsg.setFlag(msg.getFlag()); MessageAccessor.setProperties(newMsg, msg.getProperties()); MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED); // 延时级别 newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); this.mQClientFactory.getDefaultMQProducer().send(newMsg); } finally { msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace())); } }
虽然Topic设置的是重试队列,但由于是延时消息,Broker存储消息时会改写Topic,放到延时队列中,然后由异步线程定时扫描这些消息。消息的延时时间一到,ScheduleMessageService会将需要交付的消息重新写回CommitLog,因为Consumer启动时会自动订阅重试Topic,所以Broker会将这些消息重新投递给Consumer消费,然后重复上面的流程。
消息的重试次数会记录在消息的Properties中,消息重试的延时级别会随着重试次数的增多而变大,消息重试次数越多,重试的间隔时间会越长,默认的重试间隔策略为3+reconsumeTimes,即10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h...,最大重试次数为16,超过最大重试次数,Broker会认为Consumer已经不具备消费这条消息的能力了,会将消息投递到死信队列。
3. 总结ConsumeMessageService在消费消息时,会根据消费状态去处理消费结果。消费成功的处理很简单,从本地缓存中删除消息,然后上报消费位点即可。
对于消费失败的消息,Consumer会将其重新回传给Broker,然后将这一批消息视为消费成功,直接上报消费位点即可。但是消息回传给Broker的过程可能会失败,一旦失败就不能上报这些消息的消费位点了,否则消息就丢失了,Consumer会先提交一个异步任务,5秒后重新消费这批回传失败的消息,然后上报位点时忽略这些回传失败的消息Offset。
Broker在接收到Consumer回传的消息后,会根据重试次数设置延时级别,将消息暂时投递到延时队列中,然后异步线程定时扫描延时消息,消息到期后再将其重新投递到重试队列中。由于Consumer启动时会自动订阅自身所在Group的重试Topic的,自然也就能正常消费重试消息了。