
Performance on large amounts of data

Open · jaycunningham-8451 opened this issue 2 years ago • 1 comment

Hello!

I'm attempting to train a relatively simple transformer model on a large amount of data (35m rows, 20 features). The data have been materialized as parquet, where each column is an array of size ~30. These are just small enough that with some manipulation I can fit them into a pandas data frame and keep that in memory, but I'd like to be able to train on larger datasets -- and more workers -- in the future.

At least with my naive use of petastorm, throughput appears to be quite low. Simply iterating over a petastorm.pytorch.DataLoader can take hours, which makes my use case somewhat intractable. Changing the worker type or the number of workers did not seem to make things better or worse.

I'm materializing the dataset this way:

import numpy as np

from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.codecs import NdarrayCodec
from petastorm.unischema import Unischema, UnischemaField

train_path = f"{base_path}/train"

# One float32 array field per column; encoder columns have length
# encoder_length, decoder columns have length decoder_length.
fields = [
    UnischemaField(
        column,
        np.float32,
        (encoder_length if column.endswith("encoder") else decoder_length,),
        NdarrayCodec(),
        False,
    )
    for column in data_result.data_train.columns
]

schema = Unischema("schema", fields)

# Write the Spark DataFrame as a petastorm dataset (parquet plus unischema metadata).
with materialize_dataset(spark, train_path, schema):
    data_result.data_train.write.mode("overwrite").parquet(train_path)

And reading it this way:

from petastorm import make_batch_reader
from petastorm.pytorch import DataLoader

train_reader = make_batch_reader(
    train_path,
    num_epochs=None,
    # reader_pool_type="process",
    # workers_count=15,
    # cur_shard=hvd.rank(),
    # shard_count=hvd.size(),
)

with DataLoader(train_reader, batch_size=batch_size) as train_dataloader:
    train_dataloader_iter = iter(train_dataloader)

    for _ in range(steps_per_epoch):
        batch = next(train_dataloader_iter)

Any hints as to what I can do to improve throughput? Is there an option or technique I might be missing? Using BatchedDataLoader instead of DataLoader did help substantially, but I'm running into what look like memory-related errors when using it with Horovod (any insight into that would also be appreciated -- unfortunately Databricks doesn't give me much information other than telling me the process has died).
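For reference, the BatchedDataLoader variant I tried is essentially a drop-in swap for the loop above (a rough sketch, same reader and batch_size; I haven't tuned any of its extra options):

from petastorm.pytorch import BatchedDataLoader

with BatchedDataLoader(train_reader, batch_size=batch_size) as train_dataloader:
    train_dataloader_iter = iter(train_dataloader)

    for _ in range(steps_per_epoch):
        batch = next(train_dataloader_iter)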

jaycunningham-8451 · Jun 23 '22 16:06

Is it possible to use a plain parquet store (i.e., do not serialize it with materialize_dataset and a Unischema)? That approach works well with large numpy arrays, but with relatively small arrays like yours, the numpy serialization overhead would be too noticeable.
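Roughly, reusing the names from your snippets above (just a sketch, assuming your petastorm/pyarrow versions handle array-typed parquet columns), the write side becomes a plain parquet write and make_batch_reader reads it directly:

# Plain parquet write: no Unischema, no NdarrayCodec, no materialize_dataset
data_result.data_train.write.mode("overwrite").parquet(train_path)

# Read it back straight from the parquet store
from petastorm import make_batch_reader

with make_batch_reader(train_path, num_epochs=None) as reader:
    for batch in reader:
        pass  # each batch is a namedtuple of numpy arrays, roughly one row-group at a time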

Using make_batch_reader is the right way to go, as it reads entire row-groups in one go and delivers the data to you with minimal processing.

I'd try benchmarking a bare loop over train_reader (for batch in train_reader), since that's what BatchedDataLoader uses underneath. If iterating over train_reader is slow, there is no chance BatchedDataLoader will do better.
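Something along these lines (just a sketch; num_epochs=1 so the loop terminates) would give a baseline for the raw reader throughput:

import time

from petastorm import make_batch_reader

with make_batch_reader(train_path, num_epochs=1) as reader:
    start = time.time()
    rows = 0
    for batch in reader:
        # batch is a namedtuple of numpy arrays, one entry per column
        rows += len(batch[0])
    elapsed = time.time() - start
    print(f"{rows} rows in {elapsed:.1f}s ({rows / elapsed:.0f} rows/s)")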

If you can provide a runnable snippet with some mocked data that I could run out of the box, I should be able to help you get to optimal throughput.

selitvin · Jun 24 '22 11:06