Fix DAG serialization when `TriggerDagRunOperator` has an XComArg value for the `trigger_dag_id` argument
Description
This pull request makes this example DAG work. This DAG uses the XComArg (in other words, .output) from the first task as the input for the trigger_dag_id argument in the TriggerDagRunOperator.
# example dag
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="trigger_dag",
    description="Tests triggering a dag",
    catchup=False,
) as dag:
    get_dag_id = PythonOperator(
        task_id="get_dag_id",
        python_callable=lambda: "other_dag",
    )

    trigger_dag = TriggerDagRunOperator(
        task_id="trigger_other_dag",
        trigger_dag_id=get_dag_id.output,
    )

    get_dag_id >> trigger_dag
Open Questions
I left the or statement in the get_link function of the TriggerDagRunLink so that past operator links would still work (ones that were run before this change). However, if you click the operator link too fast, before the self.trigger_dag_id is pushed to the XCom, it will use the second value in the or statement - which will result in an error message.
Here is the code piece I'm referencing: https://github.com/nyoungstudios/airflow/blob/4ea12c7283000ad5ae1b736c67fd1d2ed9f747ef/airflow/operators/trigger_dagrun.py#L75-L86
This is the error message if you click too fast.
I wrote a workaround in my last commit so that we do not see this error message, but I am not much of a fan of how it is implemented. If you have any better ideas on how to do this, let me know.
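To make the timing problem concrete, here is a minimal sketch of the fallback logic, not the actual `get_link` code. It assumes a plain dict stands in for XCom; `UnresolvedArg`, `resolve_trigger_dag_id`, and the `"trigger_dag_id"` key are hypothetical names used only for illustration.

```python
from typing import Optional


class UnresolvedArg:
    """Hypothetical stand-in for an XComArg that has not been resolved yet."""


def resolve_trigger_dag_id(xcom_store: dict, operator_value: object) -> Optional[str]:
    """Prefer the dag_id pushed at run time, then fall back to the operator attribute.

    Returns None when neither source yields a plain string, so a caller
    can render an empty link instead of raising.
    """
    pushed = xcom_store.get("trigger_dag_id")
    candidate = pushed or operator_value  # the `or` fallback described above
    return candidate if isinstance(candidate, str) else None


# After the task has pushed its value, the pushed dag_id wins.
after_push = resolve_trigger_dag_id({"trigger_dag_id": "other_dag"}, UnresolvedArg())

# Clicking "too fast": nothing is pushed yet and the fallback is not a string.
before_push = resolve_trigger_dag_id({}, UnresolvedArg())

# Runs from before this change: the operator attribute is already a plain string.
legacy_run = resolve_trigger_dag_id({}, "other_dag")
```

The `isinstance` guard is just one way to avoid the error in the "too fast" case; it is shown for discussion and is not necessarily what the workaround commit does.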
Testing
Tested with both Postgres and SQLite databases.
I fixed up the failing checks from when I first opened this PR. I also tried my hand at removing the error message, as I mentioned in the description.
@uranusjr It has been a while since your last review; can you take another look? I have addressed your comments and resolved the merge conflicts.
I'm also running into this issue and am happy to take over the pull request if any additional code changes are needed! cc @uranusjr @nyoungstudios
@TobyMellor Thanks for reaching out. My main problem is just having to resolve merge conflicts every so often. This change was initially proposed in May of this year.
The current status is that I need to resolve the merge conflicts. I have done so on this separate branch: https://github.com/apache/airflow/compare/main...nyoungstudios:airflow:fix/serialize-trigger-dag-run-has-xcom-arg-new-version
However, the link in the TriggerDagRunOperator no longer works. I'm getting this error in the scheduler:
airflow.exceptions.SerializationError: Failed to serialize DAG 'trigger_dag': '<' not supported between instances of 'str' and 'PlainXComArg'
I've yet to debug this problem, but I think it needs a change in the DagDependency class. Anyway, I'll try to get around to fixing this eventually. You are also free to submit a PR against my branch if you have a fix. I'm on GitHub every weekday, so I can review it.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
This PR stopped working after I merged in the main branch, which has since switched to Airflow 3.x.
This is the current error I am receiving in the scheduler:
--- Last chance exception handler ---
Traceback (most recent call last):
  File "/path/to/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", line 262, in _fork_main
    target()
  File "/path/to/airflow/airflow/dag_processing/processor.py", line 69, in _parse_file_entrypoint
    comms_decoder.send_request(log, result)
  File "/path/to/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py", line 382, in send_request
    encoded_msg = msg.model_dump_json().encode() + b"\n"
  File "/path/to/apache-airflow/lib/python3.10/site-packages/pydantic/main.py", line 477, in model_dump_json
    return self.__pydantic_serializer__.to_json(
pydantic_core._pydantic_core.PydanticSerializationError: Unable to serialize unknown type: <class 'airflow.models.xcom_arg.PlainXComArg'>
And this is the error in the UI (the example DAG is the one from the original PR description):
Broken DAG: [/path/to/airflow/dags/example_dag.py]
Traceback (most recent call last):
  File "/path/to/airflow/airflow/serialization/serialized_objects.py", line 1615, in serialize_dag
    serialized_dag["dag_dependencies"] = [x.__dict__ for x in sorted(dag_deps)]
  File "<string>", line 4, in __lt__
TypeError: '<' not supported between instances of 'str' and 'PlainXComArg'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/path/to/airflow/airflow/serialization/serialized_objects.py", line 1717, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/path/to/airflow/airflow/serialization/serialized_objects.py", line 1631, in serialize_dag
    raise SerializationError(f"Failed to serialize DAG {dag.dag_id!r}: {e}")
airflow.exceptions.SerializationError: Failed to serialize DAG 'trigger_dag': '<' not supported between instances of 'str' and 'PlainXComArg'
I haven't been able to track down where this serialization problem is occurring yet, but if anyone has any suggestions - that would be appreciated!
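For reference, the `TypeError` in the UI traceback can be reproduced outside Airflow. The `File "<string>", line 4, in __lt__` frame suggests a generated comparison method, which is what an ordered dataclass produces. The sketch below assumes the dependency object is such a dataclass. `Dependency` and `StandInXComArg` are hypothetical stand-ins, not Airflow classes.

```python
from dataclasses import dataclass


class StandInXComArg:
    """Hypothetical stand-in for an unresolved XComArg; it defines no ordering."""


@dataclass(frozen=True, order=True)
class Dependency:
    """Hypothetical ordered dataclass, compared field by field like a tuple."""

    source: str
    target: object


deps = [
    Dependency("trigger_dag", "other_dag"),
    Dependency("trigger_dag", StandInXComArg()),
]

# The sources are equal, so the comparison falls through to `target`,
# where a str and a StandInXComArg cannot be ordered against each other.
try:
    sorted(deps)
    error_message = ""
except TypeError as exc:
    error_message = str(exc)

# One possible direction (an untested idea, not a fix verified against Airflow):
# sort on a key that is always comparable, e.g. the string form of each field.
sorted_deps = sorted(deps, key=lambda d: (d.source, str(d.target)))
```

If that assumption holds, any dependency whose target is still an unresolved XComArg will break `sorted(dag_deps)` as soon as another dependency shares its leading fields.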
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
:(