edgedb-python icon indicating copy to clipboard operation
edgedb-python copied to clipboard

getting ClientConnectionClosedError randomly

Open rajeshdhanda opened this issue 2 years ago • 15 comments

I am connecting to EdgeDB-SERVER in a GCP VM, using FASTAPI, code is given below API code file and edgedb.toml are already in the same directory.

conn = edgedb.connect() 
@app.post("/add_data")   
def add_data(data : Item):
    logging.info(f"Connection_is_closed() : {conn.is_closed()}")
    if conn.is_closed():
        conn._reconnect()
    if conn:
        return conn.query_json("some query here..")

Whenever I send a request got my response but randomly get ClientConnectionClosedError.

just before and after the error

  • request sent, got an error
INFO:root:Connection_is_closed() : False
INFO:     IP:55358 - "POST /add_batch_data_uc HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):  LONG ERROR BELOW
  • when I resent/retry the request, got my response, no error
INFO:root:Connection_is_closed() : True
INFO:root:Query exec time: 0:00:00.013752
INFO:     IP:55366 - "POST /add_batch_data_uc HTTP/1.1" 200 OK

INFO:root:Connection_is_closed() : False
INFO:root:Query exec time: 0:00:00.014205
INFO:     IP:55366 - "POST /add_batch_data_uc HTTP/1.1" 200 OK
  • same problem happens randomly when I try to execute conn.json(query) in random_test.ipynb. It works most of the time but sometimes I got ClientConnectionClosedError
import asyncio
import datetime
import edgedb 
conn= edgedb.connect()
conn.query_json( a query inside )    # generates error randomly / rarely
  • ERROR
INFO:root:Connection_is_closed() : False
INFO:root:query_execution_time : 0:00:00.015267
INFO:     IP_address:54516 - "POST /add_batch_data_uc HTTP/1.1" 200 OK

