
hookimpl on_dag_run_failed goes into an infinite loop

Open: vilozio opened this issue 10 months ago • 1 comment

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.6.3

What happened?

We have our own small Airflow plugin that notifies a Slack channel when a DAG run fails. It uses the hookimpl listeners. Twice we hit a strange issue where the plugin went into an infinite loop and spammed the channel with lots of messages. We had to remove the plugin and restart the Airflow instances to stop it. We didn't see any anomalies in the logs, except that the scheduler detected a zombie job. So we think something may be wrong either with the way we wrote the plugin or with Airflow.

What do you think should happen instead?

No response

How to reproduce

Unfortunately we don't know how to reproduce it, but it has happened twice.

Operating System

Ubuntu 20.04.6 LTS

Versions of Apache Airflow Providers

No response

Deployment

Google Cloud Composer

Deployment details

Composer version: composer-2.5.1-airflow-2.6.3
Number of schedulers: 2

Anything else?

This log line appeared around the time the spam started, for this exact DAG id:

Detected zombie job: {'full_filepath': '/home/airflow/gcs/dags/myfolder/dag_player_value_model.py', 'processor_subdir': '/home/airflow/gcs/dags', 'msg': "{'DAG Id': 'player_value_model', 'Task Id': 'main', 'Run Id': 'scheduled__2024-04-19T08:00:00+00:00', 'Hostname': 'airflow-worker-48ssg', 'External Executor Id': '6c81b467-3ba0-430f-9fe1-fe48f3aca8a1'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f4050d6e760>, 'is_failure_callback': True}
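To line up zombie detections with the timestamps of the Slack spam, the `msg` field of this log line can be extracted programmatically. A minimal sketch, assuming the log format is exactly as shown above: the `msg` value is itself a Python dict repr wrapped in double quotes, so it can be pulled out with a regex and parsed with `ast.literal_eval`. The whole line cannot be parsed that way because it contains the `<...SimpleTaskInstance object at ...>` repr. `parse_zombie_msg` is a hypothetical helper, not part of Airflow.

```python
import ast
import re
from typing import Any, Dict

# Matches the double-quoted dict repr stored in the 'msg' field (format assumed from this issue).
_MSG_RE = re.compile(r"'msg': \"(\{.*?\})\"")

SAMPLE_LINE = (
    "Detected zombie job: {'full_filepath': '/home/airflow/gcs/dags/myfolder/dag_player_value_model.py', "
    "'processor_subdir': '/home/airflow/gcs/dags', "
    "'msg': \"{'DAG Id': 'player_value_model', 'Task Id': 'main', "
    "'Run Id': 'scheduled__2024-04-19T08:00:00+00:00', 'Hostname': 'airflow-worker-48ssg', "
    "'External Executor Id': '6c81b467-3ba0-430f-9fe1-fe48f3aca8a1'}\", "
    "'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f4050d6e760>, "
    "'is_failure_callback': True}"
)


def parse_zombie_msg(log_line: str) -> Dict[str, Any]:
    """Return the task identifiers from a 'Detected zombie job' line, or {} if absent."""
    match = _MSG_RE.search(log_line)
    if match is None:
        return {}
    # literal_eval only accepts Python literals, so it is safe on untrusted log text.
    parsed = ast.literal_eval(match.group(1))
    return parsed if isinstance(parsed, dict) else {}


if __name__ == "__main__":
    info = parse_zombie_msg(SAMPLE_LINE)
    # prints: player_value_model scheduled__2024-04-19T08:00:00+00:00
    print(info["DAG Id"], info["Run Id"])
```

The extracted `Run Id` can then be compared with the runs named in the repeated Slack messages.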

This is our plugin code

PLUGIN CODE
"""Airflow plugin to notify channels on DagRun failure."""

import logging
from datetime import datetime
from typing import List

import requests
from airflow import settings
from airflow.listeners import hookimpl
from airflow.models import DagModel, DagRun, DagTag, TaskInstance, Variable
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import DagRunState, TaskInstanceState
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.session import Session

logger = logging.getLogger(__name__)


# Tag to disable alert for a dag.
NO_ALERT_TAG = "no alert"


def _is_alert_enabled(dag_id: str, session: Session) -> bool:
    """Return True if the alert is enabled for the given dag id."""
    dag_model = (
        session.query(DagModel)
        .options(joinedload(DagModel.tags, innerjoin=False))
        .filter(DagModel.dag_id == dag_id)
        .first()
    )
    if dag_model is None:
        # DAG not found in the metadata DB: default to alerting instead of
        # raising AttributeError on None.
        return True
    dag_tags: List[DagTag] = dag_model.tags
    for tag in dag_tags:
        if tag.name.lower() == NO_ALERT_TAG:
            return False
    return True


def notify_slack_on_dag(dag_run: DagRun, msg: str, session: Session) -> None:
    logger.info(
        f"Sending notification to slack channel... dag id {dag_run.dag_id} dag state {dag_run.state}"
    )
    now = datetime.utcnow()
    dt_string = now.strftime("%d/%m/%Y %H:%M:%S")

    if not _is_alert_enabled(dag_run.dag_id, session):
        logger.info(
            "Alert is disabled for the dag. "
            "Skip sending notification to slack channel."
        )
        return
    # For some reason, the variable is not accessible in the scheduler,
    # so we query it through the session instead.
    variable = session.query(Variable).filter(Variable.key == "GCP_PROJECT_ID").first()
    project_id = variable.get_val()

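    # Filter on the key columns rather than comparing the relationship to a
    # DagRun object that was loaded in a different session.
    failed_tasks = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag_run.dag_id,
            TaskInstance.run_id == dag_run.run_id,
            TaskInstance.state == TaskInstanceState.FAILED,
        )
        .all()
    )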

    failed_tasks_str = "\n".join(
        f"{task.task_id} - {task.log_url}" for task in failed_tasks
    )

    request_variables = {
        "notificationType": "Airflow Task Fail",
        "projectId": project_id,
        "processName": dag_run.dag_id,
        "timeStamp": dt_string,
        "errorMessage": msg + "\n" + failed_tasks_str,
    }
    variable = (
        session.query(Variable)
        .filter(Variable.key == "url_notifications_to_slack_channels")
        .first()
    )
    url = variable.get_val()
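    # Without a timeout, a slow endpoint can block the process running the
    # listener indefinitely (30 seconds is an arbitrary choice).
    response = requests.post(
        url=url,
        json=request_variables,
        timeout=30,
    )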
    logger.info(
        f"Response from slack channel is {response.status_code} {response.text}"
    )


class NotifyOnFailurePlugin(AirflowPlugin):
    """Airflow plugin to notify channels on DagRun failure.

    For more information, see https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/plugins.html
    """

    class Listener:
        @hookimpl
        def on_dag_run_failed(
            self,
            dag_run: DagRun,
            msg: str,
        ) -> None:
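            if dag_run.state != DagRunState.FAILED:
                return
            # Initialise to None so `finally` never hits an unbound name
            # if settings.Session() itself raises.
            session = None
            try:
                session = settings.Session()
                notify_slack_on_dag(dag_run, msg, session)
            except Exception:
                # logger.exception also records the traceback.
                logger.exception("Error in on_dag_run_failed")
            finally:
                if session is not None:
                    session.close()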

    # Name of the plugin.
    name = "NotifyOnFailurePlugin"
    # A list of Listeners that plugin provides. Listeners can register to
    # listen to particular events that happen in Airflow, like
    # TaskInstance state changes. Listeners are python classes or modules.
    listeners = [Listener()]
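The issue does not establish why the listener fired repeatedly. One possible mitigation, sketched here under the assumption that the repeated calls are for the same (dag_id, run_id) pair, is to remember which runs have already been notified and skip repeats within a time window. `NotificationDeduplicator` and `should_notify` are hypothetical names, not Airflow API, and the state lives in process memory, so with two schedulers each process keeps its own copy.

```python
import threading
import time
from typing import Callable, Dict, Tuple


class NotificationDeduplicator:
    """Hypothetical in-memory guard: notify at most once per (dag_id, run_id) per TTL window.

    State is per process and lost on restart; it limits spam rather than
    guaranteeing a single message across schedulers.
    """

    def __init__(
        self,
        ttl_seconds: float = 3600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._seen: Dict[Tuple[str, str], float] = {}
        self._lock = threading.Lock()

    def should_notify(self, dag_id: str, run_id: str) -> bool:
        """Return True the first time a run is seen within the TTL window, else False."""
        now = self._clock()
        key = (dag_id, run_id)
        with self._lock:
            # Drop expired entries so the dict does not grow without bound.
            expired = [k for k, ts in self._seen.items() if now - ts >= self._ttl]
            for k in expired:
                del self._seen[k]
            if key in self._seen:
                return False
            self._seen[key] = now
            return True
```

In the listener above, a module-level `_dedup = NotificationDeduplicator()` could be checked right after the state check, for example `if not _dedup.should_notify(dag_run.dag_id, dag_run.run_id): return`. A guard that survives restarts and is shared between schedulers would need shared storage instead of process memory.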

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

vilozio, Apr 20 '24 12:04

Not sure we can investigate without a reproducible example.

eladkal, May 07 '24 05:05

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions[bot], Jun 23 '24 00:06

This issue has been closed because it has not received a response from the issue author.

github-actions[bot], Jul 03 '24 00:07

Hi @vilozio Did you manage to resolve this issue?

pawsok, Sep 26 '24 08:09