[Bug] Timed messages cannot be consumed

Open zyhui98 opened this issue 1 year ago • 3 comments

Before Creating the Bug Report

  • [X] I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • [X] I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

CentOS 7.6

RocketMQ version

5.1.4

JDK Version

1.8.0_66

Describe the Bug

    2023-12-12 00:25:46 ERROR TimerEnqueueGetService - Unknown exception in enqueuing
    java.lang.IllegalArgumentException: null
        at java.nio.Buffer.limit(Buffer.java:275)
        at org.apache.rocketmq.store.timer.TimerMessageStore.getMessageByCommitOffset(TimerMessageStore.java:1013)
        at org.apache.rocketmq.store.timer.TimerMessageStore.enqueue(TimerMessageStore.java:652)
        at org.apache.rocketmq.store.timer.TimerMessageStore$TimerEnqueueGetService.run(TimerMessageStore.java:1269)
        at java.lang.Thread.run(Thread.java:745)

Steps to Reproduce

It reproduces every time a message is sent. There are two brokers, and one of them works normally.

What Did You Expect to See?

The message is consumed normally.

What Did You See Instead?

The message is not stored.

Additional Context

    2023-12-12 00:25:46 ERROR TimerEnqueueGetService - Unknown exception in enqueuing
    java.lang.IllegalArgumentException: null
        at java.nio.Buffer.limit(Buffer.java:275)
        at org.apache.rocketmq.store.timer.TimerMessageStore.getMessageByCommitOffset(TimerMessageStore.java:1013)
        at org.apache.rocketmq.store.timer.TimerMessageStore.enqueue(TimerMessageStore.java:652)
        at org.apache.rocketmq.store.timer.TimerMessageStore$TimerEnqueueGetService.run(TimerMessageStore.java:1269)
        at java.lang.Thread.run(Thread.java:745)

zyhui98 avatar Dec 12 '23 07:12 zyhui98

Do both brokers send and receive normal messages correctly? Is it only timed messages that have the problem?

GenerousMan avatar Dec 12 '23 11:12 GenerousMan

The message was too large: it exceeded the 64 KB we configured for maxMessageSize, so the timer could not roll it.

zyhui98 avatar Dec 13 '23 13:12 zyhui98

    public MessageExtEncoder(final int maxMessageBodySize) {
        ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
        //Reserve 64kb for encoding buffer outside body
        int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ?
            maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
        byteBuf = alloc.directBuffer(maxMessageSize);
        this.maxMessageBodySize = maxMessageBodySize;
        this.maxMessageSize = maxMessageSize;
    }

My config sets maxMessageSize=64 KB, so in the encoder maxMessageBodySize=64 KB and maxMessageSize=128 KB. A delay message with bodySize=63 KB and properties=3 KB can be written to the commitLog, because 63 < 64 and 63 + 3 + the other fields < 128.

        bufferLocal = new ThreadLocal<ByteBuffer>() {
            @Override
            protected ByteBuffer initialValue() {
                // 64kb+100
                return ByteBuffer.allocateDirect(storeConfig.getMaxMessageSize() + 100);
            }
        };
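The numbers in this comment can be worked through with a small sketch. It only repeats the arithmetic from the two snippets quoted here; the class name `SizeLimitSketch` and its method names are made up for illustration, and the 66 KB total ignores the other encoded fields, so the real stored size is somewhat larger.

```java
// Illustrative only: reproduces the size arithmetic from the quoted snippets.
class SizeLimitSketch {

    // Same expression as in the quoted MessageExtEncoder constructor.
    static int encoderBufferSize(int maxMessageBodySize) {
        return Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024
            ? maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
    }

    // Same expression as in the quoted bufferLocal initializer.
    static int timerBufferCapacity(int maxMessageSize) {
        return maxMessageSize + 100;
    }

    public static void main(String[] args) {
        int configured = 64 * 1024;        // maxMessageSize from the broker config
        int body = 63 * 1024;              // message body
        int properties = 3 * 1024;         // message properties
        int stored = body + properties;    // lower bound on sizePy, other fields ignored

        System.out.println("body fits body limit:      " + (body <= configured));
        System.out.println("fits encoder buffer:       " + (stored <= encoderBufferSize(configured)));
        System.out.println("fits timer read buffer:    " + (stored <= timerBufferCapacity(configured)));
    }
}
```

With these inputs the first two checks pass and the third fails, which is the mismatch between the write path and the read path described in this comment.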

    private MessageExt getMessageByCommitOffset(long offsetPy, int sizePy) {
        for (int i = 0; i < 3; i++) {
            MessageExt msgExt = null;
            bufferLocal.get().position(0);
            // throw IllegalArgumentException because sizePy>capacity
            bufferLocal.get().limit(sizePy);
            boolean res = messageStore.getData(offsetPy, sizePy, bufferLocal.get());
            if (res) {
                bufferLocal.get().flip();
                msgExt = MessageDecoder.decode(bufferLocal.get(), true, false, false);
            }
            if (null == msgExt) {
                LOGGER.warn("Fail to read msg from commitLog offsetPy:{} sizePy:{}", offsetPy, sizePy);
            } else {
                return msgExt;
            }
        }
        return null;
    }

In org.apache.rocketmq.store.timer.TimerMessageStore#getMessageByCommitOffset, this delay message's sizePy is 63 KB + 3 KB + the other fields, which is greater than bufferLocal's capacity of 64 KB + 100 bytes. The limit(sizePy) call therefore throws IllegalArgumentException, and as a result currQueueOffset cannot be updated.
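The exception itself comes from standard `java.nio` behaviour and can be shown without RocketMQ: `Buffer.limit(int)` throws `IllegalArgumentException` when the new limit is larger than the buffer's capacity. A minimal sketch, where the class name `LimitOverflowDemo` and the helper `limitThrows` are hypothetical and a heap buffer stands in for the direct buffer used by the broker:

```java
import java.nio.ByteBuffer;

// Illustrative only: shows that limit(newLimit) rejects a limit above capacity.
class LimitOverflowDemo {

    // Returns true if setting the limit on a buffer of the given capacity throws.
    static boolean limitThrows(int capacity, int newLimit) {
        ByteBuffer buf = ByteBuffer.allocate(capacity);
        try {
            buf.position(0);
            buf.limit(newLimit);   // same call sequence as in getMessageByCommitOffset
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int capacity = 64 * 1024 + 100;   // maxMessageSize + 100, as in bufferLocal
        System.out.println("64 KB message throws: " + limitThrows(capacity, 64 * 1024));
        System.out.println("66 KB message throws: " + limitThrows(capacity, 66 * 1024));
    }
}
```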

If my understanding is right, I think this is a bug: a message should not be writable to the commitLog if it cannot be consumed. The size limits on the write path and the read path should not differ.
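One way the read side could tolerate such a message is to check the capacity before setting the limit and fall back to a larger buffer when needed. The following is only a sketch of that idea, not the fix adopted by the project; `ReadBufferGuardSketch` and `prepare` are hypothetical names, and aligning the two limits at write time would be an equally valid direction.

```java
import java.nio.ByteBuffer;

// Illustrative only: a capacity check before limit(sizePy).
class ReadBufferGuardSketch {

    // Returns a buffer positioned at 0 with limit sizePy.
    // Reuses `current` when it is large enough, otherwise allocates a bigger one.
    static ByteBuffer prepare(ByteBuffer current, int sizePy) {
        ByteBuffer buf = current;
        if (sizePy > buf.capacity()) {
            // Assumption: a one-off larger allocation is acceptable for rare oversized messages.
            buf = ByteBuffer.allocateDirect(sizePy);
        }
        buf.position(0);
        buf.limit(sizePy);
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer local = ByteBuffer.allocateDirect(64 * 1024 + 100);
        ByteBuffer ok = prepare(local, 64 * 1024);      // fits, same buffer is reused
        ByteBuffer grown = prepare(local, 66 * 1024);   // too large, new buffer returned
        System.out.println("reused: " + (ok == local) + ", grown: " + (grown != local));
    }
}
```

A caller would then pass the returned buffer to the read call instead of the thread-local one; whether the larger buffer should replace the thread-local value is a separate design choice this sketch leaves open.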

Looking forward to your reply, thanks. @RongtongJin @GenerousMan

daigoopautoy avatar Dec 14 '23 07:12 daigoopautoy