qmq icon indicating copy to clipboard operation
qmq copied to clipboard

消费端怎么设置手动ack

Open lichchw opened this issue 3 years ago • 17 comments

lichchw avatar Jul 01 '21 07:07 lichchw

https://github.com/qunarcorp/qmq/blob/22d691707122c41fa6676df367c64b677ecfd4a3/qmq-api/src/main/java/qunar/tc/qmq/Message.java#L136

通过设置这个 autoAck 来调整。

keliwang avatar Jul 01 '21 08:07 keliwang

我在消费实时消息的时候,由于我的业务比较耗时,当服务器宕机的时候,那些没有被消费的消息为什么丢失了,服务器重启后也没有消费

lichchw avatar Jul 01 '21 08:07 lichchw

这里的服务器是你的消费者还是 qmq 的 broker? 消费者重启,没有 ack 的消息应该都会重发,除非消息已经过期了。 无论你的业务多么耗时,都是需要保证你的消费能力是要超过消息发送量的,否则消息肯定会不断堆积。同时消费耗时也不能超过设定的消息过期时间。

keliwang avatar Jul 01 '21 08:07 keliwang

实时消息的过期时间默认是多久?哪里可以设置吗?

lichchw avatar Jul 01 '21 09:07 lichchw

参考这里:https://github.com/qunarcorp/qmq/blob/master/docs/cn/install.md#%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6-1

# 可选,动态生效,messagelog过期时间,单位是小时
messagelog.retention.hours=72
# 可选,动态生效,consumerlog过期时间
consumerlog.retention.hours=72
# 可选,动态生效,pulllog过期时间
pulllog.retention.hours=72

keliwang avatar Jul 01 '21 09:07 keliwang

您好,现在消费端重启后可以继续消费了,但是消费的数据有问题,是重复的

@.***

发件人: Keli 发送时间: 2021-07-01 17:18 收件人: qunarcorp/qmq 抄送: lichchw; Author 主题: Re: [qunarcorp/qmq] 消费端怎么设置手动ack (#137) 参考这里:https://github.com/qunarcorp/qmq/blob/master/docs/cn/install.md#%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6-1

可选,动态生效,messagelog过期时间,单位是小时

messagelog.retention.hours=72

可选,动态生效,consumerlog过期时间

consumerlog.retention.hours=72

可选,动态生效,pulllog过期时间

pulllog.retention.hours=72

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub, or unsubscribe.

lichchw avatar Jul 01 '21 09:07 lichchw

qmq 服务端没有 ack 的消息都会重复投递,消费的幂等性是需要消费者自己处理的。

keliwang avatar Jul 01 '21 09:07 keliwang

好的,谢谢

lichchw avatar Jul 01 '21 09:07 lichchw

你好,未ack的消息重新消费时会报这个错误 java.nio.BufferUnderflowException: null at java.nio.Buffer.nextGetIndex(Buffer.java:506) at java.nio.DirectByteBuffer.getLong(DirectByteBuffer.java:771) at qunar.tc.qmq.store.PullLog.getMessageSequence(PullLog.java:104) at qunar.tc.qmq.store.DefaultStorage.getMessageSequenceByPullLog(DefaultStorage.java:343) at qunar.tc.qmq.store.MessageStoreWrapper.getConsumerLogSequence(MessageStoreWrapper.java:358) at qunar.tc.qmq.store.MessageStoreWrapper.doFindUnAckMessages(MessageStoreWrapper.java:286) at qunar.tc.qmq.store.MessageStoreWrapper.findUnAckMessages(MessageStoreWrapper.java:254) at qunar.tc.qmq.store.MessageStoreWrapper.findMessages(MessageStoreWrapper.java:77) at qunar.tc.qmq.processor.PullMessageWorker.process(PullMessageWorker.java:68) at qunar.tc.qmq.processor.PullMessageWorker.process(PullMessageWorker.java:33) at qunar.tc.qmq.concurrent.ActorSystem$Actor.processMessages(ActorSystem.java:173) at qunar.tc.qmq.concurrent.ActorSystem$Actor.run(ActorSystem.java:155) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at qunar.tc.qmq.concurrent.NamedThreadFactory$1.run(NamedThreadFactory.java:52) at java.lang.Thread.run(Thread.java:748)

lichchw avatar Jul 03 '21 05:07 lichchw

这个错误倒是没有见过,你这个未 ack 的消息是多久之前的?

keliwang avatar Jul 03 '21 09:07 keliwang

简单来描述一下问题的过程: 1 往qmq中生产数字为0-99的数据 2 consumer端可以正常消费 3.在消费的过程中kill掉consumer端 4 重启consumer端后 消费的数据均为第一次生产的数据 也就是0

ToTouchMyheart avatar Jul 06 '21 08:07 ToTouchMyheart

@ToTouchMyheart 你用的是哪个版本,是 master 分支还是某个 release 版本?

keliwang avatar Jul 11 '21 09:07 keliwang

您好,那个问题解决了,是我用的版本不对。 另外再请教一下,多个consumerGroup拉取消息的策略是什么? 我有两台消费者,在消费过程中发现拉取到的消息数量差别很大。

lichchw avatar Jul 23 '21 01:07 lichchw

一个 consumer group 内的多个消费者共同消费主题的消息,消费者性能相同就是基本平分,消费者性能有差异则是消费能力强的消费更多。 多个 consumer group 则是互相隔离的,每个 consumer group 都是消费主题所有的消息。

keliwang avatar Jul 23 '21 02:07 keliwang

好的,谢谢啦

lichchw avatar Jul 23 '21 04:07 lichchw

When use message.ack method .got exception: java.lang.UnsupportedOperationException: BaseMessage does not support this method at qunar.tc.qmq.base.BaseMessage.ack(BaseMessage.java:335) ~[qmq-1.1.41.jar:?]

jonyangx avatar Nov 01 '22 07:11 jonyangx

When use message.ack method .got exception: java.lang.UnsupportedOperationException: BaseMessage does not support this method at qunar.tc.qmq.base.BaseMessage.ack(BaseMessage.java:335) ~[qmq-1.1.41.jar:?]

ack method only works in consumer side. e.g.

@QmqConsumer(subject = "your subject", consumerGroup = "group", executor = "your executor bean name")
public void onMessage(Message message) {
  message.ack(xxx);
}

keliwang avatar Nov 02 '22 01:11 keliwang