Performance degradation using GE v3 API with PySpark on Cluster EMR

Open davideromano opened this issue 2 years ago • 4 comments

Describe the bug

After migrating from the GE v2 API to the GE v3 API, using the Spark execution engine on an EMR cluster, I noticed a performance degradation.

With the GE v2 API I ran context.run_validation_operator(); with the GE v3 API I run context.run_checkpoint(). I am still using the Spark engine, but with GE v3 I started to use RuntimeBatchRequest. The same suite on the same dataset takes between 10% and 20% longer with the GE v3 API than with the GE v2 API.

Unfortunately, I could not find specific documentation or examples on how to use the GE v3 API with PySpark on an EMR cluster, so I replicated what I did with GE v2, which gave me no problems. With GE v3, however, the computation time barely changes even when I increase the number of instances in the cluster.
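
One thing that may be worth checking when run time does not change with cluster size is how many partitions the DataFrame handed to GE has, since a Spark stage runs one task per partition. The following is a minimal sketch, assuming `df` is the Spark DataFrame being validated. `partition_report` is a hypothetical helper name (not part of GE or PySpark), and this is only a diagnostic idea, not a confirmed cause of the slowdown.

```python
def partition_report(df, total_executor_cores):
    """Compare the DataFrame's partition count with the cores available.

    `df` is expected to expose `df.rdd.getNumPartitions()`, as a PySpark
    DataFrame does. `total_executor_cores` is supplied by the caller
    (for example, executors * cores per executor); it is not read from Spark.
    """
    num_partitions = df.rdd.getNumPartitions()
    return {
        "num_partitions": num_partitions,
        "total_executor_cores": total_executor_cores,
        # With fewer partitions than cores, some cores have no task to run in this stage.
        "underpartitioned": num_partitions < total_executor_cores,
    }
```

If the report shows fewer partitions than cores, calling `df.repartition(n)` before validation is one thing to try, although whether it helps in this case is untested.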

To Reproduce

GE V2 API validation code

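from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig

# Context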
data_context_config = DataContextConfig(
    plugins_directory=None,
    config_variables_file_path=None,
    datasources={
        "spark_ds": {
            "data_asset_type": {
                "class_name": "SparkDFDataset",
                "module_name": "great_expectations.dataset",
            },
            "class_name": "SparkDFDatasource",
            "module_name": "great_expectations.datasource",
            "batch_kwargs_generators": {},
        }
    },
    stores={
        "expectations_store": {
            "class_name": "ExpectationsStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": "/home/jovyan/work/expectation_suites",
            }
        },
        "validations_store": {
            "class_name": "ValidationsStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": "/home/jovyan/work/validations",
            },
        },
        "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
    },
    expectations_store_name="expectations_store",
    validations_store_name="validations_store",
    evaluation_parameter_store_name="evaluation_parameter_store",
    data_docs_sites={
        "site": {
            "class_name": "SiteBuilder",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": "/home/jovyan/work/site",
            },
            "site_index_builder": {
                "class_name": "DefaultSiteIndexBuilder",
                "show_cta_footer": False,
                "show_how_to_buttons": False,
            },
        }
    },
    validation_operators={
        "action_list_operator": {
            "class_name": "ActionListValidationOperator",
            "action_list": [
                {
                    "name": "store_validation_result",
                    "action": {"class_name": "StoreValidationResultAction"},
                },
                {
                    "name": "store_evaluation_params",
                    "action": {"class_name": "StoreEvaluationParametersAction"},
                },
                {
                    "name": "update_data_docs",
                    "action": {"class_name": "UpdateDataDocsAction"},
                },
            ],
        }
    },
    anonymous_usage_statistics={
        "enabled": False,
    },
)

context = BaseDataContext(project_config=data_context_config)

# Validation
batch_kwargs = {
    "dataset": df,  # Spark DataFrame
    "datasource": datasource_name,
    "data_asset_name": data_asset_name,
}
batch = context.get_batch(batch_kwargs, expectation_suite_name)

results = context.run_validation_operator(
    "action_list_operator",
    assets_to_validate=[batch],
    run_id=run_id
)

GE V3 API validation code

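from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig

# Context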
data_context_config_test = DataContextConfig(
    plugins_directory="/home/jovyan/work/data_quality/plugins",
    config_variables_file_path=None,
    datasources={
        "my_filesystem_datasource": {
            "class_name": "Datasource",
            "module_name": "great_expectations.datasource",
            "execution_engine": {
                "module_name": "great_expectations.execution_engine",
                "class_name": "SparkDFExecutionEngine",
            },
            "data_connectors": {
                "default_runtime_data_connector_name": {
                    "class_name": "RuntimeDataConnector",
                    "batch_identifiers": ["batch_id"],
                },
            },
        }
    },
    stores={
        "expectations_store": {
            "class_name": "ExpectationsStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": "/home/jovyan/work/expectation_suites",
            },
        },
        "validations_S3_store": {
            "class_name": "ValidationsStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": "/home/jovyan/work/validations",
            },
        },
        "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
    },
    expectations_store_name="expectations_store",
    validations_store_name="validations_S3_store",
    evaluation_parameter_store_name="evaluation_parameter_store",
    data_docs_sites={
        "s3_site": {
            "class_name": "SiteBuilder",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": "/home/jovyan/work/site",
            },
            "site_index_builder": {
                "class_name": "DefaultSiteIndexBuilder",
                "show_cta_footer": False,
            },
        }
    },
    anonymous_usage_statistics={
        "enabled": False,
    },
)

context = BaseDataContext(project_config=data_context_config_test)


# Validation
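batch_request = {
    # These names must match the datasource and data connector defined in the context above.
    "datasource_name": "my_filesystem_datasource",
    "data_connector_name": "default_runtime_data_connector_name",
    "data_asset_name": "data_asset_name",
    "runtime_parameters": {"batch_data": df},  # Spark DataFrame
    "batch_identifiers": {"batch_id": "batch_id"},
}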

validations = []
validations.append(
    {
        "batch_request": batch_request,
        "expectation_suite_name": expectation_suite_name
    }
)

checkpoint = SimpleCheckpoint(
    name="checkpoint",
    data_context=context,
    class_name="Checkpoint",
    validations=validations,
)

checkpoint_result = checkpoint.run(
    run_id=run_id,
    run_name_template="%Y%m%d_%H%M%S",
)
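
In the v3 API, the `datasource_name` and `data_connector_name` in a batch request have to refer to names defined under `datasources` in the context configuration, and the batch identifiers have to be declared on the data connector. Below is a small sketch of a pre-flight check in plain Python, assuming the `datasources` dict has the same shape as in the configuration above. `check_batch_request_names` is a hypothetical helper, not a GE function.

```python
def check_batch_request_names(batch_request, datasources):
    """Return a list of problems found; an empty list means the names line up."""
    problems = []
    ds_name = batch_request.get("datasource_name")
    dc_name = batch_request.get("data_connector_name")

    datasource = datasources.get(ds_name)
    if datasource is None:
        problems.append(f"unknown datasource: {ds_name!r}")
        return problems

    connectors = datasource.get("data_connectors", {})
    if dc_name not in connectors:
        problems.append(f"unknown data connector: {dc_name!r}")
        return problems

    # Every batch identifier used at runtime should be declared on the connector.
    declared = set(connectors[dc_name].get("batch_identifiers", []))
    used = set(batch_request.get("batch_identifiers", {}))
    for key in sorted(used - declared):
        problems.append(f"undeclared batch identifier: {key!r}")
    return problems
```

Running this against the `datasources` dict and the batch request before creating the checkpoint can surface a naming mismatch early, without starting a Spark job.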

Expected behavior

I expected the two runs to take roughly the same time. Instead, when both validations are run on an EMR cluster with the same dataset and the same expectation suite, the GE v3 API run takes longer to complete.
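
To make the comparison reproducible, the two runs can be timed with a small harness. This is a minimal sketch using only the Python standard library. `median_runtime` and `percent_slower` are hypothetical helper names, and the callables passed in would wrap the v2 and v3 validation calls shown above.

```python
import statistics
import time


def median_runtime(run, repeats=3):
    """Call `run()` `repeats` times and return the median wall-clock seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        run()
        durations.append(time.perf_counter() - start)
    return statistics.median(durations)


def percent_slower(baseline_seconds, candidate_seconds):
    """How much slower the candidate is than the baseline, in percent."""
    return (candidate_seconds - baseline_seconds) * 100.0 / baseline_seconds
```

For example, `percent_slower(median_runtime(run_v2), median_runtime(run_v3))` would report the slowdown of the v3 run relative to v2, where `run_v2` and `run_v3` are placeholder functions wrapping the two validation snippets. Using the median of several runs reduces the effect of one-off cluster noise.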

Environment (please complete the following information):

  • EMR version: emr-5.28.0
  • Great Expectations Version: 0.14.0

davideromano • Feb 02 '22 18:02

Hi @davideromano, thanks for bringing this to our attention. We will review internally and be in touch, hopefully within a few weeks.

kenwade4 • Feb 04 '22 16:02

Hi @kenwade4, any news about this issue? Thanks.

davideromano • Mar 15 '22 14:03

Hey @davideromano! We have eyes on this issue and will be investigating further in the near future.

austiezr • Aug 08 '22 17:08

> Hey @davideromano! We have eyes on this issue and will be investigating further in the near future.

Is there an update on this?

sahilravinder • Nov 08 '23 22:11