RxPY
When using `from_iterable` with a generator, the generator might not be closed
Example code:
from typing import Iterator
from contextlib import contextmanager
from rx import from_iterable
from rx.operators import take

@contextmanager
def my_context():
    print('enter context')
    yield
    print('release context')

def my_generator() -> Iterator[int]:
    with my_context():
        yield 1
        yield 2
        yield 3

obs = from_iterable(my_generator()).pipe(take(2))
obs.subscribe()
In this example the generator returned by `my_generator` uses a context manager (`my_context`) for resource management. However, the resource will never be cleaned up because the generator is not being fully consumed.

I suspect that this might be working as designed and that it's expected to use the `using` mechanism to manage the lifecycle of the generator in conjunction with `from_iterable`. Is this how it's supposed to work?
The `take` operator consumes the whole generator, but emits only the number of items asked for. With the `contextmanager` decorator, you need to use a try/finally construct to clean up the resource. This works:
from typing import Iterator
from contextlib import contextmanager
from rx import from_iterable
from rx.operators import take

@contextmanager
def my_context():
    print('enter context')
    try:
        yield
    finally:
        print('release context')

def my_generator() -> Iterator[int]:
    with my_context():
        yield 1
        yield 2
        yield 3

obs = from_iterable(my_generator()).pipe(take(2))
obs.subscribe(on_completed=lambda: print('completed'))
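The reason the try/finally matters can be shown without RxPY at all. Here is a minimal sketch using only the standard library (the names `leaky`, `safe`, and `numbers` are purely illustrative): `generator.close()` raises `GeneratorExit` at the suspended `yield`, so a statement after a bare `yield` in a `@contextmanager` function is skipped unless it sits in a `finally` block.

```python
from contextlib import contextmanager

@contextmanager
def leaky():
    print('enter')
    yield
    print('release')  # skipped when GeneratorExit interrupts the yield

@contextmanager
def safe():
    print('enter')
    try:
        yield
    finally:
        print('release')  # runs even when the generator is closed early

def numbers(ctx):
    with ctx():
        yield 1
        yield 2

g = numbers(leaky)
next(g)      # prints 'enter', suspends at 'yield 1'
g.close()    # 'release' is never printed

g = numbers(safe)
next(g)      # prints 'enter'
g.close()    # prints 'release'
```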
Hi @MainRo, thanks for pointing out the glitch in the context manager. Still, I think it's not working as desired. My updated code is:
from typing import Iterator
from contextlib import contextmanager
from rx import from_iterable
from rx.operators import take

@contextmanager
def my_context():
    print('enter context')
    try:
        yield
    finally:
        print('release context')

def my_generator() -> Iterator[int]:
    with my_context():
        yield 1
        yield 2
        yield 3

obs = from_iterable(my_generator()).pipe(take(2))
obs.subscribe(on_completed=lambda: print('completed'))
print('after subscribe')
The result is:
enter context
completed
after subscribe
release context
This is not the desired output, because the generator should be released by the `take` operator, so the `release context` message should appear before the `completed` message. The fact that it appears at the very end shows that it's the GC process that kills the generator, not the observable chain.
Regarding your comment:

> the take operator consumes the whole generator

I do not think that this is the case. If this were so, then a generator that emitted an infinite sequence would never complete when used with the `take` operator.
If I change the generator to
def my_generator() -> Iterator[int]:
    with my_context():
        while True:
            yield
then the sequence does complete as expected after yielding two items. So the `take` operator is definitely not consuming the full generator.
> This is not the desired output, because the generator should be released by the `take` operator, so the `release context` message should appear before the `completed` message. The fact that it appears at the very end shows that it's the GC process that kills the generator, not the observable chain.
The reference to the generator is kept as long as `obs` exists. At first I also thought that the reference should be dropped at disposal, but that is not the case, and it is not desirable: after `obs` completes, it is still possible to subscribe to it again, and in such cases the second subscription must continue to consume from the generator. So although this behavior is not obvious, I think it is the correct one. See this example:
...

def my_generator() -> Iterator[int]:
    i = 0
    with my_context():
        while True:
            yield i
            i += 1

obs = from_iterable(my_generator()).pipe(take(2))
obs.subscribe(
    on_next=print,
    on_completed=lambda: print('completed'))
obs.subscribe(
    on_next=print,
    on_completed=lambda: print('completed'))
print('after subscribe')
that gives:
enter context
0
1
completed
2
3
completed
after subscribe
release context
The only way to drop the reference to the generator is to drop the reference to `obs`. On CPython you can do it this way:
obs.subscribe(
    on_next=print,
    on_completed=lambda: print('completed'))
obs = None
print('after subscribe')
But this will not work on PyPy, where the garbage collector will trigger only after the last print statement.
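If deterministic cleanup is needed without relying on the GC at all, a further option (a sketch, not an RxPY API) is to keep your own reference to the generator and close it explicitly once the subscription is done; `close()` runs the generator's finally blocks immediately. Reusing `my_generator` from above:

```python
gen = my_generator()  # keep our own handle on the generator
obs = from_iterable(gen).pipe(take(2))
obs.subscribe(
    on_next=print,
    on_completed=lambda: print('completed'))
gen.close()  # raises GeneratorExit in the generator: 'release context'
             # prints here, deterministically, on CPython and PyPy alike
print('after subscribe')
```

The trade-off is that a second subscription to `obs` would then complete immediately, because the closed generator raises StopIteration right away.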
> then the sequence does complete as expected after yielding two items. So the take operator is definitely not consuming the full generator.
Yes, you're right. After re-reading the code I understood how this is done, and it explains a bug I have in another library where the `take` operator does not stop consuming the source observable!
> The reference to the generator is kept as long as obs exists.
I think that makes sense because, as you mention, you can re-subscribe to the source sequence. I guess my expectation would have been that the source iterable would be closed when disposed and then restarted. Since restarting a generator is not possible, I would have expected an exception for the second subscription.
Take this example:
# obs = from_iterable(my_generator()).pipe(take(2))
obs = from_iterable((1,2,3,4,5,6)).pipe(take(2))
When subscribing twice to `obs`, the output is, as expected:
1
2
completed
1
2
completed
and NOT
1
2
completed
3
4
completed
So each new subscription starts the iteration again. To me it's counterintuitive that the behavior should depend on the actual kind of `Iterable`; this seems to contradict the Liskov substitution principle. But maybe the source of confusion is that in Python a `Generator` is also an `Iterable` and not just an `Iterator` (and that an `Iterator` is an `Iterable`?!).
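The distinction is easy to check directly. A small sketch using only the standard library: a generator's `__iter__` returns the generator itself, so iterating it "again" resumes it, whereas a tuple hands out a fresh iterator on every `iter()` call.

```python
from collections.abc import Iterable, Iterator

def g():
    yield 1
    yield 2

gen = g()
print(isinstance(gen, Iterator))  # True: a generator is an Iterator
print(isinstance(gen, Iterable))  # True: every Iterator is also an Iterable
print(iter(gen) is gen)           # True: re-iteration resumes, not restarts

t = (1, 2, 3)
print(isinstance(t, Iterator))    # False: a tuple is only an Iterable
print(iter(t) is iter(t))         # False: each iter() makes a fresh iterator
```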
> So each new subscription starts the iteration again. To me it's counterintuitive that the behavior should depend on the actual kind of `Iterable`; this seems to contradict the Liskov substitution principle.
This is a very good point, so the behavior should probably be consistent with other iterables. Unfortunately, as you said, generators cannot be restarted. A possible alternative is to add a `from_generator` operator that takes a generator factory as input. This would allow creating a new generator for each subscription, with behavior consistent with `from_iterable` on other iterable types; a rough sketch follows.
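A minimal sketch of what such a `from_generator` could look like, built on the existing `defer` (the helper name and signature are hypothetical, not part of RxPY):

```python
from typing import Callable, Iterator

from rx import defer, from_iterable

def from_generator(factory: Callable[[], Iterator]):
    # defer invokes the lambda on every subscription, so each subscriber
    # gets a brand-new generator; the scheduler argument is unused here
    return defer(lambda _: from_iterable(factory()))

# Note that the generator *function* is passed, not a generator object:
obs = from_generator(my_generator).pipe(take(2))
```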
In any case it's worth adding a note to the documentation.
Hi, I think that `from_iterable` is working as intended. It cannot have any opinion on whether the iterable is its own iterator or not.
This is similar to the iterable world:
def gen():
    for x in range(10):
        yield x
There is a difference between calling the generator function once or twice:
# Reusing one generator object: the second loop resumes where the
# first one stopped.
it = gen()
for x in it:
    if x > 3:
        break
    print(x)

for x in it:
    print(x)

# Calling the generator function again: each loop gets a fresh generator.
for x in gen():
    if x > 3:
        break
    print(x)

for x in gen():
    print(x)
RxPY has no way of calling the generator function twice in `from_iterable`.
To produce such behavior you can, e.g., `defer` the creation of the observable so that it is recreated for each subscription:

from rx import defer

obs = defer(lambda _: from_iterable(my_generator())).pipe(take(2))
What about the life-cycle handling if the iterable is actually a generator? A generator features a `close` method (https://docs.python.org/3/reference/expressions.html#generator.close); wouldn't we expect this method to be invoked when the observable returned by `from_iterable` gets unsubscribed?
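For what it's worth, here is a rough sketch of how such close-on-dispose behavior could be bolted on from the outside (the helper `from_generator_closing` is hypothetical, not an RxPY API): wrap the stock `from_iterable` subscription together with a disposable that calls the generator's `close` method.

```python
import rx
from rx.disposable import CompositeDisposable, Disposable
from rx.operators import take

def from_generator_closing(gen):
    # Hypothetical helper: subscribe through the stock from_iterable, but
    # also close() the generator when the subscription is disposed, so its
    # finally blocks run at disposal time instead of garbage-collection time
    def subscribe(observer, scheduler=None):
        subscription = rx.from_iterable(gen).subscribe(observer, scheduler=scheduler)
        return CompositeDisposable(subscription, Disposable(gen.close))
    return rx.create(subscribe)

# With the try/finally version of my_context, 'release context' now shows
# up at disposal, right around 'completed', rather than when the GC runs.
obs = from_generator_closing(my_generator()).pipe(take(2))
obs.subscribe(on_completed=lambda: print('completed'))
```

A second subscription to `obs` would then complete immediately, since the closed generator raises StopIteration at once.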
I have implemented an idea for an observable factory that takes the life cycle of iterables into account. It works for regular iterables, generators, and their asyncio equivalents:
https://github.com/Carsten-Leue/UtilsRxPY
I would appreciate your thoughts.