Improve Queue lifecycle management in operators
Currently, automatic context propagation using the context-propagation library uses a Hooks.addQueueWrapper method to register ContextQueue, an envelope mechanism to aid in propagating ThreadLocal context across Thread boundaries.
The implementation restores ThreadLocals in poll() and resets the state to the previous one when null is about to be returned.
This approach works in draining scenarios in operators, that always consume all the items in the queue. If, however, the consumption stops and a Thread moves on to a different concern, the context can leak to unrelated processing paths.
In order to improve that state, an abstraction covering the Queue interaction should be introduced, that allows to signal
- "begin draining"
- "end of draining"
Having such lifecycle methods, combined with exclusive access to the Queue granted to a single Thread at a time, the ThreadLocal context would not leak and would provide proper cleanup.
As a side note, it's worth remembering that Hooks.addQueueWrapper and Queues are public API. The javadoc for Queues does not currently define clearly what usage patterns are expected. Existing user wrappers should not have to be affected by this change. Also, the internal hook for ContextQueue could be applied only to the enhanced queues this issue proposes. Therefore, a new, well defined API should be introduced, which explains what usage patterns are anticipated and what side effects can be expected when the enhanced queue is used to implement an operator.