
to_parquet() needs an option for how many files to create, or, like Ray's implementation, num_rows_per_file

Liquidmasl opened this issue 1 year ago · 2 comments

Currently, loading a .parquet folder with multiple .parquet files is somewhat broken with read_parquet().

But generally, it is harder for Ray to manage a few big files than many small files, as the RAM requirement goes up.

So loading a large dataset can only be done by loading small batches and concatenating them (https://github.com/modin-project/modin/issues/6639#issuecomment-2209027785). This is all fine and well.
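For illustration, one way that batched workaround might look (the paths and glob pattern are placeholders; this reads each part file on its own rather than a fixed number of rows):

import glob
import modin.pandas as pd

# Read the parts individually instead of pointing read_parquet() at the
# whole folder, then concatenate the partial DataFrames into one.
parts = sorted(glob.glob("large_dataset/*.parquet"))
df = pd.concat([pd.read_parquet(p) for p in parts], ignore_index=True)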

BUT the current to_parquet() implementation does not support choosing how many parts to write, or how big those parts of the saved DataFrame should be. It is (as it seems) just the number of logical processors. Also, repartitioning before writing to parquet does not change this.

So while I can load a large dataset a million lines at a time and concatenate the partial DataFrames, I can (on my PC) only save it as 20 parts in the parquet folder. Those parts are then too big to load again on the same machine.

Ray's implementation of write_parquet gives the option to suggest the number of rows per file: https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.write_parquet.html
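For illustration, a minimal sketch of that Ray API (paths are placeholders, and it assumes a Ray version where num_rows_per_file is available):

import ray

ds = ray.data.read_parquet("large_dataset/")
# num_rows_per_file is only a suggestion; Ray may write slightly more or
# fewer rows per output file.
ds.write_parquet("output_dataset/", num_rows_per_file=1_000_000)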

This would give more control over the size of parquet partitions, and maybe even enable loading the same data again (as soon as read_parquet() works again).

Liquidmasl avatar Aug 06 '24 17:08 Liquidmasl

Hi @Liquidmasl

The number of files in the output depends on the number of partitions in the DataFrame.

If you want to customize the number of files, you need to change NPartitions and call _repartition with axis=0 on the DataFrame:

import modin.config as cfg

# Temporarily set the target partition count, then repartition along the
# row axis; to_parquet() writes one file per partition.
with cfg.context(NPartitions=1):
    df = df._repartition(axis=0)

df.to_parquet(...)

Be careful: _repartition requires loading the full DataFrame into RAM. If you do not have enough memory, you can customize NPartitions before creating the DataFrame:

import modin.config as cfg
import modin.pandas as pd

# Set the partition count before the DataFrame is created, so no
# memory-hungry repartitioning is needed afterwards.
cfg.NPartitions.put(1)

df = pd.DataFrame(...)
df.to_parquet(...)
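Building on that, a hedged sketch of deriving NPartitions from a target number of rows per output file (rows_per_file is a made-up variable here, not a Modin option):

import math
import modin.config as cfg

rows_per_file = 1_000_000  # hypothetical target, chosen by the user

# One output file per partition, so aim for ceil(len(df) / rows_per_file)
# partitions. The RAM caveat above still applies to _repartition.
n_parts = max(1, math.ceil(len(df) / rows_per_file))
with cfg.context(NPartitions=n_parts):
    df = df._repartition(axis=0)

df.to_parquet("output.parquet")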

Retribution98 avatar Aug 07 '24 16:08 Retribution98

Hi there, good morning, and thank you for your numerous responses to my cries for help. Also, sorry for them.

I did try changing the number of partitions and repartitioning before saving, without success. But amid everything I have tried, I might have made a mistake. I will try again.

Although I still think some separate logic here would be nice. It might not be practical to use so many partitions while processing a dataset, but it can still be necessary to have small .parquet parts for transferring data, or for further processing on less powerful machines (in case repartitioning does not work because of RAM limitations). Ray's documentation states that the num_rows_per_file parameter is a suggestion that might be over- or undershot. A similar approach for Modin would be neat. This logic could still apply to each partition separately, so as not to hurt parallelism: if a partition runs out of rows for its last .parquet part, that part could simply contain fewer rows.
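For illustration, a rough sketch of that proposed per-partition logic (write_partition is a hypothetical helper, not Modin API; it operates on a single partition as a pandas DataFrame):

import math
import pandas as pd

def write_partition(part: pd.DataFrame, path_prefix: str, rows_per_file: int) -> None:
    # Split one partition into files of at most rows_per_file rows; the
    # last file simply holds whatever remainder is left.
    n_files = max(1, math.ceil(len(part) / rows_per_file))
    for i in range(n_files):
        chunk = part.iloc[i * rows_per_file:(i + 1) * rows_per_file]
        chunk.to_parquet(f"{path_prefix}-part{i:05d}.parquet")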

Liquidmasl avatar Aug 08 '24 07:08 Liquidmasl

Repartitioning should work fine. My previous concerns are unwarranted, as repartitioning is impressively fast.

Liquidmasl avatar Sep 19 '24 11:09 Liquidmasl