rocketmq-client-cpp
rocketmq-client-cpp copied to clipboard
[ISSUE #479]fix: duplicate pulls due to concurrency issues in rebalance.
fix #479
根因分析
根因:AsyncPullCallback 构建存在并发问题,导致多个 pull 线程消费同一个队列。
- 每个 queue 对应一个 PullRequest,Rebalance 负责添加和删除队列,并对每个队列对应的 PullRequest 进行初始化,或配置状态为 Drop。
- 消息拉取依赖 PullRequest 的状态,如果 PullRequest 状态不为 Drop,会重复利用这个 PullRequest 进行循环拉取。
- C++ sdk 中每个队列对应的 PullRequest 是 AsyncPullCallback 的局部变量,AsyncPullCallback 会存储在一个本地缓存中(<mq, AsyncPullCallback>)
- Rebalance 添加队列时,会生成一个新的 pullRequest 替换缓存中 AsyncPullCallback 的 pullRequest。
- Pull 是否结束是依赖 AsyncPullCallback 中的 pullRequest 是否 Drop。 通过以上 5 个条件:如果两次 Rebalance 相隔时间很短,mq 对应的之前的 PullRequest 还来不及因设置为 drop 而结束当前的拉取。第二次 rebalance 就替换掉了 AsyncPullCallback 中的局部变量 PullRequest,导致上一次的拉取请求又拿到了非 drop 的 pullRequest。此时就会出现 2 个 pullRequest 同时拉取同一个队列的情况。依次类推,如果重复多次短时间 rebalance,单个分区可能存在多个 pullRequest 同时拉取。
新的 pullRequest 替换掉旧的 pullRequest, drop 状态被配置成了 false
上次还没处理结束的拉取线程因为状态变化不能正常退出
@ShannonDing @ifplusor Could you please help to review this pr?
PullRequest是由Rebalance::m_requestQueueTable持有的,AsyncPullCallback中只是弱指针引用。
我认为问题在于sdk中在复用AsyncPullCallback时,跨越了PullRequest的生命周期。理想的方案应该是PullRequest持有一个AsyncPullCallback用于复用。