Better SparkSession settings for localhost
Users need to configure their SparkSession for localhost development so that computations run fast and they don't run out of memory.
Here are some examples I ran on my local machine (64 GB of RAM) against the h2o groupby 1e9 dataset (1 billion rows of data).
Here's the "better config":
import pyspark
from delta import configure_spark_with_delta_pip

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.executor.memory", "10G")
    .config("spark.driver.memory", "25G")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.sql.shuffle.partitions", "2")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Here's the default config:
builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
groupby query
This query takes 104 seconds with the "better config":
import delta
import pyspark.sql.functions as F
from pathlib import Path

delta_table = delta.DeltaTable.forPath(
    spark, f"{Path.home()}/data/deltalake/G1_1e9_1e2_0_0"
)
delta_table.toDF().groupby("id3").agg(F.sum("v1"), F.mean("v3")).limit(10).collect()
This same query errors out with the default config.
join query
This query takes 69 seconds with the "better config", but 111 seconds with the default config:
x = spark.read.format("delta").load(f"{Path.home()}/data/deltalake/J1_1e9_1e9_0_0")
small = spark.read.format("parquet").load(f"{Path.home()}/data/J1_1e9_1e3_0_0.parquet")
# Register the DataFrames as temp views so they can be referenced by name in SQL
x.createOrReplaceTempView("x")
small.createOrReplaceTempView("small")
spark.sql('select x.id2, sum(small.v2) from x join small using (id1) group by x.id2').show()
Conclusion
SparkSession configurations significantly impact the localhost Spark runtime experience.
How can we make it easy for Spark users to get optimal configurations for localhost development?
Good job @MrPowers for getting this conversation started.
I think there are 3 ways to do this (going from least likely to most likely to have an impact):
1- Recommend the different configuration in the Spark docs, getting started guide, etc. This will help some users, but most users just use the standard config, so this will likely have a small impact.
2- Change Spark so it automatically uses the 'better' configuration above. However, the 'better' configuration will depend on the machine and the dataset: a better configuration in one case is not necessarily the best configuration in every case.
3- Change Spark so that it 'guesses' and uses a configuration based on the RAM available, the number of CPU cores, etc. I believe this is what Polars does. This config should be changeable by the user (a rough sketch of what this could look like is below).
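For illustration, here is a minimal sketch of what a "guessed but overridable" helper could look like. The function name and the guessed defaults are hypothetical, not an existing Spark or quinn API; the point is just that the helper detects cores and memory and lets the caller override any setting.

import multiprocessing
import os

from pyspark.sql import SparkSession


def guessed_builder(app_name, overrides=None):
    """Guess local-mode settings from the machine, letting the user override anything.

    Hypothetical helper for illustration only. Memory detection via os.sysconf is Unix-only.
    """
    cores = multiprocessing.cpu_count()
    mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
    mem_gb = max(1, int(mem_bytes / (1024**3)))
    guessed = {
        "spark.driver.memory": f"{mem_gb}g",
        "spark.sql.shuffle.partitions": str(cores),
    }
    guessed.update(overrides or {})
    builder = SparkSession.builder.appName(app_name).master(f"local[{cores}]")
    for key, value in guessed.items():
        builder = builder.config(key, value)
    return builder


spark = guessed_builder("MyApp", {"spark.sql.shuffle.partitions": "2"}).getOrCreate()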
On top of improving Spark performance (which is important), I wonder what the Spark positioning should be: should Spark be the engine for big data or the engine for data of any size? If the positioning for Spark is the engine for big data, then I would not mind if it's slower than duckdb on small data, because that's not what it's built for.
Your thoughts?
@lucazanna - I think we should first figure out the true capabilities of Spark locally and then figure out the best messaging. Here are the results for one of the h2o queries:
I think the current benchmarks are really misleading...
I'm in favor of the automatic configuration (leaning towards higher memory consumption) with configurable parameters that the user can change if needed.
I think these are good configurable parameters:
- executor memory
- driver memory
- shuffle partitions
For executor and driver memory we could use a percentage of available system memory. It doesn't look like there's a good way to get this with Python's standard library, but psutil (https://pypi.org/project/psutil/) looks like a popular way to get this info.
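A minimal sketch of that idea, assuming psutil is installed; the 70%/20% splits below are placeholder numbers for illustration, not tested recommendations.

import psutil
from pyspark.sql import SparkSession

# Total physical memory on the machine, in bytes
total_mem_bytes = psutil.virtual_memory().total

# Hypothetical split: give most of the memory to the driver in local mode
driver_mem_gb = max(1, int(total_mem_bytes * 0.7 / 1024**3))
executor_mem_gb = max(1, int(total_mem_bytes * 0.2 / 1024**3))

builder = (
    SparkSession.builder.appName("MyApp")
    .master(f"local[{psutil.cpu_count(logical=True)}]")
    .config("spark.driver.memory", f"{driver_mem_gb}g")
    .config("spark.executor.memory", f"{executor_mem_gb}g")
    .config("spark.sql.shuffle.partitions", "2")
)
spark = builder.getOrCreate()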
@jeffbrennan - figuring out how to programmatically set the best settings is a great goal.
The first step is to get everyone with the same datasets on their local machines so we can tinker and find what settings work best. There are so many Spark configuration options and I'm not even sure which knobs need to be turned (let alone how to optimally turn them)!
Here are some other suggestions that might be useful: https://luminousmen.com/post/how-to-speed-up-spark-jobs-on-small-test-datasets
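Guides like that one typically suggest trimming parallelism and overhead when the test data is tiny. The snippet below is an illustrative combination of commonly suggested knobs, not a verbatim copy of the linked post:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("unit-tests")
    .master("local[1]")                               # a single core is often enough for tiny data
    .config("spark.sql.shuffle.partitions", "1")      # avoid 200 tiny shuffle partitions
    .config("spark.ui.enabled", "false")              # skip starting the web UI
    .config("spark.ui.showConsoleProgress", "false")  # quieter console output
    .getOrCreate()
)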
I use this one:
# Standard library imports
import json
import multiprocessing
import os
import re
import sys
import random
import time

# Third-party imports
import numpy as np
import pandas as pd
import pyarrow

# PySpark imports
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.functions import (
    col, concat, concat_ws, expr, lit, trim, udf
)
from pyspark.sql.types import (
    IntegerType, StringType, StructField, StructType,
    DoubleType, TimestampType
)
from pyspark import pandas as ps

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

# Detect the machine's resources so local mode can use all of them
number_cores = int(multiprocessing.cpu_count())
mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3


def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.repl.eagerEval.enabled", "True"
    ).set("spark.sql.adaptive.enabled", "True").set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set(
        "spark.sql.repl.eagerEval.maxNumRows", "10000"
    )
    # The log level is set on the SparkContext after the session is created (see below);
    # it is not a SparkConf key.
    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My app", SparkConf())
spark.sparkContext.setLogLevel("ERROR")
With this config, users get the maximum memory and CPU cores in local mode.
Here are some other settings that might be useful: https://www.linkedin.com/posts/dipanjan-s-5874b1a0_pyspark-unittesting-optimization-activity-7122896865762701312-oF-j?utm_source=share&utm_medium=member_desktop
Do you absolutely need spark? What about polars or duckdb in case you only target single node deployments?
The topic is about running unit tests of Spark routines. These tests run on a single node (locally). Maybe I'm missing something, but how would polars/duckdb help here?
No - for these purposes it will not help.