akka-stream-contrib

Add Debouncing

Open joearasin opened this issue 8 years ago • 20 comments

A useful bit of functionality I threw together: given a time window and a uniqueness (key) function, emit the latest value for each key.

It might make sense to generalize this as a "time-aware reduce" rather than a pure "debounce".

See #60

joearasin, Sep 07 '16 23:09

Isn't debouncing about ensuring that a single signal is propagated per time unit (to remove bouncing, the tendency of mechanical switches to emit multiple signals when opened or closed)?

viktorklang, Sep 08 '16 08:09

I always understood debounce as:

a a a x x y y y z z z a a -> [ debounce ] -> a x y z a

I'm not sure time is part of the definition, though. The above could well be debounce(within = Duration.Inf).

ktoso, Sep 08 '16 08:09
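As a minimal sketch of the order-only behaviour in the example above, here it is over a plain Scala `List`, with no timing involved. `distinctConsecutive` is used as a hypothetical helper name (it is the name proposed later in this thread), and the `same` predicate defaulting to `==` is an assumption for illustration.

```scala
// Collection-level model only; this is not an Akka Streams operator.
// An element is dropped when `same(lastEmitted, element)` holds; otherwise it is emitted.
def distinctConsecutive[T](xs: List[T])(same: (T, T) => Boolean = (a: T, b: T) => a == b): List[T] =
  xs.foldLeft(List.empty[T]) {
    case (last :: rest, x) if same(last, x) => last :: rest // repeat of the last emitted element: drop
    case (acc, x)                           => x :: acc     // first element, or a new value: emit
  }.reverse

println(distinctConsecutive("a a a x x y y y z z z a a".split(' ').toList)()) // prints List(a, x, y, z, a)
```

Note that nothing here depends on time: the trailing `a a` is emitted again because it differs from the previously emitted `z`, however soon it arrives.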

@ktoso http://whatis.techtarget.com/definition/debouncing

What you talk about is: distinctConsecutive :-)

viktorklang, Sep 08 '16 08:09

distinctConsecutive is not time-related, only order-related; debounce is time-related.

viktorklang, Sep 08 '16 08:09

Right, I see. That's a better name for what I showed anyway. (I'd like to have it too ;-))

ktoso, Sep 08 '16 08:09

Scrubbing was the word we used for this thing when filtering market data streams.

patriknw, Sep 08 '16 08:09

Scrubbing to me means removing sensitive data :)

viktorklang, Sep 08 '16 08:09

    import akka.NotUsed
    import akka.stream.scaladsl.Flow

    /**
     * Emits elements which are passed in as long as they are not considered to be the same as the previous element emitted, according to the predicate function `p`.
     * `p` returns *true* if the parameters are considered to be the same and *false* otherwise.
     */
    def distinctConsecutive[T](p: (T, T) => Boolean = _ equals _): Flow[T, T, NotUsed] = ???

viktorklang, Sep 08 '16 09:09
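One way the `???` above could be filled in, sketched under the assumption that Akka Streams' `statefulMapConcat` is available: keep the last emitted element as per-materialization state and drop any element that `p` considers the same. This is an illustration of the proposal, not an existing Akka operator.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

// Illustrative implementation of the proposed signature; not part of Akka.
def distinctConsecutive[T](p: (T, T) => Boolean = (a: T, b: T) => a equals b): Flow[T, T, NotUsed] =
  Flow[T].statefulMapConcat { () =>
    // State is created per materialization: the previously emitted element, if any.
    var lastEmitted: Option[T] = None
    elem =>
      if (lastEmitted.exists(prev => p(prev, elem))) Nil // considered the same: drop it
      else { lastEmitted = Some(elem); elem :: Nil }     // first or different: emit and remember
  }
```

Because the state lives inside the `() => ...` factory, every materialization of the flow starts with a fresh `lastEmitted`.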

I have no clue what to call this. My use case: I needed something to put at the end of DirectoryChanges. Imagine large files being written a chunk at a time. They emit a bunch of change events, but what I want to know is when each file is done being written. The events are not necessarily grouped in order if a couple of processes are writing other files (e.g. I may get a stream of "File 1 changed" "File 2 changed" "File 1 changed" "File 1 changed" "File 2 changed"). What I want to see on the other end is "File 1 changed, File 2 changed", once each file hasn't changed for a specified amount of time.

joearasin, Sep 08 '16 14:09
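To pin down the semantics of the file-change use case, here is a batch model over a recorded, timestamped log, with no Akka involved. `Event`, `settled`, and millisecond timestamps are assumptions for illustration. The rule modelled: a file is reported `quietMillis` after the last change of a burst, and a burst ends once the next change to that file is at least `quietMillis` later (or never comes).

```scala
// Batch model of a keyed, trailing debounce; a live stream would need timers instead.
final case class Event(atMillis: Long, file: String)

/** Reports each file once per burst of changes, `quietMillis` after the burst's last change. */
def settled(events: Seq[Event], quietMillis: Long): Seq[(Long, String)] =
  events
    .groupBy(_.file)
    .toSeq
    .flatMap { case (file, evs) =>
      val times = evs.map(_.atMillis).sorted
      val nextTimes = times.drop(1).map(Option(_)) :+ None // next change to the same file, if any
      times.zip(nextTimes).collect {
        // The burst ends here if the next change is at least `quietMillis` later, or never arrives.
        case (t, next) if next.forall(_ - t >= quietMillis) => (t + quietMillis, file)
      }
    }
    .sortBy(_._1) // order reports by the time they would be emitted

// The interleaved sequence from the comment above, plus a later change to File 1.
val log = Seq(
  Event(0, "File 1"), Event(10, "File 2"), Event(20, "File 1"),
  Event(30, "File 1"), Event(40, "File 2"), Event(500, "File 1"))
println(settled(log, quietMillis = 100))
```

With a 100 ms quiet period, File 1 is reported at 130 ms, File 2 at 140 ms, and File 1 again at 600 ms, because its change at 500 ms starts a new burst.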

Anyway, it's a keyed reduce that requires elements to be sufficiently stale before sending them downstream, with the reduction function hardcoded to "take the last one". It would absolutely make sense to turn that last bit into a parameter.

joearasin, Sep 08 '16 14:09
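A sketch of that keyed, time-aware reduce as a custom stage, assuming the Akka Streams `GraphStage` / `TimerGraphStageLogic` API. `KeyedDebounce` is a hypothetical name, not an Akka or akka-stream-contrib operator. The reduction is a parameter defaulting to "take the last one". Keys live in a map with one timer each, so no substream limit applies. The stage keeps pulling from upstream, so settled values wait in `emit` if downstream is slow; a production version would want to bound that.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic}
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

/** Hypothetical stage: folds elements per key and emits a key's result once it has been quiet for `quiet`. */
final class KeyedDebounce[T, K](
    key: T => K,
    quiet: FiniteDuration,
    reduce: (T, T) => T = (_: T, latest: T) => latest)
    extends GraphStage[FlowShape[T, T]] {

  val in: Inlet[T] = Inlet("KeyedDebounce.in")
  val out: Outlet[T] = Outlet("KeyedDebounce.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler {
      // Reduced value per key that is still waiting for its quiet period to elapse.
      private val pending = mutable.LinkedHashMap.empty[K, T]

      override def preStart(): Unit = pull(in)

      override def onPush(): Unit = {
        val elem = grab(in)
        val k = key(elem)
        pending.update(k, pending.get(k).fold(elem)(acc => reduce(acc, elem)))
        // Scheduling with an existing timer key cancels the earlier timer, restarting the countdown.
        scheduleOnce(k, quiet)
        pull(in)
      }

      // The key has been quiet for `quiet`: emit its reduced value (emit waits for demand if needed).
      override protected def onTimer(timerKey: Any): Unit =
        pending.remove(timerKey.asInstanceOf[K]).foreach(v => emit(out, v))

      // Nothing to do here: values are pushed via `emit` when they settle.
      override def onPull(): Unit = ()

      override def onUpstreamFinish(): Unit = {
        // Flush values that have not settled yet; complete(out) waits for queued emits.
        pending.values.foreach(v => emit(out, v))
        pending.clear()
        complete(out)
      }

      setHandlers(in, out, this)
    }
}
```

Flushing on upstream completion is a design choice; dropping unsettled values would be the other option.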

@joearasin Hmmm, isn't your desired behaviour possible to implement with the existing combinators?

viktorklang, Sep 09 '16 08:09

> What you talk about is: distinctConsecutive :-)

a.k.a. destutter 🙊

aruediger, Sep 09 '16 09:09

Let's not rehash that conversation, please. :)

Cheers, √

viktorklang, Sep 09 '16 09:09

> Isn't debouncing about ensuring that a single signal is propagated per time unit (to remove bouncing, the tendency of mechanical switches to emit multiple signals when opened or closed)?

That would be throttling. Debounce only emits when a value hasn't changed in the specified time window.

aruediger, Sep 09 '16 09:09
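To make the throttle/debounce distinction concrete, here is a small model over event timestamps in milliseconds. `throttleLeading` and `debounceTrailing` are hypothetical names, not library functions, and they model the leading-edge throttle and the deferred (trailing) debounce variants discussed here. Throttling passes at most one event per window while events keep coming; trailing debounce stays silent until the input has been quiet for a whole window.

```scala
/** Throttle (leading edge): pass an event only if at least `window` ms have elapsed since the last one passed. */
def throttleLeading(times: Seq[Long], window: Long): Seq[Long] =
  times.foldLeft(Vector.empty[Long]) { (kept, t) =>
    if (kept.lastOption.forall(t - _ >= window)) kept :+ t else kept
  }

/** Debounce (trailing): emit `window` ms after an event, but only if no other event arrived in between. */
def debounceTrailing(times: Seq[Long], window: Long): Seq[Long] =
  times.zip(times.drop(1).map(Option(_)) :+ None).collect {
    case (t, next) if next.forall(_ - t >= window) => t + window
  }

val steady = Seq(0L, 40L, 80L, 120L, 160L) // an event every 40 ms
val bouncy = Seq(0L, 5L, 12L, 20L, 300L, 305L) // two bursts, like a bouncing switch

println(throttleLeading(steady, 50))  // events at 0, 80 and 160 pass
println(debounceTrailing(steady, 50)) // a single emission at 210, after the input goes quiet
```

On `bouncy`, the throttle passes the events at 0 and 300, while the debounce emits at 70 and 355, once per burst and only after each burst has ended.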

@2beaucoup No? http://www.labbookpages.co.uk/electronics/debounce.html

viktorklang, Sep 09 '16 09:09

I was referring to the deferred version of debounce. http://drupalmotion.com/article/debounce-and-throttle-visual-explanation has a nice overview of debounce/throttle and their variations.

aruediger, Sep 09 '16 09:09

@viktorklang I tried, but I couldn't get the results I wanted. If you have any insight, please let me know.

There are other approaches that might make the desired functionality possible, e.g. a partition/groupBy that completes an idle substream once nothing has been sent to it for a period of time. Customizable overflow behavior on groupBy would also be useful. For example, rather than failing hard when the maximum number of substreams is reached, groupBy could send all excess elements to an "overflow" stream that backpressures when full. That stream could then be buffered and cycled back around to the input, which would be a mergePreferred between the overflow and "new" elements.

What does groupBy do if a downstream partition is completed with "takeWithin" or failed with "idleTimeout"? Does a new "slot" open up, or will I eventually run out of substreams?

Would something like groupBy => idleTimeout => recover => reduce => merge work?

joearasin, Sep 09 '16 14:09

I think these concepts should use proper Greek letters instead of names. I mean, what is wrong with Ξ?

drewhk, Sep 09 '16 14:09

Hmm, after reading the docs, I think letting groupBy either backpressure, buffer-then-backpressure, or send excess elements elsewhere is probably the most "reusable" way to get the desired effect.

joearasin, Sep 09 '16 19:09

I tried to recreate this (or something like it) with groupBy:

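    import akka.NotUsed
    import akka.stream.scaladsl.{Flow, Source}
    import scala.concurrent.duration.FiniteDuration

    // `debounceByKey` is an illustrative wrapper; T, keyFunction and duration come from the snippet.
    def debounceByKey[T, K](keyFunction: T => K, duration: FiniteDuration): Flow[T, T, NotUsed] =
      Flow[T]
        .groupBy(1000, keyFunction)   // one substream per key, at most 1000 open at once
        .idleTimeout(duration)        // fail a substream once its key has been quiet for `duration`
        .recoverWithRetries(-1, { case e: scala.concurrent.TimeoutException => Source.empty[T] }) // turn that into completion
        .reduce((prev, curr) => curr) // on completion, emit only the latest element
        .mergeSubstreams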

This approach, at the very least, does not work, because groupBy won't recreate a substream for a key once that key's substream has been closed.

So a test like https://github.com/akka/akka-stream-contrib/pull/60/files#diff-fbc9034f76805e1d77caa627418fbdfaR76 will fail.

joearasin, Sep 09 '16 21:09
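For later readers: Akka releases after this thread added a `groupBy(maxSubstreams, f, allowClosedSubstreamRecreation)` overload. Assuming a version that has it (check the API docs of the version in use), the groupBy pipeline can let a key that went quiet open a fresh substream when it reappears. `debounceByKeyRecreating` is a hypothetical name. As before, more than 1000 simultaneously active keys would still fail the stream.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.TimeoutException
import scala.concurrent.duration.FiniteDuration

// Assumes an Akka version whose groupBy accepts `allowClosedSubstreamRecreation`.
def debounceByKeyRecreating[T, K](keyFunction: T => K, duration: FiniteDuration): Flow[T, T, NotUsed] =
  Flow[T]
    // `true`: a key whose substream has completed may open a new substream later.
    .groupBy(1000, keyFunction, allowClosedSubstreamRecreation = true)
    .idleTimeout(duration) // fails the substream with a TimeoutException subclass once its key is quiet
    .recoverWithRetries(-1, { case _: TimeoutException => Source.empty[T] }) // turn that failure into completion
    .reduce((_, latest) => latest) // on completion, emit only the latest element for that key
    .mergeSubstreams
```

Compared with a dedicated stage, this reuses existing operators but pays for one substream (and one idle timer) per active key.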