FlowExt

Add time windowed chunking, `groupedWithin`

Open • Dogacel opened this issue 2 years ago • 2 comments

Akka Streams has `groupedWithin`, and I think a similar operator would be a very useful utility to have here.

Akka Docs

Proposed solution

```kotlin
fun <T> Flow<T>.groupedWithin(size: Int, limit: Duration): Flow<List<T>> { ... }
```

The implementation can be based on https://github.com/Kotlin/kotlinx.coroutines/issues/1290#issuecomment-1309290613

I have modified that code slightly and can help work on a solution based on `channelFlow`.

Behavior

  • Once `size` items have been collected, the operator emits them as one list.
  • If `size` items are not collected within `limit`, it emits the items collected so far, unless there are none.
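A minimal sketch of the behavior above, assuming a recent kotlinx.coroutines (1.7 or later) and Kotlin 1.5+ for the sealed interface. It is not the code from the linked comment and not an existing FlowExt API. The private `Event` type and the choice to restart the timer after a size-triggered emission are assumptions of this sketch; whether Akka or RxJava restart their timers the same way is not verified here. Upstream items, timer ticks, and completion are funneled into one rendezvous channel so that a single loop owns the buffer.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

// Hypothetical internal event type: items, timer ticks and completion share one channel.
private sealed interface Event<out T> {
    class Item<T>(val value: T) : Event<T>
    object Tick : Event<Nothing>
    object Done : Event<Nothing>
}

fun <T> Flow<T>.groupedWithin(size: Int, limit: Duration): Flow<List<T>> {
    require(size > 0) { "size must be > 0, was $size" }
    require(limit.isPositive()) { "limit must be positive, was $limit" }
    val upstream = this
    return flow {
        coroutineScope {
            // Rendezvous channel: upstream suspends while a batch is being emitted downstream.
            val events = Channel<Event<T>>()
            val producer = launch {
                upstream.collect { events.send(Event.Item(it)) }
                events.send(Event.Done)
            }
            // Sends a Tick every `limit` until cancelled.
            fun startTicker(): Job = launch {
                while (true) {
                    delay(limit)
                    events.send(Event.Tick)
                }
            }
            var ticker = startTicker()
            val buffer = ArrayList<T>(size)
            loop@ while (true) {
                when (val event = events.receive()) {
                    is Event.Item -> {
                        buffer.add(event.value)
                        if (buffer.size == size) {
                            emit(buffer.toList())
                            buffer.clear()
                            // Assumption: the time window restarts after a size-triggered emission.
                            ticker.cancel()
                            ticker = startTicker()
                        }
                    }
                    // Time limit reached: emit what we have, but never an empty batch.
                    Event.Tick -> if (buffer.isNotEmpty()) {
                        emit(buffer.toList())
                        buffer.clear()
                    }
                    Event.Done -> break@loop
                }
            }
            ticker.cancel()
            producer.join()
            // Flush the final partial batch when upstream completes.
            if (buffer.isNotEmpty()) emit(buffer.toList())
        }
    }
}

fun main() = runBlocking {
    // All five items arrive long before the first 1-second tick, so only `size` triggers emissions.
    val batches = flowOf(1, 2, 3, 4, 5).groupedWithin(2, 1.seconds).toList()
    println(batches) // [[1, 2], [3, 4], [5]]
}
```

Because the channel has no buffer, a slow downstream collector suspends the upstream instead of accumulating items without bound, and ticks that find the buffer empty are ignored, so empty batches are never emitted.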

Why

This is very useful when bridging the gap between regular APIs and streaming APIs. For example, assume you have an API to fetch SQS messages; traditionally you would implement it as:

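```kotlin
import kotlinx.coroutines.delay
import kotlin.time.Duration.Companion.seconds

// getSqsClient() and process() stand in for your own client and handler.
suspend fun main() {
  val sqsClient = getSqsClient()

  while (true) {
    val items = sqsClient.poll(10) // fetch up to 10 messages per call
    process(items)
    delay(10.seconds)
  }
}
```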

Instead, we can use:

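```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlin.time.Duration.Companion.seconds

// getSqsClient() and process() stand in for your own client and handler;
// groupedWithin is the operator proposed in this issue.
suspend fun main() {
  val sqsClient = getSqsClient()

  flow {
    while (true) {
      val items = sqsClient.poll(10)
      items.forEach { emit(it) } // flatten each poll into individual messages
      delay(10.seconds)
    }
  }
    .groupedWithin(128, 30.seconds) // up to 128 messages, or whatever arrived within 30 s
    .collect { process(it) }
}
```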

Dogacel • Oct 14 '23 13:10

Thanks @Dogacel 🙏 for this issue.

Is this operator similar to

  • RxJS `bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>`
  • RxJava `buffer` (https://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html#buffer-long-java.util.concurrent.TimeUnit-io.reactivex.rxjava3.core.Scheduler-int-): `public final @NonNull Observable<List<T>> buffer(long timespan, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, int count)`

hoc081098 • Oct 15 '23 09:10

I haven't used RxJava, but from the explanation it looks like the same operator as RxJS `bufferTime` / RxJava `buffer` with a `count`.

Dogacel • Oct 15 '23 11:10