Drain waits indefinitely after request timeout on fetch
(not sure if limited to the simplified API, but it can be reproduced with it)
Calling drain(Duration.ZERO) will wait indefinitely when a request timeout has occurred on a fetch.
For this to happen, fetch() must be called, the request timeout status message must be received, and drain() must be called. Since the status message is counted in the length of the MessageQueue, isDrained() will always return false.
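A minimal reproduction sketch, assuming a local NATS server and an existing stream/consumer; the names, URL, and option values are placeholders:

```java
import io.nats.client.*;
import java.time.Duration;

public class DrainHangRepro {
    public static void main(String[] args) throws Exception {
        try (Connection nc = Nats.connect("nats://localhost:4222")) {
            // "my-stream" / "my-consumer" are placeholders for an existing stream and consumer
            ConsumerContext cc = nc.getConsumerContext("my-stream", "my-consumer");

            // Ask for more messages than are available so the pull request expires
            // and the server answers with a request timeout status message.
            FetchConsumer fc = cc.fetch(FetchConsumeOptions.builder()
                    .maxMessages(10)
                    .expiresIn(1000) // 1 second
                    .build());

            Thread.sleep(2000); // let the pull expire; the status message now sits in the MessageQueue

            // Neither nextMessage() nor fc.close() is called here, so the status message
            // stays pending, isDrained() keeps returning false, and drain never completes.
            nc.drain(Duration.ZERO).get();
        }
    }
}
```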
A workaround is to always loop over the result and/or close the FetchConsumer manually.
However, the expectation would be that drain() closes this FetchConsumer automatically, even if not manually closed.
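A sketch of that workaround, continuing inside the same main method as the repro above: exhaust the fetch and close the FetchConsumer before draining, so no status message is left pending.

```java
// Same placeholder stream/consumer as above.
try (FetchConsumer fc = cc.fetch(FetchConsumeOptions.builder()
        .maxMessages(10)
        .expiresIn(1000)
        .build())) {
    Message msg;
    // nextMessage() returns null once the fetch is done, including when the pull expired.
    while ((msg = fc.nextMessage()) != null) {
        msg.ack();
    }
}
// With the FetchConsumer consumed and closed, drain completes as expected.
nc.drain(Duration.ZERO).get();
```

Per the workaround described above, closing the FetchConsumer without fully looping should also avoid the hang.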
Added a test to reproduce the issue.
An easy, but hacky, way to solve it is to ignore StatusMessage entirely in the length and sizeInBytes calculations in MessageQueue.
However, NatsJetStreamSubscription seems to be aware of these status messages, looking at _nextUnmanaged. So optimally isDrained() would be overridden there to not count status messages as pending?
Or maybe automatically calling close on the FetchConsumer when drain is called?
What would be the right approach?
Have implemented the isDrained() override in NatsJetStreamSubscription, ignoring pending status messages. This fixes drain(Duration.ZERO) blocking indefinitely when only status messages are left in the queue.
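A toy illustration of the check that override implements (not the real jnats classes): only user messages count as pending, so a leftover request timeout status no longer keeps isDrained() returning false.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class IsDrainedSketch {
    // Stand-in for a queued message; the real queue holds NatsMessage/StatusMessage instances.
    record QueuedMessage(boolean statusMessage) {}

    static boolean isDrained(Queue<QueuedMessage> pending) {
        // Drained as soon as no user messages remain, even if a status message is still queued.
        return pending.stream().allMatch(QueuedMessage::statusMessage);
    }

    public static void main(String[] args) {
        Queue<QueuedMessage> pending = new ArrayDeque<>();
        pending.add(new QueuedMessage(true));   // the expired pull's request timeout status
        System.out.println(isDrained(pending)); // true -> drain(Duration.ZERO) can complete
    }
}
```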
So I know how to fix the pull issue. In the simplification, every pull request for the pull sub has an expiration. I need to track the expiration and, if I don't get the proper status that closed the pull, recognize that it has stalled. This will also help when a connection is broken and the simplification stalls.
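A rough sketch of that idea (a hypothetical helper, not jnats code): remember when each pull request expires, and treat the pull as stalled if the closing status has not arrived by then.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: tracks one outstanding pull request.
class PullExpirationTracker {
    private Instant deadline;   // when the current pull request expires
    private boolean pullClosed; // set when the status that closes the pull arrives

    void pullSent(Duration expiresIn) {
        deadline = Instant.now().plus(expiresIn);
        pullClosed = false;
    }

    void closingStatusReceived() {
        pullClosed = true;
    }

    // The pull should have ended by now but no closing status arrived,
    // e.g. because the request expired silently or the connection broke.
    boolean isStalled() {
        return deadline != null && !pullClosed && Instant.now().isAfter(deadline);
    }
}
```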
Revisiting this again, I'm tempted to close this.
While it can technically be an issue, it will only happen if you have a fetch that expired, call drain(Duration.ZERO), AND don't call nextMessage() after calling drain(). Not sure you would ever get into such a situation normally, as the FetchMessageExample also shows the proper way to do this (and will not run into this issue).
@scottf, would you agree with closing this?
After some chat, @scottf and I agreed to leave this open. I'm reverting the "fix" for now, to give some more time to think of a proper solution while keeping visibility on whether the test fails.