SQLSensor does not time out
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.8.1
What happened?
Some of the SQL sensors occasionally do not respect the timeout interval, and the whole DAG keeps running for multiple days. The sensors we use have a waiting timeout of 12 hours and usually start after midnight or in the early morning. Normally all sensors finish by the afternoon, either skipping (soft_fail=True) or succeeding. However, from time to time a sensor has one or more retries and runs for more than 24 or 36 hours. In the latter case I thought the timeout might be counted from the last retry attempt, but no, because the log says it started to poke 17 hours earlier.
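For context, a minimal sketch of how such a sensor is typically configured; the actual DAG code is not part of this report, so the schedule, poke interval, and retry settings below are assumptions inferred from the logs (the custom readiness check visible in the logs, presumably a `success` callable, is omitted):

```python
# Illustrative sketch only - not the reporter's actual DAG. It assumes the
# common.sql provider's SqlSensor, mode="reschedule", and a Snowflake
# connection named "SF"; schedule, poke interval, and retries are guesses.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.common.sql.sensors.sql import SqlSensor

with DAG(
    dag_id="dbt_inputs_BASE",
    start_date=datetime(2024, 1, 1),
    schedule="0 22 * * *",  # assumed; the runs have a 22:00 UTC logical date
    catchup=False,
):
    SqlSensor(
        task_id="waiting-sensor-BASE.IMPORT_TBL",
        conn_id="SF",
        sql="select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL",
        mode="reschedule",                 # assumed; the logs show UP_FOR_RESCHEDULE
        poke_interval=5 * 60,              # assumed poke interval
        timeout=12 * 60 * 60,              # the 12-hour waiting timeout described above
        soft_fail=True,                    # skip instead of fail when the timeout is hit
        retries=2,                         # the logs show "attempt 3 of 3"
        retry_delay=timedelta(minutes=5),  # assumed
    )
```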
What you think should happen instead?
The sensor should quit (time out) and skip reliably.
How to reproduce
I don't have a way to reproduce it; it happens roughly twice per month, with about 500 sensors running every day.
Operating System
mwaa
Versions of Apache Airflow Providers
No response
Deployment
Amazon (AWS) MWAA
Deployment details
No response
Anything else?
This is the log of a sensor that started on 23.4.2024 and should have timed out after 12 hours, but kept running until it was manually marked as failed on 24.4.2024.
# 1st attempt log
Cannot be found in the CloudWatch logs.
# 2nd attempt log
[2024-04-23, 00:11:33 CEST] {{taskinstance.py:1946}} INFO - Dependencies not met for <TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [up_for_retry]>, dependency 'Not In Retry Period' FAILED: Task is not ready for retry yet but will be retried automatically. Current date is 2024-04-22T22:11:33.317570+00:00 and task will be retried at 2024-04-22T22:15:30.875998+00:00.
[2024-04-23, 00:11:33 CEST] {{local_task_job_runner.py:160}} INFO - Task is not able to be run
[2024-04-23, 00:15:51 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-23, 00:15:51 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-23, 00:15:51 CEST] {{taskinstance.py:2170}} INFO - Starting attempt 2 of 3
[2024-04-23, 00:15:51 CEST] {{taskinstance.py:2191}} INFO - Executing <Task(SqlSensor): waiting-sensor-BASE.IMPORT_TBL> on 2024-04-21 22:00:00+00:00
[2024-04-23, 00:15:51 CEST] {{standard_task_runner.py:60}} INFO - Started process 22332 to run task
[2024-04-23, 00:15:51 CEST] {{standard_task_runner.py:87}} INFO - Running: ['airflow', 'tasks', 'run', 'dbt_inputs_BASE', 'waiting-sensor-BASE.IMPORT_TBL', 'scheduled__2024-04-21T22:00:00+00:00', '--job-id', '4288708', '--raw', '--subdir', 'DAGS_FOLDER/orgcode/custom_dags/dag_sensors_per_schema.py', '--cfg-path', '/tmp/tmpgk9mn9v9']
[2024-04-23, 00:15:51 CEST] {{standard_task_runner.py:88}} INFO - Job 4288708: Subtask waiting-sensor-BASE.IMPORT_TBL
[2024-04-23, 00:15:52 CEST] {{task_command.py:423}} INFO - Running <TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [running]> on host ip-10-24-192-91.eu-west-1.compute.internal
[2024-04-23, 00:15:52 CEST] {{taskinstance.py:2480}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='dbt_inputs_BASE' AIRFLOW_CTX_TASK_ID='waiting-sensor-BASE.IMPORT_TBL' AIRFLOW_CTX_EXECUTION_DATE='2024-04-21T22:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-21T22:00:00+00:00'
[2024-04-23, 00:15:53 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-23, 00:15:54 CEST] {{sql.py:95}} INFO - Poking: ["select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL"] (with parameters None)
[2024-04-23, 00:15:55 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-23, 00:15:58 CEST] {{cursor.py:1028}} INFO - query: [ALTER SESSION SET autocommit=False]
[2024-04-23, 00:15:58 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 00:15:58 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 00:15:58 CEST] {{sql.py:450}} INFO - Running statement: select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL, parameters: None
[2024-04-23, 00:15:58 CEST] {{cursor.py:1028}} INFO - query: [select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'...]
[2024-04-23, 00:15:58 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 00:15:58 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 00:15:58 CEST] {{sql.py:459}} INFO - Rows affected: 1
[2024-04-23, 00:15:58 CEST] {{snowflake.py:400}} INFO - Rows affected: 1
[2024-04-23, 00:15:58 CEST] {{snowflake.py:401}} INFO - Snowflake query id: 01b3d917-0103-a838-0000-1e65c92bb7b6
[2024-04-23, 00:15:58 CEST] {{cursor.py:1028}} INFO - query: [COMMIT]
[2024-04-23, 00:15:58 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 00:15:58 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 00:15:58 CEST] {{connection.py:718}} INFO - closed
[2024-04-23, 00:15:58 CEST] {{connection.py:724}} INFO - No async queries seem to be running, deleting session
[2024-04-23, 00:15:58 CEST] {{logging_mixin.py:188}} INFO - Readiness: {} record: (1713766201148000000, 668705)
[2024-04-23, 00:15:58 CEST] {{logging_mixin.py:188}} INFO - 1713766201148000000 has to be today
[2024-04-23, 00:15:58 CEST] {{utils.py:39}} INFO - Now: 2024-04-23 00:15:58.613292+02:00, checked ts: 2024-04-22 08:10:01.148000+02:00 - both in Europe/Oslo tz. Diff: 16 hours
[2024-04-23, 00:15:58 CEST] {{logging_mixin.py:188}} INFO - Records check: True 668705
[2024-04-23, 00:15:58 CEST] {{taskinstance.py:2658}} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-04-23, 00:15:59 CEST] {{local_task_job_runner.py:234}} INFO - Task exited with return code 0
[2024-04-23, 00:15:59 CEST] {{taskinstance.py:3280}} INFO - 0 downstream tasks scheduled from follow-on schedule check
... and the same poke/reschedule cycle continues until 13:04
[2024-04-23, 13:04:34 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-23, 13:04:34 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-23, 13:04:34 CEST] {{taskinstance.py:2170}} INFO - Starting attempt 2 of 3
[2024-04-23, 13:04:34 CEST] {{taskinstance.py:2191}} INFO - Executing <Task(SqlSensor): waiting-sensor-BASE.IMPORT_TBL> on 2024-04-21 22:00:00+00:00
[2024-04-23, 13:04:34 CEST] {{standard_task_runner.py:60}} INFO - Started process 3851 to run task
[2024-04-23, 13:04:34 CEST] {{standard_task_runner.py:87}} INFO - Running: ['airflow', 'tasks', 'run', 'dbt_inputs_BASE', 'waiting-sensor-BASE.IMPORT_TBL', 'scheduled__2024-04-21T22:00:00+00:00', '--job-id', '4385255', '--raw', '--subdir', 'DAGS_FOLDER/orgcode/custom_dags/dag_sensors_per_schema.py', '--cfg-path', '/tmp/tmpwsumf49p']
[2024-04-23, 13:04:34 CEST] {{standard_task_runner.py:88}} INFO - Job 4385255: Subtask waiting-sensor-BASE.IMPORT_TBL
[2024-04-23, 13:04:34 CEST] {{task_command.py:423}} INFO - Running <TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [running]> on host ip-10-24-192-80.eu-west-1.compute.internal
[2024-04-23, 13:04:35 CEST] {{taskinstance.py:2480}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='dbt_inputs_BASE' AIRFLOW_CTX_TASK_ID='waiting-sensor-BASE.IMPORT_TBL' AIRFLOW_CTX_EXECUTION_DATE='2024-04-21T22:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-21T22:00:00+00:00'
[2024-04-23, 13:04:35 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-23, 13:04:35 CEST] {{sql.py:95}} INFO - Poking: ["select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL"] (with parameters None)
[2024-04-23, 13:04:35 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-23, 13:04:35 CEST] {{cursor.py:1028}} INFO - query: [ALTER SESSION SET autocommit=False]
[2024-04-23, 13:04:35 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 13:04:35 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 13:04:35 CEST] {{sql.py:450}} INFO - Running statement: select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL, parameters: None
[2024-04-23, 13:04:35 CEST] {{cursor.py:1028}} INFO - query: [select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'...]
[2024-04-23, 13:04:35 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 13:04:35 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 13:04:35 CEST] {{sql.py:459}} INFO - Rows affected: 1
[2024-04-23, 13:04:35 CEST] {{snowflake.py:400}} INFO - Rows affected: 1
[2024-04-23, 13:04:35 CEST] {{snowflake.py:401}} INFO - Snowflake query id: 01b3dc18-0103-a828-0000-1e65c96e9756
[2024-04-23, 13:04:35 CEST] {{cursor.py:1028}} INFO - query: [COMMIT]
[2024-04-23, 13:04:35 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 13:04:35 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 13:04:35 CEST] {{connection.py:718}} INFO - closed
[2024-04-23, 13:04:35 CEST] {{connection.py:724}} INFO - No async queries seem to be running, deleting session
[2024-04-23, 13:04:35 CEST] {{logging_mixin.py:188}} INFO - Readiness: {} record: (1713766201148000000, 668705)
[2024-04-23, 13:04:35 CEST] {{logging_mixin.py:188}} INFO - 1713766201148000000 has to be today
[2024-04-23, 13:04:35 CEST] {{utils.py:39}} INFO - Now: 2024-04-23 13:04:35.788770+02:00, checked ts: 2024-04-22 08:10:01.148000+02:00 - both in Europe/Oslo tz. Diff: 28 hours
[2024-04-23, 13:04:35 CEST] {{logging_mixin.py:188}} INFO - Records check: True 668705
[2024-04-23, 13:04:35 CEST] {{taskinstance.py:2658}} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-04-23, 13:04:35 CEST] {{local_task_job_runner.py:234}} INFO - Task exited with return code 0
[2024-04-23, 13:04:36 CEST] {{taskinstance.py:3280}} INFO - 0 downstream tasks scheduled from follow-on schedule check
# 3rd attempt log
[2024-04-23, 13:21:44 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-23, 13:21:44 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-23, 13:21:44 CEST] {{taskinstance.py:2170}} INFO - Starting attempt 3 of 3
[2024-04-23, 13:21:44 CEST] {{taskinstance.py:2191}} INFO - Executing <Task(SqlSensor): waiting-sensor-BASE.IMPORT_TBL> on 2024-04-21 22:00:00+00:00
[2024-04-23, 13:21:44 CEST] {{standard_task_runner.py:60}} INFO - Started process 27489 to run task
[2024-04-23, 13:21:44 CEST] {{standard_task_runner.py:87}} INFO - Running: ['airflow', 'tasks', 'run', 'dbt_inputs_BASE', 'waiting-sensor-BASE.IMPORT_TBL', 'scheduled__2024-04-21T22:00:00+00:00', '--job-id', '4385455', '--raw', '--subdir', 'DAGS_FOLDER/orgcode/custom_dags/dag_sensors_per_schema.py', '--cfg-path', '/tmp/tmpul8_47ot']
[2024-04-23, 13:21:44 CEST] {{standard_task_runner.py:88}} INFO - Job 4385455: Subtask waiting-sensor-BASE.IMPORT_TBL
[2024-04-23, 13:21:44 CEST] {{task_command.py:423}} INFO - Running <TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [running]> on host ip-10-24-192-91.eu-west-1.compute.internal
[2024-04-23, 13:21:44 CEST] {{taskinstance.py:2480}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='dbt_inputs_BASE' AIRFLOW_CTX_TASK_ID='waiting-sensor-BASE.IMPORT_TBL' AIRFLOW_CTX_EXECUTION_DATE='2024-04-21T22:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='3' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-21T22:00:00+00:00'
[2024-04-23, 13:21:44 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-23, 13:21:44 CEST] {{sql.py:95}} INFO - Poking: ["select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL"] (with parameters None)
[2024-04-23, 13:21:44 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-23, 13:21:45 CEST] {{cursor.py:1028}} INFO - query: [ALTER SESSION SET autocommit=False]
[2024-04-23, 13:21:45 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 13:21:45 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 13:21:45 CEST] {{sql.py:450}} INFO - Running statement: select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL, parameters: None
[2024-04-23, 13:21:45 CEST] {{cursor.py:1028}} INFO - query: [select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'...]
[2024-04-23, 13:21:45 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 13:21:45 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 13:21:45 CEST] {{sql.py:459}} INFO - Rows affected: 1
[2024-04-23, 13:21:45 CEST] {{snowflake.py:400}} INFO - Rows affected: 1
[2024-04-23, 13:21:45 CEST] {{snowflake.py:401}} INFO - Snowflake query id: 01b3dc29-0103-a828-0000-1e65c96f367a
[2024-04-23, 13:21:45 CEST] {{cursor.py:1028}} INFO - query: [COMMIT]
[2024-04-23, 13:21:45 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-23, 13:21:45 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-23, 13:21:45 CEST] {{connection.py:718}} INFO - closed
[2024-04-23, 13:21:45 CEST] {{connection.py:724}} INFO - No async queries seem to be running, deleting session
[2024-04-23, 13:21:45 CEST] {{logging_mixin.py:188}} INFO - Readiness: {} record: (1713766201148000000, 668705)
[2024-04-23, 13:21:45 CEST] {{logging_mixin.py:188}} INFO - 1713766201148000000 has to be today
[2024-04-23, 13:21:45 CEST] {{utils.py:39}} INFO - Now: 2024-04-23 13:21:45.779916+02:00, checked ts: 2024-04-22 08:10:01.148000+02:00 - both in Europe tz. Diff: 29 hours
[2024-04-23, 13:21:45 CEST] {{logging_mixin.py:188}} INFO - Records check: True 668705
[2024-04-23, 13:21:45 CEST] {{taskinstance.py:2658}} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
.... the same logs repeat for more than 12 hours ....
[2024-04-24, 08:32:44 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-24, 08:32:44 CEST] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [queued]>
[2024-04-24, 08:32:44 CEST] {{taskinstance.py:2170}} INFO - Starting attempt 3 of 3
[2024-04-24, 08:32:45 CEST] {{taskinstance.py:2191}} INFO - Executing <Task(SqlSensor): waiting-sensor-BASE.IMPORT_TBL> on 2024-04-21 22:00:00+00:00
[2024-04-24, 08:32:45 CEST] {{standard_task_runner.py:60}} INFO - Started process 21816 to run task
[2024-04-24, 08:32:45 CEST] {{standard_task_runner.py:87}} INFO - Running: ['airflow', 'tasks', 'run', 'dbt_inputs_BASE', 'waiting-sensor-BASE.IMPORT_TBL', 'scheduled__2024-04-21T22:00:00+00:00', '--job-id', '4465363', '--raw', '--subdir', 'DAGS_FOLDER/orgcode/custom_dags/dag_sensors_per_schema.py', '--cfg-path', '/tmp/tmpunyvl5kv']
[2024-04-24, 08:32:45 CEST] {{standard_task_runner.py:88}} INFO - Job 4465363: Subtask waiting-sensor-BASE.IMPORT_TBL
[2024-04-24, 08:32:45 CEST] {{task_command.py:423}} INFO - Running <TaskInstance: dbt_inputs_BASE.waiting-sensor-BASE.IMPORT_TBL scheduled__2024-04-21T22:00:00+00:00 [running]> on host ip-10-24-192-7.eu-west-1.compute.internal
[2024-04-24, 08:32:45 CEST] {{taskinstance.py:2480}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='dbt_inputs_BASE' AIRFLOW_CTX_TASK_ID='waiting-sensor-BASE.IMPORT_TBL' AIRFLOW_CTX_EXECUTION_DATE='2024-04-21T22:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='3' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-21T22:00:00+00:00'
[2024-04-24, 08:32:45 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-24, 08:32:45 CEST] {{sql.py:95}} INFO - Poking: ["select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL"] (with parameters None)
[2024-04-24, 08:32:45 CEST] {{base.py:83}} INFO - Using connection ID 'SF' for task execution.
[2024-04-24, 08:32:46 CEST] {{cursor.py:1028}} INFO - query: [ALTER SESSION SET autocommit=False]
[2024-04-24, 08:32:46 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-24, 08:32:46 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-24, 08:32:46 CEST] {{sql.py:450}} INFO - Running statement: select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'), count(*) from BASE.IMPORT_TBL, parameters: None
[2024-04-24, 08:32:46 CEST] {{cursor.py:1028}} INFO - query: [select SYSTEM$LAST_CHANGE_COMMIT_TIME('BASE.IMPORT_TBL'...]
[2024-04-24, 08:32:46 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-24, 08:32:46 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-24, 08:32:46 CEST] {{sql.py:459}} INFO - Rows affected: 1
[2024-04-24, 08:32:46 CEST] {{snowflake.py:400}} INFO - Rows affected: 1
[2024-04-24, 08:32:46 CEST] {{snowflake.py:401}} INFO - Snowflake query id: 01b3e0a8-0103-a8ac-0000-1e65c9c140be
[2024-04-24, 08:32:46 CEST] {{cursor.py:1028}} INFO - query: [COMMIT]
[2024-04-24, 08:32:47 CEST] {{cursor.py:1041}} INFO - query execution done
[2024-04-24, 08:32:47 CEST] {{cursor.py:1205}} INFO - Number of results in first chunk: 1
[2024-04-24, 08:32:47 CEST] {{connection.py:718}} INFO - closed
[2024-04-24, 08:32:47 CEST] {{connection.py:724}} INFO - No async queries seem to be running, deleting session
[2024-04-24, 08:32:47 CEST] {{logging_mixin.py:188}} INFO - Readiness: {} record: (1713766201148000000, 668705)
[2024-04-24, 08:32:47 CEST] {{logging_mixin.py:188}} INFO - 1713766201148000000 has to be today
[2024-04-24, 08:32:47 CEST] {{utils.py:39}} INFO - Now: 2024-04-24 08:32:47.097400+02:00, checked ts: 2024-04-22 08:10:01.148000+02:00 - both in Europe tz. Diff: 48 hours
[2024-04-24, 08:32:47 CEST] {{logging_mixin.py:188}} INFO - Records check: True 668705
[2024-04-24, 08:32:47 CEST] {{taskinstance.py:2658}} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-04-24, 08:32:47 CEST] {{local_task_job_runner.py:234}} INFO - Task exited with return code 0
[2024-04-24, 08:32:47 CEST] {{taskinstance.py:3280}} INFO - 0 downstream tasks scheduled from follow-on schedule check
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
This won't solve the problem, but might help surface more logs and lead to finding the solution. You'll have a complete set of logs if you increase the celery visibility timeout (details here).
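For reference, on a deployment where the worker environment can be changed, the setting maps to Airflow's standard env-var configuration convention; a minimal sketch (the 43200-second value is only an example, not a recommendation):

```python
# Sketch only: must be set in the environment of the Celery workers before they
# start (e.g. via the deployment's startup script), not inside a DAG file.
import os

# Equivalent to `visibility_timeout` in the [celery_broker_transport_options]
# section of airflow.cfg; 43200 s = 12 h is an illustrative value.
os.environ["AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT"] = "43200"
```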
Unfortunately it looks like it is a locked down parameter :-(
Amazon MWAA reserves a set of critical environment variables. If you overwrite a reserved variable, Amazon MWAA restores it to its default. The following lists the reserved variables: AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT https://docs.aws.amazon.com/mwaa/latest/userguide/using-startup-script.html
If this is an MWAA-specific problem, then you need to contact MWAA support.
> Some of the SQL sensors occasionally do not respect the timeout interval, and the whole DAG keeps running for multiple days. The sensors we use have a waiting timeout of 12 hours and usually start after midnight or in the early morning. Normally all sensors finish by the afternoon, either skipping (soft_fail=True) or succeeding. However, from time to time a sensor has one or more retries and runs for more than 24 or 36 hours. In the latter case I thought the timeout might be counted from the last retry attempt, but no, because the log says it started to poke 17 hours earlier.
I assume you have `mode=reschedule`? If so, I assume you are using the `execution_timeout` parameter and missing the `timeout` parameter.
If that is the case, please read https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#timeouts
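To make the distinction concrete, a hedged sketch (illustrative values, not the reporter's DAG): in reschedule mode, `timeout` is a total budget counted from the first poke across all reschedules, while `execution_timeout` only bounds a single try, so it cannot stop a sensor that keeps being rescheduled.

```python
# Sketch of the two parameters side by side (illustrative values only).
from datetime import timedelta

from airflow.providers.common.sql.sensors.sql import SqlSensor

sensor = SqlSensor(
    task_id="example_sensor",
    conn_id="SF",
    sql="select 1",
    mode="reschedule",
    poke_interval=300,
    # Total waiting budget since the first poke; when exceeded, the sensor raises
    # AirflowSensorTimeout, or skips if soft_fail=True.
    timeout=12 * 60 * 60,
    soft_fail=True,
    # Limits a single try's wall-clock time only; each poke in reschedule mode is
    # a short-lived process, so this alone never caps the overall waiting time.
    execution_timeout=timedelta(minutes=30),
)
```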
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
This issue has been closed because it has not received a response from the issue author.