streamz
Reset accumulate after something happens
One could imagine using accumulate to sum things together (or do something else) until some threshold is met, and then starting over (resetting the result to the starting value). What would be the best way to do this?
Maybe we could have an accumulate that takes two streams: the first carries the data and the second exists just to send a reset message (or even a new start value?). This way we could reset based on some property of the data, e.g. reaching some threshold (via a cyclic graph), or based on something completely different happening somewhere else in the graph.
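A minimal plain-Python sketch of the logic such a two-input node might have, independent of streamz. The class `ResettableAccumulator` and its `on_data`/`on_reset` methods are hypothetical names, not part of the streamz API, and wiring the two inputs into a real streamz graph is not shown.

```python
import operator


class ResettableAccumulator:
    """Hypothetical two-input accumulator: one input folds data, the other resets."""

    def __init__(self, func, start):
        self.func = func
        self.start = start
        self.state = start

    def on_data(self, x):
        # data input: fold x into the running state and emit the new state
        self.state = self.func(self.state, x)
        return self.state

    def on_reset(self, new_start=None):
        # reset input: restore the start value, optionally replacing it first
        if new_start is not None:
            self.start = new_start
        self.state = self.start


summer = ResettableAccumulator(operator.add, 0)
out = [summer.on_data(x) for x in [1, 2, 3]]
summer.on_reset()  # e.g. triggered by a threshold check elsewhere in the graph
out += [summer.on_data(x) for x in [4, 5]]
print(out)  # [1, 3, 6, 4, 9]
summer.on_reset(new_start=100)  # reset with a new start value
print(summer.on_data(1))  # 101
```

Keeping the reset on its own input means the accumulation function never has to know about resets; the cost is having a second stream to synchronize with the data stream.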
Perhaps you can put this logic into your accumulator function?
Funny, I was working on this yesterday. What @mrocklin mentioned is what I'm doing. Basically, have a stop condition and an initial condition in the accumulator:
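```python
def acc(prevstate, nextstate):
    # initial condition
    if prevstate is None:
        return nextstate
    # stop condition (reset to initial condition)
    if nextstate is None:
        # prevstate will get set to None, so the next call hits the initial condition
        return None
    # rest of acc logic (a running sum here, as an example)
    return prevstate + nextstate
```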
(This assumes `returns_state=False` here.)
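To see the reset in action without streamz, this sketch replays the accumulator over a plain list, using Python's `itertools.accumulate` as a stand-in for the stream node. The assumption is that, like streamz's accumulate with no start value, it emits the first element unchanged and then calls `acc(state, x)`. The running-sum body is illustrative, and a `None` in the data acts as the reset signal.

```python
from itertools import accumulate


def acc(prevstate, nextstate):
    # initial condition: after a reset (state is None), start from the new value
    if prevstate is None:
        return nextstate
    # stop condition: a None in the data resets the state to None
    if nextstate is None:
        return None
    # illustrative accumulation logic: a running sum
    return prevstate + nextstate


data = [1, 2, 3, None, 4, 5]
result = list(accumulate(data, acc))
print(result)  # [1, 3, 6, None, 4, 9]
```

Note that the reset itself emits a `None` downstream, so consumers of this stream have to tolerate it.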
If you want to be cleaner, `prev` could be a tuple `(mydata, state)`, where `state` indicates whether or not to clear. This keeps what flows downstream more homogeneous.
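Combining the tuple idea with the threshold use case from the original question, a sketch might carry `(total, reset_flag)` as the state. `THRESHOLD` and the flag layout are hypothetical choices, and `itertools.accumulate` again stands in for the stream node. Every emitted value is a tuple, so downstream never sees a bare `None`.

```python
from itertools import accumulate

THRESHOLD = 10  # hypothetical reset threshold


def acc(prev, x):
    """Running sum that flags, then resets itself, once THRESHOLD is reached."""
    total, reset = prev
    if reset:
        # the previous element crossed the threshold: start over from zero
        total = 0
    total += x
    # emit (total, flag); the flag tells downstream and the next call a reset is due
    return (total, total >= THRESHOLD)


data = [3, 4, 5, 2, 9, 1]
# initial=(0, False) seeds the state; [1:] drops the seed from the output
result = list(accumulate(data, acc, initial=(0, False)))[1:]
print(result)
# [(3, False), (7, False), (12, True), (2, False), (11, True), (1, False)]
```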
It's messy, but I feel that enforcing a global clearing mechanism might be too restrictive, considering the stream can already do this. I'd definitely like to see a use case where it's necessary, though; I haven't used this feature much.
Just thoughts; what do you think? I'm interested in exploring more concise methods. As for a stream that resets another, I would vote against it, since we'd have to worry about synchronizing yet another stream when we may not always need to.
EDIT: Changed `next` to `nextstate` just to avoid any confusion. Thanks @CJ-Wright
Just as a heads up, I would not use `next` as an argument name, since it shadows a Python builtin.
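A contrived illustration of why the shadowing matters: inside a function whose parameter is named `next`, the builtin `next()` is no longer reachable. The function names here are hypothetical.

```python
def acc_shadowed(prevstate, next):
    # inside this function `next` is the argument, not the builtin,
    # so calling next(...) on an iterator fails
    return next(iter([prevstate]))


def acc_renamed(prevstate, nextstate):
    # with the argument renamed, the builtin next() is still available
    return next(iter([prevstate])) + nextstate


try:
    acc_shadowed(1, 2)
    shadowed_error = None
except TypeError as err:
    shadowed_error = str(err)

print(shadowed_error)  # 'int' object is not callable
print(acc_renamed(1, 2))  # 3
```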