
Using timeout with GroupedObservable leads to an index shift

Open copper-ctl opened this issue 7 years ago • 0 comments

I was trying to achieve the following:

  1. use group_by to partition data stream
  2. for each group:
     2.1. apply a timeout
     2.2. re-emit the first item if the timeout has passed
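
To make the intent concrete, here is a plain-Python sketch (no RxPY, no timing) of the per-group "first item" I would expect for a stream of the integers 0 to 29 grouped by `x // 10`. The helper name `expected_first_per_group` is made up for illustration and is not part of any library.

```python
def expected_first_per_group(items, key):
    """Return {group_key: first item seen for that key}, in arrival order."""
    firsts = {}
    for item in items:
        # setdefault stores the item only the first time a key is seen
        firsts.setdefault(key(item), item)
    return firsts


firsts = expected_first_per_group(range(0, 30), lambda x: x // 10)
print(firsts)  # {0: 0, 1: 10, 2: 20}
```

So the items I would expect to see tagged as 'first' are 0, 10 and 20.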

A minimal example:

from rx import Observable

o = Observable.zip(Observable.interval(100), Observable.range(0, 30), lambda x, y: y).publish()

o.group_by(lambda x: x//10)\
    .flat_map(lambda x: x.timeout(90, other=x.first().map(lambda x: (x, 'first'))))\
    .subscribe(lambda x: print(x),
               lambda x: print('err {}'.format(x)),
               lambda : print('done'))

o.connect()
input()

Output:

0
(1, 'first')
10
(11, 'first')
20
(21, 'first')
done

Changing the timeout duration to 101 gives the following output:

0
1
2
3
(4, 'first')
10
11
(12, 'first')
20
21
(22, 'first')
done
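
One possible explanation, not verified against the RxPY source: the `other` observable passed to `timeout` is subscribed only once the timeout fires, and if a group does not replay earlier items, `x.first()` would then yield the next item to arrive rather than the group's real first item. The sketch below models that assumption in plain Python. `HotSubject` is a hypothetical stand-in, not an RxPY type.

```python
class HotSubject:
    """Tiny stand-in for a hot source: no buffering, no replay."""

    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        self._observers.append(on_next)

    def on_next(self, value):
        # Iterate over a copy so observers added during dispatch
        # only receive later values
        for observer in list(self._observers):
            observer(value)


group = HotSubject()
seen_by_late_subscriber = []

group.on_next(0)  # emitted before anything asks for "first"
# Assumed moment at which the timeout fallback subscribes
group.subscribe(seen_by_late_subscriber.append)
group.on_next(1)
group.on_next(2)

first_seen = seen_by_late_subscriber[0]
print(first_seen)  # 1, not 0
```

Under that assumption the late subscriber never sees 0, which would match the `(1, 'first')` in the first output.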

copper-ctl, Jan 10 '18 19:01