rocketmq-spring

Consumer batch messages API.

Open ruoshuixuelabi opened this issue 6 years ago • 13 comments

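How can I receive messages in batches? For example, according to the documentation, a message is received like this:

@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
public class MyConsumer implements RocketMQListener<OrderPaidEvent> {
    @Override
    public void onMessage(OrderPaidEvent orderPaidEvent) {
        log.info("received orderPaidEvent: {}", orderPaidEvent);
    }
}

I now want to change the OrderPaidEvent orderPaidEvent parameter to List<OrderPaidEvent> orderPaidEvent. I tested this and it throws an error. The native client supports batch consumption, but I read through the source code and could not find a method for receiving messages in batches.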

ruoshuixuelabi avatar Jun 05 '19 07:06 ruoshuixuelabi

It is not supported yet.

RongtongJin avatar Nov 12 '19 09:11 RongtongJin

Why was it removed again in 2.1.0?

liuanxin avatar Mar 23 '20 09:03 liuanxin

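We also have a need for batch consumption here, mainly so that we can combine it with ORM batch commits to improve performance. With my limited ability, the only approach I could come up with is rather inelegant: use support native listener #293, and then handle the batching myself inside the native listener. I hope rocketmq-spring can provide batch consumption as a feature.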
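To make the "handle the batching myself" idea concrete, here is a minimal sketch in plain Java of a buffer that collects single messages and hands them on in fixed-size batches. BatchBuffer and its sink callback are hypothetical names for illustration, not part of rocketmq-spring. One caveat worth stating: as far as I understand, a message is acknowledged once onMessage returns normally, so anything still sitting in such a buffer can be lost if the process dies before the next flush.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: collects single messages and passes them on in batches. */
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink; // e.g. an ORM batch insert
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Call from onMessage; flushes as soon as batchSize items have accumulated. */
    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Call on a timer and at shutdown so a partial batch is not left behind. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        // If the sink throws, this batch is dropped; real code needs its own retry.
        sink.accept(batch);
    }
}
```

A listener would call buffer.add(event) from onMessage and schedule buffer.flush() periodically, so that a slow topic does not hold a partial batch indefinitely.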

liuliuzo avatar Nov 29 '20 04:11 liuliuzo

We have run into the same problem. This feature is important for us because we need to batch-save data to the database.

wz2cool avatar Dec 08 '20 07:12 wz2cool

Yes, we need a batch consumer.

zdy333666 avatar Dec 08 '20 07:12 zdy333666

I have found a workaround for receiving messages in batches, but we would still prefer a batch listener.

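import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RocketMQMessageListener(topic = "demo_product", consumerGroup = "demo_batch_product_consumerGroup", consumeMode = ConsumeMode.ORDERLY)
public class ProductBatchConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void onMessage(String message) {
        // Do nothing here; RocketMQListener must still be implemented, otherwise an error is thrown.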
    }

    private AtomicInteger atomicInteger = new AtomicInteger();

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setPullInterval(1000);
        consumer.setConsumeThreadMin(1);
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeMessageBatchMaxSize(1000);
        consumer.setPullBatchSize(100);
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.println("batchSize: " + msgs.size());
            int result = atomicInteger.addAndGet(msgs.size());
            System.out.println("totalSize: " + result);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
    }
}

wz2cool avatar Dec 08 '20 08:12 wz2cool

@RongtongJin Any update on this issue? I found a PR, #170, about a batch listener.

wz2cool avatar Dec 20 '20 09:12 wz2cool

We need a batch consumer.

nidonglin avatar Jan 25 '21 08:01 nidonglin

It is not supported yet.

Why?

nidonglin avatar Jan 25 '21 08:01 nidonglin

Would using pull mode, List<String> messages = rocketMQTemplate.receive(String.class), be enough to fetch data in batches?
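For illustration, a pull-based loop could be shaped roughly as follows. This is only a sketch: it is written against a plain Supplier so that it does not depend on the RocketMQ client, with the supplier standing in for a call such as rocketMQTemplate.receive(String.class). PullBatchLoop, drain and maxPolls are hypothetical names, and how many messages each poll actually returns depends on the pull consumer's configuration.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper: polls a source repeatedly and handles each result as one batch. */
final class PullBatchLoop {

    /**
     * Polls source up to maxPolls times and passes every non-empty result to handler.
     * Returns the total number of messages handled.
     */
    static <T> int drain(Supplier<List<T>> source, Consumer<List<T>> handler, int maxPolls) {
        int total = 0;
        for (int i = 0; i < maxPolls; i++) {
            List<T> batch = source.get();
            if (batch == null || batch.isEmpty()) {
                continue; // nothing arrived during this poll
            }
            handler.accept(batch); // e.g. one ORM batch insert per poll
            total += batch.size();
        }
        return total;
    }
}
```

With rocketmq-spring on the classpath, the source argument would be something like () -> rocketMQTemplate.receive(String.class), and the handler would be whatever batch save the application already has.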

ShawshankLin avatar May 09 '21 04:05 ShawshankLin

We need a batch consumer.

771917534 avatar Oct 28 '21 02:10 771917534

After calling rocketMQTemplate.receive, do we need to manually call the following code to commit the ack?

rocketMQTemplate.getConsumer().commitSync();

superleo-cn avatar Jun 20 '22 06:06 superleo-cn

Could DefaultRocketMQListenerContainer directly support passing the original List<MessageExt> msgs down at this point? Many scenarios need batch data processing to improve performance.

pupilming avatar Jul 14 '22 03:07 pupilming