kotlinx.coroutines icon indicating copy to clipboard operation
kotlinx.coroutines copied to clipboard

throttleFirst Flow operator

Open sum-elier opened this issue 6 years ago • 22 comments

Is there a Flow equivalent for this Reactive stream operator? I guess it might be derived from the sample operator.

Is it possible to implement this operator using only the publicly exposed api? I tried to modify the sample operator implementation but since some parts of it are internal it isn't possible.

sum-elier avatar Aug 18 '19 22:08 sum-elier

Is there a Flow equivalent for this Reactive stream operator?

No, but we have plans to implement its analogue (actually, the whole family of time-based sampling).

Is it possible to implement this operator using only the publicly exposed api?

Modulo some cancellation details. For ad-hoc solution you can replace scopedFlow with coroutineScope { flow { ... } }

qwwdfsad avatar Aug 20 '19 08:08 qwwdfsad

I see, thanks. What are your plans regarding custom operator implementation? I am still learning Flow API, but this makes me think that not every operator could be implemented because of internal API, is it?

sum-elier avatar Aug 24 '19 00:08 sum-elier

Every operator should be functionally implementable via some public API. Internal API is only needed for highly-efficient (optimized) implementations. In particular, throttleFirst is relative easy to implement. See here quite a straightforward implementation: https://pl.kotl.in/kKgI484X-

elizarov avatar Aug 27 '19 11:08 elizarov

Thanks for the response. Your implementation seems to work alright, but if I were to use it with TestCoroutineScope I wouldn't be able to advanceTime during tests. Is there a way to solve this?

PS : I came with this implementation also for throttleFirst, would it be enough or is it missing something (e.g. coroutineScope?)

@FlowPreview
@ExperimentalCoroutinesApi
fun <T> Flow<T>.throttleFirst(windowDuration: Long): Flow<T> = flow {
   var lastEmissionTime = 0L
   collect { upstream ->
      val currentTime = System.currentTimeMillis()
      val mayEmit = currentTime - lastEmissionTime > windowDuration
      if (mayEmit)
      {
         lastEmissionTime = currentTime
         emit(upstream)
      }
   }
}

sum-elier avatar Aug 28 '19 03:08 sum-elier

@fjna You'll need additional code to support TestCoroutineScope. Instead of using System.currentTimeMillis() you shall use the following snippet:

val time = (coroutineContext[ContinuationInterceptor] as? DelayController)?.currentTime ?: System.currentTimeMillis()

