rocketmq
[ISSUE #4382]Namesrv nearby route
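A mechanism that lets RocketMQ producers and consumers produce to and consume from brokers in the same data center (nearby routing).
RocketMQ registration and discovery mechanism
- Broker
  - Every 30 seconds, the broker sends the full topic information it holds to all Namesrv instances. At startup, the broker's registration with Namesrv is also delayed. When a topic changes, TopicConfigManager.dataVersion changes.
  - When a topic is created or updated, the broker immediately sends the incremental topic information to all Namesrv instances. This also changes BrokerLiveInfo.dataVersion on Namesrv.
- Namesrv
  - Stores the cluster-to-broker mapping, broker information (brokerName and addresses), each broker's live connection and topic version number, and topic information.
  - A scheduled thread checks every 10 seconds. If a broker has not updated its registration for 120 seconds, all information related to that broker is removed.
- Producer & Consumer
  - The first time a Producer sends a Message, it synchronously requests the topic route information from namesrv.
  - A Consumer requests the topic route information from namesrv during startup.
  - Every 30 seconds, both pull the topic route information from namesrv, decide whether it has changed by comparing the route content (not a version number), and update it if it has.
  - Summary: the first time a Producer sends a message, it synchronously requests the topic route information from namesrv and then, by default, sends messages to each Queue in round-robin order. A Consumer fetches the topic route information at startup, then asks the broker for the list of consumer instances, and finally assigns queues to consumers according to the number of consumers and the load-balancing algorithm.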
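The Namesrv eviction rule described above (a scan every 10 seconds, removal after 120 seconds without a registration update) can be sketched roughly as follows. This is only an illustration, not the actual Namesrv code: the class `BrokerLivenessSketch` and its method names are hypothetical, and only the 120-second threshold and the scan interval come from the description.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the Namesrv liveness table; not RocketMQ's real classes.
class BrokerLivenessSketch {
    // A broker is evicted after 120 s without a registration update (from the description).
    static final long EXPIRE_MILLIS = 120_000L;

    // broker address -> timestamp (ms) of the last registration
    private final Map<String, Long> lastUpdate = new ConcurrentHashMap<>();

    // Called whenever a broker registers (every 30 s, or immediately on a topic change).
    void onRegister(String brokerAddr, long nowMillis) {
        lastUpdate.put(brokerAddr, nowMillis);
    }

    // Called by a scheduler (every 10 s in the description).
    // Returns the number of evicted brokers; the count is approximate
    // if registrations arrive concurrently with the scan.
    int scanNotActive(long nowMillis) {
        int before = lastUpdate.size();
        lastUpdate.values().removeIf(ts -> nowMillis - ts > EXPIRE_MILLIS);
        return before - lastUpdate.size();
    }

    boolean isAlive(String brokerAddr) {
        return lastUpdate.containsKey(brokerAddr);
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a real scheduler would presumably read the system clock instead.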
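Implementing same-data-center nearby production and consumption in RocketMQ:
1. Add a zoneName identifier to the broker configuration. A slave keeps the same zoneName as its master. The value is registered with namesrv through the heartbeat.
2. Producers and Consumers can set zoneName and zoneMode (an on/off switch) through the API, environment variables, or system properties.
3. When a Producer or Consumer fetches topic route information, namesrv returns the brokers that carry the matching identifier.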
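As a rough illustration of step 3, the sketch below filters a broker list by the client's zone on the name-server side. It is not the code from this PR: `ZoneRouteFilterSketch`, `Broker`, and `filterByZone` are hypothetical names, and the fallback to the full broker list when no broker matches the zone is an assumption made here, not something the description states.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of zone-based route filtering; not the PR's ZoneRouteRPCHook.
class ZoneRouteFilterSketch {
    // Simplified broker record: only the name and the zone label matter here.
    static final class Broker {
        final String brokerName;
        final String zoneName;

        Broker(String brokerName, String zoneName) {
            this.brokerName = brokerName;
            this.zoneName = zoneName;
        }
    }

    // Returns the brokers the client should see for a route request.
    static List<Broker> filterByZone(List<Broker> brokers, String clientZone, boolean zoneMode) {
        // Switch off, or client did not declare a zone: return the route unchanged.
        if (!zoneMode || clientZone == null || clientZone.isEmpty()) {
            return brokers;
        }
        List<Broker> sameZone = new ArrayList<>();
        for (Broker b : brokers) {
            if (clientZone.equals(b.zoneName)) {
                sameZone.add(b);
            }
        }
        // Assumption: if the zone has no brokers, fall back to the full list
        // rather than returning an empty route.
        return sameZone.isEmpty() ? brokers : sameZone;
    }
}
```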
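How to enable the feature
Start the broker with the system property -Drocketmq.zone=bj
Start the producer or consumer with the system properties -Drocketmq.zone=bj -Drocketmq.zone.mode=true (the second property is the feature switch)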
Coverage decreased (-0.04%) to 49.124% when pulling 0d934ab4e1190561c008b5af1fec7c6204467e0f on lwclover:develop into 273f274ec89aa3097c977ffbc38901fdb88950e8 on apache:develop.
Codecov Report
Merging #4383 (05f64a9) into develop (418a5b2) will decrease coverage by 0.09%. The diff coverage is 22.98%.
@@ Coverage Diff @@
## develop #4383 +/- ##
=============================================
- Coverage 48.19% 48.09% -0.10%
Complexity 5129 5129
=============================================
Files 649 651 +2
Lines 43037 43096 +59
Branches 5629 5642 +13
=============================================
- Hits 20740 20727 -13
- Misses 19784 19859 +75
+ Partials 2513 2510 -3
Impacted Files | Coverage Δ | |
---|---|---|
...c/main/java/org/apache/rocketmq/common/MixAll.java | 44.03% <ø> (ø) | |
...rg/apache/rocketmq/common/constant/LoggerName.java | 0.00% <0.00%> (ø) | |
...g/apache/rocketmq/common/protocol/RequestCode.java | 0.00% <ø> (ø) | |
...ketmq/common/rpchook/DynamicalExtFieldRPCHook.java | 0.00% <0.00%> (ø) | |
...va/org/apache/rocketmq/namesrv/NamesrvStartup.java | 21.05% <ø> (ø) | |
...pache/rocketmq/namesrv/route/ZoneRouteRPCHook.java | 2.50% <2.50%> (ø) | |
...he/rocketmq/broker/plugin/MessageStoreFactory.java | 15.38% <15.38%> (ø) | |
...che/rocketmq/common/protocol/route/BrokerData.java | 36.95% <20.00%> (-1.14%) | :arrow_down: |
...ketmq/broker/plugin/MessageStorePluginContext.java | 70.00% <70.00%> (ø) | |
...org/apache/rocketmq/broker/out/BrokerOuterAPI.java | 46.47% <100.00%> (ø) | |
... and 18 more | | |
Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
good job
I finally found this function. What a surprise
@lwclover I have read all the code. This solution is simple and useful in some cases.
At the same time, there are two potential problems.
Firstly, in the public cloud or hybrid cloud, with SLB/PrivateLink, the network may be complicated, and we may not be able to identify the client/broker by their remote address. So we may need a more general way to identify the client/broker. For example, we could abstract a config named "site", and the client/broker would inject the "site" info into the extFields of RemotingCommand.
Secondly, we may need a way to solve the conflict between consumers of the same group in different networks. For example, some consumers on site A, some consumers on site global.
@dongeforever Thank you for reviewing my code. For your first suggestion, my goal is to be transparent to the client; if I abstract a config named "site", the client/broker would need to configure it.
For your second suggestion, I agree with you. The client may abstract a config named "disabledNearbyRouteWhenTurnedOn=true".
@lwclover This solution is enough for an inner network inside an organization. It may not be enough for common use cases.
And this issue is so important that it is worth adding a config in the client/broker.
It would be nice if you could move forward with a more general solution, and I'm glad to help you.
IMO, an RPCHook would be a nice way to add the extField to the API headers.
The optimized version has been submitted
I think we need more discussions about this feature and implementation. I have two points:
- Is there any chance to implement this feature through the client hooks, e.g., QueueSelector for the producer and AllocateMessageQueueStrategy for the consumer?
- The associated implementation seems to introduce more complexity for both client and server. If we really want to bring in this feature, can we consider implementing it only on the server side, e.g., controlling the topic route info from namesrv according to the az info of clients?
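To make the first point concrete, a client-side alternative might look roughly like the sketch below: a producer-side selector that prefers queues on brokers in the local zone. Everything here is hypothetical and only loosely modeled on the idea of a queue selector hook: `ZoneAwareSelectorSketch`, `Queue`, and the `brokerZones` map are stand-ins invented for illustration, and how a client would learn each broker's zone is left open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical client-side sketch; these are not RocketMQ's real client types.
class ZoneAwareSelectorSketch {
    // Stand-in for a message queue: broker name plus queue id.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final String localZone;
    // brokerName -> zone; assumed to be supplied by the caller.
    private final Map<String, String> brokerZones;
    // Round-robin position; this sketch is not thread-safe.
    private int counter = 0;

    ZoneAwareSelectorSketch(String localZone, Map<String, String> brokerZones) {
        this.localZone = localZone;
        this.brokerZones = brokerZones;
    }

    // Round-robin over queues in the local zone; fall back to all queues if none match.
    Queue select(List<Queue> queues) {
        if (queues.isEmpty()) {
            throw new IllegalArgumentException("no queues to select from");
        }
        List<Queue> local = new ArrayList<>();
        for (Queue q : queues) {
            if (localZone.equals(brokerZones.get(q.brokerName))) {
                local.add(q);
            }
        }
        List<Queue> candidates = local.isEmpty() ? queues : local;
        Queue chosen = candidates.get(counter % candidates.size());
        // Mask the sign bit so the index stays non-negative after overflow.
        counter = (counter + 1) & Integer.MAX_VALUE;
        return chosen;
    }
}
```

The trade-off raised in the second point still applies: a selector like this pushes zone awareness into every client, whereas filtering the route on namesrv keeps clients unchanged.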
IMO, we can refer to affinity/anti-affinity in k8s. Each broker can be labeled with tags, such as idc, rack, and so on. namesrv holds the brokers' tag info through heartbeats. Then the client requests the route with tags and tolerations through an RPCHook.
Yes, that's how I implemented it; I just didn't update the description of the implementation.
My original intention was that the user would not need to pay attention to such intricate details
@lwclover Currently, we need to make the feature simple enough and prove it over time. In the client and broker, we just add a general rpcHook and do not introduce explicit fields. Write all the logic inside the nameserver to control the code propagation.
BTW, the rpcHook could be a general DynamicalExtFieldRPCHook, loading the extension fields from -D system properties, environment variables, files, etc.
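A minimal sketch of that idea, assuming a naming convention invented here: fields are collected from system properties and environment variables by prefix and then merged into a request's extension fields. The prefixes `rocketmq.ext.` and `ROCKETMQ_EXT_`, the class `DynamicExtFieldSketch`, and both method names are hypothetical; a plain `Map` stands in for the request's extFields, and this is not the `DynamicalExtFieldRPCHook` from the PR.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch of a "dynamic ext field" loader; prefixes are assumptions.
class DynamicExtFieldSketch {
    static final String PROP_PREFIX = "rocketmq.ext."; // e.g. -Drocketmq.ext.zone=bj
    static final String ENV_PREFIX = "ROCKETMQ_EXT_";  // e.g. ROCKETMQ_EXT_ZONE=bj

    // Collects ext fields; system properties override environment variables.
    static Map<String, String> collect(Properties sysProps, Map<String, String> env) {
        Map<String, String> fields = new HashMap<>();
        for (Map.Entry<String, String> e : env.entrySet()) {
            if (e.getKey().startsWith(ENV_PREFIX)) {
                String key = e.getKey().substring(ENV_PREFIX.length()).toLowerCase(Locale.ROOT);
                fields.put(key, e.getValue());
            }
        }
        for (String name : sysProps.stringPropertyNames()) {
            if (name.startsWith(PROP_PREFIX)) {
                fields.put(name.substring(PROP_PREFIX.length()), sysProps.getProperty(name));
            }
        }
        return fields;
    }

    // Stand-in for the "before request" step of an RPC hook:
    // adds the dynamic fields without overwriting fields the request already has.
    static void beforeRequest(Map<String, String> requestExtFields, Map<String, String> dynamicFields) {
        for (Map.Entry<String, String> e : dynamicFields.entrySet()) {
            requestExtFields.putIfAbsent(e.getKey(), e.getValue());
        }
    }
}
```

In a running client, `collect(System.getProperties(), System.getenv())` would presumably be called once at startup and the result reused for every request.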
OK, I will do it.
@lwclover It seems ok.
BTW, some trivial code polishing could be done to make the code clearer.
LGTM, but would you like to rebase your branch first? It seems this PR contains many commits from other people.
Next time. haha