rocketmq-client-go icon indicating copy to clipboard operation
rocketmq-client-go copied to clipboard

Trace topic's routeInfoData can never be updated

Open tt67wq opened this issue 3 years ago • 4 comments

BUG REPORT

  1. Please describe the issue you observed:

    • What did you do (The steps to reproduce)? I disable write permission for a broker.

    • What did you expect to see? All queues belong to this broker are not allowed to write.

    • What did you see instead? queues belongs to topic RMQ_SYS_TRACE_TOPIC still emit messages to this broker.

I read trace.go code for a while and found that routeInfoData for topic RMQ_SYS_TRACE_TOPIC will never update since first boot; Maybe we should add a timer to process UpdateTopicRouteInfo job every several seconds

tt67wq avatar Feb 16 '23 09:02 tt67wq

我发现路由自动更新的逻辑是有的, 可以试试是否生效。 主要的处理过程如下:

  1. 初始化rmqClient,维护一个producerMap, https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L172 启动更新topic路由的TimeTicker https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L416-L438
  2. 第一次发送消息前,获取topic已存在的路由,如果不存在则从namesrv拉取,并且调用UpdatePublishInfo()保存到对应的producer中。
  3. 正常发送消息

francisoliverlee avatar Feb 21 '23 07:02 francisoliverlee

我发现路由自动更新的逻辑是有的, 可以试试是否生效。 主要的处理过程如下:

  1. 初始化rmqClient,维护一个producerMap, https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L172

    启动更新topic路由的TimeTicker https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L416-L438

  2. 第一次发送消息前,获取topic已存在的路由,如果不存在则从namesrv拉取,并且调用UpdatePublishInfo()保存到对应的producer中。

  3. 正常发送消息

RMQ_SYS_TRACE_TOPIC 这个topic没有经过tryToFindTopicPublishInfo这个过程,trace的发送过程是这样

func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, data string) {
	traceTopic := td.traceTopic
	if td.access == primitive.Cloud {
		traceTopic = td.traceTopic + regionID
	}
	msg := primitive.NewMessage(traceTopic, []byte(data))
	msg.WithKeys(keySet.slice())

	mq, addr := td.findMq(regionID)
	if mq == nil {
		return
	}

	var req = td.buildSendRequest(mq, msg)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
		cancel()
		resp := primitive.NewSendResult()
		if e != nil {
			rlog.Info("send trace data error.", map[string]interface{}{
				"traceData": data,
			})
		} else {
			td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg)
			rlog.Debug("send trace data success:", map[string]interface{}{
				"SendResult": resp,
				"traceData":  data,
			})
		}
	})
...
}

这里的findmq是不会将RMQ_SYS_TRACE_TOPIC加入publishInfo,所以不会自动被client执行UpdatePublishInfo

tt67wq avatar Feb 24 '23 06:02 tt67wq

我发现路由自动更新的逻辑是有的, 可以试试是否生效。 主要的处理过程如下:

  1. 初始化rmqClient,维护一个producerMap, https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L172

    启动更新topic路由的TimeTicker https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L416-L438

  2. 第一次发送消息前,获取topic已存在的路由,如果不存在则从namesrv拉取,并且调用UpdatePublishInfo()保存到对应的producer中。

  3. 正常发送消息

RMQ_SYS_TRACE_TOPIC 这个topic没有经过tryToFindTopicPublishInfo这个过程,trace的发送过程是这样

func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, data string) {
	traceTopic := td.traceTopic
	if td.access == primitive.Cloud {
		traceTopic = td.traceTopic + regionID
	}
	msg := primitive.NewMessage(traceTopic, []byte(data))
	msg.WithKeys(keySet.slice())

	mq, addr := td.findMq(regionID)
	if mq == nil {
		return
	}

	var req = td.buildSendRequest(mq, msg)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
		cancel()
		resp := primitive.NewSendResult()
		if e != nil {
			rlog.Info("send trace data error.", map[string]interface{}{
				"traceData": data,
			})
		} else {
			td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg)
			rlog.Debug("send trace data success:", map[string]interface{}{
				"SendResult": resp,
				"traceData":  data,
			})
		}
	})
