pekko
Should the Decider of stream be a Function2?
Motivation:
With the current type Decider = Function[Throwable, Directive], the decider cannot tell which element caused the exception. I think it would be better to change it to type Decider = Function2[Throwable, Any, Directive].
E.g., the Map operator would then become:
override def onPush(): Unit = {
  val current = grab(in)
  try {
    push(out, f(current))
  } catch {
    case NonFatal(ex) =>
      decider(ex, current) match {
        case Supervision.Stop => failStage(ex)
        case _                => pull(in)
      }
  }
}
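To make the difference concrete, here is a minimal standalone sketch of what an element-aware decider could look like from the user's side. It does not use Pekko at all: Directive, Stop, Resume, Decider2 and supervisedMap are hypothetical stand-ins invented for illustration, and the list-based loop only imitates the Stop/Resume behaviour of the Map stage above.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object ElementAwareDecider {
  // Simplified stand-ins for the supervision directives (hypothetical, not Pekko's types).
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive

  // The proposed shape: the decider also receives the element that failed.
  type Decider2 = (Throwable, Any) => Directive

  // Applies f to each element. On a non-fatal failure the decider is consulted:
  // Resume drops the failing element, Stop aborts with the exception.
  def supervisedMap[A, B](in: List[A], decider: Decider2)(f: A => B): Either[Throwable, List[B]] = {
    @tailrec
    def loop(rest: List[A], acc: List[B]): Either[Throwable, List[B]] = rest match {
      case Nil => Right(acc.reverse)
      case current :: tail =>
        Try(f(current)) match {
          case Success(b) => loop(tail, b :: acc)
          case Failure(ex) =>
            decider(ex, current) match {
              case Stop   => Left(ex)
              case Resume => loop(tail, acc)
            }
        }
    }
    loop(in, Nil)
  }

  // A decider that can only be written with access to the element:
  // tolerate division by zero for the element 0, stop on anything else.
  val skipZero: Decider2 = (ex, elem) =>
    ex match {
      case _: ArithmeticException if elem == 0 => Resume
      case _                                   => Stop
    }

  def main(args: Array[String]): Unit =
    println(supervisedMap(List(1, 0, 2), skipZero)(x => 10 / x))
}
```

With a one-argument decider, skipZero could only look at the exception type, so it could not distinguish a failing 0 from any other element that happens to throw an ArithmeticException.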
@mdedetrich @raboof @pjfanning wdyt?
For comparison, reactor-core's Flux takes a predicate on the Throwable only:
public final Flux<T> onErrorComplete(Predicate<? super Throwable> predicate) {
Can we delay this to 2.0.0-M2?
I'm OK with that. Maybe the current shape is fine; otherwise we would need to change a lot of code.