kubernetes_asyncio

Watches can get stuck

Open nosammai opened this issue 7 months ago • 3 comments

Hello!

After upgrading from 31.x to 32.x of this library, I am noticing that occasionally watches can get "stuck", even when they have a timeout configured. I haven't been able to reproduce it locally with a debugger because it seems to take an arbitrary/long amount of time to reproduce in production (many hours/days). From the logging I was able to gather, it appears that the async generator returned by stream() simply never yields anything again and never times out. I am not seeing any exceptions being thrown. I am running in GKE and observing this behavior on both K8s versions 1.31 and 1.32.

I am going to try wrapping the async generator stream() in a class that will time out after a bit longer than the configured watch timeout so that I can confirm where it's getting stuck.

Some code snippets for the watching:

# Imports assumed for these snippets (helpers such as get_all_pods and
# on_pod_event are defined elsewhere in the application):
import asyncio
import logging
import time

from kubernetes_asyncio import client, watch
from kubernetes_asyncio.client.api_client import ApiClient
from kubernetes_asyncio.client.exceptions import ApiException

LOGGER = logging.getLogger(__name__)


async def watch_pods(app):
    app['state'].pods = {}
    async with ApiClient() as api_client:
        v1_api = client.CoreV1Api(api_client)
        while True:
            try:
                # Load a full list of pods
                app['state'].pods = {}
                resource_version = await get_all_pods(app, v1_api)
                app['state'].k8s_last_update = time.time()
                wipe_cache = False

                # Stream changes to pods with the watch API
                while not wipe_cache:
                    wipe_cache, resource_version = await watch_k8s_stream(app, v1_api.list_pod_for_all_namespaces,
                                                                          resource_version, on_pod_event)

                LOGGER.info("Wiping pod cache")

            except asyncio.CancelledError:
                LOGGER.info("Cancelling K8s Pod watch")
                break
            except Exception as e:
                LOGGER.exception(f"Uncaught Exception in K8s Pod watch: {e}")
                await asyncio.sleep(10)

async def watch_k8s_stream(app, v1_api_func, resource_version, callback, **kwargs):
    LOGGER.debug(f"Starting {v1_api_func.__name__} stream from resource version {resource_version}")
    latest_version = resource_version
    try:
        async with watch.Watch().stream(v1_api_func, resource_version=latest_version, allow_watch_bookmarks=False,
                                        timeout_seconds=180, **kwargs) as stream:
            async for event in stream:
                evt, obj = event['type'], event['object']

                if evt == 'ERROR':
                    LOGGER.warning(f"Got error from {v1_api_func.__name__} stream: {obj}. Clearing cache and restarting watch")
                    return True, latest_version

                latest_version = obj.metadata.resource_version
                app['state'].k8s_last_update = time.time()

                if evt == "BOOKMARK":
                    continue

                # Call the provided callback with the event details
                callback(app, evt, obj)

            LOGGER.debug(f"Stream {v1_api_func.__name__} End")
        LOGGER.debug(f"Watch {v1_api_func.__name__} closed")
    except ApiException as e:
        LOGGER.log(logging.INFO if e.status == 410 else logging.ERROR, f"Got error from {v1_api_func.__name__} stream: {e}. Clearing cache and restarting watch")
        return True, latest_version
    return False, latest_version

nosammai · May 23 '25 15:05

Yeah, I can confirm it seems to be getting stuck somewhere inside the async generator. I wrapped it so that it raises an exception if more than 200s pass between events, and that fired even though timeout_seconds on the watch is set to 180s.
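The wrapper class itself isn't shown in this thread; assuming it behaves like the AsyncIteratorWithTimeout used in a later comment, a minimal sketch might look like this:

import asyncio

class AsyncIteratorWithTimeout:
    """Wrap an async iterator and raise if the next item takes too long."""

    def __init__(self, iterator, timeout):
        self._iterator = iterator
        self._timeout = timeout

    def __aiter__(self):
        return self

    async def __anext__(self):
        # asyncio.wait_for raises asyncio.TimeoutError if the underlying
        # generator neither yields nor finishes within `timeout` seconds.
        return await asyncio.wait_for(self._iterator.__anext__(), self._timeout)

With a wrapper like this, a watch that silently stops yielding surfaces as an asyncio.TimeoutError in the outer try/except instead of blocking forever.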

nosammai · May 23 '25 19:05

Thanks for reporting the issue.

Starting from version v32.0.0, a bug related to the client-side timeout has been fixed (#337). In previous versions the timeout was always set to 5 minutes, but now it is disabled by default, as in the official Kubernetes client. This may matter in your case, so I suggest also setting _request_timeout to prevent getting stuck at the TCP level. More information about timeouts: https://github.com/kubernetes-client/python/blob/master/examples/watch/timeout-settings.md
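For illustration, a hedged sketch of how the two knobs combine in the snippet above (the numbers are illustrative, not taken from this thread):

        # timeout_seconds asks the API server to end the watch after roughly
        # that long; _request_timeout is the client-side cap that prevents a
        # silently dead TCP connection from leaving the generator waiting forever.
        async with watch.Watch().stream(v1_api_func, resource_version=latest_version,
                                        allow_watch_bookmarks=False,
                                        timeout_seconds=180,    # server-side watch duration
                                        _request_timeout=210,   # client-side safety net
                                        **kwargs) as stream:
            async for event in stream:
                ...

Setting the client-side cap slightly above timeout_seconds gives the server a chance to end the watch cleanly before the client aborts the connection.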

tomplus · Jun 01 '25 22:06

        async with watch.Watch().stream(v1_api_func, resource_version=latest_version, allow_watch_bookmarks=False,
                                        timeout_seconds=600, _request_timeout=300, **kwargs) as stream:
            async for event in AsyncIteratorWithTimeout(stream, timeout=630):

I tried setting _request_timeout as well, but I am not sure that is working either. With the above code I am still seeing the 630s timeout on the iterator go off.

I am also seeing a bunch of these:

Uncaught Exception in K8s Pod watch: Response payload is not completed: <TransferEncodingError: 400, message='Not enough data for satisfy transfer length header.'>

nosammai · Jun 02 '25 15:06