
Is it necessary to make refresh_lease one long-lived coroutine?

Open garvenlee opened this issue 1 year ago • 7 comments

Here is my code:

    async def refresh_lease(self, lease_id: int, *, period: int):
        refresh = self.aetcd_client.refresh_lease
        while True:
            # refresh_lease is a stream-stream method, but it sends only one
            # request (with request_timeout), so it may raise
            # ConnectionTimeoutError (a ClientError). In that case etcd
            # considers the service down, and the service provider must
            # register itself again.
            reply = await refresh(lease_id)  # the registered key expires together with the lease
            self.last_refresh_time = monotonic()  # updated only on success
            logger.debug(f"[EtcdLease] refresh_lease got a reply: {reply.ID} / {self.lease_id}")
            await sleep(period)

Two hours after refresh_lease started running periodically, I got the following exceptions:

Traceback (most recent call last):
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/client.py", line 21, in handler
    return await f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/client.py", line 42, in handler
    return await f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/client.py", line 1033, in refresh_lease
    async for reply in self.leasestub.LeaseKeepAlive(
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/grpc/aio/_call.py", line 356, in _fetch_stream_responses
    await self._raise_for_status()
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/grpc/aio/_call.py", line 263, in _raise_for_status
    raise _create_rpc_error(
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "Internal error from Core"
	debug_error_string = "Failed "execute_batch": (<grpc._cython.cygrpc.SendMessageOperation object at 0x7ff7890dd030>,)"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/chatp-0.0.1-py3.11.egg/chatp/discovery/etcd.py", line 62, in refresh_lease
    reply = await refresh(
            ^^^^^^^^^^^^^^
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/client.py", line 23, in handler
    exceptions._handle_exception(e)
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/exceptions.py", line 80, in _handle_exception
    raise e(error_details) from error
aetcd.exceptions.InternalError: Internal error from Core

garvenlee commented on Dec 05 '23 05:12

@garvenlee aetcd is mostly a thin wrapper over the native gRPC transport used by etcd. In this particular case, all refresh_lease does is iterate over the replies, see https://github.com/martyanov/aetcd/blob/v1.0.0a4/aetcd/client.py#L1032C6-L1038C25; you can see this in your traceback as well. I have no idea what the reason behind the internal error is, but it came from gRPC.

martyanov commented on Dec 05 '23 05:12
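
For reference, the iteration the traceback points at boils down to roughly the pattern below: a stream-stream LeaseKeepAlive call that sends a single request and returns the first reply. This is a simplified sketch, not aetcd's exact source; lease_stub and request_cls stand for the etcd-generated gRPC stub and LeaseKeepAliveRequest message and are passed in here purely for illustration.

    # Rough shape of the call the traceback shows inside aetcd's refresh_lease.
    # lease_stub and request_cls are the etcd-generated gRPC stub and request
    # message class; this is a sketch, not the library source.
    async def single_shot_keepalive(lease_stub, request_cls, lease_id):
        async def requests():
            # Only one request is ever sent on this stream.
            yield request_cls(ID=lease_id)

        async for reply in lease_stub.LeaseKeepAlive(requests()):
            # Returning after the first reply ends the call, so each refresh
            # opens and closes a fresh stream-stream RPC.
            return reply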

If this is a transient issue, maybe just log a warning and continue; otherwise check the server side, and probably the network, for issues.

martyanov commented on Dec 05 '23 05:12
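
One way to follow that suggestion is to catch the error inside the refresh loop, log a warning, and keep going. A minimal sketch that relies only on refresh_lease and aetcd.exceptions.InternalError, both visible in the traceback above; whether InternalError is actually safe to treat as transient depends on the root cause.

    import asyncio
    import logging

    from aetcd import exceptions

    logger = logging.getLogger(__name__)

    async def refresh_lease_forever(client, lease_id, period):
        while True:
            try:
                reply = await client.refresh_lease(lease_id)
                logger.debug("lease %s refreshed", reply.ID)
            except exceptions.InternalError as exc:
                # Possibly transient; warn and try again on the next cycle
                # instead of letting the exception kill the coroutine.
                logger.warning("lease refresh failed, will retry: %s", exc)
            await asyncio.sleep(period)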

This thread may also be helpful: https://github.com/martyanov/aetcd/issues/31.

martyanov commented on Dec 05 '23 05:12

@martyanov

aetcd is mostly a thin wrapper over native gRPC transport used by etcd

Yeah, but I suspect that frequently creating stream-stream requests puts more stress on the etcd server. So could you evaluate this:

    from asyncio import get_running_loop, wait_for

    async def refresh_lease(self, id):
        # Hand the lease ID to the shared keepalive stream and wait for its reply.
        await self.id_queue.put(id)
        waiter = get_running_loop().create_future()
        self.waiter = waiter
        return await wait_for(waiter, timeout)

    async def grpc_lease_keepalive(self, id_queue):
        # stub and LeaseKeepAliveRequest are the etcd-generated gRPC objects.
        async def request_iterator():
            while True:
                id = await id_queue.get()
                yield LeaseKeepAliveRequest(ID=id)

        # One long-lived stream-stream call serves every refresh.
        async for reply in stub.LeaseKeepAlive(request_iterator()):
            if not self.waiter.done():
                self.waiter.set_result(reply)

garvenlee commented on Dec 05 '23 05:12

@garvenlee I don't think you need to perform this operation continuously. Usually there is a sane TTL, for example 10 seconds, and you don't need to refresh more often than every TTL / 2 to maintain the lease. It highly depends on your particular case, but even a 1 second interval seems unreasonable.

All operations share a single channel and connection, so there should not be much overhead except in extreme cases.

martyanov commented on Dec 05 '23 06:12
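
In code, that advice amounts to deriving the refresh period from the TTL rather than picking an aggressive fixed interval. A minimal sketch, assuming a client and lease ID obtained elsewhere; only refresh_lease itself is confirmed by the traceback above.

    import asyncio

    async def maintain_lease(client, lease_id, ttl=10):
        # With a 10 second TTL, refreshing every TTL / 2 seconds is plenty;
        # everything runs over the client's single shared channel anyway.
        period = ttl / 2
        while True:
            await client.refresh_lease(lease_id)
            await asyncio.sleep(period)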

The lease machinery by itself solves the original KV TTL problem: when you have many entities and want to maintain a TTL for all of them, with a lease it is as easy as attaching the same lease ID to all of them. I don't argue that there are no projects that will be limited by one stream-stream call per refresh, but it's not an ordinary situation. When you have to maintain a large number of keys with different TTLs, leases are not the biggest concern.

martyanov commented on Dec 05 '23 06:12
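
For illustration, attaching one lease to several keys might look like the sketch below. The lease() call, the lease keyword on put(), and the lease.id attribute are assumptions that follow the usual etcd3-style client API; they are not taken from this thread.

    # One lease, many keys: when the lease expires or is revoked, every key
    # attached to it disappears with it, so a single refresh loop keeps all
    # registrations alive. lease(), lease.id and put(..., lease=...) are
    # assumed API names, not confirmed by this thread.
    async def register_services(client, endpoints, ttl=10):
        lease = await client.lease(ttl)
        for name, addr in endpoints.items():
            await client.put(name.encode(), addr.encode(), lease=lease.id)
        return lease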

All the operations share one single channel and connection

Indeed, all requests share a channel, but since etcd designed LeaseKeepAlive as a bidirectional streaming method, I think reusing the same stream can avoid the cost of stream resources to a certain extent.

garvenlee commented on Dec 05 '23 07:12
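
For completeness, the stream reuse described above could look roughly like this: one long-lived LeaseKeepAlive call fed by a timer, instead of a new stream per refresh. As in the earlier sketch, lease_stub and request_cls stand for the etcd-generated gRPC objects; the exact way to reach them through aetcd is not shown in this thread.

    import asyncio

    async def keepalive_stream(lease_stub, request_cls, lease_id, period):
        async def requests():
            while True:
                yield request_cls(ID=lease_id)
                await asyncio.sleep(period)

        # A single stream-stream call carries every keepalive for this lease.
        async for reply in lease_stub.LeaseKeepAlive(requests()):
            # A non-positive TTL in the reply usually means the lease no
            # longer exists and the caller has to re-register.
            if reply.TTL <= 0:
                raise RuntimeError(f"lease {lease_id} expired or was revoked")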