akka-stream-contrib

consider adding a simpler API for retrying simple flows

Open hochgi opened this issue 8 years ago • 0 comments

This comes from a co-worker to whom I suggested Retry. In his case, we had a simple Flow[A,Future[B]], which can easily be turned into a Flow[A,Try[B]] using something like:

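// mapAsync needs a parallelism argument (1 here is an assumption),
// and onComplete needs an implicit ExecutionContext in scope
.mapAsync(1) { f: Future[B] =>
  val p = Promise[Try[B]]()
  f.onComplete(t => p.success(t)) // the promise always succeeds, carrying the Try
  p.future
}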
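For comparison, here is a minimal sketch of the same Future-to-Try lifting without an explicit Promise. It assumes Scala 2.12 or later, where `Future.transform` accepts a `Try[T] => Try[S]` function; the names `FutureToTry` and `liftToTry` are hypothetical and only used for illustration.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Try}

object FutureToTry {
  // Hypothetical helper: the returned future never fails; a failure of `f`
  // is captured as a Failure value inside the resulting Try instead.
  def liftToTry[B](f: Future[B])(implicit ec: ExecutionContext): Future[Try[B]] =
    f.transform((t: Try[B]) => Success(t))
}
```

Inside a stream this could be used as `.mapAsync(1)(f => FutureToTry.liftToTry(f))`, with the parallelism value again being an assumption.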

But in order to do something as simple as retrying at most 5 times, we had to come up with:

val flow: Flow[A,Try[B],M] = ...

val wrapped: Flow[(A,(A,Int)),(Try[B],(A,Int)),M] = Flow.fromGraph(GraphDSL.create(flow){
  implicit b => {
    f => {
      import GraphDSL.Implicits._

      val u = b.add(Unzip[A,(A,Int)]())
      val z = b.add(Zip[Try[B],(A,Int)]())

      u.out0 ~> f ~> z.in0
      u.out1   ~>    z.in1

      FlowShape(u.in, z.out)
    }
  }
})

val s = Flow.fromFunction[A,(A,(A,Int))](a => (a,(a,5)))

s.via(Retry(wrapped){
  case (_,0) => None
  case (a,i) => Some(a -> (a,i-1))
}).map(_._1)

Well, that's a lot of boilerplate. So I thought I could add something like Retry.simple(flow, retries), which would do exactly what we did above.
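As a rough sketch of what such a helper might look like, the boilerplate above can be folded into a single method. The `SimpleRetry` object and its `simple` method are hypothetical and not part of akka-stream-contrib; the sketch assumes the `Retry.apply` signature used above (a flow over `(I, S)` pairs plus a `S => Option[(I, S)]` retry function) and an Akka version where `GraphDSL.create` is still available.

```scala
import akka.stream.FlowShape
import akka.stream.contrib.Retry
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Unzip, Zip}
import scala.util.Try

object SimpleRetry {
  // Hypothetical helper: retries a failed element up to `retries` more times.
  def simple[A, B, M](flow: Flow[A, Try[B], M], retries: Int): Flow[A, Try[B], M] = {
    // Carry (original input, remaining retries) alongside each element as state.
    val wrapped: Flow[(A, (A, Int)), (Try[B], (A, Int)), M] =
      Flow.fromGraph(GraphDSL.create(flow) { implicit b => f =>
        import GraphDSL.Implicits._

        val u = b.add(Unzip[A, (A, Int)]())
        val z = b.add(Zip[Try[B], (A, Int)]())

        u.out0 ~> f ~> z.in0
        u.out1 ~> z.in1

        FlowShape(u.in, z.out)
      })

    Flow.fromFunction[A, (A, (A, Int))](a => (a, (a, retries)))
      .viaMat(Retry(wrapped) {
        case (_, 0) => None                      // budget exhausted: emit the failure
        case (a, i) => Some(a -> (a, i - 1))     // retry the same input with one fewer attempt
      })(Keep.right)                             // keep the wrapped flow's materialized value
      .map(_._1)                                 // drop the retry state
  }
}
```

With this in place the call site would shrink to something like `SimpleRetry.simple(flow, 5)`. Note that with the counting shown here, `retries = 5` allows up to five retries after the first attempt.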

And maybe also something like:

def simpleWith[I,O,M](flow: Flow[I,Try[O],M], retries: Int)(betweenRetries: (Try[O],I) => (Try[O],I)): Flow[I,Try[O],M]

to enable recovery attempts when needed. I'm not sure about this, though, since it could be handled with the more basic existing API, and it's reasonable to assume betweenRetries would make the call "boilerplaty" anyway.

Thoughts?

hochgi, Sep 04 '16 14:09