Run unit tests with airflow installed from packages
This PR adds the option of running unit tests for providers against a specific Airflow version (for example, a released version on PyPI) and enables it for back-compatibility testing against 2.9.1. In the future it could be used to run forward-compatibility testing with Airflow 3 as well.
As explained on the devlist, this PR - specifically this job https://github.com/apache/airflow/actions/runs/9023444361/job/24795410794?pr=39513 - runs provider unit tests against Airflow 2.9.1 installed from PyPI. I will add a few comments and tag relevant people in the places where I need help to check whether my changes are OK, because we need to add a compatibility layer for our tests to make them work with 2.9.1 (and later we should do the same for 2.8 and 2.7).
I will extract the breeze changes into separate PRs to make this one easier to review (once they are extracted, this PR will only cover running the tests and the compatibility changes needed in the tests themselves).
The breeze changes have been extracted to https://github.com/apache/airflow/pull/39535 and https://github.com/apache/airflow/pull/39536 - once we merge those and I rebase, this PR should get way simpler.
cc: @ashb @kaxil -> per our discussion - there are still ~10+ tests to fix, plus a review of the changes I've already applied to make the providers' test suite compatible with 2.9.1.
This is what I told you would be the "difficulty" with making our providers' test suite work with past versions of Airflow.
For 2.9 it's not a lot (but still an effort to make, and it will make our test code slightly more complicated because it will have to handle all the compatibility issues). What you see in the "tests" folder of this PR are the changes needed so far.
And that is after only about a month of refactors and changes since we split off main for 2.10.
I guess for 2.8 and 2.7 there will be quite a few more places where our tests make use of some internals of Airflow - for assertions, mocking, imports and the like. Let's get the 2.9.1 change merged first; I will attempt 2.8 and 2.7 afterwards to see how far we can go without cluttering our providers' test suite with too much compatibility code.
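To make the idea of "compatibility code" concrete, here is a minimal sketch of the kind of version gate the provider tests can use - the AIRFLOW_V_2_10_PLUS name and the test below are illustrative only, not necessarily the exact helpers added in this PR:

import pytest
from packaging.version import Version

from airflow import __version__ as AIRFLOW_VERSION

# Hypothetical gate: true when the installed Airflow is 2.10.0 or newer
# (pre-release/dev suffixes stripped via base_version).
AIRFLOW_V_2_10_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.10.0")


@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Test relies on Airflow 2.10+ internals")
def test_something_using_new_internals():
    ...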
OK. It's much smaller now after splitting out the breeze changes and rebasing - looking forward to more comments/help now.
The biggest issue (and I have no idea what's going on) is with serialization. I have not been following the changes there, but it seems there is pretty weird serialization behaviour when Airflow 2.9.1 is installed from the package and the tests are used from the "tests" folder.
Some of the serialized objects are missing the __var field that is expected when the objects get deserialized.
The first case (and I already added a comment about it above: https://github.com/apache/airflow/pull/39513#discussion_r1596001681) is that when we retrieve base operator links, the retrieved serialized data is different when Airflow is installed from the 2.9.1 package vs. when the latest Airflow 2.10.0 is used from sources.
- For what it's worth, when the serialized data comes from Airflow 2.10, the serialized form is:
operator_extra_link = serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"]
But when Airflow 2.9.1 is installed, it is:
operator_extra_link = serialized_dag["dag"]["tasks"][0]["_operator_extra_links"]
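One way (illustrative only - the helper name is hypothetical and not necessarily the approach this PR takes) to keep the same assertion working against both layouts is a small unwrapping helper:

def get_serialized_task_field(serialized_dag: dict, field: str):
    """Extract a field from the first serialized task, tolerating both layouts."""
    task = serialized_dag["dag"]["tasks"][0]
    # Airflow 2.10 sources wrap the task fields in a "__var" envelope,
    # while Airflow 2.9.1 installed from PyPI returns the bare task dict.
    if "__var" in task:
        task = task["__var"]
    return task[field]


operator_extra_link = get_serialized_task_field(serialized_dag, "_operator_extra_links")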
But what's even weirder is that when you serialize and deserialize an operator with Airflow installed from 2.9.1, deserialization fails - despite (seemingly) both serialization and deserialization coming from Airflow.
For example:
def test_operator_extra_links_mapped_without_applicationui_enabled(
    self,
):
    operator = EmrServerlessStartJobOperator.partial(
        task_id=task_id,
        application_id=application_id,
        execution_role_arn=execution_role_arn,
        job_driver=spark_job_driver,
        enable_application_ui_links=False,
    ).expand(
        configuration_overrides=[s3_configuration_overrides, cloudwatch_configuration_overrides],
    )
    ser_operator = BaseSerialization.serialize(operator)
    deser_operator = BaseSerialization.deserialize(ser_operator)  # <-- HERE deserialization fails
    assert deser_operator.operator_extra_links == [
        EmrServerlessS3LogsLink(),
        EmrServerlessCloudWatchLogsLink(),
    ]
The failure is
    @classmethod
    def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
        """
        Deserialize an object; helper function of depth first search for deserialization.
        :meta private:
        """
        # JSON primitives (except for dict) are not encoded.
        if use_pydantic_models and not _ENABLE_AIP_44:
            raise RuntimeError(
                "Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. "
                "This parameter will be removed eventually when new serialization is used by AIP-44"
            )
        if cls._is_primitive(encoded_var):
            return encoded_var
        elif isinstance(encoded_var, list):
            return [cls.deserialize(v, use_pydantic_models) for v in encoded_var]
        if not isinstance(encoded_var, dict):
            raise ValueError(f"The encoded_var should be dict and is {type(encoded_var)}")
>       var = encoded_var[Encoding.VAR]
E       KeyError: <Encoding.VAR: '__var'>
And when I dump the serialized object, indeed it does not have the Encoding.VAR key at the top level:
{'_is_empty': False, '_task_module': 'airflow.providers.amazon.aws.operators.emr', '_expand_input_attr': 'expand_input', 'task_id': 'test_emr_serverless_task_id', 'ui_color': '#fff', 'template_fields_renderers': {'config': 'json', 'configuration_overrides': 'json'}, 'partial_kwargs': {'application_id': 'test_application_id', 'execution_role_arn': 'test_emr_serverless_role_arn', 'job_driver': {<Encoding.VAR: '__var'>: {'sparkSubmit': {<Encoding.VAR: '__var'>: {'entryPoint': 'test.py'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}, 'enable_application_ui_links': False}, 'template_fields': ['application_id', 'config', 'execution_role_arn', 'job_driver', 'configuration_overrides', 'name', 'aws_conn_id'], 'downstream_task_ids': [], '_disallow_kwargs_override': False, 'template_ext': [], '_task_type': 'EmrServerlessStartJobOperator', 'ui_fgcolor': '#000', '_operator_extra_links': [{'airflow.providers.amazon.aws.links.emr.EmrServerlessS3LogsLink': {}}, {'airflow.providers.amazon.aws.links.emr.EmrServerlessCloudWatchLogsLink': {}}], 'expand_input': {'type': 'dict-of-lists', 'value': {<Encoding.VAR: '__var'>: {'configuration_overrides': [{<Encoding.VAR: '__var'>: {'monitoringConfiguration': {<Encoding.VAR: '__var'>: {'s3MonitoringConfiguration': {<Encoding.VAR: '__var'>: {'logUri': 's3://test_bucket/test_key/'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}, {<Encoding.VAR: '__var'>: {'monitoringConfiguration': {<Encoding.VAR: '__var'>: {'cloudWatchLoggingConfiguration': {<Encoding.VAR: '__var'>: {'enabled': True, 'logGroupName': '/aws/emrs', 'logStreamNamePrefix': 'myapp'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}]}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, '_is_mapped': True}
Those are all the dict keys:
dict_keys(['_disallow_kwargs_override', 'template_fields', 'template_ext', 'ui_fgcolor', 'partial_kwargs',
'_task_type', '_is_empty', '_task_module', 'template_fields_renderers', 'task_id', '_expand_input_attr', 'ui_color',
'downstream_task_ids', '_operator_extra_links', 'expand_input', '_is_mapped'])
And it makes no sense whatsoever, I think, that serialization produces one thing and deserialization expects another in this case - both should come from the Airflow code installed from the Airflow package, so I am rather lost as to what's going on here. Unless - for some reason - some part of serialization is inlined in the provider tests? But I am really puzzled and a bit blocked by it.
@bolkedebruin, @dstandish, @Taragolis, @Lee-W -> I looked through your recent changes, since you've been modifying this area recently (after we split off 2.9), but could not find a reason why this can happen. I'd appreciate it if you could help with solving the puzzle.
Ok. We are down to 4 errors. The remaining issues are:
- serialization __var issue
- executor compatibility
- openlineage enablement/configuration
- try_num impact
I'm not extremely familiar with this code @potiuk. If I recall correctly, I recognize the issue, and it's one of the reasons for me to work on the "other" serialization.
I don't mind looking into it but I'm AFK at the moment.
OK. I moved it out of DRAFT state.
Looks like this one should turn green - which means that we can run the full test suite against Airflow 2.9.1 - but we will need some responses on the conversations I started here, as I am not sure whether the way compatibility is implemented is the right call.
Yep. All green. So now just waiting for comments / approvals.
I did encounter this kind of failure during https://github.com/apache/airflow/pull/38674. But once I fixed everything else, these error no longer happened 🤔
Yeah. I already know what's going on (and it's fixed). It's a side-effect of https://github.com/apache/airflow/pull/38570, which changes the way operators are serialized - and it basically means that we will have to force reserialization of all serialized DB data for DAGs/Triggers when we migrate to 2.10.0 -> cc: @dstandish
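For provider tests that assert on the serialized layout, one possible way to cope is to make the round-trip assertion version-aware - a hedged sketch only, reusing the hypothetical AIRFLOW_V_2_10_PLUS gate from the earlier example and a plain BashOperator for illustration, not the fix actually applied in this PR:

import pytest
from airflow.operators.bash import BashOperator
from airflow.serialization.serialized_objects import BaseSerialization


@pytest.mark.skipif(
    not AIRFLOW_V_2_10_PLUS,  # hypothetical version gate defined in the earlier sketch
    reason="Round-trip assertion assumes the operator serialization layout changed by #38570",
)
def test_mapped_operator_round_trip():
    # Build a mapped operator, serialize it and deserialize it back, as in the failing test above.
    operator = BashOperator.partial(task_id="t").expand(bash_command=["echo 1", "echo 2"])
    ser_operator = BaseSerialization.serialize(operator)
    deser_operator = BaseSerialization.deserialize(ser_operator)
    assert deser_operator.task_id == "t"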
All good and green here :)
I'd love to get this one reviewed/merged. I already started working on support for Airflow 2.8 in https://github.com/apache/airflow/pull/39606 - the basic changes to enable tests to run against 2.8 are implemented and the vast majority of the tests are green there (we have 40-50 tests left to fix).
But it will be a bit easier once this one is merged.
Applied comments and back in green zone after fixing the side-effect from executors.
Rebased after some recent additions - still green :). Looking for an approval :)
Comments resolved / nits merged :)
All conversations resolved and answers found :). ... I think we are good to go.
Finally :). RIGHT before a meetup where I will talk about CI/CD tests :)