incubator-graphar
[Doc][Improvement] Performance evaluation of filter pushdown
Is your feature request related to a problem? Please describe.
Now that GraphAr already supports property-based filter pushdown (https://github.com/alibaba/GraphAr/pull/221), we need to evaluate the performance improvement.
Describe the solution you'd like
- Test overview: Use `VertexReader` and `EdgeReader` to load all vertices and edges of ldbc-sf100 with filter conditions applied, and compare the performance with the push-down options turned on and off.
- Filter conditions:
  - Select the firstName, lastName and gender columns: `select("firstName", "lastName", "gender")`
  - Filter records with female gender: `filter(col("gender") === "female")`
- Push-down options:
  - `spark.sql.parquet.filterPushdown`
  - `spark.sql.optimizer.nestedSchemaPruning.enabled`
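Both options are ordinary Spark SQL configs, so besides being passed at session build time (as in the snippet below) they can also be toggled on a live session between runs. A minimal sketch, assuming `spark` is an existing `SparkSession`:

```scala
// Toggle the push-down options on an existing SparkSession,
// then rerun the read with both set to "false" for comparison.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
```

This avoids restarting the application between the on/off measurements, though a fresh session per run gives a cleaner comparison of memory load.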
- Expected results:
  - The Parquet push-down logs are printed.
  - Memory load: reduced.
  - CPU load: reduced.
  - Execution time: not much change.
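As a rough sketch of how the first expected result can be checked: the physical plan printed by `explain()` should show the predicate in the Parquet scan's `PushedFilters` entry when the option is on. The DataFrame name `df` is an assumption here (standing in for a reader's output), and the exact plan text varies with the Spark version:

```scala
import org.apache.spark.sql.functions.col

// Sketch: verifying that the filter was pushed into the Parquet scan.
// `df` stands for a DataFrame produced by VertexReader (an assumption).
val filtered = df
  .select("firstName", "lastName", "gender")
  .filter(col("gender") === "female")
filtered.explain()
// With spark.sql.parquet.filterPushdown=true, the scan node should list
// something like: PushedFilters: [IsNotNull(gender), EqualTo(gender,female)]
// With the option off, PushedFilters stays empty and the predicate is
// evaluated only in a separate Filter node after the scan.
```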
Describe alternatives you've considered
None.
Additional context
Test code snippet:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, lit}
// GraphAr Spark library imports (package names may differ across versions)
import com.alibaba.graphar.{AdjListType, EdgeInfo, VertexInfo}
import com.alibaba.graphar.reader.{EdgeReader, VertexReader}

object GraphArPushDown {
  def main(args: Array[String]): Unit = {
    // args(0): "true"/"false" to toggle the push-down options
    // args(1): prefix of the GraphAr data directory
    val spark = SparkSession
      .builder()
      .appName("read GraphAr with(out) filter pushdown")
      .config("spark.sql.parquet.filterPushdown", args(0))
      .config("spark.sql.optimizer.nestedSchemaPruning.enabled", args(0))
      .master("local[*]")
      .getOrCreate()
    val start = System.nanoTime()

    // construct the vertex information
    val prefix = args(1)
    val vertex_yaml = prefix + "person.vertex.yml"
    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark)
    // construct the vertex reader
    val vertex_reader = new VertexReader(prefix, vertex_info, spark)
    // test reading the number of vertices
    assert(vertex_reader.readVerticesNumber() == 448626)
    val vertex_pg = vertex_info.getPropertyGroup("id")

    // widen the schema with many derived columns so that column pruning
    // has a measurable effect
    var newColumns = (0 to 50000).map(i => {
      val columnName = s"gender$i"
      concat(
        col("firstName"),
        lit("-"),
        col("lastName"),
        lit("-"),
        col("gender")
      ).as(columnName)
    })
    newColumns +:= col("firstName")
    newColumns +:= col("lastName")
    newColumns +:= col("gender")

    val vertex_df = vertex_reader
      .readVertexPropertyGroup(vertex_pg, addIndex = false)
      .select(newColumns: _*)
      .filter(col("gender") === "male")
    vertex_df.explain()
    assert(vertex_df.count() == 223843)

    // construct the edge information
    val edge_yaml = prefix + "person_knows_person.yml"
    val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml, spark)
    // construct the edge reader
    var adj_list_type = AdjListType.ordered_by_source
    var edge_reader = new EdgeReader(prefix, edge_info, adj_list_type, spark)
    assert(edge_reader.readEdgesNumber() == 19941198)
    var edge_df = edge_reader.readEdges(false)
    assert(edge_df.count() == 19941198)

    adj_list_type = AdjListType.ordered_by_dest
    edge_reader = new EdgeReader(prefix, edge_info, adj_list_type, spark)
    assert(edge_reader.readEdgesNumber() == 19941198)
    edge_df = edge_reader.readEdges(false)
    assert(edge_df.count() == 19941198)

    println(s"execution time: ${(System.nanoTime() - start) / 1e9} s")
  }
}
```
Hey, @lixueclaire @acezen. Could you review the test plan? Please point it out if there is anything missing. :) Also, is there any Aliyun ECS instance that I can use for the evaluation? I currently only have a MacBook Air M1.
I'm sorry that I've been busy applying for jobs recently, but I will continue to push forward with this task.
Thanks for your plan; it looks good to me. You could refer to this page to report the results. By the way, if sf-100 is too large to fit on your MacBook, @acezen will provide a dataset for sf-30.
The dataset for performance evaluation has been uploaded here: ldbc-sf30