rocketmq icon indicating copy to clipboard operation
rocketmq copied to clipboard

[Bug] broker开始代理模式,启动消费者时,不会生成消费者组对应的重试队列:%RETRY%<ConsumerGroup>,导致无法查询消费者组的消费详情

Open onejimmyboy opened this issue 1 year ago • 1 comments

Before Creating the Bug Report

  • [X] I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • [X] I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

centos7

RocketMQ version

RocketMQ 5.1.3 集群模式:单机版+Local Proxy

JDK Version

1.8

Describe the Bug

rocketmq开启Proxy,消费者连接8081Endpoints消费时,不会自动生成以消费者组的重试队列:%RETRY%+consumerGroup。 而在查询消费者组消费状态examineConsumeStats和消费者组的连接信息examineConsumerConnectionInfo的时候都会使用到%RETRY%+consumerGroup。如下: @Override public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { TopicRouteData topicRouteData = null; List<String> routeTopics = new ArrayList<>(); routeTopics.add(MixAll.getRetryTopic(consumerGroup)); .........

@Override public ConsumerConnection examineConsumerConnectionInfo( String consumerGroup) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { ConsumerConnection result = new ConsumerConnection(); String topic = MixAll.getRetryTopic(consumerGroup); List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas(); ............

public static String getRetryTopic(final String consumerGroup) { return RETRY_GROUP_TOPIC_PREFIX + consumerGroup; }

另外:使用endpoints消费消息时,如果消费失败,会生成MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_" + topic;这个topic。

Steps to Reproduce

1:启动broker 2:启动消费者 3:生产消息 5:使用rocketmq-console查看消费者组信息

What Did You Expect to See?

消费者启动时,生成%RETRY%+consumerGroup的重试topic,examineConsumeStats和消费者组的连接信息examineConsumerConnectionInfo接口查询有返回值

What Did You See Instead?

消费者启动时,没有生成%RETRY%+consumerGroup的重试topic,examineConsumeStats和消费者组的连接信息examineConsumerConnectionInfo接口查询没有返回值

Additional Context

消费者代码: @Bean(name = "MyConsumer") public void MyConsumer(){ log.info("MyConsumer start ..."); ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = new ClientConfigurationBuilder(); if (aclEnable){ SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey,secretKey); builder = ClientConfiguration.newBuilder() .setCredentialProvider(sessionCredentialsProvider) .setEndpoints(endPoints); }else { builder = ClientConfiguration.newBuilder() .setEndpoints(endPoints); } ClientConfiguration configuration = builder.build(); try { log.info("构建5.0消费者 endPoints:{},topic:{},consumerGroup:{}",endPoints,topic,consumerGroup); String tag = "*"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); provider.newPushConsumerBuilder() .setClientConfiguration(configuration) .setConsumerGroup(consumerGroup) .setSubscriptionExpressions(Collections.singletonMap(topic,filterExpression)) .setMessageListener(messageView -> { log.info("接受到的完整消息:{}",messageView); String str = StandardCharsets.UTF_8.decode(messageView.getBody()).toString(); log.info("消息内容为:{}",str); return ConsumeResult.FAILURE; }).build(); log.info("构建5.0消费者成功");

    }catch (Exception e){
        log.error("构建5.0消费者异常",e);
    }
}

onejimmyboy avatar Aug 25 '23 01:08 onejimmyboy