Slow download speeds compared to Statement Execution API
Context
I've been exploring different ways of getting large amounts of data (100GB+) out of Unity Catalog and into external Ray clusters for distributed ML model training. While assessing databricks-sql-python, I noticed that download speeds are significantly slower than with the Statement Execution API. On the actual external Ray cluster the difference is 10x, but I was also able to replicate it, to a lesser extent, in a Databricks notebook.
Replication
The first two approaches both lead to a download speed of ~45 MB/s on an i3.4xlarge.
Using databricks-sql-python directly
from databricks.sql import connect

with connect(
    server_hostname="same-host",
    http_path="same-http-path",
    access_token="token",
    use_cloud_fetch=True,
) as connection:
    cursor = connection.cursor()
    cursor.execute("SELECT * from foo.bar.baz")
    print(cursor.fetchall_arrow())
Using databricks-sql-python + ray.data.read_sql
reference: https://docs.ray.io/en/latest/data/api/doc/ray.data.read_sql.html#ray.data.read_sql
from databricks.sql import connect
from ray.data import read_sql
import ray

ray.init(num_cpus=16)

def connection_factory():
    return connect(
        server_hostname="same-host",
        http_path="same-http-path",
        access_token="token",
        use_cloud_fetch=True,
    )

ray_dataset = read_sql(
    sql="SELECT * from foo.bar.baz",
    connection_factory=connection_factory,
    override_num_blocks=1,
    ray_remote_args={"num_cpus": 16},
)
print(f"Ray dataset count: {ray_dataset.count()}")
However, when I use ray.data.read_databricks_tables, I can reach download speeds of ~150 MB/s on the same machine.
Using ray.data.read_databricks_tables
import os

import ray
from ray.data import read_databricks_tables

ray.init(num_cpus=16)

os.environ["DATABRICKS_TOKEN"] = "token"
os.environ["DATABRICKS_HOST"] = "same-host"

ray_dataset = read_databricks_tables(
    warehouse_id="same-id-in-http-path-above",
    catalog="foo",
    schema="bar",
    query="SELECT * from baz",
)
print(f"Ray dataset size: {ray_dataset.size_bytes()}")
print(f"Ray dataset count: {ray_dataset.count()}")
print(f"Ray dataset summed: {ray_dataset.sum('some_column')}")
Potential Cause
I suspect this is because the Statement Execution API lets you make separate, parallel requests to retrieve different "chunks" of a result, whereas the SQL connector adopts a cursor-based approach where data can only be retrieved sequentially.
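To make the suspected difference concrete, here is a rough sketch of that parallel chunk pattern against the Statement Execution API (EXTERNAL_LINKS disposition + ARROW_STREAM format). The host/token/warehouse values are the same placeholders as above, and status polling and error handling are omitted, so treat it as illustrative rather than production code:

import concurrent.futures

import pyarrow as pa
import requests

HOST = "https://same-host"  # workspace URL
TOKEN = "token"
HEADERS = {"Authorization": f"Bearer {TOKEN}"}

# 1. Submit the statement; assumes it finishes within wait_timeout
#    (real code would poll the statement status until SUCCEEDED).
resp = requests.post(
    f"{HOST}/api/2.0/sql/statements/",
    headers=HEADERS,
    json={
        "warehouse_id": "same-id-in-http-path-above",
        "statement": "SELECT * FROM foo.bar.baz",
        "disposition": "EXTERNAL_LINKS",
        "format": "ARROW_STREAM",
        "wait_timeout": "30s",
    },
).json()
statement_id = resp["statement_id"]
chunks = resp["manifest"]["chunks"]  # one entry per result chunk

def fetch_chunk(chunk_index: int) -> pa.Table:
    # Each chunk resolves to a presigned URL that can be downloaded independently
    # (no auth header on the presigned link itself).
    link = requests.get(
        f"{HOST}/api/2.0/sql/statements/{statement_id}/result/chunks/{chunk_index}",
        headers=HEADERS,
    ).json()["external_links"][0]["external_link"]
    raw = requests.get(link).content
    return pa.ipc.open_stream(raw).read_all()

# 2. Download all chunks in parallel -- this is the parallelism
#    the sequential cursor approach doesn't give you.
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as pool:
    tables = list(pool.map(fetch_chunk, [c["chunk_index"] for c in chunks]))

print(pa.concat_tables(tables).num_rows)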
Ask
Are there any plans to support a similar chunking pattern in databricks-sql-python, and failing that, is there currently any way to reach download-speed parity with the Statement Execution API?
databricks-sql-python is great because it does not have the 100GB limit of the Statement Execution API, but the slow download speed is a major blocker for ML applications that need to transfer large amounts of data, which, to be fair, may not be the use case databricks-sql-python was designed for.
I had this speed issue as well. I asked on the Databricks forum and the answer they gave me was "retrieve your data in chunks":
https://community.databricks.com/t5/data-engineering/slow-fetching-results-by-client-in-databricks-sql-calling-from/td-p/13362
I also had a Databricks contact who told me that the Python SQL connector was not meant to retrieve large quantities of data.
What I did was use Spark SQL. You can follow this guide to set up PySpark against Databricks: https://docs.databricks.com/aws/en/dev-tools/databricks-connect/python/ . Be careful about the runtime you assign to your cluster and the version of the databricks-connect package.
Once you have your spark_df = spark.sql("select * from xxx.xxxx"), you store it as Parquet files somewhere (DBFS, S3/ADLS Gen2), download the folder containing all the Parquet files, and finally read the data from that folder with pandas.
All these extra steps make the process faster for downloading large amounts of data (see the rough sketch below).
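For reference, a minimal sketch of that Databricks Connect + Parquet flow, assuming databricks-connect is already configured; the table name, bucket path, and local path are placeholders, and the actual download of the folder (aws s3 sync, azcopy, ...) is environment-specific and not shown:

import pandas as pd
from databricks.connect import DatabricksSession

# Connects using your configured Databricks Connect profile / env vars.
spark = DatabricksSession.builder.getOrCreate()

# 1. Run the query on the cluster and write the result out as Parquet.
spark_df = spark.sql("SELECT * FROM xxx.xxx.xxxx")
spark_df.write.mode("overwrite").parquet("s3://my-bucket/exports/xxxx/")

# 2. After downloading the folder locally, read all Parquet files with pandas.
pdf = pd.read_parquet("/local/path/to/exports/xxxx/")
print(len(pdf))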