smallrye-mutiny

UnicastProcessor cannot handle retries

Open Omega1001 opened this issue 2 years ago • 9 comments

Context

Mutiny version 1.6.0 from Maven Central
Output of java -version

openjdk version "17.0.2" 2022-01-18
OpenJDK Runtime Environment Temurin-17.0.2+8 (build 17.0.2+8)
OpenJDK 64-Bit Server VM Temurin-17.0.2+8 (build 17.0.2+8, mixed mode, sharing)

Description

When playing around with retry, I found that it is impossible to retry if the master upstream is a UnicastProcessor. Instead of retrying, the pipeline simply does nothing.

However, it works as expected if I put a broadcast stage before the rest of my pipeline.

My small investigation shows that once UnicastProcessor has lost its downstream (through cancellation), it never accepts another downstream. Unfortunately, this breaks retry, because retry basically re-subscribes to its upstream.

What I would expect

The documentation of UnicastProcessor states that it is an:

Implementation of a processor using a queue to store items and allows a single subscriber to receive these items.

Thus it should reject a new downstream while there is an active subscription (as it does now) but, unlike now, accept another downstream once the previous one has left (cancelled).
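The semantics proposed above (reject a second subscriber while one is active, accept a new one after a cancellation) can be sketched with the plain JDK `java.util.concurrent.Flow` interfaces. This is a minimal, hypothetical model, not Mutiny's `UnicastProcessor`: the names `ReusableUnicast`, `Recorder` and `ReusableUnicastDemo` are made up, all state is guarded by a single lock, and the Reactive Streams rules on request validation and bounded recursion are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

// Hypothetical single-subscriber publisher (NOT Mutiny's UnicastProcessor):
// buffers items in a queue, rejects a second subscriber while one is active,
// and accepts a new subscriber once the previous one has cancelled.
class ReusableUnicast<T> implements Flow.Publisher<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private Flow.Subscriber<? super T> current;
    private long demand;

    public synchronized void onNext(T item) {
        queue.add(item);
        drain();
    }

    @Override
    public synchronized void subscribe(Flow.Subscriber<? super T> s) {
        if (current != null) {
            // Reject, but still signal it: onSubscribe first, then onError
            s.onSubscribe(new Flow.Subscription() {
                public void request(long n) { }
                public void cancel() { }
            });
            s.onError(new IllegalStateException("already has an active subscriber"));
            return;
        }
        current = s;
        demand = 0;
        s.onSubscribe(new Flow.Subscription() {
            public void request(long n) {
                synchronized (ReusableUnicast.this) {
                    if (current != s) return; // stale subscription, ignore
                    demand = Long.MAX_VALUE - demand < n ? Long.MAX_VALUE : demand + n;
                    drain();
                }
            }
            public void cancel() {
                synchronized (ReusableUnicast.this) {
                    if (current == s) current = null; // free the slot for the next subscriber
                }
            }
        });
    }

    // Deliver buffered items while a subscriber with outstanding demand exists
    private void drain() {
        while (current != null && demand > 0 && !queue.isEmpty()) {
            demand--;
            current.onNext(queue.poll());
        }
    }
}

// Test subscriber: requests unbounded demand and records every signal
class Recorder<T> implements Flow.Subscriber<T> {
    private final String name;
    private final List<String> log;
    Flow.Subscription subscription;

    Recorder(String name, List<String> log) { this.name = name; this.log = log; }

    public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(Long.MAX_VALUE); }
    public void onNext(T item) { log.add(name + ":" + item); }
    public void onError(Throwable t) { log.add(name + ":error"); }
    public void onComplete() { log.add(name + ":complete"); }
}

public class ReusableUnicastDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ReusableUnicast<Integer> p = new ReusableUnicast<>();
        Recorder<Integer> first = new Recorder<>("first", log);
        p.subscribe(first);
        p.onNext(0);
        p.subscribe(new Recorder<Integer>("intruder", log)); // rejected: first is still active
        first.subscription.cancel();                          // first leaves
        p.onNext(1);                                          // buffered: nobody is subscribed
        p.subscribe(new Recorder<Integer>("second", log));    // accepted, gets the buffered item
        p.onNext(2);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here `run()` returns `[first:0, intruder:error, second:1, second:2]`: the item pushed while nobody is subscribed stays in the queue until the next subscriber arrives.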

Additional details

Code

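import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import java.util.concurrent.atomic.AtomicInteger;

public class UnicastRetryIssue {

    public static void main(String[] args) throws Exception {
        AtomicInteger i = new AtomicInteger();
        UnicastProcessor<Integer> p = UnicastProcessor.create();

        p
            // Uncomment the line below to make it work as expected
            // .broadcast().toAllSubscribers()
            .onItem().call(e -> failingUni(e))      // fails for items 2, 5, 8, ...
            .onFailure().retry().indefinitely()     // retry = re-subscribe to the upstream
            .subscribe().with(e -> System.out.println(e), ex -> ex.printStackTrace());

        // Producer thread: pushes one item per second into the processor
        new Thread(() -> {
            try {
                while (true) {
                    p.onNext(i.getAndIncrement());
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // interrupted: stop producing
            }
        }).start();
    }

    // Completes with a null item, except when i % 3 == 2, where it fails
    private static Uni<Long> failingUni(long i) {
        return i % 3 != 2 ? Uni.createFrom().nullItem() : Uni.createFrom().failure(new Exception());
    }
}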

Expected output

0
1
3
4
6
7
...

Actual output

0
1

Omega1001, Jul 01 '22 08:07

This is a good point, but it's also by design:

Implementation of a processor using a queue to store items and allows a single subscriber to receive these items.

You might want to try Multi.createFrom().deferred(() -> p) to have multiple subscribers (and UnicastProcessors).

jponge, Jul 01 '22 12:07
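The deferred approach relies on the supplier passed to `deferred(...)` being invoked once per subscription. It only helps if the supplier returns a new `UnicastProcessor` each time: with `() -> p` and a single shared `p`, every subscription still reaches the same processor. The sketch below models this with plain `java.util.concurrent.Flow` types rather than Mutiny. `OneShot` (a strict single-subscriber source), `Deferred` (a hand-rolled analog of `Multi.createFrom().deferred`) and `LogSubscriber` are hypothetical names; `OneShot` pushes its items immediately, assuming unbounded demand, and how a producer would find the current processor is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Supplier;

// Hypothetical strict single-subscriber source: emits its items to the first
// subscriber only; every later subscriber is rejected with onError.
// Sketch simplification: pushes everything at once, assuming unbounded demand.
class OneShot<T> implements Flow.Publisher<T> {
    private final List<T> items;
    private boolean subscribed;

    OneShot(List<T> items) { this.items = items; }

    @Override
    public synchronized void subscribe(Flow.Subscriber<? super T> s) {
        s.onSubscribe(new Flow.Subscription() {
            public void request(long n) { }
            public void cancel() { }
        });
        if (subscribed) {
            s.onError(new IllegalStateException("already subscribed"));
            return;
        }
        subscribed = true;
        for (T item : items) s.onNext(item);
        s.onComplete();
    }
}

// Hand-rolled analog of Multi.createFrom().deferred(supplier):
// the supplier runs once per subscription, so each subscriber gets a fresh source.
class Deferred<T> implements Flow.Publisher<T> {
    private final Supplier<? extends Flow.Publisher<T>> supplier;

    Deferred(Supplier<? extends Flow.Publisher<T>> supplier) { this.supplier = supplier; }

    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
        supplier.get().subscribe(s);
    }
}

// Requests unbounded demand and records every signal it receives
class LogSubscriber<T> implements Flow.Subscriber<T> {
    private final String name;
    private final List<String> log;

    LogSubscriber(String name, List<String> log) { this.name = name; this.log = log; }

    public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    public void onNext(T item) { log.add(name + ":" + item); }
    public void onError(Throwable t) { log.add(name + ":error"); }
    public void onComplete() { log.add(name + ":complete"); }
}

public class DeferredDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // One shared instance: the second subscription is rejected
        OneShot<Integer> shared = new OneShot<>(List.of(1, 2));
        shared.subscribe(new LogSubscriber<Integer>("shared#1", log));
        shared.subscribe(new LogSubscriber<Integer>("shared#2", log));

        // Deferred: each subscription gets its own OneShot
        Flow.Publisher<Integer> deferred = new Deferred<Integer>(() -> new OneShot<Integer>(List.of(1, 2)));
        deferred.subscribe(new LogSubscriber<Integer>("deferred#1", log));
        deferred.subscribe(new LogSubscriber<Integer>("deferred#2", log));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The shared instance yields `shared#2:error`, while both deferred subscribers receive `1`, `2` and a completion.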

Also, be careful about returning from the main method (I'm not sure how your program is set up, but it might exit there even though a thread is still running).

jponge, Jul 01 '22 12:07

@jponge Thanks for the swift reply.

About main: this was only a test of how retry works, not an actual production snippet. It is not an issue here, since the spawned thread keeps the program alive indefinitely. If I uncomment the commented line, which probably does what you suggested, the output is as expected, so returning from main is not what's causing this issue.

To be clear on the semantics, I interpret the quoted line from the docs as: at any given time, at most one Subscriber receives items from this processor. So no two subscribers can receive items at the same time without an intermediate layer that allows it, but re-subscribing after the previous subscriber has left should be possible, as there is no moment at which the processor supplies more than one subscriber.

If it is actually meant to self-destruct after its subscriber leaves, at the very least the docs should say so, so that no one else runs into this issue.

Omega1001, Jul 01 '22 15:07

One more thing to consider: I hit this issue when calling retry on a Multi that was handed to me from somewhere else. What happened was that, instead of retrying, the pipeline just died without any further events or information (no item, no error, no completion). Identifying the UnicastProcessor as the root cause was a lucky guess.

There are two things here:

  1. Even if this is expected behavior, further downstream the processor looks like any other Multi, and thus should behave like one. Especially if you write components for others to use, how are they supposed to know that they can't use retry because this Multi is backed by a UnicastProcessor?
  2. If they call retry anyway, their pipelines go completely dark with no hint as to why, or even that it happened, because the rejected downstream is never notified that it is in fact not being supplied by this upstream (no error, no completion, no exception).

I rewrote the previous example to illustrate the latter problem:

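import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class UnicastRetryTermination {

    public static void main(String[] args) throws Exception {
        AtomicInteger i = new AtomicInteger();
        UnicastProcessor<Integer> p = UnicastProcessor.create();
        List<Integer> items = new LinkedList<>();

        p
            // Uncomment the line below to make it work as expected
            // .broadcast().toAllSubscribers()
            .onItem().call(e -> failingUni(e))
            .onFailure().retry().indefinitely()

            // Record every item that makes it through and print them all on termination
            .onItem().invoke(items::add)
            .onTermination().invoke(() -> System.out.println("Terminated : " + items))

            .subscribe().with(e -> System.out.println(e), ex -> ex.printStackTrace());

        // Producer thread: emits 8 items (0..7), then completes the processor
        new Thread(() -> {
            try {
                while (i.get() <= 7) {
                    p.onNext(i.getAndIncrement());
                    Thread.sleep(1000);
                }
                p.onComplete();
                System.out.println("Supplier exit");
            } catch (InterruptedException e) {
                // interrupted: stop producing
            }
        }).start();

        // Block main on stdin so the program stays alive
        System.in.read();
    }

    // Completes with a null item, except when i % 3 == 2, where it fails
    private static Uni<Long> failingUni(long i) {
        return i % 3 != 2 ? Uni.createFrom().nullItem() : Uni.createFrom().failure(new Exception());
    }
}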

What changed: main no longer returns (it blocks on a read from stdin), while the supplier thread exits after emitting 8 items and calls onComplete() on the processor. In addition, every item that makes it through is added to a list that should be printed on termination, but that never happens.

Expected Output

0
1
3
4
6
7
Terminated : [0, 1, 3, 4, 6, 7]
Supplier exit

Actual Output

0
1
Supplier exit

Again, if you uncomment the commented line, it actually produces the expected output.

While the fact that retry does not work is only a minor annoyance, this silent behavior is rather serious from my perspective.

Omega1001, Jul 01 '22 16:07

The broadcast operator keeps the upstream alive as it dispatches to multiple subscribers, so this is expected.

Again, the UnicastProcessor semantics are to dispatch to one subscriber, and it does not support re-subscription. One good reason is that you might be pushing events and, midway, they get redirected to someone else (because there's a different subscriber).

Have you tried .createFrom().emitter(...)? This might be closer to what you're looking for.

But again, be careful: a failure event terminates a stream as per Reactive Streams semantics. Retrying is one way to recover, but it has to make sense for the upstream.

jponge, Jul 04 '22 06:07

@jponge Thank you for the reply.

You are correct: in my scenario, I would probably be better off implementing an Emitter, if not a full Subscriber myself. But let's step back for a moment:

When I started, I was looking for something compatible with Multi that would let me push messages, buffered by a queue, and I found UnicastProcessor. I was aware that I could implement my own Emitter or even my own Subscriber, but chose not to, because UnicastProcessor already did what I needed, or so I thought...

Consider this scenario: I am an API developer who has just written some code that generates objects and wants to put them on a Multi, so that my colleague, who works downstream in the project, can process them.
I choose a UnicastProcessor for the reason above, but my colleague does not know that, and doesn't want to know either. He processes this source as if it were a normal Multi, following the Multi contract, but the UnicastProcessor does not behave as expected:

(As Multi itself seems to be missing Javadoc, I mainly quote from the Mutiny Getting Started guides.)

the pipeline is materialized for each subscription

To me, that implies that re-subscription is an option.

A Multi<T> is a data stream that:

  • emits 0..n item events
  • emits a failure event
  • emits a completion event for bounded streams

I interpret that as: if the stream is bounded, I will receive at least one terminal event (failure or completion).

Let's assume that I told my colleague this is a bounded stream, and that he is interested in processing as many items as possible rather than waiting for one specific item with a timeout. He also knows the Multi API and is thus aware of the retry option. So on error, he uses retry to process more items, without knowing that he cannot do that because the master upstream is a UnicastProcessor (ignoring the fact that using a UnicastProcessor is a poor choice here because of that behavior). And even if he had known: "It's a Multi, so why not?!"

The result is that after an error occurs, he receives no further events, not even a completion event. His pipeline is now undead, with no indication of it. Even if UnicastProcessor does not allow re-subscription, it should at least send a terminal event to pipelines that subscribe after the first subscription, so that a downstream user does not wait for a terminal event that can never occur. Otherwise, users waste lots of time debugging what they see as an error but what is actually intended by design.

What I'm saying is that if it poses as a Multi, it should behave like one, following the Principle of Least Astonishment.

Omega1001, Jul 04 '22 07:07

Well, if you try to re-subscribe to a UnicastProcessor, you get an error event.

jponge, Jul 04 '22 10:07
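If a rejected re-subscription is signalled as a failure, as stated here, one possible reading of the silent pipelines in the report (not confirmed in this thread) is that `retry().indefinitely()` treats each rejection as just another failure and re-subscribes again, so the error never reaches the downstream. A bounded retry, such as Mutiny's `onFailure().retry().atMost(n)`, would eventually let it through. The sketch below models this with plain `java.util.concurrent.Flow` rather than Mutiny; `FailingOneShot`, `BoundedRetry` and `BoundedRetryDemo` are hypothetical, the upstream fails on its own instead of through a `call()` stage, and demand is assumed unbounded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical upstream: the first subscriber receives 0 and 1, then a failure
// (standing in for the failing call() stage); every later subscriber is rejected
// with onError. Sketch simplification: assumes unbounded demand.
class FailingOneShot implements Flow.Publisher<Integer> {
    private boolean subscribed;
    private int attempts;

    int attempts() { return attempts; }

    @Override
    public synchronized void subscribe(Flow.Subscriber<? super Integer> s) {
        attempts++;
        s.onSubscribe(new Flow.Subscription() {
            public void request(long n) { }
            public void cancel() { }
        });
        if (subscribed) {
            s.onError(new IllegalStateException("already subscribed"));
            return;
        }
        subscribed = true;
        s.onNext(0);
        s.onNext(1);
        s.onError(new RuntimeException("item 2 failed"));
    }
}

// Hand-rolled bounded retry: on onError, re-subscribe to the same upstream
// up to maxRetries times, then forward the last error downstream.
class BoundedRetry<T> implements Flow.Publisher<T> {
    private final Flow.Publisher<T> upstream;
    private final int maxRetries;

    BoundedRetry(Flow.Publisher<T> upstream, int maxRetries) {
        this.upstream = upstream;
        this.maxRetries = maxRetries;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> downstream) {
        upstream.subscribe(new Flow.Subscriber<T>() {
            private int retries;
            private Flow.Subscription current;
            private boolean downstreamSubscribed;

            public void onSubscribe(Flow.Subscription s) {
                current = s;
                if (!downstreamSubscribed) {
                    downstreamSubscribed = true;
                    // Forward request/cancel to whichever upstream subscription is current
                    downstream.onSubscribe(new Flow.Subscription() {
                        public void request(long n) { current.request(n); }
                        public void cancel() { current.cancel(); }
                    });
                } else {
                    s.request(Long.MAX_VALUE); // sketch: re-request unbounded demand
                }
            }
            public void onNext(T item) { downstream.onNext(item); }
            public void onError(Throwable t) {
                if (retries < maxRetries) {
                    retries++;
                    upstream.subscribe(this); // retry = re-subscribe to the same upstream
                } else {
                    downstream.onError(t);
                }
            }
            public void onComplete() { downstream.onComplete(); }
        });
    }
}

public class BoundedRetryDemo {
    static List<String> run(FailingOneShot source, int maxRetries) {
        List<String> log = new ArrayList<>();
        new BoundedRetry<Integer>(source, maxRetries).subscribe(new Flow.Subscriber<Integer>() {
            public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(Integer item) { log.add(String.valueOf(item)); }
            public void onError(Throwable t) { log.add("error: " + t.getMessage()); }
            public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        FailingOneShot source = new FailingOneShot();
        System.out.println(run(source, 3) + " after " + source.attempts() + " subscription attempts");
    }
}
```

With `maxRetries = 3`, the downstream sees `0`, `1` and then `error: already subscribed` after four subscription attempts. Because this sketch re-subscribes recursively, an unbounded variant would never stop re-subscribing and would eventually overflow the stack.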

Yes, resubscribing will produce an error.

Also, I should have added to the Javadoc of UnicastProcessor: be very careful, it does things differently and should NOT be considered fully compliant. TL;DR: use an emitter.

cescoffier, Jul 04 '22 11:07

Let’s improve the docs here

jponge, Jul 04 '22 14:07