great_expectations
Bug: TypeError when using a custom_filter_function
Bug description
Running a checkpoint from Python with a "custom_filter_function" in the "data_connector_query" of the "batch_request" raises the following error:
TypeError: <function ge_checkpoint_files.<locals>.__extracted_files_filter at 0x000002A992E1AC10> is of type function which cannot be serialized.
Environment
- Windows 10
- Great Expectations 0.14.1
Example code
from great_expectations.data_context import DataContext


def ge_checkpoint_files(extracted_filenames, ge_dir):
    # Keep only batches whose reconstructed filename was actually extracted.
    def __extracted_files_filter(batch_identifiers: dict) -> bool:
        filename_template = "redacted_{year}_{month}_{day}.csv"
        f = filename_template.format(
            **batch_identifiers
        )
        return f in extracted_filenames

    # Runtime validation whose batch request carries the (non-serializable) filter.
    val = {
        "batch_request": {
            "datasource_name": "redacted",
            "data_connector_name": "default_inferred_data_connector_name",
            "data_asset_name": "redacted",
            "data_connector_query": {
                "index": 0,
                "custom_filter_function": __extracted_files_filter
            }
        }
    }
    context = DataContext(ge_dir)
    results = context.run_checkpoint(checkpoint_name="redacted.file", validations=[val])
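The filter logic can be exercised on its own, without Great Expectations, which helps confirm that the failure comes from serialization rather than from the filter. This is a minimal sketch, assuming the batch identifiers arrive as a dict with "year", "month", and "day" keys (inferred from the filename template above). The helper name make_filename_filter and the sample filename are hypothetical and used only for illustration.

```python
def make_filename_filter(extracted_filenames, filename_template):
    """Build a filter that keeps only batches whose filename was extracted.

    Hypothetical helper: it mirrors the nested filter in the example above,
    but takes the template as a parameter so it can be tested in isolation.
    """
    def _filter(batch_identifiers: dict) -> bool:
        # Rebuild the filename from the identifiers and check membership.
        return filename_template.format(**batch_identifiers) in extracted_filenames

    return _filter


# Sample data (made up for this sketch).
extracted = {"redacted_2022_01_31.csv"}
keep = make_filename_filter(extracted, "redacted_{year}_{month}_{day}.csv")

print(keep({"year": "2022", "month": "01", "day": "31"}))  # True
print(keep({"year": "2022", "month": "02", "day": "01"}))  # False
```

The filter behaves as an ordinary Python callable here; the error only appears once the surrounding configuration has to be converted to JSON.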
Full Stack Trace
(.venv) PS C:\redacted_path_to_project> & c:/redacted_path_to_project/.venv/Scripts/python.exe c:/redacted_path_to_project/sandbox/checkpoint.py
C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\data_context\data_context.py:458: DeprecationWarning: The 'warn' method
is deprecated, use 'warning' instead
logger.warn(f"Cannot initialize datasource {datasource_name}: {e}")
Cannot initialize datasource gcp_bigquery: Cannot initialize datasource gcp_bigquery, error: Can't load plugin: sqlalchemy.dialects:bigquery
Traceback (most recent call last):
File "c:\redacted_path_to_project\sandbox\checkpoint.py", line 75, in <module>
res = ge_checkpoint_files(
File "c:\redacted_path_to_project\sandbox\checkpoint.py", line 54, in ge_checkpoint_files
results = context.run_checkpoint(
File "C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\core\usage_statistics\usage_statistics.py", line 307, in usage_statistics_wrapped_method
result = func(*args, **kwargs)
File "C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\data_context\data_context.py", line 3179, in run_checkpoint
return checkpoint_toolkit.run_checkpoint(
File "C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\checkpoint\toolkit.py", line 373, in run_checkpoint
return checkpoint.run(**checkpoint_run_arguments)
File "C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\core\usage_statistics\usage_statistics.py", line 307, in usage_statistics_wrapped_method
result = func(*args, **kwargs)
File "C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\checkpoint\checkpoint.py", line 349, in run
substituted_runtime_config: CheckpointConfig = self.get_substituted_config(
substituted_config = copy.deepcopy(config)
File "C:\Program Files\Python39\lib\copy.py", line 153, in deepcopy
y = copier(memo)
File "C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\data_context\types\base.py", line 2398, in __deepcopy__
for key, value in self.to_json_dict().items():
File "C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\data_context\types\base.py", line 123, in to_json_dict
return convert_to_json_serializable(data=commented_map)
File "C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\core\util.py", line 161, in convert_to_json_serializable
new_dict[str(key)] = convert_to_json_serializable(data[key])
File "C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\core\util.py", line 168, in convert_to_json_serializable
new_list.append(convert_to_json_serializable(val))
File "C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\core\util.py", line 161, in convert_to_json_serializable
new_dict[str(key)] = convert_to_json_serializable(data[key])
File "C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\core\util.py", line 161, in convert_to_json_serializable
new_dict[str(key)] = convert_to_json_serializable(data[key])
File "C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\core\util.py", line 161, in convert_to_json_serializable
new_dict[str(key)] = convert_to_json_serializable(data[key])
File "C:\redacted_path_to_project\.venv\lib\site-packages\great_expectations\core\util.py", line 256, in convert_to_json_serializable
raise TypeError(
TypeError: <function ge_checkpoint_files.<locals>.__extracted_files_filter at 0x0000021360F7A700> is of type function which cannot be serialized.
(.venv) PS C:\redacted_path_to_project> great_expectations --version
great_expectations, version 0.14.1
(.venv) PS C:\redacted_path_to_project>
Howdy @akerone, thank you for telling us and providing the full stack trace. We'll get back to you, and again thank you.
Is this issue still relevant? If so, what is blocking it? Is there anything you can do to help move it forward?
This issue has been automatically marked as stale because it has not had recent activity.
It will be closed if no further activity occurs. Thank you for your contributions 🙇
Is there any progress on this? I am facing the same issue, and not being able to use custom filters is a pretty big deal for our project. My BatchRequest returns the right assets, filtered by custom_filter_function, but checkpoint.run() seems to want to serialize the function for some reason.
Hello, @akerone and @pfuerste -- thank you for reporting this issue and for your interest in seeing its resolution.
This limitation is well known. In summary, dynamic or ephemeral entities, such as a DataFrame or a function, cannot be serialized. The GX documentation states that a Checkpoint can be created and its configuration saved in the checkpoint_store. However, this only works when the Checkpoint whose configuration is being serialized for storage does not contain these dynamic components (i.e., the BatchRequest points only to data on the filesystem and no custom function is used in Batch querying).
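The serialization limit described above can be reproduced with the standard library alone. This is a minimal sketch that uses Python's json module as a stand-in for the GX serializer seen in the stack trace (convert_to_json_serializable), so the error message differs from the one reported. The names keep_all and can_serialize are hypothetical, and the request dicts are trimmed-down illustrations rather than complete batch requests.

```python
import json


def keep_all(batch_identifiers: dict) -> bool:
    """Hypothetical filter that accepts every batch."""
    return True


# A request made only of plain data: strings, numbers, nested dicts.
serializable_request = {
    "datasource_name": "redacted",
    "data_connector_query": {"index": 0},
}

# The same request with a function object embedded in it.
dynamic_request = {
    "datasource_name": "redacted",
    "data_connector_query": {"index": 0, "custom_filter_function": keep_all},
}


def can_serialize(config: dict) -> bool:
    """Return True if the config survives conversion to JSON text."""
    try:
        json.dumps(config)
        return True
    except TypeError:
        # json raises TypeError for objects it cannot encode, e.g. functions.
        return False


print(can_serialize(serializable_request))  # True
print(can_serialize(dynamic_request))       # False
```

Any code path that stores or deep-copies the configuration through a JSON conversion will hit the same wall as soon as a function is part of the request.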
If you must have these non-serializable entities in your Checkpoint, then I would like to suggest an alternative: instantiate the Checkpoint class with all the fields that you need, but do not try to save this Checkpoint. Just instantiate it and call its run() method.
In addition, GX has undergone a lot of improvements. Since you are already doing the work, the following steps are worth your time and can be very helpful:
- Please use the "Fluent Datasources" approach -- this is a new, more intuitive way, to connect to data.
- As you did before, please report your code with full error stack trace -- with as much detail as possible.
You can also reach me directly in our GX Slack Community, and we can schedule a Zoom troubleshooting session and then share the outcome with the Community so that everybody can benefit from our learnings.
Thank you very much.
@akerone and @pfuerste -- I am hoping that our suggestions helped. I am closing this issue for now. Please feel free to reopen and let us know at any time. Thank you very much for using GX!