akka
Idle-complete graph stage
Short description
Currently, the idleTimeout function fails the stream with an exception when the timeout is reached. In some cases it can be useful to complete the stream successfully rather than failing it. Would there be any interest in defining a new function which is just like idleTimeout but doesn't throw an exception? If so, I can open up a pull request.
Details
An example use case is taking events from an "infinite" stream, such as events from Kafka. One might want to reach the "last" event and then complete. Since the stream is infinite, a possible surrogate is to wait until the Source is idle for long enough and then complete.
Although it's possible to emulate this behavior with idleTimeout(...).recover(), it gets clunky and seems like an abuse of exceptions for the purposes of control flow.
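For reference, a minimal sketch of what that emulation might look like. It uses recoverWithRetries with an empty source instead of recover, so that no sentinel element has to be emitted. The object name, the method name, and the Source.never stand-in for Kafka are assumptions made for illustration only.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka.
object IdleCompleteViaRecover {

  // Collects elements until the source has been idle for `idle`, then completes normally.
  def collectUntilIdle(idle: FiniteDuration): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("idle-complete")
    try {
      // 1, 2, 3 followed by silence: a stand-in for an "infinite" source such as Kafka.
      val events = Source(1 to 3).concat(Source.never[Int])

      val collected = events
        .idleTimeout(idle)
        // Swap the timeout failure for an empty source, i.e. a normal completion.
        // Caveat: this matches any TimeoutException coming from upstream,
        // not only the one raised by idleTimeout.
        .recoverWithRetries(1, { case _: TimeoutException => Source.empty[Int] })
        .runWith(Sink.seq)

      Await.result(collected, idle + 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(collectUntilIdle(500.millis))
}
```

The catch-all match on TimeoutException is the part that makes this feel like control flow by exception.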
Thanks
Somewhat of a duplicate of https://github.com/akka/akka/issues/24951, although that one is messy, so let's keep this one and move it over to akka/akka.
I think that would be a nice feature. I'm not sure what would be best API-wise: an overload of the existing operator with a flag, idleTimeout(timeout: Duration, completeOnIdle: Boolean), or a separate name, perhaps something like completeAfterIdle(timeout).
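To make the second option concrete, here is a rough sketch of how such an operator could be prototyped today as a custom GraphStage. The class name CompleteAfterIdle and the demo object are hypothetical, and this is not how an eventual built-in operator would necessarily be implemented. For simplicity it re-arms a one-shot timer on every element.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stage: passes elements through and completes (rather than fails)
// once no element has arrived for `timeout`.
final class CompleteAfterIdle[T](timeout: FiniteDuration) extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet("CompleteAfterIdle.in")
  val out: Outlet[T] = Outlet("CompleteAfterIdle.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler {
      private val IdleTimer = "idle"

      // Start counting as soon as the stage starts.
      override def preStart(): Unit = scheduleOnce(IdleTimer, timeout)

      override def onPush(): Unit = {
        // Scheduling with the same key replaces the pending timer.
        scheduleOnce(IdleTimer, timeout)
        push(out, grab(in))
      }

      override def onPull(): Unit = pull(in)

      // Idle for too long: cancel upstream and complete downstream normally.
      override protected def onTimer(timerKey: Any): Unit = completeStage()

      setHandlers(in, out, this)
    }
}

object CompleteAfterIdleDemo {
  def run(idle: FiniteDuration): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("complete-after-idle")
    try {
      val collected = Source(1 to 3)
        .concat(Source.never[Int]) // never completes on its own
        .via(new CompleteAfterIdle[Int](idle))
        .runWith(Sink.seq)
      Await.result(collected, idle + 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run(500.millis))
}
```

Because the stage completes instead of failing, no exception has to be caught downstream.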
Note that Patrik had opinions in https://github.com/akka/akka/issues/24951#issuecomment-384222068, but I disagree that having a completing alternative for idle means we also need to add the same for the other operators. I think those can be turned into completion using composition.
Thanks for the quick response. My googling skills failed me... I tried searching for a similar issue but didn't find the one you linked to.
In any case, I'm all for achieving completion by composition (sounds vaguely philosophical), but if you're referring to the approach of recovering the exception and completing, that won't work for me, as the TimeoutException that idleTimeout throws is too generic for me to be willing to catch it blindly. Or did you mean something else?
Given that the linked issue is 4 years old, I'm a bit hesitant to tackle it on my own. Though I will gladly take pointers as to how this can be resolved (even if only for the idleTimeout case).
Currently I'm using a stream together with a Caffeine cache configured with expireAfterAccess, and in the RemovalListener I complete the stream.
Seems like I can compose completeAfterIdle and watchTermination to get the same result.
Before adding this, I think we should add recoverWithComplete first.
@He-Pin You added the exception subclasses in https://github.com/akka/akka/pull/31640, but does that fix this issue, or is our conclusion that this feature request is not needed and we can close this ticket?
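For context, a short sketch of how a dedicated exception subclass would narrow the recovery. It assumes that the subclass raised by idleTimeout is akka.stream.StreamIdleTimeoutException and that the Akka version in use includes that PR. Both points should be checked against the actual release. The object and method names are made up for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.StreamIdleTimeoutException // assumed name of the idle-specific subclass
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka.
object IdleCompleteNarrowCatch {

  def collectUntilIdle(idle: FiniteDuration): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("narrow-catch")
    try {
      val collected = Source(1 to 3)
        .concat(Source.never[Int]) // never completes on its own
        .idleTimeout(idle)
        // Only the idle timeout is turned into completion; any other
        // TimeoutException from upstream still fails the stream.
        .recoverWithRetries(1, { case _: StreamIdleTimeoutException => Source.empty[Int] })
        .runWith(Sink.seq)
      Await.result(collected, idle + 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(collectUntilIdle(500.millis))
}
```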
@johanandren @ncreep I think this issue can be closed