astronomer-providers
DatabricksSubmitRunOperatorAsync fails with `KeyError: 'state'`
Describe the bug
While trying to use DatabricksSubmitRunOperatorAsync on an existing Databricks cluster, I encountered this error:
{taskinstance.py:1641} ERROR - Trigger failed:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/airflow/jobs/triggerer_job.py", line 296, in cleanup_finished_triggers
result = details["task"].result()
File "/usr/local/lib/python3.9/site-packages/airflow/jobs/triggerer_job.py", line 360, in run_trigger
async for event in trigger.run():
File "/usr/local/lib/python3.9/site-packages/astronomer/providers/databricks/triggers/databricks.py", line 72, in run
run_state = await hook.get_run_state_async(self.run_id)
File "/usr/local/lib/python3.9/site-packages/astronomer/providers/databricks/hooks/databricks.py", line 44, in get_run_state_async
state = response["state"]
KeyError: 'state'
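The traceback shows the hook doing a bare `response["state"]` lookup on the parsed `runs/get` JSON, so any payload without a top-level `state` field (for example an API error body) crashes the whole trigger with a `KeyError` instead of surfacing the real problem. A minimal defensive sketch of that lookup (hypothetical helper, not the library's actual code):

```python
def extract_run_state(response: dict) -> dict:
    """Return the run's `state` dict from a Databricks runs/get payload,
    raising a descriptive error instead of KeyError when it is missing."""
    state = response.get("state")
    if state is None:
        # Surface the full payload so the underlying cause (auth failure,
        # invalid run_id, API error message, ...) shows up in the log.
        raise ValueError(f"No 'state' field in Databricks response: {response}")
    return state
```

With this, a malformed response produces a log line containing the actual API payload rather than an opaque `KeyError: 'state'`.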
If I try to use the regular non-async DatabricksSubmitRunOperator, it works just fine.
Environment Details:
- Local Airflow on astro-runtime:5.0.8
- Field Engineering Databricks Spark Cluster
- OpenLineage enabled
To Reproduce
Steps to reproduce the behavior:
- Spin up a local Airflow environment using astro with runtime 5.0.8
- Copy the following DAG:
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from astronomer.providers.databricks.operators.databricks import DatabricksSubmitRunOperatorAsync

default_args = {
    'owner': 'Cosmic Energy DE',
}

with DAG(
    'test_aws',
    start_date=datetime(2022, 8, 18),
    catchup=False,
    max_active_runs=1,
    schedule_interval='@daily',
    default_args=default_args,
    tags=['transform', 'daily', 's3', 'snowflake', 'lineage'],
) as dag:
    task_databricks_process = DatabricksSubmitRunOperatorAsync(
        task_id="databricks_process",
        json={
            "existing_cluster_id": '0810-130606-yjnn4pqp',
            "notebook_task": {
                "notebook_path": "/Shared/ghcnd-stations",
                "base_parameters": {
                    "access_key": "{{ var.value.fe_access_key }}",
                    "secret_key": "{{ var.value.fe_secret_key }}",
                    "execution_date": "{{ ds_nodash }}",
                    "file_name": "ghcnd-stations",
                },
            },
            "timeout_seconds": 600,
        },
    )

    task_databricks_process
- Go to localhost:8080
- Enable the DAG and it will fail
- Replace DatabricksSubmitRunOperatorAsync with DatabricksSubmitRunOperator and it will run
Expected behavior
The Databricks task should succeed when using the async operator, just as it does with the synchronous one; instead it fails.
Desktop:
- OS: macOS
- Browser: Chrome
- Version: 12.5.1
Additional context
Logs for the failed and successful runs are attached: SUCCESS_DatabricksSubmitRunOperator.log, FAILED_DatabricksSubmitRunOperatorAsync.log
We should release 1.8.1 that contains this fix -- cc @phanikumv @rajaths010494
I tried to debug this but was unable to reproduce it.
Which version of the databricks sync provider are you using, and what is the value of the wait_for_termination param?
Also, would it be possible for you to share the trigger log?
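For context on why a single bad payload fails the whole task: the trigger loops, awaiting the run state until it reaches a terminal life-cycle state, so one response missing `state` aborts the loop. A self-contained sketch of that polling pattern, using a fake hook with canned payloads (illustrative only; the real trigger lives in astronomer/providers/databricks/triggers/databricks.py):

```python
import asyncio

class FakeDatabricksHook:
    """Stand-in for the real hook: yields canned runs/get payloads."""

    def __init__(self, payloads):
        self._payloads = iter(payloads)

    async def get_run_state_async(self, run_id):
        response = next(self._payloads)
        # Mirrors the line that raised KeyError in the traceback:
        # a payload without "state" blows up the whole trigger.
        return response["state"]

async def poll_until_terminal(hook, run_id, interval=0):
    """Poll the run state until a terminal life-cycle state is reached."""
    while True:
        state = await hook.get_run_state_async(run_id)
        if state["life_cycle_state"] in ("TERMINATED", "SKIPPED", "INTERNAL_ERROR"):
            return state
        await asyncio.sleep(interval)

hook = FakeDatabricksHook([
    {"state": {"life_cycle_state": "RUNNING"}},
    {"state": {"life_cycle_state": "TERMINATED", "result_state": "SUCCESS"}},
])
final = asyncio.run(poll_until_terminal(hook, run_id="42"))
print(final["result_state"])  # SUCCESS
```

If the second canned payload were an error body without `state`, the loop would die with exactly the `KeyError: 'state'` from the report, which is why the trigger log is needed to see the offending response.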
For now, I have made changes in DatabricksSubmitRunOperatorAsync to propagate the trigger log to the worker: https://github.com/astronomer/astronomer-providers/pull/612
Also, we run https://github.com/astronomer/astronomer-providers/blob/main/astronomer/providers/databricks/example_dags/example_databricks.py in our integration tests and it runs fine. Would you like to try it in your env?
@manmeetkaur did you get a chance to try the example DAG given above in your env?
@manmeetkaur please confirm if you got a chance to try the example DAG in your env.
Closing this as we haven't received any reply. Please feel free to re-open in case of any more issues.