Python operators are printing more than one task log at a time
Apache Airflow version
main (development)
What happened
When using the PythonOperator on scheduled runs, the task logs are repeated multiple times. This issue doesn't occur when the DAGs are triggered from the web UI.

What you expected to happen
I expected the task details for each task to show only one printout of that task's logs.
How to reproduce
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

dag_name = "test_uri_generation"


def dummy_check4bugs():
    pass


with DAG(
    dag_id=dag_name,
    start_date=datetime(2021, 1, 1),
    schedule_interval=timedelta(days=30, hours=12),
) as dag:
    t0 = PythonOperator(
        task_id="add_conn",
        # python_callable=add_conn,
        python_callable=dummy_check4bugs,
    )
    t1 = PythonOperator(
        task_id="check_uri_generation",
        # python_callable=check_uri_gen,
        python_callable=dummy_check4bugs,
    )
    t0 >> t1
Operating System
Docker
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
Airflow Breeze
Anything else
This bug occurs every time, though only on scheduled runs, and the first DAG that runs typically repeats the logs the most.
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
@potiuk @kaxil @rustikk,
Can I be assigned to this issue? I know what the problem is, but I am still thinking about how to resolve it.
The problem is that the log is printed whenever "are_dependencies_met" is called.
In taskinstance.py, are_dependencies_met is called twice in the same __init__, so the verbose_aware_logger("Dependencies all met for %s", self) line runs twice even though it doesn't need to.
I think verbose_aware_logger("Dependencies all met for %s", self) should be called only once in __init__, regardless of the number of calls to are_dependencies_met.
# taskinstance.py
# line 1202~
if not self.are_dependencies_met(
    dep_context=non_requeueable_dep_context, session=session, verbose=True
):
    session.commit()
    return False

# taskinstance.py
# line 1230~
if not self.are_dependencies_met(dep_context=dep_context, session=session, verbose=True):
    self.state = State.NONE
    self.log.warning(hr_line_break)
    self.log.warning(
        "Rescheduling due to concurrency limits reached "
        "at task runtime. Attempt %s of "
        "%s. State set to NONE.",
        self.try_number,
        self.max_tries + 1,
    )
    self.log.warning(hr_line_break)
    self.queued_dttm = timezone.utcnow()
    session.merge(self)
    session.commit()
    return False

# taskinstance.py
# line 1050~
...
verbose_aware_logger("Dependencies all met for %s", self)
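To illustrate the idea outside of Airflow, here is a minimal toy sketch. ToyTaskInstance, ListHandler, count_info_lines and the two run_* methods are hypothetical names made up for this example, and the way verbose_aware_logger picks INFO or DEBUG is my assumption about the pattern, not a copy of taskinstance.py. It only shows that two verbose=True checks emit the "Dependencies all met" line twice, and that making the second check non-verbose is one possible way to get a single line.

```python
import logging


class ToyTaskInstance:
    """Hypothetical stand-in for a task instance; not Airflow's real class."""

    def __init__(self, name):
        self.name = name
        self.log = logging.getLogger("toy.taskinstance")

    def are_dependencies_met(self, verbose=False):
        # Assumed pattern: verbose=True logs at INFO, otherwise at DEBUG.
        verbose_aware_logger = self.log.info if verbose else self.log.debug
        verbose_aware_logger("Dependencies all met for %s", self.name)
        return True

    def run_with_duplicate_log(self):
        # Both checks pass verbose=True, so the INFO line is emitted twice.
        first = self.are_dependencies_met(verbose=True)
        second = self.are_dependencies_met(verbose=True)
        return first and second

    def run_with_single_log(self):
        # Only the first check is verbose; the second one logs at DEBUG.
        first = self.are_dependencies_met(verbose=True)
        second = self.are_dependencies_met(verbose=False)
        return first and second


class ListHandler(logging.Handler):
    """Collects INFO-and-above messages so they can be counted."""

    def __init__(self):
        super().__init__(level=logging.INFO)
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def count_info_lines(method_name):
    """Run one toy method and return how many INFO lines it produced."""
    logger = logging.getLogger("toy.taskinstance")
    logger.setLevel(logging.DEBUG)
    handler = ListHandler()
    logger.addHandler(handler)
    try:
        getattr(ToyTaskInstance("add_conn"), method_name)()
    finally:
        logger.removeHandler(handler)
    return len(handler.messages)


if __name__ == "__main__":
    print(count_info_lines("run_with_duplicate_log"))
    print(count_info_lines("run_with_single_log"))
```

Whether the real fix should change the second call's verbose flag or guard the log line in some other way is still open; this is just the smallest version of the behavior I could write down.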
In terms of PR, I would love to hear any suggestions from you. Thank you.
Go ahead and submit a PR!
This doesn't have an active assignee. I will take this up and fix it
I tried reproducing this with the given DAG, but I was unable to. I also tried minute-wise scheduling, but the results don't show the repeated logs. @treyyi, can you confirm that this is still an issue? Otherwise we can close it.
Closing this issue as "not reproducible". Feel free to reopen if you can reproduce it.