[FEA] Support z-ordering acceleration
This is another idea for something that we could try for performance.
z-ordering can really help improve performance and reduce the amount of data loaded for a large number of queries. It is explained rather well in the Wikipedia article, but the performance benefits are really shown in this blog post from Cloudera about Apache Impala support for it. In addition to Impala, Delta Lake in Databricks supports it.
It would be really interesting to see what impact z-ordering on different columns can have on the performance of NDS runs. It also sounds like it would be something that we could accelerate on the GPU fairly easily.
As such the first steps for this task would be
- Create a prototype on the CPU that would let us do some basic z-order experiments with NDS queries.
- If the query performance is good, then prototype this same thing on the GPU and see what the speedup is for writes over the CPU.
After that we would then need to look at how we would turn this into a product that our users could take advantage of. Do we just wait for Databricks to open source it through Delta Lake and fit it into that? Do we write a custom UDF that will output binary data that we can sort on and then throw away? Something else?
I took a look at the Impala code and most of it exists here.
https://github.com/apache/impala/blob/da14fdcf35da28d2ff86c6ca9413a95cc3f8f346/be/src/util/tuple-row-compare.h
https://github.com/apache/impala/blob/b28da054f3595bb92873433211438306fc22fbc7/be/src/util/tuple-row-compare.cc
They don't manifest the z-order byte array. The number of bytes used for each type is determined by the largest type being ordered by. The sizes are set here
https://github.com/apache/impala/blob/51126259bea2491316decef63d584f713d02a9a6/be/src/runtime/types.h#L312-L352
But from the comments it looks like they only support up to 128 bits.
We should spend some more time to fully understand how they transform the values so that they get the z-pattern that they want. It looks like they normalize the numeric values to be unsigned values, and it also looks like they play similar games with float and double, but slightly differently.
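The usual trick for this kind of normalization is to flip the sign bit of signed integers, and for floats to invert all of the bits of negative values. Below is a minimal sketch of that common technique, assuming this is roughly what Impala is doing (the exact transform would need to be confirmed against the code linked above):

```scala
object Normalize {
  // Signed int: flipping the sign bit makes an unsigned, byte-wise comparison
  // match the signed ordering.
  def normalizeInt(v: Int): Int = v ^ Int.MinValue

  // Float: negative values get all bits inverted, non-negative values just get
  // the sign bit flipped. The resulting bits, compared as unsigned, sort in
  // the same order as the original floats.
  def normalizeFloat(v: Float): Int = {
    val bits = java.lang.Float.floatToRawIntBits(v)
    if (bits < 0) ~bits else bits ^ Int.MinValue
  }
}
```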
We should also think about how we want to handle padding. Normally with z-ordering for two columns A and B we would have something like
[A-0, B-0, A-1, B-1, ... A-N, B-N], but if the lengths of A and B differ, i.e. A is an int and B is a byte, can we get away with
[A-0, B-0, A-1, A-2, A-3], or should we pad out B with zeros, i.e. [A-0, B-0, A-1, 0, A-2, 0, A-3, 0]? What about if it is a String instead of a byte?
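To make the two options concrete, here is a small sketch of what the interleaved bytes would look like for A as an int and B as a single byte (the helper names are made up for illustration; this is not Impala or Delta code):

```scala
import java.nio.ByteBuffer

object PaddingOptions {
  // Option 1: no padding. B contributes its single byte and then the rest of
  // A's bytes follow: [A-0, B-0, A-1, A-2, A-3]
  def unpadded(a: Int, b: Byte): Array[Byte] = {
    val aBytes = ByteBuffer.allocate(4).putInt(a).array()
    Array(aBytes(0), b, aBytes(1), aBytes(2), aBytes(3))
  }

  // Option 2: pad B out with zeros so both columns contribute 4 bytes:
  // [A-0, B-0, A-1, 0, A-2, 0, A-3, 0]
  def padded(a: Int, b: Byte): Array[Byte] = {
    val aBytes = ByteBuffer.allocate(4).putInt(a).array()
    val bBytes = Array[Byte](b, 0, 0, 0)
    aBytes.zip(bBytes).flatMap { case (x, y) => Array(x, y) }
  }
}
```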
Also we might want to think about the size of the comparison that we are doing, and what is the most efficient for the GPU. Is a list of bytes as fast as a list of ints?
Other questions:
- How do we handle arrays or other nested types?
- How do we handle nulls?
- Do we want to make this flexible so we can adapt it to Delta Lake, or are we just going to concentrate on something with NDS to start out with?
It looks like Delta Lake has released z-order support to open source in versions 2.0 and above. The algorithm appears to be more complex than Impala's, and very different.
Delta will map the columns to integers using the RangePartitioner code. This means that to z-order by N columns you will run the final stage of that query N + 2 times. The first N queries use the RangePartitioner code from Spark to get range cutoffs for up to M partitions per column, where M is set by a config in Delta Lake. Then it will generate a new expression that extracts the partition ID for each item in the corresponding column, and finally interleave the bits to produce what the data will be ordered by. It will then range partition the data based off of this new string of interleaved bits (this requires running the query twice: once to get the ranges and again to actually do the partitioning). Be aware that there is a config to add a random byte as a second column to range partition by so that it can help spread out some data skew issues.
The configs in question are here
https://github.com/delta-io/delta/blob/91051df45f573a9dcdeaf9173aff6ba18314c82e/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala#L611-L625
The code is here
https://github.com/delta-io/delta/blob/91051df45f573a9dcdeaf9173aff6ba18314c82e/core/src/main/scala/org/apache/spark/sql/delta/skipping/MultiDimClustering.scala#L82-L88
https://github.com/delta-io/delta/blob/56e1b9b417e8815cd7003eaf1796988c670b8b0d/core/src/main/scala/org/apache/spark/sql/delta/expressions/InterleaveBits.scala
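Putting the pieces together, a rough Spark-level sketch of just the final step (range partitioning on the interleaved value, plus the optional random column to fight skew) might look like the following. `zorderValue` is a hypothetical stand-in for Delta's partition-ID extraction plus InterleaveBits expressions, not a real function:

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

// zorderValue is assumed to already encode the interleaved partition IDs for
// the clustering columns; how it is built is sketched in the example below.
def clusterByZOrder(df: DataFrame, numFiles: Int, zorderValue: Column): DataFrame = {
  df.withColumn("zvalue", zorderValue)
    // the optional random column Delta can add to spread out skew
    .withColumn("noise", (rand() * 255).cast("int"))
    .repartitionByRange(numFiles, col("zvalue"), col("noise"))
    .drop("zvalue", "noise")
}
```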
This is all really complicated, so here is an example.
Let's say we have a dataset that has 5 columns, but we are going to order by just two of them (A: Integer and B: Long). For this example we are going to have a relatively small amount of data (10 rows).
| A | B |
|---|---|
| 100 | 100 |
| 200 | 100 |
| 300 | 100 |
| 400 | 200 |
| 500 | 700 |
| 600 | 0 |
| 700 | 800 |
| 800 | 900 |
| 900 | 0 |
| 0 | 0 |
Now Delta will try to get partition ranges for each column. We are only going to use 3 ranges to make this simpler. It uses the RangePartitioner to do this, which does sub-sampling/etc., but I am just going to do it manually.
For A
Partition 0 is A < 300
Partition 1 is 300 <= A < 600
Partition 2 is 600 <= A
For B
Partition 0 is B < 100
Partition 1 is 100 <= B < 200
Partition 2 is 200 <= B
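As a hand-rolled stand-in for what the RangePartitioner would produce, the partition ID is just the number of cutoffs at or below the value (the cutoffs here are the ones chosen manually above, purely for illustration):

```scala
object PartitionIds {
  // The partition ID is the number of cutoffs at or below the value.
  def partitionId(value: Long, cutoffs: Seq[Long]): Int =
    cutoffs.count(_ <= value)

  // Cutoffs chosen manually above; the real code gets these from Spark's
  // RangePartitioner sampling.
  val aCutoffs = Seq(300L, 600L) // A < 300 -> 0, 300 <= A < 600 -> 1, 600 <= A -> 2
  val bCutoffs = Seq(100L, 200L) // B < 100 -> 0, 100 <= B < 200 -> 1, 200 <= B -> 2
}

// e.g. PartitionIds.partitionId(500, PartitionIds.aCutoffs) == 1
//      PartitionIds.partitionId(700, PartitionIds.bCutoffs) == 2
```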
We then end up with the partition IDs for each column along with the original data
| A | B | A-PART | B-PART |
|---|---|---|---|
| 100 | 100 | 0 | 1 |
| 200 | 100 | 0 | 1 |
| 300 | 100 | 1 | 1 |
| 400 | 200 | 1 | 2 |
| 500 | 700 | 1 | 2 |
| 600 | 0 | 2 | 0 |
| 700 | 800 | 2 | 2 |
| 800 | 900 | 2 | 2 |
| 900 | 0 | 2 | 0 |
| 0 | 0 | 0 | 0 |
Then it will interleave the bits of the partitions to get the actual z-order field (as a string) and possibly include a random byte column at the end when it does the range partitioning.
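A simplified sketch of that interleave step on the 2-bit partition IDs from the table above (Delta's InterleaveBits actually operates on full 32-bit ints and produces a byte array, so this only shows the idea):

```scala
object ZValue {
  // Interleave the bits of the two partition IDs, most significant bit first,
  // alternating A then B. With 3 partitions each ID fits in 2 bits.
  def interleave(aPart: Int, bPart: Int, bitsPerCol: Int = 2): Int = {
    var z = 0
    for (i <- (bitsPerCol - 1) to 0 by -1) {
      z = (z << 1) | ((aPart >> i) & 1)
      z = (z << 1) | ((bPart >> i) & 1)
    }
    z
  }
}

// First row above (A-PART = 0, B-PART = 1): ZValue.interleave(0, 1) == 1
// Row (600, 0)   (A-PART = 2, B-PART = 0): ZValue.interleave(2, 0) == 8
```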
Then it is done. There is no sorting done (as far as I can tell). It just controls which file the data is written to, so you would not get any wins for row groups within a file, as far as I can tell.