
Enhance AggregationFuzzer to verify results against Spark

rui-mo opened this issue on Mar 27, 2024 · 10 comments

Description

Currently, the Aggregation Fuzzer verifies results against DuckDB. However, not all functions are available in DuckDB, and the semantics sometimes don't match Spark's. It would be better to verify against Spark itself.

After several rounds of investigation, we would like to implement the SparkQueryRunner based on Spark Connect. In Spark 3.4, Spark Connect introduced a decoupled client-server architecture that allows remote connectivity to Spark clusters, as described in spark-connect-overview. From the client's perspective, Spark Connect mostly behaves like any other gRPC client, which makes it polyglot and cross-platform. The protocols used by Spark Connect are the proto files defined in https://github.com/apache/spark/tree/master/connector/connect/common/src/main/protobuf/spark/connect.
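As a concrete illustration of the protocol, the sketch below wraps a SQL string into an ExecutePlanRequest, assuming the proto files above have been compiled with protoc's C++ plugin. The generated header paths, the spark::connect namespace, and the session id value are assumptions for illustration, not part of the prototype.

```cpp
// Sketch: wrap a SQL string into a Spark Connect ExecutePlanRequest.
// Assumes base.proto and relations.proto were compiled to C++.
#include <string>

#include "spark/connect/base.pb.h"
#include "spark/connect/relations.pb.h"

spark::connect::ExecutePlanRequest makeRequest(const std::string& sql) {
  spark::connect::ExecutePlanRequest request;
  // Hypothetical session id; a client-generated id (typically a UUID).
  request.set_session_id("aggregation-fuzzer");
  // A Plan wraps a Relation; the SQL relation carries the raw query text.
  request.mutable_plan()->mutable_root()->mutable_sql()->set_query(sql);
  return request;
}
```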

Start Spark Connect server

First, we need to deploy an executable Spark and download spark-connect_2.12-3.5.1.jar from https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/. Then, in $SPARK_HOME, we can start the Spark Connect server with the command below.

./sbin/start-connect-server.sh --jars $SPARK_HOME/jars/spark-connect_2.12-3.5.1.jar

If the server starts successfully, we can see a log line like the one below.

INFO SparkConnectServer: Spark Connect server started at: 0:0:0:0:0:0:0:0%0:15002
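Before submitting queries, the client can sanity-check that the endpoint is reachable. A minimal sketch, assuming the server runs on its default port 15002:

```cpp
// Sketch: check that the Spark Connect gRPC endpoint is reachable.
#include <string>

#include <grpcpp/grpcpp.h>

bool isSparkConnectUp(const std::string& target = "localhost:15002") {
  auto channel =
      grpc::CreateChannel(target, grpc::InsecureChannelCredentials());
  // Wait up to 5 seconds for the channel to reach the READY state.
  auto deadline = gpr_time_add(
      gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(5, GPR_TIMESPAN));
  return channel->WaitForConnected(deadline);
}
```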

Work with Spark Connect to submit query and get the result

The diagram below illustrates how a query is submitted from the native side to Spark through Spark Connect for execution. First, we create a protobuf message ExecutePlanRequest from a SQL string based on the defined protocols. Then we submit the message to the gRPC API ExecutePlan for execution, and the result can be read from its response. Since Spark returns the data in the Arrow IPC stream format, arrow::ipc::RecordBatchReader is used to read the bytes as Arrow RecordBatches. By converting each Arrow RecordBatch to a Velox vector, we can compare the results of Spark and Velox. We have implemented a prototype SparkClient.cpp and verified its functionality: it can submit a query to Spark and fetch the results back to the native side.

[Diagram: query submission from the native side to Spark through Spark Connect]
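A condensed sketch of this flow, modeled on the description above. The function name executeSql, the endpoint, and the error handling are illustrative; the prototype's actual API may differ. It reuses the hypothetical makeRequest() from the earlier sketch.

```cpp
// Sketch: submit a SQL query through Spark Connect and read the results
// back as Velox vectors. Assumes compiled Spark Connect protos, Arrow,
// and the Velox Arrow bridge (velox/vector/arrow/Bridge.h).
#include <memory>
#include <string>
#include <vector>

#include <arrow/c/bridge.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>
#include <grpcpp/grpcpp.h>

#include "spark/connect/base.grpc.pb.h"
#include "velox/vector/arrow/Bridge.h"

std::vector<facebook::velox::RowVectorPtr> executeSql(
    const std::string& sql, facebook::velox::memory::MemoryPool* pool) {
  auto stub = spark::connect::SparkConnectService::NewStub(grpc::CreateChannel(
      "localhost:15002", grpc::InsecureChannelCredentials()));

  // Build the request as in the earlier makeRequest() sketch.
  auto request = makeRequest(sql);

  // ExecutePlan returns a stream of responses; Arrow IPC bytes arrive
  // in the arrow_batch field.
  grpc::ClientContext context;
  auto responseReader = stub->ExecutePlan(&context, request);

  std::vector<facebook::velox::RowVectorPtr> results;
  spark::connect::ExecutePlanResponse response;
  while (responseReader->Read(&response)) {
    if (!response.has_arrow_batch()) {
      continue;
    }
    // Decode the IPC stream bytes into Arrow RecordBatches.
    auto input = std::make_shared<arrow::io::BufferReader>(
        arrow::Buffer::FromString(response.arrow_batch().data()));
    auto batchReader =
        arrow::ipc::RecordBatchStreamReader::Open(input).ValueOrDie();
    std::shared_ptr<arrow::RecordBatch> batch;
    while (batchReader->ReadNext(&batch).ok() && batch != nullptr) {
      // Convert each RecordBatch to a Velox RowVector via the Arrow C
      // data interface, so Spark's results can be compared with Velox's.
      ArrowArray array;
      ArrowSchema schema;
      if (!arrow::ExportRecordBatch(*batch, &array, &schema).ok()) {
        continue;
      }
      results.push_back(std::dynamic_pointer_cast<facebook::velox::RowVector>(
          facebook::velox::importFromArrowAsOwner(schema, array, pool)));
    }
  }
  responseReader->Finish(); // Real code should check the gRPC status here.
  return results;
}
```

In the fuzzer, the returned RowVectors could then be fed to the existing result-comparison utilities, much like DuckDB results are compared today.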

rui-mo · Mar 27, 2024

https://github.com/facebookincubator/velox/issues/6595#issuecomment-1829141664

rui-mo · Mar 27, 2024

Let's track the progress of SparkQueryRunner here. cc: @mbasmanova @zhztheplayer

rui-mo · Mar 27, 2024

CC: @kgpai @kagamiori

mbasmanova · Mar 27, 2024

CC: @amitkdutta

mbasmanova · Apr 5, 2024

Hi @mbasmanova, I have posted our recent updates in this issue. Your feedback is appreciated, thanks. cc: @FelixYBW

rui-mo · Apr 9, 2024

@rui-mo Rui, this is great. Looks like you have a working prototype. What would be the next steps towards "productizing" this?

CC: @kgpai @assignUser @duanmeng

mbasmanova · Apr 9, 2024

@mbasmanova If this approach makes sense for Velox, we plan to introduce the Spark client as well as the Spark Connect protocols into Velox, and start enhancing the aggregation fuzzer based on them. To test against Spark, we would like to know where we can set up the Spark environment needed for execution, thanks.

rui-mo · Apr 10, 2024

where we can set up the Spark environment needed for execution

What are the requirements for this? I assume this would be more in line with an ephemeral test setup (e.g., how we use Hadoop in the adapters tests) and not a permanent, production-grade thing?

We could add Spark, Spark Connect, and their dependencies to the adapters docker image. I think it's also possible to run additional containers as services in a GitHub Actions job, but I haven't used that feature yet.

assignUser · Apr 10, 2024

I assume this would be more in line with an ephemeral test setup

@assignUser Yes, you are right. Could you provide us with a reference on how Hadoop or Presto is deployed in the tests? I assume we can deploy Spark in a similar way. Thank you.

rui-mo · Apr 10, 2024

It looks like the tests themselves start Hadoop/Azurite; for example, see connectors/hive/storage_adapters/hive/tests. If you show me how you set up Spark for your PoC tests (e.g., your bash script), I can help with getting that setup into an action.

Here is the service container reference; it sounds useful: https://docs.github.com/en/actions/using-containerized-services/about-service-containers

assignUser · Apr 10, 2024