INFO:root:Connection_is_closed() : False
INFO:     IP_address:24256 - "POST /add_batch_data_uc HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
ERROR:uvicorn.error:Exception in ASGI application
Traceback (most recent call last):
  File "/home/user/.local/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py", line 376, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/home/user/.local/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 75, in __call__
    return await self.app(scope, receive, send)
  File "/home/user/.local/lib/python3.8/site-packages/fastapi/applications.py", line 208, in __call__
    await super().__call__(scope, receive, send)
  File "/home/user/.local/lib/python3.8/site-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/user/.local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc
  File "/home/user/.local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/home/user/.local/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
    raise exc
  File "/home/user/.local/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/home/user/.local/lib/python3.8/site-packages/starlette/routing.py", line 656, in __call__
    await route.handle(scope, receive, send)
  File "/home/user/.local/lib/python3.8/site-packages/starlette/routing.py", line 259, in handle
    await self.app(scope, receive, send)
  File "/home/user/.local/lib/python3.8/site-packages/starlette/routing.py", line 61, in app
    response = await func(request)
  File "/home/user/.local/lib/python3.8/site-packages/fastapi/routing.py", line 226, in app
    raw_response = await run_endpoint_function(
  File "/home/user/.local/lib/python3.8/site-packages/fastapi/routing.py", line 161, in run_endpoint_function
    return await run_in_threadpool(dependant.call, **values)
  File "/home/user/.local/lib/python3.8/site-packages/starlette/concurrency.py", line 39, in run_in_threadpool
    return await anyio.to_thread.run_sync(func, *args)
  File "/home/user/.local/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/home/user/.local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/home/user/.local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "main.py", line 106, in add_batch_data
    var = (conn.query_json('''INSERT uniform_comp {
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/blocking_con.py", line 379, in query_json
    return self._execute(
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/blocking_con.py", line 342, in _execute
    raise e
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/blocking_con.py", line 314, in _execute
    return self._get_protocol().sync_execute_anonymous(
  File "edgedb/protocol/blocking_proto.pyx", line 105, in edgedb.protocol.blocking_proto.BlockingIOProtocol.sync_execute_anonymous
  File "edgedb/protocol/blocking_proto.pyx", line 91, in edgedb.protocol.blocking_proto.BlockingIOProtocol._iter_coroutine
  File "edgedb/protocol/protocol.pyx", line 677, in execute_anonymous
  File "edgedb/protocol/protocol.pyx", line 474, in _optimistic_execute
  File "edgedb/protocol/blocking_proto.pyx", line 62, in wait_for_message
edgedb.errors.ClientConnectionClosedError

INFO:root:Connection_is_closed() : True
INFO:root:query_execution_time : 0:00:00.017633
INFO:     IP_address:54530 - "POST /add_batch_data_uc HTTP/1.1" 200 OK

INFO:root:Connection_is_closed() : False
INFO:root:query_execution_time : 0:00:00.014480
INFO:     IP_address:54530 - "POST /add_batch_data_uc HTTP/1.1" 200 OK

@elprans @fantix @1st1 @tailhook

rajeshdhanda avatar Jan 13 '22 10:01 rajeshdhanda

This is likely NOT an issue of edgedb-python. 2 things in your application:

  1. By default the EdgeDB server will terminate idle connections, but the EdgeDB client library (edgedb-python in this case) will reconnect automatically. You shouldn't have to worry about is_closed() and _reconnect().
  2. In the current version 0.20.1 of edgedb-python, the blocking client (edgedb.connect()) has only one connection to EdgeDB server and it's not thread-safe. FastAPI will use a threadpool to handle concurrent requests if the handler method is not async. So it is not a surprise you ran into errors. Before #286, you should use edgedb.create_async_client() instead and use async def and await to get edgedb-python concurrency support.

Please @ me only if you still have problems.

fantix avatar Jan 13 '22 21:01 fantix

I also tried using

global conn 
conn = edgedb.create_async_client() 
@app.post("/add_data")   
async def add_data(data : Item):
    if conn:
        var = await  conn.query_json("some query here..")
    return var

using uvicorn in code

if __name__ == "__main__":
    uvicorn.run("main:app", host= '0.0.0.0',  port=10704 ) #, workers= 4) #, reload=True) 

it works well but after some time it throughs error

ERROR:uvicorn.error:Exception in ASGI application
Traceback (most recent call last):
  File "/home/user/.local/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py", line 376, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/home/user/.local/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 75, in __call__
    return await self.app(scope, receive, send)
  File "/home/user/.local/lib/python3.8/site-packages/fastapi/applications.py", line 208, in __call__
    await super().__call__(scope, receive, send)
  File "/home/user/.local/lib/python3.8/site-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/user/.local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc
  File "/home/user/.local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/home/user/.local/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
    raise exc
  File "/home/user/.local/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/home/user/.local/lib/python3.8/site-packages/starlette/routing.py", line 656, in __call__
    await route.handle(scope, receive, send)
  File "/home/user/.local/lib/python3.8/site-packages/starlette/routing.py", line 259, in handle
    await self.app(scope, receive, send)
  File "/home/user/.local/lib/python3.8/site-packages/starlette/routing.py", line 61, in app
    response = await func(request)
  File "/home/user/.local/lib/python3.8/site-packages/fastapi/routing.py", line 226, in app
    raw_response = await run_endpoint_function(
  File "/home/user/.local/lib/python3.8/site-packages/fastapi/routing.py", line 159, in run_endpoint_function
    return await dependant.call(**values)
  File "main.py", line 117, in add_batch_data
    for id , data in inp_data.items():
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_pool.py", line 559, in query_json
    async with self._acquire() as con:
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_pool.py", line 620, in __aenter__
    self.connection = await self.pool._impl._acquire(
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_pool.py", line 393, in _acquire
    return await _acquire_impl()
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_pool.py", line 378, in _acquire_impl
    proxy = await ch.acquire()  # type: PoolConnection
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_pool.py", line 110, in acquire
    await self.connect()
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_pool.py", line 102, in connect
    self._con = await self._pool._get_new_connection()
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_pool.py", line 346, in _get_new_connection
    con = await asyncio_con._connect_addr(
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_con.py", line 582, in _connect_addr
    await connection.ensure_connected()
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_con.py", line 236, in ensure_connected
    await self._reconnect(single_attempt=single_attempt)
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_pool.py", line 53, in _reconnect
    return await super()._reconnect(single_attempt=single_attempt)
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_con.py", line 243, in _reconnect
    await impl.connect(inner._loop, inner._addrs,
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_con.py", line 70, in connect
    await compat.wait_for(
  File "/usr/lib/python3.8/asyncio/tasks.py", line 494, in wait_for
    return fut.result()
  File "/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_con.py", line 115, in _connect_addr
    tr, pr = await loop.create_connection(
  File "/usr/lib/python3.8/asyncio/base_events.py", line 986, in create_connection
    infos = await self._ensure_resolved(
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1365, in _ensure_resolved
    return await loop.getaddrinfo(host, port, family=family, type=type,
  File "/usr/lib/python3.8/asyncio/base_events.py", line 825, in getaddrinfo
    return await self.run_in_executor(
RuntimeError: Task <Task pending name='Task-43' coro=<_AsyncIOConnectionImpl._connect_addr() running at 
/home/user/.local/lib/python3.8/site-packages/edgedb/asyncio_con.py:115> 
cb=[_release_waiter(<Future pendi...9bd703cd0>()]>)() at /usr/lib/python3.8/asyncio/tasks.py:429]> 
got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/lib/python3.8/asyncio/futures.py:360]> 
attached to a different loop

@fantix

rajeshdhanda avatar Jan 15 '22 07:01 rajeshdhanda

I did some modifications according to https://github.com/fastapi-users/fastapi-users/discussions/663, currently it's working

rajeshdhanda avatar Jan 15 '22 13:01 rajeshdhanda

Good to know that it finally worked for you! For the loop binding issue, though it is usually not assumed in a way that the loop is only attached on the first await call, but I fixed it anyways here so that in most cases a global EdgeDB client object won't get in the way of using a different loop.

fantix avatar Jan 17 '22 00:01 fantix

I can confirm that after restarting instance (1.4+5ffe34b), python-edgedb does not reconnect automatically (edgedb.errors.ClientConnectionClosedError)

I used this option :

client: AsyncIOClient = create_async_client(dsn=dsn)
client = client.with_retry_options(
    RetryOptions(attempts=10)
)

It is a file watcher, it must always be up even if server is killing connections, so I had to use client.ensure_connected()

AdrienPensart avatar Jul 26 '22 19:07 AdrienPensart

I can confirm that after restarting instance (1.4+5ffe34b), python-edgedb does not reconnect automatically (edgedb.errors.ClientConnectionClosedError)

I used this option :

client: AsyncIOClient = create_async_client(dsn=dsn)
client = client.with_retry_options(
    RetryOptions(attempts=10)
)

It is a file watcher, it must always be up even if server is killing connections, so I had to use client.ensure_connected()

That might be fixed in #349, could you please check again with the latest edgedb-python release please?

fantix avatar Jul 27 '22 03:07 fantix

Getting those a lot

   File "/app/octo/octo/tasks/tasks.py", line 108, in handle
     return JSONResponse({"created": str(task_id)})
   File "/opt/poetry-venv/lib/python3.8/site-packages/edgedb/asyncio_client.py", line 291, in __aexit__
     return await self._exit(extype, ex)
   File "/opt/poetry-venv/lib/python3.8/site-packages/edgedb/transaction.py", line 163, in _exit
     raise err
   File "/opt/poetry-venv/lib/python3.8/site-packages/edgedb/transaction.py", line 144, in _exit
     await self._privileged_execute("ROLLBACK;")
   File "/opt/poetry-venv/lib/python3.8/site-packages/edgedb/transaction.py", line 197, in _privileged_execute
     await self._connection.privileged_execute(abstract.ExecuteContext(
   File "/opt/poetry-venv/lib/python3.8/site-packages/edgedb/base_client.py", line 181, in privileged_execute
     await self._protocol.execute(
   File "edgedb/protocol/protocol.pyx", line 505, in execute
   File "edgedb/protocol/protocol.pyx", line 209, in edgedb.protocol.protocol.SansIOProtocol.ensure_connected
 edgedb.errors.ClientConnectionClosedError: the connection has been closed

monomonedula avatar Dec 29 '22 18:12 monomonedula

@monomonedula, does your transaction take a lot of time? Transactions should not be left open while you're doing some slow work like downloading files, sending emails, or doing any sort of complex computation (generally any network tasks aren't great there). So if that work takes longer than EdgeDB's server-side timeout even occasionally, you'll get these errors. And it's better to split this transaction into two or more in this case.

Also, presumably this exception is a result of other exception in the code, right? (because this is a rollback). Do you see that exception as part of traceback? (you don't have to show that to us, just we want to make sure that it's visible or we have to fix something to make it visible). That might help you figuring out what takes too much time.

tailhook avatar Dec 30 '22 12:12 tailhook

@tailhook , nope it doesn't do any long-running stuff. I'm getting those at different random places in code. It gets worse when there are more concurrent connections to the server (not the same database, but the EdgeDB server itself). Namely, when I switched from running tests in parallel to running them sequentially, it got much better. However, reducing load doesn't look like a solution, cause those are just normal tests, without any heavy-lifting.

To give some more context:

I'm also running this from inside docker-compose (the EdgeDB server, however, is on a standalone bare-metal deployment).

The previous testing flow relied on creating and using a bunch of (4 at a time, 15 in total) temporary databases for test purposes and it seems like the EdgeDB server didn't handle it quite well, because tests failed every time due to ClientConnectionClosedError appearing at random places (without any specific pattern). Note that the load was pretty insignificant, every of the databases operated about < 30 objects. When i changed the testing flow to using a single DB running 1 test a time, the ClientConnectionClosedError is almost gone. Nevertheless, it doesn't look good cause I am migrating from MongoDB and it handled all this without any problems.

monomonedula avatar Dec 30 '22 19:12 monomonedula

@tailhook here's a full traceback of a similar case (i lost the original one)

Traceback (most recent call last):
  File "/opt/poetry-venv/lib/python3.8/site-packages/edgedb/transaction.py", line 139, in _exit
    await self._privileged_execute(query)
  File "/opt/poetry-venv/lib/python3.8/site-packages/edgedb/transaction.py", line 197, in _privileged_execute
    await self._connection.privileged_execute(abstract.ExecuteContext(
  File "/opt/poetry-venv/lib/python3.8/site-packages/edgedb/base_client.py", line 181, in privileged_execute
    await self._protocol.execute(
  File "edgedb/protocol/protocol.pyx", line 559, in execute
  File "edgedb/protocol/protocol.pyx", line 392, in _execute
  File "edgedb/protocol/asyncio_proto.pyx", line 62, in wait_for_message
edgedb.errors.ClientConnectionClosedError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/app/common/rabbit_worker/worker/execution/task.py", line 67, in execute
    await self._task.execute()
  File "/app/common/rabbit_worker/worker/execution/task.py", line 119, in execute
    await asyncio.wait_for(self._task.execute(), timeout=self._timeout)
  File "/usr/local/lib/python3.8/asyncio/tasks.py", line 455, in wait_for
    return await fut
  File "/app/common/rabbit_worker/worker/execution/task.py", line 34, in execute
    await self._c()
  File "/app/overmind/overmind/consumer/proc_http.py", line 122, in run
    status, should_pub, t, hotness = await store_httpreq(
  File "/app/common/storage/httpreq.py", line 71, in store_httpreq
    id_ = await insert_req(
  File "/app/common/storage/httpreq.py", line 127, in insert_req
    return upserted.id
  File "/opt/poetry-venv/lib/python3.8/site-packages/edgedb/asyncio_client.py", line 291, in __aexit__
    return await self._exit(extype, ex)
  File "/opt/poetry-venv/lib/python3.8/site-packages/edgedb/transaction.py", line 163, in _exit
    raise err
  File "/opt/poetry-venv/lib/python3.8/site-packages/edgedb/transaction.py", line 144, in _exit
    await self._privileged_execute("ROLLBACK;")
  File "/opt/poetry-venv/lib/python3.8/site-packages/edgedb/transaction.py", line 197, in _privileged_execute
    await self._connection.privileged_execute(abstract.ExecuteContext(
  File "/opt/poetry-venv/lib/python3.8/site-packages/edgedb/base_client.py", line 181, in privileged_execute
    await self._protocol.execute(
  File "edgedb/protocol/protocol.pyx", line 505, in execute
  File "edgedb/protocol/protocol.pyx", line 209, in edgedb.protocol.protocol.SansIOProtocol.ensure_connected
edgedb.errors.ClientConnectionClosedError: the connection has been closed


monomonedula avatar Dec 30 '22 19:12 monomonedula

@fantix is it possible that connection is pre-emptied by the server because of too many connections or something?

tailhook avatar Jan 02 '23 14:01 tailhook

Also few notes for ourselves on how we can improve debugging of such cases after we figure out the real issue:

  1. If this is a full traceback the error is on commit (since there is no error that triggers rollback is reported). But to better differentiate commit from rollback in traceback we should not do self._privileged_execute(query) but rather make that different lines:
    if extype is None:
        self._privileged_execute(self._make_commit_query())
    else:
        self._privileged_execute(self._make_rollback_query())
    
    Alternative is probably to catch errors here and add more context.
  2. We should probably emit some debugging info to make a reason more clear. Off the top of my head: log transaction duration and connection inactivity time either in the error message itself, or in some debug-level log message.
  3. Together with (2) we should probably warn when the duration of a successful transaction comes close to the transaction duration or the connection inactivity time limit. So it's clear what's going even without extra logging.

tailhook avatar Jan 02 '23 14:01 tailhook

@fantix is it possible that connection is pre-emptied by the server because of too many connections or something?

I just double checked the server code, I don't think we are closing any frontend connection actively other than killng the idle ones, even if there are a lot of connections on different databases.

@monomonedula I assume you're running on a recent version of edgedb-python, right? So I think, except for reruning the test after we apply the suggestions from @tailhook for debugging on the client-side, it might also be worth looking at the server logs for anything interesting, and/or the server metrics like client_connections_idle_total or background_errors_total for signs of possible issues (ref).

fantix avatar Jan 02 '23 16:01 fantix

@fantix the link you gave for observability docs gives me 404

monomonedula avatar Jan 16 '23 08:01 monomonedula

@monomonedula Sorry it was just moved to: https://www.edgedb.com/docs/reference/http#observability

fantix avatar Jan 16 '23 13:01 fantix