
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext

Open olemara opened this issue 3 years ago • 3 comments

Describe the bug I am configuring a SimpleCheckpoint to be used with a Spark dataframe. I am bootstrapping this with the following code.

from datetime import datetime

from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.core import ExpectationSuite
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    DatasourceConfig,
    FilesystemStoreBackendDefaults,
)

# data_source_name and data_connector_name are assumed to be
# defined elsewhere at module level.

def getDataContext(store_root_dir: str) -> BaseDataContext:
    datasource_config = DatasourceConfig(
        class_name="Datasource",
        execution_engine={"class_name": "SparkDFExecutionEngine"},
        data_connectors={
            data_connector_name: {
                "module_name": "great_expectations.datasource.data_connector",
                "class_name": "RuntimeDataConnector",
                "batch_identifiers": [
                    "timestamp",
                ],
            }
        },
    )
    data_context_config = DataContextConfig(
        datasources={data_source_name: datasource_config},
        store_backend_defaults=FilesystemStoreBackendDefaults(root_directory=store_root_dir),
    )
    return BaseDataContext(project_config=data_context_config)

def getRuntimeBatchRequest(dataframe, name_of_asset: str) -> RuntimeBatchRequest:
    return RuntimeBatchRequest(
        datasource_name=data_source_name,
        data_connector_name=data_connector_name,
        data_asset_name=name_of_asset,
        runtime_parameters={"batch_data": dataframe},
        batch_identifiers={"timestamp": f"{datetime.now()}"},
    )

def getTestSuite(suite_name: str, context: BaseDataContext, overwrite: bool = True) -> ExpectationSuite:
    return context.create_expectation_suite(
        expectation_suite_name=suite_name,
        overwrite_existing=overwrite,
    )

def createCheckpoint(suite: ExpectationSuite, batch: RuntimeBatchRequest, context: BaseDataContext) -> SimpleCheckpoint:
    checkpoint_config = {
        "class_name": "SimpleCheckpoint",
        "validations": [
            {
                "batch_request": batch,
                "expectation_suite_name": suite.expectation_suite_name,
            }
        ],
    }
    return SimpleCheckpoint(
        f"_tmp_checkpoint_{suite.expectation_suite_name}",
        context,
        **checkpoint_config,
    )

To Reproduce Steps to reproduce the behavior:

Use the code above, chaining the helpers (the arguments here are placeholders):

context = getDataContext(store_root_dir)
suite = getTestSuite("suite_name", context)
batch = getRuntimeBatchRequest(df, "asset_name")
simple_checkpoint = createCheckpoint(suite, batch, context)
result = simple_checkpoint.run()

then try to write the dataframe to disk.

I am getting the following error message:

Py4JJavaError: An error occurred while calling o312.save.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

Expected behavior The dataframe and its contents are written to disk.

Environment (please complete the following information):

  • Operating System: Ubuntu
  • Great Expectations Version: great-expectations = "^0.13.41"
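One workaround often suggested for this class of error — an assumption on my part, not confirmed anywhere in this thread — is to tell `SparkDFExecutionEngine` to reuse the caller's existing Spark context rather than creating and tearing down its own, via the `force_reuse_spark_context` setting. A sketch of the adjusted execution-engine fragment:

```python
# Hedged sketch: in the DatasourceConfig above, configure the execution
# engine to reuse the already-running SparkContext instead of managing
# (and potentially stopping) its own.
# Assumption: force_reuse_spark_context is supported by the installed
# great-expectations 0.13.x release.
execution_engine = {
    "class_name": "SparkDFExecutionEngine",
    "force_reuse_spark_context": True,
}
```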


olemara avatar Nov 09 '21 06:11 olemara

Hey @olemara! Thanks so much for opening this issue.

I'll bring this up to our engineering team to determine next steps. Appreciate your patience 🙇🏽

cdkini avatar Nov 19 '21 17:11 cdkini

@olemara would you mind providing me with the full script you're using? I want to ensure the issue here is on the GE side of things, since writing a DataFrame to disk is not necessarily a GE-specific feature.

cdkini avatar Nov 19 '21 20:11 cdkini

Any updates on this issue? Facing the same issue.

zhaokaikai666 avatar Feb 18 '22 07:02 zhaokaikai666

Closing as stale

talagluck avatar Mar 08 '23 13:03 talagluck