dag-factory

Deserialise arbitrary types in python

Open dinigo opened this issue 3 years ago • 3 comments

Most of the params taken by the DAG and by the Operators and Sensors underneath it are basic/raw types (string, int, bool, float) or basic containers (lists, dicts). From time to time, though, we need an actual Python object. For example:

An example for the ExternalTaskSensor

example_external_task_sensor:
  default_args:
    start_date: 2021-09-01
  schedule_interval: "@daily"
  tasks:
    external_task_sensor:
      operator: airflow.sensors.external_task.ExternalTaskSensor
      external_dag_id: example_upstream_dependency_dag
      external_task_id: example_upstream_dependency_task
      execution_delta: !!python/object/apply:datetime.timedelta [0, 300]
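
Roughly, the `!!python/object/apply:datetime.timedelta [0, 300]` tag asks the loader to import the module, look up the attribute and call it with the YAML list as positional arguments. The stdlib-only sketch below mimics that step; `apply_tag` is a hypothetical helper written for illustration, not part of PyYAML or dag-factory.

```python
import importlib
from datetime import timedelta
from typing import Any, Sequence


def apply_tag(dotted_name: str, args: Sequence[Any]) -> Any:
    """Hypothetical helper mimicking `!!python/object/apply:<dotted_name> [args]`."""
    # "datetime.timedelta" -> module "datetime", attribute "timedelta"
    module_name, _, attr_name = dotted_name.rpartition(".")
    target = getattr(importlib.import_module(module_name), attr_name)
    # The YAML sequence becomes the positional arguments of the call
    return target(*args)


execution_delta = apply_tag("datetime.timedelta", [0, 300])
print(execution_delta)                          # → 0:05:00
print(execution_delta == timedelta(minutes=5))  # → True
```

With `execution_delta` set to 5 minutes, the sensor looks for the `example_upstream_dependency_dag` run whose logical date is 5 minutes before its own.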

And here is an example of deserialising a reference to a callable straight from the YAML. Let's suppose we are instantiating this YAML from a file main_dag_file.py that also contains a Python function we want to call (so that the module is on the PYTHONPATH):

# main_dag_file.py
def add_two_numbers(first_operator: int, second_operator: int) -> int:
  return first_operator + second_operator

example_call_python_code:
  default_args:
    start_date: 2021-09-01
  schedule_interval: "@daily"
  tasks:
    example_python_task:
      operator: airflow.operators.python.PythonOperator
      python_callable: !!python/name:main_dag_file.add_two_numbers
      op_kwargs:
        first_operator: 3
        second_operator: 2
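
Unlike `apply`, the `!!python/name:` tag resolves the dotted path to the object itself without calling it; the PythonOperator later calls `python_callable(**op_kwargs)`, which is why the `op_kwargs` keys must match the function's parameter names. The sketch below registers a hypothetical stand-in `main_dag_file` module in `sys.modules` so it runs on its own; `name_tag` is likewise a hypothetical helper, not a PyYAML or Airflow API.

```python
import importlib
import sys
import types
from typing import Any


def add_two_numbers(first_operator: int, second_operator: int) -> int:
    return first_operator + second_operator


# Hypothetical stand-in for main_dag_file.py so the sketch is self-contained
main_dag_file = types.ModuleType("main_dag_file")
main_dag_file.add_two_numbers = add_two_numbers
sys.modules["main_dag_file"] = main_dag_file


def name_tag(dotted_name: str) -> Any:
    """Hypothetical helper mimicking `!!python/name:<dotted_name>` (no call)."""
    module_name, _, attr_name = dotted_name.rpartition(".")
    return getattr(importlib.import_module(module_name), attr_name)


python_callable = name_tag("main_dag_file.add_two_numbers")
op_kwargs = {"first_operator": 3, "second_operator": 2}
# PythonOperator-style call: op_kwargs are passed as keyword arguments
print(python_callable(**op_kwargs))  # → 5
```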

In addition, it is sometimes useful to both:

  • Have your DAG as a config file with its Jinja templating ready for Airflow.
  • Have a rendering engine on top that can resolve references, modify strings, and pass a function's return value as a param to an operator.
# main_dag_file.py
import os

def decide_executor() -> str:
  environment = os.getenv('EXECUTION_ENVIRONMENT')
  if environment == 'prod':
    executor = 'providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator'
  else:
    executor = 'operators.bash.BashOperator'
  return 'airflow.' + executor

# decide_executor() picks the operator class path from the EXECUTION_ENVIRONMENT variable
example_call_python_code:
  default_args:
    start_date: 2021-09-01
  schedule_interval: "@daily"
  tasks:
    example_python_task:
      operator: !!python/object/apply:main_dag_file.decide_executor []
      some_param: "{{ var.value.my_variable }}"
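
If `decide_executor` is invoked through a `!!python/object/apply:main_dag_file.decide_executor []` tag, the call happens while the YAML is being loaded, so the operator path is fixed every time the DAG file is parsed, based on whatever `EXECUTION_ENVIRONMENT` holds in the parsing process. A quick standalone check of the function (nothing is imported from Airflow; the returned strings are only class paths, using the Airflow 2 provider path for KubernetesPodOperator):

```python
import os


def decide_executor() -> str:
    # Evaluated at YAML load time, i.e. whenever the DAG file is parsed
    environment = os.getenv("EXECUTION_ENVIRONMENT")
    if environment == "prod":
        executor = "providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator"
    else:
        executor = "operators.bash.BashOperator"
    return "airflow." + executor


for env in ("prod", "dev"):
    os.environ["EXECUTION_ENVIRONMENT"] = env
    print(env, "->", decide_executor())
```

A Jinja string such as `"{{ var.value.my_variable }}"` (quoted, since an unquoted `{` starts a YAML flow mapping) is, by contrast, left untouched by the loader and only rendered by Airflow at run time if the parameter is one of the operator's `template_fields`.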

This can also be solved by allowing Python elements in the YAML file. It is a fine line: config files should be language-agnostic, I know, which is why I am opening this issue so the community can weigh in too.

To implement this we should use yaml.load with a loader that accepts Python tags instead of yaml.safe_load. As we are the owners of the config, there's no risk in doing so: safe_load is in place to prevent injection attacks from untrusted input.

This feature would also make the current way of attaching a callable to a PythonOperator (the python_callable_name and python_callable_file keys) unnecessary, and the config would match the operator's exact params.

What are your thoughts?

dinigo, Sep 06 '21 14:09

This sounds like a great idea to me. 👍

ajbosco, Sep 07 '21 13:09

@dinigo, have you started writing code for this? It would be very useful indeed.

I'm facing a similar situation with the KubernetesPodOperator, where many of the arguments are Kubernetes Python API objects.

vitoravancini, Jan 11 '22 12:01

@vitoravancini, we were using our own tightly coupled config-to-DAG library. For standardization purposes we decided to move to a FOSS solution such as dag-factory (thanks @ajbosco!).

I suspect it only requires changing the loader used where the config is loaded:

            config: Dict[str, Any] = yaml.load(
                stream=open(config_filepath, "r", encoding="utf-8"),
-               Loader=yaml.FullLoader,
+               Loader=yaml.UnsafeLoader,
            )

This way you can use types beyond the standard ones YAML provides.
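
A minimal sketch of the difference, assuming PyYAML 5.4 or later (where the `!!python/object/apply` constructor is registered only on the unsafe loader), run against a trimmed version of the ExternalTaskSensor snippet from the issue. `load_execution_delta` is a hypothetical helper for this illustration.

```python
import datetime
from typing import Optional

import yaml  # PyYAML
from yaml.constructor import ConstructorError

CONFIG = """
example_external_task_sensor:
  default_args:
    start_date: 2021-09-01
  schedule_interval: "@daily"
  tasks:
    external_task_sensor:
      operator: airflow.sensors.external_task.ExternalTaskSensor
      execution_delta: !!python/object/apply:datetime.timedelta [0, 300]
"""


def load_execution_delta(loader: type) -> Optional[datetime.timedelta]:
    """Parse CONFIG with `loader`; return execution_delta, or None if the tag is refused."""
    try:
        config = yaml.load(CONFIG, Loader=loader)
    except ConstructorError:
        # SafeLoader, and FullLoader on PyYAML >= 5.4, have no constructor for this tag
        return None
    task = config["example_external_task_sensor"]["tasks"]["external_task_sensor"]
    return task["execution_delta"]


for loader in (yaml.SafeLoader, yaml.FullLoader, yaml.UnsafeLoader):
    print(loader.__name__, load_execution_delta(loader))
```

Note that `UnsafeLoader` will import any module named in a tag, which is acceptable under the premise argued here: configs you write and review yourself.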

My strong opinion is that, since you own the config, you can use the said UnsafeLoader. I find the name inaccurate, since most people managing YAML files are not loading them unsupervised from the network, so the reference to a possible injection attack is out of context.

If you would be so kind as to create a PR and write a couple of tests following the examples above, that would give us a working environment to collaborate on.

dinigo, Jan 13 '22 10:01