Watches can get stuck
Hello!
After upgrading this library from 31.x to 32.x, I am noticing that watches occasionally get "stuck", even when they have a timeout configured. I haven't been able to reproduce it locally with a debugger because it seems to take an arbitrarily long time to show up in production (many hours or days). From the logging I was able to gather, it appears that the async generator returned by stream() simply never yields anything again and never times out. I am not seeing any exceptions being thrown. I am running in GKE and observing this behavior on both Kubernetes 1.31 and 1.32.
I am going to try wrapping the async generator returned by stream() in a class that times out a bit after the configured watch timeout, so I can confirm where it's getting stuck.
Some code snippets for the watching:
# Assumed imports for these snippets (not included in the original report);
# get_all_pods and on_pod_event are application helpers not shown here.
import asyncio
import logging
import time

from kubernetes_asyncio import client, watch
from kubernetes_asyncio.client.api_client import ApiClient
from kubernetes_asyncio.client.exceptions import ApiException

LOGGER = logging.getLogger(__name__)


async def watch_pods(app):
    app['state'].pods = {}
    async with ApiClient() as api_client:
        v1_api = client.CoreV1Api(api_client)
        while True:
            try:
                # Load a full list of pods
                app['state'].pods = {}
                resource_version = await get_all_pods(app, v1_api)
                app['state'].k8s_last_update = time.time()
                wipe_cache = False
                # Stream changes to pods with the watch API
                while not wipe_cache:
                    wipe_cache, resource_version = await watch_k8s_stream(app, v1_api.list_pod_for_all_namespaces,
                                                                          resource_version, on_pod_event)
                LOGGER.info("Wiping pod cache")
            except asyncio.CancelledError:
                LOGGER.info("Cancelling K8s Pod watch")
                break
            except Exception as e:
                LOGGER.exception(f"Uncaught Exception in K8s Pod watch: {e}")
                await asyncio.sleep(10)
async def watch_k8s_stream(app, v1_api_func, resource_version, callback, **kwargs):
    LOGGER.debug(f"Starting {v1_api_func.__name__} stream from resource version {resource_version}")
    latest_version = resource_version
    try:
        async with watch.Watch().stream(v1_api_func, resource_version=latest_version, allow_watch_bookmarks=False,
                                        timeout_seconds=180, **kwargs) as stream:
            async for event in stream:
                evt, obj = event['type'], event['object']
                if evt == 'ERROR':
                    LOGGER.warning(f"Got error from {v1_api_func.__name__} stream: {obj}. Clearing cache and restarting watch")
                    return True, latest_version
                latest_version = obj.metadata.resource_version
                app['state'].k8s_last_update = time.time()
                if evt == "BOOKMARK":
                    continue
                # Call the provided callback with the event details
                callback(app, evt, obj)
            LOGGER.debug(f"Stream {v1_api_func.__name__} End")
        LOGGER.debug(f"Watch {v1_api_func.__name__} closed")
    except ApiException as e:
        LOGGER.log(logging.INFO if e.status == 410 else logging.ERROR,
                   f"Got error from {v1_api_func.__name__} stream: {e}. Clearing cache and restarting watch")
        return True, latest_version
    return False, latest_version
Yeah I can confirm it seems to be getting stuck somewhere inside the async generator. I wrapped it to raise an exception if there is more than 200s between events, and that went off, even though the timeout_seconds on the watch is set to 180s.
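For reference, the wrapper is roughly this (a minimal sketch; the actual class in production may differ slightly):

import asyncio


class AsyncIteratorWithTimeout:
    """Wrap an async iterator and fail if the next item takes too long."""

    def __init__(self, iterator, timeout):
        self._iterator = iterator.__aiter__()
        self._timeout = timeout

    def __aiter__(self):
        return self

    async def __anext__(self):
        # asyncio.wait_for raises asyncio.TimeoutError if the wrapped iterator
        # does not yield within `timeout` seconds; StopAsyncIteration from a
        # normally finished stream still propagates and ends the loop.
        return await asyncio.wait_for(self._iterator.__anext__(), self._timeout)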
Thanks for reporting the issue.
Starting from version v32.0.0, a bug related to the client-side timeout has been fixed (#337). In previous versions the timeout was always set to 5 minutes, but now it is disabled by default, matching the official Kubernetes client. This may matter in your case, so I suggest also setting _request_timeout to prevent getting stuck at the TCP level. More information about timeouts: https://github.com/kubernetes-client/python/blob/master/examples/watch/timeout-settings.md
async with watch.Watch().stream(v1_api_func, resource_version=latest_version, allow_watch_bookmarks=False,
                                timeout_seconds=600, _request_timeout=300, **kwargs) as stream:
    async for event in AsyncIteratorWithTimeout(stream, timeout=630):
I tried setting _request_timeout as well, but I am not sure that is working either. With the above code I am still seeing the 630s timeout on the iterator go off.
I am also seeing a bunch of these:
Uncaught Exception in K8s Pod watch: Response payload is not completed: <TransferEncodingError: 400, message='Not enough data for satisfy transfer length header.'>
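That message appears to come from aiohttp when a chunked response ends before the declared length. One way to keep it from hitting the outer "Uncaught Exception" handler would be to catch it next to the ApiException handler in watch_k8s_stream and treat it as a normal watch restart; a rough sketch, assuming it surfaces as aiohttp.ClientPayloadError:

import aiohttp

# Condensed view of the exception handling in watch_k8s_stream with the extra
# handler added; the body of the try block is unchanged from the code above.
try:
    ...  # async with watch.Watch().stream(...) / async for event in stream
except ApiException as e:
    LOGGER.log(logging.INFO if e.status == 410 else logging.ERROR,
               f"Got error from {v1_api_func.__name__} stream: {e}. Clearing cache and restarting watch")
    return True, latest_version
except aiohttp.ClientPayloadError as e:
    # Truncated/incomplete chunked response from the API server: restart the
    # watch instead of letting it bubble up to the outer handler.
    LOGGER.warning(f"Payload error from {v1_api_func.__name__} stream: {e}. Clearing cache and restarting watch")
    return True, latest_version
return False, latest_version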