
[Bug]: `asyncstdlib.tee()` doesn't preserve buffered state when creating multiple consumers

Open Atry opened this issue 6 months ago • 13 comments

What happened?

The asyncstdlib.tee() function does not behave consistently with Python's built-in itertools.tee() when creating multiple consumers from an already-advanced iterator. Specifically, asyncstdlib.tee() advances all consumers to the current position of the source iterator, while itertools.tee() correctly preserves a buffer of previously yielded values.

Environment

  • asyncstdlib version: 3.13.1
  • Python version: 3.12.10
  • Operating System: Linux

Expected Behavior

When using itertools.tee() with Python's built-in iterators, creating a new consumer from an already-advanced source iterator should provide access to previously yielded values through internal buffering.

Actual Behavior

When using asyncstdlib.tee(), creating a new consumer from an already-advanced source iterator causes the new consumer to start from the current position, losing access to previously yielded values.

Root Cause Analysis

The issue appears to be that asyncstdlib.tee() doesn't maintain the same buffering mechanism as itertools.tee(). In the standard library implementation, when creating a new consumer from an existing tee'd iterator, the new consumer gains access to the internal buffer that contains previously yielded values.

In asyncstdlib.tee(), it seems that new consumers are created at the current position of the source iterator without access to the historical buffer, causing them to miss previously yielded values.

Minimal Reproducible Example

# Standard library behavior (correct):
import itertools

# Create initial source
(source,) = itertools.tee(itertools.count(), 1)

# Advance source to position 1
print(next(source))  # 0

# Create first consumer after advancing source
consumer1, = itertools.tee(source, 1)

# Advance source to position 3

print(next(source))  # 1
print(next(source))  # 2
print(next(source))  # 3

# Create second consumer after advancing even more
consumer2, = itertools.tee(source, 1)

# Advance source further
print(next(source))  # 4
print(next(source))  # 5


# Now test the consumers
print(next(consumer1))  # Outputs: 1 (starts from buffered position)
print(next(consumer2))  # Outputs: 4 (starts from buffered position)
print(next(consumer1))  # Outputs: 2 (continues from buffer)
print(next(consumer2))  # Outputs: 5 (continues from buffer)
# asyncstdlib behavior (incorrect):
# (the top-level `await` below assumes an async context, e.g. a notebook cell)
import itertools
import asyncstdlib

async def infinite_counter():
    for i in itertools.count():
        yield i

# Create initial source
(source,) = asyncstdlib.tee(infinite_counter(), 1)

# Advance source to position 1
await anext(source)  # 0

# Create first consumer after advancing source
consumer1, = asyncstdlib.tee(source, 1)

# Advance source to position 3
await anext(source)  # 1
await anext(source)  # 2
await anext(source)  # 3

# Create second consumer after advancing even more
consumer2, = asyncstdlib.tee(source, 1)

# Advance source further
await anext(source)  # 4
await anext(source)  # 5


# Now test the consumers
print(await anext(consumer1))  # Outputs: 6 (should be 1!)
print(await anext(consumer2))  # Outputs: 7 (should be 4!)
print(await anext(consumer1))  # Outputs: 8 (should be 2!)
print(await anext(consumer2))  # Outputs: 9 (should be 5!)

Request Assignment [Optional]

  • [x] I already understand the cause and want to submit a bugfix.

Atry avatar Aug 13 '25 18:08 Atry

According to https://docs.python.org/3/library/itertools.html#:~:text=When%20the%20input%20iterable%20is%20already,rather%20than%20a%20chain%20of%20calls.

When the input iterable is already a tee iterator object, all members of the return tuple are constructed as if they had been produced by the upstream tee() call. This “flattening step” allows nested tee() calls to share the same underlying data chain and to have a single update step rather than a chain of calls.
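The quoted behavior can be checked with a short sketch. Note that this only relies on the new consumer continuing from the already-advanced position, which holds whether `tee()` returns the input itself or a sibling copy (the exact sharing semantics were only codified in the Python 3.13 docs):

```python
import itertools

# Tee a plain iterator, then advance one branch.
a, b = itertools.tee(iter(range(4)))
next(a)  # a has consumed 0

# Tee the already-advanced branch: per the quoted docs, the new
# consumer joins the existing data chain rather than re-wrapping it.
(c,) = itertools.tee(a, 1)

# c continues from a's current position, so it sees 1 next;
# b is an independent sibling and is unaffected.
print(next(c))
```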

Atry avatar Aug 13 '25 18:08 Atry

Are these tests AI generated? Their alleged output doesn't match what I get when running them.

That said, you are correct that the output is different for the stdlib and asyncstdlib variants.

maxfischer2781 avatar Aug 14 '25 15:08 maxfischer2781

You keep selecting that you want to submit a bugfix. Is that actually the case? I would assign these tickets to you, then.

maxfischer2781 avatar Aug 14 '25 16:08 maxfischer2781

It looks like the quoted section only appeared in Python 3.13.

maxfischer2781 avatar Aug 14 '25 16:08 maxfischer2781

Are these tests AI generated? Their alleged output doesn't match what I get when running them.

Yes. I created some tests in a Jupyter notebook and asked an AI to summarize them. Unfortunately, the AI hallucinated the order of some function calls.

I just updated the bug report to fix the outputs.

Atry avatar Aug 14 '25 19:08 Atry

You keep selecting that you want to submit a bugfix. Is that actually the case? I would assign these tickets to you, then.

Yes. I am willing to submit a bugfix, as long as you think the behavior actually needs to be fixed.

Atry avatar Aug 14 '25 19:08 Atry

It looks like the quoted section only appeared in Python 3.13.

According to https://github.com/python/cpython/issues/123884#issuecomment-2342829955, it was documented from the beginning, but I cannot find such documentation prior to Python 3.13.

Atry avatar Aug 14 '25 19:08 Atry

You keep selecting that you want to submit a bugfix. Is that actually the case? I would assign these tickets to you, then.

Yes. I am willing to submit a bugfix, as long as you think the behavior actually needs to be fixed.

In fact, I would propose the following implementation:

_YieldType = TypeVar("_YieldType", covariant=True)


@dataclass(frozen=True)
class AsyncIteratorBackedIterator(Iterator[Awaitable[_YieldType]], Generic[_YieldType]):
    """
    An infinite iterator of awaitables backed by an async iterator.

    This iterator will never raise :py:class:`StopIteration` directly, but will
    continuously yield awaitables that, when awaited, produce values or
    raise exceptions from the underlying async iterator.
    """

    async_iterator: Final[AsyncIterator[_YieldType]]

    def __next__(self) -> Awaitable[_YieldType]:
        """
        Return the next awaitable from the async iterator.

        .. note::
            This method never raises :py:class:`StopIteration` or any other exception
            directly. Instead, it returns an awaitable that, when awaited,
            will either yield the next value or raise an exception.
        """
        return self.async_iterator.__anext__()


@dataclass(frozen=True)
class IteratorBackedAsyncIterator(AsyncIterator[_YieldType], Generic[_YieldType]):
    """
    An async iterator backed by an infinite iterator of awaitables.

    This adapter transforms an infinite iterator of awaitables into an async
    iterator by awaiting each item from the underlying iterator.
    """

    iterator: Iterator[Awaitable[_YieldType]]
    """
    An infinite iterator yielding awaitable objects.

    This iterator should never raise :py:class:`StopIteration` or any other
    exception directly. Instead, it should yield an unbounded sequence of
    awaitables that can be awaited to obtain the next value or to raise an
    exception, including :py:class:`StopAsyncIteration` to signal the end of
    this :py:class:`IteratorBackedAsyncIterator`.
    """

    def __anext__(self):
        return self.iterator.__next__()


@dataclass(frozen=True)
class FutureIteratorBackedAsyncIterator(
    IteratorBackedAsyncIterator[_YieldType], Generic[_YieldType]
):
    """
    An async iterator backed by an infinite iterator of Future objects.

    This specialized implementation is designed for sharing async iterators
    across multiple consumers. The internal :py:attr:`iterator` can be shared
    via :py:func:`itertools.tee`, making it useful for implementing
    :py:func:`tee` and similar functions.
    """

    iterator: Iterator[Future[_YieldType]]
    """
    An infinite iterator yielding Future objects.
    
    Each Future can be awaited multiple times, enabling this iterator to be 
    safely shared across multiple consumers via :py:func:`itertools.tee`.
    """


def tee(
    async_iterable: AsyncIterable[_YieldType], n: int = 2
) -> Sequence[FutureIteratorBackedAsyncIterator[_YieldType]]:
    """
    Create multiple independent async iterators from a single async iterable.

    This is the async equivalent of :py:func:`itertools.tee`, providing the same semantics
    and behavior for async iteration. Like :py:func:`itertools.tee`, this function creates
    ``n`` independent iterators that can be consumed at different rates while
    sharing the underlying data.

    :param async_iterable: The source async iterable to split. If this is a
        :py:class:`FutureIteratorBackedAsyncIterator` (from a previous :py:func:`tee` call),
        it will be specially handled to support the "flattening step" behavior
        by directly using its underlying iterator instead of wrapping it again.
    :param n: Number of independent iterators to create (default: 2)
    :returns: A tuple of ``n`` independent async iterators

    .. note::
        Flattening Step Behavior: Like :py:func:`itertools.tee`, this function
        implements a "flattening step" optimization. When the input
        ``async_iterable`` is already a :py:class:`FutureIteratorBackedAsyncIterator`
        (i.e., a result from a previous :py:func:`tee` call), all returned iterators
        are constructed as if they had been produced by the upstream :py:func:`tee`
        call. This allows nested :py:func:`tee` calls to share the same underlying
        data chain and have a single update step rather than a chain of calls,
        improving efficiency and making the iterators efficiently peekable.

    .. warning::
        Memory Usage: Like :py:func:`itertools.tee`, if iterators are consumed at
        very different rates, the one that's ahead will cache values for the
        ones that are behind, potentially using significant memory.

    Examples::

        >>> async def source():
        ...     for i in range(3):
        ...         yield i

        >>> iter1, iter2 = tee(source())
        >>> list1 = [x async for x in iter1]  # [0, 1, 2]
        >>> list2 = [x async for x in iter2]  # [0, 1, 2]

        >>> # Different consumption rates
        >>> iter1, iter2 = tee(source())
        >>> first = await anext(iter1)        # 0
        >>> all_of_iter2 = [x async for x in iter2]  # [0, 1, 2]
        >>> rest_of_iter1 = [x async for x in iter1]  # [1, 2]

        >>> # Flattening step: nested tee() calls share data efficiently
        >>> iter1, iter2 = tee(source())
        >>> [iter1_copy] = tee(iter1, 1)  # Shares iter1's underlying data
    """
    return tuple(
        map(
            FutureIteratorBackedAsyncIterator[_YieldType],
            itertools.tee(
                (
                    async_iterable.iterator
                    if isinstance(async_iterable, FutureIteratorBackedAsyncIterator)
                    else map(
                        asyncio.ensure_future,
                        AsyncIteratorBackedIterator(aiter(async_iterable)),
                    )
                ),
                n,
            ),
        )
    )

Considering the backward-incompatible behavior changes, I would create a new module (perhaps asyncstdlib.aitertools?) and implement all functions as wrappers of itertools, like the tee implementation above.

Atry avatar Aug 14 '25 22:08 Atry

Sorry, I just have to ask again: Is this AI generated? This is extremely convoluted and doesn't match the conventions of the rest of the library. The three-layer back-and-forth could be simplified considerably, I am not even sure why IteratorBackedAsyncIterator exists at all. asyncstdlib is agnostic to the event loop, so using asyncio.Future is a no-go. It should definitely have the usual cleanup means of aclose and async with.

A fix should most likely extend the tee_peer (likely turning it into a class) to allow sharing the buffers; reusing the buffer instead of creating a new one for every peer is likely necessary for that.

maxfischer2781 avatar Aug 15 '25 18:08 maxfischer2781

Sorry, I just have to ask again: Is this AI generated?

No implementation code was generated by AI. The docstrings were partially generated by AI. Also, I asked an AI to create some tests, which are not pasted here.

This is extremely convoluted and doesn't match the conventions of the rest of the library. The three-layer back-and-forth could be simplified considerably, I am not even sure why IteratorBackedAsyncIterator exists at all.

I can see why it might look that way, but if you strip out the docstrings, the implementation is quite minimal. The IteratorBackedAsyncIterator class might seem out of place, but it's designed as a generic adapter for other itertools functions. My apologies for not including the full context: I was planning to use it to wrap other itertools functions as well, not just tee.

asyncstdlib is agnostic to the event loop, so using asyncio.Future is a no-go.

My understanding was that since asyncio.ensure_future gets the future from the currently running loop, the code remains functionally agnostic rather than being hardcoded to one loop implementation. Perhaps this interpretation doesn't align with the library's conventions. In any case, the core mechanism relies on a "future-like" object to cache the result, which I believe is the correct primitive for this task.
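The caching property being relied on here can be demonstrated directly: a finished `asyncio.Future` may be awaited any number of times and returns the stored result each time (note this is asyncio-specific, which is the portability concern raised above):

```python
import asyncio

async def main():
    fut = asyncio.get_running_loop().create_future()
    fut.set_result("cached")
    # A finished Future can be awaited repeatedly; each await returns
    # the stored result without recomputation (unlike a coroutine,
    # which can only be awaited once).
    first = await fut
    second = await fut
    return first, second

print(asyncio.run(main()))  # prints ('cached', 'cached')
```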

It should definitely have the usual cleanup means of aclose and async with.

I intentionally chose not to add aclose to the peer iterators to mirror the behavior of itertools.tee, where you cannot close the upstream iterator from a downstream one. My view is that the user who created the original async iterable is still able to close it by explicitly calling aclose on the upstream generator. The downstream iterators do not hold any expensive resources other than the item cache, which is garbage-collectable. Adding aclose to the tuple results of tee seems like an overly complex API that deviates from the standard library's design.

A fix should most likely extend the tee_peer (likely turning it into a class) to allow sharing the buffers; reusing the buffer instead of creating a new one for every peer is likely necessary for that.

This is the main point where I have a different perspective. My entire proposal is built on the idea that we should reuse itertools.tee instead of reimplementing it. The standard library's version is written in C and is highly optimized. A pure Python implementation of its buffering and sharing logic would almost certainly be slower and more prone to subtle bugs.

I believe leveraging the battle-tested, high-performance C implementation from itertools is a much better path than creating a new Python-based one from scratch. Also, the current implementation behaves differently from itertools in many cases, and fixing these inconsistencies would be a breaking change. Instead, I would propose a new module that better reflects itertools behavior.

Atry avatar Aug 15 '25 23:08 Atry

It seems we are talking past each other. You have posted a bug report. This to me means the corresponding changes will fix the bug. Critically, they won't just adapt the change to cover other primitives or even piggyback an entirely new module. If that's what you want, please open a feature request.

All the implementation is not generated by AI. The docstring is partially generated by AI. […]

Please don't do that. The docstrings are a huge volume to look through and double-check against the implementation.

I can see why it might look that way, but if you strip out the docstrings, the implementation is quite minimal. […]

I'm not saying it's not minimal – it's convoluted. The main logic is about eight layers deep, all crammed into a single expression. The result is about five iterators deep, mixing sync-to-async-iterator, two layers of sync-iterator, async-iterator-to-awaitable-iterator and async-iterator (which itself might actually be a sync-iterator-to-async-iterator).

I do understand what you are trying to do here and it's very clever, but the code real estate is totally inverted to the actual complexity.

My understanding was that since asyncio.ensure_future gets the future from the currently running loop […]

asyncio.ensure_future will only produce asyncio Futures. It won't work with other event loop implementations, such as trio.

Note that the current asyncstdlib.tee already has to deal with the same problem. It does so without using any specific Future.

I intentionally chose not to add aclose to the peer iterators to mirror the behavior of itertools.tee […]

Well, the entire point of this library is that the async world doesn't behave the same as the sync world. Both aclose and async with are intentionally supported. There is no guarantee that whoever created the original async iterable even still owns it, let alone is prepared to clean it up. The "downstream iterators" are likely equal owners of the underlying iterator and should be able to close it once the last of them is done.

This is the main point where I have a different perspective. […]

I don't have a problem reusing itertools.tee (provided it's properly documented). It would very much be suitable to use as the buffer for the tee_peers.

Note though that I don't see any inherent advantage here, either. Using itertools.tee adds a lot of layers and complexity, whereas a simple singly-linked list is rather straightforward to implement. Given the overhead of async, I don't see the performance/memory advantage of itertools.tee as significant; going through half a dozen layers versus just one per anext call makes the situation not comparable.
Note specifically that the required behaviour seems to be broken in itertools.tee of older versions of CPython. (Yes, this is the same issue you linked to before.) PyPy also has problems with this.

Also the current implementation behaves differently from itertools in many cases, fixing these inconsistency would be breaking changes. Instead, I would propose a new module that better reflect itertools behaviors.

So far we are talking about one bug fix to adjust behaviour to match what has only been codified for 3.13. This seems to be a very niche usecase, as even the stdlib behaviour was broken until recently. I don't see a problem fixing that in a patch release for asyncstdlib 3.13.

To be quite honest this seems to be way too much of a niche usecase to justify a new module with parallel implementation. Either it's important enough to match the standard and thus goes into the regular asyncstdlib.itertools module, or it's not and has no place in asyncstdlib.

maxfischer2781 avatar Aug 16 '25 15:08 maxfischer2781

In fact, I forgot asyncio.shield, which is necessary to prevent one consumer from being accidentally cancelled by another consumer.

@dataclass(frozen=True)
class FutureIteratorBackedAsyncIterator(IteratorBackedAsyncIterator[_ItemType], Generic[_ItemType]):
    iterator: TeeIterator[Future[_ItemType]]

    @override
    def __anext__(self):
        return asyncio.shield(super().__anext__())

Also, I found that asyncio.shield is not used in the current implementation of asyncstdlib, which I believe is another issue here.

Atry avatar Aug 17 '25 22:08 Atry

I think that I was clear that this library is not specific to asyncio.

maxfischer2781 avatar Aug 18 '25 05:08 maxfischer2781