
Initialization of SparklySession when a SparkContext already exists

Open kosteev opened this issue 6 years ago • 6 comments

I have an issue with initializing a SparklySession instance.

The problem is that, by the time I want to create a SparklySession instance, a SparkContext has already been initialized by another service, so simply calling the SparklySession constructor fails on the line spark_context = SparkContext() (because PySpark refuses to create multiple SparkContexts).

To give more context on the issue, here is the relevant link: https://docs.databricks.com/user-guide/jobs.html#use-the-shared-sparkcontext

Is there any advice on how to address this issue? I would like to be able to use the features of SparklySession along with SparklyTest.

I would probably think about having SparkContext.getOrCreate() inside the SparklySession constructor. Or about an optional spark_context parameter: if it is provided, a new SparkContext would not have to be initialized.
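
As a quick illustration of the first idea (a minimal sketch, not sparkly code; it assumes a context created by some other service):

    from pyspark import SparkContext

    sc = SparkContext(master='local[*]', appName='owned-elsewhere')  # created by another service

    # getOrCreate() hands back the already-running context instead of raising
    # "Cannot run multiple SparkContexts at once".
    same_sc = SparkContext.getOrCreate()
    assert same_sc is sc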

kosteev commented Feb 07 '19

Hey @kosteev! Does it work for you to use SparklySession.get_or_create()?

https://github.com/tubular/sparkly/blob/master/sparkly/session.py#L50-L55 https://github.com/tubular/sparkly/blob/master/sparkly/session.py#L121

Of course, if the creation of the SparkContext is outside your control, we'd have to do something to support this case.

srstrickland commented Feb 08 '19

Hey! How is it going?

This actually doesn't work in my case, because the context has already been initialized via the SparkSession class, and SparklySession then tries to initialize it again (SparklySession._instantiated_session is None at that point).

>>> from pyspark.sql import SparkSession
>>> from pyspark import SparkContext
>>> spark = SparkSession(SparkContext())

>>> from sparkly import SparklySession
>>> SparklySession.get_or_create()
Traceback (most recent call last):
  File "", line 1, in 
  File "/usr/lib/python3.6/site-packages/sparkly/session.py", line 132, in get_or_create
    cls()
  File "/usr/lib/python3.6/site-packages/sparkly/session.py", line 108, in __init__
    spark_context = SparkContext()
  File "/usr/lib/python3.6/site-packages/pyspark/context.py", line 115, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/usr/lib/python3.6/site-packages/pyspark/context.py", line 314, in _ensure_initialized
    callsite.function, callsite.file, callsite.linenum))
ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <stdin>:1

kosteev commented Feb 09 '19

Hi @kosteev! Sorry for the late reply.

As a temporary workaround, you can do the following when you are in development or test mode:

from sparkly import SparklySession
from sparkly.instant_testing import InstantTesting

InstantTesting.activate()
InstantTesting.set_context(sc)  # where sc is your already initialized spark context

spark = SparklySession.get_or_create()

When it comes to a production setting, I think it's a little more complicated to allow a SparklySession to be initialized with a foreign SparkContext, because then there's some ambiguity about how to handle conflicting options between the SparklySession and the already initialized context. These could include configuration like spark.executor.memory set through SparklySession.options, packages made available to the cluster through SparklySession.packages, etc.
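
To make that ambiguity concrete, here is a minimal sketch (not sparkly code; the helper name is made up) of how one could at least detect options that conflict with a foreign context:

    from pyspark import SparkContext

    def find_conflicting_options(spark_context, desired_options):
        """Illustrative helper: report options whose values differ between
        the running context and what the session class declares."""
        current = dict(spark_context.getConf().getAll())
        return {
            key: (current[key], wanted)
            for key, wanted in desired_options.items()
            if key in current and current[key] != wanted
        }

    # A session declaring {'spark.executor.memory': '4g'} against a context
    # started with '2g' would yield {'spark.executor.memory': ('2g', '4g')}.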

Is the use case you had in mind more of the dev/testing variety, or would you like to have this ability in production as well?

mantzouratos commented Feb 22 '19

Hey @mantzouratos! I would like to have it for production.

It would be great if SparklySession could receive spark_context as an optional constructor parameter and, if it is passed, skip the part that actually initializes the SparkContext. Something like this:

    def __init__(self, additional_options=None, spark_context=None):
        if spark_context is None:
            os.environ['PYSPARK_PYTHON'] = sys.executable
            # ... (the rest of the existing logic, which creates spark_context)

        # Init HiveContext
        super(SparklySession, self).__init__(spark_context)
        self._setup_udfs()

        self.read_ext = SparklyReader(self)
        self.catalog_ext = SparklyCatalog(self)

        attach_writer_to_dataframe()
        SparklySession._instantiated_session = self

Not sure if you want to implement this.

Currently, I am just inheriting from the SparklySession class and overriding the __init__ method, roughly as sketched below. It looks a bit weird, but it satisfies my use case.
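
For completeness, that subclass workaround looks roughly like this (a sketch assuming the helpers live in sparkly.reader, sparkly.catalog, and sparkly.writer, as in the session.py linked above; the class name is hypothetical):

    from pyspark.sql import SparkSession
    from sparkly import SparklySession
    from sparkly.catalog import SparklyCatalog
    from sparkly.reader import SparklyReader
    from sparkly.writer import attach_writer_to_dataframe

    class ExternalContextSession(SparklySession):  # hypothetical name
        """Adopts an already-running SparkContext instead of creating one."""

        def __init__(self, spark_context):
            # Skip SparklySession.__init__ (which builds its own context) and
            # call pyspark's SparkSession directly with the foreign context,
            # then replay the rest of SparklySession's setup by hand.
            SparkSession.__init__(self, spark_context)
            self._setup_udfs()

            self.read_ext = SparklyReader(self)
            self.catalog_ext = SparklyCatalog(self)

            attach_writer_to_dataframe()
            SparklySession._instantiated_session = self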

kosteev commented Mar 02 '19

@kosteev how about the following API?

spark = SparklySession.wrap(spark_context)
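
Usage would then be as simple as adopting whatever context the platform hands you (a sketch of the proposed API only; wrap is not implemented at this point):

    from pyspark import SparkContext
    from sparkly import SparklySession

    sc = SparkContext.getOrCreate()  # context owned by the platform
    spark = SparklySession.wrap(sc)  # proposed API from this thread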

drudim commented Jun 26 '19

@drudim works for me!

kosteev commented Jun 27 '19