
Error when using horovod

davidxiaozhi opened this issue 2 years ago · 3 comments

When I use horovod to run DeepFM, it errors:

  File "/home/maer/zhipeng.li/project/torch_rec_demo/torchrec_ctr/run_horovod_deepfm.py", line 451, in <module>
    main(args)
  File "/home/maer/zhipeng.li/project/torch_rec_demo/torchrec_ctr/run_horovod_deepfm.py", line 380, in main
    model = init_model(device=device)
  File "/home/maer/zhipeng.li/project/torch_rec_demo/torchrec_ctr/run_horovod_deepfm.py", line 187, in init_model
    dmp_model = DistributedModelParallel(
  File "/root/conda/lib/python3.9/site-packages/torchrec/distributed/model_parallel.py", line 211, in __init__
    assert pg is not None, "Process group is not initialized"

The code:

import argparse
import os
from distutils.version import LooseVersion


import torch
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from filelock import FileLock
from torchvision import datasets, transforms
import torchrec
import horovod
import horovod.torch as hvd

from torchrec import EmbeddingBagCollection
from torchrec.datasets.utils import Batch
from torchrec.distributed import TrainPipelineSparseDist
from torchrec.distributed.embeddingbag import EmbeddingBagCollectionSharder
from torchrec.distributed.model_parallel import DistributedModelParallel
from torchrec.distributed.types import ModuleSharder
from torchrec.models.dlrm import DLRM, DLRMV2, DLRMTrain
from torchrec.models.deepfm import DeepFM, SimpleDeepFMNN
from torchrec.modules.embedding_configs import EmbeddingBagConfig
from torchrec.optim.keyed import CombinedOptimizer, KeyedOptimizerWrapper

from typing import cast, Iterator, List, Optional, Tuple


# Training settings
def parse_args(argv: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description='TorchRec DeepFM Horovod example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                        help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                        help='SGD momentum (default: 0.5)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=42, metavar='S',
                        help='random seed (default: 42)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='how many batches to wait before logging training status')
    parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                        help='use fp16 compression during allreduce')
    parser.add_argument('--use-mixed-precision', action='store_true', default=False,
                        help='use mixed precision for training')
    parser.add_argument('--use-adasum', action='store_true', default=False,
                        help='use adasum algorithm to do reduction')
    parser.add_argument('--gradient-predivide-factor', type=float, default=1.0,
                        help='apply gradient predivide factor in optimizer (default: 1.0)')
    parser.add_argument('--data-dir',
                        help='location of the training dataset in the local filesystem (will be downloaded if needed)')
    # DMP config 
    parser.add_argument(
        "--num_embeddings",
        type=int,
        default=100_000,
        help="max_ind_size. The number of embeddings in each embedding table. Defaults"
        " to 100_000 if num_embeddings_per_feature is not supplied.",
    )
    parser.add_argument(
        "--num_embeddings_per_feature",
        type=str,
        default=None,
        help="Comma separated max_ind_size per sparse feature. The number of embeddings"
        " in each embedding table. 26 values are expected for the Criteo dataset.",
    )
    parser.add_argument(
        "--embedding_dim",
        type=int,
        default=64,
        help="Size of each embedding.",
    )
    
    # Arguments when not run through horovodrun
    parser.add_argument('--num-proc', type=int)
    parser.add_argument('--hosts', help='hosts to run on in notation: hostname:slots[,host2:slots[,...]]')
    parser.add_argument('--communication', help='collective communication to use: gloo, mpi')
    return parser.parse_args(argv)


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
    
    
    
class CTRModelTrain(nn.Module):
    """
    nn.Module that wraps the DeepFM model for use with train_pipeline,
    analogous to DLRMTrain(dlrm_module).
    """

    def __init__(
        self,
        ctr_module: SimpleDeepFMNN,
    ) -> None:
        super().__init__()
        self.model = ctr_module
        self.loss_fn: nn.Module = nn.BCEWithLogitsLoss()

    def forward(
        self, batch: Batch
    ) -> Tuple[torch.Tensor, Tuple[torch.Tensor, torch.Tensor, torch.Tensor]]:
        """
        Args:
            batch: batch used with criteo and random data from torchrec.datasets
        Returns:
            Tuple[loss, Tuple[loss, logits, labels]]
        """
        logits = self.model(batch.dense_features, batch.sparse_features)
        logits = logits.squeeze()
        loss = self.loss_fn(logits, batch.labels.float())

        return loss, (loss.detach(), logits.detach(), batch.labels.detach())


# Helper to build the train/val/test dataloaders (currently unused).
def build_dataloaders(args, backend):
    from data.ctr_dataloader import get_dataloader, STAGES
    # TODO add CriteoIterDataPipe support and add random_dataloader arg
    train_dataloader = get_dataloader(args, backend, "train")
    val_dataloader = get_dataloader(args, backend, "val")
    test_dataloader = get_dataloader(args, backend, "test")
    return train_dataloader, val_dataloader, test_dataloader

def init_model(device=torch.device("cpu")):
    print("start init the DMP for embedding")
    
    # 1. Embedding table configs, one per Criteo categorical feature
    from pyre_extensions import none_throws
    eb_configs = [
        EmbeddingBagConfig(
            name=f"t_{feature_name}",
            embedding_dim=args.embedding_dim,
            num_embeddings=none_throws(args.num_embeddings_per_feature)[feature_idx]
            if args.num_embeddings is None
            else args.num_embeddings,
            feature_names=[feature_name],
        )
        for feature_idx, feature_name in enumerate(torchrec.datasets.criteo.DEFAULT_CAT_NAMES)
    ]

    ebc = EmbeddingBagCollection(
        tables=eb_configs,
        device=torch.device("meta"),
    )
    # 2. Build the DeepFM model and wrap it for training
    deepfm_model = SimpleDeepFMNN(
        num_dense_features=len(torchrec.datasets.criteo.DEFAULT_INT_NAMES),
        embedding_bag_collection=ebc,
        hidden_layer_size=1024,
        deep_fm_dimension=16,
    )
    train_model = CTRModelTrain(deepfm_model)
    
    # Print the wrapped model structure
    print("the train model struct:", train_model)
    
    # 3. Configure sharders and build the DMP model
    from fbgemm_gpu.split_embedding_configs import EmbOptimType as OptimType
    fused_params = {
        "learning_rate": args.lr,
        "optimizer": OptimType.EXACT_ROWWISE_ADAGRAD
        # if args.adagrad
        # else OptimType.EXACT_SGD,
    }
    sharders = [
        EmbeddingBagCollectionSharder(fused_params=fused_params),
    ]
    
    # DistributedModelParallel requires the default torch.distributed process
    # group to be initialized; the reported assertion fires on this call.
    dmp_model = DistributedModelParallel(
        module=train_model,
        init_data_parallel=False,
        device=device,
        sharders=cast(List[ModuleSharder[nn.Module]], sharders),
    )
    
    return dmp_model
 
# Build the combined (fused sparse + dense) optimizer for the DMP model.
def optimizer_with_params(model: DistributedModelParallel):
    def optimizer_by_params():
        # NOTE: an "adagrad" flag is not defined in parse_args above.
        if getattr(args, "adagrad", False):
            return lambda params: torch.optim.Adagrad(params, lr=args.lr)
        else:
            return lambda params: torch.optim.SGD(params, lr=args.lr)

    dense_optimizer = KeyedOptimizerWrapper(
        dict(model.named_parameters()),
        optimizer_by_params(),
    )
    optimizer = CombinedOptimizer([model.fused_optimizer, dense_optimizer])
    return optimizer


def main(args):
    print("start main ....")
    # NOTE: the training/eval loops below still assume MNIST-style
    # (data, target) batches rather than torchrec Batch objects.
    def train_mixed_precision(epoch, scaler):
        model.train()
        # Horovod: set epoch to sampler for shuffling.
        train_sampler.set_epoch(epoch)
        for batch_idx, (data, target) in enumerate(train_loader):
            if args.cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            with torch.cuda.amp.autocast():
                output = model(data)
                loss = F.nll_loss(output, target)

            scaler.scale(loss).backward()
            # Make sure all async allreduces are done
            optimizer.synchronize()
            # In-place unscaling of all gradients before weights update
            scaler.unscale_(optimizer)
            with optimizer.skip_synchronize():
                scaler.step(optimizer)
            # Update scaler in case of overflow/underflow
            scaler.update()

            if batch_idx % args.log_interval == 0:
                # Horovod: use train_sampler to determine the number of examples in
                # this worker's partition.
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\tLoss Scale: {}'.format(
                    epoch, batch_idx * len(data), len(train_sampler),
                           100. * batch_idx / len(train_loader), loss.item(), scaler.get_scale()))

    def train_epoch(epoch):
        model.train()
        # Horovod: set epoch to sampler for shuffling.
        train_sampler.set_epoch(epoch)
        for batch_idx, (data, target) in enumerate(train_loader):
            if args.cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % args.log_interval == 0:
                # Horovod: use train_sampler to determine the number of examples in
                # this worker's partition.
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_sampler),
                           100. * batch_idx / len(train_loader), loss.item()))

    def metric_average(val, name):
        tensor = torch.tensor(val)
        avg_tensor = hvd.allreduce(tensor, name=name)
        return avg_tensor.item()

    def test():
        model.eval()
        test_loss = 0.
        test_accuracy = 0.
        for data, target in test_loader:
            if args.cuda:
                data, target = data.cuda(), target.cuda()
            output = model(data)
            # sum up batch loss
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            # get the index of the max log-probability
            pred = output.data.max(1, keepdim=True)[1]
            test_accuracy += pred.eq(target.data.view_as(pred)).cpu().float().sum()

        # Horovod: use test_sampler to determine the number of examples in
        # this worker's partition.
        test_loss /= len(test_sampler)
        test_accuracy /= len(test_sampler)

        # Horovod: average metric values across workers.
        test_loss = metric_average(test_loss, 'avg_loss')
        test_accuracy = metric_average(test_accuracy, 'avg_accuracy')

        # Horovod: print output only on first rank.
        if hvd.rank() == 0:
            print('\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format(
                test_loss, 100. * test_accuracy))

    # Horovod: initialize library.
    hvd.init()
    torch.manual_seed(args.seed)
    print("the args.num_proc:",str(args.num_proc)," rank:"+str(hvd.rank())," size:"+str(hvd.size())+" !!!")
    if args.cuda:
        # Horovod: pin GPU to local rank.
        torch.cuda.set_device(hvd.local_rank())
        torch.cuda.manual_seed(args.seed)
    else:
        if args.use_mixed_precision:
            raise ValueError("Mixed precision is only supported with cuda enabled.")

    if (args.use_mixed_precision and LooseVersion(torch.__version__)
            < LooseVersion('1.6.0')):
        raise ValueError("""Mixed precision is using torch.cuda.amp.autocast(),
                            which requires torch >= 1.6.0""")

    # Horovod: limit # of CPU threads to be used per worker.
    torch.set_num_threads(1)

    kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}
    # When supported, use 'forkserver' to spawn dataloader workers instead of 'fork' to prevent
    # issues with Infiniband implementations that are not fork-safe
    if (kwargs.get('num_workers', 0) > 0 and hasattr(mp, '_supports_context') and
            mp._supports_context and 'forkserver' in mp.get_all_start_methods()):
        kwargs['multiprocessing_context'] = 'forkserver'

    data_dir = args.data_dir or './data'
    with FileLock(os.path.expanduser("~/.horovod_lock")):
        # train_dataset = \
        #     datasets.MNIST(data_dir, train=True, download=True,
        #                    transform=transforms.Compose([
        #                        transforms.ToTensor(),
        #                        transforms.Normalize((0.1307,), (0.3081,))
        #                    ]))
        train_dataset = torchrec.datasets.random.RandomRecDataset(
            keys=torchrec.datasets.criteo.DEFAULT_CAT_NAMES,
            batch_size=args.batch_size,
            hash_size=args.num_embeddings,
            hash_sizes=args.num_embeddings_per_feature
            if args.num_embeddings_per_feature is not None
            else None,
            manual_seed=args.seed,
            ids_per_feature=1,
            num_dense=len(torchrec.datasets.criteo.DEFAULT_INT_NAMES),
            num_generated_batches=1000,
        )

    # Horovod: use DistributedSampler to partition the training data.
    # NOTE: RandomRecDataset is an iterable dataset that already yields
    # Batch objects, so the sampler/batch_size plumbing here may need rework.
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs
    )

    # test_dataset = \
    #     datasets.MNIST(data_dir, train=False, transform=transforms.Compose([
    #         transforms.ToTensor(),
    #         transforms.Normalize((0.1307,), (0.3081,))
    #     ]))
    test_dataset = torchrec.datasets.random.RandomRecDataset(
        keys=torchrec.datasets.criteo.DEFAULT_CAT_NAMES,
        batch_size=args.batch_size,
        hash_size=args.num_embeddings,
        hash_sizes=args.num_embeddings_per_feature
        if args.num_embeddings_per_feature is not None
        else None,
        manual_seed=args.seed,
        ids_per_feature=1,
        num_dense=len(torchrec.datasets.criteo.DEFAULT_INT_NAMES),
    )
    # Horovod: use DistributedSampler to partition the test data.
    test_sampler = torch.utils.data.distributed.DistributedSampler(
        test_dataset, num_replicas=hvd.size(), rank=hvd.rank())
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.test_batch_size,
                                              sampler=test_sampler, **kwargs)
    
    if args.cuda and torch.cuda.is_available():
        # One GPU per process, indexed by local rank (matches set_device above).
        device: torch.device = torch.device(f"cuda:{hvd.local_rank()}")
    else:
        device: torch.device = torch.device("cpu")
    #model = Net()
    model = init_model(device=device)
    
    # Print the sharding plan chosen for the DMP model
    print("print the plan for dmp module:")
    print(model.plan)
    

    # By default, Adasum doesn't need scaling up learning rate.
    lr_scaler = hvd.size() if not args.use_adasum else 1

    if args.cuda:
        # Move model to GPU.
        model.cuda()
        # If using GPU Adasum allreduce, scale learning rate by local_size.
        if args.use_adasum and hvd.nccl_built():
            lr_scaler = hvd.local_size()

    # Horovod: scale learning rate by lr_scaler.
    # optimizer = optim.SGD(model.parameters(), lr=args.lr * lr_scaler,
    #                       momentum=args.momentum)

    # KeyedOptimizerWrapper expects a factory that builds an optimizer from a
    # list of parameters, not an already-constructed optimizer instance.
    dense_optimizer = KeyedOptimizerWrapper(
        dict(model.named_parameters()),
        lambda params: optim.Adagrad(params, lr=args.lr * lr_scaler),
    )
    combine_optimizer = CombinedOptimizer([model.fused_optimizer, dense_optimizer])

    # Horovod: broadcast parameters & optimizer state.
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(combine_optimizer, root_rank=0)

    # Horovod: (optional) compression algorithm.
    compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(combine_optimizer,
                                         named_parameters=model.named_parameters(),
                                         compression=compression,
                                         op=hvd.Adasum if args.use_adasum else hvd.Average,
                                         gradient_predivide_factor=args.gradient_predivide_factor)

    if args.use_mixed_precision:
        # Initialize the GradScaler once, in the enclosing scope
        scaler = torch.cuda.amp.GradScaler()

    for epoch in range(1, args.epochs + 1):
        if args.use_mixed_precision:
            train_mixed_precision(epoch, scaler)
        else:
            train_epoch(epoch)
        # Keep test in full precision since computation is relatively light.
        test()


if __name__ == '__main__':
    import sys
    args = parse_args(sys.argv[1:])
    args.cuda = not args.no_cuda and torch.cuda.is_available()
    if args.num_proc:
        # run training through horovod.run
        print('Running training through horovod.run')
        horovod.run(main,
                    args=(args,),
                    np=args.num_proc,
                    hosts=args.hosts,
                    use_gloo=args.communication == 'gloo',
                    use_mpi=args.communication == 'mpi')
    else:
        # this is running via horovodrun
        main(args)

davidxiaozhi · Aug 04 '22

hey @davidxiaozhi, not too familiar with horovod, but when using pure pytorch you usually need to call dist.init_process_group(backend=backend). Could you try explicitly calling this?
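
A minimal sketch of that workaround (untested; it assumes Horovod supplies rank/size and that the chosen MASTER_ADDR/MASTER_PORT are reachable from every worker):

import os

import horovod.torch as hvd
import torch
import torch.distributed as dist

hvd.init()

# torch.distributed's "env://" init method reads these variables; here we
# derive them from Horovod's view of the job. The localhost address and
# port below are placeholder defaults.
os.environ["RANK"] = str(hvd.rank())
os.environ["WORLD_SIZE"] = str(hvd.size())
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")

backend = "nccl" if torch.cuda.is_available() else "gloo"
dist.init_process_group(backend=backend, init_method="env://")

# DistributedModelParallel should now find a default process group.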

Is this something that horovod should be doing on the backend?

cc @colin2328 if you know who knows about horovod

YLGH · Aug 05 '22

@YLGH Hmm, with horovod you don't need to call dist.init_process_group(backend=backend).

davidxiaozhi · Aug 13 '22

@davidxiaozhi I"m not so familiar with horovod, but my understanding is that it does not use pytorch distributed (https://pytorch.org/docs/stable/distributed.html) library and does the collective / p2p comms itself.

torchrec is built on pytorch distributed. It should be possible to extend it to use horovod instead, but that is not on our roadmap.
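
For reference, the bootstrap torchrec expects from pytorch distributed looks roughly like this (a sketch, assuming the script is launched with torchrun, which sets RANK, WORLD_SIZE, LOCAL_RANK, MASTER_ADDR, and MASTER_PORT):

import os

import torch
import torch.distributed as dist

# With the default "env://" init method, rank and world size are read
# from the environment variables that torchrun provides.
backend = "nccl" if torch.cuda.is_available() else "gloo"
dist.init_process_group(backend=backend)
if torch.cuda.is_available():
    # Pin each process to its local GPU.
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))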

You're welcome to submit a PR to do so. We'd also be interested in understanding your use case (is there a reason you want to use horovod over torch distributed?)

colin2328 · Sep 08 '22

Closing due to lack of engagement. @davidxiaozhi feel free to reopen or follow up about horovod integration if you are still interested.

colin2328 · Nov 23 '22