[QUARANTINED] test_scheduler_task_start_date is flaky
I've seen this test fail more than once, for example:
https://github.com/potiuk/airflow/runs/2223465940?check_suite_focus=true#step:6:10954
Marking it as quarantined.
_______________ TestSchedulerJob.test_scheduler_task_start_date ________________

self = <tests.jobs.test_scheduler_job.TestSchedulerJob testMethod=test_scheduler_task_start_date>

    def test_scheduler_task_start_date(self):
        """
        Test that the scheduler respects task start dates that are different from DAG start dates
        """
        dagbag = DagBag(dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"), include_examples=False)
        dag_id = 'test_task_start_date_scheduling'
        dag = self.dagbag.get_dag(dag_id)
        dag.is_paused_upon_creation = False
        dagbag.bag_dag(dag=dag, root_dag=dag)
        # Deactivate other dags in this file so the scheduler doesn't waste time processing them
        other_dag = self.dagbag.get_dag('test_start_date_scheduling')
        other_dag.is_paused_upon_creation = True
        dagbag.bag_dag(dag=other_dag, root_dag=other_dag)
        dagbag.sync_to_db()
        self.scheduler_job = SchedulerJob(executor=self.null_exec, subdir=dag.fileloc, num_runs=2)
        self.scheduler_job.run()
        session = settings.Session()
        tiq = session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id)
        ti1s = tiq.filter(TaskInstance.task_id == 'dummy1').all()
        ti2s = tiq.filter(TaskInstance.task_id == 'dummy2').all()
        assert len(ti1s) == 0
>       assert len(ti2s) == 2
E       AssertionError: assert 1 == 2
E        +  where 1 = len([<TaskInstance: test_task_start_date_scheduling.dummy2 2016-01-01 00:00:00+00:00 [success]>])

tests/jobs/test_scheduler_job.py:2518: AssertionError
Did it happen again? https://github.com/apache/airflow/actions/runs/7153230354/job/19479506575?pr=36144#step:6:14652
I can't reproduce it on my laptop; I've run it ~1000 times. Maybe it only occurs when the tests run in parallel?
Yeah. Let me re-open it then. And yes: flaky tests with a very low probability of failing are the hardest to track down. In parallel runs, each individual test type runs in a very "SLOW" environment.
Things that usually take seconds often take minutes there when something else is happening in other tests. This is mostly I/O contention, but also some kernel synchronisation: all test runs share the same kernel (all Docker containers share the host kernel) and use the same I/O under the hood for the database. This makes the tests more "real", since Airflow in production will often be equally or even more "busy".
So in a way it's good that we have those runs, because they might detect issues we would not detect on a FAST system that is not busy: races, deadlocks and the like. Of course it might be a "bad test" issue (and often is), but I can recall at least several times in the past when such a flaky test indicated an actual problem in Airflow code.
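With a failure rate low enough that ~1000 local runs never hit it, a small harness that repeats the test and records how often it fails can help quantify the flake, optionally with overlapping runs to add some of the contention described above. This is a minimal sketch, not part of Airflow or pytest: `estimate_flake_rate` and `run_scheduler_test` are hypothetical helpers, and it assumes one test execution can be wrapped in a zero-argument callable that returns True on success.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def estimate_flake_rate(run_once: Callable[[], bool], runs: int, workers: int = 1) -> float:
    """Call `run_once` `runs` times and return the fraction of runs that failed.

    `run_once` must return True on success and False on failure.
    With workers > 1 the runs overlap in time, which loosely mimics
    the contention of parallel CI jobs.
    """
    if runs <= 0:
        raise ValueError("runs must be positive")
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(lambda _: run_once(), range(runs)))
    return results.count(False) / runs


def run_scheduler_test() -> bool:
    # Hypothetical wrapper: runs the flaky test in a fresh pytest process.
    # Exit code 0 means every collected test (all parametrizations) passed.
    proc = subprocess.run(
        [
            "pytest",
            "-q",
            "tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_task_start_date",
        ],
        capture_output=True,
    )
    return proc.returncode == 0
```

For example, `estimate_flake_rate(run_scheduler_test, runs=200, workers=4)` would run the test 200 times, four at a time. Note that overlapping runs against a shared test database can introduce interference of their own, so a failure seen only with `workers > 1` does not by itself prove the original race.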
Just to add: I'm capturing the stack trace of the failing test here, since the CI logs will be gone in 4 weeks or so.
=================================== FAILURES ===================================
__________ TestSchedulerJob.test_scheduler_task_start_date[configs0] ___________

self = <tests.jobs.test_scheduler_job.TestSchedulerJob object at 0x7fc6e8cd8190>
configs = {('scheduler', 'standalone_dag_processor'): 'False'}

    @pytest.mark.parametrize(
        "configs",
        [
            {("scheduler", "standalone_dag_processor"): "False"},
            {("scheduler", "standalone_dag_processor"): "True"},
        ],
    )
    def test_scheduler_task_start_date(self, configs):
        """
        Test that the scheduler respects task start dates that are different from DAG start dates
        """
        with conf_vars(configs):
            dagbag = DagBag(
                dag_folder=os.path.join(settings.DAGS_FOLDER, "test_scheduler_dags.py"),
                include_examples=False,
            )
            dag_id = "test_task_start_date_scheduling"
            dag = self.dagbag.get_dag(dag_id)
            dag.is_paused_upon_creation = False
            dagbag.bag_dag(dag=dag, root_dag=dag)
            # Deactivate other dags in this file so the scheduler doesn't waste time processing them
            other_dag = self.dagbag.get_dag("test_start_date_scheduling")
            other_dag.is_paused_upon_creation = True
            dagbag.bag_dag(dag=other_dag, root_dag=other_dag)
            dagbag.sync_to_db()
            scheduler_job = Job(
                executor=self.null_exec,
            )
            self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=dag.fileloc, num_runs=3)
            run_job(scheduler_job, execute_callable=self.job_runner._execute)
            session = settings.Session()
            tiq = session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id)
            ti1s = tiq.filter(TaskInstance.task_id == "dummy1").all()
            ti2s = tiq.filter(TaskInstance.task_id == "dummy2").all()
            assert len(ti1s) == 0
            assert len(ti2s) >= 2
            for task in ti2s:
>               assert task.state == State.SUCCESS
E               AssertionError: assert equals failed
E                 None   <TaskInstanceState.SUCCESS: 'success'>

tests/jobs/test_scheduler_job.py:2528: AssertionError
@potiuk It seems that interference between tests is causing the issue. I propose using unique task IDs across tests so that each test stays isolated during execution. This should prevent conflicts between tasks and make the test more reliable. What do you think?
Yes. But the question is how to make them unique.
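The thread does not settle on a scheme, but one option is to derive each ID from something unique per test invocation plus a random suffix. A minimal sketch: `unique_id` is a hypothetical helper, the allowed character set (letters, digits, `_`, `.`, `-`) is assumed to match Airflow's DAG/task ID rules, and whether the DAGs in `test_scheduler_dags.py` could accept generated IDs is not confirmed here.

```python
import re
import uuid

# Characters kept in the generated ID; assumed to match Airflow's ID rules
# (alphanumerics, underscores, dots and dashes). Everything else becomes "_".
_INVALID_CHARS = re.compile(r"[^A-Za-z0-9_.-]")


def unique_id(base: str, test_name: str) -> str:
    """Return '<base>__<test_name>__<8 hex chars>', unique per call.

    Parametrized pytest test names contain brackets (e.g.
    'test_scheduler_task_start_date[configs0]'), so characters outside
    the allowed set are replaced with underscores.
    """
    suffix = uuid.uuid4().hex[:8]
    raw = f"{base}__{test_name}__{suffix}"
    return _INVALID_CHARS.sub("_", raw)
```

In a test, `test_name` could come from pytest's built-in `request` fixture (`request.node.name`), so IDs from different tests, and from different parametrizations of the same test, never collide in a shared database.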
This has not happened recently. It was likely fixed by other flaky-test fixes.