
Weird interaction between group_by and to_iterable?

Open ericbottard opened this issue 4 years ago • 9 comments

The following piece of code does not produce what I expect, i.e. a subscription to the numbers whose value modulo 4 is not zero (the multiples of 4 should be excluded because of .take(3)):

    def test_broken(self):
        numbers = of(7, 1, 3, 2, 4, 5, 0, 6, 8, 10, 9, 12, 11)
        numbers.pipe(
            op.group_by(lambda x: x % 4),
            op.take(3),
            op.to_iterable(),  
            op.flat_map(lambda list: from_iterable(sorted(list, key = lambda g: g.key))),
            op.do(Observer(on_next=lambda g: print("group modulo=%s %s" % (g.key, g)), on_completed=print("groups complete"), on_error=print_tb)),
            op.flat_map(lambda g: g)
        ).subscribe(on_next=lambda e: print("subscr %s" % e), on_error=print_tb, on_completed=print("THE END"))
        time.sleep(1)

Yet, both this

    def test_to_iterable_then_from(self):
        numbers = of(7, 1, 3, 2, 4, 5, 0, 6, 8, 10, 9, 12, 11)
        numbers.pipe(
            op.to_iterable(),
            op.flat_map(lambda list: from_iterable(sorted(list))),
        ).subscribe(on_next=lambda e: print("subscr %s" % e))
        time.sleep(1)

which shows that emitting an iterable and then flat_mapping works ok, as well as this

    def test_grouping_behaves_correctly(self):
        numbers = of(7, 1, 3, 2, 4, 5, 0, 6, 8, 10, 9, 12, 11)
        numbers.pipe(
            op.group_by(lambda x: x % 4),
            op.take(3),
            #op.to_iterable(),
            #op.flat_map(lambda list: from_iterable(sorted(list, key = lambda g: g.key))),
            op.do(Observer(on_next=lambda g: print("group modulo=%s %s" % (g.key, g)), on_completed=print("groups complete"), on_error=print_tb)),
            op.flat_map(lambda g: g)
        ).subscribe(on_next=lambda e: print("subscr %s" % e), on_error=print_tb, on_completed=print("THE END"))
        time.sleep(1)

which correctly produces

    groups complete
    THE END
    group modulo=3 <rx.core.observable.groupedobservable.GroupedObservable object at 0x10a8a3ed0>
    subscr 7
    group modulo=1 <rx.core.observable.groupedobservable.GroupedObservable object at 0x10a8ab0d0>
    subscr 1
    subscr 3
    group modulo=2 <rx.core.observable.groupedobservable.GroupedObservable object at 0x10a8abad0>
    subscr 2
    subscr 5
    subscr 6
    subscr 10
    subscr 9
    subscr 11

(numbers 0, 4, 8, 12 are indeed absent) seem to show that it's the combination of group_by and to/from_iterable that is causing the problem.
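To pin down the expectation, here is a plain-Python sketch (not RxPY) of the result test_broken is meant to produce. `first_n_groups` is a hypothetical helper: it keeps the first `n` keys in order of first appearance, as group_by followed by take(3) would, and collects every element that belongs to them.

```python
from typing import Callable, Dict, Hashable, Iterable, List


def first_n_groups(values: Iterable[int],
                   key: Callable[[int], Hashable],
                   n: int) -> Dict[Hashable, List[int]]:
    """Hypothetical helper: group *all* elements, but only for the first n keys
    seen (dicts keep insertion order, i.e. order of first appearance)."""
    groups: Dict[Hashable, List[int]] = {}
    for v in values:
        k = key(v)
        if k in groups:
            groups[k].append(v)
        elif len(groups) < n:
            groups[k] = [v]  # a new key starts a new group
    return groups


numbers = [7, 1, 3, 2, 4, 5, 0, 6, 8, 10, 9, 12, 11]
groups = first_n_groups(numbers, lambda x: x % 4, 3)
print(groups)  # {3: [7, 3, 11], 1: [1, 5, 9], 2: [2, 6, 10]}

# Sorting the groups by key and emitting them one after another:
flat = [v for k in sorted(groups) for v in groups[k]]
print(flat)  # [1, 5, 9, 2, 6, 10, 7, 3, 11]
```

The multiples of 4 never show up, because the remainder-0 group is only the fourth key to appear.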

Or I could be completely missing something obvious, but I've been banging my head on this for a couple of days now. I came up with these simplified tests, but the initial use case is porting this piece of Java Reactor code to RxPY, if you need to know.

ericbottard, Nov 08 '19

Hello, ~~I don't actually know why, but it does not work when "flat mapping" multiple embedded iterables~~.

Edit: forget my answer, my example was wrong. I need more time to investigate, but there may be a conflict between the to_iterable operator (which only emits once its source completes) and the type of subject used to spawn grouped observables in group_by_until.


Here is a minimal example:

    import rx
    from rx import operators as ops

    src0 = rx.of(1, 2, 3)
    src1 = rx.of(3, 4, 5)
    src2 = rx.of(6, 7, 8)
    sources = [src0, src1, src2]

    rx.just(sources).pipe(
        ops.flat_map(lambda sources: rx.from_iterable(sources)),
        ops.flat_map(lambda source: rx.from_iterable(source)),
        # EDIT: error here, source is an observable, should use instead:
        # ops.flat_map(lambda source: source),
        ).subscribe(print)

This example never ends: instead of ints, the on_next callback indefinitely receives a repeating pattern of 4 observables:

    <rx.core.observable.observable.Observable object at 0x7f3ff0eab310>
    <rx.core.observable.observable.Observable object at 0x7f3ff0eab290>
    <rx.core.observable.observable.Observable object at 0x7f3ff0eab1d0>
    <rx.core.observable.observable.Observable object at 0x7f3ff0eab210>

    <rx.core.observable.observable.Observable object at 0x7f3ff0eab310>
    <rx.core.observable.observable.Observable object at 0x7f3ff0eab290>
    <rx.core.observable.observable.Observable object at 0x7f3ff0eab1d0>
    <rx.core.observable.observable.Observable object at 0x7f3ff0eab210>

    <rx.core.observable.observable.Observable object at 0x7f3ff0eab310>
    ...
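One plausible reason why this example loops forever instead of raising an error, stated here as an assumption rather than something confirmed in the thread: if RxPY's Observable supports indexing or slicing through `__getitem__` but defines no `__iter__`, then `from_iterable(observable)` falls back on Python's legacy sequence-iteration protocol, which calls `__getitem__(0)`, `__getitem__(1)`, ... until an `IndexError` that never comes. `SliceableOnly` below is a hypothetical class demonstrating that fallback in plain Python.

```python
from itertools import islice


class SliceableOnly:
    """Hypothetical class with __getitem__ (e.g. for slicing) but no __iter__,
    whose __getitem__ never raises IndexError."""

    def __getitem__(self, index):
        return ("item", index)


# iter() falls back on the legacy sequence protocol: it calls
# __getitem__(0), __getitem__(1), ... and only stops on IndexError,
# so this iterator never ends on its own; islice takes just the first 4.
it = iter(SliceableOnly())
print(list(islice(it, 4)))  # [('item', 0), ('item', 1), ('item', 2), ('item', 3)]
```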

jcafhe, Nov 09 '19

@ericbottard After some tests: the grouped observables created by group_by_until (and group_by) are not compatible with to_iterable because of the type of subject used to create them. A plain Subject does not retain previously emitted values, so an observer that subscribes to a group after the fact misses everything already pushed into it. It does work when using a ReplaySubject.
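The difference can be illustrated with a toy model in plain Python. `ToySubject` and `ToyReplaySubject` below are hypothetical, simplified stand-ins (no completion, errors, disposal or thread safety), not RxPY's classes: a late subscriber to the plain subject only sees values pushed after it subscribed, whereas the replaying one also receives the history.

```python
from typing import Callable, List

Callback = Callable[[int], None]


class ToySubject:
    """Toy model of a plain Subject: a value only reaches the observers
    that are subscribed at the moment it is pushed."""

    def __init__(self) -> None:
        self.observers: List[Callback] = []

    def subscribe(self, observer: Callback) -> None:
        self.observers.append(observer)

    def on_next(self, value: int) -> None:
        for observer in list(self.observers):
            observer(value)


class ToyReplaySubject(ToySubject):
    """Toy model of a ReplaySubject: it records every value and replays
    that history to late subscribers before forwarding new values."""

    def __init__(self) -> None:
        super().__init__()
        self.history: List[int] = []

    def subscribe(self, observer: Callback) -> None:
        for value in self.history:
            observer(value)
        super().subscribe(observer)

    def on_next(self, value: int) -> None:
        self.history.append(value)
        super().on_next(value)


def late_subscriber_sees(subject: ToySubject) -> List[int]:
    """Push two values, subscribe only afterwards (as happens to the groups
    once to_iterable hands them over), then push one more value."""
    subject.on_next(7)
    subject.on_next(3)
    seen: List[int] = []
    subject.subscribe(seen.append)
    subject.on_next(11)
    return seen


print(late_subscriber_sees(ToySubject()))        # [11]
print(late_subscriber_sees(ToyReplaySubject()))  # [7, 3, 11]
```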

It seems to be a known concern in RxJS: https://github.com/ReactiveX/rxjs/issues/2028 https://github.com/ReactiveX/rxjs/issues/1945

From the last link:

> this is working as expected. The GroupedObservables emitted by groupBy are notified of events as they occur, but they don't replay them after the fact. The toArray after groupBy queues all the GroupedObservables into a list, but only emits the list when groupBy completes, at which point all the inner groups have already been notified of events.

This is solved in RxJS with a subjectSelector parameter on groupBy, so adding a similar parameter to group_by_until could be a solution to your problem. By default, we use a plain Subject to avoid accumulating state.

jcafhe, Nov 09 '19

Good to see we're narrowing down on where the problem lies. Thanks for looking at this!

I'm not sure I have a good enough knowledge of RxPy to understand my potential next steps. Are you saying that there is a workaround I could apply today in RxPy? Or does this require changes in RxPy anyway? I feel like the latter, but wanted to make sure.

Also, should this be left as a potential gotcha, or should it "just work" without any intervention from the end user, let alone requiring a deeper understanding of the internals of RxPy?

ericbottard, Nov 12 '19

Yes, it would require some changes to the code and signatures of group_by and group_by_until to support a subject factory, something like:

    def group_by(
        key_mapper: Mapper,
        element_mapper: Optional[Mapper] = None,
        subject_mapper: Optional[Callable[[], Subject]] = None,
    ):
        ...

In return, chaining group_by with to_iterable would force the end user to explicitly specify a subject factory that returns a ReplaySubject (the default factory would be a function that returns a plain Subject):

    numbers.pipe(
        op.group_by(
            key_mapper=lambda x: x % 4,
            subject_mapper=lambda: rx.subject.ReplaySubject(),
        ),
        op.take(3),
        op.to_iterable(),
        ...
    
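A plain-Python toy model makes the effect of such a factory visible. Everything below is hypothetical and simplified (no completion, errors, disposal or take()): `ToyGroupSubject` stands in for the subject behind each group, and `toy_group_by_to_iterable` pushes every element into its group immediately but only hands the groups downstream after the source is exhausted, which is what to_iterable causes. Subscribing afterwards yields nothing with the plain-subject factory and the full groups with the replaying one.

```python
from typing import Callable, Dict, Iterable, List


class ToyGroupSubject:
    """Hypothetical stand-in for the subject behind a GroupedObservable.
    replay=False behaves like a plain Subject, replay=True like a ReplaySubject."""

    def __init__(self, replay: bool = False) -> None:
        self.replay = replay
        self.history: List[int] = []
        self.observers: List[Callable[[int], None]] = []

    def subscribe(self, observer: Callable[[int], None]) -> None:
        if self.replay:
            for value in self.history:  # hand over what was pushed earlier
                observer(value)
        self.observers.append(observer)

    def on_next(self, value: int) -> None:
        if self.replay:
            self.history.append(value)
        for observer in self.observers:
            observer(value)


def toy_group_by_to_iterable(values: Iterable[int],
                             key_mapper: Callable[[int], int],
                             subject_mapper: Callable[[], ToyGroupSubject]
                             ) -> Dict[int, ToyGroupSubject]:
    """Models group_by(...) + to_iterable(): each element goes into its group's
    subject right away, but the groups only reach downstream subscribers once
    the whole source has been consumed."""
    groups: Dict[int, ToyGroupSubject] = {}
    for v in values:
        k = key_mapper(v)
        if k not in groups:
            groups[k] = subject_mapper()  # zero-argument factory, as in the proposal
        groups[k].on_next(v)
    return groups


def drain(groups: Dict[int, ToyGroupSubject]) -> Dict[int, List[int]]:
    """Subscribe to each group after the fact, as the downstream flat_map does."""
    seen: Dict[int, List[int]] = {}
    for key, subject in groups.items():
        seen[key] = []
        subject.subscribe(seen[key].append)
    return seen


numbers = [7, 1, 3, 2, 4, 5, 0, 6, 8, 10, 9, 12, 11]
plain = toy_group_by_to_iterable(numbers, lambda x: x % 4, lambda: ToyGroupSubject(replay=False))
replay = toy_group_by_to_iterable(numbers, lambda x: x % 4, lambda: ToyGroupSubject(replay=True))
print(drain(plain))   # {3: [], 1: [], 2: [], 0: []}
print(drain(replay))  # {3: [7, 3, 11], 1: [1, 5, 9], 2: [2, 6, 10], 0: [4, 0, 8, 12]}
```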

jcafhe, Nov 12 '19

Hi,

I tried PR #469 with the following test:

    def test_broken(self):
        numbers = of(7, 1, 3, 2, 4, 5, 0, 6, 8, 10, 9, 12, 11)
        numbers.pipe(
            op.group_by(
                key_mapper=lambda x: x % 4,
                subject_mapper=lambda: rx.subject.ReplaySubject(),
            ),
            op.take(3),
            op.to_iterable(),
            op.flat_map(lambda list: from_iterable(sorted(list, key=lambda g: g.key))),
            op.do(Observer(on_next=lambda g: print("group modulo=%s %s" % (g.key, g)), on_completed=print("groups complete"), on_error=print_tb)),
            op.flat_map(lambda g: g),
        ).subscribe(on_next=lambda e: print("subscr %s" % e), on_error=print_tb, on_completed=print("THE END"))
        time.sleep(1)

I'm surprised to see only 1, 2, 7, 3 in the output. Is there another (maybe unrelated) issue at play here?

    groups complete
    THE END
    group modulo=1 <rx.core.observable.groupedobservable.GroupedObservable object at 0x10a368e10>
    group modulo=2 <rx.core.observable.groupedobservable.GroupedObservable object at 0x10a36b990>
    group modulo=3 <rx.core.observable.groupedobservable.GroupedObservable object at 0x10a368390>
    subscr 1
    subscr 2
    subscr 7
    subscr 3

As a matter of fact, changing the modulus in the key_mapper gives weird results, so maybe I did not fully understand what specifying rx.subject.ReplaySubject() does.

ericbottard, Jan 13 '20

In fact, I think the take operator is what messes things up in your example. I will try to explain, but it's not trivial.

~~The resulting observable completes at "3" because this is the first element of the third emitted group.~~ In the figure below, groups are created as follows: [3], [1], [2], [0]. take(3) unsubscribes from the source observable as soon as it gets 3 elements, i.e. 3 groups. The 3rd emitted group is group [2], which starts with the element "2". At that precise point in time, take unsubscribes from the source observable, so the upcoming numbers are not pushed.

    numbers = rx.of(7, 1, 3, 2, 4, 5, 0, 6, 8, 10, 9, 12, 11)
    # remainder:    3, 1, 3, 2, 0, 1, 0, 2, 0,  2, 1,  0,  3

    # group 3:      7     3                               11
    # group 1:         1           5               9
    # group 2:               2           6     10
    # group 0:                  4     0     8         12
    # --------------------- time -------------------------------->

So in your particular case, you should not use take in your workflow.
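This explanation can be checked against the output reported earlier with a plain-Python toy model (hypothetical helper `group_then_take`, not RxPY code). It assumes, as in the diagram, that the groups keep their history (ReplaySubject) and that take(3) cuts the source right after the element that creates the third group.

```python
from typing import Dict, List


def group_then_take(values: List[int], modulus: int, n_groups: int) -> Dict[int, List[int]]:
    """Toy model of group_by(lambda x: x % modulus) + take(n_groups) +
    to_iterable() with replaying groups: the element that creates the
    n-th group is still recorded, then take() unsubscribes from the source
    and nothing after it reaches any group."""
    groups: Dict[int, List[int]] = {}
    for v in values:
        k = v % modulus
        is_new_group = k not in groups
        groups.setdefault(k, []).append(v)
        if is_new_group and len(groups) == n_groups:
            break  # take(n_groups) got its n-th group: upstream is cut here
    return groups


numbers = [7, 1, 3, 2, 4, 5, 0, 6, 8, 10, 9, 12, 11]
groups = group_then_take(numbers, 4, 3)
print(groups)  # {3: [7, 3], 1: [1], 2: [2]}

# Sorting the groups by key and emitting them one after another gives 1, 2, 7, 3.
print([v for k in sorted(groups) for v in groups[k]])  # [1, 2, 7, 3]
```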

Side note: don't forget to pass a function (any callable) to the on_completed parameter. Try on_completed=lambda: print("THE END") instead of on_completed=print("THE END"): the latter calls print immediately, while the pipeline is being built, and passes its return value (None) as on_completed, which is why the message is not printed at the right moment in your example. Same for on_completed=print("groups complete"). 👍
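The difference is easy to see without Rx at all. In this sketch, `toy_subscribe` is a hypothetical stand-in for subscribe(), and `log.append` replaces print so the order of events can be checked; like print, `list.append` returns None.

```python
from typing import Callable, List, Optional

log: List[str] = []


def toy_subscribe(on_completed: Optional[Callable[[], None]] = None) -> None:
    """Hypothetical stand-in for subscribe(): subscribes, then completes."""
    log.append("subscribed")
    if on_completed is not None:  # a missing (None) callback is simply skipped
        on_completed()


# Wrong: the argument is evaluated right away, so "THE END" is logged before
# subscribing, and the value actually passed as on_completed is None.
toy_subscribe(on_completed=log.append("THE END"))
print(log)  # ['THE END', 'subscribed']

log.clear()

# Right: pass a callable; it only runs when completion is signalled.
toy_subscribe(on_completed=lambda: log.append("THE END"))
print(log)  # ['subscribed', 'THE END']
```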

jcafhe, Jan 14 '20

Thanks for your explanations! (although I'm not sure I understand them all ;-) )

I would expect the take(3) to act on the "outer" (ie groups) observable, and have no impact on the "inner" (ie the contents of the groups) observable. After all, group_by returns an observable of observable, right? If take unsubscribes the inner observable, then what operator can I use to achieve the desired effect?

Also, now that I think of it, how come the elements, even if emitted as per the discussion above, appear in this order? Shouldn't they be emitted as 7 1 3 2? Is there some threading issue that I did not take care of?

(Sorry for all these noob questions. As posted initially, I have minimal Python + RxPY knowledge and am trying to port some Java/Reactor code, bringing my expectations over...)

Thanks for the side note catch (I was wondering why logs appeared out of place. It's obvious now).

ericbottard, Jan 14 '20

> I would expect the take(3) to act on the "outer" (ie groups) observable, and have no impact on the "inner" (ie the contents of the groups) observable.

Yes, I understand your expectation, but I don't think it works like that. The inner observables (GroupedObservables) are created from the same single observable (numbers). Subscribing to one of the inner observables does not trigger a cascading re-subscription to the upstream source (numbers). So you have to 'save' elements in a ReplaySubject as soon as you can, i.e. at the outer subscription time. As quoted before, with the default Subject:

> The GroupedObservables emitted by groupBy are notified of events as they occur, but they don't replay them after the fact.

What you could do instead is simply slice the list before sorting:


    numbers = of(7, 1, 3, 2, 4, 5, 0, 6, 8, 10, 9, 12, 11)
    numbers.pipe(
        op.group_by(
            key_mapper=lambda x: x % 4,
            subject_mapper=lambda: rx.subject.ReplaySubject(),
        ),
        op.to_iterable(),
        op.map(lambda groups: groups[0:3]),
        op.flat_map(lambda list: from_iterable(sorted(list, key=lambda g: g.key))),

> Also, now that I think of it, how come the elements, even if emitted as per the discussion above, appear in this order? Shouldn't they be emitted as 7 1 3 2? Is there some threading issue that I did not take care of?

Yes, I guess it comes from flat_map. By the way, I made a mistake in my previous comment (see the correction): take unsubscribes from the upstream source as soon as 3 groups have been created, i.e. when the number "2" is pushed. So we have 3 groups collected:

  • group [3] with 7, 3
  • group [1] with just 1
  • group [2] with just 2:

    numbers =     (7, 1, 3, 2, 4, 5, 0, 6, 8, 10, 9, 12, 11)
    remainder:     3, 1, 3, 2, 0, 1, 0, 2, 0,  2, 1,  0,  3

    group 3:       7     3
    group 1:          1
    group 2:                2
    ---------------|--|-----|--------------- time ----------->
           1st group  |     |
           created    |    3rd group created
                      |
                  2nd group
                   created

Then you sort the grouped observables by their key, which reorders the groups from [3][1][2] to [1][2][3]. Now flat_map pushes elements from these groups concurrently, so the elements may be interleaved. There is no guaranteed order; it depends on which scheduler you use.

  • If order doesn't matter, you should attach the key value to each element.

  • If you want to preserve order, i.e. push all elements from group [1], then all elements from group [2], and so on, and you don't want to manage schedulers, you should use a combination of map and merge(max_concurrent=1) instead of flat_map, for example by creating a custom operator:

    def concat_map(mapper):
        def _concat_map(source):
            return source.pipe(
                op.map(mapper),
                op.merge(max_concurrent=1),
            )
        return _concat_map

or just:

    ...
    op.to_iterable(),
    op.map(lambda groups: groups[0:3]),
    op.map(lambda list: from_iterable(sorted(list, key=lambda g: g.key))), # not needed here
    op.merge(max_concurrent=1),
    ...
  • If you want to preserve order using schedulers, you could try to force subscription on a specific scheduler, but personally I'm not confident in that solution.
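To visualise these options, here is a plain-Python sketch (not RxPY): `round_robin` models one possible interleaving produced by flat_map when the inner sources emit concurrently (the real order depends on the scheduler), `concat` models merge(max_concurrent=1), and tagging each element with its key keeps the grouping recoverable whatever the interleaving. The group contents are an assumption: full groups, i.e. no take() cutting the source.

```python
from itertools import chain, zip_longest
from typing import Dict, List, Sequence, TypeVar

T = TypeVar("T")
_GAP = object()  # filler for inner sources that are already exhausted

# Full groups, sorted by key (assumes take() did not cut the source).
groups: Dict[int, List[int]] = {1: [1, 5, 9], 2: [2, 6, 10], 3: [7, 3, 11]}
ordered = [groups[k] for k in sorted(groups)]


def round_robin(sources: Sequence[Sequence[T]]) -> List[T]:
    """One possible interleaving when all inner sources emit concurrently,
    as with flat_map; the real order depends on the scheduler."""
    return [v for batch in zip_longest(*sources, fillvalue=_GAP)
            for v in batch if v is not _GAP]


def concat(sources: Sequence[Sequence[T]]) -> List[T]:
    """What merge(max_concurrent=1) does: drain one inner source before the next."""
    return list(chain.from_iterable(sources))


print(round_robin(ordered))  # [1, 2, 7, 5, 6, 3, 9, 10, 11]
print(concat(ordered))       # [1, 5, 9, 2, 6, 10, 7, 3, 11]

# If order does not matter: tag each element with its key, so the grouping
# survives any interleaving.
tagged = round_robin([[(k, v) for v in groups[k]] for k in sorted(groups)])
print(tagged[:3])  # [(1, 1), (2, 2), (3, 7)]
```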

jcafhe, Jan 14 '20

Resuming work on this thread, I am revisiting your last comments. I need to make some clarifications, which I believe make your suggestions inapplicable to my case:

  • In my real use case, the input stream is actually infinite
  • It is not known in advance whether all remainders (in the real use case, indices of arguments passed to a streaming function) will be seen. So I artificially prefix the stream with one number for each remainder (the maximum is known ahead of time)
  • Given that the stream may be infinite, the take() operation seems required, as it is the only thing that can trigger completion and allow to_iterable() to proceed.
  • For reference, the comments in this section of code may make sense to you

So I guess my questions are these:

  • It seems I do need to use take() or similar. To make sure I get all the data (including data that arrives after all groups have formed), you seem to imply that I

> [So you] have to 'save' elements in a ReplaySubject as soon as you can, i.e. at the outer subscription time

But I'm not sure where I can do that. Do you mean changing the nature of the initial observable (i.e. numbers in the example we've been using so far) and turning it into a ReplaySubject somehow?

  • Assuming we fix the above, and mapping my real use case to the example: does the fact that I prepend dummy data for every remainder, in order, i.e.

        numbers = of(0, 1, 2, 3, 7, 1, 3, 2, 4, 5, 0, 6, 8, 10, 9, 12, 11, ...)
                     ^^^^^^^^^^  here

    somehow guarantee that the groups (based on remainders) will be emitted in order?
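The ordering part of this question can be reasoned about with a plain-Python toy model (hypothetical helper `group_creation_order`, not RxPY code). It assumes, as the diagrams in this thread show, that group_by emits a new group the first time a key is seen; under that assumption the dummy prefix fixes the order in which groups are created. It says nothing about how elements inside the groups are scheduled.

```python
from typing import List


def group_creation_order(values: List[int], modulus: int) -> List[int]:
    """Toy model: a new group is created the first time its key is seen,
    so group order is the order of first appearance of each key."""
    order: List[int] = []
    for v in values:
        k = v % modulus
        if k not in order:
            order.append(k)
    return order


prefix = [0, 1, 2, 3]  # one dummy value per remainder, in order
numbers = prefix + [7, 1, 3, 2, 4, 5, 0, 6, 8, 10, 9, 12, 11]
print(group_creation_order(numbers, 4))  # [0, 1, 2, 3]

# Without the prefix, the order follows the data instead.
print(group_creation_order(numbers[len(prefix):], 4))  # [3, 1, 2, 0]
```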

ericbottard, Apr 07 '20