
Errors when converting a DALI tensor to a PyTorch tensor

Open Weigaa opened this issue 3 years ago • 42 comments

Hi, guys. I'm trying to combine DALI with the torch.autograd.graph.saved_tensors_hooks(pack_hook, unpack_hook) API to speed up offloading and prefetching of intermediate feature maps to SSDs. During forward propagation I convert the PyTorch tensor to numpy and store it on the SSD; during backward propagation I use pipe_gds() to fetch it back to the GPU and then convert the DALI tensor to a PyTorch tensor via nvidia.dali.plugin.pytorch.feed_ndarray(). When processing the feature maps produced by the convolution layers, an error occurs; the output is as follows.

(wjpytorch) root@li:~/wj# python test_with_hook_gds.py
Files already downloaded and verified
train_data_size:50000
cuda:0
Number of parameter: 0.14 M
Memory of parameter: 0.56 M
--------------The 1 training begins----------
inputimages type is torch.Size([64, 10])
inputimages type is torch.Size([2560, 64])
successfully free 7d4e1136-ed45-4a2c-9afd-ad109689260b.npy
successfully free 7c970ca2-82b1-43f2-87ac-4df69e35a576.npy
inputimages type is torch.Size([1024, 64])
inputimages type is torch.Size([2560, 1024])
successfully free 4375fe98-b486-418e-b70e-74e80de8cf99.npy
successfully free bf5766bb-3f17-4df0-a7c8-5fe7d17e0c3b.npy
inputimages type is torch.Size([2560, 64, 8, 8])
Traceback (most recent call last):
  File "/root/wj/test_with_hook_gds.py", line 123, in <module>
    loss.backward()
  File "/root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/torch/_tensor.py", line 363, in backward
    torch.autograd.backward(self, gradient, retain_graph, create_graph, inputs=inputs)
  File "/root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/torch/autograd/__init__.py", line 173, in backward
    Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass
RuntimeError: AssertionError: The element type of DALI Tensor/TensorList doesn't match the element type of the target PyTorch Tensor: torch.int64 vs torch.float32

At:
  /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/plugin/pytorch.py(55): feed_ndarray
  /root/wj/test_with_hook_gds.py(59): unpack_hook

I'm not sure whether this is caused by DALI or by the PyTorch API; when I use torch.load() directly to read a PyTorch tensor file, no error occurs. Could you give me some suggestions for adjustments?

The reproducible code is as follows:

import os
import time
import uuid
import inspect
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torchvision
from torch import cuda
from nvidia.dali import pipeline_def, fn
import nvidia.dali.plugin.pytorch
from torch.utils.data import DataLoader

device=torch.device('cuda:0')

#pynvml.nvmlInit()
#frame = inspect.currentframe()
#gpu_tracker = MemTracker(frame)

train_data=torchvision.datasets.CIFAR10(root='./data',train=True,transform=torchvision.transforms.ToTensor(),download=True)
#test_data=torchvision.datasets.CIFAR10(root='./data',train=False,transform=torchvision.transforms.ToTensor(),download=True)

train_data_size=len(train_data)
#test_data_size=len(test_data)
print("train_data_size:{}".format(train_data_size))
#print("test_data_size:{}".format(test_data_size))

train_dataloader=DataLoader(train_data,batch_size=2560)
#test_dataloader=DataLoader(test_data,batch_size=128)

'''hook'''
class SelfDeletingTempFile():
    def __init__(self):
        self.name=os.path.join("",str(uuid.uuid4()))
    def __del__(self):
        freefilename = self.name +'.npy'
        os.remove(freefilename)
        print("successfully free " + freefilename)

@pipeline_def(batch_size=1, num_threads=8, device_id=0)
def pipe_gds(filename):
    data = fn.readers.numpy(device='gpu',file_root='.', files=filename)
    return data

def pack_hook(tensor):
    temp_file=SelfDeletingTempFile()
    tensorshape = tensor.shape
    Inputnumpy = tensor.cpu().numpy()
    np.save(temp_file.name,Inputnumpy)
    file = [temp_file, tensorshape]
    return file

def unpack_hook(file):
    Inputimages = torch.zeros(file[1]).to(device)
    p = pipe_gds(filename=(file[0].name+'.npy'))
    p.build()
    pipe_out = p.run()
    nvidia.dali.plugin.pytorch.feed_ndarray(pipe_out[0][0], Inputimages)
    print("inputimages type is",Inputimages.shape)
    return Inputimages


""" Network architecture. """
class mymodel(nn.Module):
    def __init__(self):
        super(mymodel, self).__init__()

        self.model1=nn.Sequential(
        nn.Conv2d(3, 32, 5, padding=2),
        nn.MaxPool2d(2),
        nn.Conv2d(32, 32, 5, padding=2),
        nn.MaxPool2d(2),
        nn.Conv2d(32, 64, 5, padding=2),
        nn.MaxPool2d(2),
        nn.Flatten(),  # flatten

        nn.Linear(64 * 4 * 4, 64),
        nn.Linear(64, 10),
        )

    def forward(self, x):  # input:32*32*3
        with torch.autograd.graph.saved_tensors_hooks(pack_hook,unpack_hook):
            x=self.model1(x)
        return x

net1=mymodel()
net1=net1.to(device)
loss_fn=nn.CrossEntropyLoss()
loss_fn=loss_fn.to(device)
optimizer=torch.optim.SGD(net1.parameters(),lr=0.1)

total_train_step=0
total_test_step=0
epoch=20

print(next(net1.parameters()).device)
total = sum([param.nelement() for param in net1.parameters()])
print("Number of parameter: %.2f M"  % (total/1024/1024))
print("Memory of parameter: %.2f M " % (cuda.memory_allocated()/1024/1024))


'''start training'''
totalbegin=time.time()
#gpu_tracker.track()
for i in range(epoch):
    print("--------------The {} training begins----------".format(i+1))

    running_loss=0
    running_correct=0

    begin=time.time()
    for data in train_dataloader:
        images,targets=data
        #print(images.device)
        images=images.to(device)
        targets=targets.to(device)

        outputs=net1(images)
        loss=loss_fn(outputs,targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss+=loss.item()
        running_correct += (outputs.argmax(1) == targets).sum()
        total_train_step+=1

        #if total_train_step%100==0:
        #    print("number of training:{},loss:{}".format(total_train_step,loss))

    end = time.time()
    print("spend time: ",(end-begin)/60)
    print("epoch:{}, loss:{}, accuracy:{}".format(i+1,running_loss/train_data_size,running_correct/train_data_size))

totalend=time.time()
print("total real runtime: ", (totalend - totalbegin) / 60)
#gpu_tracker.track()
print("gpu memory allocated: %2.f M " % (cuda.memory_allocated()/1024/1024))

My GPU is an Nvidia P100, my PyTorch version is 1.11.0+cu113, and my DALI version is 1.14.

Weigaa · Jun 08 '22 11:06

Hi @Weigaa,

The error says that the element type of the DALI output and the element type of the PyTorch tensor passed to feed_ndarray don't match: DALI returns int64, while torch.zeros creates float32 by default. Your code should do something like:

    import nvidia.dali.types as types  # needed for DALIDataType

    to_torch_type = {
        types.DALIDataType.FLOAT   : torch.float32,
        types.DALIDataType.FLOAT64 : torch.float64,
        types.DALIDataType.FLOAT16 : torch.float16,
        types.DALIDataType.UINT8   : torch.uint8,
        types.DALIDataType.INT8    : torch.int8,
        types.DALIDataType.INT16   : torch.int16,
        types.DALIDataType.INT32   : torch.int32,
        types.DALIDataType.INT64   : torch.int64
    }
    dali_tensor = pipe_out[0][0]
    torch_type = to_torch_type[dali_tensor.dtype]
    Inputimages = torch.zeros(file[1], dtype=torch_type).to(device)
    nvidia.dali.plugin.pytorch.feed_ndarray(dali_tensor, Inputimages)

JanuszL · Jun 08 '22 13:06

@JanuszL Thank you very much. After adding import nvidia.dali.types as types and your suggested code, my code runs fine. I noticed that the tensors I stored earlier are torch.cuda.FloatTensor and torch.cuda.LongTensor; in nvidia.dali.plugin.pytorch.feed_ndarray(), can I directly map DALIDataType.INT64 to torch.cuda.LongTensor and DALIDataType.FLOAT to torch.cuda.FloatTensor?

Weigaa · Jun 08 '22 15:06

I think the best would be to use:

Inputimages = torch.empty(file[1], dtype=torch_type, device=device)

and keep the type mapping as proposed. According to https://pytorch.org/docs/stable/tensors.html, torch.cuda.LongTensor is the tensor type, while torch.int64 is the dtype expected when allocating an empty tensor.
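
For reference, a minimal sketch of unpack_hook with both changes applied (it assumes the pipe_gds pipeline, the device variable, and the SelfDeletingTempFile bookkeeping from your script; only two entries of the type map are repeated here):

import torch
import nvidia.dali.types as types
import nvidia.dali.plugin.pytorch

to_torch_type = {
    types.DALIDataType.FLOAT: torch.float32,
    types.DALIDataType.INT64: torch.int64,
    # ... remaining entries as in the mapping above
}

def unpack_hook(file):
    # file = [SelfDeletingTempFile, original tensor shape], as produced by pack_hook
    p = pipe_gds(filename=(file[0].name + '.npy'))
    p.build()
    pipe_out = p.run()
    dali_tensor = pipe_out[0][0]
    # allocate the target tensor with the dtype DALI actually returns
    Inputimages = torch.empty(file[1], dtype=to_torch_type[dali_tensor.dtype], device=device)
    nvidia.dali.plugin.pytorch.feed_ndarray(dali_tensor, Inputimages)
    return Inputimages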

JanuszL · Jun 08 '22 15:06

@JanuszL I replaced Inputimages = torch.zeros(file[1], dtype=torch_type).to(device) with Inputimages = torch.empty(file[1], dtype=torch_type, device=device). I then used DALI and torch.load() respectively to read the feature maps from the SSD back to the GPU and, timing with time.time(), I found that DALI with GDS was much slower than torch.load(), which seems abnormal. What could be the reason for this?

The torch.save && torch.load code is:

import os
import time
import uuid
import inspect
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torch import cuda

from torch.utils.data import DataLoader
#from gpu_mem_track import MemTracker

device=torch.device('cuda:0')


train_data=torchvision.datasets.CIFAR10(root='./data',train=True,transform=torchvision.transforms.ToTensor(),download=True)
train_data_size=len(train_data)
#test_data_size=len(test_data)
print("train_data_size:{}".format(train_data_size))
train_dataloader=DataLoader(train_data,batch_size=2560)

'''hook'''
class SelfDeletingTempFile():
    def __init__(self):
        self.name=os.path.join("./",str(uuid.uuid4()))
    def __del__(self):
        os.remove(self.name)

def pack_hook(tensor):
    temp_file=SelfDeletingTempFile()
    begin = time.time()
    torch.save(tensor,temp_file.name)
    end = time.time()
    print(tensor.shape,"offload time is", end - begin)
    return temp_file

def unpack_hook(temp_file):
    begin = time.time()
    tensor = torch.load(temp_file.name)
    end = time.time()
    print(tensor.shape, "load time is", end - begin)
    return tensor


""" Network architecture. """
class mymodel(nn.Module):
    def __init__(self):
        super(mymodel, self).__init__()

        self.model1=nn.Sequential(
        nn.Conv2d(3, 32, 5, padding=2),
        nn.MaxPool2d(2),
        nn.Conv2d(32, 32, 5, padding=2),
        nn.MaxPool2d(2),
        nn.Conv2d(32, 64, 5, padding=2),
        nn.MaxPool2d(2),
        nn.Flatten(),  # flatten

        nn.Linear(64 * 4 * 4, 64),
        nn.Linear(64, 10),
        )

    def forward(self, x):  # input:32*32*3
        with torch.autograd.graph.saved_tensors_hooks(pack_hook,unpack_hook):
            x=self.model1(x)
        return x

net1=mymodel()
net1=net1.to(device)
loss_fn=nn.CrossEntropyLoss()
loss_fn=loss_fn.to(device)
optimizer=torch.optim.SGD(net1.parameters(),lr=0.1)

total_train_step=0
total_test_step=0
epoch=1

print(next(net1.parameters()).device)
total = sum([param.nelement() for param in net1.parameters()])
print("Number of parameter: %.2f M"  % (total/1024/1024))
print("Memory of parameter: %.2f M " % (cuda.memory_allocated()/1024/1024))


'''start training'''
totalbegin=time.time()
#gpu_tracker.track()
for i in range(epoch):
    print("--------------The {} training begins----------".format(i+1))
    running_loss=0
    running_correct=0
    begin=time.time()
    j = 0
    if (j > 0):
        break
    for data in train_dataloader:
        if (j > 0):
            break
        images,targets=data
        #print(images.device)
        images=images.to(device)
        targets=targets.to(device)

        outputs=net1(images)
        loss=loss_fn(outputs,targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss+=loss.item()
        running_correct += (outputs.argmax(1) == targets).sum()
        total_train_step+=1
        j += 1
        #if total_train_step%100==0:
        #    print("number of training:{},loss:{}".format(total_train_step,loss))

    end = time.time()
    print("spend time: ",(end-begin)/60)
    print("epoch:{}, loss:{}, accuracy:{}".format(i+1,running_loss/train_data_size,running_correct/train_data_size))

totalend=time.time()
print("total real runtime: ", (totalend - totalbegin) / 60)

The DALI code is:

import os
import time
import uuid
import inspect
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torchvision
from torch import cuda
from nvidia.dali import pipeline_def, fn
import nvidia.dali.plugin.pytorch
from torch.utils.data import DataLoader
import nvidia.dali.types as types


device=torch.device('cuda:0')

#pynvml.nvmlInit()
#frame = inspect.currentframe()
#gpu_tracker = MemTracker(frame)

train_data=torchvision.datasets.CIFAR10(root='./data',train=True,transform=torchvision.transforms.ToTensor(),download=True)
#test_data=torchvision.datasets.CIFAR10(root='./data',train=False,transform=torchvision.transforms.ToTensor(),download=True)

train_data_size=len(train_data)
#test_data_size=len(test_data)
print("train_data_size:{}".format(train_data_size))
#print("test_data_size:{}".format(test_data_size))

train_dataloader=DataLoader(train_data,batch_size=2560)
#test_dataloader=DataLoader(test_data,batch_size=128)

'''hook'''
class SelfDeletingTempFile():
    def __init__(self):
        self.name=os.path.join("",str(uuid.uuid4()))
    def __del__(self):
        freefilename = self.name +'.npy'
        os.remove(freefilename)
        # print("successfully free " + freefilename)

@pipeline_def(batch_size=1, num_threads=8, device_id=0)
def pipe_gds(filename):
    data = fn.readers.numpy(device='gpu',file_root='.', files=filename)
    return data

def pack_hook(tensor):
    temp_file=SelfDeletingTempFile()
    begin = time.time()
    tensorshape = tensor.shape
    # print("save tensor type is", tensor.type(), "shape is",tensorshape )
    Inputnumpy = tensor.cpu().numpy()
    np.save(temp_file.name,Inputnumpy)
    file = [temp_file, tensorshape]
    end = time.time()
    print(tensor.shape,"offload time is", end - begin)
    return file

def unpack_hook(file):
    begin = time.time()
    begin2 = time.time()
    p = pipe_gds(filename=(file[0].name+'.npy'))
    p.build()
    pipe_out = p.run()
    end2 = time.time()
    to_torch_type = {
        types.DALIDataType.FLOAT: torch.float32,
        types.DALIDataType.FLOAT64: torch.float64,
        types.DALIDataType.FLOAT16: torch.float16,
        types.DALIDataType.UINT8: torch.uint8,
        types.DALIDataType.INT8: torch.int8,
        types.DALIDataType.INT16: torch.int16,
        types.DALIDataType.INT32: torch.int32,
        types.DALIDataType.INT64: torch.int64
    }
    dali_tensor = pipe_out[0][0]
    # print(dali_tensor.dtype)
    torch_type = to_torch_type[dali_tensor.dtype]
    Inputimages = torch.empty(file[1], dtype=torch_type).to(device)
    nvidia.dali.plugin.pytorch.feed_ndarray(dali_tensor, Inputimages)
    end = time.time()
    print(Inputimages.shape, "pure load time is", end2 - begin2)
    print(Inputimages.shape, "load time is", end - begin)
    # print("inputimages type is",Inputimages.shape)
    return Inputimages


""" Network architecture. """
class mymodel(nn.Module):
    def __init__(self):
        super(mymodel, self).__init__()

        self.model1=nn.Sequential(
        nn.Conv2d(3, 32, 5, padding=2),
        nn.MaxPool2d(2),
        nn.Conv2d(32, 32, 5, padding=2),
        nn.MaxPool2d(2),
        nn.Conv2d(32, 64, 5, padding=2),
        nn.MaxPool2d(2),
        nn.Flatten(),  # flatten

        nn.Linear(64 * 4 * 4, 64),
        nn.Linear(64, 10),
        )

    def forward(self, x):  # input:32*32*3
        with torch.autograd.graph.saved_tensors_hooks(pack_hook,unpack_hook):
            x=self.model1(x)
        return x

net1=mymodel()
net1=net1.to(device)
loss_fn=nn.CrossEntropyLoss()
loss_fn=loss_fn.to(device)
optimizer=torch.optim.SGD(net1.parameters(),lr=0.1)

total_train_step=0
total_test_step=0
epoch=1

print(next(net1.parameters()).device)
total = sum([param.nelement() for param in net1.parameters()])
print("Number of parameter: %.2f M"  % (total/1024/1024))
print("Memory of parameter: %.2f M " % (cuda.memory_allocated()/1024/1024))


'''start training'''
totalbegin=time.time()
#gpu_tracker.track()
for i in range(epoch):
    print("--------------The {} training begins----------".format(i+1))

    running_loss=0
    running_correct=0
    begin=time.time()
    j = 0
    if (j > 0):
        break
    for data in train_dataloader:
        if (j > 0):
            break
        images,targets=data
        #print(images.device)
        images=images.to(device)
        targets=targets.to(device)

        outputs=net1(images)
        loss=loss_fn(outputs,targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss+=loss.item()
        running_correct += (outputs.argmax(1) == targets).sum()
        total_train_step+=1
        j += 1

        #if total_train_step%100==0:
        #    print("number of training:{},loss:{}".format(total_train_step,loss))

    end = time.time()
    print("spend time: ",(end-begin)/60)
    print("epoch:{}, loss:{}, accuracy:{}".format(i+1,running_loss/train_data_size,running_correct/train_data_size))

totalend=time.time()
print("total real runtime: ", (totalend - totalbegin) / 60)
#gpu_tracker.track()
print("gpu memory allocated: %2.f M " % (cuda.memory_allocated()/1024/1024))

We noticed that the p.build() call accounts for almost 97% of the DALI load time. Does this mean that DALI has a serious pipeline-construction (start-up) overhead, and is there a way we can avoid it?

Weigaa · Jun 09 '22 04:06

Hi @Weigaa,

DALI, and the GDS library that DALI uses inside the GPU numpy reader, have a noticeable construction-time overhead. DALI is not optimized for frequent pipeline recreation but for fast execution once the flow is defined. If your goal is only to load files from the drive into GPU memory without any processing, DALI may not be the best choice. You may consider trying out https://github.com/rapidsai/kvikio just for that.
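
As a rough illustration (hypothetical file names), the construction cost can be amortized by building the pipeline once and calling run() repeatedly, instead of recreating it per file:

from nvidia.dali import pipeline_def, fn

@pipeline_def(batch_size=1, num_threads=8, device_id=0)
def pipe_gds(files):
    # the file list is fixed at build time
    return fn.readers.numpy(device='gpu', file_root='.', files=files)

p = pipe_gds(files=['a.npy', 'b.npy'])  # hypothetical file names
p.build()          # pay the construction / GDS initialization cost once
for _ in range(10):
    out = p.run()  # only the actual read stays on the per-iteration path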

JanuszL · Jun 09 '22 06:06

Hi @JanuszL, we tried using kvikio for data transfer from SSD to GPU, and it seems that kvikio.CuFile() also has a large setup overhead in write mode ("w"). In read mode ("r") there is no setup overhead, but the read bandwidth seems much lower than DALI's (even with the DALI pipeline batch size set to 1). After excluding the setup overhead, our GDS read results from SSD to GPU (3080 Ti) are shown in the table below. Do these results look accurate?

| Block size (MB) | Kvikio_Cufile Bandwidth (MB/s) | DALI Bandwidth (MB/s), pipeline=1 | DALI Bandwidth (MB/s), pipeline=best |
| --- | --- | --- | --- |
| 1568 | 1590.909091 | 7720.33481 | 8371.596369 |
| 784 | 1549.407115 | 7619.047619 | 8276.153278 |
| 392 | 1584.478577 | 6555.183946 | 8202.552835 |
| 196 | 1542.09284 | 4708.143166 | 8112.582781 |
| 98 | 1517.027864 | 3110.12377 | 8536.585366 |
| 49 | 1585.760518 | 1749.375223 | 8044.65605 |
| 24.5 | 1531.25 | 989.4991922 | 8288.227334 |
| 12.25 | 1346.153846 | 535.1681957 | 8448.275862 |
| 6.125 | 1177.884615 | 510.8423686 | 7862.644416 |
| 3.0625 | 995.2876178 | 279.9360146 | 8019.114952 |

Weigaa · Jun 11 '22 11:06

Hi @Weigaa,

Regarding the measurements, I would check the profiles with the Nsight Systems profiler to make sure you measure not only the time taken to issue the copy on the CPU but the whole transfer until it completes. If you could share the profiles, it would make the comparison much easier.
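
If it helps, a minimal sketch of how the region of interest can be marked with NVTX ranges so it is easy to find in an Nsight Systems timeline (run the script under nsys profile; p is assumed to be an already-built DALI pipeline):

import torch

torch.cuda.nvtx.range_push("dali_gds_load")  # start of the region of interest
pipe_out = p.run()                           # the load being measured
torch.cuda.synchronize()                     # include the device-side part of the transfer
torch.cuda.nvtx.range_pop()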

JanuszL · Jun 12 '22 20:06

@JanuszL We are not currently using the Nsight Systems profiler, and it will take us some time to learn and try out the tool; if you could help us with testing, I would be very grateful. To ensure that our time statistics are accurate, we add a torch.cuda.synchronize() statement before each time.time() call. The code we used to test DALI can be found in the previous messages; below is the code we used to test kvikio, for your reference.

device=torch.device('cuda:0')
testmodelbsz = 512
N = 30
average_save_tensor = 0
average_save_numpy = 0
#
f = kvikio.CuFile('Inputnumpy.npy', "w")
#write the tensor to the SSD
for i in range(1):
    Inputimages = torch.randn(testmodelbsz, 256, 56, 56).cuda()
    # Inputimages = torch.randn(256, 56, 56)
    # path = 'Inputtensor' + str(i) + '.pt'
    # path2 = 'Inputnumpy' + str(i) + '.npy'
    Inputcupy = cupy.asarray(Inputimages)
    torch.cuda.synchronize()
    begin = time.time()
    # f = kvikio.CuFile(path2, "w")
    f.write(Inputcupy)
    # f.close()
    torch.cuda.synchronize()
    end = time.time()
    time2 = end - begin
    if i > 0:
        average_save_numpy += time2
    # os.remove(path2)
    print("numpysave spendtime is", time2)
f.close()
print( "average cupysave spendtime is" , average_save_numpy / (N - 1))

# #time reading data with kvikio
averageloadtime = 0
f = kvikio.CuFile('Inputnumpy.npy', "r")
#CPU-to-GPU transfer test
cupyimages = cupy.asarray(torch.empty(testmodelbsz, 256, 56, 56))
for i in range(N):
    torch.cuda.synchronize()
    # path3 = 'Inputcupy' + str(i)
    begin = time.time()
    Inputimages = cupy.empty_like(cupyimages)
    # f = kvikio.CuFile(path3, "r")
    # Read whole array from file
    f.read(Inputimages)
    Inputtensor = torch.as_tensor(Inputimages, device = device)
    torch.cuda.synchronize()
    end = time.time()
    if i > 0:
        averageloadtime += end - begin
    print("load time is", end - begin)
    # os.remove(path3)
print("average load time is", averageloadtime/(N -1))
os.remove('Inputnumpy.npy')

Weigaa · Jun 13 '22 02:06

Hello @Weigaa,

thanks for your insights. Before digging deeper into this issue, did you make sure that your drive is GDS compatible? Each GDS installation comes with a checker tool called gdscheck. If you call gdscheck -P it should tell you which partitions are GDS supported. Can you please give me the output of that? Also, what kind of storage are you reading from? Is it remote storage (e.g. GPFS or Lustre) or local (NVMe)? If it is NVMe, are you using a software RAID?

For more info, please check the GDS configuration Guide.

Concerning DALI and GDS: I am unfamiliar with the hooks you are using. Are those hooks executed on every forward pass? If yes, then it seems the pipeline is rebuilt at every step, which is not recommended. In terms of GDS, this would open and destroy the GDS driver at every single iteration, adding extra overhead on top of the general DALI construction overhead.

Please let me know what you think

Best and thanks Thorsten

azrael417 · Jun 13 '22 08:06

Hi @azrael417 @JanuszL, thank you very much. For your first question, the output of gdscheck -P is shown in the attached screenshot. I'm not sure whether this output means the system is compatible, whether my GPU version does not support GDS, or whether a Mellanox ConnectX-5 or newer NIC is required. Could you tell me something useful about that? My storage is a local NVMe SSD without RAID; the SSD is: 09:00.0 Non-Volatile memory controller: Intel Corporation Optane SSD 900P Series. I use mount /dev/nvme0n1 /mnt/optane/ to mount the SSD to my directory.

For your second question, I rewrote the code to test DALI without using hooks; you can ignore the code above and just focus on the following.

Step 1: create some test files:

import torch
from nvidia.dali import pipeline_def, fn
import nvidia.dali.plugin.pytorch
import os
import time
import numpy as np
device=torch.device('cuda:0')
batch_size = 32 # to be used in pipelines
dali_extra_dir = os.environ['DALI_EXTRA_PATH']
# data_dir_2d = os.path.join(dali_extra_dir, 'db', '3D', 'MRI', 'Knee', 'npy_2d_slices', 'STU00001')
# data_dir_3d = os.path.join(dali_extra_dir, 'db', '3D', 'MRI', 'Knee', 'npy_3d', 'STU00001')
# data_dir = os.path.join(data_dir_2d, 'SER00001')
# # Listing all *.npy files in data_dir
data_dir = '.'
files = sorted([f for f in os.listdir(data_dir) if '.npy' in f])
testmodelbsz = 32
# files  = files[0:5]
N = 10
average_save_numpy = 0
for i in range(N):
    # Inputimages = torch.randn(testmodelbsz, 256, 56, 56).cuda()
    Inputimages = torch.randn(testmodelbsz, 256, 56, 56)
    path2 = 'Inputnumpy' + str(i) + '.npy'
    # save by numpy
    torch.cuda.synchronize()
    begin = time.time()
    Inputnumpy = Inputimages.cpu().numpy()
    np.save(path2, Inputnumpy)
    torch.cuda.synchronize()
    end = time.time()
    time2 = end - begin
    average_save_numpy += time2
    print("numpysave spendtime is", time2)

Step 2: use a DALI pipeline to load the data:

@pipeline_def(batch_size=batch_size, num_threads=8, device_id=0)
def pipe_gds():
    data = fn.readers.numpy(device='gpu', file_root=data_dir, files=files)
    return data
#DALI_Batch Load
average_transfer_pipeline_numpy = 0
p = pipe_gds()
p.build()
for i in range(N):
    torch.cuda.synchronize()
    begin = time.time()
    pipe_out= p.run()
    torch.cuda.synchronize()
    end = time.time()
    time1 = end - begin
    print("numpyload pipeline spendtime is", time1)
    print(len(pipe_out[0]))
    if i > 1:
        average_transfer_pipeline_numpy += time1
print("average numpyload pipeline spendtime is", average_transfer_pipeline_numpy / ((N -2) * batch_size) )

and we use the following pipeline to test DALI without pipelining (one file per pipeline):

@pipeline_def(batch_size=1, num_threads=8, device_id=0)
def pipe_gds(filename):
    data = fn.readers.numpy(device='gpu', file_root=data_dir, files=filename)
    return data

The calculated times turned out to be consistent with our previous feedback.

Lastly, regarding GDS: we often save different CUDA tensors under different names, so opening and closing the file with f = kvikio.CuFile(path2, "w") and f.close() is necessary, because path2 differs at each iteration. We apologise for previously providing the GDS "write" code instead of the "read" code. To make the comparison fair, I modified some of the kvikio.CuFile() code and re-ran the tests; the code is as follows:

device=torch.device('cuda:0')
testmodelbsz = 512
N = 30
average_save_tensor = 0
average_save_numpy = 0
#
f = kvikio.CuFile('Inputnumpy.npy', "w")
#write the tensor to the SSD
for i in range(1):
    Inputimages = torch.randn(testmodelbsz, 256, 56, 56).cuda()
    # Inputimages = torch.randn(256, 56, 56)
    # path = 'Inputtensor' + str(i) + '.pt'
    # path2 = 'Inputnumpy' + str(i) + '.npy'
    Inputcupy = cupy.asarray(Inputimages)
    torch.cuda.synchronize()
    begin = time.time()
    # f = kvikio.CuFile(path2, "w")
    f.write(Inputcupy)
    # f.close()
    torch.cuda.synchronize()
    end = time.time()
    time2 = end - begin
    if i > 0:
        average_save_numpy += time2
    # os.remove(path2)
    print("numpysave spendtime is", time2)
f.close()
print( "average cupysave spendtime is" , average_save_numpy / (N - 1))

# #time reading data with kvikio
averageloadtime = 0
f = kvikio.CuFile('Inputnumpy.npy', "r")
#CPU-to-GPU transfer test
cupyimages = cupy.asarray(torch.empty(testmodelbsz, 256, 56, 56))
for i in range(N):
    torch.cuda.synchronize()
    # path3 = 'Inputcupy' + str(i)
    begin = time.time()
    Inputimages = cupy.empty_like(cupyimages)
    # f = kvikio.CuFile(path3, "r")
    # Read whole array from file
    f.read(Inputimages)
    Inputtensor = torch.as_tensor(Inputimages, device = device)
    torch.cuda.synchronize()
    end = time.time()
    if i > 0:
        averageloadtime += end - begin
    print("load time is", end - begin)
    # os.remove(path3)
print("average load time is", averageloadtime/(N -1))
os.remove('Inputnumpy.npy')

The kvikio_Cufile read bandwidth is:

| Block size (MB) | Kvikio_Cufile Read Bandwidth (MB/s) |
| --- | --- |
| 1568 | 1940.209859 |
| 784 | 1888.701518 |
| 392 | 1930.083703 |
| 196 | 1861.596034 |
| 98 | 1901.804774 |
| 49 | 1772.151899 |
| 24.5 | 1788.321168 |
| 12.25 | 157.6373697 |
| 6.125 | 1447.306238 |
| 3.0625 | 1126.747609 |

Weigaa · Jun 13 '22 10:06

Hello @Weigaa ,

A few things to mention here:

  1. it seems that your installation does not support GDS. When you installed CUDA on the system, did you also install the GDS drivers? In the meantime, I will crosscheck with the GDS product team if Intel Optane memory is supported by GDS.

  2. For general DL workloads, I expect that a custom reader using kvikio will deliver worse performance if it is not properly called in a subprocess, since basically all the IO is serialized with the training. If you are using it inside a PyTorch DataLoader, be careful that the subprocess needs to be spawned or forked via forkserver, because otherwise the CUDA context will get corrupted. In general, DALI avoids this. In any case, you can experiment with different numbers of threads; it is not documented, but it can be set by a module function. Also, kvikio supports async IO. Please check that part, I think it is not documented either, but basically it allows you to fire off IO on a batch and then wait for completion (see the sketch after this list).

  3. So let's focus on DALI with and without GDS, fully serialized, i.e. without any NN attached to it. What performance do you get when you use the exact same pipeline, once with GDS (i.e. fn.readers.numpy with device='gpu') vs the POSIX-based pipeline (device='cpu')?

  4. While employing 8 threads sounds reasonable, please play around with the number of threads. Also, please set register_buffers=True and cache_header_information=True in the reader for best GDS performance.
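
To illustrate the async IO point from item 2, a rough sketch (file names are hypothetical; the thread count can also be tuned via kvikio.defaults, the exact function depends on the kvikio version):

import cupy
import kvikio

# issue several reads without blocking, then wait for all of them to finish
bufs = [cupy.empty((256, 56, 56), dtype=cupy.float32) for _ in range(4)]
files = [kvikio.CuFile("chunk%d.npy" % i, "r") for i in range(4)]  # hypothetical files
futures = [f.pread(buf) for f, buf in zip(files, bufs)]  # non-blocking reads
bytes_read = [fut.get() for fut in futures]              # wait for completion
for f in files:
    f.close()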

Best Thorsten

azrael417 · Jun 14 '22 07:06

Hi @azrael417 , Thanks again, Thorsten. We still have some questions that we would like to get your help with.

  1. We do have GDS installed; we ran gdscheck and got "Platform Verification succeeded", but it seems we are running in "compatibility mode" and not real GDS. Could you please help us confirm ① whether the 3080 Ti GPU supports GPUDirect Storage and ② whether a Mellanox ConnectX-5 or newer NIC must be installed?
  2. kvikio indeed has a non-blocking read API, but I can't find an async write API; I will try the non-blocking read API later. I agree that DALI can help me avoid serializing I/O with training, and that is exactly why I chose it, although I use it during the training process and not just when reading input data. A rule of thumb is that I/O is faster when the number of threads matches the number of physical cores on the host, and I will try different numbers of threads to enrich my results.
  3. We will add and report on the results of the tests using device='cpu' as soon as possible.
  4. Our Intel(R) Xeon(R) CPU E5-2680 v4 server has 28 physical cores and 56 logical cores, so we will test thread counts from 2 to 128 in powers of 2, as well as 28 and 56 separately. We will also add register_buffers=True and cache_header_information=True to the numpy reader. To make our experiments more meaningful, I hope to get a response to question 1 as soon as possible, thank you very much.

Best Jia Wei

Weigaa · Jun 14 '22 07:06

@azrael417 @JanuszL We have done more experiments on DALI, comparing device='cpu' and device='gpu' under the same experimental environment and pipeline settings; the results are shown in the table below.

In addition, we re-checked the GDS development documentation and we found that GDS does not seem to support the 3080TI GPU, so we used the P100 for the following test experiments with different threads. Unfortunately, even though we reinstalled the CUDA and GDS drivers on the P100, we found that all devices, including NVME, were in the "unsupported" state via gdscheck. Therefore, the following experiments are still conducted in gds-compatible mode.

| Block size (MB) | Kvikio_Cufile Bandwidth (MB/s) | DALI Bandwidth (MB/s), pipeline=1 | DALI Bandwidth (MB/s), pipeline=best | DALI Bandwidth CPU (MB/s), pipeline=1 | DALI Bandwidth CPU (MB/s), pipeline=best |
| --- | --- | --- | --- | --- | --- |
| 1568 | 1940.209859 | 7720.33481 | 8371.596369 | 3438.596491 | 5915.864931 |
| 784 | 1888.701518 | 7619.047619 | 8276.153278 | 3684.210526 | 7468.08916 |
| 392 | 1930.083703 | 6555.183946 | 8202.552835 | 4288.840263 | 7013.777062 |
| 196 | 1861.596034 | 4708.143166 | 8112.582781 | 4458.598726 | 7486.631016 |
| 98 | 1901.804774 | 3110.12377 | 8536.585366 | 4322.893692 | 6712.328767 |
| 49 | 1772.151899 | 1749.375223 | 8044.65605 | 4313.380282 | 7059.501513 |
| 24.5 | 1788.321168 | 989.4991922 | 8288.227334 | 2793.296089 | 6193.124368 |
| 12.25 | 157.6373697 | 535.1681957 | 8448.275862 | 3134.595701 | 6702.412869 |
| 6.125 | 1447.306238 | 510.8423686 | 7862.644416 | 4316.420014 | 7036.1861 |
| 3.0625 | 1126.747609 | 279.9360146 | 8019.114952 | 3835.796593 | 6564.844587 |

We have reported our installation problems in https://github.com/NVIDIA/DALI/issues/4009.

The results for different thread counts are shown below. "with optimization" means that register_buffers=True and cache_header_information=True were added to the numpy reader; "no optimization" means they were not. The results show that the difference between the two is not significant. When the data blocks are small, it seems more efficient to use a smaller number of threads.

| num_threads | datablock_size (MB) | Batchsize | Bandwidth (no optimization, MB/s) | Bandwidth (with optimization, MB/s) |
| --- | --- | --- | --- | --- |
| 1 | 12.25 | 256 | 5197.284684 | 5223.880597 |
| 2 | 12.25 | 256 | 8832.011536 | 8944.21729 |
| 4 | 12.25 | 256 | 9960.159363 | 9974.757756 |
| 8 | 12.25 | 256 | 8897.443347 | 9016.634771 |
| 16 | 12.25 | 256 | 8433.73494 | 8434.315616 |
| 32 | 12.25 | 256 | 7949.899409 | 7966.443389 |
| 64 | 12.25 | 256 | 7642.874969 | 7699.560025 |
| 128 | 12.25 | 256 | 7117.789244 | 6995.203289 |
| 28 | 12.25 | 256 | 8115.269957 | 8335.601524 |
| 56 | 12.25 | 256 | 7608.695652 | 7660.08004 |
| 256 | 12.25 | 256 | 6334.022751 | 6963.788301 |

We've done all we can, but the biggest challenge at the moment is that we don't know how to tune our hardware and software configuration so that our NVMe SSD and P100 GPU support GDS.

Weigaa · Jun 27 '22 04:06

Hello Weigaa,

sorry for the late reply. I am trying to figure that out with the GDS developers. The good news is that the P100 is supported, so the reason you are seeing these unsupported messages must be something else.

There are several issues here, so let's go step by step. Let's debug the setup issue in the other ticket, 4009, and keep this one to the DALI/GDS interaction itself. The performance you shared above for DALI GPU and CPU does not look so bad, does it? What is the peak bandwidth of the drive you used for that test? 8 GB/s looks reasonable to me. Is that a single GPU or an aggregate?

It seems that the GDS version performs better than the POSIX one, even in compat mode, do I read that correctly? So that is good news. It likely has to do with the fact that the parallelization in the GDS code branch is a bit more aggressive. Therefore, perf speedups can also be due to the different parallelization scheme. In any case, it would be good to know what bandwidth you expect from the drive per employed GPU.

Best regards Thorsten

azrael417 · Jun 28 '22 14:06

Hi @azrael417, thank you for your reply. We found that the NVMe "unsupported" error may be due to the Linux kernel version: the GDS development documentation requires a 5.4.x kernel, and we previously used 5.4.120, which showed as unsupported. With the 5.4.70 kernel, as shown in Fig. 1 below, NVMe seems to be supported, but our Mellanox PeerDirect is still disabled, and running our program with compatibility mode turned off produces the error shown in Fig. 2. We would be grateful if you could help us resolve this issue or ask the GDS team for help. [Fig. 1: gdscheck output; Fig. 2: error output; screenshots omitted]

Back to DALI itself: our PCIe devices (NVMe SSD and P100) are attached to PCIe 3.0 x16 with a peak bandwidth of 16 GB/s, and we are only using one GPU.

Yes, transferring data to the GPU using DALI with GDS can sometimes be faster than transferring to the CPU using DALI's CPU reader, even in GDS compatibility mode. However, in compatibility mode CPU utilisation is high (about the same as POSIX reads to the CPU), and low CPU and DRAM utilisation is exactly what we want GDS for, which compatibility mode does not give us. We would like our GPU reads to reach close to the full PCIe 3.0 x16 bandwidth (16 GB/s) with low CPU utilisation, as GPUDirect-Fig-7 shows.

Look forward to your reply!

Best regards, Jia Wei

Weigaa · Jun 29 '22 03:06

Hi @azrael417, after communicating with the GDS engineers via email, our GDS environment now appears to be working properly, as shown in the gdscheck output (screenshot omitted). However, when we turned off compatibility mode and tested our program, the previous error reappeared.

(wjpytorch) root@nuosen:/mnt/optane/wjtest# python testGDS.py
Traceback (most recent call last):
  File "/mnt/optane/wjtest/testGDS.py", line 186, in <module>
    pipe_out= p.run()
  File "/root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/pipeline.py", line 980, in run
    return self.outputs()
  File "/root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/pipeline.py", line 879, in outputs
    return self._outputs()
  File "/root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/pipeline.py", line 963, in _outputs
    return self._pipe.Outputs()
RuntimeError: Critical error in pipeline:
Error when executing GPU operator readers__Numpy encountered:
Error in thread 0: [/opt/dali/dali/util/std_cufile.cc:73] CUFile import failed: ./Inputnumpy.npy. GPUDirect Storage not supported on current file.
Stacktrace (9 entries):
[frame 0]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali.so(+0x83f5f) [0x7f9be77a2f5f]
[frame 1]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali.so(+0x1b44ef) [0x7f9be78d34ef]
[frame 2]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali.so(+0x1b4f87) [0x7f9be78d3f87]
[frame 3]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali.so(dali::CUFileStream::Open(std::string const&, bool, bool)+0xbf) [0x7f9be78d29ef]
[frame 4]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali_operators.so(+0x2ddc749) [0x7f9bca232749]
[frame 5]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali.so(dali::ThreadPool::ThreadMain(int, int, bool)+0x1fe) [0x7f9be787c9de]
[frame 6]: /root/anaconda3/envs/wjpytorch/lib/python3.9/site-packages/nvidia/dali/libdali.so(+0x721d9f) [0x7f9be7e40d9f]
[frame 7]: /lib/x86_64-linux-gnu/libpthread.so.0(+0x76db) [0x7f9cbfaf06db]
[frame 8]: /lib/x86_64-linux-gnu/libc.so.6(clone+0x3f) [0x7f9cbf81961f]

When we turn on compatibility mode, the program runs, but it doesn't run faster than before and it doesn't use less CPU or memory; GDS doesn't seem to take effect. I would like to know whether this is a DALI problem or a GDS problem? Best regards, Jia Wei

Weigaa · Jun 30 '22 04:06

Hello @Weigaa,

so this is a GDS issue: inside DALI, I am printing the CUFILE error I receive. It says:

Error in thread 0: [/opt/dali/dali/util/std_cufile.cc:73] CUFile import failed: ./Inputnumpy.npy. GPUDirect Storage not supported on current file.

So that means it tries to load from unsupported storage. Now I am wondering why it thinks that. The files are all on the supported NVME drive, right? What I would do to check is to run gdsio in write mode on the drive in question and see what you get. For examples check the documentation page. It looks like your system config is in some kind of weird spot in terms of compatibility which wasn't tested I believe (you mentioned that your previous kernel should have been supported but it wasn't). GDSIO will tell you at least if DALI-GDS is supposed to work.

Best Thorsten

azrael417 · Jun 30 '22 06:06

Please make sure that you point it to an empty directory on the NVMe drive. I recommend creating some dummy files there, or letting gdsio write some files there which you then read back with gdsio as well. If that works, we take it from there. Are you running DALI inside a container? If yes, it might make sense to use strace to check which libcufile DALI dlopens.

azrael417 · Jun 30 '22 06:06

Hi @azrael417, the files are all on the Intel Optane SSD I mentioned before, which I believe is a supported NVMe drive. I ran gdsio against the file through GDS (compatibility mode turned off), and the same error occurs:

(base) root@nuosen:/usr/local/cuda-11.7/gds/tools# ./gdsio -f /mnt/optane/test10G -d 0 -n 0 -w 1 -s 10G -x 0 -I 1 -T 10 -i 4096K
file register error: GPUDirect Storage not supported on current file filename :/mnt/optane/test10G

I am not running DALI in a container but on bare metal.

I checked the documentation for this error and it seems to be caused by the file system or the NVMe device that I mounted. However, I think they are both suitable, so I am very confused about the results that I got.

Weigaa · Jun 30 '22 07:06

In addition, we found a very strange phenomenon: even with the error message, when we use gdsio to do a "write" operation, a file of the specified size is still successfully written. Even writing a file with gdsio (GDS mode) to a folder mounted on an HDD succeeds, although the "file register error" is still printed.

Weigaa · Jun 30 '22 09:06

@azrael417 We discovered the cause of our error. We had previously mounted our Optane SSD with:

mount /dev/nvme0n1 /mnt/optane

Later, we changed the mount to:

mount -t ext4 -o data=ordered /dev/nvme0n1 /mnt/optane

That is, we explicitly specified the ext4 data journaling mode, and now the program also runs with compatibility mode turned off. This is something I didn't find in any of the GDS user manuals, and I would suggest the GDS team add it to the installation manual (it took us a lot of time to find, and hopefully future researchers can avoid it). However, DALI with native GDS seems to be slower than GDS compatibility mode at larger batch sizes. We will repeat our previous experiments in GDS non-compatible mode to verify this.

Weigaa · Jun 30 '22 10:06

Hi, @azrael417 The results of our tests in real GDS are shown in the table below.

| Block size (MB) | Kvikio_Cufile Bandwidth (MB/s) | DALI Bandwidth (MB/s), pipeline=1 | DALI Bandwidth (MB/s), pipeline=best | DALI Bandwidth CPU (MB/s), pipeline=1 | DALI Bandwidth CPU (MB/s), pipeline=best | DALI Bandwidth (MB/s), no_comp, pipeline=1 | DALI Bandwidth (MB/s), no_comp, pipeline=best |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 1568 | 1940.209859 | 7720.33481 | 8371.596369 | 3438.596491 | 5915.864931 | 2538.396896 | 2541.87535 |
| 784 | 1888.701518 | 7619.047619 | 8276.153278 | 3684.210526 | 7468.08916 | 2547.59873 | 2555.52564 |
| 392 | 1930.083703 | 6555.183946 | 8202.552835 | 4288.840263 | 7013.777062 | 2544.852704 | 2559.371544 |
| 196 | 1861.596034 | 4708.143166 | 8112.582781 | 4458.598726 | 7486.631016 | 2523.756792 | 2560.137098 |
| 98 | 1901.804774 | 3110.12377 | 8536.585366 | 4322.893692 | 6712.328767 | 2486.047692 | 2558.593087 |
| 49 | 1772.151899 | 1749.375223 | 8044.65605 | 4313.380282 | 7059.501513 | 2405.498282 | 2532.493441 |
| 24.5 | 1788.321168 | 989.4991922 | 8288.227334 | 2793.296089 | 6193.124368 | 2281.829189 | 2521.473693 |
| 12.25 | 157.6373697 | 535.1681957 | 8448.275862 | 3134.595701 | 6702.412869 | 2114.402099 | 2529.805236 |
| 6.125 | 1447.306238 | 510.8423686 | 7862.644416 | 4316.420014 | 7036.1861 | 1707.983603 | 2490.242316 |
| 3.0625 | 1126.747609 | 279.9360146 | 8019.114952 | 3835.796593 | 6564.844587 | 1581.004202 | 2417.508683 |

| num_threads | datablock_size (MB) | Batchsize | no_comp Bandwidth (no optimization, MB/s) | no_comp Bandwidth (with optimization, MB/s) |
| --- | --- | --- | --- | --- |
| 1 | 12.25 | 256 | 1656.591883 | 1886.066205 |
| 2 | 12.25 | 256 | 1948.930077 | 2244.206284 |
| 4 | 12.25 | 256 | 1981.928052 | 2269.443106 |
| 8 | 12.25 | 256 | 1989.5407 | 2260.314414 |
| 16 | 12.25 | 256 | 1997.456301 | 2264.409035 |
| 32 | 12.25 | 256 | 2024.458767 | 2261.040337 |
| 64 | 12.25 | 256 | 2089.195873 | 2246.346249 |
| 128 | 12.25 | 256 | 2107.889529 | 2237.900721 |
| 28 | 12.25 | 256 | 2139.868813 | 2222.746407 |
| 56 | 12.25 | 256 | 2153.6076 | 2206.51146 |
| 256 | 12.25 | 256 | 2167.903699 | 2186.890795 |

Weigaa · Jul 01 '22 14:07

Hello @Weigaa,

I apologize for the silence but I was/am on vacation till July 15. What I read from the table is that GDS compat mode is faster than GDS native, is that correct?

The DALI backend for compat vs non compat is the same. The DALI backend for the CPU based numpy loader has different threading, so there is no surprise to me that the GDS one can utilize threads better especially for larger block sizes (the CPU version does inter sample parallelization, the GDS one inter as well as intra sample parallelization). When it comes to native vs compat mode, this is a bit more complicated. Is that pipeline running in IO only mode still or are you running training besides it?

Best Thorsten

azrael417 · Jul 15 '22 05:07

Hi @azrael417, thank you, and have a great vacation! For GDS compatibility mode, we unloaded nvidia_fs (modprobe -r nvidia_fs) to force GDS to use the Linux POSIX interface; NVMe then shows as "unsupported". For native mode, we loaded nvidia_fs normally so that GDS works in non-compatible mode. Our results were obtained with these settings. This means that when GDS uses Linux's POSIX interface, it parallelizes very well. When GDS uses cuFile in true, non-compatible (native) mode, increasing the pipeline size and thread count still has some effect on I/O performance, but I/O performance remains significantly worse than in compatibility mode. In other words, the compatibility mode intended to let non-GDS-enabled devices use the cuFile interface transfers faster than true GDS mode, which looks very strange. And I ran that pipeline in IO-only mode without running any training.

Weigaa · Jul 15 '22 06:07

Hello @Weigaa,

What you did sounds correct. In more recent GDS versions, there is a simpler method to enforce compat mode but your solution works. I think this is an issue for the GDS engineers, I will point them to this thread.

Another thing we need to watch out for is data caching. How big is your file cache and how big is the overall dataset. It is artificial afaics, which means it is likely small, is that correct? So this means, it might get cached into DRAM and then POSIX can read it from there and will be faster than GDS. I am not 100% certain if compat mode uses O_DIRECT (in which case it won't get cached), but I am certain that data in native mode does not get cached. We should run a test to see if that could be an issue. The following two methods should work:

  1. create a big enough dataset so that it does not fit into file cache. This could be multiple TB in some cases, I do not recommend this solution.
  2. Flush the cache after writing the files. What I would do is:
     a) Create all the artificial files, say a total of 2 GB or so. After creating those files, they likely reside in the file cache.
     b) Build the pipeline and run a few warmup iterations; data will come from the file cache, but all buffers etc. get created, which is what we want.
     c) Flush the file system caches: echo 3 > /proc/sys/vm/drop_caches. free -m should then show that the file cache is (almost) empty; run free before and after flushing to verify that something actually changed.
     d) Run a single iteration over the full dataset and time it (a timing sketch follows below). Compute the average bandwidth in GB/s as well as samples/sec from that. Do not run more than one iteration, otherwise the data could come from the file cache in the POSIX case and from disk in the GDS case (I still need to verify the former). Do this exercise for native and compat mode.
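
For step 2d, a rough timing sketch under the stated assumptions (an already-built DALI pipeline p and a known dataset size and sample count; the concrete numbers below are placeholders):

import time
import torch

# placeholders: adjust to the actual dataset
iters_per_epoch = 16                 # exactly one pass over the dataset
dataset_bytes = 64 * 49 * 1024**2    # e.g. 64 files of 49 MB each
num_samples = 64

torch.cuda.synchronize()
start = time.time()
for _ in range(iters_per_epoch):
    p.run()                          # p is the already-built DALI pipeline
torch.cuda.synchronize()
elapsed = time.time() - start
print("average bandwidth: %.2f GB/s" % (dataset_bytes / elapsed / 1e9))
print("throughput: %.1f samples/s" % (num_samples / elapsed))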

Please let me know what you get.

Best Thorsten

azrael417 · Jul 18 '22 06:07

Hi @Weigaa,

In addition to the caching of data in system memory, GDS peer-to-peer can be slower on some platforms when the GPU and the storage device are not under the same PCIe switch. Please see the GDS design guide for the recommended configuration and to determine whether the storage device is close to the GPU: https://docs.nvidia.com/gpudirect-storage/configuration-guide/index.html#benchmarking-gds

If your organization has Nvidia enterprise support, please file an NVBug.

KiranModukuri · Jul 20 '22 16:07

Hi @KiranModukuri, thank you. Since I am only using one PCIe switch, one GPU card, and one NVMe device, I made sure they are under the same PCIe switch; the output is shown below. Sorry, I'm not sure what you mean about filing an NVBug (for what)? For now I'm focusing on the caching problem.

(base) root@nuosen:~# lspci -tv | egrep -i "nvidia | optane"
             +-02.0-[02-05]----00.0-[03-05]--+-08.0-[04]----00.0  NVIDIA Corporation GP100GL [Tesla P100 PCIe 16GB]
             |                               \-10.0-[09]----00.0  Intel Corporation Optane SSD 900P Series

Weigaa · Jul 21 '22 03:07

Hello @Weigaa,

can you please hand us a simple reproducer for this problem? I lost track of what the latest code is; it would be good if you could just drop a Python script we can look at.

Best and thanks Thorsten

azrael417 · Jul 21 '22 11:07

Hi @azrael417, no problem! You need PyTorch (v1.11.0 works best) to run the script; we first write some files, then use DALI to read them. The code does not need to be modified whether it runs in compatibility mode or in native (p2p) mode. P.S. The commented-out sections read and write files in other ways; you can ignore them. Code:

import torch
from nvidia.dali import pipeline_def, fn
import nvidia.dali.plugin.pytorch
import os
import time
# misc python stuff
import numpy as np
from glob import glob
import shutil
import tempfile
import cupy
import kvikio
# visualization
from PIL import Image

# def plot_batch(np_arrays, nsamples=None):
#     if nsamples is None:
#         nsamples = len(np_arrays)
#     fig, axvec = plt.subplots(nrows=1, ncols=nsamples, figsize=(10, 10 * nsamples))
#     for i in range(nsamples):
#         ax = axvec[i]
#         ax.tick_params(left=False, bottom=False, labelleft=False, labelbottom=False)
#         ax.imshow(Image.fromarray(np_arrays[i]))
#     plt.tight_layout()
device=torch.device('cuda:0')
batch_size = 1024 # to be used in pipelines
dali_extra_dir = os.environ['DALI_EXTRA_PATH']
# data_dir_2d = os.path.join(dali_extra_dir, 'db', '3D', 'MRI', 'Knee', 'npy_2d_slices', 'STU00001')
# data_dir_3d = os.path.join(dali_extra_dir, 'db', '3D', 'MRI', 'Knee', 'npy_3d', 'STU00001')
# data_dir = os.path.join(data_dir_2d, 'SER00001')
# # Listing all *.npy files in data_dir
# data_dir = '../..'
data_dir = '.'
files = sorted([f for f in os.listdir(data_dir) if '.npy' in f])
# files = sorted([f for f in os.listdir(data_dir) if 'Inputcupy' in f])
testmodelbsz = 1
# files  = files[0:5]

@pipeline_def(batch_size=batch_size, num_threads=4, device_id=0)
def pipe_gds():
    data = fn.readers.numpy(device='gpu', file_root=data_dir, files=files, register_buffers = True, cache_header_information=True)
    return data

# def pipe_gds():
#     data = fn.readers.numpy(device='gpu', file_root=data_dir, files=files)
#     return data

#DALI_CPU_Batch
# def pipe_gds():
#     data = fn.readers.numpy(device='cpu', file_root=data_dir, files=files)
#     return data

# def run(p):
#     p.build()  # build the pipeline
#     outputs = p.run()  # Run once
#     # Getting the batch as a list of numpy arrays, for displaying
#     batch = [np.array(outputs[0][s]) for s in range(batch_size)]
#     return batch

# print(files)
# def pipe_gds(filename):
#     data = fn.readers.numpy(device='gpu', file_root=data_dir, files=filename, register_buffers = True, cache_header_information=True)
#     return data

N = 50
average_save_tensor = 0
average_save_numpy = 0
average_save_cupy = 0
#write tensors to the SSD
for i in range(N):
    Inputimages = torch.randn(testmodelbsz, 256, 56, 56).cuda()
    # Inputimages = torch.randn(testmodelbsz, 256, 56, 56)
    path = 'Inputtensor' + str(i) + '.pt'
    path2 = 'Inputnumpy' + str(i) + '.npy'
    path3 = 'Inputcupy'+ str(i)
    # Inputimages = torch.randn(512, 256, 56, 56)
    # path = 'Inputtensor.pt'
    # path2 = 'Inputnumpy.npy'
    #save by torch.save()
    # torch.cuda.synchronize()
    # begin = time.time()
    # torch.save(Inputimages, path)
    # torch.cuda.synchronize()
    # end = time.time()
    # time1 = end -begin
    # average_save_tensor += time1
    # print("torchsave spendtime is", time1)
    #save by GDS
    # torch.cuda.synchronize()
    # begin = time.time()
    # Inputcupy = cupy.asarray(Inputimages)
    # f = kvikio.CuFile(path3, "w")
    # f.write(Inputcupy)
    # f.close()
    # torch.cuda.synchronize()
    # end = time.time()
    # time3 = end - begin
    # print("cupysave spendtime is", time3)
    # if i > 0:
    #     average_save_cupy += time3
    #save by numpy
    torch.cuda.synchronize()
    begin = time.time()
    Inputnumpy = Inputimages.cpu().numpy()
    end = time.time()
    print("transfer time is", end - begin)
    torch.cuda.synchronize()
    begin = time.time()
    np.save(path2, Inputnumpy)
    torch.cuda.synchronize()
    end = time.time()
    time2 = end - begin
    average_save_numpy += time2
    print("numpysave spendtime is", time2)
    # os.remove(path)
    # os.remove(path2)
    # os.remove(path3)
print("average tensorsave spendtime is", average_save_tensor / N, "average numpysave spendtime is" , average_save_numpy / N,"average cupysave spendtime is" , average_save_cupy / (N -1))


average_load_tensor = 0
average_load_numpy = 0
average_transfer_numpy = 0
Inputimages = torch.empty(testmodelbsz, 256, 56, 56).to(device)
# for i in range(N):
#     path = 'Inputtensor' + str(i) + '.pt'
#     path2 = 'Inputnumpy' + str(i) + '.npy'
#     # # time to load to the GPU with torch.load()
#     # # path = 'Inputtensor.pt'
#     # torch.cuda.synchronize()
#     # begin = time.time()
#     # Inputimages = torch.load(path, map_location=lambda storage, loc: storage.cuda(0))
#     # torch.cuda.synchronize()
#     # end = time.time()
#     # time1 = end - begin
#     # average_load_tensor += time1
#     # os.remove(path)
#     # print("torch.load spendtime is", time1)
#     # # time to load to the GPU with DALI
#     p = pipe_gds(filename=path2)
#     p.build()
#     torch.cuda.synchronize()
#     begin = time.time()
#     pipe_out = p.run()
#     torch.cuda.synchronize()
#     end = time.time()
#     time1 = end - begin
#     # print("numpyload spendtime is", time1)
#     # print(pipe_out[0][0])
#     torch.cuda.synchronize()
#     begin = time.time()
#     nvidia.dali.plugin.pytorch.feed_ndarray(pipe_out[0][0], Inputimages)
#     torch.cuda.synchronize()
#     end= time.time()
#     time2 = end - begin
#     # print("transfer time is", time2)
#     time3 = time1 + time2
#     if i > 1:
#         average_load_numpy += time3
#         average_transfer_numpy += time2
#     # print("load time", time1)
#     # print("transfer time", time2)
#     os.remove(path2)
#     print("total gdsload time",time3)
# print("average tensorload spendtime is", average_load_tensor / N , "average numpyload spendtime is" , average_load_numpy / (N - 2),"average transfer spendtime is" , average_transfer_numpy / (N - 2))

#DALI_Batch Load
average_transfer_pipeline_numpy = 0
p = pipe_gds()
p.build()
for i in range(N):
    torch.cuda.synchronize()
    begin = time.time()
    pipe_out= p.run()
    torch.cuda.synchronize()
    end = time.time()
    time1 = end - begin
    print("numpyload pipeline spendtime is", time1)
    print(len(pipe_out[0]))
    if i > 1:
        average_transfer_pipeline_numpy += time1
print("average numpyload pipeline spendtime is", average_transfer_pipeline_numpy / ((N -2) * batch_size))

# #DALI_Batch Load_CPU
# average_transfer_pipeline_numpy = 0
# p = pipe_gds()
# p.build()
# for i in range(N):
#     torch.cuda.synchronize()
#     begin = time.time()
#     pipe_out= p.run()
#     torch.cuda.synchronize()
#     end = time.time()
#     time1 = end - begin
#     print("numpyload pipeline spendtime is", time1)
#     print(len(pipe_out[0]))
#     if i > 1:
#         average_transfer_pipeline_numpy += time1
# print("average numpyload pipeline spendtime is", average_transfer_pipeline_numpy / ((N -2) * batch_size))

# averagemovetime = 0
# #CPU-to-GPU transfer test
# for i in range(N):
#     Inputimages = torch.zeros(testmodelbsz, 256, 56, 56)
#     torch.cuda.synchronize()
#     begin = time.time()
#     Inputimages.to(device)
#     torch.cuda.synchronize()
#     end = time.time()
#     if i > 0:
#         averagemovetime += end - begin
#     print("move time is", end - begin)
# print("average move time is", averagemovetime/(N -1 ))

# #time reading data with numpy
# average_load_numpy = 0
# for i in range(N):
#     path = 'Inputnumpy' + str(i) + '.npy'
#     # time to load to the GPU with np.load()
#     torch.cuda.synchronize()
#     begin = time.time()
#     Inputimages = torch.from_numpy(np.load(path)).to(device)
#     torch.cuda.synchronize()
#     end = time.time()
#     time1 = end - begin
#     if i > 1:
#         average_load_numpy += time1
#     print("numpy.load spendtime is", time1)
#     os.remove(path)
# print("average numpyload spendtime is" , average_load_numpy / (N - 2))

#time reading data with kvikio
# averageloadtime = 0
# #CPU-to-GPU transfer test
# cupyimages = cupy.asarray(torch.empty(testmodelbsz, 256, 56, 56))
# for i in range(N):
#     torch.cuda.synchronize()
#     path3 = 'Inputcupy' + str(i)
#     begin = time.time()
#     Inputimages = cupy.empty_like(cupyimages)
#     f = kvikio.CuFile(path3, "r")
#     # Read whole array from file
#     f.read(Inputimages)
#     Inputtensor = torch.as_tensor(Inputimages, device = device)
#     torch.cuda.synchronize()
#     end = time.time()
#     if i > 0:
#         averageloadtime += end - begin
#     print("load time is", end - begin)
#     os.remove(path3)
# print("average load time is", averageloadtime/(N -1))



# data_gds = pipe_out[0].as_cpu().as_array()  # as_cpu() to copy the data back to CPU memory
# print(data_gds.shape)

Weigaa · Jul 21 '22 12:07

Hi @azrael417, I'm sorry it took me longer to complete the experiment, as the equipment in our lab is shared by several people. I generated 64 blocks of 49 MB each, for a total size of about 3 GB. For each test I used a DALI reader with a batch size of 4 and read the data 16 times, ensuring that each file is read only once. The state after clearing the cache is shown below.


              total        used        free      shared  buff/cache   available
Mem:          64287        2893       61033           9         360       60862
Swap:          2047           0        2047

We tested native and compat mode, each with and without the file cache; the results are as follows:

| Metric | Value |
| --- | --- |
| datasize (MB) | 49 |
| number of files | 64 |
| totalsize (MB) | 3136 |
| batch_size | 4 |
| native_cache (GB/s) | 2.04319225 |
| native_nocache (GB/s) | 2.046687874 |
| compat_cache (GB/s) | 7.530935238 |
| compat_nocache (GB/s) | 1.965963948 |
| native_cache (samples/s) | 2732.707088 |
| native_nocache (samples/s) | 2737.382378 |
| compat_cache (samples/s) | 10072.39534 |
| compat_nocache (samples/s) | 2629.416598 |

The results seem to indicate that the fast reads in compatibility mode come from the file cache. However, my PCIe link can reach almost 16 GB/s, yet even native GDS only achieves about 2.1 GB/s. Why is the bandwidth so low, and how could I get higher bandwidth like in the GDS developer technical blog?

Weigaa · Jul 21 '22 16:07