velox
Support std::stable_sort in Orderby operator
When running Spark unit tests, we noticed that the results from Gluten are inconsistent with those from Vanilla Spark. The input data contains three fields: employee_name, department, and salary.
The query statement is as follows:
SELECT employee_name, department, salary, FIRST_VALUE(employee_name) OVER w AS highest_salary
FROM basic_pays
WINDOW w AS (
PARTITION BY department
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
)
ORDER BY department
This query first uses a window function to partition the data by the department field and sorts it in descending order by the salary field. The data output from the window function is already sorted by the department and salary fields. After the window operation, there is an additional ORDER BY sorting based on the department field.
In Spark, the final ORDER BY sorting is stable, which means that the salary field will also maintain a descending order in the final results. However, in Velox, because std::sort is used for sorting, the stability of the final sorting result cannot be guaranteed.
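To make the difference concrete, here is a minimal sketch of a stable sort on department alone. The Row struct and the function name are made up for illustration and are not Velox code; the only library call is std::stable_sort, which preserves the relative order of elements that compare equal. std::sort gives no such guarantee, so with it the salary order inside a department could change.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical row layout, mirroring the three fields of basic_pays.
struct Row {
  std::string employee;
  std::string department;
  int salary;
};

// Sorts by department only. Rows with the same department keep their
// input order, because std::stable_sort preserves the relative order of
// equivalent elements.
std::vector<Row> sortByDepartmentStable(std::vector<Row> rows) {
  std::stable_sort(
      rows.begin(), rows.end(), [](const Row& l, const Row& r) {
        return l.department < r.department;
      });
  return rows;
}
```

If the input already has salaries in descending order within each department, the output keeps that order per department, even though salary is not part of the sort key.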
This PR aims to add an option to the ORDER BY sorting to control whether stable sorting is enabled.
@mbasmanova Can you help to review? Thanks.
@JkSelf : Window doesn't guarantee sorted order of results.
Also, ORDER BY, even with a sort-merge shuffle, only guarantees ordering by department.
There isn't any part of this SQL that implies the results should be ordered by salary as well.
The test seems over-restrictive to me.
Please update the document
@mbasmanova @aditi-pandit Indeed, enabling stable sort may have an impact on performance, but in the current Gluten environment, we are facing issues with inconsistent results, as the results of the Spark sort operator are stable sorted. Therefore, could we consider providing flexibility to enable or disable stable sort through configuration options?
in the current Gluten environment, we are facing issues with inconsistent results
Would you elaborate a bit on the use cases that are affected? Are you referring to unit tests? As I mentioned earlier, order of inputs cannot be guaranteed for distributed execution, hence, I cannot see how this functionality is useful for production scenarios.
@mbasmanova Indeed, if a sort operation is immediately followed by a shuffle operation, the significance of whether the sort is stable or unstable is not substantial. However, the issue we are currently facing is that the result of the OrderBy operator in Spark is a stable sort. Therefore, if the user's SQL does not include a shuffle operation following the OrderBy operator, it could lead to inconsistencies between the results from Spark and Velox.
The problematic SQL we encountered is as follows:
SELECT employee_name, department, salary, FIRST_VALUE(employee_name) OVER w highest_salary FROM basic_pays WINDOW w AS (PARTITION BY department ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) ORDER BY department
As can be seen from the execution plan below, there is no shuffle operation following the OrderBy in the SQL, which is the cause of the inconsistent results.
@JkSelf Stable sort makes sense only if order of input rows for the sort is fixed. However, this is not the case. In the query above, inputs to sort is basic_pays table partitioned on department. Assuming basic_pays is a large table, it needs to be read from multiple nodes and then data needs to be shuffled. Hence, the order of rows in sort input is not guaranteed. Am I missing anything?
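A small sketch of this point, using hypothetical names: a stable sort only preserves whatever order the input arrived in, so two different arrival orders of the same rows produce two different outputs.

```cpp
#include <algorithm>
#include <string>
#include <utility>
#include <vector>

// Hypothetical row: (department, salary).
using Row = std::pair<std::string, int>;

// Stable-sorts by department only and returns the salaries in output order.
// The result for rows with equal departments depends entirely on the order
// in which those rows appear in the input.
std::vector<int> salariesAfterStableSort(std::vector<Row> rows) {
  std::stable_sort(
      rows.begin(), rows.end(), [](const Row& l, const Row& r) {
        return l.first < r.first;
      });
  std::vector<int> salaries;
  for (const Row& row : rows) {
    salaries.push_back(row.second);
  }
  return salaries;
}
```

Feeding the same two "Eng" rows in opposite orders yields opposite salary orders, which is what happens when a shuffle delivers rows in a non-deterministic order.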
@mbasmanova Yes. If the data volume is large, the data that has been pre-sorted after a shuffle read will also be disrupted. I checked my unit test again and there was only one partition during the shuffle. Therefore, the issue of incorrect results arises.
It seems the OrderBy operator doesn't need to ensure a stable sort. Will close this PR.
I encountered a situation where the SQL contains two consecutive window functions, such as row_number() over (partition by a order by b, c) as rn1 and row_number() over (partition by a, b order by c) as rn2. In this case, since Vanilla Spark uses stable sorting, even after shuffle, rn2 and rn1 still maintain a stable relationship. Specifically, for each piece of data where a and b are the same, the value of rn2 - rn1 remains consistent. In such cases, we need stable sorting. cc @JkSelf @mbasmanova @jinchengchenghh
@lyy-pineapple Would you share the query plan? I'd like to understand where sorting / shuffle happens.
@mbasmanova
select
a,
b,
c,
row_number() over (
partition by
c
order by
a,
b
) as rn1,
row_number() over (
partition by
c,
b
order by
a
) as rn2
from table
where a and b are the same, the value of rn2 - rn1 remains consistent
@lyy-pineapple Can you show your detailed physical plan? Maybe it is due to only one partition and no shuffle after sort.
ROW_NUMBER with PARTITION BY involves a shuffle, but this issue is unrelated to whether there is a shuffle or if it is within the same partition. If stable sorting is used, within a partition, when sorting by a, b to get rn1, the data is in an ordered state based on a, b. Then, when sorting by a, since it is stable sorting, when b is the same, the order remains based on rn1, resulting in rn2.
== Physical Plan ==
VeloxColumnarToRowExec
+- ^(384) WindowExecTransformer [row_number() windowspecdefinition(c#19568, b#19567, a#19566 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn2#19559], [c#19568, b#19567], [a#19566 ASC NULLS FIRST]
+- ^(384) SortExecTransformer [c#19568 ASC NULLS FIRST, b#19567 ASC NULLS FIRST, a#19566 ASC NULLS FIRST], false, 0
+- ^(384) WindowExecTransformer [row_number() windowspecdefinition(c#19568, a#19566 ASC NULLS FIRST, b#19567 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn1#19558], [c#19568], [a#19566 ASC NULLS FIRST, b#19567 ASC NULLS FIRST]
+- ^(384) SortExecTransformer [c#19568 ASC NULLS FIRST, a#19566 ASC NULLS FIRST, b#19567 ASC NULLS FIRST], false, 0
+- ^(384) InputIteratorTransformer[a#19566, b#19567, c#19568]
+- ^(384) InputAdapter
+- ^(384) ColumnarExchange hashpartitioning(c#19568, 200), ENSURE_REQUIREMENTS, [a#19566, b#19567, c#19568], [plan_id=108878], [id=#108878], [OUTPUT] List(a:IntegerType, b:IntegerType, c:IntegerType)
+- ^(383) ProjectExecTransformer [hash(c#19568, 42) AS hash_partition_key#19579, a#19566, b#19567, c#19568]
+- ^(383) InputIteratorTransformer[a#19566, b#19567, c#19568]
+- ^(383) InputAdapter
+- ^(383) RowToVeloxColumnar
+- ^(383) BatchScan[a#19566, b#19567, c#19568] xxxx [filters=] RuntimeFilters: []
@lyy-pineapple Thank you for sharing the example, both SQL and query plan. I see that Spark shuffles on 'c' first, then sorts by (c, a, b), then by (c, b, a). Both sorts happen within a single machine (I assume single thread as well). There is no shuffle between the 2 sorts. I see how results may be affected by whether sort is stable or not.
That said, it feels like this just happens to work like this, but the user (author of SQL) cannot reasonably rely on that particular behavior. Am I missing something? What would be the logical assumption that a SQL author may have that is violated if stable sort is not used?
Also, I wonder how stable sort is implemented when spilling is required.
@mbasmanova Sorry for the late reply. You make a good point. A better solution would be to add rn1 to the sort keys of the second row_number. The fact that the expected result was achieved without a shuffle between the row_number operations in the above scenario is somewhat coincidental. Thanks!
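As a rough illustration of that idea (the Row struct and the function name are hypothetical, not Gluten or Velox code): once rn1 is appended to the sort key, the key is unique within a partition, so the result no longer depends on whether the sort is stable or on the order in which rows arrive. The sketch deliberately uses std::sort, which is not stable.

```cpp
#include <algorithm>
#include <cstdint>
#include <tuple>
#include <vector>

// Hypothetical row carrying the three columns plus the first row number.
struct Row {
  int a;
  int b;
  int c;
  std::int64_t rn1;
};

// Orders rows for the second window (partition by c, b; order by a) with
// rn1 as an explicit tie-breaker, then returns the rn1 values in output
// order. Because rn1 is unique within a partition on c, the comparator
// defines a total order and an unstable sort is sufficient.
std::vector<std::int64_t> orderForRn2(std::vector<Row> rows) {
  std::sort(rows.begin(), rows.end(), [](const Row& l, const Row& r) {
    return std::tie(l.c, l.b, l.a, l.rn1) < std::tie(r.c, r.b, r.a, r.rn1);
  });
  std::vector<std::int64_t> order;
  for (const Row& row : rows) {
    order.push_back(row.rn1);
  }
  return order;
}
```

In SQL terms this corresponds to ordering the second window by a, rn1 instead of a alone, so the relationship between rn1 and rn2 is stated in the query rather than inherited from the sort algorithm.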