rocketmq-client-cpp
How do I set the number of messages pulled per batch?
When a large number of messages has accumulated, the consumer's callback function receives several messages at once, anywhere from 1 to 32. In other words, the msgs parameter of this function satisfies 1 <= msgs.size() <= 32:
virtual rocketmq::ConsumeStatus consumeMessage(const std::vector<rocketmq::MQMessageExt> &msgs);
The messages in such a batch cannot be given individual statuses such as CONSUME_SUCCESS. I have to wait until the business logic has processed every message in the batch, and then mark the whole batch as CONSUME_SUCCESS or some other status.
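To make the batch-level status concrete, here is a minimal sketch of how per-message results collapse into one return value. Message, ConsumeStatus, and consumeBatch are stand-ins invented for illustration so the snippet compiles without the SDK. In real code the types would be rocketmq::MQMessageExt and rocketmq::ConsumeStatus, and the loop would live inside consumeMessage.

```cpp
#include <functional>
#include <string>
#include <vector>

// Stand-in types (assumptions) so the sketch compiles without the RocketMQ SDK.
struct Message {
    std::string body;
};

enum ConsumeStatus { CONSUME_SUCCESS, RECONSUME_LATER };

// Hypothetical helper: run `handle` on each message in order and fold the
// per-message results into the single status a batch callback can return.
ConsumeStatus consumeBatch(const std::vector<Message>& msgs,
                           const std::function<bool(const Message&)>& handle) {
    for (const Message& msg : msgs) {
        if (!handle(msg)) {
            // One failure fails the whole batch. Messages handled earlier in
            // this batch will likely be delivered again on retry, so `handle`
            // should be written to tolerate duplicates.
            return RECONSUME_LATER;
        }
    }
    // Only reached when every message in the batch was handled successfully.
    return CONSUME_SUCCESS;
}
```

With this shape, each message is still processed one at a time, but the broker only ever sees one status for the whole batch, which is the limitation described above.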
I called setConsumeMessageBatchMaxSize(1) on the consumer, but it does not work.
For example, if msgs contains 32 messages, returning CONSUME_SUCCESS means that all 32 messages are treated as consumed and removed from the broker's message queue. What I actually want is to consume messages one by one rather than batch by batch.
With the same logic in the Java version of RocketMQ, however, I can take one message out of msgs and return CONSUME_SUCCESS; the other messages continue to be delivered and are not removed from the broker.
Alternatively, in the Java client I can call setPullBatchSize, whose default value is 32:
public void setPullBatchSize(int pullBatchSize) {
    this.pullBatchSize = pullBatchSize;
}
This is my C++ consumer configuration:
// Consumer
std::string unique_group_name = group + "_" + tag;
consumer = std::make_shared<rocketmq::DefaultMQPushConsumer>(unique_group_name);
consumer->setNamesrvAddr(namesrv);
consumer->setGroupName(group);
consumer->setConsumeThreadCount(max_thread_count);
consumer->setConsumeFromWhere(rocketmq::CONSUME_FROM_LAST_OFFSET);
// Set the maximum number of messages pulled each time. Only takes effect when messages have accumulated in MQ.
consumer->setConsumeMessageBatchMaxSize(1);
consumer->setTcpTransportConnectTimeout(30 * 1000);
consumer->setAsyncPull(false); // set sync pull
consumer->setMessageModel(rocketmq::CLUSTERING);
consumer->setInstanceName(group);
rocketmq::elogLevel inputLevel = rocketmq::eLOG_LEVEL_LEVEL_NUM;
consumer->setLogLevel(inputLevel);
What should I do?