`Stream.of(...)` consumes the underlying iterable eagerly
**Describe the bug**
`Stream.of(...)` consumes the underlying iterable eagerly.
Perhaps there is another way of wrapping an iterable?
**To Reproduce**

```python
def gen() -> Generator[int, None, None]:
    for i in range(1, 4):
        print(">>")
        yield i

def should_stream0():
    for i in gen():
        print(i)

def should_stream2():
    Stream.of(gen()) \
        .map(lambda i: str(i)) \
        .for_each(print)
```
**Expected behavior**
`should_stream2` should behave the same as `should_stream0`, printing `>>` interleaved with the numbers.
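For context, the interleaving difference can be reproduced without the library at all. A minimal sketch, assuming hypothetical `eager_map`/`lazy_map` helpers (illustrative only, not PyStreamAPI's API): materializing the source first reorders the side effects, while pulling one element at a time keeps them interleaved.

```python
import contextlib
import io
from typing import Callable, Generator, Iterable, Iterator

def gen() -> Generator[int, None, None]:
    for i in range(1, 4):
        print(">>")
        yield i

def eager_map(f: Callable, it: Iterable) -> list:
    # Materializes the source first: every ">>" prints before any mapped value is seen
    return [f(x) for x in list(it)]

def lazy_map(f: Callable, it: Iterable) -> Iterator:
    # Pulls one element at a time: the side effects stay interleaved
    for x in it:
        yield f(x)

for s in lazy_map(str, gen()):
    print(s)  # ">>" and numbers interleave, like should_stream0
```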
**Machine (please complete the following information):**
- OS: Linux
- Python Version: 3.12.4
- PyStreamAPI Version: 1.3.0
BTW... from `__csv_loader.py`:

```python
def __load_csv(file_path, cast, delimiter, encoding):
    """Load a CSV file and convert it into a list of namedtuples"""
    # skipcq: PTC-W6004
    with open(file_path, mode='r', newline='', encoding=encoding) as csvfile:
        csvreader = reader(csvfile, delimiter=delimiter)
        # Create a namedtuple type, casting the header values to int or float if possible
        header = __get_csv_header(csvreader)
        Row = namedtuple('Row', list(header))
        mapper = LoaderUtils.try_cast if cast else lambda x: x
        # Process the data, casting values to int or float if possible
        data = [Row(*[mapper(value) for value in row]) for row in csvreader]
        return data
```

What happens if the file is too big to fit in memory?...
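One possible direction is to yield rows one at a time instead of building the full list. A hedged sketch, assuming a hypothetical `iter_csv` helper (not the library's loader; it also omits the header-casting logic for brevity):

```python
import csv
from collections import namedtuple
from typing import Iterator

def iter_csv(file_path: str, delimiter: str = ',', encoding: str = 'utf-8') -> Iterator:
    """Yield one Row at a time so the whole file never sits in memory."""
    with open(file_path, newline='', encoding=encoding) as csvfile:
        rows = csv.reader(csvfile, delimiter=delimiter)
        Row = namedtuple('Row', next(rows))  # the header line defines the fields
        for row in rows:
            yield Row(*row)
```

The file stays open only while the generator is being consumed, which would pair naturally with a lazily evaluated stream.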
Thanks @adrian-herscu for helping improve PyStreamAPI!
You are right that generators are not handled as they should be. The problem is very visible in this snippet:
```python
def infinite_gen():
    yield from iter(int, 1)  # yields 0 forever

# 💀 Infinite loop: map() tries to consume the whole generator before limit() runs
Stream.of(infinite_gen()) \
    .map(lambda x: x * 2) \
    .limit(10) \
    .for_each(print)
```
Currently, you have to position `limit` at the beginning of the stream, as the first intermediate operation. In Java, any order is supported as long as you use `limit` somewhere.
```python
def infinite_gen():
    yield from iter(int, 1)

Stream.of(infinite_gen()) \
    .limit(10) \
    .map(lambda x: x * 2) \
    .for_each(print)  # 👍 Works perfectly
```
PyStreamAPI executes all operations lazily, but internally it uses lists, which is why the whole generator is consumed by the first intermediate operation.
The best solution is to use generators internally so that sources are not completely consumed up front.
Note: The missing pattern in your output is not necessarily always an issue, since some intermediate operations (`sorted`, `distinct`, etc.) require the whole generator to be consumed.
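A minimal sketch of that generator-based approach (illustrative helpers, not the library's internals): when each stage pulls one element at a time, `limit` can terminate an infinite source even when it comes after `map`.

```python
from itertools import count, islice
from typing import Callable, Iterable, Iterator

def lazy_map(f: Callable, source: Iterable) -> Iterator:
    # Transforms elements on demand instead of materializing a list
    for x in source:
        yield f(x)

def lazy_limit(source: Iterable, n: int) -> Iterator:
    # Stops pulling from the upstream source after n elements
    return islice(source, n)

# count() is infinite, yet the pipeline terminates:
# limit caps how many elements map is ever asked to produce
result = list(lazy_limit(lazy_map(lambda x: x * 2, count()), 10))
print(result)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```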
**Tasks to complete**
This issue can be split into the following three tasks:
- [x] Adapt the sequential implementation to support infinite sources: #98
- [x] Change the parallelization strategy used to support consuming infinite sources
- [x] ~~Adapt the parallel implementation to use the new parallelizer and (possibly) remove the `joblib` dependency~~
Closing this issue, as infinite generators are now handled correctly in sequential streams. In parallel streams, we cannot support (infinite) generators.
If you use `Stream.of(...)` and pass a generator, it will always create a sequential stream.
If your source is a finite generator and you need parallelization, make sure to use the `.parallel()` intermediate operation to explicitly convert your stream to a parallel one.