
fixes serialize dag problem when `TriggerDagRunOperator` has XComArg value for `trigger_dag_id` argument

Open nyoungstudios opened this issue 1 year ago • 2 comments

Description

This pull request makes this example DAG work. This DAG uses the XComArg (in other words, .output) from the first task as the input for the trigger_dag_id argument in the TriggerDagRunOperator.

# example dag
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="trigger_dag",
    description="Tests triggering a dag",
    catchup=False,
) as dag:
    
    get_dag_id = PythonOperator(
        task_id="get_dag_id",
        python_callable=lambda: "other_dag",
    )

    trigger_dag = TriggerDagRunOperator(
        task_id="trigger_other_dag",
        trigger_dag_id=get_dag_id.output,
    )
    
    get_dag_id >> trigger_dag

Open Questions

I left the or expression in the get_link function of TriggerDagRunLink so that operator links from runs made before this change still work. However, if you click the operator link too quickly, before self.trigger_dag_id has been pushed to XCom, it falls back to the second operand of the or expression, which results in an error message.

Here is the code piece I'm referencing: https://github.com/nyoungstudios/airflow/blob/4ea12c7283000ad5ae1b736c67fd1d2ed9f747ef/airflow/operators/trigger_dagrun.py#L75-L86

This is the error message if you click too fast: [Screenshot from 2024-05-22 17-20-28]

I wrote a workaround in my last commit so that we do not see this error message, but I am not much of a fan of how it is implemented. If you have any better ideas on how to do this, let me know.
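To make the or fallback described above concrete, here is a minimal sketch in plain Python. It is not the code from trigger_dagrun.py: the helper name resolve_trigger_dag_id and both of its parameters are hypothetical, and it only illustrates how an or expression picks its second operand while the XCom value is still missing.

```python
from typing import Optional


def resolve_trigger_dag_id(xcom_value: Optional[str], operator_value: str) -> str:
    """Hypothetical helper, not Airflow code.

    Prefer the value pushed to XCom; fall back to the value stored on the
    operator when the XCom value is missing (None) or empty.
    """
    return xcom_value or operator_value


# Link clicked before the XCom value exists: the fallback operand is used.
early = resolve_trigger_dag_id(None, "fallback_dag")

# Link clicked after the XCom value has been pushed: the XCom value wins.
late = resolve_trigger_dag_id("other_dag", "fallback_dag")

print(early, late)
```

In the PR's scenario the fallback operand would be whatever the operator holds for trigger_dag_id, which may not be a usable DAG id when it came from an XComArg. That would explain why the early click leads to an error.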

Testing

Tested with both Postgres and SQLite databases.



nyoungstudios avatar May 23 '24 01:05 nyoungstudios

I fixed the checks that were failing when I first opened this PR. I also tried my hand at removing the error message, as mentioned in the description.

nyoungstudios avatar May 24 '24 01:05 nyoungstudios

@uranusjr it has been a while since your last review, can you take another look? I have addressed your comments and resolved the merge conflicts

nyoungstudios avatar Oct 18 '24 06:10 nyoungstudios

I'm also running into this issue, and I'm happy to take over the pull request if any additional code changes are needed! cc @uranusjr @nyoungstudios

TobyMellor avatar Nov 23 '24 15:11 TobyMellor

@TobyMellor Thanks for reaching out. My main problem is just having to resolve merge conflicts every so often. This change was initially proposed in May of this year.

The current status is that I need to resolve the merge conflicts. I have done so on this separate branch https://github.com/apache/airflow/compare/main...nyoungstudios:airflow:fix/serialize-trigger-dag-run-has-xcom-arg-new-version

However, the link in the TriggerDagRunOperator no longer works. I'm getting this error in the scheduler:

airflow.exceptions.SerializationError: Failed to serialize DAG 'trigger_dag': '<' not supported between instances of 'str' and 'PlainXComArg'

I've yet to debug this problem, but I think it needs a change in the DagDependency class. Anyway, I'll try to get around to fixing this eventually. You are also free to submit a PR against my branch if you have a fix. I'm on GitHub every weekday, so I can review it.

nyoungstudios avatar Nov 27 '24 09:11 nyoungstudios

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Jan 14 '25 00:01 github-actions[bot]

This PR has stopped working since I merged in the main branch after main was switched to Airflow 3.X.

This is the current error I am receiving in the scheduler:

--- Last chance exception handler ---
Traceback (most recent call last):
  File "/path/to/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", line 262, in _fork_main
    target()
  File "/path/to/airflow/airflow/dag_processing/processor.py", line 69, in _parse_file_entrypoint
    comms_decoder.send_request(log, result)
  File "/path/to/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py", line 382, in send_request
    encoded_msg = msg.model_dump_json().encode() + b"\n"
  File "/path/to/apache-airflow/lib/python3.10/site-packages/pydantic/main.py", line 477, in model_dump_json
    return self.__pydantic_serializer__.to_json(
pydantic_core._pydantic_core.PydanticSerializationError: Unable to serialize unknown type: <class 'airflow.models.xcom_arg.PlainXComArg'>

And this is the error in the UI (the example DAG is the sample from the original PR description):

Broken DAG: [/path/to/airflow/dags/example_dag.py] 
Traceback (most recent call last):
  File "/path/to/airflow/airflow/serialization/serialized_objects.py", line 1615, in serialize_dag
    serialized_dag["dag_dependencies"] = [x.__dict__ for x in sorted(dag_deps)]
  File "<string>", line 4, in __lt__
TypeError: '<' not supported between instances of 'str' and 'PlainXComArg'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/path/to/airflow/airflow/serialization/serialized_objects.py", line 1717, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/path/to/airflow/airflow/serialization/serialized_objects.py", line 1631, in serialize_dag
    raise SerializationError(f"Failed to serialize DAG {dag.dag_id!r}: {e}")
airflow.exceptions.SerializationError: Failed to serialize DAG 'trigger_dag': '<' not supported between instances of 'str' and 'PlainXComArg'

I haven't been able to track down where this serialization problem is occurring yet, but if anyone has any suggestions - that would be appreciated!
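For what it's worth, the `File "<string>", line 4, in __lt__` frame in the UI traceback is consistent with a comparison method generated by a dataclass declared with order=True, called from sorted(dag_deps). The sketch below reproduces that kind of TypeError in isolation. FakeXComArg and FakeDagDependency are hypothetical stand-ins, not the Airflow classes, and the field names are assumptions made for illustration.

```python
from dataclasses import dataclass


class FakeXComArg:
    """Hypothetical stand-in for PlainXComArg; it defines no ordering methods."""


@dataclass(order=True, frozen=True)
class FakeDagDependency:
    """Hypothetical stand-in for a dependency record that gets sorted."""

    source: str
    target: object  # usually a str dag_id; here it may be an unresolved XComArg


deps = [
    FakeDagDependency("trigger_dag", FakeXComArg()),
    FakeDagDependency("trigger_dag", "other_dag"),
]

# The generated __lt__ compares field tuples, so a str ends up being
# compared with a FakeXComArg, which raises TypeError.
try:
    sorted(deps)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)

# Sorting on a string key avoids the comparison. This only illustrates the
# mechanism and has not been tested against Airflow.
safe_order = sorted(deps, key=lambda d: (d.source, str(d.target)))
```

If this is indeed the mechanism, the unresolved XComArg is reaching the dependency list as the target DAG id. Even with the sort avoided, the value would still need to be serializable to JSON, which would match the Pydantic error in the scheduler.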

nyoungstudios avatar Jan 18 '25 05:01 nyoungstudios

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Mar 29 '25 00:03 github-actions[bot]

:(

nyoungstudios avatar Apr 04 '25 04:04 nyoungstudios