pulsar-client-node

Interruptible Reader.readNext()?

Open rkaw92 opened this issue 3 years ago • 4 comments

Hi, I have a use case where I expose a Pulsar topic over HTTP via Server-Sent Events. Basically, when a client connects over HTTP, I do this:

    const reader = await client.createReader({
        topic: request.params.topic,
        startMessageId: request.headers['last-event-id'] ?
                Pulsar.MessageId.deserialize(Buffer.from(request.headers['last-event-id'], 'base64')) :
                Pulsar.MessageId.earliest()
    });

Then I use a loop that reads messages as they come and sends them to the client:

    while (!clientGoneAway) {
        // Blocks until the next message arrives on the topic
        const message = await reader.readNext();
        reply.raw.write(formatServerSentEvent(
            message.getMessageId().serialize().toString('base64'),
            message.getData().toString('utf-8')
        ));
    }
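The formatServerSentEvent helper is not shown in the post. A minimal sketch of what such a helper might look like, assuming the standard text/event-stream framing (an "id:" field, one "data:" field per payload line, and a blank line to end the event):

```javascript
// Hypothetical helper: the original post does not show its implementation.
// Assumes `id` contains no line breaks (true for a base64 string).
function formatServerSentEvent(id, data) {
    // Each line of the payload needs its own "data:" field.
    const dataLines = String(data)
        .split(/\r\n|\r|\n/)
        .map((line) => `data: ${line}`);
    // A blank line terminates the event.
    return `id: ${id}\n${dataLines.join('\n')}\n\n`;
}
```

The browser echoes the most recent id back in the Last-Event-ID header when it reconnects, which is what the createReader call above relies on.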

Additionally, the server detects the close event on the HTTP requests, closes the reader and prevents further iteration:

    request.raw.once('close', async function() {
        clientGoneAway = true;
        await reader.close();
        reply.raw.end();
    });

(request.raw and reply.raw are Node.js req and res objects, respectively - they're just wrapped like this in Fastify.js)

Now, my problem is that even if I call reader.close(), the pending reader.readNext() neither resolves nor rejects. It's not just a Promise problem: it seems to keep a thread busy, because all other operations then hang. Things like fs.createReadStream, as well as creating new readers, hang forever until I completely restart the Node.js process.

I know I can use a timeout with reader.readNext(timeoutMS), but this has 2 major disadvantages:

  • It turns the reader into a kind of poller
  • It still does not vacate the thread - so it's possible to trivially saturate the thread pool by creating more readers than the pool size within the timeout period (so for example 4 readers in 1 second, when using a timeout of 1000 ms)

Is there any way to have the reader immediately abort all reads when closed?

rkaw92 • Apr 17 '21 14:04

I've a similar need :D

ludusrusso • Apr 22 '21 08:04

> Is there any way to have the reader immediately abort all reads when closed?

I'm sorry, there is no way right now.

> Now, my problem is that even if I call reader.close(), the reader.readNext(); never resolves nor rejects

It seems that we need to modify the Pulsar C++ client, which the Pulsar Node.js client is built on, so I think it will take some time. However, we will consider fixing it.

hrsakai • Apr 27 '21 02:04

@rkaw92 Could this be solved by implementing ReaderListener in the Node.js client? It is already available in the C++ client and the C API. With a ReaderListener, reads are asynchronous, and reading also stops once the reader is closed.

C++: https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-cpp/lib/ReaderConfiguration.cc#L43-L47
C API: https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc#L34-L51

k2la • May 07 '21 02:05

One option is to do what we used to do in the Python wrapper (before https://github.com/apache/pulsar/pull/5706 was applied): use a 100 ms timeout on the receive call and check for interrupted status between attempts.

That won't be possible when receiveQueueSize=0 but it will work in all the other cases.
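Applied to the Node.js reader loop from the original post, the short-timeout approach could be sketched roughly as follows. This is only an illustration: readUntilStopped, shouldStop, onMessage and the isTimeout predicate are hypothetical names, and it assumes reader.readNext(timeoutMs) rejects when no message arrives in time. How a timeout is reported may differ between client versions, so the default predicate here is a guess.

```javascript
// Sketch only: poll with a short timeout so the loop can notice a stop request.
// Assumption: reader.readNext(timeoutMs) rejects when the timeout elapses.
async function readUntilStopped(reader, shouldStop, onMessage, options = {}) {
    const pollMs = options.pollMs ?? 100;
    // Hypothetical predicate; adjust to however your client reports timeouts.
    const isTimeout = options.isTimeout ??
        ((err) => /tim(e|ed)\s*out/i.test(String(err && err.message)));

    while (!shouldStop()) {
        let message;
        try {
            message = await reader.readNext(pollMs);
        } catch (err) {
            if (isTimeout(err)) {
                continue; // no message yet; re-check shouldStop()
            }
            throw err; // anything else is treated as a real failure
        }
        await onMessage(message);
    }
    // Close only after the last read has settled, so no read is left pending.
    await reader.close();
}
```

With this shape, the 'close' handler would only set clientGoneAway, and the loop would be started as readUntilStopped(reader, () => clientGoneAway, writeEvent), where writeEvent is whatever writes to the response. Each reader may still occupy a worker thread for up to pollMs at a time, so this narrows the problem described above rather than removing it.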

merlimat • May 07 '21 02:05