
Wrong accuracy with external data in a distributed setting

Open thomas-bouvier opened this issue 2 years ago • 4 comments

Hello,

First of all, many thanks for the great library.

I'm using the custom DALI dataloader object below, which in turn leverages an external source, ExternalInputIterator. The latter reads the files {split}_taskset.txt (containing paths to images) and {split}_taskset_labels.txt (containing the corresponding labels). I'm running this code in a distributed setting using Horovod.

My issue: when training a ResNet-50 model on the ImageNet dataset (blurred version), I get a top-1 accuracy which is too high (above 80%), which makes me think something is wrong with the data pipeline. Do you see anything fishy with the code below?

I added a "wrapper" to yield the data returned by the DALIGenericIterator.

What I tried: the accuracy achieved with the exact same dataset passed to fn.readers.file (without using an external source) is correct (about 75.3%), so the issue must come from my ExternalInputIterator.

class ExternalInputIterator:
    def __init__(self, data_dir, split, batch_size, shard_id, num_shards):
        self.batch_size = batch_size
        self.shard_id = shard_id
        self.num_shards = num_shards

        with open(f"{data_dir}/{split}_taskset.txt", 'r') as f:
            self.files = [line.rstrip() for line in f if line != '']
        with open(f"{data_dir}/{split}_taskset_labels.txt", 'r') as f:
            self.labels = [line.rstrip() for line in f if line != '']

        self.data_set_len = len(self.files)
        inf = self.data_set_len * shard_id // num_shards
        sup = self.data_set_len * (shard_id + 1) // num_shards
        self.files = np.array(self.files[inf:sup])
        self.labels = np.array(self.labels[inf:sup])
        self.n = len(self.files)
        self.full_iterations = self.n // batch_size
        self.iterations = math.ceil(self.n / batch_size)

    def __iter__(self):
        self.i = 0
        perm = np.random.permutation(len(self.files))
        self.files = self.files[perm]
        self.labels = self.labels[perm]
        return self

    def __next__(self):
        if self.i >= self.n:
            self.__iter__()
            raise StopIteration
        batch_files = []
        batch_labels = []
        for _ in range(self.batch_size):
            sample_idx = self.i % self.n
            with open(self.files[sample_idx], 'rb') as f:
                batch_files.append(np.frombuffer(f.read(), dtype=np.uint8))
            batch_labels.append(np.int64([self.labels[sample_idx]]))
            self.i += 1
        return batch_files, batch_labels

class DaliDataLoader(object):
    def __init__(self, data_dir, batch_size, num_workers=1,
                 device_id=0, shard_id=0, num_shards=1, precision=32,
                 training=True, cuda=True, **kwargs):
        self.batch_size = batch_size
        decoder_device, device = ("mixed", "gpu") if cuda else ("cpu", "cpu")
        crop_size = 224
        val_size = 256
        img_type = types.FLOAT16 if precision == 16 else types.FLOAT

        # ask nvJPEG to preallocate memory for the biggest sample in ImageNet for CPU and GPU to avoid reallocations in runtime
        device_memory_padding = 211025920 if decoder_device == 'mixed' else 0
        host_memory_padding = 140544512 if decoder_device == 'mixed' else 0
        # ask HW NVJPEG to allocate memory ahead for the biggest image in the data set to avoid reallocations in runtime
        preallocate_width_hint = 5980 if decoder_device == 'mixed' else 0
        preallocate_height_hint = 6430 if decoder_device == 'mixed' else 0

        split = 'train' if training else 'val'
        self.external_data = ExternalInputIterator(
            data_dir, split, batch_size, shard_id, num_shards)

        pipeline = Pipeline(batch_size, num_workers, device_id)
        with pipeline:
            # I get the expected accuracy using the reader directly
            #inputs, target = fn.readers.file(file_root=data_dir,
            #                shard_id=shard_id,
            #                num_shards=num_shards,
            #                random_shuffle=training,
            #                pad_last_batch=True,
            #                name="Reader")
            inputs, target = fn.external_source(
                source=self.external_data, num_outputs=2)
            if training:
                images = fn.decoders.image_random_crop(inputs,
                                                       device=decoder_device, output_type=types.RGB,
                                                       device_memory_padding=device_memory_padding,
                                                       host_memory_padding=host_memory_padding,
                                                       preallocate_width_hint=preallocate_width_hint,
                                                       preallocate_height_hint=preallocate_height_hint,
                                                       random_aspect_ratio=[
                                                           0.8, 1.25],
                                                       random_area=[0.1, 1.0],
                                                       num_attempts=100)
                images = fn.resize(images,
                                   device=device,
                                   resize_x=crop_size,
                                   resize_y=crop_size,
                                   interp_type=types.INTERP_TRIANGULAR)
                mirror = fn.random.coin_flip(probability=0.5)
            else:
                images = fn.decoders.image(inputs,
                                           device=decoder_device,
                                           output_type=types.RGB)
                images = fn.resize(images,
                                   device=device,
                                   size=val_size,
                                   mode="not_smaller",
                                   interp_type=types.INTERP_TRIANGULAR)
                mirror = False

            images = fn.crop_mirror_normalize(images.gpu() if cuda else images,
                                              dtype=img_type,
                                              output_layout="CHW",
                                              crop=(crop_size, crop_size),
                                              mean=[0.485 * 255, 0.456 *
                                                    255, 0.406 * 255],
                                              std=[0.229 * 255, 0.224 *
                                                   255, 0.225 * 255],
                                              mirror=mirror)
            if cuda:
                target = target.gpu()
            pipeline.set_outputs(images, target)

        self.iterator = DALIGenericIterator(
            pipeline,
            ["x", "y"],
            auto_reset=True
        )

    def __len__(self):
        return self.external_data.full_iterations

    def __iter__(self):
        for token in self.iterator:
            x = token[0]['x']
            y = token[0]['y'].squeeze().long()
            yield x, y

This is how I create my data loaders.

    train_loader = DaliDataLoader(
                args.train_dir, allreduce_batch_size,
                device_id=hvd.local_rank(), shard_id=hvd.rank(),
                num_shards=hvd.size(), training=True)

    val_loader = DaliDataLoader(
            args.val_dir, args.val_batch_size,
            device_id=hvd.local_rank(), shard_id=hvd.rank(),
            num_shards=hvd.size(), training=False)

What am I doing wrong?

Thanks!

thomas-bouvier avatar Aug 28 '22 03:08 thomas-bouvier

Hi @thomas-bouvier, my suggestion would be to check the behaviour around the end of the epoch. By default, DALIGenericIterator uses the FILL policy, and the reader you used is configured with pad_last_batch=True, which repeats the last sample to pad the batch.

In the case of your external source iterator, you may fill the last batch with up to batch_size - 1 samples from the beginning of the epoch. This may impact the results if the epoch size is not divisible by the batch size. You also have __len__ defined in a way that suggests using only full batches; in that case you are dropping some samples, which might affect the validation.
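For illustration, a minimal sketch (settings to be adapted to your case) of making the last-batch handling explicit on the PyTorch iterator instead of relying on the default FILL policy:

from nvidia.dali.plugin.pytorch import DALIGenericIterator, LastBatchPolicy

self.iterator = DALIGenericIterator(
    pipeline,
    ["x", "y"],
    last_batch_policy=LastBatchPolicy.PARTIAL,  # or DROP; FILL is the default
    last_batch_padded=False,                    # the external source does not pad the last batch
    auto_reset=True)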

I recommend the sharding section of the docs to see what the possible options are: https://docs.nvidia.com/deeplearning/dali/main-user-guide/docs/advanced_topics_sharding.html

I would certainly check what is happening around the epoch start/end. You can, for example, use both the external source operator and the file reader without shuffling and see what images are returned. We recently introduced some experimental debugging capabilities, but you would need to run the pipeline using the .run() method, as debugging via framework iterators is not supported yet: https://docs.nvidia.com/deeplearning/dali/main-user-guide/docs/advanced_topics_sharding.html Still, there are some printing capabilities that can be used with the outputs of a regular pipeline. Another tip for debugging is to assign a distinct label to every file, so you can easily track what is happening.
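For example, a hypothetical helper (the file names are placeholders matching your snippet) that writes a debug label file in which every image gets its own index as a unique label:

# hypothetical debugging aid: give every file a unique label (its index),
# so any mismatch between returned images and labels is easy to spot
with open("train_taskset.txt") as f:
    paths = [line.rstrip() for line in f if line.strip()]
with open("train_taskset_debug_labels.txt", "w") as out:
    for idx in range(len(paths)):
        out.write(f"{idx}\n")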

klecki avatar Aug 29 '22 15:08 klecki

Hello @klecki,

Thank you for your detailed answer. I used your tip of assigning distinct labels to every file to help with debugging, and was then able to identify the behavior at the end of the epoch. I tweaked my function as follows to keep only a partial batch (before, I was filling it by reusing samples from the beginning of the epoch via the % operation). This matches my reader's behavior, at least at the end of the epoch.

    def __next__(self):
        if self.i >= self.n:
            self.__iter__()
            raise StopIteration
        batch_files = []
        batch_labels = []
        for _ in range(self.batch_size):
            if self.i >= self.n:
                break
            sample_idx = self.i
            with open(self.files[sample_idx], 'rb') as f:
                batch_files.append(np.frombuffer(f.read(), dtype=np.uint8))
            batch_labels.append(np.int64([self.labels[sample_idx]]))
            self.i += 1
        return batch_files, batch_labels

However, this doesn't fix the problem. I still achieve an accuracy 10% higher with my iterator.

The pipeline must be returning wrong images for some reason... I will try to compare the two outputs. First thing I observed though: the first batch doesn't contain the same images when turning the shuffling off. I'm at a total loss here.

thomas-bouvier avatar Sep 05 '22 04:09 thomas-bouvier

Hi, sorry, I linked the wrong page for the debug mode: https://docs.nvidia.com/deeplearning/dali/user-guide/docs/examples/general/debug_mode.html

Can you elaborate on the:

First thing I observed though: the first batch doesn't contain the same images when turning the shuffling off. I'm at a total loss here.

Are you comparing to a previous run, or to your external source input? If you had shuffling turned on, the first epoch would also be shuffled.

You can also check whether passing fixed seeds to the DALI pipeline and to numpy helps with the repeatability of your reproduction.
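A minimal sketch (seed values arbitrary, constructor arguments as in your DaliDataLoader):

import numpy as np
from nvidia.dali.pipeline import Pipeline

np.random.seed(42)                                                 # fixes the permutation in ExternalInputIterator
pipeline = Pipeline(batch_size, num_workers, device_id, seed=42)   # fixes DALI's random ops (crops, coin flips)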

klecki avatar Sep 07 '22 13:09 klecki

You can also check the validation with the external source against the network trained with the file reader (or with other preprocessing), and the other way round. That could probably help narrow the problem down to just training or just validation.

klecki avatar Sep 07 '22 13:09 klecki

I'm coming back to this issue, which I still don't understand. I will provide more info tomorrow. A 10% difference in accuracy on ImageNet is huge, so I would be surprised if the behavior at the end of the epoch were the issue. I observe the higher accuracy on both the training and validation curves.

Just to recall, this is the implementation of my current iterator, which now keeps partial batches at the end of the epoch.

class ExternalInputIterator:
    def __init__(self, data_dir, split, batch_size, shard_id, num_shards):
        self.split = split
        self.batch_size = batch_size
        self.shard_id = shard_id
        self.num_shards = num_shards

        with open(f"{data_dir}/{split}_paths.txt", 'r') as f:
            self.files = [line.rstrip() for line in f if line != '']
        with open(f"{data_dir}/{split}_labels.txt", 'r') as f:
            self.labels = [line.rstrip() for line in f if line != '']

        self.data_set_len = len(self.files)

        # based on the shard_id and total number of GPUs - world size
        # get proper shard
        inf = self.data_set_len * shard_id // num_shards
        sup = self.data_set_len * (shard_id + 1) // num_shards
        self.files = np.array(self.files[inf:sup])
        self.labels = np.array(self.labels[inf:sup])
        self.n = len(self.files)
        self.full_iterations = self.n // batch_size
        self.iterations = math.ceil(self.n / batch_size)

    def __iter__(self):
        self.i = 0
        if self.split == 'train':
            perm = np.random.permutation(len(self.files))
            self.files = self.files[perm]
            self.labels = self.labels[perm]
        return self

    def __next__(self):
        if self.i >= self.n:
            self.__iter__()
            raise StopIteration
        batch_files = []
        batch_labels = []
        for _ in range(self.batch_size):
            # Use partial batch
            if self.i >= self.n:
                break
            sample_idx = self.i % self.n
            with open(self.files[sample_idx], 'rb') as f:
                batch_files.append(np.frombuffer(f.read(), dtype=np.uint8))
            batch_labels.append(np.int64([self.labels[sample_idx]]))
            self.i += 1
        return batch_files, batch_labels

thomas-bouvier avatar Oct 19 '22 23:10 thomas-bouvier

Hi @thomas-bouvier,

I created a toy example to see if there is something wrong with shuffling or sharding but it seems to work as expected:

import nvidia.dali.fn as fn
from nvidia.dali import pipeline_def
from nvidia.dali.plugin.pytorch import DALIGenericIterator
import numpy as np
import nvidia.dali.types as types

import math
class ExternalInputIterator:
    def __init__(self, batch_size, shard_id, num_shards, split = 'train'):
        self.split = split
        self.batch_size = batch_size
        self.shard_id = shard_id
        self.num_shards = num_shards

        self.files = [i for i in range (1000)]
        self.labels = [i for i in range (1000)]

        self.data_set_len = len(self.files)

        # based on the shard_id and total number of GPUs - world size
        # get proper shard
        inf = self.data_set_len * shard_id // num_shards
        sup = self.data_set_len * (shard_id + 1) // num_shards
        self.files = np.array(self.files[inf:sup])
        self.labels = np.array(self.labels[inf:sup])
        self.n = len(self.files)
        self.full_iterations = self.n // batch_size
        self.iterations = math.ceil(self.n / batch_size)

    def __iter__(self):
        self.i = 0
        if self.split == 'train':
            perm = np.random.permutation(len(self.files))
            self.files = self.files[perm]
            self.labels = self.labels[perm]
        return self

    def __next__(self):
        if self.i >= self.n:
            self.__iter__()
            raise StopIteration
        batch_files = []
        batch_labels = []
        for _ in range(self.batch_size):
            # Use partial batch
            if self.i >= self.n:
                break
            sample_idx = self.i % self.n
            batch_files.append(np.int64([self.files[sample_idx]]))
            batch_labels.append(np.int64([self.labels[sample_idx]]))
            self.i += 1
        return batch_files, batch_labels

batch_size = 5
shards_num = 3
epochs = 3
for _ in range(epochs):
    for shard_id in range(shards_num):
        @pipeline_def(batch_size=batch_size, num_threads=2, device_id=0)
        def callable_pipeline():
            files, labels = fn.external_source(source=ExternalInputIterator(batch_size, shard_id, shards_num), num_outputs=2,
                                               dtype=[types.INT64, types.INT64])
            return files, labels
        pipe_new = callable_pipeline()
        train_data = DALIGenericIterator(
            [pipe_new], ['data', 'labels'],
        )

        out = []
        for i, d in enumerate(train_data):
            out.append(np.array(d[0]["data"]))

        concat_out = np.reshape(np.concatenate(out), -1)
        print(concat_out)
        out = sorted(concat_out)
        print(len(out), len(set(out)), min(out), max(out))

JanuszL avatar Oct 20 '22 08:10 JanuszL

Thank you for your example @JanuszL. It is also working for me. Actually, every toy example I try seems to work perfectly fine. However, with the full application (ResNet-50 on ImageNet-blurred), I still see a 10% difference in final accuracy.

I came up with 3 different data loaders to reproduce the issue. The baseline is the PyTorch default data loader: 76% accuracy.

  1. DataLoader using the DALI reader + the file_root parameter. The application is reading all the files stored in file_root. This is working, I achieve the same accuracy.
  2. DataLoader using the DALI reader + the file_list parameter, pointing to a whitespace-separated filename label pair per line. The achieved accuracy is too high (> 80%).
  3. DataLoader using the external iterator. I'm using the same whitespace-separated filename label pair to read from. The achieved accuracy is too high (> 80%).

Item 2) is not using the iterator, and is still achieving a (too) high accuracy. So I'll double check the content of the paths/labels file.

Here is the code. Data loaders 1, 2, and 3 can be found by searching for "===1", "===2", and "===3".

import torch
import argparse
import torch.backends.cudnn as cudnn
import torch.multiprocessing as mp
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
import horovod.torch as hvd
import os
import math
import wandb
import collections
import numpy as np
import nvidia.dali.types as types
import nvidia.dali.fn as fn
import random

from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms, models
from tqdm import tqdm
from datetime import datetime
from nvidia.dali import pipeline_def
from nvidia.dali.plugin.pytorch import DALIGenericIterator, LastBatchPolicy
from nvidia.dali.pipeline import Pipeline
from random import shuffle
from continuum.datasets import ImageNet1000


# Training settings
parser = argparse.ArgumentParser(description='PyTorch ImageNet Example',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'),
                    help='path to training data')
parser.add_argument('--val-dir', default=os.path.expanduser('~/imagenet/validation'),
                    help='path to validation data')
parser.add_argument('--log-dir', default='./logs',
                    help='tensorboard log directory')
parser.add_argument('--checkpoint-format', default='./checkpoint-{epoch}.pth.tar',
                    help='checkpoint file format')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                    help='use fp16 compression during allreduce')
parser.add_argument('--batches-per-allreduce', type=int, default=1,
                    help='number of batches processed locally before '
                         'executing allreduce across workers; it multiplies '
                         'total batch size.')
parser.add_argument('--use-adasum', action='store_true', default=False,
                    help='use adasum algorithm to do reduction')
parser.add_argument('--gradient-predivide-factor', type=float, default=1.0,
                    help='apply gradient predivide factor in optimizer (default: 1.0)')

# Default settings from https://arxiv.org/abs/1706.02677.
parser.add_argument('--batch-size', type=int, default=32,
                    help='input batch size for training')
parser.add_argument('--val-batch-size', type=int, default=32,
                    help='input batch size for validation')
parser.add_argument('--epochs', type=int, default=90,
                    help='number of epochs to train')
parser.add_argument('--base-lr', type=float, default=0.0125,
                    help='learning rate for a single GPU')
parser.add_argument('--warmup-epochs', type=float, default=5,
                    help='number of warmup epochs')
parser.add_argument('--momentum', type=float, default=0.9,
                    help='SGD momentum')
parser.add_argument('--wd', type=float, default=0.00005,
                    help='weight decay')

parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--seed', type=int, default=42,
                    help='random seed')


class ExternalInputIterator:
    def __init__(self, split, batch_size, shard_id, num_shards, shuffle):
        self.split = split
        self.batch_size = batch_size
        self.shard_id = shard_id
        self.num_shards = num_shards
        self.shuffle = shuffle

        with open(f"{split}_paths.txt", 'r') as f:
            self.files = [line.rstrip() for line in f if line != '']
        with open(f"{split}_labels.txt", 'r') as f:
            self.labels = [line.rstrip() for line in f if line != '']

        self.data_set_len = len(self.files)

        # based on the shard_id and total number of GPUs - world size
        # get proper shard
        inf = self.data_set_len * shard_id // num_shards
        sup = self.data_set_len * (shard_id + 1) // num_shards
        self.files = np.array(self.files[inf:sup])
        self.labels = np.array(self.labels[inf:sup])
        self.n = len(self.files)
        self.full_iterations = self.n // batch_size
        self.iterations = math.ceil(self.n / batch_size)

    def __iter__(self):
        self.i = 0
        if self.split == 'train' and self.shuffle:
            perm = np.random.permutation(len(self.files))
            self.files = self.files[perm]
            self.labels = self.labels[perm]
        return self

    def __next__(self):
        if self.i >= self.n:
            self.__iter__()
            raise StopIteration
        batch_files = []
        batch_labels = []
        #batch_paths = []
        for _ in range(self.batch_size):
            # Drop the batch
            #if self.i >= self.n:
            #    self.__iter__()
            #    raise StopIteration
            # Use partial batch
            if self.i >= self.n:
                break
            sample_idx = self.i % self.n
            with open(self.files[sample_idx], 'rb') as f:
                batch_files.append(np.frombuffer(f.read(), dtype=np.uint8))
            batch_labels.append(np.int64([self.labels[sample_idx]]))
            #batch_paths.append(self.files[sample_idx])
            self.i += 1
        return batch_files, batch_labels
        #return batch_files, batch_labels, batch_paths

class DaliDataLoader(object):
    def __init__(self, data_dir, task_id, cuda, batch_size, num_workers=1,
                 device_id=0, shard_id=0, num_shards=1, precision=32,
                 training=True, iterator=False, discover_files=False, shuffle=False, **kwargs):
        self.batch_size = batch_size
        self.iterator = iterator
        decoder_device, device = ("mixed", "gpu") if cuda else ("cpu", "cpu")
        crop_size = 224
        val_size = 256
        img_type = types.FLOAT16 if precision == 16 else types.FLOAT

        # ask nvJPEG to preallocate memory for the biggest sample in ImageNet for CPU and GPU to avoid reallocations in runtime
        device_memory_padding = 211025920 if decoder_device == 'mixed' else 0
        host_memory_padding = 140544512 if decoder_device == 'mixed' else 0
        # ask HW NVJPEG to allocate memory ahead for the biggest image in the data set to avoid reallocations in runtime
        preallocate_width_hint = 5980 if decoder_device == 'mixed' else 0
        preallocate_height_hint = 6430 if decoder_device == 'mixed' else 0

        split = 'train' if training else 'val'

        pipeline = Pipeline(batch_size, num_workers, device_id)
        with pipeline:
            if iterator:
                self.external_data = ExternalInputIterator(
                    split, batch_size, shard_id, num_shards, shuffle)
                inputs, target = fn.external_source(
                    source=self.external_data, num_outputs=2)
            else:
                if discover_files:
                    inputs, target = fn.readers.file(file_root=data_dir,
                                                    shard_id=shard_id,
                                                    num_shards=num_shards,
                                                    random_shuffle=(training and shuffle),
                                                    pad_last_batch=True,
                                                    name="Reader")
                else:
                    # Testing this
                    inputs, target = fn.readers.file(file_list=f"{split}_paths_labels.txt",
                                                    shard_id=shard_id,
                                                    num_shards=num_shards,
                                                    random_shuffle=(training and shuffle),
                                                    pad_last_batch=True,
                                                    name="Reader")

            if training:
                images = fn.decoders.image_random_crop(inputs,
                                                       device=decoder_device, output_type=types.RGB,
                                                       device_memory_padding=device_memory_padding,
                                                       host_memory_padding=host_memory_padding,
                                                       preallocate_width_hint=preallocate_width_hint,
                                                       preallocate_height_hint=preallocate_height_hint,
                                                       random_aspect_ratio=[
                                                           0.8, 1.25],
                                                       random_area=[0.1, 1.0],
                                                       num_attempts=100)
                images = fn.resize(images,
                                   device=device,
                                   resize_x=crop_size,
                                   resize_y=crop_size,
                                   interp_type=types.INTERP_TRIANGULAR)
                mirror = fn.random.coin_flip(probability=0.5)
            else:
                images = fn.decoders.image(inputs,
                                           device=decoder_device,
                                           output_type=types.RGB)
                images = fn.resize(images,
                                   device=device,
                                   size=val_size,
                                   mode="not_smaller",
                                   interp_type=types.INTERP_TRIANGULAR)
                mirror = False

            images = fn.crop_mirror_normalize(images.gpu() if cuda else images,
                                              dtype=img_type,
                                              output_layout="CHW",
                                              crop=(crop_size, crop_size),
                                              mean=[0.485 * 255, 0.456 *
                                                    255, 0.406 * 255],
                                              std=[0.229 * 255, 0.224 *
                                                   255, 0.225 * 255],
                                              mirror=mirror)
            if cuda:
                target = target.gpu()
            pipeline.set_outputs(images, target)

        self.dali_iterator = DALIGenericIterator(
            pipeline,
            ["x", "y"],
            last_batch_policy=LastBatchPolicy.PARTIAL,
            auto_reset=True
        )

    def __len__(self):
        if self.iterator:
            return self.external_data.iterations
        else:
            return len(self.dali_iterator)

    def __iter__(self):
        for token in self.dali_iterator:
            x = token[0]['x']
            y = token[0]['y'].squeeze(-1).long()
            yield x, y

def train(epoch):
    model.train()
    train_loss = Metric('train_loss')
    train_accuracy = Metric('train_accuracy')

    with tqdm(total=len(train_loader),
              desc='Train Epoch     #{}'.format(epoch + 1),
              disable=not verbose) as t:
        for batch_idx, (data, target) in enumerate(train_loader):
            adjust_learning_rate(epoch, batch_idx)

            if args.cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            # Split data into sub-batches of size batch_size
            for i in range(0, len(data), args.batch_size):
                data_batch = data[i:i + args.batch_size]
                target_batch = target[i:i + args.batch_size]
                output = model(data_batch)
                train_accuracy.update(accuracy(output, target_batch))
                loss = F.cross_entropy(output, target_batch)
                train_loss.update(loss)
                # Average gradients among sub-batches
                loss.div_(math.ceil(float(len(data)) / args.batch_size))
                loss.backward()
            # Gradient is applied across all ranks
            optimizer.step()
            t.set_postfix({'loss': train_loss.avg.item(),
                           'accuracy': 100. * train_accuracy.avg.item()})
            t.update(1)

    if log_writer:
        log_writer.add_scalar('train/loss', train_loss.avg, epoch)
        log_writer.add_scalar('train/accuracy', train_accuracy.avg, epoch)
    if hvd.rank() == 0 and hvd.local_rank() == 0:
        wandb.log({f"epoch": epoch,
                    f"train_loss": train_loss.avg,
                    f"train_prec1": train_accuracy.avg})


def validate(epoch):
    model.eval()
    val_loss = Metric('val_loss')
    val_accuracy = Metric('val_accuracy')

    with tqdm(total=len(val_loader),
              desc='Validate Epoch  #{}'.format(epoch + 1),
              disable=not verbose) as t:
        with torch.no_grad():
            for data, target in val_loader:
                if args.cuda:
                    data, target = data.cuda(), target.cuda()
                output = model(data)

                val_loss.update(F.cross_entropy(output, target))
                val_accuracy.update(accuracy(output, target))
                t.set_postfix({'loss': val_loss.avg.item(),
                               'accuracy': 100. * val_accuracy.avg.item()})
                t.update(1)

    if log_writer:
        log_writer.add_scalar('val/loss', val_loss.avg, epoch)
        log_writer.add_scalar('val/accuracy', val_accuracy.avg, epoch)
    if hvd.rank() == 0 and hvd.local_rank() == 0:
        wandb.log({f"epoch": epoch,
                    f"val_loss": val_loss.avg,
                    f"val_prec1": val_accuracy.avg})

# Horovod: using `lr = base_lr * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = base_lr` ---> `lr = base_lr * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
# After the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
def adjust_learning_rate(epoch, batch_idx):
    if epoch < args.warmup_epochs:
        epoch += float(batch_idx + 1) / len(train_loader)
        lr_adj = 1. / hvd.size() * (epoch * (hvd.size() - 1) / args.warmup_epochs + 1)
    elif epoch < 30:
        lr_adj = 1.
    elif epoch < 60:
        lr_adj = 1e-1
    elif epoch < 80:
        lr_adj = 1e-2
    else:
        lr_adj = 1e-3
    for param_group in optimizer.param_groups:
        param_group['lr'] = args.base_lr * hvd.size() * args.batches_per_allreduce * lr_adj


def accuracy(output, target):
    # get the index of the max log-probability
    pred = output.max(1, keepdim=True)[1]
    return pred.eq(target.view_as(pred)).cpu().float().mean()


def save_checkpoint(epoch):
    if hvd.rank() == 0:
        filepath = args.checkpoint_format.format(epoch=epoch + 1)
        state = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
        }
        torch.save(state, filepath)


# Horovod: average metrics from distributed training.
class Metric(object):
    def __init__(self, name):
        self.name = name
        self.sum = torch.tensor(0.)
        self.n = torch.tensor(0.)

    def update(self, val):
        self.sum += hvd.allreduce(val.detach().cpu(), name=self.name)
        self.n += 1

    @property
    def avg(self):
        return self.sum / self.n


if __name__ == '__main__':
    args = parser.parse_args()
    args.cuda = not args.no_cuda and torch.cuda.is_available()

    allreduce_batch_size = args.batch_size * args.batches_per_allreduce

    hvd.init()
    torch.manual_seed(args.seed)

    if args.cuda:
        # Horovod: pin GPU to local rank.
        torch.cuda.set_device(hvd.local_rank())
        torch.cuda.manual_seed(args.seed)

    if hvd.rank() == 0 and hvd.local_rank() == 0:
        wandb.init(project="horovod-baselines")
        run_name = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}-{wandb.run.name}"
        wandb.run.name = run_name

    cudnn.benchmark = True

    # If set > 0, will resume training from a given checkpoint.
    resume_from_epoch = 0

    # Horovod: broadcast resume_from_epoch from rank 0 (which will have
    # checkpoints) to other ranks.
    resume_from_epoch = hvd.broadcast(torch.tensor(resume_from_epoch), root_rank=0,
                                      name='resume_from_epoch').item()

    # Horovod: print logs on the first worker.
    verbose = 1 if hvd.rank() == 0 else 0

    # Horovod: write TensorBoard logs on first worker.
    log_writer = SummaryWriter(args.log_dir) if hvd.rank() == 0 else None

    # Horovod: limit # of CPU threads to be used per worker.
    torch.set_num_threads(4)

    kwargs = {'num_workers': 4, 'pin_memory': True} if args.cuda else {}
    # When supported, use 'forkserver' to spawn dataloader workers instead of 'fork' to prevent
    # issues with Infiniband implementations that are not fork-safe
    if (kwargs.get('num_workers', 0) > 0 and hasattr(mp, '_supports_context') and
            mp._supports_context and 'forkserver' in mp.get_all_start_methods()):
        kwargs['multiprocessing_context'] = 'forkserver'

    """
    # file_root parameter
    # ===1
    train_loader = DaliDataLoader(
            args.train_dir, 0, True, allreduce_batch_size,
            device_id=hvd.local_rank(), shard_id=hvd.rank(),
            num_shards=hvd.size(), training=True, iterator=False, discover_files=True, shuffle=True)

    val_loader = DaliDataLoader(
            args.val_dir, 0, True, args.val_batch_size,
            device_id=hvd.local_rank(), shard_id=hvd.rank(),
            num_shards=hvd.size(), training=False, iterator=False, discover_files=True, shuffle=False)

    # file_list parameter
    # ===2
    train_loader = DaliDataLoader(
            args.train_dir, 0, True, allreduce_batch_size,
            device_id=hvd.local_rank(), shard_id=hvd.rank(),
            num_shards=hvd.size(), training=True, iterator=False, shuffle=True)

    val_loader = DaliDataLoader(
            args.val_dir, 0, True, args.val_batch_size,
            device_id=hvd.local_rank(), shard_id=hvd.rank(),
            num_shards=hvd.size(), training=False, iterator=False, shuffle=False)
    """

    # iterator
    # ===3
    train_loader = DaliDataLoader(
            args.train_dir, 0, True, allreduce_batch_size,
            device_id=hvd.local_rank(), shard_id=hvd.rank(),
            num_shards=hvd.size(), training=True, iterator=True, shuffle=True)

    val_loader = DaliDataLoader(
            args.val_dir, 0, True, args.val_batch_size,
            device_id=hvd.local_rank(), shard_id=hvd.rank(),
            num_shards=hvd.size(), training=False, iterator=True, shuffle=False)


    # Set up standard ResNet-50 model.
    model = models.resnet50()

    # By default, Adasum doesn't need scaling up learning rate.
    # For sum/average with gradient Accumulation: scale learning rate by batches_per_allreduce
    lr_scaler = args.batches_per_allreduce * hvd.size() if not args.use_adasum else 1

    if args.cuda:
        # Move model to GPU.
        model.cuda()
        # If using GPU Adasum allreduce, scale learning rate by local_size.
        if args.use_adasum and hvd.nccl_built():
            lr_scaler = args.batches_per_allreduce * hvd.local_size()

    # Horovod: scale learning rate by the number of GPUs.
    optimizer = optim.SGD(model.parameters(),
                          lr=(args.base_lr *
                              lr_scaler),
                          momentum=args.momentum, weight_decay=args.wd)

    # Horovod: (optional) compression algorithm.
    compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(
        optimizer, named_parameters=model.named_parameters(),
        compression=compression,
        backward_passes_per_step=args.batches_per_allreduce,
        op=hvd.Adasum if args.use_adasum else hvd.Average,
        gradient_predivide_factor=args.gradient_predivide_factor)

    # Restore from a previous checkpoint, if initial_epoch is specified.
    # Horovod: restore on the first worker which will broadcast weights to other workers.
    if resume_from_epoch > 0 and hvd.rank() == 0:
        filepath = args.checkpoint_format.format(epoch=resume_from_epoch)
        checkpoint = torch.load(filepath)
        model.load_state_dict(checkpoint['model'])
        optimizer.load_state_dict(checkpoint['optimizer'])

    # Horovod: broadcast parameters & optimizer state.
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)

    for epoch in range(resume_from_epoch, args.epochs):
        train(epoch)
        validate(epoch)
        #save_checkpoint(epoch)

thomas-bouvier avatar Nov 02 '22 09:11 thomas-bouvier

Hi @thomas-bouvier,

  1) and 2) should be functionally equivalent. When file_root is provided, DALI traverses the root directory, assigns a unique label to each subdirectory, and includes all images from it. When file_list is provided, it is used directly to build the list of all files. It seems that in your case the file_list doesn't reflect the files you have on disk. What you can do is:
print(pipeline.reader_meta())

for both the file_root and file_list cases to compare whether the reader sees the same number of files.

JanuszL avatar Nov 02 '22 09:11 JanuszL

1 and 2 return the same string:

{'Reader': {'epoch_size': 1281066, 'epoch_size_padded': 1281072, 'number_of_shards': 8, 'shard_id': 1, 'pad_last_batch': 1, 'stick_to_shard': 0}}

I still have to check the content of file_list then.

How could I get the list of image paths returned with file_root? This would be useful to have for the next step :)

thomas-bouvier avatar Nov 02 '22 14:11 thomas-bouvier

Hi @thomas-bouvier,

Please check the get_property operator to get the source info of the returned tensor. The metadata is not propagated through other operators, so you need to call it directly on the output from the reader, like:

from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import numpy as np

max_batch_size = 2

@pipeline_def
def simple_pipeline():
    rgb_files, _ = fn.readers.file(file_root=YOUR_PATH)
    info = fn.get_property(rgb_files, key="source_info")
    info = fn.pad(info)
    return info

def _uint8_tensor_to_string(t):
    return np.array(t).tobytes().decode()

pipe = simple_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0)
pipe.build()
o = pipe.run()
print(_uint8_tensor_to_string(o[0][0]))

JanuszL avatar Nov 02 '22 14:11 JanuszL

Thank you, I'm trying it right now. However, I'm not sure of anything given the issues I have. How do I go through the whole dataset once using this example, seeing each image exactly once?

A question by the way, is it correct to use len() on a DALIGenericIterator?

thomas-bouvier avatar Nov 02 '22 15:11 thomas-bouvier

Hi @thomas-bouvier,

A question by the way, is it correct to use len() on a DALIGenericIterator?

Yes, it should work. It returns the number of iterations for the given iterator instance (not the number of samples) - see https://github.com/NVIDIA/DALI/blob/main/dali/python/nvidia/dali/plugin/base_iterator.py#L439.
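A hypothetical sanity check for the reader-based variant (the one built with name="Reader"):

it = DALIGenericIterator([pipeline], ["x", "y"], reader_name="Reader", auto_reset=True)
print(len(it))                                         # iterations per epoch for this shard
print(pipeline.reader_meta()["Reader"]["epoch_size"])  # samples in the whole data set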

JanuszL avatar Nov 02 '22 16:11 JanuszL

Thanks. Something is definitely wrong with the content of file_list.

I run my experiments on a single machine with 8 workers (= 8 GPUs). What should the content of file_list be? Should I use one file with all N paths, or 8 different files with 1/8 of the paths in each?

thomas-bouvier avatar Nov 09 '22 14:11 thomas-bouvier

Hi @thomas-bouvier,

Please use one file with the whole data set description. The reader will shard it later.
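For example, a hypothetical way to merge the two per-split files used above into that single list (one whitespace-separated "path label" pair per line):

# merge the per-split paths/labels files into one global file_list
with open("train_paths.txt") as fp, open("train_labels.txt") as fl, \
        open("train_paths_labels.txt", "w") as out:
    for path, label in zip((l.rstrip() for l in fp if l.strip()),
                           (l.rstrip() for l in fl if l.strip())):
        out.write(f"{path} {label}\n")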

JanuszL avatar Nov 09 '22 14:11 JanuszL

Ok, my mistake. I got confused with my previous message. As stated in the original message of this issue, only the external iterator has an unexpected result. Here are the updated results. Baseline is the PyTorch DataLoader, 76% accuracy, orange on the plot.

  1. DataLoader using the DALI reader + the file_root parameter. The application is reading all the files stored in file_root. This is working, I achieve the same accuracy. ===1 in the code, yellow on the plot.
  2. DataLoader using the DALI reader + the file_list parameter, pointing to a whitespace-separated filename label pair per line. This is working, I achieve the same accuracy. ===2 in the code, green on the plot.
  3. DataLoader using the external iterator. I'm using the same whitespace-separated filename label pair to read from. The achieved accuracy is too high (> 80%). ===3 in the code, red on the plot.

These results show that only the external iterator yields an inconsistent accuracy. Thus the content of file_list is not the problem.

Here are the plots:

[Screenshot: accuracy curves for the PyTorch baseline (orange), ===1 (yellow), ===2 (green), and ===3 (red)]

I wanted to use the external iterator to eventually load the training data dynamically, but my colleagues can't move on with that approach as this simple ImageNet use case doesn't achieve the expected accuracy. I just don't understand what can go wrong with such a simple use case. My last test will be to try printing paths. If this doesn't work out, what other alternative could we use as a replacement for the DALI external iterator?

Thank you :)

thomas-bouvier avatar Nov 10 '22 09:11 thomas-bouvier

Hi @thomas-bouvier ,

I'm happy that the file reader works as expected. In the case of the external source, you need to make sure that in each epoch all samples from the data set are returned and there are no repetitions. Printing or dumping the loaded file names into a file would probably be the best way to go now.
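For instance, a hypothetical per-rank check run directly on the iterator (re-enabling the commented-out batch_paths output from the script above, and using the same batch_size/shard_id/num_shards as the pipeline):

eii = ExternalInputIterator('train', batch_size, shard_id, num_shards, shuffle=True)
served = []
for batch_files, batch_labels, batch_paths in eii:
    served.extend(batch_paths)
# every path of this shard should be served exactly once per epoch
assert len(served) == len(set(served)) == eii.n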

JanuszL avatar Nov 10 '22 10:11 JanuszL

I did some more testing with the external iterator. I attached a unique id to each file, which I printed in my training loop. Each of my 8 GPUs writes to a local file. I then merged the 8 files for each epoch.

What I observed: for every epoch, the training loop sees 1120932 samples. But they are not necessarily different, as every sample is seen 8 times.

Here is my hypothesis given the code above: each GPU generates a permutation with the same seed, so each GPU is working with the exact same samples, producing a higher accuracy in the end. Would that make sense?

def __iter__(self):
        self.i = 0
        if self.split == 'train' and self.shuffle:
            perm = np.random.permutation(len(self.files))
            self.files = self.files[perm]
            self.labels = self.labels[perm]
        return self

thomas-bouvier avatar Nov 15 '22 15:11 thomas-bouvier

Hi @thomas-bouvier,

If I understand correctly:

        inf = self.data_set_len * shard_id // num_shards
        sup = self.data_set_len * (shard_id + 1) // num_shards
        self.files = np.array(self.files[inf:sup])
        self.labels = np.array(self.labels[inf:sup])

is supposed to make sure that the data set is split into GPU-number-of-shards pieces, so even if the permutations (randomness) within each shard are similar, it should not lead to repeated samples inside an epoch. One more thought comes to mind: with this approach, each shard sees only a subset of the labels. Can you pre-shuffle the whole data set globally first and then shard it? Otherwise the first shard will see only labels 0-1000/GPU_num, and so on.
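For example, a minimal sketch of such a global pre-shuffle inside __init__ (assuming the same seed on every rank so that all ranks agree on the permutation), applied before the inf/sup slicing:

# shuffle the full (path, label) list identically on every rank *before*
# slicing out this rank's shard, so each shard gets a class-balanced subset
rng = np.random.default_rng(42)              # the same seed on all ranks
perm = rng.permutation(self.data_set_len)
self.files = [self.files[p] for p in perm]
self.labels = [self.labels[p] for p in perm]
# ...then apply the inf/sup sharding exactly as before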

JanuszL avatar Nov 15 '22 16:11 JanuszL

Hey @JanuszL,

You're right. I got confused once again. The ids I attached to files were not sharded correctly, resulting in a mismatch between actual samples and ids. I confirm each id (and thus sample) is seen exactly once.

Your thought is really promising though. My first test is encouraging. Indeed, without pre-shuffling the dataset, each train/val shard contains only a subset of the classes. I'll run a full experiment to confirm this is the cause of my issue.

thomas-bouvier avatar Nov 16 '22 14:11 thomas-bouvier

I confirm that was the issue. I had to pre-shuffle the dataset before sharding it.

Thank you very much @JanuszL and @klecki for your valuable suggestions.

thomas-bouvier avatar Nov 19 '22 11:11 thomas-bouvier