
[dagster-k8s] Retried k8s job is reported as failure

Open alex-treebeard opened this issue 2 years ago • 7 comments

Summary

A successful job is reported as a failure by dagster-k8s when it is retried due to backoff_limit > 0.

dagster_k8s.client.DagsterK8sError: Encountered failed job pods for job dagster-job-4fa7db7aa93211d03f3eca0e2acff339 with status: {'active': 1,
'completion_time': None,
'conditions': None,
'failed': 1,
'start_time': datetime.datetime(2022, 1, 18, 15, 14, 30, tzinfo=tzlocal()),
'succeeded': None}, in namespace faculty-dagster
  File "/usr/local/lib/python3.7/site-packages/dagster_celery_k8s/executor.py", line 457, in _execute_step_k8s_job
    wait_timeout=job_wait_timeout,
  File "/usr/local/lib/python3.7/site-packages/dagster_k8s/utils.py", line 41, in wait_for_job_success
    num_pods_to_wait_for,
  File "/usr/local/lib/python3.7/site-packages/dagster_k8s/client.py", line 278, in wait_for_job_success
    job_name=job_name, status=status, namespace=namespace

A failed pod does not imply a failed job. I believe this is a bug.

Reproduction

If you create a solid that fails on its first attempt and set backoff_limit to 1, the Kubernetes job eventually succeeds, but dagster reports the run as a failure.
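
For concreteness, here is a minimal reproduction sketch along those lines. It assumes the 0.12-era solid/pipeline API; the marker file is a hypothetical stand-in for "fails on the first attempt", and the dagster-k8s/config tag is just one way to set backoff_limit per step (use whichever knob applies to your setup):

import os

from dagster import pipeline, solid


@solid(
    tags={
        "dagster-k8s/config": {
            # Ask Kubernetes to retry the step pod once after a failure.
            "job_spec_config": {"backoff_limit": 1},
        }
    }
)
def flaky_solid(context):
    # NOTE: the retried attempt runs in a fresh pod, so this marker must live
    # on storage shared across attempts for the second attempt to see it.
    marker = "/tmp/flaky_solid_already_failed"  # hypothetical path
    if not os.path.exists(marker):
        open(marker, "w").close()
        raise Exception("simulated first-attempt failure")
    context.log.info("second attempt succeeded")


@pipeline
def flaky_pipeline():
    flaky_solid()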

I hit this issue on 0.12.5, but I believe it is still present in 0.13.14.

alex-treebeard avatar Jan 18 '22 15:01 alex-treebeard

This seems to fix our use case: https://github.com/facultyai/dagster/commit/0f770784b771b39956b2741859a7fb3cfdd71dd5

Still not sure about:

  1. Whether extra work is required to avoid retrying non-retryable errors (e.g. a bug in the solid logic)
  2. How we could/should accommodate num_pods_to_wait_for != 1
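
For reference, the rough idea behind a fix of this shape (this is only a sketch of one possible check against a kubernetes V1JobStatus, not the contents of the linked commit):

def job_has_terminally_failed(status):
    # status: kubernetes.client.V1JobStatus
    if status.succeeded:
        return False
    # A failed pod is not terminal while another pod is still active;
    # Kubernetes may retry it when backoff_limit > 0.
    if status.active:
        return False
    # Kubernetes marks a terminally failed Job with a Failed condition
    # (e.g. reason=BackoffLimitExceeded).
    for condition in status.conditions or []:
        if condition.type == "Failed" and condition.status == "True":
            return True
    return False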

alex-treebeard avatar Jan 18 '22 18:01 alex-treebeard

cc @johannkm

yuhan avatar Feb 15 '22 22:02 yuhan

Hi @alex-treebeard, what run launcher/executor are you using? I believe the celery_k8s_job_executor?

Thanks for catching this! Part of why this has slipped by is that the majority of users don't configure the backoff_limit on K8s Jobs. What we actually recommend is using https://docs.dagster.io/_apidocs/ops#dagster.RetryPolicy, which can be configured on Ops and Jobs. With this retry configured, Dagster will launch a whole new K8s Job after a failure.

I'm curious if RetryPolicies cover your use case for backoff_limit. If so, I think we may consider deprecating the backoff_limit knob. We should definitely improve the documentation around this.
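
For anyone following along, a minimal sketch of the RetryPolicy approach recommended here (op/job API; the parameter values are purely illustrative):

from dagster import Backoff, RetryPolicy, job, op


@op(retry_policy=RetryPolicy(max_retries=3, delay=10, backoff=Backoff.EXPONENTIAL))
def sometimes_flaky_op():
    ...


# A policy can also be set job-wide; a per-op policy takes precedence.
@job(op_retry_policy=RetryPolicy(max_retries=2))
def my_job():
    sometimes_flaky_op()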

johannkm avatar Feb 15 '22 23:02 johannkm

@johannkm Yes, using celery_k8s_job_executor. I'm not 100% sure of this, but I believe RetryPolicy did not help in the case where the k8s API server fails to create a pod to run the solid.

We were on 0.12.5 when I patched this; now we're on 0.13. I expect to revisit this issue in the coming weeks.

alex-treebeard avatar Feb 16 '22 13:02 alex-treebeard

@johannkm Having tested this again, I can confirm that dagster's RetryPolicy only retries when the business logic fails, whereas backoff_limit also lets us retry API server failures/timeouts, so I would like to upstream this fix if possible.

alex-treebeard avatar Mar 08 '22 17:03 alex-treebeard

I also believe backoff_limit should be used to retry in case of API server failures/timeouts, for example when autoscaling is triggered and the pod status is marked as OutOfcpu.

gjeusel avatar Mar 21 '22 18:03 gjeusel

Hello, I have tried a few things but could not find a workaround or solution.

Edit: forgot to mention we are on dagster version 1.4.7.

We are in the same situation (using celery_k8s_job_executor) and sometimes pods are deleted outside of dagster, for example by the autoscaler removing the node. We have not been able to find a reliable way of restarting the specific op that failed when this happens.

I have tried adding dagster/max_retries and dagster/retry_policy: FROM_FAILURE to the job; however, in that case not all steps in the job are executed: the failed step itself is skipped and only the downstream ones run, which of course crash because their expected input is not present. In our particular case we have many dynamic steps, which may be why this retry policy is not working as expected. Using the ALL_STEPS retry policy works, but it is very resource intensive, especially when the failed step happens near the end of the pipeline, which is very often the case.

I have also tried the Helm chart setting dagsterDaemon.runMonitoring.maxResumeAttempts, although it is clearly documented to work only with the k8s_job_executor. Setting runLauncher.config.celeryK8sRunLauncher.failPodOnRunFailure=true does not change anything either, as far as I could tell.

Additionally, I have added backoff_limit: 3 to the dagster-k8s/config tag, under job_spec_config, which correctly increases the backoff limit of the step jobs. It looks like this hits an edge case not handled by the job executor: even though the Kubernetes job correctly starts a new pod, the retry is not recognized by the run executor, which simply reports: Step <op-name> finished without success or failure event. Downstream steps will not execute.

When backoff_limit is zero, the error originally reported above is thrown:

dagster_k8s.client.DagsterK8sError: Encountered failed job pods for job dagster-step-7ba8de678dc53724aac3de45df947f59 with status: {'active': None,
 'completed_indexes': None,
 'completion_time': None,
 'conditions': [{'last_probe_time': datetime.datetime(2023, 8, 20, 12, 19, 5, tzinfo=tzlocal()),
                 'last_transition_time': datetime.datetime(2023, 8, 20, 12, 19, 5, tzinfo=tzlocal()),
                 'message': 'Job has reached the specified backoff limit',
                 'reason': 'BackoffLimitExceeded',
                 'status': 'True',
                 'type': 'Failed'}],
 'failed': 1,
 'ready': 0,
 'start_time': datetime.datetime(2023, 8, 20, 12, 18, 51, tzinfo=tzlocal()),
 'succeeded': None,
 'uncounted_terminated_pods': {'failed': None, 'succeeded': None}}, in namespace dagster-dta

  File "/usr/local/lib/python3.10/site-packages/dagster_celery_k8s/executor.py", line 450, in _execute_step_k8s_job
    api_client.wait_for_job_success(
  File "/usr/local/lib/python3.10/site-packages/dagster_k8s/client.py", line 350, in wait_for_job_success
    self.wait_for_running_job_to_succeed(
  File "/usr/local/lib/python3.10/site-packages/dagster_k8s/client.py", line 399, in wait_for_running_job_to_succeed
    raise DagsterK8sError(

However, when backoff_limit is non-zero and the pod is deleted (I'm manually deleting the pod to simulate the original problem), then even though an exception is raised in the business logic (typically dagster._core.errors.DagsterExecutionInterruptedError, though not always) and the step is retried, several things go wrong:

  • In some cases the logs cannot be retrieved, crashing the executor and therefore the whole run (a more tolerant log fetch is sketched after this list). The error is: [CeleryK8sJobExecutor] Encountered unexpected error while fetching pod logs for Kubernetes job dagster-step-1b1bf64b82a3dee76936a1c4bb5cd3a8, Pod name dagster-step-1b1bf64b82a3dee76936a1c4bb5cd3a8-5f4dk for step <step-name>. Will attempt to continue with other pods., with the stack trace:

    kubernetes.client.exceptions.ApiException: (400)
    Reason: Bad Request
    HTTP response headers: HTTPHeaderDict({'Audit-Id': 'f4ac8be4-67b8-42a9-9aca-857fbbf6dcef', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Sun, 20 Aug 2023 12:30:01 GMT', 'Content-Length': '238'})
    HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"container \\"dagster\\" in pod \\"dagster-step-1b1bf64b82a3dee76936a1c4bb5cd3a8-5f4dk\\" is waiting to start: ContainerCreating","reason":"BadRequest","code":400}\n'
    
    
      File "/usr/local/lib/python3.10/site-packages/dagster_celery_k8s/executor.py", line 532, in _execute_step_k8s_job
        raw_logs = api_client.retrieve_pod_logs(pod_name, namespace=job_namespace)
      File "/usr/local/lib/python3.10/site-packages/dagster_k8s/client.py", line 667, in retrieve_pod_logs
        return self.core_api.read_namespaced_pod_log(
      File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py", line 23957, in read_namespaced_pod_log
        return self.read_namespaced_pod_log_with_http_info(name, namespace, **kwargs)  # noqa: E501
      File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py", line 24076, in read_namespaced_pod_log_with_http_info
        return self.api_client.call_api(
      File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 348, in call_api
        return self.__call_api(resource_path, method,
      File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
        response_data = self.request(
      File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 373, in request
        return self.rest_client.GET(url,
      File "/usr/local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 241, in GET
        return self.request("GET", url,
      File "/usr/local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 235, in request
        raise ApiException(http_resp=r)
    
  • In another case the run itself was terminated because the job executor raised an exception when the pod was killed, but because backoff_limit > 0 the step was automatically attempted again by Kubernetes, successfully finishing the run. It even appeared in the dagster UI after the run had been terminated!

  • On top of this, sometimes the run itself never finishes, because the Kubernetes job eventually completes correctly and therefore never reports a failure back to dagster.
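
Concretely, a more tolerant log fetch for the first bullet could look something like the sketch below: skip pods whose container is still waiting (e.g. ContainerCreating) instead of letting the 400 ApiException crash the executor. This uses the plain kubernetes client and is purely illustrative, not the dagster-k8s implementation; the container name "dagster" is taken from the BadRequest message above.

from typing import Optional

from kubernetes import client, config
from kubernetes.client.exceptions import ApiException

CONTAINER_NAME = "dagster"  # container name from the BadRequest message above


def try_retrieve_pod_logs(pod_name: str, namespace: str) -> Optional[str]:
    core_api = client.CoreV1Api()

    pod = core_api.read_namespaced_pod(name=pod_name, namespace=namespace)
    for status in pod.status.container_statuses or []:
        if status.name == CONTAINER_NAME and status.state.waiting is not None:
            # The container has not started yet (e.g. ContainerCreating), so
            # there are no logs to fetch; skip instead of raising a 400.
            return None

    try:
        return core_api.read_namespaced_pod_log(name=pod_name, namespace=namespace)
    except ApiException:
        # The pod may have been replaced between the status check and the log call.
        return None


if __name__ == "__main__":
    config.load_kube_config()
    print(try_retrieve_pod_logs("dagster-step-1b1bf64b82a3dee76936a1c4bb5cd3a8-5f4dk", "dagster-dta"))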

A minor thing to mention: I ran these tests with op retries configured as described in the documentation. However, as @alex-treebeard mentioned above, this makes no difference because the error originates from the executor and not from the business logic.

I don't know whether this stems from using celery_k8s_job_executor instead of k8s_job_executor. I cannot easily test without Celery right now, but switching executors would not be a blocker if that solves the issue.

Do you have any suggestions for moving forward? Does this need to be fixed in dagster-k8s? My impression from this quick investigation is that the logic for dealing with job failures caused by Kubernetes-level circumstances could be improved. What is your opinion?

Thank you very much for reading!

Milias avatar Aug 20 '23 12:08 Milias