kotlinx.coroutines
Enhancement: Add timeout operator to Flow
This is in response to the updated contribution guidelines, and to push for a timeout mechanism for Flow (https://github.com/Kotlin/kotlinx.coroutines/pull/2597). Currently, there is no easy way to time out a flow if the upstream takes too long to produce an event.
Use case: A flow, when collected, may not emit items at a regular interval. If the flow relies on a resource-intensive process or keeps some hardware enabled, leaving it active with no events for an extended period is wasteful. A timeout operator that triggers when no events have been emitted would make it possible to build flows with a defined timeout strategy, so that these resources aren't accidentally misused.
I'm not against such operator at all (on the contrary) but I am not sure this specific use case is a good example for it.
This use case seems to need the timeout on the consumer side, and for this I wonder why a withTimeout around the collect call is not sufficient. Could you please elaborate?
I would imagine that .timeout() operator for a flow would be useful for the producer side instead, so that a returned flow can be forced to have a timeout. For instance, if the producer knows that the events are too irregular and prefers to send "stub" events or something to indicate it's taking too long, it would return a Flow with the timeout operator already set up.
To give a more specific use-case, I'll go with one for Android:
With GPS location on Android, it's not always guaranteed that location events come in at the desired interval (it could vary based on the environment). It's even possible that no GPS location will come in if there is no clear signal. However, even if no signal is available, the GPS radio will remain on, draining battery power. To prevent this, you would use a timeout on the location events coming in, and if there are none after a period of time, turn off the radio.
An example of what a location emitting flow would look like (without a timeout):
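fun getLocationFlow(context: Context): Flow<Location> {
    val locationManager = context.getSystemService(Context.LOCATION_SERVICE) as LocationManager
    return callbackFlow {
        // trySend never blocks the callback thread; with the unlimited buffer below it always succeeds.
        val listener = LocationListener { trySend(it) }
        locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER, 0L, 0F, listener)
        // callbackFlow must end with awaitClose (rather than invokeOnClose): it keeps the flow
        // open until the collector cancels, then removes the listener so the GPS radio can power down.
        awaitClose { locationManager.removeUpdates(listener) }
    }.buffer(Channel.UNLIMITED)
}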
Every consumer of this flow could have a different requirement for what they consider to be a timeout event. One example is that these locations may not be accurate enough. In this case, a filter would be added to the flow, which filters out inaccurate locations.
getLocationFlow(context)
    .filter(::locationFilteringLogic)
If locations are emitted regularly until we reach a bad-signal area, where new locations become inaccurate enough to be filtered out, I would want a timeout to occur once I have received no location for the last 10 seconds. Adding the timeout on the producer side wouldn't work, because the producer could still be emitting location data regularly even though everything is filtered out downstream. Using a withTimeout around collect would timeout the entire flow, even if I am still receiving valid locations. In this case, I would put a timeout just after the filter, and I would want it to either throw an exception when triggered or perform some other action of my choosing.
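To make the "timeout just after the filter" idea concrete, here is a minimal sketch of such an operator. The names idleTimeout and IdleTimeoutException are made up for illustration and are not part of kotlinx.coroutines, and this is not necessarily how the PR implements it. It races the upstream against a timer that restarts on every received value, and relies on whileSelect and onTimeout, which are experimental APIs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.*

// Hypothetical exception type for this sketch; not part of kotlinx.coroutines.
class IdleTimeoutException(timeoutMillis: Long) :
    Exception("No value received for $timeoutMillis ms")

// Hypothetical operator: fails when the gap between upstream values exceeds timeoutMillis.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T> Flow<T>.idleTimeout(timeoutMillis: Long): Flow<T> = flow {
    coroutineScope {
        // Collect the upstream in a child coroutine so it can be raced against a timer.
        val values = this@idleTimeout.produceIn(this)
        whileSelect {
            values.onReceiveCatching { result ->
                result.exceptionOrNull()?.let { throw it } // upstream failed
                if (result.isClosed) {
                    false // upstream completed normally, stop the loop
                } else {
                    emit(result.getOrThrow()) // forward the value; the timer restarts on the next loop
                    true
                }
            }
            // Throwing here cancels the scope, which also cancels the upstream collection.
            onTimeout(timeoutMillis) { throw IdleTimeoutException(timeoutMillis) }
        }
    }
}

fun main() = runBlocking {
    // Stand-in for filtered locations: two quick values, then a long silence.
    val source = flow { emit(1); delay(20); emit(2); delay(2_000); emit(3) }
    try {
        source.idleTimeout(300).collect { println("got $it") }
    } catch (e: IdleTimeoutException) {
        println("timed out: ${e.message}")
    }
}
```

Placed after the filter, an operator like this would only see the locations that passed the filter, so a run of filtered-out locations counts as silence. Because the timeout cancels the upstream collection, a callbackFlow upstream gets a chance to run its cleanup.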
However, even if no signal is available, the GPS radio will remain on, draining battery power. To prevent this, you would use a timeout on the location events coming in, and if there are none after a period of time, turn off the radio.
The requirement seems reasonable, but the solution doesn't seem to help with this problem, as the upstream would probably not be affected by the timeout at all (at least in the PR, I didn't see any mechanism to re-collect the upstream flow on timeout).
So invokeOnClose won't be called on timeout AFAICT.
Using a withTimeout around collect would timeout the entire flow, even if I am still receiving valid locations
Exactly, and that is the point: cancel the flow so that the producer can clean up resources. If you want to re-subscribe later, you can always re-collect that flow.
In any case, in this example I don't believe there is a way for you to know that new valid locations are available again after a timeout without keeping the location service on in the first place. Or am I missing something?
The requirement seems reasonable, but the solution doesn't seem to help with this problem, as the upstream would probably not be affected by the timeout at all (at least in the PR, I didn't see any mechanism to re-collect the upstream flow on timeout). So invokeOnClose won't be called on timeout AFAICT.
In the PR, an exception is being thrown, which would close the entire flow, unless the flow was a SharedFlow, in which case only the downstream would be closed. Even if you override the default (which throws a public-facing FlowTimeoutException), internally the flow has already ended due to the InternalFlowTimeoutException being thrown.
Exactly, and that is the point: cancel the flow so that the producer can clean up resources. If you want to re-subscribe later, you can always re-collect that flow.
The withTimeout block would cancel the flow if collect suspended for longer than the given time. So if I wrapped the above flow in a withTimeout block with 5 seconds, I would collect 5 seconds of location before the entire flow is cancelled, regardless of whether the flow was still emitting or not. That doesn't solve the issue of me wanting to collect for as long as the GPS is emitting. If the GPS radio stops emitting locations for 5+ seconds, that's where I want to just turn it off, and handle that event separately.
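A small sketch of that behaviour, using a made-up steadyFeed() flow as a stand-in for a healthy location source and a hypothetical collectWithBudget helper. The time budget applies to the whole collect call, so collection stops even though values keep arriving.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Stand-in for a healthy location feed: one value every 100 ms, forever.
fun steadyFeed(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(100)
    }
}

// Collects under an overall time budget; returns (timedOut, number of values received).
suspend fun collectWithBudget(budgetMillis: Long): Pair<Boolean, Int> {
    var received = 0
    // withTimeoutOrNull returns null if the block did not finish within the budget.
    val completed = withTimeoutOrNull(budgetMillis) {
        steadyFeed().collect { received++ }
    }
    return (completed == null) to received
}

fun main() = runBlocking {
    val (timedOut, received) = collectWithBudget(550)
    println("timedOut=$timedOut after $received values")
}
```

The feed never goes quiet here, yet the collection is still cut off once the budget runs out, which is the opposite of what the GPS use case needs.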
In any case, in this example I don't believe there is a way for you to know that new valid locations are available again after a timeout without keeping the location service on in the first place. Or am I missing something?
Well... I could leave the radio on, but users would be upset about the battery drain. :-) In the above example, if I got the timeout trigger, I would probably set up a different trigger (timer, geofence, activity recognition) to notify for a retry.
Actually, this conversation is showing me a bug in my implementation. This breaks the retry operator... I'm not sure if it's because I'm using a callbackFlow or something else.
What I thought was a bug: If the timeout emits an item instead of throwing an exception, the retry operators don't work (similar thing happens if you do catch before the retry operator).
It wasn't a bug, but I did see I was misusing the scopedFlow a bit, so I made some corrections on my PR there.
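For anyone hitting the same thing, the ordering effect can be reproduced without any timeout at all. This is only an illustration with a made-up failing() flow: once catch has turned the failure into a value, retry sees a normal completion and has nothing to retry.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

var collections = 0

// Stand-in for an upstream that emits one value and then fails.
fun failing(): Flow<Int> = flow {
    collections++
    emit(1)
    throw IllegalStateException("simulated timeout")
}

fun main() = runBlocking {
    collections = 0
    // catch converts the failure into a value, so retry never sees an exception.
    val caughtFirst = failing().catch { emit(-1) }.retry(2).toList()
    println("$caughtFirst, collections=$collections") // prints "[1, -1], collections=1"

    collections = 0
    // retry sees the failure and re-collects twice; catch handles only the last failure.
    val retriedFirst = failing().retry(2).catch { emit(-1) }.toList()
    println("$retriedFirst, collections=$collections") // prints "[1, 1, 1, -1], collections=3"
}
```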
In the PR, an exception is being thrown, which would close the entire flow
Ok, I think I had missed the initial intent of the timeout operator then. For some reason I expected a continuity, basically the timeout action in my mind was just an extra hook when too much time passes since the last received value, but without implication of cancellation of the upstream, so that further values could still be received from the same upstream.
But I guess that could be achieved with a combination of timeout + retry.
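A rough sketch of what that combination could look like. IdleTimeoutException and locationsWithTimeout() are hypothetical stand-ins (the flow simply simulates a timeout failure on its first two collections), while retryWhen is the existing operator.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical exception standing in for whatever a timeout operator would throw.
class IdleTimeoutException : Exception("No location received in time")

val events = mutableListOf<String>()
var attempts = 0

// Stand-in for a location flow with a timeout applied: the first two
// collections lose the signal and fail, the third one completes normally.
fun locationsWithTimeout(): Flow<String> = flow {
    val attempt = ++attempts
    events += "radio on (attempt $attempt)"
    try {
        emit("fix from attempt $attempt")
        if (attempt < 3) throw IdleTimeoutException()
    } finally {
        events += "radio off (attempt $attempt)" // cleanup runs before any retry
    }
}

fun main() = runBlocking {
    locationsWithTimeout()
        .retryWhen { cause, attempt ->
            // Retry only on timeouts, at most 5 times, after a short pause.
            val shouldRetry = cause is IdleTimeoutException && attempt < 5
            if (shouldRetry) delay(10)
            shouldRetry
        }
        .collect { events += it }
    events.forEach(::println)
}
```

Each retry is a fresh collection of the upstream, so the resource is released on timeout and acquired again on the next attempt. The short delay is where a different trigger (timer, geofence) could be awaited instead.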
if I wrapped the above flow in a withTimeout block with 5 seconds, I would collect 5 seconds of location before the entire flow is cancelled, regardless if the flow was still emitting or not.
Oh, right. Obvious overlook on my part, sorry about that.
In the PR, an exception is being thrown, which would close the entire flow
Just as an FYI, this isn't being done any longer. The flow is still cancelled, so it's done both upstream and downstream (thus awaitClose will be called), but it's following a similar pattern to the other Delay operators. The timeout + retry still works though.
Oh, right. Obvious overlook on my part, sorry about that.
No need for apologies. We're human, not machines... or are you?

Sorry for being late for the party and thanks for the productive discussion!
The overall idea seems worth integrating (especially now when such resources can be shared via shareIn and have to be disconnected somehow) and the idea with a lambda that has FlowCollector receiver is well-aligned with our API shape and goals.
The only thing that concerns me is the naming -- it's easy to confuse with withTimeout and bufferTimeout (the latter is from project Reactor). I do not have better alternatives in mind right now, but this is something I'm going to research and discuss in our design meetings prior to 1.6.0. Stay tuned!
I had a similar concern about the naming when I created the PR for this, but I figured that given this would be an operator on Flow, the context of the function name would make sense. For example, withTimeout can't be used outside of a suspend function, and bufferTimeout can't be used as an operator outside of a Flux.
If you need an argument in your design meetings in favor of timeout(), feel free to use the above. Just providing context on why I went with using that name.
@qwwdfsad any update on this?
Not trying to bug too much on this issue, just a small nudge to get this back in mind for folks, @qwwdfsad.
Sorry folks, we are stretched thin right now and cannot really get back to this issue, yet it's something on our "to do in the observable future" radar.