rocketmq icon indicating copy to clipboard operation
rocketmq copied to clipboard

[Bug] RocketMQ consumer消费不到第一条消息

Open zooah212 opened this issue 2 years ago • 6 comments
trafficstars

Before Creating the Bug Report

  • [X] I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • [X] I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • [ ] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

CentOS Linux 7

RocketMQ version

5.1.3

JDK Version

JDK 8

Describe the Bug

全新的RocketMQ环境,里面没有Consumer和Topic。StringBoot项目集群RocketMQ5.1.3版本,首先启动Consumer,此时查看dashboard,能查到Consumer,Consumer要消费的Topic还没有创建;然后启动Producer,并发送一条消息,此时收不到第一条消息,然后Producer再发一条消息,Consumer能消费到消息; ConsumeFromWhere:Last_Offset 消费模式:广播模式和集群模式都会由此现象 如果先发消息,再启动Consumer,广播消费模式会消费到之前的数据;集群消费模式,虽然在dashboard中看到消息的状态是已消费,但是在程序中Consumer还是没有收到消息,不知道消息被谁消费了;

Steps to Reproduce

看bug描述

What Did You Expect to See?

即使是全新的环境,先启动consumer,也能正常消费到数据。

What Did You See Instead?

Additional Context

zooah212 avatar Aug 15 '23 07:08 zooah212

我遇到过,不过我是5.0版本。 我理解是第一条消息发送时由于topic没有创建,所以第一条消息去触发了topic的创建(其实是consume queue的初始化)然后由于topic还没有创建(在消息被broker接收到时)所以消息发送失败了(write consume queue fail),所以在这里应该加一个重试,我原本打算debug一下这块的细节,不过还没来得及做。 不过丢失了第一条消息确实是一个已知的问题。

SchopenhauerZhang avatar Aug 21 '23 12:08 SchopenhauerZhang

我遇到过,不过我是5.0版本。 我理解是第一条消息发送时由于topic没有创建,所以第一条消息去触发了topic的创建(其实是consume queue的初始化)然后由于topic还没有创建(在消息被broker接收到时)所以消息发送失败了(write consume queue fail),所以在这里应该加一个重试,我原本打算debug一下这块的细节,不过还没来得及做。 不过丢失了第一条消息确实是一个已知的问题。

感谢,我测试了 5.0.0 和 5.1.3 都是有这个问题。而且如果是 mq 集群时,这个问题更让人头疼,单机时至少发第二次能够发出去,但是集群时如果正好创建了 topic 的 broker 宕机,那么后续再发消息其他的 broker 不会再自动创建 topic,除非重启应用。

zooah212 avatar Aug 24 '23 03:08 zooah212

还真的是。我也复现这个问题了。 同样的5.1.3版本

mufengCc avatar Sep 04 '23 02:09 mufengCc

还真的是。我也复现这个问题了。 同样的5.1.3版本

嗯。当在高可用场景下,会出现更严重的问题。如果有两个 master A 和 B,当发消息的时候会随机在某个 master 上面创建 topic,比如在 master A 上面创建好了 topic,之后发/收消息会正常;但是当 master A 挂掉之后,再发消息则 Producer 会直接报错,发不出消息,且 master B 上面也不会再创建 topic,除非重启应用。。。

zooah212 avatar Sep 05 '23 08:09 zooah212

收到,感谢提问。我正在本地复现,并且正在尝试基于5.0版本修复这个问题。按照以上大家的描述我理解是一个共性问题,所以等我PR,不过由于工作的原因,可能需要点时间,烦请暂时克服一下,我尽量加快修复速度。:-)

SchopenhauerZhang avatar Sep 18 '23 11:09 SchopenhauerZhang

收到,感谢提问。我正在本地复现,并且正在尝试基于5.0版本修复这个问题。按照以上大家的描述我理解是一个共性问题,所以等我PR,不过由于工作的原因,可能需要点时间,烦请暂时克服一下,我尽量加快修复速度。:-)

您好,这个问题解决了么?希望能百忙之中抽空处理下,或通知其他贡献者研究下。谢谢了~

xinchilde avatar Aug 14 '24 08:08 xinchilde

我的是4.9.7版本也出现这个问题,消费者无法消费第一条数据

wolf27w avatar Sep 02 '24 03:09 wolf27w

目前自己创建,临时解决方法: 引入配置文件: implementation('org.apache.rocketmq:rocketmq-tools:5.0.0') 简单实现代码: `/**

  • @author

  • @date 2024/12/20 */ @Slf4j @Component public class InitRocketmqTopicRunner implements ApplicationRunner {

    @Resource private RedisService redisService;

    @Value("${rocketmq.name-server}") private String nameServerAddr;

    @Override public void run(ApplicationArguments args) throws Exception { Object flag = redisService.get(RedisKeys.SYSTEM_INITIALIZATION_FLAG); if (flag == null) { log.info("Init Rocketmq Topic"); initTopic(); redisService.setCacheObject(RedisKeys.SYSTEM_INITIALIZATION_FLAG, true); } }

    private void initTopic() { DefaultMQAdminExt admin = new DefaultMQAdminExt(); admin.setInstanceName(Long.toString(System.currentTimeMillis())); admin.setNamesrvAddr(nameServerAddr); try { admin.start(); ClusterInfo clusterInfo = admin.examineBrokerClusterInfo(); Set<String> clusterSet = clusterInfo.getClusterAddrTable().keySet(); if (clusterSet.isEmpty()) { log.error("No clusters found in RocketMQ."); return; } // Use the first cluster found String clusterName = clusterSet.iterator().next(); log.info("Using cluster: {}", clusterName); Field[] fields = TopicConstants.class.getDeclaredFields(); for (Field field : fields) { if (field.getType() == String.class) { String topic = (String) field.get(null); if (checkIfTopicExists(admin, topic)) { log.info("Topic {} already exists.", topic); } else { createTopic(admin,clusterName, topic); } } } } catch (Exception e) { log.error("Failed to initialize RocketMQ topics", e); } finally { admin.shutdown(); } }

    private boolean checkIfTopicExists(DefaultMQAdminExt admin, String topic) { try { TopicRouteData topicRouteData = admin.examineTopicRouteInfo(topic); return topicRouteData != null && !topicRouteData.getQueueDatas().isEmpty(); } catch (MQClientException | RemotingException | InterruptedException e) { // If the topic does not exist, an exception will be thrown log.warn("Topic {} does not exist.", topic); return false; } } private void createTopic(DefaultMQAdminExt admin, String clusterName, String topic) throws Exception{ admin.createTopic(clusterName, topic, 4,null); log.info("Created topic {}", topic); }` 'public interface TopicConstants { String DOWN_INVOKE_SERVICE = "down_invoke_service"; }'

qinyi8888 avatar Dec 20 '24 08:12 qinyi8888

5.2.0版本,遇到类似的问题,producer发送成功,根据id通过mqadmin也查不到,后续跟踪到store/index文件里面缺失信息

lehug avatar Jul 09 '25 11:07 lehug

大家好,这个问题是一个时序问题,一个topic被创建的时候,队列为null,最大偏移量为0,然后生产者发送第一条消息的时候,最大偏移量为1。消费者启动的时候,查询最大偏移量为-1,然后调用maxoffset(mq),就会返回1.这样0消息就被跳过了。 可以在发送消息前,可以先启动 Consumer 实例,或者在 Consumer 启动后等待一段时间再发送消息。 然后在配置的时候,设置从ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET开始消费。 也可以在代码方面更改,https://github.com/apache/rocketmq/pull/9552

mengnankkkk avatar Jul 17 '25 01:07 mengnankkkk