DataFusion filter on partition column doesn't work (when the physical schema ordering is different from the logical one)
Environment
Delta-rs version: 0.17.3
Binding: Rust
OS: Linux
Bug
What happened: The filter expression didn't return the expected rows. My table is relatively big, so I tried to construct a minimal test to reproduce it; see the code below. From what I see in the logs, my guess is:
- The delta scan is good: it successfully prunes irrelevant files.
- Per https://github.com/delta-io/delta-rs/pull/1071/files#diff-f3a4847c9506848f6f5bf021b5f10fb24602373580e58739bd2a2a24f9878e77R438 we use `Inexact` filter pushdown, so DataFusion applies the same filter again after the scan; however, the physical plan resolves the wrong column index (see the sketch after this list).
- I am not an expert on DataFusion or delta-rs, so I'll stop here. Thank you in advance for any help.
What you expected to happen: The filtered queries should return the same single matching row as the unfiltered scan (see the expected output below).
How to reproduce it:
I wrote a unit test to check it, but it seems I don't have permission to push it?
// Imports assumed for this snippet (written as a test inside delta-rs):
use std::sync::Arc;
use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use arrow::record_batch::RecordBatch;
use datafusion::assert_batches_sorted_eq;
use datafusion::prelude::{col, lit, SessionContext};

#[tokio::test]
async fn delta_scan_mixed_partition_order_and_filter() {
    // Physical write order (modified, id, value) differs from the logical
    // order DataFusion reports (value, modified, id).
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("modified", DataType::Utf8, true),
        Field::new("id", DataType::Utf8, true),
        Field::new("value", DataType::Int32, true),
    ]));
    // `get_delta_schema()` is an existing delta-rs test helper.
    let table = crate::DeltaOps::new_in_memory()
        .create()
        .with_columns(get_delta_schema().fields().clone())
        .with_partition_columns(["modified", "id"])
        .await
        .unwrap();
    assert_eq!(table.version(), 0);
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(StringArray::from(vec!["2021-02-01"])),
            Arc::new(StringArray::from(vec!["A"])),
            Arc::new(Int32Array::from(vec![1])),
        ],
    )
    .unwrap();
    // write some data
    let table = crate::DeltaOps(table)
        .write(vec![batch.clone()])
        .with_save_mode(crate::protocol::SaveMode::Append)
        .await
        .unwrap();
    let provider = Arc::new(table);
    let ctx = SessionContext::new();
    let df = ctx.read_table(provider).unwrap();

    // Unfiltered scan returns the row.
    let actual = df.clone().collect().await.unwrap();
    let expected = vec![
        "+-------+------------+----+",
        "| value | modified   | id |",
        "+-------+------------+----+",
        "| 1     | 2021-02-01 | A  |",
        "+-------+------------+----+",
    ];
    assert_batches_sorted_eq!(&expected, &actual);

    // Filter on a regular column works.
    let actual = df
        .clone()
        .filter(col("value").eq(lit(1)))
        .unwrap()
        .collect()
        .await
        .unwrap();
    assert_batches_sorted_eq!(&expected, &actual);

    // Filter on a partition column: this is the failing case.
    let actual = df
        .clone()
        .filter(col("id").eq(lit("A")))
        .unwrap()
        .collect()
        .await
        .unwrap();
    assert_batches_sorted_eq!(&expected, &actual);
}
More details:
expected:
[
    "+-------+------------+----+",
    "| value | modified   | id |",
    "+-------+------------+----+",
    "| 1     | 2021-02-01 | A  |",
    "+-------+------------+----+",
]
actual:
[
    "++",
    "++",
]
One more suggestion: couldn't we decide the filter pushdown flag dynamically?
pub enum TableProviderFilterPushDown {
    /// The expression cannot be used by the provider.
    Unsupported,
    /// The expression can be used to reduce the data retrieved,
    /// but the provider cannot guarantee it will omit all tuples that
    /// may be filtered. In this case, DataFusion will apply an additional
    /// `Filter` operation after the scan to ensure all rows are filtered correctly.
    Inexact,
    /// The provider **guarantees** that it will omit **all** tuples that are
    /// filtered by the filter expression. This is the fastest option, if available,
    /// as DataFusion will not apply additional filtering.
    Exact,
}
When the expression only references partition columns, we should return `Exact`.
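A sketch of what that could look like (my assumption, not the delta-rs implementation): classify each filter passed to `supports_filters_pushdown`, returning `Exact` when every column it references is a partition column. The `classify_filters` helper below is hypothetical, assumes DataFusion's `Expr::to_columns()` (available in the DataFusion versions used by delta-rs 0.17), and takes the table's partition columns as a plain slice.

use datafusion::error::Result as DataFusionResult;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};

// Hypothetical helper: pick the pushdown level for each filter based on
// whether it references only partition columns.
fn classify_filters(
    filters: &[&Expr],
    partition_columns: &[String],
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
    filters
        .iter()
        .map(|expr| {
            // Collect every column the expression references.
            let columns = expr.to_columns()?;
            Ok(
                if columns.iter().all(|c| partition_columns.contains(&c.name)) {
                    // Partition pruning already removed every non-matching
                    // row, so DataFusion can skip the post-scan Filter.
                    TableProviderFilterPushDown::Exact
                } else {
                    TableProviderFilterPushDown::Inexact
                },
            )
        })
        .collect()
}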
Thanks for taking the time to write a test @Veiasai! I'll take a look at this shortly.
hey, any updates?
@rtyler I have a local fix for this issue. I am not sure what the Delta protocol dictates, but in some of our test tables the partitioning columns appear in a different order in the JSON schema than in the partition-columns array. `_arrow_schema` builds the schema with an iterator, a chain, and two filters over the schema, while the rest of the code (e.g. `DeltaScanBuilder::build`) filters the partition columns out and then appends them explicitly in the order dictated by `partition_columns`.
This is the essence of my fix:
fn _arrow_schema(snapshot: &Snapshot, wrap_partitions: bool) -> DeltaResult<ArrowSchemaRef> {
    let meta = snapshot.metadata();
    let schema = meta.schema()?;
    let fields = schema
        .fields()
        .filter(|f| !meta.partition_columns.contains(&f.name().to_string()))
        .map(|f| f.try_into())
        .chain(
            // keep consistent order of partitioning columns
            meta.partition_columns.iter().map(|partition_col| {
                let f = schema.field(partition_col).unwrap();
                let field = Field::try_from(f)?;
                // ...
LMK if this is enough as a pointer, or if I should send a PR with this.
@rtyler I've sent a PR just in case https://github.com/delta-io/delta-rs/pull/2614
I'd be glad to add some tests if you point me at the correct suite or an example; I was looking for a test with more than one partitioning column and didn't find one.