petastorm
petastorm copied to clipboard
Regarding performance of petastorm with shuffle
Hi, I have 24GB data set (1.28 million images), stored as parquet files with 800 mb each with total as 31 parquet files. I have 60 core cpu machine with 120GB RAM.
I use below code
def just_read_batch(dataset_url):
with make_batch_reader(dataset_url, num_epochs=1, workers_count=16, shuffle_row_drop_partitions=10) as train_reader:
i = 0
for schema_view in train_reader:
i += len(schema_view.imagename)
print(i)
parquet with shuffle (shuffle_row_drop_partitions = 5) : 202 seconds (PERFORMANCE IS 6k image per second) run again with shuffle : 200 seconds run again with shuffle : 200 seconds
with (shuffle_row_drop_partitions = 10) : 361 seconds run again : 370 seconds
how can I improve the performance ? I need shuffled data on each epoch. please check my code snippet again and suggest me whether I need to change configuration to get better performance ?
Shorter version
- Don't set
shuffle_row_drop_partitions
: I would expect you to get 30K images per second, unless network/disk saturates. - If your images are encoded using Petastorm codec, using
make_batch_reader
would not decode them for you at the moment (a bug : you would get unusable binary blobs). You should usemake_reader
which will handle the decoding correctly. Decoding the images however are likely to stress your CPUs and you may see lower throughput. - Place a shuffling queue after the reader to shuffle your data.
Longer version
shuffle_row_drop_partitions
is a very wasteful parameter: it decreases correlation between produced samples at the expense of network traffic. In case of shuffle_row_drop_partitions=10
, 1/10 of the downloaded samples in a rowgroup are used. (the same rowgroup would be loaded later and another 1/10 will be used next time, and so on). I'd suggest to use this switch only in extreme cases due to its wastefulness.
Reading from Apache Parquet format can be done only in atomic units of a row-group. shuffle_row_groups=True
results in random order of row-groups being consumed, but there is still inter-row-group correlation issue. Here are some ideas of reducing correlation between samples:
- Placing a shuffling queue after the reader. Usually we rely on Tensorflow shuffling queue (
tf_tensors
takeshuffling_queue_capacity
parameter, or you can usetf.data
facilities). I think you can also useRandomShufflingBuffer
inpetastorm.reader_impl.shuffling_buffer
- we did not intend it to be a public class, but I think it would work as a pure-Python shuffling queue implementation. - Reduce size of rowgroup (hence decrease amount of correlated samples). If you generate your dataset using Petastorm,
materialize_dataset
takesrow_group_size_mb
. Reducing its value will decrease number of rows in a rowgroup (note that with an off-the-shelf parquet Java library implementation you can not get less then 100 rows. We have a patch I can share with you that would allow to overcome that issue) - Depends on the nature of data you are using, shuffling the rows in the dataset before writing it out may help (we preshuffle this with some of our datasets).
I am a bit confused with the fact that you use make_batch_reader
: you mentioned you work with images, but for images, I would expect you to use Tensor and have your own custom Unischema. make_batch_reader
however is designed to work with standard Apache Parquet types (I think the image fields are currently unusable in this setup (we will fix it soon)).
@selitvin : Thanks for the reply
a) I already had an image data in native parquet files. so used a make_batch_reader
b) I have a question, Just keeping shuffle_row_groups
=True, will just change order of the rowgroup. But untill one rowgroup is completely consumed, it wont give other row group data right ? so, If I want to shuffle data within rowgroup, I have to use external shuffling queue as shuffle_row_drop_partitions
is a very wasteful parameter?
b) You are correct.
I wonder, in your case, how many rows do you get in a single batch (that would be the size of your rowgroup)?
Do you think post shuffling queue is a sufficient solution for your use-case?
At some stage we were considering to implement an on-local-disk shuffling but never got to it. Would it be a good idea for your scenario?
shuffle_row_drop_partitions
is wasteful. But for lower values (shuffle_row_drop_partitions=2 e.g.), you would be reducing your batch size by the factor of 2, which is effective in reducing samples correlation.
@selitvin : yes, my batch size is number of rows in rowgroup. And I have another doubt with petastrom performance with ''local-disk" cache.
I have the same 24GB data in S3a, now when I read on local file system first epoch takes 100 seconds, from second epoch its 40 seconds. But I cannot use local, I have data in s3a, so I have started using ''local-disk" cache.
shards=30 and I keep cache size limit as 30GB (as data 24GB). I saw that it created ".val" files. First epoch took 186 seconds. second 80 seconds third 64 seconds.
still I am not getting the performance as fast as local read performance in the subsequent epoch. What's the problem ? Any configuration I can change to improve this performance ?
Sorry for the delay in my response.
If I understand you correctly you were expecting 40 seconds per epoch while you observed 64s, i.e. 1.5 slowdown.
Up until now, we did not try to look into the bottlenecks of reading from cache. We are using diskcache
library for caching, and not sure if it is optimized well enough for our usecase. The reason we chose this library was because of its disk space management features, but in your case, you are not using them.
Implementing a fast cache that does not manage disk usage should be trivial. Maybe it's best for you to provide your own implementation of CacheBase
interface? You can borrow serialization ideas from petastorm/local_disk_arrow_table_cache.py
, which should be fairly fast.
Then it should be an easy task to make make_batch_reader
aware of your cache.
Another thought: if, by any chance you have enough memory on your machine, you could implement in-memory caching... No doubt there won't be anything faster than that...
If you come up with some useful cache implementation, we would be happy to accept your contribution :) !
This 2-pass shuffle algo could be a starting point for shuffling improvements: https://blog.janestreet.com/how-to-shuffle-a-big-dataset/?utm_source=share
If I understand correctly, the 2-pass shuffling mentioned in the article describes an algorithm for shuffling that utilizes additional disk storage. A preprocessing step that reshuffles and persists shuffled data is definitely something that can be done be a user but is not in the scope of the responsibility of the Petastorm library..
@selitvin we are using pytorch, and need shuffle rows also. As per your suggestion, we can shuffle with tf.Dataset like https://github.com/uber/petastorm/issues/329#issuecomment-472701296. What's the recommended way to convert data from tf.Dataset to pytorch?