Designing my video-based pipeline

Open sdpetrides opened this issue 8 months ago • 8 comments

Describe the question.

I'm very interested in using DALI for my model training. My simple benchmarks show great speed-ups over OpenCV when loading directly to the GPU.

However, I'd like some insight from the developers, maintainers, or community on how I can adapt my current data loading paradigm to DALI. I believe I can adapt everything except for the following aspects.

First, the datasets I am working with do not have consistent frame rates. They can range from 20 to 60 FPS. Currently, I control for this when I sample/read frames to make sure the "best" frame is chosen for a given scenario. The constant stride parameter of fn.readers.video complicates this.

Second, I would like the flexibility to sample at different rates for different segments of a frame sequence. For example, I would like the ability to sample at 1 FPS for X frames at offset 0, then at 2 FPS for Y frames at offset X, for a total of X + Y frames in the resulting tensor.

Third, some datasets are stored as images rather than videos, so I'll need to support both.

Lastly, I'd like to keep the PyTorch DataLoader and Sampler interface at the top level. (I am willing to budge on this one if the other challenges can be solved.) It seems there are ways to do this, but they may not be compatible with my demands above.

TL;DR: I'd like explicit control over which frames are included in the sequence when loading directly to the GPU, and I want control over batching, sampling, and distribution across devices.

Looking forward to any suggestions, thank you!

Check for duplicates

  • [x] I have searched the open bugs/issues and have found no duplicates for this bug report

sdpetrides, Apr 17 '25 18:04

Hi @sdpetrides,

Your use case is a great fit for our new experimental feature: DALI proxy.

With DALI proxy:

  • Run a DALI server on the main process and keep your PyTorch DataLoader.
  • Use the DALI proxy as a transform function to send data to the DALI server and return a reference to the future processed data.
  • In the main loop, replace those references with the actual data.

You can find more information on DALI proxy here. This feature is new, so please let us know if you encounter any issues or have any questions.
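The reference-then-resolve flow described here can be illustrated without DALI at all. The sketch below is only an analogy built on Python's concurrent.futures: ToyServer and its methods are hypothetical stand-ins, not the DALI proxy API, and the "processing" is a trivial scaling step.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyServer:
    """Hypothetical stand-in for a processing server (not the DALI API)."""

    def __init__(self):
        # One background worker plays the role of the server thread.
        self._pool = ThreadPoolExecutor(max_workers=1)

    def proxy(self, sample):
        # Schedule the work and hand back a reference (a Future) right away.
        return self._pool.submit(self._process, sample)

    @staticmethod
    def _process(sample):
        # Placeholder transform: scale 8-bit values to [0, 1].
        return [value / 255.0 for value in sample]

    def produce_data(self, references):
        # Swap each reference for the data it points to, blocking if needed.
        return [ref.result() for ref in references]

    def stop(self):
        self._pool.shutdown(wait=True)


server = ToyServer()
try:
    # "Dataset" side: each item is a reference, not processed data.
    references = [server.proxy(sample) for sample in ([0, 255], [51, 102])]
    # "Main loop" side: resolve the references into the actual batch.
    batch = server.produce_data(references)
finally:
    server.stop()

print(batch)
```

The point of the pattern is that the dataset code returns immediately, so the existing DataLoader and Sampler logic can stay in place while the heavy work happens elsewhere.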

Additionally, you mentioned the need to control which video frames are decoded. We recently added support for this in our experimental video decoder operator. You can find the documentation here. You can use the frames argument to specify a collection of frame indices to decode or use start, stride, and end to decode evenly spaced frames.

In your DataLoader, read the raw encoded video file and send it to the decoder operator via DALI proxy.
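Since the frames argument accepts explicit indices, both the variable frame rates and the per-segment sampling rates from the original question could be handled in plain Python before calling the decoder. A minimal sketch, assuming the native frame rate of each video is already known (for example from container metadata); build_frame_indices and its parameters are hypothetical names, not part of DALI.

```python
from fractions import Fraction


def build_frame_indices(native_fps, segments, start_frame=0, num_frames=None):
    """Return explicit frame indices for back-to-back sampling segments.

    native_fps: frame rate of the source video (e.g. 20 to 60).
    segments:   list of (target_fps, count) pairs, applied in order.
    num_frames: optional frame count of the video, used for a bounds check.
    """
    indices = []
    # Track time exactly so that steps like 1/3 s do not accumulate drift.
    t = Fraction(start_frame, 1) / Fraction(native_fps)
    for target_fps, count in segments:
        step = 1 / Fraction(target_fps)
        for _ in range(count):
            # Pick the frame nearest to the target timestamp (ties round up).
            index = int(t * Fraction(native_fps) + Fraction(1, 2))
            if num_frames is not None and index >= num_frames:
                raise ValueError(
                    f"frame {index} is past the end ({num_frames} frames)"
                )
            indices.append(index)
            t += step
    return indices


# 1 FPS for X=3 frames, then 2 FPS for Y=4 frames, on 60 FPS and 30 FPS sources.
frames_60 = build_frame_indices(60, [(1, 3), (2, 4)])
frames_30 = build_frame_indices(30, [(1, 3), (2, 4)])
print(frames_60)
print(frames_30)
```

The same (target_fps, count) description yields different indices for each source frame rate, which is what keeps the sampled timestamps comparable across datasets.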

I hope you find these links helpful. Please let us know if you need further assistance.

Best regards, Joaquin

jantonguirao, Apr 22 '25 07:04

Thank you for the response! I'll give this a try and report back.

sdpetrides, Apr 22 '25 13:04

I revisited this library and have found some success.

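import random

import numpy as np
import torch.multiprocessing as mp
from torch.utils.data import DataLoader, Dataset

from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy

# Load 5 large video files as buffers
# Create examples as explicit frame sequences (kept simple for now)
# train_paths comes from the dataset setup (not shown); each item unpacks to (_, path)
dali_server_examples = []
for _, path in list(train_paths)[:5]:
    with open(path, 'rb') as f:
        buffer = np.frombuffer(f.read(), dtype=np.uint8)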

    for i in range(12):
        dali_server_examples.append(
            (
                (
                    buffer,
                    [(i + 1) * 30 * j for j in range(16)]
                ),
                random.randint(0, 12)
            )
        )


class DaliClipDataset(Dataset):
    def __init__(self, dali_server, examples):
        self.dali_server = dali_server
        self.examples = examples
    
    def __getitem__(self, idx):
        (video_bytes, frames), target = self.examples[idx]
        frames = np.array(frames, dtype=np.int32)
        data = self.dali_server.proxy(video_bytes=video_bytes, frames=frames)
        return data, target

    def __len__(self):
        return len(self.examples)


def test_decoder():

    mp.set_start_method("spawn", force=True)

    batch_size = 16
    num_threads = 12

    @pipeline_def
    def video_pipe_exp():
        encoded_video_bytes = fn.external_source(name='video_bytes', device="cpu", dtype=types.UINT8, ndim=1, no_copy=True)
        frames = fn.external_source(name='frames', device="cpu", dtype=types.INT32, ndim=1, no_copy=True)
        # TODO: initial crop & scale
        
        video = fn.experimental.decoders.video(encoded_video_bytes, device="mixed", frames=frames)

        # Scale to [0, 1]
        video = video / 255.0

        # Resize frames to 224x224
        video = fn.resize(video, resize_x=224, resize_y=224)

        # Permute to [F, C, H, W]
        video = fn.transpose(video, perm=[0, 3, 1, 2])  # HWC → CHW per frame

        return video

    pipe = video_pipe_exp(
        batch_size=batch_size,
        num_threads=num_threads,
        device_id=0,
        output_dtype=types.FLOAT,
        output_ndim=4
    )

    # Create and start the server before the try block so that
    # the finally clause never sees an undefined dali_server
    dali_server = dali_proxy.DALIServer(pipe)
    dali_server.start_thread()

    try:
        print("dataset init begins now...")
        dataset = DaliClipDataset(dali_server, dali_server_examples)
        print("dataset complete")

        loader = DataLoader(
            dataset,
            batch_size=batch_size,
            num_workers=0,
        )
        for data, target in loader:
            print('before produce_data')
            data = dali_server.produce_data(data)
            print('after produce_data', data.shape, data.device)
    finally:
        # Always stop the server thread, even if loading fails
        dali_server.stop_thread()

During batch loading, usually on the second iteration, I run into this RuntimeError.

[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
[/opt/dali/dali/operators/video/frames_decoder_base.cc:245] Failed to open video file from memory buffer due to: Invalid data found when processing input
Traceback (most recent call last):
...
  File "envs/vit/lib/python3.11/site-packages/nvidia/dali/plugin/pytorch/experimental/proxy/__init__.py", line 601, in _produce_data_impl
    cache[pipe_run_ref_id] = self._get_outputs(obj.pipe_run_ref)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "envs/vit/lib/python3.11/site-packages/nvidia/dali/plugin/pytorch/experimental/proxy/__init__.py", line 552, in _get_outputs
    raise err
  File "envs/vit/lib/python3.11/site-packages/nvidia/dali/plugin/pytorch/experimental/proxy/__init__.py", line 729, in _thread_fn
    pipe_outputs = self._pipe.outputs()
                   ^^^^^^^^^^^^^^^^^^^^
  File "envs/vit/lib/python3.11/site-packages/nvidia/dali/pipeline.py", line 1291, in outputs
    return self._outputs(cuda_stream)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "envs/vit/lib/python3.11/site-packages/nvidia/dali/pipeline.py", line 1401, in _outputs
    return self._pipe.Outputs(types._raw_cuda_stream_ptr(cuda_stream))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: Critical error in pipeline:
Error in MIXED operator `nvidia.dali.fn.experimental.decoders.video`,
which was used in the pipeline definition with the following traceback:

  File ".../test/test_loader_benchmark.py", line 328, in video_pipe_exp
    video = fn.experimental.decoders.video(encoded_video_bytes, device="mixed", frames=frames)

encountered:

Error in thread 2: Assert on "frames_decoders_[s]->IsValid()" failed: Failed to create video decoder for "memory file"
Current pipeline object is no longer valid.

I am not sure what is happening to the buffer/pipeline. Looking for guidance.

sdpetrides, May 07 '25 22:05

Hi @sdpetrides,

Thank you for checking this out. Does it happen for all your video files or only some? Does it repro with any of the files from https://github.com/NVIDIA/DALI_extra/tree/main/db/video?

JanuszL, May 08 '25 05:05

I used the ffmpeg command to create cfr_test-like videos of various durations. I am not running into the Failed to create video decoder error any longer.

For my original dataset of videos, I have found this error is stochastic: for a particular video it may or may not be raised. Probing the videos, there doesn't seem to be an issue decoding the frames.
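Because the files probe cleanly, it may also be worth confirming that the in-memory buffer handed to the proxy is still intact when the failure occurs. The sketch below is a diagnostic aid only, not a known fix: buffer_fingerprint is a hypothetical helper, and the ftyp check assumes MP4 files whose first box is ftyp, which is typical but not guaranteed.

```python
import hashlib


def buffer_fingerprint(buffer):
    """Summarize a bytes-like object so two snapshots can be compared."""
    data = bytes(buffer)
    return {
        "size": len(data),
        "sha256": hashlib.sha256(data).hexdigest(),
        # In a typical MP4 file, bytes 4..8 hold the type of the first box.
        "starts_with_ftyp": data[4:8] == b"ftyp",
    }


# Synthetic stand-in for an encoded video (a real run would use the file bytes).
original = b"\x00\x00\x00\x18ftypisom" + bytes(16)
snapshot_at_load = buffer_fingerprint(original)

# Later, e.g. right before the buffer is sent to the decoder:
snapshot_at_use = buffer_fingerprint(bytearray(original))
print(snapshot_at_load == snapshot_at_use)
```

If the two snapshots ever differ for the same example, the problem lies in how the buffer is held or shared rather than in the video file itself.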

Using the cfr_test-like videos, I am finding some curious results with my benchmarking.

fn.experimental.decoders.video device | transfer to GPU before transformations | CPU utilization                      | results
mixed                                | no (already on CUDA)                   | up to 100%                           | 16s to load a batch
cpu                                  | yes                                    | up to 100% for each example in batch | 12s to load a batch

I am finding that it is faster to decode on the CPU and transfer to the GPU than to decode on the GPU. This gap widens as the videos get longer. Is this expected?

sdpetrides, May 08 '25 16:05

@sdpetrides That is surprising. Is there a chance you could share a reproduction script that we can check?

jantonguirao, May 09 '25 07:05

Sure, here is the full benchmarking script.

import random
import tempfile

import cv2
import pytest
import torch

import numpy as np
from tqdm import tqdm

from torch.utils.data import DataLoader, Dataset
from torchvision.transforms.v2 import Compose, ToImage, ToDtype, Resize
from torchvision.transforms.v2 import functional as F
import torch.multiprocessing as mp

from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.plugin.pytorch import DALIGenericIterator
from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy

class DaliClipDataset(Dataset):
    def __init__(self, examples, dali_server=None):
        self.dali_server = dali_server
        self.examples = examples
        self.transform = Compose(
            [
                ToDtype(torch.float32, scale=True),
                Resize((224, 224))
            ],
        )
    
    def __getitem__(self, idx):
        (video_bytes, frames), (target, path) = self.examples[idx]
        
        if self.dali_server:
            frames = np.array(frames, dtype=np.int32)
            data = self.dali_server.proxy(video_bytes=video_bytes, frames=frames)
        else:
            cap = cv2.VideoCapture(path)
            if not cap.isOpened():
                raise ValueError(f"Unable to open video file: {path}")

            clip = []
            for frame_position in frames:
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_position)
                ret, frame = cap.read()
                if not ret:
                    raise ValueError(f"No frame at position {frame_position}, {path}")
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                clip.append(F.to_image(frame))

            cap.release()

            data = self.transform(torch.stack(clip))

        return data, target

    def __len__(self):
        return len(self.examples)

video_fps = 60
sequence_length = 16

@pipeline_def
def video_pipe(file_list):
    video, label = fn.readers.video(
        device='gpu',
        file_list=file_list,
        file_list_frame_num=True,
        sequence_length=sequence_length,
        image_type=types.RGB,
        dtype=types.FLOAT,
        normalized=False,
        random_shuffle=False,
        stride=video_fps,
        step=1, # no-op: each record will be a single clip
        tensor_init_bytes=sequence_length * 3 * 1920 * 1080 * 4,
        file_list_include_preceding_frame=False,
    )

    # Scale to [0, 1]
    video = video / 255.0

    # Resize frames to 224x224
    video = fn.resize(video, resize_x=224, resize_y=224)

    # Permute to [F, C, H, W]
    video = fn.transpose(video, perm=[0, 3, 1, 2])  # HWC → CHW per frame

    return video, label


@pipeline_def
def video_pipe_exp_cpu():
    encoded_video_bytes = fn.external_source(name='video_bytes', device="cpu", dtype=types.UINT8, ndim=1, no_copy=True)
    frames = fn.external_source(name='frames', device="cpu", dtype=types.INT32, ndim=1, no_copy=True)
    
    video = fn.experimental.decoders.video(
        encoded_video_bytes,
        build_index=True,
        device="cpu",
        frames=frames
    )

    # Move to GPU for transformation
    video = video.gpu()

    # Scale to [0, 1]
    video = video / 255.0

    # Resize frames to 224x224
    video = fn.resize(video, resize_x=224, resize_y=224)

    # Permute to [F, C, H, W]
    video = fn.transpose(video, perm=[0, 3, 1, 2])  # HWC → CHW per frame

    return video

@pipeline_def
def video_pipe_exp_mixed():
    encoded_video_bytes = fn.external_source(name='video_bytes', device="cpu", dtype=types.UINT8, ndim=1, no_copy=True)
    frames = fn.external_source(name='frames', device="cpu", dtype=types.INT32, ndim=1, no_copy=True)
    
    video = fn.experimental.decoders.video(
        encoded_video_bytes,
        build_index=True,
        device="mixed",
        frames=frames
    )

    # Scale to [0, 1]
    video = video / 255.0

    # Resize frames to 224x224
    video = fn.resize(video, resize_x=224, resize_y=224)

    # Permute to [F, C, H, W]
    video = fn.transpose(video, perm=[0, 3, 1, 2])  # HWC → CHW per frame

    return video


def load_data(dali_server, loader, total_batch_loads):
    batch_loads = 0
    for batch in tqdm(loader, total=len(loader), ncols=80):

        if dali_server is not None:
            data, _ = batch
            data = dali_server.produce_data(data)
        
        batch_loads += 1

        if batch_loads >= total_batch_loads:
            break

# Because of the spawn vs fork difference, OpenCV experiments
# need to be run first, then change the spawn method and 
# switch to DALI.

# Thus, each of these sets are run one at a time.
benchmark_experiments = [
    pytest.param('OpenCV', None, 60, 4),
    pytest.param('DALIGenericIterator', None, 60, None),
    pytest.param('DALIServer', 'cpu', 60, None),
    pytest.param('DALIServer', 'mixed', 60, None),

    pytest.param('OpenCV', None, 120, 4), 
    pytest.param('DALIGenericIterator', None, 120, None),
    pytest.param('DALIServer', 'cpu', 120, None),
    pytest.param('DALIServer', 'mixed', 120, None),

    pytest.param('OpenCV', None, 300, 4), 
    pytest.param('DALIGenericIterator', None, 300, None),
    pytest.param('DALIServer', 'cpu', 300, None),
    pytest.param('DALIServer', 'mixed', 300, None),
]


@pytest.mark.parametrize(
    "video_reader,pipeline_type,duration_sec,num_workers",
    benchmark_experiments,
)
@pytest.mark.benchmark(group="video_reading")
def test_random_frame_reading(benchmark, video_reader, pipeline_type, duration_sec, num_workers):
    
    test_video_path = f"./test_videos/cfr_test_{str(duration_sec).rjust(5, '0')}.mp4"
    
    fps = 60
    total_frames = duration_sec * fps
    num_videos = int(1200000 / total_frames) # make sure that the memory footprint is consistent
    num_clip_frames = 16
    total_examples = 1024
    num_examples_per_video = int(total_examples / num_videos)
    video_paths = [test_video_path for _ in range(num_videos)]

    examples = []
    segment_list = []
    for path in video_paths:
        buffer = np.frombuffer(open(path, 'rb').read(), dtype=np.uint8)

        for i in range(num_examples_per_video):
            start_frame = random.randint(0, total_frames - ((num_clip_frames + 1) * fps))
            frames = [start_frame + fps * j for j in range(num_clip_frames)]
            label = random.randint(0, 12)
            
            # For DaliClipDataset-style loader
            examples.append(((buffer, frames), (label, path)))
        
            # For fn.readers.video-style loader
            segment_list.append(
                f"{path} {label} {frames[0]} {frames[-1] + 1}"
            )
        
    file_list_txt = "\n".join(segment_list)
    print(file_list_txt)
    
    print("video buffers & examples initialized")

    batch_size = 128
    mini_batch_size = 64
    num_loads = batch_size // mini_batch_size

    if video_reader == 'OpenCV':
        dali_server = None

        dataset = DaliClipDataset(examples, dali_server=dali_server)

        loader = DataLoader(
            dataset,
            batch_size=mini_batch_size,
            shuffle=False,
            num_workers=num_workers,
        )
    elif video_reader == 'DALIGenericIterator':
        dali_server = None

        mp.set_start_method("spawn", force=True)

        tf = tempfile.NamedTemporaryFile()
        tf.write(str.encode(file_list_txt))
        tf.flush()

        pipe = video_pipe(
            batch_size=mini_batch_size,
            num_threads=12,
            device_id=0,
            file_list=tf.name,
        )

        loader = DALIGenericIterator([pipe], ["input", "label"])

        tf.close()

    elif video_reader == 'DALIServer':
        mp.set_start_method("spawn", force=True)

        if pipeline_type == 'cpu':
            video_pipe_exp = video_pipe_exp_cpu
        elif pipeline_type == 'mixed':
            video_pipe_exp = video_pipe_exp_mixed
        else:
            raise ValueError(f"pipeline_type must be either 'cpu' or 'mixed', got '{pipeline_type}'")
        
        pipe = video_pipe_exp(
            batch_size=batch_size,
            num_threads=12,
            device_id=0,
            output_dtype=types.FLOAT,
            output_ndim=4
        )
        dali_server = dali_proxy.DALIServer(pipe)
        dali_server.start_thread()
        num_workers = 0  # hard-coded for DALI

        dataset = DaliClipDataset(examples, dali_server=dali_server)

        loader = DataLoader(
            dataset,
            batch_size=mini_batch_size,
            shuffle=False,
            num_workers=num_workers,
        )

    benchmark.pedantic(
        load_data,
        args=(
            dali_server,
            loader,
            num_loads
        ),
        rounds=1,
        warmup_rounds=0,
        iterations=1,
    )

    if video_reader == 'DALIServer':
        dali_server.stop_thread()

I am running with:

$ CUDA_VISIBLE_DEVICES=1 pytest test/test_benchmark.py --benchmark-enable -vv

Description

Generate examples that sample 16 frames at random positions across the video dataset. Each loader is tasked with loading two mini-batches of 64 examples (128 in total).
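The dataset sizes implied by this setup follow from the constants in the script (60 FPS, a budget of 1,200,000 frames, 1024 target examples, 128 examples loaded). The sketch below only reproduces that arithmetic; benchmark_layout is a hypothetical helper name, and it assumes examples are consumed in order, as shuffle=False suggests.

```python
import math


def benchmark_layout(duration_sec, fps=60, frame_budget=1_200_000,
                     total_examples=1024, examples_loaded=128):
    """Recompute the dataset sizes used by the benchmark for one video length."""
    total_frames = duration_sec * fps
    num_videos = frame_budget // total_frames
    examples_per_video = total_examples // num_videos
    return {
        "total_frames": total_frames,
        "num_videos": num_videos,
        "examples_per_video": examples_per_video,
        "examples_created": num_videos * examples_per_video,
        # With sequential loading, only the first few buffers are ever decoded.
        "videos_touched": math.ceil(examples_loaded / examples_per_video),
    }


for duration in (60, 120, 300, 3000):
    print(duration, benchmark_layout(duration))
```

Under these assumptions, the 3000 s case draws all 128 loaded examples from a single 180,000-frame buffer, while the 60 s case spreads them over several dozen buffers.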

Videos are generated with the following command (shown here for the 3000-second video).

$ ffmpeg -f lavfi -i color=c=blue:s=1280x720:d=3000:r=60 -c:v libx264 -vf "format=pix_fmts=yuv420p, drawtext=fontsize=64: fontcolor=white: font=monospace: x=(w-text_w)/2: y=(h-text_h)/2: r=60: text='%{frame_num}'" cfr_test_03000.mp4
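To produce the other durations, the same command can be parameterized. A small sketch that only builds the argument lists (running them is left to the caller); the ./test_videos/ directory and zero-padded file names follow the benchmark script, while ffmpeg_command itself is a hypothetical helper.

```python
import shlex

# Filter string copied from the command above; %{frame_num} is ffmpeg syntax.
VIDEO_FILTER = (
    "format=pix_fmts=yuv420p, drawtext=fontsize=64: fontcolor=white: "
    "font=monospace: x=(w-text_w)/2: y=(h-text_h)/2: r=60: text='%{frame_num}'"
)


def ffmpeg_command(duration_sec, out_dir="./test_videos"):
    """Build the ffmpeg argument list for a 60 FPS constant-frame-rate test video."""
    source = f"color=c=blue:s=1280x720:d={duration_sec}:r=60"
    output = f"{out_dir}/cfr_test_{duration_sec:05d}.mp4"
    return ["ffmpeg", "-f", "lavfi", "-i", source,
            "-c:v", "libx264", "-vf", VIDEO_FILTER, output]


for duration in (60, 120, 300, 3000):
    # shlex.join quotes the filter string so the line can be pasted into a shell.
    print(shlex.join(ffmpeg_command(duration)))
```

Each list could also be passed to subprocess.run directly, which avoids shell quoting altogether.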

Setup

Single H100, CUDA 12.4, Python 3.11.9

nvidia-cublas-cu12==12.4.5.8
nvidia-cuda-cupti-cu12==12.4.127
nvidia-cuda-nvrtc-cu12==12.4.127
nvidia-cuda-runtime-cu12==12.4.127
nvidia-cudnn-cu12==9.1.0.70
nvidia-cufft-cu12==11.2.1.3
nvidia-curand-cu12==10.3.5.147
nvidia-cusolver-cu12==11.6.1.9
nvidia-cusparse-cu12==12.3.1.170
nvidia-cusparselt-cu12==0.6.2
nvidia-dali-cuda120==1.49.0
nvidia-nccl-cu12==2.21.5
nvidia-nvcomp-cu12==4.2.0.14
nvidia-nvimgcodec-cu12==0.5.0.13
nvidia-nvjitlink-cu12==12.4.127
nvidia-nvjpeg-cu12==12.4.0.16
nvidia-nvjpeg2k-cu12==0.8.1.40
nvidia-nvtiff-cu12==0.4.0.62
nvidia-nvtx-cu12==12.4.127
opencv-python==4.9.0.80
torch==2.6.0
torchvision==0.21.0

Results

There are four sets of experiments, each with a different video length: 60s, 120s, 300s, 3000s.

Name                                 | Time (s)   | Relative
[DALIServer-cpu-60-None]             | 25.6196    | 1.0
[DALIServer-mixed-60-None]           | 36.2055    | 1.41
[OpenCV-None-60-4]                   | 48.7927    | 1.90
[DALIGenericIterator-None-60-None]   | 52.4797    | 2.05

Name                                 | Time (s)   | Relative
[DALIServer-cpu-120-None]            | 42.2635    | 1.0
[OpenCV-None-120-4]                  | 45.3316    | 1.07
[DALIGenericIterator-None-120-None]  | 51.7309    | 1.22
[DALIServer-mixed-120-None]          | 62.9430    | 1.49

Name                                 | Time (s)   | Relative
[OpenCV-None-300-4]                  | 51.8756    | 1.0
[DALIGenericIterator-None-300-None]  | 52.7815    | 1.02
[DALIServer-cpu-300-None]            | 98.7108    | 1.90
[DALIServer-mixed-300-None]          | 152.0365   | 2.93

Name                                 | Time (s)
[DALIGenericIterator-None-3000-None] | 51.8803
[OpenCV-None-3000-4]                 | 71.5433
[DALIServer-cpu-3000-None]           | 728.0250
[DALIServer-mixed-3000-None]         | 1,254.2895

Some of my observations

  • DALIGenericIterator is very consistent and performs well as the video size increases. Unfortunately, I want more explicit control over example sampling and frame sampling so I don't want to move forward with this paradigm.
  • DALIServer-cpu is better than DALIServer-mixed in every instance and this gap widens as the video files get larger.
  • OpenCV is still faster than both DALIServer variants for large video files (5 minutes and longer); at 3000s, only DALIGenericIterator beats it.
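The gap between the two DALIServer variants can be checked directly against the reported timings. A short sketch that uses only the numbers from the Results section (no new measurements); the dictionary layout is just for illustration.

```python
# Reported wall-clock times in seconds, keyed by video length in seconds.
timings = {
    60: {"cpu": 25.6196, "mixed": 36.2055},
    120: {"cpu": 42.2635, "mixed": 62.9430},
    300: {"cpu": 98.7108, "mixed": 152.0365},
    3000: {"cpu": 728.0250, "mixed": 1254.2895},
}

# Ratio of DALIServer-mixed to DALIServer-cpu for each video length.
ratios = {length: t["mixed"] / t["cpu"] for length, t in timings.items()}

for length, ratio in ratios.items():
    print(f"{length:>5}s  mixed/cpu = {ratio:.2f}")
```

The ratio grows from roughly 1.4 at 60 s to roughly 1.7 at 3000 s, which matches the observation that the gap widens with video length.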

sdpetrides, May 09 '25 19:05

@sdpetrides Thank you for your detailed reproduction script! We will investigate this and come back to you.

jantonguirao, May 12 '25 08:05