
Python operators are printing more than one task log at a time

Open · rustikk opened this issue 3 years ago

Apache Airflow version

main (development)

What happened

When using the PythonOperator on scheduled runs, the task logs are repeated multiple times. This issue doesn't occur when the DAGs are triggered from the web UI. (Screenshot: Screen Shot 2022-03-03 at 4 24 09 PM)

What you expected to happen

I expected the task details for each task to show only one printout of that task's logs.

How to reproduce

from airflow.models import DAG
from airflow.operators.python import PythonOperator

from datetime import datetime, timedelta

dag_name = "test_uri_generation"


def dummy_check4bugs():
    pass


with DAG(
    dag_id=dag_name,
    start_date=datetime(2021, 1, 1),
    schedule_interval=timedelta(days=30, hours=12),
) as dag:

    t0 = PythonOperator(
        task_id="add_conn",
        # python_callable=add_conn,
        python_callable=dummy_check4bugs,
    )

    t1 = PythonOperator(
        task_id="check_uri_generation",
        # python_callable=check_uri_gen,
        python_callable=dummy_check4bugs,
    )

    t0 >> t1

Operating System

Docker

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

Airflow Breeze

Anything else

This bug occurs every time, though only on scheduled runs, and the first DAG that runs typically repeats the logs the most.

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

rustikk · Mar 03 '22 23:03

@potiuk @kaxil @rustikk ,

Can I be assigned for this issue? I know what the problem is, but still thinking of how to resolve it.

The problem is that the log is printed whenever "are_dependencies_met" is called.

In taskinstance.py, are_dependencies_met is called more than once during the same __init__, so the verbose_aware_logger("Dependencies all met for %s", self) line gets logged twice even though it only needs to appear once.

I think verbose_aware_logger("Dependencies all met for %s", self) should be called only once in __init__, regardless of the number of calls to are_dependencies_met.

# taskinstance.py
# line 1202~
if not self.are_dependencies_met(
    dep_context=non_requeueable_dep_context, session=session, verbose=True
):
    session.commit()
    return False

# taskinstance.py
# line 1230~
if not self.are_dependencies_met(dep_context=dep_context, session=session, verbose=True):
    self.state = State.NONE
    self.log.warning(hr_line_break)
    self.log.warning(
        "Rescheduling due to concurrency limits reached "
        "at task runtime. Attempt %s of "
        "%s. State set to NONE.",
        self.try_number,
        self.max_tries + 1,
    )
    self.log.warning(hr_line_break)
    self.queued_dttm = timezone.utcnow()
    session.merge(self)
    session.commit()
    return False

# taskinstance.py
# line 1050~
...
verbose_aware_logger("Dependencies all met for %s", self)
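
As a rough sketch of the kind of change I have in mind (illustrative only, not the actual fix), the second dependency check could pass verbose=False so that "Dependencies all met for %s" is only logged by the first check:

# Sketch only: suppress the duplicate verbose log on the second dependency check.
# Assumption (per the analysis above): the second check's "Dependencies all met"
# message is redundant, and with verbose=False verbose_aware_logger falls back
# to debug level instead of info.
if not self.are_dependencies_met(dep_context=dep_context, session=session, verbose=False):
    self.state = State.NONE
    ...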

In terms of a PR, I would love to hear any suggestions from you. Thank you.

treyyi · Mar 04 '22 06:03

Go ahead and submit a PR!

uranusjr · Mar 04 '22 09:03

This doesn't have an active assignee. I will take this up and fix it.

amoghrajesh · Jul 08 '23 04:07

I tried reproducing with the given DAG but I am unable to. I also tried minute-wise scheduling, but the results don't show this. @treyyi, can you confirm that this is still an issue? Otherwise we can close it.
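
Something along these lines is what I mean by minute-wise scheduling (a sketch; the dag_id and catchup setting are illustrative, not taken from the original report):

# Sketch of a per-minute variant of the reporter's DAG used to attempt reproduction.
# dag_id and catchup=False are illustrative assumptions.
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.python import PythonOperator


def dummy_check4bugs():
    pass


with DAG(
    dag_id="test_uri_generation_minutely",
    start_date=datetime(2021, 1, 1),
    schedule_interval=timedelta(minutes=1),
    catchup=False,
) as dag:
    t0 = PythonOperator(task_id="add_conn", python_callable=dummy_check4bugs)
    t1 = PythonOperator(task_id="check_uri_generation", python_callable=dummy_check4bugs)
    t0 >> t1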

amoghrajesh · Jul 09 '23 05:07

Closing this issue as "not reproducible". Feel free to reopen if you can reproduce it.

amoghrajesh · Jul 27 '23 13:07