
DatabricksSubmitRunOperatorAsync fails with `KeyError: 'state'`

Open manmeetkaur opened this issue 1 year ago • 4 comments

Describe the bug

While trying to use DatabricksSubmitRunOperatorAsync on an existing Databricks cluster, I encountered this error:

{taskinstance.py:1641} ERROR - Trigger failed:
Traceback (most recent call last):

  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/triggerer_job.py", line 296, in cleanup_finished_triggers
    result = details["task"].result()

  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/triggerer_job.py", line 360, in run_trigger
    async for event in trigger.run():

  File "/usr/local/lib/python3.9/site-packages/astronomer/providers/databricks/triggers/databricks.py", line 72, in run
    run_state = await hook.get_run_state_async(self.run_id)

  File "/usr/local/lib/python3.9/site-packages/astronomer/providers/databricks/hooks/databricks.py", line 44, in get_run_state_async
    state = response["state"]

KeyError: 'state'

If I try to use the regular non-async DatabricksSubmitRunOperator, it works just fine.
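For context on the traceback: get_run_state_async reads the run state straight out of the Databricks Jobs API response, so any payload that lacks a "state" key (for example an error payload) surfaces as this KeyError. Below is a minimal illustrative sketch of that failing pattern (not the provider's actual implementation; the endpoint and credential handling are simplified):

import aiohttp

# Illustrative sketch only: the Jobs API (GET /api/2.1/jobs/runs/get) normally
# returns a "state" object, but an error payload (bad run_id, auth failure, etc.)
# does not, so indexing it directly raises KeyError: 'state'.
async def get_run_state(host: str, token: str, run_id: int) -> dict:
    url = f"https://{host}/api/2.1/jobs/runs/get"
    headers = {"Authorization": f"Bearer {token}"}
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get(url, params={"run_id": str(run_id)}) as resp:
            response = await resp.json()
    if "state" not in response:
        # A defensive variant surfaces the raw payload instead of a bare KeyError.
        raise RuntimeError(f"Unexpected runs/get response: {response}")
    return response["state"]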

Environment Details:

  • Local Airflow on astro-runtime:5.0.8
  • Field Engineering Databricks Spark Cluster
  • OpenLineage Enabled

To Reproduce

Steps to reproduce the behavior:

  1. Spin up a local Airflow environment using astro with runtime 5.0.8
  2. Copy the following DAG:
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

from astronomer.providers.databricks.operators.databricks import DatabricksSubmitRunOperatorAsync
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

default_args = {
    'owner': 'Cosmic Energy DE',
}

with DAG('test_aws'
        , start_date=datetime(2022,8,18)
        , catchup=False
        , max_active_runs=1
        , schedule_interval='@daily'
        , default_args=default_args
        , tags = ['transform', 'daily', 's3', 'snowflake', 'lineage'],
    ) as dag:

    task_databricks_process=DatabricksSubmitRunOperatorAsync(
        task_id="databricks_process",
        json={
            "existing_cluster_id" : '0810-130606-yjnn4pqp',
            "notebook_task": {
                "notebook_path" : "/Shared/ghcnd-stations",
                "base_parameters": {
                    "access_key": "{{ var.value.fe_access_key }}",
                    "secret_key": "{{ var.value.fe_secret_key }}",
                    "execution_date": "{{ ds_nodash }}",
                    "file_name": "ghcnd-stations"
                }
            },
            "timeout_seconds": 600
        }
    )

    task_databricks_process
  3. Go to localhost:8080
  4. Enable the DAG and it will fail
  5. Replace DatabricksSubmitRunOperatorAsync with DatabricksSubmitRunOperator and it will run (see the sketch below)
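For step 5, the swap is a one-line change to the task definition in the DAG above (sketch below; the json payload is abbreviated here and should be kept identical to the one passed to the async operator):

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

# Same task as in the DAG above, using the regular (synchronous) operator.
task_databricks_process = DatabricksSubmitRunOperator(
    task_id="databricks_process",
    json={
        "existing_cluster_id": "0810-130606-yjnn4pqp",
        "notebook_task": {"notebook_path": "/Shared/ghcnd-stations"},
        "timeout_seconds": 600,
    },
)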

Expected behavior

The Databricks task should run successfully with the async operator, just as it does with the non-async DatabricksSubmitRunOperator; instead it fails with the KeyError above.

Screenshots

N/A

Desktop (please complete the following information):

  • OS: macOS
  • Browser: Chrome
  • Version: 12.5.1


Additional context

Logs for the failed and successful runs are attached: SUCCESS_DatabricksSubmitRunOperator.log FAILED_DatabricksSubmitRunOperatorAsync.log

manmeetkaur avatar Aug 29 '22 09:08 manmeetkaur

We should release 1.8.1 that contains this fix -- cc @phanikumv @rajaths010494

kaxil avatar Aug 31 '22 12:08 kaxil

I tried to debug it but was unable to reproduce it.

What version of the Databricks sync provider are you using, and what is the value of the wait_for_termination param? Also, would it be possible for you to share the triggerer log?

For now, I have made changes in DatabricksSubmitRunOperatorAsync to propagate the triggerer log to the worker: https://github.com/astronomer/astronomer-providers/pull/612

Also, we run https://github.com/astronomer/astronomer-providers/blob/main/astronomer/providers/databricks/example_dags/example_databricks.py in our integration tests and it runs fine. Would you like to try it in your env?
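For anyone following along, here is an illustrative sketch for gathering the details asked for above; the version lookups use importlib.metadata with the PyPI distribution names, the payload is an abbreviated copy of the one in the DAG above, and wait_for_termination (which defaults to True on the base DatabricksSubmitRunOperator) is set explicitly:

from importlib.metadata import version

# Print the installed provider versions.
print("astronomer-providers:", version("astronomer-providers"))
print("apache-airflow-providers-databricks:", version("apache-airflow-providers-databricks"))

from astronomer.providers.databricks.operators.databricks import DatabricksSubmitRunOperatorAsync

# Abbreviated copy of the payload from the DAG above.
databricks_payload = {
    "existing_cluster_id": "0810-130606-yjnn4pqp",
    "notebook_task": {"notebook_path": "/Shared/ghcnd-stations"},
    "timeout_seconds": 600,
}

task_databricks_process = DatabricksSubmitRunOperatorAsync(
    task_id="databricks_process",
    json=databricks_payload,
    wait_for_termination=True,  # explicit value for the param asked about here
)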

pankajastro avatar Sep 01 '22 15:09 pankajastro

@manmeetkaur did you get a chance to try the example DAG given above in your env?

phanikumv avatar Sep 08 '22 08:09 phanikumv

@manmeetkaur please confirm if you got a chance to try the example DAG in your env.

phanikumv avatar Oct 10 '22 11:10 phanikumv

Closing this as we haven't received any reply. Please feel free to re-open in case of any more issues.

phanikumv avatar Nov 07 '22 11:11 phanikumv