
AttributeError: 'SparkCeleryApp' object has no attribute 'sc'

Open NeelBhowmik opened this issue 6 years ago • 6 comments

Dear all, I am trying to use spark-celery in my application. When I run the code below, I get the error "AttributeError: 'SparkCeleryApp' object has no attribute 'sc'". Could you please suggest where the problem might be? Thank you. Here is the code.

```python
import os
from spark_celery import SparkCeleryApp, SparkCeleryTask, RDD_builder, main

backendURL = os.getenv("BACKEND_URL", "redis://redis:6379/0")
brokerURL = os.getenv("BROKER_URL", "rabbitmq")
brokerUser = os.getenv("BROKER_USER", "guest")
brokerPassw = os.getenv("BROKER_PASSW", "guest")

def sparkconfig_builder():
    from pyspark import SparkConf
    return (SparkConf().setAppName('SparkCeleryTask')
            .set('spark.dynamicAllocation.enabled', 'true')
            .set('spark.dynamicAllocation.schedulerBacklogTimeout', 1)
            .set('spark.dynamicAllocation.minExecutors', 1)
            .set('spark.dynamicAllocation.executorIdleTimeout', 20)
            .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 60))

app = SparkCeleryApp(broker='pyamqp://' + brokerUser + ':' + brokerPassw + '@' + brokerURL + '//',
                     backend=backendURL,
                     sparkconfig_builder=sparkconfig_builder)
```

In the main block, I create an instance qs of class QuerySystem(cbir_defunc.QuerySystem, SparkCeleryTask), and then I get the error.

```python
if __name__ == '__main__':
    qs = QuerySystem(CB_file, idx_file, knn)
    print(app.sc)   # <----- here is the error
    app.tasks.register(qs)
    main(options={
        # 'queues': 'querySystemQueue',
    })
```

NeelBhowmik avatar Apr 13 '18 14:04 NeelBhowmik

The Spark context stuff isn't created until app.worker_init(). That's called from the main() function.
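Roughly, the flow should look something like this (a minimal, untested sketch — the broker/backend URLs and the WordCount task are just placeholders): app.sc is only touched inside the task's run(), which the worker executes after worker_init() has created the context.

```python
from spark_celery import SparkCeleryApp, SparkCeleryTask, main

def sparkconfig_builder():
    from pyspark import SparkConf
    return SparkConf().setAppName('SparkCeleryTask')

# Placeholder broker/backend URLs -- substitute your own.
app = SparkCeleryApp(broker='pyamqp://guest:guest@localhost//',
                     backend='redis://localhost:6379/0',
                     sparkconfig_builder=sparkconfig_builder)

class WordCount(SparkCeleryTask):
    name = 'WordCount'

    def run(self, path):
        # By the time a worker executes this, worker_init() has already run,
        # so app.sc is a live SparkContext.
        return app.sc.textFile(path).count()

app.tasks.register(WordCount())

if __name__ == '__main__':
    # app.sc does NOT exist here yet: main() starts the worker,
    # and the SparkContext is created during worker startup.
    main()
```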

gregbaker avatar Apr 13 '18 21:04 gregbaker

Hello, thanks for the suggestion, but I am getting the same error even after trying your solution. Is there any other workaround? Thanks.

Kind regards, Neel

NeelBhowmik avatar May 18 '18 07:05 NeelBhowmik

I have the same problem: the Spark context is not initialized. I had to create it manually outside of spark-celery, but that's not a good idea.

nguyenign avatar May 22 '18 12:05 nguyenign

Hi All,

The function worker_init(self, loader) requires a loader argument. Can you please tell me what the loader is?

I get an error when I call this function:

```python
def sparkconfig_builder():
    from pyspark import SparkConf
    return (SparkConf().setAppName('SparkCeleryTask')
            .set('spark.dynamicAllocation.enabled', 'true')
            .set('spark.dynamicAllocation.schedulerBacklogTimeout', 1)
            .set('spark.dynamicAllocation.minExecutors', 1)
            .set('spark.dynamicAllocation.executorIdleTimeout', 20)
            .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 60))

app = SparkCeleryApp(broker='pyamqp://' + brokerUser + ':' + brokerPassw + '@' + brokerURL + '//',
                     backend=backendURL,
                     sparkconfig_builder=sparkconfig_builder)

app.worker_init()
```

The error message that I get: `TypeError: worker_init() missing 1 required positional argument: 'loader'`

nguyenign avatar May 24 '18 14:05 nguyenign

You shouldn't call worker_init() yourself. To get things going:

  1. create app (as you have)
  2. register some tasks
  3. call the spark_celery.main function to get things started.
```python
from spark_celery import main

if __name__ == '__main__':
    # When called as a worker, run as a worker.
    main()
```
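
As for what main() is doing: it starts a Celery worker and doesn't return until that worker shuts down, and worker_init() (which builds the SparkContext) runs as part of that worker startup. So any code that needs app.sc should live inside a task's run() method, not before or after the call to main().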

gregbaker avatar May 24 '18 15:05 gregbaker

Thank you very much for your solution.

I tried it, but I still have a problem. I need the Spark context in the registered task to broadcast some database files, but the Spark context is only initialized after registering the task and then executing the main function. So I cannot add the Spark context to my task as follows:

```python
from spark_celery import main

if __name__ == '__main__':
    app.tasks.register(qs)
    main()
    qs.setSparkContext(app.sc)
    qs.broadCasting()
    print('broadcasting done')
```

Do you have any idea?

nguyenign avatar May 29 '18 11:05 nguyenign