pulsar-client-node
Interruptible Reader.readNext()?
Hi, I have a use case where I expose a Pulsar topic over HTTP via Server-Sent Events. Basically, when a client connects over HTTP, I do this:
const reader = await client.createReader({
  topic: request.params.topic,
  startMessageId: request.headers['last-event-id'] ?
    Pulsar.MessageId.deserialize(Buffer.from(request.headers['last-event-id'], 'base64')) :
    Pulsar.MessageId.earliest()
});
Then I use a loop that reads messages as they come and sends them to the client:
while (!clientGoneAway) {
  const message = await reader.readNext();
  reply.raw.write(formatServerSentEvent(
    message.getMessageId().serialize().toString('base64'),
    message.getData().toString('utf-8')
  ));
}
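The formatServerSentEvent helper is not shown in the post. A minimal sketch of what it might look like, assuming it only needs to emit the `id` and `data` fields of the Server-Sent Events wire format (the body below is an illustration, not the poster's actual code):

```javascript
// Hypothetical implementation of the helper used in the loop above.
// Each event is a block of "field: value" lines terminated by a blank line.
function formatServerSentEvent(id, data) {
  // A payload containing line breaks must be split into one "data:" line
  // per line, otherwise the client would treat the rest as a new field.
  const dataLines = String(data)
    .split(/\r\n|\r|\n/)
    .map((line) => `data: ${line}`);
  // The id must not contain line breaks; base64 output never does.
  return `id: ${id}\n${dataLines.join('\n')}\n\n`;
}
```

The browser sends the `id` value back in the Last-Event-ID header on reconnect, which is what lets createReader resume from the serialized MessageId.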
Additionally, the server detects the close event on the HTTP request, closes the reader and prevents further iteration:
request.raw.once('close', async function() {
  clientGoneAway = true;
  await reader.close();
  reply.raw.end();
});
(request.raw and reply.raw are Node.js req and res objects, respectively - they're just wrapped like this in Fastify.js)
Now, my problem is that even if I call reader.close(), the pending reader.readNext() neither resolves nor rejects. It's not just a Promise problem - it seems like it's keeping a thread busy, because then all other operations hang: things like fs.createReadStream, as well as creating new readers, hang forever until I completely restart the Node.js process.
I know I can use a timeout with reader.readNext(timeoutMS), but this has 2 major disadvantages:
- It turns the reader into a kind of poller
- It still does not vacate the thread - so it's possible to trivially saturate the thread pool by creating more readers than the pool size within the timeout period (so for example 4 readers in 1 second, when using a timeout of 1000 ms)
Is there any way to have the reader immediately abort all reads when closed?
I've a similar need :D
> Is there any way to have the reader immediately abort all reads when closed?
I'm sorry, there is no way right now.
> Now, my problem is that even if I call reader.close(), the reader.readNext(); never resolves nor rejects
It seems that we need to modify the Pulsar C++ client used by the Pulsar Node.js client, so I think it will take some time. However, we will consider fixing it.
@rkaw92 Would it solve this problem if the Node.js client implemented ReaderListener, which the C++ client and the C API already provide? With ReaderListener, reads are asynchronous, and reading also stops when the reader is closed.
C++: https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-cpp/lib/ReaderConfiguration.cc#L43-L47
C API: https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc#L34-L51
One option is to do what we used to do in the Python wrapper (before https://github.com/apache/pulsar/pull/5706 was applied): use a 100 ms timeout on the receive to check for interrupted status.
That won't be possible when receiveQueueSize=0 but it will work in all the other cases.
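Applied on the Node.js side, the short-timeout idea could look roughly like the sketch below. It assumes that reader.readNext(timeoutMs) rejects when the timeout expires and that the error message contains the word "timeout"; both are assumptions to verify against the client version in use. The names pumpUntilCancelled, isCancelled, onMessage and isTimeout are made up for this example.

```javascript
// Sketch: read with a short timeout so a cancellation flag is re-checked
// roughly every `timeoutMs` milliseconds instead of blocking indefinitely.
async function pumpUntilCancelled(reader, isCancelled, onMessage, options = {}) {
  const timeoutMs = options.timeoutMs ?? 100;
  // Assumption: a timed-out read rejects with an error whose message
  // mentions "timeout". Pass a different predicate if that does not hold.
  const isTimeout = options.isTimeout ??
    ((err) => /timeout/i.test(String(err && err.message)));

  while (!isCancelled()) {
    let message;
    try {
      message = await reader.readNext(timeoutMs);
    } catch (err) {
      if (isTimeout(err)) continue; // nothing arrived yet, re-check the flag
      throw err;                    // a real failure should not be swallowed
    }
    if (isCancelled()) break;       // client left while the read was pending
    await onMessage(message);
  }
}
```

With this shape, the close handler only sets the flag, and reader.close() can be called after pumpUntilCancelled returns, so close never races a pending read. As noted in the original post, each read still occupies a worker thread for up to the timeout, so this shortens the blocking window rather than removing it.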