
SQLSensor does not time out

Open funes79 opened this issue 10 months ago • 4 comments

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.1

What happened?

Some of the SQL sensors occasionally do not respect the timeout interval, and the whole DAG ends up running for multiple days. The sensors we use have a waiting timeout of 12 hours and usually start after midnight or in the early morning. Normally all sensors finish by the afternoon, either skipping (soft_fail=True) or succeeding. However, from time to time a sensor goes through one or more retries and keeps running for more than 24 or 36 hours. In the last such case I thought the timeout might be counted from the last retry attempt, but no: the log says it started poking 17 hours earlier.
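
For reference, a minimal sketch of how such a sensor is presumably defined, reconstructed from the description and the logs below (the task_id, connection ID, query, mode, and retries come from the logs; the poke interval is an assumption, and the actual DAG also applies a custom readiness check that is not shown here):

from airflow.providers.common.sql.sensors.sql import SqlSensor

wait_for_import_tbl = SqlSensor(
    task_id="waiting-sensor-BASE.IMPORT_TBL",
    conn_id="SF",  # Snowflake connection ID seen in the logs
    sql="select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL",
    mode="reschedule",      # the logs show UP_FOR_RESCHEDULE between pokes
    poke_interval=10 * 60,  # assumed; the real interval is not visible in the logs
    timeout=12 * 60 * 60,   # the 12-hour waiting timeout described above
    soft_fail=True,         # skip instead of fail when the timeout is reached
    retries=2,              # "Starting attempt 3 of 3" in the logs
)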

What you think should happen instead?

The sensor should quit (time out) and skip reliably.

How to reproduce

I don't have a way to reproduce it; it happens roughly twice per month with 500 sensors running every day.

Operating System

mwaa

Versions of Apache Airflow Providers

No response

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

This is the log of a sensor that started on 23.4.2024 and should have timed out after 12 hours, but kept running until it was manually marked as failed on 24.4.2024.


# 1st attempt log
could not be found in CloudWatch logs
# 2nd attempt log
[2024-04-23, 00:11:33 CEST] {{taskinstance.py:1946}} INFO - Dependencies not met for <TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [up_for_retry]>, dependency 'Not In Retry Period' FAILED: Task is not ready for retry yet but will be retried automatically. Current date is 2024-04-22T22:11:33.317570+00:00 and task will be retried at 2024-04-22T22:15:30.875998+00:00.
[2024-04-23, 00:11:33 CEST] {{local_task_job_runner.py:160}} INFO - Task is not able to be run
[2024-04-23, 00:15:51 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-23, 00:15:51 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-23, 00:15:51 CEST] {{taskinstance.py:2170}} INFO - Starting attempt 2 of 3
[2024-04-23, 00:15:51 CEST] {{taskinstance.py:2191}} INFO - Executing <Task(SqlSensor): waiting-sensor-BASE.IMPORT_TBL> on 2024-04-21 22:00:00+00:00
[2024-04-23, 00:15:51 CEST] {{standard_task_runner.py:60}} INFO - Started process 22332 to run task
[2024-04-23, 00:15:51 CEST] {{standard_task_runner.py:87}} INFO - Running: ['airflow', 'tasks', 'run', 'dbt_inputs_BASE', 'waiting-sensor-BASE.IMPORT_TBL', 'scheduled__2024-04-21T22:00:00+00:00', '--job-id', '4288708', '--raw', '--subdir', 'DAGS_FOLDER/orgcode/custom_dags/dag_sensors_per_schema.py', '--cfg-path', '/tmp/tmpgk9mn9v9']
[2024-04-23, 00:15:51 CEST] {{standard_task_runner.py:88}} INFO - Job 4288708: Subtask waiting-sensor-BASE.IMPORT_TBL
[2024-04-23, 00:15:52 CEST] {{task_command.py:423}} INFO - Running <TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [running]> on host ip-10-24-192-91.eu-west-1.compute.internal
[2024-04-23, 00:15:52 CEST] {{taskinstance.py:2480}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='dbt_inputs_BASE' AIRFLOW_CTX_TASK_ID='waiting-sensor-BASE.IMPORT_TBL' AIRFLOW_CTX_EXECUTION_DATE='2024-04-21T22:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-21T22:00:00+00:00'
[2024-04-23, 00:15:53 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-23, 00:15:54 CEST] {{sql.py:95}} INFO - Poking: ["select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL"] (with parameters None)
[2024-04-23, 00:15:55 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-23, 00:15:58 CEST] {{cursor.py:1028}} INFO - query: [ALTER SESSION SET autocommit=False]
[2024-04-23, 00:15:58 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 00:15:58 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 00:15:58 CEST] {{sql.py:450}} INFO - Running statement: select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL, parameters: None
[2024-04-23, 00:15:58 CEST] {{cursor.py:1028}} INFO - query: [select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'...]
[2024-04-23, 00:15:58 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 00:15:58 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 00:15:58 CEST] {{sql.py:459}} INFO - Rows affected: 1
[2024-04-23, 00:15:58 CEST] {{snowflake.py:400}} INFO - Rows affected: 1
[2024-04-23, 00:15:58 CEST] {{snowflake.py:401}} INFO - Snowflake query id: 01b3d917-0103-a838-0000-1e65c92bb7b6
[2024-04-23, 00:15:58 CEST] {{cursor.py:1028}} INFO - query: [COMMIT]
[2024-04-23, 00:15:58 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 00:15:58 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 00:15:58 CEST] {{connection.py:718}} INFO - closed
[2024-04-23, 00:15:58 CEST] {{connection.py:724}} INFO - No async queries seem to be running, deleting session
[2024-04-23, 00:15:58 CEST] {{logging_mixin.py:188}} INFO - Readiness: {} record: (1713766201148000000, 668705)
[2024-04-23, 00:15:58 CEST] {{logging_mixin.py:188}} INFO - 1713766201148000000 has to be today
[2024-04-23, 00:15:58 CEST] {{utils.py:39}} INFO - Now: 2024-04-23 00:15:58.613292+02:00, checked ts: 2024-04-22 08:10:01.148000+02:00 - both in Europe/Oslo tz. Diff: 16 hours
[2024-04-23, 00:15:58 CEST] {{logging_mixin.py:188}} INFO - Records check: True 668705
[2024-04-23, 00:15:58 CEST] {{taskinstance.py:2658}} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-04-23, 00:15:59 CEST] {{local_task_job_runner.py:234}} INFO - Task exited with return code 0
[2024-04-23, 00:15:59 CEST] {{taskinstance.py:3280}} INFO - 0 downstream tasks scheduled from follow-on schedule check
... the same poking pattern repeats until 13:04
[2024-04-23, 13:04:34 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-23, 13:04:34 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-23, 13:04:34 CEST] {{taskinstance.py:2170}} INFO - Starting attempt 2 of 3
[2024-04-23, 13:04:34 CEST] {{taskinstance.py:2191}} INFO - Executing <Task(SqlSensor): waiting-sensor-BASE.IMPORT_TBL> on 2024-04-21 22:00:00+00:00
[2024-04-23, 13:04:34 CEST] {{standard_task_runner.py:60}} INFO - Started process 3851 to run task
[2024-04-23, 13:04:34 CEST] {{standard_task_runner.py:87}} INFO - Running: ['airflow', 'tasks', 'run', 'dbt_inputs_BASE', 'waiting-sensor-BASE.IMPORT_TBL', 'scheduled__2024-04-21T22:00:00+00:00', '--job-id', '4385255', '--raw', '--subdir', 'DAGS_FOLDER/orgcode/custom_dags/dag_sensors_per_schema.py', '--cfg-path', '/tmp/tmpwsumf49p']
[2024-04-23, 13:04:34 CEST] {{standard_task_runner.py:88}} INFO - Job 4385255: Subtask waiting-sensor-BASE.IMPORT_TBL
[2024-04-23, 13:04:34 CEST] {{task_command.py:423}} INFO - Running <TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [running]> on host ip-10-24-192-80.eu-west-1.compute.internal
[2024-04-23, 13:04:35 CEST] {{taskinstance.py:2480}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='dbt_inputs_BASE' AIRFLOW_CTX_TASK_ID='waiting-sensor-BASE.IMPORT_TBL' AIRFLOW_CTX_EXECUTION_DATE='2024-04-21T22:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-21T22:00:00+00:00'
[2024-04-23, 13:04:35 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-23, 13:04:35 CEST] {{sql.py:95}} INFO - Poking: ["select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL"] (with parameters None)
[2024-04-23, 13:04:35 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-23, 13:04:35 CEST] {{cursor.py:1028}} INFO - query: [ALTER SESSION SET autocommit=False]
[2024-04-23, 13:04:35 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 13:04:35 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 13:04:35 CEST] {{sql.py:450}} INFO - Running statement: select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL, parameters: None
[2024-04-23, 13:04:35 CEST] {{cursor.py:1028}} INFO - query: [select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'...]
[2024-04-23, 13:04:35 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 13:04:35 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 13:04:35 CEST] {{sql.py:459}} INFO - Rows affected: 1
[2024-04-23, 13:04:35 CEST] {{snowflake.py:400}} INFO - Rows affected: 1
[2024-04-23, 13:04:35 CEST] {{snowflake.py:401}} INFO - Snowflake query id: 01b3dc18-0103-a828-0000-1e65c96e9756
[2024-04-23, 13:04:35 CEST] {{cursor.py:1028}} INFO - query: [COMMIT]
[2024-04-23, 13:04:35 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 13:04:35 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 13:04:35 CEST] {{connection.py:718}} INFO - closed
[2024-04-23, 13:04:35 CEST] {{connection.py:724}} INFO - No async queries seem to be running, deleting session
[2024-04-23, 13:04:35 CEST] {{logging_mixin.py:188}} INFO - Readiness: {} record: (1713766201148000000, 668705)
[2024-04-23, 13:04:35 CEST] {{logging_mixin.py:188}} INFO - 1713766201148000000 has to be today
[2024-04-23, 13:04:35 CEST] {{utils.py:39}} INFO - Now: 2024-04-23 13:04:35.788770+02:00, checked ts: 2024-04-22 08:10:01.148000+02:00 - both in Europe/Oslo tz. Diff: 28 hours
[2024-04-23, 13:04:35 CEST] {{logging_mixin.py:188}} INFO - Records check: True 668705
[2024-04-23, 13:04:35 CEST] {{taskinstance.py:2658}} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-04-23, 13:04:35 CEST] {{local_task_job_runner.py:234}} INFO - Task exited with return code 0
[2024-04-23, 13:04:36 CEST] {{taskinstance.py:3280}} INFO - 0 downstream tasks scheduled from follow-on schedule check


# 3rd attempt log
[2024-04-23, 13:21:44 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-23, 13:21:44 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-23, 13:21:44 CEST] {{taskinstance.py:2170}} INFO - Starting attempt 3 of 3
[2024-04-23, 13:21:44 CEST] {{taskinstance.py:2191}} INFO - Executing <Task(SqlSensor): waiting-sensor-BASE.IMPORT_TBL> on 2024-04-21 22:00:00+00:00
[2024-04-23, 13:21:44 CEST] {{standard_task_runner.py:60}} INFO - Started process 27489 to run task
[2024-04-23, 13:21:44 CEST] {{standard_task_runner.py:87}} INFO - Running: ['airflow', 'tasks', 'run', 'dbt_inputs_BASE', 'waiting-sensor-BASE.IMPORT_TBL', 'scheduled__2024-04-21T22:00:00+00:00', '--job-id', '4385455', '--raw', '--subdir', 'DAGS_FOLDER/orgcode/custom_dags/dag_sensors_per_schema.py', '--cfg-path', '/tmp/tmpul8_47ot']
[2024-04-23, 13:21:44 CEST] {{standard_task_runner.py:88}} INFO - Job 4385455: Subtask waiting-sensor-BASE.IMPORT_TBL
[2024-04-23, 13:21:44 CEST] {{task_command.py:423}} INFO - Running <TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [running]> on host ip-10-24-192-91.eu-west-1.compute.internal
[2024-04-23, 13:21:44 CEST] {{taskinstance.py:2480}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='dbt_inputs_BASE' AIRFLOW_CTX_TASK_ID='waiting-sensor-BASE.IMPORT_TBL' AIRFLOW_CTX_EXECUTION_DATE='2024-04-21T22:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='3' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-21T22:00:00+00:00'
[2024-04-23, 13:21:44 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-23, 13:21:44 CEST] {{sql.py:95}} INFO - Poking: ["select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL"] (with parameters None)
[2024-04-23, 13:21:44 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-23, 13:21:45 CEST] {{cursor.py:1028}} INFO - query: [ALTER SESSION SET autocommit=False]
[2024-04-23, 13:21:45 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 13:21:45 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 13:21:45 CEST] {{sql.py:450}} INFO - Running statement: select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL, parameters: None
[2024-04-23, 13:21:45 CEST] {{cursor.py:1028}} INFO - query: [select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'...]
[2024-04-23, 13:21:45 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 13:21:45 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 13:21:45 CEST] {{sql.py:459}} INFO - Rows affected: 1
[2024-04-23, 13:21:45 CEST] {{snowflake.py:400}} INFO - Rows affected: 1
[2024-04-23, 13:21:45 CEST] {{snowflake.py:401}} INFO - Snowflake query id: 01b3dc29-0103-a828-0000-1e65c96f367a
[2024-04-23, 13:21:45 CEST] {{cursor.py:1028}} INFO - query: [COMMIT]
[2024-04-23, 13:21:45 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 13:21:45 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 13:21:45 CEST] {{connection.py:718}} INFO - closed
[2024-04-23, 13:21:45 CEST] {{connection.py:724}} INFO - No async queries seem to be running, deleting session
[2024-04-23, 13:21:45 CEST] {{logging_mixin.py:188}} INFO - Readiness: {} record: (1713766201148000000, 668705)
[2024-04-23, 13:21:45 CEST] {{logging_mixin.py:188}} INFO - 1713766201148000000 has to be today
[2024-04-23, 13:21:45 CEST] {{utils.py:39}} INFO - Now: 2024-04-23 13:21:45.779916+02:00, checked ts: 2024-04-22 08:10:01.148000+02:00 - both in Europe tz. Diff: 29 hours
[2024-04-23, 13:21:45 CEST] {{logging_mixin.py:188}} INFO - Records check: True 668705
[2024-04-23, 13:21:45 CEST] {{taskinstance.py:2658}} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
.... the same log pattern repeats for more than 12 hours ....
[2024-04-24, 08:32:44 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-24, 08:32:44 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-24, 08:32:44 CEST] {{taskinstance.py:2170}} INFO - Starting attempt 3 of 3
[2024-04-24, 08:32:45 CEST] {{taskinstance.py:2191}} INFO - Executing <Task(SqlSensor): waiting-sensor-BASE.IMPORT_TBL> on 2024-04-21 22:00:00+00:00
[2024-04-24, 08:32:45 CEST] {{standard_task_runner.py:60}} INFO - Started process 21816 to run task
[2024-04-24, 08:32:45 CEST] {{standard_task_runner.py:87}} INFO - Running: ['airflow', 'tasks', 'run', 'dbt_inputs_BASE', 'waiting-sensor-BASE.IMPORT_TBL', 'scheduled__2024-04-21T22:00:00+00:00', '--job-id', '4465363', '--raw', '--subdir', 'DAGS_FOLDER/orgcode/custom_dags/dag_sensors_per_schema.py', '--cfg-path', '/tmp/tmpunyvl5kv']
[2024-04-24, 08:32:45 CEST] {{standard_task_runner.py:88}} INFO - Job 4465363: Subtask waiting-sensor-BASE.IMPORT_TBL
[2024-04-24, 08:32:45 CEST] {{task_command.py:423}} INFO - Running <TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [running]> on host ip-10-24-192-7.eu-west-1.compute.internal
[2024-04-24, 08:32:45 CEST] {{taskinstance.py:2480}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='dbt_inputs_BASE' AIRFLOW_CTX_TASK_ID='waiting-sensor-BASE.IMPORT_TBL' AIRFLOW_CTX_EXECUTION_DATE='2024-04-21T22:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='3' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-21T22:00:00+00:00'
[2024-04-24, 08:32:45 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-24, 08:32:45 CEST] {{sql.py:95}} INFO - Poking: ["select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL"] (with parameters None)
[2024-04-24, 08:32:45 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-24, 08:32:46 CEST] {{cursor.py:1028}} INFO - query: [ALTER SESSION SET autocommit=False]
[2024-04-24, 08:32:46 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-24, 08:32:46 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-24, 08:32:46 CEST] {{sql.py:450}} INFO - Running statement: select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL, parameters: None
[2024-04-24, 08:32:46 CEST] {{cursor.py:1028}} INFO - query: [select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'...]
[2024-04-24, 08:32:46 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-24, 08:32:46 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-24, 08:32:46 CEST] {{sql.py:459}} INFO - Rows affected: 1
[2024-04-24, 08:32:46 CEST] {{snowflake.py:400}} INFO - Rows affected: 1
[2024-04-24, 08:32:46 CEST] {{snowflake.py:401}} INFO - Snowflake query id: 01b3e0a8-0103-a8ac-0000-1e65c9c140be
[2024-04-24, 08:32:46 CEST] {{cursor.py:1028}} INFO - query: [COMMIT]
[2024-04-24, 08:32:47 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-24, 08:32:47 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-24, 08:32:47 CEST] {{connection.py:718}} INFO - closed
[2024-04-24, 08:32:47 CEST] {{connection.py:724}} INFO - No async queries seem to be running, deleting session
[2024-04-24, 08:32:47 CEST] {{logging_mixin.py:188}} INFO - Readiness: {} record: (1713766201148000000, 668705)
[2024-04-24, 08:32:47 CEST] {{logging_mixin.py:188}} INFO - 1713766201148000000 has to be today
[2024-04-24, 08:32:47 CEST] {{utils.py:39}} INFO - Now: 2024-04-24 08:32:47.097400+02:00, checked ts: 2024-04-22 08:10:01.148000+02:00 - both in Europe tz. Diff: 48 hours
[2024-04-24, 08:32:47 CEST] {{logging_mixin.py:188}} INFO - Records check: True 668705
[2024-04-24, 08:32:47 CEST] {{taskinstance.py:2658}} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-04-24, 08:32:47 CEST] {{local_task_job_runner.py:234}} INFO - Task exited with return code 0
[2024-04-24, 08:32:47 CEST] {{taskinstance.py:3280}} INFO - 0 downstream tasks scheduled from follow-on schedule check
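
For context, the readiness check printed in these logs ("has to be today", "Diff: NN hours") appears to compare SYSTEM$LAST_CHANGE_COMMIT_TIME (nanoseconds since the epoch) against the current day, so the sensor keeps rescheduling while the table has not changed today. A hypothetical reconstruction of that check, not the author's actual code:

from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def table_changed_today(last_change_ns: int, row_count: int) -> bool:
    # SYSTEM$LAST_CHANGE_COMMIT_TIME returns nanoseconds since the epoch;
    # the check seems to succeed only once the table changed today and has rows.
    tz = ZoneInfo("Europe/Oslo")
    checked_ts = datetime.fromtimestamp(last_change_ns / 1e9, tz=timezone.utc).astimezone(tz)
    now = datetime.now(tz)
    diff_hours = int((now - checked_ts).total_seconds() // 3600)
    print(f"Now: {now}, checked ts: {checked_ts} - both in Europe/Oslo tz. Diff: {diff_hours} hours")
    return checked_ts.date() == now.date() and row_count > 0

# e.g. table_changed_today(1713766201148000000, 668705) is False on 2024-04-23,
# which matches why the sensor keeps rescheduling in the logs above.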

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

funes79 avatar Apr 24 '24 07:04 funes79

This won't solve the problem, but might help surface more logs and lead to finding the solution. You'll have a complete set of logs if you increase the celery visibility timeout (details here).
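
For reference, the setting being suggested is the Celery broker transport option, configurable in airflow.cfg or via the corresponding environment variable; the value below is only an example and should exceed the longest expected sensor wait:

[celery_broker_transport_options]
visibility_timeout = 86400  # seconds

# or, equivalently, as an environment variable:
# AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT=86400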

RNHTTR avatar Apr 25 '24 15:04 RNHTTR

Unfortunately it looks like it is a locked down parameter :-(

Amazon MWAA reserves a set of critical environment variables. If you overwrite a reserved variable, Amazon MWAA restores it to its default. The following lists the reserved variables: AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT https://docs.aws.amazon.com/mwaa/latest/userguide/using-startup-script.html

funes79 avatar Apr 26 '24 05:04 funes79

Unfortunately it looks like it is a locked down parameter :-(

Amazon MWAA reserves a set of critical environment variables. If you overwrite a reserved variable, Amazon MWAA restores it to its default. The following lists the reserved variables: AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT https://docs.aws.amazon.com/mwaa/latest/userguide/using-startup-script.html

If this is an MWAA-specific problem, then you need to contact MWAA support.

eladkal avatar May 20 '24 11:05 eladkal

Some of the SQL sensors occasionally do not respect the timeout interval, and the whole DAG ends up running for multiple days. The sensors we use have a waiting timeout of 12 hours and usually start after midnight or in the early morning. Normally all sensors finish by the afternoon, either skipping (soft_fail=True) or succeeding. However, from time to time a sensor goes through one or more retries and keeps running for more than 24 or 36 hours. In the last such case I thought the timeout might be counted from the last retry attempt, but no: the log says it started poking 17 hours earlier.

I assume you have mode=reschedule? If so, I assume you are using the execution_timeout parameter and missing the timeout parameter.

If so, please read https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#timeouts
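
In other words, for a reschedule-mode sensor the total waiting budget comes from the sensor's own timeout parameter, while execution_timeout only bounds an individual run. A sketch with example values (see the linked docs for the exact semantics):

from datetime import timedelta
from airflow.providers.common.sql.sensors.sql import SqlSensor

sensor = SqlSensor(
    task_id="waiting-sensor",
    conn_id="SF",
    sql="select 1",
    mode="reschedule",
    timeout=12 * 60 * 60,                     # total time the sensor may wait, across reschedules
    execution_timeout=timedelta(minutes=30),  # bounds a single run, not the overall wait
    soft_fail=True,                           # skip rather than fail once the timeout is hit
)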

eladkal avatar May 20 '24 11:05 eladkal

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] avatar Jun 04 '24 00:06 github-actions[bot]

This issue has been closed because it has not received response from the issue author.

github-actions[bot] avatar Jun 11 '24 00:06 github-actions[bot]