Add time windowed chunking, `groupedWithin`
This operator would be similar to `groupedWithin` in Akka. I think it is a very useful utility to have.
Proposed solution
fun <T> Flow<T>.groupedWithin(size: Int, limit: Duration): Flow<List<T>> { ... }
The implementation can be based on https://github.com/Kotlin/kotlinx.coroutines/issues/1290#issuecomment-1309290613
I have modified that code slightly, and I can help work on a solution based on a channel flow.
Behavior
- Once the flow has collected `size` items, it emits them as one chunk.
- If the flow does not reach `size` items within `limit`, it emits the items collected so far, unless there are none.
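The two rules above can be sketched as follows. This is only an illustration, not the code from the linked comment and not an existing kotlinx.coroutines API. It assumes that the time window starts when the first item of a chunk arrives (Akka's timer semantics may differ) and that any buffered items are flushed when the upstream flow completes. It relies on the experimental `onTimeout` select clause.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.TimeSource
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select

// Sketch of the proposed operator; the name and signature come from the proposal.
@OptIn(ExperimentalCoroutinesApi::class) // onTimeout is an experimental select clause
fun <T> Flow<T>.groupedWithin(size: Int, limit: Duration): Flow<List<T>> {
    require(size > 0) { "size must be positive, was $size" }
    require(limit.isPositive()) { "limit must be positive, was $limit" }
    return channelFlow {
        // Collect the upstream flow in a child coroutine and read it as a channel.
        val upstream = this@groupedWithin.produceIn(this)
        var chunk = ArrayList<T>()
        // Assumption: the window is measured from the first item of the current chunk.
        var windowStart = TimeSource.Monotonic.markNow()
        var upstreamOpen = true

        // Emits the current chunk downstream, unless it is empty.
        suspend fun flush() {
            if (chunk.isNotEmpty()) {
                send(chunk)
                chunk = ArrayList()
            }
        }

        while (upstreamOpen) {
            select<Unit> {
                upstream.onReceiveCatching { result ->
                    if (result.isSuccess) {
                        if (chunk.isEmpty()) windowStart = TimeSource.Monotonic.markNow()
                        chunk.add(result.getOrThrow())
                        if (chunk.size >= size) flush() // rule 1: size reached
                    } else {
                        // Upstream finished: rethrow its failure, or flush what is left.
                        result.exceptionOrNull()?.let { throw it }
                        upstreamOpen = false
                        flush()
                    }
                }
                // Rule 2: only arm the timer while something is buffered.
                if (chunk.isNotEmpty()) {
                    onTimeout(limit - windowStart.elapsedNow()) { flush() }
                }
            }
        }
    }
}

fun main() = runBlocking {
    val chunks = flow {
        repeat(5) { emit(it) }  // burst of five items
        delay(300.milliseconds) // quiet period longer than the limit
        emit(5)
    }.groupedWithin(size = 2, limit = 100.milliseconds).toList()
    println(chunks)
}
```

With these timings the demo should produce four chunks: two full pairs, then `[4]` flushed by the timeout, then `[5]` flushed on completion. `select` is biased toward its first clause, so an item that is already available is taken before the timeout fires.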
Why
This is very useful when bridging the gap between regular APIs and streaming APIs. For example, assume you have an API to fetch SQS messages. Traditionally, you would implement it as
suspend fun main() {
    val sqsClient = getSqsClient()
    while (true) {
        val items = sqsClient.poll(10)
        process(items)
        delay(10.seconds)
    }
}
Instead, we can use
suspend fun main() {
    val sqsClient = getSqsClient()
    flow {
        while (true) {
            val items = sqsClient.poll(10)
            items.forEach { emit(it) }
            delay(10.seconds)
        }
    }.groupedWithin(128, 30.seconds).collect { chunk ->
        // groupedWithin returns a Flow<List<T>>, so each chunk is handled in collect
        process(chunk)
    }
}
Thank you @Dogacel 🙏 for this issue.
Is this operator similar to the following?
- RxJS: `bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>`
- RxJava: https://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html#buffer-long-java.util.concurrent.TimeUnit-io.reactivex.rxjava3.core.Scheduler-int-
`public final @NonNull Observable<List<T>> buffer(long timespan, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, int count)`
I haven't used RxJava, but from the description it looks like the same behavior as the `bufferTime` / `buffer` operators mentioned above.