Airflow assumes task context is serialized with Pydantic models

Open wolfier opened this issue 1 year ago • 3 comments

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.10.2

What happened?

The triggerer terminates when attempting to deserialize the task context dictionary.

[2024-09-25T13:25:57.492-0500] {triggerer_job_runner.py:338} INFO - Starting the triggerer
[2024-09-25T13:25:57.629-0500] {triggerer_job_runner.py:348} ERROR - Exception when executing TriggererJobRunner._run_trigger_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 346, in _execute
    self._run_trigger_loop()
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 374, in _run_trigger_loop
    self.load_triggers()
  File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 400, in load_triggers
    self.trigger_runner.update_triggers(set(ids))
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 719, in update_triggers
    new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
                                           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/trigger.py", line 94, in kwargs
    return self._decrypt_kwargs(self.encrypted_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/trigger.py", line 130, in _decrypt_kwargs
    return BaseSerialization.deserialize(decrypted_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 831, in deserialize
    return {k: cls.deserialize(v, use_pydantic_models) for k, v in var.items()}
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 821, in deserialize
    d[k] = cls.deserialize(v, use_pydantic_models=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 803, in deserialize
    raise RuntimeError(
RuntimeError: Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. This parameter will be removed eventually when new serialization is used by AIP-44

The issue started appearing when I upgraded from Airflow 2.9.3 to Airflow 2.10.2.

Note that I have a custom trigger that serializes the task context.

class CustomTrigger(BaseTrigger):
    ...
    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serializes the trigger's arguments and classpath.

        :return: A tuple containing the classpath and a dictionary of arguments.
        """
        return (
            "CustomTrigger",
            {
                "context": self.context,
            },
        )

What you think should happen instead?

I believe the deserialize operation should not be forcing use_pydantic_models to be true.

Instead, it should be using the value passed as a parameter.

                d[k] = cls.deserialize(v, use_pydantic_models)

Also, when the task context is serialized, the code respects the value passed to the serialize function.
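
To make the difference between the two calls concrete, here is a toy model of a recursive deserializer. It is not Airflow's actual `BaseSerialization` code: `AIP44_ENABLED`, the `force_flag_for_context` switch, and the `"task_context"` marker are hypothetical stand-ins chosen for illustration. The sketch only shows that hard-coding the flag on the nested call raises no matter what the caller passed, while propagating the caller's value does not.

```python
# Toy model only; names and structure are assumptions, not Airflow internals.
AIP44_ENABLED = False  # stand-in for the AIP-44 feature flag being off


def deserialize(encoded, use_pydantic_models=False, *, force_flag_for_context=True):
    """Recursively decode nested dicts, mimicking the reported behaviour."""
    if use_pydantic_models and not AIP44_ENABLED:
        raise RuntimeError("use_pydantic_models=True requires the AIP-44 feature flag")
    if not isinstance(encoded, dict):
        return encoded
    if encoded.get("__type") == "task_context":
        # Reported behaviour: the flag is forced to True for context values.
        # Proposed behaviour: reuse whatever the caller passed in.
        flag = True if force_flag_for_context else use_pydantic_models
        return {
            key: deserialize(value, flag, force_flag_for_context=force_flag_for_context)
            for key, value in encoded["__var"].items()
        }
    return {
        key: deserialize(value, use_pydantic_models, force_flag_for_context=force_flag_for_context)
        for key, value in encoded.items()
    }


# Trigger kwargs that contain a serialized task context (illustrative payload).
payload = {"context": {"__type": "task_context", "__var": {"run_id": "manual__1"}}}

try:
    deserialize(payload)  # forced flag: fails even though the caller passed False
    forced_call_failed = False
except RuntimeError:
    forced_call_failed = True

# Propagating the caller's value lets the same payload decode.
decoded = deserialize(payload, force_flag_for_context=False)
```

In this toy version the caller never asks for Pydantic models, yet the forced variant still fails on the first value inside the context, which mirrors the shape of the traceback above.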

How to reproduce

  1. Create a custom Trigger that serializes the task context
  2. Create a deferrable operator that uses the custom trigger
  3. Run the task

Operating System

n/a

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

wolfier avatar Sep 25 '24 21:09 wolfier

Currently, context is not supported in triggers. If you need any values from the context object, extract them and pass them as key-value pairs.
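
A minimal sketch of that workaround follows, under some assumptions: `ContextFreeTrigger` and `trigger_kwargs_from_context` are hypothetical names, the class is a plain stand-in rather than a real `BaseTrigger` subclass (a real trigger would subclass `BaseTrigger` and implement `run()`), and the three fields picked from the context are only examples of plain string values.

```python
import json
from types import SimpleNamespace
from typing import Any


class ContextFreeTrigger:
    """Stand-in for a trigger that carries only plain values, not the context."""

    def __init__(self, dag_id: str, task_id: str, run_id: str) -> None:
        self.dag_id = dag_id
        self.task_id = task_id
        self.run_id = run_id

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Same shape as the serialize() in the issue, but with plain strings only.
        return (
            "CustomTrigger",
            {"dag_id": self.dag_id, "task_id": self.task_id, "run_id": self.run_id},
        )


def trigger_kwargs_from_context(context: dict[str, Any]) -> dict[str, str]:
    """Pick the few values the trigger needs out of the task context."""
    ti = context["ti"]
    return {"dag_id": ti.dag_id, "task_id": ti.task_id, "run_id": context["run_id"]}


# Fake context for illustration; a real one is supplied by Airflow at runtime.
fake_context = {
    "ti": SimpleNamespace(dag_id="example_dag", task_id="wait"),
    "run_id": "manual__2024-09-25",
}
kwargs = trigger_kwargs_from_context(fake_context)
classpath, serialized = ContextFreeTrigger(**kwargs).serialize()
json.dumps(serialized)  # plain strings, so this succeeds without special handling
```

In a deferrable operator, the extraction would happen in `execute()` before deferring, so the trigger never receives the context object itself.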

gopidesupavan avatar Oct 03 '24 14:10 gopidesupavan

Yeah. We might want to handle it better, so that a better error message is printed in this case. Marked it as good-first-issue.

potiuk avatar Oct 04 '24 01:10 potiuk

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions[bot] avatar Oct 19 '24 00:10 github-actions[bot]

This issue has been closed because it has not received a response from the issue author.

github-actions[bot] avatar Oct 26 '24 00:10 github-actions[bot]