
[Enhancement request] Provide mechanism to identify if a Publisher is hot or cold.

Open srnagar opened this issue 4 years ago • 10 comments

Motivation

I am developing a library that provides APIs for interacting with cloud storage. One of the APIs I'd like to support uploads a Flux of ByteBuffer to storage: public void upload(Flux<ByteBuffer> data).

Users of this library can simply provide a Flux and the library does most of the heavy lifting on the client-side to reliably upload to storage. This may require retries if there's a transient failure when interacting with storage.

To retry efficiently, the library needs to know whether the user-provided Flux is replayable. If it is replayable, I can simply re-subscribe to the Flux and retry. If it is not, I have to keep a copy of the data chunks in a buffer until the upload succeeds, resulting in a less efficient but still functional API.

Given a publisher, I cannot determine if it's replayable or not. So, I have to assume that it's a hot publisher and always copy the data before attempting to upload. The ability to determine hot vs cold publishers will make uploading from a cold publisher efficient while still supporting hot publishers with reduced performance.
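To make the trade-off concrete, here is a minimal sketch using only the JDK. It is not Reactor code: `UploadRetrySketch`, `upload`, and the `replayable` flag are hypothetical names, a `Supplier<Iterator<T>>` stands in for a publisher (each `get()` models one subscription), and the non-replayable path buffers the whole payload rather than individual chunks to keep the example short.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class UploadRetrySketch {
    private UploadRetrySketch() {}

    /**
     * Hypothetical helper, not a Reactor API. Each call to source.get() stands in
     * for one subscription; send returns true when an upload attempt succeeds.
     * Returns the number of attempts that were needed.
     */
    static <T> int upload(Supplier<Iterator<T>> source, boolean replayable,
                          Predicate<Iterator<T>> send, int maxAttempts) {
        // Non-replayable source: drain its single subscription into a buffer up front.
        final List<T> buffer = new ArrayList<>();
        if (!replayable) {
            source.get().forEachRemaining(buffer::add);
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Replayable source: "re-subscribe" on every attempt instead of copying.
            Iterator<T> data = replayable ? source.get() : buffer.iterator();
            if (send.test(data)) {
                return attempt;
            }
        }
        throw new IllegalStateException("upload failed after " + maxAttempts + " attempts");
    }
}
```

With a reliable hot/cold signal the library could pick the `replayable` value itself; without one it has to pass `false` for every user-provided source.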

Desired solution

Provide an API, boolean isHot(), on Flux that returns true if the flux is non-replayable.

Considered alternatives

As described above, the alternative is to assume that all publishers are non-replayable.

srnagar avatar Nov 27 '19 19:11 srnagar

It doesn't work. The hotness property is lost as soon as anything is composed onto a Processor. For example, applying map to a Processor won't execute the mapping function unless the new sequence is subscribed to. Another example is a cold sequence merged with a Processor: there is no way to tell that the flow carries hot signals. Knowing that a Flux is a Processor doesn't help much either, because it could be a ReplayProcessor with a bounded buffer. Besides, re-consuming a Flux may not be the best choice because of side effects outside the user's control; otherwise a transient network-layer failure would restart a long-running Flux from the very beginning.

akarnokd avatar Nov 27 '19 19:11 akarnokd

I was thinking that we could provide an accessor like "isHot" that is abstract on Flux and delegates to the source. Cold factories would return false and Processor/Publish etc. would return true? Obviously Flux.from would always return true since, as per Reactive Streams, the default behavior is hot.

smaldini avatar Nov 27 '19 20:11 smaldini

Thank you for providing your inputs @akarnokd and @smaldini

I just wanted to check if there are any updates on whether this can be supported.

srnagar avatar Dec 05 '19 22:12 srnagar

@akarnokd and @smaldini - Any update on this?

srnagar avatar Dec 20 '19 02:12 srnagar

Still it doesn't work.

akarnokd avatar Dec 20 '19 09:12 akarnokd

I think there is no way to have a 100% accurate response to the isHot question, as per @akarnokd's input. Another issue is that this depends (mostly) on source operators, and on the ability, from the subscription point, to go back up the chain to that source. Yet this is not a guaranteed possibility: by design, operators are guaranteed to be able to access their downstream Subscriber, but not their upstream Publisher. A lot of operators still can, but that is not guaranteed.

That said, @srnagar, it sounds like you would still benefit from a YES/NO/UNCLEAR answer (i.e., as long as you can identify at least most of the cold sources).

So we could have an enum-based method that attempts to go back up to the upmost Flux it can access. If that Flux can answer HOT or COLD, then this answer will be accurate. Otherwise, the method will return UNKNOWN.

That is quite a large change, and we'd need to design how to access that information. One possible less disruptive solution would be through a Scannable#scan Attribute. UNKNOWN would occur if there is no access to the source or if the source is not Scannable.
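As a rough illustration of that lookup, the sketch below walks up a chain until some stage can answer, and falls back to UNKNOWN when the chain cannot be followed any further. It uses only the JDK; `HeatScanSketch`, `Stage`, `upstream()`, `ownHeat()`, and `scanHeat` are hypothetical names for this example and do not correspond to existing Reactor types or to a settled design.

```java
import java.util.Optional;

final class HeatScanSketch {
    private HeatScanSketch() {}

    enum Heat { COLD, HOT, UNKNOWN }

    /** Hypothetical view of one stage in an operator chain (not a Reactor type). */
    interface Stage {
        /** Upstream stage, or empty when this stage cannot reach its source. */
        Optional<Stage> upstream();

        /** This stage's own verdict; empty when it has nothing to say. */
        Optional<Heat> ownHeat();
    }

    /** Walks up from the subscription point to the first stage that can answer. */
    static Heat scanHeat(Stage stage) {
        Stage current = stage;
        while (true) {
            Optional<Heat> own = current.ownHeat();
            if (own.isPresent()) {
                return own.get();
            }
            Optional<Stage> up = current.upstream();
            if (!up.isPresent()) {
                // No access to the source: the only honest answer is UNKNOWN.
                return Heat.UNKNOWN;
            }
            current = up.get();
        }
    }

    /** Test helper: pass null for "no upstream access" or "no own verdict". */
    static Stage stage(Stage upstream, Heat heat) {
        return new Stage() {
            @Override public Optional<Stage> upstream() { return Optional.ofNullable(upstream); }
            @Override public Optional<Heat> ownHeat() { return Optional.ofNullable(heat); }
        };
    }
}
```

A caller would treat only COLD as permission to re-subscribe and handle both HOT and UNKNOWN by buffering.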

simonbasle avatar Jan 10 '20 10:01 simonbasle

@srnagar with all the caveats exposed above, do you still feel that there would be a huge benefit in getting a YES/NO/UNKNOWN type of answer, taking into consideration that this would represent quite a large effort on our part and likely wouldn't occur before 3.5 now?

simonbasle avatar Oct 28 '20 11:10 simonbasle

@simonbasle Thank you for the update! I am discussing this with my team and I will have an update soon.

srnagar avatar Oct 28 '20 18:10 srnagar

@simonbasle yes/no/unknown would be a step forward. It would let us leverage this for flows we control and swap a non-trivial population of user-provided fluxes. When can we have it?

kasobol-msft avatar Jun 13 '22 23:06 kasobol-msft

Another fundamental question/limitation concerns any operator that combines multiple sources. It is not even always known in advance which sources will get combined: for example, what should a switchOnNext(Publisher<Publisher<?>>) answer to the question "are you hot or cold"? One of the emitted and mirrored publishers might be hot at any point.

The easiest solution is to answer UNKNOWN as soon as any such multi-source flux is involved in the chain that is being scanned.

I'm afraid this is going to be a feature with very limited usefulness and reliability, and yet that it won't be taken at face value.

There is a lot of ground to cover - basically every subclass of Flux and Mono needs to be reviewed and deal with the new attribute. We don't really have the bandwidth for this, as this is not considered a priority atm.

That said, the change is large, but with the above compromise it is not necessarily complicated: add a new Attr.HEAT attribute with an associated Heat enum (COLD/HOT/UNKNOWN), and have every subclass of CorePublisher answer it in its scanUnsafe implementation by doing one of the following:

  • return COLD (a source producer that can be proved to be cold)
  • return HOT (a source producer, an operator that caches or otherwise turns any source into a hot sequence...)
  • return UNKNOWN (any operator that combines multiple sources, any non-reactor adapter, any operator that cannot scan its source, any operator that isn't Scannable, etc...)
  • return super.scanUnsafe(Attr.HEAT)
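
The four cases could look roughly like the sketch below. It is a plain-JDK model, not a patch: `Attr.HEAT` and `Heat` are the proposed names and do not exist in Reactor today, and `Node`, `Operator`, `ColdSource`, `HotSource`, `MapLike`, `CacheLike`, and `MergeLike` are hypothetical stand-ins for Flux/Mono subclasses.

```java
import java.util.Arrays;
import java.util.List;

final class HeatAttrSketch {
    private HeatAttrSketch() {}

    enum Heat { COLD, HOT, UNKNOWN }
    enum Attr { HEAT }  // stand-in for the proposed attribute key

    /** Base class: with no better information, HEAT is UNKNOWN. */
    abstract static class Node {
        Object scanUnsafe(Attr key) {
            return key == Attr.HEAT ? Heat.UNKNOWN : null;
        }
        final Heat heat() {
            Object value = scanUnsafe(Attr.HEAT);
            return value instanceof Heat ? (Heat) value : Heat.UNKNOWN;
        }
    }

    /** Case 1: a source that can be proved cold. */
    static final class ColdSource extends Node {
        @Override Object scanUnsafe(Attr key) {
            return key == Attr.HEAT ? Heat.COLD : super.scanUnsafe(key);
        }
    }

    /** Case 2: a hot source. */
    static final class HotSource extends Node {
        @Override Object scanUnsafe(Attr key) {
            return key == Attr.HEAT ? Heat.HOT : super.scanUnsafe(key);
        }
    }

    /** Single-source operator base: forwards the question to its source. */
    abstract static class Operator extends Node {
        final Node source;
        Operator(Node source) { this.source = source; }
        @Override Object scanUnsafe(Attr key) {
            return key == Attr.HEAT ? source.scanUnsafe(key) : super.scanUnsafe(key);
        }
    }

    /** Case 4: an operator that does not change heat defers to super. */
    static final class MapLike extends Operator {
        MapLike(Node source) { super(source); }
        @Override Object scanUnsafe(Attr key) { return super.scanUnsafe(key); }
    }

    /** Case 2 again: a caching operator turns any source into a hot sequence. */
    static final class CacheLike extends Operator {
        CacheLike(Node source) { super(source); }
        @Override Object scanUnsafe(Attr key) {
            return key == Attr.HEAT ? Heat.HOT : super.scanUnsafe(key);
        }
    }

    /** Case 3: any multi-source operator answers UNKNOWN. */
    static final class MergeLike extends Node {
        final List<Node> sources;
        MergeLike(Node... sources) { this.sources = Arrays.asList(sources); }
        @Override Object scanUnsafe(Attr key) {
            return key == Attr.HEAT ? Heat.UNKNOWN : super.scanUnsafe(key);
        }
    }
}
```

In this model a map-like operator over a cold source reports COLD, while the same operator over a merge of two cold sources reports UNKNOWN, which is the compromise described above.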

If somebody wants to contribute, please do so.

simonbasle avatar Jun 17 '22 13:06 simonbasle