
feat: no longer load full table into ram in write by using concurrent write

Open · aersam opened this pull request 1 year ago • 20 comments

Description

This is a follow-up of #2265.

It additionally uses streams/channels to write concurrently, at the cost of higher memory consumption. The default keeps only one RecordBatch in RAM, so the extra concurrency is opt-in.

I tested this with a local file and the write went from 700s to 200s with 10 concurrent streams. Memory consumption goes up, of course, but given that we currently load the whole table into RAM, that's OK :)

This adds a dependency on async-channel, as I need a multi-consumer channel.
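
Conceptually, the change boils down to a bounded multi-consumer channel that feeds a fixed number of writer tasks, so the producer can only stay a limited number of batches ahead. Here is a rough sketch of the idea in Python/asyncio terms; it is illustrative only (the names and the write_batch stub are made up), the actual implementation is the Rust writer built on async-channel:

import asyncio

async def write_batch(batch):
    # stand-in for writing one RecordBatch out to storage
    await asyncio.sleep(0)

async def writer_task(queue: asyncio.Queue):
    while True:
        batch = await queue.get()
        if batch is None:  # sentinel: producer is done
            break
        await write_batch(batch)

async def concurrent_write(batches, concurrent_streams: int = 1):
    # bounded queue: the producer can be at most `concurrent_streams` batches ahead
    queue: asyncio.Queue = asyncio.Queue(maxsize=concurrent_streams)
    workers = [asyncio.create_task(writer_task(queue)) for _ in range(concurrent_streams)]
    for batch in batches:
        await queue.put(batch)
    for _ in workers:
        await queue.put(None)  # one sentinel per worker
    await asyncio.gather(*workers)

With concurrent_streams=1 this degrades to the default single-batch behaviour described above; raising it trades RAM for write throughput.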

Related Issue(s)

Fixes #2255

aersam avatar Mar 15 '24 14:03 aersam

ACTION NEEDED

delta-rs follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

github-actions[bot] avatar Mar 15 '24 14:03 github-actions[bot]

@aersam your old PR can be closed then? Can you also add a Python test that uses this concurrent_stream parameter?

ion-elgreco avatar Mar 15 '24 14:03 ion-elgreco

@aersam your old PR can be closed then? Can you also add a Python test that uses this concurrent_stream parameter?

Yes, if you're OK with adding an extra dependency?

aersam avatar Mar 15 '24 14:03 aersam

Let's get the other one in first

ion-elgreco avatar Mar 15 '24 15:03 ion-elgreco

Let's get the other one in first

I hope it's OK if I prioritize this one on my side, so I don't have to keep both branches up to date

aersam avatar Mar 21 '24 11:03 aersam

Let's get the other one in first

I hope it's OK if I prioritize this one on my side, so I don't have to keep both branches up to date

That's ok!

ion-elgreco avatar Mar 21 '24 11:03 ion-elgreco

@aersam I think we should max out the concurrent streams for Python users.

In most use cases we pass a RecordBatchReader whose RecordBatches are already in memory before the reader is constructed; in that case you won't see any memory difference, and it wouldn't differ from the prior behavior, since the reader was always collected anyway.

I also have one suggestion for the Python side: I think it's better to simplify it and just provide a parameter called parallelize, set to True by default. If users want to control the number of concurrent streams, they can set an environment variable which we then parse in python_lib; if it's not set and parallelize is True, we take the maximum possible number of streams.

ion-elgreco avatar Mar 28 '24 22:03 ion-elgreco

How about parallelize: bool | int on the Python side? 🙂

aersam avatar Mar 29 '24 07:03 aersam

@aersam that also works! :)

ion-elgreco avatar Mar 29 '24 07:03 ion-elgreco
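
For illustration only, the agreed parallelize: bool | int (plus the environment-variable override suggested above) could be resolved on the Python side roughly like this; the helper and the DELTARS_CONCURRENT_STREAMS variable name are hypothetical, not the PR's actual API:

import os

def resolve_concurrent_streams(parallelize: bool | int = True) -> int:
    # an explicit integer wins (note: bool is a subclass of int, so rule out bool first)
    if isinstance(parallelize, int) and not isinstance(parallelize, bool):
        return max(1, parallelize)
    if not parallelize:
        return 1  # old behaviour: a single batch in flight
    # hypothetical env-var override, otherwise use all available cores
    env = os.environ.get("DELTARS_CONCURRENT_STREAMS")
    if env is not None:
        return max(1, int(env))
    return os.cpu_count() or 1

The resolved integer would then be passed down to the Rust writer as the number of concurrent streams.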

I finally had the time to update this branch with the new parallel parameter in Python. Hope it looks good now!

aersam avatar Apr 15 '24 08:04 aersam

@aersam btw, do you have any profiling numbers on the speed-ups/memory trade-offs when parallel is True? It would be nice to share those in the release notes later on.

ion-elgreco avatar Apr 15 '24 09:04 ion-elgreco

I only did some manual tests on my own data, but I could probably write a benchmark in Python, using DuckDB or Polars as the source. Would it make sense to add this to the code somehow?

aersam avatar Apr 15 '24 10:04 aersam

@aersam here you could add it, and maybe even reuse some of the benchmarks there: https://github.com/delta-io/delta-rs/tree/main/crates/benchmarks

ion-elgreco avatar Apr 15 '24 14:04 ion-elgreco

I did some very basic benchmarking, but the results were not as good as I hoped :) While RAM consumption is significantly lower, the speed is not good enough yet. Maybe the channel needs to be bigger; I'll do some more testing.

I did my test quick and dirty in Python; I can share the code if you want. Basically it's this:

import duckdb
from deltalake.writer import write_deltalake
from uuid import uuid4

with duckdb.connect() as con:
    # get your 42.parquet here: https://duckdb.org/2024/03/26/42-parquet-a-zip-bomb-for-the-big-data-age.html
    con.execute("select b, random() as a from read_parquet('42.parquet') limit 300000000")
    reader = con.fetch_record_batch()
    write_deltalake(f"_test/{uuid4()}", reader, schema=reader.schema, mode="overwrite", engine="rust")

aersam avatar Apr 17 '24 19:04 aersam
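
For reference, one quick way to put numbers on the speed/RAM trade-off being discussed is to wrap the snippet above in wall-clock timing and read the process's peak RSS afterwards. A rough, Unix-only sketch (the resource module is not available on Windows; the comment about the parallel toggle refers to this PR's branch, not to released deltalake):

import resource
import time
from uuid import uuid4

import duckdb
from deltalake.writer import write_deltalake

with duckdb.connect() as con:
    con.execute("select b, random() as a from read_parquet('42.parquet') limit 300000000")
    reader = con.fetch_record_batch()
    start = time.perf_counter()
    # on this PR's branch, toggle the new parallel parameter here to compare runs
    write_deltalake(f"_test/{uuid4()}", reader, schema=reader.schema, mode="overwrite", engine="rust")
    print(f"wall time: {time.perf_counter() - start:.1f}s")

# peak RSS of the whole process (kilobytes on Linux, bytes on macOS)
print(f"peak RSS: {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss}")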

Pretty sure the non-async write causes issues. But object_store 0.10 will change a lot there, so it's probably better to wait for that.

aersam avatar Apr 19 '24 05:04 aersam

Pretty sure the non-async write causes issues. But object_store 0.10 will change a lot there, so it's probably better to wait for that.

Yes, let's see how effective these changes are with the new upload trait.

ion-elgreco avatar Apr 19 '24 06:04 ion-elgreco

@aersam fyi, ObjectStore just got bumped in the repo to 0.10

ion-elgreco avatar Jun 11 '24 20:06 ion-elgreco

@aersam hey, do you think you have time to resolve the merge conflicts?

ion-elgreco avatar Jul 24 '24 17:07 ion-elgreco

@aersam hey, do you think you have time to resolve the merge conflicts?

I'm sorry, I had a bit of a shift in priorities, so it will take some time. Especially since there have been quite a few changes in the writer, as far as I can see.

aersam avatar Jul 24 '24 17:07 aersam

@aersam hey, do you think you have time to resolve the merge conflicts?

I'm sorry, I had a bit of a shift in priorities, so it will take some time. Especially since there have been quite a few changes in the writer, as far as I can see.

No worries! Just ping me once it's ready for another review round

ion-elgreco avatar Jul 24 '24 17:07 ion-elgreco