RxPY
using timeout with GroupedObservable leads to an index shift
I was trying to achieve the following:
- use group_by to partition the data stream
- for each group:
  - apply a timeout
  - re-emit the first item if the timeout has passed
A minimal example
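from rx import Observable

# Emit 0..29, one item every 100 ms; publish() makes the stream connectable.
o = Observable.zip(Observable.interval(100), Observable.range(0, 30), lambda x, y: y).publish()

# Partition into groups of ten; on timeout, switch each group to its "first" item.
o.group_by(lambda x: x // 10) \
    .flat_map(lambda g: g.timeout(90, other=g.first().map(lambda x: (x, 'first')))) \
    .subscribe(lambda x: print(x),
               lambda e: print('err {}'.format(e)),
               lambda: print('done'))

o.connect()
input()  # keep the process alive while the timers run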
Output:
0
(1, 'first')
10
(11, 'first')
20
(21, 'first')
done
Changing the timeout duration to 101 gives the following output:
0
1
2
3
(4, 'first')
10
11
(12, 'first')
20
21
(22, 'first')
done
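One possible reading of both outputs, offered as a sketch rather than a confirmed diagnosis: if each group is hot (subject-backed, with no replay), then first() in the fallback only subscribes once the timeout fires, so it sees the next item to arrive and not the group's original first item. The toy model below uses plain Python only. HotSubject and first are hypothetical stand-ins written for this illustration, not RxPY classes or operators.

```python
class HotSubject:
    """Toy hot stream: values go only to observers subscribed at emit time."""

    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        self._observers.append(on_next)

        def unsubscribe():
            self._observers.remove(on_next)

        return unsubscribe

    def on_next(self, value):
        # Iterate over a copy so observers may unsubscribe while being notified.
        for observer in list(self._observers):
            observer(value)


def first(source, on_next):
    """Forward only the first value seen AFTER subscribing (hypothetical helper)."""
    state = {"done": False}

    def handler(value):
        if not state["done"]:
            state["done"] = True
            on_next(value)

    source.subscribe(handler)


group = HotSubject()
output = []

# The timeout stage is subscribed from the start and forwards item 0.
unsubscribe = group.subscribe(output.append)
group.on_next(0)

# Assumed sequence: the timeout elapses before item 1 arrives, so the primary
# subscription is dropped and the fallback subscribes only now.
unsubscribe()
first(group, lambda v: output.append((v, "first")))

group.on_next(1)  # the first value the fallback ever sees
group.on_next(2)  # ignored: first() has already produced its value

print(output)
```

Under these assumptions the model reproduces the shape of the first group's output: the original item, then the following item tagged 'first'. If that reading is right, getting the true first item on timeout would need something that remembers it, such as a replaying or caching step on each group.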