
How to uniformly sample a fixed number of frames with fn.readers.video in DALI?

ImtiazUlHassan opened this issue 7 months ago · 7 comments

Describe the question.

Hi, I'm using NVIDIA DALI's fn.readers.video_resize (or fn.readers.video) to load video sequences. I want to uniformly sample a fixed number of frames (e.g., 32) from videos of varying lengths.

Currently, my code looks like this:

    name="Reader",
    device="gpu",
    filenames=filenames,
    sequence_length=sequence_length,
    random_shuffle=False,
    initial_fill=2,
    labels=classes,
    resize_shorter=256,
    stride=1,
    pad_last_batch=True,
    file_list_include_preceding_frame=False
)

But it uses a fixed stride and sequence length, and I don’t see an option to sample frames uniformly across the full video length or to change the stride dynamically.

My questions are:

Is there a built-in way in DALI to uniformly sample a fixed number of frames regardless of the video's length?

Can I dynamically adjust the stride value during loading (e.g., per video or batch)?

If not, what’s the recommended approach to achieve this efficiently within DALI?

Check for duplicates

  • [x] I have searched the open bugs/issues and have found no duplicates for this bug report

ImtiazUlHassan avatar May 19 '25 09:05 ImtiazUlHassan

@ImtiazUlHassan Thank you for your question. That is actually an interesting use case.

Unfortunately, we currently don't have a built-in way to achieve uniform sampling across the video length. We will add this request to our backlog.

You could, however, use fn.external_source and fn.experimental.decoders.video to provide the encoded video stream together with a list of arbitrary frame indices, which you would have to generate yourself, first obtaining the length of the video by other means. Here's a prototype:

import numpy as np
import os
from itertools import cycle
from nvidia.dali import pipeline_def, fn
import cv2

batch_size = 3
nframes_per_clip = 10
device = "mixed"
video_path = os.path.join(os.path.expanduser("~"), "git/DALI_extra/db/video/small/")

def get_frame_count(video_path):
    # Probe the container with OpenCV to learn the number of frames
    cap = cv2.VideoCapture(video_path)
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()
    return length

def input_gen():
    # Get all video files from the specified path
    filenames = [os.path.join(video_path, f) for f in os.listdir(video_path) 
                if f.endswith(('.mp4', '.mkv'))]
    if not filenames:
        raise ValueError(f"No video files found in {video_path}")
        
    # Create infinite cycle through the files
    filenames = cycle(filenames)
    
    while True:
        batch_encoded_stream = []
        batch_frame_idxs = []
        for _ in range(batch_size):
            filename = next(filenames)
            batch_encoded_stream.append(np.fromfile(filename, dtype=np.uint8))
            frame_count = get_frame_count(filename)
            frame_idxs = np.linspace(0, frame_count-1, nframes_per_clip, dtype=np.int32)
            batch_frame_idxs.append(frame_idxs)
        yield (batch_encoded_stream, batch_frame_idxs)

# Preview a few batches; reuse one generator instance (calling
# input_gen() anew each time would restart it at the first batch)
gen = input_gen()
for _ in range(10):
    print(next(gen))

@pipeline_def
def test_pipeline():
    vid, frame_idxs = fn.external_source(device="cpu", batch=True, num_outputs=2, source=input_gen)
    seq = fn.experimental.decoders.video(vid, device=device, frames=frame_idxs)
    return seq

pipe = test_pipeline(batch_size=batch_size, num_threads=3, device_id=0)
pipe.build()

print(pipe.run())

jantonguirao avatar May 19 '25 11:05 jantonguirao

@jantonguirao Thanks for your code example!

First I tested on the CPU. The code showed stable but poor performance in my test: with a batch size of 16, extracting 100 batches took 154 seconds.

Then I tried to use this code to extract frames on the GPU to accelerate the decoding phase, but it seems it is not valid to use it this way:

    vid, audio, frame_idxs, index = fn.external_source(device="gpu", batch=False, 
                                                       num_outputs=4, source=eii, parallel=parallel,
                                                       prefetch_queue_depth=4)
    video = fn.experimental.decoders.video(vid, build_index=True, device="mixed", frames=frame_idxs)

And got this:

Assert on "IsCompatibleDevice(device, inp_dev, op_type)" failed: The input 0 for mixed operator `nvidia.dali.fn.experimental.decoders.video` is stored on incompatible device "gpu". Valid device is "cpu".

Is there any way to speed up this process? This feature is very practical!

Ash-one avatar May 28 '25 09:05 Ash-one

The problem here is that the video decoder expects a CPU input (the output is on the GPU, though). Please change your external source to produce the data in CPU memory.
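
For reference, a minimal sketch of the valid placement (reusing input_gen from the prototype above): the external source produces CPU buffers, and the "mixed" decoder puts the decoded frames on the GPU.

from nvidia.dali import pipeline_def, fn

@pipeline_def
def gpu_decode_pipeline():
    # encoded streams and frame indices must live in CPU memory...
    vid, frame_idxs = fn.external_source(device="cpu", batch=True,
                                         num_outputs=2, source=input_gen)
    # ...while the "mixed" decoder outputs GPU tensors
    return fn.experimental.decoders.video(vid, device="mixed", frames=frame_idxs)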

jantonguirao avatar May 28 '25 09:05 jantonguirao

@jantonguirao Thanks for the helpful solution!

I had a related idea: if I set stride=1 and sequence_length=32, I can retrieve all the frames from each video and then uniformly sample from them later. For example, a 54-frame video would yield two sequences, with the last one potentially padded. I can compute the total number of frames per video in advance using OpenCV.
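
A minimal sketch of that later sampling step (plain NumPy; the placeholder array stands in for all frames of one video gathered from the reader):

import numpy as np

def uniform_indices(total_frames, nframes=32):
    # uniformly spaced frame indices over the whole video
    return np.linspace(0, total_frames - 1, nframes, dtype=np.int64)

# e.g. a 54-frame video, read back as two stride-1 sequences and reassembled
all_frames = np.zeros((54, 224, 224, 3), dtype=np.uint8)  # placeholder frames
sampled = all_frames[uniform_indices(len(all_frames))]
print(sampled.shape)  # (32, 224, 224, 3)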

My question is: when using the DALI video reader (e.g., fn.readers.video(name="Reader", ...)), is there a way to also retrieve the filename associated with each returned sequence? This would help me keep track of which video each sequence belongs to.

ImtiazUlHassan avatar May 30 '25 10:05 ImtiazUlHassan

You can get the filename of a particular sample returned by DALI with .source_info():

out0, ... = pipe.run()
print(out0[sample_idx].source_info())

jantonguirao avatar May 30 '25 11:05 jantonguirao

Building on the prototype above, I adapted it to read the file names, labels, and frame counts from a CSV file:
import pandas as pd
import os
import numpy as np
from itertools import cycle

csv_path = '~/Desktop/FineGym/trainfinegym99/test.csv'  # pandas expands '~'
# np.fromfile does not expand '~', so expand the video directory up front
video_dir = os.path.expanduser('~/Desktop/FineGym/trainfinegym99/0/')

df = pd.read_csv(csv_path)
df = df.head()  # quick test on the first few rows
batch_size = 4
nframes = 32

def input_gen_from_csv():
    entries = list(zip(df['FileName'], df['Class'], df['TotalFrames']))
    entries = [(os.path.join(video_dir, fname), label, total) for fname, label, total in entries]
    entries = cycle(entries)  # Infinite cycling

    while True:
        batch_encoded = []
        batch_frame_idxs = []
        for _ in range(batch_size):
            path, label, total_frames = next(entries)
            encoded = np.fromfile(path, dtype=np.uint8)

            if total_frames < nframes:
                # shorter clip: take every frame, then repeat the last one
                idxs = np.linspace(0, total_frames - 1, total_frames, dtype=np.int32)
                idxs = np.pad(idxs, (0, nframes - total_frames), mode='edge')
            else:
                # uniformly spaced indices over the whole video
                idxs = np.linspace(0, total_frames - 1, nframes, dtype=np.int32)

            batch_encoded.append(encoded)
            batch_frame_idxs.append(idxs)

        yield batch_encoded, batch_frame_idxs


from nvidia.dali import pipeline_def, fn

device = "mixed"  # or "gpu"

@pipeline_def
def video_pipeline():
    encoded, frame_idxs = fn.external_source(
        source=input_gen_from_csv(), batch=True, num_outputs=2, device="cpu"
    )
    
    # Decode video frames
    decoded = fn.experimental.decoders.video(encoded, frames=frame_idxs, device=device)

    # Resize decoded frames
    resized = fn.resize(
        decoded,
        resize_x=224,  # desired width
        resize_y=224  # desired height
    )

    return resized


pipe = video_pipeline(batch_size=batch_size, num_threads=4, device_id=0)
pipe.build()

for _ in range(10):
    output = pipe.run()
    video_batch = output[0].as_cpu().as_array()  # shape: [B, 32, H, W, C]
    print("Video batch shape:", video_batch.shape)


I did try something like this, but it is way too slow. Is it common for pipelines using fn.external_source (especially in combination with fn.experimental.decoders.video) to show noticeably slower performance than fn.readers.video? I'm still quite new to DALI, so I'm not entirely sure if this is expected behavior or if I might be missing something.

ImtiazUlHassan avatar May 31 '25 18:05 ImtiazUlHassan

The key difference I see here is that when you use external_source + decoder, you read the whole encoded stream before passing it to the decoder, whereas the reader parses it directly from the file stream. Another difference is that we build a frame index for each video file. If you reuse the same video file multiple times, the reader caches that frame index, but the decoder doesn't know it is seeing the same file, so the index gets rebuilt every time.
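
If the same files recur across epochs, one Python-side mitigation is to memoize the encoded bytes and frame counts in the generator (a sketch with hypothetical helpers; it avoids repeated file reads and probing, but not the decoder's internal frame-index rebuild):

from functools import lru_cache
import numpy as np
import cv2

@lru_cache(maxsize=None)
def cached_frame_count(path):
    # probe each file once instead of once per epoch
    cap = cv2.VideoCapture(path)
    n = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()
    return n

@lru_cache(maxsize=64)  # bound the cache; encoded streams can be large
def cached_encoded_bytes(path):
    return np.fromfile(path, dtype=np.uint8)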

That being said, the best next step would be to gather a profile (https://developer.nvidia.com/nsight-systems/get-started). You can grab one with the following command:

nsys profile --gpu-video-device all --trace=osrt,cuda,opengl,nvtx,nvvideo --trace-fork-before-exec=true --force-overwrite=true -o test_profile python your_script.py

Alternatively, if you share some data to test on our side, we could investigate what is causing the slowdown.

jantonguirao avatar Jun 02 '25 09:06 jantonguirao

So, my original torch DataLoader based on decord, which I tried to reproduce in DALI, looks like this:


import numpy as np
import torch
from torch.utils.data import Dataset
from decord import VideoReader, cpu
from torchvision.transforms import Compose, Lambda
# Assuming the usual SlowFast example stack: video transforms from
# torchvision and pytorchvideo (imports elided in the original snippet)
from torchvision.transforms._transforms_video import CenterCropVideo, NormalizeVideo
from pytorchvideo.transforms import ShortSideScale

num_frames = 32  # Number of frames we want to extract
side_size = 256
mean = [0.45, 0.45, 0.45]
std = [0.225, 0.225, 0.225]
crop_size = 224
slowfast_alpha = 4



class PackPathway(torch.nn.Module):
    """
    Transform for converting video frames as a list of tensors.
    """
    def __init__(self):
        super().__init__()

    def forward(self, frames: torch.Tensor):
        fast_pathway = frames
        # Perform temporal sampling from the fast pathway.
        slow_pathway = torch.index_select(
            frames,
            1,
            torch.linspace(
                0, frames.shape[1] - 1, frames.shape[1] // slowfast_alpha
            ).long(),
        )
        frame_list = [slow_pathway, fast_pathway]
        return frame_list




transform = Compose([
    Lambda(lambda x: x / 255.0),
    NormalizeVideo(mean=[0.45, 0.45, 0.45], std=[0.225, 0.225, 0.225]),
    ShortSideScale(size=256),
    CenterCropVideo(224),
    PackPathway()
])

class Imtiaz_Custom(Dataset):
    def __getitem__(self, idx):
        # vid_path and label are resolved from idx (elided here); assumes
        # decord.bridge.set_bridge('torch') so get_batch returns a torch tensor
        vr = VideoReader(vid_path, ctx=cpu(0))
        total_length = len(vr)
        indices = np.linspace(0, total_length - 1, 32, dtype=int)
        frames = vr.get_batch(indices).permute(3, 0, 1, 2)  # (C, T, H, W)
        slow, fast = transform(frames)  # PackPathway returns [slow, fast]
        label = ...
        return slow.to(device), fast.to(device), label

Now, I tried to reproduce this using DALI as follows:


def input_gen_from_csv(csv_path, video_dir, batch_size, nframes, train):
    df = pd.read_csv(csv_path)
    if train:
        df = df.sample(frac=1).reset_index(drop=True)
    entries = list(zip(df['FileName'], df['Class'], df['TotalFrames'], df['ClassEncoded']))
    entries = [(os.path.join(video_dir, label, fname), label, total, encoded)
               for fname, label, total, encoded in entries]
    entries = cycle(entries)

    while True:
        batch_encoded = []
        batch_frame_idxs = []
        batch_class_labels = []
        for _ in range(batch_size):
            path, label, total_frames, class_encoded = next(entries)
            encoded = np.fromfile(path, dtype=np.uint8)

            if total_frames < nframes:
                idxs = np.linspace(0, total_frames - 1, total_frames, dtype=np.int32)
                idxs = np.pad(idxs, (0, nframes - total_frames), mode='edge')
            else:
                idxs = np.linspace(0, total_frames - 1, nframes, dtype=np.int32)

            batch_encoded.append(encoded)
            batch_frame_idxs.append(idxs)
            batch_class_labels.append(np.array([class_encoded], dtype=np.int32))
        yield batch_encoded, batch_frame_idxs, batch_class_labels

Pipeline

def build_dali_video_pipeline(csv_path, video_dir, batch_size, nframes, train):
    @pipeline_def
    def video_pipeline():
        encoded, frame_idxs, labels = fn.external_source(
            source=input_gen_from_csv(csv_path, video_dir, batch_size, nframes, train),
            batch=True,
            num_outputs=3,
            device="cpu"
        )

        # device = "mixed" (defined globally, as in the earlier snippet)
        decoded = fn.experimental.decoders.video(encoded, frames=frame_idxs, device=device)
        resized = fn.resize(decoded, resize_shorter=256)
        cropped = fn.crop(resized, crop=(224, 224))
        scaled = cropped / 255.0
        normalized = fn.normalize(scaled, mean=0.45, stddev=0.225)
        normalized_transposed = fn.transpose(normalized, perm=[3, 0, 1, 2])  # (C, T, H, W)

        return normalized_transposed, labels

    return video_pipeline(batch_size=batch_size, num_threads=4, device_id=0)

Then I am using it as follows:

from nvidia.dali.plugin.pytorch import DALIGenericIterator

batch_size = 4
nframes = 32

pipetrain = build_dali_video_pipeline(csv_path_train, video_dir, batch_size, nframes, train=True)
pipeval = build_dali_video_pipeline(csv_path_val, video_dir, batch_size, nframes, train=False)

dali_iterator_train = DALIGenericIterator(
    [pipetrain],
    output_map=["data", "labels"],
    size=trainsize,  # number of samples in the training set
    auto_reset=True
)

dali_iterator_val = DALIGenericIterator(
    [pipeval],
    output_map=["data", "labels"],
    size=valsize,  # number of samples in the validation set
    auto_reset=True
)


And this is the function standing in for PackPathway():

# SlowFast Pathway:
def sample(fast):
    indices = torch.linspace(0, 31, steps=8).long()
    slow = fast[:, :, indices, :, :]  # [B, C, 8, H, W]
    return slow
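
For instance, with hypothetical shapes:

import torch

fast = torch.randn(4, 3, 32, 224, 224)  # [B, C, T, H, W]
slow = sample(fast)
print(slow.shape)  # torch.Size([4, 3, 8, 224, 224])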

The DALI pipeline runs very fast (which is great), but when I train the same model with this DALI pipeline, the results (accuracy/loss) are quite different from those obtained with my original PyTorch DataLoader.

Am I reproducing the pipeline correctly?

Specifically:

In the original PyTorch loader I use:

  • NormalizeVideo
  • ShortSideScale(256)
  • CenterCropVideo(224)

In DALI:

  • resize_shorter=256
  • crop=(224, 224)
  • normalize(mean=0.45, stddev=0.225)

ImtiazUlHassan avatar Jun 20 '25 10:06 ImtiazUlHassan

Hi @ImtiazUlHassan,

I don't see anything obviously different in your code compared to the reference. Have you run both pipelines on one or two videos and compared the results side by side? Are there any differences in the pixel values or image sizes?
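
For example, something along these lines (a sketch; dali_out and torch_out stand for the same video run through each pipeline, both shaped (C, T, H, W)):

import numpy as np

def compare(dali_out, torch_out):
    a = np.asarray(dali_out, dtype=np.float32)
    b = np.asarray(torch_out, dtype=np.float32)
    print("shapes:", a.shape, b.shape)
    diff = np.abs(a - b)
    print("max abs diff:", diff.max(), " mean abs diff:", diff.mean())
    # tiny differences are expected from resize interpolation; large ones
    # usually point at a normalization or channel-order mismatch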

JanuszL avatar Jun 24 '25 05:06 JanuszL