
Run unit tests with airflow installed from packages

Open potiuk opened this issue 1 year ago • 15 comments

This PR adds the option of running unit tests for providers against a specific airflow version (for example released version in PyPI) and enables it for back-compatibility testing for 2.9.1. In the future it could be used to run forward-compatibility testing with Airflow 3 as well.



potiuk avatar May 09 '24 07:05 potiuk

As explained on the devlist - this PR - specifically this job https://github.com/apache/airflow/actions/runs/9023444361/job/24795410794?pr=39513 - runs Provider unit tests against Airflow 2.9.1 installed from PyPI. I will add a few comments and tag relevant people in the places where I need help to understand whether my changes are OK, as we need to add a compatibility layer for our tests to make them work for 2.9.1 (and later we should do the same for 2.8 and 2.7).

potiuk avatar May 09 '24 21:05 potiuk

I will extract the breeze changes to separate PRs to make this one easier to review (after I extract them, this PR will only be about running the tests and the necessary test-compatibility changes).

potiuk avatar May 10 '24 05:05 potiuk

The breeze changes were extracted to https://github.com/apache/airflow/pull/39535 and https://github.com/apache/airflow/pull/39536 - once we merge those and I rebase, this one should get way simpler.

potiuk avatar May 10 '24 06:05 potiuk

cc: @ashb @kaxil -> per our discussion - there are still >10 tests to fix, plus review of the changes I've already applied to make the Providers' test suite compatible with 2.9.1.

This is what I told you would be the "difficulty" with making our Providers' test suite work with past versions of Airflow.

For 2.9 it's not a lot (but still an effort to make, and it will make our test code slightly more complicated because it will have to handle all the compatibility issues). What you see in this PR in the "tests" folder are the changes needed so far.

This is just after ~1 month of refactors and changes since we split off main for 2.10.

I guess for 2.8 and 2.7 there will be quite a few more places where our tests make use of some internals of airflow - for assertions, mocking, imports and the like. Let's get the 2.9.1 change merged first and I will attempt to do 2.8 and 2.7 afterwards to see how far we can go without cluttering our Providers' test suite too much with compatibility code.

potiuk avatar May 10 '24 08:05 potiuk

OK. It's much smaller now after splitting out the breeze changes and rebasing - looking forward to more comments/help now.

potiuk avatar May 10 '24 09:05 potiuk

The biggest issue (and I have no idea what's going on) is with serialization. I have not been following the changes there, but it seems there is pretty weird serialization behaviour when airflow 2.9.1 is installed from the package and the tests are run from the "tests" folder.

Some of the serialized objects are missing the __var field that is expected when the objects get deserialized.

The first case (I already added a comment about it above: https://github.com/apache/airflow/pull/39513#discussion_r1596001681) is that when we retrieve base operator links, the retrieved serialized data is different when airflow is installed from the 2.9.1 package vs. when the latest Airflow 2.10.0 is used from sources.

  • For what it's worth, when the latest Airflow 2.10 is used from sources, the serialized form is:

operator_extra_link = serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"]

But when airflow 2.9.1 is installed, it is:

operator_extra_link = serialized_dag["dag"]["tasks"][0]["_operator_extra_links"]
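The shape difference can be smoothed over with a small compatibility shim in the tests. This is only an illustrative sketch - the helper name and the sample payloads below are mine, not from the PR:

```python
# Hypothetical helper illustrating the shape difference described above:
# Airflow 2.10 (from sources) nests task fields under "__var", while the
# 2.9.1 package keeps them at the top level of the serialized task dict.

def get_operator_extra_links(serialized_task: dict) -> list:
    """Return _operator_extra_links regardless of the serialization shape."""
    if "__var" in serialized_task:
        serialized_task = serialized_task["__var"]
    return serialized_task.get("_operator_extra_links", [])

# Minimal payloads mimicking both shapes:
task_2_10 = {"__var": {"_operator_extra_links": [{"SomeLink": {}}]}}
task_2_9_1 = {"_operator_extra_links": [{"SomeLink": {}}]}
```

With a shim like this, the same test assertion can run against both an installed 2.9.1 package and the 2.10 sources.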

But what's even weirder: when you serialize and then deserialize an operator with Airflow installed from the 2.9.1 package, deserialization fails - despite both serialization and deserialization (seemingly) coming from the same airflow code.

For example:

    def test_operator_extra_links_mapped_without_applicationui_enabled(
        self,
    ):
        operator = EmrServerlessStartJobOperator.partial(
            task_id=task_id,
            application_id=application_id,
            execution_role_arn=execution_role_arn,
            job_driver=spark_job_driver,
            enable_application_ui_links=False,
        ).expand(
            configuration_overrides=[s3_configuration_overrides, cloudwatch_configuration_overrides],
        )

        ser_operator = BaseSerialization.serialize(operator)
        deser_operator = BaseSerialization.deserialize(ser_operator)  # <-- HERE deserialization fails

        assert deser_operator.operator_extra_links == [
            EmrServerlessS3LogsLink(),
            EmrServerlessCloudWatchLogsLink(),
        ]

The failure is

    @classmethod
    def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
        """
        Deserialize an object; helper function of depth first search for deserialization.
    
        :meta private:
        """
        # JSON primitives (except for dict) are not encoded.
        if use_pydantic_models and not _ENABLE_AIP_44:
            raise RuntimeError(
                "Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. "
                "This parameter will be removed eventually when new serialization is used by AIP-44"
            )
        if cls._is_primitive(encoded_var):
            return encoded_var
        elif isinstance(encoded_var, list):
            return [cls.deserialize(v, use_pydantic_models) for v in encoded_var]
    
        if not isinstance(encoded_var, dict):
            raise ValueError(f"The encoded_var should be dict and is {type(encoded_var)}")
>       var = encoded_var[Encoding.VAR]
E       KeyError: <Encoding.VAR: '__var'>

And when I dump the serialized object, indeed it does not have the Encoding.VAR key at the top level:

{'_is_empty': False, '_task_module': 'airflow.providers.amazon.aws.operators.emr', '_expand_input_attr': 'expand_input', 'task_id': 'test_emr_serverless_task_id', 'ui_color': '#fff', 'template_fields_renderers': {'config': 'json', 'configuration_overrides': 'json'}, 'partial_kwargs': {'application_id': 'test_application_id', 'execution_role_arn': 'test_emr_serverless_role_arn', 'job_driver': {<Encoding.VAR: '__var'>: {'sparkSubmit': {<Encoding.VAR: '__var'>: {'entryPoint': 'test.py'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}, 'enable_application_ui_links': False}, 'template_fields': ['application_id', 'config', 'execution_role_arn', 'job_driver', 'configuration_overrides', 'name', 'aws_conn_id'], 'downstream_task_ids': [], '_disallow_kwargs_override': False, 'template_ext': [], '_task_type': 'EmrServerlessStartJobOperator', 'ui_fgcolor': '#000', '_operator_extra_links': [{'airflow.providers.amazon.aws.links.emr.EmrServerlessS3LogsLink': {}}, {'airflow.providers.amazon.aws.links.emr.EmrServerlessCloudWatchLogsLink': {}}], 'expand_input': {'type': 'dict-of-lists', 'value': {<Encoding.VAR: '__var'>: {'configuration_overrides': [{<Encoding.VAR: '__var'>: {'monitoringConfiguration': {<Encoding.VAR: '__var'>: {'s3MonitoringConfiguration': {<Encoding.VAR: '__var'>: {'logUri': 's3://test_bucket/test_key/'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}, {<Encoding.VAR: '__var'>: {'monitoringConfiguration': {<Encoding.VAR: '__var'>: {'cloudWatchLoggingConfiguration': {<Encoding.VAR: '__var'>: {'enabled': True, 'logGroupName': '/aws/emrs', 'logStreamNamePrefix': 'myapp'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 
'dict'>}]}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, '_is_mapped': True}

Those are all the dict keys:

dict_keys(['_disallow_kwargs_override', 'template_fields', 'template_ext', 'ui_fgcolor', 'partial_kwargs', 
'_task_type', '_is_empty', '_task_module', 'template_fields_renderers', 'task_id', '_expand_input_attr', 'ui_color', 
'downstream_task_ids', '_operator_extra_links', 'expand_input', '_is_mapped'])
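For completeness: the KeyError above comes from deserialize() expecting the {'__var': ..., '__type': ...} envelope that the dumped payload lacks. A minimal sketch of re-wrapping a bare payload - plain string keys stand in for Airflow's Encoding.VAR/Encoding.TYPE enum values, and the function name is hypothetical:

```python
# Plain strings standing in for Airflow's Encoding.VAR / Encoding.TYPE values.
VAR, TYPE = "__var", "__type"

def wrap_if_bare(encoded: dict, dat_type: str = "operator") -> dict:
    """Add the {__var, __type} envelope deserialize() expects, if missing."""
    if VAR in encoded:
        return encoded  # already enveloped - nothing to do
    return {VAR: encoded, TYPE: dat_type}

bare = {"_task_type": "EmrServerlessStartJobOperator", "_is_mapped": True}
wrapped = wrap_if_bare(bare)
```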

And it makes no sense whatsoever that serialization produces one thing and deserialization expects another in this case - both should come from the airflow code installed from the airflow package, so I am rather lost as to what's going on here. Unless, for some reason, some part of serialization is inlined in the provider tests??? I am really puzzled and a bit blocked by it.

@bolkedebruin, @dstandish, @Taragolis, @Lee-W -> I looked through your recent changes, since you've been modifying this code recently (after we split off 2.9), but could not find a reason why this can happen. I'd appreciate it if you could help with solving the puzzle.

potiuk avatar May 10 '24 10:05 potiuk

Ok. We are down to 4 errors. The remaining issues are:

  • serialization __var issue
  • executor compatibility
  • openlineage enablement/configuration
  • try_num impact

potiuk avatar May 10 '24 13:05 potiuk

I'm not extremely familiar with this code, @potiuk. If I recall correctly, I recognize the issue, and it's one of the reasons for me to work on the "other" serialization.

I don't mind looking into it but I'm AFK at the moment.

bolkedebruin avatar May 10 '24 15:05 bolkedebruin

OK. I moved it out of DRAFT state.

Looks like this one should get green - which means we can run the full test suite against Airflow 2.9.1 - but we will need some responses on the conversations I started here, as I am not sure whether the way compatibility is implemented is the right call.

potiuk avatar May 11 '24 20:05 potiuk

Yep. All green. So now just waiting for comments / approvals.

potiuk avatar May 11 '24 21:05 potiuk

I did encounter this kind of failure during https://github.com/apache/airflow/pull/38674. But once I fixed everything else, these errors no longer happened 🤔

Lee-W avatar May 13 '24 06:05 Lee-W

> I did encounter this kind of failure during https://github.com/apache/airflow/pull/38674. But once I fixed everything else, these errors no longer happened 🤔

Yeah. I already know what's going on (and it's fixed). It's a side-effect of https://github.com/apache/airflow/pull/38570, which changes the way operators are serialized - and it basically means that we will have to force reserialization of all serialized DB data for DAGs/Triggers when we migrate to 2.10.0 -> cc: @dstandish
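The upgrade implication can be expressed as a simple version gate. A sketch under my own assumptions - the function name and the exact cut-off constant are illustrative, not from the PR: any DAG serialized by a pre-2.10 Airflow would need reserializing once the format change ships.

```python
# Hypothetical version gate for the reserialization requirement implied above:
# rows written with the pre-change format must be reserialized after upgrading.

def _parse(version: str) -> tuple:
    """Turn 'X.Y.Z' into a comparable tuple of ints."""
    return tuple(int(part) for part in version.split("."))

FORMAT_CHANGED_IN = _parse("2.10.0")  # assumed cut-off, per the comment above

def needs_reserialization(writer_version: str, current_version: str) -> bool:
    """True if a stored serialized DAG predates the format change."""
    return _parse(writer_version) < FORMAT_CHANGED_IN <= _parse(current_version)
```

In practice, forcing the rewrite can be done with Airflow's `airflow dags reserialize` CLI command, which re-serializes the stored DAGs in the DB.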

potiuk avatar May 13 '24 06:05 potiuk

All good and green here :)

potiuk avatar May 13 '24 16:05 potiuk

(image attached)

potiuk avatar May 13 '24 16:05 potiuk

I'd love to get this one reviewed/merged. I already started working on support for Airflow 2.8 in https://github.com/apache/airflow/pull/39606 - the basic changes to enable tests to run against 2.8 are implemented and the vast majority of the tests are green there (we have 40-50 tests to fix).

But it would be a bit easier after this one is merged.

potiuk avatar May 14 '24 07:05 potiuk

Applied comments and back in the green zone after fixing the side-effect from executors.

potiuk avatar May 14 '24 12:05 potiuk

Rebased after some recent additions - still green :). Looking for an approval :)

potiuk avatar May 15 '24 09:05 potiuk

Comments resolved / nits merged :)

potiuk avatar May 15 '24 19:05 potiuk

All conversations resolved and answers found :). ... I think we are good to go.

potiuk avatar May 15 '24 19:05 potiuk

Finally :). RIGHT before a meetup where I will talk about CI/CD tests :)

potiuk avatar May 15 '24 20:05 potiuk