incubator-wayang
Performance issue on TPC-H compared with SparkSQL
Description
I'm trying to run TPC-H Q3 and compare the performance between Wayang and SparkSQL under the following setup:
- Running both Spark (3.5.1) and Wayang on a local VM with 32 CPU cores and 128GB memory
- Running a Postgres instance that holds all the TPC-H tables (sf=10) on a remote VM
I keep the Spark settings the same for both runs. For Q3, Wayang took around 3 minutes while Spark took only 40 seconds.
To reproduce
To run Wayang, I compile the project locally (using tag 1.0.0) and use the benchmark code under wayang-benchmark directly:
./wayang-1.0.0-SNAPSHOT/bin/wayang-submit org.apache.wayang.apps.tpch.TpcH exp\(123\) spark,postgres file:///path/to/wayang.properties Q3
The wayang.properties file looks like the following:
wayang.postgres.jdbc.url = jdbc:postgresql://{POSTGRES_IP}:{POSTGRES_PORT}/{TPCH_DB}
wayang.postgres.jdbc.user = {POSTGRES_USER}
wayang.postgres.jdbc.password = {POSTGRES_PASSWORD}
spark.master = local[32]
spark.driver.memory = 110G
spark.executor.memory = 110G
spark.executor.cores = 32
wayang.giraph.hdfs.tempdir = file:///tmp/result/
spark.rdd.compress = true
spark.log.level = INFO
To run Spark, I use the following code:
import sys
import time

from pyspark.sql import SparkSession
from contexttimer import Timer

SPARK_JARS = "path/to/jar/postgresql-42.3.8.jar"
POSTGRES_URL = "jdbc:postgresql://{POSTGRES_IP}:{POSTGRES_PORT}/{TPCH_DB}"
POSTGRES_USER = "{POSTGRES_USER}"
POSTGRES_PASSWORD = "{POSTGRES_PASSWORD}"

TPCH_Q3 = """SELECT
    l_orderkey,
    sum(l_extendedprice * (1 - l_discount)) AS revenue,
    o_orderdate,
    o_shippriority
FROM
    customer,
    orders,
    lineitem
WHERE
    c_mktsegment = 'BUILDING'
    AND c_custkey = o_custkey
    AND l_orderkey = o_orderkey
    AND o_orderdate < CAST('1995-03-15' AS date)
    AND l_shipdate > CAST('1995-03-15' AS date)
GROUP BY
    l_orderkey,
    o_orderdate,
    o_shippriority
ORDER BY
    revenue DESC,
    o_orderdate"""

def registerPostgres(spark, tables, url):
    # Expose each Postgres table to Spark as a JDBC-backed temporary view.
    for name in tables:
        spark.sql(f"""
            CREATE TEMPORARY VIEW {name}
            USING org.apache.spark.sql.jdbc
            OPTIONS (
                driver "org.postgresql.Driver",
                url "{url}",
                dbtable "public.{name}",
                user '{POSTGRES_USER}',
                password '{POSTGRES_PASSWORD}',
                pushDownAggregate 'true'
            )
        """)

def registerViews(spark):
    registerPostgres(
        spark,
        ["lineitem", "customer", "orders", "nation", "region", "supplier", "part", "partsupp"],
        POSTGRES_URL,
    )

def run_query(spark, query):
    with Timer() as timer:
        df = spark.sql(query)
        df.collect()
    print(f"got {df.count()} rows, {len(df.columns)} cols")
    df.explain()  # explain() prints the plan itself and returns None
    print(f"took {timer.elapsed:.2f}s in total")
    # print(df)
    print()
    sys.stdout.flush()

if __name__ == '__main__':
    spark = (
        SparkSession.builder.master("local[32]")
        .appName("test-spark")
        .config("spark.jars", SPARK_JARS)
        .config("spark.executor.memory", "110g")
        .config("spark.driver.memory", "110g")
        .config("spark.log.level", "INFO")
        .config("spark.ui.port", "4040")
        .getOrCreate()
    )
    print(spark.sparkContext.getConf().getAll())
    registerViews(spark)
    run_query(spark, TPCH_Q3)
    time.sleep(2)
    spark.stop()
Some investigation
The queries used to fetch data from Postgres are basically the same on both platforms (filter and projection pushdown are enabled).
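As a sanity check on the Spark side, the formatted plan makes the pushed-down predicates visible; a minimal sketch, reusing the spark session and TPCH_Q3 from the script above (explain's mode parameter is available since Spark 3.0):

df = spark.sql(TPCH_Q3)
# Look for "PushedFilters" (and, with pushDownAggregate, "PushedAggregates")
# on the JDBC scan nodes for customer/orders/lineitem.
df.explain(mode="formatted")

On the Postgres side, setting log_statement = 'all' logs the SQL each platform actually sends, so the two can be compared directly.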
I tried to print as much of the Spark execution log as I could to see the difference between the two runs. One significant overhead I found is that Wayang produces a much larger ShuffleMapTask for the join than Spark does (~46,500,000 bytes vs. ~8,000 bytes). Serializing each task takes ~2 seconds, and with 64 tasks serialized one by one this adds up to roughly a minute of overhead. On the Spark side, by contrast, the serialization time is negligible.
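For reference, here is a rough sketch of how the per-task numbers can be pulled from Spark's monitoring REST API while the job runs (port 4040 matches spark.ui.port above; the requests-based loop is just for illustration):

import requests

# Illustrative only: fetch per-stage task summaries from the Spark UI's
# REST API and compare serialization-related metrics between the two runs.
BASE = "http://localhost:4040/api/v1"

app_id = requests.get(f"{BASE}/applications").json()[0]["id"]
for stage in requests.get(f"{BASE}/applications/{app_id}/stages").json():
    sid, attempt = stage["stageId"], stage["attemptId"]
    summary = requests.get(
        f"{BASE}/applications/{app_id}/stages/{sid}/{attempt}/taskSummary"
    ).json()
    # taskSummary returns quantiles per metric, e.g. executorDeserializeTime,
    # which is where oversized task binaries tend to show up.
    print(sid, stage["name"], summary["executorDeserializeTime"])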
I'm not very familiar with Spark's execution internals, so I'm not sure why this is the case. Can anyone give me a pointer? Is there anything I'm missing, such as the way I run the query or something in the configuration? Thank you!