airflow-maintenance-dags
DB cleanup fails after upgrade to Python 3.9 and Airflow 2.2.2
I have recently upgraded to Python 3.9 and Airflow 2.2.2. The following error has been occurring repeatedly since the upgrade. I've only changed the parameters indicated in the repo README and am running everything else the same. Other library versions: SQLAlchemy 1.4.1, Flask-SQLAlchemy 2.4.3. Can you please check the problem here?
Job 108936: Subtask cleanup_TaskInstance
Running <TaskInstance: admin_airflow_db_cleanup.cleanup_TaskInstance manual__2021-12-01T09:58:53.886445+00:00 [running]> on host 2e7d9eccb27e
Exporting the following env vars:
[email protected],[email protected]
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=admin_airflow_db_cleanup
AIRFLOW_CTX_TASK_ID=cleanup_TaskInstance
AIRFLOW_CTX_EXECUTION_DATE=2021-12-01T09:58:53.886445+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-01T09:58:53.886445+00:00
Retrieving max_execution_date from XCom
Configurations:
max_date: 2021-11-01 09:58:59.085046+00:00
enable_delete: True
session: <sqlalchemy.orm.session.Session object at 0x7f607aed7c40>
airflow_db_model: <class 'airflow.models.taskinstance.TaskInstance'>
state: None
age_check_column: ColumnAssociationProxyInstance(AssociationProxy('dag_run', 'execution_date'))
keep_last: False
keep_last_filters: None
keep_last_group_by: None
Running Cleanup Process...
Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
result = execute_callable(context=context)
File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 151, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 162, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/vimcar-bi/airflow_dags/admin_airflow_db_cleanup.py", line 291, in cleanup_function
query = session.query(airflow_db_model).options(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 1619, in options
return self._options(False, *args)
File "<string>", line 2, in _options
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/base.py", line 227, in generate
fn(self, *args[1:], **kw)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 1638, in _options
opt.process_query(self)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 176, in process_query
self._process(query, True)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 677, in _process
val._bind_loader(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 829, in _bind_loader
raise sa_exc.ArgumentError(
sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes
Marking task as UP_FOR_RETRY. dag_id=admin_airflow_db_cleanup, task_id=cleanup_TaskInstance, execution_date=20211201T095853, start_date=20211201T170532, end_date=20211201T170532
Failed to execute job 108936 for task cleanup_TaskInstance
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
args.func(args, dag=self.dag)
File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
_run_raw_task(args, ti)
File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
ti._run_raw_task(
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
result = execute_callable(context=context)
File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 151, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 162, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/vimcar-bi/airflow_dags/admin_airflow_db_cleanup.py", line 291, in cleanup_function
query = session.query(airflow_db_model).options(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 1619, in options
return self._options(False, *args)
File "<string>", line 2, in _options
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/base.py", line 227, in generate
fn(self, *args[1:], **kw)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 1638, in _options
opt.process_query(self)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 176, in process_query
self._process(query, True)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 677, in _process
val._bind_loader(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 829, in _bind_loader
raise sa_exc.ArgumentError(
sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes
Task exited with return code 1
0 downstream tasks scheduled from follow-on schedule check
This seems to be a duplicate of #117.
I think the issue is very similar, but I'm using Airflow 2.3.2 from Docker with Python 3.7. I'm getting the same error only for the following tasks: cleanup_TaskInstance, cleanup_BaseXCom, cleanup_TaskReschedule, cleanup_RenderedTaskInstanceFields. The other tasks finish successfully.
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:265} INFO - Retrieving max_execution_date from XCom
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:278} INFO - Configurations:
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:279} INFO - max_date: 2022-05-21 17:18:41.939540+00:00
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:280} INFO - enable_delete: True
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:281} INFO - session: <sqlalchemy.orm.session.Session object at 0x7f7102e0c750>
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:282} INFO - airflow_db_model: <class 'airflow.models.taskinstance.TaskInstance'>
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:283} INFO - state: None
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:284} INFO - age_check_column: ColumnAssociationProxyInstance(AssociationProxy('dag_run', 'execution_date'))
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:285} INFO - keep_last: False
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:286} INFO - keep_last_filters: None
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:287} INFO - keep_last_group_by: None
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:289} INFO -
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:291} INFO - Running Cleanup Process...
[2022-06-20, 17:19:52 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 171, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 189, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/airflow-db-cleanup.py", line 298, in cleanup_function
logging.info("INITIAL QUERY : " + str(query))
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 2848, in __str__
return str(statement.compile(bind))
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 506, in compile
return self._compiler(dialect, **kw)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 570, in _compiler
return dialect.statement_compiler(dialect, self, **kw)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 766, in __init__
Compiled.__init__(self, dialect, statement, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 455, in __init__
self.string = self.process(self.statement, **compile_kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 490, in process
return obj._compiler_dispatch(self, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 81, in _compiler_dispatch
return meth(self, **kw)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 2981, in visit_select
select_stmt, self, **kwargs
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/base.py", line 501, in create_for_statement
return klass.create_for_statement(statement, compiler, **kw)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/context.py", line 579, in create_for_statement
opt.process_compile_state(self)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/strategy_options.py", line 185, in process_compile_state
self._process(compile_state, not bool(compile_state.current_path))
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/strategy_options.py", line 718, in _process
raiseerr,
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/strategy_options.py", line 870, in _bind_loader
"mapper option expects " "string key or list of attributes"
sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes
[2022-06-20, 17:19:52 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=airflow-db-cleanup, task_id=cleanup_TaskInstance, execution_date=20220619T000000, start_date=20220620T171951, end_date=20220620T171952
[2022-06-20, 17:19:52 UTC] {logging_mixin.py:115} WARNING - /home/airflow/.local/lib/python3.7/site-packages/airflow/models/param.py:62 DeprecationWarning: The use of non-json-serializable params is deprecated and will be removed in a future release
Tasks cleanup_TaskInstance and cleanup_TaskReschedule fail with Python 3.8.12 and Airflow 2.2.2. Same error: sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes
This seems to be related to missing columns in the db tables to be cleaned for those two tasks.
I fixed it by replacing execution_date with end_date in age_check_column for both cleanup_TaskInstance and cleanup_TaskReschedule. Let me know whether there is a more appropriate field that should be used.
I also suggest adding the following to cleanup_function, to check that the age_check_column configured for a given model is actually a column of the table to be cleaned:
# Extract db table columns' names (str() of a Column looks like "table.column")
table_columns = [str(colname).split(".")[1] for colname in airflow_db_model.__table__.columns]
# str() of a mapped attribute looks like "Model.column"
age_check_col_name = str(age_check_column).split(".")[1]
if age_check_col_name not in table_columns:
    raise ValueError(f"{age_check_col_name} field not in table {airflow_db_model.__table__.name}")
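The same check can be tried outside Airflow. The sketch below is a minimal stand-in that uses the standard-library sqlite3 module instead of SQLAlchemy models: it reads a table's column names with PRAGMA table_info and raises the same ValueError when the configured age-check column is missing. The helper name check_age_column and the simplified table layout are hypothetical; the real DAG works on SQLAlchemy models, not raw SQLite.

```python
import sqlite3


def check_age_column(conn: sqlite3.Connection, table: str, column: str) -> None:
    """Raise ValueError if `column` is not a real column of `table` (hypothetical helper)."""
    # PRAGMA table_info returns one row per column: (cid, name, type, notnull, dflt_value, pk).
    # PRAGMA does not accept bound parameters, so `table` must be a trusted name.
    table_columns = [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]
    if column not in table_columns:
        raise ValueError(f"{column} field not in table {table}")


conn = sqlite3.connect(":memory:")
# Simplified task_instance table: after the upgrade it has no execution_date column.
conn.execute(
    "CREATE TABLE task_instance ("
    " task_id TEXT, dag_id TEXT, run_id TEXT, map_index INTEGER,"
    " start_date TIMESTAMP, end_date TIMESTAMP)"
)

check_age_column(conn, "task_instance", "end_date")  # passes silently
try:
    check_age_column(conn, "task_instance", "execution_date")
except ValueError as err:
    print(err)  # execution_date field not in table task_instance
```

Failing early with a clear message is the point here: the task stops with a readable ValueError instead of the SQLAlchemy ArgumentError from the logs above.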
I read through #117, and the comment from @PhilippDB makes sense and could be the solution.
@e-compagno please be aware of what @tylerwmarrs wrote: using start_date or end_date may select records that are not tied to records in the dag_run table. The tables have the following constraints in their DDL:
CREATE TABLE task_instance(
task_id character varying(250) NOT NULL,
dag_id character varying(250) NOT NULL,
run_id character varying(250) NOT NULL,
[...]
PRIMARY KEY(task_id,dag_id,run_id,map_index),
CONSTRAINT task_instance_dag_run_fkey FOREIGN KEY(dag_id,run_id) REFERENCES dag_run(dag_id,run_id),
CONSTRAINT task_instance_trigger_id_fkey FOREIGN KEY(trigger_id) REFERENCES trigger(id)
);
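To illustrate why the link to dag_run matters, the sketch below uses sqlite3 as a stand-in for the Airflow metadata database, with a heavily simplified schema and made-up rows. It selects task_instance rows by the execution_date of their parent dag_run, joined on the composite (dag_id, run_id) key, and compares that with selecting by the task's own end_date. This is one reading of the approach discussed in #117, not the DAG's actual query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag_run (
        dag_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        execution_date TEXT NOT NULL,
        PRIMARY KEY (dag_id, run_id)
    );
    CREATE TABLE task_instance (
        task_id TEXT NOT NULL,
        dag_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        end_date TEXT,
        PRIMARY KEY (task_id, dag_id, run_id),
        FOREIGN KEY (dag_id, run_id) REFERENCES dag_run (dag_id, run_id)
    );
    """
)
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?)",
    [("etl", "old_run", "2021-10-01"), ("etl", "new_run", "2021-11-20")],
)
# Made-up data: the old run's task was cleared and re-run later, so its end_date is recent.
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [("extract", "etl", "old_run", "2021-11-25"), ("extract", "etl", "new_run", "2021-11-20")],
)

max_date = "2021-11-01"  # ISO-8601 date strings compare correctly as text

# Age by the parent dag_run, joined on the composite key.
by_dag_run = conn.execute(
    "SELECT ti.task_id, ti.run_id FROM task_instance AS ti"
    " JOIN dag_run AS dr ON ti.dag_id = dr.dag_id AND ti.run_id = dr.run_id"
    " WHERE dr.execution_date < ?",
    (max_date,),
).fetchall()

# Age by the task's own end_date.
by_end_date = conn.execute(
    "SELECT task_id, run_id FROM task_instance WHERE end_date < ?", (max_date,)
).fetchall()

print(by_dag_run)   # [('extract', 'old_run')]
print(by_end_date)  # []
```

With the join, the task instance belonging to the old run is picked up; filtering on end_date misses it, because re-running a task moves its end_date forward independently of the run it belongs to.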
https://github.com/GoogleCloudPlatform/python-docs-samples/pull/7847 fixed it for me.
The problem is still present in version 2.4.1. The Cloud Composer version does not fix the TaskReschedule cleanup, and it also breaks the XCom cleanup because execution_date is not a column in the db. Is there any update on resolving the issue?
Would
"airflow_db_model": XCom,
"age_check_column": XCom.timestamp,
fix the issue?
XCom.timestamp solves the error on XCom, not the others.
You can find the time columns in the model file. For example, for TaskInstance you can use queued_dttm or end_date.
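If reading the model file is inconvenient, a rough alternative is to list the columns whose declared type looks like a date or timestamp. The sketch below does this with sqlite3's PRAGMA table_info on a simplified task_reschedule table; the helper name time_columns, the column layout and the type-matching rule are assumptions. On PostgreSQL or MySQL the equivalent information lives in information_schema.columns.

```python
import sqlite3
from typing import List


def time_columns(conn: sqlite3.Connection, table: str) -> List[str]:
    """List columns whose declared type mentions DATE or TIME (hypothetical helper)."""
    # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk).
    # `table` is interpolated directly, so it must be a trusted name.
    return [
        name
        for _cid, name, col_type, *_rest in conn.execute(f"PRAGMA table_info({table})")
        if "DATE" in col_type.upper() or "TIME" in col_type.upper()
    ]


conn = sqlite3.connect(":memory:")
# Simplified task_reschedule layout; the real table may differ between Airflow versions.
conn.execute(
    "CREATE TABLE task_reschedule ("
    " id INTEGER PRIMARY KEY, task_id TEXT, dag_id TEXT, run_id TEXT,"
    " try_number INTEGER, start_date TIMESTAMP, end_date TIMESTAMP,"
    " duration INTEGER, reschedule_date TIMESTAMP)"
)
print(time_columns(conn, "task_reschedule"))  # ['start_date', 'end_date', 'reschedule_date']
```

This only finds candidates; which one is appropriate as age_check_column still depends on the dag_run caveat discussed above.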
Using XCom.timestamp as the age_check_column fixes the issue for XCom. For RenderedTaskInstanceFields, use RenderedTaskInstanceFields.run_id to fix the issue.