Cannot schedule a DAG on a DatasetAlias when using a clean Airflow docker image (for CI)
Apache Airflow version
2.10.4
If "Other Airflow 2 version" selected, which one?
Observed also on 2.10.2
What happened?
One of the tests in our CI creates a DagBag. After adding a DAG that is scheduled on a DatasetAlias, the bag can no longer be created. Confusingly, this issue does not occur in other settings/environments (e.g. dag.test, local pytest, a non-CI Airflow run in Docker).
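For context, the failing test is essentially a DagBag integrity check along these lines (the test name and dags path here are illustrative, not our exact code):

    from airflow.models import DagBag


    def test_dagbag_imports_cleanly():
        # Parse the whole dags folder once and fail on any import error.
        bag = DagBag(dag_folder="dags", include_examples=False)
        assert not bag.import_errors, f"DAG import errors: {bag.import_errors}"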
Relevant stack trace
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: dataset_alias
[SQL: SELECT dataset_alias.id, dataset_alias.name
FROM dataset_alias
WHERE dataset_alias.name = ?
LIMIT ? OFFSET ?]
[parameters: ('bar', 1, 0)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2024-12-08T16:08:22.227+0000] {variable.py:357} ERROR - Unable to retrieve variable from secrets backend (MetastoreBackend). Checking subsequent secrets backend.
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
self.dialect.do_execute(
File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
sqlite3.OperationalError: no such table: variable
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/variable.py", line 353, in get_variable_from_secrets
var_val = secrets_backend.get_variable(key=key)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 97, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/secrets/metastore.py", line 66, in get_variable
return MetastoreBackend._fetch_variable(key=key, session=session)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/api_internal/internal_api_call.py", line 139, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 94, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/secrets/metastore.py", line 84, in _fetch_variable
var_value = session.scalar(select(Variable).where(Variable.key == key).limit(1))
File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1747, in scalar
return self.execute(
File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
result = conn._execute_20(statement, params or {}, execution_options)
File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
return connection._execute_clauseelement(
File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
ret = self._execute_context(
File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
self._handle_dbapi_exception(
File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
util.raise_(
File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
self.dialect.do_execute(
File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: variable
[SQL: SELECT variable.val, variable.id, variable."key", variable.description, variable.is_encrypted
FROM variable
WHERE variable."key" = ?
LIMIT ? OFFSET ?]
[parameters: ('latest_processed_instrument_file_path', 1, 0)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
What you think should happen instead?
DagBag creation should not fail on a one-shot parse pass of the dags folder, regardless of the order in which the DAGs are parsed.
How to reproduce
- Obtain a recent docker image, such as apache/airflow:2.10.4-python3.11.
- Spin up a container and open a docker shell.
- Add the following DAG to the dags folder:
from pendulum import datetime
from airflow import DAG
from airflow.datasets import DatasetAlias

with DAG(
    dag_id="foo",
    start_date=datetime(2000, 1, 1),
    schedule=[
        DatasetAlias("bar"),
    ],
    catchup=False,
):
    pass
- Open a Python shell and run:
from airflow.models import DagBag
DagBag(include_examples=False)
Operating System
Rocky Linux 9.3
Versions of Apache Airflow Providers
(Irrelevant)
Deployment
Other Docker-based deployment
Deployment details
Dockerfile-ci:
FROM apache/airflow:2.10.2-python3.9
# Install system packages
USER root
RUN apt-get update \
    && apt-get install -y --no-install-recommends build-essential vim strace iproute2 git \
       pkg-config libxml2-dev libxmlsec1-dev libxmlsec1-openssl \
    && apt-get autoremove -yqq --purge \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*
USER airflow
ci script:
script:
- git config --global --add safe.directory $PWD
- pip install uv pre-commit-uv --upgrade
- uv pip install -e .[dev] --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.9.txt"
- uv tool install pre-commit --force --with pre-commit-uv --force-reinstall
- export PIP_USER=false && pre-commit install --install-hooks
- pre-commit run --all-files --show-diff-on-failure
Anything else?
Presumably, this issue isn't present in a "running" Airflow instance where there is at least one DAG that outputs a DatasetAlias, which causes the necessary tables to be created, and the second parse of the alias-scheduled DAG then succeeds.
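For clarity, a producer along these lines (the DAG and task names are made up) is what I mean by a DAG that outputs a DatasetAlias:

    from pendulum import datetime
    from airflow import DAG
    from airflow.datasets import DatasetAlias
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="producer",
        start_date=datetime(2000, 1, 1),
        schedule=None,
        catchup=False,
    ):
        PythonOperator(
            task_id="emit",
            python_callable=lambda: None,
            # The alias attached as an outlet is what the alias-scheduled "foo" DAG waits on.
            outlets=[DatasetAlias("bar")],
        )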
I believe this happens because the docker image doesn't come with an initialized sqlite database, an issue that surfaces when the DagBag is built.
As a workaround, I tried adding an airflow db migrate step to the image creation, and while this did help with the manual test explained above, the CI kept failing. Adding airflow db migrate to the CI script itself similarly did not help.
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Looks like this was fixed in 2.10.3.
Reopening because this issue is still present on 2.10.4. The root cause seems to be that the sqlite db that comes with the docker image is uninitialized and cannot handle a DatasetAlias schedule in that state.
Possible solution:
Modify airflow/datasets/__init__.py as follows:
@@ 26 @@
-from sqlalchemy import select
+from sqlalchemy import exc, select
@@ 142 @@
 @internal_api_call
 @provide_session
 def expand_alias_to_datasets(
     alias: str | DatasetAlias, *, session: Session = NEW_SESSION
 ) -> list[BaseDataset]:
     """Expand dataset alias to resolved datasets."""
     from airflow.models.dataset import DatasetAliasModel

     alias_name = alias.name if isinstance(alias, DatasetAlias) else alias
-    dataset_alias_obj = session.scalar(
-        select(DatasetAliasModel).where(DatasetAliasModel.name == alias_name).limit(1)
-    )
+    try:
+        dataset_alias_obj = session.scalar(
+            select(DatasetAliasModel).where(DatasetAliasModel.name == alias_name).limit(1)
+        )
+    except exc.OperationalError:
+        return []
     if dataset_alias_obj:
         return [Dataset(uri=dataset.uri, extra=dataset.extra) for dataset in dataset_alias_obj.datasets]
     return []
Interesting 🤔 I'll try to repro and see whether I can trigger it on my end.
@Lee-W Thank you for taking a look! Even if I'm wrong about the root cause, this issue needs a better error message. Throwing a SQLAlchemy error about a missing "variable" or "dataset_alias" table, when that isn't really the problem, doesn't help with debugging.
Btw, I didn't mention it, but on CI we're running unit tests using a pytest pre-commit hook.
If you need any more details, I'm also available on slack.
What is the exact error you are getting? Catching an OperationalError at runtime is generally the wrong thing to do. Even in a blank database, the database structure should still be there and should not throw an OperationalError. If the official Docker image does, it may mean we are not creating the image correctly, and that should be fixed instead.
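One quick way to check whether the structure is actually there is to inspect the sqlite file directly (the path below assumes the image default AIRFLOW_HOME of /opt/airflow):

    import sqlite3

    # List the tables in the metadata db the container is using; dataset_alias and variable
    # should both be present in a properly migrated database.
    conn = sqlite3.connect("/opt/airflow/airflow.db")
    print(sorted(row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type='table'")))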
@uranusjr I included the stack trace in the issue - please see the collapsed section under "What happened?".
I just had another thought - maybe when Airflow runs and AIRFLOW_HOME cannot be determined, the newly created db is broken.
So when running pytest, it somehow misses the correct db (perhaps because it runs from the wrong folder or is missing some environment variables) and creates one that is empty, or at least missing some tables.
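If that is what's happening, something like this run from inside the failing pytest session should show which database Airflow actually resolves (just a debugging sketch):

    import os

    from airflow.configuration import conf

    # Print what the test process resolves these to; an unexpected AIRFLOW_HOME here would
    # explain a fresh, schema-less sqlite file being created next to the tests.
    print(os.environ.get("AIRFLOW_HOME"))
    print(conf.get("database", "sql_alchemy_conn"))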
While I cannot answer your question for you, I can try to guide your reasoning.
I think you need to describe exactly what you are doing, how you are initializing Airflow's database, and which tests you are talking about. If your tests access the DB, you have to make sure in your test setup that the database has been created. This is what various fixtures usually do.
Airflow DB tests in Airflow CI do this with an auto-use fixture that creates and initializes the DB: https://github.com/apache/airflow/blob/main/tests_common/pytest_plugin.py#L317 - it runs when the DB has not been initialized yet (which is generally the first time it runs in a clean environment, say a new docker container). When it does so, it creates a file ".airflow_db_initialised" in Airflow's home dir so it does not attempt to do it again. This file usually does not survive a container restart, so the initialization happens every time the breeze container is started, for example. This behaviour can be overridden with the --with-db-init flag added by our pytest plugin - when this flag is passed, database initialization happens at the beginning of the pytest session.
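For a project's own suite, a stripped-down sketch of such an auto-use fixture could look like this (the fixture name is illustrative; Airflow's actual plugin additionally handles the guard file and the --with-db-init flag described above):

    import pytest

    from airflow.utils import db


    @pytest.fixture(autouse=True, scope="session")
    def initialize_airflow_metadata_db():
        # Create (or recreate) the Airflow metadata database once per test session so that
        # ORM queries against variable, dataset_alias, etc. never hit a schema-less sqlite file.
        db.resetdb()
        yield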
But this is how the "airflow" test suite itself works - we have no idea what test suite you are talking about, how you run it, and what assumptions your containers rely on (which files are preserved between runs - for example by being mounted - and which are not). This is all a question of how your CI and test suite are organized.
Generally speaking - you have to make sure that your fixtures (if you use pytest) are doing the right thing and setting up the database for you. One of the difficulties is that this can also depend on the import sequence. Unfortunately, importing airflow does a lot of implicit things, including lazy loading of various components - we sort of try to initialize everything when we import airflow, but we also try to avoid that initialization and do some magic with lazy loading so that it is sometimes not completed, to speed things up in some cases. This is a bit of a duality we have - we do "import airflow" with pretty much every possible command, but some commands, test cases or direct imports should be way faster than completely importing and initializing everything airflow needs (configuration, settings, database, plugins, providers and so on).
I hope we will do this differently in Airflow 3 - with much more explicit initialization of whatever we need, rather than that half-initialization and lazy-loading dance (which also causes occasional recursive import errors when modules import each other while not yet fully imported - depending on the sequence of imports). But that's something we will likely discuss in the coming weeks when we discuss Airflow 3 packaging and initialization.
The consequence of that implicit loading is that we also attempt to make sure that all the models are imported before the database is reset. Most of this happens here:
https://github.com/apache/airflow/blob/main/airflow/models/__init__.py
And there are basically three ways models get imported:
- TYPE_CHECKING - loading models for mypy type hint verification
- lazy-loading some old models when they are accessed via "from airflow.models import Model" rather than imported from their sub-packages (this is done lazily because otherwise it would cause recursive imports and unnecessary loading of the models)
- the import_all_models method - which is supposed to make sure all models are loaded, for example when we run the "init db" command - because SQLAlchemy will only create the tables when the corresponding ORM models are imported and registered with the SQLAlchemy engine. Generally, you should make sure you run this method before you run "initdb" with sqlite when you create a new database. Maybe your test fixture does not do that.
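In code, the safe order for a hand-rolled sqlite setup is therefore something like this (a sketch of the sequence described above, not a drop-in recipe):

    from airflow.models import import_all_models
    from airflow.utils import db

    # Import every ORM model first so SQLAlchemy knows about all tables, then create/upgrade
    # the schema; skipping the first step is one way to end up with tables missing.
    import_all_models()
    db.initdb()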
So in general it's not too explicit, and there are a few places/paths where the creation of the Airflow database might go wrong. If you have any path where you run db init but do not import/load the models in memory, you might simply create an empty database; or maybe you have a similar "sentinel file" or some other kind of guard that prevents the database from being initialized when needed, and this file is not removed in your CI? But I am only guessing.
My guess is that your tests do not do proper initialization, or maybe they delete/remove the created tables at some point, or some combination of these.
BTW. I am converting this into a discussion. This is not really an "issue" in Airflow - it's more troubleshooting of what your test suite does or does not do.