[Bug] Unable to consume messages orderly for ordered message while using RocketMQ 5 client

Open CodingOX opened this issue 1 year ago • 7 comments

Before Creating the Bug Report

  • [X] I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • [X] I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

Ubuntu 20.04

RocketMQ version

RocketMQ 5.2

JDK Version

JDK 21

Describe the Bug

When consuming messages in order with the RocketMQ 5 client against a cluster of two masters and no slaves, the consumer does not receive the messages sequentially.

Steps to Reproduce

Create FIFO topic

./mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 192.168.10.233:9876 -a +message.type=FIFO

Producer code, written in Kotlin


import cn.hutool.core.lang.generator.SnowflakeGenerator
import org.apache.rocketmq.client.apis.ClientConfiguration
import org.apache.rocketmq.client.apis.ClientServiceProvider
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicLong

fun main() {

    val log: Logger = LoggerFactory.getLogger("OCP")
    val provider = ClientServiceProvider.loadService()

    val endpoints = "rmq5-broker-1.middle:8081;rmq5-broker-2.middle:8081"
    val clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .build()

    val topic = "FIFOTopic"
    val tag = "tag-2"

    val producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).build()

    val atomicLong = AtomicLong()
    val snowflakeGenerator = SnowflakeGenerator()
    val messageGroup = "OrderedGroup-1"
    try {
        val ts = System.currentTimeMillis()
        // Send 30 messages one after another, all in the same message group
        repeat(30) {
            val id = snowflakeGenerator.next()
            val curIndex = atomicLong.incrementAndGet()
            val message = provider
                .newMessageBuilder()
                .setTag(tag)
                .setKeys(id.toString())
                .setTopic(topic)
                .setMessageGroup(messageGroup) // Set MessageGroup For FIFO
                .setBody(("$ts - $curIndex").toByteArray())
                .build()

            val receipt = producer.send(message)

            log.info("Send message: $curIndex - $id - ${receipt.messageId}")
        }
    } catch (ex: Exception) {
        log.error("Send message error", ex)
    }
}
  • The producer side seems to be correctly setting the message group for ensuring ordered message delivery.
  • Messages are being sent to the specified topic with message group OrderedGroup-1.

Consumer Code


import cn.hutool.core.thread.ThreadUtil
import org.apache.rocketmq.client.apis.ClientConfiguration
import org.apache.rocketmq.client.apis.ClientServiceProvider
import org.apache.rocketmq.client.apis.consumer.ConsumeResult
import org.apache.rocketmq.client.apis.consumer.FilterExpression
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.nio.charset.StandardCharsets
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit


fun main() {
    val log: Logger = LoggerFactory.getLogger("ORC")

    val provider = ClientServiceProvider.loadService()
    val endpoints = "rmq5-broker-1.middle:8081;rmq5-broker-2.middle:8081"
    val clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .build()

    val topic = "FIFOTopic"
    val tag = "tag-2"
    val topicToTag = mapOf(topic to FilterExpression(tag, FilterExpressionType.TAG))
	
    // Collect message bodies in arrival order so they can be logged together
    val cq = ConcurrentLinkedQueue<String>()

    provider.newPushConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        .setSubscriptionExpressions(topicToTag)
        .setConsumerGroup("OrderConsumerGroupV3")
        .setMessageListener {
            val msg = StandardCharsets.UTF_8.decode(it!!.body)
            cq.offer(msg.toString())
            ConsumeResult.SUCCESS
        }.build()

    ThreadUtil.sleep(10, TimeUnit.SECONDS)

    log.info(cq.joinToString("\n"))

    ThreadUtil.sleep(1, TimeUnit.HOURS)
}

  • The consumer side is using a ConcurrentLinkedQueue to store received messages temporarily for logging purposes.
  • Messages are being consumed from the topic FIFOTopic with tag tag-2.
  • After a waiting period of 10 seconds, the received messages are logged using cq.joinToString("\n").

What Did You Expect to See?

The messages should be consumed in the order they were sent. However, the consumer does not achieve this expected behavior.

What Did You See Instead?

Messages are not consumed in the order in which they were sent:

16:55:02.686 [main] INFO ORC -- 1712134499102 - 1
1712134499102 - 2
1712134499102 - 3
1712134499102 - 4
1712134499102 - 6
1712134499102 - 5
1712134499102 - 7
1712134499102 - 10
1712134499102 - 11
1712134499102 - 12
1712134499102 - 9
1712134499102 - 8
1712134499102 - 13
1712134499102 - 14
1712134499102 - 15
1712134499102 - 16
1712134499102 - 17
1712134499102 - 21
1712134499102 - 18
1712134499102 - 19
1712134499102 - 22
1712134499102 - 23
1712134499102 - 20
1712134499102 - 24
1712134499102 - 25
1712134499102 - 26
1712134499102 - 27
1712134499102 - 28
1712134499102 - 29
1712134499102 - 30
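
To make the deviations in a log like the one above easier to spot, the consumed bodies can be checked programmatically. The following is a minimal sketch, assuming every body has the "<timestamp> - <index>" format produced by the producer above; `sendIndex` and `findInversions` are hypothetical helper names, not part of the RocketMQ client API.

```kotlin
// Hypothetical helpers (not part of the RocketMQ client): they report where
// the consumed sequence deviates from the send order.

/** Extracts the send index from a body formatted as "<timestamp> - <index>". */
fun sendIndex(body: String): Long = body.substringAfterLast(" - ").trim().toLong()

/**
 * Returns (previous, current) index pairs for every adjacent pair of consumed
 * messages where the current message was sent before the previous one.
 */
fun findInversions(bodies: List<String>): List<Pair<Long, Long>> =
    bodies.map(::sendIndex)
        .zipWithNext()
        .filter { (prev, cur) -> cur < prev }

fun main() {
    // An excerpt of the consumed order shown in the log above (indices 4 to 13).
    val consumed = listOf(4, 6, 5, 7, 10, 11, 12, 9, 8, 13).map { "1712134499102 - $it" }
    findInversions(consumed).forEach { (prev, cur) ->
        println("out of order: $cur was consumed after $prev")
    }
}
```

In the consumer above, the same check could be run on `cq.toList()` before logging. It only compares adjacent messages, so it flags the points where the order breaks rather than computing a full reordering distance.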

Additional Context

From the dashboard, we can see that the messages are stored in the same message queue. (image)

From the HTTP API /message/queryMessageByTopicAndKey.query?key=xxx&topic=FIFOTopic, we can get the message details. The message with index 6 arrived at the broker later than the message with index 5, but 6 was consumed before 5.

CodingOX avatar Apr 03 '24 09:04 CodingOX

RocketMQ currently supports sequential consumption only within a single queue.

3424672656 avatar Apr 03 '24 15:04 3424672656

Need to set consumer group as order consumption mode

lizhimins avatar Apr 06 '24 08:04 lizhimins

Need to set consumer group as order consumption mode

From the doc (https://rocketmq.apache.org/zh/docs/featureBehavior/03fifomessage) and the source code, I can't find the API for order consumption mode. Can you give me more help?

CodingOX avatar Apr 07 '24 08:04 CodingOX

Need to set consumer group as order consumption mode

You mean that if using RocketMQ version 5 and requiring a first-in-first-out (FIFO) ordering queue, I must configure the topic with only one message queue?

CodingOX avatar Apr 07 '24 08:04 CodingOX

Configure your consumer group as ordered with the "-o" option.

./mqadmin updateSubGroup -c DefaultCluster -n 192.168.10.233:9876 -g OrderConsumerGroupV3 -o true

redlsz avatar Apr 15 '24 12:04 redlsz

Configure your consumer group as ordered with the "-o" option.

./mqadmin updateSubGroup -c DefaultCluster -n 192.168.10.233:9876 -g OrderConsumerGroupV3 -o true

This is my original shell command, taken from the official documentation. I already used -o true when creating the topic.

./mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 192.168.10.233:9876 -a +message.type=FIFO

So this can be ruled out as the cause of the problem.

CodingOX avatar Apr 16 '24 02:04 CodingOX

I found the same problem. I use the Rust client. At first I thought it was a client bug, but later I used grpcurl to send the same messages and got the same result. The Rust client supports batch send, and I only found this problem with the batch send API. With batch send, not only is the order wrong, the keys are wrong too: all the messages in the same batch get the key of the first message of the batch. Also, in the batch send scenario, the messages are put into different queues randomly.

mtfcd avatar Jun 28 '24 02:06 mtfcd