great_expectations
Inconsistent behavior when dealing with the Spark session
Describe the bug
The get_or_create_spark_application function behaves differently depending on the Spark version, namely between Spark 2.4.4 and 3.0.1.
Case 1 - Running with 2.4.4
The function calls get_or_create_spark_session, which finds the existing session and later changes spark.app.name to "default_great_expectations_spark_application". It then calls spark_restart_required, and since the app now has this specific name, that function returns False, so the active Spark application is not stopped.
This behavior was OK in my application. I didn't even notice this was happening.
Case 2 - Running with 3.0.1
The function calls get_or_create_spark_session, which finds the existing session but does not change spark.app.name to "default_great_expectations_spark_application". So when it calls spark_restart_required, the app name isn't what's expected, and that function returns True, causing GE to stop the active Spark application. It then logs "Stopping existing spark context to reconfigure" and creates a new session.
This behavior isn't OK in my application because the new session won't use the same Spark configuration as the original one. I also don't see the point of stopping my previously configured application just because it doesn't have a certain name. On top of that, I couldn't figure out a way to pass the force_reuse_spark_context argument of get_or_create_spark_application from a usual entry point such as DataContext.
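From the two cases above, the restart decision can be sketched roughly as follows. This is a hypothetical reconstruction, not GE's actual source: the function name spark_restart_required and the default app name come from this report, while the config-comparison logic is assumed.

```python
# Hypothetical sketch of the restart check described above (not GE source).
DEFAULT_APP_NAME = "default_great_expectations_spark_application"

def spark_restart_required(current_conf: dict, desired_conf: dict) -> bool:
    # If the running application does not carry the expected app name,
    # the session is treated as foreign and gets restarted.
    expected_name = desired_conf.get("spark.app.name", DEFAULT_APP_NAME)
    if current_conf.get("spark.app.name") != expected_name:
        return True
    # Any differing config value would also trigger a restart.
    return any(current_conf.get(k) != v for k, v in desired_conf.items())

# Spark 2.4.4 path: the app name was mutated in place, so no restart.
assert not spark_restart_required({"spark.app.name": DEFAULT_APP_NAME}, {})
# Spark 3.0.1 path: the live session keeps its original name, so the
# check fails and the existing application is stopped.
assert spark_restart_required({"spark.app.name": "my_app"}, {})
```

Under this reading, the version difference is only about whether spark.app.name gets rewritten on the already-running session before the check runs.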
To Reproduce
Steps to reproduce the behavior:
- Run a validation on a Spark dataframe in Spark 2.4.4; there won't be any logs about "Stopping existing spark context to reconfigure".
- Run a validation on a Spark dataframe in Spark 3.0.1; there will be a log on stdout about "Stopping existing spark context to reconfigure".
Expected behavior
GE shouldn't just stop my Spark application. It should get the existing session and use it as is.
Environment (please complete the following information):
- Operating System: MacOS Catalina 10.15.7
- Great Expectations Version: 0.13.26
Thanks for opening this issue, @alssau! We will review and be in touch.
Hey @alssau! Thanks for opening this and providing all this context around the issue.
Would this be something you might potentially be interested in working on? You seem to have a lot of the requisite understanding of the issue at hand so any help here would be greatly appreciated. If not, no worries! We can review and determine the next steps forward.
I have the same issue. Adding force_reuse_spark_context or spark_config doesn't help. Does anybody have updates on this problem?
{
    "class_name": "SparkDFDatasource",
    "module_name": "great_expectations.datasource",
    "data_asset_type": {
        "module_name": "great_expectations.dataset",
        "class_name": "SparkDFDataset",
    },
    "force_reuse_spark_context": True,
    # "spark_config": dict(SparkSession.builder.getOrCreate().sparkContext.getConf().getAll())
}
@cdkini any updates on this? I think this is a blocker for using the spark data source with spark 3+?
Or is there a known workaround?
Facing this issue too while running Spark3 via AWS Glue3.0. Is there any workaround for this as of now?
I also face this issue when running in Spark3. Has resolving this issue been picked up?
It seems I am faced with the same issues. In my case I got an error as below. I think it is caused by different spark session.
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/great_expectations/execution_engine/execution_engine.py", line 665, in _process_direct_and_bundled_metric_computation_configurations
] = self.resolve_metric_bundle(
File "/usr/local/lib/python3.8/dist-packages/great_expectations/execution_engine/sparkdf_execution_engine.py", line 751, in resolve_metric_bundle
res = df.agg(*aggregate["column_aggregates"]).collect()
File "/opt/spark/python/pyspark/sql/dataframe.py", line 692, in collect
with SCCallSiteSync(self._sc) as css:
File "/opt/spark/python/pyspark/traceback_utils.py", line 72, in __enter__
self._context._jsc.setCallSite(self._call_site)
AttributeError: 'NoneType' object has no attribute 'setCallSite'
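The final line of that traceback is consistent with the restart behavior described earlier: after GE stops the original SparkContext, a DataFrame created from the old session still holds a reference whose internal JVM context has been cleared to None, so the next collect() blows up. A minimal stand-in (no pyspark required; the class below is a mock, not a pyspark type) reproduces the same failure shape:

```python
# Mock of a SparkContext whose backing JVM context was cleared by stop().
class StoppedContext:
    _jsc = None  # pyspark sets the JVM context handle to None after sc.stop()

ctx = StoppedContext()
try:
    # Same shape of call as the last frame of the traceback above.
    ctx._jsc.setCallSite("collect")
except AttributeError as exc:
    msg = str(exc)

assert "NoneType" in msg
```

In other words, the error is a symptom of the session having been stopped underneath the still-live DataFrame, not a bug in collect() itself.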
same
Adding the "force_reuse_spark_context" under the "execution_engine" key worked in my case:
{
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "SparkDFExecutionEngine",
        "force_reuse_spark_context": True
    },
}
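For context, a full version of that datasource config and a sketch of how it might be registered could look like the following. The datasource name "my_spark_datasource" and the commented add_datasource call are illustrative assumptions, not taken from this thread.

```python
# Datasource config with force_reuse_spark_context set on the execution
# engine, as in the comment above.
datasource_config = {
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "SparkDFExecutionEngine",
        "force_reuse_spark_context": True,
    },
}

# Hypothetical registration against a block-config DataContext:
# context = great_expectations.get_context()
# context.add_datasource("my_spark_datasource", **datasource_config)

assert datasource_config["execution_engine"]["force_reuse_spark_context"] is True
```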
Looks like the force_reuse_spark_context param now defaults to True. But I still got the error. What worked for me was passing the existing spark_config dict as a parameter to DatasourceConfig, and not as a key:value under execution_engine.
@leovcunha Thank you very much for reporting this issue and providing the configuration snippet! I would like to confirm -- does your example work only when you specify force_reuse_spark_context: True as in the above, and if you omit force_reuse_spark_context from this configuration, it does not work? Could you please paste the full exception stack trace along with the configuration for the situation that does not work? Thank you so much again in advance.
NP! The snippet sent was quoted from the previous comment before mine. The one I used looks more like below:
I didn't have to specify force_reuse_spark_context: True since it now defaults to true, as you know :) #7444
I also didn't get an exception, I was just getting a new SparkSession from GE instead of it getting the existing session.
{
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "SparkDFExecutionEngine"
    },
    "spark_config": dict(spark.builder.getOrCreate().sparkContext.getConf().getAll())
}
This worked.
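The spark_config value in that snippet is just the live session's configuration flattened into a dict: SparkConf.getAll() returns a list of (key, value) tuples, and dict() converts it. The sketch below simulates that conversion without a live SparkSession; the conf values shown are stand-ins, not real cluster settings.

```python
# Stand-in for sparkContext.getConf().getAll(), which returns (key, value)
# pairs from the running session. Values here are illustrative only.
conf_pairs = [
    ("spark.app.name", "my_existing_app"),
    ("spark.executor.memory", "4g"),
]
spark_config = dict(conf_pairs)

datasource_config = {
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "SparkDFExecutionEngine",
    },
    # On a real cluster this would be built from the live session, e.g.:
    # dict(SparkSession.builder.getOrCreate().sparkContext.getConf().getAll())
    "spark_config": spark_config,
}

assert datasource_config["spark_config"]["spark.app.name"] == "my_existing_app"
```

Handing GE the existing session's own config this way means the restart check sees no differing values, so the session is reused rather than replaced.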
Hello, I'd like to get confirmation that this issue is still persisting. I've tried to replicate this error with no success. We launched a new API around datasources starting in version 0.16 which includes the fix mentioned above.
Hey @alssau, closing this out for now. If this is still an issue on recent versions of GX, please feel free to re-open!