rocketmq-client-cpp

How to set the pull number for each message?

Open cgeffect opened this issue 1 year ago • 0 comments

When a large backlog of messages has accumulated, the consumer's callback receives several messages at once: anywhere from 1 to 32, with 32 as the maximum. In other words, the msgs parameter of this function satisfies 1 <= msgs.size() <= 32:

virtual rocketmq::ConsumeStatus consumeMessage(const std::vector<rocketmq::MQMessageExt> &msgs);

The messages in this batch cannot each be given their own status, CONSUME_SUCCESS or otherwise. I have to wait until the business logic has processed every message in the batch and then return a single status, CONSUME_SUCCESS or another one, for all of them.

I called consumer->setConsumeMessageBatchMaxSize(1), but it has no effect.
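To make the expectation concrete, here is a minimal sketch of the chunking I assumed a batch max size of 1 would perform: a pulled batch is split into chunks of at most maxSize messages, and each chunk goes to one callback invocation. splitIntoChunks is a hypothetical helper written for illustration only. It is not part of rocketmq-client-cpp, and it describes the assumed semantics, not what the client actually does.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical helper, not a rocketmq-client-cpp API.
// Splits a pulled batch into chunks of at most maxSize items,
// one chunk per callback invocation.
template <typename T>
std::vector<std::vector<T>> splitIntoChunks(const std::vector<T> &pulled,
                                            std::size_t maxSize) {
    std::vector<std::vector<T>> chunks;
    if (maxSize == 0) {
        return chunks;  // nothing sensible to do with a zero chunk size
    }
    for (std::size_t i = 0; i < pulled.size(); i += maxSize) {
        const std::size_t end = std::min(i + maxSize, pulled.size());
        chunks.emplace_back(pulled.begin() + i, pulled.begin() + end);
    }
    return chunks;
}
```

Under this assumption, a pull of 32 messages with maxSize = 1 would produce 32 callbacks of one message each.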

For example, if msgs has 32 entries, returning CONSUME_SUCCESS means that all 32 messages are treated as consumed and removed from the broker's message queue. What I actually want is to consume messages one by one rather than batch by batch.
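The all-or-nothing behavior can be sketched with stand-in types. Status, Message, handleOne and consumeBatch below are hypothetical names used only for illustration; they are not the rocketmq headers or the real listener interface.

```cpp
#include <string>
#include <vector>

// Stand-in types for illustration only; NOT the rocketmq-client-cpp API.
enum class Status { Success, ReconsumeLater };
struct Message { std::string body; };

// Hypothetical business handler: reports failure for an empty body.
bool handleOne(const Message &msg) { return !msg.body.empty(); }

// Only one status can be returned for the whole batch, so every message
// is processed first and the batch is retried as a unit if any one fails.
Status consumeBatch(const std::vector<Message> &msgs) {
    bool allOk = true;
    for (const Message &msg : msgs) {
        if (!handleOne(msg)) {
            allOk = false;
        }
    }
    return allOk ? Status::Success : Status::ReconsumeLater;
}
```

With this shape, one failing message forces all the others in the same batch to be redelivered, which is why per-message delivery matters here.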

In the Java version of RocketMQ, by contrast, I can take one message out of msgs and return CONSUME_SUCCESS; the other messages continue to be delivered and are not removed from the broker.

Alternatively, in Java I can call setPullBatchSize, whose default value is 32.

public void setPullBatchSize(int pullBatchSize) {
    this.pullBatchSize = pullBatchSize;
}

This is my C++ consumer configuration:

// Consumer
std::string unique_group_name = group + "_" + tag;
consumer = std::make_shared<rocketmq::DefaultMQPushConsumer>(unique_group_name);
consumer->setNamesrvAddr(namesrv);
consumer->setGroupName(group);
consumer->setConsumeThreadCount(max_thread_count);
consumer->setConsumeFromWhere(rocketmq::CONSUME_FROM_LAST_OFFSET);
// Set the maximum number of messages pulled each time. Only takes effect when messages have accumulated in MQ
consumer->setConsumeMessageBatchMaxSize(1);
consumer->setTcpTransportConnectTimeout(30 * 1000);
consumer->setAsyncPull(false); // set sync pull
consumer->setMessageModel(rocketmq::CLUSTERING);
consumer->setInstanceName(group);
rocketmq::elogLevel inputLevel = rocketmq::eLOG_LEVEL_LEVEL_NUM;
consumer->setLogLevel(inputLevel);

What should I do?

cgeffect, Jul 21 '23 08:07