Errors when converting a DALI tensor to a PyTorch tensor
Hi, guys. I am trying to combine DALI with the torch.autograd.graph.saved_tensors_hooks(pack_hook, unpack_hook) API to speed up offloading and prefetching of intermediate feature maps to SSD. During forward propagation I convert the PyTorch tensor to numpy and store it on the SSD; during backward propagation I fetch it back to the GPU with pipe_gds() and convert the DALI tensor back to a PyTorch tensor with nvidia.dali.plugin.pytorch.feed_ndarray(). When processing the feature maps produced by the convolution layers, an error occurs; the output is as follows.
(wjpytorch) root@li:~/wj# python test_with_hook_gds.py
Files already downloaded and verified
train_data_size:50000
cuda:0
Number of parameter: 0.14 M
Memory of parameter: 0.56 M
--------------The 1 training begins----------
inputimages type is torch.Size([64, 10])
inputimages type is torch.Size([2560, 64])
successfully free 7d4e1136-ed45-4a2c-9afd-ad109689260b.npy
successfully free 7c970ca2-82b1-43f2-87ac-4df69e35a576.npy
inputimages type is torch.Size([1024, 64])
inputimages type is torch.Size([2560, 1024])
successfully free 4375fe98-b486-418e-b70e-74e80de8cf99.npy
successfully free bf5766bb-3f17-4df0-a7c8-5fe7d17e0c3b.npy
inputimages type is torch.Size([2560, 64, 8, 8])
Traceback (most recent call last):
File "/root/wj/test_with_hook_gds.py", line 123, in <module>
loss.backward()
File "/root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/torch/_tensor.py", line 363, in backward
torch.autograd.backward(self, gradient, retain_graph, create_graph, inputs=inputs)
File "/root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/torch/autograd/__init__.py", line 173, in backward
Variable._execution_engine.run_backward( # Calls into the C++ engine to run the backward pass
RuntimeError: AssertionError: The element type of DALI Tensor/TensorList doesn't match the element type of the target PyTorch Tensor: torch.int64 vs torch.float32
At:
/root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/plugin/pytorch.py(55): feed_ndarray
/root/wj/test_with_hook_gds.py(59): unpack_hook
I'm not sure whether this is caused by DALI or by the PyTorch API; when I read a PyTorch tensor file directly with torch.load(), no error occurs. Could you give me some suggestions for adjustments?
The reproducible code is as follows:
import os
import time
import uuid
import inspect
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torchvision
from torch import cuda
from nvidia.dali import pipeline_def, fn
import nvidia.dali.plugin.pytorch
from torch.utils.data import DataLoader
device=torch.device('cuda:0')
#pynvml.nvmlInit()
#frame = inspect.currentframe()
#gpu_tracker = MemTracker(frame)
train_data=torchvision.datasets.CIFAR10(root='./data',train=True,transform=torchvision.transforms.ToTensor(),download=True)
#test_data=torchvision.datasets.CIFAR10(root='./data',train=False,transform=torchvision.transforms.ToTensor(),download=True)
train_data_size=len(train_data)
#test_data_size=len(test_data)
print("train_data_size:{}".format(train_data_size))
#print("test_data_size:{}".format(test_data_size))
train_dataloader=DataLoader(train_data,batch_size=2560)
#test_dataloader=DataLoader(test_data,batch_size=128)
'''hook'''
class SelfDeletingTempFile():
    def __init__(self):
        self.name = os.path.join("", str(uuid.uuid4()))
    def __del__(self):
        freefilename = self.name + '.npy'
        os.remove(freefilename)
        print("successfully free " + freefilename)

@pipeline_def(batch_size=1, num_threads=8, device_id=0)
def pipe_gds(filename):
    data = fn.readers.numpy(device='gpu', file_root='.', files=filename)
    return data

def pack_hook(tensor):
    temp_file = SelfDeletingTempFile()
    tensorshape = tensor.shape
    Inputnumpy = tensor.cpu().numpy()
    np.save(temp_file.name, Inputnumpy)
    file = [temp_file, tensorshape]
    return file

def unpack_hook(file):
    Inputimages = torch.zeros(file[1]).to(device)
    p = pipe_gds(filename=(file[0].name + '.npy'))
    p.build()
    pipe_out = p.run()
    nvidia.dali.plugin.pytorch.feed_ndarray(pipe_out[0][0], Inputimages)
    print("inputimages type is", Inputimages.shape)
    return Inputimages
""" Network architecture. """
class mymodel(nn.Module):
    def __init__(self):
        super(mymodel, self).__init__()
        self.model1 = nn.Sequential(
            nn.Conv2d(3, 32, 5, padding=2),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 32, 5, padding=2),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 5, padding=2),
            nn.MaxPool2d(2),
            nn.Flatten(),  # flatten
            nn.Linear(64 * 4 * 4, 64),
            nn.Linear(64, 10),
        )
    def forward(self, x):  # input: 32*32*3
        with torch.autograd.graph.saved_tensors_hooks(pack_hook, unpack_hook):
            x = self.model1(x)
        return x
net1=mymodel()
net1=net1.to(device)
loss_fn=nn.CrossEntropyLoss()
loss_fn=loss_fn.to(device)
optimizer=torch.optim.SGD(net1.parameters(),lr=0.1)
total_train_step=0
total_test_step=0
epoch=20
print(next(net1.parameters()).device)
total = sum([param.nelement() for param in net1.parameters()])
print("Number of parameter: %.2f M" % (total/1024/1024))
print("Memory of parameter: %.2f M " % (cuda.memory_allocated()/1024/1024))
'''start training'''
totalbegin=time.time()
#gpu_tracker.track()
for i in range(epoch):
    print("--------------The {} training begins----------".format(i + 1))
    running_loss = 0
    running_correct = 0
    begin = time.time()
    for data in train_dataloader:
        images, targets = data
        # print(images.device)
        images = images.to(device)
        targets = targets.to(device)
        outputs = net1(images)
        loss = loss_fn(outputs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        running_correct += (outputs.argmax(1) == targets).sum()
        total_train_step += 1
        # if total_train_step % 100 == 0:
        #     print("number of training:{},loss:{}".format(total_train_step, loss))
    end = time.time()
    print("spend time: ", (end - begin) / 60)
    print("epoch:{}, loss:{}, accuracy:{}".format(i + 1, running_loss / train_data_size, running_correct / train_data_size))
totalend=time.time()
print("total real runtime: ", (totalend - totalbegin) / 60)
#gpu_tracker.track()
print("gpu memory allocated: %2.f M " % (cuda.memory_allocated()/1024/1024))
My GPU is an NVIDIA P100, my PyTorch version is 1.11.0+cu113, and my DALI version is 1.14.

Hi @Weigaa,
The error says that the element types of the DALI output and the Torch tensor passed to feed_ndarray don't match: DALI returns int64, while torch.zeros creates float32 by default.
Your code should do something like:
to_torch_type = {
    types.DALIDataType.FLOAT   : torch.float32,
    types.DALIDataType.FLOAT64 : torch.float64,
    types.DALIDataType.FLOAT16 : torch.float16,
    types.DALIDataType.UINT8   : torch.uint8,
    types.DALIDataType.INT8    : torch.int8,
    types.DALIDataType.INT16   : torch.int16,
    types.DALIDataType.INT32   : torch.int32,
    types.DALIDataType.INT64   : torch.int64
}

dali_tensor = pipe_out[0][0]
torch_type = to_torch_type[dali_tensor.dtype]
Inputimages = torch.zeros(file[1], dtype=torch_type).to(device)
nvidia.dali.plugin.pytorch.feed_ndarray(dali_tensor, Inputimages)
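Putting it together, the unpack_hook could look roughly like this (a sketch that reuses pipe_gds, device, and the [temp_file, shape] list from your script):

def unpack_hook(file):
    # file[0] is the SelfDeletingTempFile, file[1] the saved shape
    p = pipe_gds(filename=(file[0].name + '.npy'))
    p.build()
    pipe_out = p.run()
    dali_tensor = pipe_out[0][0]
    # allocate the target tensor with the dtype reported by DALI
    torch_type = to_torch_type[dali_tensor.dtype]
    Inputimages = torch.zeros(file[1], dtype=torch_type).to(device)
    nvidia.dali.plugin.pytorch.feed_ndarray(dali_tensor, Inputimages)
    return Inputimages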
@JanuszL
Thank you very much.
After adding import nvidia.dali.types as types and your suggested code, my code runs fine.
I noticed that the tensors I stored earlier are torch.cuda.FloatTensor and torch.cuda.LongTensor. Can I directly use nvidia.dali.plugin.pytorch.feed_ndarray() to convert DALIDataType.INT64 to torch.cuda.LongTensor and DALIDataType.FLOAT to torch.cuda.FloatTensor?
I think the best would be to use:
Inputimages = torch.empty(file[1], dtype=torch_type, device=device)
and keep the type mapping as proposed. According to https://pytorch.org/docs/stable/tensors.html, torch.cuda.LongTensor is the tensor type, while torch.int64 is the dtype you are expected to use when allocating an empty tensor.
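As a small illustration of that distinction (shapes are arbitrary):

import torch

# The legacy type torch.cuda.FloatTensor is just float32 data on a CUDA device;
# feed_ndarray only needs the dtype and device of the target tensor to match.
t = torch.empty((2, 3), dtype=torch.float32, device="cuda:0")
print(t.type())   # 'torch.cuda.FloatTensor'
print(t.dtype)    # torch.float32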
@JanuszL I replaced Inputimages = torch.zeros(file[1], dtype=torch_type).to(device) with Inputimages = torch.empty(file[1], dtype=torch_type, device=device).
Unfortunately, when I then used DALI and torch.load() to read the feature maps from the SSD to the GPU and timed both with time.time(), I found that DALI with GDS was much slower than torch.load(), which seems abnormal. What is the reason for this?
The torch.save && torch.load code is:
import os
import time
import uuid
import inspect
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torch import cuda
from torch.utils.data import DataLoader
#from gpu_mem_track import MemTracker
device=torch.device('cuda:0')
train_data=torchvision.datasets.CIFAR10(root='./data',train=True,transform=torchvision.transforms.ToTensor(),download=True)
train_data_size=len(train_data)
#test_data_size=len(test_data)
print("train_data_size:{}".format(train_data_size))
train_dataloader=DataLoader(train_data,batch_size=2560)
'''hook'''
class SelfDeletingTempFile():
    def __init__(self):
        self.name = os.path.join("./", str(uuid.uuid4()))
    def __del__(self):
        os.remove(self.name)

def pack_hook(tensor):
    temp_file = SelfDeletingTempFile()
    begin = time.time()
    torch.save(tensor, temp_file.name)
    end = time.time()
    print(tensor.shape, "offload time is", end - begin)
    return temp_file

def unpack_hook(temp_file):
    begin = time.time()
    tensor = torch.load(temp_file.name)
    end = time.time()
    print(tensor.shape, "load time is", end - begin)
    return tensor
""" Network architecture. """
class mymodel(nn.Module):
    def __init__(self):
        super(mymodel, self).__init__()
        self.model1 = nn.Sequential(
            nn.Conv2d(3, 32, 5, padding=2),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 32, 5, padding=2),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 5, padding=2),
            nn.MaxPool2d(2),
            nn.Flatten(),  # flatten
            nn.Linear(64 * 4 * 4, 64),
            nn.Linear(64, 10),
        )
    def forward(self, x):  # input: 32*32*3
        with torch.autograd.graph.saved_tensors_hooks(pack_hook, unpack_hook):
            x = self.model1(x)
        return x
net1=mymodel()
net1=net1.to(device)
loss_fn=nn.CrossEntropyLoss()
loss_fn=loss_fn.to(device)
optimizer=torch.optim.SGD(net1.parameters(),lr=0.1)
total_train_step=0
total_test_step=0
epoch=1
print(next(net1.parameters()).device)
total = sum([param.nelement() for param in net1.parameters()])
print("Number of parameter: %.2f M" % (total/1024/1024))
print("Memory of parameter: %.2f M " % (cuda.memory_allocated()/1024/1024))
'''start training'''
totalbegin=time.time()
#gpu_tracker.track()
for i in range(epoch):
    print("--------------The {} training begins----------".format(i + 1))
    running_loss = 0
    running_correct = 0
    begin = time.time()
    j = 0
    if (j > 0):
        break
    for data in train_dataloader:
        if (j > 0):
            break
        images, targets = data
        # print(images.device)
        images = images.to(device)
        targets = targets.to(device)
        outputs = net1(images)
        loss = loss_fn(outputs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        running_correct += (outputs.argmax(1) == targets).sum()
        total_train_step += 1
        j += 1
        # if total_train_step % 100 == 0:
        #     print("number of training:{},loss:{}".format(total_train_step, loss))
    end = time.time()
    print("spend time: ", (end - begin) / 60)
    print("epoch:{}, loss:{}, accuracy:{}".format(i + 1, running_loss / train_data_size, running_correct / train_data_size))
totalend=time.time()
print("total real runtime: ", (totalend - totalbegin) / 60)
The DALI code is:
import os
import time
import uuid
import inspect
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torchvision
from torch import cuda
from nvidia.dali import pipeline_def, fn
import nvidia.dali.plugin.pytorch
from torch.utils.data import DataLoader
import nvidia.dali.types as types
device=torch.device('cuda:0')
#pynvml.nvmlInit()
#frame = inspect.currentframe()
#gpu_tracker = MemTracker(frame)
train_data=torchvision.datasets.CIFAR10(root='./data',train=True,transform=torchvision.transforms.ToTensor(),download=True)
#test_data=torchvision.datasets.CIFAR10(root='./data',train=False,transform=torchvision.transforms.ToTensor(),download=True)
train_data_size=len(train_data)
#test_data_size=len(test_data)
print("train_data_size:{}".format(train_data_size))
#print("test_data_size:{}".format(test_data_size))
train_dataloader=DataLoader(train_data,batch_size=2560)
#test_dataloader=DataLoader(test_data,batch_size=128)
'''hook'''
class SelfDeletingTempFile():
    def __init__(self):
        self.name = os.path.join("", str(uuid.uuid4()))
    def __del__(self):
        freefilename = self.name + '.npy'
        os.remove(freefilename)
        # print("successfully free " + freefilename)

@pipeline_def(batch_size=1, num_threads=8, device_id=0)
def pipe_gds(filename):
    data = fn.readers.numpy(device='gpu', file_root='.', files=filename)
    return data

def pack_hook(tensor):
    temp_file = SelfDeletingTempFile()
    begin = time.time()
    tensorshape = tensor.shape
    # print("save tensor type is", tensor.type(), "shape is", tensorshape)
    Inputnumpy = tensor.cpu().numpy()
    np.save(temp_file.name, Inputnumpy)
    file = [temp_file, tensorshape]
    end = time.time()
    print(tensor.shape, "offload time is", end - begin)
    return file

def unpack_hook(file):
    begin = time.time()
    begin2 = time.time()
    p = pipe_gds(filename=(file[0].name + '.npy'))
    p.build()
    pipe_out = p.run()
    end2 = time.time()
    to_torch_type = {
        types.DALIDataType.FLOAT: torch.float32,
        types.DALIDataType.FLOAT64: torch.float64,
        types.DALIDataType.FLOAT16: torch.float16,
        types.DALIDataType.UINT8: torch.uint8,
        types.DALIDataType.INT8: torch.int8,
        types.DALIDataType.INT16: torch.int16,
        types.DALIDataType.INT32: torch.int32,
        types.DALIDataType.INT64: torch.int64
    }
    dali_tensor = pipe_out[0][0]
    # print(dali_tensor.dtype)
    torch_type = to_torch_type[dali_tensor.dtype]
    Inputimages = torch.empty(file[1], dtype=torch_type).to(device)
    nvidia.dali.plugin.pytorch.feed_ndarray(dali_tensor, Inputimages)
    end = time.time()
    print(Inputimages.shape, "pure load time is", end2 - begin2)
    print(Inputimages.shape, "load time is", end - begin)
    # print("inputimages type is", Inputimages.shape)
    return Inputimages
""" Network architecture. """
class mymodel(nn.Module):
    def __init__(self):
        super(mymodel, self).__init__()
        self.model1 = nn.Sequential(
            nn.Conv2d(3, 32, 5, padding=2),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 32, 5, padding=2),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 5, padding=2),
            nn.MaxPool2d(2),
            nn.Flatten(),  # flatten
            nn.Linear(64 * 4 * 4, 64),
            nn.Linear(64, 10),
        )
    def forward(self, x):  # input: 32*32*3
        with torch.autograd.graph.saved_tensors_hooks(pack_hook, unpack_hook):
            x = self.model1(x)
        return x
net1=mymodel()
net1=net1.to(device)
loss_fn=nn.CrossEntropyLoss()
loss_fn=loss_fn.to(device)
optimizer=torch.optim.SGD(net1.parameters(),lr=0.1)
total_train_step=0
total_test_step=0
epoch=1
print(next(net1.parameters()).device)
total = sum([param.nelement() for param in net1.parameters()])
print("Number of parameter: %.2f M" % (total/1024/1024))
print("Memory of parameter: %.2f M " % (cuda.memory_allocated()/1024/1024))
'''start training'''
totalbegin=time.time()
#gpu_tracker.track()
for i in range(epoch):
    print("--------------The {} training begins----------".format(i + 1))
    running_loss = 0
    running_correct = 0
    begin = time.time()
    j = 0
    if (j > 0):
        break
    for data in train_dataloader:
        if (j > 0):
            break
        images, targets = data
        # print(images.device)
        images = images.to(device)
        targets = targets.to(device)
        outputs = net1(images)
        loss = loss_fn(outputs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        running_correct += (outputs.argmax(1) == targets).sum()
        total_train_step += 1
        j += 1
        # if total_train_step % 100 == 0:
        #     print("number of training:{},loss:{}".format(total_train_step, loss))
    end = time.time()
    print("spend time: ", (end - begin) / 60)
    print("epoch:{}, loss:{}, accuracy:{}".format(i + 1, running_loss / train_data_size, running_correct / train_data_size))
totalend=time.time()
print("total real runtime: ", (totalend - totalbegin) / 60)
#gpu_tracker.track()
print("gpu memory allocated: %2.f M " % (cuda.memory_allocated()/1024/1024))
We noticed that p.build() accounts for almost 97% of the DALI load time. Does this mean DALI has a serious pipeline construction (start-up) overhead, and is there a way we can avoid it?
Hi @Weigaa,
DALI, and the GDS that DALI uses inside the GPU numpy reader, have a noticeable construction-time overhead. DALI is not optimized for frequent pipeline recreation but for fast execution once the processing flow is defined. If your goal is only to load files from the drive into GPU memory without any processing, DALI may not be the best choice. You may consider trying out https://github.com/rapidsai/kvikio just for that.
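For reference, a minimal sketch of the kind of raw transfer kvikio provides (the file name and shape are placeholders; a raw read like this requires knowing the shape and dtype up front, since no .npy header is written):

import cupy
import kvikio
import torch

arr = cupy.arange(64 * 1024, dtype=cupy.float32).reshape(64, 1024)

# write raw GPU bytes straight to the file
f = kvikio.CuFile("scratch.bin", "w")
f.write(arr)
f.close()

# read them back into a preallocated GPU buffer
out = cupy.empty_like(arr)
f = kvikio.CuFile("scratch.bin", "r")
f.read(out)
f.close()

# zero-copy view as a torch tensor via the CUDA array interface
tensor = torch.as_tensor(out, device="cuda")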
Hi @JanuszL, we tried using kvikio for data transfer from SSD to GPU, and it seems that kvikio.CuFile() also has a large construction overhead in write mode ("w"). In read mode ("r") there is no construction overhead, but it seems to have much lower read bandwidth than DALI (even with the DALI pipeline batch size set to 1). After excluding the construction overhead, the results of my GDS read tests from SSD to GPU (3080 Ti) are shown in the table below. Are these results accurate?
| Block size(MB) | Kvikio_Cufile Bandwidth(MB/s) | DALI Bandwidth (MB/s) pipeline=1 | DALI Bandwidth(MB/s) pipeline=best |
|---|---|---|---|
| 1568 | 1590.909091 | 7720.33481 | 8371.596369 |
| 784 | 1549.407115 | 7619.047619 | 8276.153278 |
| 392 | 1584.478577 | 6555.183946 | 8202.552835 |
| 196 | 1542.09284 | 4708.143166 | 8112.582781 |
| 98 | 1517.027864 | 3110.12377 | 8536.585366 |
| 49 | 1585.760518 | 1749.375223 | 8044.65605 |
| 24.5 | 1531.25 | 989.4991922 | 8288.227334 |
| 12.25 | 1346.153846 | 535.1681957 | 8448.275862 |
| 6.125 | 1177.884615 | 510.8423686 | 7862.644416 |
| 3.0625 | 995.2876178 | 279.9360146 | 8019.114952 |
Hi @Weigaa,
Regarding the measurements, I would check profiles from the Nsight Systems profiler to make sure that you measure not only the time taken to issue the copy on the CPU but the whole transfer until it completes. If you could share the profiles it would make the comparison much easier.
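One way to make sure the measurement covers the device-side work and not just the CPU launch is CUDA event timing (a sketch, independent of the code in this thread):

import torch

start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)

start.record()
# ... issue the copy / run the pipeline here ...
end.record()
torch.cuda.synchronize()          # wait until the recorded work has finished
print("elapsed ms:", start.elapsed_time(end))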
@JanuszL We are not currently using the Nsight Systems profiler and it will take us some time to learn the tool, so if you could help us with testing I would be very grateful. To make our timing statistics accurate, we call torch.cuda.synchronize() before each time.time() call. The code we used to test DALI can be found in the previous messages; below is the code we used to test kvikio, for your reference.
device=torch.device('cuda:0')
testmodelbsz = 512
N = 30
average_save_tensor = 0
average_save_numpy = 0
#
f = kvikio.CuFile('Inputnumpy.npy', "w")
# write the tensor to the SSD
for i in range(1):
    Inputimages = torch.randn(testmodelbsz, 256, 56, 56).cuda()
    # Inputimages = torch.randn(256, 56, 56)
    # path = 'Inputtensor' + str(i) + '.pt'
    # path2 = 'Inputnumpy' + str(i) + '.npy'
    Inputcupy = cupy.asarray(Inputimages)
    torch.cuda.synchronize()
    begin = time.time()
    # f = kvikio.CuFile(path2, "w")
    f.write(Inputcupy)
    # f.close()
    torch.cuda.synchronize()
    end = time.time()
    time2 = end - begin
    if i > 0:
        average_save_numpy += time2
    # os.remove(path2)
    print("numpysave spendtime is", time2)
f.close()
print("average cupysave spendtime is", average_save_numpy / (N - 1))
# time reading the data back with kvikio
averageloadtime = 0
f = kvikio.CuFile('Inputnumpy.npy', "r")
# CPU-to-GPU transfer test
cupyimages = cupy.asarray(torch.empty(testmodelbsz, 256, 56, 56))
for i in range(N):
    torch.cuda.synchronize()
    # path3 = 'Inputcupy' + str(i)
    begin = time.time()
    Inputimages = cupy.empty_like(cupyimages)
    # f = kvikio.CuFile(path3, "r")
    # Read whole array from file
    f.read(Inputimages)
    Inputtensor = torch.as_tensor(Inputimages, device=device)
    torch.cuda.synchronize()
    end = time.time()
    if i > 0:
        averageloadtime += end - begin
    print("load time is", end - begin)
    # os.remove(path3)
print("average load time is", averageloadtime / (N - 1))
os.remove('Inputnumpy.npy')
Hello @Weigaa,
thanks for your insights. Before digging deeper into this issue, did you make sure that your drive is GDS compatible? Each GDS installation comes with a checker tool called gdscheck. If you call gdscheck -P it should tell you which partitions are GDS supported. Can you please give me the output of that? Also, what kind of storage are you reading from? Is it remote storage (e.g. GPFS or Lustre) or local (NVMe)? If it is NVMe, are you using a software RAID?
For more info, please check the GDS configuration Guide.
Concerning DALI and GDS: I am unfamiliar with the hooks you are using. Are those hooks executed on every forward pass? If so, the code rebuilds the pipeline at each step, which is not recommended. In terms of GDS, this opens and destroys the GDS driver at every single iteration, adding further overhead on top of the general DALI construction overhead.
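To illustrate the recommended pattern (a sketch, not tied to the hook code above): construct and build the pipeline once, then only pay the per-run cost inside the loop.

from nvidia.dali import pipeline_def, fn

# assumption: the list of files to read is known up front
files = ["sample_0.npy", "sample_1.npy"]

@pipeline_def(batch_size=1, num_threads=8, device_id=0)
def pipe_gds():
    return fn.readers.numpy(device="gpu", file_root=".", files=files)

p = pipe_gds()
p.build()                 # one-time construction cost (GDS initialization happens here)
for _ in range(len(files)):
    out = p.run()         # cheap per-iteration cost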
Please let me know what you think
Best and thanks Thorsten
Hi @azrael417 @JanuszL
Thank you very much. For your first question, the output of gdscheck -P is:
I'm not sure whether this output means that the system is compatible, whether it is because my GPU does not support GDS, or because we are not using a Mellanox ConnectX-5 or newer NIC. Could you tell me something useful about that?
My storage is read from a local NVMe SSD without RAID; the SSD information is:
09:00.0 Non-Volatile memory controller: Intel Corporation Optane SSD 900P Series
I use mount /dev/nvme0n1 /mnt/optane/ to mount the SSD to my directory.
For your second question, I rewrote the DALI test code without using the hooks, so you can ignore the code above and just focus on the following. Step 1: create some test files:
import torch
from nvidia.dali import pipeline_def, fn
import nvidia.dali.plugin.pytorch
import os
import time
import numpy as np
device=torch.device('cuda:0')
batch_size = 32 # to be used in pipelines
dali_extra_dir = os.environ['DALI_EXTRA_PATH']
# data_dir_2d = os.path.join(dali_extra_dir, 'db', '3D', 'MRI', 'Knee', 'npy_2d_slices', 'STU00001')
# data_dir_3d = os.path.join(dali_extra_dir, 'db', '3D', 'MRI', 'Knee', 'npy_3d', 'STU00001')
# data_dir = os.path.join(data_dir_2d, 'SER00001')
# # Listing all *.npy files in data_dir
data_dir = '.'
files = sorted([f for f in os.listdir(data_dir) if '.npy' in f])
testmodelbsz = 32
# files = files[0:5]
N = 10
average_save_numpy = 0
for i in range(N):
    # Inputimages = torch.randn(testmodelbsz, 256, 56, 56).cuda()
    Inputimages = torch.randn(testmodelbsz, 256, 56, 56)
    path2 = 'Inputnumpy' + str(i) + '.npy'
    # save by numpy
    torch.cuda.synchronize()
    begin = time.time()
    Inputnumpy = Inputimages.cpu().numpy()
    np.save(path2, Inputnumpy)
    torch.cuda.synchronize()
    end = time.time()
    time2 = end - begin
    average_save_numpy += time2
    print("numpysave spendtime is", time2)
Step 2: use a DALI pipeline to load the data:
@pipeline_def(batch_size=batch_size, num_threads=8, device_id=0)
def pipe_gds():
    data = fn.readers.numpy(device='gpu', file_root=data_dir, files=files)
    return data

# DALI batch load
average_transfer_pipeline_numpy = 0
p = pipe_gds()
p.build()
for i in range(N):
    torch.cuda.synchronize()
    begin = time.time()
    pipe_out = p.run()
    torch.cuda.synchronize()
    end = time.time()
    time1 = end - begin
    print("numpyload pipeline spendtime is", time1)
    print(len(pipe_out[0]))
    if i > 1:
        average_transfer_pipeline_numpy += time1
print("average numpyload pipeline spendtime is", average_transfer_pipeline_numpy / ((N - 2) * batch_size))
and we use the following pipeline to test the non-pipelined case (batch_size=1):
@pipeline_def(batch_size=1, num_threads=8, device_id=0)
def pipe_gds(filename):
    data = fn.readers.numpy(device='gpu', file_root=data_dir, files=filename)
    return data
The measured times turned out to be consistent with our previous feedback.
Finally, regarding GDS: we often save different CUDA tensors under different names, so opening and destroying the GDS file handle with f = kvikio.CuFile(path2, "w") and f.close() is necessary, because path2 is different in each iteration.
We apologise for previously providing the GDS "write" code instead of the "read" code.
To achieve a fair comparison, I modified some of the kvikio.CuFile() code and retested; the code is as follows:
device=torch.device('cuda:0')
testmodelbsz = 512
N = 30
average_save_tensor = 0
average_save_numpy = 0
#
f = kvikio.CuFile('Inputnumpy.npy', "w")
# write the tensor to the SSD
for i in range(1):
    Inputimages = torch.randn(testmodelbsz, 256, 56, 56).cuda()
    # Inputimages = torch.randn(256, 56, 56)
    # path = 'Inputtensor' + str(i) + '.pt'
    # path2 = 'Inputnumpy' + str(i) + '.npy'
    Inputcupy = cupy.asarray(Inputimages)
    torch.cuda.synchronize()
    begin = time.time()
    # f = kvikio.CuFile(path2, "w")
    f.write(Inputcupy)
    # f.close()
    torch.cuda.synchronize()
    end = time.time()
    time2 = end - begin
    if i > 0:
        average_save_numpy += time2
    # os.remove(path2)
    print("numpysave spendtime is", time2)
f.close()
print("average cupysave spendtime is", average_save_numpy / (N - 1))
# time reading the data back with kvikio
averageloadtime = 0
f = kvikio.CuFile('Inputnumpy.npy', "r")
# CPU-to-GPU transfer test
cupyimages = cupy.asarray(torch.empty(testmodelbsz, 256, 56, 56))
for i in range(N):
    torch.cuda.synchronize()
    # path3 = 'Inputcupy' + str(i)
    begin = time.time()
    Inputimages = cupy.empty_like(cupyimages)
    # f = kvikio.CuFile(path3, "r")
    # Read whole array from file
    f.read(Inputimages)
    Inputtensor = torch.as_tensor(Inputimages, device=device)
    torch.cuda.synchronize()
    end = time.time()
    if i > 0:
        averageloadtime += end - begin
    print("load time is", end - begin)
    # os.remove(path3)
print("average load time is", averageloadtime / (N - 1))
os.remove('Inputnumpy.npy')
The kvikio_Cufile read bandwidth is:
| Block size(MB) | Kvikio_Cufile Read Bandwidth(MB/s) |
|---|---|
| 1568 | 1940.209859 |
| 784 | 1888.701518 |
| 392 | 1930.083703 |
| 196 | 1861.596034 |
| 98 | 1901.804774 |
| 49 | 1772.151899 |
| 24.5 | 1788.321168 |
| 12.25 | 157.6373697 |
| 6.125 | 1447.306238 |
| 3.0625 | 1126.747609 |
Hello @Weigaa ,
A few things to mention here:
- It seems that your installation does not support GDS. When you installed CUDA on the system, did you also install the GDS drivers? In the meantime, I will cross-check with the GDS product team whether Intel Optane memory is supported by GDS.
- For general DL workloads, I expect that a custom reader using kvikio will deliver worse performance if it is not properly called in a subprocess, since basically all the IO is serialized with the training. If you are using it inside a PyTorch DataLoader, be careful that the subprocess needs to be spawned or forkserver-forked, because otherwise the CUDA context will get corrupted. In general, DALI avoids this. In any case, you can experiment with a different number of threads. It is not documented but can be set by a module function. Also, kvikio supports async IO. Please check this part, I think it is not documented either. But basically it allows you to fire off IO on a batch and then wait for completion.
- So better let's focus on DALI with GDS and without GDS, fully serialized, that means without any NN attached to it. What performance do you get when you use the exact same pipeline once with GDS (i.e. fn.readers.numpy with device='gpu') vs. the POSIX based pipeline (device='cpu')?
- While employing 8 threads sounds reasonable, please play around with the number of threads. Also, please set register_buffers=True and cache_header_information=True in the reader for best GDS performance.
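A sketch of the two pipeline variants being compared (the file list and batch size are placeholders; the extra reader options are only passed to the GPU variant):

import os
from nvidia.dali import pipeline_def, fn

files = sorted(f for f in os.listdir(".") if f.endswith(".npy"))

@pipeline_def(batch_size=32, num_threads=8, device_id=0)
def pipe_gpu():
    # GDS path, with the suggested reader options
    return fn.readers.numpy(device="gpu", file_root=".", files=files,
                            register_buffers=True, cache_header_information=True)

@pipeline_def(batch_size=32, num_threads=8, device_id=0)
def pipe_cpu():
    # POSIX path for comparison
    return fn.readers.numpy(device="cpu", file_root=".", files=files)

# pipeline_def also lets you override pipeline arguments such as num_threads at call time,
# e.g. pipe_gpu(num_threads=4), which makes thread-count sweeps easy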
Best Thorsten
Hi @azrael417 , Thanks again, Thorsten. We still have some questions that we would like to get your help with.
- We do have GDS installed; we run gdscheck and get "Platform Verification succeeded", but it seems we are running in "compatibility mode" and not in real GDS. Could you please help us confirm ① whether the 3080 Ti GPU supports GPUDirect Storage and ② whether a Mellanox ConnectX-5 or newer smart network card (NIC) must be installed?
- Kvikio indeed has a non-blocking read API, but I can't find an async write API. I will try the kvikio non-blocking read API later. I agree that DALI can help me solve the serial I/O problem, and that is exactly why I chose it, although I use it during training and not just when reading input data. A rule of thumb is that I/O is faster when the number of threads equals the number of physical processor cores on the host, and I will try different numbers of threads to enrich my results.
- We will run and report the results of the tests using device='cpu' as soon as possible.
- Our Intel(R) Xeon(R) CPU E5-2680 v4 server has 28 physical cores and 56 logical cores, and we swept the thread count from 2 to 128 in powers of 2, as well as testing 28 and 56 separately. We also added register_buffers=True and cache_header_information=True to the numpy reader. To make our experiment more meaningful, I hope to get a response to question 1 as soon as possible, thank you very much.
Best Jia Wei
@azrael417 @JanuszL We have done more experiments on DALI, comparing device='cpu' and device='gpu' under the same experimental environment and pipeline settings; the results are shown in the table below.
In addition, we re-checked the GDS development documentation and found that GDS does not seem to support the 3080 Ti GPU, so we used the P100 for the following tests with different thread counts. Unfortunately, even though we reinstalled the CUDA and GDS drivers on the P100, all devices, including NVMe, were reported as "unsupported" by gdscheck. Therefore, the following experiments are still conducted in GDS-compatible mode.
| Block size(MB) | Kvikio_Cufile Bandwidth(MB/s) | DALI Bandwidth (MB/s) pipeline=1 | DALI Bandwidth(MB/s) pipeline=best | DALI Bandwidth CPU(MB/s) pipeline=1 | DALI Bandwidth CPU(MB/s) pipeline=best |
|---|---|---|---|---|---|
| 1568 | 1940.209859 | 7720.33481 | 8371.596369 | 3438.596491 | 5915.864931 |
| 784 | 1888.701518 | 7619.047619 | 8276.153278 | 3684.210526 | 7468.08916 |
| 392 | 1930.083703 | 6555.183946 | 8202.552835 | 4288.840263 | 7013.777062 |
| 196 | 1861.596034 | 4708.143166 | 8112.582781 | 4458.598726 | 7486.631016 |
| 98 | 1901.804774 | 3110.12377 | 8536.585366 | 4322.893692 | 6712.328767 |
| 49 | 1772.151899 | 1749.375223 | 8044.65605 | 4313.380282 | 7059.501513 |
| 24.5 | 1788.321168 | 989.4991922 | 8288.227334 | 2793.296089 | 6193.124368 |
| 12.25 | 157.6373697 | 535.1681957 | 8448.275862 | 3134.595701 | 6702.412869 |
| 6.125 | 1447.306238 | 510.8423686 | 7862.644416 | 4316.420014 | 7036.1861 |
| 3.0625 | 1126.747609 | 279.9360146 | 8019.114952 | 3835.796593 | 6564.844587 |
We have reported our installation problems here: https://github.com/NVIDIA/DALI/issues/4009.
The results for different thread counts are shown below. "with optimization" means that register_buffers=True and cache_header_information=True were added to the numpy reader, while "no optimization" means they were not. The experimental results show that the difference between the two is not significant. When the data blocks are small, it seems to be more efficient to use a smaller number of threads.
| num_threads | datablock_size (MB) | Batchsize | Bandwidth (MB/s, no optimization) | Bandwidth (MB/s, with optimization) |
|---|---|---|---|---|
| 1 | 12.25 | 256 | 5197.284684 | 5223.880597 |
| 2 | 12.25 | 256 | 8832.011536 | 8944.21729 |
| 4 | 12.25 | 256 | 9960.159363 | 9974.757756 |
| 8 | 12.25 | 256 | 8897.443347 | 9016.634771 |
| 16 | 12.25 | 256 | 8433.73494 | 8434.315616 |
| 32 | 12.25 | 256 | 7949.899409 | 7966.443389 |
| 64 | 12.25 | 256 | 7642.874969 | 7699.560025 |
| 128 | 12.25 | 256 | 7117.789244 | 6995.203289 |
| 28 | 12.25 | 256 | 8115.269957 | 8335.601524 |
| 56 | 12.25 | 256 | 7608.695652 | 7660.08004 |
| 256 | 12.25 | 256 | 6334.022751 | 6963.788301 |
We've done all we can, but the biggest challenge at the moment is that we don't know how to tune our hardware and software configuration to get our NVMe SSD and P100 GPU to support GDS.
Hello Weigaa,
sorry for the late reply. I am trying to figure that out with the GDS developers. The good news is that the P100 is supported, so the reason you are seeing these "unsupported" messages must be something else.
There are several issues here, so let's go step by step. Let's debug the setup issue in the other ticket, 4009, and keep this one focused on the DALI/GDS interaction itself. The perf you shared above for DALI GPU and CPU does not look so bad, does it? What is the peak bandwidth of the drive you used for that test? 8 GB/s looks reasonable to me. Is that a single GPU or an aggregate?
It seems that the GDS version performs better than the POSIX one, even in compat mode, do I read that correctly? So that is good news. It likely has to do with the fact that the parallelization in the GDS code branch is a bit more aggressive. Therefore, perf speedups can also be due to the different parallelization scheme. In any case, it would be good to know what bandwidth you expect from the drive per employed GPU.
Best regards Thorsten
Hi @azrael417 , Thank you for your reply.
We found that the NVMe "unsupported" error may be caused by the Linux kernel version. The GDS development documentation requires a 5.4.x Linux kernel; we previously used 5.4.120 and NVMe appeared as unsupported. However, with the 5.4.70 kernel, as shown in Figure 1 below, NVMe seems to be supported, but Mellanox PeerDirect is still disabled, and our program fails when we turn off compatibility mode, with the error shown in Figure 2. We would be grateful if you could help us resolve this issue or ask the GDS team for help.
Fig.1
Fig.2
Let's go back to DALI itself. Our PCIe devices (the NVMe SSD and the P100) sit on PCIe 3.0 x16 with a peak bandwidth of 16 GB/s, and we are only using one GPU.
Yes, transferring data to the GPU with DALI+GDS can sometimes be faster than transferring to the CPU with the DALI CPU reader, even in GDS compatibility mode. However, in compatibility mode the CPU utilisation is high (about the same as POSIX reads to the CPU), while lower CPU and DRAM utilisation is exactly what we want GDS for, and compatibility mode does not give us that. We would like the GPU reads to reach close to the full PCIe 3.0 x16 bandwidth (16 GB/s) with low CPU utilisation, as shown in GPUDirect Storage Fig. 7.
Look forward to your reply!
Best regards, Jia Wei
Hi, @azrael417
After communicating with the GDS engineer via email, our GDS environment now appears to be working properly. This is shown in the figure below.
However, when we turned off compatibility mode and tested our program, the previous error reappeared.
(wjpytorch) root@nuosen:/mnt/optane/wjtest# python testGDS.py
Traceback (most recent call last):
File "/mnt/optane/wjtest/testGDS.py", line 186, in <module>
pipe_out= p.run()
File "/root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/pipeline.py", line 980, in run
return self.outputs()
File "/root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/pipeline.py", line 879, in outputs
return self._outputs()
File "/root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/pipeline.py", line 963, in _outputs
return self._pipe.Outputs()
RuntimeError: Critical error in pipeline:
Error when executing GPU operator readers__Numpy encountered:
Error in thread 0: [/opt/dali/dali/util/std_cufile.cc:73] CUFile import failed: ./Inputnumpy.npy. GPUDirect Storage not supported on current file.
Stacktrace (9 entries):
[frame 0]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali.so(+0x83f5f) [0x7f9be77a2f5f]
[frame 1]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali.so(+0x1b44ef) [0x7f9be78d34ef]
[frame 2]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali.so(+0x1b4f87) [0x7f9be78d3f87]
[frame 3]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali.so(dali::CUFileStream::Open(std::string const&, bool, bool)+0xbf) [0x7f9be78d29ef]
[frame 4]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali_operators.so(+0x2ddc749) [0x7f9bca232749]
[frame 5]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali.so(dali::ThreadPool::ThreadMain(int, int, bool)+0x1fe) [0x7f9be787c9de]
[frame 6]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali.so(+0x721d9f) [0x7f9be7e40d9f]
[frame 7]: /lib/x86_64-linux-gnu/libpthread.so.0(+0x76db) [0x7f9cbfaf06db]
[frame 8]: /lib/x86_64-linux-gnu/libc.so.6(clone+0x3f) [0x7f9cbf81961f]
When we turn on compatibility mode, the program runs, but it doesn't run faster than before, it doesn't use less CPU and memory, and GDS doesn't seem to take effect. I would like to know if this is a DALI problem or a GDS problem? Best regards, Jia Wei
Hello @Weigaa,
so this is a GDS issue: inside DALI, I am printing the CUFILE error I receive. It says:
Error in thread 0: [/opt/dali/dali/util/std_cufile.cc:73] CUFile import failed: ./Inputnumpy.npy. GPUDirect Storage not supported on current file.
So that means it tries to load from unsupported storage, and I am wondering why it thinks that. The files are all on the supported NVMe drive, right? To check, I would run gdsio in write mode on the drive in question and see what you get; for examples, check the documentation page. It looks like your system config is in some kind of weird compatibility spot which I believe wasn't tested (you mentioned that your previous kernel should have been supported but wasn't). gdsio will at least tell you whether DALI-GDS is supposed to work.
Best Thorsten
Please make sure that you point it to an empty directory on the NVMe drive. I recommend creating some dummy files there, or letting gdsio write some files which you then read back with gdsio as well. If that works, we take it from there. Are you running DALI inside a container? If yes, it might make sense to use strace to check which libcufile DALI dlopens.
Hi, @azrael417. The files are all on the Intel Optane SSD I mentioned before, which I believe is a supported NVMe drive. I ran gdsio to read a file through GDS (with compatibility mode turned off), and the same error occurs:
(base) root@nuosen:/usr/local/cuda-11.7/gds/tools# ./gdsio -f /mnt/optane/test10G -d 0 -n 0 -w 1 -s 10G -x 0 -I 1 -T 10 -i 4096K
file register error: GPUDirect Storage not supported on current file filename :/mnt/optane/test10G
I am not running DALI in a container but on bare metal.
I checked the documentation for this error and it seems to be caused by the file system or by the NVMe device I mounted. However, I think both are suitable, so I am very confused about the result.
In addition, we found a very strange phenomenon: even with the error message, when we use gdsio to do a "write" operation, a file of the specified size is still successfully written. Even writing a file with gdsio (GDS mode) to a folder mounted on an HDD succeeds, although with the "file register error".
@azrael417
We discovered the cause of our error: we had previously mounted our Optane SSD with the following command.
mount /dev/nvme0n1 /mnt/optane
Later, we changed the mount to the following command.
mount -t ext4 -o data=ordered /dev/nvme0n1 /mnt/optane
We explicitly specified the ext4 data journaling mode, and now the program also runs with compatibility mode turned off.
This is something I didn't find in any of the GDS user manuals, and I would suggest that the GDS team add it to the installation manual (it took us a lot of time to find, and hopefully future researchers can avoid it).
However, we found that DALI with real GDS seems to be slower than GDS compatibility mode at larger batch sizes. We will repeat our previous experiments in GDS non-compatibility mode to verify this.
Hi, @azrael417 The results of our tests in real GDS are shown in the table below.
| Block size(MB) | Kvikio_Cufile Bandwidth(MB/s) | DALI Bandwidth (MB/s) pipeline=1 | DALI Bandwidth(MB/s) pipeline=best | DALI Bandwidth CPU(MB/s) pipeline=1 | DALI Bandwidth CPU(MB/s) pipeline=best | DALI Bandwidth (MB/s) no_comp pipeline=1 | DALI Bandwidth(MB/s) no_comp pipeline=best |
|---|---|---|---|---|---|---|---|
| 1568 | 1940.209859 | 7720.33481 | 8371.596369 | 3438.596491 | 5915.864931 | 2538.396896 | 2541.87535 |
| 784 | 1888.701518 | 7619.047619 | 8276.153278 | 3684.210526 | 7468.08916 | 2547.59873 | 2555.52564 |
| 392 | 1930.083703 | 6555.183946 | 8202.552835 | 4288.840263 | 7013.777062 | 2544.852704 | 2559.371544 |
| 196 | 1861.596034 | 4708.143166 | 8112.582781 | 4458.598726 | 7486.631016 | 2523.756792 | 2560.137098 |
| 98 | 1901.804774 | 3110.12377 | 8536.585366 | 4322.893692 | 6712.328767 | 2486.047692 | 2558.593087 |
| 49 | 1772.151899 | 1749.375223 | 8044.65605 | 4313.380282 | 7059.501513 | 2405.498282 | 2532.493441 |
| 24.5 | 1788.321168 | 989.4991922 | 8288.227334 | 2793.296089 | 6193.124368 | 2281.829189 | 2521.473693 |
| 12.25 | 157.6373697 | 535.1681957 | 8448.275862 | 3134.595701 | 6702.412869 | 2114.402099 | 2529.805236 |
| 6.125 | 1447.306238 | 510.8423686 | 7862.644416 | 4316.420014 | 7036.1861 | 1707.983603 | 2490.242316 |
| 3.0625 | 1126.747609 | 279.9360146 | 8019.114952 | 3835.796593 | 6564.844587 | 1581.004202 | 2417.508683 |
| num_threads | datablock_size (MB) | Batchsize | no_comp Bandwidth (MB/s, no optimization) | no_comp Bandwidth (MB/s, with optimization) |
|---|---|---|---|---|
| 1 | 12.25 | 256 | 1656.591883 | 1886.066205 |
| 2 | 12.25 | 256 | 1948.930077 | 2244.206284 |
| 4 | 12.25 | 256 | 1981.928052 | 2269.443106 |
| 8 | 12.25 | 256 | 1989.5407 | 2260.314414 |
| 16 | 12.25 | 256 | 1997.456301 | 2264.409035 |
| 32 | 12.25 | 256 | 2024.458767 | 2261.040337 |
| 64 | 12.25 | 256 | 2089.195873 | 2246.346249 |
| 128 | 12.25 | 256 | 2107.889529 | 2237.900721 |
| 28 | 12.25 | 256 | 2139.868813 | 2222.746407 |
| 56 | 12.25 | 256 | 2153.6076 | 2206.51146 |
| 256 | 12.25 | 256 | 2167.903699 | 2186.890795 |
Hello @Weigaa,
I apologize for the silence but I was/am on vacation till July 15. What I read from the table is that GDS compat mode is faster than GDS native, is that correct?
The DALI backend for compat vs. non-compat mode is the same. The DALI backend for the CPU-based numpy loader has different threading, so it is no surprise to me that the GDS one can utilize threads better, especially for larger block sizes (the CPU version does inter-sample parallelization, while the GDS one does inter- as well as intra-sample parallelization). When it comes to native vs. compat mode, this is a bit more complicated. Is that pipeline still running in IO-only mode, or are you running training besides it?
Best Thorsten
Hi @azrael417 ,
Thank you. Have a great vacation!
To run GDS compatibility mode, we unloaded nvidia_fs with modprobe -r nvidia_fs to force GDS to use the Linux POSIX interface, and NVMe then shows "unsupported". To run native mode, we loaded nvidia_fs normally so that GDS works in non-compatibility mode.
Our experimental results were obtained with these settings.
This means that when GDS falls back to the Linux POSIX interface, it parallelizes very well. When GDS uses cuFile in true, non-compatibility (native) mode, increasing the pipeline batch size and the thread count still helps I/O performance somewhat, but it remains significantly inferior to compatibility mode.
In other words, the compatibility mode that lets non-GDS-enabled devices use the cuFile interface transfers data faster than true GDS mode, which looks very strange.
And I ran that pipeline in IO-only mode without running any training alongside it.
Hello @Weigaa,
What you did sounds correct. In more recent GDS versions, there is a simpler method to enforce compat mode but your solution works. I think this is an issue for the GDS engineers, I will point them to this thread.
Another thing we need to watch out for is data caching. How big is your file cache and how big is the overall dataset? The dataset is artificial afaics, which means it is likely small, is that correct? If so, it might get cached in DRAM, and then POSIX can read it from there and will be faster than GDS. I am not 100% certain whether compat mode uses O_DIRECT (in which case it won't get cached), but I am certain that data in native mode does not get cached. We should run a test to see if that could be an issue. The following two methods should work:
- Create a dataset big enough that it does not fit into the file cache. This could be multiple TB in some cases, so I do not recommend this solution.
- Flush the cache after writing the files. What I would do is:
  a) create all the artificial files, say a total of 2 GB or so. After creating those files, they likely reside in the file cache.
  b) build the pipeline and run a few warmup iterations; data will be loaded from the file cache, but all buffers etc. are created, which is what we want.
  c) flush the file system caches with echo 3 > /proc/sys/vm/drop_caches. Running free -m should show you that the file cache is (almost) empty; run free before and after flushing to verify that something actually changed.
  d) run a single iteration over the full dataset and time it. Compute the average bandwidth in GB/s as well as samples/sec from that. Do not run more than one iteration, otherwise the data could come from the file cache in the POSIX case and from disk in the GDS case (I still need to verify the former). Do this exercise for native and compat mode.
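In code, steps (b) through (d) might look roughly like this (a sketch; the file list, batch size, and iteration counts are placeholders):

import os
import time
import torch
from nvidia.dali import pipeline_def, fn

files = sorted(f for f in os.listdir(".") if f.endswith(".npy"))
batch = 4

@pipeline_def(batch_size=batch, num_threads=8, device_id=0)
def pipe_gds():
    return fn.readers.numpy(device="gpu", file_root=".", files=files)

p = pipe_gds()
p.build()
for _ in range(3):            # (b) warmup: buffers get allocated, data may still come from cache
    p.run()

input("Now flush the page cache (echo 3 > /proc/sys/vm/drop_caches) and press Enter")

torch.cuda.synchronize()      # (d) time roughly one pass over the dataset
start = time.time()
for _ in range(len(files) // batch):
    p.run()
torch.cuda.synchronize()
elapsed = time.time() - start

total_bytes = sum(os.path.getsize(f) for f in files)
print("GB/s:", total_bytes / elapsed / 1e9, "samples/s:", len(files) / elapsed)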
Please let me know what you get.
Best Thorsten
Hi @Weigaa,
In addition to data being cached in system memory, GDS peer-to-peer can be slower on some platforms when the GPU and the storage device are not under the same PCIe switch. Please see the GDS configuration guide for the recommended setup and check whether the storage device is close to the GPU: https://docs.nvidia.com/gpudirect-storage/configuration-guide/index.html#benchmarking-gds
If your organization has Nvidia enterprise support, please file a NVBug.
Hi, @KiranModukuri, thank you. Since I am only using one PCIe switch, one GPU card and one NVMe device, I made sure they are under the same PCIe switch; the output is shown below. Also, sorry, I'm not sure what you mean about filing an NVBug (for what exactly)? For now I will focus on the caching problem.
(base) root@nuosen:~# lspci -tv | egrep -i "nvidia | optane"
+-02.0-[02-05]----00.0-[03-05]--+-08.0-[04]----00.0 NVIDIA Corporation GP100GL [Tesla P100 PCIe 16GB]
| \-10.0-[09]----00.0 Intel Corporation Optane SSD 900P Series
Hello @Weigaa,
can you please hand us a simple reproducer for this problem? I lost track of what the latest code is; it would be good if you could just drop a Python script which we can look at.
Best and thanks Thorsten
Hi @azrael417, no problem! You may need PyTorch (v1.11.0 is best) to run the script; it first writes some files and then uses DALI to read them. You do not need to modify the code whether it runs in compatibility mode or native (p2p) mode. P.S. The commented-out sections read and write the files in other ways, and you can ignore them. Code:
import torch
from nvidia.dali import pipeline_def, fn
import nvidia.dali.plugin.pytorch
import os
import time
# misc python stuff
import numpy as np
from glob import glob
import shutil
import tempfile
import cupy
import kvikio
# visualization
from PIL import Image
# def plot_batch(np_arrays, nsamples=None):
# if nsamples is None:
# nsamples = len(np_arrays)
# fig, axvec = plt.subplots(nrows=1, ncols=nsamples, figsize=(10, 10 * nsamples))
# for i in range(nsamples):
# ax = axvec[i]
# ax.tick_params(left=False, bottom=False, labelleft=False, labelbottom=False)
# ax.imshow(Image.fromarray(np_arrays[i]))
# plt.tight_layout()
device=torch.device('cuda:0')
batch_size = 1024 # to be used in pipelines
dali_extra_dir = os.environ['DALI_EXTRA_PATH']
# data_dir_2d = os.path.join(dali_extra_dir, 'db', '3D', 'MRI', 'Knee', 'npy_2d_slices', 'STU00001')
# data_dir_3d = os.path.join(dali_extra_dir, 'db', '3D', 'MRI', 'Knee', 'npy_3d', 'STU00001')
# data_dir = os.path.join(data_dir_2d, 'SER00001')
# # Listing all *.npy files in data_dir
# data_dir = '../..'
data_dir = '.'
files = sorted([f for f in os.listdir(data_dir) if '.npy' in f])
# files = sorted([f for f in os.listdir(data_dir) if 'Inputcupy' in f])
testmodelbsz = 1
# files = files[0:5]
@pipeline_def(batch_size=batch_size, num_threads=4, device_id=0)
def pipe_gds():
    data = fn.readers.numpy(device='gpu', file_root=data_dir, files=files, register_buffers=True, cache_header_information=True)
    return data
# def pipe_gds():
# data = fn.readers.numpy(device='gpu', file_root=data_dir, files=files)
# return data
#DALI_CPU_Batch
# def pipe_gds():
# data = fn.readers.numpy(device='cpu', file_root=data_dir, files=files)
# return data
# def run(p):
# p.build() # build the pipeline
# outputs = p.run() # Run once
# # Getting the batch as a list of numpy arrays, for displaying
# batch = [np.array(outputs[0][s]) for s in range(batch_size)]
# return batch
# print(files)
# def pipe_gds(filename):
# data = fn.readers.numpy(device='gpu', file_root=data_dir, files=filename, register_buffers = True, cache_header_information=True)
# return data
N = 50
average_save_tensor = 0
average_save_numpy = 0
average_save_cupy = 0
# write the tensors to the SSD
for i in range(N):
    Inputimages = torch.randn(testmodelbsz, 256, 56, 56).cuda()
    # Inputimages = torch.randn(testmodelbsz, 256, 56, 56)
    path = 'Inputtensor' + str(i) + '.pt'
    path2 = 'Inputnumpy' + str(i) + '.npy'
    path3 = 'Inputcupy' + str(i)
    # Inputimages = torch.randn(512, 256, 56, 56)
    # path = 'Inputtensor.pt'
    # path2 = 'Inputnumpy.npy'
    # save by torch.save()
    # torch.cuda.synchronize()
    # begin = time.time()
    # torch.save(Inputimages, path)
    # torch.cuda.synchronize()
    # end = time.time()
    # time1 = end - begin
    # average_save_tensor += time1
    # print("torchsave spendtime is", time1)
    # save by GDS
    # torch.cuda.synchronize()
    # begin = time.time()
    # Inputcupy = cupy.asarray(Inputimages)
    # f = kvikio.CuFile(path3, "w")
    # f.write(Inputcupy)
    # f.close()
    # torch.cuda.synchronize()
    # end = time.time()
    # time3 = end - begin
    # print("cupysave spendtime is", time3)
    # if i > 0:
    #     average_save_cupy += time3
    # save by numpy
    torch.cuda.synchronize()
    begin = time.time()
    Inputnumpy = Inputimages.cpu().numpy()
    end = time.time()
    print("transfer time is", end - begin)
    torch.cuda.synchronize()
    begin = time.time()
    np.save(path2, Inputnumpy)
    torch.cuda.synchronize()
    end = time.time()
    time2 = end - begin
    average_save_numpy += time2
    print("numpysave spendtime is", time2)
    # os.remove(path)
    # os.remove(path2)
    # os.remove(path3)
print("average tensorsave spendtime is", average_save_tensor / N, "average numpysave spendtime is", average_save_numpy / N, "average cupysave spendtime is", average_save_cupy / (N - 1))
average_load_tensor = 0
average_load_numpy = 0
average_transfer_numpy = 0
Inputimages = torch.empty(testmodelbsz, 256, 56, 56).to(device)
# for i in range(N):
# path = 'Inputtensor' + str(i) + '.pt'
# path2 = 'Inputnumpy' + str(i) + '.npy'
# # time to read to the GPU with torch.load()
# # # path = 'Inputtensor.pt'
# # torch.cuda.synchronize()
# # begin = time.time()
# # Inputimages = torch.load(path, map_location=lambda storage, loc: storage.cuda(0))
# # torch.cuda.synchronize()
# # end = time.time()
# # time1 = end - begin
# # average_load_tensor += time1
# # os.remove(path)
# # print("torch.load spendtime is", time1)
# # # time to read to the GPU with DALI
# p = pipe_gds(filename=path2)
# p.build()
# torch.cuda.synchronize()
# begin = time.time()
# pipe_out = p.run()
# torch.cuda.synchronize()
# end = time.time()
# time1 = end - begin
# # print("numpyload spendtime is", time1)
# # print(pipe_out[0][0])
# torch.cuda.synchronize()
# begin = time.time()
# nvidia.dali.plugin.pytorch.feed_ndarray(pipe_out[0][0], Inputimages)
# torch.cuda.synchronize()
# end= time.time()
# time2 = end - begin
# # print("transfer time is", time2)
# time3 = time1 + time2
# if i > 1:
# average_load_numpy += time3
# average_transfer_numpy += time2
# # print("load time", time1)
# # print("transfer time", time2)
# os.remove(path2)
# print("total gdsload time",time3)
# print("average tensorload spendtime is", average_load_tensor / N , "average numpyload spendtime is" , average_load_numpy / (N - 2),"average transfer spendtime is" , average_transfer_numpy / (N - 2))
#DALI_Batch Load
average_transfer_pipeline_numpy = 0
p = pipe_gds()
p.build()
for i in range(N):
    torch.cuda.synchronize()
    begin = time.time()
    pipe_out = p.run()
    torch.cuda.synchronize()
    end = time.time()
    time1 = end - begin
    print("numpyload pipeline spendtime is", time1)
    print(len(pipe_out[0]))
    if i > 1:
        average_transfer_pipeline_numpy += time1
print("average numpyload pipeline spendtime is", average_transfer_pipeline_numpy / ((N - 2) * batch_size))
# #DALI_Batch Load_CPU
# average_transfer_pipeline_numpy = 0
# p = pipe_gds()
# p.build()
# for i in range(N):
# torch.cuda.synchronize()
# begin = time.time()
# pipe_out= p.run()
# torch.cuda.synchronize()
# end = time.time()
# time1 = end - begin
# print("numpyload pipeline spendtime is", time1)
# print(len(pipe_out[0]))
# if i > 1:
# average_transfer_pipeline_numpy += time1
# print("average numpyload pipeline spendtime is", average_transfer_pipeline_numpy / ((N -2) * batch_size))
# averagemovetime = 0
# # CPU-to-GPU transfer test
# for i in range(N):
# Inputimages = torch.zeros(testmodelbsz, 256, 56, 56)
# torch.cuda.synchronize()
# begin = time.time()
# Inputimages.to(device)
# torch.cuda.synchronize()
# end = time.time()
# if i > 0:
# averagemovetime += end - begin
# print("move time is", end - begin)
# print("average move time is", averagemovetime/(N -1 ))
# # time to read data with numpy
# average_load_numpy = 0
# for i in range(N):
# path = 'Inputnumpy' + str(i) + '.npy'
# # 使用np.load()读到GPU的时间
# torch.cuda.synchronize()
# begin = time.time()
# Inputimages = torch.from_numpy(np.load(path)).to(device)
# torch.cuda.synchronize()
# end = time.time()
# time1 = end - begin
# if i > 1:
# average_load_numpy += time1
# print("numpy.load spendtime is", time1)
# os.remove(path)
# print("average numpyload spendtime is" , average_load_numpy / (N - 2))
# time to read data with kvikio
# averageloadtime = 0
# # CPU-to-GPU transfer test
# cupyimages = cupy.asarray(torch.empty(testmodelbsz, 256, 56, 56))
# for i in range(N):
# torch.cuda.synchronize()
# path3 = 'Inputcupy' + str(i)
# begin = time.time()
# Inputimages = cupy.empty_like(cupyimages)
# f = kvikio.CuFile(path3, "r")
# # Read whole array from file
# f.read(Inputimages)
# Inputtensor = torch.as_tensor(Inputimages, device = device)
# torch.cuda.synchronize()
# end = time.time()
# if i > 0:
# averageloadtime += end - begin
# print("load time is", end - begin)
# os.remove(path3)
# print("average load time is", averageloadtime/(N -1))
# data_gds = pipe_out[0].as_cpu().as_array() # as_cpu() to copy the data back to CPU memory
# print(data_gds.shape)
Hi @azrael417, I am sorry it took me longer to complete the experiment, as the equipment in our lab is shared by multiple people. I generated 64 blocks of 49 MB each, for a total size of about 3 GB. For each test I use a DALI reader with a batch size of 4 and run it 16 times, so that each file is read exactly once. The result after clearing the cache is shown below.
total used free shared buff/cache available
Mem: 64287 2893 61033 9 360 60862
Swap: 2047 0 2047
We tested both native and compat mode, each with and without the cache; the results are as follows:
| datasize (MB) | number | totalsize (MB) | batch_size | native_cache (GB/s) | native_nocache (GB/s) | compat_cache (GB/s) | compat_nocache (GB/s) | native_cache (samples/s) | native_nocache (samples/s) | compat_cache (samples/s) | compat_nocache (samples/s) |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 49 | 64 | 3136 | 4 | 2.04319225 | 2.046687874 | 7.530935238 | 1.965963948 | 2732.707088 | 2737.382378 | 10072.39534 | 2629.416598 |
The experimental results seem to indicate that the fast reads in compatibility mode come from the page cache. However, my PCIe link can reach almost 16 GB/s, while even native GDS only achieves about 2.1 GB/s. Why is the bandwidth so low, and how could I reach higher bandwidth like the GDS developer technical blog does?