Can't use FastAPI + Faust-streaming version above 0.10.1 - attached to a different loop
Thanks to everyone who continues to support this project. Unfortunately, I ran into a critical bug in my system:
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
- fastapi_app.py - FastAPI app started with uvicorn.
- faust_app.py - Faust app started with python faust_app.py worker -l info
- Both processes run in the same Docker container.
- Faust-streaming any version above 0.10.1 (checked 0.10.2, 0.10.3, 0.10.4)
Expected behavior
I can import a topic created in faust_app.py into fastapi_app.py and use await my_topic.send(value={...}) inside of FastAPI async endpoints (or use an async function that does the same). Everything works OK in faust-streaming version 0.10.1 and below.
Actual behavior
AssertionError: Please create objects with the same loop as running with
or
RuntimeError: Task attached to a different loop
It looks like the first attempt results in the first error, and further attempts result in the second error.
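For anyone trying to see how these two errors arise in general, here is a minimal stdlib-only sketch. It does not use Faust or aiokafka: `LoopBoundClient` is a hypothetical stand-in for any object that captures an event loop when it is created and is later used from a different running loop. The assertion message is made up for the sketch.

```python
import asyncio


class LoopBoundClient:
    """Hypothetical stand-in for an object that remembers its creation loop."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        # A Future owned by the creation loop, not by whichever loop runs later.
        self._ready = loop.create_future()

    async def start(self) -> None:
        # Same kind of check as the assertion shown in the traceback.
        assert self._loop is asyncio.get_running_loop(), "created with a different loop"

    async def send(self) -> None:
        # The running Task belongs to one loop, the Future to another,
        # so asyncio raises RuntimeError ("attached to a different loop").
        await self._ready


def reproduce() -> list[str]:
    creation_loop = asyncio.new_event_loop()  # loop that exists at construction time
    client = LoopBoundClient(creation_loop)
    errors = []
    for method in (client.start, client.send):
        try:
            asyncio.run(method())  # asyncio.run() uses a fresh, different loop
        except (AssertionError, RuntimeError) as exc:
            errors.append(type(exc).__name__)
    creation_loop.close()
    return errors
```

Calling `reproduce()` collects one `AssertionError` from `start()` and one `RuntimeError` from `send()`, which matches the order of errors described above. Whether Faust hits the mismatch for exactly this reason is not confirmed here.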
Full traceback
in production k8s
....
File "/app/my_service/fastapi_app.py", line 70, in send_my_message_endpoint
await send_my_message(my_message)
File "/app/my_service/faust_app.py", line 71, in send_my_message
await my_message_topic.send(value=my_message.json())
File "/usr/local/lib/python3.11/site-packages/faust/topics.py", line 189, in send
return await self._send_now(
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/faust/channels.py", line 314, in _send_now
return await self.publish_message(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/faust/topics.py", line 429, in publish_message
await producer.send(
File "/usr/local/lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 1360, in send
await transaction_producer.send(
File "/usr/local/lib/python3.11/site-packages/aiokafka/producer/producer.py", line 442, in send
await self.client._wait_on_metadata(topic)
File "/usr/local/lib/python3.11/site-packages/aiokafka/client.py", line 639, in _wait_on_metadata
await self.force_metadata_update()
RuntimeError: Task <Task pending name='Task-50' coro=<RequestResponseCycle.run_asgi() running at /usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py:407> cb=[set.discard()]> got Future <Future pending cb=[shield.<locals>._outer_done_callback() at /usr/local/lib/python3.11/asyncio/tasks.py:898]> attached to a different loop
in local docker-compose
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py", line 407, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
return await self.app(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 270, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 124, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.11/site-packages/prometheus_fastapi_instrumentator/middleware.py", line 103, in __call__
raise exc
File "/usr/local/lib/python3.11/site-packages/prometheus_fastapi_instrumentator/middleware.py", line 101, in __call__
await self.app(scope, receive, send_wrapper)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 92, in __call__
await self.simple_response(scope, receive, send, request_headers=headers)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 147, in simple_response
await self.app(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 706, in __call__
await route.handle(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 276, in handle
await self.app(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 66, in app
response = await func(request)
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/fastapi/routing.py", line 235, in app
raw_response = await run_endpoint_function(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/fastapi/routing.py", line 161, in run_endpoint_function
return await dependant.call(**values)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/my_service/fastapi_app.py", line 70, in send_my_message_endpoint
await send_my_message(my_message)
File "/app/my_service/faust_app.py", line 71, in send_my_message
await my_message_topic.send(value=my_message.json())
File "/usr/local/lib/python3.11/site-packages/faust/topics.py", line 189, in send
return await self._send_now(
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/faust/channels.py", line 314, in _send_now
return await self.publish_message(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/faust/topics.py", line 407, in publish_message
producer = await self._get_producer()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/faust/topics.py", line 384, in _get_producer
return await self.app.maybe_start_producer()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/mode/utils/futures.py", line 58, in __call__
result = await self.fun(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/faust/app/base.py", line 1569, in maybe_start_producer
await producer.maybe_start()
File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 859, in maybe_start
await self.start()
File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 800, in start
await self._default_start()
File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 807, in _default_start
await self._actually_start()
File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 824, in _actually_start
await self.on_start()
File "/usr/local/lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 1300, in on_start
await producer.start()
File "/usr/local/lib/python3.11/site-packages/aiokafka/producer/producer.py", line 303, in start
assert self._loop is get_running_loop(), (
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: Please create objects with the same loop as running with
Versions
- Python version 3.11.2
- Faust version 0.10.2, 0.10.3, 0.10.4
- Operating system Win11 + WSL (Debian container python:3.11-slim)
- Kafka version - image: confluentinc/cp-kafka:7.0.0
faust-streaming has aiohttp embedded in it, and you should use that instead of FastAPI.
aiohttp
- Previously I could use any web framework, and it worked.
- Aiohttp is nowhere near as good as FastAPI. Faust should probably ditch faust.web (aiohttp) anyway and let users plug in any framework they like.
I have Faust services where I only use built-in aiohttp for web interface, but it is cumbersome at best.
After some investigation, I was able to make my services work with versions 0.10.2, 0.10.3, and 0.10.4. I need to start my FastAPI app from the command line or from a separate Python file like run.py, not from fastapi_app.py. Moving the code snippet below from fastapi_app.py to a separate file like run.py and changing the entry point seems to fix the issue. It's probably because of a double import and double creation of the Faust app/Kafka producer.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run('fastapi_app:app', host=..., port=..., reload=True)
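To illustrate the double-import guess, here is a small stdlib-only sketch with no uvicorn or Faust involved. The module name `demo_app` and the `App` class are hypothetical. `runpy.run_path(..., run_name="__main__")` stands in for `python fastapi_app.py`, and `importlib.import_module("demo_app")` stands in for uvicorn importing the app by its `'module:attr'` string.

```python
import runpy
import sys
import tempfile
from pathlib import Path

# Source of a throwaway module; "app" stands in for objects built at import time.
MODULE_SOURCE = '''
class App:
    pass

app = App()  # module-level creation, executed once per module object

if __name__ == "__main__":
    import importlib
    # Stand-in for uvicorn.run("demo_app:app"): import this same file by name.
    importlib.import_module("demo_app")
'''


def script_and_import_share_app() -> bool:
    """Run demo_app.py as a script and report whether both copies share one app."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "demo_app.py"
        path.write_text(MODULE_SOURCE)
        sys.path.insert(0, tmp)  # make "demo_app" importable by name
        try:
            # Executes the file under the name "__main__", like "python demo_app.py".
            main_globals = runpy.run_path(str(path), run_name="__main__")
            imported = sys.modules["demo_app"]  # second copy, created by the import
            return main_globals["app"] is imported.app
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("demo_app", None)
```

`script_and_import_share_app()` returns `False`: the file's module-level code runs twice, once as `__main__` and once as `demo_app`, so there are two separate `app` objects. Starting the server from a separate run.py avoids this because fastapi_app.py is then only ever imported under one name.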
I guess the changes in 0.10.2 should have been released as at least a minor version rather than a patch, as they break some workflows.
Thanks for pursuing this, I'll take a stab at expanding upon a better solution in https://github.com/faust-streaming/faust/blob/master/examples/fastapi_example.py.
I do think that we should explore replacing aiohttp with fastapi. It's just a matter of finding the time to do so...