streamz
ENH: add interval to window/rolling
Currently we keep a buffer of a fixed length and emit on every new item. It might be useful to emit only every Nth item, like Spark Streaming does: https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#window-operations
This can already be achieved with a normal window-aggregate followed by the Slice operator in #241, but an interval option would avoid computing aggregates over windows that are then thrown away.
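A minimal, library-independent sketch of the proposed behaviour, assuming a window that keeps the last `length` items and emits only every `interval`-th arrival once it is full (loosely analogous to Spark's window length and slide interval). `StridedWindow` and `push` are hypothetical names for illustration, not part of streamz:

```python
from collections import deque
from typing import Any, Optional, Tuple


class StridedWindow:
    """Hypothetical sliding window that emits only every `interval`-th item.

    Keeps the last `length` items (like a rolling window) but returns a
    tuple only once the buffer is full and `interval` new items have arrived
    since the last emit, so downstream aggregation runs once per interval.
    """

    def __init__(self, length: int, interval: int) -> None:
        if length < 1 or interval < 1:
            raise ValueError("length and interval must be positive")
        self.buffer: deque = deque(maxlen=length)
        self.interval = interval
        self.count = 0  # items seen since the last emit

    def push(self, x: Any) -> Optional[Tuple[Any, ...]]:
        self.buffer.append(x)
        self.count += 1
        # Emit only when the window is full and a whole interval has passed.
        if len(self.buffer) == self.buffer.maxlen and self.count >= self.interval:
            self.count = 0
            return tuple(self.buffer)
        return None


w = StridedWindow(length=4, interval=2)
print([t for t in (w.push(i) for i in range(1, 9)) if t is not None])
# [(1, 2, 3, 4), (3, 4, 5, 6), (5, 6, 7, 8)]
```

With `interval=1` the sketch emits on every item after the window fills, i.e. the per-item behaviour described above; larger intervals simply skip the intermediate windows instead of computing and discarding them.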
One pattern I need frequently is to skip an initial number of elements, then repeatedly take n and skip m. E.g., given a stream of integers 1, 2, 3, ..., skip 2, take 3, skip 1 would yield (3,4,5), (7,8,9), (11,12,13), ...
This is a bit like the "range" functionality.
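To pin down the intended semantics, here is a pull-based sketch over an ordinary Python iterable (not a streamz node; `kill_grab_skip_iter` is a hypothetical name). It assumes that a trailing group with fewer than `grab` items is not emitted:

```python
from itertools import islice
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def kill_grab_skip_iter(
    items: Iterable[T], kill: int, grab: int, skip: int
) -> Iterator[Tuple[T, ...]]:
    """Hypothetical pull-based version of the pattern: drop `kill` items,
    then repeatedly take `grab` items and drop `skip` items.
    A trailing group with fewer than `grab` items is not yielded."""
    if grab < 1:
        raise ValueError("grab must be at least 1")
    it = iter(items)
    for _ in islice(it, kill):  # discard the initial `kill` items
        pass
    while True:
        group = tuple(islice(it, grab))
        if len(group) < grab:  # source exhausted mid-group
            return
        yield group
        for _ in islice(it, skip):  # discard `skip` items between groups
            pass


print(list(kill_grab_skip_iter(range(1, 14), kill=2, grab=3, skip=1)))
# [(3, 4, 5), (7, 8, 9), (11, 12, 13)]
```

Because it is lazy, the same generator also works on an unbounded source such as `itertools.count(1)`.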
This is the kludge I came up with; there may be better ways:
```python
import streamz


def kill_grab_skip(source, kill, grab, skip, element=0):
    '''Remove an initial number of elements, then repeat a fixed grab, skip pattern.

    Args:
        source : stream node to append to
        kill : number of initial elements to ignore
        grab : number of elements to return
        skip : number of subsequent elements to ignore
        element : filler value emitted into `source` when kill < skip
    Returns:
        the streamz node chain created
    '''
    def skip_first(x, skip):
        # drop the first `skip` items of a partition tuple
        return x[skip:]

    if skip < kill:
        # kill is greater than skip: drop (kill - skip) elements so the
        # remainder looks like the kill == skip case, then partition
        p = source.slice(kill - skip, None, None).partition(skip + grab)
        if skip > 0:
            p = p.map(lambda x: skip_first(x, skip))
    elif skip == kill:
        p = source.partition(skip + grab).map(lambda x: skip_first(x, skip))
    else:
        # kill is less than skip: emit (skip - kill) filler elements to get
        # to the kill == skip case
        p = source.partition(skip + grab).map(lambda x: skip_first(x, skip))
        for _ in range(skip - kill):
            source.emit(element)
    return p


print('3,8,0 pattern')
source = streamz.Stream()
p = kill_grab_skip(source, 3, 8, 0)
p.sink(print)
for i in range(1, 30):
    source.emit(i)
```
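For reference, the selection this function aims for can be written as a closed-form rule on the 0-based position i of each item: keep it iff i >= kill and (i - kill) mod (grab + skip) < grab, and put it in group (i - kill) // (grab + skip). The hypothetical helper below applies that rule to a finite list, keeping only complete groups on the assumption that `partition` holds back a partial tail. Under this rule the 3,8,0 demo should print (4, ..., 11), (12, ..., 19) and (20, ..., 27), with 28 and 29 still waiting in the partition buffer:

```python
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple


def expected_groups(items: Sequence[int], kill: int, grab: int, skip: int) -> List[Tuple[int, ...]]:
    """Hypothetical reference: apply the kill/grab/skip rule by position."""
    if grab < 1:
        raise ValueError("grab must be at least 1")
    period = grab + skip
    groups: Dict[int, List[int]] = defaultdict(list)
    for i, x in enumerate(items):
        # keep position i iff it lies in the first `grab` slots of its period
        if i >= kill and (i - kill) % period < grab:
            groups[(i - kill) // period].append(x)
    # only complete groups, mirroring partition() holding back a partial tail
    return [tuple(g) for _, g in sorted(groups.items()) if len(g) == grab]


for g in expected_groups(range(1, 30), kill=3, grab=8, skip=0):
    print(g)
```

A reference like this is handy as the expected value in a unit test for any streaming implementation of the pattern.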
Are you sure `slice` does not do the same thing as this?
```
In [2]: s = streamz.Stream()

In [3]: s2 = s.slice(3, None, 8)

In [4]: s2.sink(print)
Out[4]: <sink: print>

In [6]: for i in range(1, 30):
   ...:     s.emit(i)
   ...:
9
17
25
```
https://github.com/python-streamz/streamz/blob/master/streamz/core.py#L771
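Note that the output is 9, 17, 25 rather than the 4, 12, 20, 28 that Python's `itertools.islice(range(1, 30), 3, None, 8)` gives. That is consistent with a rule where the step is counted from the first item, not from `start`. The sketch below models that rule; it is inferred from the session above, not copied from the streamz source, and `slice_model` is a hypothetical name. Either way, `slice` passes single items, never contiguous runs:

```python
from itertools import islice
from typing import Iterable, List


def slice_model(items: Iterable[int], start: int = 0, step: int = 1) -> List[int]:
    # Inferred from the session output: the item at 0-based position p
    # passes when p >= start and p % step == 0, i.e. the step is counted
    # from the first item rather than from `start`.
    return [x for p, x in enumerate(items) if p >= start and p % step == 0]


print(slice_model(range(1, 30), start=3, step=8))  # [9, 17, 25]
print(list(islice(range(1, 30), 3, None, 8)))      # [4, 12, 20, 28]
```

With `step` left at 1, the model just drops the first `start` items, which is how the function above uses `slice(kill - skip, None, None)`.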
The call to `slice` picks every 8th element. I need to pick contiguous runs of more than one element, e.g., [8,9,10], [16,17,18], ...
So yes, `slice` comes close, and I did use it in the function I posted above, followed by `partition()` so that each group is emitted as soon as it is complete. I really dislike the `emit()` calls in the function, though.
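One way to avoid the filler `emit()` calls is to keep the counting state in an ordinary callable instead of reshaping the stream. The sketch below is plain Python so it can be tested without streamz; `KillGrabSkip` is a hypothetical name. In a pipeline it could presumably be wired in as `source.map(KillGrabSkip(2, 3, 1)).filter(lambda g: g is not None)`, using only the existing `map` and `filter` nodes, though that wiring is an assumption, not something tested here:

```python
from typing import Any, List, Optional, Tuple


class KillGrabSkip:
    """Hypothetical stateful callable: feed items one at a time; returns a
    tuple when a group of `grab` items completes, otherwise None.
    Handles kill < skip without injecting filler elements."""

    def __init__(self, kill: int, grab: int, skip: int) -> None:
        if min(kill, skip) < 0 or grab < 1:
            raise ValueError("need kill >= 0, skip >= 0, grab >= 1")
        self.grab, self.skip = grab, skip
        self.to_drop = kill  # items still to discard before the next group
        self.group: List[Any] = []

    def __call__(self, x: Any) -> Optional[Tuple[Any, ...]]:
        if self.to_drop > 0:
            self.to_drop -= 1
            return None
        self.group.append(x)
        if len(self.group) < self.grab:
            return None
        out = tuple(self.group)
        self.group = []
        self.to_drop = self.skip  # discard `skip` items before the next group
        return out


picker = KillGrabSkip(kill=7, grab=3, skip=5)
print([g for g in map(picker, range(1, 30)) if g is not None])
# [(8, 9, 10), (16, 17, 18), (24, 25, 26)]
```

The same class covers all three branches of the original function, including kill < skip (e.g. `KillGrabSkip(1, 2, 3)` yields (2, 3), (7, 8), (12, 13) from 1..14). Since it holds state, each pipeline needs its own instance.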
I'm not sure GitHub issues are the right place to discuss this, though.
OK, understood. In that case, feel free to open a PR with relevant tests.