Make Databricks operators' `json` parameter compatible with XComs, Jinja expression values
Body
A few Databricks operators have a `json` parameter, a JSON object that can contain any number of API parameters. In the constructors of these operators, other parameters passed to the operator, such as `name`, `tags`, etc., are merged into the `json` value.
Since `json` is a templated field, modifying it in this way fails or does not work as expected if the input arg is a string (e.g. "{{ var.json....}}") or an XComArg (meaning it's the output of a previous task). Template fields are not rendered until just before the `execute` method is called.
To illustrate the point, let's use this example DAG where we define the json arg in a previous task and use its output:
```python
from __future__ import annotations

from pendulum import datetime
from typing import TYPE_CHECKING, Sequence

from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context


class DatabricksCreateJobsOperator(BaseOperator):
    # Reduced example operator, defined inline for this DAG.
    template_fields: Sequence[str] = ("json", "databricks_conn_id")

    def __init__(
        self,
        *,
        json: dict | None = None,
        name: str | None = None,
        tags: dict[str, str] | None = None,
        **kwargs
    ):
        super().__init__(**kwargs)
        # ``json`` has not been rendered yet at this point: it may still be an
        # XComArg or a Jinja string, so the item assignments below can fail.
        self.json = json or {}
        if name is not None:
            self.json["name"] = name
        if tags is not None:
            self.json["tags"] = tags

    def execute(self, context: Context) -> None:
        pass


@dag(start_date=datetime(2023, 1, 1), schedule=None)
def derived_template_fields():
    @task
    def push_json() -> dict[str, str]:
        return {"key1": "val1", "key2": "val2"}

    # ``json`` is an XComArg here, not a dict.
    json = push_json()

    DatabricksCreateJobsOperator(
        task_id="create_job_w_json", json=json, name="some_name", tags={"key3": "value3"}
    )


derived_template_fields()
```
DAG parsing fails with:
```
Running: airflow dags reserialize
[2023-08-31T14:29:57.796+0000] {utils.py:430} WARNING - No module named 'paramiko'
[2023-08-31T14:29:57.816+0000] {utils.py:430} WARNING - No module named 'airflow.providers.dbt'
[2023-08-31T14:29:58.533+0000] {dagbag.py:539} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2023-08-31T14:29:58.615+0000] {dagbag.py:347} ERROR - Failed to import: /usr/local/airflow/dags/derived_template_fields.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 343, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/usr/local/airflow/dags/derived_template_fields.py", line 50, in <module>
    derived_template_fields()
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 3798, in factory
    f(**f_kwargs)
  File "/usr/local/airflow/dags/derived_template_fields.py", line 41, in derived_template_fields
    DatabricksCreateJobsOperator(
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 436, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/derived_template_fields.py", line 27, in __init__
    self.json["name"] = name
    ~~~~~~~~~^^^^^^^^
TypeError: 'PlainXComArg' object does not support item assignment
```
Even if we change the `json` arg assignment to use the classic XCom Jinja template approach (i.e. `json = "{{ ti.xcom_pull(task_ids='push_json') }}"`), the DAG still fails to parse:
```
Running: airflow dags reserialize
[2023-08-31T14:32:01.553+0000] {utils.py:430} WARNING - No module named 'paramiko'
[2023-08-31T14:32:01.574+0000] {utils.py:430} WARNING - No module named 'airflow.providers.dbt'
[2023-08-31T14:32:02.341+0000] {dagbag.py:539} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2023-08-31T14:32:02.415+0000] {dagbag.py:347} ERROR - Failed to import: /usr/local/airflow/dags/derived_template_fields.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 343, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/usr/local/airflow/dags/derived_template_fields.py", line 51, in <module>
    derived_template_fields()
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 3798, in factory
    f(**f_kwargs)
  File "/usr/local/airflow/dags/derived_template_fields.py", line 42, in derived_template_fields
    DatabricksCreateJobsOperator(
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 436, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/derived_template_fields.py", line 27, in __init__
    self.json["name"] = name
    ~~~~~~~~~^^^^^^^^
TypeError: 'str' object does not support item assignment
```
It would be best to move the modification of `json` (and, in general, of any template field) into the `execute` method, where the value has already been rendered.
Currently this impacts the following operators:
- DatabricksCreateJobsOperator
- DatabricksSubmitRunOperator
- DatabricksRunNowOperator
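
As a rough illustration of the proposal to defer the `json` modification to `execute`, here is a minimal sketch. It deliberately avoids importing Airflow so that it runs on its own: `CreateJobsOperatorSketch` and `merge_named_params` are hypothetical names invented for this example, not part of the Databricks provider, and template rendering is simulated by assigning the rendered value by hand. The actual fix in the operators may look different.

```python
from __future__ import annotations

from typing import Any


def merge_named_params(rendered_json: Any, **named_params: Any) -> dict[str, Any]:
    """Return a copy of the rendered payload with all non-None named params merged in.

    Hypothetical helper for this sketch; not an Airflow or provider function.
    """
    if rendered_json is None:
        payload: dict[str, Any] = {}
    elif isinstance(rendered_json, dict):
        # Copy so that the templated attribute itself is left untouched.
        payload = dict(rendered_json)
    else:
        # For example, a Jinja template that rendered to a plain string.
        raise TypeError(f"json must render to a dict, got {type(rendered_json).__name__}")
    payload.update({key: value for key, value in named_params.items() if value is not None})
    return payload


class CreateJobsOperatorSketch:
    """Hypothetical stand-in; the real operators subclass BaseOperator."""

    template_fields = ("json",)

    def __init__(
        self,
        *,
        json: Any = None,
        name: str | None = None,
        tags: dict[str, str] | None = None,
    ) -> None:
        # Only store the arguments: json may still be an XComArg or a Jinja string.
        self.json = json
        self.name = name
        self.tags = tags

    def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        # Airflow would have rendered self.json by now, so merging is safe here.
        # The payload is returned only so that the sketch can be inspected.
        return merge_named_params(self.json, name=self.name, tags=self.tags)


# Constructing the operator with an unrendered template no longer raises.
op = CreateJobsOperatorSketch(
    json="{{ ti.xcom_pull(task_ids='push_json') }}",
    name="some_name",
    tags={"key3": "value3"},
)

# Simulate the rendering that Airflow performs just before execute() is called.
op.json = {"key1": "val1", "key2": "val2"}

payload = op.execute(context={})
print(payload)
```

The constructor only stores its arguments, so DAG parsing does not depend on the type of `json`. One caveat: a classic Jinja template renders to a string unless the DAG enables native rendering (`render_template_as_native_obj=True`), which is why the sketch raises a `TypeError` for non-dict values instead of guessing how to parse them.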
Committer
- [X] I acknowledge that I am a maintainer/committer of the Apache Airflow project.
@josh-fell I'm happy to contribute with this one. Can this be assigned to me?
@thcidale0808 Absolutely! All yours.
I can confirm that this is still happening in version 2.8.2. I tried to work around the issue using Docker images with previous versions, but the error persists. I went back many versions, at least as far as 2.3.0, and the issue is still there. Thanks for the support, hope you can fix this soon.
@thcidale0808 Are you still working on this one?
I can take this over if @thcidale0808 is not working on it anymore. Could you please assign this to me @josh-fell?
All yours @boraberke!