Intermittent json errors from xcom sidecar
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.8.3
What happened?
We are seeing an intermittent JSON decode error when the XCom result is read from the sidecar container. The task succeeds on the next retry.
[2024-04-26, 00:21:32 IST] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-04-26, 00:21:32 IST] {pod_manager.py:721} INFO - The xcom sidecar container is started.
[2024-04-26, 00:21:32 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:36 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:40 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:44 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:52 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:52 IST] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-04-26, 00:21:52 IST] {pod.py:909} INFO - Deleting pod: hps-mydata-generation-9lqygp1s
[2024-04-26, 00:21:52 IST] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/airflow/plugins/operators/kubernetes_pod_operator.py", line 200, in execute
result = self.extract_xcom(pod=self.pod)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 557, in extract_xcom
result = self.pod_manager.extract_xcom(pod)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 730, in extract_xcom
result = self.extract_xcom_json(pod)
File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 289, in wrapped_f
return self(f, *args, **kw)
File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 379, in __call__
do = self.iter(retry_state=retry_state)
File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 325, in iter
raise retry_exc.reraise()
File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 158, in reraise
raise self.last_attempt.result()
File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 382, in __call__
result = fn(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 765, in extract_xcom_json
json.loads(result)
File "/usr/local/lib/python3.10/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
File "/usr/local/lib/python3.10/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/local/lib/python3.10/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 16385 (char 16384)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 439, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
return execute_callable(context=context, **execute_callable_kwargs)
File "/opt/airflow/plugins/operators/kubernetes_pod_operator.py", line 215, in execute
self.cleanup(
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 839, in cleanup
raise AirflowException
What you think should happen instead?
The task should not fail while XCom data is being extracted from the sidecar.
How to reproduce
This can be reproduced by writing a JSON file of about 20k characters as the XCom result; extraction then fails intermittently when the data is read. I'll investigate the reason for the XCom JSON issue.
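A minimal sketch of why a cut-off read would produce this kind of error. It assumes, without confirmation from the logs, that the sidecar output is truncated at 16384 characters (16 * 1024), which is the position reported in the traceback. The payload shape, `CUT`, and `try_parse` are illustrative assumptions, not part of Airflow.

```python
import json

# Hypothetical XCom payload: a JSON array of about 20k characters.
full = json.dumps([0] * 6667)

# Assumed truncation point, taken from the position in the traceback.
CUT = 16 * 1024


def try_parse(text):
    """Return (value, None) on success or (None, error) on a decode failure."""
    try:
        return json.loads(text), None
    except json.JSONDecodeError as err:
        return None, err


# The complete document parses; the truncated one does not.
_, err_full = try_parse(full)
_, err_cut = try_parse(full[:CUT])

print(len(full), err_full)
print(len(full[:CUT]), err_cut)
```

If the real failure is a partial read like this, the file in the pod is intact and only the transfer is incomplete, which would be consistent with the task succeeding on retry.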
Operating System
Amazon Linux 2
Versions of Apache Airflow Providers
pytest>=6.2.5
docker>=5.0.0
crypto>=1.4.1
cryptography>=3.4.7
pyOpenSSL>=20.0.1
ndg-httpsclient>=0.5.1
boto3>=1.34.0
sqlalchemy
redis>=3.5.3
requests>=2.26.0
pysftp>=0.2.9
werkzeug>=1.0.1
apache-airflow-providers-cncf-kubernetes==8.0.0
apache-airflow-providers-amazon>=8.13.0
psycopg2>=2.8.5
grpcio>=1.37.1
grpcio-tools>=1.37.1
protobuf>=3.15.8,<=3.21
python-dateutil>=2.8.2
jira>=3.1.1
confluent_kafka>=1.7.0
pyarrow>=10.0.1,<10.1.0
Deployment
Official Apache Airflow Helm Chart
Deployment details
Official helm chart deployment.
Anything else?
I think we are facing an issue similar to https://github.com/apache/airflow/issues/32111, which was fixed in https://github.com/apache/airflow/pull/32113/files. We might need to increase the retry count.
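The traceback shows `extract_xcom_json` already running under a tenacity retry. As a rough illustration of what a higher retry count buys, here is a standard-library sketch of the same idea: re-read until the output parses as JSON. `read_json_with_retries`, `read_once`, and the canned responses are hypothetical names and data, not Airflow or tenacity APIs.

```python
import json
import time


def read_json_with_retries(read_once, attempts=5, delay=0.0):
    """Call read_once() until its output parses as JSON, up to `attempts` times.

    Returns the raw text of the first successful read and re-raises the last
    JSONDecodeError if every attempt fails.
    """
    last_error = None
    for _ in range(attempts):
        raw = read_once()
        try:
            json.loads(raw)  # validate only; the caller gets the raw text
            return raw
        except json.JSONDecodeError as err:
            last_error = err
            time.sleep(delay)
    raise last_error


# Stand-in reader: two truncated reads followed by a complete one.
responses = iter(['{"a": [1, 2', '{"a": [1, 2, 3', '{"a": [1, 2, 3]}'])
result = read_json_with_retries(lambda: next(responses))
print(result)
```

More attempts only lower the chance of failure if each read is truncated independently; they do not address why a read comes back short.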
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I also found this GitHub thread. Shall we implement this? https://github.com/kubernetes-client/python-base/issues/190#issuecomment-805073981
@potiuk, can you please guide us on this? I'm not able to find a solution. The error occurs intermittently and the task works on the next retry. If you require further information, I'll be able to collect the logs.
I can confirm that the issue is solved with the code below, which we have added as a custom extract_xcom. The approach is also mentioned here: https://github.com/kubernetes-client/python-base/issues/190#issuecomment-805073981. We didn't have this issue in v2.3.3, so I believe this PR could have caused the error: https://github.com/apache/airflow/pull/23490/files
import json

from kubernetes.client.models import V1Pod
from kubernetes.stream import stream as kubernetes_stream

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults


# Custom replacement for PodManager.extract_xcom_json.
def extract_xcom_json(self, pod: V1Pod):
    try:
        self.log.info(f'Running command... cat {PodDefaults.XCOM_MOUNT_PATH}/return.json')
        # Open the exec stream without preloading so the full output can be read.
        client = kubernetes_stream(
            self._client.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            container=PodDefaults.SIDECAR_CONTAINER_NAME,
            command=[
                '/bin/sh',
                '-c',
                f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json',
            ],
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _preload_content=False,
            _request_timeout=10,
        )
        # Keep reading until the stream closes or the timeout expires.
        client.run_forever(timeout=10)
        result = client.read_all()
        self.log.info("Received {} ({}) ({} ... {})".format(type(result), len(result), result[:64], result[-64:]))
        # Validate that the output is valid JSON
        _ = json.loads(result)
        # Terminate the sidecar
        _ = kubernetes_stream(
            self._client.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            container=PodDefaults.SIDECAR_CONTAINER_NAME,
            command=[
                '/bin/sh',
                '-c',
                'kill -s SIGINT 1',
            ],
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _preload_content=True,
            _request_timeout=10,
        )
        return result
    except json.JSONDecodeError as e:
        message = f'Failed to decode json document from pod: {pod.metadata.name}'
        self.log.exception(message)
        raise AirflowException(message) from e
    except Exception as e:
        message = f'Failed to extract xcom from pod: {pod.metadata.name}'
        self.log.exception(message)
        raise AirflowException(message) from e
@paramjeet01 do you have a proposed fix in mind? Can you open a PR?
Any news here? Is there any known workaround while a fix is on the way?
@eladkal yes, I'll create a PR with the code suggested above. I'm afraid that I can't find the root cause of the issue in the current code.
@cleivson, you can customise your Airflow to call the method mentioned above until we have a fix.
@paramjeet01, thanks. I'm not really sure where to put this. Based on your OS "Amazon Linux 2", am I right to assume you're using MWAA? I'm using it and I was wondering if the bug could be related to the environment.
No, I use the community edition and I have customized the XCom code to solve this problem.
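On the question of where a customised method could live: the traceback shows a custom operator under `/opt/airflow/plugins/operators/` whose parent calls `self.pod_manager.extract_xcom(pod)`. A rough sketch of that override pattern follows. It uses stand-in classes only, so every class name here is hypothetical, and it assumes the real operator exposes its pod manager as an overridable property; check this against the installed provider version before relying on it.

```python
from functools import cached_property


class PodManagerStandIn:
    """Stand-in for the provider's pod manager (not the real class)."""

    def extract_xcom_json(self, pod):
        return "default read"


class PatchedPodManager(PodManagerStandIn):
    """Subclass that swaps in the customised extract_xcom_json."""

    def extract_xcom_json(self, pod):
        return "patched read"


class OperatorStandIn:
    """Stand-in for the pod operator; it reaches XCom through pod_manager."""

    @cached_property
    def pod_manager(self):
        return PodManagerStandIn()

    def extract_xcom(self, pod):
        return self.pod_manager.extract_xcom_json(pod)


class PatchedOperator(OperatorStandIn):
    """Custom operator that returns the patched manager instead."""

    @cached_property
    def pod_manager(self):
        return PatchedPodManager()


default_result = OperatorStandIn().extract_xcom(pod=None)
patched_result = PatchedOperator().extract_xcom(pod=None)
print(default_result, patched_result)
```

Keeping the change in a subclass, rather than editing the installed provider package, means it survives provider upgrades and can be dropped once an upstream fix lands.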