[QUARANTINED] test_scheduler_task_start_date is flaky

Open potiuk opened this issue 4 years ago • 3 comments

I've seen this test fail more than once, for example:

https://github.com/potiuk/airflow/runs/2223465940?check_suite_focus=true#step:6:10954

Marking it as quarantined.

_______________ TestSchedulerJob.test_scheduler_task_start_date ________________
  
  self = <tests.jobs.test_scheduler_job.TestSchedulerJob testMethod=test_scheduler_task_start_date>
  
      def test_scheduler_task_start_date(self):
          """
          Test that the scheduler respects task start dates that are different from DAG start dates
          """
      
          dagbag = DagBag(dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"), include_examples=False)
          dag_id = 'test_task_start_date_scheduling'
          dag = self.dagbag.get_dag(dag_id)
          dag.is_paused_upon_creation = False
          dagbag.bag_dag(dag=dag, root_dag=dag)
      
          # Deactivate other dags in this file so the scheduler doesn't waste time processing them
          other_dag = self.dagbag.get_dag('test_start_date_scheduling')
          other_dag.is_paused_upon_creation = True
          dagbag.bag_dag(dag=other_dag, root_dag=other_dag)
      
          dagbag.sync_to_db()
      
          self.scheduler_job = SchedulerJob(executor=self.null_exec, subdir=dag.fileloc, num_runs=2)
          self.scheduler_job.run()
      
          session = settings.Session()
          tiq = session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id)
          ti1s = tiq.filter(TaskInstance.task_id == 'dummy1').all()
          ti2s = tiq.filter(TaskInstance.task_id == 'dummy2').all()
          assert len(ti1s) == 0
  >       assert len(ti2s) == 2
  E       AssertionError: assert 1 == 2
  E        +  where 1 = len([<TaskInstance: test_task_start_date_scheduling.dummy2 2016-01-01 00:00:00+00:00 [success]>])
  
  tests/jobs/test_scheduler_job.py:2518: AssertionError

potiuk avatar Mar 30 '21 08:03 potiuk

Did it happen again? https://github.com/apache/airflow/actions/runs/7153230354/job/19479506575?pr=36144#step:6:14652

I can't catch it on my laptop. I've run it ~1000 times. Maybe it only occurs in parallel?

avkirilishin avatar Dec 09 '23 23:12 avkirilishin

Yeah. Let me re-open it then. Flaky tests with a very low probability of failing are the worst to track down. Parallel runs have the property that each individual test type runs in a very "SLOW" environment.

Things that usually take seconds often take minutes there when something else is happening in other tests (mostly I/O contention, but also some kernel synchronisation, since all the test runs share one kernel (all Docker containers share the same kernel) and use the same I/O under the hood for the database). This makes the test more "real" (Airflow in production will often be equally or even more "busy").

So in a way it's good we have those, because they might detect issues we would not detect when running on a FAST system that is not busy - races, deadlocks and the like. Of course it might be a "bad test" issue (and often is) - but I can recall at least several times in the past where such a flaky test indicated an actual problem in Airflow code.
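One common way to make a test less sensitive to a slow, busy environment is to poll for the expected state with a deadline instead of assuming it is reached after a fixed amount of work. The following is only a minimal sketch of that idea, not code from the Airflow test suite; `wait_until` and its parameters are hypothetical names introduced for illustration.

```python
import time
from typing import Callable


def wait_until(
    condition: Callable[[], bool],
    timeout: float = 60.0,
    interval: float = 0.5,
) -> bool:
    """Poll `condition` until it returns True or `timeout` seconds elapse.

    Hypothetical helper for illustration only. Returns True if the
    condition was met in time, False otherwise.
    """
    # Use a monotonic clock so wall-clock adjustments cannot affect the deadline.
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

In a test, the condition could be a small function that re-queries the database and checks that every expected row has reached its final state. A generous timeout costs nothing on a fast machine, because the loop returns as soon as the condition holds.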

Just to add - I'm capturing the stack trace of the failing test here. The logs will be gone in 4 weeks or so, so it's good to capture the stack trace in the comment.

=================================== FAILURES ===================================
__________ TestSchedulerJob.test_scheduler_task_start_date[configs0] ___________

self = <tests.jobs.test_scheduler_job.TestSchedulerJob object at 0x7fc6e8cd8190>
configs = {('scheduler', 'standalone_dag_processor'): 'False'}

    @pytest.mark.parametrize(
        "configs",
        [
            {("scheduler", "standalone_dag_processor"): "False"},
            {("scheduler", "standalone_dag_processor"): "True"},
        ],
    )
    def test_scheduler_task_start_date(self, configs):
        """
        Test that the scheduler respects task start dates that are different from DAG start dates
        """
        with conf_vars(configs):
            dagbag = DagBag(
                dag_folder=os.path.join(settings.DAGS_FOLDER, "test_scheduler_dags.py"),
                include_examples=False,
            )
            dag_id = "test_task_start_date_scheduling"
            dag = self.dagbag.get_dag(dag_id)
            dag.is_paused_upon_creation = False
            dagbag.bag_dag(dag=dag, root_dag=dag)
    
            # Deactivate other dags in this file so the scheduler doesn't waste time processing them
            other_dag = self.dagbag.get_dag("test_start_date_scheduling")
            other_dag.is_paused_upon_creation = True
            dagbag.bag_dag(dag=other_dag, root_dag=other_dag)
    
            dagbag.sync_to_db()
    
            scheduler_job = Job(
                executor=self.null_exec,
            )
            self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=dag.fileloc, num_runs=3)
            run_job(scheduler_job, execute_callable=self.job_runner._execute)
    
            session = settings.Session()
            tiq = session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id)
            ti1s = tiq.filter(TaskInstance.task_id == "dummy1").all()
            ti2s = tiq.filter(TaskInstance.task_id == "dummy2").all()
            assert len(ti1s) == 0
            assert len(ti2s) >= 2
            for task in ti2s:
>               assert task.state == State.SUCCESS
E               AssertionError: assert equals failed
E                 None                             <TaskInstanceState.SUCCESS: 'success'>

tests/jobs/test_scheduler_job.py:2528: AssertionError

potiuk avatar Dec 10 '23 08:12 potiuk

@potiuk It seems that test interference is causing the issue. I propose using unique task IDs across tests to avoid interference and ensure isolation during execution. This should help prevent conflicts between tasks and improve test reliability. What do you think?

dirrao avatar Oct 19 '24 06:10 dirrao

Yes. But the question is how to make them unique.
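One possible approach, sketched here under assumptions rather than taken from the Airflow code base, is to derive each identifier from a fixed base name, the name of the running test, and a short random suffix. The helper `unique_id` below is hypothetical; in a pytest test the test name could come from `request.node.name`.

```python
import re
import uuid


def unique_id(base: str, test_name: str) -> str:
    """Return an identifier derived from `base` that differs on every call.

    Hypothetical helper for illustration only.
    """
    # Assumption: only letters, digits, '_', '.' and '-' are safe in IDs,
    # so replace anything else (e.g. the '[...]' of parametrized tests).
    safe_test = re.sub(r"[^A-Za-z0-9_.-]", "_", test_name)
    # A short random suffix keeps repeated runs of the same test apart.
    suffix = uuid.uuid4().hex[:8]
    return f"{base}__{safe_test}__{suffix}"
```

The drawback is that IDs defined in static DAG files, such as the ones this test loads, would have to be generated or rewritten at test time, so this only helps where the test itself creates the DAG and its tasks.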

potiuk avatar Oct 24 '24 00:10 potiuk

This has not been happening recently. It was likely fixed by other flaky test fixes.

potiuk avatar Dec 01 '24 17:12 potiuk