akka-stream-contrib
akka-stream-contrib copied to clipboard
consider adding a simpler API for retrying simple flows
this comes from a co-worker, I suggested Retry
to.
in his case, we had a simple Flow[A,Future[B]]
,
which can easily be turned to Flow[A,Try[B]]
, using something like:
.mapAsync { f: Future[B] =>
val p = Promise[Try[B]]
f.onComplete(t => p.success(t))
p.future
}
but in order to do something simple like retrying at most 5 times, we had to come up with:
val flow: Flow[A,Try[B],M] = ...
val wrapped: Flow[(A,(A,Int)),(Try[B],(A,Int)),M] = Flow.fromGraph(GraphDSL.create(flow){
implicit b => {
f => {
import GraphDSL.Implicits._
val u = b.add(Unzip[A,(A,Int)]())
val z = b.add(Zip[Try[B],(A,Int)]())
u.out0 ~> f ~> z.in0
u.out1 ~> z.in1
FlowShape(u.in, z.out)
}
}
})
val s = Flow.fromFunction[A,(A,(A,Int))](a => (a,(a,5)))
s.via(Retry(wrapped){
case (_,0) => None
case (a,i) => Some(a -> (a,i-1))
}).map(_._1)
well, that's a lot of boilerplate...
So, I thought I could add something like Retry.simple(flow,retries)
which will do exactly what we did.
And maybe also something like:
def simpleWith[I,O](flow: Flow[I,O], retries: Int)(betweenRetries: (Try[O],I) => (Try[O],I))
to enable recovery attempts when needed.
not sure about this though... since it could be handled with the more basic existing API,
and it's reasonable to assume betweenRetries
will make the call "boilerplaty" anyway...
Thoughts?