pekko
Feature request: RestartWithBackOffAndFallback for Flow & Sink
Motivation: In https://github.com/akka/akka/issues/30267, @nvollmar added this feature for actors. I think it would be nice to have it in pekko-stream too, so that users can log the error on a restart/resume; currently we simply ignore it.
I found a related case from @tg44 in https://discuss.lightbend.com/t/recover-from-sink-exceptions/10554
I think this would be very useful for real production use cases.
Result: More handy control over the exceptions
You can do this already due to this change https://github.com/apache/incubator-pekko/pull/252 which has already landed.
You can just make your own supervision decider, i.e.
val decider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream", e)
  Supervision.Stop
}
or am I missing something?
Yes, I was expecting some factory method, e.g. Decider.resume(level: LoggingLevel). Let's wait for some feedback from @tg44 and @nvollmar.
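A factory of the kind asked for here could be sketched roughly as follows. This is only an illustration: `LoggingDeciders` and `logging` are hypothetical names and not part of Pekko, and the `log` callback stands in for whatever logger is in use. Only `Supervision.Decider` and the `Supervision` directives come from pekko-stream.

```scala
import org.apache.pekko.stream.Supervision

// Hypothetical helper, NOT part of Pekko: builds a decider that
// reports the exception and then always answers with a fixed directive.
object LoggingDeciders {
  def logging(directive: Supervision.Directive)(log: Throwable => Unit): Supervision.Decider = { e =>
    log(e)     // side effect: report the failure
    directive  // e.g. Supervision.Resume, Supervision.Restart or Supervision.Stop
  }
}
```

Such a decider would be attached in the usual way, for example with ActorAttributes.supervisionStrategy(LoggingDeciders.logging(Supervision.Resume)(e => logger.error("Unhandled exception in stream", e))).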
So, my initial problem was that I have a substream which has a fileIO in it. The fileIO was only a safety measure, so that if we lose the db connection we don't lose (too much) data. Then one day the fileIO failed (we couldn't recreate it because it opened a file, which failed due to hosting provider issues), and it brought down the whole stream, and we lost hours of data before we finally noticed that the stream was down...
My idea was to lazily create a sink: if it fails I can recreate it, and if it fails multiple times I can simply swap it for a Sink.ignore if I want to.
@tg44 So if by Sub Stream you mean SubFlow the issue you describe is partially solved in Pekko 1.1.x with this PR (which isn't released yet but there are snapshots if you want to try). You can read the migration notes here
What I mean by partially is that, as noted in the migration notes, we don't support Supervision.Restart. Initially, when implementing that PR, I had difficulties supporting Supervision.Restart, but since there seems to be some demand for it I can look into it if you confirm that Supervision.Restart support for SubFlows is indeed what you are actually asking for.
@He-Pin Aside from what I just said, I don't think we need to add anything else to Pekko; it appears that @tg44 just wants support for automatic restarting of SubFlows.
@mdedetrich I think what @tg44 needs is:
- a RestartSink, which is currently missing; maybe that can be done with RestartFlow.to(IgnoreSink)
- a fallback, but for the sink: the main sink was a fileIO sink, but after some max number of retries, switch to a Sink.ignore.
Data Source X
      |
     \|/
    Datas --> Process A ----> Process FileIO Sink (main, can retry with backoff restart)
                          |
                          | (only switch after the main sink dies and has restarted up to the max limit)
                          |
                          |----> fallback Ignore Sink (logging) maybe.
And I think this is what he really wants.
@He-Pin But he is talking about SubFlow, so what he seems to be saying is that he just wants the SubFlow to have the ability to restart (and this means, by implication, that if the SubFlow is just a sink then the Sink will restart).
Then you can just do
val restartingDecider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream, restarting", e)
  Supervision.Restart
}
source
  .splitAfter(_ == somePredicate)
  .withAttributes(ActorAttributes.supervisionStrategy(restartingDecider))
@mdedetrich But your proposal cannot switch to an ignore sink.
I see you just updated a PR; what about extending RestartFlow to support a fallback too?
@He-Pin https://github.com/apache/incubator-pekko/pull/981 is what I was thinking of (still needs to be tested, there may be some mistake)
But your proposal cannot switch to an ignore sink.
It's not necessary though, you can do this yourself by making a custom sink with Partition?
An out-of-the-box solution seems better; I am thinking of something like ZIO ZSink's orElse, but one that only switches after some kind of maxRetriesWithBackOff.
An out-of-the-box solution seems better; I am thinking of something like ZIO ZSink's orElse, but one that only switches after some kind of maxRetriesWithBackOff.
Yeah, using Partition right now is a bit clunky because of how manual it is, but that's another topic.
@tg44 Let me know if I am on the right track and if so I will work on https://github.com/apache/incubator-pekko/pull/981
My use case looks like this:
Data Source X
     |
    \|/
   Datas
     |
    \|/
 Process A
     |
    \|/
 wireTap() ----> Process FileIO Sink (main, can retry with backoff restart)
     |        |  (if it is restarted with supervision it will fail at every retry, effectively killing the database save)
    \|/       |  (only switch after the main sink dies and has restarted up to the max limit)
database save |
              |----> fallback Ignore Sink (logging) maybe.
I think we don't have a Sink which can handle the right side of the graph. It could be something like:
def sinkWithFallback[T, Mat](factory: () => Option[Throwable] => Sink[T, Mat]): Sink[T, Mat]
And it should be easy to implement on top of lazySink, but lazySink is internal (or at least the akka version I'm using right now has it in internal), so it is messy...
(We need the factory to be like in mapConcat so we can add a factory like
() => {
  var retries = 0
  (exception: Option[Throwable]) => {
    retries += 1
    if (retries > 100) {
      Sink.ignore
    } else {
      someProbablyFailingSink
    }
  }
}
also, sorry if I'm not using proper scala syntax, I code in ts a lot recently :( )
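The counting-factory idea above can be approximated with what pekko-stream already ships, as a rough sketch rather than a finished design. It assumes `RestartSink.withBackoff` and `RestartSettings` from pekko-stream; `FallbackSinks`, `countingFactory` and `sinkWithFallback` are hypothetical names. Unlike the signature proposed above, the factory here receives no exception, because the factory that `RestartSink` accepts takes no arguments, and elements that arrive while the sink is being restarted may be lost.

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.pekko.NotUsed
import org.apache.pekko.stream.RestartSettings
import org.apache.pekko.stream.scaladsl.{ RestartSink, Sink }

// Hypothetical helpers, NOT part of Pekko.
object FallbackSinks {

  // Returns a factory that hands out the primary sink for the first
  // `maxAttempts` calls and the fallback sink for every call after that.
  def countingFactory[T](maxAttempts: Int)(
      primary: () => Sink[T, Any],
      fallback: () => Sink[T, Any]): () => Sink[T, Any] = {
    val attempts = new AtomicInteger(0)
    () => if (attempts.incrementAndGet() > maxAttempts) fallback() else primary()
  }

  // RestartSink calls the factory on every (re)start, so once the primary
  // sink has been created `maxAttempts` times the next restart yields Sink.ignore.
  def sinkWithFallback[T](maxAttempts: Int, settings: RestartSettings)(
      primary: () => Sink[T, Any]): Sink[T, NotUsed] =
    RestartSink.withBackoff[T](settings)(
      countingFactory[T](maxAttempts)(primary, () => Sink.ignore))
}
```

With this sketch the materialized value of the inner sink is not exposed, since `RestartSink.withBackoff` materializes `NotUsed`.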
@tg44 So just to confirm, by sub stream you didn't mean split/when (which creates a SubFlow) but instead wireTap().
Also, is there a reason why you aren't using alsoTo instead of wireTap? Usually for your scenario you would use alsoTo, since wireTap will not backpressure the flow it is tapped into (hence wireTap is typically only used for logging/metrics and the like).
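For reference, the two operators being compared are attached in the same way; a minimal sketch is given below, where `TapVariants` and its method names are hypothetical and only `wireTap` and `alsoTo` are pekko-stream operators. The difference is in behaviour, as described above: the `wireTap` side sink does not backpressure the main flow, while the `alsoTo` side sink does.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.{ Flow, Sink }

// Hypothetical wrapper object, NOT part of Pekko; it only names the two variants.
object TapVariants {

  // Side sink does not backpressure the main flow.
  def withWireTap[T](side: Sink[T, Any]): Flow[T, T, NotUsed] =
    Flow[T].wireTap(side)

  // Side sink takes part in backpressure, so it sees every element.
  def withAlsoTo[T](side: Sink[T, Any]): Flow[T, T, NotUsed] =
    Flow[T].alsoTo(side)
}
```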
@mdedetrich Nah, when I started to work with akka streams we tended to call every "part of the stream after a fanout" a substream; groupBy and the "real" substream/subflow as a term were introduced/standardized later on. (I was a really, really early adopter; I think I have merged PRs in almost every akka repo (akka, akka-http, and I wrote/rewrote components for alpakka).) So sorry for the misunderstanding.
For me the fileIO is kind of a log to a file, so I chose wireTap intentionally. I have buffers and a logrotator and everything in between the wireTap and the actual fileIO. I can afford not to write everything to the file log, but I need to write everything to the database as fast as possible.
Yes, I love your contributions @tg44 and I have used them too.
And I think what you did is a way of isolating strong and weak dependencies, to prevent the strong dependencies from being affected by the weak ones.
Okay, so in the end this seems to be a request for an easier dsl for constructing a specific Sink, and while https://github.com/apache/incubator-pekko/pull/981 is valid, it's also not related.
Now that I think of it, this honestly seems to be asking for a Sink equivalent of RetryFlow.withBackoff, @He-Pin wdyt
@mdedetrich That's true, but we can extend this to Retry/Restart*.withBackOff/fallback; that's what I have in mind.