great_expectations
great_expectations copied to clipboard
Arguments for custom connectors (plugins) are stripped from the config
Describe the bug The docs describe that you can build your own custom connectors and include them in your config.
When you create your own connector, you face 2 issues related to validations that don't hold this behavior into account:
- Arguments cannot be named similarly to internal connectors. E.g. you cannot use the argument
default_regex
since there is a validation that checks that you're using one of the predefined inferred connectors and raises an exception if you're not. The check is on the exact type name, so you can't even inherit from an inferred connector an use that argument. - Custom arguments are not allowed. Since there are strict schemas defined for every part of the config, the code also enforces schema validation on your config. The schemas do not contain your custom arguments and the custom arguments are removed from the config. E.g. you can't create an argument
custom_default_regex
in your connector, the value will simply be stripped out during schema validation.
To Reproduce Steps to reproduce the behavior:
- Create a custom connector inheriting from an inferred connector
- Override the init method and pass the existing args to the super
- Add a custom argument to your init method
- Add this connector to your yaml config, providing a value for the custom argument and for default_regex
- First error you get is about the default_regex etc., you cannot use this arg since your connector doesn't have an exact allowed type
- Remove that argument from your config :(
- Next error you get is that your init function is missing a required argument, since the value for your custom argument was left out
Expected behavior Let the custom connectors work with the args they require and don't limit the usage of default argument keywords so that we can easily create variants of the built-in connectors
Howdy @sdebruyn :wave: thanks for raising this with us :bow:
Can you please share the code backing the workflow so we can better look into this 🔬
Hi, something like this would not work:
from __future__ import annotations
import os.path
from functools import cached_property
from typing import Any, List, Optional
from fsspec import AbstractFileSystem
from REDACTED import get_filesystem_and_handle
from great_expectations.datasource.data_connector import InferredAssetFilePathDataConnector
from great_expectations.execution_engine import ExecutionEngine
class InferredFsspecDataConnector(InferredAssetFilePathDataConnector):
"""Fsspec version of InferredAssetFilesystemDataConnector."""
def __init__(
self,
name: str,
datasource_name: str,
full_base_path: str,
storage_options: dict[str, Any] | None = None,
glob_directive: str = "**",
execution_engine: Optional[ExecutionEngine] = None,
default_regex: Optional[dict] = None,
sorters: Optional[list] = None,
batch_spec_passthrough: Optional[dict] = None,
) -> None:
self.full_base_path = full_base_path
self.storage_options = storage_options
self._glob_directive = glob_directive
super().__init__(name, datasource_name, execution_engine, default_regex, sorters, batch_spec_passthrough)
@cached_property
def filesystem(self) -> AbstractFileSystem:
if not self._filesystem:
self._set_filesystem_and_handle()
return self._filesystem
@cached_property
def path_or_handle(self) -> str:
if not self._path_or_handle:
self._set_filesystem_and_handle()
return self._path_or_handle
def _set_filesystem_and_handle(self) -> None:
self._filesystem, self._path_or_handle = get_filesystem_and_handle(self.full_base_path, self.storage_options)
def _get_full_file_path(self, path: str, data_asset_name: Optional[str] = None) -> str:
return os.path.join(self.path_or_handle, path)
def _get_data_reference_list(self, data_asset_name: Optional[str] = None) -> List[str]:
glob_path = os.path.join(self.path_or_handle, self._glob_directive)
path_list = self.filesystem.glob(glob_path)
return sorted(path_list)
Hey @sdebruyn we have noted this as a feature/enhancement request. Product management will make a determination about its priority.