pulsar-client-node icon indicating copy to clipboard operation
pulsar-client-node copied to clipboard

Waiting for ack

Open taefed opened this issue 2 years ago • 3 comments

I'm trying to consume 1 message at a time, and i'm acknowledging the message outside of the listener. But it's just grabbing the next message without waiting for the ack.

code sample:

let msg;
let msgConsumer;

const consumer = client.subscribe({
                    topic: `persistent://public/default/test1`,
                    subscription: `sub1`,
                    subscriptionType: 'Exclusive',
                    receiverQueueSize:0,
                    ackTimeoutMs:300000,
                    listener: (msg,msgConsumer) => {
                        console.log(msg.getData().toString());
                        msg = msg;
                        msgConsumer = msgConsumer;

                    }
                  });
                  
function someWhereElse(){

    msgConsumer.acknowledge(msg);


}

Is something like this possible?

taefed avatar Dec 11 '23 14:12 taefed

Since listener is used for asynchronous message processing, it doesn't seem to meet your requirement (1 message at a time).

  • Why are you using listener?
  • Could you try receive() instead? e.g.
const consumer = client.subscribe( /* without listener */ );

async function somewhereElse() {
  const msg = await consumer.receive();
  consumer.acknowledge(msg);
}

kimula avatar Dec 26 '23 02:12 kimula

Thanks a lot for the response.

I'm not receiving in the same place I'm acknowledging. I'm trying to find a way to make 1 process add jobs to queues, and acknowledge them, and the other process needs to just process 1 at a time and wait for acknowledgement.

  1. process 1 adds a message to the queue.
  2. process 2 starts processing message
  3. process 1 acknowledges message
  4. process 2 can receive next message

if a second message gets added to the queue, it has to wait for the first message to be acknowledged, I haven't found a way to make this work with the manual receive(). But I'm new to queueing so maybe I'm looking over something.

taefed avatar Dec 29 '23 14:12 taefed

I'm trying to find a way to make 1 process add jobs to queues, and acknowledge them, and the other process needs to just process 1 at a time and wait for acknowledgement.

It seems we can't implement it with only the functionality of pulsar-client.

equanz avatar Jan 25 '24 02:01 equanz