nats.java

MessageConsumer.isFinished() after .stop() never reports true on an empty stream, or after seeing messages when no further messages arrive in the stream

Open: wjnicholson opened this issue 4 months ago • 1 comment

Observed behavior

While testing graceful shutdown of a MessageConsumer, I came across MessageConsumerExample. It says to call consumerContext.consume(handler) and then, when finished, call consumer.stop() and wait for consumer.isFinished() to return true.
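Because the documented pattern is "call stop(), then wait for isFinished()", an unbounded wait is exactly what turns this bug into an infinite loop. A minimal sketch of a bounded wait follows. It is written against a plain BooleanSupplier so it compiles without the client library, and the names FinishWait and awaitFinished are hypothetical, not part of nats.java.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of nats.java: polls a condition until it is
// true or a deadline passes, instead of looping forever.
final class FinishWait {
    private FinishWait() {}

    /**
     * Polls {@code finished} every {@code pollInterval} until it returns true
     * or {@code timeout} elapses.
     *
     * @return true if the condition became true before the deadline,
     *         false on timeout or if the waiting thread was interrupted
     */
    static boolean awaitFinished(BooleanSupplier finished, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (finished.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: the condition never became true
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

Against a nats.java MessageConsumer this could be called as FinishWait.awaitFinished(consumer::isFinished, Duration.ofSeconds(5), Duration.ofMillis(50)) after consumer.stop(). A false result after a generous timeout is the symptom described in this issue.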

Instead, I've found that isFinished() never returns true unless the consumer has already seen messages and there are more messages in the stream to consume. In my scenario, this consumer is the last in a series of applications to shut down; since it has already consumed everything, the wait always times out. From a brief look at the code, the AtomicBoolean finished field appears to be set to true only when a message is consumed, not when all outstanding pull requests to the server time out. The same thing happens if no messages are received before consumer.stop() is called: any subsequent messages are still received, but the consumer never reports that it has finished.

The only time stop() consistently succeeds is when the consumer has seen some messages before stop() and there are more messages in the stream.

Expected behavior

Calling consumer.stop() on a stream that never receives messages should make isFinished() return true quickly. Calling consumer.stop() after messages have been seen, with no more messages in the stream, should also make isFinished() return true quickly.
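The gap between the observed and expected behavior can be illustrated with a deliberately simplified, single-threaded model. This is not the nats.java source: the class ShutdownModel, its pendingPulls counter and the onPullExpired() hook are hypothetical stand-ins. With finishOnExpiry set to false it mirrors the reading of the code given above (finished is set only on the message path); with it set to true, the expiry of the last outstanding pull request after stop() also completes shutdown.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of a pull consumer's shutdown flags.
// It is NOT the nats.java implementation and is not thread-safe; the
// AtomicBoolean only mirrors the field type named in the issue.
final class ShutdownModel {
    private final boolean finishOnExpiry; // false = behavior as reported, true = expected behavior
    private final AtomicBoolean finished = new AtomicBoolean(false);
    private boolean stopped = false;
    private int pendingPulls = 0; // pull requests still outstanding at the server

    ShutdownModel(boolean finishOnExpiry) {
        this.finishOnExpiry = finishOnExpiry;
    }

    void issuePull() {
        if (!stopped) {
            pendingPulls++;
        }
    }

    void stop() {
        stopped = true;
    }

    // A message arrives; in the reported behavior this is the only place
    // that marks the consumer as finished.
    void onMessage() {
        if (stopped) {
            finished.set(true);
        }
    }

    // An outstanding pull request expires without delivering anything.
    void onPullExpired() {
        if (pendingPulls > 0) {
            pendingPulls--;
        }
        if (finishOnExpiry && stopped && pendingPulls == 0) {
            finished.set(true);
        }
    }

    boolean isFinished() {
        return finished.get();
    }
}
```

In the reported mode, an empty stream (stop, then expiry, no message) leaves isFinished() false forever, while a message arriving after stop() does finish it, which matches the one case that works in the observed behavior.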

Server and client version

nats-server 2.10.26, nats.java 2.20.5

Host environment

Seen locally on Mac OS X and on Ubuntu servers in GCP.

Steps to reproduce

Call .consume() on an empty stream, call .stop(), then wait for isFinished() == true; the wait loops forever. Alternatively, call .consume() on a stream, publish exactly X messages, wait on a CountDownLatch until all of them have been received, then call .stop() and wait for isFinished() == true; this also loops forever.

wjnicholson, Jun 25 '25 14:06

Ah, of course: I checked the latest releases and saw https://github.com/nats-io/nats.java/pull/1271. So I assume I should call .stop(), then call .close() immediately, and then wait for isFinished(). However, even when I do that, isFinished() does not consistently become true.

I believe another possible cause is heartbeatError, which calls doSub() and resets the stopped and finished flags even while we are waiting for termination. However, even increasing expiresInMillis so that no heartbeat errors occur doesn't fix the underlying problem: the consumer never terminates.
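The heartbeatError concern describes a lost-shutdown race: a recovery path that re-subscribes and unconditionally clears the flags undoes a stop() that is already in progress. A minimal, single-threaded sketch of the difference follows. The class RecoveryModel and the methods restartUnconditionally and restartUnlessStopped are hypothetical; they illustrate the concern and are not the real doSub() logic or the fix in the linked PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of a recovery path racing with shutdown.
// Names are invented for the example; this is not nats.java code, and the
// check-then-act in restartUnlessStopped is only safe because the model
// is single-threaded.
final class RecoveryModel {
    final AtomicBoolean stopped = new AtomicBoolean(false);
    final AtomicBoolean finished = new AtomicBoolean(false);
    int subscriptions = 0;

    void stop() {
        stopped.set(true);
    }

    // Mirrors the concern in the comment: re-subscribing resets both flags,
    // so a stop() that was already requested is silently forgotten.
    void restartUnconditionally() {
        stopped.set(false);
        finished.set(false);
        subscriptions++;
    }

    // Guarded variant: once stop() has been requested, recovery leaves the
    // flags alone and does not re-subscribe.
    void restartUnlessStopped() {
        if (stopped.get()) {
            return;
        }
        subscriptions++;
    }
}
```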

wjnicholson, Jun 25 '25 14:06

@wjnicholson Thank you for the bug report. I was able to find the problem and fix it; see #1339.

scottf, Jun 26 '25 20:06