...
}

这里的findmq是不会将RMQ_SYS_TRACE_TOPIC加入publishInfo,所以不会自动被client执行UpdatePublishInfo

分析完全正确, 这里是bug,非常欢迎你提个PR修复这个问题。

正常的producer在生产的时候会把自己注册到client的producerMap中,在第一次发送的时候,会拉取路由,更新在producerMap对应实例的路由信息。 以后是由定时任务检查client.producerMap中每个生产者的topic的路由信息。

traceDispatcher在初始化的时候,并没有初始化生产者, 注册生产者到producerMap。这样定时任务检查路由的机制是存在的,但是没有topic需要检查。

处理办法:仿照java的逻辑, 初始化一个真正的producer来发送trace信息, 而不是直接调用通信层的invokeSync。这样每个producer的全部topic可以正常更新路由。

java的trace producer: https://github.com/apache/rocketmq/blob/06f2208a34907211591114f6b0d327168c250fb3/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java#L62

francisoliverlee avatar Feb 27 '23 06:02 francisoliverlee

我发现路由自动更新的逻辑是有的, 可以试试是否生效。 主要的处理过程如下:

  1. 初始化rmqClient,维护一个producerMap, https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L172

    启动更新topic路由的TimeTicker https://github.com/apache/rocketmq-client-go/blob/c8d06a661a022097445cc7979290733a2cc86804/internal/client.go#L416-L438

  2. 第一次发送消息前,获取topic已存在的路由,如果不存在则从namesrv拉取,并且调用UpdatePublishInfo()保存到对应的producer中。

  3. 正常发送消息

RMQ_SYS_TRACE_TOPIC 这个topic没有经过tryToFindTopicPublishInfo这个过程,trace的发送过程是这样

func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, data string) {
	traceTopic := td.traceTopic
	if td.access == primitive.Cloud {
		traceTopic = td.traceTopic + regionID
	}
	msg := primitive.NewMessage(traceTopic, []byte(data))
	msg.WithKeys(keySet.slice())

	mq, addr := td.findMq(regionID)
	if mq == nil {
		return
	}

	var req = td.buildSendRequest(mq, msg)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
		cancel()
		resp := primitive.NewSendResult()
		if e != nil {
			rlog.Info("send trace data error.", map[string]interface{}{
				"traceData": data,
			})
		} else {
			td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg)
			rlog.Debug("send trace data success:", map[string]interface{}{
				"SendResult": resp,
				"traceData":  data,
			})
		}
	})
...
}

这里的findmq是不会将RMQ_SYS_TRACE_TOPIC加入publishInfo,所以不会自动被client执行UpdatePublishInfo

分析完全正确, 这里是bug,非常欢迎你提个PR修复这个问题。

正常的producer在生产的时候会把自己注册到client的producerMap中,在第一次发送的时候,会拉取路由,更新在producerMap对应实例的路由信息。 以后是由定时任务检查client.producerMap中每个生产者的topic的路由信息。

traceDispatcher在初始化的时候,并没有初始化生产者, 注册生产者到producerMap。这样定时任务检查路由的机制是存在的,但是没有topic需要检查。

处理办法:仿照java的逻辑, 初始化一个真正的producer来发送trace信息, 而不是直接调用通信层的invokeSync。这样每个producer的全部topic可以正常更新路由。

java的trace producer: https://github.com/apache/rocketmq/blob/06f2208a34907211591114f6b0d327168c250fb3/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java#L62

hi,您好,我尝试修复了这个bug,辛苦review PR,第一次参与该项目有不规范的地方请指出

xiaolibuzai-ovo avatar Sep 11 '23 01:09 xiaolibuzai-ovo