airflow
airflow copied to clipboard
AIP-72: Allow pushing and pulling XCom from Task Context
Part of https://github.com/apache/airflow/issues/44481
Example DAG used:
"""
### DAG Tutorial Documentation
This DAG is demonstrating an Extract -> Transform -> Load pipeline
"""
from __future__ import annotations
import json
import textwrap
import pendulum
from airflow.models.dag import DAG
from airflow.providers.standard.operators.python import PythonOperator
with DAG(
    "sdk_tutorial_dag",
    default_args={"retries": 2},
    description="DAG tutorial",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    # Reuse the module docstring as the DAG-level markdown documentation.
    dag.doc_md = __doc__

    def extract(**kwargs):
        """Push the hardcoded order data to XCom under the ``order_data`` key.

        Expects the task instance to be passed in as ``kwargs["ti"]``.
        """
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push("order_data", data_string)

    def transform(**kwargs):
        """Total the order values pulled from ``extract`` and push the result.

        Reads the ``order_data`` XCom written by the ``extract`` task, sums its
        values, and pushes ``{"total_order_value": <sum>}`` as a JSON string
        under the ``total_order_value`` key.
        """
        ti = kwargs["ti"]
        order_data = ti.xcom_pull(task_ids="extract", key="order_data")

        # ``extract`` pushes a JSON *string*. Whether ``xcom_pull`` hands back
        # that string or an already-decoded mapping depends on the XCom
        # (de)serialization in use, so decode only when we received text.
        if isinstance(order_data, (str, bytes, bytearray)):
            order_data = json.loads(order_data)

        # Plain left-to-right accumulation, kept as an explicit loop so the
        # floating-point result is the same as before this change.
        total_order_value = 0
        for value in order_data.values():
            total_order_value += value

        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push("total_order_value", total_value_json_string)

    def load(**kwargs):
        """Print the ``total_order_value`` XCom written by the ``transform`` task."""
        ti = kwargs["ti"]
        total_order_value = ti.xcom_pull(task_ids="transform", key="total_order_value")
        print(total_order_value)

    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )
    extract_task.doc_md = textwrap.dedent(
        """\
    #### Extract task
    A simple Extract task to get data ready for the rest of the data pipeline.
    In this case, getting data is simulated by reading from a hardcoded JSON string.
    This data is then put into xcom, so that it can be processed by the next task.
    """
    )

    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
    )
    transform_task.doc_md = textwrap.dedent(
        """\
    #### Transform task
    A simple Transform task which takes in the collection of order data from xcom
    and computes the total order value.
    This computed value is then put into xcom, so that it can be processed by the next task.
    """
    )

    load_task = PythonOperator(
        task_id="load",
        python_callable=load,
    )
    load_task.doc_md = textwrap.dedent(
        """\
    #### Load task
    A simple Load task which takes in the result of the Transform task, by reading it
    from xcom and instead of saving it to end user review, just prints it out.
    """
    )

    # Run the three stages strictly in sequence: extract, then transform, then load.
    extract_task >> transform_task >> load_task
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards-incompatible changes, please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.