velox icon indicating copy to clipboard operation
velox copied to clipboard

Support std::stable_sort in Orderby operator

Open JkSelf opened this issue 10 months ago • 8 comments

When running Spark unit tests, we noticed that the results from Gluten are inconsistent with those from Vanilla Spark. The input data contains three fields: employee_name, department, and salary.

The query statement is as follows:

SELECT employee_name, department, salary, FIRST_VALUE(employee_name) OVER w AS highest_salary
FROM basic_pays
WINDOW w AS (
  PARTITION BY department
  ORDER BY salary DESC
  RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
)
ORDER BY department

This query first uses a window function to partition the data by the department field and sorts it in descending order by the salary field. The data output from the window function is already sorted by the department and salary fields. After the window operation, there is an additional ORDER BY sorting based on the department field.

In Spark, the final ORDER BY sorting is stable, which means that the salary field will also maintain a descending order in the final results. However, in Velox, because std::sort is used for sorting, the stability of the final sorting result cannot be guaranteed.

This PR aims to add an option to the ORDER BY sorting to control whether stable sorting is enabled.

JkSelf avatar Apr 25 '24 05:04 JkSelf

Deploy Preview for meta-velox canceled.

Name Link
Latest commit 759c1669159b9b255235a15a6049fbe926aa3de6
Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/6629e96f76d0c50008a6b4d7

netlify[bot] avatar Apr 25 '24 05:04 netlify[bot]

@mbasmanova Can you help to review? Thanks.

JkSelf avatar Apr 25 '24 05:04 JkSelf

@JkSelf : Window doesn't guarantee sorted order of results.

Also ORDER BY even if with SortMerge shuffle only guarantees ORDER BY department.

There isn't any part of this SQL that implies the results should be ordered by salary as well.

The test seems over-restrictive to me.

aditi-pandit avatar Apr 25 '24 21:04 aditi-pandit

Please update the document

jinchengchenghh avatar May 09 '24 03:05 jinchengchenghh

@mbasmanova @aditi-pandit Indeed, enabling stable sort may have an impact on performance, but in the current Gluten environment, we are facing issues with inconsistent results, as the results of the Spark sort operator are stable sorted. Therefore, could we consider providing flexibility to enable or disable stable sort through configuration options?

JkSelf avatar May 11 '24 07:05 JkSelf

in the current Gluten environment, we are facing issues with inconsistent results

Would you elaborate a bit on the use cases that are affected? Are you referring to unit tests? As I mentioned earlier, order of inputs cannot be guaranteed for distributed execution, hence, I cannot see how this functionality is useful for production scenarios.

mbasmanova avatar May 13 '24 13:05 mbasmanova

in the current Gluten environment, we are facing issues with inconsistent results

Would you elaborate a bit on the use cases that are affected? Are you referring to unit tests? As I mentioned earlier, order of inputs cannot be guaranteed for distributed execution, hence, I cannot see how this functionality is useful for production scenarios.

@mbasmanova Indeed, if a sort operation is immediately followed by a shuffle operation, the significance of whether the sort is stable or unstable is not substantial. However, the issue we are currently facing is that the result of the OrderBy operator in Spark is a stable sort. Therefore, if the user's SQL does not include a shuffle operation following the OrderBy operator, it could lead to inconsistencies between the results from Spark and Velox.

The problematic SQL we encountered is as follows:

SELECT employee_name, department, salary, FIRST_VALUE(employee_name) OVER w highest_salary FROM basic_pays WINDOW w AS (PARTITION BY department ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) ORDER BY department

As can be seen from the execution plan below, there is no shuffle operation following the OrderBy in the SQL, which is the cause of the inconsistent results.

image

JkSelf avatar May 14 '24 00:05 JkSelf

@JkSelf Stable sort makes sense only if order of input rows for the sort is fixed. However, this is not the case. In the query above, inputs to sort is basic_pays table partitioned on department. Assuming basic_pays is a large table, it needs to be read from multiple nodes and then data needs to be shuffled. Hence, the order of rows in sort input is not guaranteed. Am I missing anything?

mbasmanova avatar May 14 '24 13:05 mbasmanova

@JkSelf Stable sort makes sense only if order of input rows for the sort is fixed. However, this is not the case. In the query above, inputs to sort is basic_pays table partitioned on department. Assuming basic_pays is a large table, it needs to be read from multiple nodes and then data needs to be shuffled. Hence, the order of rows in sort input is not guaranteed. Am I missing anything?

@mbasmanova Yes. If the data volume is large, the data that has been pre-sorted after a shuffle read will also be disrupted. I checked my unit test again and there was only one partition during the shuffle. Therefore, the issue of incorrect results arises.

