akka-stream-contrib
Add Debouncing
A useful bit of functionality I threw together: Given a time window, emit the latest value given a uniqueness function.
It might make sense to generalize this as a "time-aware reduce" rather than a pure "debounce".
See #60
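For reference, roughly the shape I have in mind (an illustrative signature only, not an existing API):

import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.stream.scaladsl.Flow

/**
 * Proposed: for every key produced by `keyOf`, emit only the latest element seen
 * once no further element with that key has arrived within `window`.
 */
def debounce[T, K](window: FiniteDuration)(keyOf: T => K): Flow[T, T, NotUsed] = ???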
Isn't debouncing ensuring that a single signal is propagated per timeunit? (to remove bouncing, the tendency of mechanical switches to emit multiple signals when opened or closed?)
debounce I always understood as:
a a a x x y y y z z z a a -> [ debounce ] -> a x y z a
Not sure if time by definition is part of it. The above could well be debounce(within = Duration.Inf)
@ktoso http://whatis.techtarget.com/definition/debouncing
What you talk about is: distinctConsecutive :-)
(distinctConsecutive is not time-related, only order-related; debounce is time-related.)
Right, I see. Better name for what I showed, anyway. (Would like to have it too ;-))
Scrubbing was the word we used for this thing when filtering market data streams.
Scrubbing to me means removing sensitive data :)
/**
* Emits elements which are passed in as long as they are not considered to be the same as the previous element emitted, according to the predicate function `p`.
* `p` returns *true* if the parameters are considered to be the same and *false* otherwise.
*/
def distinctConsecutive[T](p: (T, T) => Boolean = _ equals _) = ???
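For what it's worth, one possible sketch of that using only existing combinators (statefulMapConcat remembers the previously emitted element and drops the current one when `p` considers them the same):

import akka.NotUsed
import akka.stream.scaladsl.Flow

def distinctConsecutive[T](p: (T, T) => Boolean = _ equals _): Flow[T, T, NotUsed] =
  Flow[T].statefulMapConcat { () =>
    var last: Option[T] = None
    (elem: T) =>
      if (last.exists(p(_, elem))) Nil // considered the same as the previous element: drop it
      else {
        last = Some(elem)              // remember and emit the new element
        elem :: Nil
      }
  }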
I have no clue what to call this. My use case was that I needed something to stick on the end of DirectoryChanges. Imagine large files being written a chunk at a time: they emit a bunch of change events, but what I want to know is when a file is done being written. The events won't necessarily arrive grouped in order if a couple of processes are writing different files (i.e. I may get a stream of "File 1 changed", "File 2 changed", "File 1 changed", "File 1 changed", "File 2 changed"). What I want to see on the other end is "File 1 changed", "File 2 changed", once each file hasn't changed for a specified amount of time.
Anyway, it's a keyed reduce that requires things to be sufficiently stale before sending them downstream and a reduction function hardcoded to be "take the last one". It would absolutely make sense to generalize that last bit to a parameter.
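As a shape, something like this (again just an illustrative signature, nothing that exists today):

import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.stream.scaladsl.Flow

/**
 * Per key, combine elements with `combine` and emit the accumulated value once the key
 * has been quiet for `window`. "Take the last one" is then just combine = (_, latest) => latest.
 */
def reduceWhileQuiet[T, K](window: FiniteDuration)(keyOf: T => K)(combine: (T, T) => T): Flow[T, T, NotUsed] = ???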
@joearasin Hmmm, isn't your desired behaviour possible to implement with the existing combinators?
What you talk about is: distinctConsecutive :-)
a.k.a destutter :speak_no_evil:
Let's not rehash that conversation, please. :)
Cheers, √
Isn't debouncing ensuring that a single signal is propagated per timeunit? (to remove bouncing, the tendency of mechanical switches to emit multiple signals when opened or closed?)
That would be throttling. Debounce only emits when a value hasn't changed in the specified time window.
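To illustrate the difference with the existing throttle operator (the debounce half is only described in a comment, since no such operator exists yet):

import scala.concurrent.duration._
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Source

// Throttling: at most one element per second passes through, even if the value never changes.
val throttled = Source.tick(100.millis, 100.millis, "x")
  .throttle(1, 1.second, 1, ThrottleMode.Shaping)

// Debouncing (as described above): an element would only be emitted after its value has been
// quiet for the whole window, so a value arriving every 100 ms would never be emitted at all.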
@2beaucoup No? http://www.labbookpages.co.uk/electronics/debounce.html
I was referring to the deferred version of debounce. http://drupalmotion.com/article/debounce-and-throttle-visual-explanation has a nice overview of debounce/throttle and their variations.
@viktorklang I tried -- I couldn't get the results I wanted. If you have any insight, please let me know.
There are other approaches that might make the desired functionality possible, e.g. a partition/groupBy that completes an idle substream after nothing has been sent downstream for a period of time. Customizable overflow behavior on groupBy would also be useful: rather than failing hard when at max substreams, excess elements could go to an "overflow" stream that backpressures when full; that overflow could then be buffered and cycled back around to the input via a mergePreferred between the overflow and "new" elements.
What does groupBy do if a downstream partition is completed with "takeWithin" or failed with "idleTimeout"? Does a new "slot" open up, or will I eventually run out of subStreams?
Would something like groupBy => idleTimeout => recover => reduce => merge work?
I think these concepts should use proper Greek letters instead of names. I mean, what is wrong with Ξ.
Hmm, after reading the docs, I think having groupBy either backpressure, buffer-then-backpressure, or send excess elements elsewhere is probably the most "reusable" way to get the desired effect.
I tried to recreate this (or something like it) with groupBy:
Flow[T]
  .groupBy(1000, keyFunction)   // one substream per key
  .idleTimeout(duration)        // fail a substream once it has been quiet for `duration`
  // turn the idle-timeout failure into normal completion of the substream
  .recoverWithRetries(-1, { case e: scala.concurrent.TimeoutException => Source.empty[T] })
  .reduce((prev, curr) => curr) // keep only the last element seen in the substream
  .mergeSubstreams
This approach, at the very least, does not work b/c groupBy won't recreate a Subflow after the downstream has been closed.
So a test that looks like: https://github.com/akka/akka-stream-contrib/pull/60/files#diff-fbc9034f76805e1d77caa627418fbdfaR76 will fail.