
Can't run checkpoints in parallel because great_expectations.yml is opened in write mode

Chr96er opened this issue 1 year ago · 6 comments

Describe the bug

(I have looked into the code to the best of my abilities, but could not debug this properly because it only happens when multiple threads run at the same time.) When multiple checkpoints are triggered simultaneously, an exception is sometimes raised. In my case, 5 checkpoints are triggered from Airflow and most of the time 2-4 of them fail with the exception below. Setting retries to 5 and the retry delay to 3 works around my particular problem, although the number of checkpoints is likely to grow to 50-100 soon.
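
For context, the retry mitigation looks roughly like the sketch below (task and checkpoint names are placeholders, the delay unit is assumed to be seconds, and the structure is simplified compared to my actual DAG); it only papers over the race rather than fixing it.

# Rough sketch of the retry workaround (placeholder names, assumed units).
from datetime import timedelta

import great_expectations as gx
from airflow.decorators import task


@task(retries=5, retry_delay=timedelta(seconds=3))
def run_checkpoint_task(checkpoint_name: str) -> bool:
    # Each parallel task builds its own FileDataContext; context creation is
    # where great_expectations.yml gets rewritten and where the failures occur.
    context = gx.get_context(context_root_dir="data-testing/great_expectations")
    return context.run_checkpoint(checkpoint_name=checkpoint_name).success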

As far as I can tell, this happens because Great Expectations opens great_expectations.yml in write mode on line 159 of file_data_context.py: with open(config_filepath, "w") as outfile:

This was completely unexpected to me, since I don't think running a checkpoint requires any write operation on that file.
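
To illustrate the failure mode outside of Great Expectations: open(path, "w") truncates the file before the new content is written, so any concurrent reader can observe an empty or half-written YAML document. A standalone sketch with plain PyYAML and threads (not GX code; the config content is a placeholder, and the race may take a few runs to surface):

# Standalone illustration (not GX code): one thread rewrites a YAML file with
# open(path, "w") while another thread keeps parsing it. "w" truncates first,
# so the reader occasionally sees an empty or partially written document,
# which matches the NoneType / ValidationError failures in the traceback below.
import os
import tempfile
import threading

import yaml

path = os.path.join(tempfile.mkdtemp(), "great_expectations.yml")
config = {"fluent_datasources": {"my_datasource": {"type": "sql", "assets": []}}}

with open(path, "w") as f:          # seed the file once so early reads succeed
    yaml.safe_dump(config, f)

bad_reads = []

def writer():
    for _ in range(5000):
        with open(path, "w") as f:  # truncate-then-rewrite, as in file_data_context.py
            yaml.safe_dump(config, f)

def reader():
    for _ in range(5000):
        try:
            with open(path) as f:
                loaded = yaml.safe_load(f)
            if not isinstance(loaded, dict) or "fluent_datasources" not in loaded:
                bad_reads.append("empty or partial document")
        except yaml.YAMLError as exc:
            bad_reads.append(type(exc).__name__)

threads = [threading.Thread(target=writer), threading.Thread(target=reader)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"{len(bad_reads)} corrupted reads, e.g. {bad_reads[:3]}")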

Airflow logs:


 scheduler | [2023-07-12T11:09:43.252-0500] {dagbag.py:541} INFO - Filling up the DagBag from airflow/dags/run-constraints-checkpoint.py
 scheduler | [2023-07-12T11:09:43.251-0500] {dagbag.py:541} INFO - Filling up the DagBag from airflow/dags/run-constraints-checkpoint.py
 scheduler | [2023-07-12T11:09:43.252-0500] {dagbag.py:541} INFO - Filling up the DagBag from airflow/dags/run-constraints-checkpoint.py
 scheduler | [2023-07-12T11:09:43.260-0500] {dagbag.py:541} INFO - Filling up the DagBag from airflow/dags/run-constraints-checkpoint.py
 scheduler | [2023-07-12T11:09:43.260-0500] {dagbag.py:541} INFO - Filling up the DagBag from airflow/dags/run-constraints-checkpoint.py
 scheduler | [2023-07-12T11:09:46.414-0500] {file_data_context.py:214} INFO - FileDataContext loading fluent config
 scheduler | [2023-07-12T11:09:46.426-0500] {config.py:185} INFO - Loading 'datasources' ->
 scheduler | [{'assets': [...],
 scheduler | 'REDACTED
 scheduler | 'create_temp_table': True,
 scheduler | 'name': 'REDACTED',
 scheduler | 'type': 'sql'}]
 scheduler | [2023-07-12T11:09:46.483-0500] {file_data_context.py:214} INFO - FileDataContext loading fluent config
 scheduler | [2023-07-12T11:09:46.488-0500] {file_data_context.py:214} INFO - FileDataContext loading fluent config
 scheduler | [2023-07-12T11:09:46.499-0500] {config.py:185} INFO - Loading 'datasources' ->
 scheduler | [{'assets': [...],
 scheduler | 'REDACTED
 scheduler | 'create_temp_table': True,
 scheduler | 'name': 'REDACTED',
 scheduler | 'type': 'sql'}]
 scheduler | [2023-07-12T11:09:46.501-0500] {config.py:185} INFO - Loading 'datasources' ->
 scheduler | [{'assets': [...],
 scheduler | 'REDACTED
 scheduler | 'create_temp_table': True,
 scheduler | 'name': 'REDACTED',
 scheduler | 'type': 'sql'}]
 scheduler | [2023-07-12T11:09:46.643-0500] {file_data_context.py:214} INFO - FileDataContext loading fluent config
 scheduler | [2023-07-12T11:09:46.649-0500] {abstract_data_context.py:5891} INFO - Loaded 'REDACTED' from fluent config
 scheduler | [2023-07-12T11:09:46.649-0500] {file_data_context.py:165} INFO - Saving 1 Fluent Datasources to data-testing/great_expectations/great_expectations.yml
 scheduler | [2023-07-12T11:09:46.654-0500] {config.py:185} INFO - Loading 'datasources' ->
 scheduler | [{'assets': [...],
 scheduler | 'REDACTED
 scheduler | 'create_temp_table': True,
 scheduler | 'name': 'REDACTED',
 scheduler | 'type': 'sql'}]
 scheduler | [2023-07-12T11:09:46.699-0500] {file_data_context.py:214} INFO - FileDataContext loading fluent config
 scheduler | [2023-07-12T11:09:46.707-0500] {config.py:185} INFO - Loading 'datasources' ->
 scheduler | [{'assets': [...], 'name': 'REDACTED', 'type': 'sql'}]
 scheduler | [2023-07-12T11:09:46.711-0500] {dagbag.py:350} ERROR - Failed to import: airflow/dags/run-constraints-checkpoint.py
 scheduler | Traceback (most recent call last):
 scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/models/dagbag.py", line 346, in parse
 scheduler | loader.exec_module(new_module)
 scheduler | File "<frozen importlib._bootstrap_external>", line 850, in exec_module
 scheduler | File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
 scheduler | File "airflow/dags/run-constraints-checkpoint.py", line 29, in <module>
 scheduler | context = gx.get_context(context_root_dir=gx_context_root_dir)
 scheduler | File "gx_fork/great_expectations/great_expectations/util.py", line 1917, in get_context
 scheduler | file_context = _get_file_context(
 scheduler | File "gx_fork/great_expectations/great_expectations/util.py", line 2047, in _get_file_context
 scheduler | return FileDataContext(
 scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/file_data_context.py", line 64, in __init__
 scheduler | super().__init__(
 scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/serializable_data_context.py", line 69, in __init__
 scheduler | super().__init__(runtime_environment=runtime_environment)
 scheduler | File "gx_fork/great_expectations/great_expectations/core/usage_statistics/usage_statistics.py", line 260, in usage_statistics_wrapped_method
 scheduler | result = func(*args, **kwargs)
 scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/abstract_data_context.py", line 284, in __init__
 scheduler | self.fluent_config = self._load_fluent_config(self._config_provider)
 scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/file_data_context.py", line 220, in _load_fluent_config
 scheduler | gx_config = GxConfig.parse_yaml(path_to_fluent_yaml, _allow_empty=True)
 scheduler | File "gx_fork/great_expectations/great_expectations/datasource/fluent/config.py", line 252, in parse_yaml
 scheduler | config = cls(**loaded)
 scheduler | File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
 scheduler | pydantic.error_wrappers.ValidationError: 1 validation error for GxConfig
 scheduler | fluent_datasources -> REDACTED
 scheduler | field required (type=value_error.missing)
 scheduler | [2023-07-12T11:09:46.717-0500] {cli.py:232} WARNING - Dag 'gx_run_constraints_checkpoints' not found in path airflow/dags/run-constraints-checkpoint.py; trying path airflow/dags/run-constraints-checkpoint.py
 scheduler | [2023-07-12T11:09:46.717-0500] {dagbag.py:541} INFO - Filling up the DagBag from airflow/dags/run-constraints-checkpoint.py
 scheduler | [2023-07-12T11:09:46.718-0500] {abstract_data_context.py:5891} INFO - Loaded 'REDACTED' from fluent config
 scheduler | [2023-07-12T11:09:46.718-0500] {file_data_context.py:165} INFO - Saving 1 Fluent Datasources to data-testing/great_expectations/great_expectations.yml
 scheduler | [2023-07-12T11:09:46.720-0500] {dagbag.py:350} ERROR - Failed to import: airflow/dags/run-constraints-checkpoint.py
 scheduler | Traceback (most recent call last):
 scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/models/dagbag.py", line 346, in parse
 scheduler | loader.exec_module(new_module)
 scheduler | File "<frozen importlib._bootstrap_external>", line 850, in exec_module
 scheduler | File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
 scheduler | File "airflow/dags/run-constraints-checkpoint.py", line 29, in <module>
 scheduler | context = gx.get_context(context_root_dir=gx_context_root_dir)
 scheduler | File "gx_fork/great_expectations/great_expectations/util.py", line 1917, in get_context
 scheduler | file_context = _get_file_context(
 scheduler | File "gx_fork/great_expectations/great_expectations/util.py", line 2047, in _get_file_context
 scheduler | return FileDataContext(
 scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/file_data_context.py", line 61, in __init__
 scheduler | self._scaffold_project()
 scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/file_data_context.py", line 86, in _scaffold_project
 scheduler | if self.is_project_scaffolded(self._context_root_directory):
 scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/serializable_data_context.py", line 480, in is_project_scaffolded
 scheduler | and cls.config_variables_yml_exist(ge_dir)
 scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/serializable_data_context.py", line 240, in config_variables_yml_exist
 scheduler | config_var_path = config.get("config_variables_file_path")
 scheduler | AttributeError: 'NoneType' object has no attribute 'get'
 scheduler | [2023-07-12T11:09:46.721-0500] {local_executor.py:135} ERROR - Failed to execute task Dag 'gx_run_constraints_checkpoints' could not be found; either it does not exist or it failed to parse..
 scheduler | Traceback (most recent call last):
 scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 131, in _execute_work_in_fork
 scheduler | args.func(args)
 scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/cli/cli_config.py", line 52, in command
 scheduler | return func(*args, **kwargs)
 scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/utils/cli.py", line 112, in wrapper
 scheduler | return f(*args, **kwargs)
 scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 401, in task_run
 scheduler | _dag = get_dag(args.subdir, args.dag_id)
 scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/utils/cli.py", line 235, in get_dag
 scheduler | raise AirflowException(
 scheduler | airflow.exceptions.AirflowException: Dag 'gx_run_constraints_checkpoints' could not be found; either it does not exist or it failed to parse.
 scheduler | [2023-07-12T11:09:46.732-0500] {abstract_data_context.py:5891} INFO - Loaded 'REDACTED' from fluent config
 scheduler | [2023-07-12T11:09:46.733-0500] {file_data_context.py:165} INFO - Saving 1 Fluent Datasources to data-testing/great_expectations/great_expectations.yml
 scheduler | [2023-07-12T11:09:46.850-0500] {abstract_data_context.py:5891} INFO - Loaded 'REDACTED' from fluent config
 scheduler | [2023-07-12T11:09:46.850-0500] {file_data_context.py:165} INFO - Saving 1 Fluent Datasources to data-testing/great_expectations/great_expectations.yml
 scheduler | [2023-07-12T11:09:47.472-0500] {run-constraints-checkpoint.py:51} INFO - REDACTED.constraints.fragments.3-0-0
 scheduler | [2023-07-12T11:09:47.537-0500] {run-constraints-checkpoint.py:51} INFO - REDACTED.constraints.fragments.3-0-0
 scheduler | [2023-07-12T11:09:47.547-0500] {run-constraints-checkpoint.py:51} INFO - REDACTED.constraints.fragments.3-0-0
 scheduler | [2023-07-12T11:09:47.607-0500] {run-constraints-checkpoint.py:51} INFO - REDACTED.constraints.segments.2-5
 scheduler | [2023-07-12T11:09:47.613-0500] {scheduler_job_runner.py:677} INFO - Received executor event with state failed for task instance TaskInstanceKey(dag_id='gx_run_constraints_checkpoints', task_id='REDACTED_constraints_fragments_3-0-0.load_templated_json', run_id='scheduled__2023-07-11T00:00:00+00:00', try_number=2, map_index=-1)
 scheduler | [2023-07-12T11:09:47.618-0500] {run-constraints-checkpoint.py:51} INFO - REDACTED.constraints.segments.3-0-0
 scheduler | [2023-07-12T11:09:47.621-0500] {scheduler_job_runner.py:713} INFO - TaskInstance Finished: dag_id=gx_run_constraints_checkpoints, task_id=REDACTED_constraints_fragments_3-0-0.load_templated_json, run_id=scheduled__2023-07-11T00:00:00+00:00, map_index=-1, run_start_date=2023-07-12 00:00:04.725685+00:00, run_end_date=2023-07-12 00:00:04.987151+00:00, run_duration=0.261466, state=queued, executor_state=failed, try_number=2, max_tries=6, job_id=197, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2023-07-12 16:09:43.009241+00:00, queued_by_job_id=206, pid=2120366
 scheduler | [2023-07-12T11:09:47.621-0500] {scheduler_job_runner.py:761} ERROR - Executor reports task instance <TaskInstance: gx_run_constraints_checkpoints.REDACTED_constraints_fragments_3-0-0.load_templated_json scheduled__2023-07-11T00:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
 scheduler | [2023-07-12T11:09:47.625-0500] {taskinstance.py:1826} ERROR - Executor reports task instance <TaskInstance: gx_run_constraints_checkpoints.REDACTED_constraints_fragments_3-0-0.load_templated_json scheduled__2023-07-11T00:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
 scheduler | [2023-07-12T11:09:47.638-0500] {taskinstance.py:1345} INFO - Marking task as UP_FOR_RETRY. dag_id=gx_run_constraints_checkpoints, task_id=REDACTED_constraints_fragments_3-0-0.load_templated_json, execution_date=20230711T000000, start_date=20230712T000004, end_date=20230712T160947

To Reproduce

Trigger many checkpoints that use the same great_expectations.yml at the same time.
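
A self-contained approximation of this is sketched below (checkpoint names and the project path are placeholders; in my setup the parallelism comes from separate Airflow worker processes rather than threads):

# Hypothetical reproduction sketch (placeholder checkpoint names and paths).
# Every worker builds a FileDataContext against the same shared project
# directory, and context construction rewrites great_expectations.yml.
from concurrent.futures import ThreadPoolExecutor

import great_expectations as gx

CONTEXT_ROOT = "data-testing/great_expectations"                 # shared project dir
CHECKPOINTS = [f"constraints_checkpoint_{i}" for i in range(5)]  # placeholder names

def run_one(checkpoint_name: str) -> bool:
    context = gx.get_context(context_root_dir=CONTEXT_ROOT)
    return context.run_checkpoint(checkpoint_name=checkpoint_name).success

with ThreadPoolExecutor(max_workers=len(CHECKPOINTS)) as pool:
    print(list(pool.map(run_one, CHECKPOINTS)))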

Stack trace (same as above):

 scheduler | [2023-07-12T11:09:46.711-0500] {dagbag.py:350} ERROR - Failed to import: airflow/dags/run-constraints-checkpoint.py
 scheduler | Traceback (most recent call last):
 scheduler | File "data-testing/venv/lib/python3.9/site-packages/airflow/models/dagbag.py", line 346, in parse
 scheduler | loader.exec_module(new_module)
 scheduler | File "<frozen importlib._bootstrap_external>", line 850, in exec_module
 scheduler | File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
 scheduler | File "airflow/dags/run-constraints-checkpoint.py", line 29, in <module>
 scheduler | context = gx.get_context(context_root_dir=gx_context_root_dir)
 scheduler | File "gx_fork/great_expectations/great_expectations/util.py", line 1917, in get_context
 scheduler | file_context = _get_file_context(
 scheduler | File "gx_fork/great_expectations/great_expectations/util.py", line 2047, in _get_file_context
 scheduler | return FileDataContext(
 scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/file_data_context.py", line 64, in __init__
 scheduler | super().__init__(
 scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/serializable_data_context.py", line 69, in __init__
 scheduler | super().__init__(runtime_environment=runtime_environment)
 scheduler | File "gx_fork/great_expectations/great_expectations/core/usage_statistics/usage_statistics.py", line 260, in usage_statistics_wrapped_method
 scheduler | result = func(*args, **kwargs)
 scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/abstract_data_context.py", line 284, in __init__
 scheduler | self.fluent_config = self._load_fluent_config(self._config_provider)
 scheduler | File "gx_fork/great_expectations/great_expectations/data_context/data_context/file_data_context.py", line 220, in _load_fluent_config
 scheduler | gx_config = GxConfig.parse_yaml(path_to_fluent_yaml, _allow_empty=True)
 scheduler | File "gx_fork/great_expectations/great_expectations/datasource/fluent/config.py", line 252, in parse_yaml
 scheduler | config = cls(**loaded)
 scheduler | File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
 scheduler | pydantic.error_wrappers.ValidationError: 1 validation error for GxConfig

Expected behavior

I'd expect Great Expectations to run the checkpoints successfully and not to open great_expectations.yml in write mode, since no changes to that file are required to run a checkpoint.
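
Until that changes, the only mitigation I can think of besides retries is to serialize context creation across processes. A rough sketch assuming the third-party filelock package (a workaround, not a fix; paths are placeholders):

# Rough workaround sketch, not a fix: hold an inter-process lock while the
# FileDataContext is constructed, since that is when great_expectations.yml is
# rewritten. Assumes the third-party `filelock` package; paths are placeholders.
from filelock import FileLock

import great_expectations as gx

CONTEXT_ROOT = "data-testing/great_expectations"
LOCK_PATH = f"{CONTEXT_ROOT}/.context.lock"

def get_context_safely():
    # Checkpoint runs themselves can still proceed in parallel once each process
    # holds its own context object; only construction is serialized here.
    with FileLock(LOCK_PATH, timeout=120):
        return gx.get_context(context_root_dir=CONTEXT_ROOT)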

Environment (please complete the following information):

  • Operating System: Ubuntu 22.04
  • Great Expectations Version: 0.17.3
  • Datasource: BigQuery
  • Cloud environment: GCP

Chr96er · Jul 12 '23 16:07

Hmm... that's a weird error, @Chr96er. Let me put this into our backlog and get back to you.

HaebichanGX · Jul 13 '23 13:07

This is also happening to me on Cloud Composer. Sometimes, possibly due to DAG crashes while the GX YAML is open in write mode by the checkpoint trigger, the great_expectations.yml file even gets corrupted, and I have to load a fresh copy before things run fine again. A checkpoint run should only read from the GX file, not open it in write mode.

Environment

  • Operating System: Ubuntu 22.04 WSL
  • Great Expectations Version: 0.18.3
  • Datasource: BigQuery
  • Cloud environment: GCP, Google Cloud Composer

TheEverlastingBish · Dec 07 '23 18:12

This also happened to me, with the same error, when running with Prefect and the native GE task (from prefect_great_expectations import run_checkpoint_validation) and trying to run validations on multiple files in parallel (a standard Prefect feature). A simplified sketch of the flow shape is included below the environment details.

Environment

  • Great Expectations Version: 0.18.8
  • Prefect GE version: 0.2.1
  • Datasource: csv
  • Cloud environment: AWS, ECS, Prefect
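
Simplified sketch of the flow shape, calling GX directly rather than through the prefect_great_expectations task; checkpoint names and paths are placeholders, not my exact code:

# Simplified sketch of the flow shape -- not the prefect_great_expectations task.
from prefect import flow, task

import great_expectations as gx

CONTEXT_ROOT = "include/gx"   # shared project directory (placeholder)

@task
def validate_one(checkpoint_name: str) -> bool:
    # Every concurrently running task re-reads (and, per this issue, rewrites)
    # the same great_expectations.yml while building its context.
    context = gx.get_context(context_root_dir=CONTEXT_ROOT)
    return context.run_checkpoint(checkpoint_name=checkpoint_name).success

@flow
def validate_files(checkpoint_names: list[str]) -> list[bool]:
    futures = [validate_one.submit(name) for name in checkpoint_names]  # runs concurrently
    return [f.result() for f in futures]

if __name__ == "__main__":
    validate_files([f"csv_checkpoint_{i}" for i in range(5)])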

mp-nujsuper · Jan 28 '24 23:01

Same for me: no problems running checkpoints one by one, but a lot of different errors when running them in parallel.

Error examples:

 ComposerError: expected a single document in the stream
  in "/mounted-azure-file-share/gx/great_expectations.yml", line 39, column 1
but found another document
  in "/mounted-azure-file-share/gx/great_expectations.yml", line 41, column 1  

ParserError: while parsing a block mapping
  in "/mounted-azure-file-share/gx/great_expectations.yml", line 27, column 7
expected <block end>, but found '<block mapping start>'

IndexError: string index out of range 

AttributeError: 'StreamStartEvent' object has no attribute 'tag' 

Environment:

  • Azure Functions App (Python 3.10)
  • Great Expectations Version: tried 0.18.7, 0.18.6 and 0.18.1
  • GE project location: Azure File Share mounted to the Azure Functions App in read/write mode using the CLI (https://learn.microsoft.com/en-us/azure/azure-functions/scripts/functions-cli-mount-files-storage-linux)
  • Cloud environment: Azure Cloud
  • OS: Debian Bullseye

PLEASE make GE multitask-enabled ;)

kujaska · Feb 20 '24 16:02

Update: it looks like setting PYTHON_THREADPOOL_THREAD_COUNT = 1 in the Function App configuration solves the issue of getting random errors when running Great Expectations in Azure Durable Functions.

kujaska · Feb 29 '24 06:02

Any update on the fix, team? This is an important feature; tests can't always run in series :) @HaebichanGX

apexcoder7 · Apr 08 '24 17:04

I'd love to try to reproduce this to make sure it's addressed as part of the 1.0 release, and I think we're wondering whether there is something specific about the data source / asset combination involved here. Could someone share a more detailed reproduction / snippet of an offending great_expectations.yml?

jcampbell · Jun 12 '24 18:06

So, even if I just run great_expectations checkpoint list, the command alters the great_expectations.yml file. Changes include:

  • Re-ordering and re-adding of YAML keys. Before:
include_rendered_content:
  globally: false
  expectation_suite: false
  expectation_validation_result: true
# notebooks:
# datasources: {}

After:

include_rendered_content:
  globally: false
  expectation_suite: false
  expectation_validation_result: true
datasources: {}
notebooks:
  • Formatting: for example, in fluent_datasources, from
      bq__marts_core__dim_subscriptions_slim:
        type: query
        order_by: []
        batch_metadata: {}
        query: >
          SELECT * 
          FROM analytics_marts_core.dim_subscriptions 
          WHERE subscription_status NOT IN ("draft")

to ...

      bq__marts_core__dim_subscriptions_slim:
        type: query
        order_by: []
        batch_metadata: {}
        query: "SELECT *  FROM analytics_marts_core.dim_subscriptions WHERE subscription_status\
          \ NOT IN (\"draft\")\n"
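
For anyone who wants to check this on their own project, a quick helper (hypothetical, not provided by GX; the path is a placeholder) that hashes the file before and after the read-only command:

# Hypothetical helper (not part of GX): show that a read-only CLI command
# still rewrites great_expectations.yml by comparing file hashes.
import hashlib
import subprocess

YML = "gx/great_expectations.yml"   # placeholder path

def digest(path: str) -> str:
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()

before = digest(YML)
subprocess.run(["great_expectations", "checkpoint", "list"], check=True)
print("file rewritten:", digest(YML) != before)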

TheEverlastingBish · Jun 14 '24 15:06