incubator-graphar icon indicating copy to clipboard operation
incubator-graphar copied to clipboard

[Doc][Improvement] Performance evaluation of filter pushdown

Open Ziy1-Tan opened this issue 11 months ago • 4 comments

Is your feature request related to a problem? Please describe. Now that GraphAr already supports property-based filter pushdown https://github.com/alibaba/GraphAr/pull/221, we need to evaluate the performance improvement.

Describe the solution you'd like

  • Test Overview: Use VertexReader and EdgeReader to load all vertices and edges of ldbc-sf100 with filter conditions at the same time, and compare the performance when the push-down options are turned on and off.
  • Filter conditions:
    • Select firstName, lastName and gender columns: select("firstName", "lastName", "gender");
    • Filter records with female gender: filter(col("gender") === "female");
  • push-down options:
    • spark.sql.parquet.filterPushdown
    • spark.sql.optimizer.nestedSchemaPruning.enabled
  • expected result:
    • The logs of parquet push down are printed.
    • Memory load: reduced;
    • CPU load: reduced;
    • Execution time: not much change.

Describe alternatives you've considered None.

Additional context Test code snippet:

/**
 * Benchmark entry point for evaluating GraphAr filter pushdown.
 *
 * Reads all person vertices and person_knows_person edges of ldbc-sf100,
 * applying a column selection plus a gender filter, and prints the total
 * wall-clock time so runs with pushdown enabled/disabled can be compared.
 *
 * args(0): "true"/"false" — value for both pushdown-related SQL options.
 * args(1): GraphAr data prefix (directory containing the YAML metadata).
 */
object GraphArPushDown {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("read GraphAr with(out) filter pushdown")
      .config("spark.sql.parquet.filterPushdown", args(0))
      .config("spark.sql.optimizer.nestedSchemaPruning.enabled", args(0))
      .master("local[*]")
      .getOrCreate()

    val startNanos = System.nanoTime()

    // Vertex side: load the person vertex metadata and build a reader.
    val prefix = args(1)
    val vertexInfo = VertexInfo.loadVertexInfo(prefix + "person.vertex.yml", spark)
    val vertexReader = new VertexReader(prefix, vertexInfo, spark)

    // Sanity-check the expected vertex count for ldbc-sf100.
    assert(vertexReader.readVerticesNumber() == 448626)
    val propertyGroup = vertexInfo.getPropertyGroup("id")

    // Build 50001 derived columns (firstName-lastName-gender concatenations)
    // to stress schema pruning, then prepend the three raw columns so the
    // final projection is: gender, lastName, firstName, gender0..gender50000.
    val derivedColumns = (0 to 50000).map { i =>
      concat(
        col("firstName"),
        lit("-"),
        col("lastName"),
        lit("-"),
        col("gender")
      ).as(s"gender$i")
    }
    val projection =
      col("gender") +: col("lastName") +: col("firstName") +: derivedColumns

    val vertexDf = vertexReader
      .readVertexPropertyGroup(propertyGroup, addIndex = false)
      .select(projection: _*)
      .filter(col("gender") === "male")
    vertexDf.explain() // inspect the physical plan for PushedFilters entries
    assert(vertexDf.count() == 223843)

    // Edge side: read person_knows_person under both adjacency orderings
    // and verify the expected edge count each time.
    val edgeInfo = EdgeInfo.loadEdgeInfo(prefix + "person_knows_person.yml", spark)
    for (adjListType <- Seq(AdjListType.ordered_by_source, AdjListType.ordered_by_dest)) {
      val edgeReader = new EdgeReader(prefix, edgeInfo, adjListType, spark)
      assert(edgeReader.readEdgesNumber() == 19941198)
      assert(edgeReader.readEdges(false).count() == 19941198)
    }

    println(s"execution time: ${(System.nanoTime() - startNanos) / 1e9} s")
  }
}

Ziy1-Tan avatar Sep 26 '23 03:09 Ziy1-Tan