Airflow assumes the task context is serialized with Pydantic models
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.10.2
What happened?
The triggerer terminates when attempting to deserialize the task context dictionary.
[2024-09-25T13:25:57.492-0500] {triggerer_job_runner.py:338} INFO - Starting the triggerer
[2024-09-25T13:25:57.629-0500] {triggerer_job_runner.py:348} ERROR - Exception when executing TriggererJobRunner._run_trigger_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 346, in _execute
    self._run_trigger_loop()
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 374, in _run_trigger_loop
    self.load_triggers()
  File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 400, in load_triggers
    self.trigger_runner.update_triggers(set(ids))
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 719, in update_triggers
    new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
                                           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/trigger.py", line 94, in kwargs
    return self._decrypt_kwargs(self.encrypted_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/trigger.py", line 130, in _decrypt_kwargs
    return BaseSerialization.deserialize(decrypted_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 831, in deserialize
    return {k: cls.deserialize(v, use_pydantic_models) for k, v in var.items()}
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 821, in deserialize
    d[k] = cls.deserialize(v, use_pydantic_models=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 803, in deserialize
    raise RuntimeError(
RuntimeError: Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. This parameter will be removed eventually when new serialization is used by AIP-44
The issue started appearing when I upgraded from Airflow 2.9.3 to Airflow 2.10.2.
Note that I have a custom trigger that serializes the task context:
from typing import Any

from airflow.triggers.base import BaseTrigger


class CustomTrigger(BaseTrigger):
    ...

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serializes the trigger's arguments and classpath.

        :return: A tuple containing the classpath and a dictionary of arguments.
        """
        return (
            "CustomTrigger",
            {
                # The whole task context is stored as a trigger kwarg.
                "context": self.context,
            },
        )
What do you think should happen instead?
I believe the deserialize operation should not force use_pydantic_models to be True.
Instead, it should pass through the value it received as a parameter:
d[k] = cls.deserialize(v, use_pydantic_models)
This would also be consistent with serialization: when the task context is serialized, the value passed to the serialize function is respected.
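To illustrate the proposed change, here is a toy model in plain Python, not Airflow's actual code: a recursive deserializer in which nested task-context values are either forced to `use_pydantic_models=True` (the reported behaviour) or receive the caller's value (the proposal). The `__type`/`__var` wrapping, the `AIP_44_ENABLED` flag, and the `force_for_context` switch are simplifications assumed for this sketch.

```python
from typing import Any

# Toy stand-in for the AIP-44 feature flag; off, as in the reported deployment.
AIP_44_ENABLED = False


def deserialize(var: Any, use_pydantic_models: bool = False, *, force_for_context: bool = True) -> Any:
    """Toy recursive deserializer, loosely modelled on the traceback above.

    force_for_context=True mimics the reported behaviour: values inside a task
    context are always deserialized with use_pydantic_models=True.
    force_for_context=False passes the caller's flag through, as proposed.
    """
    if use_pydantic_models and not AIP_44_ENABLED:
        raise RuntimeError("use_pydantic_models = True requires the AIP-44 feature flag")
    if isinstance(var, dict) and var.get("__type") == "task_context":
        # Either hard-code True (reported) or keep the caller's value (proposed).
        nested_flag = True if force_for_context else use_pydantic_models
        return {
            k: deserialize(v, nested_flag, force_for_context=force_for_context)
            for k, v in var["__var"].items()
        }
    if isinstance(var, dict):
        # Plain dicts keep whatever flag the caller passed in.
        return {
            k: deserialize(v, use_pydantic_models, force_for_context=force_for_context)
            for k, v in var.items()
        }
    return var
```

With the flag off, `deserialize({"context": {"__type": "task_context", "__var": {"run_id": "manual__1"}}})` raises the `RuntimeError` from the nested call, while the same call with `force_for_context=False` returns the plain values. The toy only shows where the caller's flag is lost; it says nothing about whether a deserialized context is usable inside a trigger.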
How to reproduce
- Create a custom Trigger that serializes the task context
- Create a deferrable operator that uses the custom trigger
- Run the task
Operating System
n/a
Versions of Apache Airflow Providers
No response
Deployment
Astronomer
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Currently, the task context is not supported in triggers. If you need any values from the context object, extract them and pass them to the trigger as individual key-value kwargs.
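A minimal sketch of that workaround, assuming the trigger only needs a few plain values such as `run_id` and `ds`. `extract_trigger_kwargs` is a hypothetical helper, not an Airflow API, and it conservatively requires each value to be JSON-serializable, which is stricter than what Airflow's own serializer accepts.

```python
import json
from typing import Any


def extract_trigger_kwargs(context: dict[str, Any], keys: list[str]) -> dict[str, Any]:
    """Copy selected plain values out of a task context for use as trigger kwargs.

    Hypothetical helper: only the listed keys are copied, and each value must
    pass json.dumps so that nothing like a TaskInstance ends up in the kwargs.
    """
    kwargs: dict[str, Any] = {}
    for key in keys:
        value = context[key]  # KeyError if the context lacks this key
        try:
            json.dumps(value)
        except TypeError as exc:
            raise TypeError(
                f"context[{key!r}] is not JSON-serializable; convert it to a primitive first"
            ) from exc
        kwargs[key] = value
    return kwargs
```

In a deferrable operator's `execute(self, context)`, the result would be unpacked into a trigger rewritten to accept those keys, for example `CustomTrigger(**extract_trigger_kwargs(context, ["run_id", "ds"]))`, and the trigger's `serialize` would return those keys instead of the whole context. A value such as `logical_date` is a datetime, so this check rejects it; it would need converting first, e.g. with `.isoformat()`.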
Yeah. We might want to handle this better: a clearer error message should be printed in this case. Marked it as good-first-issue.
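One possible shape for that clearer error, as a sketch only: a check run on a trigger's kwargs before they are stored, raising a descriptive message when a value looks like a task context. `check_trigger_kwargs` and the marker-key heuristic (a mapping that has both `task_instance` and `dag_run`) are hypothetical and not part of Airflow; where such a check would live is left open.

```python
from collections.abc import Mapping
from typing import Any

# Hypothetical heuristic: keys a task context carries that ordinary kwargs rarely do.
CONTEXT_MARKER_KEYS = ("task_instance", "dag_run")


def check_trigger_kwargs(classpath: str, kwargs: Mapping[str, Any]) -> None:
    """Raise a descriptive ValueError if any trigger kwarg looks like a task context."""
    for name, value in kwargs.items():
        if isinstance(value, Mapping) and all(key in value for key in CONTEXT_MARKER_KEYS):
            raise ValueError(
                f"Trigger {classpath!r} received the task context in kwarg {name!r}. "
                "The task context is not supported in triggers; pass the individual "
                "values the trigger needs (for example run_id) as separate kwargs."
            )
```

Failing with this message at deferral time would point the user at the offending kwarg, instead of the triggerer later stopping with the AIP-44 `RuntimeError`.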
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.
This issue has been closed because it has not received response from the issue author.