
KubernetesPodOperator fails due to getting logs

antoniocorralsierra opened this issue 1 year ago · 7 comments

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes 8.0.1
apache-airflow-providers-celery 3.6.1

Apache Airflow version

2.8.3

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

I have an Airflow instance deployed on a GKE cluster with Helm (KEDA is enabled on the workers to scale from 1 to 5, with cooldownPeriod set to 240). I use CeleryKubernetesExecutor and run tasks with KubernetesPodOperator on the Celery workers. To avoid keeping a Celery worker slot busy while a KubernetesPodOperator pod is running, I enabled deferrable mode on these operators and set poll_interval to 10 seconds. After that, many errors with a 404 status code started appearing.

Situation:

    example_task = KubernetesPodOperator(
        task_id="example_task",
        name="example_task",
        get_logs=True,
        on_finish_action="delete_pod",
        log_events_on_failure=True,
        deferrable=True,
        poll_interval=10,
        logging_interval=None,
    )

We can assume that the rest of the input parameters are correct.

The steps are:

  1. The Celery worker picks up the task.
  2. The KubernetesPodOperator creates a new pod on the cluster that executes the task.
  3. The task status changes to deferred.
  4. Checking the triggerer log, I see that the trigger has completed: [2024-04-24T12:48:57.740+0000] {triggerer_job_runner.py:623} INFO - trigger example_task (ID 668324) completed
  5. Checking the status of the pod, it has completed (success) and finished at [2024-04-24 12:48:51.122].
  6. The task changes to queued status, waiting for a free slot on a Celery worker.
  7. When a slot becomes available on a Celery worker, the task begins to run.
  8. After running, the task finishes with an error and changes to up_for_retry status.

The task log is:

    [2024-04-24, 12:51:21 UTC] {taskinstance.py:2731} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
        result = _execute_callable(context=context, **execute_callable_kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
        return execute_callable(context=context, **execute_callable_kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1604, in resume_execution
        return execute_callable(context)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 738, in trigger_reentry
        self.write_logs(self.pod)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 771, in write_logs
        logs = self.pod_manager.read_pod_logs(
      File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 324, in wrapped_f
        return self(f, *args, **kw)
      File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 404, in __call__
        do = self.iter(retry_state=retry_state)
      File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 360, in iter
        raise retry_exc.reraise()
      File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 193, in reraise
        raise self.last_attempt.result()
      File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
        return self.__get_result()
      File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
        raise self._exception
      File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 407, in __call__
        result = fn(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 668, in read_pod_logs
        logs = self._client.read_namespaced_pod_log(
      File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py", line 23957, in read_namespaced_pod_log
        return self.read_namespaced_pod_log_with_http_info(name, namespace, **kwargs)  # noqa: E501
      File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py", line 24076, in read_namespaced_pod_log_with_http_info
        return self.api_client.call_api(
      File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 348, in call_api
        return self.__call_api(resource_path, method,
      File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
        response_data = self.request(
      File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 373, in request
        return self.rest_client.GET(url,
      File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 244, in GET
        return self.request("GET", url,
      File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 238, in request
        raise ApiException(http_resp=r)
    kubernetes.client.exceptions.ApiException: (404)
    Reason: Not Found

The error comes from the trigger_reentry method. I think this is due to the POST_TERMINATION_TIMEOUT value: it is set to 120 on KubernetesPodOperator, and the worker tries to read the pod logs about two and a half minutes after the pod finished (the pod finished at 12:48:51 and the worker reads at 12:51:21). By then the pod no longer exists, so the logs cannot be read and a 404 error is returned.
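
The 404 can be confirmed outside Airflow with the same client call the operator ends up using. A minimal sketch (the pod name and namespace below are placeholders, not values from the report):

    # Calling read_namespaced_pod_log for a pod that no longer exists raises
    # the same ApiException with status 404 seen in the task log above.
    from kubernetes import client, config
    from kubernetes.client.exceptions import ApiException

    config.load_kube_config()  # or config.load_incluster_config() inside the cluster
    v1 = client.CoreV1Api()

    try:
        v1.read_namespaced_pod_log(name="example-task-abc123", namespace="default")
    except ApiException as e:
        print(e.status, e.reason)  # -> 404 Not Found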

What you think should happen instead

The task should finish successfully, with a warning indicating that the logs could not be read (or some other option), rather than being marked as failed and set to up_for_retry.

How to reproduce

The steps are indicated in the "What happened" section. Just enable deferrable mode on the KubernetesPodOperators, use CeleryKubernetesExecutor, and route the tasks to the Celery workers. A minimal sketch is shown below.
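
A minimal reproduction sketch (hypothetical DAG; the dag_id, image, command and schedule are placeholders chosen for illustration, not values from the original report). It assumes CeleryKubernetesExecutor, so the task runs on a Celery worker by default:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

    with DAG(
        dag_id="kpo_deferrable_404_repro",
        start_date=datetime(2024, 1, 1),
        schedule=None,
        catchup=False,
    ):
        KubernetesPodOperator(
            task_id="example_task",
            name="example_task",
            image="busybox",                    # placeholder image
            cmds=["sh", "-c", "echo hello"],    # finishes almost immediately
            get_logs=True,
            on_finish_action="delete_pod",
            log_events_on_failure=True,
            deferrable=True,
            poll_interval=10,
            logging_interval=None,
        )
    # With the Celery workers saturated, the task sits queued after the trigger
    # completes; if the pod is gone by the time a worker resumes the task,
    # write_logs() raises the 404 ApiException shown above.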

Anything else

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!


antoniocorralsierra · Apr 24 '24 16:04

There is exception handling for write_logs:

    def write_logs(self, pod: k8s.V1Pod):
        try:
            logs = self.pod_manager.read_pod_logs(
                pod=pod,
                container_name=self.base_container_name,
                follow=False,
            )
            for raw_line in logs:
                line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
                self.log.info("Container logs: %s", line)
        except HTTPError as e:
            self.log.warning(
                "Reading of logs interrupted with error %r; will retry. "
                "Set log level to DEBUG for traceback.",
                e,
            )

The error that bubbles up is a kubernetes.client.exceptions.ApiException, though, which is not a subclass of HTTPError, so this handler never catches it. I wonder if it should be included in the exception handling?

RNHTTR · Apr 25 '24 15:04

Hello @RNHTTR,

Yes, I believe that too. I am working on a PR that includes this exception at that point, but it is my first PR, so it is taking me longer.

antoniocorralsierra · Apr 25 '24 15:04

Something like this:

    def write_logs(self, pod: k8s.V1Pod):
        try:
            logs = self.pod_manager.read_pod_logs(
                pod=pod,
                container_name=self.base_container_name,
                follow=False,
            )
            for raw_line in logs:
                line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
                self.log.info("Container logs: %s", line)
        except (HTTPError, ApiException) as e:
            self.log.warning(
                "Reading of logs interrupted with error %r; will retry. "
                "Set log level to DEBUG for traceback.",
                e if not isinstance(e, ApiException) else e.reason,
            )

antoniocorralsierra · Apr 25 '24 15:04

I think simply handling the ApiException won't be enough. We should perform additional checks because during cleanup, we also read the pod.

pankajastro · Apr 25 '24 16:04

Hi @pankajastro,

Of course. I have added the following in the _clean method:

    def _clean(self, event: dict[str, Any]):
        if event["status"] == "running":
            return
        istio_enabled = self.is_istio_enabled(self.pod)
        # Skip await_pod_completion when the event is 'timeout', because the pod can hang
        # on the ErrImagePull or ContainerCreating step and would never complete
        if event["status"] != "timeout":
            try:
                self.pod = self.pod_manager.await_pod_completion(
                    self.pod, istio_enabled, self.base_container_name
                )
            except ApiException as e:
                if e.status == 404:
                    self.pod = None
                else:
                    raise e
        if self.pod is not None:
            self.post_complete_action(
                pod=self.pod,
                remote_pod=self.pod,
            )

I did this because await_pod_completion can raise the same ApiException as write_logs. When that exception is handled (i.e., the pod no longer exists), there is no need to call post_complete_action. What do you think?

antoniocorralsierra · Apr 26 '24 07:04

Would you like to create a PR?

pankajastro · Apr 26 '24 07:04

Yes, I would like to, but it's my first time, so I'm reading up on how to do it.

antoniocorralsierra · Apr 26 '24 08:04