
#748: don't share httpx.AsyncHTTPTransport between event loops

Open ewjoachim opened this issue 6 months ago • 7 comments

Fixes #748

In cached_async_http_client we cache the http client (...duh) and its transport. The transport keeps a pointer to the current event loop through:

transport._pool._connections[0]._connection._network_stream._stream.transport_stream._transport._loop

and will try to reuse this event loop on subsequent calls. This causes RuntimeError('Event loop is closed') if that loop has since been closed and we're now running on a different one.
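
For intuition, here's a minimal standalone sketch of the failure mode (illustrative only, not pydantic-ai's actual caching code): a client cached at module level gets bound to the first event loop via its pooled connection, and a later asyncio.run() on a fresh loop then trips over the closed one.

```python
import asyncio

import httpx

_cached_client: httpx.AsyncClient | None = None


def cached_client() -> httpx.AsyncClient:
    # Module-level cache, analogous in spirit to cached_async_http_client.
    global _cached_client
    if _cached_client is None:
        _cached_client = httpx.AsyncClient()
    return _cached_client


async def fetch() -> None:
    await cached_client().get("https://example.com")


asyncio.run(fetch())  # first loop: opens a pooled connection bound to this loop
asyncio.run(fetch())  # new loop: typically raises RuntimeError('Event loop is closed')
```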

My fix makes it so that the real transport instance is only created once we're inside a running loop, and a distinct instance is returned for each distinct loop.

I've been using https://github.com/oscar-broman/pydantic-ai-gemini-issue-mre as a reproducer, and it fails on main and passes on my branch.

The reason we don't see the same test failure in this repo's tests is surely this: because there are fixtures with scope='session', anyio only creates a single event loop that all tests share, so the issue never shows up (also, we mock HTTP calls with pytest-recording, and I'm not 100% sure the mocking happens at a level that would make the failure visible).

All that is to say: I'm not sure how best to implement a reproducer without intruding too much on the codebase. Opinions from maintainers welcome :)

[!WARNING] I'm afraid this makes the following fixture very brittle. It will keep working as long as the anyio single-event-loop situation persists, but if we ever get a separate event loop per test, we won't be able to properly close the clients. That said: isn't that an issue for real code in real life too? Who's responsible for closing those cached clients? (Maybe they close on __del__, but that's not ideal, is it?)

https://github.com/pydantic/pydantic-ai/blob/e7ce6750e0178e942803d36f147f39d3201a437c/tests/conftest.py#L202-L217

ewjoachim avatar May 12 '25 13:05 ewjoachim

(Screenshot 2025-05-12 at 16:24:41: the code where the model copies the http client from the provider)

We have another issue. Everywhere we copy the http client from the provider onto the model (see screenshot above), we no longer go through the cache attributes that tell us whether this client belongs to the appropriate loop or not.

ewjoachim avatar May 12 '25 14:05 ewjoachim

Ok, I've amended my commit with a new proposal, though I haven't tested it as much as the previous one. The idea is to use a proxy object for the Transport and decide on the real object only once we're sure we're in a valid async context.
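
Roughly, the shape of the idea looks like this (a sketch only; the names and structure here are illustrative, not the actual diff):

```python
import asyncio

import httpx


class PerLoopTransport(httpx.AsyncBaseTransport):
    """Proxy transport: the real AsyncHTTPTransport is created lazily, per event loop."""

    def __init__(self) -> None:
        self._transports: dict[asyncio.AbstractEventLoop, httpx.AsyncHTTPTransport] = {}

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        # Resolved lazily, so we're guaranteed to be inside a running loop here.
        loop = asyncio.get_running_loop()
        transport = self._transports.get(loop)
        if transport is None:
            transport = self._transports[loop] = httpx.AsyncHTTPTransport()
        return await transport.handle_async_request(request)

    # NOTE: cleanup of the per-loop transports is elided in this sketch.


client = httpx.AsyncClient(transport=PerLoopTransport())
```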

What do you think? Is that acceptable, and should I keep working on it, or do you see a flaw in the plan?

ewjoachim avatar May 12 '25 15:05 ewjoachim

Hm, I thought of a much cleaner solution: keeping transports in a weakref.WeakKeyDictionary[EventLoop, Transport]. This way, we keep the transports while the loop is alive and drop them when it's gone.

BUT the whole point is that transports keep a strong ref to the loop, so if we keep a strong ref to the transport, the loop's refcount is never going to go to zero 😭

I guess we could, though, remove any item from the dict whose loop is closed 🤔
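
Something along these lines (again just a sketch under my assumptions, not the committed code); the explicit purge compensates for the strong reference the transports keep on their loop:

```python
import asyncio
import weakref

import httpx

# Weakly keyed on the event loop; note the values still strongly reference
# the loop through the transport internals, hence the purge below.
_transports: "weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, httpx.AsyncHTTPTransport]" = (
    weakref.WeakKeyDictionary()
)


def transport_for_current_loop() -> httpx.AsyncHTTPTransport:
    # Drop entries whose loop has been closed, so those loops (and their
    # transports) can actually be garbage collected.
    for closed_loop in [lp for lp in _transports if lp.is_closed()]:
        del _transports[closed_loop]

    loop = asyncio.get_running_loop()
    transport = _transports.get(loop)
    if transport is None:
        transport = _transports[loop] = httpx.AsyncHTTPTransport()
    return transport
```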

ewjoachim avatar May 12 '25 21:05 ewjoachim

New attempt (likely going to get complaints about missing coverage).

I've kept the weakref just in case, but I've added code that removes entries from the (weakly keyed) dict when their loops are closed.

I think this should solve the issue. Happy to get some feedback before spending time adding missing tests. Also, if you want to take it from here, feel free :)

ewjoachim avatar May 12 '25 21:05 ewjoachim

I tested the changes with a simple Streamlit app and Vertex AI. The old closed-event-loop issue is gone. I'll try to do some more tests and see how it works. Cheers for the fix.

bojan2501 avatar May 13 '25 06:05 bojan2501

I'll be leaving this PR for the Pydantic folks to do whatever they want with it, but specifically for solving the Gemini issue, using the new GoogleModel should be a better alternative. The transport issue still needs to be solved at some point, though.

ewjoachim avatar May 30 '25 14:05 ewjoachim

@ewjoachim sorry for the lack of reply, I wasn't ignoring this, I was discussing it internally, because I actually expected some unexpected issues with that implementation. 😔

I think the way we intend to move forward is to drop the caching we're doing and have an async context manager do the right thing at the Agent level.
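
Purely to illustrate the general shape (this is not pydantic-ai's API, and the names are made up): the client would be created and closed by a context manager entered inside the running loop, rather than cached at import time.

```python
from contextlib import asynccontextmanager

import httpx


@asynccontextmanager
async def managed_http_client():
    # Created inside a running loop and closed before we leave it, so the
    # transport can never outlive the loop it was bound to.
    async with httpx.AsyncClient() as client:
        yield client
```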

I'll write more about it soon. But yeah, you can leave it to us. I'll get back to this soon.

Kludex avatar May 30 '25 16:05 Kludex

Great!

Yeah, I too think the proper fix is to avoid creating any httpx session/transport/etc. until we're 100% sure we're in an async context, and then make sure we close it when we leave that context. But either that means a change to the API, which is probably annoying, or a more profound change to the integration; and it doesn't make much sense for that kind of change to be led by a first-time contributor, so yeah, it's probably better this way :)

ewjoachim avatar Jun 02 '25 08:06 ewjoachim

I had a brief chat about this with @Kludex, and his thinking is that the right solution would be for the user to handle the httpx.AsyncClient lifecycle. So I'll close this PR. It would be good to document that recommendation though.
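
For reference, a hedged sketch of what that recommendation looks like in user code (the model/provider names and the http_client parameter follow the docs at the time of writing and may differ by version):

```python
import asyncio

import httpx

from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider


async def main() -> None:
    # The caller owns the client: it is opened and closed on the same event
    # loop that runs the agent, so nothing cached can outlive that loop.
    async with httpx.AsyncClient() as client:
        model = OpenAIModel('gpt-4o', provider=OpenAIProvider(http_client=client))
        agent = Agent(model)
        result = await agent.run('Hello!')
        print(result.output)


asyncio.run(main())
```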

DouweM avatar Oct 01 '25 14:10 DouweM