
Fix TopK aggregation for UTF-8/Utf8View group keys and add safe fallback for unsupported string aggregates

Open kosiew opened this issue 4 weeks ago • 0 comments

Which issue does this PR close?

  • Closes #19219.

Rationale for this change

Queries that group by text columns and apply ORDER BY ... LIMIT to a MIN/MAX aggregate can currently fail with an execution error such as "Can't group type: Utf8View" when the TopK aggregation optimization is enabled. The same queries succeed without LIMIT, which indicates that the bug lies in the TopK-specific execution path rather than in the core aggregation logic.

This is particularly confusing for users because:

  • The failure only appears when the optimizer decides to use TopK aggregation.
  • Disabling datafusion.optimizer.enable_topk_aggregation works around the problem, but it gives up the optimization's performance benefit and relies on a configuration setting most users would not think to change.

The goal of this PR is to:

  • Ensure TopK aggregation correctly supports UTF-8 key types (including Utf8View) where possible.
  • Ensure TopK gracefully declines to optimize unsupported type combinations instead of panicking or returning confusing errors.
  • Preserve or improve performance for supported cases while restoring correctness for queries that previously errored.

What changes are included in this PR?

This PR makes the following changes:

1. Centralized TopK type compatibility checks

  • Introduces topk_types_supported(key_type: &DataType, value_type: &DataType) -> bool in datafusion_physical_plan::aggregates to express which key/value type combinations are supported by TopK aggregation.

  • TopK support is now defined in terms of two internal helpers:

    • is_supported_hash_key_type(&DataType) -> bool in topk::hash_table (grouping key type support), and
    • is_supported_heap_type(&DataType) -> bool in topk::heap (aggregate value type support).
  • Supported types include Arrow primitive numeric/temporal/decimal/interval types and UTF-8 string variants (Utf8, LargeUtf8, Utf8View).

  • Explicitly disallows unsupported combinations (for example, binary key types or non-primitive/non-string value types).
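A minimal sketch of how the three predicates could compose. It uses a hypothetical, trimmed-down DataType enum rather than arrow's real one (the real helpers match on arrow's DataType and cover many more variants); only the function names come from the PR, and the bodies are assumptions that follow the supported-type list above.

```rust
/// Hypothetical, simplified stand-in for arrow's DataType; only the
/// variants needed for this illustration are included.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int32,
    Int64,
    Float64,
    Date32,
    Decimal128(u8, i8),
    Utf8,
    LargeUtf8,
    Utf8View,
    Binary,
    List(Box<DataType>),
}

/// Primitive numeric/temporal/decimal types (a subset, for illustration).
fn is_primitive(t: &DataType) -> bool {
    matches!(
        t,
        DataType::Int32
            | DataType::Int64
            | DataType::Float64
            | DataType::Date32
            | DataType::Decimal128(_, _)
    )
}

/// The three UTF-8 string representations.
fn is_utf8(t: &DataType) -> bool {
    matches!(t, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View)
}

/// Grouping-key types the TopK hash table can handle (sketch).
fn is_supported_hash_key_type(t: &DataType) -> bool {
    is_primitive(t) || is_utf8(t)
}

/// Aggregate-value types the TopK heap can handle (sketch).
fn is_supported_heap_type(t: &DataType) -> bool {
    is_primitive(t) || is_utf8(t)
}

/// One check shared by the optimizer rule and the runtime stream selection.
pub fn topk_types_supported(key_type: &DataType, value_type: &DataType) -> bool {
    is_supported_hash_key_type(key_type) && is_supported_heap_type(value_type)
}
```

With this shape, topk_types_supported(&DataType::Utf8View, &DataType::Int64) is true, while a Binary key or a List value makes it false, so callers decline TopK instead of failing later.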

2. Extend TopK heap implementation to support string aggregate values

  • Adds a new StringHeap implementation of ArrowHeap that stores owned String values and uses lexicographic ordering for comparisons.

  • Supports all three UTF-8 string representations: Utf8, LargeUtf8, and Utf8View.

  • Introduces a small helper extract_string_value to consistently extract a String from an ArrayRef for these types.

  • Updates new_heap to:

    • Dispatch to StringHeap for UTF-8 value types.
    • Continue to use PrimitiveHeap for primitive types.
    • Return a clearer error message ("Unsupported TopK aggregate value type") for unsupported types instead of the previous generic grouping error.
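The heap side can be sketched as a bounded top-k structure that is generic over any Ord value, so owned Strings get lexicographic ordering for free. This is a sketch under assumptions: BoundedHeap, TopKHeap and ValueType are hypothetical names, the real StringHeap implements the ArrowHeap trait over Arrow arrays, and the real new_heap has a different signature. For DESC it keeps the limit largest values in a min-heap, so the worst retained value sits at the root and can be evicted in O(log k).

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// DESC keeps the `limit` largest values (root = smallest kept);
/// ASC keeps the `limit` smallest values (root = largest kept).
enum Inner<T: Ord> {
    Desc(BinaryHeap<Reverse<T>>),
    Asc(BinaryHeap<T>),
}

/// Hypothetical bounded top-k heap; with T = String it is lexicographic.
pub struct BoundedHeap<T: Ord> {
    limit: usize,
    inner: Inner<T>,
}

impl<T: Ord> BoundedHeap<T> {
    pub fn new(limit: usize, desc: bool) -> Self {
        let inner = if desc { Inner::Desc(BinaryHeap::new()) } else { Inner::Asc(BinaryHeap::new()) };
        Self { limit, inner }
    }

    /// Offer a value; when full, the worst retained value is evicted if `v` beats it.
    pub fn push(&mut self, v: T) {
        let limit = self.limit;
        if limit == 0 {
            return;
        }
        match &mut self.inner {
            Inner::Desc(h) => {
                if h.len() < limit {
                    h.push(Reverse(v));
                } else if h.peek().map_or(false, |Reverse(worst)| v > *worst) {
                    h.pop();
                    h.push(Reverse(v));
                }
            }
            Inner::Asc(h) => {
                if h.len() < limit {
                    h.push(v);
                } else if h.peek().map_or(false, |worst| v < *worst) {
                    h.pop();
                    h.push(v);
                }
            }
        }
    }

    /// Retained values, best first.
    pub fn into_sorted(self) -> Vec<T> {
        match self.inner {
            // Ascending in Reverse<T> means descending in T.
            Inner::Desc(h) => h.into_sorted_vec().into_iter().map(|Reverse(v)| v).collect(),
            Inner::Asc(h) => h.into_sorted_vec(),
        }
    }
}

/// Hypothetical subset of value types the dispatch might see.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ValueType { Int64, Utf8, LargeUtf8, Utf8View, Binary }

/// Hypothetical stand-in for the boxed heap new_heap returns.
#[allow(dead_code)]
pub enum TopKHeap {
    Primitive(BoundedHeap<i64>),
    Str(BoundedHeap<String>),
}

/// Sketch of the dispatch: primitive heap, string heap, or an explicit error.
pub fn new_heap(limit: usize, desc: bool, vt: ValueType) -> Result<TopKHeap, String> {
    match vt {
        ValueType::Int64 => Ok(TopKHeap::Primitive(BoundedHeap::new(limit, desc))),
        ValueType::Utf8 | ValueType::LargeUtf8 | ValueType::Utf8View => {
            Ok(TopKHeap::Str(BoundedHeap::new(limit, desc)))
        }
        other => Err(format!("Unsupported TopK aggregate value type: {other:?}")),
    }
}
```

Pushing "apple", "pear", "banana", "zebra" into a DESC heap with limit 2 retains "zebra" and "pear"; the same input with ASC retains "apple" and "banana".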

3. Reuse AggregateExec when pushing down LIMIT for TopK

  • Adds AggregateExec::with_new_limit(&self, limit: Option<usize>) -> Self, which clones an AggregateExec while overriding only the limit field.

  • Refactors the TopKAggregation optimizer rule to:

    • Use topk_types_supported to gate TopK-based rewrite decisions.
    • Use with_new_limit instead of manually reconstructing AggregateExec.
  • This reduces duplication, keeps the optimizer and execution layers consistent, and removes the risk of the optimizer silently dropping fields that are added to AggregateExec in the future.
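The with_new_limit pattern boils down to Rust's struct update syntax. The sketch below uses a hypothetical AggregateExec with only three fields (the real struct has many more); because ..self.clone() copies every field that is not named explicitly, a field added to the struct later is carried over without touching this method.

```rust
/// Hypothetical, heavily simplified stand-in for AggregateExec.
#[derive(Debug, Clone, PartialEq)]
pub struct AggregateExec {
    pub group_by: Vec<String>,
    pub aggr_expr: Vec<String>,
    pub limit: Option<usize>,
}

impl AggregateExec {
    /// Clone `self`, overriding only `limit`; all other fields are copied
    /// by the struct update syntax, including any added in the future.
    pub fn with_new_limit(&self, limit: Option<usize>) -> Self {
        Self { limit, ..self.clone() }
    }
}
```

The optimizer rule can then write agg.with_new_limit(Some(n)) instead of listing every constructor argument by hand.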

4. Safer TopK stream selection in AggregateExec

  • Extends AggregateExec's stream selection logic to consult a new helper:

    • GroupedTopKAggregateStream::can_use_topk(&AggregateExec) -> Result<bool>.
  • can_use_topk performs structural validation (single grouping key, single min/max aggregate, presence of a limit) and also validates types via topk_types_supported.

  • The TopK execution path is now only used when both of the following are true:

    • The query structure matches TopK expectations (min/max with a single group key and limit), and
    • The key and aggregate value types are supported by the underlying TopK data structures.
  • This ensures that if the optimizer ever fails to filter out an unsupported combination, the execution plan will still fall back to the standard aggregation path instead of attempting to construct a GroupedTopKAggregateStream that cannot handle the types.
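The runtime guard can be sketched as a pure function over a simplified plan description. AggregatePlan, AggregateExprInfo, AggFn, StreamKind and choose_stream are hypothetical; the real can_use_topk inspects an AggregateExec and its physical expressions. The sketch illustrates the ordering: structural checks first, then the shared type check, with any negative answer routed to the ordinary hash aggregation stream rather than to an error. The Result return type only mirrors the PR's Result<bool>; nothing here returns Err.

```rust
/// Hypothetical, simplified types standing in for AggregateExec's contents.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DataType { Int64, Utf8, Utf8View, Binary }

#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AggFn { Min, Max, Sum }

pub struct AggregateExprInfo {
    pub func: AggFn,
    pub value_type: DataType,
}

pub struct AggregatePlan {
    pub group_key_types: Vec<DataType>,
    pub aggregates: Vec<AggregateExprInfo>,
    pub limit: Option<usize>,
}

#[derive(Debug, PartialEq)]
pub enum StreamKind { GroupedTopK, GroupedHash }

/// Stand-in for the shared key/value type check.
fn topk_types_supported(key: &DataType, value: &DataType) -> bool {
    let ok = |t: &DataType| matches!(t, DataType::Int64 | DataType::Utf8 | DataType::Utf8View);
    ok(key) && ok(value)
}

/// Sketch of can_use_topk: structure first, then types.
pub fn can_use_topk(plan: &AggregatePlan) -> Result<bool, String> {
    if plan.limit.is_none() {
        return Ok(false); // TopK only makes sense with a limit
    }
    let [key_type] = plan.group_key_types.as_slice() else {
        return Ok(false); // exactly one grouping key
    };
    let [agg] = plan.aggregates.as_slice() else {
        return Ok(false); // exactly one aggregate
    };
    if !matches!(agg.func, AggFn::Min | AggFn::Max) {
        return Ok(false); // only MIN/MAX are order-compatible with eviction
    }
    Ok(topk_types_supported(key_type, &agg.value_type))
}

/// Unsupported plans fall back to the ordinary hash aggregation stream.
pub fn choose_stream(plan: &AggregatePlan) -> Result<StreamKind, String> {
    Ok(if can_use_topk(plan)? { StreamKind::GroupedTopK } else { StreamKind::GroupedHash })
}
```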

5. Improved hash-table type support for TopK keys

  • Introduces is_supported_hash_key_type in topk::hash_table and uses it to define which types can be used as grouping keys for TopK.
  • For now, this matches the previous behavior (primitive numeric/temporal/decimal/interval types plus UTF-8 variants) but centralizes the knowledge in a single function for reuse by both the optimizer and runtime.

6. Tests and regressions

This PR includes several new tests to validate behavior and prevent regressions:

Rust tests

  • utf8_grouping_min_max_limit_fallbacks in aggregate_statistics.rs:

    • Builds an in-memory table with a UTF-8 grouping key and both string and numeric value columns.

    • Verifies that:

      • A supported query (GROUP BY g, max(val_num) ORDER BY max(val_num) DESC LIMIT 1) returns the correct row and can use TopK.
      • An unsupported combination (GROUP BY g, max(val_str) ORDER BY max(val_str) DESC LIMIT 1) executes successfully but does not use GroupedTopKAggregateStream in the physical plan (falls back to standard aggregation without panicking).
    • Uses displayable to validate that GroupedTopKAggregateStream does not appear in the plan for the unsupported case.

  • New tests in topk::priority_map:

    • should_track_lexicographic_min_utf8_value
    • should_track_lexicographic_max_utf8_value_desc
    • should_track_large_utf8_values
    • should_track_utf8_view_values
    • These tests exercise lexicographic min/max behavior and verify that PriorityMap works with Utf8, LargeUtf8, and Utf8View value types in both ascending and descending modes.
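The behavior these priority_map tests exercise can be illustrated with a simplified map that keeps at most limit groups, each with its lexicographic MAX (DESC) or MIN (ASC) string. StringTopKMap is hypothetical: the real PriorityMap pairs a hash table with a heap, whereas this sketch finds the worst retained group with a linear scan. Evicting a group is safe for MIN/MAX because the retained threshold only improves over time, so a group that re-enters does so with a value better than anything it had before.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for PriorityMap.
pub struct StringTopKMap {
    limit: usize,
    desc: bool,
    groups: HashMap<String, String>,
}

impl StringTopKMap {
    pub fn new(limit: usize, desc: bool) -> Self {
        Self { limit, desc, groups: HashMap::new() }
    }

    /// True if `a` ranks strictly better than `b` (larger for DESC, smaller for ASC).
    fn better(&self, a: &str, b: &str) -> bool {
        if self.desc { a > b } else { a < b }
    }

    /// Feed one (group, value) row.
    pub fn insert(&mut self, group: &str, value: &str) {
        match self.groups.get(group).map(|cur| self.better(value, cur)) {
            // Known group with a better value: update its MIN/MAX.
            Some(true) => {
                self.groups.insert(group.to_string(), value.to_string());
            }
            // Known group, value not better: nothing to do.
            Some(false) => {}
            // Unknown group: admit it if there is room or it beats the worst group.
            None => self.insert_new(group, value),
        }
    }

    fn insert_new(&mut self, group: &str, value: &str) {
        if self.limit == 0 {
            return;
        }
        if self.groups.len() < self.limit {
            self.groups.insert(group.to_string(), value.to_string());
            return;
        }
        // Full: locate the worst retained group (linear scan in this sketch).
        let worst = if self.desc {
            self.groups.iter().min_by(|a, b| a.1.cmp(b.1))
        } else {
            self.groups.iter().max_by(|a, b| a.1.cmp(b.1))
        };
        let Some((worst_group, worst_value)) = worst.map(|(g, v)| (g.clone(), v.clone())) else {
            return;
        };
        if self.better(value, &worst_value) {
            self.groups.remove(&worst_group);
            self.groups.insert(group.to_string(), value.to_string());
        }
    }

    /// Retained (group, value) pairs, best value first.
    pub fn emit(&self) -> Vec<(String, String)> {
        let mut out: Vec<(String, String)> =
            self.groups.iter().map(|(g, v)| (g.clone(), v.clone())).collect();
        out.sort_by(|a, b| if self.desc { b.1.cmp(&a.1) } else { a.1.cmp(&b.1) });
        out
    }
}
```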

SQL logic tests

  • Extends aggregates_topk.slt with string-focused TopK scenarios:

    • Creates a string_topk table and a string_topk_view using arrow_cast(..., 'Utf8View') to exercise both regular Utf8 and Utf8View paths.
    • Verifies GROUP BY category, max(val) with ORDER BY max(val) DESC LIMIT 2 returns correct results for both table and view.
    • Asserts that the physical plan uses TopK on the aggregate (AggregateExec with lim=[2] and SortExec: TopK(fetch=2)).
  • Adds regression tests for string max over the traces.trace_id column with ORDER BY max_trace DESC LIMIT 2 to ensure schema stability and correct plan selection.

7. Error message polish

  • When a TopK heap cannot be constructed for a given value type, the error now reports "Unsupported TopK aggregate value type: ..." instead of the more confusing generic grouping error.
  • This should make it clearer to users when they hit a genuine type limitation versus a bug.

Are these changes tested?

Yes.

  • New unit tests have been added for:

    • GroupedTopKAggregateStream behavior with UTF-8 grouping and both numeric and string aggregate values.
    • PriorityMap handling of Utf8, LargeUtf8, and Utf8View value types and lexicographic min/max behavior.
  • New sqllogictest cases have been added to aggregates_topk.slt to cover:

    • GROUP BY + ORDER BY ... LIMIT with string keys and values.
    • Regression coverage for Utf8View-backed queries to prevent reintroducing the "Can't group type: Utf8View" error.
  • Existing test suites (unit tests and SQL logic tests) are expected to continue to pass, ensuring backwards compatibility for existing queries and plans.

Manual check with the DataFusion CLI (string grouping key and MAX over a string column, with and without LIMIT):

DataFusion CLI v51.0.0
> create table t(a text, b text) as values ('123', '4'), ('1', '3');
0 row(s) fetched.
Elapsed 0.094 seconds.

> select a, max(b) as y_axis from t group by a order by y_axis desc;
+-----+--------+
| a   | y_axis |
+-----+--------+
| 123 | 4      |
| 1   | 3      |
+-----+--------+
2 row(s) fetched.
Elapsed 0.046 seconds.

> select a, max(b) as y_axis from t group by a order by y_axis desc limit 1;
+-----+--------+
| a   | y_axis |
+-----+--------+
| 123 | 4      |
+-----+--------+
1 row(s) fetched.
Elapsed 0.020 seconds.

Are there any user-facing changes?

Yes, there are user-visible behavior changes, but no breaking changes to SQL semantics:

  • Queries that group by UTF-8 (including Utf8View) text columns and use ORDER BY on a MIN/MAX aggregate with LIMIT will now:

    • Execute successfully, and
    • Use the TopK aggregation optimization when the key and aggregate value types are supported.
  • Queries that involve unsupported TopK type combinations will:

    • Fall back to the standard aggregation path, and
    • Avoid panics or confusing "Can't group type" errors.
  • Error messages for unsupported TopK value types are more explicit ("Unsupported TopK aggregate value type: ...").

From an API surface perspective:

  • datafusion_physical_plan::aggregates now exposes topk_types_supported(&DataType, &DataType) -> bool as a helper for determining whether TopK can be used for a particular key/value type pair.
  • AggregateExec gains a new with_new_limit(&self, limit: Option<usize>) -> Self helper to cheaply clone with a different limit hint. This is additive and backwards-compatible.

No configuration changes are required for users. The existing datafusion.optimizer.enable_topk_aggregation setting continues to control whether TopK is considered at all; the main difference is that enabling it is now safe for UTF-8 grouping/query patterns that previously errored.

LLM-generated code disclosure

This PR includes LLM-generated code, tests, and comments. All LLM-generated content has been reviewed and tested.

kosiew, Dec 11 '25 15:12