Data Context cannot get blob files from ABS when there are regex patterns outside of groups (e.g. for redundant info)

Open tunayokumus opened this issue 2 years ago • 2 comments

Describe the bug Hi, I'm using InferredAssetAzureDataConnector to connect to data in ABS (Azure Blob Storage). I'd like to use regex wildcards to ignore the timestamp info when grouping data assets. When all wildcards sit inside capture groups, everything works fine. However, when wildcards also appear outside the groups to match redundant parts of the filename, the context.get_validator method throws an error.

Note: context.test_yaml_config(yaml.dump(datasource_config)) is functioning fine for both approaches.
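To make "wildcards outside of groups" concrete, here is a minimal sketch using only Python's `re` module with the pattern from the reproduction steps. The blob name is made up for illustration, and pairing group names with captured values via `zip` is an assumption about how the connector uses `group_names`, not its actual code.

```python
import re

# Pattern from the reproduction config. The leading ".*" and the time-of-day
# part (T\d{2}_\d{2}_\d{2}) are matched but sit outside any capture group.
PATTERN = r".*/(\d{4})_(\d{2})_(\d{2})T\d{2}_\d{2}_\d{2}/(.*)\.parquet"
GROUP_NAMES = ["year", "month", "day", "data_asset_name"]

# Hypothetical blob name, invented for this example.
blob_name = "data/bronze_tables/2021_12_08T10_15_30/bronze_table.parquet"

match = re.match(PATTERN, blob_name)

# Only the four captured values survive; the prefix and the time of day are dropped.
batch_identifiers = dict(zip(GROUP_NAMES, match.groups()))
print(batch_identifiers)
```

The uncaptured parts are not recoverable from `batch_identifiers` alone, which is consistent with the `*` placeholders that test_yaml_config prints in its data references.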

To Reproduce Steps to reproduce the behavior:

  1. When creating a data connector with InferredAssetAzureDataConnector, include regex wildcards outside of a group in the regex pattern:
datasource_config = {
    "name": "my_azure_datasource",
    "class_name": "Datasource",
    "execution_engine": {
        "class_name": "PandasExecutionEngine",
        "azure_options": {
            "account_url": "https://mystorage.blob.core.windows.net",
            "credential": CREDENTIAL,
        },
    },
    "data_connectors": {
        "default_inferred_data_connector_name": {
            "class_name": "InferredAssetAzureDataConnector",
            "azure_options": {
                "account_url": "https://mystorage.blob.core.windows.net",
                "credential": CREDENTIAL,
            },
            "container": "my-container",
            "name_starts_with": "data/bronze_tables/",
            "default_regex": {
                "pattern": ".*/(\\d{4})_(\\d{2})_(\\d{2})T\\d{2}_\\d{2}_\\d{2}/(.*)\\.parquet",
                "group_names": ["year", "month", "day", "data_asset_name"],
            },
        },
    },
}
  1. Run context.test_yaml_config(yaml.dump(datasource_config)). It should return something like this:
...
Data Connectors:
	default_inferred_data_connector_name : InferredAssetAzureDataConnector

	Available data_asset_names (1 of 4):
		bronze_table (3 of 49): ['*/2021_12_08T*_*_*/bronze_table.parquet', '*/2021_12_17T*_*_*/bronze_table.parquet', '*/2021_12_18T*_*_*/bronze_table.parquet']
	Unmatched data_references (0 of 0):[]
  1. Run
from great_expectations.core.batch import BatchRequest

context.add_datasource(**datasource_config)
batch_request = BatchRequest(
    datasource_name="my_azure_datasource",
    data_connector_name="default_inferred_data_connector_name",
    data_asset_name="bronze_table",
)
context.create_expectation_suite(
    expectation_suite_name="test_suite", overwrite_existing=True
)
validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name="test_suite"
)
  1. See error

---------------------------------------------------------------------------
ResourceNotFoundError                     Traceback (most recent call last)
File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/azure/storage/blob/_download.py:386, in StorageStreamDownloader._initial_request(self)
    385 try:
--> 386     location_mode, response = self._clients.blob.download(
    387         range=range_header,
    388         range_get_content_md5=range_validation,
    389         validate_content=self._validate_content,
    390         data_stream_total=None,
    391         download_stream_current=0,
    392         **self._request_options
    393     )
    395     # Check the location we read from to ensure we use the same one
    396     # for subsequent requests.

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/azure/storage/blob/_generated/operations/_blob_operations.py:183, in BlobOperations.download(self, snapshot, version_id, timeout, range, range_get_content_md5, range_get_content_crc64, request_id_parameter, lease_access_conditions, cpk_info, modified_access_conditions, **kwargs)
    182 if response.status_code not in [200, 206]:
--> 183     map_error(status_code=response.status_code, response=response, error_map=error_map)
    184     error = self._deserialize(_models.StorageError, response)

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/azure/core/exceptions.py:105, in map_error(status_code, response, error_map)
    104 error = error_type(response=response)
--> 105 raise error

ResourceNotFoundError: Operation returned an invalid status 'The specified blob does not exist.'

During handling of the above exception, another exception occurred:

ResourceNotFoundError                     Traceback (most recent call last)
/home/my_user/repos/my_repo/great_expectations/data.ipynb Cell 11' in <module>
      1 context.create_expectation_suite(
      2     expectation_suite_name="test_suite", overwrite_existing=True
      3 )
----> 4 validator = context.get_validator(
      5     batch_request=batch_request, expectation_suite_name="test_suite"
      6 )

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/great_expectations/data_context/data_context.py:1817, in BaseDataContext.get_validator(self, datasource_name, data_connector_name, data_asset_name, batch_request, batch_request_list, batch_data, data_connector_query, batch_identifiers, limit, index, custom_filter_function, sampling_method, sampling_kwargs, splitter_method, splitter_kwargs, runtime_parameters, query, path, batch_filter_parameters, expectation_suite_ge_cloud_id, batch_spec_passthrough, expectation_suite_name, expectation_suite, create_expectation_suite_with_name, **kwargs)
   1814 batch_list: List = []
   1815 for batch_request in batch_request_list:
   1816     batch_list.extend(
-> 1817         self.get_batch_list(
   1818             datasource_name=datasource_name,
   1819             data_connector_name=data_connector_name,
   1820             data_asset_name=data_asset_name,
   1821             batch_request=batch_request,
   1822             batch_data=batch_data,
   1823             data_connector_query=data_connector_query,
   1824             batch_identifiers=batch_identifiers,
   1825             limit=limit,
   1826             index=index,
   1827             custom_filter_function=custom_filter_function,
   1828             sampling_method=sampling_method,
   1829             sampling_kwargs=sampling_kwargs,
   1830             splitter_method=splitter_method,
   1831             splitter_kwargs=splitter_kwargs,
   1832             runtime_parameters=runtime_parameters,
   1833             query=query,
   1834             path=path,
   1835             batch_filter_parameters=batch_filter_parameters,
   1836             batch_spec_passthrough=batch_spec_passthrough,
   1837             **kwargs,
   1838         )
   1839     )
   1841 # We get a single batch_definition so we can get the execution_engine here. All batches will share the same one
   1842 # So the batch itself doesn't matter. But we use -1 because that will be the latest batch loaded.
   1843 batch_definition = batch_list[-1].batch_definition

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/great_expectations/core/usage_statistics/usage_statistics.py:306, in usage_statistics_enabled_method.<locals>.usage_statistics_wrapped_method(*args, **kwargs)
    303     if args_payload_fn is not None:
    304         nested_update(event_payload, args_payload_fn(*args, **kwargs))
--> 306     result = func(*args, **kwargs)
    307     message["success"] = True
    308 except Exception:

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/great_expectations/data_context/data_context.py:1736, in BaseDataContext.get_batch_list(self, datasource_name, data_connector_name, data_asset_name, batch_request, batch_data, data_connector_query, batch_identifiers, limit, index, custom_filter_function, sampling_method, sampling_kwargs, splitter_method, splitter_kwargs, runtime_parameters, query, path, batch_filter_parameters, batch_spec_passthrough, **kwargs)
   1731 else:
   1732     raise ge_exceptions.DatasourceError(
   1733         datasource_name,
   1734         "The given datasource could not be retrieved from the DataContext; please confirm that your configuration is accurate.",
   1735     )
-> 1736 return datasource.get_batch_list_from_batch_request(batch_request=batch_request)

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/great_expectations/datasource/new_datasource.py:194, in BaseDatasource.get_batch_list_from_batch_request(self, batch_request)
    188 batch_spec: PathBatchSpec
    189 batch_markers: BatchMarkers
    190 (
    191     batch_data,
    192     batch_spec,
    193     batch_markers,
--> 194 ) = data_connector.get_batch_data_and_metadata(
    195     batch_definition=batch_definition
    196 )
    197 new_batch: Batch = Batch(
    198     data=batch_data,
    199     batch_request=batch_request,
   (...)
    202     batch_markers=batch_markers,
    203 )
    204 batches.append(new_batch)

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/great_expectations/datasource/data_connector/data_connector.py:110, in DataConnector.get_batch_data_and_metadata(self, batch_definition)
    101 """
    102 Uses batch_definition to retrieve batch_data and batch_markers by building a batch_spec from batch_definition,
    103 then using execution_engine to return batch_data and batch_markers
   (...)
    107 
    108 """
    109 batch_spec: BatchSpec = self.build_batch_spec(batch_definition=batch_definition)
--> 110 batch_data, batch_markers = self._execution_engine.get_batch_data_and_markers(
    111     batch_spec=batch_spec
    112 )
    113 self._execution_engine.load_batch_data(batch_definition.id, batch_data)
    114 return (
    115     batch_data,
    116     batch_spec,
    117     batch_markers,
    118 )

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/great_expectations/execution_engine/pandas_execution_engine.py:273, in PandasExecutionEngine.get_batch_data_and_markers(self, batch_spec)
    269 azure_url = AzureUrl(path)
    270 blob_client = azure_engine.get_blob_client(
    271     container=azure_url.container, blob=azure_url.blob
    272 )
--> 273 azure_object = blob_client.download_blob()
    274 logger.debug(
    275     f"Fetching Azure blob. Container: {azure_url.container} Blob: {azure_url.blob}"
    276 )
    277 reader_fn = self._get_reader_fn(reader_method, azure_url.blob)

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/azure/core/tracing/decorator.py:83, in distributed_trace.<locals>.decorator.<locals>.wrapper_use_tracer(*args, **kwargs)
     81 span_impl_type = settings.tracing_implementation()
     82 if span_impl_type is None:
---> 83     return func(*args, **kwargs)
     85 # Merge span is parameter is set, but only if no explicit parent are passed
     86 if merge_span and not passed_in_parent:

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/azure/storage/blob/_blob_client.py:828, in BlobClient.download_blob(self, offset, length, **kwargs)
    745 """Downloads a blob to the StorageStreamDownloader. The readall() method must
    746 be used to read all the content or readinto() must be used to download the blob into
    747 a stream. Using chunks() returns an iterator which allows the user to iterate over the content in chunks.
   (...)
    822         :caption: Download a blob.
    823 """
    824 options = self._download_blob_options(
    825     offset=offset,
    826     length=length,
    827     **kwargs)
--> 828 return StorageStreamDownloader(**options)

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/azure/storage/blob/_download.py:349, in StorageStreamDownloader.__init__(self, clients, config, start_range, end_range, validate_content, encryption_options, max_concurrency, name, container, encoding, **kwargs)
    343     initial_request_end = initial_request_start + self._first_get_size - 1
    345 self._initial_range, self._initial_offset = process_range_and_offset(
    346     initial_request_start, initial_request_end, self._end_range, self._encryption_options
    347 )
--> 349 self._response = self._initial_request()
    350 self.properties = self._response.properties
    351 self.properties.name = self.name

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/azure/storage/blob/_download.py:429, in StorageStreamDownloader._initial_request(self)
    427         self._file_size = 0
    428     else:
--> 429         process_storage_error(error)
    431 try:
    432     if self.size == 0:

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/azure/storage/blob/_shared/response_handlers.py:150, in process_storage_error(storage_error)
    148 error.error_code = error_code
    149 error.additional_info = additional_data
--> 150 error.raise_with_traceback()

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/azure/core/exceptions.py:247, in AzureError.raise_with_traceback(self)
    245 def raise_with_traceback(self):
    246     try:
--> 247         raise super(AzureError, self).with_traceback(self.exc_traceback)
    248     except AttributeError:
    249         self.__traceback__ = self.exc_traceback

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/azure/storage/blob/_download.py:386, in StorageStreamDownloader._initial_request(self)
    384 while retry_active:
    385     try:
--> 386         location_mode, response = self._clients.blob.download(
    387             range=range_header,
    388             range_get_content_md5=range_validation,
    389             validate_content=self._validate_content,
    390             data_stream_total=None,
    391             download_stream_current=0,
    392             **self._request_options
    393         )
    395         # Check the location we read from to ensure we use the same one
    396         # for subsequent requests.
    397         self._location_mode = location_mode

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/azure/storage/blob/_generated/operations/_blob_operations.py:183, in BlobOperations.download(self, snapshot, version_id, timeout, range, range_get_content_md5, range_get_content_crc64, request_id_parameter, lease_access_conditions, cpk_info, modified_access_conditions, **kwargs)
    180 response = pipeline_response.http_response
    182 if response.status_code not in [200, 206]:
--> 183     map_error(status_code=response.status_code, response=response, error_map=error_map)
    184     error = self._deserialize(_models.StorageError, response)
    185     raise HttpResponseError(response=response, model=error)

File ~/miniconda3/envs/my_env/lib/python3.8/site-packages/azure/core/exceptions.py:105, in map_error(status_code, response, error_map)
    103     return
    104 error = error_type(response=response)
--> 105 raise error

ResourceNotFoundError: The specified blob does not exist.
RequestId:<request-id>
Time:2022-02-15T14:27:18.548101Z
ErrorCode:BlobNotFound
Error:None

Expected behavior When the wildcards are replaced by literal characters, or when the wildcards are placed inside groups, everything works fine. I'd expect the get_validator method to find the data in the same way when wildcards appear outside the groups. However, that's not the case.
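As a sketch of the variant described as working, the `default_regex` below captures every wildcard in a group. The extra group names (`prefix`, `hour`, `minute`, `second`) and the blob name are made up for illustration, and this exact configuration has not been verified against Great Expectations. The check uses only the `re` module.

```python
import re

# Variant of "default_regex" in which no wildcard is left outside a group.
# The names prefix/hour/minute/second are hypothetical additions.
default_regex = {
    "pattern": r"(.*)/(\d{4})_(\d{2})_(\d{2})T(\d{2})_(\d{2})_(\d{2})/(.*)\.parquet",
    "group_names": [
        "prefix", "year", "month", "day",
        "hour", "minute", "second", "data_asset_name",
    ],
}

# Hypothetical blob name, invented for this example.
blob_name = "data/bronze_tables/2021_12_08T10_15_30/bronze_table.parquet"

match = re.match(default_regex["pattern"], blob_name)
captured = dict(zip(default_regex["group_names"], match.groups()))

# Because every variable part is captured, the full name can be rebuilt exactly.
rebuilt = (
    "{prefix}/{year}_{month}_{day}T{hour}_{minute}_{second}/"
    "{data_asset_name}.parquet"
).format(**captured)
print(rebuilt == blob_name)
```

The trade-off is that the time-of-day parts then become batch identifiers too, so they are no longer ignored when batches are grouped.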

Environment (please complete the following information):

  • Operating System: Linux (Ubuntu 20.04)
  • Great Expectations Version: v0.14.6

Additional context The "*" wildcard is probably being passed directly to the Azure Blob Storage client package. Does that package support wildcards in the form great-expectations sends them?
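The suspicion can be illustrated without touching Azure. The sketch below takes one data reference exactly as printed by test_yaml_config and compares a literal name lookup with a glob-style lookup over a made-up list of blob names. It assumes that the string containing `*` is what ends up being requested, which is the hypothesis here rather than a confirmed fact, and that a download by name treats the name literally, as the BlobNotFound error suggests.

```python
import fnmatch

# Data reference as shown in the test_yaml_config output: uncaptured parts appear as "*".
data_reference = "*/2021_12_08T*_*_*/bronze_table.parquet"

# Hypothetical blob names in the container, invented for this example.
blob_names = [
    "data/bronze_tables/2021_12_08T10_15_30/bronze_table.parquet",
    "data/bronze_tables/2021_12_17T09_00_00/bronze_table.parquet",
]

# Literal lookup: no blob is actually named "*/2021_12_08T*_*_*/...".
literal_hits = [name for name in blob_names if name == data_reference]

# Glob-style lookup: what would be needed to resolve the "*" placeholders.
glob_hits = [name for name in blob_names if fnmatch.fnmatchcase(name, data_reference)]

print(literal_hits)
print(glob_hits)
```

If the hypothesis holds, a fix would need to resolve the placeholders to the real blob name (for example by listing blobs and matching) before the download is attempted.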

tunayokumus, Feb 15 '22 13:02

Hey @tunayokumus ! Thanks for reaching out here; we'll begin reviewing this internally and contact you soon.

austiezr, Feb 16 '22 16:02

Hi @tunayokumus, so sorry but we have not been able to make progress on this issue. Are you still experiencing this problem?

To confirm, are you saying that this works as expected when running test_yaml_config and doesn’t when running context.get_validator?

kyleaton, Aug 31 '22 15:08