[GLUTEN-4241][VL] Add plan node to convert vanilla Spark columnar format data to Velox columnar format data
What changes were proposed in this pull request?
Add a plan node to convert vanilla Spark columnar format data to Velox columnar format data.
This PR converts Spark columnar batches to Arrow arrays and then to Velox columnar batches. All primitive types (decimal types require https://github.com/facebookincubator/velox/pull/8957) and Map/Array types are supported.
Users can enable it by setting spark.gluten.sql.columnar.vanillaColumnarToNativeColumnar, which defaults to false.
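For example, an illustrative sketch (not the PR's code) of turning the feature on through SparkConf:

```scala
// Hedged sketch: enabling the vanilla-to-native columnar conversion.
// The config key and its default (false) come from this PR's description.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.gluten.sql.columnar.vanillaColumnarToNativeColumnar", "true")
```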
(Fixes: #4241)
How was this patch tested?
Added tests
https://github.com/oap-project/gluten/issues/4241
Run Gluten Clickhouse CI
Do you convert to Velox format directly, or convert to Arrow and then to Velox?
> Do you convert to Velox format directly, or convert to Arrow and then to Velox?

Convert to Arrow first, then to Velox.
> Convert to Arrow first, then to Velox.

Makes sense. We may upstream the Parquet columnar format => Arrow format conversion to Spark.
Run Gluten Clickhouse CI
@zhztheplayer can you check how the memory is allocated during the conversion? Where is the Arrow memory allocated? How many memcpys happen during the conversion? Is there an onheap => offheap copy?
> @zhztheplayer can you check how the memory is allocated during the conversion? Where is the Arrow memory allocated? How many memcpys happen during the conversion? Is there an onheap => offheap copy?

@boneanxs If you'd like to address the questions as well, thanks.
I believe the patch ~~reused our old ArrowWritableColumnarVector code~~ to write Spark columnar data to native, so there should be a bunch of "onheap => offheap" copies. Ideally we should count exactly how many copies the implementation does. @boneanxs You can also check on this part.
~~What I was worried about is that ArrowWritableColumnarVector has not been under active maintenance for a period of time,~~ so we should have more tests here, especially for complex data types.
Also it would be great if you could share your thoughts on the risk of memory leaks this approach may bring, @boneanxs. Overall the PR's code looks fine to me and we have removed most of the unsafe APIs, but there might still be some. Let's check this part carefully too.
Let's document the conversion clearly here. I have an impression that parquet-mr can make use of offheap memory for columnar data. If so, the best case is that we can avoid any memcpy during the conversion (not counting the Arrow => Velox conversion). But that will take some work to achieve.
If the Parquet scan is onheap, we can't avoid the onheap -> offheap copy, but then we should reuse the offheap buffer.
If we can't be sure how the conversion happens, let's use memcpy as much as we need, and make sure there's 100% no memory leak for now.
> @zhztheplayer can you check how the memory is allocated during the conversion? Where is the Arrow memory allocated? How many memcpys happen during the conversion? Is there an onheap => offheap copy?
Hey @FelixYBW, for each columnar batch, before the conversion this PR allocates offheap memory to perform the Spark columnar batch -> Arrow array step. (The PR doesn't treat onheap and offheap Spark columnar batches differently; it uses ArrowFieldWriter, which is implemented by Spark, to do this transformation. See ArrowColumnarBatchConverter#write, which this PR newly adds.)
So yes, there will be one memcpy from Spark columnar batch -> Arrow array, no matter whether the Spark columnar batch is onheap or offheap.
On the native side, I simply use importFromArrow to convert the Arrow array to a Velox columnar batch; there will be some memcpy there as well, for String, timestamp, short decimal, etc. (I haven't fully checked).
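For illustration, a minimal sketch of that JVM-side step using Spark's internal ArrowWriter, which wraps one ArrowFieldWriter per column. This is not the PR's exact ArrowColumnarBatchConverter#write, just the general shape of the copy:

```scala
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.spark.sql.execution.arrow.ArrowWriter
import org.apache.spark.sql.vectorized.ColumnarBatch

// Copy every row of a Spark ColumnarBatch into Arrow vectors (offheap ArrowBufs).
// This is the single JVM-side memcpy discussed above, regardless of whether the
// source batch is onheap or offheap.
def writeBatchToArrow(batch: ColumnarBatch, root: VectorSchemaRoot): Unit = {
  val writer = ArrowWriter.create(root) // builds one ArrowFieldWriter per field
  val rows = batch.rowIterator()
  while (rows.hasNext) {
    writer.write(rows.next()) // appends each value into the Arrow vectors
  }
  writer.finish() // fixes the row count on the VectorSchemaRoot
}
```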
> What I was worried about is that ArrowWritableColumnarVector has not been under active maintenance for a period of time, so we should have more tests here, especially for complex data types.

Sorry @zhztheplayer, I might have missed something. Are you referring to ArrowFieldWriter? This PR uses ArrowFieldWriter to do the conversion; it's widely used by PySpark, so reusing it here should be safe.
> Also it would be great if you could share your thoughts on the risk of memory leaks this approach may bring, @boneanxs.

The extra memory acquired here is the arrowArray, the cSchema, and the Velox columnar batch, and they're all well handled by TaskResources and recyclePayload: the arrowArray and the Velox columnar batch are released on each iteration, and the cSchema is released when the iterator ends. TaskResources is the extra assurance that all allocated memory gets released if the iterator is interrupted abnormally.
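To make that lifecycle concrete, here is a hedged sketch of the release discipline using Arrow's Java C Data Interface. nativeConvertToVelox is a hypothetical stand-in for the JNI call, and the real code routes the cleanup through TaskResources rather than plain try/finally:

```scala
import org.apache.arrow.c.{ArrowArray, ArrowSchema}
import org.apache.arrow.memory.RootAllocator

val allocator = new RootAllocator()
val cSchema = ArrowSchema.allocateNew(allocator) // lives as long as the iterator
try {
  // Per input batch:
  val cArray = ArrowArray.allocateNew(allocator)
  try {
    // Export the Arrow data, then hand the addresses to native code, e.g.
    // nativeConvertToVelox(cArray.memoryAddress(), cSchema.memoryAddress())
  } finally {
    cArray.close() // released on every iteration, as described above
  }
} finally {
  cSchema.close() // released only when the iterator ends
  allocator.close()
}
```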
> I have an impression that parquet-mr can make use of offheap memory for columnar data. If so, the best case is that we can avoid any memcpy during the conversion.

I'm not sure whether parquet-mr can directly use offheap memory, but Spark can export Parquet data as offheap columnar batches by enabling spark.sql.columnVector.offheap.enabled. Directly reusing an offheap columnar batch as an Arrow array is not supported in this PR (by the way, can we do that? Not sure about it).
I can also run benchmarks against VanillaColumnarBatchToRow -> RowToVeloxColumnar if necessary. I think at least for memcpy, RowToVeloxColumnar also does an extra copy even if the rows come from an offheap ColumnarBatch.
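For reference, that vanilla Spark setting can be turned on like this (assuming a SparkSession named spark):

```scala
// Make vanilla Spark's Parquet scan produce offheap ColumnarBatches, so the
// source side of the copy is already off the JVM heap.
spark.conf.set("spark.sql.columnVector.offheap.enabled", "true")
```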
> This PR uses ArrowFieldWriter to do the conversion; it's widely used by PySpark, so reusing it here should be safe.

Ah, then it's fine. Some of the code is very similar to Gluten's ArrowWritableColumnarVector, which had pasted some code from vanilla Spark, so I was led by the wrong intuition. Sorry for the mistake.
> does an extra copy even if the rows come from an offheap ColumnarBatch

Thank you for the explanation. You may try enabling spark.sql.columnVector.offheap.enabled.
Onheap-to-offheap memcpy is more expensive than offheap-to-offheap. Do you know how ArrowFieldWriter allocates memory? Is it from the unsafe API or direct memory?
@zhztheplayer Do you know?
Hey @FelixYBW, ArrowFieldWriter calls Arrow's ValueVector, which internally uses ArrowBuf to store values, so it should be offheap memory.
https://github.com/apache/spark/blob/1f58f4c68e8de03a8b4c314488dd4f342beb8de2/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala#L53
For example, IntVector: https://github.com/apache/arrow/blob/b202ede131e3c54628616330162f7854ba0c0d70/java/vector/src/main/java/org/apache/arrow/vector/IntVector.java#L151
The value buffer is allocated from an Arrow ArrowBuf:
https://github.com/apache/arrow/blob/b202ede131e3c54628616330162f7854ba0c0d70/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java#L73
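A small self-contained check (not from the PR) that Arrow Java vectors are backed by direct, offheap ArrowBufs:

```scala
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.IntVector

val allocator = new RootAllocator()
val vec = new IntVector("ints", allocator)
vec.allocateNew(4) // validity and value buffers are offheap ArrowBufs
vec.setSafe(0, 42)
vec.setValueCount(1)
// A native address (rather than a heap array) confirms direct memory:
println(vec.getDataBuffer.memoryAddress())
vec.close()
allocator.close()
```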
I think it's OK to have it disabled by default.
@boneanxs Can you add a CI case for the feature to run TPC-H / TPC-DS tests?
Example:
https://github.com/apache/incubator-gluten/blob/8ab9b012db7ebbd7110ba1288b5c4e8a702ccc09/.github/workflows/velox_docker.yml#L293
You can use the gluten-it arg --extra-conf=... to enable C2C during testing.
Also if -s=30.0 (SF 30) is too large, we can make it -s=1.0.
And sorry for the late response. Let's get this merged ASAP.
Oh, just noticed the PR is still open and has many conflicts. @boneanxs, would you like to continue?
Hey @FelixYBW @zhztheplayer, yes, I'm willing to continue this PR. After the last comment, I ran some benchmarks in my local environment and found no obvious improvement compared with VanillaColumnar -> row -> VeloxColumnar. (I also noticed a compatibility issue, since this PR relies on Spark's Arrow version, which conflicts with Gluten's.) I'm still checking why the performance didn't improve (I haven't had time recently); I will update here once I'm done with other things.
@boneanxs hi, could you please help rebase? There's a big commit (renaming io.glutenproject to org.apache.gluten) from when we migrated to the Apache repo.
thanks -yuan
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.
Commenting to keep the PR open, as it could be a valuable topic.
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.
This PR was auto-closed because it has been stalled for 10 days with no activity. Please feel free to reopen if it is still valid. Thanks.