Designing my video-based pipeline
Describe the question.
I'm very interested in using DALI for my model training. My simple benchmarks show great speed-ups over OpenCV when loading directly to the GPU.
However, I'd like some insight from the developers/maintainers/community on how I can adapt my current data-loading paradigm to DALI. I believe I can adapt everything except for a few aspects.
First, the datasets I am working with do not have consistent frame rates; they can range from 20 to 60 FPS. Currently, I control for this when I sample/read frames to make sure the "best" frame is chosen for a given scenario. The constant stride parameter of fn.readers.video complicates this.
Second, I would like the flexibility to sample at different rates for different segments of the frame sequence. For example, I would like the ability to sample at 1 FPS for X frames at offset 0, then at 2 FPS for Y frames at offset X, for a total of X + Y frames in the resulting tensor (a small sketch of this index computation follows the TLDR below).
Third, some datasets are stored as images rather than videos, so I'll need to support both.
Lastly, I'd like to keep the PyTorch DataLoader and Sampler interface at the top level. (I am willing to budge on this one if the other challenges can be solved.) It seems there are ways to do this, but they may not be compatible with my demands above.
-- TLDR: I'd like explicit control over which frames are included in the sequence when loading directly to the GPU, and I want control over batching, sampling, and distribution across devices.
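To make the multi-rate requirement concrete, here is the kind of index computation I have in mind (plain NumPy, nothing DALI-specific; the numbers are made up for illustration):

```python
import numpy as np

video_fps = 30             # hypothetical source frame rate
x_frames, y_frames = 8, 8  # hypothetical segment lengths

# X frames sampled at 1 FPS starting at offset 0 ...
segment_a = np.arange(x_frames) * video_fps
# ... then Y frames sampled at 2 FPS, starting where the first segment ends
offset = x_frames * video_fps
segment_b = offset + np.arange(y_frames) * (video_fps // 2)

# X + Y explicit frame indices to hand to the decoder/reader
frame_indices = np.concatenate([segment_a, segment_b]).astype(np.int32)
```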
Looking forward to any suggestions, thank you!
Check for duplicates
- [x] I have searched the open bugs/issues and have found no duplicates for this bug report
Hi @sdpetrides,
Your use case is a great fit for our new experimental feature: DALI proxy.
With DALI proxy:
- Run a DALI server on the main process and keep your PyTorch DataLoader.
- Use the DALI proxy as a transform function to send data to the DALI server and return a reference to the future processed data.
- In the main loop, replace those references with the actual data.

You can find more information on DALI proxy here. This feature is new, so please let us know if you encounter any issues or have any questions.
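To make that concrete, a minimal sketch of the flow could look like this (the pipeline, dataset, and dummy payloads below are placeholders, not a complete recipe; the linked documentation is the authoritative reference):

```python
# Sketch only: example_pipe, ExampleDataset and the dummy payloads are placeholders.
import numpy as np
from torch.utils.data import DataLoader, Dataset
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy

@pipeline_def
def example_pipe():
    # Per-sample data arrives from the DataLoader through the proxy
    data = fn.external_source(name="data", device="cpu", dtype=types.UINT8, ndim=1)
    return fn.cast(data, dtype=types.FLOAT)  # stand-in for the real processing

class ExampleDataset(Dataset):
    def __init__(self, dali_server, samples):
        self.dali_server = dali_server
        self.samples = samples

    def __getitem__(self, idx):
        payload, label = self.samples[idx]
        # Returns a reference to data that the DALI server will produce later
        return self.dali_server.proxy(data=payload), label

    def __len__(self):
        return len(self.samples)

samples = [(np.frombuffer(b"payload", dtype=np.uint8), 0) for _ in range(8)]
pipe = example_pipe(batch_size=4, num_threads=2, device_id=0,
                    output_dtype=types.FLOAT, output_ndim=1)
dali_server = dali_proxy.DALIServer(pipe)
dali_server.start_thread()
loader = DataLoader(ExampleDataset(dali_server, samples), batch_size=4, num_workers=0)
for data, label in loader:
    data = dali_server.produce_data(data)  # replace references with actual outputs
dali_server.stop_thread()
```

The dataset itself only forwards raw data and returns references; the actual processing happens in the DALI server's pipeline.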
Additionally, you mentioned the need to control which video frames are decoded. We recently added support for this in our experimental video decoder operator. You can find the documentation here. You can use the frames argument to specify a collection of frame indices to decode or use start, stride, and end to decode evenly spaced frames.
In your DataLoader, read the raw encoded video file and send it to the decoder operator via DALI proxy.
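For illustration, the decoding part of such a pipeline could look roughly like this (a sketch only; the exact arguments, including the evenly spaced start/stride/end variant, are described in the linked documentation):

```python
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types

@pipeline_def
def video_decode_pipe():
    # Encoded file contents and the requested frame indices are provided
    # per sample by the DataLoader through the DALI proxy.
    encoded = fn.external_source(name="video_bytes", device="cpu", dtype=types.UINT8, ndim=1)
    frames = fn.external_source(name="frames", device="cpu", dtype=types.INT32, ndim=1)
    # Decode only the requested frames, on the GPU ("mixed" backend)
    return fn.experimental.decoders.video(encoded, device="mixed", frames=frames)
```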
I hope you find these links helpful. Please let us know if you need further assistance.
Best regards, Joaquin
Thank you for the response! I'll give this a try and report back.
I revisited this library and have found some success.
# Load 5 large video files as buffers
# Create examples as explicit frame sequences (kept simple for now)
dali_server_examples = []
for _, path in list(train_paths)[:5]:
buffer = np.frombuffer(open(path, 'rb').read(), dtype=np.uint8)
for i in range(12):
dali_server_examples.append(
(
(
buffer,
[(i + 1) * 30 * j for j in range(16)]
),
random.randint(0, 12)
)
)
class DaliClipDataset(Dataset):
def __init__(self, dali_server, examples):
self.dali_server = dali_server
self.examples = examples
def __getitem__(self, idx):
(video_bytes, frames), target = self.examples[idx]
frames = np.array(frames, dtype=np.int32)
data = self.dali_server.proxy(video_bytes=video_bytes, frames=frames)
return data, target
def __len__(self):
return len(self.examples)
def test_decoder():
mp.set_start_method("spawn", force=True)
batch_size = 16
num_threads = 12
@pipeline_def
def video_pipe_exp():
encoded_video_bytes = fn.external_source(name='video_bytes', device="cpu", dtype=types.UINT8, ndim=1, no_copy=True)
frames = fn.external_source(name='frames', device="cpu", dtype=types.INT32, ndim=1, no_copy=True)
# TODO: initial crop & scale
video = fn.experimental.decoders.video(encoded_video_bytes, device="mixed", frames=frames)
# Scale to [0, 1]
video = video / 255.0
# Resize frames to 224x224
video = fn.resize(video, resize_x=224, resize_y=224)
# Permute to [F, C, H, W]
video = fn.transpose(video, perm=[0, 3, 1, 2]) # HWC → CHW per frame
return video
pipe = video_pipe_exp(
batch_size=batch_size,
num_threads=num_threads,
device_id=0,
output_dtype=types.FLOAT,
output_ndim=4
)
try:
dali_server = dali_proxy.DALIServer(pipe)
dali_server.start_thread()
print("dataset init begins now...")
dataset = DaliClipDataset(dali_server, dali_server_examples)
print("dataset complete")
loader = DataLoader(
dataset,
batch_size=batch_size,
num_workers=0,
)
for data, target in loader:
print('before produce_data')
data = dali_server.produce_data(data)
print('after produce_data', data.shape, data.device)
except RuntimeError as e:
raise e
except KeyboardInterrupt as e:
raise e
finally:
dali_server.stop_thread()
During batch loading, usually on the second iteration, I run into this RuntimeError.
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
Traceback (most recent call last):
...
File "envs/vit/lib/python3.11/site-packages/nvidia/dali/plugin/pytorch/experimental/proxy/__init__.py", line 601, in _produce_data_impl
cache[pipe_run_ref_id] = self._get_outputs(obj.pipe_run_ref)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "envs/vit/lib/python3.11/site-packages/nvidia/dali/plugin/pytorch/experimental/proxy/__init__.py", line 552, in _get_outputs
raise err
File "envs/vit/lib/python3.11/site-packages/nvidia/dali/plugin/pytorch/experimental/proxy/__init__.py", line 729, in _thread_fn
pipe_outputs = self._pipe.outputs()
^^^^^^^^^^^^^^^^^^^^
File "envs/vit/lib/python3.11/site-packages/nvidia/dali/pipeline.py", line 1291, in outputs
return self._outputs(cuda_stream)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "envs/vit/lib/python3.11/site-packages/nvidia/dali/pipeline.py", line 1401, in _outputs
return self._pipe.Outputs(types._raw_cuda_stream_ptr(cuda_stream))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: Critical error in pipeline:
Error in MIXED operator `nvidia.dali.fn.experimental.decoders.video`,
which was used in the pipeline definition with the following traceback:
File ".../test/test_loader_benchmark.py", line 328, in video_pipe_exp
video = fn.experimental.decoders.video(encoded_video_bytes, device="mixed", frames=frames)
encountered:
Error in thread 2: Assert on "frames_decoders_[s]->IsValid()" failed: Failed to create video decoder for "memory file"
Current pipeline object is no longer valid.
I am not sure what is happening to the buffer/pipeline. Looking for guidance.
Hi @sdpetrides,
Thank you for checking this out. Does it happen for all your video files or only some? Does it repro with any of the files from https://github.com/NVIDIA/DALI_extra/tree/main/db/video?
I used an ffmpeg command to create cfr_test-like videos of various durations, and I am no longer running into the Failed to create video decoder error.
For my original dataset of videos, I have found this error is stochastic: for a particular video it may or may not be raised. Probing the videos, there doesn't seem to be an issue decoding the frames.
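By "probing" I mean a simple OpenCV check along these lines (sketch only), reading back the sampled frame positions:

```python
import cv2

def probe_frames(path, frame_positions):
    """Rough check: return the frame positions that OpenCV fails to decode."""
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        raise ValueError(f"Unable to open video file: {path}")
    bad = []
    for pos in frame_positions:
        cap.set(cv2.CAP_PROP_POS_FRAMES, pos)
        ok, _ = cap.read()
        if not ok:
            bad.append(pos)
    cap.release()
    return bad
```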
Using the cfr_test-like videos, I am finding some curious results with my benchmarking.
| fn.experimental.decoders.video device | transfer to GPU before transformations | CPU utilization | results |
|---|---|---|---|
| mixed | no (already on CUDA) | up to 100% | 16s to load a batch |
| cpu | yes | up to 100% for each example in batch | 12s to load a batch |
I am finding that it is faster to decode on the CPU and transfer to the GPU than to decode on the GPU. This gap widens as the videos get longer. Is this expected?
@sdpetrides That is surprising. Is there a chance you could share a reproduction script that we can check?
Sure, here is the full benchmarking script.
import random
import tempfile
import cv2
import pytest
import torch
import numpy as np
from tqdm import tqdm
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms.v2 import Compose, ToImage, ToDtype, Resize
from torchvision.transforms.v2 import functional as F
import torch.multiprocessing as mp
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.plugin.pytorch import DALIGenericIterator
from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy
class DaliClipDataset(Dataset):
def __init__(self, examples, dali_server=None):
self.dali_server = dali_server
self.examples = examples
self.transform = Compose(
[
ToDtype(torch.float32, scale=True),
Resize((224, 224))
],
)
def __getitem__(self, idx):
(video_bytes, frames), (target, path) = self.examples[idx]
if self.dali_server:
frames = np.array(frames, dtype=np.int32)
data = self.dali_server.proxy(video_bytes=video_bytes, frames=frames)
else:
cap = cv2.VideoCapture(path)
if not cap.isOpened():
raise ValueError(f"Unable to open video file: {path}")
clip = []
for frame_position in frames:
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_position)
ret, frame = cap.read()
if not ret:
raise ValueError(f"No frame at position {frame_position}, {path}")
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
clip.append(F.to_image(frame))
cap.release()
data = self.transform(torch.stack(clip))
return data, target
def __len__(self):
return len(self.examples)
video_fps = 60
sequence_length = 16
@pipeline_def
def video_pipe(file_list):
video, label = fn.readers.video(
device='gpu',
file_list=file_list,
file_list_frame_num=True,
sequence_length=sequence_length,
image_type=types.RGB,
dtype=types.FLOAT,
normalized=False,
random_shuffle=False,
stride=video_fps,
step=1, # no-op: each record will be a single clip
tensor_init_bytes=sequence_length * 3 * 1920 * 1080 * 4,
file_list_include_preceding_frame=False,
)
# Scale to [0, 1]
video = video / 255.0
# Resize frames to 224x224
video = fn.resize(video, resize_x=224, resize_y=224)
# Permute to [F, C, H, W]
video = fn.transpose(video, perm=[0, 3, 1, 2]) # HWC → CHW per frame
return video, label
@pipeline_def
def video_pipe_exp_cpu():
encoded_video_bytes = fn.external_source(name='video_bytes', device="cpu", dtype=types.UINT8, ndim=1, no_copy=True)
frames = fn.external_source(name='frames', device="cpu", dtype=types.INT32, ndim=1, no_copy=True)
video = fn.experimental.decoders.video(
encoded_video_bytes,
build_index=True,
device="cpu",
frames=frames
)
# Move to GPU for transformation
video = video.gpu()
# Scale to [0, 1]
video = video / 255.0
# Resize frames to 224x224
video = fn.resize(video, resize_x=224, resize_y=224)
# Permute to [F, C, H, W]
video = fn.transpose(video, perm=[0, 3, 1, 2]) # HWC → CHW per frame
return video
@pipeline_def
def video_pipe_exp_mixed():
encoded_video_bytes = fn.external_source(name='video_bytes', device="cpu", dtype=types.UINT8, ndim=1, no_copy=True)
frames = fn.external_source(name='frames', device="cpu", dtype=types.INT32, ndim=1, no_copy=True)
video = fn.experimental.decoders.video(
encoded_video_bytes,
build_index=True,
device="mixed",
frames=frames
)
# Scale to [0, 1]
video = video / 255.0
# Resize frames to 224x224
video = fn.resize(video, resize_x=224, resize_y=224)
# Permute to [F, C, H, W]
video = fn.transpose(video, perm=[0, 3, 1, 2]) # HWC → CHW per frame
return video
def load_data(dali_server, loader, total_batch_loads):
batch_loads = 0
for batch in tqdm(loader, total=len(loader), ncols=80):
if not dali_server is None:
data, _ = batch
data = dali_server.produce_data(data)
batch_loads += 1
if batch_loads >= total_batch_loads:
break
# Because of the spawn vs fork difference, OpenCV experiments
# need to be run first, then change the spawn method and
# switch to DALI.
# Thus, each of these sets are run one at a time.
benchmark_experiments = [
pytest.param('OpenCV', None, 60, 4),
pytest.param('DALIGenericIterator', None, 60, None),
pytest.param('DALIServer', 'cpu', 60, None),
pytest.param('DALIServer', 'mixed', 60, None),
pytest.param('OpenCV', None, 120, 4),
pytest.param('DALIGenericIterator', None, 120, None),
pytest.param('DALIServer', 'cpu', 120, None),
pytest.param('DALIServer', 'mixed', 120, None),
pytest.param('OpenCV', None, 300, 4),
pytest.param('DALIGenericIterator', None, 300, None),
pytest.param('DALIServer', 'cpu', 300, None),
pytest.param('DALIServer', 'mixed', 300, None),
]
@pytest.mark.parametrize(
"video_reader,pipeline_type,duration_sec,num_workers",
benchmark_experiments,
)
@pytest.mark.benchmark(group="video_reading")
def test_random_frame_reading(benchmark, video_reader, pipeline_type, duration_sec, num_workers):
test_video_path = f"./test_videos/cfr_test_{str(duration_sec).rjust(5, '0')}.mp4"
fps = 60
total_frames = duration_sec * fps
num_videos = int(1200000 / total_frames) # make sure that the memory footprint is consistent
num_clip_frames = 16
num_examples_per_video = 16
total_examples = 1024
num_examples_per_video = int(total_examples / num_videos)
video_paths = [test_video_path for _ in range(num_videos)]
examples = []
segment_list = []
for path in video_paths:
buffer = np.frombuffer(open(path, 'rb').read(), dtype=np.uint8)
for i in range(num_examples_per_video):
start_frame = random.randint(0, total_frames - ((num_clip_frames + 1) * fps))
frames = [start_frame + fps * j for j in range(num_clip_frames)]
label = random.randint(0, 12)
# For DaliClipDataset-style loader
examples.append(((buffer, frames), (label, path)))
# For fn.readers.video-style loader
segment_list.append(
f"{path} {label} {frames[0]} {frames[-1] + 1}"
)
file_list_txt = "\n".join(segment_list)
print(file_list_txt)
print("video buffers & examples initialized")
batch_size = 128
mini_batch_size = 64
num_loads = batch_size // mini_batch_size
if video_reader == 'OpenCV':
dali_server = None
dataset = DaliClipDataset(examples, dali_server=dali_server)
loader = DataLoader(
dataset,
batch_size=mini_batch_size,
shuffle=False,
num_workers=num_workers,
)
if video_reader == 'DALIGenericIterator':
dali_server = None
mp.set_start_method("spawn", force=True)
tf = tempfile.NamedTemporaryFile()
tf.write(str.encode(file_list_txt))
tf.flush()
pipe = video_pipe(
batch_size=mini_batch_size,
num_threads=12,
device_id=0,
file_list=tf.name,
)
loader = DALIGenericIterator([pipe], ["input", "label"])
tf.close()
elif video_reader == 'DALIServer':
mp.set_start_method("spawn", force=True)
if pipeline_type == 'cpu':
video_pipe_exp = video_pipe_exp_cpu
elif pipeline_type == 'mixed':
video_pipe_exp = video_pipe_exp_mixed
else:
raise ValueError(f"pipeline_type must be either 'cpu' or 'mixed', got '{pipeline_type}'")
pipe = video_pipe_exp(
batch_size=batch_size,
num_threads=12,
device_id=0,
output_dtype=types.FLOAT,
output_ndim=4
)
dali_server = dali_proxy.DALIServer(pipe)
dali_server.start_thread()
num_workers = 0 # hard-coded for DALI
dataset = DaliClipDataset(examples, dali_server=dali_server)
loader = DataLoader(
dataset,
batch_size=mini_batch_size,
shuffle=False,
num_workers=num_workers,
)
benchmark.pedantic(
load_data,
args=(
dali_server,
loader,
num_loads
),
rounds=1,
warmup_rounds=0,
iterations=1,
)
if video_reader == 'DALIServer':
dali_server.stop_thread()
I am running with:
$ CUDA_VISIBLE_DEVICES=1 pytest test/test_benchmark.py --benchmark-enable -vv
Description
Generate examples that sample 16 frames at random positions across the video dataset. Each loader is tasked with loading two mini-batches of 64 examples (128 examples in total).
Videos are generated with the following command (shown here for the 3000-second video).
$ ffmpeg -f lavfi -i color=c=blue:s=1280x720:d=3000:r=60 -c:v libx264 -vf "format=pix_fmts=yuv420p, drawtext=fontsize=64: fontcolor=white: font=monospace: x=(w-text_w)/2: y=(h-text_h)/2: r=60: text='%{frame_num}'" cfr_test_03000.mp4
Setup
Single H100, CUDA 12.4, Python 3.11.9
nvidia-cublas-cu12==12.4.5.8
nvidia-cuda-cupti-cu12==12.4.127
nvidia-cuda-nvrtc-cu12==12.4.127
nvidia-cuda-runtime-cu12==12.4.127
nvidia-cudnn-cu12==9.1.0.70
nvidia-cufft-cu12==11.2.1.3
nvidia-curand-cu12==10.3.5.147
nvidia-cusolver-cu12==11.6.1.9
nvidia-cusparse-cu12==12.3.1.170
nvidia-cusparselt-cu12==0.6.2
nvidia-dali-cuda120==1.49.0
nvidia-nccl-cu12==2.21.5
nvidia-nvcomp-cu12==4.2.0.14
nvidia-nvimgcodec-cu12==0.5.0.13
nvidia-nvjitlink-cu12==12.4.127
nvidia-nvjpeg-cu12==12.4.0.16
nvidia-nvjpeg2k-cu12==0.8.1.40
nvidia-nvtiff-cu12==0.4.0.62
nvidia-nvtx-cu12==12.4.127
opencv-python==4.9.0.80
torch==2.6.0
torchvision==0.21.0
Results
There are four sets of experiments, each with a different video length: 60s, 120s, 300s, 3000s.
| Name | Time in s (relative) |
|---|---|
| [DALIServer-cpu-60-None] | 25.6196 (1.0) |
| [DALIServer-mixed-60-None] | 36.2055 (1.41) |
| [OpenCV-None-60-4] | 48.7927 (1.90) |
| [DALIGenericIterator-None-60-None] | 52.4797 (2.05) |
| Name | Time in s (relative) |
|---|---|
| [DALIServer-cpu-120-None] | 42.2635 (1.0) |
| [OpenCV-None-120-4] | 45.3316 (1.07) |
| [DALIGenericIterator-None-120-None] | 51.7309 (1.22) |
| [DALIServer-mixed-120-None] | 62.9430 (1.49) |
| Name | Time in s (relative) |
|---|---|
| [OpenCV-None-300-4] | 51.8756 (1.0) |
| [DALIGenericIterator-None-300-None] | 52.7815 (1.02) |
| [DALIServer-cpu-300-None] | 98.7108 (1.90) |
| [DALIServer-mixed-300-None] | 152.0365 (2.93) |
| Name | Time in s |
|---|---|
| [DALIGenericIterator-None-3000-None] | 51.8803 |
| [OpenCV-None-3000-4] | 71.5433 |
| [DALIServer-cpu-3000-None] | 728.0250 |
| [DALIServer-mixed-3000-None] | 1,254.2895 |
Some of my observations
- DALIGenericIterator is very consistent and performs well as the video size increases. Unfortunately, I want more explicit control over example sampling and frame sampling, so I don't want to move forward with this paradigm.
- DALIServer-cpu is better than DALIServer-mixed in every instance, and this gap widens as the video files get larger.
- OpenCV is still the fastest for large video files (> 5 min).
@sdpetrides Thank you for your detailed reproduction script! We will investigate this and come back to you.