smallrye-mutiny
UnicastProcessor cannot handle retries
Context
Mutiny Version 1.6.0 from Maven central
Output of java -version
openjdk version "17.0.2" 2022-01-18
OpenJDK Runtime Environment Temurin-17.0.2+8 (build 17.0.2+8)
OpenJDK 64-Bit Server VM Temurin-17.0.2+8 (build 17.0.2+8, mixed mode, sharing)
Description
When playing around with retry, I found that it is impossible to retry if the master upstream is a UnicastProcessor. Instead of retrying, it simply does nothing.
However, it works as expected, if I put a broadcast stage before my stuff.
My small investigation into this shows that after a UnicastProcessor has lost its downstream (by cancellation), it will never again accept any other downstream. Unfortunately, this breaks retry, because retry basically re-subscribes to its upstream.
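To make the "retry is re-subscription" point concrete, here is a minimal sketch built only on the JDK's java.util.concurrent.Flow interfaces. It is not Mutiny's implementation: RetrySketch, retrying, attempt and source are hypothetical names, demand is treated as unbounded, and cancellation is ignored. The source accepts every subscription and continues from a shared counter, which is the behavior retry relies on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

class RetrySketch {

    /** Hypothetical retry operator: on failure, subscribe to the same upstream again. */
    static <T> Flow.Publisher<T> retrying(Flow.Publisher<T> upstream) {
        return downstream -> downstream.onSubscribe(new Flow.Subscription() {
            private boolean started;

            @Override
            public void request(long n) {
                // Simplification: the first request starts the flow, demand is treated as unbounded.
                if (!started) {
                    started = true;
                    attempt(upstream, downstream);
                }
            }

            @Override
            public void cancel() {
                // Cancellation is not propagated in this sketch.
            }
        });
    }

    private static <T> void attempt(Flow.Publisher<T> upstream, Flow.Subscriber<? super T> downstream) {
        upstream.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { downstream.onNext(item); }
            // The whole retry mechanism: a failure triggers a brand-new subscription.
            @Override public void onError(Throwable failure) { attempt(upstream, downstream); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    /** Hypothetical source that accepts re-subscription; each subscriber continues from the shared counter. */
    static Flow.Publisher<Integer> source(AtomicInteger counter) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean started;

            @Override
            public void request(long n) {
                if (started) {
                    return;
                }
                started = true;
                while (true) {
                    int i = counter.getAndIncrement();
                    if (i >= 8) { subscriber.onComplete(); return; }
                    if (i % 3 == 2) { subscriber.onError(new Exception("item " + i)); return; }
                    subscriber.onNext(i);
                }
            }

            @Override
            public void cancel() { }
        });
    }

    /** Runs the pipeline and returns every event the final subscriber saw. */
    static List<Object> demo() {
        List<Object> events = new ArrayList<>();
        retrying(source(new AtomicInteger())).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { events.add(item); }
            @Override public void onError(Throwable failure) { events.add("failed"); }
            @Override public void onComplete() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 1, 3, 4, 6, 7, completed]
    }
}
```

If the source ignored or refused the second subscription instead, the call to attempt in onError would be the last thing that ever happens in this pipeline.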
What I would expect
The documentation of UnicastProcessor states that it is an:
Implementation of a processor using a queue to store items and allows a single subscriber to receive these items.
Thus it should reject downstreams if there is currently a subscription (as it does now) but (unlike now) accept another downstream if the previous one has left (cancelled).
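The expected semantics could be sketched as follows. This is a hypothetical model written against the JDK's java.util.concurrent.Flow, not Mutiny's UnicastProcessor: ResubscribableUnicast, Recorder and demo are invented names, any request is treated as unbounded demand, and a single coarse lock stands in for proper concurrency handling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

/** Hypothetical model (not Mutiny's UnicastProcessor): one subscriber at a time, reusable after cancel. */
class ResubscribableUnicast<T> implements Flow.Publisher<T> {

    private static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    private final Queue<T> queue = new ArrayDeque<>();
    private Flow.Subscriber<? super T> current; // null while nobody is subscribed
    private boolean demand;                     // simplification: any request means "unbounded"

    @Override
    public synchronized void subscribe(Flow.Subscriber<? super T> subscriber) {
        if (current != null) {
            // A subscriber is active: reject the newcomer, but signal the rejection.
            subscriber.onSubscribe(NOOP);
            subscriber.onError(new IllegalStateException("already subscribed"));
            return;
        }
        current = subscriber;
        demand = false;
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                synchronized (ResubscribableUnicast.this) {
                    if (current == subscriber) {
                        demand = true;
                        drain();
                    }
                }
            }

            @Override
            public void cancel() {
                synchronized (ResubscribableUnicast.this) {
                    if (current == subscriber) {
                        current = null; // frees the slot for the next subscriber
                    }
                }
            }
        });
    }

    /** Buffers the item and delivers it if a subscriber with demand is present. */
    public synchronized void onNext(T item) {
        queue.add(item);
        drain();
    }

    private void drain() {
        while (current != null && demand && !queue.isEmpty()) {
            current.onNext(queue.poll());
        }
    }

    /** Subscriber that records what it sees, used by the demo only. */
    static final class Recorder implements Flow.Subscriber<Integer> {
        final String name;
        final List<String> log;
        Flow.Subscription subscription;

        Recorder(String name, List<String> log) { this.name = name; this.log = log; }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(Long.MAX_VALUE); }
        @Override public void onNext(Integer item) { log.add(name + ":" + item); }
        @Override public void onError(Throwable failure) { log.add(name + ":error"); }
        @Override public void onComplete() { log.add(name + ":complete"); }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ResubscribableUnicast<Integer> p = new ResubscribableUnicast<>();
        Recorder first = new Recorder("first", log);
        p.subscribe(first);
        p.onNext(0);
        p.subscribe(new Recorder("second", log)); // rejected: first is still active
        first.subscription.cancel();              // first leaves
        p.onNext(1);                              // nobody subscribed, so the item is queued
        p.subscribe(new Recorder("third", log));  // accepted and receives the queued item
        p.onNext(2);
        return log;
    }
}
```

Calling ResubscribableUnicast.demo() returns [first:0, second:error, third:1, third:2]: a concurrent second subscriber is refused with an error, while a subscriber arriving after the cancellation takes over the queue.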
Additional details
Code
public static void main(String[] args) throws Exception {
    AtomicInteger i = new AtomicInteger();
    UnicastProcessor<Integer> p = UnicastProcessor.create();
    p
        // Uncomment below line to make it work as expected
        // .broadcast().toAllSubscribers()
        .onItem().call(e -> failingUni(e))//
        .onFailure().retry().indefinitely()//
        .subscribe().with(e -> System.out.println(e), ex -> ex.printStackTrace());
    new Thread(() -> {
        try {
            while (true) {
                p.onNext(i.getAndIncrement());
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
        }
    }).start();
}

private static Uni<Long> failingUni(long i) {
    return i % 3 != 2 ? Uni.createFrom().nullItem() : Uni.createFrom().failure(new Exception());
}
Expected output
0
1
3
4
6
7
...
Actual output
0
1
This is a good point, but it's also by design:
Implementation of a processor using a queue to store items and allows a single subscriber to receive these items.
You might want to try Multi.createFrom().deferred(() -> p) to have multiple subscribers (and UnicastProcessors).
Also, be careful with leaving the main method (not sure about your program, but it might exit here despite a thread running).
@jponge Thanks for the swift reply.
About the main method: this was only a test of how retry works, not an actual production snippet. It is not an issue here, since the spawned thread keeps the program alive indefinitely. If I uncomment the commented line, which probably does what you suggested, the output is as expected, so exiting main is not what's causing this issue.
To be clear on the semantics, I interpret the quoted line from the doc as: at any given time, at most one Subscriber receives items from this processor. So no two subscribers can receive items at the same time without an intermediate layer that allows it, but re-subscription after the previous subscriber has left should be possible, as there is no time at which the processor is supplying more than one subscriber at once.
If it is actually meant to self-destruct after its subscriber leaves, at the very least the doc should be changed to reflect that, so no one else runs into this issue.
One more thing to consider: I hit this issue when calling retry on a Multi that was handed to me from somewhere else. What happened was that instead of retrying, the pipeline just died without any further events or information (no item, no error, no completion). Identifying the UnicastProcessor as the root cause was a lucky guess.
There are two things here:
- Even if this was expected behavior, further downstream the processor looks like any other Multi, and thus should behave like one. Especially if you write parts for others to use, how are they supposed to know that they can't use retry because this Multi is backed by a UnicastProcessor?
- If they do call retry anyway, their pipelines go completely dark with no hint as to why, or even that it happened, because the rejected downstream is never notified that it is in fact not being supplied by this upstream (no error, no completion, no exception).
I rewrote the previous example to illustrate the latter problem:
public static void main(String[] args) throws Exception {
    AtomicInteger i = new AtomicInteger();
    UnicastProcessor<Integer> p = UnicastProcessor.create();
    List<Integer> items = new LinkedList<>();
    p
        // Uncomment below line to make it work as expected
        // .broadcast().toAllSubscribers()
        .onItem().call(e -> failingUni(e))//
        .onFailure().retry().indefinitely()//
        .onItem().invoke(items::add)
        .onTermination().invoke(() -> System.out.println("Terminated : " + items))
        .subscribe().with(e -> System.out.println(e), ex -> ex.printStackTrace());
    new Thread(() -> {
        try {
            while (i.get() <= 7) {
                p.onNext(i.getAndIncrement());
                Thread.sleep(1000);
            }
            p.onComplete();
            // Missing from the snippet as posted; the outputs below show this line
            System.out.println("Supplier exit");
        } catch (InterruptedException e) {
        }
    }).start();
    System.in.read();
}

private static Uni<Long> failingUni(long i) {
    return i % 3 != 2 ? Uni.createFrom().nullItem() : Uni.createFrom().failure(new Exception());
}
What changed is that main no longer exits (it is suspended by the stdin read), but the supplier thread exits after emitting 8 items and calls the complete method on the processor. In addition, all items that make it through are added to a list that should be printed on termination, but that does not happen.
Expected Output
0
1
3
4
6
7
Terminated : [0, 1, 3, 4, 6, 7]
Supplier exit
Actual Output
0
1
Supplier exit
Again, if you uncomment the commented line, it actually produces the expected output.
While the fact that retry does not work is only a minor annoyance, this behavior is rather serious from my perspective.
The broadcast operator keeps the upstream alive as it dispatches to multiple subscribers, so this is expected.
Again, the UnicastProcessor semantics are to dispatch to one subscriber, but it does not support re-subscription.
One good reason is that you might be pushing events and in the middle they get redirected to someone else (because there's a different subscriber).
Have you tried Multi.createFrom().emitter(...)? This might be closer to what you're looking for.
But again, be careful: a failure event terminates a stream as per Reactive Streams semantics. Retrying is one way to recover, but it has to be sensible for the upstream.
@jponge thank you for the reply.
You are correct: in my scenario I would probably be better off implementing an Emitter, if not a full subscriber myself, but let's step back for a bit.
When I started, I was looking for something compatible with Multi that would allow me to put messages, buffered by a queue, and found UnicastProcessor. I was aware of the possibility of implementing my own Emitter or even my own subscriber, but chose not to, because UnicastProcessor already does what I needed, or so I thought...
Let's consider this scenario: I am an API developer who just wrote some piece of code that generates objects in some way, and I want to put them on a Multi so that my colleague, who is working on the project downstream, can process them.
I choose a UnicastProcessor for the mentioned reason, but my colleague does not know that, and doesn't want to know either.
He now processes this source as if it were a normal Multi, following the Multi contract, but the UnicastProcessor does not behave as expected:
(as Multi itself seems to be missing Javadoc, I mainly quote from the Mutiny Getting Started guides)
the pipeline is materialized for each subscription
To me that implies that re-subscription is an option.
A Multi<T> is a data stream that:
- emits 0..n item events
- emits a failure event
- emits a completion event for bounded streams
I interpret that as: if the stream is bounded, I will receive at least one terminal event (failure or completion).
Let's assume that I told my colleague that this is a bounded stream, and that my colleague is interested in processing as many items as possible rather than waiting for one specific item with a timeout. He also knows the Multi API and is thus aware of the retry option. So on error he uses retry to process more items, without knowing that he cannot do that because the master upstream is a UnicastProcessor (ignoring the fact that using UnicastProcessor is stupid here because of that behavior). And even if he had known: "it's a Multi, so why not?!"
The result is that after an error occurred, he will receive no further events, including a completion event. His pipeline is now undead, but there is no indication of that. Even if the UnicastProcessor does not allow re-subscription, it should at least send a terminal event to pipelines that subscribe to it after the first subscription, so a downstream user does not wait for a terminal event that can never occur. Otherwise, users waste loads of time debugging what they see as an error, but what is actually meant by design.
What I'm saying is that if it poses as a Multi, it should behave like one, following the "Principle of Least Astonishment".
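The difference between a silently dropped subscriber and a rejected one can be shown from the downstream side with a small sketch. It uses only the JDK's java.util.concurrent.Flow and makes no claim about which of the two UnicastProcessor actually does. TerminalSignalSketch, silentRejection, signalledRejection and terminalEvent are hypothetical names; the signalled variant follows the Reactive Streams convention of calling onSubscribe before onError.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

class TerminalSignalSketch {

    private static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    /** Hypothetical publisher that drops a late subscriber without any signal. */
    static Flow.Publisher<Integer> silentRejection() {
        return subscriber -> { };
    }

    /** Hypothetical publisher that rejects a late subscriber with onSubscribe followed by onError. */
    static Flow.Publisher<Integer> signalledRejection() {
        return subscriber -> {
            subscriber.onSubscribe(NOOP);
            subscriber.onError(new IllegalStateException("already subscribed"));
        };
    }

    /** Completes with a description of the terminal event, if one ever arrives. */
    static CompletableFuture<String> terminalEvent(Flow.Publisher<Integer> publisher) {
        CompletableFuture<String> result = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { }
            @Override public void onError(Throwable failure) { result.complete("failed: " + failure.getMessage()); }
            @Override public void onComplete() { result.complete("completed"); }
        });
        return result;
    }

    public static void main(String[] args) {
        // The silent variant never completes the future: downstream waits forever.
        System.out.println(terminalEvent(silentRejection()).isDone());  // prints false
        // The signalled variant gives downstream something to act on.
        System.out.println(terminalEvent(signalledRejection()).join()); // prints failed: already subscribed
    }
}
```

With the signalled variant, a downstream retry, timeout or log statement has an event to react to; with the silent one there is nothing to observe.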
Well, if you try to re-subscribe a UnicastProcessor you get an error event.
Yes, resubscribing will produce an error.
Also, I should have added in the Javadoc of the UnicastProcessor: be very careful, it does things differently, and it should NOT be considered totally compliant. TL;DR: use an emitter.
Let’s improve the docs here