# Low reading performance of TFRecords via S3
### Version
1.40.0.dev20240628
### Describe the bug.
Hi,
I am testing the throughput of DALI (v1.40.0.dev20240628) when reading TFRecords via S3 and I am obtaining unexpectedly poor results.
I am scanning the ImageNet training dataset (140 GB, on a system tuned to have 30 GB of available RAM, to prevent automatic memory caching). The dataset is either in its original FILES format or in the form of TFRECORD files, each with a size of 64 MB. In both cases the dataset is read in batches of size 128.
When reading (with no decoding) from fast NVMe disks, these are the results I am getting:
- FILES: 5,000 images/s
- TFRECORD: 20,000 images/s
This difference seems reasonable, since TFRECORD entails less file I/O overhead.
However, when I run the same tests with the datasets stored on the same disks and the same node, but made available via a MinIO server, the results are:
- FILES: 1,300 images/s
- TFRECORD: 300 images/s
In this second scenario, the FILES speed seems reasonable relative to the previous case, since accessing S3 incurs more overhead than direct filesystem access. However, I would have expected TFRECORD to be much faster, since it should amortize access latency over 64 MB blocks, which are much larger than the average 115 kB JPEG file. Surprisingly, TFRECORD is even slower than the FILES setup.
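To put rough numbers on that expectation: at an average of 115 kB per JPEG, a 64 MB shard holds about 64 * 1024 / 115 ≈ 570 images, so a single TFRECORD request covers the same data as roughly 570 individual FILES requests. If per-request S3 latency were the dominant cost, TFRECORD should be far less sensitive to it, yet it ends up roughly 4x slower than FILES (300 vs. 1,300 images/s).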
We also ran a test with your 12 MB DALI_extra TFRecord example and a batch size of 47, and the results are consistent with the above:
- Filesystem: 11,000 images/s
- S3: 300 images/s
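To rule out the raw object-store bandwidth as the bottleneck, a minimal boto3 timing of a single shard can be run independently of DALI. This is only a sketch: the bucket and key names are illustrative, and the MinIO endpoint and credentials are assumed to be configured through the standard AWS environment variables (or by passing `endpoint_url` explicitly to the client).

```python
# Time a raw GET of one 64 MB TFRecord shard to rule out the object store itself.
# Bucket/key names are illustrative; the MinIO endpoint and credentials are
# assumed to come from the standard AWS_* environment variables.
import time
import boto3

s3 = boto3.client("s3")

start = time.perf_counter()
body = s3.get_object(Bucket="imagenet", Key="tfrecords/train/shard-00000")["Body"].read()
elapsed = time.perf_counter() - start

mib = len(body) / 2**20
print(f"{mib:.1f} MiB in {elapsed:.2f} s ({mib / elapsed:.1f} MiB/s)")
```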
Do you have any ideas about what could be causing the low reading performance of TFRecords via S3? Are we overlooking some optimization parameters in the TFRecord reader?
Thanks!
### Minimum reproducible example
To evaluate the performance of reading the DALI_extra TFRecord example file, both from the filesystem and a pre-configured S3 server:
```bash
# read DALI_extra train from the filesystem
python3 tf_read.py --reader=tfrecord --tfrecord-file=/data/tmp/train \
    --tfrecord-index=/data/tmp/train.idx --bs 47 --epochs 20

# read DALI_extra train via S3
python3 tf_read.py --reader=tfrecord --tfrecord-file=s3://tmp/train \
    --tfrecord-index=s3://tmp/train.idx --bs 47 --epochs 20
```
Similarly, but utilizing the ImageNet training dataset:
```bash
# scan imagenet train images (original jpegs) from the filesystem
python3 tf_read.py --reader=file --file-root=/data/imagenet/train/

# scan imagenet train images (tfrecord) from the filesystem
python3 tf_read.py --reader=tfrecord --file-root=/data/imagenet/tfrecords/train/ \
    --index-root=/data/imagenet/tfrecords/train_idx/

# scan imagenet train images (original jpegs) via S3
python3 tf_read.py --reader=file --file-root=s3://imagenet/train/

# scan imagenet train images (tfrecord) via S3
python3 tf_read.py --reader=tfrecord --file-root=s3://imagenet/tfrecords/train/ \
    --index-root=s3://imagenet/tfrecords/train_idx/
```
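(The index files referenced above can be generated ahead of time with DALI's `tfrecord2idx` script, i.e. `tfrecord2idx <tfrecord file> <index file>`, and uploaded next to the shards for the S3 runs.)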
```python
########################################################################
# tf_read.py
########################################################################
# dali
from nvidia.dali.pipeline import pipeline_def
from nvidia.dali.plugin.base_iterator import LastBatchPolicy
from nvidia.dali.plugin.pytorch import DALIGenericIterator
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import nvidia.dali.tfrecord as tfrec
# varia
from clize import run
from tqdm import trange, tqdm
import math
import time
import boto3
import os
global_rank = int(os.getenv("RANK", default=0))
local_rank = int(os.getenv("LOCAL_RANK", default=0))
world_size = int(os.getenv("WORLD_SIZE", default=1))
def parse_s3_uri(s3_uri):
    if not s3_uri.startswith("s3://"):
        raise ValueError("Invalid S3 URI")
    # Remove the "s3://" prefix
    s3_uri = s3_uri[5:]
    # Split the remaining part into bucket and prefix
    parts = s3_uri.split("/", 1)
    bucket_name = parts[0]
    prefix = parts[1] if len(parts) > 1 else ""
    return bucket_name, prefix
def list_s3_files(s3_uri):
    bucket_name, prefix = parse_s3_uri(s3_uri)
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
    paths = []
    for page in pages:
        if "Contents" in page:
            for obj in page["Contents"]:
                paths.append(f"s3://{bucket_name}/{obj['Key']}")
    return sorted(paths)
def read_data(
    *,
    reader="tfrecord",
    use_gpu=False,
    bs=128,
    epochs=10,
    file_root=None,
    index_root=None,
    tfrecord_file=None,
    tfrecord_index=None,
):
    """Read images from tfrecords or filesystem, in a tight loop

    :param reader: "file" or "tfrecord" (default: tfrecord)
    :param use_gpu: enable output to GPU (default: False)
    :param bs: batch size (default: 128)
    :param epochs: Number of epochs (default: 10)
    :param file_root: File root to be used when reading files or tfrecords
    :param index_root: Root path to index files (only when reading tfrecords)
    :param tfrecord_file: Single tfrecord
    :param tfrecord_index: Single index file
    """
    if use_gpu:
        device_id = local_rank
    else:
        device_id = types.CPU_ONLY_DEVICE_ID

    if reader == "file":
        file_reader = fn.readers.file(
            file_root=file_root,
            name="Reader",
            shard_id=global_rank,
            num_shards=world_size,
            pad_last_batch=True,
            # speed up reading
            prefetch_queue_depth=4,
            # dont_use_mmap=True,
            read_ahead=True,
        )
        chosen_reader = file_reader
    elif reader == "tfrecord":
        if tfrecord_file and tfrecord_index:
            path = [tfrecord_file]
            index_path = [tfrecord_index]
            # path *= 10
            # index_path *= 10
        # alternatively, read a list of tfrecords
        elif file_root.startswith("s3://"):
            path = list_s3_files(file_root)
            index_path = list_s3_files(index_root)
        else:
            path = sorted([f.path for f in os.scandir(file_root) if f.is_file()])
            index_path = sorted([f.path for f in os.scandir(index_root) if f.is_file()])
        tf_reader = fn.readers.tfrecord(
            path=path,
            index_path=index_path,
            features={
                "image/encoded": tfrec.FixedLenFeature([], tfrec.string, ""),
                "image/class/label": tfrec.FixedLenFeature([], tfrec.int64, -1),
            },
            name="Reader",
            shard_id=global_rank,
            num_shards=world_size,
            pad_last_batch=True,
            # speed up reading
            prefetch_queue_depth=4,
            # dont_use_mmap=True,
            read_ahead=True,
        )
        chosen_reader = tf_reader["image/encoded"], tf_reader["image/class/label"]
    else:
        raise ValueError('--reader: expecting either "file" or "tfrecord"')
    # create dali pipeline
    @pipeline_def(
        batch_size=bs,
        num_threads=4,
        device_id=device_id,
        prefetch_queue_depth=2,
    )
    def get_dali_pipeline():
        images, labels = chosen_reader
        if device_id != types.CPU_ONLY_DEVICE_ID:
            images = images.gpu()
            labels = labels.gpu()
        return images, labels

    pl = get_dali_pipeline()
    pl.build()
    ########################################################################
    # benchmark loop
    ########################################################################
    # number of iterations needed to cover this rank's shard once per epoch
    shard_size = math.ceil(pl.epoch_size()["Reader"] / world_size)
    steps = math.ceil(shard_size / bs)

    # run the pipeline in a tight loop, without any further processing of the outputs
    for _ in range(epochs):
        # read data for current epoch
        for _ in trange(steps):
            pl.run()
        pl.reset()


# parse arguments
if __name__ == "__main__":
    run(read_data)
```
### Relevant log output
_No response_
### Other/Misc.
_No response_
### Check for duplicates
- [X] I have searched the [open bugs/issues](https://github.com/NVIDIA/DALI/issues) and have found no duplicates for this bug report