Wrong accuracy with external data in a distributed setting
Hello,
First of all, many thanks for the great library.
I'm using the custom DALI data loader (DaliDataLoader) below, which in turn leverages an external source, ExternalInputIterator. The latter reads the files {split}_taskset.txt (containing paths to images) and {split}_taskset_labels.txt (containing the corresponding labels), where split is train or val. I'm running this code in a distributed setting using Horovod.
My issue: when training a ResNet-50 model on the ImageNet dataset (blurred version), I get a top-1 accuracy which is too high (above 80%), which makes me think something is wrong with the data pipeline. Do you see something fishy with the code below?
I added a "wrapper" to yield the data returned by the DALIGenericIterator.
What I tried: the accuracy achieved with the exact same dataset passed to fn.readers.file (without using an external source) is correct (about 75.3%). So the issue must come from my ExternalInputIterator.
class ExternalInputIterator:
def __init__(self, data_dir, split, batch_size, shard_id, num_shards):
self.batch_size = batch_size
self.shard_id = shard_id
self.num_shards = num_shards
with open(f"{data_dir}/{split}_taskset.txt", 'r') as f:
self.files = [line.rstrip() for line in f if line != '']
with open(f"{data_dir}/{split}_taskset_labels.txt", 'r') as f:
self.labels = [line.rstrip() for line in f if line != '']
self.data_set_len = len(self.files)
inf = self.data_set_len * shard_id // num_shards
sup = self.data_set_len * (shard_id + 1) // num_shards
self.files = np.array(self.files[inf:sup])
self.labels = np.array(self.labels[inf:sup])
self.n = len(self.files)
self.full_iterations = self.n // batch_size
self.iterations = math.ceil(self.n / batch_size)
def __iter__(self):
self.i = 0
perm = np.random.permutation(len(self.files))
self.files = self.files[perm]
self.labels = self.labels[perm]
return self
def __next__(self):
if self.i >= self.n:
self.__iter__()
raise StopIteration
batch_files = []
batch_labels = []
for _ in range(self.batch_size):
sample_idx = self.i % self.n
with open(self.files[sample_idx], 'rb') as f:
batch_files.append(np.frombuffer(f.read(), dtype=np.uint8))
batch_labels.append(np.int64([self.labels[sample_idx]]))
self.i += 1
return batch_files, batch_labels
class DaliDataLoader(object):
    def __init__(self, data_dir, batch_size, num_workers=1,
                 device_id=0, shard_id=0, num_shards=1, precision=32,
                 training=True, cuda=True, **kwargs):
self.batch_size = batch_size
decoder_device, device = ("mixed", "gpu") if cuda else ("cpu", "cpu")
crop_size = 224
val_size = 256
img_type = types.FLOAT16 if precision == 16 else types.FLOAT
# ask nvJPEG to preallocate memory for the biggest sample in ImageNet for CPU and GPU to avoid reallocations in runtime
device_memory_padding = 211025920 if decoder_device == 'mixed' else 0
host_memory_padding = 140544512 if decoder_device == 'mixed' else 0
# ask HW NVJPEG to allocate memory ahead for the biggest image in the data set to avoid reallocations in runtime
preallocate_width_hint = 5980 if decoder_device == 'mixed' else 0
preallocate_height_hint = 6430 if decoder_device == 'mixed' else 0
split = 'train' if training else 'val'
self.external_data = ExternalInputIterator(
data_dir, split, batch_size, shard_id, num_shards)
pipeline = Pipeline(batch_size, num_workers, device_id)
with pipeline:
            # I get the expected accuracy using the reader directly
#inputs, target = fn.readers.file(file_root=data_dir,
# shard_id=shard_id,
# num_shards=num_shards,
# random_shuffle=training,
# pad_last_batch=True,
# name="Reader")
inputs, target = fn.external_source(
source=self.external_data, num_outputs=2)
if training:
images = fn.decoders.image_random_crop(inputs,
device=decoder_device, output_type=types.RGB,
device_memory_padding=device_memory_padding,
host_memory_padding=host_memory_padding,
preallocate_width_hint=preallocate_width_hint,
preallocate_height_hint=preallocate_height_hint,
random_aspect_ratio=[
0.8, 1.25],
random_area=[0.1, 1.0],
num_attempts=100)
images = fn.resize(images,
device=device,
resize_x=crop_size,
resize_y=crop_size,
interp_type=types.INTERP_TRIANGULAR)
mirror = fn.random.coin_flip(probability=0.5)
else:
images = fn.decoders.image(inputs,
device=decoder_device,
output_type=types.RGB)
images = fn.resize(images,
device=device,
size=val_size,
mode="not_smaller",
interp_type=types.INTERP_TRIANGULAR)
mirror = False
images = fn.crop_mirror_normalize(images.gpu() if cuda else images,
dtype=img_type,
output_layout="CHW",
crop=(crop_size, crop_size),
mean=[0.485 * 255, 0.456 *
255, 0.406 * 255],
std=[0.229 * 255, 0.224 *
255, 0.225 * 255],
mirror=mirror)
if cuda:
target = target.gpu()
pipeline.set_outputs(images, target)
self.iterator = DALIGenericIterator(
pipeline,
["x", "y"],
auto_reset=True
)
def __len__(self):
return self.external_data.full_iterations
def __iter__(self):
for token in self.iterator:
x = token[0]['x']
y = token[0]['y'].squeeze().long()
yield x, y
This is how I create my data loaders.
train_loader = DaliDataLoader(
args.train_dir, allreduce_batch_size,
device_id=hvd.local_rank(), shard_id=hvd.rank(),
num_shards=hvd.size(), training=True)
val_loader = DaliDataLoader(
args.val_dir, args.val_batch_size,
device_id=hvd.local_rank(), shard_id=hvd.rank(),
num_shards=hvd.size(), training=False)
What am I doing wrong?
Thanks!
Hi @thomas-bouvier,
my suggestion would be to check the behaviour around the end of the epoch.
By default, DALIGenericIterator uses the FILL last-batch policy, and the reader you used is configured with pad_last_batch=True, where the last sample is repeated.
In the case of your external source iterator, you may fill the last batch with up to batch_size - 1 samples from the beginning of the epoch. This may impact the results if the epoch size is not divisible by the batch size. You also have __len__ defined in a way that suggests using only full batches; in that case you are dropping some samples - maybe that impacts the validation?
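For illustration, a minimal sketch of the iterator arguments that control this behavior (LastBatchPolicy is exposed by the pytorch plugin; FILL is the default):
from nvidia.dali.plugin.pytorch import DALIGenericIterator, LastBatchPolicy

# Sketch only: PARTIAL keeps the smaller last batch, DROP discards it,
# FILL (the default) always returns a full batch.
iterator = DALIGenericIterator(
    pipeline,
    ["x", "y"],
    last_batch_policy=LastBatchPolicy.PARTIAL,
    auto_reset=True)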
I recommend the sharding section of the docs to see what the possible options are: https://docs.nvidia.com/deeplearning/dali/main-user-guide/docs/advanced_topics_sharding.html
I would certainly check what is happening around the epoch start/end; you can, for example, use both the external source operator and the file reader without shuffling and see what images are returned.
We recently introduced some experimental debugging capabilities, but you would need to run the pipeline using the .run() method, as debugging via framework iterators is not supported yet: https://docs.nvidia.com/deeplearning/dali/main-user-guide/docs/advanced_topics_sharding.html
Still, there are some printing capabilities that can be used with the outputs of a regular pipeline.
Another debugging tip is to assign a distinct label to every file, so you can easily track what is happening.
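For instance, a small sketch of that trick (file names are illustrative, based on the ones your iterator reads): write a companion labels file where each sample's label is simply its index, so any mismatch between images and labels becomes obvious.
# Hypothetical helper: give every file a unique label equal to its position in the list.
with open("train_taskset.txt") as f:
    paths = [line.rstrip() for line in f if line != '']
with open("train_taskset_debug_labels.txt", "w") as f:
    for idx in range(len(paths)):
        f.write(f"{idx}\n")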
Hello @klecki,
Thank you for your detailed answer. I used your tip of assigning distinct labels to every file to help with debugging, and I was then able to identify the behavior at the end of the epoch. I tweaked my __next__ function as follows to keep only a partial batch at the end (before, I was filling it with samples reused from the beginning of the epoch via the % operator). This matches the file reader's behavior, at the end of the epoch at least.
def __next__(self):
if self.i >= self.n:
self.__iter__()
raise StopIteration
batch_files = []
batch_labels = []
for _ in range(self.batch_size):
if self.i >= self.n:
break
sample_idx = self.i
with open(self.files[sample_idx], 'rb') as f:
batch_files.append(np.frombuffer(f.read(), dtype=np.uint8))
batch_labels.append(np.int64([self.labels[sample_idx]]))
self.i += 1
return batch_files, batch_labels
However, this doesn't fix the problem. I still get an accuracy about 10% higher with my iterator.
The pipeline must be returning the wrong images for some reason... I will try to compare the two outputs. First thing I observed though: the first batch doesn't contain the same images when turning the shuffling off. I'm at a total loss here.
Hi, sorry, I linked the wrong page for the debug mode: https://docs.nvidia.com/deeplearning/dali/user-guide/docs/examples/general/debug_mode.html
Can you elaborate on this:
First thing I observed though: the first batch doesn't contain the same images when turning the shuffling off. I'm at a total loss here.
Is this comparing to a previous run, or to your external source input? If you had shuffling turned on, the first epoch would also be shuffled.
You can also see if passing fixed seeds to the DALI pipeline and to numpy helps with the repeatability of your reproduction.
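For example, a sketch of what fixing both sources of randomness could look like (42 is an arbitrary seed; the Pipeline constructor accepts a seed argument, and the permutation in ExternalInputIterator comes from numpy):
np.random.seed(42)  # makes np.random.permutation in ExternalInputIterator deterministic
pipeline = Pipeline(batch_size, num_workers, device_id, seed=42)  # fixes DALI's random ops (random crop, coin flip)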
You can also check the validation with the external source against a network trained with the file reader (or with other preprocessing), and the other way round. That could probably help narrow the problem down to just training or just validation.
I'm coming back to this issue, which I still don't understand. I will provide more info tomorrow. A 10% difference in accuracy on ImageNet is huge, so I would be surprised if the behavior towards the end of the epoch were the issue. I observe the higher accuracy on both the validation and the training curves.
As a reminder, this is the implementation of my current iterator, which now keeps partial batches at the end of the epoch.
class ExternalInputIterator:
def __init__(self, data_dir, split, batch_size, shard_id, num_shards):
self.split = split
self.batch_size = batch_size
self.shard_id = shard_id
self.num_shards = num_shards
with open(f"{data_dir}/{split}_paths.txt", 'r') as f:
self.files = [line.rstrip() for line in f if line != '']
with open(f"{data_dir}/{split}_labels.txt", 'r') as f:
self.labels = [line.rstrip() for line in f if line != '']
self.data_set_len = len(self.files)
# based on the shard_id and total number of GPUs - world size
# get proper shard
inf = self.data_set_len * shard_id // num_shards
sup = self.data_set_len * (shard_id + 1) // num_shards
self.files = np.array(self.files[inf:sup])
self.labels = np.array(self.labels[inf:sup])
self.n = len(self.files)
self.full_iterations = self.n // batch_size
self.iterations = math.ceil(self.n / batch_size)
def __iter__(self):
self.i = 0
if self.split == 'train':
perm = np.random.permutation(len(self.files))
self.files = self.files[perm]
self.labels = self.labels[perm]
return self
def __next__(self):
if self.i >= self.n:
self.__iter__()
raise StopIteration
batch_files = []
batch_labels = []
for _ in range(self.batch_size):
# Use partial batch
if self.i >= self.n:
break
sample_idx = self.i % self.n
with open(self.files[sample_idx], 'rb') as f:
batch_files.append(np.frombuffer(f.read(), dtype=np.uint8))
batch_labels.append(np.int64([self.labels[sample_idx]]))
self.i += 1
return batch_files, batch_labels
Hi @thomas-bouvier,
I created a toy example to see if there is something wrong with shuffling or sharding but it seems to work as expected:
import nvidia.dali.fn as fn
from nvidia.dali import pipeline_def
from nvidia.dali.plugin.pytorch import DALIGenericIterator
import numpy as np
import nvidia.dali.types as types
import math
class ExternalInputIterator:
def __init__(self, batch_size, shard_id, num_shards, split = 'train'):
self.split = split
self.batch_size = batch_size
self.shard_id = shard_id
self.num_shards = num_shards
self.files = [i for i in range (1000)]
self.labels = [i for i in range (1000)]
self.data_set_len = len(self.files)
# based on the shard_id and total number of GPUs - world size
# get proper shard
inf = self.data_set_len * shard_id // num_shards
sup = self.data_set_len * (shard_id + 1) // num_shards
self.files = np.array(self.files[inf:sup])
self.labels = np.array(self.labels[inf:sup])
self.n = len(self.files)
self.full_iterations = self.n // batch_size
self.iterations = math.ceil(self.n / batch_size)
def __iter__(self):
self.i = 0
if self.split == 'train':
perm = np.random.permutation(len(self.files))
self.files = self.files[perm]
self.labels = self.labels[perm]
return self
def __next__(self):
if self.i >= self.n:
self.__iter__()
raise StopIteration
batch_files = []
batch_labels = []
for _ in range(self.batch_size):
# Use partial batch
if self.i >= self.n:
break
sample_idx = self.i % self.n
batch_files.append(np.int64([self.files[sample_idx]]))
batch_labels.append(np.int64([self.labels[sample_idx]]))
self.i += 1
return batch_files, batch_labels
batch_size = 5
shards_num = 3
epochs = 3
for _ in range(epochs):
for shard_id in range(shards_num):
@pipeline_def(batch_size=batch_size, num_threads=2, device_id=0)
def callable_pipeline():
files, labels = fn.external_source(source=ExternalInputIterator(batch_size, shard_id, shards_num), num_outputs=2,
dtype=[types.INT64, types.INT64])
return files, labels
pipe_new = callable_pipeline()
train_data = DALIGenericIterator(
[pipe_new], ['data', 'labels'],
)
out = []
for i, d in enumerate(train_data):
out.append(np.array(d[0]["data"]))
concat_out = np.reshape(np.concatenate(out), -1)
print(concat_out)
out = sorted(concat_out)
print(len(out), len(set(out)), min(out), max(out))
Thank you for your example @JanuszL, it is also working for me. Actually, every toy example I try seems to work perfectly fine. However, with the full application (ResNet-50 on ImageNet-blurred), I still get a 10% difference in final accuracy.
I came up with 3 different data loaders to reproduce the issue. The baseline is the PyTorch default data loader: 76% accuracy.
1. DataLoader using the DALI reader + the file_root parameter. The application is reading all the files stored in file_root. This is working, I achieve the same accuracy.
2. DataLoader using the DALI reader + the file_list parameter, pointing to a file with one whitespace-separated filename label pair per line. The achieved accuracy is too high (> 80%).
3. DataLoader using the external iterator. I'm reading from the same whitespace-separated filename label pairs. The achieved accuracy is too high (> 80%).
Item 2) is not using the iterator and still achieves a (too) high accuracy, so I'll double-check the content of the paths/labels file.
Here is the code. The three DALI data loaders can be found by searching for "===1", "===2", and "===3".
import torch
import argparse
import torch.backends.cudnn as cudnn
import torch.multiprocessing as mp
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
import horovod.torch as hvd
import os
import math
import wandb
import collections
import numpy as np
import types
import nvidia.dali.types as types
import nvidia.dali.fn as fn
import random
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms, models
from tqdm import tqdm
from datetime import datetime
from nvidia.dali import pipeline_def
from nvidia.dali.plugin.pytorch import DALIGenericIterator, LastBatchPolicy
from nvidia.dali.pipeline import Pipeline
from random import shuffle
from continuum.datasets import ImageNet1000
# Training settings
parser = argparse.ArgumentParser(description='PyTorch ImageNet Example',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'),
help='path to training data')
parser.add_argument('--val-dir', default=os.path.expanduser('~/imagenet/validation'),
help='path to validation data')
parser.add_argument('--log-dir', default='./logs',
help='tensorboard log directory')
parser.add_argument('--checkpoint-format', default='./checkpoint-{epoch}.pth.tar',
help='checkpoint file format')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
help='use fp16 compression during allreduce')
parser.add_argument('--batches-per-allreduce', type=int, default=1,
help='number of batches processed locally before '
'executing allreduce across workers; it multiplies '
'total batch size.')
parser.add_argument('--use-adasum', action='store_true', default=False,
help='use adasum algorithm to do reduction')
parser.add_argument('--gradient-predivide-factor', type=float, default=1.0,
help='apply gradient predivide factor in optimizer (default: 1.0)')
# Default settings from https://arxiv.org/abs/1706.02677.
parser.add_argument('--batch-size', type=int, default=32,
help='input batch size for training')
parser.add_argument('--val-batch-size', type=int, default=32,
help='input batch size for validation')
parser.add_argument('--epochs', type=int, default=90,
help='number of epochs to train')
parser.add_argument('--base-lr', type=float, default=0.0125,
help='learning rate for a single GPU')
parser.add_argument('--warmup-epochs', type=float, default=5,
help='number of warmup epochs')
parser.add_argument('--momentum', type=float, default=0.9,
help='SGD momentum')
parser.add_argument('--wd', type=float, default=0.00005,
help='weight decay')
parser.add_argument('--no-cuda', action='store_true', default=False,
help='disables CUDA training')
parser.add_argument('--seed', type=int, default=42,
help='random seed')
class ExternalInputIterator:
def __init__(self, split, batch_size, shard_id, num_shards, shuffle):
self.split = split
self.batch_size = batch_size
self.shard_id = shard_id
self.num_shards = num_shards
self.shuffle = shuffle
with open(f"{split}_paths.txt", 'r') as f:
self.files = [line.rstrip() for line in f if line != '']
with open(f"{split}_labels.txt", 'r') as f:
self.labels = [line.rstrip() for line in f if line != '']
self.data_set_len = len(self.files)
# based on the shard_id and total number of GPUs - world size
# get proper shard
inf = self.data_set_len * shard_id // num_shards
sup = self.data_set_len * (shard_id + 1) // num_shards
self.files = np.array(self.files[inf:sup])
self.labels = np.array(self.labels[inf:sup])
self.n = len(self.files)
self.full_iterations = self.n // batch_size
self.iterations = math.ceil(self.n / batch_size)
def __iter__(self):
self.i = 0
if self.split == 'train' and self.shuffle:
perm = np.random.permutation(len(self.files))
self.files = self.files[perm]
self.labels = self.labels[perm]
return self
def __next__(self):
if self.i >= self.n:
self.__iter__()
raise StopIteration
batch_files = []
batch_labels = []
#batch_paths = []
for _ in range(self.batch_size):
# Drop the batch
#if self.i >= self.n:
# self.__iter__()
# raise StopIteration
# Use partial batch
if self.i >= self.n:
break
sample_idx = self.i % self.n
with open(self.files[sample_idx], 'rb') as f:
batch_files.append(np.frombuffer(f.read(), dtype=np.uint8))
batch_labels.append(np.int64([self.labels[sample_idx]]))
#batch_paths.append(self.files[sample_idx])
self.i += 1
return batch_files, batch_labels
#return batch_files, batch_labels, batch_paths
class DaliDataLoader(object):
def __init__(self, data_dir, task_id, cuda, batch_size, num_workers=1,
device_id=0, shard_id=0, num_shards=1, precision=32,
training=True, iterator=False, discover_files=False, shuffle=False, **kwargs):
self.batch_size = batch_size
self.iterator = iterator
decoder_device, device = ("mixed", "gpu") if cuda else ("cpu", "cpu")
crop_size = 224
val_size = 256
img_type = types.FLOAT16 if precision == 16 else types.FLOAT
# ask nvJPEG to preallocate memory for the biggest sample in ImageNet for CPU and GPU to avoid reallocations in runtime
device_memory_padding = 211025920 if decoder_device == 'mixed' else 0
host_memory_padding = 140544512 if decoder_device == 'mixed' else 0
# ask HW NVJPEG to allocate memory ahead for the biggest image in the data set to avoid reallocations in runtime
preallocate_width_hint = 5980 if decoder_device == 'mixed' else 0
preallocate_height_hint = 6430 if decoder_device == 'mixed' else 0
split = 'train' if training else 'val'
pipeline = Pipeline(batch_size, num_workers, device_id)
with pipeline:
if iterator:
self.external_data = ExternalInputIterator(
split, batch_size, shard_id, num_shards, shuffle)
inputs, target = fn.external_source(
source=self.external_data, num_outputs=2)
else:
if discover_files:
inputs, target = fn.readers.file(file_root=data_dir,
shard_id=shard_id,
num_shards=num_shards,
random_shuffle=(training and shuffle),
pad_last_batch=True,
name="Reader")
else:
# Testing this
inputs, target = fn.readers.file(file_list=f"{split}_paths_labels.txt",
shard_id=shard_id,
num_shards=num_shards,
random_shuffle=(training and shuffle),
pad_last_batch=True,
name="Reader")
if training:
images = fn.decoders.image_random_crop(inputs,
device=decoder_device, output_type=types.RGB,
device_memory_padding=device_memory_padding,
host_memory_padding=host_memory_padding,
preallocate_width_hint=preallocate_width_hint,
preallocate_height_hint=preallocate_height_hint,
random_aspect_ratio=[
0.8, 1.25],
random_area=[0.1, 1.0],
num_attempts=100)
images = fn.resize(images,
device=device,
resize_x=crop_size,
resize_y=crop_size,
interp_type=types.INTERP_TRIANGULAR)
mirror = fn.random.coin_flip(probability=0.5)
else:
images = fn.decoders.image(inputs,
device=decoder_device,
output_type=types.RGB)
images = fn.resize(images,
device=device,
size=val_size,
mode="not_smaller",
interp_type=types.INTERP_TRIANGULAR)
mirror = False
images = fn.crop_mirror_normalize(images.gpu() if cuda else images,
dtype=img_type,
output_layout="CHW",
crop=(crop_size, crop_size),
mean=[0.485 * 255, 0.456 *
255, 0.406 * 255],
std=[0.229 * 255, 0.224 *
255, 0.225 * 255],
mirror=mirror)
if cuda:
target = target.gpu()
pipeline.set_outputs(images, target)
self.dali_iterator = DALIGenericIterator(
pipeline,
["x", "y"],
last_batch_policy=LastBatchPolicy.PARTIAL,
auto_reset=True
)
def __len__(self):
if self.iterator:
return self.external_data.iterations
else:
return len(self.dali_iterator)
def __iter__(self):
for token in self.dali_iterator:
x = token[0]['x']
y = token[0]['y'].squeeze(-1).long()
yield x, y
def train(epoch):
model.train()
train_loss = Metric('train_loss')
train_accuracy = Metric('train_accuracy')
with tqdm(total=len(train_loader),
desc='Train Epoch #{}'.format(epoch + 1),
disable=not verbose) as t:
for batch_idx, (data, target) in enumerate(train_loader):
adjust_learning_rate(epoch, batch_idx)
if args.cuda:
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
# Split data into sub-batches of size batch_size
for i in range(0, len(data), args.batch_size):
data_batch = data[i:i + args.batch_size]
target_batch = target[i:i + args.batch_size]
output = model(data_batch)
train_accuracy.update(accuracy(output, target_batch))
loss = F.cross_entropy(output, target_batch)
train_loss.update(loss)
# Average gradients among sub-batches
loss.div_(math.ceil(float(len(data)) / args.batch_size))
loss.backward()
# Gradient is applied across all ranks
optimizer.step()
t.set_postfix({'loss': train_loss.avg.item(),
'accuracy': 100. * train_accuracy.avg.item()})
t.update(1)
if log_writer:
log_writer.add_scalar('train/loss', train_loss.avg, epoch)
log_writer.add_scalar('train/accuracy', train_accuracy.avg, epoch)
if hvd.rank() == 0 and hvd.local_rank() == 0:
wandb.log({f"epoch": epoch,
f"train_loss": train_loss.avg,
f"train_prec1": train_accuracy.avg})
def validate(epoch):
model.eval()
val_loss = Metric('val_loss')
val_accuracy = Metric('val_accuracy')
with tqdm(total=len(val_loader),
desc='Validate Epoch #{}'.format(epoch + 1),
disable=not verbose) as t:
with torch.no_grad():
for data, target in val_loader:
if args.cuda:
data, target = data.cuda(), target.cuda()
output = model(data)
val_loss.update(F.cross_entropy(output, target))
val_accuracy.update(accuracy(output, target))
t.set_postfix({'loss': val_loss.avg.item(),
'accuracy': 100. * val_accuracy.avg.item()})
t.update(1)
if log_writer:
log_writer.add_scalar('val/loss', val_loss.avg, epoch)
log_writer.add_scalar('val/accuracy', val_accuracy.avg, epoch)
if hvd.rank() == 0 and hvd.local_rank() == 0:
wandb.log({f"epoch": epoch,
f"val_loss": val_loss.avg,
f"val_prec1": val_accuracy.avg})
# Horovod: using `lr = base_lr * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = base_lr` ---> `lr = base_lr * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
# After the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
def adjust_learning_rate(epoch, batch_idx):
if epoch < args.warmup_epochs:
epoch += float(batch_idx + 1) / len(train_loader)
lr_adj = 1. / hvd.size() * (epoch * (hvd.size() - 1) / args.warmup_epochs + 1)
elif epoch < 30:
lr_adj = 1.
elif epoch < 60:
lr_adj = 1e-1
elif epoch < 80:
lr_adj = 1e-2
else:
lr_adj = 1e-3
for param_group in optimizer.param_groups:
param_group['lr'] = args.base_lr * hvd.size() * args.batches_per_allreduce * lr_adj
def accuracy(output, target):
# get the index of the max log-probability
pred = output.max(1, keepdim=True)[1]
return pred.eq(target.view_as(pred)).cpu().float().mean()
def save_checkpoint(epoch):
if hvd.rank() == 0:
filepath = args.checkpoint_format.format(epoch=epoch + 1)
state = {
'model': model.state_dict(),
'optimizer': optimizer.state_dict(),
}
torch.save(state, filepath)
# Horovod: average metrics from distributed training.
class Metric(object):
def __init__(self, name):
self.name = name
self.sum = torch.tensor(0.)
self.n = torch.tensor(0.)
def update(self, val):
self.sum += hvd.allreduce(val.detach().cpu(), name=self.name)
self.n += 1
@property
def avg(self):
return self.sum / self.n
if __name__ == '__main__':
args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()
allreduce_batch_size = args.batch_size * args.batches_per_allreduce
hvd.init()
torch.manual_seed(args.seed)
if args.cuda:
# Horovod: pin GPU to local rank.
torch.cuda.set_device(hvd.local_rank())
torch.cuda.manual_seed(args.seed)
if hvd.rank() == 0 and hvd.local_rank() == 0:
wandb.init(project="horovod-baselines")
run_name = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}-{wandb.run.name}"
wandb.run.name = run_name
cudnn.benchmark = True
# If set > 0, will resume training from a given checkpoint.
resume_from_epoch = 0
# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(torch.tensor(resume_from_epoch), root_rank=0,
name='resume_from_epoch').item()
# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0
# Horovod: write TensorBoard logs on first worker.
log_writer = SummaryWriter(args.log_dir) if hvd.rank() == 0 else None
# Horovod: limit # of CPU threads to be used per worker.
torch.set_num_threads(4)
kwargs = {'num_workers': 4, 'pin_memory': True} if args.cuda else {}
# When supported, use 'forkserver' to spawn dataloader workers instead of 'fork' to prevent
# issues with Infiniband implementations that are not fork-safe
if (kwargs.get('num_workers', 0) > 0 and hasattr(mp, '_supports_context') and
mp._supports_context and 'forkserver' in mp.get_all_start_methods()):
kwargs['multiprocessing_context'] = 'forkserver'
"""
# file_root parameter
# ===1
train_loader = DaliDataLoader(
args.train_dir, 0, True, allreduce_batch_size,
device_id=hvd.local_rank(), shard_id=hvd.rank(),
num_shards=hvd.size(), training=True, iterator=False, discover_files=True, shuffle=True)
val_loader = DaliDataLoader(
args.val_dir, 0, True, args.val_batch_size,
device_id=hvd.local_rank(), shard_id=hvd.rank(),
num_shards=hvd.size(), training=False, iterator=False, discover_files=True, shuffle=False)
# file_list parameter
# ===2
train_loader = DaliDataLoader(
args.train_dir, 0, True, allreduce_batch_size,
device_id=hvd.local_rank(), shard_id=hvd.rank(),
num_shards=hvd.size(), training=True, iterator=False, shuffle=True)
val_loader = DaliDataLoader(
args.val_dir, 0, True, args.val_batch_size,
device_id=hvd.local_rank(), shard_id=hvd.rank(),
num_shards=hvd.size(), training=False, iterator=False, shuffle=False)
"""
# iterator
# ===3
train_loader = DaliDataLoader(
args.train_dir, 0, True, allreduce_batch_size,
device_id=hvd.local_rank(), shard_id=hvd.rank(),
num_shards=hvd.size(), training=True, iterator=True, shuffle=True)
val_loader = DaliDataLoader(
args.val_dir, 0, True, args.val_batch_size,
device_id=hvd.local_rank(), shard_id=hvd.rank(),
num_shards=hvd.size(), training=False, iterator=True, shuffle=False)
# Set up standard ResNet-50 model.
model = models.resnet50()
# By default, Adasum doesn't need scaling up learning rate.
# For sum/average with gradient Accumulation: scale learning rate by batches_per_allreduce
lr_scaler = args.batches_per_allreduce * hvd.size() if not args.use_adasum else 1
if args.cuda:
# Move model to GPU.
model.cuda()
# If using GPU Adasum allreduce, scale learning rate by local_size.
if args.use_adasum and hvd.nccl_built():
lr_scaler = args.batches_per_allreduce * hvd.local_size()
# Horovod: scale learning rate by the number of GPUs.
optimizer = optim.SGD(model.parameters(),
lr=(args.base_lr *
lr_scaler),
momentum=args.momentum, weight_decay=args.wd)
# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
optimizer, named_parameters=model.named_parameters(),
compression=compression,
backward_passes_per_step=args.batches_per_allreduce,
op=hvd.Adasum if args.use_adasum else hvd.Average,
gradient_predivide_factor=args.gradient_predivide_factor)
# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast weights to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
filepath = args.checkpoint_format.format(epoch=resume_from_epoch)
checkpoint = torch.load(filepath)
model.load_state_dict(checkpoint['model'])
optimizer.load_state_dict(checkpoint['optimizer'])
# Horovod: broadcast parameters & optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
for epoch in range(resume_from_epoch, args.epochs):
train(epoch)
validate(epoch)
#save_checkpoint(epoch)
Hi @thomas-bouvier,
1) and 2) should be functionally equivalent. When file_root is provided, DALI traverses the root directory, assigns a unique label to each subdirectory, and includes all images from it. When file_list is provided, it is used directly to build the list of all files. It seems that in your case the file_list doesn't reflect the files you have on your disk. What you can do is call:
print(pipeline.reader_meta())
for the cases with file_root and file_list to compare whether the reader sees the same number of files.
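For example, something along these lines (a sketch; make_pipeline is a hypothetical helper that builds your pipeline with either reader variant):
pipe_root = make_pipeline(use_file_root=True)   # reader configured with file_root
pipe_list = make_pipeline(use_file_root=False)  # reader configured with file_list
pipe_root.build()
pipe_list.build()
# Both readers are named "Reader" in your code, so their metadata can be compared directly.
print(pipe_root.reader_meta("Reader"))
print(pipe_list.reader_meta("Reader"))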
1 and 2 return the same string:
{'Reader': {'epoch_size': 1281066, 'epoch_size_padded': 1281072, 'number_of_shards': 8, 'shard_id': 1, 'pad_last_batch': 1, 'stick_to_shard': 0}}
I still have to check the content of file_list
then.
How could I get the list of image paths returned with file_root
? This would be useful to have for the next step :)
Hi @thomas-bouvier,
Please check the get_property operator to get the source info of the returned tensor. The metadata is not propagated through the operators, so you need to call it directly on the output from the reader, like:
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import numpy as np
max_batch_size = 2
@pipeline_def
def simple_pipeline():
rgb_files, _ = fn.readers.file(file_root=YOUR_PATH)
info = fn.get_property(rgb_files, key="source_info")
info = fn.pad(info)
return info
def _uint8_tensor_to_string(t):
return np.array(t).tobytes().decode()
pipe = simple_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0)
pipe.build()
o = pipe.run()
print(_uint8_tensor_to_string(o[0][0]))
Thank you, I'm trying it right now. However, I'm not sure of anything given the issues I have. How do I go through the whole dataset once using this example, seeing each image exactly once?
A question by the way, is it correct to use len()
on a DALIGenericIterator
?
Hi @thomas-bouvier,
A question by the way, is it correct to use len() on a DALIGenericIterator?
Yes, it should work. It returns the number of iterations for a given iterator instance (not the number of samples) - see https://github.com/NVIDIA/DALI/blob/main/dali/python/nvidia/dali/plugin/base_iterator.py#L439.
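As a rough sanity check of that number (a sketch; the figures are taken from the reader_meta output above and the default batch size, and the exact value also depends on the last-batch policy):
import math
shard_size = 1281066 // 8                  # approximate samples per GPU with 8 shards
batch_size = 32
print(shard_size // batch_size)            # full batches only, like your full_iterations
print(math.ceil(shard_size / batch_size))  # including the partial last batch, like your iterations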
Thanks. Something is definitely wrong with the content of file_list.
I'm running my experiments on a single machine with 8 workers = 8 GPUs. What should the content of file_list be? Should I use one file with all N paths, or 8 different files with 1/8 of the paths in each?
Hi @thomas-bouvier,
Please use one file with the whole data set description. The reader will shard it later.
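For reference, a sketch of what such a single file_list could look like and how it is passed to the reader (the file name and paths below are made up; each line is one whitespace-separated path/label pair, and the reader shards it according to shard_id/num_shards):
# train_paths_labels.txt (illustrative content):
#   train/n01440764/n01440764_10026.JPEG 0
#   train/n01440764/n01440764_10027.JPEG 0
#   train/n01443537/n01443537_10007.JPEG 1
inputs, target = fn.readers.file(file_list="train_paths_labels.txt",
                                 shard_id=shard_id,
                                 num_shards=num_shards,
                                 random_shuffle=True,
                                 pad_last_batch=True,
                                 name="Reader")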
Ok, my mistake. I got confused with my previous message. As stated in the original message of this issue, only the external iterator has an unexpected result. Here are the updated results. Baseline is the PyTorch DataLoader, 76% accuracy, orange on the plot.
1. DataLoader using the DALI reader + the file_root parameter. The application is reading all the files stored in file_root. This is working, I achieve the same accuracy. ===1 in the code, yellow on the plot.
2. DataLoader using the DALI reader + the file_list parameter, pointing to a file with one whitespace-separated filename label pair per line. This is working, I achieve the same accuracy. ===2 in the code, green on the plot.
3. DataLoader using the external iterator. I'm reading from the same whitespace-separated filename label pairs. The achieved accuracy is too high (> 80%). ===3 in the code, red on the plot.
These results show that only the external iterator yields an inconsistent accuracy, so the content of file_list is not the problem.
Here are the plots:
I wanted to use the external iterator to eventually load the training data dynamically, but my colleagues can't move forward with that approach while this simple ImageNet use case doesn't achieve the expected accuracy. I just don't understand what can go wrong with such a simple use case. My last test will be to try printing the paths. If this doesn't work out, what alternative could we use as a replacement for the DALI external iterator?
Thank you :)
Hi @thomas-bouvier ,
I'm happy that the file reader works as expected. In the case of the external source, you need to make sure that in each epoch all samples from the data set are returned and there are no repetitions. Printing the loaded file names, or dumping them into a file, would probably be the best way to go now.
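A sketch of one way to do that (file names are illustrative): append every path served by the iterator to a per-shard log, then merge the logs after one epoch and check for duplicates and coverage.
# Inside ExternalInputIterator.__next__, next to the existing open() call:
with open(f"epoch_log_shard_{self.shard_id}.txt", "a") as log:
    log.write(str(self.files[sample_idx]) + "\n")

# Offline check after one epoch, merging the logs of the 8 shards:
seen = []
for shard_id in range(8):
    with open(f"epoch_log_shard_{shard_id}.txt") as f:
        seen += [line.rstrip() for line in f]
print(len(seen), len(set(seen)))  # both should equal the data set size if there are no repetitions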
I did some more testing with the external iterator. I attached a unique id to each file, which I printed in my training loop. Each of my 8 GPUs writes to a local file, and I then merged the 8 files for each epoch.
What I observed: in every epoch, the training loop sees 1120932 samples, but they are not all different, as every sample is seen 8 times.
Here is my hypothesis given the code above: each GPU generates a permutation with the same seed, so each GPU is working with the exact same samples, producing a higher accuracy in the end. Would that make sense?
def __iter__(self):
self.i = 0
if self.split == 'train' and self.shuffle:
perm = np.random.permutation(len(self.files))
self.files = self.files[perm]
self.labels = self.labels[perm]
return self
Hi @thomas-bouvier,
If I understand correctly:
inf = self.data_set_len * shard_id // num_shards
sup = self.data_set_len * (shard_id + 1) // num_shards
self.files = np.array(self.files[inf:sup])
self.labels = np.array(self.labels[inf:sup])
is supposed to make sure that the data set is split into a GPU-number of shards, so even if the permutations (randomness) within each shard are similar, it should not lead to repeated samples inside an epoch. One more thought comes to my mind: with this approach, each shard sees only a subset of the labels. Can you pre-shuffle the whole data set globally first and then shard it? Otherwise the first shard will see only labels 0 to 1000/GPU_num, and so on.
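A minimal sketch of that suggestion applied to the __init__ above (42 is an arbitrary seed; the important part is that every rank uses the same seed, so the global order is identical before sharding):
# Pre-shuffle the whole data set with a seed shared by all ranks...
rng = np.random.RandomState(42)
perm = rng.permutation(self.data_set_len)
self.files = np.array(self.files)[perm]
self.labels = np.array(self.labels)[perm]
# ...and only then cut out this rank's shard, as before.
inf = self.data_set_len * shard_id // num_shards
sup = self.data_set_len * (shard_id + 1) // num_shards
self.files = self.files[inf:sup]
self.labels = self.labels[inf:sup]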
Hey @JanuszL,
You're right. I got confused once again. The ids I attached to the files were not sharded correctly, resulting in a mismatch between the actual samples and the ids. I confirm each id (and thus each sample) is seen exactly once.
Your thought is really promising though, and my first test is encouraging. Indeed, without pre-shuffling the dataset, each train/val shard contains only a subset of the classes. I'll run a full experiment to confirm this is what's causing my issue.
I confirm that was the issue. I had to pre-shuffle the dataset before sharding it.
Thank you very much @JanuszL and @klecki for your valuable suggestions.