Access specific values within an `XCom` value using Taskflow API
Description
Currently the output property of operators doesn't support accessing a specific value within an XCom, only the entire XCom value. Ideally, calling the XComArg via the output property would behave the same as the task_instance.xcom_pull() method, where a user has immediate access to the XCom value and can directly access specific values within it.
For example, in the example DAG in the Apache Beam provider, the job_id arg in the DataflowJobStatusSensor task is a templated value that uses the task_instance.xcom_pull() method and then accesses the dataflow_job_id key within the XCom value:
start_python_job_dataflow_runner_async = BeamRunPythonPipelineOperator(
    task_id="start_python_job_dataflow_runner_async",
    runner="DataflowRunner",
    py_file=GCS_PYTHON_DATAFLOW_ASYNC,
    pipeline_options={
        'tempLocation': GCS_TMP,
        'stagingLocation': GCS_STAGING,
        'output': GCS_OUTPUT,
    },
    py_options=[],
    py_requirements=['apache-beam[gcp]==2.26.0'],
    py_interpreter='python3',
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name='{{task.task_id}}',
        project_id=GCP_PROJECT_ID,
        location="us-central1",
        wait_until_finished=False,
    ),
)

wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
    task_id="wait-for-python-job-async-done",
    job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=GCP_PROJECT_ID,
    location='us-central1',
)
There is currently no equivalent way to directly access the dataflow_job_id value in the same manner using the output property.
Using start_python_job_dataflow_runner_async.output["dataflow_job_id"] yields the equivalent of task_instance.xcom_pull(task_ids='start_python_job_dataflow_runner_async', key='dataflow_job_id').
Even start_python_job_dataflow_runner_async.output["return_value"]["dataflow_job_id"] yields the same result: task_instance.xcom_pull(task_ids='start_python_job_dataflow_runner_async', key='dataflow_job_id').
It seems the only way to get the desired behavior currently is to hack around the __str__ method that's available on XComArg:
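The key-override behavior described here can be illustrated with a small stand-in class. This is only a sketch: KeyOnlyXComArg is a hypothetical name, not Airflow's XComArg, and it mimics nothing beyond the way each [] lookup replaces the key.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class KeyOnlyXComArg:
    """Hypothetical stand-in that mimics only the key-override behavior."""

    task_id: str
    key: str = "return_value"

    def __getitem__(self, item: str) -> "KeyOnlyXComArg":
        # Each lookup replaces the key; earlier lookups are not remembered.
        return KeyOnlyXComArg(self.task_id, key=item)

    def __str__(self) -> str:
        # Assumed template format, modeled on the renderings quoted in this issue.
        return (
            f"{{{{ task_instance.xcom_pull("
            f"task_ids='{self.task_id}', key='{self.key}') }}}}"
        )


output = KeyOnlyXComArg("start_python_job_dataflow_runner_async")
direct = output["dataflow_job_id"]
chained = output["return_value"]["dataflow_job_id"]

# Both forms end up pulling the 'dataflow_job_id' key, never indexing into
# the dict stored under 'return_value'.
print(direct == chained)
print(chained)
```

Because the second lookup simply overwrites the key set by the first, chaining cannot reach into the returned dict.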
start_python_job_dataflow_runner_async_output = str(start_python_job_dataflow_runner_async.output).strip("{ }")

wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
    task_id="wait-for-python-job-async-done",
    # f-string: the quadrupled braces render as the literal Jinja delimiters.
    job_id=f"{{{{ {start_python_job_dataflow_runner_async_output}['dataflow_job_id'] }}}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=GCP_PROJECT_ID,
    location='us-central1',
)
This approach is neither elegant, straightforward, nor user-friendly.
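The string handling behind this workaround can be checked in isolation, without Airflow. The sketch below assumes that str(operator.output) renders as a Jinja template of the form quoted earlier in this issue; the exact format may differ between Airflow versions.

```python
# Assumed rendering of str(operator.output); not taken from a live Airflow run.
rendered = (
    "{{ task_instance.xcom_pull("
    "task_ids='start_python_job_dataflow_runner_async', key='return_value') }}"
)

# strip("{ }") removes leading and trailing '{', '}' and space characters,
# leaving only the bare Jinja expression.
expression = rendered.strip("{ }")

# In an f-string, '{{{{' and '}}}}' each emit two literal braces, so the
# result is a normal Jinja template with the subscript appended.
job_id_template = f"{{{{ {expression}['dataflow_job_id'] }}}}"

print(job_id_template)
```

The result is the same kind of templated string as the hand-written xcom_pull() version, which is why the sensor receives the nested value at render time.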
Use case / motivation
It's functionally intuitive for users to have direct access to the specific values in an XCom related to the XComArg via the Taskflow API like the classic xcom_pull() method. Ideally using an operator's .output property and the xcom_pull() method would behave the same way when needing to pass the actual values between operators.
Are you willing to submit a PR? I would love to but I would certainly need some guidance on nuances here.
Related Issues https://github.com/apache/airflow/issues/10285
Hmmm, the "native" way I would like this to work is:
my_op.output['dataflow_job_id']
# Ideally, this would be equivalent to my_op.xcom_pull(key='return_value')['dataflow_job_id']
But sadly XComArg already has a __getitem__ to override the key, which means it is actually
my_op.output['dataflow_job_id']
# Actually equivalent to my_op.xcom_pull(key="dataflow_job_id")
If we hadn't yet released it, I would have suggested that the facility to change the key should have been done by __getattr__, yielding:
my_op.output.some_xcom_key # my_op.xcom_pull(key='some_xcom_key')
my_op.output['attr'] # my_op.xcom_pull(key='return_value')['attr']
But that ship has sailed, so I'm not sure how to proceed nicely.
I guess part of the confusion is that some operators have an XCom return_value that is a dict, while others push separate XCom keys directly.
But wouldn’t my_op.output.some_xcom_key also be problematic if the xcom return value is a custom object? Although indeed there would have been significantly fewer edge cases.
At this point, maybe the best solution would be to deprecate accessing my_op.output directly, and require doing something else to get the default return value, say my_op.output[None] or my_op.return_value?
I'm kind of on a big kick to reduce "boilerplate" in DAGs, so while both of these would be explicit, they are longer and a bit "un-Pythonic".
Not sure.
Having this feature would be great! 👏 Can't wait for this!
It would also be great if there was a way of NOT creating the dependencies between tasks automatically. Passing cluster or job IDs (EMR, Databricks, etc.) is very common, and having these dependencies created automatically doesn't make much sense.
Without a dependency, you might try to get a value out of XCom that hasn't been written yet!
Makes sense. I withdraw my comment then.
I think I got used to seeing how DAGs are now rendered with this new XComs/TaskFlow API (using .output). Before, Airflow wasn't creating edges between the tasks that were using the values.
Not part of this topic, but it would be cool to have a visual representation of the variable that is being passed, much like Dagster does in their UI.
@ricardogaspar2 Sounds useful -- do you have any screenshot you could show?
Sure thing. The screenshot below was grabbed from this talk: https://www.youtube.com/watch?v=D_1VJapCscc&t=1055s

There is also some info here: https://dagster.io/blog/dagster-airflow
@ash, @josh-fell and I had a chat about this. One approach that may work out is having an output.return_value, where [] lookups get recorded in the XComArg instead of changing the key.
Essentially, output.return_value['outer']['inner'] == XComArg(key="return_value", path=["outer", "inner"]).
(@ash please correct me if I misunderstood).
That's correct. To expand on it a bit more:
Right now on XComArg we have a __getitem__ which changes the key we use. I'm proposing we add a __getattr__ to XComArg that changes the "mode" that XComArg operates in.
In short, something like this (self.mode is a new field):
def __getitem__(self, item: str) -> XComArg:
    """Implements xcomresult['some_result_key']"""
    if not isinstance(item, str):
        raise ValueError(f"XComArg only supports str lookup, received {type(item).__name__}")
    if self.mode == Mode.KEY:
        return PlainXComArg(operator=self.operator, key=item)
    else:
        # In item mode, keep the key and record the lookup in the path.
        return PlainXComArg(operator=self.operator, key=self.key, path=self.path + [item], mode=self.mode)

def __getattr__(self, key: str) -> XComArg:
    if self.mode != Mode.KEY:
        raise RuntimeError(f"Cannot get an attribute ({key!r}) on XComArg except at the top level")
    return PlainXComArg(operator=self.operator, key=key, path=[], mode=Mode.ITEM)
And then some changes in the resolve function to get the right value out at render time. We'd also need to double-check that it all works right in the zip/map path for XComArg too.
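To make the proposal concrete, here is a self-contained sketch of the mode/path idea including a toy resolve step. Everything in it is hypothetical: SketchXComArg, its fields, and the dict-backed store are stand-ins for illustration, not Airflow's real XComArg or XCom backend, and it raises AttributeError rather than RuntimeError so that hasattr() keeps working.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Any


class Mode(Enum):
    KEY = "key"    # [] selects which XCom key to pull (current behavior)
    ITEM = "item"  # [] indexes into the pulled value


@dataclass(frozen=True)
class SketchXComArg:
    """Hypothetical stand-in for XComArg, for illustration only."""

    task_id: str
    key: str = "return_value"
    path: tuple[str, ...] = ()
    mode: Mode = Mode.KEY

    def __getitem__(self, item: str) -> SketchXComArg:
        if not isinstance(item, str):
            raise ValueError(f"only str lookup is supported, received {type(item).__name__}")
        if self.mode == Mode.KEY:
            return SketchXComArg(self.task_id, key=item)
        # Item mode: keep the key and record the lookup in the path.
        return SketchXComArg(self.task_id, self.key, self.path + (item,), Mode.ITEM)

    def __getattr__(self, name: str) -> SketchXComArg:
        # Only reached when normal attribute lookup fails.
        if name.startswith("__") or self.mode != Mode.KEY:
            raise AttributeError(name)
        return SketchXComArg(self.task_id, key=name, path=(), mode=Mode.ITEM)

    def resolve(self, store: dict[tuple[str, str], Any]) -> Any:
        # Toy resolution: pull by (task_id, key), then walk the recorded path.
        value = store[(self.task_id, self.key)]
        for part in self.path:
            value = value[part]
        return value


store = {
    ("my_op", "return_value"): {"outer": {"inner": 42}},
    ("my_op", "some_xcom_key"): "separate value",
}
output = SketchXComArg("my_op")

by_key = output["some_xcom_key"].resolve(store)
nested = output.return_value["outer"]["inner"].resolve(store)
print(by_key, nested)
```

In this sketch, output["some_xcom_key"] keeps today's key-selection meaning, while output.return_value[...] switches to item mode and accumulates a path that is only applied when the value is resolved.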
Can someone tell me what the more elegant alternative is? I just want to expand over a list that's locked in an xcom multiple-output dict.
@task
def task_get_item_from_dict_because_airflow(dictionary: dict, key: str):
    return dictionary[key]