asynctnt

Error asyncio.exceptions.InvalidStateError: invalid state on connection_lost

Open golubovai opened this issue 1 year ago • 5 comments

We get this error on a production server:

Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 69, in uvloop.loop.Handle._run
  File "uvloop/handles/basetransport.pyx", line 169, in uvloop.loop.UVBaseTransport._call_connection_lost
  File "asynctnt/iproto/coreproto.pyx", line 185, in asynctnt.iproto.protocol.CoreProtocol.connection_lost
  File "asynctnt/iproto/protocol.pyx", line 413, in asynctnt.iproto.protocol.BaseProtocol._on_connection_lost
  File "/usr/local/lib/python3.12/site-packages/asynctnt/connection.py", line 174, in connection_lost
    self._disconnect_waiter.set_result(True)
asyncio.exceptions.InvalidStateError: invalid state

After this, no connections could be established.

asynctnt version 2.2.1, Python 3.12

Maybe this change will be sufficient:

if self._disconnect_waiter: -> if self._disconnect_waiter and not self._disconnect_waiter.done():
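
To illustrate why the extra done() check matters, here is a minimal sketch using only stock asyncio. FakeConnection and its two connection_lost_* methods are hypothetical stand-ins written for this example; they are not asynctnt code and only mimic the two variants of the check quoted above.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a connection object; not asynctnt code."""

    def __init__(self, loop):
        # Future that a disconnect() caller would await.
        self._disconnect_waiter = loop.create_future()

    def connection_lost_unguarded(self):
        # Only checks that the future exists, not whether it is finished.
        if self._disconnect_waiter:
            self._disconnect_waiter.set_result(True)

    def connection_lost_guarded(self):
        # Skips futures that already have a result or were cancelled.
        if self._disconnect_waiter and not self._disconnect_waiter.done():
            self._disconnect_waiter.set_result(True)


async def demo():
    loop = asyncio.get_running_loop()

    conn = FakeConnection(loop)
    conn.connection_lost_unguarded()      # first call resolves the future
    try:
        conn.connection_lost_unguarded()  # second call hits a finished future
        unguarded_error = None
    except asyncio.InvalidStateError as exc:
        unguarded_error = type(exc).__name__

    conn = FakeConnection(loop)
    conn.connection_lost_guarded()
    conn.connection_lost_guarded()        # second call is a no-op
    return unguarded_error, conn._disconnect_waiter.result()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that done() is also true for a cancelled future, so the guarded variant would also cover the case where whoever was awaiting the waiter got cancelled (for example by a timeout) before connection_lost ran.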

golubovai, Aug 19 '24 10:08

Hi! If possible, could you provide some more info on how this can be reproduced? Maybe some code snippets showing how you set up the connection, or the general connection lifecycle in your app? I understand that it could be hard to reproduce, but there may still be some clues.

igorcoding, Aug 19 '24 11:08

Hi! I'll try.

We use an async connection pool (asyncio_connection_pool) with asynctnt to reuse connections and speed up interaction.

The connection pool strategy looks like this:

from asyncio_connection_pool import ConnectionStrategy
from asynctnt import Connection

class ConnectionPoolStrategy(ConnectionStrategy[Connection]):
    def __init__(
        self,
        connection_config: ConnectionConfig,
        connection_pool_config: ConnectionPoolConfig,
    ):
        self._connection_config = connection_config
        self._connection_pool_config = connection_pool_config

    async def make_connection(self):
        return await Connection(
            host=self._connection_config.host,
            port=self._connection_config.port,
            username=self._connection_config.username,
            password=self._connection_config.password,
            fetch_schema=False,
            auto_refetch_schema=False,
            connect_timeout=self._connection_pool_config.connect_timeout,
            reconnect_timeout=self._connection_pool_config.reconnect_timeout,
            ping_timeout=self._connection_pool_config.ping_timeout,
        ).connect()

    def connection_is_closed(self, conn):
        return not conn.is_connected

    async def close_connection(self, conn):
        await conn.disconnect()

asyncio_connection_pool doesn't have a timeout option on its get_connection function. Therefore we wrap the async context manager to add this functionality:

    @asynccontextmanager
    async def get_connection(self):
        mgr = self._connection_pool.get_connection()
        aexit = type(mgr).__aexit__
        aenter = type(mgr).__aenter__
        timeout = self.get_connection_pool_config().connect_timeout
        try:
            async with async_timeout(timeout):
                conn = await aenter(mgr)
            try:
                yield conn
            except Exception:
                if not await aexit(mgr, *sys.exc_info()):
                    raise
            else:
                await aexit(mgr, None, None, None)
        except (TimeoutError, asyncio.TimeoutError):
            raise TimeoutError(f'Connection timeout: {str(timeout)}')

This wrapper is then used in the standard way:

async with self.get_connection() as conn:
    pass

golubovai, Aug 20 '24 04:08

Configuration values are:

connect_timeout = 0.5
reconnect_timeout = None
ping_timeout = 0

golubovai, Aug 20 '24 05:08

I found this place in the source (connection.py):

except asyncio.TimeoutError:  # pragma: nocover
    tr.close()
    continue  # try again

What if, while tr.close() is still in progress (the docs say that it works asynchronously), we have already created a new protocol and call disconnect on it?
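
The asynchronous nature of close() can be checked with a small sketch on stock asyncio. RecordingProtocol and close_is_deferred are hypothetical names made up for this example, and a local socket pair stands in for a real Tarantool connection. The traceback above comes from uvloop, which is assumed to defer the callback in a similar way, but that is not verified here.

```python
import asyncio
import socket


class RecordingProtocol(asyncio.Protocol):
    """Hypothetical protocol that records when connection_lost fires."""

    def __init__(self):
        self.lost = asyncio.get_running_loop().create_future()

    def connection_lost(self, exc):
        self.lost.set_result(exc)


async def close_is_deferred():
    loop = asyncio.get_running_loop()
    # A connected socket pair avoids needing a real server.
    ours, peer = socket.socketpair()
    try:
        transport, protocol = await loop.create_connection(
            RecordingProtocol, sock=ours
        )
        transport.close()
        # close() only schedules connection_lost; it has not run yet.
        lost_immediately = protocol.lost.done()
        # Yield to the event loop so the scheduled callback can run.
        await asyncio.wait_for(protocol.lost, timeout=1.0)
        return lost_immediately, protocol.lost.done()
    finally:
        peer.close()


if __name__ == "__main__":
    print(asyncio.run(close_is_deferred()))
```

Between the close() call and the deferred connection_lost there is a window in which other code can run, which is the window the question above is about.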

golubovai, Aug 20 '24 13:08

Hi @golubovai! Sorry for the delay, I got carried away with other stuff. I tried to reproduce the error, but still no luck :( Instead I've conducted several "thought experiments", and maybe (maybe) the problem is in the not-so-transparent state logic. Could you try replacing the connection_is_closed method with the following:

    def connection_is_closed(self, conn: asynctnt.Connection):
        return conn.state == ConnectionState.DISCONNECTED

conn.state seems to match much more accurately what the end user (you) expects when asking whether the connection is connected or disconnected. So if the errors still happen, please try replacing this function and let me know how it goes.

igorcoding, Sep 16 '24 17:09