airflow
airflow copied to clipboard
Wait for xcom sidecar container to start before sidecar exec
According to k8s's docs on pod phase [1], "Running" state means "The Pod has been bound to a node, and all of the containers have been created. At least one container is still running, or is in the process of starting or restarting.".
This means there is no guarantee that the xcom sidecar container will have been
running by the time the extract_xcom is used by KPO's execute even if
we wait for pod to be a "Running" state - this further means if the base container
has completed before the xcom sidecar container is running, the entire
operator fails because you cannot exec against a non-running container.
Fix is to wait for the xcom sidecar container to be in the running state before we exec against it, which this PR implements.
[1] - https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase [2] - https://github.com/schattian/airflow/blob/86171ff43f8977021708cfa241da64f96c4c15e2/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L398
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst) Here are some useful points:
- Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
- In case of a new feature add useful documentation (in docstrings or in
docs/directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it. - Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
- Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
- Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
- Be sure to read the Airflow Coding style. Apache Airflow is a community-driven project and together we are making it better 🚀. In case of doubts contact the developers at: Mailing List: [email protected] Slack: https://s.apache.org/airflow-slack
Not exactly sure how I can properly introduce a failing test and then make it pass here, would like some guidance on this.
Use mocking. see similar tests in test_pod_manager.py
The test I was thinking was more along the lines of actually having a k8s cluster that can reproduce the behaviour of xcom container not started when base container has completed, and check that xcom extraction was successful.
Anyway, I have added the mock-based unit test that just asserts the newly added await_xcom_sidecar_container_start has been called once and exactly once.
LGTM. @jedcunningham @dstandish WDYT?
The test I was thinking was more along the lines of actually having a k8s cluster that can reproduce the behaviour of xcom container not started when base container has completed, and check that xcom extraction was successful.
Running them on "integration" level is expensive and we alredy have some of those tests that take far too much time of our CI :)
Anyone else ?
i'll have a look
@ryantam626 are you still working on this PR?
@eladkal Oh sorry, I wasn't aware the WDYTs were directed at me.
I will work on the changes @dstandish suggested now.
Static checks failed :(
@eladkal all green now, sorry for the wait.
And thanks to whoever approved the start of CI for me :pray:
This looks cool - @dstandish you might take a look when you are back but this one looks rather straightforward. Merging.
Awesome work, congrats on your first merged pull request!