@hookimpl on_dag_run_running, on_dag_run_success, on_dag_run_failed do not find Connections and Variables
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.8.1
What happened?
I wrote a listener plugin and added it to the plugins directory. The listener implements these methods:
- on_task_instance_running
- on_task_instance_success
- on_task_instance_failed
- on_dag_run_running
- on_dag_run_success
- on_dag_run_failed

When I try to read Variables and Connections in on_dag_run_running, on_dag_run_success, or on_dag_run_failed, the following error is raised as soon as a DAG run starts:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 52, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/job.py", line 393, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/job.py", line 422, in execute_job
    ret = execute_callable()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 855, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 987, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 1063, in _do_scheduling
    self._start_queued_dagruns(session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 1404, in _start_queued_dagruns
    dag_run.notify_dagrun_state_changed()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 862, in notify_dagrun_state_changed
    get_listener_manager().hook.on_dag_run_running(dag_run=self, msg=msg)
  File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_hooks.py", line 493, in __call__
    return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
  File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_manager.py", line 115, in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_callers.py", line 113, in _multicall
    raise exception.with_traceback(exception.__traceback__)
  File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_callers.py", line 77, in _multicall
    res = hook_impl.function(*args)
  File "/opt/airflow/plugins/metadata/airflow_metadata.py", line 206, in on_dag_run_running
    my_connection = BaseHook.get_connection("my_connection")
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/hooks/base.py", line 82, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/connection.py", line 479, in get_connection_from_secrets
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id `my_connection` isn't defined
However, the connection is explicitly defined in Connections, and the same lookups work in the task instance listener methods.
How can I make these lookups work in the DagRun listener methods as well?
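One thing that may be worth checking: the traceback shows on_dag_run_running being called from the scheduler process (scheduler_command.py), while, as far as I understand, the task instance listener methods run where the task executes (a Celery worker in this deployment). If the Connection or Variable is supplied through environment variables, the scheduler container might not see the same values as the worker. The sketch below only builds the environment variable names Airflow checks (AIRFLOW_CONN_<CONN_ID> and AIRFLOW_VAR_<KEY>, upper-cased) and reports whether they are set in the current process. The function names env_lookup_names and report_env_visibility are hypothetical helpers for illustration, not part of Airflow, and this does not cover values stored in the metadata database or a secrets backend.

```python
import os
from typing import Dict, Mapping, Optional


def env_lookup_names(conn_id: str, var_key: str) -> Dict[str, str]:
    """Return the environment variable names Airflow checks for a
    connection id and a variable key (hypothetical helper)."""
    return {
        "connection": f"AIRFLOW_CONN_{conn_id.upper()}",
        "variable": f"AIRFLOW_VAR_{var_key.upper()}",
    }


def report_env_visibility(
    conn_id: str,
    var_key: str,
    environ: Optional[Mapping[str, str]] = None,
) -> Dict[str, bool]:
    """Report which of those names are set in the given environment.

    Defaults to the environment of the current process, so running it
    in the scheduler and in a worker shows whether they differ.
    """
    environ = os.environ if environ is None else environ
    names = env_lookup_names(conn_id, var_key)
    return {name: name in environ for name in names.values()}


if __name__ == "__main__":
    # Run once inside the scheduler container and once inside a worker.
    print(report_env_visibility("my_connection", "environment"))
```

If both processes report the same result, the difference is more likely in how the lookup itself behaves in the scheduler than in the container configuration.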
What you think should happen instead?
I expect Connection and Variable values to be resolved in the DagRun listener methods, just as they are in the task instance listener methods.
How to reproduce
Add a Connection named "my_connection" and a Variable named "environment", then add the following listener plugin to the plugins directory:
from airflow.listeners import hookimpl
from airflow.models.taskinstance import TaskInstance
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.dagrun import DagRun
from airflow.plugins_manager import AirflowPlugin


class AirflowListener:
    @hookimpl
    def on_task_instance_running(self, task_instance: TaskInstance) -> None:
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_task_instance_success(self, task_instance: TaskInstance) -> None:
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_task_instance_failed(self, task_instance: TaskInstance) -> None:
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_dag_run_running(self, dag_run: DagRun):
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_dag_run_success(self, dag_run: DagRun):
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_dag_run_failed(self, dag_run: DagRun):
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")


class AirflowListenerPlugin(AirflowPlugin):
    name = "AirflowListener"
    listeners = [AirflowListener()]
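As the traceback shows, the exception raised inside on_dag_run_running propagates through pluggy into the scheduler loop. Until the root cause is understood, a listener could guard its lookups so that a failed lookup is logged instead of propagating. The following is a minimal sketch of that idea, not a fix for the missing Connection itself. safe_lookup is a hypothetical helper, not an Airflow API, and it takes the getter as an argument so the snippet runs without Airflow installed.

```python
import logging
from typing import Any, Callable, Optional

log = logging.getLogger(__name__)


def safe_lookup(
    getter: Callable[[str], Any],
    key: str,
    default: Optional[Any] = None,
) -> Any:
    """Call getter(key); on any exception, log it and return default.

    Intended for use inside listener methods, where an uncaught
    exception would otherwise reach the caller of the hook.
    """
    try:
        return getter(key)
    except Exception:
        # Broad catch is deliberate in this sketch: different lookups
        # raise different exception types when a key is missing.
        log.exception("Lookup of %r failed inside listener; using default", key)
        return default


# Assumed usage inside the listener from the reproduction (not executed here):
#
#     @hookimpl
#     def on_dag_run_running(self, dag_run: DagRun):
#         my_connection = safe_lookup(BaseHook.get_connection, "my_connection")
#         env = safe_lookup(Variable.get, "environment")
#         if my_connection is None or env is None:
#             return  # skip the listener logic rather than raise
```

The trade-off is that a misconfigured Connection then fails quietly in the logs, so the listener should still treat a None result as an error condition.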
Operating System
macOS Sonoma 14.1.2
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
apache/airflow:2.8.1-python3.9 executor: CeleryExecutor
Anything else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.