kotlinx.coroutines
Flow size- and time-based chunked
Add two operators for size- and time-based chunking of Flows. Targets issues: https://github.com/Kotlin/kotlinx.coroutines/issues/1290 https://github.com/Kotlin/kotlinx.coroutines/issues/1302
New operators:
public fun <T> Flow<T>.chunked(maxSize: Int, minSize: Int = 1): Flow<List<T>>
public fun <T> Flow<T>.chunked(chunkDuration: Duration, minSize: Int = 1, maxSize: Int = NO_MAXIMUM): Flow<List<T>>
public fun <T> Flow<T>.chunked(chunkDurationMs: Long, minSize: Int = 1, maxSize: Int = NO_MAXIMUM): Flow<List<T>>
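For orientation, size-based chunking can be sketched roughly as below. This is not the code from this PR: the name chunkedBySize is hypothetical, and minSize is left out because its exact semantics are not spelled out in the description. The sketch assumes that a trailing partial chunk is emitted when the upstream completes.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper for illustration; not the operator proposed in this PR.
fun <T> Flow<T>.chunkedBySize(maxSize: Int): Flow<List<T>> {
    require(maxSize > 0) { "maxSize must be positive, but was $maxSize" }
    return flow {
        var buffer = ArrayList<T>()
        collect { value ->
            buffer.add(value)
            if (buffer.size == maxSize) {
                emit(buffer)
                // Start a fresh list so the emitted chunk is never mutated afterwards.
                buffer = ArrayList()
            }
        }
        // Assumption: flush whatever is left once the upstream completes.
        if (buffer.isNotEmpty()) emit(buffer)
    }
}

fun main() = runBlocking {
    val chunks = (1..7).asFlow().chunkedBySize(3).toList()
    println(chunks) // [[1, 2, 3], [4, 5, 6], [7]]
}
```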
Time-based implementation:
- By default, a chunking window starts with the first emission after the start of collection or after the previous chunk was emitted (non-continuous time windows).
- With minSize = 0, it switches to continuous time windows: chunking starts right away, and each new window starts immediately after the previous one ends.
- A maximum chunk size can be specified. When a chunk reaches that size, it is emitted and the timer is reset.
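The default (non-continuous) behaviour and the max-size reset described above could look roughly like the sketch below. It is an illustration under assumptions, not the implementation in this PR: chunkedByWindow is a hypothetical name, the element type is restricted to non-null values so that null can signal a closed channel, and the continuous mode (minSize = 0) is not covered.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical helper for illustration; not the operator proposed in this PR.
// The opt-in only matters if the library version in use still marks produceIn as preview.
@OptIn(FlowPreview::class)
fun <T : Any> Flow<T>.chunkedByWindow(windowMs: Long, maxSize: Int = Int.MAX_VALUE): Flow<List<T>> {
    require(windowMs > 0 && maxSize > 0) { "windowMs and maxSize must be positive" }
    return flow {
        coroutineScope {
            val upstream = this@chunkedByWindow.produceIn(this)
            var upstreamDone = false
            while (!upstreamDone) {
                // No timer is running here: a window opens only with its first element.
                val first = upstream.receiveCatching().getOrNull() ?: break
                val chunk = mutableListOf(first)
                val timer = launch { delay(windowMs) }
                var windowOpen = true
                while (windowOpen && !upstreamDone && chunk.size < maxSize) {
                    select<Unit> {
                        timer.onJoin { windowOpen = false }
                        upstream.onReceiveCatching { result ->
                            val value = result.getOrNull()
                            if (value != null) chunk.add(value) else upstreamDone = true
                        }
                    }
                }
                // Reaching maxSize or upstream completion ends the window early.
                timer.cancel()
                emit(chunk)
            }
        }
    }
}

fun main() = runBlocking {
    val bySize = (1..5).asFlow().chunkedByWindow(windowMs = 1_000, maxSize = 2).toList()
    println(bySize) // [[1, 2], [3, 4], [5]]
}
```

Using a child job as the timer keeps the window length fixed regardless of how many elements arrive inside it, and nothing is scheduled while the source is idle.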
To be added:
- docs
- Annotations (@Experimental, etc.)
Please, please, read contributing guidelines first: https://github.com/Kotlin/kotlinx.coroutines/blob/master/CONTRIBUTING.md
If you introduce any new public APIs:
- All new APIs must come with documentation and tests.
- All new APIs are initially released with the @ExperimentalCoroutinesApi annotation and are graduated later.
- Update the public API dumps and commit the resulting changes as well. It will not pass the tests otherwise.
- If you plan large API additions, then please start by submitting an issue with the proposed API design to gather community feedback.
- Contact the maintainers to coordinate any big piece of work in advance.
Sure. I have initiated discussion in https://github.com/Kotlin/kotlinx.coroutines/issues/1302#issuecomment-725721317
Commits with the missing docs, annotations, and API dump will follow shortly.
I am converting this PR to Draft until we reach some outcome in the #1302 discussion.
I have:
- Updated the solution
- Added @Experimental annotations
- Updated the API dump
Hi :) Are there any updates? The operator looks very handy and useful.
It looks to me that using the onTimeout select clause in the provided time-based chunked implementation is not efficient. It keeps executing even when the source flow is idle.
For example, if one tries to "chunk" mouse clicks, the code in onTimeout(3.seconds) keeps running every 3 seconds regardless of whether the user actually clicks the mouse or just left the app running overnight.
Also, in the scenario above we probably want to buffer events while the user keeps clicking and emit that buffer only when the user stops. I see two options for how to "chunk" here. We may want to a) emit something right away and cancel later, as collectLatest does (responsive UI), or b) just buffer until there is a duration-long silence. This PR offers neither option.
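To make option b) concrete, here is a rough sketch of "buffer until there is a duration-long silence". It is only an illustration: bufferUntilSilence is a hypothetical name, the element type is restricted to non-null values, and it assumes a kotlinx.coroutines version that provides onReceiveCatching and the experimental onTimeout clause. The timeout is armed only while a burst is in progress, so nothing runs periodically while the source is idle.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.*

// Hypothetical helper for illustration; not part of this PR or of the library.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T : Any> Flow<T>.bufferUntilSilence(silenceMs: Long): Flow<List<T>> = flow {
    coroutineScope {
        val upstream = this@bufferUntilSilence.produceIn(this)
        var upstreamDone = false
        while (!upstreamDone) {
            // Idle phase: plain suspension, no timer at all.
            val first = upstream.receiveCatching().getOrNull() ?: break
            val burst = mutableListOf(first)
            var silent = false
            while (!silent && !upstreamDone) {
                select<Unit> {
                    upstream.onReceiveCatching { result ->
                        val value = result.getOrNull()
                        if (value != null) burst.add(value) else upstreamDone = true
                    }
                    // Each select call starts a new timeout, so every element restarts the silence countdown.
                    onTimeout(silenceMs) { silent = true }
                }
            }
            emit(burst)
        }
    }
}

fun main() = runBlocking {
    val clicks = flow {
        emit("click-1")
        delay(50)
        emit("click-2")
        delay(400) // longer than the silence threshold below
        emit("click-3")
    }
    println(clicks.bufferUntilSilence(silenceMs = 150).toList())
    // [[click-1, click-2], [click-3]]
}
```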