hookimpl on_dag_run_failed goes into an infinite loop
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.6.3
What happened?
We have a small in-house Airflow plugin that notifies a Slack channel when a DAG run fails, implemented with the listener hookimpl API. Twice now we have hit a strange issue where the plugin went into an infinite loop and spammed the channel with messages; we had to remove the plugin and restart the Airflow instances to stop it. We didn't see any anomalies in the logs except that the scheduler detected a zombie job, so we suspect something is wrong either with how we wrote the plugin or with Airflow itself.
What you think should happen instead?
No response
How to reproduce
Unfortunately we don't know how to reproduce it, but the issue has happened twice.
Operating System
Ubuntu 20.04.6 LTS
Versions of Apache Airflow Providers
No response
Deployment
Google Cloud Composer
Deployment details
Composer version: composer-2.5.1-airflow-2.6.3
Number of schedulers: 2
Anything else?
This is the log line that appeared around the time the spam started, for this exact DAG id.
Detected zombie job: {'full_filepath': '/home/airflow/gcs/dags/myfolder/dag_player_value_model.py', 'processor_subdir': '/home/airflow/gcs/dags', 'msg': "{'DAG Id': 'player_value_model', 'Task Id': 'main', 'Run Id': 'scheduled__2024-04-19T08:00:00+00:00', 'Hostname': 'airflow-worker-48ssg', 'External Executor Id': '6c81b467-3ba0-430f-9fe1-fe48f3aca8a1'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f4050d6e760>, 'is_failure_callback': True}
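If it helps with the investigation, something like the rough sketch below (hypothetical, not the plugin we actually run; the CountingListener name and the counter are made up for illustration) could confirm whether on_dag_run_failed is really being invoked repeatedly for the same run id, or whether a single invocation is looping inside our Slack call.

# Hypothetical diagnostic listener (illustration only): count how many times
# on_dag_run_failed fires per (dag_id, run_id) within one scheduler process.
import logging
from collections import Counter

from airflow.listeners import hookimpl
from airflow.models import DagRun

logger = logging.getLogger(__name__)

_invocations: Counter = Counter()


class CountingListener:
    @hookimpl
    def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None:
        key = (dag_run.dag_id, dag_run.run_id)
        _invocations[key] += 1
        # If this count keeps growing for the same run, the hook itself is
        # being re-invoked; if it stays at 1, the loop is inside our handler.
        logger.warning(
            "on_dag_run_failed invocation #%d for %s / %s",
            _invocations[key], dag_run.dag_id, dag_run.run_id,
        )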
This is our plugin code
"""Airflow plugin to notify channels on DagRun failure."""
import logging
from datetime import datetime
from typing import List
import requests
from airflow import settings
from airflow.listeners import hookimpl
from airflow.models import DagModel, DagRun, DagTag, TaskInstance, Variable
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import DagRunState, TaskInstanceState
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.session import Session
logger = logging.getLogger(__name__)
# Tag to disable alert for a dag.
NO_ALERT_TAG = "no alert"
def _is_alert_enabled(dag_id: str, session: Session) -> bool:
"""Return True if the alert is enabled for the given dag id."""
dag_tags: List[DagTag] = (
session.query(DagModel)
.options(joinedload(DagModel.tags, innerjoin=False))
.filter(DagModel.dag_id == dag_id)
.first()
.tags
)
for tag in dag_tags:
if tag.name.lower() == NO_ALERT_TAG:
return False
return True
def notify_slack_on_dag(dag_run: DagRun, msg: str, session: Session) -> None:
logger.info(
f"Sending notification to slack channel... dag id {dag_run.dag_id} dag state {dag_run.state}"
)
now = datetime.utcnow()
dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
if not _is_alert_enabled(dag_run.dag_id, session):
logger.info(
"Alert is disabled for the dag. "
"Skip sending notification to slack channel."
)
return
# For some reason, variable is not accessible in scheduler.
# So, we are using session to get the variable.
variable = session.query(Variable).filter(Variable.key == "GCP_PROJECT_ID").first()
project_id = variable.get_val()
failed_tasks = (
session.query(TaskInstance)
.filter(
TaskInstance.dag_run == dag_run,
TaskInstance.state == TaskInstanceState.FAILED,
)
.all()
)
failed_tasks_str = "\n".join(
f"{task.task_id} - {task.log_url}" for task in failed_tasks
)
request_variables = {
"notificationType": "Airflow Task Fail",
"projectId": project_id,
"processName": dag_run.dag_id,
"timeStamp": dt_string,
"errorMessage": msg + "\n" + failed_tasks_str,
}
variable = (
session.query(Variable)
.filter(Variable.key == "url_notifications_to_slack_channels")
.first()
)
url = variable.get_val()
response = requests.post(
url=url,
json=request_variables,
)
logger.info(
f"Response from slack channel is {response.status_code} {response.text}"
)
class NotifyOnFailurePlugin(AirflowPlugin):
"""Airflow plugin to notify channels on DagRun failure.
For more information, see https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/plugins.html
"""
class Listener:
@hookimpl
def on_dag_run_failed(
self,
dag_run: DagRun,
msg: str,
) -> None:
if dag_run.state != DagRunState.FAILED:
return
try:
session = settings.Session()
notify_slack_on_dag(dag_run, msg, session)
except Exception as e:
logger.error(f"Error in on_dag_run_failed: {e}")
finally:
session.close()
# Name of the plugin.
name = "NotifyOnFailurePlugin"
# A list of Listeners that plugin provides. Listeners can register to
# listen to particular events that happen in Airflow, like
# TaskInstance state changes. Listeners are python classes or modules.
listeners = [Listener()]
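One defensive change we could try is a de-duplication guard, so the same run can only trigger one message. The sketch below is hypothetical (not what we deploy): the DedupListener class would replace the inner Listener above, it reuses the imports, logger and notify_slack_on_dag from the plugin module, and the set is per-process, so it only protects against repeats inside one scheduler process.

# Hypothetical guard (sketch only): remember which runs have already been
# notified so repeated on_dag_run_failed calls for the same run do not spam
# the channel.
from typing import Set, Tuple

_notified_runs: Set[Tuple[str, str]] = set()


class DedupListener:
    @hookimpl
    def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None:
        if dag_run.state != DagRunState.FAILED:
            return
        key = (dag_run.dag_id, dag_run.run_id)
        if key in _notified_runs:
            # Already notified for this run in this process; skip.
            logger.warning("Skipping duplicate notification for %s / %s", *key)
            return
        _notified_runs.add(key)
        session = settings.Session()
        try:
            notify_slack_on_dag(dag_run, msg, session)
        except Exception:
            logger.exception("Error in on_dag_run_failed")
        finally:
            session.close()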
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Not sure we can investigate this without a reproducible example.
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.
This issue has been closed because it has not received response from the issue author.
Hi @vilozio, did you manage to resolve this issue?