rocketmq
rocketmq copied to clipboard
[Bug] Unable to consume messages orderly for ordered message while using RocketMQ 5 client
Before Creating the Bug Report
-
[X] I found a bug, not just asking a question, which should be created in GitHub Discussions.
-
[X] I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
-
[X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
Runtime platform environment
Unbuntu 20.04
RocketMQ version
RocketMQ 5.2
JDK Version
JDK 21
Describe the Bug
When attempting to consume messages in order using the RocketMQ 5 client with a 2 master no any slaves, it is observed that the consumer is unable to consume messages sequentially.
Steps to Reproduce
Create FIFO topic
./mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 192.168.10.233:9876 -a +message.type=FIFO
Producer Code, Write in Kotlin
import cn.hutool.core.lang.generator.SnowflakeGenerator
import org.apache.rocketmq.client.apis.ClientConfiguration
import org.apache.rocketmq.client.apis.ClientServiceProvider
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicLong
fun main() {
val log: Logger = LoggerFactory.getLogger("OCP")
val provider = ClientServiceProvider.loadService()
val endpoints = "rmq5-broker-1.middle:8081;rmq5-broker-2.middle:8081"
val clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build()
val topic = "FIFOTopic"
val tag = "tag-2"
val producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).build()
val atomicLong = AtomicLong()
val snowflakeGenerator = SnowflakeGenerator()
val messageGroup = "OrderedGroup-1"
try {
val ts = System.currentTimeMillis()
//
repeat(30) {
val id = snowflakeGenerator.next()
val curIndex = atomicLong.incrementAndGet()
val message = provider
.newMessageBuilder()
.setTag(tag)
.setKeys(id.toString())
.setTopic(topic)
.setMessageGroup(messageGroup) // Set MessageGroup For FIFO
.setBody(("$ts - $curIndex").toByteArray())
.build()
val receipt = producer.send(message)
log.info("Send message: $curIndex - $id - ${receipt.messageId}")
}
} catch (ex: Exception) {
log.error("Send message error", ex)
}
}
- The producer side seems to be correctly setting the message group for ensuring ordered message delivery.
- Messages are being sent to the specified topic with message group
OrderedGroup-1.
Consumer Code
import cn.hutool.core.thread.ThreadUtil
import org.apache.rocketmq.client.apis.ClientConfiguration
import org.apache.rocketmq.client.apis.ClientServiceProvider
import org.apache.rocketmq.client.apis.consumer.ConsumeResult
import org.apache.rocketmq.client.apis.consumer.FilterExpression
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.nio.charset.StandardCharsets
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit
fun main() {
val log: Logger = LoggerFactory.getLogger("ORC")
val provider = ClientServiceProvider.loadService()
val endpoints = "rmq5-broker-1.middle:8081;rmq5-broker-2.middle:8081"
val clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build()
val topic = "FIFOTopic"
val tag = "tag-2"
val topicToTag = mapOf(topic to FilterExpression(tag, FilterExpressionType.TAG))
//
val cq = ConcurrentLinkedQueue<String>()
provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setSubscriptionExpressions(topicToTag)
.setConsumerGroup("OrderConsumerGroupV3")
.setMessageListener {
val msg = StandardCharsets.UTF_8.decode(it!!.body)
cq.offer(msg.toString())
ConsumeResult.SUCCESS
}.build()
ThreadUtil.sleep(10, TimeUnit.SECONDS)
log.info(cq.joinToString("\n"))
ThreadUtil.sleep(1, TimeUnit.HOURS)
}
- The consumer side is using a
ConcurrentLinkedQueueto store received messages temporarily for logging purposes. - Messages are being consumed from the topic
FIFOTopicwith tagtag-2. - After a waiting period of 10 seconds, the received messages are logged using
cq.joinToString("\n").
What Did You Expect to See?
Te messages should be consumed in the order they were sent. However, the consumer is not able to achieve this expected behavior.
What Did You See Instead?
Messages are not consumed in the order in which they were sent?
16:55:02.686 [main] INFO ORC -- 1712134499102 - 1
1712134499102 - 2
1712134499102 - 3
1712134499102 - 4
1712134499102 - 6
1712134499102 - 5
1712134499102 - 7
1712134499102 - 10
1712134499102 - 11
1712134499102 - 12
1712134499102 - 9
1712134499102 - 8
1712134499102 - 13
1712134499102 - 14
1712134499102 - 15
1712134499102 - 16
1712134499102 - 17
1712134499102 - 21
1712134499102 - 18
1712134499102 - 19
1712134499102 - 22
1712134499102 - 23
1712134499102 - 20
1712134499102 - 24
1712134499102 - 25
1712134499102 - 26
1712134499102 - 27
1712134499102 - 28
1712134499102 - 29
1712134499102 - 30
Additional Context
From the dashboard, we can see the message store in the same message queue.
And From the http api /message/queryMessageByTopicAndKey.query?key=xxx&topic=FIFOTopic, we can get the message details. The message with index 6 arrived at the broker later than the message with index 5. but 6 is consume before than 5.
rocketmq currently only supports sequential consumption for a single queue
Need to set consumer group as order consumption mode
Need to set consumer group as order consumption mode
From the doc : https://rocketmq.apache.org/zh/docs/featureBehavior/03fifomessage and souce code , I can't find the api for order consumption mode? Can you give me more help?
Need to set consumer group as order consumption mode
Need to set consumer group as order consumption mode
You mean that if using RocketMQ version 5 and requiring a first-in-first-out (FIFO) ordering queue, I must configure the topic with only one message queue?
Configure your consumer group as order with "-o" option.
./mqadmin updateSubGroup -c DefaultCluster -n 192.168.10.233:9876 -g OrderConsumerGroupV3 -o true
Configure your consumer group as order with "-o" option.
./mqadmin updateSubGroup -c DefaultCluster -n 192.168.10.233:9876 -g OrderConsumerGroupV3 -o true
This is my original shell that from official document. I have used -o true for the topic creation.
./mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 192.168.10.233:9876 -a +message.type=FIFO
So this cause of the problem can be ruled out.
I found the same problem. I use the Rust client. First I thought it was a client bug. But later I use grpcurl to send the same messages and I got the same result. The Rust client surpport batch send. I only found this problem in batch send api. When use batch send. not only the order is wrong, also the keys are wrong, all the messages in the same batch will have the key of the first message of the batch. and in the batch send scenario, the message are put in different queue randomly.