
`Stream.of(...)` consumes the underlying iterable eagerly

adrian-herscu opened this issue 1 year ago · 2 comments

Describe the bug

`Stream.of(...)` consumes the underlying iterable eagerly. Perhaps there is another way of wrapping an iterable?

To Reproduce

from typing import Generator

from pystreamapi import Stream


def gen() -> Generator[int, None, None]:
    for i in range(1, 4):
        print(">>")
        yield i


def should_stream0():
    for i in gen():
        print(i)


def should_stream2():
    Stream.of(gen()) \
        .map(lambda i: str(i)) \
        .for_each(print)

Expected behavior

should_stream2 should behave the same as should_stream0, printing `>>` interleaved with the numbers.
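For comparison, the built-in lazy `map` produces exactly the interleaving expected here. A minimal sketch (the `events` list is only there to make the ordering visible; it is not part of the original repro):

```python
events = []

def gen():
    for i in range(1, 4):
        events.append(">>")  # side effect marks when the generator advances
        yield i

# Built-in map is lazy, so each ">>" appears right before its number.
for s in map(str, gen()):
    events.append(s)

print(events)  # ['>>', '1', '>>', '2', '>>', '3']
```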

Machine (please complete the following information):

  • linux
  • Python Version: 3.12.4
  • PyStreamAPI Version: 1.3.0

adrian-herscu commented on Aug 25 '24

BTW... from __csv_loader.py:

def __load_csv(file_path, cast, delimiter, encoding):
    """Load a CSV file and convert it into a list of namedtuples"""
    # skipcq: PTC-W6004
    with open(file_path, mode='r', newline='', encoding=encoding) as csvfile:
        csvreader = reader(csvfile, delimiter=delimiter)

        # Create a namedtuple type, casting the header values to int or float if possible
        header = __get_csv_header(csvreader)

        Row = namedtuple('Row', list(header))

        mapper = LoaderUtils.try_cast if cast else lambda x: x

        # Process the data, casting values to int or float if possible
        data = [Row(*[mapper(value) for value in row]) for row in csvreader]
    return data

what happens if the file is too big to fit in memory?...
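A generator-based variant of the loader above would keep memory usage constant by yielding one namedtuple per row instead of building a list. A sketch only (the name `load_csv_lazy` is hypothetical, and it omits the header-casting and `try_cast` logic of the real loader):

```python
import csv
import io
from collections import namedtuple

def load_csv_lazy(file_obj, delimiter=','):
    """Yield one Row at a time; the whole file is never materialized."""
    csvreader = csv.reader(file_obj, delimiter=delimiter)
    header = next(csvreader)
    Row = namedtuple('Row', header)
    for row in csvreader:
        yield Row(*row)

# Hypothetical usage with an in-memory file standing in for a large CSV:
rows = load_csv_lazy(io.StringIO("a,b\n1,2\n3,4\n"))
print(next(rows))  # Row(a='1', b='2')
```

Callers could then iterate row by row (or wrap the generator in a stream) without holding the whole file in memory.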

adrian-herscu commented on Aug 27 '24

Thanks @adrian-herscu for helping improve pystreamapi!

You are right that generators are not handled as they should be. The problem is very visible in this snippet:

def infinite_gen():
    yield from iter(int, 1)

Stream.of(infinite_gen()) \
    .map(lambda x: x * 2) \
    .limit(10) \
    .for_each(print)  # 💀 Infinite loop: map consumes the whole generator first

Currently, you have to position limit at the beginning of the stream as the first intermediate operation. In Java, any order is supported as long as you use limit somewhere.

def infinite_gen():
    yield from iter(int, 1)

Stream.of(infinite_gen()) \
    .limit(10) \
    .map(lambda x: x * 2) \
    .for_each(print)  # 👍 Works: limit runs before the source is consumed

PyStreamAPI executes all operations lazily, but internally uses lists, which is why the whole generator is consumed by the first intermediate operation.

The best solution is to use generators internally, so that sources are not completely consumed.

Note: The missing `>>` pattern in your output is not necessarily an issue, since some intermediate operations (sorted, distinct, etc.) require the whole generator to be consumed.
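The generator-internals idea can be sketched with plain generator expressions and `itertools.islice` standing in for `map` and `limit`. This mirrors the intended design, not the actual internals:

```python
from itertools import islice

def infinite_gen():
    yield from iter(int, 1)  # yields 0 forever (int() never returns 1)

# Each stage lazily wraps the previous one, so no stage drains the source:
mapped = (x * 2 for x in infinite_gen())  # "map" stage
limited = islice(mapped, 10)              # "limit" stage, placed after map

result = list(limited)                    # terminates: only 10 items pulled
print(result)  # [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
```

Because each stage pulls items on demand, `limit` can appear anywhere in the chain without the earlier stages consuming the infinite source.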

Tasks to complete

This issue can be split into the following three tasks:

  • [x] Adapt the sequential implementation to support infinite sources: #98
  • [x] Change the parallelization strategy used to support consuming infinite sources
  • [x] ~~Adapt the parallel implementation to use the new parallelizer and (possibly) remove joblib dependency~~

garlontas commented on Sep 02 '24

Closing this issue, as infinite generators are now handled correctly in sequential streams. In parallel streams, we cannot support (infinite) generators. If you pass a generator to Stream.of(...), it will always create a sequential stream. If your source is a finite generator and you need parallelization, use the .parallel() intermediate operation to explicitly convert your stream to a parallel one.

garlontas commented on Jun 05 '25