
Inconsistent behavior when dealing with spark session

Open alssau opened this issue 2 years ago • 5 comments

Describe the bug The way the get_or_create_spark_application function works differs between Spark versions, specifically 2.4.4 and 3.0.1.

Case 1 - Running with 2.4.4 The function calls get_or_create_spark_session, which finds the existing session and later changes spark.app.name to "default_great_expectations_spark_application". It then calls spark_restart_required, and since the app now has this specific name, that function returns False, so the active spark application is not stopped.

This behavior was OK in my application. I didn't even notice this was happening.

Case 2 - Running with 3.0.1 The function calls get_or_create_spark_session, which finds the existing session but does not change spark.app.name to "default_great_expectations_spark_application". So when spark_restart_required is called, the app name isn't what's expected, the function returns True, and GE stops the active spark application. It then logs "Stopping existing spark context to reconfigure" and creates a new session.

This behavior isn't OK in my application because the new session won't use the same spark configuration as the original one. I also don't see the point of stopping my previously configured application just because it doesn't have a certain name. Also, I couldn't figure out a way to use the force_reuse_spark_context argument of get_or_create_spark_application from a usual entry point such as DataContext.
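
In pseudocode, my understanding of the check is roughly the following (an illustration only, not the actual GE source):

DEFAULT_APP_NAME = "default_great_expectations_spark_application"

def spark_restart_required(current_conf: dict, desired_conf: dict) -> bool:
    # Case 1 (Spark 2.4.4): getOrCreate() already rewrote spark.app.name to the
    # default above, so nothing differs and the running session is kept.
    # Case 2 (Spark 3.0.1): the name was never rewritten, a difference is found,
    # and GE stops the active context to rebuild it.
    for key, desired_value in desired_conf.items():
        if current_conf.get(key) != desired_value:
            return True
    return False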

To Reproduce Steps to reproduce the behavior:

  1. Run a validation on a spark dataframe with Spark 2.4.4; no log about "Stopping existing spark context to reconfigure" appears.
  2. Run a validation on a spark dataframe with Spark 3.0.1; a log about "Stopping existing spark context to reconfigure" appears on stdout (see the sketch below).
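
A minimal sketch of these steps (assuming the session handling is triggered as soon as the datasource is built; the app name and datasource name are placeholders):

from pyspark.sql import SparkSession
from great_expectations.datasource import SparkDFDatasource

# An existing, already-configured session with its own app name.
spark = SparkSession.builder.appName("my_existing_app").getOrCreate()

# Building the datasource goes through get_or_create_spark_application().
datasource = SparkDFDatasource(name="spark_ds")

# On 2.4.4 the original session is reused; on 3.0.1 the
# "Stopping existing spark context to reconfigure" message is logged
# and a new session replaces it.
print(spark is SparkSession.builder.getOrCreate())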

Expected behavior GE shouldn't just stop my spark application. It should get the existing session and use it as is.

Environment (please complete the following information):

  • Operating System: MacOS Catalina 10.15.7
  • Great Expectations Version: 0.13.26


alssau avatar Nov 10 '21 20:11 alssau

Thanks for opening this issue, @alssau! We will review and be in touch.

talagluck avatar Nov 10 '21 20:11 talagluck

Hey @alssau! Thanks for opening this and providing all this context around the issue.

Would this be something you might potentially be interested in working on? You seem to have a lot of the requisite understanding of the issue at hand so any help here would be greatly appreciated. If not, no worries! We can review and determine the next steps forward.

cdkini avatar Nov 19 '21 21:11 cdkini

I have the same issue. Adding force_reuse_spark_context or spark_config does not help. Does anybody have updates on this problem?

{
    "class_name": "SparkDFDatasource",
    "module_name": "great_expectations.datasource",
    "data_asset_type": {
        "module_name": "great_expectations.dataset",
        "class_name": "SparkDFDataset",
    },
    "force_reuse_spark_context": True,
    # "spark_config": dict(SparkSession.builder.getOrCreate().sparkContext.getConf().getAll())
}
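
For reference, this is roughly how the config gets registered (the datasource name and DataContext setup are placeholders):

from great_expectations.data_context import DataContext

context = DataContext()  # an already-initialized great_expectations project
context.add_datasource(
    "spark_datasource",  # placeholder name
    class_name="SparkDFDatasource",
    module_name="great_expectations.datasource",
    data_asset_type={
        "module_name": "great_expectations.dataset",
        "class_name": "SparkDFDataset",
    },
    force_reuse_spark_context=True,
)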

Faith-des avatar Jan 31 '22 18:01 Faith-des

@cdkini any updates on this? I think this is a blocker for using the spark data source with spark 3+?

Or is there a known workaround?

darkcofy avatar May 05 '22 07:05 darkcofy

Facing this issue too while running Spark 3 via AWS Glue 3.0. Is there any workaround for this as of now?

pwrstar avatar Sep 15 '22 22:09 pwrstar

I also face this issue when running in Spark3. Has resolving this issue been picked up?

robertmacyiii avatar Jan 25 '23 17:01 robertmacyiii

It seems I am facing the same issue. In my case I got the error below. I think it is caused by the spark session being different.

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/great_expectations/execution_engine/execution_engine.py", line 665, in _process_direct_and_bundled_metric_computation_configurations
    ] = self.resolve_metric_bundle(
  File "/usr/local/lib/python3.8/dist-packages/great_expectations/execution_engine/sparkdf_execution_engine.py", line 751, in resolve_metric_bundle
    res = df.agg(*aggregate["column_aggregates"]).collect()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 692, in collect
    with SCCallSiteSync(self._sc) as css:
  File "/opt/spark/python/pyspark/traceback_utils.py", line 72, in __enter__
    self._context._jsc.setCallSite(self._call_site)
AttributeError: 'NoneType' object has no attribute 'setCallSite'
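
For what it's worth, the same error can be reproduced by collecting a DataFrame whose SparkContext has been stopped, which is what happens when the session is replaced underneath an existing DataFrame (a standalone illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("demo").getOrCreate()
df = spark.createDataFrame([(1,)], ["a"])

spark.stop()   # comparable to GE stopping the context to reconfigure
df.collect()   # AttributeError: 'NoneType' object has no attribute 'setCallSite'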

tiruka avatar Feb 13 '23 10:02 tiruka

same

gaoyibin0001 avatar Mar 09 '23 10:03 gaoyibin0001

Adding the "force_reuse_spark_context" under the "execution_engine" key worked in my case:

{
  "class_name": "Datasource",
  "module_name": "great_expectations.datasource",
  "execution_engine": {
    "module_name": "great_expectations.execution_engine",
    "class_name": "SparkDFExecutionEngine",
    "force_reuse_spark_context": True
  },
}
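
A rough way to check that the pre-existing session actually survives (the get_context() call, datasource name, and runtime data connector below are placeholders):

from pyspark.sql import SparkSession
import great_expectations as gx

existing = SparkSession.builder.getOrCreate()

context = gx.get_context()  # an already-initialized GX project
context.add_datasource(
    name="spark_ds",  # placeholder name
    class_name="Datasource",
    module_name="great_expectations.datasource",
    execution_engine={
        "module_name": "great_expectations.execution_engine",
        "class_name": "SparkDFExecutionEngine",
        "force_reuse_spark_context": True,
    },
    data_connectors={
        "runtime_connector": {
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": ["batch_id"],
        }
    },
)

# If the Spark context had been restarted, getOrCreate() would hand back a new session object.
assert SparkSession.builder.getOrCreate() is existing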

IOANNIS1234 avatar Mar 14 '23 19:03 IOANNIS1234

Adding the "force_reuse_spark_context" under the "execution_engine" key worked in my case:

{
  "class_name": "Datasource",
  "module_name": "great_expectations.datasource",
  "execution_engine": {
    "module_name": "great_expectations.execution_engine",
    "class_name": "SparkDFExecutionEngine",
    "force_reuse_spark_context": True
  },

Looks like the force_reuse_spark_context param now defaults to True. But I still got the error. What worked for me was passing the existing spark_config dict as a parameter to DatasourceConfig, and not as a key:value pair under execution_engine.

leovcunha avatar Apr 26 '23 09:04 leovcunha

@leovcunha Thank you very much for reporting this issue and providing the configuration snippet! I would like to confirm -- does your example work only when you specify "force_reuse_spark_context": True as in the above -- and if you omit force_reuse_spark_context from this configuration, it does not work? Could you please paste the full Exception stack trace along with the configuration for the situation that does not work? Thank you so much again in advance.

alexsherstinsky avatar Apr 26 '23 15:04 alexsherstinsky

NP! The snippet was quoted from the comment before mine. The one I used looks more like the below. I didn't have to specify force_reuse_spark_context: True since it now defaults to true, as you know :) (#7444). I also didn't get an exception; I was just getting a new SparkSession from GE instead of it reusing the existing session.

{
  "class_name": "Datasource",
  "module_name": "great_expectations.datasource",
  "execution_engine": {
    "module_name": "great_expectations.execution_engine",
    "class_name": "SparkDFExecutionEngine"
  },
  # spark is the pre-existing SparkSession
  "spark_config": dict(spark.builder.getOrCreate().sparkContext.getConf().getAll())
}

This worked.

leovcunha avatar Apr 27 '23 13:04 leovcunha

Hello, I'd like to get confirmation that this issue still persists. I've tried to replicate this error with no success. We launched a new API around datasources starting in version 0.16, which includes the fix mentioned above.

josectobar avatar Jul 11 '23 00:07 josectobar

Hey @alssau; closing this out for now. If this is still an issue on recent versions of GX, please feel free to re-open!

austiezr avatar Jul 24 '23 15:07 austiezr