
fix: filter pushdown for nested fields

Open · a10y opened this issue 2 weeks ago · 13 comments

In #5295, we accidentally broke nested filter pushdown. The issue is that FileSource::try_pushdown_filters appears to be meant to evaluate against the whole file schema, rather than any projected schema. As an example, the GitHub Archive benchmark dataset has the following query, which should trivially push down and be pruned, executing in about 30ms:

SELECT COUNT(*) from events WHERE payload.ref = 'refs/head/main'

However, after this change, pushdown of this field started failing, pushing query time up by about 100x. The root cause is that the old logic attempted to apply the file schema to the source_expr directly.

Concretely, for the gharchive query, the whole expression is something like:

BinaryExpr {
    lhs: GetField {
        source_expr: Column { name: "payload", index: 0 },
        field_expr: Literal { value: "ref" }
    },
    rhs: Literal { value: "refs/head/main" },
    operator: Eq
}

The issue is that column index 0 is wrong relative to the whole file schema. Instead, we need to recursively verify that the source_expr is a valid chain of Column and GetField expressions that resolves properly against the schema.

Note that we already do this when checking whether a standalone Column expression can be pushed down:

    } else if let Some(col) = expr.downcast_ref::<df_expr::Column>() {
        schema
            .field_with_name(col.name())
            .ok()
            .is_some_and(|field| supported_data_types(field.data_type()))
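
For reference, a minimal sketch of what the recursive check could look like, written against a simplified expression enum that mirrors the tree above rather than the actual DataFusion physical expression types (the enum and function names here are illustrative, not the real API):

    use arrow_schema::{DataType, Schema};

    /// Simplified stand-in for the physical expression tree shown above.
    enum Expr {
        Column { name: String },
        GetField { source_expr: Box<Expr>, field: String },
    }

    /// Walk a GetField chain down to its root Column, resolving each step
    /// against the schema. Returns the leaf data type if every step resolves,
    /// or None if any field is missing (in which case pushdown is rejected).
    fn resolve_nested_field<'a>(expr: &Expr, schema: &'a Schema) -> Option<&'a DataType> {
        match expr {
            Expr::Column { name } => schema.field_with_name(name).ok().map(|f| f.data_type()),
            Expr::GetField { source_expr, field } => {
                // A GetField is only valid if its source resolves to a struct
                // that actually contains the requested child field.
                match resolve_nested_field(source_expr, schema)? {
                    DataType::Struct(children) => children
                        .iter()
                        .find(|f| f.name() == field.as_str())
                        .map(|f| f.data_type()),
                    _ => None,
                }
            }
        }
    }

For the gharchive expression above, this resolves payload against the top-level schema and then ref against the payload struct's children, rejecting pushdown if either step fails.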

a10y · Nov 19 '25 19:11

GH Archive query 0 before: 3 seconds


After: 30ms


a10y · Nov 19 '25 19:11

Codecov Report

Patch coverage is 96.71533% with 9 lines in your changes missing coverage. Please review. Project coverage is 85.50%. Comparing base (fe4c81b) to head (17a6089).

Files with missing lines                     Patch %   Lines missing
vortex-datafusion/src/convert/exprs.rs       92.59%    6
vortex-datafusion/src/persistent/opener.rs   98.54%    2
vortex-datafusion/src/persistent/source.rs   95.00%    1

codecov[bot] · Nov 19 '25 19:11

Actually, I'm not sure why this check was added. AFAICT, attempts to create a GetFieldFunction should fail at planning time, before we even try to push down any filter expressions to the source.


a10y · Nov 19 '25 21:11

I'm actually fairly confident that we just need to validate that the ScalarFunction being pushed down to us is GetFieldFunc and let DF do the rest.

a10y · Nov 19 '25 21:11

The case I found while reading the parquet logic for this is that there might be a constant synthetic column created, and you might have a filter on it that expects to see that column. There might be test cases for this in DataFusion's parquet code.

robert3005 · Nov 19 '25 22:11

The reason I added this check was that I was seeing getfield fail unexpectedly at execution time: the table schema (the merged schema over all files) does have the field, so DF planning works fine (as it should), but a specific file does not. I do think we need to check field existence.

asubiotto · Nov 20 '25 09:11

I was seeing getfield fail at execution time unexpectedly because the table schema (merged schema over all files) does have the field so the df planning works fine (as it should), but a specific file does not.

That makes sense; I wasn't thinking about schema evolution.

The Source reports what can be pushed down, and it has access to the table schema but doesn't know the individual file schemas. So, I think we should be adapting the predicate in the FileOpener instead of the FileSource.

I've also noticed that some of these APIs have changed in DF 51 so I can double-check that today.

a10y · Nov 20 '25 12:11

Ahh, that makes sense. We already do pushdown checks in the opener against the file schema, so can_be_pushed_down should return true in try_pushdown_filters at the source level against the table schema, but each individual opener should check field existence in the file schema. I think this should basically already work; can_be_pushed_down is called at both levels.
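
Roughly, the two checkpoints could look like this, reusing resolve_nested_field from the sketch earlier in the thread (the function names are illustrative, not the actual vortex-datafusion API):

    /// Shared helper: does this expression resolve against the given schema?
    /// (The real check would also verify the data type is supported.)
    fn can_be_pushed_down(expr: &Expr, schema: &Schema) -> bool {
        resolve_nested_field(expr, schema).is_some()
    }

    /// Source level: FileSource::try_pushdown_filters only sees the table
    /// schema (the merged schema over all files), so it decides against that.
    fn source_supports(filters: &[Expr], table_schema: &Schema) -> Vec<bool> {
        filters.iter().map(|f| can_be_pushed_down(f, table_schema)).collect()
    }

    /// Opener level: each FileOpener re-checks against its own file's schema,
    /// since schema evolution means a field present in the table schema may
    /// be missing from this particular file.
    fn opener_filters<'a>(filters: &'a [Expr], file_schema: &Schema) -> Vec<&'a Expr> {
        filters.iter().filter(|f| can_be_pushed_down(f, file_schema)).collect()
    }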

asubiotto · Nov 20 '25 13:11

https://github.com/vortex-data/vortex/blob/879a53baab7b2a4f72e928fba3cbf0fb92eb989a/vortex-datafusion/src/persistent/opener.rs#L297-L308

I think we should error here instead of dropping the predicate silently after we had told DF that we're going to handle it.

By my reading of DF, when we report PushDownPredicate::supported back up to DF, it assumes that the filter is wholly executed by the DataSource. Here's the Filter node handling for reference: https://github.com/apache/datafusion/blob/f17cc09fb839431b469e7c707364c1cf99042650/datafusion/physical-plan/src/filter.rs#L514

I think once we add some protection there, this should be gtg.
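
A sketch of the kind of protection I mean, with hypothetical names since the opener code is only linked above: if a predicate we already reported as supported can't be converted for a particular file, surface an error instead of quietly falling back to an unfiltered scan.

    /// Hypothetical guard: once a filter has been reported as supported during
    /// pushdown, failing to convert it for a file is a bug we want to see,
    /// not something to paper over by scanning unfiltered.
    fn require_converted<P: std::fmt::Debug, E>(
        original: &P,
        converted: Option<E>,
    ) -> Result<E, String> {
        converted.ok_or_else(|| {
            format!(
                "predicate {original:?} was reported as supported during pushdown \
                 but could not be converted for this file"
            )
        })
    }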

a10y · Nov 20 '25 18:11

CodSpeed Performance Report

Merging #5406 will improve performance by 17.18%

Comparing aduffy/filter-pushdown-fix (373a365) with develop (fe34efa)

Summary

⚡ 5 improvements
✅ 1473 untouched
⏩ 235 skipped[^skipped]

Benchmarks breakdown

Benchmark                    BASE       HEAD       Change
slice_arrow_buffer[1024]     397.8 ns   339.4 ns   +17.18%
slice_arrow_buffer[128]      397.8 ns   339.4 ns   +17.18%
slice_arrow_buffer[16384]    397.8 ns   339.4 ns   +17.18%
slice_arrow_buffer[2048]     397.8 ns   339.4 ns   +17.18%
slice_arrow_buffer[65536]    397.8 ns   339.4 ns   +17.18%
[^skipped]: 235 benchmarks were skipped, so the baseline results were used instead.

codspeed-hq[bot] · Nov 20 '25 21:11

I had a more general message on discord:

One other thing I'd like to add is that IIRC, if the file source returns that it cannot push down a filter, there is currently no way to communicate that filter to the vortex opener. This is currently done after a successful try_pushdown_filters by setting source.vortex_predicate. Parquet has a with_predicate option so whoever's creating a data source can independently pass down a filter that will be best-effort applied to the scan layer with no guarantees to readers.

Mis-applying filters on missing columns filled in by the schema adapter is a more general concern, so I think short-term we should just do what everyone else does. We just need to make sure that even if we return that we can't push down filters at the source level, we still apply these in the opener.

But related to this PR is the opener erroring out on the predicate. I think it should not error even if the expression is on a missing column.

asubiotto · Nov 21 '25 12:11

Deploying vortex-bench with Cloudflare Pages

Latest commit: 0959703
Status: ✅  Deploy successful!
Preview URL: https://3e31eebe.vortex-93b.pages.dev
Branch Preview URL: https://aduffy-filter-pushdown-fix.vortex-93b.pages.dev


Here's the GHArchive query SELECT * FROM events WHERE payload.ref = 'main' with the current state of this branch: https://share.firefox.dev/481IcOj

Post-filtering the string match in DF is actually a trivial amount of the overall runtime (0.3%)


The bigger problem is that when we tell DF that we can't push the filter, it prompts us to return a projection of payload, so it forces us to read and decode the entire payload column and all of its nested fields. So instead of getting a 10x speedup it's more like a 35% speedup. boo!

a10y · Nov 25 '25 22:11