RayDP on Databricks
Hi all,
I read about RayDP on the Databricks website: https://databricks.com/session_na21/build-large-scale-data-analytics-and-ai-pipeline-using-raydp
I tried to make this example work, but the system crashes at the first line:
spark = raydp.init_spark(......)
The error keeps saying: "Java gateway process exited before sending its port number".
Could you please share your thoughts on how I could set up RayDP on Databricks? Thank you so much.
My current system:
- 1 cluster: 1 driver, 2 workers
- CPU: 8 cores, RAM: 32GB per node
- Spark 3.2.1
Hi @LAITRUNGMINHDUC, did you start a Ray cluster on a Databricks Spark cluster and then want to use RayDP on top of it? RayDP is Spark on Ray, so that setup amounts to Spark on Ray on Spark. Usually you can just start a Ray cluster in the cloud, use RayDP to run the Spark program, and connect it with other Ray components. Could you please share more about the use case you want to run on Databricks?
Hi @carsonwang, thank you for your explanation.
My use case is that I want to test the Spark SQL implementation on RayDP, because the current solution on Spark takes so much time to run. My data scientists work mainly in SQL, so migrating the solution to pure Python running on Ray is hard.
For more details on my setup: I use the init script shared on this page to initialize the system. I also tried a single-node installation, but nothing works. Link: https://databricks.com/blog/2021/11/19/ray-on-databricks.html
Here is my sample code:
import sys; sys.stdout.fileno = lambda: False  # workaround for Databricks notebook stdout (no real fileno())
import ray
import raydp

ray.init(address='auto')
spark = raydp.init_spark(
    app_name="myApp",
    num_executors=2,
    executor_cores=4,
    executor_memory="28GB"
)
And here is the error:
Exception Traceback (most recent call last)
<command-1007474167784282> in <module>
----> 1 spark = raydp.init_spark(
2 app_name = "myApp",
3 num_executors = 2,
4 executor_cores = 4,
5 executor_memory = "28GB"
/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/context.py in init_spark(app_name, num_executors, executor_cores, executor_memory, configs)
124 _global_spark_context = _SparkContext(
125 app_name, num_executors, executor_cores, executor_memory, configs)
--> 126 return _global_spark_context.get_or_create_session()
127 except:
128 _global_spark_context = None
/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/context.py in get_or_create_session(self)
68 return self._spark_session
69 self.handle = RayDPConversionHelper.options(name=RAYDP_OBJ_HOLDER_NAME).remote()
---> 70 spark_cluster = self._get_or_create_spark_cluster()
71 self._spark_session = spark_cluster.get_spark_session(
72 self._app_name,
/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/context.py in _get_or_create_spark_cluster(self)
61 if self._spark_cluster is not None:
62 return self._spark_cluster
---> 63 self._spark_cluster = SparkCluster(self._configs)
64 return self._spark_cluster
65
/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/spark/ray_cluster.py in __init__(self, configs)
32 self._app_master_bridge = None
33 self._configs = configs
---> 34 self._set_up_master(None, None)
35 self._spark_session: SparkSession = None
36
/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/spark/ray_cluster.py in _set_up_master(self, resources, kwargs)
38 # TODO: specify the app master resource
39 self._app_master_bridge = RayClusterMaster(self._configs)
---> 40 self._app_master_bridge.start_up()
41
42 def _set_up_worker(self, resources: Dict[str, float], kwargs: Dict[str, str]):
/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/spark/ray_cluster_master.py in start_up(self, popen_kwargs)
52 return
53 extra_classpath = os.pathsep.join(self._prepare_jvm_classpath())
---> 54 self._gateway = self._launch_gateway(extra_classpath, popen_kwargs)
55 self._app_master_java_bridge = self._gateway.entry_point.getAppMasterBridge()
56 self._set_properties()
/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/spark/ray_cluster_master.py in _launch_gateway(self, class_path, popen_kwargs)
116
117 if not os.path.isfile(conn_info_file):
--> 118 raise Exception("Java gateway process exited before sending its port number")
119
120 with open(conn_info_file, "rb") as info:
Exception: Java gateway process exited before sending its port number
Hi @LAITRUNGMINHDUC, RayDP does not have its own SQL implementation for now, so I think the performance would be very similar to vanilla Spark. If you want better SQL performance, maybe you can try another of our projects, Gazelle.
I can't identify the problem from the error you provided. Can you please run ps aux | grep raydp, shut down the related process (a Java process whose main class is AppMasterEntryPoint), and then try again? Also, what are your Ray and Spark versions?
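A minimal sketch of that check, assuming a Unix-like Databricks node; the grep pattern and the kill step are illustrative, not part of RayDP itself:

```shell
# List any leftover RayDP app-master JVMs (main class AppMasterEntryPoint);
# print a note if none are running.
ps aux | grep -E 'raydp|AppMasterEntryPoint' | grep -v grep \
  || echo "no raydp process found"

# If a stale process shows up, terminate it by its PID before retrying, e.g.:
# kill <PID>
```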
Closing as stale.