本文主要分析RocketMQ中如何保证消息有序的 。
RocketMQ的版本为:4.2.0 release 。
一.时序图
还是老规矩,先把分析过程的时序图摆出来:
1.Producer发送顺序消息
2.Consumer接收顺序消息(一)
3.Consumer接收顺序消息(二)
二.源码分析 – Producer发送顺序消息
1 DefaultMQProducer#send:发送消息,入参中有自定义的消息队列选择器 。
// DefaultMQProducer#send public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, selector, arg); }1.1 DefaultMQProducerImpl#makeSureStateOK:确保Producer的状态是运行状态-ServiceState.RUNNING 。
// DefaultMQProducerImpl#makeSureStateOK private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The producer service state not OK, "+ this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } }1.2 DefaultMQProducerImpl#tryToFindTopicPublishInfo:根据Topic获取发布Topic用到的路由信息 。
// DefaultMQProducerImpl#tryToFindTopicPublishInfo private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);// 为空则从 NameServer更新获取,false,不传入 defaultMQProducer topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {// 有了路由信息而且状态OK,则返回 return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }1.3 调用自定义消息队列选择器的select方法 。
// DefaultMQProducerImpl#sendSelectImpl MessageQueue mq = null; try { mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); } catch (Throwable e) { throw new MQClientException("select message queue throwed exception.", e); } // Producer#main SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);【rocketmq源码解析 rocketmq源码部署】1.4 DefaultMQProducerImpl#sendKernelImpl:发送消息的核心实现方法 。
// DefaultMQProducerImpl#sendKernelImpl ...... switch (communicationMode) { case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; ......1.4.1 MQClientAPIImpl#sendMessage:发送消息 。
// MQClientAPIImpl#sendMessage ...... switch (communicationMode) {// 根据发送消息的模式(同步/异步)选择不同的方式,默认是同步 case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); ......
推荐阅读
- jdk源码阅读顺序 jdk源码垃圾
- 异星探险家astroneer蓄电池作用解析 蓄电池干什么用的?
- 异星探险家astroneer过滤器作用解析 过滤器有什么作用
- 拉结尔天赋树加点流派解析 弓箭手满级成型加点攻略_天赋选择
- 《拉结尔》斗兽之王赛季套装解析 磐石赛季装备一览
- 拉结尔控制魂灯功能解析 如何利用魂灯快速提升英雄等级
- 侍魂胧月传说红魔炼狱怎么打 红魔炼狱玩法解析
- 侍魂胧月传说试合竞技怎么打 试合竞技玩法解析
- 我的世界恶魂为什么会哭 哭泣的恶魂解析
- 异星探险家Astroneer基地板作用解析 基地板有什么作用