
Don't maintain state for some objects

Open ashaffer opened this issue 4 years ago • 0 comments

Hey, thanks for this library; it's great. I'm building an ML application that learns online: it constantly acquires new feature rows while also serving predictions. I want to use Streamz to accumulate data. For my real feature rows, I want the accumulators to maintain state (say, a rolling mean), but for my predictions, I don't want that data accumulated into the roller's state.

As an example, let's say I'm recording a temperature measurement once every 15 seconds. Every 15 seconds I want to generate a new feature vector that contains the rolling mean of the last 25 temperatures. However, within that 15-second window I may be asked to serve a prediction. For that prediction I want to include the current temperature in the rolling mean, but I don't want the current temperature accumulated into the roller's state.
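To pin down the semantics being asked for, here is a minimal plain-Python sketch, independent of Streamz, of a rolling window with two operations: `update` commits a real measurement, while `peek` computes the mean as if a value had been added, leaving the window untouched. The class `RollingMean` and its methods are hypothetical names for illustration, not part of the Streamz API.

```python
from collections import deque


class RollingMean:
    """Hypothetical helper (not Streamz API): rolling mean over the last `size` values.

    update(x): commit x to the window (mutates state).
    peek(x):   mean as if x were added, without mutating state.
    """

    def __init__(self, size=25):
        # deque with maxlen drops the oldest value once the window is full
        self.window = deque(maxlen=size)

    def update(self, x):
        # Real 15-second measurement: becomes part of the rolling state
        self.window.append(x)
        return sum(self.window) / len(self.window)

    def peek(self, x):
        # Prediction-time query: work on a temporary copy of the window
        trial = deque(self.window, maxlen=self.window.maxlen)
        trial.append(x)
        return sum(trial) / len(trial)


roller = RollingMean(size=3)
for temp in [20.0, 21.0, 22.0]:
    roller.update(temp)

# Uses 21.0, 22.0 and the current reading 25.0; the window still holds 20, 21, 22
prediction_feature = roller.peek(25.0)
```

Copying the window in `peek` costs O(size) per prediction, which is negligible for a 25-element window and keeps the committed state trivially safe.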

Is there a straightforward way to do this currently?

EDIT: For now I'm doing this:

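    import copy


    def saveStreamState(stream):
        # Snapshot this node's accumulator state; deepcopy so that later
        # in-place updates cannot change the saved value
        state = None
        if hasattr(stream, 'state'):
            state = copy.deepcopy(stream.state)

        return {
            'state': state,
            'children': [saveStreamState(s) for s in stream.downstreams]
        }


    def restoreStreamState(stream, stateTree):
        # Restore only nodes that carry state (e.g. accumulate); copy again
        # so the same stateTree can be restored more than once
        if hasattr(stream, 'state'):
            stream.state = copy.deepcopy(stateTree['state'])
        for subStream, subTree in zip(stream.downstreams, stateTree['children']):
            restoreStreamState(subStream, subTree)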

This seems to work, but it would be nice to have a native way to do it.
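Short of native support, the save/restore pair can be wrapped in a context manager so that a prediction is pushed through the pipeline and the state is rolled back automatically, even if an exception is raised. This is a sketch under assumptions: as in the workaround above, each node is assumed to expose `state` and `downstreams`; `frozen_state` and `FakeAccumulator` are hypothetical names, and `FakeAccumulator` is only a stand-in for a real Streamz node so the example runs on its own.

```python
import copy
from contextlib import contextmanager


def _snapshot(node):
    # Deep-copy `state` where present, then recurse into downstream nodes
    return {
        'state': copy.deepcopy(node.state) if hasattr(node, 'state') else None,
        'children': [_snapshot(child) for child in node.downstreams],
    }


def _restore(node, tree):
    if hasattr(node, 'state'):
        node.state = copy.deepcopy(tree['state'])
    for child, subtree in zip(node.downstreams, tree['children']):
        _restore(child, subtree)


@contextmanager
def frozen_state(stream):
    """Hypothetical helper: roll back every `state` below `stream` on exit."""
    tree = _snapshot(stream)
    try:
        yield stream
    finally:
        _restore(stream, tree)


class FakeAccumulator:
    # Hypothetical stand-in for a Streamz node: only `state` and `downstreams`
    def __init__(self, downstreams=()):
        self.state = []
        self.downstreams = list(downstreams)

    def emit(self, x):
        self.state.append(x)
        for child in self.downstreams:
            child.emit(x)


leaf = FakeAccumulator()
root = FakeAccumulator([leaf])
root.emit(1)           # real measurement: kept in state

with frozen_state(root):
    root.emit(99)      # prediction-time value: rolled back on exit
```

Only attributes named `state` are rolled back; nodes that keep their history under another attribute name (window-style nodes may do this) and side effects such as sinks are not undone by this sketch.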

ashaffer · Apr 21 '20 08:04