
[GLUTEN-4241][VL] Add plan node to convert Vanilla spark columnar format data to Velox columnar format data

boneanxs opened this pull request 1 year ago • 23 comments

What changes were proposed in this pull request?

Add plan node to convert Vanilla spark columnar format data to Velox columnar format data

This PR converts Spark columnar batch -> Arrow array -> Velox columnar batch. All primitive types (decimal types require https://github.com/facebookincubator/velox/pull/8957) and Map/Array types are supported.

Users can enable it by setting spark.gluten.sql.columnar.vanillaColumnarToNativeColumnar, which defaults to false.

(Fixes: #4241)
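As a config fragment, enabling the feature described above might look like the following (a sketch; it assumes a Spark deployment built with Gluten and the Velox backend, where this option exists):

```properties
# spark-defaults.conf (sketch; requires a Gluten + Velox build)
# Convert vanilla Spark columnar batches to Velox columnar batches via Arrow
# instead of falling back to row-based conversion. Defaults to false.
spark.gluten.sql.columnar.vanillaColumnarToNativeColumnar  true
```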

How was this patch tested?

Added tests

boneanxs avatar Feb 29 '24 12:02 boneanxs

https://github.com/oap-project/gluten/issues/4241

github-actions[bot] avatar Feb 29 '24 12:02 github-actions[bot]

Run Gluten Clickhouse CI

github-actions[bot] avatar Feb 29 '24 12:02 github-actions[bot]

Do you convert to the Velox format directly, or convert to Arrow and then to Velox?

FelixYBW avatar Mar 05 '24 06:03 FelixYBW

Do you convert to the Velox format directly, or convert to Arrow and then to Velox?

Convert to Arrow first, then to Velox

boneanxs avatar Mar 05 '24 10:03 boneanxs

Do you convert to the Velox format directly, or convert to Arrow and then to Velox?

Convert to Arrow first, then to Velox

Makes sense. We may upstream the Parquet columnar format => Arrow format conversion to Spark.

FelixYBW avatar Mar 05 '24 17:03 FelixYBW

@zhztheplayer can you check how the memory is allocated during the conversion? Where is the Arrow memory allocated? How many memcpys happen during the conversion? Is there an onheap => offheap copy?

FelixYBW avatar Mar 07 '24 19:03 FelixYBW

@zhztheplayer can you check how the memory is allocated during the conversion? Where is the Arrow memory allocated? How many memcpys happen during the conversion? Is there an onheap => offheap copy?

@boneanxs If you'd like to address the questions also, thanks.

I believe the patch ~~reused our old ArrowWritableColumnarVector code~~ to write Spark columnar data to native, so there should be a bunch of "onheap => offheap" copies. Ideally we should count exactly how many copies the implementation does. @boneanxs You can also check on this part.

~~What I was worried about is that ArrowWritableColumnarVector has not actually been under active maintenance for a period of time~~, so we should have more tests here, especially for complex data types.

It would also be great if you could share thoughts on the risk of memory leaks this approach may bring, @boneanxs. Overall the PR's code looks fine to me, and we have removed most of the unsafe APIs, but there might still be some. Let's check this part carefully too.

zhztheplayer avatar Mar 07 '24 23:03 zhztheplayer

Let's document the conversion clearly here. I have an impression that parquet-mr can make use of offheap memory for columnar data. If so, in the best case we can avoid any memcpy during the conversion (not counting the arrow => velox conversion), but it will take some work to get there.

If the parquet scan is onheap, we can't avoid the onheap -> offheap copy, but then we should reuse the offheap buffer.

If we can't be sure how the conversion happens, let's use memcpy wherever needed and make sure there is absolutely no memory leak for now.

FelixYBW avatar Mar 08 '24 01:03 FelixYBW

@zhztheplayer can you check how the memory is allocated during the conversion? Where is the Arrow memory allocated? How many memcpys happen during the conversion? Is there an onheap => offheap copy?

Hey @FelixYBW, for each columnar batch, before the conversion this PR allocates offheap memory to perform the Spark columnar batch -> Arrow array step. (This PR doesn't treat onheap and offheap Spark columnar batches differently; it uses ArrowFieldWriter, which is implemented by Spark, to do the transformation. See ArrowColumnarBatchConverter#write, which this PR newly adds.)

So yes, there will be one memcpy from Spark columnar batch -> Arrow array, no matter whether the Spark columnar batch is on heap or off heap.

On the native side, I simply use importFromArrow to convert the Arrow array to a Velox columnar batch; there will be some memcpy there as well, for String, timestamp, short decimal, etc. (I haven't fully checked).

What I was worried about is that ArrowWritableColumnarVector has not actually been under active maintenance for a period of time, so we should have more tests here, especially for complex data types.

Sorry @zhztheplayer, I might be missing something: are you referring to ArrowFieldWriter? This PR uses ArrowFieldWriter to do the conversion; it's widely used by PySpark, so reusing it here should be safe.

It would also be great if you could share thoughts on the risk of memory leaks this approach may bring, @boneanxs.

The extra memory acquired here is for the arrowArray, the cSchema, and the Velox columnar batch, and all of it is handled by TaskResources and recyclePayload. The arrowArray and the Velox columnar batch are released on each iteration, and the cSchema is released when the iterator ends. TaskResources is the extra assurance that all allocated memory is released if the iterator is interrupted abnormally.

I have an impression that parquet-mr can make use of offheap memory for columnar data. If so, in the best case we can avoid any memcpy during the conversion

I'm not sure whether parquet-mr can directly use offheap memory, but Spark can export Parquet data as an offheap columnar batch by enabling spark.sql.columnVector.offheap.enabled. Directly reusing an offheap columnar batch as an Arrow array is not supported in this PR. (By the way, can we do that? Not sure about this.)

I can also run benchmarks comparing with VanillaColumnarBatchToRow -> RowToVeloxColumnar if necessary. I think, at least for memcpy, RowToVeloxColumnar also does an extra copy even if the row comes from an offheap ColumnarBatch.

boneanxs avatar Mar 08 '24 10:03 boneanxs

This PR uses ArrowFieldWriter to do the conversion; it's widely used by PySpark, so reusing it here should be safe.

Ahh, then it's fine. Some of the code is very similar to Gluten's ArrowWritableColumnarVector, which had pasted some code from vanilla Spark, so I was led by the wrong intuition. Sorry for the mistake.

zhztheplayer avatar Mar 08 '24 14:03 zhztheplayer

does an extra copy even if the row might come from an offheap ColumnarBatch

Thank you for the explanation. You may try enabling spark.sql.columnVector.offheap.enabled.

Onheap-to-offheap memcpy is more expensive than offheap-to-offheap. Do you know how ArrowFieldWriter allocates memory? Is it via the Unsafe API or direct memory?

@zhztheplayer Do you know?

FelixYBW avatar Mar 08 '24 18:03 FelixYBW

Hey @FelixYBW, ArrowFieldWriter calls Arrow's ValueVector, which internally uses ArrowBuf to store values, so it should be offheap memory: https://github.com/apache/spark/blob/1f58f4c68e8de03a8b4c314488dd4f342beb8de2/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala#L53

For example, IntVector: https://github.com/apache/arrow/blob/b202ede131e3c54628616330162f7854ba0c0d70/java/vector/src/main/java/org/apache/arrow/vector/IntVector.java#L151

The value buffer is allocated as an Arrow ArrowBuf: https://github.com/apache/arrow/blob/b202ede131e3c54628616330162f7854ba0c0d70/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java#L73

boneanxs avatar Mar 12 '24 10:03 boneanxs

I think it's OK to have it disabled by default.

@boneanxs Can you add a CI case for the feature to run TPC-H / TPC-DS tests?

Example:

https://github.com/apache/incubator-gluten/blob/8ab9b012db7ebbd7110ba1288b5c4e8a702ccc09/.github/workflows/velox_docker.yml#L293

You can use the gluten-it arg --extra-conf=... to enable C2C during testing.

Also, if -s=30.0 (SF 30) is too large, we can make it -s=1.0.

And sorry for the late response. Let's get this merged ASAP.

zhztheplayer avatar Apr 10 '24 02:04 zhztheplayer

Oh, just noticed the PR is still open and has many conflicts. @boneanxs would you like to continue?

FelixYBW avatar Apr 10 '24 02:04 FelixYBW

Hey @FelixYBW @zhztheplayer, yes, I'm willing to continue this PR. After the last comment, I ran some benchmarks in my local environment and found no obvious improvement compared with VanillaColumnar -> row -> VeloxColumnar. (I also noticed a compatibility issue, since this PR relies on Spark's Arrow version, which conflicts with Gluten's.) I'm still checking why the performance isn't improved (but haven't had time recently); I will update here once I'm done with other things.

boneanxs avatar Apr 10 '24 06:04 boneanxs

@boneanxs hi, could you please rebase? There's a big commit (renaming io.glutenproject to org.apache.gluten) from when we migrated to the Apache repo.

thanks -yuan

zhouyuan avatar May 20 '24 01:05 zhouyuan

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot] avatar Jul 04 '24 01:07 github-actions[bot]

Commenting to keep the PR open, as it could be a valuable topic.

zhztheplayer avatar Jul 12 '24 00:07 zhztheplayer

This PR was auto-closed because it has been stalled for 10 days with no activity. Please feel free to reopen if it is still valid. Thanks.

github-actions[bot] avatar Sep 19 '24 01:09 github-actions[bot]