akka icon indicating copy to clipboard operation
akka copied to clipboard

Idle-complete graph stage

Open ncreep opened this issue 2 years ago • 4 comments

Short description

Currently, the idleTimeout function throws an exception when reaching the timeout. In some cases it can useful to complete the stream successfully rather than failing it. Would there be any interest in defining a new function which is just like idleTimeout but doesn't throw an exception? If so, I can open up a pull request.

Details

An example use-case is when trying to take events from an "infinite" stream like events from Kafka. One might want to reach the "last" event and then complete. Since the stream is infinite a possible surrogate is to wait until the Source is idle for long enough and then complete.

Although it's possible to emulate this behavior with idleTimeout(...).recover() it gets clunky and seems like an abuse of exceptions for the purposes of control-flow.

Thanks

ncreep avatar Jul 04 '22 14:07 ncreep

Somewhat duplicate of https://github.com/akka/akka/issues/24951, although that is messy, let's keep this one and move it over to akka/akka.

I think that would be a nice feature, not sure what would be best API-wise, an overload of the existing with a flag idleTimeout(timeout: Duration, completeOnIdle: Boolean) or a separate name, perhaps something like completeAfterIdle(timeout)

johanandren avatar Jul 05 '22 12:07 johanandren

Note that Patrik had opinions in https://github.com/akka/akka/issues/24951#issuecomment-384222068 but I disagree that having an alternative for idle means we also need to add the same, I think completing the other operators can be turned into completion using composition.

johanandren avatar Jul 05 '22 12:07 johanandren

Thanks for the quick response. My googling skills failed me... I tried searching for a similar issue but didn't find the one you linked to.

In any case, I'm all for achieving completion by composition (sounds vaguely philosophical), but if you're referring to the approach of recovering the exception and completing, that won't work for me, as the TimeoutException that idleTimeout throws is too generic for me to be willing to catch it blindly. Or did you mean something else?

Given that the linked issue is 4 years old, I'm a bit hesitant to tackle it on my own. Though I will gladly take pointers as to how this can be resolved (even if only for the idleTimeout case).

ncreep avatar Jul 05 '22 16:07 ncreep

Currently I'm using stream with caffeine cache with an expireAfterAccess and in the RemovalListener I complete the stream. Seems like I can compose completeAfterIdle and watchTermination to get the same result.

before add this, I think should add recoverWithComplete first.

He-Pin avatar Sep 02 '22 03:09 He-Pin

@He-Pin You added the exception subclasses in https://github.com/akka/akka/pull/31640, but does that fix this issue, or is our conclusion that this feature request is not needed and we can close this ticket?

patriknw avatar Oct 14 '22 06:10 patriknw

@johanandren @ncreep I think this issue can be closed

He-Pin avatar Apr 17 '23 11:04 He-Pin