Initialization of SparklySession when a SparkContext already exists
I have an issue with the initialization of a SparklySession instance.
The problem is that, by the time I want to create the SparklySession instance, a SparkContext has already been initialized by another service, so just calling the SparklySession constructor fails on the line spark_context = SparkContext() (due to trying to create multiple SparkContexts).
To give more context on the issue, here is a link: https://docs.databricks.com/user-guide/jobs.html#use-the-shared-sparkcontext
Is there any advice on how to address this issue? I would like to be able to use the features of SparklySession along with SparklyTest.
I would probably think about calling SparkContext.getOrCreate() inside the SparklySession constructor. Or about an optional spark_context parameter which, if provided, means a SparkContext does not have to be initialized.
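For example, a rough sketch of the getOrCreate idea with plain PySpark (the app name is made up for illustration):
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('my-app')  # hypothetical app name
# getOrCreate() returns the already running context if one exists,
# otherwise it creates a new context from the given conf.
spark_context = SparkContext.getOrCreate(conf=conf)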
hey @kosteev! Does it work for you to use SparklySession.get_or_create()?
https://github.com/tubular/sparkly/blob/master/sparkly/session.py#L50-L55
https://github.com/tubular/sparkly/blob/master/sparkly/session.py#L121
Of course, if the creation of the SparkContext is outside of your control, we'd have to do something to support this case.
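Roughly, that would be (a minimal sketch):
from sparkly import SparklySession

# Reuses the session sparkly is already tracking, or builds a new one.
spark = SparklySession.get_or_create()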
hey! How is it going?
This actually doesn't work in my case, because the context is initialized beforehand via the SparkSession class, and SparklySession tries to initialize it again (SparklySession._instantiated_session is None).
>>> from pyspark.sql import SparkSession
>>> from pyspark import SparkContext
>>> spark = SparkSession(SparkContext())
>>> from sparkly import SparklySession
>>> SparklySession.get_or_create()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.6/site-packages/sparkly/session.py", line 132, in get_or_create
    cls()
  File "/usr/lib/python3.6/site-packages/sparkly/session.py", line 108, in __init__
    spark_context = SparkContext()
  File "/usr/lib/python3.6/site-packages/pyspark/context.py", line 115, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/usr/lib/python3.6/site-packages/pyspark/context.py", line 314, in _ensure_initialized
    callsite.function, callsite.file, callsite.linenum))
ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <stdin>:1
Hi @kosteev! Sorry for the late reply.
As a temporary workaround, you can do the following when you are in development or test mode:
from sparkly import SparklySession
from sparkly.instant_testing import InstantTesting

InstantTesting.activate()  # enable sparkly's instant testing mode
InstantTesting.set_context(sc)  # where sc is your already initialized SparkContext
spark = SparklySession.get_or_create()
When it comes to a production setting, I think it's a little more complicated to allow a SparklySession to be initialized with a foreign SparkContext, because then there's some ambiguity about how to handle conflicting options between the SparklySession and the already initialized context. These could include config like spark.executor.memory that is set through SparklySession.options, packages that are made available to the cluster through SparklySession.packages, etc.
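For concreteness, a session definition where such conflicts would show up might look like this (the class name and values are made up; options and packages are sparkly's usual class attributes):
from sparkly import SparklySession


class MySession(SparklySession):
    # Config that sparkly normally applies when it builds the SparkContext itself;
    # with a pre-existing context it's unclear whether these should be applied at all.
    options = {
        'spark.executor.memory': '2g',
    }
    # Packages are shipped to the cluster at context creation time,
    # so they can't simply be added to an already running context.
    packages = [
        'some.group:some-package:1.0.0',  # placeholder coordinates
    ]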
Is the use case you had in mind more of the dev/testing variety, or would you like to have this ability in production as well?
Hey @mantzouratos! I would like to have it for production.
It would be great if SparklySession were able to receive spark_context as an optional constructor parameter and, if it is passed, skip the first part that actually initializes the SparkContext. Something like this:
def __init__(self, additional_options=None, spark_context=None):
    if spark_context is None:
        os.environ['PYSPARK_PYTHON'] = sys.executable
        # ...
        # ...
        # ...
    # Init HiveContext
    super(SparklySession, self).__init__(spark_context)
    self._setup_udfs()
    self.read_ext = SparklyReader(self)
    self.catalog_ext = SparklyCatalog(self)
    attach_writer_to_dataframe()
    SparklySession._instantiated_session = self
Not sure if you want to implement this.
Currently, I am just inheriting from the SparklySession class and overriding the __init__ method. It looks a bit weird, but it satisfies my use case.
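For illustration, the override looks roughly like this (a sketch; the class name and the sparkly import paths for the reader/catalog/writer helpers are assumptions on my part):
from pyspark.sql import SparkSession

from sparkly import SparklySession
from sparkly.catalog import SparklyCatalog
from sparkly.reader import SparklyReader
from sparkly.writer import attach_writer_to_dataframe


class ExternalContextSession(SparklySession):
    # Hypothetical subclass that attaches sparkly's extras to an existing context.
    def __init__(self, spark_context):
        # Skip SparklySession.__init__ entirely so no new SparkContext is created.
        SparkSession.__init__(self, spark_context)
        self._setup_udfs()
        self.read_ext = SparklyReader(self)
        self.catalog_ext = SparklyCatalog(self)
        attach_writer_to_dataframe()
        SparklySession._instantiated_session = self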
@kosteev how about the following API?
spark = SparklySession.wrap(spark_context)
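In context, usage would look roughly like this (a sketch of the proposed API; wrap() does not exist yet):
from pyspark import SparkContext
from sparkly import SparklySession

# Context created elsewhere (e.g. by the hosting platform).
sc = SparkContext.getOrCreate()

# Proposed API: attach sparkly's extensions to the existing context
# instead of trying to create a new SparkContext.
spark = SparklySession.wrap(sc)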
@drudim works for me!