datafusion icon indicating copy to clipboard operation
datafusion copied to clipboard

Introduce ProjectionMask To Allow Nested Projection Pushdown

Open tustvold opened this issue 3 years ago • 11 comments

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently projection indices are pushed down to scans as Vec<usize>. This creates some ambiguities:

  • How to handle out of order or repeated indices - https://github.com/apache/arrow-datafusion/issues/2543
  • How to handle nested types - https://github.com/apache/arrow-datafusion/issues/2453

To demonstrate how these problems intertwine, consider the case of

Struct {
   first: Struct {
      a: Integer,
      b: Integer,
   },
   second: Struct {
      c: Integer
   }
}

If I project ["first.a", "second.c", "first.b"] what is the resulting schema?

Describe the solution you'd like

I would like to propose we instead pushdown a leaf column mask, where leaf columns are fields with no children, as enumerated by a depth-first-scan of the schema tree. This avoids any ordering ambiguities, whilst also being relatively straightforward to implement and interpret.

I recently introduced a similar concept to the parquet reader https://github.com/apache/arrow-rs/pull/1716. We could theoretically lift this into arrow-rs, potentially adding support to RecordBatch for it, and then use this in DataFusion.

Describe alternatives you've considered

We could not support nested pushdown

Additional context

Currently pushdown for nested types in ParquetExec is broken - https://github.com/apache/arrow-datafusion/issues/2453

Thoughts @andygrove @alamb

tustvold avatar May 21 '22 10:05 tustvold

I like the idea of a single Projection structure that understand how to select nested types.

We could theoretically lift this into arrow-rs

As long as we can maintain independence for parquet (aka one can select subsets of parquet files without having to use Arrow) that is great.

If we don't get the Projection into arrow-rs in time, we could also copy/paste the Projection code into DataFusion and provide a conversion back and forth during the interim period

alamb avatar May 23 '22 11:05 alamb

It gets little more indepth if the struct has members which are list of struct again. How will the schema pushdown will happen to them? Eg:

Struct Employee {
  name: String,
  departments: Vec<Department>,
}
Struct Department {
    id: u32,
    name: String,
}

How will the proection appear for this?

kesavkolla avatar May 24 '22 14:05 kesavkolla

How will the projection appear for this?

In this case you have roots of

  1. name
  2. departments

And leaves of

  1. name
  2. department.id
  3. department.name

So if say you projected with leaves [1, 3] you would get a schema of

Struct Employee {
  name: String,
  departments: Vec<ProjectedDepartment>
}
struct ProjectedDepartment {
  name: String
}

Or in terms of the arrow datatype

DataType::Struct(vec![
  Field("name", DataType::Utf8, <nullable>),
  Field("departments", DataType::List(Box::new(
    Field::new("element", DataType::Struct(vec![
      Field::new("name", DataType::Utf8, <nullable>)
    ]), <nullable>)
  )), <nullable>)
])

Does that make sense?

tustvold avatar May 24 '22 14:05 tustvold

My question is will the projection goes to nested levels?

Eg: employee.departments[*].name, employee.departments[0].name

kesavkolla avatar May 25 '22 05:05 kesavkolla

At least in the case of arrow and parquet, list indexing is more of a filter than a cheap projection - it requires rewriting the buffers.

Perhaps we could do the common case as described here, and potentially add list index pushdown as an extension once we have workloads that stand to benefit?? Or did you have a particular suggestion on how to handle it?

tustvold avatar May 25 '22 05:05 tustvold

I agree with your thought process. List indexing is not push down as much as the column itself. I am guessing at some point datafusion will have support at the SQL level for the list indexing.

kesavkolla avatar May 25 '22 07:05 kesavkolla

@tustvold is this something you're working on already?

nl5887 avatar Jun 13 '22 11:06 nl5887

This PR is slightly related, as predicates aren't being pushed down currently:

https://github.com/apache/arrow-datafusion/pull/2724

nl5887 avatar Jun 13 '22 11:06 nl5887

@nl5887 I am not currently working on this, but would be happy to assist if you or someone else wanted to pick it up 😀

tustvold avatar Jun 13 '22 11:06 tustvold

@tustvold I think quite a lot needs to be changed. Most of the code will do column selection by name, whereas the relevant data of the sql parsing (the indexed field structure) is lost.

Correct me if I'm wrong, but I think the datafusion core column needs to be converted to an enum consisting of Column and IndexedField. The retrieval from the DFSchema needs to be done using the column itself instead of (qualified) name, and the required_columns shouldn't be derived from the output schema, but from the plan itself.

Probably a lot more needs to be done, but this is necessary to be able to be able to push down the projections.

Looking forward to your thoughts!

nl5887 avatar Jun 24 '22 21:06 nl5887

but I think the datafusion core column needs to be converted to an enum consisting of Column and IndexedField

As described above, list index pushdown is likely to yield limited benefits for any of the formats we currently support. As such I don't think we need to support it in projection pushdown as a first step.

whereas the relevant data of the sql parsing (the indexed field structure) is lost.

I'm not sure I follow what you mean, I would have expected list indexing to just be a different type of PhysicalExpr without the need to leak into the DFSchema at all?

tustvold avatar Jun 28 '22 15:06 tustvold