`row_condition` with Spark not working as documented
Describe the bug
The documentation states that row conditions for Spark should be specified like this: row_condition='col("foo") == "Two Two"'.
However, when I try it that way, I get this error:
MetricResolutionError: Undefined function: 'col'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.;
Interestingly, it works like this:
row_condition='foo == "Two Two"'
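For reference, I can reproduce the same behaviour directly in PySpark, which suggests the condition string is handed to DataFrame.filter as a Spark SQL expression (a minimal sketch with made-up data; col is not a SQL function, so the documented form fails):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("Two Two",), ("One",)], ["foo"])

df.filter('foo == "Two Two"').show()         # works: plain Spark SQL expression
df.filter('col("foo") == "Two Two"').show()  # AnalysisException: Undefined function: 'col'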
To Reproduce
great_expectations.yml:
config_version: 3.0
datasources: {}
config_variables_file_path: uncommitted/config_variables.yml
plugins_directory: plugins/
stores:
  expectations_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: expectations/
  validations_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/validations/
  evaluation_parameter_store:
    class_name: EvaluationParameterStore
  checkpoint_store:
    class_name: CheckpointStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      suppress_store_backend_id: true
      base_directory: checkpoints/
  profiler_store:
    class_name: ProfilerStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      suppress_store_backend_id: true
      base_directory: profilers/
expectations_store_name: expectations_store
validations_store_name: validations_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_store
data_docs_sites:
  local_site:
    class_name: SiteBuilder
    show_how_to_buttons: true
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted\data_docs\local_site
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
anonymous_usage_statistics:
  data_context_id: 8185aa6f-6303-4897-8bb4-6e23b494e231
  enabled: true
notebooks:
include_rendered_content:
  globally: false
  expectation_suite: false
  expectation_validation_result: false
fluent_datasources:
  my_ds:
    type: spark_filesystem
    assets:
      my_asset:
        type: parquet
        batching_regex: my_regex
    base_directory: my\dir
Code:
import great_expectations as gx
context = gx.get_context()
datasource_name = "my_ds"
path_to_folder_containing_pq_directories = "R:/Zentrale/ZB-S/Daten_DSZ/Daten/mifir/great_expectations"
datasource = context.sources.add_or_update_spark_filesystem(
    name=datasource_name, base_directory=path_to_folder_containing_pq_directories
)
datasource.add_parquet_asset("buyer_account_owner", batching_regex="buyer_account_owner")
data_asset = context.get_datasource(datasource_name).get_asset("buyer_account_owner")
expectation_suite = context.add_or_update_expectation_suite(expectation_suite_name=datasource_name)
validator = context.get_validator(
    batch_request=data_asset.build_batch_request(),
    expectation_suite_name="my_es",
    datasource_name=datasource_name,
    data_asset_name="my_asset",
)
validator.expect_column_value_lengths_to_equal(
    "sdc_buyer_account_owner", value=20, condition_parser="spark", row_condition='col("condition_col") == "ABC"'
)
Traceback:
Traceback (most recent call last):
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 548, in _process_direct_and_bundled_metric_computation_configurations
    ] = metric_computation_configuration.metric_fn(  # type: ignore[misc] # F not callable
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\expectations\metrics\metric_provider.py", line 60, in inner_func
    return metric_fn(*args, **kwargs)
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\expectations\metrics\table_metrics\table_column_types.py", line 84, in _spark
    df, _, _ = execution_engine.get_compute_domain(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 663, in get_compute_domain
    data: pyspark.DataFrame = self.get_domain_records(domain_kwargs=domain_kwargs)
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 522, in get_domain_records
    data = data.filter(row_condition)
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\pyspark\sql\dataframe.py", line 1715, in filter
    jdf = self._jdf.filter(condition)
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\py4j\java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\pyspark\sql\utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Undefined function: 'col'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 621, in inst_expectation
    raise err
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 584, in inst_expectation
    validation_result = expectation.validate(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\expectations\expectation.py", line 1277, in validate
    ] = validator.graph_validate(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 1096, in graph_validate
    raise err
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 1075, in graph_validate
    ) = self._resolve_suite_level_graph_and_process_metric_evaluation_errors(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 1234, in _resolve_suite_level_graph_and_process_metric_evaluation_errors
    ) = self._metrics_calculator.resolve_validation_graph(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\metrics_calculator.py", line 314, in resolve_validation_graph
    resolved_metrics, aborted_metrics_info = graph.resolve(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validation_graph.py", line 209, in resolve
    ] = self._resolve(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validation_graph.py", line 315, in _resolve
    raise err
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validation_graph.py", line 285, in _resolve
    self._execution_engine.resolve_metrics(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 283, in resolve_metrics
    return self._process_direct_and_bundled_metric_computation_configurations(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 552, in _process_direct_and_bundled_metric_computation_configurations
    raise gx_exceptions.MetricResolutionError(
great_expectations.exceptions.exceptions.MetricResolutionError: Undefined function: 'col'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0
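For completeness, this is the variant of the call that does work for me; the only change is dropping col() and passing a plain Spark SQL expression (same column names as in the code above):

# Works: row_condition given as a plain Spark SQL expression instead of col("...")
validator.expect_column_value_lengths_to_equal(
    "sdc_buyer_account_owner",
    value=20,
    condition_parser="spark",
    row_condition='condition_col == "ABC"',
)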
Expected behavior
I would expect GX to work as documented.
Environment (please complete the following information):
- Operating System: Windows
- Great Expectations Version: 0.17.22
- Data Source: Spark
- Cloud environment: None (we create and test expectations locally before running them in a Spark cluster).
Hi @matthiasgomolka, thank you for letting us know; this is interesting to see. We'll put it into our backlog for review.
Had the same issue
GX Documentation:
row_condition='col("foo").notNull()' # foo is not null
Spark Documentation:
Column.isNotNull()
[see: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.isNotNull.html]
Traceback error:
raise gx_exceptions.MetricResolutionError(
great_expectations.exceptions.exceptions.MetricResolutionError: unable to parse condition: col('column_name').notNULL()
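For comparison, this is what the native PySpark API accepts (a minimal sketch with a made-up DataFrame; note the method is isNotNull(), and a plain SQL string works as well):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a",), (None,)], "foo string")

df.filter(F.col("foo").isNotNull()).show()  # Column API: isNotNull(), not notNull()
df.filter("foo IS NOT NULL").show()         # equivalent SQL expression string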
PLEEEEASE get rid of this row_condition='col("foo").notNull()'
and allow simple SQL syntax pass-through: row_condition = 'fld1 = 5 OR fld2 <> 7 AND fld3 < 9'
You may use this - it works like a SQL WHERE clause:
"condition_parser": "spark", "row_condition": "date_of_birth > DATE '0001-01-01' AND gender = 'M' OR dead = true",
Hello @matthiasgomolka. With the launch of Great Expectations Core (GX 1.0), we are closing old issues posted regarding previous versions. Moving forward, we will focus our resources on supporting and improving GX Core (version 1.0 and beyond). If you find that an issue you previously reported still exists in GX Core, we encourage you to resubmit it against the new version. With more resources dedicated to community support, we aim to tackle new issues swiftly. For specific details on what is GX-supported vs community-supported, you can reference our integration and support policy.
To get started on your transition to GX Core, check out the GX Core quickstart (click “Full example code” tab to see a code example).
You can also join our upcoming community meeting on August 28th at 9am PT (noon ET / 4pm UTC) for a comprehensive rundown of everything GX Core, plus Q&A as time permits. Go to https://greatexpectations.io/meetup and click “follow calendar” to follow the GX community calendar.
Thank you for being part of the GX community and thank you for submitting this issue. We're excited about this new chapter and look forward to your feedback on GX Core. 🤗