velox
velox copied to clipboard
Enhance AggregationFuzzer to verify results against Spark
Description
Currently, Aggregation Fuzzer verifies results against DuckDB. However, not all functions are available in DuckDB and sometimes semantics don't match. It would be better to verify against Spark.
After several rounds of investigation, we would like to implement the SparkQueryRunner based on Spark Connect. In Spark 3.4, Spark Connect introduced a decoupled client-server architecture for Spark that allows remote connectivity to Spark clusters as described in spark-connect-overview. From the client perspective, Spark Connect mostly behaves as any other gRPC client, which is polyglot and and cross-platforms. Protocols used by Spark Connect are proto files defined in https://github.com/apache/spark/tree/master/connector/connect/common/src/main/protobuf/spark/connect.
Start Spark Connect server
Firstly, we need to deploy an executable Spark, and download spark-connect_2.12-3.5.1.jar from https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/.
Then in $SPARK_HOME, we can start Spark Connect server with below command.
./sbin/start-connect-server.sh --jars $SPARK_HOME/jars/spark-connect_2.12-3.5.1.jar
If the sever is started successfully, we can see log as below.
INFO SparkConnectServer: Spark Connect server started at: 0:0:0:0:0:0:0:0%0:15002
Work with Spark Connect to submit query and get the result
Below diagram illustrates how query is submitted from native to Spark through Spark Connect for execution. Firstly, we create a protobuf message ExecutePlanRequest from a string query based on defined protocols. Then we submit the message to a gRPC API ExecutePlan for execution, and result can be read from its response. Since Spark stores data in Arrow IPC stream format, arrow::ipc::RecordBatchReader is used to read bytes as Arrow RecordBatch. By converting Arrow RecordBatch as Velox vector, we can compare the results of Spark and Velox.
We have implemented a prototype SparkClient.cpp and verified its functionality. It could submit a query to Spark and fetch the results back to native.
https://github.com/facebookincubator/velox/issues/6595#issuecomment-1829141664
Let's track the updates of SparkQueryRunner here. cc: @mbasmanova @zhztheplayer
CC: @kgpai @kagamiori
CC: @amitkdutta
Hi @mbasmanova, I have updated our recent updates in this issue. Your feedback is appreciated, thanks. cc: @FelixYBW
@rui-mo Rui, this is great. Looks like you have a working prototype. What would be the next steps towards "productizing" this?
CC: @kgpai @assignUser @duanmeng
@mbasmanova If this approach makes sense in Velox, we plan to introduce the Spark client as well as the Spark connect protocols into Velox, and start to enhance the aggregation fuzzer based on them. To test against Spark, we would like to know where we can set-up the Spark environment needed for execution, thanks.
where we can set-up the Spark environment needed for execution
What's the requirement for this? I assume this would be more inline of an ephemeral test setup (e.g. how we use hadoop on the adapters test) and not a permanent production grade thing?
We could add spark, spark connect and deps to the adapers docker image. I think it's also possible to run additional containers as serviced in a github action job but I haven't used that feature yet.
I assume this would be more inline of an ephemeral test setup
@assignUser Yes, you are right. Could you provide us a reference on how hadoop or presto is deployed in the test? I assume we can deploy spark in a similar way. Thank you.
It looks like the tests themselves start hadoop/azurite for example see connectors/hive/storage_adapters/hive/tests
If you show me how you setup spark for your poc tests (e.g. your bash script) I can help with getting that setup in an action.
Here is the service container reference, sounds useful https://docs.github.com/en/actions/using-containerized-services/about-service-containers