rocketmq-client-cpp icon indicating copy to clipboard operation
rocketmq-client-cpp copied to clipboard

[ISSUE #479]fix: duplicate pulls due to concurrency issues in rebalance.

Open humkum opened this issue 1 year ago • 2 comments

fix #479

根因分析

根因:AsyncPullCallback 构建存在并发问题,导致多个 pull 线程消费同一个队列。

  • 每个 queue 对应一个 PullRequest,Rebalance 负责添加和删除队列,并对每个队列对应的 PullRequest 进行初始化,或配置状态为 Drop。
  • 消息拉取依赖 PullRequest 的状态,如果 PullRequest 状态不为 Drop,会重复利用这个 PullRequest 进行循环拉取。
  • C++ sdk 中每个队列对应的 PullRequest 是 AsyncPullCallback 的局部变量,AsyncPullCallback 会存储在一个本地缓存中(<mq, AsyncPullCallback>)
  • Rebalance 添加队列时,会生成一个新的 pullRequest 替换缓存中 AsyncPullCallback 的 pullRequest。
  • Pull 是否结束是依赖 AsyncPullCallback 中的 pullRequest 是否 Drop。 通过以上 5 个条件:如果两次 Rebalance 相隔时间很短,mq 对应的之前的 PullRequest 还来不及因设置为 drop 而结束当前的拉取。第二次 rebalance 就替换掉了 AsyncPullCallback 中的局部变量 PullRequest,导致上一次的拉取请求又拿到了非 drop 的 pullRequest。此时就会出现 2 个 pullRequest 同时拉取同一个队列的情况。依次类推,如果重复多次短时间 rebalance,单个分区可能存在多个 pullRequest 同时拉取。

新的 pullRequest 替换掉旧的 pullRequest, drop 状态被配置成了 false image 上次还没处理结束的拉取线程因为状态变化不能正常退出 image

humkum avatar Dec 04 '24 02:12 humkum

@ShannonDing @ifplusor Could you please help to review this pr?

humkum avatar Dec 19 '24 12:12 humkum

PullRequest是由Rebalance::m_requestQueueTable持有的,AsyncPullCallback中只是弱指针引用。 我认为问题在于sdk中在复用AsyncPullCallback时,跨越了PullRequest的生命周期。理想的方案应该是PullRequest持有一个AsyncPullCallback用于复用。

ifplusor avatar Dec 21 '24 09:12 ifplusor