great_expectations
Can't run checkpoints in parallel because great_expectations.yml is opened in write mode
Describe the bug
(I have looked into the code to the best of my ability, but could not debug further because this only happens when multiple threads run at the same time.) When multiple checkpoints are triggered simultaneously, some of them intermittently fail with the exception below. In my case, 5 checkpoints are triggered from Airflow, and most of the time 2-4 of them fail. Setting retries to 5 with a retry delay of 3 works around my particular problem, but the number of checkpoints is likely to grow to 50-100 soon.
As far as I can tell, this happens because Great Expectations opens great_expectations.yml in write mode at line 159 of file_data_context.py:

with open(config_filepath, "w") as outfile:
This was completely unexpected to me, since I don't think running a checkpoint requires any write operation on that file.
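Until that write is removed, a cross-process file lock around context creation can serve as a workaround. A minimal sketch, assuming the third-party filelock package and a placeholder checkpoint name (neither is part of Great Expectations itself):

```python
import great_expectations as gx
from filelock import FileLock  # third-party: pip install filelock

# Every worker must agree on the same lock file path for this to help.
# The checkpoint run stays inside the lock because context creation is
# what rewrites great_expectations.yml.
with FileLock("/tmp/gx_context.lock", timeout=60):
    context = gx.get_context(context_root_dir="data-testing/great_expectations")
    result = context.run_checkpoint(checkpoint_name="my_checkpoint")  # placeholder name
```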
Airflow logs:
scheduler | [2023-07-12T11:09:43.252-0500] {dagbag.py:541} INFO - Filling up the DagBag from airflow/dags/run-constraints-checkpoint.py
scheduler | [2023-07-12T11:09:43.251-0500] {dagbag.py:541} INFO - Filling up the DagBag from airflow/dags/run-constraints-checkpoint.py
scheduler | [2023-07-12T11:09:43.252-0500] {dagbag.py:541} INFO - Filling up the DagBag from airflow/dags/run-constraints-checkpoint.py
scheduler | [2023-07-12T11:09:43.260-0500] {dagbag.py:541} INFO - Filling up the DagBag from airflow/dags/run-constraints-checkpoint.py
scheduler | [2023-07-12T11:09:43.260-0500] {dagbag.py:541} INFO - Filling up the DagBag from airflow/dags/run-constraints-checkpoint.py
scheduler | [2023-07-12T11:09:46.414-0500] {file_data_context.py:214} INFO - FileDataContext loading fluent config
scheduler | [2023-07-12T11:09:46.426-0500] {config.py:185} INFO - Loading 'datasources' ->
scheduler | [{'assets': [...],
scheduler | 'REDACTED
scheduler | 'create_temp_table': True,
scheduler | 'name': 'REDACTED',
scheduler | 'type': 'sql'}]
scheduler | [2023-07-12T11:09:46.483-0500] {file_data_context.py:214} INFO - FileDataContext loading fluent config
scheduler | [2023-07-12T11:09:46.488-0500] {file_data_context.py:214} INFO - FileDataContext loading fluent config
scheduler | [2023-07-12T11:09:46.499-0500] {config.py:185} INFO - Loading 'datasources' ->
scheduler | [{'assets': [...],
scheduler | 'REDACTED
scheduler | 'create_temp_table': True,
scheduler | 'name': 'REDACTED',
scheduler | 'type': 'sql'}]
scheduler | [2023-07-12T11:09:46.501-0500] {config.py:185} INFO - Loading 'datasources' ->
scheduler | [{'assets': [...],
scheduler | 'REDACTED
scheduler | 'create_temp_table': True,
scheduler | 'name': 'REDACTED',
scheduler | 'type': 'sql'}]
scheduler | [2023-07-12T11:09:46.643-0500] {file_data_context.py:214} INFO - FileDataContext loading fluent config
scheduler | [2023-07-12T11:09:46.649-0500] {abstract_data_context.py:5891} INFO - Loaded 'REDACTED' from fluent config
scheduler | [2023-07-12T11:09:46.649-0500] {file_data_context.py:165} INFO - Saving 1 Fluent Datasources to data-testing/great_expectations/great_expectations.yml
scheduler | [2023-07-12T11:09:46.654-0500] {config.py:185} INFO - Loading 'datasources' ->
scheduler | [{'assets': [...],
scheduler | 'REDACTED
scheduler | 'create_temp_table': True,
scheduler | 'name': 'REDACTED',
scheduler | 'type': 'sql'}]
scheduler | [2023-07-12T11:09:46.699-0500] {file_data_context.py:214} INFO - FileDataContext loading fluent config
scheduler | [2023-07-12T11:09:46.707-0500] {config.py:185} INFO - Loading 'datasources' ->
scheduler | [{'assets': [...], 'name': 'REDACTED', 'type': 'sql'}]
scheduler | [2023-07-12T11:09:46.711-0500] {dagbag.py:350} ERROR - Failed to import: airflow/dags/run-constraints-checkpoint.py
scheduler | Traceback (most recent call last):
scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/models/dagbag.py", line 346, in parse
scheduler | loader.exec_module(new_module)
scheduler | File "<frozen importlib._bootstrap_external>", line 850, in exec_module
scheduler | File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
scheduler | File "airflow/dags/run-constraints-checkpoint.py", line 29, in <module>
scheduler | context = gx.get_context(context_root_dir=gx_context_root_dir)
scheduler | File "gx_fork/great_expectations/great_expectations/util.py", line 1917, in get_context
scheduler | file_context = _get_file_context(
scheduler | File "gx_fork/great_expectations/great_expectations/util.py", line 2047, in _get_file_context
scheduler | return FileDataContext(
scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/file_data_context.py", line 64, in __init__
scheduler | super().__init__(
scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/serializable_data_context.py", line 69, in __init__
scheduler | super().__init__(runtime_environment=runtime_environment)
scheduler | File "gx_fork/great_expectations/great_expectations/core/usage_statistics/usage_statistics.py", line 260, in usage_statistics_wrapped_method
scheduler | result = func(*args, **kwargs)
scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/abstract_data_context.py", line 284, in __init__
scheduler | self.fluent_config = self._load_fluent_config(self._config_provider)
scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/file_data_context.py", line 220, in _load_fluent_config
scheduler | gx_config = GxConfig.parse_yaml(path_to_fluent_yaml, _allow_empty=True)
scheduler | File "gx_fork/great_expectations/great_expectations/datasource/fluent/config.py", line 252, in parse_yaml
scheduler | config = cls(**loaded)
scheduler | File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
scheduler | pydantic.error_wrappers.ValidationError: 1 validation error for GxConfig
scheduler | fluent_datasources -> REDACTED
scheduler | field required (type=value_error.missing)
scheduler | [2023-07-12T11:09:46.717-0500] {cli.py:232} WARNING - Dag 'gx_run_constraints_checkpoints' not found in path airflow/dags/run-constraints-checkpoint.py; trying path airflow/dags/run-constraints-checkpoint.py
scheduler | [2023-07-12T11:09:46.717-0500] {dagbag.py:541} INFO - Filling up the DagBag from airflow/dags/run-constraints-checkpoint.py
scheduler | [2023-07-12T11:09:46.718-0500] {abstract_data_context.py:5891} INFO - Loaded 'REDACTED' from fluent config
scheduler | [2023-07-12T11:09:46.718-0500] {file_data_context.py:165} INFO - Saving 1 Fluent Datasources to data-testing/great_expectations/great_expectations.yml
scheduler | [2023-07-12T11:09:46.720-0500] {dagbag.py:350} ERROR - Failed to import: airflow/dags/run-constraints-checkpoint.py
scheduler | Traceback (most recent call last):
scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/models/dagbag.py", line 346, in parse
scheduler | loader.exec_module(new_module)
scheduler | File "<frozen importlib._bootstrap_external>", line 850, in exec_module
scheduler | File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
scheduler | File "airflow/dags/run-constraints-checkpoint.py", line 29, in <module>
scheduler | context = gx.get_context(context_root_dir=gx_context_root_dir)
scheduler | File "gx_fork/great_expectations/great_expectations/util.py", line 1917, in get_context
scheduler | file_context = _get_file_context(
scheduler | File "gx_fork/great_expectations/great_expectations/util.py", line 2047, in _get_file_context
scheduler | return FileDataContext(
scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/file_data_context.py", line 61, in __init__
scheduler | self._scaffold_project()
scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/file_data_context.py", line 86, in _scaffold_project
scheduler | if self.is_project_scaffolded(self._context_root_directory):
scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/serializable_data_context.py", line 480, in is_project_scaffolded
scheduler | and cls.config_variables_yml_exist(ge_dir)
scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/serializable_data_context.py", line 240, in config_variables_yml_exist
scheduler | config_var_path = config.get("config_variables_file_path")
scheduler | AttributeError: 'NoneType' object has no attribute 'get'
scheduler | [2023-07-12T11:09:46.721-0500] {local_executor.py:135} ERROR - Failed to execute task Dag 'gx_run_constraints_checkpoints' could not be found; either it does not exist or it failed to parse..
scheduler | Traceback (most recent call last):
scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 131, in _execute_work_in_fork
scheduler | args.func(args)
scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/cli/cli_config.py", line 52, in command
scheduler | return func(*args, **kwargs)
scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/utils/cli.py", line 112, in wrapper
scheduler | return f(*args, **kwargs)
scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 401, in task_run
scheduler | _dag = get_dag(args.subdir, args.dag_id)
scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/utils/cli.py", line 235, in get_dag
scheduler | raise AirflowException(
scheduler | airflow.exceptions.AirflowException: Dag 'gx_run_constraints_checkpoints' could not be found; either it does not exist or it failed to parse.
scheduler | [2023-07-12T11:09:46.732-0500] {abstract_data_context.py:5891} INFO - Loaded 'REDACTED' from fluent config
scheduler | [2023-07-12T11:09:46.733-0500] {file_data_context.py:165} INFO - Saving 1 Fluent Datasources to data-testing/great_expectations/great_expectations.yml
scheduler | [2023-07-12T11:09:46.850-0500] {abstract_data_context.py:5891} INFO - Loaded 'REDACTED' from fluent config
scheduler | [2023-07-12T11:09:46.850-0500] {file_data_context.py:165} INFO - Saving 1 Fluent Datasources to data-testing/great_expectations/great_expectations.yml
scheduler | [2023-07-12T11:09:47.472-0500] {run-constraints-checkpoint.py:51} INFO - REDACTED.constraints.fragments.3-0-0
scheduler | [2023-07-12T11:09:47.537-0500] {run-constraints-checkpoint.py:51} INFO - REDACTED.constraints.fragments.3-0-0
scheduler | [2023-07-12T11:09:47.547-0500] {run-constraints-checkpoint.py:51} INFO - REDACTED.constraints.fragments.3-0-0
scheduler | [2023-07-12T11:09:47.607-0500] {run-constraints-checkpoint.py:51} INFO - REDACTED.constraints.segments.2-5
scheduler | [2023-07-12T11:09:47.613-0500] {scheduler_job_runner.py:677} INFO - Received executor event with state failed for task instance TaskInstanceKey(dag_id='gx_run_constraints_checkpoints', task_id='REDACTED_constraints_fragments_3-0-0.load_templated_json', run_id='scheduled__2023-07-11T00:00:00+00:00', try_number=2, map_index=-1)
scheduler | [2023-07-12T11:09:47.618-0500] {run-constraints-checkpoint.py:51} INFO - REDACTED.constraints.segments.3-0-0
scheduler | [2023-07-12T11:09:47.621-0500] {scheduler_job_runner.py:713} INFO - TaskInstance Finished: dag_id=gx_run_constraints_checkpoints, task_id=REDACTED_constraints_fragments_3-0-0.load_templated_json, run_id=scheduled__2023-07-11T00:00:00+00:00, map_index=-1, run_start_date=2023-07-12 00:00:04.725685+00:00, run_end_date=2023-07-12 00:00:04.987151+00:00, run_duration=0.261466, state=queued, executor_state=failed, try_number=2, max_tries=6, job_id=197, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2023-07-12 16:09:43.009241+00:00, queued_by_job_id=206, pid=2120366
scheduler | [2023-07-12T11:09:47.621-0500] {scheduler_job_runner.py:761} ERROR - Executor reports task instance <TaskInstance: gx_run_constraints_checkpoints.REDACTED_constraints_fragments_3-0-0.load_templated_json scheduled__2023-07-11T00:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
scheduler | [2023-07-12T11:09:47.625-0500] {taskinstance.py:1826} ERROR - Executor reports task instance <TaskInstance: gx_run_constraints_checkpoints.REDACTED_constraints_fragments_3-0-0.load_templated_json scheduled__2023-07-11T00:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
scheduler | [2023-07-12T11:09:47.638-0500] {taskinstance.py:1345} INFO - Marking task as UP_FOR_RETRY. dag_id=gx_run_constraints_checkpoints, task_id=REDACTED_constraints_fragments_3-0-0.load_templated_json, execution_date=20230711T000000, start_date=20230712T000004, end_date=20230712T160947
To Reproduce
Trigger many checkpoints that use the same great_expectations.yml at the same time.
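Outside Airflow, something like the following should surface the same race (project path and checkpoint name are placeholders):

```python
from concurrent.futures import ThreadPoolExecutor

import great_expectations as gx

PROJECT_DIR = "data-testing/great_expectations"  # placeholder path

def run_one(checkpoint_name: str):
    # Every FileDataContext construction re-reads and re-writes
    # great_expectations.yml, so concurrent workers race on the file.
    context = gx.get_context(context_root_dir=PROJECT_DIR)
    return context.run_checkpoint(checkpoint_name=checkpoint_name)

with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(run_one, "my_checkpoint") for _ in range(5)]
    for f in futures:
        f.result()  # intermittently raises the ValidationError above
```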
Stack trace: same as the first traceback in the Airflow logs above.
Expected behavior
I'd expect Great Expectations to run the checkpoints successfully and not open great_expectations.yml in write mode, since running a checkpoint requires no changes to that file.
Environment (please complete the following information):
- Operating System: Ubuntu 22.04
- Great Expectations Version: 0.17.3
- Datasource: BigQuery
- Cloud environment: GCP
Hmm...that's a weird error @Chr96er. Let me put this into our backlog and get back to you.
This is also happening to me on Cloud Composer. Sometimes, possibly due to DAG crashes while the GX YAML is open in write mode by the checkpoint trigger, it even corrupts the great_expectations.yml file, and I have to load a fresh copy for it to start running fine again. A checkpoint run should only read from the GX file, never open it in write mode.
Environment
- Operating System: Ubuntu 22.04 WSL
- Great Expectations Version: 0.18.3
- Datasource: BigQuery
- Cloud environment: GCP, Google Cloud Composer
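One way to sidestep both the contention and the corruption described above, until a real fix lands, is to give each run its own throwaway copy of the project directory so Great Expectations never writes to the shared file. A sketch, with illustrative paths:

```python
import shutil
import tempfile

import great_expectations as gx

SHARED_PROJECT = "/shared/great_expectations"  # pristine, version-controlled copy

def run_checkpoint_isolated(checkpoint_name: str):
    # Copy the project into a private temp dir; GX can then rewrite its
    # own great_expectations.yml without touching the shared original.
    with tempfile.TemporaryDirectory() as tmp:
        private_root = shutil.copytree(SHARED_PROJECT, f"{tmp}/great_expectations")
        context = gx.get_context(context_root_dir=private_root)
        return context.run_checkpoint(checkpoint_name=checkpoint_name)
```

Note that validation results and Data Docs would also land in the temp copy under this scheme, so those stores would need to point at shared locations if you want to keep them.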
This also happened to me, with the same error, when running with Prefect and the native GE task (from prefect_great_expectations import run_checkpoint_validation) while trying to run validations on multiple files in parallel (a standard Prefect feature).
Environment
- Great Expectations Version: 0.18.8
- Prefect GE version: 0.2.1
- Datasource: csv
- Cloud environment: AWS, ECS, Prefect
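Until the underlying write is removed, Prefect's tag-based concurrency limits can force just the GX work back into series while the rest of the flow stays parallel. A sketch using a plain task wrapper rather than the prefect_great_expectations task (checkpoint names and project path are placeholders):

```python
from prefect import flow, task

import great_expectations as gx

@task(tags=["gx-serial"])  # pair with: prefect concurrency-limit create gx-serial 1
def validate_one(checkpoint_name: str):
    context = gx.get_context(context_root_dir="gx")  # placeholder path
    return context.run_checkpoint(checkpoint_name=checkpoint_name)

@flow
def validate_all(checkpoint_names: list[str]):
    # .map fans the tasks out, but the concurrency limit on the tag
    # lets only one of them touch great_expectations.yml at a time.
    return validate_one.map(checkpoint_names)
```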
Same for me: no problem running checkpoints one by one, but a lot of different errors when they run in parallel.
Error examples:
ComposerError: expected a single document in the stream
in "/mounted-azure-file-share/gx/great_expectations.yml", line 39, column 1
but found another document
in "/mounted-azure-file-share/gx/great_expectations.yml", line 41, column 1
ParserError: while parsing a block mapping
in "/mounted-azure-file-share/gx/great_expectations.yml", line 27, column 7
expected <block end>, but found '<block mapping start>'
IndexError: string index out of range
AttributeError: 'StreamStartEvent' object has no attribute 'tag'
Environment
- Operating System: Debian Bullseye
- Great Expectations Version: tried 0.18.7, 0.18.6 and 0.18.1
- Cloud environment: Azure, Azure Functions App (Python 3.10)
- GE project location: Azure File share mounted to the Azure Functions App in read/write mode using the CLI (https://learn.microsoft.com/en-us/azure/azure-functions/scripts/functions-cli-mount-files-storage-linux)
PLEASE make GE multitask-enabled ;)
Update: it looks like setting PYTHON_THREADPOOL_THREAD_COUNT = 1 in the function app configuration solves the issue of getting random errors when running Great Expectations in Azure durable functions.
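If cutting the whole thread pool down to one worker is too blunt, a process-wide lock around only the GX section should have the same effect on the YAML race while leaving other work concurrent. A minimal sketch (it only serializes threads within one worker process, which matches the threadpool setting above):

```python
import threading

import great_expectations as gx

_GX_LOCK = threading.Lock()  # one lock per worker process

def run_checkpoint_locked(project_dir: str, checkpoint_name: str):
    # Only context creation + checkpoint run are serialized; everything
    # else in the function app can still run concurrently.
    with _GX_LOCK:
        context = gx.get_context(context_root_dir=project_dir)
        return context.run_checkpoint(checkpoint_name=checkpoint_name)
```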
Any update on the fix, team? This is an important feature; tests can't always run in series :) @HaebichanGX
I'd love to try to reproduce this to make sure it's addressed as part of the 1.0 release, and I think we're wondering whether there is something specific about the data source / asset combination involved here. Could someone share a more detailed reproduction / snippet of an offending great_expectations.yml?
So, even if I just run great_expectations checkpoint list, the command alters the great_expectations.yml file. Changes include:
- Re-ordering and adding back of YAML keys. Before:

```yaml
include_rendered_content:
  globally: false
  expectation_suite: false
  expectation_validation_result: true
# notebooks:
# datasources: {}
```

After:

```yaml
include_rendered_content:
  globally: false
  expectation_suite: false
  expectation_validation_result: true
datasources: {}
notebooks:
```
- Formatting. For example in fluent_datasources, a query goes from:

```yaml
bq__marts_core__dim_subscriptions_slim:
  type: query
  order_by: []
  batch_metadata: {}
  query: >
    SELECT *
    FROM analytics_marts_core.dim_subscriptions
    WHERE subscription_status NOT IN ("draft")
```

to:

```yaml
bq__marts_core__dim_subscriptions_slim:
  type: query
  order_by: []
  batch_metadata: {}
  query: "SELECT * FROM analytics_marts_core.dim_subscriptions WHERE subscription_status\
    \ NOT IN (\"draft\")\n"
```
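That rewriting is consistent with a plain YAML load/dump round trip: loading discards the original scalar style and key order, and dumping picks its own representation, turning the folded block scalar into a double-quoted string with escaped line breaks. A quick demonstration with PyYAML (the serializer GX actually uses for this file may differ):

```python
import yaml

src = '''\
query: >
  SELECT *
  FROM analytics_marts_core.dim_subscriptions
  WHERE subscription_status NOT IN ("draft")
'''

# Loading joins the folded lines into one string ending in "\n";
# dumping re-emits it in whatever style the emitter prefers.
print(yaml.safe_dump(yaml.safe_load(src), width=70))
# Output is close to the "after" form above:
# query: "SELECT * FROM analytics_marts_core.dim_subscriptions WHERE subscription_status\
#   \ NOT IN (\"draft\")\n"
```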