
DatabricksSQLOperator struggles with parsing the data

Open aru-trackunit opened this issue 1 year ago • 7 comments

Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

The error was not present in apache-airflow-providers-databricks==4.7.0. After I upgraded to the latest version, apache-airflow-providers-databricks==6.0.0, the error appeared.

Apache Airflow version

2.8.0

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

[2024-01-16, 18:54:30 CET] {client.py:200} INFO - Successfully opened session XXXXXX-4c73-1765-b68b-b96c52b08745
[2024-01-16, 18:54:30 CET] {sql.py:450} INFO - Running statement: Select count(*) FROM catalog.schema.table_test parameters: None
[2024-01-16, 18:54:30 CET] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 282, in execute
    output = hook.run(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks_sql.py", line 256, in run
    result = self._make_common_data_structure(raw_result)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks_sql.py", line 286, in _make_common_data_structure
    rows_object = namedtuple("Row", rows_fields)  # type: ignore[misc]
  File "/usr/local/lib/python3.10/collections/__init__.py", line 373, in namedtuple
    raise ValueError('Type names and field names must be valid '
ValueError: Type names and field names must be valid identifiers: 'count(1)'

For investigation purposes I edited the ./hooks/databricks_sql.py file and added some prints: in the method _make_common_data_structure, the result variable is a list with value [Row(count(1)=9714)], and rows_fields has value <Row('count(1)')>.
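The failure can be reproduced in isolation: `namedtuple` rejects any field name that is not a valid Python identifier, and `count(1)` contains parentheses.

```python
from collections import namedtuple

# "count(1)" contains parentheses, so it is not a valid Python identifier
# and namedtuple refuses it, just like in the hook's traceback above.
try:
    namedtuple("Row", ["count(1)"])
except ValueError as exc:
    print(exc)  # Type names and field names must be valid identifiers: 'count(1)'

# For comparison, rename=True makes namedtuple replace invalid names
# with positional placeholders instead of raising.
Row = namedtuple("Row", ["count(1)"], rename=True)
print(Row._fields)  # → ('_0',)
```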

What you think should happen instead

No response

How to reproduce

test = DatabricksSqlOperator(
    task_id="count_query",
    databricks_conn_id="databricks-sp",
    sql_endpoint_name="endpoint_name",
    sql="SELECT count(*) FROM catalog.schema.table_test;",
)
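As a possible workaround until a fix lands (assuming the failure is only in the namedtuple construction), aliasing the aggregate gives the column a valid identifier, e.g. `SELECT count(*) AS cnt FROM catalog.schema.table_test;`:

```python
from collections import namedtuple

# "cnt" is a valid Python identifier, so the hook's namedtuple creation
# would succeed for a query like:
#   SELECT count(*) AS cnt FROM catalog.schema.table_test
Row = namedtuple("Row", ["cnt"])
print(Row(cnt=9714))  # → Row(cnt=9714)
```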

Anything else

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

aru-trackunit avatar Jan 17 '24 10:01 aru-trackunit

There is also another ticket that is related to the similar code path: https://github.com/apache/airflow/issues/36839

aru-trackunit avatar Jan 17 '24 11:01 aru-trackunit

@Joffreybvn any idea?

bolkedebruin avatar Jan 22 '24 10:01 bolkedebruin

A namedtuple uses the field name (count(1) in this case) to create attributes, but attribute names can't contain special characters like parentheses. Two solutions:

  • Edit the field name to exclude those characters. You'd get something like count1 instead.
  • Go back to a plain tuple instead of a namedtuple.

I guess the second choice is too early; we switched from tuple to namedtuple after previous discussions. Thus I'll open a PR with the first choice.
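A minimal sketch of the first option (using a hypothetical `_sanitize` helper, not the actual patch) could strip invalid characters before building the namedtuple:

```python
import re
from collections import namedtuple

def _sanitize(name: str) -> str:
    """Hypothetical helper: strip characters that are invalid in identifiers."""
    sanitized = re.sub(r"\W", "", name)
    # Identifiers cannot be empty or start with a digit.
    if not sanitized or sanitized[0].isdigit():
        sanitized = "_" + sanitized
    return sanitized

fields = [_sanitize(name) for name in ["count(1)", "id"]]
Row = namedtuple("Row", fields)
print(Row._fields)  # → ('count1', 'id')
```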

Joffreybvn avatar Jan 22 '24 11:01 Joffreybvn

@Joffreybvn Just retested with Databricks provider 6.1.0 on airflow 2.8.1 and the issue still persists with different stacktrace:

[2024-01-29, 13:41:39 CET] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: task_1.count manual__2024-01-29T13:41:35+01:00 [queued]>
[2024-01-29, 13:41:39 CET] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: task_1.count manual__2024-01-29T13:41:35+01:00 [queued]>
[2024-01-29, 13:41:39 CET] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-01-29, 13:41:39 CET] {taskinstance.py:2191} INFO - Executing <Task(DatabricksSqlOperator): count> on 2024-01-29 12:41:35+00:00
[2024-01-29, 13:41:39 CET] {standard_task_runner.py:60} INFO - Started process 118 to run task
[2024-01-29, 13:41:39 CET] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'task_1', 'count', 'manual__2024-01-29T13:41:35+01:00', '--job-id', '5', '--raw', '--subdir', 'DAGS_FOLDER/dag-wn-equipment.py', '--cfg-path', '/tmp/tmpp4t8f52h']
[2024-01-29, 13:41:39 CET] {standard_task_runner.py:88} INFO - Job 5: Subtask count
[2024-01-29, 13:41:39 CET] {task_command.py:423} INFO - Running <TaskInstance: task_1.count manual__2024-01-29T13:41:35+01:00 [running]> on host 33e1fb1e4ed5
[2024-01-29, 13:41:39 CET] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='team_analytics' AIRFLOW_CTX_DAG_ID='task_1' AIRFLOW_CTX_TASK_ID='count' AIRFLOW_CTX_EXECUTION_DATE='2024-01-29T12:41:35+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-01-29T13:41:35+01:00'
[2024-01-29, 13:41:39 CET] {sql.py:276} INFO - Executing: SELECT count(*) FROM catalog.schema.test_table;
[2024-01-29, 13:41:39 CET] {base.py:83} INFO - Using connection ID 'tu-databricks-sp' for task execution.
[2024-01-29, 13:41:39 CET] {databricks_base.py:514} INFO - Using Service Principal Token.
[2024-01-29, 13:41:39 CET] {databricks_base.py:223} INFO - Existing Service Principal token is expired, or going to expire soon. Refreshing...
[2024-01-29, 13:41:39 CET] {databricks_base.py:514} INFO - Using Service Principal Token.
[2024-01-29, 13:41:40 CET] {client.py:200} INFO - Successfully opened session 01eebea3-bfcf-14ed-8b50-a14cc9a61a35
[2024-01-29, 13:41:40 CET] {sql.py:450} INFO - Running statement: SELECT count(*) FROM catalog.schema.test_table, parameters: None
[2024-01-29, 13:41:52 CET] {taskinstance.py:2698} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 282, in execute
    output = hook.run(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks_sql.py", line 254, in run
    raw_result = handler(cur)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/common/sql/hooks/sql.py", line 91, in fetch_all_handler
    return cursor.fetchall()
  File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/client.py", line 670, in fetchall
    return self.active_result_set.fetchall()
  File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/client.py", line 944, in fetchall
    return self._convert_arrow_table(self.fetchall_arrow())
  File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/client.py", line 884, in _convert_arrow_table
    res = df.to_numpy(na_value=None)
  File "/home/airflow/.local/lib/python3.10/site-packages/pandas/core/frame.py", line 1981, in to_numpy
    result = self._mgr.as_array(dtype=dtype, copy=copy, na_value=na_value)
  File "/home/airflow/.local/lib/python3.10/site-packages/pandas/core/internals/managers.py", line 1702, in as_array
    arr[isna(arr)] = na_value
TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'
[2024-01-29, 13:41:52 CET] {taskinstance.py:1138} INFO - Marking task as FAILED. dag_id=task_1, task_id=count, execution_date=20240129T124135, start_date=20240129T124139, end_date=20240129T124152
[2024-01-29, 13:41:52 CET] {standard_task_runner.py:107} ERROR - Failed to execute job 5 for task count (int() argument must be a string, a bytes-like object or a real number, not 'NoneType'; 118)
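The failing pandas line is `arr[isna(arr)] = na_value` with `na_value=None`; assigning `None` into an integer ndarray reproduces the same TypeError in isolation, which is why the installed pandas version matters here:

```python
import numpy as np

# pandas' as_array() does `arr[isna(arr)] = na_value`; if arr still has an
# integer dtype when na_value is None, NumPy tries int(None) and fails.
arr = np.array([9714, 0], dtype="int64")
mask = np.array([False, True])
try:
    arr[mask] = None
except TypeError as exc:
    print(exc)  # int() argument must be a string, a bytes-like object or a real number, not 'NoneType'
```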

aru-trackunit avatar Jan 29 '24 12:01 aru-trackunit

I cannot reproduce this error. Could you give me more info about your data? Maybe a screenshot (like below) of what you have in the Databricks UI?

[screenshot: MicrosoftTeams-image]

Joffreybvn avatar Feb 04 '24 19:02 Joffreybvn

Using pandas-2.1.4 solved the issue for me (Airflow 2.8.1)

amoralca16 avatar Feb 05 '24 10:02 amoralca16

We have the same issue on Airflow 2.7.1 and 2.8.1, both on provider 6.1.0 and 6.2.0. Upgrading pandas to 2.1.4 did not solve the issue for us; downgrading the provider to 5.0.1, however, did solve it. In 5.0.1, the result of a merge statement looks like this, which fails to render (same DAG) on provider version 6.x:

[screenshot: merge statement result]

In the Databricks UI, the results look similar:

[screenshot: Databricks UI results]

w0ut0 avatar Feb 21 '24 08:02 w0ut0

Retested with the newly released versions:

  • Airflow 2.8.2
  • Databricks provider 6.2.0
  • pandas 2.2.1

The issue still persists.

aru-trackunit avatar Feb 26 '24 11:02 aru-trackunit

@aru-trackunit - seems that @Joffreybvn cannot reproduce the issue - can you please provide a bit more information as he asked in https://github.com/apache/airflow/issues/36838#issuecomment-1925889496 ?

potiuk avatar Mar 09 '24 16:03 potiuk

I discussed it with him on Airflow's Slack, but it's a good idea to have visibility over it here as well. I have attached a DatabricksSqlOperator that is ready to test against. The SQL returns one row, as shown in the attached screenshot:

[screenshot: Screenshot 2024-03-11 at 16 35 57]

test = DatabricksSqlOperator(
    task_id="count_query",
    databricks_conn_id="databricks-sp",
    sql_endpoint_name="endpoint_name",
    sql="SELECT count(*) FROM catalog.schema.table_test;",
)

aru-trackunit avatar Mar 11 '24 15:03 aru-trackunit

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] avatar Mar 26 '24 00:03 github-actions[bot]

I think it's related to this issue https://github.com/databricks/databricks-sql-python/issues/326

aru-trackunit avatar Mar 26 '24 08:03 aru-trackunit

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] avatar Apr 12 '24 00:04 github-actions[bot]

This issue has been closed because it has not received response from the issue author.

github-actions[bot] avatar Apr 19 '24 00:04 github-actions[bot]