rocketmq
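[Bug] With the broker running in proxy mode, starting a consumer does not create the consumer group's retry topic %RETRY%<ConsumerGroup>, so the consumer group's consumption details cannot be queried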
Before Creating the Bug Report
- [X] I found a bug, not just asking a question, which should be created in GitHub Discussions.
- [X] I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
- [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
Runtime platform environment
centos7
RocketMQ version
RocketMQ 5.1.3, cluster mode: single node + Local Proxy
JDK Version
1.8
Describe the Bug
When the RocketMQ Proxy is enabled and a consumer connects through the 8081 endpoint, the consumer group's retry topic, %RETRY% + consumerGroup, is not created automatically. However, both examineConsumeStats (which queries a consumer group's consumption status) and examineConsumerConnectionInfo (which queries a consumer group's connection information) rely on %RETRY% + consumerGroup, as shown here:

    @Override
    public ConsumeStats examineConsumeStats(String consumerGroup, String topic)
        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        TopicRouteData topicRouteData = null;
        List<String> routeTopics = new ArrayList<>();
        // The retry topic of the consumer group is always looked up.
        routeTopics.add(MixAll.getRetryTopic(consumerGroup));
        .........

    @Override
    public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup)
        throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
        ConsumerConnection result = new ConsumerConnection();
        // The brokers to query are resolved from the route of the retry topic.
        String topic = MixAll.getRetryTopic(consumerGroup);
        List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas();
        ............

    public static String getRetryTopic(final String consumerGroup) {
        return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
    }
In addition, when messages are consumed through the endpoints and consumption fails, a different topic is created: MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_" + topic.
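To make the mismatch concrete, here is a minimal sketch of the two naming schemes described above. `RetryTopicNames` and its methods are hypothetical helpers written for illustration, not RocketMQ classes; the prefix value `%RETRY%` matches the topic name in the issue title, and the sketch assumes `cid` is the consumer group name.

```java
// Hypothetical helper for illustration only; not part of RocketMQ.
final class RetryTopicNames {
    // Same value as the %RETRY% prefix used in the topic names above.
    static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";

    private RetryTopicNames() {
    }

    // Name that examineConsumeStats and examineConsumerConnectionInfo look up:
    // %RETRY% + consumerGroup.
    static String groupRetryTopic(String consumerGroup) {
        return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
    }

    // Name reported for failed consumption through the proxy endpoints:
    // %RETRY% + cid + "_" + topic. Assumes cid is the consumer group name.
    static String proxyRetryTopic(String cid, String topic) {
        return RETRY_GROUP_TOPIC_PREFIX + cid + "_" + topic;
    }
}
```

For a group `MyGroup` subscribed to `MyTopic`, `groupRetryTopic` yields `%RETRY%MyGroup` while `proxyRetryTopic` yields `%RETRY%MyGroup_MyTopic`, so a route lookup for the first name does not match a topic created under the second.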
Steps to Reproduce
1. Start the broker.
2. Start the consumer.
3. Produce messages.
4. Use rocketmq-console to view the consumer group information.
What Did You Expect to See?
When the consumer starts, the retry topic %RETRY% + consumerGroup is created, and the examineConsumeStats and examineConsumerConnectionInfo queries return results.
What Did You See Instead?
When the consumer starts, the retry topic %RETRY% + consumerGroup is not created, and the examineConsumeStats and examineConsumerConnectionInfo queries return nothing.
Additional Context
Consumer code:

    @Bean(name = "MyConsumer")
    public void MyConsumer() {
        log.info("MyConsumer start ...");
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = new ClientConfigurationBuilder();
        if (aclEnable) {
            // With ACL enabled, attach static access-key credentials.
            SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);
            builder = ClientConfiguration.newBuilder()
                .setCredentialProvider(sessionCredentialsProvider)
                .setEndpoints(endPoints);
        } else {
            builder = ClientConfiguration.newBuilder()
                .setEndpoints(endPoints);
        }
        ClientConfiguration configuration = builder.build();
        try {
            log.info("Building 5.0 consumer endPoints:{},topic:{},consumerGroup:{}", endPoints, topic, consumerGroup);
            // Subscribe to every tag of the topic.
            String tag = "*";
            FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
            provider.newPushConsumerBuilder()
                .setClientConfiguration(configuration)
                .setConsumerGroup(consumerGroup)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .setMessageListener(messageView -> {
                    log.info("Full message received:{}", messageView);
                    String str = StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
                    log.info("Message body:{}", str);
                    // Always report failure so that the retry path is exercised.
                    return ConsumeResult.FAILURE;
                }).build();
            log.info("5.0 consumer built successfully");
        } catch (Exception e) {
            log.error("Failed to build 5.0 consumer", e);
        }
    }