dag-factory
Deserialise arbitrary types in python
Most of the params that a DAG, and the Operators and Sensors under it, take are primitive types (string, int, bool, float) or basic containers (lists, dicts). From time to time, though, a param requires some other Python object. For example:
- ExternalTaskSensor's execution_delta is a datetime.timedelta.
- PythonOperator's python_callable is a Callable.
An example for the ExternalTaskSensor:
example_external_task_sensor:
  default_args:
    start_date: 2021-09-01
  schedule_interval: '@daily'
  tasks:
    external_task_sensor:
      operator: airflow.sensors.external_task.ExternalTaskSensor
      external_dag_id: example_upstream_dependency_dag
      external_task_id: example_upstream_dependency_task
      execution_delta: !!python/object/apply:datetime.timedelta [0, 300]
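To see what that tag does at load time, here is a minimal sketch. It assumes PyYAML is installed and that the config is read with yaml.UnsafeLoader, the loader suggested further down this thread; the CONFIG string is a trimmed, illustrative copy of the task above, not a complete dag-factory config.

```python
import datetime

import yaml  # PyYAML; assumed to be installed

# Trimmed, illustrative copy of the task definition above
CONFIG = """
external_task_sensor:
  operator: airflow.sensors.external_task.ExternalTaskSensor
  execution_delta: !!python/object/apply:datetime.timedelta [0, 300]
"""

# Assumption: UnsafeLoader is used, so the tag is resolved by calling
# datetime.timedelta(0, 300) while the document is being loaded
task = yaml.load(CONFIG, Loader=yaml.UnsafeLoader)["external_task_sensor"]
execution_delta = task["execution_delta"]

# The positional arguments of timedelta are (days, seconds)
print(repr(execution_delta))
```

The list after the tag is passed as positional arguments, so [0, 300] means zero days and 300 seconds.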
And here is an example of deserialising a reference to a callable straight from the YAML. Let's suppose we are loading this YAML from a file main_dag_file.py that also contains the Python function we want to call (so the function is importable from the PYTHONPATH):
# main_dag_file.py
def add_two_numbers(first_operator: int, second_operator: int) -> int:
    return first_operator + second_operator
example_call_python_code:
  default_args:
    start_date: 2021-09-01
  schedule_interval: '@daily'
  tasks:
    example_python_task:
      operator: airflow.operators.python.PythonOperator
      python_callable: !!python/name:main_dag_file.add_two_numbers
      op_kwargs:
        first_operator: 3
        second_operator: 2
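The following sketch shows how the !!python/name tag could resolve to the function object itself. It assumes PyYAML with yaml.UnsafeLoader. To stay self-contained it registers an in-memory stand-in module named main_dag_file instead of relying on a real file on the PYTHONPATH; that stand-in is purely illustrative.

```python
import sys
import types

import yaml  # PyYAML; assumed to be installed


def add_two_numbers(first_operator: int, second_operator: int) -> int:
    return first_operator + second_operator


# Illustrative stand-in for a real main_dag_file.py on the PYTHONPATH
stand_in = types.ModuleType("main_dag_file")
stand_in.add_two_numbers = add_two_numbers
sys.modules["main_dag_file"] = stand_in

# Trimmed copy of the task above (only the keys needed for the demo)
CONFIG = """
python_callable: !!python/name:main_dag_file.add_two_numbers
op_kwargs:
  first_operator: 3
  second_operator: 2
"""

task = yaml.load(CONFIG, Loader=yaml.UnsafeLoader)

# The tag yields the function object, not a string, so it can be called
# the same way PythonOperator would call it with op_kwargs
result = task["python_callable"](**task["op_kwargs"])
print(result)
```

Unlike !!python/object/apply, this tag only looks the name up; the function is not called until something invokes it.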
In addition, sometimes it's useful to both:
- Have your DAG as a config file with its Jinja templating ready for Airflow.
- Have a render engine on top so you can make references, modify strings, or pass a function's return value as a param to an operator.
# main_dag_file.py
import os

def decide_executor() -> str:
    # Choose the operator class path based on the deployment environment
    environment = os.getenv('EXECUTION_ENVIRONMENT')
    if environment == 'prod':
        executor = 'kubernetes.KubernetesPodOperator'
    else:
        executor = 'bash.BashOperator'
    return 'airflow.operators.' + executor
# pass environment variables to the function
example_call_python_code:
  default_args:
    start_date: 2021-09-01
  schedule_interval: '@daily'
  tasks:
    example_python_task:
      operator: !!python/object/apply:main_dag_file.decide_executor []
      some_param: "{{ var.value.my_variable }}"
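As a rough sketch of both points together: PyYAML's tag for calling a callable at load time is !!python/object/apply: followed by an argument list, and a Jinja expression has to be quoted so that YAML keeps it as a plain string for Airflow to render later. The in-memory main_dag_file module and the EXECUTION_ENVIRONMENT value set below are assumptions made only to keep the example self-contained.

```python
import os
import sys
import types

import yaml  # PyYAML; assumed to be installed


def decide_executor() -> str:
    environment = os.getenv("EXECUTION_ENVIRONMENT")
    if environment == "prod":
        executor = "kubernetes.KubernetesPodOperator"
    else:
        executor = "bash.BashOperator"
    return "airflow.operators." + executor


# Illustrative stand-in for a real main_dag_file.py on the PYTHONPATH
stand_in = types.ModuleType("main_dag_file")
stand_in.decide_executor = decide_executor
sys.modules["main_dag_file"] = stand_in

# Illustrative value; in practice the deployment would set this
os.environ["EXECUTION_ENVIRONMENT"] = "dev"

CONFIG = """
operator: !!python/object/apply:main_dag_file.decide_executor []
some_param: "{{ var.value.my_variable }}"
"""

task = yaml.load(CONFIG, Loader=yaml.UnsafeLoader)

# decide_executor() ran during loading; the Jinja text is left untouched
print(task["operator"])
print(task["some_param"])
```

The function's return value is fixed when the file is parsed, while the Jinja expression stays as text until Airflow renders it.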
This could also be solved by allowing Python elements in the YAML file. It's a fine line: config files should be language agnostic, I know, which is why I'm opening this issue so the community can weigh in too.
To implement this we should use yaml.load (with a loader that can construct Python objects) instead of yaml.safe_load. As we are the owners of the code, there's no risk in doing so: safe_load is in place to prevent injection attacks.
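A short sketch of the difference, assuming PyYAML is installed: the safe loader only accepts standard YAML tags and rejects python/* tags, whereas yaml.UnsafeLoader imports the named module and calls the callable. The DOCUMENT string is illustrative.

```python
import yaml  # PyYAML; assumed to be installed

DOCUMENT = "execution_delta: !!python/object/apply:datetime.timedelta [0, 300]"

# safe_load has no constructor for python/* tags, so it raises an error
try:
    yaml.safe_load(DOCUMENT)
    rejected_by_safe_load = False
except yaml.YAMLError:
    rejected_by_safe_load = True

# UnsafeLoader runs the callable named in the tag, which is why it
# should only be pointed at config files you control
loaded = yaml.load(DOCUMENT, Loader=yaml.UnsafeLoader)

print(rejected_by_safe_load)
print(loaded["execution_delta"].total_seconds())
```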
Also, this feature would make the current way of attaching a callable to a PythonOperator unnecessary, and it matches the operator's actual params exactly.
What are your thoughts?
This sounds like a great idea to me. 👍
@dinigo have you ever started writing code for this? This would be very useful indeed.
I'm facing a similar situation with the KubernetesPodOperator, where all the arguments are Kubernetes Python API objects.
@vitoravancini, we were using our own coupled config-to-DAG library. For standardization purposes we decided to go with a FOSS solution such as dag-factory (thanks @ajbosco!).
I suspect it only requires changing where the config is loaded:
  config: Dict[str, Any] = yaml.load(
      stream=open(config_filepath, "r", encoding="utf-8"),
-     Loader=yaml.FullLoader,
+     Loader=yaml.UnsafeLoader,
  )
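As a standalone sketch of that change (not dag-factory's actual code), the load could look like the following. The helper name load_config and the temporary file are hypothetical, and a context manager is used so the file handle is closed after loading.

```python
import tempfile
from pathlib import Path
from typing import Any, Dict

import yaml  # PyYAML; assumed to be installed


def load_config(config_filepath: str) -> Dict[str, Any]:
    """Hypothetical helper: load a YAML config, allowing python/* tags."""
    with open(config_filepath, "r", encoding="utf-8") as stream:
        return yaml.load(stream, Loader=yaml.UnsafeLoader)


# Usage with a throwaway file standing in for a real DAG config
with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = Path(tmp_dir) / "example_dag.yml"
    config_path.write_text(
        "timeout: !!python/object/apply:datetime.timedelta [0, 60]\n",
        encoding="utf-8",
    )
    config = load_config(str(config_path))

print(config["timeout"])
```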
This way you can make use of types beyond the standard YAML ones.
My strong opinion is that, since you are the owner of the config, you can use the so-called "UnsafeLoader". In my opinion the name is not accurate, since most people managing YAML files are not loading them unsupervised from the network, so the reference to a possible injection attack is out of context here.
If you would be so kind as to create a PR and write a couple of tests following the examples above, that would give us a working environment to collaborate in.