pytorch_geometric
Using `DistributedDataParallel` with PyG raises `TypeError: default_collate: batch must contain ...; found <class 'torch_geometric.data.data.Data'>`
🚀 The feature, motivation and pitch
I did not see an example for this, but I am trying to run a PyG model using PyTorch DDP with NCCL, as in this example: multigpu. If I pass a PyG DataLoader instead of the DataLoader from torch.utils.data, I get the following error at line 104: TypeError: default_collate: batch must contain tensors, numpy arrays, numbers, dicts or lists; found <class 'torch_geometric.data.data.Data'>
Any help on how this can be done is much appreciated. I am running a pretty large number of graphs, so I need DDP for performance.
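For reference, this is roughly the pattern I am trying to follow, reduced to a minimal sketch (the port, feature size, and dataset are placeholders; I have not verified this is the recommended setup):
import os
import torch
import torch.multiprocessing as mp
from torch.distributed import init_process_group, destroy_process_group
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from torch_geometric.loader import DataLoader  # PyG loader instead of torch.utils.data.DataLoader
from torch_geometric.nn import GCNConv

def run(rank, world_size, dataset, num_features):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"  # placeholder port
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
    # PyG's DataLoader collates Data objects into a Batch; the plain
    # torch.utils.data.DataLoader default_collate raises the TypeError above.
    loader = DataLoader(dataset, batch_size=64, sampler=DistributedSampler(dataset))
    model = DDP(GCNConv(num_features, 2).to(rank), device_ids=[rank])
    for batch in loader:
        batch = batch.to(rank)
        out = model(batch.x, batch.edge_index)
        # ... loss / backward / step ...
    destroy_process_group()

if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    # dataset = <any PyG dataset or list of Data objects>
    # mp.spawn(run, args=(world_size, dataset, dataset.num_features), nprocs=world_size)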
Alternatives
No response
Additional context
No response
@Sayan-m90 Thank you for reporting this issue. It'd be easier for us to comment or investigate this further if you could share your full code, error message and env details.
from torch_geometric.datasets import Planetoid
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, Dataset
import torch
import os
from floe.api import ComputeCube, DecimalParameter, BooleanParameter, IntegerParameter
from orionplatform import RecordPortsMixin
import os.path as osp
from typing import List, Dict, Tuple
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data
from torch_geometric.datasets import QM7b
import torch.nn.functional as F
from torch_geometric.loader import DataLoader as GDL
from torch.utils.data.distributed import DistributedSampler
from ogb.graphproppred import PygGraphPropPredDataset as OGBDataset
from torch.distributed import init_process_group
import torch.multiprocessing as mp
from torch.distributed import destroy_process_group
from .helper_files import get_open_port
import random
import torch_geometric.transforms as T
class CustomPairDataset(Dataset):
    def __init__(self, udata: List[Tuple[Data, Data, float]]):
        self.udata = udata

    def __len__(self):
        return len(self.udata)

    def __getitem__(self, idx):
        print('self udata', self.udata[idx])
        return self.udata[idx][0], self.udata[idx][1], self.udata[idx][2]
def ddp_setup(rank, world_size, freeport):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
        freeport: free TCP port used as MASTER_PORT
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = f"{freeport}"
    # os.environ["NCCL_DEBUG"] = "INFO"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
class GCN(torch.nn.Module):
    def __init__(self, num_features, num_classes):
        super().__init__()
        self.conv1 = GCNConv(num_features, 16)
        self.conv2 = GCNConv(16, num_classes)

    def forward(self, x, edge_index):
        x = F.relu(self.conv1(x, edge_index))
        x = F.dropout(x, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)
def nccltrainer(rank: int, world_size: int, epochs: int, batch_size: int,
                num_workers: int, freeport: int,
                model: torch.nn.Module, optimizer: torch.optim.Optimizer, verbose: bool,
                train_pair_graph: Dataset, test_pair_graph: Dataset):
    print('inside nccl ', rank, world_size, train_pair_graph)
    ddp_setup(rank, world_size, freeport)
    ptr = None
    if train_pair_graph is not None:
        ptr = GDL(train_pair_graph, batch_size=batch_size, sampler=DistributedSampler(train_pair_graph))
    trainer = Trainer(model, ptr, None, optimizer, gpu_id=rank, batch_size=batch_size)
    trainer.train(epochs)
    destroy_process_group()
    torch.cuda.empty_cache()
class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        test_data: None,
        optimizer: torch.optim.Optimizer,
        gpu_id: int,
        batch_size: int,
    ) -> None:
        self.gpu_id = gpu_id
        model.to(torch.double)
        self.train_data = train_data
        # self.test_data = test_data
        self.optimizer = optimizer
        self.model = model
        self.model.to(gpu_id)
        print('moved model to gpu ', gpu_id)
        self.model = DDP(self.model, device_ids=[self.gpu_id])  # find_unused_parameters=False
        # self.loss_criterion = loss_criterion
        self.batch_size = batch_size
        self.train_loss, self.train_mae, self.train_mse = [], [], []
        self.val_loss, self.val_mae, self.val_mse = [], [], []
        print('set init for trainer ', self.model, self.gpu_id)
    def train(self, max_epochs: int):
        print('inside train')
        for epoch in range(0, max_epochs):
            self.model.train()
            print(f'epoch {epoch} pre batch')
            for batch, batch1, y in self.train_data:
                print(f'epoch {epoch}, batch {batch}')
                self.optimizer.zero_grad()
                out = self.model(batch.x, batch.edge_index)
                print('model out ', out)
                out = out[:batch.batch_size]
                print('trimmed out ', out)
                loss = F.nll_loss(out, batch.y[:batch.batch_size].long())
                loss.backward()
                self.optimizer.step()
def loadSingle_PytorchGeom():
    # dataset = 'Cora'
    # path = osp.join(osp.dirname(osp.realpath(__file__)), '..', 'data', 'Planetoid')
    # dataset = Planetoid(path, dataset)
    # train_loader = dataset[0]
    # test_loader = dataset[0]
    # print('found data ', train_loader, '\n dataset ', dataset, test_loader)
    dataset_name = 'ogbg-molbace'
    root = './data/ogbg-molbace'
    # Download and process the dataset on main process.
    dataset = OGBDataset(dataset_name, root)
    print('dataset ', dataset, dataset[0], len(dataset))
    print('dataset1 ', dataset[1])
    # for data in dataset:
    return dataset
def loadRandom_PytorchGeom(num_nodes, features_dim, edge_dim, num_edges, nummols):
    udata = []
    for i in range(nummols):
        data1 = Data(
            x=torch.rand(num_nodes, features_dim),
            edge_index=torch.randint(0, num_nodes, (2, num_edges)),
            edge_attr=torch.rand(num_edges, edge_dim),  # one attribute row per edge
        )
        data2 = Data(
            x=torch.rand(num_nodes, features_dim),
            edge_index=torch.randint(0, num_nodes, (2, num_edges)),
            edge_attr=torch.rand(num_edges, edge_dim),
        )
        udata.append([data1, data2, random.uniform(0, 1)])
    return udata
def begin():
    dataset_pyg = loadSingle_PytorchGeom()
    num_nodes, features_dim, edge_dim, num_edges = 10, 6, 3, 10
    batch_size, epoch, verbose = 64, 2, True
    optionforspawn = 0  # 0: custom pair dataset, 1: OGB dataset, 2: no dataset
    dataset = loadRandom_PytorchGeom(num_nodes, features_dim, edge_dim, num_edges, batch_size)
    datasetCl = CustomPairDataset(dataset)
    model = GCNConv(features_dim, 2)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    world_size = torch.cuda.device_count()
    free_port = get_open_port()
    torch.cuda.mem_get_info()
    print('world size ', world_size)
    print('out of trained model with data')
    try:
        if optionforspawn == 0:
            mp.spawn(nccltrainer, args=(world_size, epoch, batch_size,
                                        1, free_port,
                                        model, optimizer, verbose,
                                        datasetCl, None), nprocs=world_size)
        elif optionforspawn == 1:
            mp.spawn(nccltrainer, args=(world_size, epoch, batch_size,
                                        1, free_port,
                                        model, optimizer, verbose,
                                        dataset_pyg, None), nprocs=world_size)
        elif optionforspawn == 2:
            mp.spawn(nccltrainer, args=(world_size, epoch, batch_size,
                                        1, free_port,
                                        model, optimizer, verbose,
                                        None, None), nprocs=world_size)
    except Exception as ve:
        print('caught value error for spawn Dataset ', ve)


if __name__ == '__main__':
    begin()
Here is a snippet, apologies for the delay. My only concern is loading the data in a multi-GPU DDP setup. This is where I am stuck:
- My data is in the form of graph pairs, and I am saving them as a simple list (see loadRandom_PytorchGeom). Is there a way to create a custom dataset class for the PyTorch Geometric DataLoader to load from?
- The DataLoader from PyTorch Geometric, which I import as GDL, does not seem to have a num_workers option like its torch counterpart. Is there any way to use multiple CPUs to speed up this loading process? (A sketch of what I have in mind follows below.)
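Is something along these lines the right direction? A minimal sketch of what I mean (PairDataset and the toy graphs are made up for illustration; I have not confirmed this is the recommended pattern):
import random
from typing import List, Tuple

import torch
from torch.utils.data import Dataset
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader

class PairDataset(Dataset):
    # Returns (Data, Data, float) triples; PyG's collater should batch each slot.
    def __init__(self, pairs: List[Tuple[Data, Data, float]]):
        self.pairs = pairs

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        return self.pairs[idx]

def toy_graph():
    return Data(x=torch.rand(10, 6), edge_index=torch.randint(0, 10, (2, 10)))

pairs = [(toy_graph(), toy_graph(), random.uniform(0, 1)) for _ in range(100)]

# The PyG DataLoader subclasses torch.utils.data.DataLoader, so standard keyword
# arguments such as num_workers appear to be forwarded as-is.
loader = DataLoader(PairDataset(pairs), batch_size=32, shuffle=True, num_workers=2)
for batch_a, batch_b, y in loader:
    print(batch_a.num_graphs, batch_b.num_graphs, y.shape)
    break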
Sorry, I have a hard time understanding your example. Where are you passing the data loader in the first place? It doesn't seem to be used anywhere.
Ah, I removed that line while trying to remove some commented-out code. It should make sense now; I am using it right after I load the dataloader.
I am getting ValueError: bad value(s) in fds_to_keep when running your script.
Let me try re-running. Can you tell me the line number/reference line? I don't see fds_to_keep in the code. If it's from the loadSingle_PytorchGeom function, I am not using it for this run.
Got it. For datasetCl, the script works for me (except for some dtype issue since model is moved to torch.double). Which PyTorch Lightning version are you using?
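Side note on the dtype issue: a minimal fix, assuming it is the usual float32-vs-float64 mismatch, would be to keep the model and the node features in the same precision, e.g.:
# Either keep everything in float32 ...
model = model.float()
# ... or cast the node features to double to match the model:
batch.x = batch.x.double()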
I am not using PyTorch Lightning for my code. I can if it is recommended. I was looking to optimize the data loading part since I have a large amount of data and that is my current bottleneck. Here are my torch-geometric and torch versions:
torch-geometric==2.3.1
torch==2.1.1
I am not using PyTorch Lightning for my code.
Yeah, sorry. I misread. Since I am not able to reproduce, I am not so sure what the issue is here. Can you confirm that it is picking up the PyG DataLoader rather than the one coming from torch.utils.data.DataLoader?
Yes indeed! Here are the relevant lines:
from torch_geometric.loader import DataLoader as GDL
ptr = GDL(train_pair_graph, batch_size=batch_size, sampler=DistributedSampler(train_pair_graph))
So far my train_pair_graph is a list. It works for length 1000, but I have >5000 elements in train_pair_graph and am getting an error:
RuntimeError: unable to mmap 1480 bytes from file </torch_1_1816571983_62963>: Cannot allocate memory (12)
Please help with the data loading. I thought the code ran on your end; what seems to be the issue now?
This looks like you are running out of shared memory.
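A couple of common workarounds, none of which I have verified for this exact setup: reduce how much data each worker process holds, increase the available shared memory (for containers, e.g. the Docker --shm-size flag), or switch PyTorch's tensor sharing strategy at the top of the script:
import torch.multiprocessing as mp

# Use file-system-backed sharing instead of file descriptors; this sometimes
# helps when many small tensors are shared across processes.
mp.set_sharing_strategy('file_system')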