aetcd
Is it necessary to make refresh_lease one long-lived coroutine?
Here is my code:
async def refresh_lease(self, lease_id: int, *, period: int):
    refresh = self.aetcd_client.refresh_lease
    while True:
        # stream-stream method, but it sends only one request, with request_timeout,
        # so this will raise ConnectionTimeoutError(ClientError);
        # that means the service is down in etcd's view, and the service supplier
        # must register itself again
        reply = await refresh(lease_id)  # let the registered key expire with the lease itself
        self.last_refresh_time = monotonic()  # update on success
        logger.debug(f"[EtcdLease] refresh_lease got a reply: {reply.ID} / {self.lease_id}")
        await sleep(period)
After about two hours of periodically calling refresh_lease, I got the following exception:
Traceback (most recent call last):
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/client.py", line 21, in handler
    return await f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/client.py", line 42, in handler
    return await f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/client.py", line 1033, in refresh_lease
    async for reply in self.leasestub.LeaseKeepAlive(
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/grpc/aio/_call.py", line 356, in _fetch_stream_responses
    await self._raise_for_status()
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/grpc/aio/_call.py", line 263, in _raise_for_status
    raise _create_rpc_error(
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
    status = StatusCode.INTERNAL
    details = "Internal error from Core"
    debug_error_string = "Failed "execute_batch": (<grpc._cython.cygrpc.SendMessageOperation object at 0x7ff7890dd030>,)"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/chatp-0.0.1-py3.11.egg/chatp/discovery/etcd.py", line 62, in refresh_lease
    reply = await refresh(
            ^^^^^^^^^^^^^^
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/client.py", line 23, in handler
    exceptions._handle_exception(e)
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/exceptions.py", line 80, in _handle_exception
    raise e(error_details) from error
aetcd.exceptions.InternalError: Internal error from Core
@garvenlee aetcd is mostly a thin wrapper over native gRPC transport used by etcd. In this particular case, all refresh_lease does is iterate over the replies (https://github.com/martyanov/aetcd/blob/v1.0.0a4/aetcd/client.py#L1032C6-L1038C25); you can also see this in your traceback. I have no idea what the reason behind the internal error is, but it came from gRPC.
If this is a transient issue, maybe just log a warning and continue; otherwise check the server side, and possibly the network, for issues.
This thread may also be helpful: https://github.com/martyanov/aetcd/issues/31
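For the "warn and continue" option, a minimal sketch could look like the following. It is not part of aetcd: `refresh_tolerating_errors`, `max_failures` and `retry_delay` are hypothetical names, and `refresh` stands for any coroutine function such as the client's refresh_lease. The broad `except Exception` is a placeholder; in real code it should probably be narrowed to the client's own error types.

```python
import asyncio
import logging

logger = logging.getLogger("lease")


async def refresh_tolerating_errors(refresh, lease_id, *, max_failures=3, retry_delay=0.0):
    """Call ``refresh(lease_id)``; log a warning and retry on failure.

    Re-raises the last error after ``max_failures`` consecutive failures,
    so the caller can decide to register the service again.
    """
    for attempt in range(1, max_failures + 1):
        try:
            return await refresh(lease_id)
        except Exception as exc:  # assumption: narrow this to transient errors
            if attempt == max_failures:
                raise
            logger.warning(
                "lease %s refresh failed (%d/%d): %r",
                lease_id, attempt, max_failures, exc,
            )
            await asyncio.sleep(retry_delay)
```

Keep the total retry time well below the lease TTL, otherwise the lease may expire while the loop is still retrying.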
@martyanov
> aetcd is mostly a thin wrapper over native gRPC transport used by etcd
Yeah, but I suspect that frequently creating stream-stream requests puts more stress on the etcd server. Could you evaluate this approach:
async def refresh_lease(self, id):
    # hand the lease ID to the long-lived stream and wait for its reply
    await self.id_queue.put(id)
    waiter = create_future()
    self.waiter = waiter
    return await wait_for(waiter, timeout)

async def grpc_lease_keepalive(self, id_queue):
    async def request_iterator():
        # feed queued lease IDs into the single stream-stream call
        while True:
            id = await id_queue.get()
            yield LeaseKeepAliveRequest(ID=id)

    async for reply in stub.LeaseKeepAlive(request_iterator()):
        if not self.waiter.done():
            self.waiter.set_result(reply)
@garvenlee I don't think you need to perform this operation continuously. Usually there is a sane TTL, for example 10 seconds, and then you don't need to refresh more often than every TTL / 2 to maintain the lease. It depends heavily on your particular case, but even a 1-second interval seems unreasonable.
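As a rough illustration of the TTL / 2 rule of thumb, the sketch below derives the sleep period from the TTL instead of hard-coding it. `refresh_period`, `keep_alive` and the `rounds` parameter are hypothetical helpers for this example, and `refresh` again stands for any coroutine function that refreshes a lease.

```python
import asyncio


def refresh_period(ttl: float, fraction: float = 0.5) -> float:
    """Return the pause between refreshes for a lease with the given TTL."""
    if ttl <= 0 or not 0 < fraction < 1:
        raise ValueError("ttl must be positive and fraction must be in (0, 1)")
    return ttl * fraction


async def keep_alive(refresh, lease_id, ttl, *, rounds=None):
    """Refresh ``lease_id`` every ``ttl / 2`` seconds.

    ``rounds`` limits the number of refreshes (None means run forever);
    it exists mainly so the loop can be exercised in a test.
    """
    period = refresh_period(ttl)
    done = 0
    while rounds is None or done < rounds:
        await refresh(lease_id)
        done += 1
        await asyncio.sleep(period)
    return done
```

With a 10-second TTL this refreshes every 5 seconds, which leaves room for one missed refresh before the lease expires.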
All the operations share one single channel and connection, so there should not be much overhead except for extreme cases.
The lease machinery by itself solves the original KV TTL problem, where you have many entities and want to maintain a TTL for all of them. With a lease, it is as easy as attaching the same lease ID to all of them. I'm not saying that no project will ever be limited by one stream-stream call per refresh, but it's not an ordinary situation. When you have to maintain a large number of keys with different TTLs, leases are not the biggest concern.
> All the operations share one single channel and connection
Indeed, all requests share a channel, but since etcd designed LeaseKeepAlive as a bidirectional streaming method, I think reusing the same stream can save some of the cost of creating a new stream for every refresh.
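To make the stream-reuse idea concrete, here is a self-contained sketch that funnels all refresh calls through one long-lived stream. It is not aetcd code: `KeepAliveStream` is a hypothetical class, `open_stream` is any callable that takes an async request iterator and returns an async iterator of replies (the shape of a gRPC stream-stream call), and `echo_stream` is only a stand-in for the real call. It also assumes that replies come back in the order the requests were sent, which should be verified against etcd before relying on it.

```python
import asyncio
from collections import deque


class KeepAliveStream:
    """Funnel many refresh calls through one bidirectional stream."""

    def __init__(self, open_stream):
        self._open_stream = open_stream
        self._requests = asyncio.Queue()
        self._waiters = deque()  # one future per in-flight request, FIFO

    async def _request_iterator(self):
        while True:
            yield await self._requests.get()

    async def run(self):
        """Long-lived task: read replies and wake the oldest waiter."""
        async for reply in self._open_stream(self._request_iterator()):
            waiter = self._waiters.popleft()
            if not waiter.done():  # skip waiters that already timed out
                waiter.set_result(reply)

    async def refresh(self, lease_id, timeout=5.0):
        # Register the waiter before sending, so the reply cannot be missed.
        waiter = asyncio.get_running_loop().create_future()
        self._waiters.append(waiter)
        await self._requests.put(lease_id)
        return await asyncio.wait_for(waiter, timeout)


async def echo_stream(requests):
    """Stand-in for the real stream-stream call: echo each lease ID."""
    async for lease_id in requests:
        yield {"ID": lease_id}


async def demo():
    stream = KeepAliveStream(echo_stream)
    reader = asyncio.create_task(stream.run())
    replies = await asyncio.gather(stream.refresh(1), stream.refresh(2))
    reader.cancel()
    try:
        await reader
    except asyncio.CancelledError:
        pass
    return replies
```

Using one future per request, rather than a single shared waiter, lets concurrent callers each get their own reply. If the stream itself fails, pending callers only notice through their timeout, so a real implementation would also need to restart `run()` and fail the outstanding waiters.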