reactor-core icon indicating copy to clipboard operation
reactor-core copied to clipboard

Flux.cache(0) doesn't replay the terminal signal

Open plcarmel opened this issue 2 years ago • 5 comments

Flux.cache(0) doesn't replay the terminal signal for late subscribers, contrary to what is stated in the documentation.

Here is an example:

jshell> var f = Flux.just(1,2,3,4).cache(0).log().doOnTerminate(() -> System.err.println("hello world"))

jshell> f.subscribe()
[ INFO] (main) onSubscribe(FluxPublish.PublishInner)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(1)
[ INFO] (main) onNext(2)
[ INFO] (main) onNext(3)
[ INFO] (main) onNext(4)
[ INFO] (main) onComplete()
hello world

jshell> f.subscribe()
[ INFO] (main) onSubscribe(FluxPublish.PublishInner)
[ INFO] (main) request(unbounded)

No complete signal and thus no "hello world".

Tested with 3.4.17 and 3.4.22

Documentation says: "Note that cache(0) will only cache the terminal signal without expiration. "

Which I understand as meaning that the signal will be replayed for late subscribers.

plcarmel avatar Aug 23 '22 18:08 plcarmel

Thank you for the report. We are aware of the issue and it might require some significant design decisions to fix. Unfortunately, for the time being, I have no immediate workaround for the situation where the source terminates.

For full disclosure, the cause for this is that when the underlying reactor.core.publisher.FluxPublish.PublishSubscriber (returned from cache(0)) receives a terminal signal from the source, it sets its' state to a disconnected state and requires a new connect() call to be made (and cache(0) internally uses autoConnect() that does not re-connect upon termination of source). Which won't produce the desired result in the presented case, as that would cause the late subscriber receive the entire stream.

chemicL avatar Aug 25 '22 12:08 chemicL

I am not an expert, but to me, what needs to be done is quite simple.

Obviously, FluxPublish should not be used for cache(0). The comment in Flux.cache(int history) is misleading at best. replay(0) makes a lot of sense, because it produces (or not) something that is valuable in itself: the terminal signal.

		if (history == 0) {
			//TODO Flux.replay with history == 0 doesn't make much sense. This was replaced by Flux.publish, but such calls will be rejected in a future version
			return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.get(Queues.SMALL_BUFFER_SIZE)));
		}

That leaves two options:

  1. Make FluxReplay work for history = 0
  2. Create a FluxReplayZero, or something like that, to handle that special case, if it is too complicated to modify FluxReplay so that it can handle history = 0

Did I miss something ?

plcarmel avatar Aug 25 '22 19:08 plcarmel

The request rate (if it is the right term) of FluxReplay is equal to the history size, but I am not really sure I understand why...

If replay(history) is supposed to transform a cold flux into a hot flux, couldn't it just send a request(unbounded) signal once and be done with it ?

plcarmel avatar Aug 25 '22 19:08 plcarmel

Thanks for the suggestions @plcarmel. We had a bit of a discussion around this problem and the background of the current state of affairs. It looks like we can try to incorporate your suggestions and make this case work as expected. I'll experiment and try to implement it. Can you elaborate a bit on the last comment? From my understanding the requests to the upstream will depend on the replay history and has to do with the buffer consumption. It can only request more once all the subscribers got the onNext signals - then it's possible to discard the item that is out of scope of the history. To be able to request(unbounded), an infinite buffer would be necessary to guarantee that no items are lost by the subscribers, and that's not really practical. Please guide me towards what you are thinking if I'm missing the point.

chemicL avatar Aug 29 '22 14:08 chemicL

No problem @chemicL, thank you for taking the time to address this.

Concerning request(unbounded), you are right and I am clearly still struggling at mastering back-pressure concepts.

The problem is that I was testing FluxReplay with one subscriber that cancelled its subscription after a while and I saw that FluxReplay kept asking for data in chunks of "historySize" size and I was like "what's the point of asking for data by chunks if you are going go ask for everything anyway and sending it to nowhere ?".

There is no back-pressure where there are no subscribers, but there is some when someone subscribes, and the fact that the Flux is hot doesn't mean that it doesn't respect back-pressure, it just means that it runs even when there are no subscribers. I understand that better now.

~~An other funny thing is that there is a discontinuity at historySize=0. Request rate goes down as historySize tends toward 0, but when it is exactly 0, since there are no data to remember, request(unbounded) is the way to go.~~

Damn it, I did it again. ~~For historySize=0, the way to go is to transmit the request from the subscriber as-is.~~

No, because there can be multiple subscribers. We have to use request(1) or we would have to keep a tally of the requested amount of each subscriber and use the current minimum value for the next request. Or do whatever publish() does I guess.

plcarmel avatar Aug 30 '22 01:08 plcarmel

@plcarmel please have a look at the latest 3.4.x snapshot and test with your use case. Feedback is welcome. The implementation resides in FluxPublish instead of FluxReplay as the latter is based around buffers - the buffer size instructs the prefetch size used in request(n). For 0 a buffer is unnecessary, that case is more similar to FluxPublish.

chemicL avatar Oct 03 '22 08:10 chemicL

Thank you @chemicL, that's awesome ! Yes, I think having the implementation in FluxPublish was the way to go after all. I just tested it and it works well.

And thank you for the caveat in your commit message. It would be nice if it could make its way in the documentation (warning: this case is not implemented yet ...).

From the commit message:

For cases with expiry (TTL arguments), FluxReplay does indeed reset itself. Therefore, the behaviour for the time constrained values remains as before, using FluxPublish implementation for the 0 history case, but without caching terminals, while not honouring the TTL. This case can be later implemented if needed.

plcarmel avatar Oct 04 '22 14:10 plcarmel

@plcarmel would you like to submit a PR with updates to the javadoc so it reflects what you'd expect to read? I guess I wouldn't do a better job at it than a conscious user :)

chemicL avatar Oct 05 '22 07:10 chemicL

Slowly looking into this.

plcarmel avatar Oct 11 '22 14:10 plcarmel

@chemicL, couldn't cache(0,ttl) simply return cache(0) ? If we remember only the terminal signal, the ttl becomes irrelevant.

I could create a new ticket for this, and an associated PR. That would be an easy first code contribution for me.

plcarmel avatar Oct 17 '22 13:10 plcarmel

That sounds like a good idea. It still does not honour the TTL as other cases of non-zero history, which would reset themselves after the TTL. But it is an improvement to the current fallback to plain FluxPublish, so please go ahead with opening an issue and providing a PR and that can drive a further discussion if necessary. Please do remember also to support the replay(...) operator, which is the one backing cache(...).

chemicL avatar Oct 17 '22 13:10 chemicL

cache(Queues.SMALL_BUFFER_SIZE, ttl).cache(0) ?

That way, we will be honouring the TTL and we will cache only the terminal signal.

For replay: cache(Queues.SMALL_BUFFER_SIZE, ttl).replay(0)

plcarmel avatar Oct 17 '22 18:10 plcarmel

The fetching behaviour would be different though (than with only FluxPublish). Using FluxReplay directly, for cache(Queues.SMALL_BUFFER_SIZE, ttl), might give enough flexibility to achieve the same fetching behaviour. I will be looking into this.

plcarmel avatar Oct 17 '22 19:10 plcarmel

Ugh, please don't comment on this, I don't think it would work. I will do my homework and test my things first.

plcarmel avatar Oct 17 '22 19:10 plcarmel