nats.java
MessageConsumer.isFinished() after .stop() will never report true on empty stream or after seeing messages with no future messages in stream
Observed behavior
While testing graceful shutdown of a MessageConsumer, I came across MessageConsumerExample. It tells me to call consumerContext.consume(handler) and then, when finished, call consumer.stop() and wait for consumer.isFinished() to return true.
I've found instead that isFinished() never returns true unless the consumer has already seen messages and there are more messages in the stream to consume.
My scenario is that I'm shutting down one consumer last in a series of applications, and since it has already consumed everything, the wait always times out.
Based on a brief examination of the code, it looks like the AtomicBoolean finished field is only set to true while consuming a message, rather than also when all outstanding requests to the server time out.
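To make the suspected mechanism concrete, here is a toy model of it. This is not the nats.java source: the class `ToyConsumer`, its methods, and the `finishOnExpiry` switch are all hypothetical names invented for illustration. It only assumes what is described above, namely that the finished flag is set on the message path and not when a request expires empty.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model only. NOT the nats.java implementation; every name is hypothetical.
class ToyConsumer {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicBoolean finished = new AtomicBoolean(false);
    // false models the reported behavior, true models the expected behavior
    private final boolean finishOnExpiry;

    ToyConsumer(boolean finishOnExpiry) {
        this.finishOnExpiry = finishOnExpiry;
    }

    void stop() {
        stopped.set(true);
    }

    boolean isFinished() {
        return finished.get();
    }

    // Message path: the only place the reported behavior marks the consumer finished.
    void onMessage() {
        if (stopped.get()) {
            finished.set(true);
        }
    }

    // Expiry path: a request to the server timed out without delivering anything.
    void onPullExpired() {
        if (finishOnExpiry && stopped.get()) {
            finished.set(true);
        }
    }

    public static void main(String[] args) {
        ToyConsumer reported = new ToyConsumer(false);
        reported.stop();
        reported.onPullExpired(); // empty stream: no message ever arrives
        System.out.println("finished set on message path only: " + reported.isFinished());

        ToyConsumer expected = new ToyConsumer(true);
        expected.stop();
        expected.onPullExpired();
        System.out.println("finished also set on expiry: " + expected.isFinished());
    }
}
```

In this model, a stopped consumer on an empty stream only ever reaches the expiry path, so the first variant can never report finished, which matches the symptom.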
This also occurs if no messages are received before consumer.stop() is called: any subsequent messages are still received, but the consumer never reports that it has finished.
The only case in which stop() consistently completes is when the consumer sees some messages before stop() and there are more messages in the stream.
Expected behavior
Calling consumer.stop() on a stream which never receives messages should have isFinished() return true quickly. Calling consumer.stop() on a stream after seeing messages, but with no more messages in the stream, should also have isFinished() return true quickly.
Server and client version
nats-server 2.10.26, nats.java 2.20.5
Host environment
Seen locally on macOS and on Ubuntu servers in GCP.
Steps to reproduce
Call .consume() on an empty stream, call .stop(), then wait for isFinished() == true, which will loop forever. Alternatively, call .consume() on a stream, publish exactly X messages, wait on a countdown latch until they have all been received, then call .stop() and wait for isFinished() == true, which will also loop forever.
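When reproducing this, it helps to bound the wait so that the test fails instead of hanging. A minimal sketch follows, using only the JDK; `BoundedWait`, `awaitTrue`, and the timeout and poll values are hypothetical names chosen for illustration, not part of nats.java.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: polls a condition until it is true or a deadline passes.
class BoundedWait {
    static boolean awaitTrue(BooleanSupplier condition, Duration timeout, long pollMillis) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition becoming true
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Stand-in conditions; a real reproduction would pass the consumer's isFinished check.
        System.out.println(awaitTrue(() -> true, Duration.ofMillis(100), 10));
        System.out.println(awaitTrue(() -> false, Duration.ofMillis(100), 10));
    }
}
```

With a real consumer, the condition would presumably be passed as `consumer::isFinished` after calling `consumer.stop()`, so that a false return marks the failing case described in this report.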
Ah, of course: I went to check the latest releases and saw https://github.com/nats-io/nats.java/pull/1271. So my assumption is that I should call .stop(), then call .close() immediately, and then wait for isFinished(). However, even if I do that, isFinished() still doesn't consistently become true.
I believe one possible cause is heartbeatError, which calls doSub() and so unsets the stopped and finished values even while we're waiting for termination. However, even increasing expiresInMillis so that we don't get heartbeat errors doesn't solve the underlying issue of never terminating.
@wjnicholson Thank you for the bug report, I was able to figure out the problem and fix it, see #1339