
Performance issue on TPC-H compared with SparkSQL


Description

I'm trying to run TPC-H Q3 and compare the performance between Wayang and SparkSQL under the following setup:

  • Both Spark (3.5.1) and Wayang run on a local VM with 32 CPU cores and 128 GB of memory
  • A Postgres instance holding all the TPC-H tables (sf=10) runs on a remote VM

I try to keep the Spark settings the same in both runs. For Q3, Wayang took around 3 minutes while Spark took only 40 seconds.

To reproduce

To run Wayang, I compile the project locally (using tag 1.0.0) and use the benchmark code under wayang-benchmark directly:

./wayang-1.0.0-SNAPSHOT/bin/wayang-submit org.apache.wayang.apps.tpch.TpcH exp\(123\) spark,postgres file:///path/to/wayang.properties Q3

The wayang.properties file is like the following:

wayang.postgres.jdbc.url = jdbc:postgresql://{POSTGRES_IP}:{POSTGRES_PORT}/{TPCH_DB}
wayang.postgres.jdbc.user = {POSTGRES_USER}
wayang.postgres.jdbc.password = {POSTGRES_PASSWORD}

spark.master = local[32]
spark.driver.memory = 110G
spark.executor.memory = 110G
spark.executor.cores = 32
wayang.giraph.hdfs.tempdir = file:///tmp/result/

spark.rdd.compress = true
spark.log.level = INFO

To run Spark, I use the following code:

import sys
import time
from pyspark.sql import SparkSession
from contexttimer import Timer

SPARK_JARS = "path/to/jar/postgresql-42.3.8.jar"
POSTGRES_URL = "jdbc:postgresql://{POSTGRES_IP}:{POSTGRES_PORT}/{TPCH_DB}"
POSTGRES_USER = "{POSTGRES_USER}"
POSTGRES_PASSWORD = "{POSTGRES_PASSWORD}"

TPCH_Q3 = """SELECT
    l_orderkey,
    sum(l_extendedprice * (1 - l_discount)) AS revenue,
    o_orderdate,
    o_shippriority
FROM
    customer,
    orders,
    lineitem
WHERE
    c_mktsegment = 'BUILDING'
    AND c_custkey = o_custkey
    AND l_orderkey = o_orderkey
    AND o_orderdate < CAST('1995-03-15' AS date)
    AND l_shipdate > CAST('1995-03-15' AS date)
GROUP BY
    l_orderkey,
    o_orderdate,
    o_shippriority
ORDER BY
    revenue DESC,
    o_orderdate"""

def registerPostgres(spark, tables, url):
    for name in tables:
        spark.sql(f"""
            CREATE TEMPORARY VIEW {name}
            USING org.apache.spark.sql.jdbc
            OPTIONS (
              driver "org.postgresql.Driver",
              url "{url}",
              dbtable "public.{name}",
              user '{POSTGRES_USER}',
              password '{POSTGRES_PASSWORD}',
              pushDownAggregate 'true'
            )
            """)
            

def registerViews(spark):
    registerPostgres(spark, ["lineitem", "customer", "orders", "nation", "region", "supplier", "part", "partsupp"], POSTGRES_URL)


def run_query(spark, query):
    with Timer() as timer:
        df = spark.sql(query)
        rows = df.collect()  # materialize once; df.count() would rerun the query
    print(f"got {len(rows)} rows, {len(df.columns)} cols")
    df.explain()  # explain() prints the physical plan itself and returns None
    print(f"took {timer.elapsed:.2f}s in total")
    print()
    sys.stdout.flush()

        

if __name__ == '__main__':

    spark = (
        SparkSession.builder.master("local[32]")
        .appName("test-spark")
        .config("spark.jars", SPARK_JARS)
        .config("spark.executor.memory", "110g")
        .config("spark.driver.memory", "110g")
        .config("spark.log.level", "INFO")
        .config("spark.ui.port", "4040")
        .getOrCreate()
    )

    print(spark.sparkContext.getConf().getAll())
    registerViews(spark)

    run_query(spark, TPCH_Q3)
    time.sleep(2)
    spark.stop()

Some investigation

The queries used to fetch data from Postgres are basically the same on both platforms (filter and projection pushdown are enabled in both).
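
One way to check this on the Spark side is the JDBC scan node in the physical plan; a small sketch, reusing the views registered by registerViews() above (the exact filter names in the output may differ):

# The scan node of the physical plan lists "PushedFilters: [...]" and the
# pruned "ReadSchema" when filter/projection pushdown applies.
df = spark.sql(
    "SELECT l_orderkey FROM lineitem "
    "WHERE l_shipdate > CAST('1995-03-15' AS date)"
)
df.explain()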

I printed as much of the Spark execution log as I could to spot the difference between the two runs. One significant overhead I found is that Wayang produces a much larger ShuffleMapTask for the join than Spark does (~46,500,000 bytes vs. ~8,000 bytes). Each task takes ~2 seconds to serialize, and with 64 tasks serialized one by one this adds up to roughly a 1-minute overhead. On Spark, by contrast, the serialization time is negligible.
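
For what it's worth, a large serialized task in Spark usually means some big driver-side object is being captured in the task closure instead of being broadcast. A minimal sketch of that pattern (my own illustration, not Wayang's actual code; all names are made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").getOrCreate()
sc = spark.sparkContext

big = list(range(2_000_000))  # several MB once serialized

# Captured in the closure: the whole object travels with the job's tasks.
# (On the JVM side this inflates every ShuffleMapTask binary; recent PySpark
# versions mitigate it by auto-broadcasting large pickled closures.)
sc.parallelize(range(64), 64).map(lambda x: x + big[0]).count()

# Explicit broadcast: tasks stay small; executors fetch the value once.
bc = sc.broadcast(big)
sc.parallelize(range(64), 64).map(lambda x: x + bc.value[0]).count()

If something like this is happening here, Spark's scheduler typically also logs a warning along the lines of "Stage ... contains a task of very large size", which might help confirm which stage carries the bloat.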

I'm not very familiar with Spark's execution internals, so I'm not sure why this is the case. Can anyone give me a pointer? Is there anything I'm missing, such as in the way I run the query or in the configuration? Thank you!

wangxiaoying · Mar 27 '24