
`row_condition` with Spark not working as documented


Describe the bug

The documentation states that row conditions for Spark should be specified like this: row_condition='col("foo") == "Two Two"'.

However, if I try it like this, I get this error:

MetricResolutionError: Undefined function: 'col'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.;

Interestingly, it works like this:

row_condition='foo == "Two Two"'
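
For clarity, a minimal sketch contrasting the two forms (column and value names here are placeholders, assuming a Spark-backed validator):

validator.expect_column_value_lengths_to_equal(
    "bar",
    value=20,
    condition_parser="spark",
    # Plain column reference: this works, because GX hands the string
    # straight to Spark's DataFrame.filter().
    row_condition='foo == "Two Two"',
    # Documented form: fails with "Undefined function: 'col'".
    # row_condition='col("foo") == "Two Two"',
)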

To Reproduce

great_expectations.yml:

config_version: 3.0
datasources: {}
config_variables_file_path: uncommitted/config_variables.yml
plugins_directory: plugins/
stores:
  expectations_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: expectations/
  validations_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/validations/
  evaluation_parameter_store:
    class_name: EvaluationParameterStore
  checkpoint_store:
    class_name: CheckpointStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      suppress_store_backend_id: true
      base_directory: checkpoints/
  profiler_store:
    class_name: ProfilerStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      suppress_store_backend_id: true
      base_directory: profilers/
expectations_store_name: expectations_store
validations_store_name: validations_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_store
data_docs_sites:
  local_site:
    class_name: SiteBuilder
    show_how_to_buttons: true
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted\data_docs\local_site
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
anonymous_usage_statistics:
  data_context_id: 8185aa6f-6303-4897-8bb4-6e23b494e231
  enabled: true
notebooks:
include_rendered_content:
  globally: false
  expectation_suite: false
  expectation_validation_result: false
fluent_datasources:
  my_ds:
    type: spark_filesystem
    assets:
      my_asset:
        type: parquet
        batching_regex: my_regex
    base_directory: my\dir

Code:

import great_expectations as gx

context = gx.get_context()

datasource_name = "my_ds"
path_to_folder_containing_pq_directories = "R:/Zentrale/ZB-S/Daten_DSZ/Daten/mifir/great_expectations"

datasource = context.sources.add_or_update_spark_filesystem(
    name=datasource_name, base_directory=path_to_folder_containing_pq_directories
)

datasource.add_parquet_asset("buyer_account_owner", batching_regex="buyer_account_owner")
data_asset = context.get_datasource(datasource_name).get_asset("buyer_account_owner")

expectation_suite = context.add_or_update_expectation_suite(expectation_suite_name="my_es")

validator = context.get_validator(
    batch_request=data_asset.build_batch_request(),
    expectation_suite_name="my_es",
)

validator.expect_column_value_lengths_to_equal(
    "sdc_buyer_account_owner",
    value=20,
    condition_parser="spark",
    row_condition='col("condition_col") == "ABC"',
)

Traceback:

Traceback (most recent call last):
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 548, in _process_direct_and_bundled_metric_computation_configurations
    ] = metric_computation_configuration.metric_fn(  # type: ignore[misc] # F not callable
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\expectations\metrics\metric_provider.py", line 60, in inner_func
    return metric_fn(*args, **kwargs)
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\expectations\metrics\table_metrics\table_column_types.py", line 84, in _spark
    df, _, _ = execution_engine.get_compute_domain(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 663, in get_compute_domain
    data: pyspark.DataFrame = self.get_domain_records(domain_kwargs=domain_kwargs)
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 522, in get_domain_records
    data = data.filter(row_condition)
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\pyspark\sql\dataframe.py", line 1715, in filter
    jdf = self._jdf.filter(condition)
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\py4j\java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\pyspark\sql\utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Undefined function: 'col'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 621, in inst_expectation
    raise err
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 584, in inst_expectation
    validation_result = expectation.validate(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\expectations\expectation.py", line 1277, in validate
    ] = validator.graph_validate(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 1096, in graph_validate
    raise err
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 1075, in graph_validate
    ) = self._resolve_suite_level_graph_and_process_metric_evaluation_errors(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 1234, in _resolve_suite_level_graph_and_process_metric_evaluation_errors
    ) = self._metrics_calculator.resolve_validation_graph(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\metrics_calculator.py", line 314, in resolve_validation_graph
    resolved_metrics, aborted_metrics_info = graph.resolve(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validation_graph.py", line 209, in resolve
    ] = self._resolve(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validation_graph.py", line 315, in _resolve
    raise err
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validation_graph.py", line 285, in _resolve
    self._execution_engine.resolve_metrics(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 283, in resolve_metrics
    return self._process_direct_and_bundled_metric_computation_configurations(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 552, in _process_direct_and_bundled_metric_computation_configurations
    raise gx_exceptions.MetricResolutionError(
great_expectations.exceptions.exceptions.MetricResolutionError: Undefined function: 'col'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0

Expected behavior

I would expect GX to work as documented.

Environment (please complete the following information):

  • Operating System: Windows
  • Great Expectations Version: 0.17.22
  • Data Source: Spark
  • Cloud environment: None (we create and test expectations locally before running them in a Spark cluster).

matthiasgomolka · Oct 19 '23

Hi @matthiasgomolka, thank you for letting us know; this is interesting to see. We'll put it into our backlog for review.

HaebichanGX · Oct 24 '23

I had the same issue.

GX documentation: row_condition='col("foo").notNull()' # foo is not null

Spark documentation: Column.isNotNull() [see https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.isNotNull.html]

Traceback error:

raise gx_exceptions.MetricResolutionError(
great_expectations.exceptions.exceptions.MetricResolutionError: unable to parse condition: col('column_name').notNULL()
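
For what it's worth, a plain SQL predicate seems to get through, since the row_condition string goes straight to DataFrame.filter() (see the traceback above). A minimal sketch, with placeholder column names:

validator.expect_column_values_to_not_be_null(
    column="foo",
    condition_parser="spark",
    # SQL-style equivalent of col('column_name').notNull();
    # 'column_name' and 'foo' are placeholders.
    row_condition="column_name IS NOT NULL",
)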

godzzi · Mar 18 '24

PLEEEEASE get rid of this row_condition='col("foo").notNull()'

and allow simple SQL syntax passthru: row_condition = 'fld1=5 OR fld2<>7 AND fld3 <9'

kujaska · Jul 05 '24

You may use this; it works like a SQL WHERE clause:

"condition_parser": "spark", "row_condition": "date_of_birth > DATE '0001-01-01' AND gender = 'M' OR dead = true",

kujaska · Aug 20 '24

Hello @matthiasgomolka. With the launch of Great Expectations Core (GX 1.0), we are closing old issues posted regarding previous versions. Moving forward, we will focus our resources on supporting and improving GX Core (version 1.0 and beyond). If you find that an issue you previously reported still exists in GX Core, we encourage you to resubmit it against the new version. With more resources dedicated to community support, we aim to tackle new issues swiftly. For specific details on what is GX-supported vs community-supported, you can reference our integration and support policy.

To get started on your transition to GX Core, check out the GX Core quickstart (click “Full example code” tab to see a code example).

You can also join our upcoming community meeting on August 28th at 9am PT (noon ET / 4pm UTC) for a comprehensive rundown of everything GX Core, plus Q&A as time permits. Go to https://greatexpectations.io/meetup and click “follow calendar” to follow the GX community calendar.

Thank you for being part of the GX community and thank you for submitting this issue. We're excited about this new chapter and look forward to your feedback on GX Core. 🤗

molliemarie · Aug 23 '24