Explore support for multiproducer Sinks.Many with heavy contention
Motivation
Currently, Sinks.many() produces sinks that fail fast in case parallel use is detected.
- This allows to get immediate and explicit feedback from the
tryEmitXxxAPI. - The downside is that it puts the onus on the user to ensure calls do not overlap.
- So far, general recommendation has been to busy-loop.
Processors' FluxSink API on the contrary didn't have explicit feedback.
The default FluxSink were hardened against parallel usage by offering elements to a queue and ensuring a single producer would "win" and drain that queue.
This only worked because of the "no feedback" aspect, which was also the greatest drawback of this approach:
- You could think the value you just emitted had been correctly processed, when it fact it was just offered to a queue.
- Said queue could fail to drain entirely, due to eg. underlying FluxSink termination.
But we've received feedback that for Sinks.Many, the busy-loop approach is problematic if there is a lot of contention...
So we gained predictability and insight into each individual tryEmitXxx calls, at the expense of an all-purpose contention-heavy-compatible multithreaded approach.
Desired solution
Offer an alternative to the busy loop strategy to deal with multiple producers, without having each user reinvent the wheel.
Considered alternatives
This can come in the form of either of:
- guidance and documentation?
- new
Manyfactory methods (eg. an overload ofSinks.many().multicast().onBackpressureBuffer()) - a new category under
ManySpec(likemulticast(),replay(), etc...) - a new
ManySpec(likeunsafe()) - utility class(es) or wrappers (to some extent, a new API)
- adding methods to the API
Additional Context
Option 2: Add partial support as factory methods
It would likely mean we can incrementally offer partial support based on multiproducer queues.
unicast() spec already has an option which can use Queues.multiproducerUnbounded().
In multicast(), direct* flavors would be excluded. onBackpressureBuffer flavors could support multiproducer case but we would need a bounded MPSC queue implementation (since onBackpressureBuffer uses the bounded aspect of the queue).
In replay(), we'd probably need a complete parallel implementation of the concrete class, which implements a linked-list-like datastructure (or at least make 100% sure this structure is thread safe and lock-free).
Option 5a: Lightweight queue-draining utility
This could maybe take the form of a MPSC queue + a single-threaded object which holds the Sinks.Many and drains the queue to it.
This assumes that in case of contention, producers just offer to the queue and so they DON'T get the immediate feedback aspect of tryEmitNext (which is invoked by the drainer).
The drainer can be customized by users in order to decide what to do in case of EmitResult.isFailure() (with access to the remainder of the queue for retrying, etc...).
Option 5b: Heavyweight new multiproducer API
(explored in https://github.com/reactor/reactor-core/compare/sinksMultiproducer?expand=1)
We could devise an API that is close to Sinks.Many but focuses on solving the multiproducer case using an MPSC queue to accommodate a lot of contention, but WITH support for immediate feedback.
With a dedicated API, tryEmitNext can return something that represents the fact that failures don't necessarily apply to the T that was just passed to the sink. Instead, it represent the fact that the current thread was draining the queue and encountered a failure EmitResult on some arbitrary element currently in the queue.
By having this dedicated API we can return a pair of EmitResult and T, which gives the caller access to which value was left unprocessed. We can also expose the remainder of the queue, or decide to automatically clear it for terminated/cancelled cases...
When competing for the multiproducer sink, callers that DON'T end up draining the queue just receive a notification that their value has been submitted to a batch.
This better represent the work-stealing aspect of the approach to the user.
One drawback is that this approach assumes any caller can deal with an EmitResult.isFailure() for an arbitrary value that might have come from another caller. ie. processing of emit failures is the same across all usages of such a multiproducer sink.
Option 6: Adding methods to Sinks.Many
The idea above could maybe be integrated into the existing API in the form of a tryEmitNext variant that return a pair of EmitResult and <T>.
Challenge here is to avoid paying a cost of dealing with MPSC queue in the case of Sinks.many().unsafe().
Most implementations could default to delegating to tryEmitNext(T value) and simply always return value as the <T> part of the pair? Only when explicitly enabling MPSC-safe trait would we obtain something that can return EmitResult.SUBMITTED_TO_BATCH or a failure with a <T> != value...
Any news on this? I'm finding it really hard to take multiple Web Sockets' messages into a single Sink to process each message one-by-one.
Is there any update on this issue ?
@jryan128 @deepak5127 there have been other priority things that we focused on so far, but this subject might require revisiting. Can you please elaborate on the proposal in the description and describe the user perspective and expectations? Several options are listed, each with different UX and design trade-offs. Were we to invest into this, we'd need to consider more thorough descriptions of the use cases, current API limitations and desired outcome from both the user-friendliness aspect but also "correctness" as to ACK-modes and notification of failures/success. Thanks for the input in case this is a needed feature.
I think for me getting a callback with failed emissions is good enough. I can at least retry them later or log them.