great_expectations
Performance degradation using GE v3 API with PySpark on an EMR cluster
Describe the bug
After migrating from the GE v2 API to the GE v3 API, using the Spark execution engine on an EMR cluster, I noticed a performance degradation.
With the GE v2 API I executed the command `context.run_validation_operator()`, and with the GE v3 API I execute the command `context.run_checkpoint()`. I am still using the Spark engine, but with GE v3 I started to use `RuntimeBatchRequest`.
The same suite, on the same dataset, took between 10% and 20% more time with the GE v3 API than with the GE v2 API.
Unfortunately I did not find specific documentation (or examples) on how to use the GE v3 API with PySpark on an EMR cluster, so I replicated what I did with GE v2. With GE v2 I didn't have any problems, but with GE v3, even if I increase the number of cluster instances, the computation time doesn't change much.
To Reproduce
GE V2 API validation code
```python
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig

# Context
data_context_config = DataContextConfig(
    plugins_directory=None,
    config_variables_file_path=None,
    datasources={
        "spark_ds": {
            "data_asset_type": {
                "class_name": "SparkDFDataset",
                "module_name": "great_expectations.dataset",
            },
            "class_name": "SparkDFDatasource",
            "module_name": "great_expectations.datasource",
            "batch_kwargs_generators": {},
        }
    },
    stores={
        "expectations_store": {
            "class_name": "ExpectationsStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": "/home/jovyan/work/expectation_suites",
            },
        },
        "validations_store": {
            "class_name": "ValidationsStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": "/home/jovyan/work/validations",
            },
        },
        "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
    },
    expectations_store_name="expectations_store",
    validations_store_name="validations_store",
    evaluation_parameter_store_name="evaluation_parameter_store",
    data_docs_sites={
        "site": {
            "class_name": "SiteBuilder",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": "/home/jovyan/work/site",
            },
            "site_index_builder": {
                "class_name": "DefaultSiteIndexBuilder",
                "show_cta_footer": False,
                "show_how_to_buttons": False,
            },
        }
    },
    validation_operators={
        "action_list_operator": {
            "class_name": "ActionListValidationOperator",
            "action_list": [
                {
                    "name": "store_validation_result",
                    "action": {"class_name": "StoreValidationResultAction"},
                },
                {
                    "name": "store_evaluation_params",
                    "action": {"class_name": "StoreEvaluationParametersAction"},
                },
                {
                    "name": "update_data_docs",
                    "action": {"class_name": "UpdateDataDocsAction"},
                },
            ],
        }
    },
    anonymous_usage_statistics={"enabled": False},
)
context = BaseDataContext(project_config=data_context_config)

# Validation
batch_kwargs = {
    "dataset": df,  # Spark DataFrame
    "datasource": datasource_name,
    "data_asset_name": data_asset_name,
}
batch = context.get_batch(batch_kwargs, expectation_suite_name)
results = context.run_validation_operator(
    "action_list_operator",
    assets_to_validate=[batch],
    run_id=run_id,
)
```
GE V3 API validation code
```python
from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig

# Context
data_context_config_test = DataContextConfig(
    plugins_directory="/home/jovyan/work/data_quality/plugins",
    config_variables_file_path=None,
    datasources={
        "my_filesystem_datasource": {
            "class_name": "Datasource",
            "module_name": "great_expectations.datasource",
            "execution_engine": {
                "module_name": "great_expectations.execution_engine",
                "class_name": "SparkDFExecutionEngine",
            },
            "data_connectors": {
                "default_runtime_data_connector_name": {
                    "class_name": "RuntimeDataConnector",
                    "batch_identifiers": ["batch_id"],
                },
            },
        }
    },
    stores={
        "expectations_store": {
            "class_name": "ExpectationsStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": "/home/jovyan/work/expectation_suites",
            },
        },
        "validations_S3_store": {
            "class_name": "ValidationsStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": "/home/jovyan/work/validations",
            },
        },
        "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
    },
    expectations_store_name="expectations_store",
    validations_store_name="validations_S3_store",
    evaluation_parameter_store_name="evaluation_parameter_store",
    data_docs_sites={
        "s3_site": {
            "class_name": "SiteBuilder",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": "/home/jovyan/work/site",
            },
            "site_index_builder": {
                "class_name": "DefaultSiteIndexBuilder",
                "show_cta_footer": False,
            },
        }
    },
    anonymous_usage_statistics={"enabled": False},
)
context = BaseDataContext(project_config=data_context_config_test)

# Validation
# Note: these names must match the datasource and data connector configured above.
batch_request = {
    "datasource_name": "my_filesystem_datasource",
    "data_connector_name": "default_runtime_data_connector_name",
    "data_asset_name": "data_asset_name",
    "runtime_parameters": {"batch_data": df},  # Spark DataFrame
    "batch_identifiers": {"batch_id": "batch_id"},
}
validations = [
    {
        "batch_request": batch_request,
        "expectation_suite_name": expectation_suite_name,
    }
]
checkpoint = SimpleCheckpoint(
    name="checkpoint",
    data_context=context,
    class_name="Checkpoint",
    validations=validations,
)
checkpoint_result = checkpoint.run(
    run_id=run_id,
    run_name_template="%Y%m%d_%H%M%S",
)
```
Expected behavior
Running these two validations on an EMR cluster, with the same dataset and the same expectation suite, the GE v3 API takes more time to complete the validation run than the GE v2 API.
Environment (please complete the following information):
- EMR version: emr-5.28.0
- Great Expectations version: 0.14.0
Hi @davideromano thanks for bringing this to our attention. We will review internally and be in touch, hopefully within a few weeks.
Hi @kenwade4, any news on this issue? Thanks
Hey @davideromano ! We have eyes on this issue and will be investigating further in the near future.
Is there an update on this?