Track pod state from deployment run

Open jpedrick-numeus opened this issue 1 year ago • 3 comments

Describe the current behavior

As a cost-overrun prevention measure, our Kubernetes work pool base job template sets active_deadline_seconds. If a pod is killed out from under the job, the flow run stays in the Running state forever and takes up slots in the work queue.

Describe the proposed behavior

I think it would make sense to track the pod state for Kubernetes work pools, so that the jobs know to either start a new pod and re-run or report 'Failed' with a reason.

Likewise, it would be good if some job metadata (such as the pod name, resource requests, etc.) were visible in the Prefect UI.

Example Use

Users can add a simple layer of protection against cost overruns by setting active_deadline_seconds.
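For context, here is a minimal sketch of what setting the deadline amounts to, assuming the work pool's job manifest is a plain dict shaped like a Kubernetes `batch/v1` Job. `spec.activeDeadlineSeconds` is the Kubernetes field; the helper name `with_active_deadline`, the trimmed-down manifest, and the 3600-second value are made up for illustration.

```python
import copy


def with_active_deadline(job_manifest: dict, seconds: int) -> dict:
    """Return a copy of a batch/v1 Job manifest with spec.activeDeadlineSeconds set."""
    if seconds <= 0:
        raise ValueError("seconds must be a positive integer")
    manifest = copy.deepcopy(job_manifest)  # leave the caller's template untouched
    manifest.setdefault("spec", {})["activeDeadlineSeconds"] = seconds
    return manifest


# Hypothetical, trimmed-down manifest; a real work pool template has more fields.
base_manifest = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {"template": {"spec": {"restartPolicy": "Never", "containers": []}}},
}

job = with_active_deadline(base_manifest, 3600)
```

Once the deadline passes, Kubernetes terminates the Job's pods and marks the Job as failed with reason DeadlineExceeded, which is the point at which the flow run is left behind in Running.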

Additional context

No response

jpedrick-numeus • Sep 17 '24 16:09

Thanks for the enhancement request @jpedrick-numeus! One idea we've had in this area is to have pods send heartbeats back to the Prefect server so that if the heartbeats stop, the server knows that a pod went down. In this case, we'd probably mark the flow run as CRASHED since the underlying infrastructure caused the failure. Does that sound like it would work for your use case?
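To make the heartbeat idea concrete, here is a rough sketch of the server-side check. It is not Prefect's implementation. It assumes each running flow run reports a last-heartbeat timestamp; `find_stale_runs`, the 90-second threshold, and the run IDs are all hypothetical.

```python
from datetime import datetime, timedelta, timezone


def find_stale_runs(
    last_heartbeat: dict[str, datetime],
    now: datetime,
    threshold: timedelta = timedelta(seconds=90),
) -> list[str]:
    """Return IDs of runs whose most recent heartbeat is older than the threshold."""
    return sorted(
        run_id for run_id, seen in last_heartbeat.items() if now - seen > threshold
    )


now = datetime(2024, 9, 18, 16, 0, 0, tzinfo=timezone.utc)
heartbeats = {
    "run-a": now - timedelta(seconds=10),  # still reporting
    "run-b": now - timedelta(minutes=5),   # pod has probably gone away
}
stale = find_stale_runs(heartbeats, now)
```

Runs returned by a check like this would be the candidates for the CRASHED state.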

Also, where would you expect to see Kubernetes information for a flow run in the Prefect UI?

desertaxle • Sep 18 '24 16:09

@desertaxle that would work for me. In my case I only need the pod state to be tracked so that prefect knows to move on to the next job in the Work Queue.

I think the details tab under https:///flow-runs/flow-run/?tab=Details would be perfect.

jpedrick-numeus • Sep 18 '24 18:09

just to help tie dispersed communication together, i believe these threads are the same issue:

  • https://github.com/PrefectHQ/prefect/issues/7239
  • https://github.com/PrefectHQ/prefect/issues/15945

it seems that using heartbeats to detect crashed pids is an old idea, but unfortunately it caused unhappy memories back in v1. yet, according to the replies in #7239, many members of the community (myself included) would love to see it come back (of course, as an optional non-default feature).

the release of prefect v3 brought back an implementation of heartbeats as a status on the worker (for work pools only). i would speculate it's not too hard to write a loop service that periodically checks for crashed workers and places their deployments also into crashed state, but the specifics are beyond my expertise.
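A generic sketch of such a loop service, using plain asyncio rather than any Prefect service class. `run_loop_service` and `mark_crashed_workers` are hypothetical names, and the check body is only a placeholder for querying worker heartbeats and updating states.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def run_loop_service(
    check: Callable[[], Awaitable[None]],
    interval_seconds: float,
    max_iterations: Optional[int] = None,
) -> int:
    """Call `check` repeatedly, sleeping between calls; return the number of calls."""
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        try:
            await check()
        except Exception as exc:  # one failed pass should not stop the service
            print(f"check failed: {exc!r}")
        iterations += 1
        await asyncio.sleep(interval_seconds)
    return iterations


calls: list[int] = []


async def mark_crashed_workers() -> None:
    # Placeholder: a real check would look up stale workers and set states.
    calls.append(len(calls))


completed = asyncio.run(run_loop_service(mark_crashed_workers, 0.01, max_iterations=3))
```

The `max_iterations` argument exists only so the example terminates; a real service would run until cancelled.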

distinguishing a healthy worker from a worker that crashed but came back before the heartbeat threshold was reached may also not be easy to implement. perhaps one could use the unique worker id (auto-generated unique k8s pod name for example) as an incarnation distinguisher.
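A small sketch of the incarnation idea, assuming each heartbeat carries both a stable worker name and an identifier that changes on every restart (the pod name, say). The `Heartbeat` class and its field names are invented for illustration and do not mirror Prefect's worker model.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Heartbeat:
    worker_name: str     # stable logical name of the worker
    incarnation_id: str  # changes on every restart, e.g. the k8s pod name
    seen_at: datetime


def worker_restarted(previous: Heartbeat, current: Heartbeat) -> bool:
    """True if the same logical worker is now reporting under a new incarnation."""
    return (
        previous.worker_name == current.worker_name
        and previous.incarnation_id != current.incarnation_id
    )


t0 = datetime(2024, 11, 8, 22, 0, 0, tzinfo=timezone.utc)
before = Heartbeat("k8s-worker", "worker-7d9f-abcde", t0)
after_restart = Heartbeat("k8s-worker", "worker-7d9f-fghij", t0)
same_pod = Heartbeat("k8s-worker", "worker-7d9f-abcde", t0)
```

With this, a heartbeat that arrives in time but under a new incarnation id can still be treated as a crash of the previous incarnation.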

the other way to (partially) mitigate this issue would be to propagate SIGINT from the parent process to the child process, thereby giving the child process some time to react and gracefully shut down, or at least report back to prefect api that it has crashed. this behavior was first noticed a while ago but it seems to not have been fixed.
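A minimal sketch of signal propagation using only the standard library on a POSIX system. It is not how the Prefect worker is structured; `make_forwarder` and `install_forwarding` are hypothetical helpers that relay a received signal to a child started with `subprocess.Popen`.

```python
import signal
import subprocess
import sys
from types import FrameType
from typing import Callable, Optional


def make_forwarder(
    child: subprocess.Popen,
) -> Callable[[int, Optional[FrameType]], None]:
    """Build a signal handler that relays the received signal to `child`."""

    def handler(signum: int, frame: Optional[FrameType]) -> None:
        if child.poll() is None:  # only signal a child that is still running
            child.send_signal(signum)

    return handler


def install_forwarding(child: subprocess.Popen) -> None:
    """Relay SIGINT and SIGTERM from this process to `child` (main thread only)."""
    handler = make_forwarder(child)
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, handler)


# Typical use in a parent process:
#   child = subprocess.Popen([sys.executable, "run_flow.py"])  # hypothetical script
#   install_forwarding(child)
#   child.wait()
```

The child still has to handle the signal itself, for example by reporting its state before exiting.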

jameswu1991 • Nov 08 '24 22:11

@jameswu1991 I started looking into this today as this problem is increasingly an issue w/ EC2 Spot instances.

I noticed with prefect-kubernetes that it's possible to list the running jobs. Rather than a heartbeat, how difficult would it be to query Kubernetes directly for the pod state? I'm picturing a process that runs periodically to check the flow-run-job's pod state and marks it crashed/failed if the pod no longer exists.
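Roughly what I have in mind, as a sketch. It assumes the job's pods can be found through the `job-name` label that Kubernetes puts on pods created by a Job, and that the check runs inside the cluster. `job_pods_gone` and `fetch_pod_phases` are hypothetical names.

```python
TERMINAL_PHASES = {"Succeeded", "Failed"}


def job_pods_gone(pod_phases: list[str]) -> bool:
    """True if a job has no pods left, or every remaining pod has finished."""
    return all(phase in TERMINAL_PHASES for phase in pod_phases)


def fetch_pod_phases(namespace: str, job_name: str) -> list[str]:
    """Look up the phases of the pods that belong to one Kubernetes Job."""
    # Imported lazily so the pure helper above works without the kubernetes package.
    from kubernetes import client, config

    config.load_incluster_config()
    pods = client.CoreV1Api().list_namespaced_pod(
        namespace, label_selector=f"job-name={job_name}"
    )
    return [pod.status.phase for pod in pods.items]
```

A flow run that is still Running while `job_pods_gone(fetch_pod_phases(namespace, job_name))` is true would be a candidate for crashed/failed.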

jpedrick-numeus • Dec 18 '24 15:12

@desertaxle @jameswu1991 I was able to work around this issue in Kubernetes with a scheduled flow that uses the infrastructure_pid to check whether a matching pod still exists. This would likely need to be modified in a mixed worker environment, so ymmv. I have the flow scheduled to run every 15 minutes.

get the pods

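import logging

import kubernetes.client
from kubernetes import config as kube_config


def get_pods(logger: logging.LoggerAdapter) -> list[str]:
    # Assumes the flow runs inside the cluster and the jobs live in the 'prefect' namespace.
    kube_config.load_incluster_config()
    client = kubernetes.client.CoreV1Api()
    pod_list = client.list_namespaced_pod('prefect')
    pod_names: list[str] = []
    logger.info('Fetching pods')
    for pod in pod_list.items:
        # Log name, phase, and IP of every pod for later debugging.
        logger.info('%s\t%s\t%s', pod.metadata.name, pod.status.phase, pod.status.pod_ip)
        pod_names.append(pod.metadata.name)

    return pod_names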

get running flows and compare to pod names

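# Module-level imports for this snippet and the one that follows.
import prefect.states
from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import FlowRun, StateType

    # Everything below runs inside the scheduled async flow function.
    running_flow_runs = FlowRunFilter(
        state=FlowRunFilterState(type=FlowRunFilterStateType(any_=[StateType.RUNNING]))
    )

    pod_names = get_pods(logger)

    async with get_client() as client:
        n = 0
        running_flows_to_delete = []
        while True:
            # Page through the running flow runs; limit matches the offset step.
            flow_runs: list[FlowRun] = await client.read_flow_runs(
                flow_run_filter=running_flow_runs,
                limit=parameters.query_size_limit,
                offset=parameters.query_size_limit * n,
            )

            if not flow_runs:
                break

            for r in flow_runs:
                if r.infrastructure_pid:
                    logger.info('found infrastructure_pid: %s', r.infrastructure_pid)
                    # The third ':'-separated field is used as the job name.
                    job_name = r.infrastructure_pid.split(':')[2]
                    if job_name:
                        # Pod names start with the name of the job that created them.
                        matching_pods = [
                            pod for pod in pod_names if pod.startswith(job_name)
                        ]
                        logger.info(
                            'found infrastructure-pid: %s, job_name: %s, matching-pods: %s',
                            r.infrastructure_pid,
                            job_name,
                            matching_pods,
                        )
                        if len(matching_pods) == 0:
                            logger.warning('Flow run is likely a zombie: %s', r)
                            running_flows_to_delete.append(r)
                else:
                    logger.info(
                        'Did not find infrastructure_pid in flow-run: %s %s',
                        r.id,
                        r.name,
                    )

            n += 1

        # Log the total once, after all pages have been read.
        logger.warning("Found %d zombie flows to kill.", len(running_flows_to_delete))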

finally set the flows as Crashed:

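        # Still inside the `async with get_client()` block, after the paging loop.
        for i, r in enumerate(running_flows_to_delete, start=1):
            logger.warning(
                'Killing zombie flow run(%d/%d): %s - %s',
                i,
                len(running_flows_to_delete),
                r.id,
                r.name,
            )
            # force=True applies the state regardless of orchestration rules.
            await client.set_flow_run_state(
                r.id, state=prefect.states.Crashed(), force=True
            )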
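
The matching logic in the flow above can also be pulled out of the API calls so it is easy to test on its own. This is a sketch under the same assumption the flow makes, namely that infrastructure_pid has three ':'-separated fields with the job name last. `job_name_from_pid` and `find_zombies` are hypothetical helper names, and the IDs below are invented.

```python
from typing import Optional


def job_name_from_pid(infrastructure_pid: Optional[str]) -> Optional[str]:
    """Extract the job name from a 'cluster:namespace:job' style identifier."""
    if not infrastructure_pid:
        return None
    parts = infrastructure_pid.split(":")
    if len(parts) != 3 or not parts[2]:
        return None  # not a Kubernetes-style pid; leave this run alone
    return parts[2]


def find_zombies(run_pids: dict[str, Optional[str]], pod_names: list[str]) -> list[str]:
    """Return run IDs whose job has no pod with a name starting with the job name."""
    zombies = []
    for run_id, pid in run_pids.items():
        job_name = job_name_from_pid(pid)
        if job_name and not any(pod.startswith(job_name) for pod in pod_names):
            zombies.append(run_id)
    return zombies


runs = {
    "run-1": "cluster-uid:prefect:happy-otter-abc12",
    "run-2": "cluster-uid:prefect:lost-heron-xyz89",
    "run-3": None,     # no infrastructure_pid recorded
    "run-4": "12345",  # pid from a different kind of worker
}
pods = ["happy-otter-abc12-7k2lq"]
zombies = find_zombies(runs, pods)
```

Runs without a Kubernetes-style pid are skipped rather than raising, which is one way to handle the mixed worker case.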
                    

jpedrick-numeus • Dec 20 '24 19:12