
Video decoder for data_pipeline

am831 opened this issue 6 months ago · 1 comment

What does this PR do? Please describe: Implements a video_decoder for loading video datasets as part of the data_pipeline, using the ffmpeg libraries libavcodec, libavformat, libavutil, and libswscale. The video_decoder class is exposed through pybind11 and delegates to the classes in the detail directory for decoding:

  • ffmpeg_decoder does the heavy lifting and manages the resources acquired by libavformat.
  • stream handles stream metadata and the resources acquired by libavcodec.
  • transform handles transformations on frame data and the resources acquired by libswscale.
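For background on the manual test below: it reads a TSV manifest and splits each line into named fields with StrSplitter. Here is a plain-Python sketch of that split step under the same assumptions as the example (tab-separated lines, three columns); `split_row` is a hypothetical stand-in, not fairseq2 API:

```python
from typing import Dict, List, Sequence

def split_row(line: str, names: Sequence[str], indices: Sequence[int]) -> Dict[str, str]:
    """Split a tab-separated line and keep only the selected columns,
    keyed by the given names (rough stand-in for fairseq2's StrSplitter)."""
    fields: List[str] = line.split("\t")
    return {name: fields[i] for name, i in zip(names, indices)}

row = split_row(
    "42\tclips/cat.mp4\thello world",
    names=["id", "mp4_file", "raw_target_text"],
    indices=[0, 1, 2],
)
# row == {"id": "42", "mp4_file": "clips/cat.mp4", "raw_target_text": "hello world"}
```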

You can use this example to test it manually:

from dataclasses import dataclass
from pathlib import Path

import torch

from fairseq2.data import FileMapper
from fairseq2.data.data_pipeline import DataPipeline
from fairseq2.data.text import StrSplitter, read_text
from fairseq2.data.video import VideoDecoder
from fairseq2.typing import DataType, Device

@dataclass
class DataContext:
    data_file: Path
    """The pathname of the test TSV data file."""

    video_field: str
    """The string field corresponding to the relative path of the audio file."""

    video_root_dir: Path
    """The pathname of the directory under which audio files are stored."""

    device: Device
    """The device on which to run inference."""

    dtype: DataType
    """The dtype with which to run inference."""


def build_data_pipeline(ctx: DataContext) -> DataPipeline:
    # TODO: This will be soon auto-tuned. Right now hand-tuned for devfair.
    n_parallel = 4

    # Open TSV, skip the header line, split into fields, and return three fields
    # only.
    split_tsv = StrSplitter(
        # We assume the tsv file has these 3 fields.
        names=["id", ctx.video_field, "raw_target_text"], indices=[0, 1, 2]
    )

    pipeline_builder = read_text(ctx.data_file, rtrim=True).skip(1).map(split_tsv)

    # Memory map video files and cache up to 10 files.
    map_file = FileMapper(root_dir=ctx.video_root_dir, cached_fd_count=10)

    pipeline_builder.map(map_file, selector=ctx.video_field, num_parallel_calls=n_parallel)

    # Decode mmap'ed video files using ffmpeg.
    decode_vid = VideoDecoder()

    pipeline_builder.map(
        decode_vid,
        selector=f"{ctx.video_field}.data",
        num_parallel_calls=n_parallel,
    )

    # Build and return the data pipeline.
    return pipeline_builder.and_return()


def run_pipeline(ctx: DataContext) -> None:
    """Iterate through the specified TSV file and print each decoded example."""
    # Build a simple pipeline that just reads a single TSV file.
    pipeline = build_data_pipeline(ctx)

    # Iterate through each example in the TSV file until CTRL-C.
    for example in pipeline:
        print(example)

if __name__ == "__main__":
    # fmt: off
    ctx = DataContext(
        data_file=Path("yourtsv"),
        video_field="mp4_file",
        video_root_dir=Path("path"),
        device=torch.device("cpu"),
        dtype=torch.float32
    )
    # fmt: on

    run_pipeline(ctx)
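The `selector` arguments above address nested fields with a dotted path: FileMapper replaces the `mp4_file` string with a structure holding the mapped path and the mmap'ed bytes, and `f"{ctx.video_field}.data"` then applies the decoder to the bytes only. A toy model of that addressing (`apply_at` is hypothetical, not fairseq2 API, and the dict below only mimics the shape of a pipeline item):

```python
from typing import Any, Callable

def apply_at(example: dict, selector: str, fn: Callable[[Any], Any]) -> dict:
    """Apply fn at the leaf named by a dotted selector such as "mp4_file.data"."""
    *parents, leaf = selector.split(".")
    node = example
    for key in parents:
        node = node[key]  # descend into nested structures
    node[leaf] = fn(node[leaf])  # transform the leaf in place
    return example

# Toy item shaped like a pipeline example after FileMapper has run:
example = {"id": "42", "mp4_file": {"path": "clips/cat.mp4", "data": b"\x00\x01\x02"}}
apply_at(example, "mp4_file.data", len)
# example["mp4_file"]["data"] == 3
```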

Check list:

  • [ ] Was the content of this PR discussed and approved via a GitHub issue? (no need for typos or documentation improvements)
  • [x] Did you read the contributor guideline?
  • [x] Did you make sure that your PR does only one thing instead of bundling different changes together?
  • [ ] Did you make sure to update the documentation with your changes? (if necessary)
  • [x] Did you write any new necessary tests?
  • [x] Did you verify new and existing tests pass locally with your changes?
  • [ ] Did you update the CHANGELOG? (no need for typos, documentation, or minor internal changes)

am831 · Dec 02 '23