pekko icon indicating copy to clipboard operation
pekko copied to clipboard

Should the Decider of stream be a Function2?

Open He-Pin opened this issue 3 months ago • 2 comments

Motivation: With the current type Decider = Function[Throwable, Directive] , we can not know the current element that causes the exception, I think it would be better to be type Decider = Function2[Throwable,Any, Directive]

eg, The Map operator then will be:

      override def onPush(): Unit = {
        val current = grab(in)
        try {
          push(out, f(current))
        } catch {
          case NonFatal(ex) =>
            decider(ex, current) match {
              case Supervision.Stop => failStage(ex)
              case _                => pull(in)
            }
        }
      }

@mdedetrich @raboof @pjfanning wdyt?

We can see in reactor-core's Flux:

	public final Flux<T> onErrorComplete(Predicate<? super Throwable> predicate) {

He-Pin avatar Sep 30 '25 16:09 He-Pin

can we delay to 2.0.0-M2?

pjfanning avatar Sep 30 '25 17:09 pjfanning

I'm ok, maybe the current shape is fine, otherwise we need change many code

He-Pin avatar Sep 30 '25 17:09 He-Pin