Shellbye.github.io
Shellbye.github.io copied to clipboard
《RocketMQ 实战与原理解析》读书笔记
第1章 快速入门
1.1 消息队列功能介绍
1.1.1 应用解耦
1.1.2 流量消峰
1.1.3 消息分发
1.2 RocketMQ简介
1.3 快速上手RocketMQ
1.3.1 RocketMQ的下载、安装和配置
1.3.2 启动消息队列服务
启动NameServer:
> nohup sh bin/mqnamesrv &
启动Broker:
> nohup sh bin/mqbroker –n localhost:9876&
1.3.3 用命令行发送和接收消息
1.3.4 关闭消息队列
> sh bin/mqshutdown broker
> sh bin/mqshutdown namesrv
第2章 生产环境下的配置和使用
2.1 RocketMQ各部分角色介绍
2.2 多机集群配置和部署
2.2.1 启动多个NameServer和Broker
2.2.2 配置参数介绍
2.3 发送/接收消息示例
2.4 常用管理命令
1. 创建/修改Topic
2. 删除Topic
等其他
2.5 通过图形界面管理集群
第3章 用适合的方式发送和接收消息
3.1 不同类型的消费者
3.1.1 DefaultMQPushConsumer 的使用
3.1.2 DefaultMQPushConsumer 的处理流程
通过“长轮询”方式达到Push效果的方法,长轮询方式既有Pull的优点,又兼具Push方式的实时性
Push方式是Server端接收到消息后,主动把消息推送给Client端,实时性高。
对于一个提供队列服务的Server来说,用Push方式主动推送有很多弊端:
首先是加大Server端的工作量,进而影响Server的性能;
其次,Client的处理能力各不相同,Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,会造成各种潜在问题
Pull方式是Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量消息后,处理妥当了再接着取。
Pull方式的问题是循环拉取消息的间隔不好设定,间隔太短就处在一个“忙等”的状态,浪费资源;
每个Pull的时间间隔太长,Server端有消息到来时,有可能没有被及时处理。
“长轮询”方式通过Client端和Server端的配合,达到既拥有Pull的优点,又能达到保证实时性的目的
语句requestHeader.setSuspendTimeoutMillis(brokerSus-pendMaxTimeMillis),
作用是设置Broker最长阻塞时间,默认设置是15秒,注意是Broker在没有新消息的时候才阻塞,有消息会立刻返回
“长轮询”的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。
“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer。
长轮询方式的局限性,是在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中
3.1.3 DefaultMQPushConsumer的流量控制
RocketMQ定义了一个快照类ProcessQueue来解决这些问题,在PushConsumer运行的时候,
每个Message Queue都会有个对应的ProcessQueue对象,保存了这个Message Queue消息处理状态的快照
ProcessQueue对象里主要的内容是一个TreeMap和一个读写锁。
TreeMap里以Message Queue的Offset作为Key,以消息内容的引用为Value,
保存了所有从MessageQueue获取到,但是还未被处理的消息;
读写锁控制着多个线程对TreeMap对象的并发访问。
有了ProcessQueue对象,流量控制就方便和灵活多了,
PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度,
任何一个值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的
3.1.4 DefaultMQPullConsumer
3.1.5 Consumer的启动、关闭流程
3.2 不同类型的生产者
3.2.1 DefaultMQProducer
3.2.2 发送延迟消息
3.2.3 自定义消息发送规则
3.2.4 对事务的支持
1)发送方向RocketMQ发送“待确认”消息。
2)RocketMQ将收到的“待确认”消息持久化成功后,
向发送方回复消息已经发送成功,此时第一阶段消息发送完成。
3)发送方开始执行本地事件逻辑。
4)发送方根据本地事件执行结果向RocketMQ发送二次确认(Commit或是Rollback)消息,
RocketMQ收到Commit状态则将第一阶段消息标记为可投递,订阅方将能够收到该消息;
收到Rollback状态则删除第一阶段的消息,订阅方接收不到该消息。
5)如果出现异常情况,步骤4)提交的二次确认最终未到达RocketMQ,
服务器在经过固定时间段后将对“待确认”消息发起回查请求。
6)发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,
回查请求将被发送到和Producer在同一个Group里的其他Producer),
通过检查对应消息的本地事件执行结果返回Commit或Roolback状态。
7)RocketMQ收到回查请求后,按照步骤4)的逻辑处理
3.3 如何存储队列位置信息
RocketMQ中,一种类型的消息会放到一个Topic里,为了能够并行,
一般一个Topic会有多个Message Queue(也可以设置成一个),
Offset是指某个Topic下的一条消息在某个Message Queue里的位置,
通过Offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后继续处理
3.4 自定义日志输出
第4章 分布式消息队列的协调者
4.1 NameServer的功能
4.1.1 集群状态的存储结构
4.1.2 状态维护逻辑
当NameServer和Broker的长连接断掉以后,onChannelDestroy函数会被调用,
把这个Broker的信息清理出去。
NameServer还有定时检查时间戳的逻辑,Broker向NameServer发送的心跳会更新时间戳,
当NameServer检查到时间戳长时间没有更新后,便会触发清理逻辑
4.2 各个角色间的交互流程
4.2.1 交互流程源码分析
4.2.2 为何不用ZooKeeper
4.3 底层通信机制
4.3.1 Remoting模块
RemotingService为最上层接口,定义了三个方法:
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook)
RocketMQ各个模块间的通信,可以通过发送统一格式的自定义消息(RemotingCommand)来完成,各个模块间的通信实现简洁明了
4.3.2 协议设计和编解码
RocketMQ自己定义了一个通信协议,使得模块间传输的二进制消息和有意义的内容之间互相转换
|------------------------------------------|
| length | header length | header | body |
|------------------------------------------|
1)第一部分是大端4个字节整数,值等于第二、三、四部分长度的总和;
2)第二部分是大端4个字节整数,值等于第三部分的长度;
3)第三部分是通过Json序列化的数据;
4)第四部分是通过应用自定义二进制序列化的数据
4.3.3 Netty库
第5章 消息队列的核心机制
5.1 消息存储和发送
一台服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read(file,tmp_buf,len);,读取本地文件内容;
2)write(socket,tmp_buf,len);,将读取的内容通过网络发送出去。
tmp_buf是预先申请的内存,这两个看似简单的操作,实际进行了4次数据复制,分别是:
1. 从磁盘复制数据到内核态内存
2. 从内核态内存复制到用户态内存(完成了read(file,tmp_buf,len))
3. 然后从用户态内存复制到网络驱动的内核态内存
4. 最后是从网络驱动的内核态内存复制到网卡中进行传输(完成write(socket,tmp_buf,len))
通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。
这种机制在Java中是通过MappedByteBuffer实现的,
RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度
5.2 消息存储结构
消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,
类似数据库的索引文件,存储的是指向物理存储的地址
存储机制这样设计有以下几个好处:
1)CommitLog顺序写,可以大大提高写入效率。
2)虽然是随机读,但是利用操作系统的pagecache机制,
可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度。
3)为了保证完全的顺序写,需要ConsumeQueue这个中间结构,
因为ConsumeQueue里只存偏移量信息,所以尺寸是有限的,在实际情况中,
大部分的ConsumeQueue能够被全部读入内存,所以这个中间结构的操作速度很快,
可以认为是内存读取的速度。此外为了保证CommitLog和ConsumeQueue的一致性,
CommitLog里存储了Consume Queues、Message Key、Tag等所有信息,
即使ConsumeQueue丢失,也可以通过commitLog完全恢复出来
5.3 高可用性机制
5.4 同步刷盘和异步刷盘
SYNC_FLUSH
ASYNC_FLUSH
5.5 同步复制和异步复制
同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态
异步复制方式是只要Master写成功即可反馈给客户端写成功状态
第6章 可靠性优先的使用场景
6.1 顺序消息
6.1.1 全局顺序消息
要保证全局顺序消息,需要先把Topic的读写队列数设置为一,然后Producer和Consumer的并发设置也要是一。
简单来说,为了保证整个Topic的全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理。
这时高并发、高吞吐量的功能完全用不上了
6.1.2 部分顺序消息
发送端使用MessageQueueSelector类来控制把消息发往哪个Message Queue
消费端通过使用MessageListenerOrderly类来解决单Message Queue的消息被并发处理的问题
MessageListenerOrderly的实现中,为每个Consumer Queue加个锁,消费每个消息前,
需要先获得这个消息对应的Consumer Queue所对应的锁,这样保证了同一时间,
同一个Consumer Queue的消息不被并发消费,但不同Consumer Queue的消息可以并发处理
6.2 消息重复问题
6.3 动态增减机器
6.3.1 动态增减NameServer
6.3.2 动态增减Broker
6.4 各种故障对消息的影响
6.5 消息优先级
第7章 吞吐量优先的使用场景
7.1 在Broker端进行消息过滤
7.1.1 消息的Tag和Key
服务器端基于Tag进行过滤,并不需要读取消息体的内容,所以效率很高
Key一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便。
Broker会创建专门的索引文件,来存储Key到消息的映射,
由于是哈希索引,应尽量使Key唯一,避免潜在的哈希冲突
7.1.2 通过Tag进行过滤
7.1.3 用SQL表达式的方式进行过滤
SQL表达式方式的过滤需要Broker先读出消息里的属性内容,然后做SQL计算,增大磁盘压力,没有Tag方式高效
7.1.4 Filter Server方式过滤
7.2 提高Consumer处理能力
(1)提高消费并行度
注意总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收不到消息
(2)以批量方式进行消费
(3)检测延时情况,跳过非重要消息
7.3 Consumer的负载均衡
7.3.1 DefaultMQPushConsumer的负载均衡
7.3.2 DefaultMQPullConsumer的负载均衡
7.4 提高Producer的发送速度
可靠性要求不高的场景下,可以采用Oneway方式发送,Oneway方式只发送请求不等待应答,
即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果,
用这种方式发送消息的耗时可以缩短到微秒级
另一种提高发送速度的方法是增加Producer的并发量,使用多个Producer同时发送,
我们不用担心多Producer同时写会降低消息写磁盘的效率,RocketMQ引入了一个并发窗口,
在窗口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中。
顺序写CommitLog可让RocketMQ无论在HDD还是SSD磁盘情况下都能保持较高的写入性能
7.5 系统性能调优的一般流程
(1)使用TOP命令查看CPU和内存的利用率
(2)使用Linux的sar命令查看网卡使用情况
(3)用iostat查看磁盘的使用情况
第8章 和其他系统交互
略
第9章 首个Apache中间件顶级项目
9.1 RocketMQ的前世今生
9.2 Apache顶级项目(TLP)之路
9.3 源码结构
9.4 不断迭代的代码
第10章 NameServer源码解析
10.1 模块入口代码的功能
10.1.1 入口函数
NamesrvStartup是模块的启动入口,NamesrvController是用来协块各个调模功能的代码
10.1.2 解析命令行参数
10.1.3 初始化NameServer的Controller
10.2 NameServer的总控逻辑
NamesrvController.java
10.3 核心业务逻辑处理
DefaultRequestProcessor.java
10.4 集群状态存储
RouteInfoManager
第11章 最常用的消费类
11.1 整体流程
11.1.1 上层接口类
11.1.2 DefaultMQPushConsumer的实现者
DefaultMQPushConsumerImpl
11.1.3 获取消息逻辑
11.2 消息的并发处理
11.2.1 并发处理过程
MessageConcurrentlyService
这个类定义了三个线程池,
一个主线程池用来正常执行收到的消息,
用户可以自定义通过consumeThreadMin和consumeThreadMax来自定义线程个数。
另外两个都是单线程的线程池,
一个用来执行推迟消费的消息,
另一个用来定期清理超时消息
11.2.2 ProcessQueue对象
ProcessQueue对象里主要的内容是一个TreeMap和一个读写锁。
TreeMap里以Message Queue的Offset作为Key,以消息内容的引用为Value,
保存了所有从MessageQueue获取到但是还未被处理的消息,
读写锁控制着多个线程对TreeMap对象的并发访问
11.3 生产者消费者的底层类
11.3.1 MQClientInstance类的创建规则
MQClientInstance是客户端各种类型的Consumer和Producer的底层类。
这个类首先从NameServer获取并保存各种配置信息,比如Topic的Route信息。
同时MQClientInstance还会通过MQClientAPIImpl类实现消息的收发,
也就是从Broker获取消息或者发送消息到Broker。
既然MQClientInstance实现的是底层通信功能和获取并保存元数据的功能,
就没必要每个Consumer或Producer都创建一个对象,
一个MQClientInstance对象可以被多个Consumer或Producer公用。
RocketMQ通过一个工厂类达到共用MQClientInstance的目的。
11.3.2 MQClientInstance类的功能
pullMessageService
rebalanceService
定时进行如下几个操作:
获取NameServer地址
更新TopicRoute信息
清理离线的Broker
保存消费者的Offset
第12章 主从同步机制
12.1 同步属性信息
在syncAll函数里,调用一下方法进行元数据同步
syncTopicConfig()
syncConsumerOffset()
syncDelayOffset()
syncSubscriptionGroupConfig()
采用基于Netty的command方式来同步消息
12.2 同步消息体
CommitLog的同步,不是经过netty command的方式,而是直接进行TCP连接(基于Java NIO),这样效率更高
12.3 sync_master和async_master
在CommitLog类的putMessage函数末尾,调用handleHA函数。
代码中的关键词是wakeupAll和waitForFlush,在同步方式下,
Master每次写消息的时候,都会等待向Slave同步消息的过程,同步完成后再返回
第13章 基于Netty的通信实现
13.1 Netty介绍
13.2 Netty架构总览
13.2.1 重新实现ByteBuffer
在网络通信中,CPU处理数据的速度大大快于网络传输数据的速度,所以需要引入缓冲区,
将网络传输的数据放入缓冲区,累积足够的数据再发给CPU处理
重新实现的ByteBuf特性包括允许使用自定义的缓存类型、透明的零拷贝实现、比ByteBuffer更快的响应速度等
13.2.2 统一的异步I/O接口
Netty有一个被称为Channel的统一异步I/O编程接口,这个编程接口抽象了所有点对点的通信操作
13.2.3 基于拦截链模式的事件模型
13.3 Netty用法示例
略
13.4 RocketMQ基于Netty的通信功能实现
13.4.1 顶层抽象类
RocketMQ通信模块的顶层结构是RemotingServer和RemotingClient,分别对应通信的服务端和客户端
RemotingServer类中比较重要的是:
localListenPort
registerProcessor
registerDefaultProcessor
registerDefaultProcessor
用来设置接收到消息后的处理方法
RemotingClient类和RemotingServer类相对应,比较重要的方法是
updateNameServerAddressList
用来获取有效的NameServer地址
invokeSync
invokeOneway
用来向Server端发送请求
13.4.2 自定义协议
13.4.3 基于Netty的Server和Client