Provide an alternative OpenTelemetry implementation for traces that follows standard otel practices
This is my first time contributing to Airflow and I'm not sure if there should be an AIP or a mailing discussion for such changes. I'd appreciate any feedback.
Issue description
related: https://github.com/apache/airflow/pull/40802
There are some OpenTelemetry standard practices that help keep the usage consistent across multiple projects. According to those practices:
- when a root span starts, the trace id and the span id are generated randomly
- while the span is active, its context is captured and injected into a carrier object, which is a map or a Python dictionary
- the carrier with the captured context is propagated across services and used to create sub-spans
- a new sub-span extracts the parent context from the carrier and uses it to set its parent; the trace id is taken from the carrier and the span id is generated randomly
To explain more, this is what the flow of operations should be
- Dag run starts
  - start the `dag_run` span
  - get the `dag_run` span context
- Task starts
  - with the `dag_run` context, start the `ti` span
  - get the `ti` span context
  - with the `ti` span context, create task sub-spans
- Task finishes
  - the `ti` span ends
- Dag run finishes
  - the `dag_run` span ends
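To make the intended flow concrete, here is a minimal sketch of standard OTel context propagation using the Python SDK. The span names and the carrier handling are illustrative and not the exact helpers added in this PR.

```python
from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

tracer = trace.get_tracer(__name__)
propagator = TraceContextTextMapPropagator()

# Dag run starts: start the root span and capture its context in a carrier (a plain dict).
with tracer.start_as_current_span("dag_run") as dag_run_span:
    carrier: dict[str, str] = {}
    propagator.inject(carrier)  # carrier now holds the traceparent entry

    # The carrier can cross process boundaries (e.g. be stored in the db).
    # Task starts: extract the parent context and start the ti span under it.
    parent_context = propagator.extract(carrier)
    with tracer.start_as_current_span("ti", context=parent_context):
        # Any span started while the ti span is active becomes its child.
        with tracer.start_as_current_span("task_sub_span"):
            pass
```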
Airflow follows a different approach
- trace and span ids are generated in a deterministic way from various properties of the `dag_run` and the `task_instance`
- the spans for a dag and its tasks are generated after the run has finished
This is the flow of the current implementation
- Dag run starts
- Tasks start
- Tasks finish
- Create spans for the tasks
- Dag run finishes
- Create span for the dag run
The current approach makes it impossible to create spans from inside tasks using the existing airflow code. To achieve that, you need to use https://github.com/howardyoo/airflow_otel_provider, which has to generate the same trace id and span id as airflow, otherwise the spans won't be properly associated. This isn't easily maintainable, and it also makes it hard for people who are familiar with otel but new to airflow to start using the feature.
These are some references to OpenTelemetry docs
https://opentelemetry.io/docs/concepts/context-propagation/ https://opentelemetry.io/docs/languages/python/propagation/ https://www.w3.org/TR/trace-context/
Implementation description
For the dagrun and ti spans, I've reused the attributes from the original implementation. In my testing I found that the span timings were occasionally off. That was because the time conversion to nanoseconds wasn't accounting for the fact that the timezone isn't always present.
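For reference, a rough sketch of a timezone-safe conversion follows; the helper name and the naive-datetime-is-UTC assumption are mine and not necessarily what the patch does.

```python
from __future__ import annotations

from datetime import datetime, timezone


def datetime_to_nano(dt: datetime | None) -> int | None:
    """Convert a datetime to epoch nanoseconds, tolerating naive datetimes."""
    if dt is None:
        return None
    if dt.tzinfo is None:
        # Assumption: treat naive datetimes as UTC instead of failing when tzinfo is missing.
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1_000_000_000)
```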
To be able to propagate the context of a span and use it to create a sub-span, the span must be active.
For example, for a dag run, the span can't be created at the end but
- it needs to start with the run
- stay active so that the context can be captured and propagated
- end once the run also ends
Same goes for a task and any sub-spans.
With this approach, we can use the new otel methods for creating spans directly from under a task, without needing the airflow_otel_provider. These spans will be children of the task span.
Check test_otel_dag.py for an example of usage.
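For a rough idea of what task code can now do, here is a sketch using the vanilla OpenTelemetry API; it assumes the task span's context is the active one inside the task, and the helper names used in the actual example live in test_otel_dag.py and may differ.

```python
from opentelemetry import trace


def process_data():
    # Because the ti span context is propagated into the task and set as active,
    # spans started here are exported as children of the task span.
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("task1_sub1_span") as span:
        span.set_attribute("rows_processed", 1000)  # illustrative attribute
        with tracer.start_as_current_span("task1_sub1_sub_span"):
            ...
```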
In scheduler HA, multiple schedulers might process a dag that runs for a long time. The OpenTelemetry philosophy is that spans can't be shared between processes. The process that starts a span should also be the only one that can end it.
To work around that while staying consistent with the otel spec, each scheduler will keep track of the spans that it starts and will also be solely responsible for ending them.
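A minimal sketch of what that per-scheduler bookkeeping could look like; the class and method names are illustrative, not the actual implementation.

```python
from __future__ import annotations

from opentelemetry.trace import Span


class ActiveSpans:
    """In-memory registry of the spans started by this scheduler instance."""

    def __init__(self) -> None:
        # keyed by something like "dr:<run_id>" or "ti:<ti_key>" (assumed key format)
        self._spans: dict[str, Span] = {}

    def set(self, key: str, span: Span) -> None:
        self._spans[key] = span

    def get(self, key: str) -> Span | None:
        return self._spans.get(key)

    def end(self, key: str) -> None:
        # Only the scheduler that started a span ends it.
        span = self._spans.pop(key, None)
        if span is not None:
            span.end()

    def end_all(self) -> None:
        # Called on graceful shutdown (e.g. SIGTERM) so no span is left open.
        for key in list(self._spans):
            self.end(key)
```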
Let's assume that we are expecting these spans to be created and exported
dag_span
|_ task1_span
|_ task1_sub1_span
|_ task1_sub1_sub_span
|_ task1_sub2_span
|_ task2_span
With the existing airflow implementation, the spans will be
dag_span
|_ task1_span
|_ task2_span
Here are some possible scenarios and how the dag spans will look, assuming that the dag creates the same spans as above.
- Scheduler1 processes a dag from start to finish.

  dag_span
  |_ task1_span
  |_ task1_sub1_span
  |_ task1_sub1_sub_span
  |_ task1_sub2_span
  |_ task2_span

- Scheduler1 starts a dag and, sometime during the run, Scheduler2 picks up the processing and finishes the dag.
  Through a db update, Scheduler2 will notify whichever scheduler started the spans to end them.

  dag_span
  |_ task1_span
  |_ task1_sub1_span
  |_ task1_sub1_sub_span
  |_ task1_sub2_span
  |_ task2_span

- Scheduler1 starts a dag and, while the dag is running, exits gracefully. Scheduler2 picks up the processing and finishes the dag while the initial scheduler is no longer running.
  Scheduler1 will end all active spans that it has a record of and update the db while exiting. Scheduler2 will create continued spans. This will result in 2 or more spans in place of one long span, but the total duration will be the same. The continued span will be the new parent.

  dag_span
  |_ task1_span
  |_ scheduler_exits_span
  |_ new_scheduler_span
  |_ dag_span_continued
  |_ task1_span_continued
  |_ task1_sub1_span
  |_ task1_sub1_sub_span
  |_ task1_sub2_span
  |_ task2_span

- Scheduler1 starts a dag and, while the dag is running, exits forcefully. Scheduler2 picks up the processing and finishes the dag while the initial scheduler is no longer running.
  Scheduler1 didn't have time to end any spans. Once the new scheduler picks up that the scheduler that started the spans is unhealthy, it will recreate the lost spans.

  dag_span_recreated
  |_ task1_span_recreated
  |_ task1_sub1_span
  |_ task1_sub1_sub_span
  |_ task1_sub2_span
  |_ task2_span
Testing
- Added a new unit test for the `otel_tracer` methods and updated the existing ones
- Added an integration test for each possible scenario
- Updated the existing tests to handle the changes
- Tested the changes manually with a `PythonVirtualenvOperator` without issues
The integration test can be used with otel and jaeger as well. To do that, follow the steps on this comment.
I'm confirming, but otel traces might still be experimental; if that's the case we are free to change them for a good reason, and your description certainly sounds like one!
If I may, can you add some indication to the title that this is related to OTel Traces specifically? We also have OTel Metrics implemented and it would be nice to minimize confusion.
@ferruzzi I adjusted the title. The only change in this patch related to metrics is https://github.com/apache/airflow/pull/43941/files#diff-1cca954ec0be1aaf2c212e718c004cb0902a96ac60043bf0c97a782dee52cc32R85-R86
If you think that it's out of scope, then I can remove it.
My initial approach wasn't considering scheduler HA. I've updated the patch accordingly.
There have been two main challenges
- OpenTelemetry spans are designed so that only the process that starts them can end them
- The span objects can't be shared or stored in a db
- The airflow philosophy for scheduler HA is that the only shared state between multiple schedulers is the db
- It is very common that one scheduler starts a dag (also starts the span) and another scheduler finishes the dag (should end the span)
To avoid breaking things, each scheduler will have a list of the spans that it started and will be solely responsible for ending them. We will save two new attributes on the dagrun table and the ti table.
The new columns (sketched below) will be
- `context_carrier` - this is used for propagating the context and creating sub-spans
- `span_status` - this keeps track of the span status and tells each scheduler how to handle the span
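Sketched here as a hypothetical migration body; the exact column types and nullability in the PR may differ (e.g. the carrier could be stored as JSON and the status as a short string).

```python
import sqlalchemy as sa
from alembic import op


def upgrade():
    for table in ("dag_run", "task_instance"):
        # the carrier is a plain dict holding the injected trace context (traceparent etc.)
        op.add_column(table, sa.Column("context_carrier", sa.JSON(), nullable=True))
        # span_status tells each scheduler how to handle the span it owns
        op.add_column(table, sa.Column("span_status", sa.String(length=64), nullable=True))
```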
Possible scenarios with 2 schedulers (this can easily work with more):
- Scheduler1 starts a dag and finishes processing it
  - This is straightforward

    dag span
    |_ task1 span
    |_ task1 sub span
    |_ task2 span

- Scheduler1 starts a dag while another scheduler finishes it
  - scheduler2 will set the span status to `SHOULD_END` and scheduler1 will end the spans during the next loop iteration (see the status-handling sketch after this list)
  - The visualized result will be the same as scenario 1

    dag span
    |_ task1 span
    |_ task1 sub span
    |_ task2 span

- Scheduler1 starts a dag, exits gracefully and another scheduler picks up the dag and finishes it
  - Since scheduler1 exits gracefully, e.g. with a `SIGTERM` or `SIGINT` signal, we can end the spans and update the status
  - scheduler2 will create a continued span for each prematurely ended span

    original  |----|
    continued      |-------|

    dag span
    |_ task1 span
    |_ scheduler exited span
    |_ new scheduler span
    |_ dag span (continued suffix)
    |_ task1 span (continued suffix)
    |_ task1 sub span
    |_ task2 span

- Scheduler1 starts a dag, exits forcefully and another scheduler picks up the dag and finishes it
  - In this case scheduler1 exited with active spans. Airflow has a standard way of declaring a scheduler unhealthy, and it also stores the id of the scheduler job that started a dagrun or a ti
  - If the scheduler that started the dagrun or the ti is unhealthy, we can recreate the lost spans
  - If a task is active and running, and the part of it that hasn't been executed yet is supposed to create some sub-spans, then it will use the newly recreated span as a parent

    dag span (recreated suffix)
    |_ task1 span (recreated suffix)
    |_ task1 sub span
    |_ task2 span

  - If a task has finished and it created some sub-spans, then those spans reference a parent span that is lost along with the unhealthy scheduler. The only way to get the sub-spans is to re-run the task. In that case, we recreate the span of the task itself but we can't recreate the sub-spans without rerunning the task

    dag span (recreated suffix)
    |_ task1 span (recreated suffix)
    |_ task2 span
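To make the `SHOULD_END` handshake concrete, here is a hedged sketch of the status handling in the scheduler loop; apart from `SHOULD_END`, the status names, the function, and the `active_spans` registry (from the earlier sketch) are illustrative.

```python
import enum

from sqlalchemy.orm import Session

from airflow.models.dagrun import DagRun


class SpanStatus(str, enum.Enum):
    ACTIVE = "active"          # assumed value
    SHOULD_END = "should_end"  # set by the scheduler that finished the run
    ENDED = "ended"            # assumed value


def end_requested_spans(session: Session, active_spans) -> None:
    """Run once per scheduler loop: close spans another scheduler asked us to end."""
    runs = session.query(DagRun).filter(DagRun.span_status == SpanStatus.SHOULD_END)
    for run in runs:
        span = active_spans.get(f"dr:{run.run_id}")  # present only if this scheduler started it
        if span is not None:
            span.end()
            run.span_status = SpanStatus.ENDED
```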
Note that with the current airflow otel implementation, you can't create sub-spans from tasks. All dagrun spans are visualized like this
dag span
|_ task1 span
|_ task2 span
I'll do some testing to make sure nothing has been broken from merging with main and I'll move the PR out of draft.
Hi @ashb,
- I've addressed all your comments,
- I created a new migration file,
- I added some unit tests for the scheduler changes
- I fixed 2 bugs that I came across while making the changes.
Please take a look when you get a chance and let me know how it looks.
I'll push some commits for moving the dagrun/scheduler span changes to new methods, to make the code more readable.
I think this will work, though the main question I have is: if we can re-create the span for an unhealthy scheduler, couldn't we do that for healthy schedulers too? That way we wouldn't need to store active_spans in memory at all, and nor would we need to store span status in the DB?
This would take us back to the old implementation, where we can't create spans from under tasks.
If a task has already been run and it created spans, then those spans will be lost when we recreate the task span. This happens for a few reasons
- the already exported spans are referencing the old span as a parent
- we are not setting the trace id and span id deterministically but are letting the otel sdk handle that
- the task sub-spans are getting the parent with context propagation
The only way to get the spans back is to rerun the task.
I'm aware this is waiting on me for a review, I will look at this when I can, but I've got some critical bits of AIP-72 (Task Execution Interface) to land first.
hey @ashb ! just checking in—do you have a rough idea of when you might be able to take a look? we’re really excited about this change and appreciate your time!
I got most of the way through this, I'll finish reviewing tomorrow. Overall looks like a sound plan, very well documented and commented.
Alright, I apologize for all the individual comments, I wasn't able to carve out a big block of time to get it all in one go, so I made my way through when I could. Overall it looks great and I love how detailed your tests are.
@ferruzzi Thanks for the review!
I've merged the branch with main. I used to pass the carrier as a parameter to the task command so that it can be accessed from within the dag but I can see that the task_run method has been removed. Once I figure out a new approach for this and fix any failures that come from the update, I'll start addressing your comments.
Oh amazing, glad to see this was updated a bit.
I plan to re-review and hopefully merge next week, particularly in light of the Task Execution api changes
@ashb @ferruzzi Thank you for all the help!
Although the PR is in a good state and the CI is green, I'm going to have to revisit the changes in base_executor.py.
Due to the task api changes, I'm not convinced that creating spans under a task works as expected. Also, I used to set the start_time for task spans to the `queued_dttm`, which isn't available anymore at the time of initializing a workload TaskInstance.
My new integration tests don't run on the CI but when I run them locally, task1 fails. I suspect that it's related to the task api changes.
Hello @ferruzzi @ashb, the PR is ready for review. My tests are all passing. I've run them at least 10 times in a row.
From what I understand, the new task sdk has removed direct db access for tasks. I've gone through most of the new code but I'm not entirely familiar with the full extent of it. There might be some db related changes in this patch that aren't needed anymore. If you see something that is redundant, please point it out.
BTW, I plan to create a follow up PR for cleanup but I would like to get this merged before new breaking changes end up in main. And I think that it's quite extensive as it stands right now.
Just the known uv+lowerdeps issue. Let's merge this!
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.
And a hell of a first contribution too!
Thank you @ashb and @ferruzzi! I couldn't have gotten this through without you.
Indeed. Thanks for doing it. It's great that we now have a way "better" OTEL integration in place.