spark-celery
AttributeError: 'SparkCeleryApp' object has no attribute 'sc'
Dear all, I am trying to use spark-celery in my application. When I run the code below, I get the error "AttributeError: 'SparkCeleryApp' object has no attribute 'sc'". Could you please suggest where the problem might be? Thank you. Here is the code.
```python
import os
from spark_celery import SparkCeleryApp, SparkCeleryTask, RDD_builder, main

backendURL = os.getenv("BACKEND_URL", "redis://redis:6379/0")
brokerURL = os.getenv("BROKER_URL", "rabbitmq")
brokerUser = os.getenv("BROKER_USER", "guest")
brokerPassw = os.getenv("BROKER_PASSW", "guest")

def sparkconfig_builder():
    from pyspark import SparkConf
    return (SparkConf().setAppName('SparkCeleryTask')
            .set('spark.dynamicAllocation.enabled', 'true')
            .set('spark.dynamicAllocation.schedulerBacklogTimeout', 1)
            .set('spark.dynamicAllocation.minExecutors', 1)
            .set('spark.dynamicAllocation.executorIdleTimeout', 20)
            .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 60))

app = SparkCeleryApp(broker='pyamqp://' + brokerUser + ':' + brokerPassw + '@' + brokerURL + '//',
                     backend=backendURL, sparkconfig_builder=sparkconfig_builder)
```
In the main block I instantiate my task class `qs` (defined as `class QuerySystem(cbir_defunc.QuerySystem, SparkCeleryTask):`), and then I get the error:

```python
if __name__ == '__main__':
    qs = QuerySystem(CB_file, idx_file, knn)
    print(app.sc)  # <----- here is the error
    app.tasks.register(qs)
    main(options={
        # 'queues': 'querySystemQueue',
    })
```
The Spark context stuff isn't created until app.worker_init(). That's called from the main() function.
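Concretely, that means `app.sc` only exists in the worker process once `main()` has started it, so it should be used inside a task's `run()` method rather than at module level. A minimal sketch (the `LineCount` task and its `run(path)` signature are just placeholders for illustration):

```python
class LineCount(SparkCeleryTask):
    name = 'LineCount'

    def run(self, path):
        # Runs on the worker, after worker_init() has created the SparkContext.
        return app.sc.textFile(path).count()

app.tasks.register(LineCount())

if __name__ == '__main__':
    main()  # starts the worker; this is what ends up creating app.sc
```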
Hello, thanks for the suggestion, but I am getting the same error even after trying your solution. Is there any other workaround? Thanks.
Kind regards, Neel
I have the same problem: the Spark context is not initialized. I had to create it manually outside of spark-celery, but that is not a good solution.
Hi All,
The function worker_init(self, loader) needs a loader argument. Can you please tell me what the loader is?
I get an error when I call this function:
```python
def sparkconfig_builder():
    from pyspark import SparkConf
    return (SparkConf().setAppName('SparkCeleryTask')
            .set('spark.dynamicAllocation.enabled', 'true')
            .set('spark.dynamicAllocation.schedulerBacklogTimeout', 1)
            .set('spark.dynamicAllocation.minExecutors', 1)
            .set('spark.dynamicAllocation.executorIdleTimeout', 20)
            .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 60))

app = SparkCeleryApp(broker='pyamqp://' + brokerUser + ':' + brokerPassw + '@' + brokerURL + '//',
                     backend=backendURL, sparkconfig_builder=sparkconfig_builder)
app.worker_init()
```
The error message I get from `app.worker_init()` is: `TypeError: worker_init() missing 1 required positional argument: 'loader'`
You shouldn't call worker_init() yourself. To get things going:
- create `app` (as you have)
- register some tasks
- call the spark_celery.main function to get things started:
```python
from spark_celery import main

if __name__ == '__main__':
    # When called as a worker, run as a worker.
    main()
```
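With the names from earlier in this thread, those three steps look roughly like this (a sketch; `app`, `QuerySystem`, and its constructor arguments are as defined above):

```python
qs = QuerySystem(CB_file, idx_file, knn)   # app was already created above
app.tasks.register(qs)                     # register the task with the app

if __name__ == '__main__':
    main()                                 # main() takes care of worker_init()
```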
Thank you very much for your solution.
I tried it, but I still have a problem. I need the Spark context in the registered task to broadcast a database file, but the context is only initialized after the task is registered and the main function is executed. So I cannot pass the Spark context to my task like this:
```python
from spark_celery import main

if __name__ == '__main__':
    app.tasks.register(qs)
    main()
    # main() runs the worker and blocks, so these lines only run after the
    # worker shuts down, not while tasks are being executed.
    qs.setSparkContext(app.sc)
    qs.broadCasting()
    print('broadcasting done')
```
Do you have any idea?
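One possible workaround is to create the broadcast lazily inside the task itself, the first time `run()` executes on the worker, by which point `app.sc` exists. A minimal sketch, assuming hypothetical `load_database()` and `do_query()` helpers:

```python
from spark_celery import SparkCeleryTask

class QuerySystem(SparkCeleryTask):
    name = 'QuerySystem'
    _db_broadcast = None              # cached per worker process

    def run(self, query):
        # Broadcast the database on first use; by now main() has created app.sc.
        if QuerySystem._db_broadcast is None:
            QuerySystem._db_broadcast = app.sc.broadcast(load_database(CB_file))
        db = QuerySystem._db_broadcast.value
        return do_query(db, query)    # hypothetical query function
```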