Performance on large amounts of data
Hello!
I'm attempting to train a relatively simple transformer model on a large amount of data (35M rows, 20 features). The data have been materialized as parquet, where each column holds an array of length ~30. These are just small enough that, with some manipulation, I can fit them into a pandas DataFrame and keep that in memory, but I'd like to be able to train on larger datasets -- and with more workers -- in the future.
At least with my naive use of petastorm, throughput appears to be quite low. Simply iterating over a petastorm.pytorch.DataLoader can take hours, which makes my use case somewhat intractable. Changing the worker type or the number of workers did not seem to make things better or worse.
I'm materializing the dataset this way:
import numpy as np

from petastorm.codecs import NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField

train_path = f"{base_path}/train"

# One float32 ndarray field per column; encoder columns hold encoder_length
# values, the remaining columns hold decoder_length values.
fields = [
    UnischemaField(
        column,
        np.float32,
        (encoder_length if column.endswith("encoder") else decoder_length,),
        NdarrayCodec(),
        False,
    )
    for column in data_result.data_train.columns
]
schema = Unischema("schema", fields)

with materialize_dataset(spark, train_path, schema):
    data_result.data_train.write.mode("overwrite").parquet(train_path)
And reading it this way:
from petastorm import make_batch_reader
from petastorm.pytorch import DataLoader

train_reader = make_batch_reader(
    train_path,
    num_epochs=None,
    # reader_pool_type="process",
    # workers_count=15,
    # cur_shard=hvd.rank(),
    # shard_count=hvd.size(),
)

with DataLoader(train_reader, batch_size=batch_size) as train_dataloader:
    train_dataloader_iter = iter(train_dataloader)
    for _ in range(steps_per_epoch):
        batch = next(train_dataloader_iter)
Any hints as to what I can do to improve throughput? Some option or technique I might be missing? Using BatchedDataLoader instead of DataLoader did help substantially, but I'm running into possibly memory-related errors when using that with Horovod (any insight into that would also be appreciated -- unfortunately Databricks doesn't give me much information other than telling me the process has died).
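For reference, the BatchedDataLoader variant looks roughly like this (just a sketch of my setup; the workers_count and shuffling_queue_capacity values are only illustrative, and the sharding arguments mirror the commented-out ones above):

import horovod.torch as hvd
from petastorm import make_batch_reader
from petastorm.pytorch import BatchedDataLoader

hvd.init()

train_reader = make_batch_reader(
    train_path,
    num_epochs=None,
    workers_count=4,        # illustrative value
    cur_shard=hvd.rank(),   # each Horovod process reads its own shard
    shard_count=hvd.size(),
)

with BatchedDataLoader(
    train_reader,
    batch_size=batch_size,
    shuffling_queue_capacity=4096,  # illustrative; larger values buffer more rows in memory
) as train_dataloader:
    train_dataloader_iter = iter(train_dataloader)
    for _ in range(steps_per_epoch):
        batch = next(train_dataloader_iter)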
Is it possible to use a plain parquet store (i.e., do not serialize it with materialize_dataset and a Unischema)? That approach works well with large numpy arrays, but with relatively small arrays like yours the overhead of numpy serialization would be too noticeable.
Using make_batch_reader is the right way to go, as it will read entire row groups in one go and deliver the data to you with minimal processing.
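To be concrete, here is a sketch of what I mean by a plain parquet store, reusing the names from your snippet (no Unischema, no NdarrayCodec):

# Write the Spark DataFrame as plain parquet, without materialize_dataset.
train_path = f"{base_path}/train"
data_result.data_train.write.mode("overwrite").parquet(train_path)

# make_batch_reader infers the schema from the parquet metadata itself.
from petastorm import make_batch_reader

train_reader = make_batch_reader(train_path, num_epochs=None)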
I'd try benchmarking for batch in train_reader, since that's what BatchedDataLoader is using underneath. If iterating over train_reader is slow, there is no chance BatchedDataLoader will do better.
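Something along these lines (just a sketch; it assumes each batch is a namedtuple of per-column arrays covering one row group) would give you the raw reader throughput:

import time

from petastorm import make_batch_reader

# Time the raw reader, with no DataLoader / BatchedDataLoader on top.
with make_batch_reader(train_path, num_epochs=1) as train_reader:
    start = time.time()
    rows = 0
    for batch in train_reader:
        rows += len(batch[0])  # count rows via the first column of the batch
    print(f"{rows} rows in {time.time() - start:.1f} s")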
If you can provide a runnable snippet with some mocked data that I could run "out-of-the-box", I should be able to help you get the optimal throughput.