Error asyncio.exceptions.InvalidStateError: invalid state on connection_lost
I get this error on a production server:
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 69, in uvloop.loop.Handle._run
  File "uvloop/handles/basetransport.pyx", line 169, in uvloop.loop.UVBaseTransport._call_connection_lost
  File "asynctnt/iproto/coreproto.pyx", line 185, in asynctnt.iproto.protocol.CoreProtocol.connection_lost
  File "asynctnt/iproto/protocol.pyx", line 413, in asynctnt.iproto.protocol.BaseProtocol._on_connection_lost
  File "/usr/local/lib/python3.12/site-packages/asynctnt/connection.py", line 174, in connection_lost
    self._disconnect_waiter.set_result(True)
asyncio.exceptions.InvalidStateError: invalid state
After this no connections could be established.
asynctnt version 2.2.1, Python 3.12
Maybe this change will be sufficient:
if self._disconnect_waiter: -> if self._disconnect_waiter and not self._disconnect_waiter.done():
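For context, a minimal standard-library sketch of why such a guard would avoid the exception. The name `waiter` is only a stand-in for `self._disconnect_waiter`; this is not asynctnt's actual code, and it does not explain how the future came to be resolved twice in the first place.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    waiter = loop.create_future()  # stand-in for self._disconnect_waiter
    waiter.set_result(True)        # first resolution succeeds

    try:
        waiter.set_result(True)    # resolving an already-done future
        unguarded = "ok"
    except asyncio.InvalidStateError:
        unguarded = "InvalidStateError"

    # The proposed guard: only resolve the future while it is still pending.
    if waiter and not waiter.done():
        waiter.set_result(True)
    guarded = "ok"
    return unguarded, guarded


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The same exception is raised when `set_result` is called on a cancelled future, so the `done()` check covers that case as well.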
Hi! If possible, could you provide some more info on how this could be reproduced? Maybe some code snippets showing how you set up the connection, or the general connection lifecycle in your app? I understand that it could be hard to reproduce, but there may still be some clues.
Hi! I'll try.
We use an async connection pool (asyncio_connection_pool) with asynctnt to reuse connections and speed up interaction.
The connection pool strategy looks like this:
from asyncio_connection_pool import ConnectionStrategy
from asynctnt import Connection


class ConnectionPoolStrategy(ConnectionStrategy[Connection]):
    def __init__(
        self,
        connection_config: ConnectionConfig,
        connection_pool_config: ConnectionPoolConfig,
    ):
        self._connection_config = connection_config
        self._connection_pool_config = connection_pool_config

    async def make_connection(self):
        return await Connection(
            host=self._connection_config.host,
            port=self._connection_config.port,
            username=self._connection_config.username,
            password=self._connection_config.password,
            fetch_schema=False,
            auto_refetch_schema=False,
            connect_timeout=self._connection_pool_config.connect_timeout,
            reconnect_timeout=self._connection_pool_config.reconnect_timeout,
            ping_timeout=self._connection_pool_config.ping_timeout,
        ).connect()

    def connection_is_closed(self, conn):
        return not conn.is_connected

    async def close_connection(self, conn):
        await conn.disconnect()
asyncio_connection_pool doesn't have an option for a timeout on the get_connection function, so we wrap its async context manager to add one:
@asynccontextmanager
async def get_connection(self):
    mgr = (self._connection_pool.get_connection())
    aexit = type(mgr).__aexit__
    aenter = type(mgr).__aenter__
    timeout = self.get_connection_pool_config().connect_timeout
    try:
        async with async_timeout(timeout):
            conn = await aenter(mgr)
        try:
            yield conn
        except Exception:
            if not await aexit(mgr, *sys.exc_info()):
                raise
        else:
            await aexit(mgr, None, None, None)
    except (TimeoutError, asyncio.TimeoutError):
        raise TimeoutError(f'Connection timeout: {str(timeout)}')
The wrapper is then used in the standard way:
async with self.get_connection() as conn:
    pass
Configuration values are:
connect_timeout = 0.5
reconnect_timeout = None
ping_timeout = 0
I found this place in the source (connection.py):
except asyncio.TimeoutError:  # pragma: nocover
    tr.close()
    continue  # try again
What if, while tr.close() is still in progress (the docs say that it works asynchronously), we have already created a new protocol and call disconnect on it?
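To illustrate the asynchronous close mentioned above, here is a small sketch against the standard asyncio event loop (not uvloop, and not asynctnt's protocol). `RecordingProtocol` and `close_is_deferred` are hypothetical names made up for this example, and a socket pair stands in for a real server connection. It only shows that `connection_lost` is delivered on a later loop iteration than `transport.close()`; whether that window is what triggers the error here remains a guess.

```python
import asyncio
import socket


class RecordingProtocol(asyncio.Protocol):
    """Hypothetical protocol that only records when connection_lost fires."""

    def __init__(self):
        self.lost = False

    def connection_lost(self, exc):
        self.lost = True


async def close_is_deferred():
    # A connected socket pair stands in for a real server connection.
    local, peer = socket.socketpair()
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_connection(
        RecordingProtocol, sock=local
    )

    transport.close()
    # close() only schedules the callback; it has not run yet.
    lost_right_after_close = protocol.lost

    # Yield to the loop so the scheduled connection_lost call can run.
    await asyncio.sleep(0.05)
    lost_after_yield = protocol.lost

    peer.close()
    return lost_right_after_close, lost_after_yield


if __name__ == "__main__":
    print(asyncio.run(close_is_deferred()))
```

Any code that runs between `close()` and the deferred callback still sees the old transport as not yet lost, which is the kind of window the question above is asking about.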
Hi @golubovai ! Sorry for the delay - got carried away with other stuff. I tried to reproduce the error - still no luck :(
Instead, I've conducted several "thought experiments", and maybe (maybe) the problem is in the not-so-transparent state logic. Could you try replacing the connection_is_closed method with the following:
def connection_is_closed(self, conn: asynctnt.Connection):
    return conn.state == ConnectionState.DISCONNECTED
conn.state seems to match much more accurately what the end user (you) expects when asking whether the connection is connected or disconnected. So if the errors still happen, please try replacing this function and let me know how it goes.