DatabricksSqlOperator fails to parse query results
Apache Airflow Provider(s)
databricks
Versions of Apache Airflow Providers
The error was not present in apache-airflow-providers-databricks==4.7.0.
After upgrading to the latest release, apache-airflow-providers-databricks==6.0.0, the error appears.
Apache Airflow version
2.8.0
Operating System
Debian GNU/Linux 11 (bullseye)
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
What happened
[2024-01-16, 18:54:30 CET] {client.py:200} INFO - Successfully opened session XXXXXX-4c73-1765-b68b-b96c52b08745
[2024-01-16, 18:54:30 CET] {sql.py:450} INFO - Running statement: Select count(*) FROM catalog.schema.table_test parameters: None
[2024-01-16, 18:54:30 CET] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
result = execute_callable(context=context, **execute_callable_kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 282, in execute
output = hook.run(
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks_sql.py", line 256, in run
result = self._make_common_data_structure(raw_result)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks_sql.py", line 286, in _make_common_data_structure
rows_object = namedtuple("Row", rows_fields) # type: ignore[misc]
File "/usr/local/lib/python3.10/collections/__init__.py", line 373, in namedtuple
raise ValueError('Type names and field names must be valid '
ValueError: Type names and field names must be valid identifiers: 'count(1)'
For the investigation, I edited the ./hooks/databricks_sql.py file and added some prints in the method _make_common_data_structure:
- the result variable is a list with value [Row(count(1)=9714)]
- the rows_fields value is <Row('count(1)')>
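For what it's worth, the failure reproduces outside Airflow with nothing but that field name (a minimal sketch derived from the traceback above):

```python
from collections import namedtuple

# The hook builds a namedtuple from the column names of the Databricks Row.
# "count(1)" is not a valid Python identifier, so this raises:
rows_fields = ("count(1)",)
Row = namedtuple("Row", rows_fields)
# ValueError: Type names and field names must be valid identifiers: 'count(1)'
```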
What you think should happen instead
No response
How to reproduce
test = DatabricksSqlOperator(
task_id="count_query",
databricks_conn_id="databricks-sp",
sql_endpoint_name="endpoint_name",
sql="SELECT count(*) FROM catalog.schema.table_test;"
)
Anything else
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
There is also another ticket that is related to the similar code path: https://github.com/apache/airflow/issues/36839
@Joffreybvn any idea?
A namedtuple uses the field name (count(1) in this case) to create attributes, but attribute names can't contain special characters like parentheses. Two solutions:
- Edit the field name to exclude those characters. You'd get something like count1 instead.
- Go back to a plain tuple instead of a namedtuple.
The second option seems premature, and we switched from tuple to namedtuple after previous discussions, so I'll open a PR with the first one.
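A minimal sketch of the first option (the real PR may differ; the sanitize helper here is hypothetical):

```python
import re
from collections import namedtuple

def _sanitize_field(name: str) -> str:
    # Drop every character that is not valid in a Python identifier;
    # fall back to "_" if nothing is left.
    return re.sub(r"\W", "", name) or "_"

fields = ["count(1)"]
Row = namedtuple("Row", [_sanitize_field(f) for f in fields])
print(Row(9714))  # Row(count1=9714)
```

Note that the stdlib also offers namedtuple(..., rename=True), which replaces invalid field names with positional ones like _0 instead of stripping characters.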
@Joffreybvn Just retested with Databricks provider 6.1.0 on Airflow 2.8.1, and the issue still persists with a different stacktrace:
[2024-01-29, 13:41:39 CET] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: task_1.count manual__2024-01-29T13:41:35+01:00 [queued]>
[2024-01-29, 13:41:39 CET] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: task_1.count manual__2024-01-29T13:41:35+01:00 [queued]>
[2024-01-29, 13:41:39 CET] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-01-29, 13:41:39 CET] {taskinstance.py:2191} INFO - Executing <Task(DatabricksSqlOperator): count> on 2024-01-29 12:41:35+00:00
[2024-01-29, 13:41:39 CET] {standard_task_runner.py:60} INFO - Started process 118 to run task
[2024-01-29, 13:41:39 CET] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'task_1', 'count', 'manual__2024-01-29T13:41:35+01:00', '--job-id', '5', '--raw', '--subdir', 'DAGS_FOLDER/dag-wn-equipment.py', '--cfg-path', '/tmp/tmpp4t8f52h']
[2024-01-29, 13:41:39 CET] {standard_task_runner.py:88} INFO - Job 5: Subtask count
[2024-01-29, 13:41:39 CET] {task_command.py:423} INFO - Running <TaskInstance: task_1.count manual__2024-01-29T13:41:35+01:00 [running]> on host 33e1fb1e4ed5
[2024-01-29, 13:41:39 CET] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='team_analytics' AIRFLOW_CTX_DAG_ID='task_1' AIRFLOW_CTX_TASK_ID='count' AIRFLOW_CTX_EXECUTION_DATE='2024-01-29T12:41:35+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-01-29T13:41:35+01:00'
[2024-01-29, 13:41:39 CET] {sql.py:276} INFO - Executing: SELECT count(*) FROM catalog.schema.test_table;
[2024-01-29, 13:41:39 CET] {base.py:83} INFO - Using connection ID 'tu-databricks-sp' for task execution.
[2024-01-29, 13:41:39 CET] {databricks_base.py:514} INFO - Using Service Principal Token.
[2024-01-29, 13:41:39 CET] {databricks_base.py:223} INFO - Existing Service Principal token is expired, or going to expire soon. Refreshing...
[2024-01-29, 13:41:39 CET] {databricks_base.py:514} INFO - Using Service Principal Token.
[2024-01-29, 13:41:40 CET] {client.py:200} INFO - Successfully opened session 01eebea3-bfcf-14ed-8b50-a14cc9a61a35
[2024-01-29, 13:41:40 CET] {sql.py:450} INFO - Running statement: SELECT count(*) FROM catalog.schema.test_table, parameters: None
[2024-01-29, 13:41:52 CET] {taskinstance.py:2698} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
result = execute_callable(context=context, **execute_callable_kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 282, in execute
output = hook.run(
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks_sql.py", line 254, in run
raw_result = handler(cur)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/common/sql/hooks/sql.py", line 91, in fetch_all_handler
return cursor.fetchall()
File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/client.py", line 670, in fetchall
return self.active_result_set.fetchall()
File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/client.py", line 944, in fetchall
return self._convert_arrow_table(self.fetchall_arrow())
File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/client.py", line 884, in _convert_arrow_table
res = df.to_numpy(na_value=None)
File "/home/airflow/.local/lib/python3.10/site-packages/pandas/core/frame.py", line 1981, in to_numpy
result = self._mgr.as_array(dtype=dtype, copy=copy, na_value=na_value)
File "/home/airflow/.local/lib/python3.10/site-packages/pandas/core/internals/managers.py", line 1702, in as_array
arr[isna(arr)] = na_value
TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'
[2024-01-29, 13:41:52 CET] {taskinstance.py:1138} INFO - Marking task as FAILED. dag_id=task_1, task_id=count, execution_date=20240129T124135, start_date=20240129T124139, end_date=20240129T124152
[2024-01-29, 13:41:52 CET] {standard_task_runner.py:107} ERROR - Failed to execute job 5 for task count (int() argument must be a string, a bytes-like object or a real number, not 'NoneType'; 118)
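If I read the traceback right (my interpretation, not confirmed in this thread): pandas' as_array assigns na_value into the result array even when nothing is actually missing, and numpy refuses to cast None into an integer array. The core of it:

```python
import numpy as np

arr = np.array([9714])  # the int64 count column from the Arrow table
# pandas runs arr[isna(arr)] = na_value; even with an all-False mask,
# numpy first tries to cast None to the array's dtype and raises:
arr[np.array([False])] = None
# TypeError: int() argument must be a string, a bytes-like object
# or a real number, not 'NoneType'
```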
I cannot reproduce this error. Could you give me more info about your data? Maybe a screenshot (like below) of what you have in the Databricks UI?
Using pandas-2.1.4 solved the issue for me (Airflow 2.8.1)
We have the same issue on Airflow 2.7.1 and 2.8.1, on both provider 6.1.0 and 6.2.0.
Upgrading pandas to 2.1.4 did not solve the issue for us.
Downgrading the provider to 5.0.1 however did solve it.
In 5.0.1, the result of a merge statement looks like this (screenshot), which fails to render with the same DAG on provider version 6.x.
In the Databricks UI, the results look similar:
Retested with the newly released versions:
- airflow 2.8.2
- databricks provider 6.2.0
- pandas 2.2.1
The issue still persists.
@aru-trackunit - seems that @Joffreybvn cannot reproduce the issue - can you please provide a bit more information as he asked in https://github.com/apache/airflow/issues/36838#issuecomment-1925889496 ?
I discussed it with him on Airflow's Slack, but it's a good idea to have visibility on it here as well.
I have attached a DatabricksSqlOperator that is ready to test against. The SQL returns one row, as shown in the attached screenshot:
test = DatabricksSqlOperator(
task_id="count_query",
databricks_conn_id="databricks-sp",
sql_endpoint_name="endpoint_name",
sql="SELECT count(*) FROM catalog.schema.table_test;"
)
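Side note: aliasing the aggregate should sidestep the namedtuple ValueError, since the column name then is a valid identifier (my own workaround suggestion, untested against this exact setup, and it does not address the later pandas TypeError):

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

test = DatabricksSqlOperator(
    task_id="count_query",
    databricks_conn_id="databricks-sp",
    sql_endpoint_name="endpoint_name",
    # "row_count" is a valid Python identifier, so namedtuple accepts it.
    sql="SELECT count(*) AS row_count FROM catalog.schema.table_test;",
)
```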
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
I think it's related to this issue https://github.com/databricks/databricks-sql-python/issues/326
This issue has been closed because it has not received response from the issue author.