dag-factory
Data access in task_name.output when expanding
Let's say we have two tasks, extract and load, where extract returns a list of JSON objects, say [{"foo":"dan","bar":"shc"},{"foo":"alf","bar":"shc"}], and load expands over the output of extract:
tasks:
  extract:
    operator: airflow.operators.python.PythonOperator
    python_callable_name: extract_fn
    python_callable_file: /usr/local/airflow/dags/extractLoad/utils/python_callables/bronze_tasks.py
  load:
    operator: airflow.operators.python.PythonOperator
    python_callable_name: load_fn
    python_callable_file: /usr/local/airflow/dags/extractLoad/utils/python_callables/bronze_tasks.py
    expand:
      input: extract.output
    dependencies: [extract]
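For illustration, here is a minimal sketch of what bronze_tasks.py might look like under this config. The function bodies are assumptions (only the names extract_fn and load_fn come from the config), and it assumes, as described here, that each mapped load instance receives one record of extract's output as its single input argument. The list comprehension at the end only stands in for Airflow's task mapping.

```python
"""Hypothetical bronze_tasks.py for the single-argument workaround."""
from typing import Any


def extract_fn() -> list[dict[str, str]]:
    # The returned list is what `extract.output` refers to downstream.
    return [{"foo": "dan", "bar": "shc"}, {"foo": "alf", "bar": "shc"}]


def load_fn(input: dict[str, Any]) -> str:
    # `input` mirrors the key used under `expand:` (it shadows the builtin).
    # Each mapped instance gets one whole record, so the fields must be
    # unpacked by hand inside the callable.
    foo = input["foo"]
    bar = input["bar"]
    return f"loaded foo={foo} bar={bar}"


# Stand-in for the mapping: one load_fn call per record from extract_fn.
results = [load_fn(record) for record in extract_fn()]
```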
This works, but it's not ideal. The solution is clunky and obscures the internals: I have to give the load callable a single-argument signature, load_fn(input), and unpack the values inside the function. I'm proposing this functionality:
tasks:
  extract:
    operator: airflow.operators.python.PythonOperator
    python_callable_name: extract_fn
    python_callable_file: /usr/local/airflow/dags/extractLoad/utils/python_callables/bronze_tasks.py
  load:
    operator: airflow.operators.python.PythonOperator
    python_callable_name: load_fn
    python_callable_file: /usr/local/airflow/dags/extractLoad/utils/python_callables/bronze_tasks.py
    expand:
      foo: extract.output.foo
      bar: extract.output.bar
    dependencies: [extract]
so that the load callable can keep the signature load_fn(foo, bar).
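One possible reading of the proposal: each entry such as foo: extract.output.foo names a field of every record, and the fields are paired record by record, so load_fn(foo="dan", bar="shc") and load_fn(foo="alf", bar="shc") run. The sketch below uses a hypothetical helper, resolve_field_expand (not part of dag-factory), to turn such an expand spec into one kwargs dict per record. It also shows why the pairing matters: in Airflow, expand() with several keyword arguments maps over their cross product, so splitting the fields into separate lists would produce four calls instead of two. Airflow 2.4+ provides expand_kwargs() for mapping over a list of dicts, which is likely the closer analogue; how dag-factory would wire this up is left open here.

```python
"""Sketch: per-record field pairing vs. a cross product (hypothetical helpers)."""
from itertools import product
from typing import Any


def load_fn(foo: str, bar: str) -> str:
    # The signature the proposal wants to keep: one argument per field.
    return f"{foo}-{bar}"


def resolve_field_expand(
    expand_spec: dict[str, str],
    upstream_outputs: dict[str, list[dict[str, Any]]],
) -> list[dict[str, Any]]:
    """Map {"foo": "extract.output.foo", ...} to one kwargs dict per record."""
    sources: dict[str, tuple[str, str]] = {}
    for arg, ref in expand_spec.items():
        task_id, attr, field = ref.split(".")  # e.g. "extract", "output", "foo"
        if attr != "output":
            raise ValueError(f"unsupported reference: {ref}")
        sources[arg] = (task_id, field)
    # Pairing by record only makes sense if every field comes from one task.
    task_ids = {task_id for task_id, _ in sources.values()}
    if len(task_ids) != 1:
        raise ValueError("all fields must come from the same upstream task")
    records = upstream_outputs[task_ids.pop()]
    return [{arg: record[field] for arg, (_, field) in sources.items()} for record in records]


def cross_product_kwargs(lists: dict[str, list[Any]]) -> list[dict[str, Any]]:
    # Every combination of values, like expand() over several keyword arguments.
    names = list(lists)
    return [dict(zip(names, combo)) for combo in product(*lists.values())]


records = [{"foo": "dan", "bar": "shc"}, {"foo": "alf", "bar": "shc"}]
spec = {"foo": "extract.output.foo", "bar": "extract.output.bar"}

paired = resolve_field_expand(spec, {"extract": records})
loaded = [load_fn(**kwargs) for kwargs in paired]  # ['dan-shc', 'alf-shc']

crossed = cross_product_kwargs(
    {"foo": [r["foo"] for r in records], "bar": [r["bar"] for r in records]}
)  # 4 combinations, not 2
```

The pairing step is the part a naive translation to separate expand() arguments would lose, which is why the helper refuses fields drawn from different upstream tasks.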