Trace topic's routeInfoData can never be updated
BUG REPORT
-
Please describe the issue you observed:
-
What did you do (The steps to reproduce)? I disable write permission for a broker.
-
What did you expect to see? All queues belong to this broker are not allowed to write.
-
What did you see instead? queues belongs to topic RMQ_SYS_TRACE_TOPIC still emit messages to this broker.
-
I read trace.go code for a while and found that routeInfoData for topic RMQ_SYS_TRACE_TOPIC will never update since first boot; Maybe we should add a timer to process UpdateTopicRouteInfo job every several seconds
我发现路由自动更新的逻辑是有的, 可以试试是否生效。 主要的处理过程如下:
- 初始化rmqClient,维护一个producerMap, https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L172 启动更新topic路由的TimeTicker https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L416-L438
- 第一次发送消息前,获取topic已存在的路由,如果不存在则从namesrv拉取,并且调用UpdatePublishInfo()保存到对应的producer中。
- 正常发送消息
我发现路由自动更新的逻辑是有的, 可以试试是否生效。 主要的处理过程如下:
初始化rmqClient,维护一个producerMap, https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L172
启动更新topic路由的TimeTicker https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L416-L438
第一次发送消息前,获取topic已存在的路由,如果不存在则从namesrv拉取,并且调用UpdatePublishInfo()保存到对应的producer中。
正常发送消息
RMQ_SYS_TRACE_TOPIC 这个topic没有经过tryToFindTopicPublishInfo这个过程,trace的发送过程是这样
func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, data string) {
traceTopic := td.traceTopic
if td.access == primitive.Cloud {
traceTopic = td.traceTopic + regionID
}
msg := primitive.NewMessage(traceTopic, []byte(data))
msg.WithKeys(keySet.slice())
mq, addr := td.findMq(regionID)
if mq == nil {
return
}
var req = td.buildSendRequest(mq, msg)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
cancel()
resp := primitive.NewSendResult()
if e != nil {
rlog.Info("send trace data error.", map[string]interface{}{
"traceData": data,
})
} else {
td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg)
rlog.Debug("send trace data success:", map[string]interface{}{
"SendResult": resp,
"traceData": data,
})
}
})
...
}
这里的findmq是不会将RMQ_SYS_TRACE_TOPIC加入publishInfo,所以不会自动被client执行UpdatePublishInfo
我发现路由自动更新的逻辑是有的, 可以试试是否生效。 主要的处理过程如下:
初始化rmqClient,维护一个producerMap, https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L172
启动更新topic路由的TimeTicker https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L416-L438
第一次发送消息前,获取topic已存在的路由,如果不存在则从namesrv拉取,并且调用UpdatePublishInfo()保存到对应的producer中。
正常发送消息
RMQ_SYS_TRACE_TOPIC 这个topic没有经过tryToFindTopicPublishInfo这个过程,trace的发送过程是这样
func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, data string) { traceTopic := td.traceTopic if td.access == primitive.Cloud { traceTopic = td.traceTopic + regionID } msg := primitive.NewMessage(traceTopic, []byte(data)) msg.WithKeys(keySet.slice()) mq, addr := td.findMq(regionID) if mq == nil { return } var req = td.buildSendRequest(mq, msg) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) { cancel() resp := primitive.NewSendResult() if e != nil { rlog.Info("send trace data error.", map[string]interface{}{ "traceData": data, }) } else { td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg) rlog.Debug("send trace data success:", map[string]interface{}{ "SendResult": resp, "traceData": data, }) } }) ... }这里的findmq是不会将RMQ_SYS_TRACE_TOPIC加入publishInfo,所以不会自动被client执行UpdatePublishInfo
分析完全正确, 这里是bug,非常欢迎你提个PR修复这个问题。
正常的producer在生产的时候会把自己注册到client的producerMap中,在第一次发送的时候,会拉取路由,更新在producerMap对应实例的路由信息。 以后是由定时任务检查client.producerMap中每个生产者的topic的路由信息。
traceDispatcher在初始化的时候,并没有初始化生产者, 注册生产者到producerMap。这样定时任务检查路由的机制是存在的,但是没有topic需要检查。
处理办法:仿照java的逻辑, 初始化一个真正的producer来发送trace信息, 而不是直接调用通信层的invokeSync。这样每个producer的全部topic可以正常更新路由。
java的trace producer: https://github.com/apache/rocketmq/blob/06f2208a34907211591114f6b0d327168c250fb3/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java#L62
我发现路由自动更新的逻辑是有的, 可以试试是否生效。 主要的处理过程如下:
初始化rmqClient,维护一个producerMap, https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L172
启动更新topic路由的TimeTicker https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L416-L438
第一次发送消息前,获取topic已存在的路由,如果不存在则从namesrv拉取,并且调用UpdatePublishInfo()保存到对应的producer中。
正常发送消息
RMQ_SYS_TRACE_TOPIC 这个topic没有经过tryToFindTopicPublishInfo这个过程,trace的发送过程是这样
func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, data string) { traceTopic := td.traceTopic if td.access == primitive.Cloud { traceTopic = td.traceTopic + regionID } msg := primitive.NewMessage(traceTopic, []byte(data)) msg.WithKeys(keySet.slice()) mq, addr := td.findMq(regionID) if mq == nil { return } var req = td.buildSendRequest(mq, msg) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) { cancel() resp := primitive.NewSendResult() if e != nil { rlog.Info("send trace data error.", map[string]interface{}{ "traceData": data, }) } else { td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg) rlog.Debug("send trace data success:", map[string]interface{}{ "SendResult": resp, "traceData": data, }) } }) ... }这里的findmq是不会将RMQ_SYS_TRACE_TOPIC加入publishInfo,所以不会自动被client执行UpdatePublishInfo
分析完全正确, 这里是bug,非常欢迎你提个PR修复这个问题。
正常的producer在生产的时候会把自己注册到client的producerMap中,在第一次发送的时候,会拉取路由,更新在producerMap对应实例的路由信息。 以后是由定时任务检查client.producerMap中每个生产者的topic的路由信息。
traceDispatcher在初始化的时候,并没有初始化生产者, 注册生产者到producerMap。这样定时任务检查路由的机制是存在的,但是没有topic需要检查。
处理办法:仿照java的逻辑, 初始化一个真正的producer来发送trace信息, 而不是直接调用通信层的invokeSync。这样每个producer的全部topic可以正常更新路由。
java的trace producer: https://github.com/apache/rocketmq/blob/06f2208a34907211591114f6b0d327168c250fb3/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java#L62
hi,您好,我尝试修复了这个bug,辛苦review PR,第一次参与该项目有不规范的地方请指出