faust icon indicating copy to clipboard operation
faust copied to clipboard

Faust channel and web are running a different loops

Open popadi opened this issue 3 years ago • 2 comments

Question: Question previously asked here: https://github.com/robinhood/faust/issues/287 by somebody else.

It seems that the web is running in a completely different loop from the rest of the app. At least sometimes. It is quite annoying to not be able to use a ClientSession defined as a member of the faust app instance or simply a aioredis client. Is this expected/intended?

Let's say I inherit from faust.App to and I attach an aiohttp.ClientSession and a redis client so I can use them either in my agents/timers or inside views.

class CustomFaustApplication(faust.App):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.aiohttp_session: aiohttp.ClientSession
        self.redis_client: Redis

    async def on_start(self) -> None:
        await super().on_start()
        self.redis_client = await self._get_redis_client()
        self.aiohttp_session = self._get_session()

    def _get_jcm_session(self):
        return aiohttp.ClientSession(raise_for_status=True, base_url=RestAPISettings.BASE_URL)

    async def _get_redis_client(self) -> Redis:
        sentinel = Sentinel([RedisSettings.SENTINELS])
        return sentinel.master_for('redis', decode_responses=True)

    async def on_shutdown(self) -> None:
        if self.aiohttp_session:
            await self.aiohttp_session.close()

        if self.redis_client:
            await self.redis_client.close()

Let's say I have the following views:

async def get_keys_with_prefixx(redis_client, prefix: str) -> List[str]:
    keys = []
    cursor = 0

    while True:
        cursor, result = await redis_client.scan(cursor, match=f'{prefix}*', count=10000)
        keys.extend(result)

        if cursor == 0:
            break

    return keys

@app.page('/test_1')
async def test_1(self, request):
    res = await get_keys_with_prefixx(app.redis_client, 'someprefix')
    return self.json({'result': res})

@app.page('/test_2')
async def test_2(self, request):
    res = await app.aiohttp_session.get('/api')
    return self.json({'result': res})

For test_1, I receive:

...
    res = await app.redis_client.get(key)
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/client.py", line 1061, in execute_command
    conn = self.connection or await pool.get_connection(command_name, **options)
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/connection.py", line 1359, in get_connection
    await connection.connect()
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/sentinel.py", line 54, in connect
    await self.connect_to(await self.connection_pool.get_master_address())
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/sentinel.py", line 117, in get_master_address
    master_address = await self.sentinel_manager.discover_master(self.service_name)
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/sentinel.py", line 222, in discover_master
    masters = await sentinel.sentinel_masters()
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/client.py", line 1064, in execute_command
    return await self.parse_response(conn, command_name, **options)
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/client.py", line 1080, in parse_response
    response = await connection.read_response()
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/connection.py", line 859, in read_response
    await self.disconnect()
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aioredis/connection.py", line 762, in disconnect
    await self._writer.wait_closed()
  File "/home/apop/miniconda3/lib/python3.7/asyncio/streams.py", line 323, in wait_closed
    await self._protocol._closed
RuntimeError: Task <Task pending coro=<Agent._execute_actor() running at /home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/agents/agent.py:671> cb=[<TaskWakeupMethWrapper object at 0x7fffe8642750>()]> got Future <Future pending> attached to a different loop
[2021-11-26 05:46:51,740] [37473] [INFO] [^----OneForOneSupervisor: (1@0x7fffe86314d0)]: Restarting dead <Agent*: worker.agents.cfgevents_agent>! Last crash reason: RuntimeError('Task <Task pending coro=<Agent._execute_actor() running at /home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/agents/agent.py:671> cb=[<TaskWakeupMethWrapper object at 0x7fffe8642750>()]> got Future <Future pending> attached to a different loop') 

For test_2, I get:

 Traceback (most recent call last):
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/web_protocol.py", line 430, in _handle_request
    resp = await request_handler(request)
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/web_app.py", line 504, in _handle
    resp = await handler(request)
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/web/drivers/aiohttp.py", line 247, in _dispatch
    return await handler(request)
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/web/views.py", line 82, in __call__
    return await self.dispatch(request)
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/web/views.py", line 98, in dispatch
    response = await method(cast(Request, request), **kwargs)
  File "/home/apop/git/switchlogs-parser/worker/views.py", line 25, in test
    res = await app.jcm_aiohttp_session.get('/api')
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/client.py", line 466, in _request
    with timer:
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/helpers.py", line 699, in __enter__
    "Timeout context manager should be used " "inside a task"
RuntimeError: Timeout context manager should be used inside a task

Note that it seems to work if I initialize my app with `web_in_thread=False'.

Versions:

  • python==3.7
  • faust==0.6.10

popadi avatar Nov 26 '21 11:11 popadi

you have to use asyncio.run_coroutine_threadsafe to run an asyncio task attached to a different asyncio loop

patkivikram avatar Nov 30 '21 20:11 patkivikram

you have to use asyncio.run_coroutine_threadsafe to run an asyncio task attached to a different asyncio loop

And what is the best practice in this case, i.e. using an aiohttp session that's a member of a faust.App instance? Is there an example somewhere? I tried the snipped below and I just gen an error:

coro = app.aiohttp_session.get(path)
future = asyncio.run_coroutine_threadsafe(coro, app.loop)
print(future.result(2))

It seems that it just blocks everything and then errors out.

Edit: Also tried the following and it still doesn't work.

result = await app.loop.create_task(app.aiohttp_session.get(path))
data = await result.json()
print(data)

The exception:

[2021-12-07 08:53:39,629] [111129] [ERROR] Error handling request 
Traceback (most recent call last):
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/web_protocol.py", line 430, in _handle_request
    resp = await request_handler(request)
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/web_app.py", line 504, in _handle
    resp = await handler(request)
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/web/drivers/aiohttp.py", line 247, in _dispatch
    return await handler(request)
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/web/views.py", line 82, in __call__
    return await self.dispatch(request)
  File "/home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/faust/web/views.py", line 98, in dispatch
    response = await method(cast(Request, request), **kwargs)
  File "/home/apop/git/switchlogs-parser/worker/views.py", line 89, in get
    return await self.trigger(request.query)
  File "/home/apop/git/switchlogs-parser/worker/views.py", line 132, in trigger
    result = await app.loop.create_task(app.aiohttp_session.get(path))
RuntimeError: Task <Task pending coro=<RequestHandler._handle_request() running at /home/apop/git/switchlogs-parser/.venv/lib/python3.7/site-packages/aiohttp/web_protocol.py:430> cb=[<TaskWakeupMethWrapper object at 0x7fffbc79b6d0>()]> got Future <Task pending coro=<<_RequestContextManager without __name__>()>> attached to a different loop

As long as I have web_in_thread=True it doens't seem to work.

popadi avatar Dec 07 '21 14:12 popadi