Fix TopK aggregation for UTF-8/Utf8View group keys and add safe fallback for unsupported string aggregates
Which issue does this PR close?
- Closes #19219.
Rationale for this change
Queries that group by text columns and apply an ORDER BY ... LIMIT can currently fail with an execution error such as Can't group type: Utf8View when the TopK aggregation optimization is enabled. The same queries work without LIMIT, which indicates the issue is in the TopK-specific execution path rather than in the core aggregation logic.
This is particularly confusing for users because:
- The failure only appears when the optimizer decides to use TopK aggregation.
- Disabling `datafusion.optimizer.enable_topk_aggregation` works around the problem, but at the cost of performance and with non-obvious configuration.
The goal of this PR is to:
- Ensure TopK aggregation correctly supports UTF-8 key types (including `Utf8View`) where possible.
- Ensure TopK gracefully declines to optimize unsupported type combinations instead of panicking or returning confusing errors.
- Preserve or improve performance for supported cases while restoring correctness for queries that previously errored.
What changes are included in this PR?
This PR makes the following changes:
1. Centralized TopK type compatibility checks
- Introduces `topk_types_supported(key_type: &DataType, value_type: &DataType) -> bool` in `datafusion_physical_plan::aggregates` to express which key/value type combinations are supported by TopK aggregation.
- TopK support is now defined in terms of two internal helpers:
  - `is_supported_hash_key_type(&DataType) -> bool` in `topk::hash_table` (grouping key type support), and
  - `is_supported_heap_type(&DataType) -> bool` in `topk::heap` (aggregate value type support).
- Supported types include Arrow primitive numeric/temporal/decimal/interval types and UTF-8 string variants (`Utf8`, `LargeUtf8`, `Utf8View`).
- Explicitly disallows unsupported combinations (for example, binary key types or non-primitive/non-string value types).
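The composition described above can be sketched roughly as follows. This is an illustration only, not the PR's code: the `DataType` enum here is a tiny stand-in for Arrow's `DataType`, and the bodies of the two helpers are assumptions that cover just a handful of variants.

```rust
/// Simplified stand-in for Arrow's `DataType` (assumption: only a few
/// variants are modelled; the real enum is much larger).
#[derive(Debug, Clone, PartialEq)]
#[allow(dead_code)]
enum DataType {
    Int64,
    Float64,
    Utf8,
    LargeUtf8,
    Utf8View,
    Binary,
    List,
}

/// Hypothetical body: types usable as a TopK grouping key.
fn is_supported_hash_key_type(t: &DataType) -> bool {
    matches!(
        t,
        DataType::Int64
            | DataType::Float64
            | DataType::Utf8
            | DataType::LargeUtf8
            | DataType::Utf8View
    )
}

/// Hypothetical body: types usable as a TopK aggregate value.
fn is_supported_heap_type(t: &DataType) -> bool {
    matches!(
        t,
        DataType::Int64
            | DataType::Float64
            | DataType::Utf8
            | DataType::LargeUtf8
            | DataType::Utf8View
    )
}

/// TopK applies only when both the key and the value type are supported.
fn topk_types_supported(key_type: &DataType, value_type: &DataType) -> bool {
    is_supported_hash_key_type(key_type) && is_supported_heap_type(value_type)
}
```

With this shape, a call such as `topk_types_supported(&DataType::Binary, &DataType::Int64)` returns `false`, so a caller can decline the optimization up front instead of failing later during execution.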
2. Extend TopK heap implementation to support string aggregate values
- Adds a new `StringHeap` implementation of `ArrowHeap` that stores owned `String` values and uses lexicographic ordering for comparisons.
- Supports all three UTF-8 string representations: `Utf8`, `LargeUtf8`, and `Utf8View`.
- Introduces a small helper `extract_string_value` to consistently extract a `String` from an `ArrayRef` for these types.
- Updates `new_heap` to:
  - Dispatch to `StringHeap` for UTF-8 value types.
  - Continue to use `PrimitiveHeap` for primitive types.
  - Return a clearer error message (`"Unsupported TopK aggregate value type"`) for unsupported types instead of the previous generic grouping error.
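To illustrate what a bounded heap over owned strings with lexicographic ordering looks like, here is a minimal standalone sketch. It is not the PR's `StringHeap` and does not implement `ArrowHeap`; the `TopStrings` type and its methods are hypothetical names built only on the standard library.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical helper: keeps the `limit` lexicographically largest strings.
struct TopStrings {
    limit: usize,
    // `Reverse` turns the max-heap into a min-heap, so the smallest retained
    // value sits on top and is the first candidate for eviction.
    heap: BinaryHeap<Reverse<String>>,
}

impl TopStrings {
    fn new(limit: usize) -> Self {
        Self { limit, heap: BinaryHeap::with_capacity(limit) }
    }

    /// Offers a value; it is stored as an owned `String` only if it belongs
    /// in the current top `limit`.
    fn insert(&mut self, value: &str) {
        if self.limit == 0 {
            return;
        }
        if self.heap.len() < self.limit {
            self.heap.push(Reverse(value.to_owned()));
            return;
        }
        // Heap is full: replace the current minimum only if `value` is larger.
        let replace = match self.heap.peek() {
            Some(Reverse(min)) => value > min.as_str(),
            None => false,
        };
        if replace {
            self.heap.pop();
            self.heap.push(Reverse(value.to_owned()));
        }
    }

    /// Returns the retained values, largest first.
    fn into_sorted_desc(self) -> Vec<String> {
        let mut out: Vec<String> = self.heap.into_iter().map(|Reverse(s)| s).collect();
        out.sort_unstable_by(|a, b| b.cmp(a));
        out
    }
}
```

Rust compares strings byte by byte, so `"10"` sorts before `"9"`. That is the ordering a lexicographic `max` over a text column produces, which is worth keeping in mind when numeric-looking values are stored as strings.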
3. Reuse AggregateExec when pushing down LIMIT for TopK
- Adds `AggregateExec::with_new_limit(&self, limit: Option<usize>) -> Self`, which clones an `AggregateExec` while overriding only the `limit` field.
- Refactors the `TopKAggregation` optimizer rule to:
  - Use `topk_types_supported` to gate TopK-based rewrite decisions.
  - Use `with_new_limit` instead of manually reconstructing `AggregateExec`.
- This reduces duplication, keeps the optimizer and execution layers consistent, and avoids the risk of forgetting future `AggregateExec` fields in the optimizer code.
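The clone-and-override pattern behind `with_new_limit` can be sketched on a toy struct. The `AggregatePlan` type and its fields below are hypothetical and far simpler than the real `AggregateExec`; only the method signature is taken from this description.

```rust
/// Hypothetical, heavily simplified stand-in for `AggregateExec`.
#[derive(Debug, Clone, PartialEq)]
struct AggregatePlan {
    group_by: Vec<String>,
    aggregates: Vec<String>,
    limit: Option<usize>,
}

impl AggregatePlan {
    /// Clones the plan and overrides only `limit`. Struct update syntax
    /// copies every other field, so fields added later are carried over
    /// without touching this method or its callers.
    fn with_new_limit(&self, limit: Option<usize>) -> Self {
        Self { limit, ..self.clone() }
    }
}
```

A caller that rebuilds the struct field by field must be updated whenever a field is added. Going through a single method like this one avoids that maintenance hazard.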
4. Safer TopK stream selection in AggregateExec
- Extends `AggregateExec`'s stream selection logic to consult a new helper: `GroupedTopKAggregateStream::can_use_topk(&AggregateExec) -> Result<bool>`.
- `can_use_topk` performs structural validation (single grouping key, single min/max aggregate, presence of a limit) and also validates types via `topk_types_supported`.
- The TopK execution path is now only used when both of the following are true:
  - The query structure matches TopK expectations (min/max with a single group key and limit), and
  - The key and aggregate value types are supported by the underlying TopK data structures.
- This ensures that if the optimizer ever fails to filter out an unsupported combination, the execution plan will still fall back to the standard aggregation path instead of attempting to construct a `GroupedTopKAggregateStream` that cannot handle the types.
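The two-part gate (structure first, then types) and the fallback can be sketched as below. Everything here is a simplified assumption: `TypeClass`, `AggregateShape`, `StreamKind`, and `select_stream` are hypothetical names, the plan is assumed to contain only min/max aggregates, and `can_use_topk` returns a plain `bool` rather than the `Result<bool>` of the real helper.

```rust
/// Coarse, hypothetical type classes standing in for Arrow's `DataType`.
#[derive(Debug, Clone, Copy, PartialEq)]
#[allow(dead_code)]
enum TypeClass {
    Primitive,
    Utf8,
    Binary,
}

/// Hypothetical summary of the parts of an aggregate plan that matter here.
#[derive(Debug, Clone)]
struct AggregateShape {
    group_key_types: Vec<TypeClass>,
    min_max_value_types: Vec<TypeClass>,
    limit: Option<usize>,
}

#[derive(Debug, PartialEq)]
enum StreamKind {
    GroupedTopK,
    GroupedHash,
}

fn topk_types_supported(key: TypeClass, value: TypeClass) -> bool {
    let ok = |t: TypeClass| matches!(t, TypeClass::Primitive | TypeClass::Utf8);
    ok(key) && ok(value)
}

/// Structural check (limit present, exactly one key, exactly one min/max
/// aggregate) followed by the type check.
fn can_use_topk(shape: &AggregateShape) -> bool {
    if shape.limit.is_none() {
        return false;
    }
    match (
        shape.group_key_types.as_slice(),
        shape.min_max_value_types.as_slice(),
    ) {
        ([key], [value]) => topk_types_supported(*key, *value),
        _ => false,
    }
}

/// Falls back to the regular grouped stream whenever TopK does not apply.
fn select_stream(shape: &AggregateShape) -> StreamKind {
    if can_use_topk(shape) {
        StreamKind::GroupedTopK
    } else {
        StreamKind::GroupedHash
    }
}
```

Because the check runs at stream selection time, an unsupported combination that slips past the optimizer still ends up on the standard path rather than producing an error.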
5. Improved hash-table type support for TopK keys
- Introduces `is_supported_hash_key_type` in `topk::hash_table` and uses it to define which types can be used as grouping keys for TopK.
- For now, this matches the previous behavior (primitive numeric/temporal/decimal/interval types plus UTF-8 variants) but centralizes the knowledge in a single function for reuse by both the optimizer and runtime.
6. Tests and regressions
This PR includes several new tests to validate behavior and prevent regressions:
Rust tests
- `utf8_grouping_min_max_limit_fallbacks` in `aggregate_statistics.rs`:
  - Builds an in-memory table with a UTF-8 grouping key and both string and numeric value columns.
  - Verifies that:
    - A supported query (`GROUP BY g, max(val_num) ORDER BY max(val_num) DESC LIMIT 1`) returns the correct row and can use TopK.
    - An unsupported combination (`GROUP BY g, max(val_str) ORDER BY max(val_str) DESC LIMIT 1`) executes successfully but does not use `GroupedTopKAggregateStream` in the physical plan (falls back to standard aggregation without panicking).
  - Uses `displayable` to validate that `GroupedTopKAggregateStream` does not appear in the plan for the unsupported case.
- New tests in `topk::priority_map`:
  - `should_track_lexicographic_min_utf8_value`
  - `should_track_lexicographic_max_utf8_value_desc`
  - `should_track_large_utf8_values`
  - `should_track_utf8_view_values`
  - These tests exercise lexicographic min/max behavior and verify that `PriorityMap` works with `Utf8`, `LargeUtf8`, and `Utf8View` value types in both ascending and descending modes.
SQL logic tests
- Extends `aggregates_topk.slt` with string-focused TopK scenarios:
  - Creates a `string_topk` table and a `string_topk_view` using `arrow_cast(..., 'Utf8View')` to exercise both regular `Utf8` and `Utf8View` paths.
  - Verifies `GROUP BY category, max(val)` with `ORDER BY max(val) DESC LIMIT 2` returns correct results for both table and view.
  - Asserts that the physical plan uses TopK on the aggregate (`AggregateExec` with `lim=[2]` and `SortExec: TopK(fetch=2)`).
- Adds regression tests for string `max` over the `traces.trace_id` column with `ORDER BY max_trace DESC LIMIT 2` to ensure schema stability and correct plan selection.
7. Error message polish
- When a TopK heap cannot be constructed for a given value type, the error now reports `"Unsupported TopK aggregate value type: ..."` instead of the more confusing generic grouping error.
- This should make it clearer to users when they hit a genuine type limitation versus a bug.
Are these changes tested?
DataFusion CLI v51.0.0
> create table t(a text, b text) as values ('123', '4'), ('1', '3');
0 row(s) fetched.
Elapsed 0.094 seconds.
> select a, max(b) as y_axis from t group by a order by y_axis desc;
+-----+--------+
| a   | y_axis |
+-----+--------+
| 123 | 4      |
| 1   | 3      |
+-----+--------+
2 row(s) fetched.
Elapsed 0.046 seconds.
> select a, max(b) as y_axis from t group by a order by y_axis desc limit 1;
+-----+--------+
| a   | y_axis |
+-----+--------+
| 123 | 4      |
+-----+--------+
1 row(s) fetched.
Elapsed 0.020 seconds.
Yes.
- New unit tests have been added for:
  - `GroupedTopKAggregateStream` behavior with UTF-8 grouping and both numeric and string aggregate values.
  - `PriorityMap` handling of `Utf8`, `LargeUtf8`, and `Utf8View` value types and lexicographic min/max behavior.
- New sqllogictest cases have been added to `aggregates_topk.slt` to cover:
  - `GROUP BY` + `ORDER BY ... LIMIT` with string keys and values.
  - Regression coverage for `Utf8View`-backed queries to prevent reintroducing the `"Can't group type: Utf8View"` error.
- Existing test suites (unit tests and SQL logic tests) are expected to continue to pass, ensuring backwards compatibility for existing queries and plans.
Are there any user-facing changes?
Yes, there are user-visible behavior changes, but no breaking changes to SQL semantics:
- Queries that group by UTF-8 (including `Utf8View`) text columns and use `ORDER BY` on a `MIN`/`MAX` aggregate with `LIMIT` will now:
  - Execute successfully, and
  - Use the TopK aggregation optimization when the key and aggregate value types are supported.
- Queries that involve unsupported TopK type combinations will:
  - Fall back to the standard aggregation path, and
  - Avoid panics or confusing `"Can't group type"` errors.
- Error messages for unsupported TopK value types are more explicit (`"Unsupported TopK aggregate value type: ..."`).
From an API surface perspective:
- `datafusion_physical_plan::aggregates` now exposes `topk_types_supported(&DataType, &DataType) -> bool` as a helper for determining whether TopK can be used for a particular key/value type pair.
- `AggregateExec` gains a new `with_new_limit(&self, limit: Option<usize>) -> Self` helper to cheaply clone with a different limit hint. This is additive and backwards-compatible.
No configuration changes are required for users. The existing datafusion.optimizer.enable_topk_aggregation setting continues to control whether TopK is considered at all; the main difference is that enabling it is now safe for UTF-8 grouping/query patterns that previously errored.
LLM-generated code disclosure
This PR includes LLM-generated code, tests, and comments. All LLM-generated content has been reviewed and tested.