libsql-experimental-python
libsql-experimental-python copied to clipboard
Stream expired due to inactivity.
- Trying to establish a connection with TursoDB, connection has been established successfully but getting this error after giving a pause for some seconds before sending any request again.
- It is basically closing the connection automatically.
I had to make a workaround as this hasn't been fixed :
class ConnectionWrapper:
def __init__(self, db_conn: libsql.Connection):
self.db_conn = db_conn
self.retry_connection_fn = None
def execute(self, *args, **kwargs):
try:
return self.db_conn.execute(*args, **kwargs)
except ValueError as e:
if "expired" in str(e):
self.retry_connection_fn()
return self.db_conn.execute(*args, **kwargs)
else:
raise e
... (in main) ...
conn = ConnectionWrapper(libsql.Connection(url, token, ...))
conn.retry_connection_fn = lambda: libsql.Connection(url, token, ...))
basically holding a lambda that reconnects whenever needed.