Error when running DeepFM with Horovod
When I use Horovod to run DeepFM, it errors with:
File "/home/maer/zhipeng.li/project/torch_rec_demo/torchrec_ctr/run_horovod_deepfm.py", line 451, in
The code:
import argparse
import os
from distutils.version import LooseVersion
import torch
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from filelock import FileLock
from torchvision import datasets, transforms
import torchrec
import horovod
import horovod.torch as hvd
from torchrec import EmbeddingBagCollection
from torchrec.datasets.utils import Batch
from torchrec.distributed import TrainPipelineSparseDist
from torchrec.distributed.embeddingbag import EmbeddingBagCollectionSharder
from torchrec.distributed.model_parallel import DistributedModelParallel
from torchrec.distributed.types import ModuleSharder
from torchrec.models.dlrm import DLRM, DLRMV2, DLRMTrain
from torchrec.models.deepfm import DeepFM,SimpleDeepFMNN
from torchrec.modules.embedding_configs import EmbeddingBagConfig
from torchrec.optim.keyed import CombinedOptimizer, KeyedOptimizerWrapper
from typing import cast, Iterator, List, Optional, Tuple
# Training settings
def parse_args(argv: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                        help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                        help='SGD momentum (default: 0.5)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=42, metavar='S',
                        help='random seed (default: 42)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='how many batches to wait before logging training status')
    parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                        help='use fp16 compression during allreduce')
    parser.add_argument('--use-mixed-precision', action='store_true', default=False,
                        help='use mixed precision for training')
    parser.add_argument('--use-adasum', action='store_true', default=False,
                        help='use adasum algorithm to do reduction')
    parser.add_argument('--gradient-predivide-factor', type=float, default=1.0,
                        help='apply gradient predivide factor in optimizer (default: 1.0)')
    parser.add_argument('--data-dir',
                        help='location of the training dataset in the local filesystem (will be downloaded if needed)')
    # DMP config
    parser.add_argument(
        "--num_embeddings",
        type=int,
        default=100_000,
        help="max_ind_size. The number of embeddings in each embedding table. Defaults"
        " to 100_000 if num_embeddings_per_feature is not supplied.",
    )
    parser.add_argument(
        "--num_embeddings_per_feature",
        type=str,
        default=None,
        help="Comma separated max_ind_size per sparse feature. The number of embeddings"
        " in each embedding table. 26 values are expected for the Criteo dataset.",
    )
    parser.add_argument(
        "--embedding_dim",
        type=int,
        default=64,
        help="Size of each embedding.",
    )
    # Arguments when not run through horovodrun
    parser.add_argument('--num-proc', type=int)
    parser.add_argument('--hosts', help='hosts to run on in notation: hostname:slots[,host2:slots[,...]]')
    parser.add_argument('--communication', help='collaborative communication to use: gloo, mpi')
    return parser.parse_args(argv)
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
class CTRModelTrain(nn.Module):
    """
    nn.Module to wrap the CTR (DeepFM) model for use with train_pipeline,
    analogous to DLRMTrain: dlrm_model = DLRMTrain(dlrm_module).
    """
    def __init__(
        self,
        ctr_module: SimpleDeepFMNN,
    ) -> None:
        super().__init__()
        self.model = ctr_module
        self.loss_fn: nn.Module = nn.BCEWithLogitsLoss()

    def forward(
        self, batch: Batch
    ) -> Tuple[torch.Tensor, Tuple[torch.Tensor, torch.Tensor, torch.Tensor]]:
        """
        Args:
            batch: batch used with criteo and random data from torchrec.datasets
        Returns:
            Tuple[loss, Tuple[loss, logits, labels]]
        """
        logits = self.model(batch.dense_features, batch.sparse_features)
        logits = logits.squeeze()
        loss = self.loss_fn(logits, batch.labels.float())
        return loss, (loss.detach(), logits.detach(), batch.labels.detach())
def FunctionName(args, backend):
    # NOTE: depends on a local data/ctr_dataloader module and is never called in this script.
    from data.ctr_dataloader import get_dataloader, STAGES
    # TODO add CriteoIterDataPipe support and add random_dataloader arg
    train_dataloader = get_dataloader(args, backend, "train")
    val_dataloader = get_dataloader(args, backend, "val")
    test_dataloader = get_dataloader(args, backend, "test")
def init_model(device=torch.device("cpu")):
    print("start init the DMP for embedding")
    # 1. the embedding tables for the EmbeddingBagCollection
    from pyre_extensions import none_throws
    eb_configs = [
        EmbeddingBagConfig(
            name=f"t_{feature_name}",
            embedding_dim=args.embedding_dim,
            num_embeddings=none_throws(args.num_embeddings_per_feature)[feature_idx]
            if args.num_embeddings is None
            else args.num_embeddings,
            feature_names=[feature_name],
        )
        for feature_idx, feature_name in enumerate(torchrec.datasets.criteo.DEFAULT_CAT_NAMES)
    ]
    ebc = EmbeddingBagCollection(
        tables=eb_configs,
        device=torch.device("meta"),
    )
    # 2. build the model
    deepfm_model = SimpleDeepFMNN(
        num_dense_features=len(torchrec.datasets.criteo.DEFAULT_INT_NAMES),
        embedding_bag_collection=ebc,
        hidden_layer_size=1024,
        deep_fm_dimension=16,
    )
    train_model = CTRModelTrain(deepfm_model)
    # print the model for train
    print("the train model struct:", train_model)
    # 3. configure and build the DMP model
    from fbgemm_gpu.split_embedding_configs import EmbOptimType as OptimType
    fused_params = {
        "learning_rate": args.lr,
        "optimizer": OptimType.EXACT_ROWWISE_ADAGRAD,
        # if args.adagrad
        # else OptimType.EXACT_SGD,
    }
    sharders = [
        EmbeddingBagCollectionSharder(fused_params=fused_params),
    ]
    dmp_model = DistributedModelParallel(
        module=train_model,
        init_data_parallel=False,
        device=device,
        sharders=cast(List[ModuleSharder[nn.Module]], sharders),
    )
    return dmp_model
# the dmp combined optimizer
def optimizer_with_params(model: DistributedModelParallel):
    def optimizer_by_params():
        if args.adagrad:
            return lambda params: torch.optim.Adagrad(params, lr=args.lr)
        else:
            return lambda params: torch.optim.SGD(params, lr=args.lr)

    # KeyedOptimizerWrapper expects a factory (params -> optimizer)
    dense_optimizer = KeyedOptimizerWrapper(
        dict(model.named_parameters()),
        optimizer_by_params(),
    )
    optimizer = CombinedOptimizer([model.fused_optimizer, dense_optimizer])
    return optimizer
def main(args):
    print("start main ....")

    # NOTE: these loops still follow the MNIST-style (data, target) pattern, while the
    # torchrec RandomRecDataset used below yields torchrec Batch objects.
    def train_mixed_precision(epoch, scaler):
        model.train()
        # Horovod: set epoch to sampler for shuffling.
        train_sampler.set_epoch(epoch)
        for batch_idx, (data, target) in enumerate(train_loader):
            if args.cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            with torch.cuda.amp.autocast():
                output = model(data)
                loss = F.nll_loss(output, target)
            scaler.scale(loss).backward()
            # Make sure all async allreduces are done
            optimizer.synchronize()
            # In-place unscaling of all gradients before weights update
            scaler.unscale_(optimizer)
            with optimizer.skip_synchronize():
                scaler.step(optimizer)
            # Update scaler in case of overflow/underflow
            scaler.update()
            if batch_idx % args.log_interval == 0:
                # Horovod: use train_sampler to determine the number of examples in
                # this worker's partition.
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\tLoss Scale: {}'.format(
                    epoch, batch_idx * len(data), len(train_sampler),
                    100. * batch_idx / len(train_loader), loss.item(), scaler.get_scale()))

    def train_epoch(epoch):
        model.train()
        # Horovod: set epoch to sampler for shuffling.
        train_sampler.set_epoch(epoch)
        for batch_idx, (data, target) in enumerate(train_loader):
            if args.cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % args.log_interval == 0:
                # Horovod: use train_sampler to determine the number of examples in
                # this worker's partition.
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_sampler),
                    100. * batch_idx / len(train_loader), loss.item()))

    def metric_average(val, name):
        tensor = torch.tensor(val)
        avg_tensor = hvd.allreduce(tensor, name=name)
        return avg_tensor.item()

    def test():
        model.eval()
        test_loss = 0.
        test_accuracy = 0.
        for data, target in test_loader:
            if args.cuda:
                data, target = data.cuda(), target.cuda()
            output = model(data)
            # sum up batch loss
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            # get the index of the max log-probability
            pred = output.data.max(1, keepdim=True)[1]
            test_accuracy += pred.eq(target.data.view_as(pred)).cpu().float().sum()

        # Horovod: use test_sampler to determine the number of examples in
        # this worker's partition.
        test_loss /= len(test_sampler)
        test_accuracy /= len(test_sampler)

        # Horovod: average metric values across workers.
        test_loss = metric_average(test_loss, 'avg_loss')
        test_accuracy = metric_average(test_accuracy, 'avg_accuracy')

        # Horovod: print output only on first rank.
        if hvd.rank() == 0:
            print('\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format(
                test_loss, 100. * test_accuracy))
    # Horovod: initialize library.
    hvd.init()
    torch.manual_seed(args.seed)
    print("the args.num_proc:", str(args.num_proc), " rank:" + str(hvd.rank()), " size:" + str(hvd.size()) + " !!!")

    if args.cuda:
        # Horovod: pin GPU to local rank.
        torch.cuda.set_device(hvd.local_rank())
        torch.cuda.manual_seed(args.seed)
    else:
        if args.use_mixed_precision:
            raise ValueError("Mixed precision is only supported with cuda enabled.")

    if (args.use_mixed_precision and LooseVersion(torch.__version__)
            < LooseVersion('1.6.0')):
        raise ValueError("""Mixed precision is using torch.cuda.amp.autocast(),
                         which requires torch >= 1.6.0""")

    # Horovod: limit # of CPU threads to be used per worker.
    torch.set_num_threads(1)

    kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}
    # When supported, use 'forkserver' to spawn dataloader workers instead of 'fork' to prevent
    # issues with Infiniband implementations that are not fork-safe
    if (kwargs.get('num_workers', 0) > 0 and hasattr(mp, '_supports_context') and
            mp._supports_context and 'forkserver' in mp.get_all_start_methods()):
        kwargs['multiprocessing_context'] = 'forkserver'

    data_dir = args.data_dir or './data'
    with FileLock(os.path.expanduser("~/.horovod_lock")):
        # train_dataset = \
        #     datasets.MNIST(data_dir, train=True, download=True,
        #                    transform=transforms.Compose([
        #                        transforms.ToTensor(),
        #                        transforms.Normalize((0.1307,), (0.3081,))
        #                    ]))
        # NOTE: RandomRecDataset is an IterableDataset that already yields Batch objects,
        # so the DistributedSampler/DataLoader(batch_size=...) pattern below may not apply cleanly.
        train_dataset = torchrec.datasets.random.RandomRecDataset(
            keys=torchrec.datasets.criteo.DEFAULT_CAT_NAMES,
            batch_size=args.batch_size,
            hash_size=args.num_embeddings,
            hash_sizes=args.num_embeddings_per_feature
            if hasattr(args, "num_embeddings_per_feature")
            else None,
            manual_seed=args.seed if hasattr(args, "seed") else None,
            ids_per_feature=1,
            num_dense=len(torchrec.datasets.criteo.DEFAULT_INT_NAMES),
            num_generated_batches=1000,
        )

    # Horovod: use DistributedSampler to partition the training data.
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs
    )

    # test_dataset = \
    #     datasets.MNIST(data_dir, train=False, transform=transforms.Compose([
    #         transforms.ToTensor(),
    #         transforms.Normalize((0.1307,), (0.3081,))
    #     ]))
    test_dataset = torchrec.datasets.random.RandomRecDataset(
        keys=torchrec.datasets.criteo.DEFAULT_CAT_NAMES,
        batch_size=args.batch_size,
        hash_size=args.num_embeddings,
        hash_sizes=args.num_embeddings_per_feature
        if hasattr(args, "num_embeddings_per_feature")
        else None,
        manual_seed=args.seed if hasattr(args, "seed") else None,
        ids_per_feature=1,
        num_dense=len(torchrec.datasets.criteo.DEFAULT_INT_NAMES),
    )

    # Horovod: use DistributedSampler to partition the test data.
    test_sampler = torch.utils.data.distributed.DistributedSampler(
        test_dataset, num_replicas=hvd.size(), rank=hvd.rank())
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.test_batch_size,
                                              sampler=test_sampler, **kwargs)

    if args.cuda and torch.cuda.is_available():
        device: torch.device = torch.device(f"cuda:{hvd.local_rank()}")
    else:
        device: torch.device = torch.device("cpu")

    # model = Net()
    model = init_model(device=device)
    # print the plan for the DMP model
    print("print the plan for dmp module:")
    print(model.plan)
    # By default, Adasum doesn't need scaling up learning rate.
    lr_scaler = hvd.size() if not args.use_adasum else 1

    if args.cuda:
        # Move model to GPU.
        model.cuda()
        # If using GPU Adasum allreduce, scale learning rate by local_size.
        if args.use_adasum and hvd.nccl_built():
            lr_scaler = hvd.local_size()

    # Horovod: scale learning rate by lr_scaler.
    # optimizer = optim.SGD(model.parameters(), lr=args.lr * lr_scaler,
    #                       momentum=args.momentum)
    # KeyedOptimizerWrapper expects a factory (params -> optimizer), not an optimizer instance.
    dense_optimizer = KeyedOptimizerWrapper(
        dict(model.named_parameters()),
        lambda params: optim.Adagrad(params, lr=args.lr * lr_scaler),
    )
    combine_optimizer = CombinedOptimizer([model.fused_optimizer, dense_optimizer])

    # Horovod: broadcast parameters & optimizer state.
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(combine_optimizer, root_rank=0)

    # Horovod: (optional) compression algorithm.
    compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(combine_optimizer,
                                         named_parameters=model.named_parameters(),
                                         compression=compression,
                                         op=hvd.Adasum if args.use_adasum else hvd.Average,
                                         gradient_predivide_factor=args.gradient_predivide_factor)

    if args.use_mixed_precision:
        # Initialize scaler in global scale
        scaler = torch.cuda.amp.GradScaler()

    for epoch in range(1, args.epochs + 1):
        if args.use_mixed_precision:
            train_mixed_precision(epoch, scaler)
        else:
            train_epoch(epoch)
        # Keep test in full precision since computation is relatively light.
        test()
if __name__ == '__main__':
    import sys

    args = parse_args(sys.argv[1:])
    args.cuda = not args.no_cuda and torch.cuda.is_available()

    if args.num_proc:
        # run training through horovod.run
        print('Running training through horovod.run')
        horovod.run(main,
                    args=(args,),
                    np=args.num_proc,
                    hosts=args.hosts,
                    use_gloo=args.communication == 'gloo',
                    use_mpi=args.communication == 'mpi')
    else:
        # this is running via horovodrun
        main(args)
hey @davidxiaozhi, I'm not too familiar with horovod, but when using pure pytorch you usually need to call dist.init_process_group(backend=backend); could you try calling this explicitly? (see the sketch below)
Is this something that horovod should be doing on the backend?
cc @colin2328 if you know who knows about horovod
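For reference, a minimal sketch of what that explicit initialization could look like when the script is launched under Horovod. This is an assumption-laden example: the gloo backend, the MASTER_ADDR/MASTER_PORT values, and the placement right after hvd.init() are illustrative, not taken from the original script.

import os

import horovod.torch as hvd
import torch.distributed as dist

hvd.init()

# Illustrative rendezvous settings; in a real multi-node job MASTER_ADDR should point
# at rank 0's host and be exported to every worker before launch.
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")

# Bootstrap torch.distributed from Horovod's rank/size so that libraries built on
# torch.distributed (such as torchrec) find an initialized process group.
if not dist.is_initialized():
    dist.init_process_group(
        backend="gloo",  # "nccl" is an option when every worker owns a GPU
        rank=hvd.rank(),
        world_size=hvd.size(),
    )

Whether torchrec's collectives then coexist cleanly with Horovod's own allreduce path is the open question raised above.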
@YLGH Hmm, when using horovod you don't need to call dist.init_process_group(backend=backend).
@davidxiaozhi I'm not that familiar with horovod, but my understanding is that it does not use the pytorch distributed library (https://pytorch.org/docs/stable/distributed.html) and does the collective / p2p comms itself.
torchrec is built on pytorch distributed. It should be possible to extend it to use horovod instead, but that is not on our roadmap.
You're welcome to submit a PR to do so. We'd also be interested in understanding your use case (is there a reason you want to use horovod over torch distributed?)
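To illustrate the point about torchrec being built on pytorch distributed, here is a stripped-down sketch of the torch.distributed-style setup that DistributedModelParallel expects, launched with torchrun rather than horovodrun. The backend choice and the environment variables read below are assumptions for the sketch, not part of this issue.

import os

import torch
import torch.distributed as dist

# torchrun exports RANK, WORLD_SIZE, LOCAL_RANK and the rendezvous variables,
# so init_process_group() can read everything from the environment.
dist.init_process_group(backend="nccl" if torch.cuda.is_available() else "gloo")

local_rank = int(os.environ.get("LOCAL_RANK", "0"))
device = torch.device(f"cuda:{local_rank}") if torch.cuda.is_available() else torch.device("cpu")
if device.type == "cuda":
    torch.cuda.set_device(device)

# The CTRModelTrain module from the script above would then be wrapped as:
# model = DistributedModelParallel(module=train_model, device=device)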
Closing due to lack of engagement. @davidxiaozhi, feel free to reopen or follow up about horovod integration if you are still interested.