
ENH: add interval to window/rolling

Open • martindurant opened this issue 5 years ago • 6 comments

Currently we keep a buffer of fixed length and emit on every new item. It might be useful to emit only every Nth item, as Spark Streaming does with its slide interval: https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#window-operations

This can already be achieved with a normal window aggregate followed by the slice operator from #241, but doing it inside the window would save unnecessary computations.
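To illustrate the requested behaviour outside of streamz, here is a minimal pure-Python sketch. The name `windowed` and its parameters (`length` for the window size, `interval` for how many arrivals pass between emissions) are hypothetical and not part of the streamz API. The point of the design is that `func` runs only for windows that are actually emitted, whereas a window-then-slice chain computes an aggregate for every window and then discards most of them.

```python
from collections import deque


def windowed(items, length, interval, func=tuple):
    """Hypothetical helper (not a streamz API): keep the last `length` items and
    apply `func` only on every `interval`-th arrival once the buffer is full,
    roughly like a window length plus a slide interval."""
    buf = deque(maxlen=length)
    for count, x in enumerate(items, start=1):
        buf.append(x)  # the deque drops the oldest item automatically
        if len(buf) == length and (count - length) % interval == 0:
            yield func(buf)


print(list(windowed(range(1, 8), 3, 2)))  # [(1, 2, 3), (3, 4, 5), (5, 6, 7)]
print(list(windowed(range(1, 11), 3, 2, func=sum)))  # [6, 12, 18, 24]
```

With `interval=1` this degenerates to emitting on every new item, which is the current behaviour described above.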

martindurant, May 02 '19

One pattern I need frequently is to skip an initial number of elements and then repeat "take n, skip m". For example, given a stream of integers 1, 2, 3, ..., skipping 2, then repeatedly taking 3 and skipping 1 would give (3,4,5), (7,8,9), (11,12,13), ...
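The pattern can be checked with a small pull-based sketch over a finite iterable. `skip_take_skip` is a hypothetical name (not a streamz function), and dropping an incomplete trailing group is an assumption of this sketch; a push-based stream would need the same logic expressed as per-item state instead.

```python
from itertools import islice


def skip_take_skip(items, initial_skip, take, skip):
    """Hypothetical helper (not a streamz API): drop `initial_skip` items, then
    repeatedly yield `take` consecutive items and drop `skip` items."""
    it = iter(items)
    # consume and discard the initial prefix (itertools "consume" idiom)
    next(islice(it, initial_skip, initial_skip), None)
    while True:
        group = tuple(islice(it, take))
        if len(group) < take:
            return  # an incomplete trailing group is dropped
        yield group
        # consume and discard the gap before the next group
        next(islice(it, skip, skip), None)


print(list(skip_take_skip(range(1, 20), 2, 3, 1)))
# [(3, 4, 5), (7, 8, 9), (11, 12, 13), (15, 16, 17)]
```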

ea42gh, Sep 25 '19

This is a bit like the "range" functionality

martindurant, Sep 25 '19

This is the kludge I came up with; there may be a better way:

import streamz


def kill_grab_skip(source, kill, grab, skip, element=0):
    '''Remove an initial number of elements, then repeat a fixed grab, skip pattern.

    Args:
        source  : stream node to append to
        kill    : number of initial elements to ignore
        grab    : number of elements to return
        skip    : number of subsequent elements to ignore
        element : filler value emitted into source when kill < skip

    Returns:
        the streamz node chain created
    '''
    def skip_first(x, skip):
        # drop the first `skip` items of a partitioned tuple
        return x[skip:]

    if skip < kill:
        # kill is greater than skip: slice off kill-skip elements so the remaining
        # prefix has length skip, then cut into groups of length skip+grab
        p = source.slice(kill - skip, None, None).partition(skip + grab)
        if skip > 0:
            p = p.map(lambda x: skip_first(x, skip))
    elif skip == kill:
        # each group of skip+grab elements starts with the skip elements to drop
        p = source.partition(skip + grab).map(lambda x: skip_first(x, skip))
    else:
        # kill is less than skip: emit skip-kill filler elements to reach the
        # kill == skip case (every other node downstream of source sees them too)
        p = source.partition(skip + grab).map(lambda x: skip_first(x, skip))
        for i in range(skip - kill):
            source.emit(element)
    return p


print('3,8,0 pattern')
source = streamz.Stream()
p = kill_grab_skip(source, 3, 8, 0)
p.sink(print)
for i in range(1, 30):
    source.emit(i)

ea42gh, Sep 26 '19

Are you sure slice does not do the same thing as this?

In [2]: s = streamz.Stream()
In [3]: s2 = s.slice(3, None, 8)
In [4]: s2.sink(print)
Out[4]: <sink: print>
In [6]: for i in range(1, 30):
   ...:     s.emit(i)
   ...:
9
17
25

https://github.com/python-streamz/streamz/blob/master/streamz/core.py#L771
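The printed values are consistent with slice emitting the item at zero-based position i whenever i >= start and i % step == 0 (positions 8, 16 and 24 here). That rule is inferred from the session output above, not quoted from the linked source. A plain-Python reproduction, contrasted with `itertools.islice`, which counts the step from `start` instead:

```python
from itertools import islice

items = range(1, 30)

# rule inferred from the session output: position >= start and position % step == 0
observed = [x for i, x in enumerate(items) if i >= 3 and i % 8 == 0]
print(observed)  # [9, 17, 25]

# Python's islice steps from `start`, which selects different items
print(list(islice(items, 3, None, 8)))  # [4, 12, 20, 28]
```

Either way, each step selects a single item, so a contiguous group of several items needs something extra, such as the partition in the function above.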

martindurant, Sep 27 '19

The call to slice picks every 8th element. I need to pick a contiguous set of more than 1 element, e.g., [8,9,10], [16,17,18], ...

So yes, slice comes close, and I did use it in the function I posted above, followed by partition() so that each group is emitted as soon as it is complete. I really dislike the emit() calls in the function, though.
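One way to avoid the filler emit() calls is to track the item position as state: an item at zero-based position i is kept when i >= kill and (i - kill) % (grab + skip) < grab, which covers all three kill/skip cases uniformly. A minimal sketch under that assumption; `make_picker` and `feed` are hypothetical names, not streamz functions, and `grab >= 1` is required.

```python
def make_picker(kill, grab, skip):
    """Hypothetical helper (not a streamz API): build a (step, start) pair for a
    stateful accumulator that drops `kill` initial items, then repeatedly keeps
    `grab` contiguous items and drops `skip` items, without emitting filler."""
    if grab < 1:
        raise ValueError("grab must be at least 1")
    period = grab + skip

    def step(state, x):
        index, group = state
        offset = index - kill  # position relative to the first kept item
        if offset >= 0 and offset % period < grab:
            group = group + (x,)
        result = None
        if len(group) == grab:
            # a full group is ready: emit it and start a fresh one
            result, group = group, ()
        return (index + 1, group), result

    return step, (0, ())


def feed(items, kill, grab, skip):
    """Drive the accumulator over a finite iterable and collect emitted groups."""
    step, state = make_picker(kill, grab, skip)
    out = []
    for x in items:
        state, result = step(state, x)
        if result is not None:
            out.append(result)
    return out


print(feed(range(1, 30), 7, 3, 5))  # [(8, 9, 10), (16, 17, 18), (24, 25, 26)]
```

In streamz, a step function of this shape could presumably be attached with `source.accumulate(step, start=start, returns_state=True).filter(lambda g: g is not None)`; check the accumulate documentation for your version before relying on that wiring.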

I'm not sure GitHub issues are a good place to discuss this, though?

ea42gh, Sep 27 '19

OK, understood. In that case, feel free to open a PR with relevant tests.

martindurant, Oct 01 '19