Shellbye.github.io
Shellbye.github.io copied to clipboard
《RocketMQ 技术内幕》读书笔记
第 1 章 阅读源代码前的准备
1.3 RocketMQ 的设计理念与目标
1.3.1 设计理念
基于主题的发布、订阅模式
整体设计追求简单与性能第一:
a. nameserver 设计极其简单,抛弃了 zookeeper,
topic 路由信息无需在集群之间保持强一致,追求最终一致,
能容忍分钟级别的不一致,所以 nameserver 之间不通信。
带来的好处有:
1. 降低了 nameserver 的实现的复杂度
2. 对网络的要求也降低了不少
3. 性能获得了极大的提升
b. 高效的 IO 存储机制
RocketMQ 追求消息发送的高吞吐量,其消息存储文件设计成文件组的概念,
组内单个文件大小固定,方便引入内存映射机制,所有主题的消息存储基于顺序写,
极大的提高了写性能,同时为了兼顾消息消费和查找,引入了消息消费队列与索引文件
c. 容忍存在设计缺陷
为了保证消息一定送到,容忍了消息的重复消费
极大的简化了消息中间件的内核
1.3.2 设计目标
1. 架构模式
2. 顺序消费
3. 消息过滤
broker 端过滤
消费端过滤
4. 消息存储
5. 消息高可用
5.1 broker 正常关机
5.2 broker 异常奔溃
5.3 OS 奔溃
5.4 机器断电
5.5 机器无法开机
5.6 磁盘设备损坏
1--4 同步刷盘解决
5--6 异步复制/双写机制
6. 消息消费低延迟
7. 确保消息至少被消费一次
8. 回溯消息
重新消费
9. 消息堆积
10. 定时消息
11. 消息重试机制
第 2 章 RocketMQ 的路由中心 nameserver
2.1 nameserver 架构设计
broker 启动时像 nameserver 注册,生产者启动时从 nameserver 获取 broker 列表。
nameserver 与 broker 保持长链接,并每 30s 检查 broker 是否存活,若不存活,
则从注册表中删除之,但是并不通知生产者,这样是为了降低 nameserver 的复杂性,
发送端提供容错机制来保证消息发送的高可用。
2.2 nameserver 启动流程
1. 解析配置文件,填充 NameServerConfig NettyServerConfig
2. 创建 NamesrvController,核心控制器
定时任务
3. 注册 JVM 钩子函数并启动服务器,监听 broker 、 生产者的网络请求
2.3 nameserver 路由注册、故障剔除
2.3.1 路由元信息
topicQueueTable
brokerAddrTable
broker 基础信息: name,集群地址,主备地址
clusterAddrTable
集群中 broker 信息
brokerLiveTable
broker 状态信息
filterServerTable
2.3.2 路由注册
1. broker 发送心跳包
2. nameserver 处理心跳包
路由注册需要加写锁
2.3.3 路由删除
触发点
1. 每隔 10s 扫描 brokerLiveTable,lastUpdateTimestamp 距今超过 120s 没有反应就删除
2. broker 正常关闭是删除
2.3.4 路由发现
非实时,而是等待客户端定时拉取最新路由信息
2.4 本章小结
第 3 章 RocketMQ 消息发送
3.1 漫谈 RocketMQ 消息发送
三种消息发送方式
同步
异步
单向
3.2 认识 RocketMQ 消息
3.3 生产者启动流程
3.3.1 初始 DefaultMQProducer 消息发送者
3.3.2 消息生产者启动流程
1. 检查 ProducerGroup 是否符合要求,并改变生产者 instanceName 为进程 id
2. 创建 MQClientInstance 实例,整个 JVM 中只有一个,封装了 MQ 的网络处理 API,
是消费者和生产者与 nameserver 、 broker 打交道的通道
3. 将当前生产者注册到 MQClientInstance 中管理
4. 启动 MQClientInstance
3.4 消息发送基本流程
验证消息 -> 查找路由 -> 消息发送(异常处理)
3.4.1 消息长度验证
最大长度 4M
3.4.2 查找主题路由信息
3.4.3 选择消息队列
1. 默认机制 sendLatencyFaultEnable = false
2. broker 故障延迟机制
3.4.4 消息发送
1. 根据 MessageQueue 查找 Broker 的网络地址。
2. 为消息分配全局 id
3. 执行发送钩子函数
4. 构建消息发送请求包
5. 网络传输
6. 执行钩子函数 after 逻辑
3.5 批量消息发送
第 4 章 RocketMQ 消息存储
4.1 存储概要设计
RocketMQ 是一款高性能的中间件,存储部分的设计是核心,存储的核心是 IO 访问性能
所有主题的消息顺序写入同一个文件,保证高性能和高吞吐。
但是这样给按照主题消费带来了问题,因此引入了 ConsumeQueue 消费队列文件,
每个主题有多个消费队列,每个队列有一个消息文件。
IndexFile 索引文件,主要是为了加速检索消息,
根据消息的属性从 CommitLog 中快速检索文件
主要文件
CommitLog
消息存储文件,所有主题的消息都存在这里
ConsumeQueue
消息消费队列,消息到达 CommitLog 之后,异步到达 ConsumeQueue,供消费
IndexFile
消息索引文件,存储 key 和 offset 的对应关系
4.2 初始消息存储
消息存储实现类 org.apache.rocketmq.store.DefaultMessageStore
MessageStoreConfig
消息存储配置属性
CommitLog
CommitLog 文件的存储实现类
ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable
消息队列存储缓存表,按照消息主题分组
FlushConsumeQueueService
消息队列文件 ConsumeQueue 刷盘线程
CleanCommitLogService
清除 CommitLog 文件服务
CleanConsumeQueueService
清除 ConsumeQueue 文件服务
IndexService
索引文件 IndexFile 实现类
AllocateMappedFileService
MappedFile 分配服务
ReputMessageService
CommitLog 消息分发,根据 CommitLog 构建 ConsumeQueue 、 IndexFile 文件
HAService
存储 HA 机制
TransientStorePool
消息堆内存缓存
MessageArrivingListener
消息拉取长轮询模式 消息到达监听器
BrokerConfig
broker 配置属性
StoreCheckpoint
文件刷盘检测点
LinkedList<CommitLogDispatcher> dispatcherList
CommitLog 转发请求
4.3 消息发送存储流程
消息存储入口 org.apache.rocketmq.store.DefaultMessageStore#putMessage
1. broker 停止工作 或 broker 处于 slave 状态 或
不支持写入 或 主题长度超过限制 或 消息属性长度超过限制,则拒绝写入
2. 如果消息的延迟级别大于 0 ,将消息的原主题与原队列 id 存入消息的属性中,
用延迟消息主题 SCHEDULE_TOPIC 、 消息队列 id 更新原先消息的主题和队列 id 。
3. 获取当前可以写入的 CommitLog
MappedFileQueue 可以看作是 store/commitlog 下面的文件夹,
MappedFile 则对应文件夹下面的文件
4. 写入 CommitLog 之前,先申请 putMessageLock ,写入是串行的
5. 设置消息的存储时间,使用或创建 mappedile
6. 将消息追加到 MappedFile
7. 创建全局唯一消息 id ( 4字节id + 4字节端口 + 8字节消息偏移量 )
8. 获取该消息在消息队列中的偏移量, CommitLog 中保存了当前所有消息队列的当前待写入偏移量
9. 根据消息体的长度、主题的长度、属性的长度结合存储格式,计算消息的总长度
10. 如果消息长度 + END_FILE_MIN_BLANK_LENGTH 大于 CommitLog 的空闲空间,
则返回 AppendMessageStatus.END_OF_FILE , broker 会新建 CommitLog
11. 将消息内容存储到 ByteBuffer 中, 然后创建 AppendMessageResult ,
这里只是把消息存储到 MappedFile 对应的内存映射 ByteBuffer 中, 不刷盘
12. 更新消息队列逻辑偏移量 logigcsOffset
13. 释放锁
14. DefaultAppendMessageCallback#doAppend 只把消息追加到内存中
4.4 存储文件组织与内存映射
4.4.1 MappedFileQueue 映射文件队列
不同查询维度查找 MappedFile
根据消息存储时间戳来查询,从头开始找到第一个最后一次更新时间大于查询时间
根据消息偏移量来查询,不能直接用 offset % mappedFileSize,
因为前面的可能已经删除了,应该用 offset / mappedFileSize - mappedFile.getFileFromOffset / mappedFileSize
4.4.2 MappedFile 内存映射文件
1. MappedFile 初始化
2. MappedFile 提交(commit)
3. MappedFile 刷盘(flush)
4. 获取 MappedFile 最大读指针
5. MappedFile 销毁(destroy)
4.4.3 TransientStorePool
短暂的存储池
内存锁定,避免进程内存被交换到磁盘
4.5 RocketMQ 存储文件
概述
1. commitlog
消息存储目录
2. config
运行期间的配置信息,包含
2.1 consumerFilter.json
主题消息过滤信息
2.2 consumerOffset.json
集群消费模式消息消费进度
2.3 delayOffset.json
延时消息队列拉取进度
2.4 subscriptionGroup.json
消息消费组配置信息
2.5 topic.json
topic 配置信息
3. consumerqueue
消息消费队列存储目录
4. index
消息索引文件存储目录
5. abort
表示 broker 非正常关闭
6. checkpoint
存储各个文件的最后一次刷盘时间
4.5.1 CommitLog 文件
4.5.2 ConsumeQueue 文件
可以看成 CommitLog 关于消息的“索引”文件
不存储消息的全量信息,只存储 commitlog offset 等
4.5.3 Index 索引文件
===========【信息量很大】
4.5.4 checkpoint 文件
4.6 实时更新消息消费队列与索引文件
ReputMessageService 用来准实时转发 CommitLog 的更新事件到 ConsumeQueue 和 IndexFile
关键参数
reputFromOffset
执行一次休息 1 毫秒
过程
1. 读取从 reputFromOffset 开始的所有数据
2. 从 result 返回的 ByteBuffer 中循环读取消息,一次一条,创建 DispatchRequest 对象
最终分别调用 CommitLogDispatcherBuildConsumeQueue 或 CommitLogDispatcherBuildIndex
4.6.1 根据消息更新 ConsumeQueue
1. 根据消息主题和队列 id 获取对应的 ConsumeQueue 文件
2. 依次将消息偏移量、消息长度、 tag hashcode 写入到 ByteBuffer
4.6.2 根据消息更新 Index 索引文件
1. 获取或创建 IndexFile 文件并获取所有文件最大的物理偏移量
4.7 消息队列与索引文件的恢复
broker 宕机导致 CommitLog ConsumeQueue IndexFile 不一致
1. 判断上一次是否正常,通过 abort 文件是否存在判断
2. 加载延迟队列
3. 加载 CommitLog 文件
4. 加载消息消费队列
5. 加载存储检查点
6. 加载索引文件
7. 根据 broker 是否正常停止执行不同的恢复策略
8. ConsumeQueue 恢复之后,将在 CommmitLog 保存每个消息消费队列的存储逻辑偏移量
4.7.1 broker 正常停止文件恢复
1. 从倒数第三个文件开始恢复
2.
3. 遍历 CommitLog 文件
4. 更新 MappedFileQueue 的 flushedWhere 和 committedWhere
5. 删除 offset 之后所有的文件
4.7.2 broker 异常停止文件恢复
从最后一个文件开始往回走,找到第一个消息存储正常的文件,
如何判断一个消息文件是正确的文件呢?
1. 首先判断文件的魔数是否正确
2. 如果第一条消息的时间存储时间是 0 ,返回 false
3. 对比文件第一条消息的时间戳与检查点文件比较,小于则正常,则从该文件开始恢复
4. 根据前三步如果找到 MappedFile , 则遍历其中的消息
5. 如果没找到,commitlog 指针都置 0 ,删除消费队列文件
存储启动时所谓的文件恢复,主要完成 flushedPosition 和 committedWhere 指针的设置、
消息消费队列最大偏移量加载到内存 等操作
4.8 文件刷盘机制
RocketMQ 的存储和读写是基于 JDK NIO 的内存映射机制( MappedByteBuffer )的,
消息存储时先追加到内存,在根据不同的刷盘策略进行刷盘 MappedByteBuffer.force()
4.8.1 同步刷盘
1. 构建 GroupCommitRequest 同步任务并提交到 GroupCommitRequest
2. 等待同步刷盘任务完成
4.8.2 异步刷盘
transientStorePoolEnable 为 true
单独申请一个堆外内存,使用内存锁定方式被置换到磁盘,
消息先追加到堆外内存,然后递交到内存映射内存,再刷盘
1. 首先将消息直接追加到 ByteBuffer (堆外内存 DirectByteBuffer),
writePosition 随着消息的不断追加向后移动
2. CommitRealTimeService 线程默认每 200ms 将 ByteBuffer
新追加的内容 (writePosition - committedPosition)数据提交
到 MappedByteBuffer 中
3. MappedByteBuffer 在内存中提交追加的内容,writePosition 后移
4. commit 操作成功,将 committedPosition 后移本次递交内容的长度
5. FlushRealTimeService 每 500ms 将 MappedByteBuffer 中新追加的内容刷到磁盘
transientStorePoolEnable 为 false
直接追加到内存映射内存,刷盘
4.9 过期文件删除机制
每隔 10s 调度一次 clearFilesPeriodically
第 5 章 RocketMQ 消息消费
5.1 RocketMQ 消息消费概述
5.2 消息消费者初探
MQPushConsumer 核心属性
1. void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
发送消息 ACK 确认
2. Set<MessageQueue> fetchSubscribeMessageQueues(final String topic)
获取消费者对 topic 分配了哪些消费队列
3. void registerMessageListener(final MessageListenerConcurrently messageListener);
注册并发消息监听器
4. void registerMessageListener(final MessageListenerOrderly messageListener);
注册顺序消费消息监听器
5. void subscribe(final String topic, final MessageSelector selector)
基于主题订阅消息
6. void unsubscribe(final String topic);
取消订阅
5.3 消费者启动流程
1. 构建主题订阅信息 SubscriptionData , 并加入到 RebalanceImpl 的订阅关系中。
订阅关系来源主要有两个:
1. 调用 subscribe
2. 订阅重试主题消息,重试是以消费组为单位,而不是主题,
消息重试主题名为 %RETRY% + 消费组名
2. 初始化 MQClientInstance , RebalanceImpl (消息重新负载实现类)
3. 初始化消费进度,集群模式保存在 broker 上,广播模式保存在消费端
4. 根据是否顺序消费,创建消费线程服务。 ConsumeMessageService 主要负责消息消费,内部维护一个线程池
5. 向 MQClientInstance 注册消费者
5.4 消息拉取
RocketMQ 使用一个单独的线程 PullMessageService 来负责消息的拉取
5.4.1 PullMessageService 实现机制
run 方法的核心要点
1. pullRequestQueue 中获取一个 PullRequest 消息拉取任务,队列为空则阻塞
2. 调用 pullRequest 进行消息拉取
pullRequest 的添加时机
PullMessageService 提供延迟添加与立即添加 2 种方式将 pullRequest 放入 pullRequestQueue 中
pullRequest 的创建时机
1. PullRequest 执行完一次消息拉取任务之后
2. RebalanceImpl 中创建
PullRequest 核心属性
String consumerGrououp
MessageQueue messageQueue
ProcessQueue processQueue
消息处理队列,从 broker 拉取到的消息先存入 ProcessQueue,
然后再递交到消费者消费线程池处理
long nextOffset
Boolean lockedFirst
5.4.2 ProcessQueue 实现机制
ProcessQueue 是 MessageQueue 在消费端的重现、快照
1. ProcessQueue 核心属性
TreeMap<Long, MessageExt> msgTreeMap
消息存储容器,键为消息在 MessageQueue 中的偏移量
TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap
处理顺序消息时使用
2. ProcessQueue 核心方法
5.4.3 消息拉取基本流程
1. 客户端封装消息拉取请求
1. 从 PullRequest 中获取 ProcessQueue
2. 进行消息拉取流控,从消息消费数量和消息消费间隔两个维度进行控制
1. 消息处理总数,若超过 pullThresholdForQueue = 1000,放弃本次拉取,打印日志
the cached message count exceeds the threshold {}, so do flow control
2. ProcessQueue 中的最大和最小偏移量,不能超过 consumeConcurrentlyMaxSpan = 2000,
触发流量控制,打印 the queue's messages, span too long, so do flow control
防止消息堵塞,造成大量消息重复消费
3. 拉取该主题订阅消息
4. 构建消息拉取系统标识
FLAG_COMMIT_OFFSET
表示从内存中读取的消费进度大于 0
FLAG_SUSPEND
消息拉取时支持挂起
FLAG_SUBSCRIPTION
消息过滤机制为表达式
FLAG_CLASS_FILTER
消息过滤机制为类过滤
5. 调用 pullAPIWrapper.pullKernelImpl 与后端交互
6. 根据 broker name id 从 MQClientInstance 中获取 broker 地址
7. 如果消息过滤模式是类过滤模式,则需要找到 FilterServer 并从其上拉取消息
2. 消息服务端 broker 组装消息
处理消息拉取的入口
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest
0. 一系列的校验和检查
1. 根据订阅消息,构建消息过滤器
2. MessageStore().getMessage 查找消息
3. 根据主题名称、队列编号获取消息消费队列
4. 消息偏移情况校对下次拉取偏移量
1. maxOffset == 0
NO_MESSAGE_IN_QUEUE
2. offset < minOffset
OFFSET_TOO_SMALL
3. offset == maxOffset
OFFSET_OVERFLOW_ONE
4. offset > maxOffset
OFFSET_OVERFLOW_BADLY
5. 如果待拉取偏移量大于 minOffset 且小于 maxOffset,从当前 offset 开始拉取 32 条消息
6. 根据 PullRequest 填充 responseHeader
7. 根据主从同步延迟,如果从节点包含下一次拉取的偏移量,设置下一次拉取任务的 broker id
8. 根据 GetMessageResult 编码转换成 ResponseCode
9. 如果 commitlog 标记可用且当前节点为主节点,则更新消息消费进度
3. 消息拉取客户端处理消息
拉取到消息
1. 根据相应结果解码成 PullResultExt
2. 调用 pullAPIWrapper.processPullResult 将消息字节数组解码成消息列表 msgFoundList
并对消息进行过滤
3. 更新 pullRequest 的下次拉取偏移量,如果 msgFoundList 为空,
立即将 pullRequest 放到 pullRequestQueue 中
4. 拉取到的消息存入 ProcessQueue, 然后递交到 consumeMessageService 中供消费者消费
5. 将消息提交给消费者线程之后,pullCallBack 将立即返回,
然后根据 PullInterval 把 pullRequest 放到 pullRequestQueue
消息拉取异常处理
1. NO_NEW_MSG NO_MATCHED_MSG
使用服务端校正的偏移量进行下次拉取
2. OFFSET_ILLEGAL
使用服务端校正的偏移量进行下次拉取
processQueue drop 设置为 true
4. 消息拉取长轮询机制分析
RocketMQ 的推模式就是是通过主动的拉实现的
未开启长轮询
等待一段时间后再次判断,没有消息返回 PULL_NOT_FOUND
开启长轮询
等待 15s,有消息到达就判断是否是感兴趣的
轮询机制有两个线程共同完成
1. PullRequestHoldService, 每 5s 重试一次
2. DefaultMessageStore#ReputMessageService
5. PullRequestHoldService 线程详解
时间段重试
6. DefaultMessageStore#ReputMessageService 详解
消息到达就唤醒线程 notifyMessageArriving
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup
5.5 消息队列负载与重新分布
RocketMQ 消息队列重新分布是由 RebalanceService 线程来实现的。
一个 MQClientInstance 持有一个 RebalanceService 的实现,
并随着 MQClientInstance 的启动而启动
RebalanceService 每 20s 执行一次 this.mqClientFactory.doRebalance()
MQClientInstance 遍历注册的消费者,对消费者执行 impl.doRebalance()
每个 DefaultMQPushConsumerImpl 都单独持有一个 RebalanceImpl 对象,
遍历订阅信息对每个主题的队列进行重新负载。
重点分析 this.rebalanceByTopic(topic, isOrder)
1. 从主题订阅信息缓存表中获取主题的队列信息;
发送请求从 broker 中获取所有消费者 id
2. 首先对 cid 和 mqAll 排序,保证同一个消费组内看到的视图一致
执行分配算法
RocketMQ 5种分配算法
1. AllocateMessageQueueAveragely
2. AllocateMachineRoomNearby
3. AllocateMessageQueueAveragelyByCircle
4. AllocateMessageQueueByConfig
5. AllocateMessageQueueByMachineRoom
6. AllocateMessageQueueConsistentHash
3. ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable
当前消费者负载的消息队列缓存表
4. 遍历本次负载分配到的队列集合,对新增的消费队列,从磁盘读取消费进度,创建 pullRequest 对象
校对消费进度
CONSUME_FROM_LAST_OFFSET
最大偏移量开始消费
CONSUME_FROM_FIRST_OFFSET
从头开始消费
CONSUME_FROM_TIMESTAMP
消费者启动时间开始消费
5. 将 pullRequest 加入到 PullMessageService 中
5.6 消息消费过程
消息拉取: PullMessageService 负责对消息队列进行消息拉取,
从远端服务器拉取消息之后存入 ProcessQueue 消息处理队列,
然后调用 ConsumeMessageService#submitConsumeRequest 方法进行消费,
使用线程池来消费,确保消息拉取和消费的解耦。
ConsumeMessageService 核心方法
1. ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
直接消费消息,主要用于通过管理命令收到消费消息
2. void submitConsumeRequest(final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume);
递交消息消费
ConsumeMessageConcurrentlyService 核心参数
1. DefaultMQPushConsumerImpl defaultMQPushConsumerImpl
消息推模式实现类
2. DefaultMQPushConsumer defaultMQPushConsumer
消费者对象
3. MessageListenerConcurrently messageListener
并发消息业务事件类
4. BlockingQueue<Runnable> consumeRequestQueue
消息消费任务队列
5. ThreadPoolExecutor consumeExecutor
消息消费线程池
6. String consumerGroup
消费组
7. ScheduledExecutorService scheduledExecutorService
添加消费任务到 consumeExecutor 延迟调度器
8. ScheduledExecutorService cleanExpireMsgExecutors
定期删除过期消息的线程池
submitConsumeRequest 是消息消费的入口
5.6.1 消息消费
1. 消息数小于 this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize()
则直接递交
2. 消息数大于则分批次递交
3. 进入具体消息消费时先检查 processQueue 的 dropped, 如果为 true,
则停止消费该队列里的消息,在进行消息重新负载时如果该消息队列被分配给组内其他消费者,
则需要设置 dropped 为 true
4. 执行钩子函数 before
5. 恢复重试消息主题名
6. 具体的消息消费,调用具体应用的 consumeMessage
7. 执行钩子函数 after
8. 执行结束之后,处理结果之前,再次判断 processQueue 的 dropped 是否为 true,
防止重复消费
9. 根据消息监听器返回的结果计算 ackIndex
10.
11. 从 ProcessQueue 中移除这批消息,更新偏移量
5.6.2 消息确认 ACK
如果监听器返回 RECONSUME_LATER, 则需要将这些消息发送给 broker 延迟消息
客户端以同步方式发送 RequestCode.CONSUMER_SEND_MSG_BACK 到服务端,
服务端处理器
org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
1. 获取消费组的消费订阅信息,如果不存在就创建一个
2. 创建重试主题,从重试队列随机选择一个,构建 TopicConfig 主题配置信息
3. 根据消息物理偏移量从 commitlog 中获取消息,将消息的主题存入属性
4. 设置消息重试次数,超过则变为死信队列 DLQ ,需要人工干预
5. 根据原先的消息构建新的消息,重试消息有自己的 id 并存入 commitlog 中
6. 重试消息依托于定时任务
5.6.3 消息消费进度管理
RocketMQ 消息消费进度接口 OffsetStore
1. void load()
将消息进度存储文件加载到内存
2. void updateOffset(MessageQueue mq, long offset, boolean increaseOnly)
更新内存中的消息消费进度
3. long readOffset(final MessageQueue mq, final ReadOffsetType type)
读取消息消费进度
4. void persistAll(Set<MessageQueue> mqs)
5. void removeOffset(MessageQueue mq)
6. Map<MessageQueue, Long> cloneOffsetTable(String topic)
7. void updateConsumeOffsetToBroker(MessageQueue mq, long offset)
1. 广播模式消费进度存储
默认存储路径
~/.rocketmq_offsets
2. 集群模式消费进度存储
存在 broker 上
3. 消费进度设计思考
1. 每次以最小偏移量更新消费进度,而不是刚消费的消息的偏移量
2. 消息负载时,移除消费进度
5.7 定时消息机制
ScheduleMessageService 核心属性
SCHEDULE_TOPIC
定时消息统一主题
FIRST_DELAY_TIME
第一次调度时延时的时间
DELAY_FOR_A_WHILE
每一级别调度后,延迟该时间间隔后再放入调度池
DELAY_FOR_A_PERIOD
发送异常后,延迟该时间间隔后再继续参与调度
ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable
延迟级别
ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable
延迟级别消息消费进度
DefaultMessageStore defaultMessageStore
默认消息存储器
int maxDelayLevel
最大消息延迟级别
构造方法 -> load() -> start()
5.7.1 load() 方法
主要完成延迟消息消费队列进度的加载 与 delayLevelTable 数据的构造
5.7.2 start() 方法
根据延迟级别创建对应的延时任务,启动定时任务持久化延时消息队列进度存储
1. 根据延迟队列创建定时任务,遍历延迟级别,
根据延迟级别 level 从 offsetTable 中获取消费队列的消费进度。
每一个延迟级别对应一个消息消费队列
消息队列 id = 延迟级别 - 1
2. 创建定时任务,每 10s 持久化一次延迟消息消费进度
5.7.3 定时调度逻辑
DeliverDelayedMessageTimerTask#executeOnTimeup
1. 根据队列 id 与延迟主题查找消息消费队列,找不到即不存在该延迟级别消息
2. 根据 offset 从消息消费队列中获取当前队列中所有有效的消息
3. 遍历 ConsumeQueue, 每一个条目 20 个字节。解析出消息的物理偏移量、消息长度等,
未从 commitlog 读取消息做准备
4. 从 commitlog 读取消息
5. 构建新的消息对象,保留重试次数
6. 将新消息刷入 commitlog
7. 更新拉取进度
5.8 消息过滤机制
TAG / SQL92
MessageFilter
boolean isMatchedByConsumeQueue(final Long tagsCode, final ConsumeQueueExt.CqExtUnit cqExtUnit)
boolean isMatchedByCommitLog(final ByteBuffer msgBuffer, final Map<String, String> properties)
消息发送者如果在发送时设置了 tag 属性,存储时先存储在 commitlog 中, 然后转发到消息队列,队列会用 8 个字节存储消息
tag 的 hashcode,之所以不直接存 tag, 是因为 ConsumeQueue 设计为定长结构,加快消息消费的加载性能。
服务端
1. 消费者定于消息主题与消息过滤表达式,构建订阅信息并保存到 RebalanceImpl 中, 以便进行消息队列负载
SubscriptionData
1. String SUB_ALL
过滤模式,默认是全部
2. boolean classFilterMode
是否是类过滤模式
3. String topic
4. String subString
消息过滤表达式
5. Set<String> tagsSet
消息过滤 tag 集合
6. Set<Integer> codeSet
消息过滤 tag hashcode 集合
7. String expressionType
过滤类型,TAG 或 SQL92
2. 根据订阅消息构建消息拉取标记,设置 subExpression classFilter
3. 根据主题、消息过滤表达式构建消息订阅消息实体
4. 构建消息过滤对象
5. 根据偏移量拉取消息,先根据 ConsumeQueue 进行过滤,不匹配直接跳过
6. 如果批评则从 CommitLog 文件中加载整个消息体,然后根据属性进行过滤
客户端
5.9 顺序消息
消息消费包含四个步骤
消息队列负载
消息拉取
消息消费
消息消费进度存储
5.9.1 消息队列负载
RocketMQ 首先通过 RebalanceService 线程实现消息队列的负载,
集群模式下同一个消费组内的消费者共同承担其订阅的主题的消费队列的消费,
同一个队列同一时刻只能够被一个消费者消费,一个消费者可以同时消费多个队列。
负载后分配到新的队列时,要先向 broker 发起锁定该队列的请求。
(顺序消费和并发消费的第一个重要区别: 顺序消费在创建消息拉取队列时需要在 broker 服务器锁定该队列)
5.9.1 消息拉取
RocketMQ 消息拉取由 PullMessageService 线程负责,根据消息拉取任务循环拉取消息
如果消息队列未被锁定,则延迟 3s 将 PullRequest 放入拉取任务中,
如果该处理队列是第一次拉取任务,则首先计算偏移量,然后拉取消息
5.9.3 消息消费
ConsumeMessageOrderlyService
MAX_TIME_CONSUME_CONTINUOUSLY
每次消费任务最大持续时间
...
1. ConsumeMessageOrderlyService 构造方法
2. ConsumeMessageOrderlyService 启动方法
如果是集群模式,启动定时任务每 20s 执行一次锁定分配给自己的消费队列
1. lockMQPeriodically
processQueueTable, 将消息队列按照 broker 组织成 HashMap<String/* brokerName */, Set<MessageQueue>>
的形式,方便下一步向 broker 发送锁定请求
2. 向 broker (master 节点) 发送锁定消息队列,成功即被锁定
3. 将成功锁定的消息消费队列相对应的处理队列设置为锁定状态,同时更新加锁时间
4. 遍历当前处理队列中的消费队列,如果当前消费者不持有该对应的锁,将队列锁状态设置为 false ,
暂停其消息拉取与消费
3. ConsumeMessageOrderlyService 递交消费任务
构建消费任务 ConsumeRequest , 并提交到消费线程池中。
顺序消费的 ConsumeRequest 消费任务不会直接消费本次拉取的消息,
而是在消息消费时从处理队列中拉取
ConsumeRequest
1. 如果消息处理队列已被丢弃,则退出
2. 根据消息队列获取一个对象,然后在消费的时候申请一个独占锁 objLock。
顺序消息消费的并发度为消息队列,也就是一个消费队列同一时刻只有被一个线程池中的一个线程消费
3. 广播模式直接消费,无需锁定消费队列;
集群模式消费的前提是 processQueue 被锁定且未超时
4. 顺序消息消费处理逻辑,每一个 ConsumeRequest 消费任务不是以消息消费任务来计算的,
而是根据消费时间,默认当消费时常大于 MAX_TIME_CONSUME_CONTINUOUSLY,本次消费任务结束,
由组内其他线程继续消费
5. 每次从处理队列中顺序取出 consumeBatchSize 消息,
如果未取到,则设置 continueConsume 为 false , 本次消费结束;
顺序消息消费时,从 ProcessQueue 中取出的消息,
会临时存储在 ProcessQueue 的 consumingMsgOrderlyTreeMap 中
6. 执行钩子函数
7. 申请消费锁,执行消息监听器
8. 执行钩子函数
9. SUCCESS 时,执行 commit 方法
从 ProcessQueue 移除消息,维护 msgCount , 清空 consumingMsgOrderlyTreeMap
1. 检查消息的重试次数
2. 消息消费重试
10. 存储消息消费进度
5.9.4 消息队列锁实现
顺序消息消费的各个环节,基本都是围绕消息消费队列 MessageQueue 与
消息处理队列 ProcessQueue 展开的。消费进度拉取、进度拉取都需要判断
ProcessQueue 的锁 locked 是否为 true , 设置 ProcessQueue 为 true
的前提条件是消息消费者向 broker 端发送锁定请求,并返回加锁成功
5.10 本章小结
第 6 章 消息过滤 FilterSercer
略
第 7 章 RocketMQ 主从同步(HA)机制
7.1 RocketMQ 主从复制原理
RocketMQ HA 由 7 个核心类实现
1. HAService
RocketMQ 主从同步核心实现类
2. HAService$AcceptSocketService
HA Master 端监听客户端链接实现类
3. HAService$GroupTransferService
主从同步通知类
4. HAService$HAClient
HAClient 端实现类
5. HAConnection
HA Master 服务端 HA 链接对象的封装,与 Broker 从服务器的网络读写实现类
6. HAConnection$ReadSocketService
HA Master 网络读实现类
7. HAConnection$WriteSocketService
HA Master 网络写实现类
7.1.1 HAService 整体工作机制
1. 主服务器启动,并在特定端口等待从服务器的连接
2. 从服务器主动连接主服务器,主服务器接收客户端的连接,并建立相关 TCP 连接
3. 从服务器主动向主服务器发送待拉取消息偏移量,主服务器解析请求并返回消息给从服务器
4. 从服务器保存消息并继续发送新的同步请求
7.1.2 AcceptSocketService 实现原理
AcceptSocketService 作为 HAService 内部类,实现 master 端监听 slave 连接
1. SocketAddress socketAddressListen
broker 服务器监听套接字 (本地 ip + 端口号)
2. ServerSocketChannel serverSocketChannel
服务端 socket 通道,基于 NIO
3. Selector selector
事件选择器,基于 NIO
beginAccept()
创建 ServerSocketChannel , 创建 Selector , 设置 TCP reuseAddress ,
绑定监听端口, 设置为非阻塞模式, 并注册 OP_ACCEPT
run()
该方法是标准的基于 NIO 的服务端程式实例,选择器每 1s 处理一次连接就绪事件。
连接事件就绪后,调用 ServerSocketChannel 的 accept() 方法创建 SocketChannel 。
然后为每一个连接创建一个 HAConnection 对象,负责 M-S 数据同步逻辑。
7.1.3 GroupTransferService 实现原理
GroupTransferService 主从同步阻塞实现,如果是同步主从模式,消息发送者将消息刷写到磁盘之后,
需要继续等待新数据被传输到从服务器,从服务器的数据的复制是在另外一个线程 HAConnection 中去拉取,
所以消息发送者在这里等待消息发送的结果, GroupTransferService 就实现该功能。
GroupTransferService 的职责是当主从同步复制结束后,通知由于等待 HA 同步结果而阻塞的消息发送线程。
判断是否同步完成的标准是 slave 中已成功复制的最大偏移量 是否大于等于
发送者发送消息后服务端返回的下一个消息的起始偏移量
7.1.4 HAClient 实现原理
1. READ_MAX_BUFFER_SIZE
socket 读缓存区大小
2. masterAddress
3. ByteBuffer reportOffset
slave 向 master 发起主从同步的拉取偏移量
4. SocketChannel socketChannel
网络传输通道
5. Selector selector
6. long lastWriteTimestamp
7. long currentReportedOffset
反馈 slave 当前的复制进度,commitlog 文件当前最大偏移量
8. int dispatchPosition
本次已处理读缓存区的指针
9. ByteBuffer byteBufferRead
读缓存区
10. ByteBuffer byteBufferBackup
读缓存区备份
run()
1. connectMaster()
slave 服务器连接 master 服务器
2. isTimeToReportOffset
判断是否需要向 master 发送当前待拉取偏移量
3. reportSlaveMaxOffset
向 master 服务器反馈拉取偏移量
4. this.selector.select(1000);
进行时间选择
5. processReadEvent
处理网络读请求
7.1.5 HAConnection 实现原理
Master 服务器收到从服务器的连接之后,会把 socketChannel 封装成 HAConnection 对象,
实现读写操作
ReadSocketService
WriteSocketService
7.2 RocketMQ 读写分离机制
当前需要拉取的消息 > 常驻内存的大小 表示主服务器繁忙,建议从 slave 拉取
第 8 章 RocketMQ 事务消息
8.1 事务消息实现思想
RocketMQ 事务消息的实现原理是基于 两阶段提交 和 定时事务状态回查 来觉得最终是回滚还是递交
8.2 事务消息发送流程
org.apache.rocketmq.client.producer.TransactionMQProducer
1. TransactionMQProducer
1. TransactionListener transactionListener
事务监听器
2. ExecutorService executorService
事务状态回查异步执行线程池
2. TransactionListener
1. LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
执行本地事务
2. LocalTransactionState checkLocalTransaction(final MessageExt msg);
事务消息状态回查
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
1. 首先为消息添加属性 TRAN_MSG PGROUP
2. 根据消息发送结果执行相应逻辑
1. 成功,则执行 executeLocalTransaction 记录事务消息的本地事务状态
2. 失败则置为回滚状态
3. 结束事务
TransactionMessageBridge
替换主题和队列,用定时任务处理事务消息
8.3 递交或回滚事务
org.apache.rocketmq.broker.processor.EndTransactionProcessor
1. 首先从结束事务请求命令中获取消息的物理偏移量
2. 然后恢复消息的主题和消费队列,构建新的消息
3. 将消息再次存储在 commitlog 中
4. 消息存储后,处理 prepare 消息
8.4 事务消息回查事务状态
TODO