JkSelf avatar May 15 '24 07:05 JkSelf

It seems the orderby operator doesn't need to ensure stable sort. Will closing this PR.

JkSelf avatar May 15 '24 07:05 JkSelf

I encountered a situation where SQL contains two consecutive window functions, such as row_number() (partition by a, order by b, c) as rn1 and row_number() (partition by a, b order by c) as rn2. In this case, since Vanilla Spark uses stable sorting, even after shuffle, rn2 and rn1 still maintain a stable relationship. Specifically, for each piece of data where a and b are the same, the value of rn2 - rn1 remains consistent. In such cases, we need stable sorting. cc @JkSelf @mbasmanova @jinchengchenghh

lyy-pineapple avatar Jul 16 '24 03:07 lyy-pineapple

@lyy-pineapple Would you share the query plan? I'd like to understand where sorting / shuffle happens.

mbasmanova avatar Jul 16 '24 12:07 mbasmanova

@mbasmanova

select
    a,
   b,
    c,
    row_number() over (
        partition by
            c
        order by
            a,
            b
    ) as rn1,
    row_number() over (
        partition by
            c,
            b
        order by
            a
    ) as rn 2
from table

where a and b are the same, the value of rn2 - rn1 remains consistent

lyy-pineapple avatar Jul 18 '24 01:07 lyy-pineapple

@lyy-pineapple Can you show your detailed physical plan? Maybe it is due to only one partition and no shuffle after sort.

JkSelf avatar Jul 18 '24 03:07 JkSelf

ROW_NUMBER with PARTITION BY involves a shuffle, but this issue is unrelated to whether there is a shuffle or if it is within the same partition. If stable sorting is used, within a partition, when sorting by a, b to get rn1, the data is in an ordered state based on a, b. Then, when sorting by a, since it is stable sorting, when b is the same, the order remains based on rn1, resulting in rn2.

== Physical Plan ==
VeloxColumnarToRowExec
+- ^(384) WindowExecTransformer [row_number() windowspecdefinition(c#19568, b#19567, a#19566 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn2#19559], [c#19568, b#19567], [a#19566 ASC NULLS FIRST]
   +- ^(384) SortExecTransformer [c#19568 ASC NULLS FIRST, b#19567 ASC NULLS FIRST, a#19566 ASC NULLS FIRST], false, 0
      +- ^(384) WindowExecTransformer [row_number() windowspecdefinition(c#19568, a#19566 ASC NULLS FIRST, b#19567 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn1#19558], [c#19568], [a#19566 ASC NULLS FIRST, b#19567 ASC NULLS FIRST]
         +- ^(384) SortExecTransformer [c#19568 ASC NULLS FIRST, a#19566 ASC NULLS FIRST, b#19567 ASC NULLS FIRST], false, 0
            +- ^(384) InputIteratorTransformer[a#19566, b#19567, c#19568]
               +- ^(384) InputAdapter
                  +- ^(384) ColumnarExchange hashpartitioning(c#19568, 200), ENSURE_REQUIREMENTS, [a#19566, b#19567, c#19568], [plan_id=108878], [id=#108878], [OUTPUT] List(a:IntegerType, b:IntegerType, c:IntegerType)
                     +- ^(383) ProjectExecTransformer [hash(c#19568, 42) AS hash_partition_key#19579, a#19566, b#19567, c#19568]
                        +- ^(383) InputIteratorTransformer[a#19566, b#19567, c#19568]
                           +- ^(383) InputAdapter
                              +- ^(383) RowToVeloxColumnar
                                 +- ^(383) BatchScan[a#19566, b#19567, c#19568] xxxx [filters=] RuntimeFilters: []

image

lyy-pineapple avatar Jul 18 '24 03:07 lyy-pineapple

@lyy-pineapple Thank you for sharing the example, both SQL and query plan. I that Spark shuffles on 'c' first, then sorts by (c, a, b), then by (c, b, a). Both sorts happen within a single machine (I assume single thread as well). There is no shuffle between the 2 sorts. I see how results may be affected by whether sort is stable or not.

That said, it feels like this just happens to work like this, but the user (author of SQL) cannot reasonably rely on that particular behavior. Am I missing something? What would be the logical assumption that a SQL author may have that is violated if stable sort is not used.

Also, I wonder how stable sort is implemented when spilling is required.

mbasmanova avatar Jul 18 '24 14:07 mbasmanova

@mbasmanova Sorry for the late reply. You make a good point. A better solution would be to add rn1 during the second row_number sorting. The fact that the expected result was achieved without shuffle between the row_number operations in the above scenario is somewhat coincidental. thanks!

lyy-pineapple avatar Jul 22 '24 07:07 lyy-pineapple