faust
faust copied to clipboard
Faust channel and web are running a different loops
Question: Question previously asked here: https://github.com/robinhood/faust/issues/287 by somebody else.
It seems that the web is running in a completely different loop from the rest of the app. At least sometimes. It is quite annoying to not be able to use a ClientSession defined as a member of the faust app instance or simply a aioredis client. Is this expected/intended?
Let's say I inherit from faust.App
to and I attach an aiohttp.ClientSession
and a redis
client so I can use them either in my agents/timers or inside views.
class CustomFaustApplication(faust.App):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.aiohttp_session: aiohttp.ClientSession
self.redis_client: Redis
async def on_start(self) -> None:
await super().on_start()
self.redis_client = await self._get_redis_client()
self.aiohttp_session = self._get_session()
def _get_jcm_session(self):
return aiohttp.ClientSession(raise_for_status=True, base_url=RestAPISettings.BASE_URL)
async def _get_redis_client(self) -> Redis:
sentinel = Sentinel([RedisSettings.SENTINELS])
return sentinel.master_for('redis', decode_responses=True)
async def on_shutdown(self) -> None:
if self.aiohttp_session:
await self.aiohttp_session.close()
if self.redis_client:
await self.redis_client.close()
Let's say I have the following views:
async def get_keys_with_prefixx(redis_client, prefix: str) -> List[str]:
keys = []
cursor = 0
while True:
cursor, result = await redis_client.scan(cursor, match=f'{prefix}*', count=10000)
keys.extend(result)
if cursor == 0:
break
return keys
@app.page('/test_1')
async def test_1(self, request):
res = await get_keys_with_prefixx(app.redis_client, 'someprefix')
return self.json({'result': res})
@app.page('/test_2')
async def test_2(self, request):
res = await app.aiohttp_session.get('/api')
return self.json({'result': res})
For test_1, I receive:
...
res = await app.redis_client.get(key)
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/client.py", line 1061, in execute_command
conn = self.connection or await pool.get_connection(command_name, **options)
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/connection.py", line 1359, in get_connection
await connection.connect()
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/sentinel.py", line 54, in connect
await self.connect_to(await self.connection_pool.get_master_address())
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/sentinel.py", line 117, in get_master_address
master_address = await self.sentinel_manager.discover_master(self.service_name)
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/sentinel.py", line 222, in discover_master
masters = await sentinel.sentinel_masters()
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/client.py", line 1064, in execute_command
return await self.parse_response(conn, command_name, **options)
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/client.py", line 1080, in parse_response
response = await connection.read_response()
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/connection.py", line 859, in read_response
await self.disconnect()
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/connection.py", line 762, in disconnect
await self._writer.wait_closed()
File "/home/apop/miniconda3/lib/python3.7/asyncio/streams.py", line 323, in wait_closed
await self._protocol._closed
RuntimeError: Task <Task pending coro=<Agent._execute_actor() running at /home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/agents/agent.py:671> cb=[<TaskWakeupMethWrapper object at 0x7fffe8642750>()]> got Future <Future pending> attached to a different loop
[2021-11-26 05:46:51,740] [37473] [INFO] [^----OneForOneSupervisor: (1@0x7fffe86314d0)]: Restarting dead <Agent*: worker.agents.cfgevents_agent>! Last crash reason: RuntimeError('Task <Task pending coro=<Agent._execute_actor() running at /home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/agents/agent.py:671> cb=[<TaskWakeupMethWrapper object at 0x7fffe8642750>()]> got Future <Future pending> attached to a different loop')
For test_2, I get:
Traceback (most recent call last):
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/web_protocol.py", line 430, in _handle_request
resp = await request_handler(request)
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/web_app.py", line 504, in _handle
resp = await handler(request)
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/web/drivers/aiohttp.py", line 247, in _dispatch
return await handler(request)
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/web/views.py", line 82, in __call__
return await self.dispatch(request)
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/web/views.py", line 98, in dispatch
response = await method(cast(Request, request), **kwargs)
File "/home/apop/git/switchlogs-parser/worker/views.py", line 25, in test
res = await app.jcm_aiohttp_session.get('/api')
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/client.py", line 466, in _request
with timer:
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/helpers.py", line 699, in __enter__
"Timeout context manager should be used " "inside a task"
RuntimeError: Timeout context manager should be used inside a task
Note that it seems to work if I initialize my app with `web_in_thread=False'.
Versions:
- python==3.7
- faust==0.6.10
you have to use asyncio.run_coroutine_threadsafe
to run an asyncio task attached to a different asyncio loop
you have to use
asyncio.run_coroutine_threadsafe
to run an asyncio task attached to a different asyncio loop
And what is the best practice in this case, i.e. using an aiohttp session that's a member of a faust.App
instance? Is there an example somewhere? I tried the snipped below and I just gen an error:
coro = app.aiohttp_session.get(path)
future = asyncio.run_coroutine_threadsafe(coro, app.loop)
print(future.result(2))
It seems that it just blocks everything and then errors out.
Edit: Also tried the following and it still doesn't work.
result = await app.loop.create_task(app.aiohttp_session.get(path))
data = await result.json()
print(data)
The exception:
[2021-12-07 08:53:39,629] [111129] [ERROR] Error handling request
Traceback (most recent call last):
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/web_protocol.py", line 430, in _handle_request
resp = await request_handler(request)
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/web_app.py", line 504, in _handle
resp = await handler(request)
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/web/drivers/aiohttp.py", line 247, in _dispatch
return await handler(request)
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/web/views.py", line 82, in __call__
return await self.dispatch(request)
File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/web/views.py", line 98, in dispatch
response = await method(cast(Request, request), **kwargs)
File "/home/apop/git/switchlogs-parser/worker/views.py", line 89, in get
return await self.trigger(request.query)
File "/home/apop/git/switchlogs-parser/worker/views.py", line 132, in trigger
result = await app.loop.create_task(app.aiohttp_session.get(path))
RuntimeError: Task <Task pending coro=<RequestHandler._handle_request() running at /home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/web_protocol.py:430> cb=[<TaskWakeupMethWrapper object at 0x7fffbc79b6d0>()]> got Future <Task pending coro=<<_RequestContextManager without __name__>()>> attached to a different loop
As long as I have web_in_thread=True
it doens't seem to work.