How to uniformly sample a fixed number of frames with fn.readers.video in DALI?
Describe the question.
Hi, I'm using NVIDIA DALI's fn.readers.video_resize (or fn.readers.video) to load video sequences. I want to uniformly sample a fixed number of frames (e.g., 32) from videos of varying lengths.
Currently, my code looks like this:
name="Reader",
device="gpu",
filenames=filenames,
sequence_length=sequence_length,
random_shuffle=False,
initial_fill=2,
labels=classes,
resize_shorter=256,
stride=1,
pad_last_batch=True,
file_list_include_preceding_frame=False
)
But it uses a fixed stride and sequence length, and I don’t see an option to sample frames uniformly across the full video length or to change the stride dynamically.
My questions are:
Is there a built-in way in DALI to uniformly sample a fixed number of frames no matter the length of the video?
Can I dynamically adjust the stride value during loading (e.g., per video or batch)?
If not, what’s the recommended approach to achieve this efficiently within DALI?
Check for duplicates
- [x] I have searched the open bugs/issues and have found no duplicates for this bug report
@ImtiazUlHassan Thank you for your question. That is actually an interesting use case.
Unfortunately, we currently don't have a built-in way to achieve uniform sampling across the video length. We will add this request to our backlog.
You could, however, use fn.external_source and fn.experimental.decoders.video to provide the encoded video stream together with a list of arbitrary frame indices, which you would have to generate yourself, first obtaining the length of the video by other means. Here's a prototype:
import numpy as np
import os
from itertools import cycle
from nvidia.dali import pipeline_def, fn
import cv2

batch_size = 3
nframes_per_clip = 10
device = "mixed"
video_path = os.path.join(os.path.expanduser("~"), "git/DALI_extra/db/video/small/")

def get_frame_count(video_path):
    cap = cv2.VideoCapture(video_path)
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    return length

def input_gen():
    # Get all video files from the specified path
    filenames = [os.path.join(video_path, f) for f in os.listdir(video_path)
                 if f.endswith(('.mp4', '.mkv'))]
    if not filenames:
        raise ValueError(f"No video files found in {video_path}")
    # Create infinite cycle through the files
    filenames = cycle(filenames)
    while True:
        batch_encoded_stream = []
        batch_frame_idxs = []
        for _ in range(batch_size):
            filename = next(filenames)
            batch_encoded_stream.append(np.fromfile(filename, dtype=np.uint8))
            frame_count = get_frame_count(filename)
            frame_idxs = np.linspace(0, frame_count - 1, nframes_per_clip, dtype=np.int32)
            batch_frame_idxs.append(frame_idxs)
        yield (batch_encoded_stream, batch_frame_idxs)

# Sanity check: keep a single generator instance; calling input_gen() inside
# the loop would restart from the first batch on every iteration.
gen = input_gen()
for _ in range(10):
    print(next(gen))

@pipeline_def
def test_pipeline():
    vid, frame_idxs = fn.external_source(device="cpu", batch=True, num_outputs=2, source=input_gen)
    seq = fn.experimental.decoders.video(vid, device=device, frames=frame_idxs)
    return seq

pipe = test_pipeline(batch_size=batch_size, num_threads=3, device_id=0)
pipe.build()
print(pipe.run())
@jantonguirao Thanks for your code example!
First I tested on CPU; the code ran stably but performance was poor: extracting 100 batches with a batch size of 16 took 154 seconds.
Then I tried to use this code to extract frames on the GPU to accelerate the decoding phase, but it doesn't seem valid to use it this way:
vid, audio, frame_idxs, index = fn.external_source(device="gpu", batch=False,
                                                   num_outputs=4, source=eii, parallel=parallel,
                                                   prefetch_queue_depth=4)
video = fn.experimental.decoders.video(vid, build_index=True, device="mixed", frames=frame_idxs)
And got this:
Assert on "IsCompatibleDevice(device, inp_dev, op_type)" failed: The input 0 for mixed operator `nvidia.dali.fn.experimental.decoders.video` is stored on incompatible device "gpu". Valid device is "cpu".
Is there any way to speed up this process? This feature is very practical!
The problem here is that the video decoder expects CPU input (its output is on the GPU, though). Please change your external source to produce the data in CPU memory.
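For illustration, a minimal sketch of that fix, reusing input_gen and batch_size from the prototype above:

from nvidia.dali import pipeline_def, fn

@pipeline_def
def gpu_decode_pipeline():
    # The encoded stream and frame indices must be produced in CPU memory...
    vid, frame_idxs = fn.external_source(device="cpu", batch=True,
                                         num_outputs=2, source=input_gen)
    # ...while the "mixed" decoder takes CPU input and returns GPU output.
    seq = fn.experimental.decoders.video(vid, device="mixed", frames=frame_idxs)
    return seq

pipe = gpu_decode_pipeline(batch_size=batch_size, num_threads=3, device_id=0)
pipe.build()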
@jantonguirao Thanks for the helpful solution!
I had a related idea: if I set stride=1 and sequence_length=32, I can retrieve all the frames from each video and then uniformly sample from those later. For example, a 54-frame video would yield two sequences, with the last one potentially padded. I can calculate the total number of frames per video in advance using OpenCV.
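A quick sketch of that count, for reference, assuming the reader pads the last, incomplete sequence as described:

import math

total_frames, sequence_length = 54, 32
# With stride=1 and non-overlapping windows, a 54-frame video yields
# ceil(54 / 32) = 2 sequences; the second one is padded past frame 53.
num_sequences = math.ceil(total_frames / sequence_length)
print(num_sequences)  # 2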
My question is: when using the DALI video reader, e.g. fn.readers.video(name="Reader", ...), is there a way to also retrieve the filename associated with each returned sequence? This would help me keep track of which video each sequence belongs to.
You can get the filename of a particular sample returned by DALI with .source_info():
out0, ... = pipe.run()
print(out0[sample_idx].source_info())
import pandas as pd
import os
import numpy as np
from itertools import cycle

# Expand '~' explicitly: np.fromfile does not expand it on its own.
csv_path = os.path.expanduser('~/Desktop/FineGym/trainfinegym99/test.csv')
video_dir = os.path.expanduser('~/Desktop/FineGym/trainfinegym99/0/')
df = pd.read_csv(csv_path)
df = df.head()  # a few rows are enough for testing

batch_size = 4
nframes = 32

def input_gen_from_csv():
    entries = list(zip(df['FileName'], df['Class'], df['TotalFrames']))
    entries = [(os.path.join(video_dir, fname), label, total) for fname, label, total in entries]
    entries = cycle(entries)  # infinite cycling
    while True:
        batch_encoded = []
        batch_frame_idxs = []
        for _ in range(batch_size):
            path, label, total_frames = next(entries)
            encoded = np.fromfile(path, dtype=np.uint8)
            if total_frames < nframes:
                # Shorter video: take every frame, then pad by repeating the last index
                idxs = np.linspace(0, total_frames - 1, total_frames, dtype=np.int32)
                idxs = np.pad(idxs, (0, nframes - total_frames), mode='edge')
            else:
                idxs = np.linspace(0, total_frames - 1, nframes, dtype=np.int32)
            batch_encoded.append(encoded)
            batch_frame_idxs.append(idxs)
        yield batch_encoded, batch_frame_idxs

from nvidia.dali import pipeline_def, fn

device = "mixed"  # or "gpu"

@pipeline_def
def video_pipeline():
    encoded, frame_idxs = fn.external_source(
        source=input_gen_from_csv(), batch=True, num_outputs=2, device="cpu"
    )
    # Decode video frames
    decoded = fn.experimental.decoders.video(encoded, frames=frame_idxs, device=device)
    # Resize decoded frames
    resized = fn.resize(
        decoded,
        resize_x=224,  # desired width
        resize_y=224   # desired height
    )
    return resized

pipe = video_pipeline(batch_size=batch_size, num_threads=4, device_id=0)
pipe.build()

for _ in range(10):
    output = pipe.run()
    video_batch = output[0].as_cpu().as_array()  # shape: [B, 32, H, W, C]
    print("Video batch shape:", video_batch.shape)
I did try something like this, but it is way too slow. Is it common for pipelines using fn.external_source (especially in combination with fn.experimental.decoders.video) to show noticeably slower performance than fn.readers.video? I'm still quite new to DALI, so I'm not sure whether this is expected behavior or whether I'm missing something.
The key difference I see here is that when you use external_source + decoder, you are reading the entire encoded stream before passing it to the decoder, whereas the reader parses it directly from the file stream. Another difference is that we build a frame index for each video file. If you reuse the same video file multiple times, the reader caches the frame index, but the decoder doesn't know it is seeing the same file, so the index gets rebuilt every time.
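If the same files recur across epochs, one host-side mitigation (a sketch using plain functools caching, not a DALI feature) is to cache the encoded bytes in the generator so repeated epochs don't re-read them from disk:

from functools import lru_cache
import numpy as np

@lru_cache(maxsize=128)
def read_encoded(path):
    # Cache the raw encoded stream per file; note this does not avoid
    # the decoder rebuilding its frame index for each occurrence.
    return np.fromfile(path, dtype=np.uint8)

The generator would then call read_encoded(path) in place of np.fromfile(path, dtype=np.uint8).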
That being said, the best approach would be to gather a profile (https://developer.nvidia.com/nsight-systems/get-started). You can grab one with the following command:
nsys profile --gpu-video-device all --trace=osrt,cuda,opengl,nvtx,nvvideo --trace-fork-before-exec=true --force-overwrite=true -o test_profile python your_script.py
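Once the report is captured, you can also summarize it on the command line before opening it in the GUI (assuming the default .nsys-rep output of the command above):
nsys stats test_profile.nsys-rep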
Alternatively, if you share some data to test on our side, we could investigate what is causing the slowdown.
So my original torch DataLoader, based on decord, which I tried to reproduce in DALI, looks like this:
# Imports assumed from the standard PyTorchVideo SlowFast example
import numpy as np
import torch
from torch.utils.data import Dataset
from torchvision.transforms import Compose, Lambda
from torchvision.transforms._transforms_video import CenterCropVideo, NormalizeVideo
from pytorchvideo.transforms import ShortSideScale
from decord import VideoReader, cpu, bridge
bridge.set_bridge('torch')  # make get_batch() return torch tensors (needed for .permute below)

num_frames = 32  # Number of frames we want to extract
side_size = 256
mean = [0.45, 0.45, 0.45]
std = [0.225, 0.225, 0.225]
crop_size = 224
slowfast_alpha = 4

class PackPathway(torch.nn.Module):
    """
    Transform for converting video frames as a list of tensors.
    """
    def __init__(self):
        super().__init__()

    def forward(self, frames: torch.Tensor):
        fast_pathway = frames
        # Perform temporal sampling from the fast pathway.
        slow_pathway = torch.index_select(
            frames,
            1,
            torch.linspace(
                0, frames.shape[1] - 1, frames.shape[1] // slowfast_alpha
            ).long(),
        )
        frame_list = [slow_pathway, fast_pathway]
        return frame_list

transform = Compose([
    Lambda(lambda x: x / 255.0),
    NormalizeVideo(mean=[0.45, 0.45, 0.45], std=[0.225, 0.225, 0.225]),
    ShortSideScale(size=256),
    CenterCropVideo(224),
    PackPathway()
])

class Imtiaz_Custom(Dataset):
    def __getitem__(self, idx):
        vr = VideoReader(vid_path, ctx=cpu(0))
        total_length = len(vr)
        indices = np.linspace(0, total_length - 1, 32, dtype=int)
        frames = vr.get_batch(indices).permute(3, 0, 1, 2)  # (C, T, H, W)
        slow, fast = self.transform(frames)
        label = ...
        return slow.to(device), fast.to(device), label
Now, I tried to reproduce this using DALI as follows:
def input_gen_from_csv(csv_path, video_dir, batch_size, nframes, train):
    df = pd.read_csv(csv_path)
    if train:
        df = df.sample(frac=1).reset_index(drop=True)
    entries = list(zip(df['FileName'], df['Class'], df['TotalFrames'], df['ClassEncoded']))
    entries = [(os.path.join(video_dir, label, fname), label, total, encoded)
               for fname, label, total, encoded in entries]
    entries = cycle(entries)
    while True:
        batch_encoded = []
        batch_frame_idxs = []
        batch_class_labels = []
        for _ in range(batch_size):
            path, label, total_frames, class_encoded = next(entries)
            encoded = np.fromfile(path, dtype=np.uint8)
            if total_frames < nframes:
                idxs = np.linspace(0, total_frames - 1, total_frames, dtype=np.int32)
                idxs = np.pad(idxs, (0, nframes - total_frames), mode='edge')
            else:
                idxs = np.linspace(0, total_frames - 1, nframes, dtype=np.int32)
            batch_encoded.append(encoded)
            batch_frame_idxs.append(idxs)
            batch_class_labels.append(np.array([class_encoded], dtype=np.int32))
        yield batch_encoded, batch_frame_idxs, batch_class_labels
Pipeline
@pipeline_def
def video_pipeline():
    encoded, frame_idxs, labels = fn.external_source(
        source=input_gen_from_csv(csv_path, video_dir, batch_size, nframes, train),
        batch=True,
        num_outputs=3,
        device="cpu"
    )
    decoded = fn.experimental.decoders.video(encoded, frames=frame_idxs, device=device)
    resized = fn.resize(decoded, resize_shorter=256)
    cropped = fn.crop(resized, crop=(224, 224))
    scaled = cropped / 255.0
    normalized = fn.normalize(scaled, mean=0.45, stddev=0.225)
    normalized_transposed = fn.transpose(normalized, perm=[3, 0, 1, 2])  # (C, T, H, W)
    return normalized_transposed, labels
Then I am using it as follows:
from nvidia.dali.plugin.pytorch import DALIGenericIterator

batch_size = 4
nframes = 32

pipetrain = build_dali_video_pipeline(csv_path_train, video_dir, batch_size, nframes, train=True)
pipeval = build_dali_video_pipeline(csv_path_val, video_dir, batch_size, nframes, train=False)

dali_iterator_train = DALIGenericIterator(
    [pipetrain],
    output_map=["data", "labels"],
    size=trainsize,
    auto_reset=True
)
dali_iterator_val = DALIGenericIterator(
    [pipeval],
    output_map=["data", "labels"],
    size=valsize,  # number of samples in the dataset
    auto_reset=True
)
And this is the function standing in for PackPathway():
# SlowFast pathway: subsample the fast pathway to build the slow one
def sample(fast):
    indices = torch.linspace(0, 31, steps=8).long()
    slow = fast[:, :, indices, :, :]  # [B, C, 8, H, W]
    return slow
The DALI pipeline runs very fast (which is great), but when I run training with the same model and this DALI pipeline, the results (accuracy/loss) are quite different from when I use my original PyTorch DataLoader.
Am I reproducing the pipeline correctly?
Specifically:
In the original PyTorch loader I use:
- NormalizeVideo
- ShortSideScale(256)
- CenterCropVideo(224)
In DALI:
- resize_shorter=256
- crop=224x224
- normalize(mean=0.45, stddev=0.225)
Hi @Ash-one,
I don't see anything obviously different in your code compared to the reference. Have you run both pipelines on one or two videos and compared the outputs side by side? Are there any differences in the pixel values or image sizes?
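For instance, a minimal sketch of such a check, assuming torch_clip and dali_clip are (C, T, H, W) float arrays produced by the two pipelines for the same video and the same 32 frame indices:

import numpy as np

def compare_clips(torch_clip, dali_clip, atol=1e-2):
    # Compare shapes first, then per-pixel differences after normalization
    a = np.asarray(torch_clip, dtype=np.float32)
    b = np.asarray(dali_clip, dtype=np.float32)
    print("shapes:", a.shape, b.shape)
    diff = np.abs(a - b)
    print("max abs diff:", diff.max(), "| mean abs diff:", diff.mean())
    return np.allclose(a, b, atol=atol)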