
When using `from_iterable` with a generator the generator might not be closed


Example code:

from typing import Iterator
from contextlib import contextmanager

from rx import from_iterable
from rx.operators import take

@contextmanager
def my_context():
    print('enter context')
    yield
    print('release context')

def my_generator() -> Iterator[int]:
    with my_context():
        yield 1
        yield 2
        yield 3

obs = from_iterable(my_generator()).pipe(take(2))

obs.subscribe()

In this example the generator returned by `my_generator` uses a context manager (`my_context`) for resource management. However, the resource will never be cleaned up, because the generator is not fully consumed.

I suspect that this might be working as designed and that the `using` mechanism is expected to be used to manage the lifecycle of the generator in conjunction with `from_iterable`. Is this how it's supposed to work?
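
For illustration, this is roughly how I imagine the using variant would look. It is an untested sketch: GeneratorResource is just a helper I made up, and it assumes RxPY 3, where using expects the resource factory to return a Disposable, so the generator is wrapped in one that calls close() on dispose.

from rx import from_iterable, using
from rx.operators import take
from rx.disposable import Disposable

class GeneratorResource(Disposable):
    # a Disposable that owns a generator and closes it when disposed
    def __init__(self, gen):
        self.gen = gen
        super().__init__(gen.close)

obs = using(
    lambda: GeneratorResource(my_generator()),
    lambda res: from_iterable(res.gen),
).pipe(take(2))

obs.subscribe()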

Carsten-Leue avatar Nov 03 '20 21:11 Carsten-Leue

The take operator consumes the whole generator, but emits only the number of items asked for. With the contextmanager decorator, you need to use a try/finally construct to clean up the resource. This works:

from typing import Iterator
from contextlib import contextmanager

from rx import from_iterable
from rx.operators import take

@contextmanager
def my_context():
    print('enter context')
    try:
        yield
    finally:
        print('release context')

def my_generator() -> Iterator[int]:
    with my_context():
        yield 1
        yield 2
        yield 3

obs = from_iterable(my_generator()).pipe(take(2))

obs.subscribe()

MainRo avatar Nov 03 '20 22:11 MainRo

Hi @MainRo, thanks for pointing out the glitch in the context manager. Still, I think it's not working as desired. My updated code is:

from typing import Iterator
from contextlib import contextmanager

from rx import from_iterable
from rx.operators import take

@contextmanager
def my_context():
    print('enter context')
    try:
        yield
    finally:
        print('release context')

def my_generator() -> Iterator[int]:
    with my_context():
        yield 1
        yield 2
        yield 3

obs = from_iterable(my_generator()).pipe(take(2))

obs.subscribe(on_completed=lambda: print('completed'))

print('after subscribe')

The result is

enter context
completed
after subscribe
release context

This is not the desired output, because the generator should be released by the take operator, so the release context message should appear before the completed message. The fact that it appears at the very end shows that it's the GC process that kills the generator, not the observable chain.

Regarding your comment:

take operator consumes the whole generator

I do not think that this is the case. If this were so, then a generator that emitted an infinite sequence would never complete when used with the take operator. If I change the generator to

def my_generator() -> Iterator[int]:
    with my_context():
        while True:
            yield

then the sequence does complete as expected after yielding two items. So the take operator is definitely not consuming the full generator.
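
For comparison, here is a plain-Python sketch (my own, nothing RxPY-specific) of the behavior I would expect, using the my_generator with the try/finally fix from above:

import itertools

gen = my_generator()
print(list(itertools.islice(gen, 2)))  # prints 'enter context' and then [1, 2]
print('before close')                  # 'release context' has not appeared yet
gen.close()                            # throws GeneratorExit into the generator,
                                       # so the finally block prints 'release context'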

Carsten-Leue avatar Nov 04 '20 09:11 Carsten-Leue

This is not the desired output, because the generator should be released by the take operator, so the release context message should appear before the completed message. The fact that it appears at the very end shows that it's the GC process that kills the generator, not the observable chain.

The reference to the generator is kept as long as obs exists. At first I also thought that the reference should be dropped at disposal, but that is not the case, and it is not desirable: after obs completes, it is still possible to subscribe to obs again, and in that case the second subscription must continue consuming from the generator. So although this behavior is not obvious, I think it is the correct one. See this example:

...
def my_generator() -> Iterator[int]:
    i = 0
    with my_context():
        while True:
            yield i
            i += 1

obs = from_iterable(my_generator()).pipe(take(2))
obs.subscribe(
    on_next=print,
    on_completed=lambda: print('completed'))
obs.subscribe(
    on_next=print,
    on_completed=lambda: print('completed'))
print('after subscribe')

that gives:

enter context
0
1
completed
2
3
completed
after subscribe
release context

The only way to drop the reference to the generator is to drop the reference to obs. On CPython you can do it this way:

obs.subscribe(
    on_next=print,
    on_completed=lambda: print('completed'))
obs = None
print('after subscribe')

But this will not work on PyPy, where the garbage collector will only run after the last print statement.
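
Another option that does not depend on the garbage collector at all is to keep a reference to the generator and close it explicitly. This is my own sketch, and it relies on the default scheduler draining the subscription synchronously before gen.close() runs:

gen = my_generator()
obs = from_iterable(gen).pipe(take(2))
obs.subscribe(
    on_next=print,
    on_completed=lambda: print('completed'))
gen.close()  # runs the finally block deterministically, on CPython and PyPy alike
print('after subscribe')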

then the sequence does complete as expected after yielding two items. So the take operator is definitely not consuming the full generator.

Yes, you're right. After re-reading the code I understood how this is done, and it explains a bug I have in another library where the take operator does not stop consuming the source observable!

MainRo avatar Nov 04 '20 21:11 MainRo

The reference to the generator is kept as long as obs exists.

I think that makes sense because, as you mention, you can re-subscribe to the source sequence. My expectation would have been that the source iterable is closed on disposal and restarted on the next subscription; since restarting a generator is not possible, I would have expected an exception for the second subscription.

Take this example:

# obs = from_iterable(my_generator()).pipe(take(2))
obs = from_iterable((1,2,3,4,5,6)).pipe(take(2))

when subscribing twice to obs the output is, as expected:

1
2
completed
1
2
completed

and NOT

1
2
completed
3
4
completed

So each new subscription starts the iteration again. To me it's counterintuitive that the behavior should depend on the actual kind of Iterable. This seems to contradict the Liskov substitution principle.

But maybe the source of confusion is that in Python a Generator is also an Iterable and not just an Iterator (and that an Iterator is itself an Iterable?!).
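
A quick check with collections.abc illustrates the type relationships I mean (my own snippet, independent of RxPY):

from collections.abc import Generator, Iterable, Iterator

gen = my_generator()
print(isinstance(gen, Generator))  # True
print(isinstance(gen, Iterator))   # True  - a generator is an iterator
print(isinstance(gen, Iterable))   # True  - an iterator is an iterable
print(iter(gen) is gen)            # True  - an iterator is its own iterator
tup = (1, 2, 3)
print(iter(tup) is tup)            # False - a tuple hands out a fresh iterator each time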

Carsten-Leue avatar Nov 04 '20 22:11 Carsten-Leue

So each new subscription starts the iteration again. To me it's counterintuitive that the behavior should depend on the actual kind of Iterable. This seems to contradict the Liskov substitution principle.

This is a very good point, so the behavior should probably be consistent with other iterables. Unfortunately, as you said, generators cannot be restarted. A possible alternative is to add a "from_generator" operator that takes a generator factory as input. This would allow a new generator to be created for each subscription, with behavior consistent with from_iterable on other iterable types.
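
A rough sketch of the idea (from_generator here is hypothetical, not an existing RxPY operator; it just wraps defer so that a new generator is created per subscription):

from rx import defer, from_iterable
from rx.operators import take

def from_generator(generator_function):
    # call the generator function again on every subscription
    return defer(lambda scheduler: from_iterable(generator_function()))

obs = from_generator(my_generator).pipe(take(2))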

In any case it's worth adding a note to the documentation.

MainRo avatar Nov 05 '20 21:11 MainRo

Hi, I think that from_iterable is working as intended. It cannot have any opinion on whether the iterable is its own iterator or not.

This is similar to the iterable world:

def gen():
    for x in range(10):
        yield x

There is a difference between calling the generator function once or twice:

# iterating the same generator object twice: the second loop continues
# where the first one stopped
it = gen()

for x in it:
    if x > 3:
        break
    print(x)

for x in it:
    print(x)

# calling the generator function twice: each call creates a fresh generator
# that starts from the beginning
for x in gen():
    if x > 3:
        break
    print(x)

for x in gen():
    print(x)

RxPY has no way of calling the generator function twice in from_iterable.

To produce such behavior you can e.g. defer the creation of the observable so that it is recreated for each subscription:

obs = defer(lambda _: from_iterable(my_generator())).pipe(take(2))
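
With this, each subscribe call invokes my_generator() again, so every subscription sees a fresh generator (assuming from rx import defer alongside the imports used earlier):

obs.subscribe(on_next=print)  # prints 1, 2
obs.subscribe(on_next=print)  # prints 1, 2 again, from a new generator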

dbrattli avatar Nov 06 '20 06:11 dbrattli

What about life-cycle handling if the iterable is actually a generator? Generators have a close method (https://docs.python.org/3/reference/expressions.html#generator.close); wouldn't we expect this method to be invoked when the subscription to the observable returned by from_iterable is disposed?

Carsten-Leue avatar Dec 18 '20 15:12 Carsten-Leue

I have implemented a prototype of an observable factory that takes the life cycle of iterables into account. It works for regular iterables, generators, and their asyncio equivalents.

https://github.com/Carsten-Leue/UtilsRxPY

I would appreciate your thoughts.

Carsten-Leue avatar Dec 20 '20 11:12 Carsten-Leue