As for your implementation it looks fine apart from the code style (extra new later before {). It has different behavior which might be exactly what you wanted, though.

Btw, that's the key challenge in providing time-related operators out-of-the-box, since there are some many subtly different ways to implement them.

elizarov avatar Aug 28 '19 07:08 elizarov

It seems like it would be appropriate for all the time based operators to use the standard library's new Clock API, since it's already multiplatform and uses the correct time source for time measurement (currentTimeMillis can move backwards). It might even make sense for CoroutineDispatchers in general to be associated with a Clock, which could (I think, if the testing support used the API as well) eliminate the need for the special-casing for test context support:

val clock = (coroutineContext[ContinuationInterceptor] as? CoroutineDispatcher)?.clock ?: MonoClock

This would obviously be a big, ambitious change involving coroutine internals, but for now at least I think the time operators could still use Clock internally and to abstract out the test context support.

zach-klippenstein avatar Aug 28 '19 11:08 zach-klippenstein

@zach-klippenstein Yes. Created a separate issue for it: #1499

elizarov avatar Sep 03 '19 14:09 elizarov

Is there any activity related to this issue going on 😄 ?

pavlospt avatar Mar 28 '20 16:03 pavlospt

Is there any activity related to this issue going on 😄 ?

Not yet.

elizarov avatar Mar 30 '20 08:03 elizarov

@elizarov It looked a little bit difficult to prepare your code snipped for tests, or at least I didn't find any easy way, so I've tried to implement it a little bit differently. Maybe this can be useful for other folks too!

I'm worried that maybe I have overlooked something, though. Do you think this implementation could work? Are there any inherent disadvantages to it? (such as leaking some scope or something similar).

EDIT: One limitation I am aware of, is that when collecting an indefinite flow (such as BroadcastChannel), after first item is emitted, and nobody closes the channel in the meantime, the internal coroutine needs time to get finished. In tests then, one needs to move time of TestCoroutineDispatcher, otherwise unfinished coroutine exception is thrown in test. That is definitely not ideal for use in the Coroutines library itself, but for my project that's an acceptable limitation.

fun <T> Flow<T>.throttleFirst(windowDuration: Long): Flow<T> {
    var job: Job = Job().apply { complete() }

    return onCompletion { job.cancel() }.run {
        flow {
            coroutineScope {
                collect { value ->
                    if (!job.isActive) {
                        emit(value)
                        job = launch { delay(windowDuration) }
                    }
                }
            }
        }
    }
}

and I have following tests for it:

    @Test
    fun throttleFirst() = runBlockingTest {
        val testDispatcher = TestCoroutineDispatcher()
        val values = mutableListOf<Int>()
        val flow = (1..10).asFlow().onEach { delay(200) }
        flow
            .throttleFirst(500)
            .flowOn(testDispatcher)
            .onEach { values.add(it) }
            .launchIn(this)

        testDispatcher.advanceTimeBy(2000)

        assertEquals(listOf(1, 4, 7, 10), values)
    }

    @Test
    fun throttleFirstWithError() = runBlockingTest {
        val testDispatcher = TestCoroutineDispatcher()
        val values = mutableListOf<Int>()
        val flow = (1..10).asFlow()
            .onEach { delay(200) }
            .onEach { if (it == 2) throw IllegalStateException() }
        flow
            .throttleFirst(500)
            .flowOn(testDispatcher)
            .onEach { values.add(it) }
            .catch {  }
            .launchIn(this)

        testDispatcher.advanceTimeBy(400)

        assertEquals(listOf(1), values)
    }

lukas1 avatar May 07 '20 13:05 lukas1

please check my lib https://github.com/hoc081098/FlowExt

  • https://github.com/hoc081098/FlowExt
  • https://hoc081098.github.io/FlowExt/docs/latest/-flow-ext/com.hoc081098.flowext/throttle-time.html

hoc081098 avatar Mar 24 '22 18:03 hoc081098

To whoever still looking for the Flow equivalent of throttleLatest, here is how to implement them by composing existing operators:

fun <T> Flow<T>.throttleLatest(delayMillis: Long): Flow<T> = this
    .conflate()
    .transform {
        emit(it)
        delay(delayMillis)
    }

Many thanks to the person that gave me this solution on the Kotlin Slack.

thibseisel avatar Jul 28 '22 12:07 thibseisel

Also to whoever is looking for this, I've made a simple helper method called throttle for creating an intermediate flow that essentially does the same as @thibseisel's solution.

fun <T> Flow<T>.throttle(periodMillis: Long): Flow<T> {
    if (periodMillis < 0) return this
    return flow {
        conflate().collect { value ->
            emit(value)
            delay(periodMillis)
        }
    }
}

Can be used as follows:

flow {
    for (num in 1..15) {
        emit(num)
        delay(25)
    }
}.throttle(100)
 .onEach { println(it) }
 .collect()
 // This will print 1, 5, 9, 13, 15

Not sure what difference it would make compared to the above solution, but meh.

flamewave000 avatar Dec 14 '22 21:12 flamewave000

Every operator should be functionally implementable via some public API. Internal API is only needed for highly-efficient (optimized) implementations. In particular, throttleFirst is relative easy to implement. See here quite a straightforward implementation: https://pl.kotl.in/kKgI484X-

Why windowStartTime += delta / windowDuration * windowDuration but not just windowStartTime += delta?

PenzK avatar Apr 19 '23 08:04 PenzK

This way, the moments when the emissions are allowed are [startTime, startTime + windowDuration, startTime + windowDuration * 2, startTime + windowDuration * 3, ...]. If it was just windowStartTime += delta, the emission times would be [startTime, timeOfFirstEmission + windowDuration, timeOfSecondEmission + windowDuration, ...].

dkhalanskyjb avatar Apr 19 '23 10:04 dkhalanskyjb

Opened this topic in 2023, because there is still no throttleLatest in the standard kotlinx.coroutines lib (1.6.4).

@thibseisel thank you for your concise implementation.

@elizarov please consider to change your attitude, Kotlin was advertised as a concise language, I guess many developers came to Kotlin because they didn't like a Java's verbosity. Also many of them look at Coroutines/Flow as a replacement of RxJava (it doesn't mean it's better, but advertisement is definitely higher). So it's really surprising when you have to write such basic operators every time like if you find yourself in 2010. By the way it's really convenient how Kotlin team implemented hints about other rx operators. Thank you for this.

p.s. For anyone who is not sure about the solution, I wrote a quick test (one more time consumer, because we wouldn't spend time for testing of std kotlinx.coroutines lib): https://gist.github.com/Andrew0000/6d4477642cfadc7822742bd7986e79f4

Andrew0000 avatar Jun 02 '23 08:06 Andrew0000