
Video and audio decoding with torchcodec

Open TyTodd opened this issue 6 months ago • 16 comments

Feature request

PyTorch is migrating video processing to torchcodec, and it's pretty cool. It would be nice to migrate both the audio and video features to use torchcodec instead of torchaudio/torchvision.

Motivation

My use case is I'm working on a multimodal AV model, and what's nice about torchcodec is I can extract the audio tensors directly from MP4 files. Also, I can easily resample video data to whatever fps I like on the fly. I haven't found an easy/efficient way to do this with torchvision.
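For concreteness, a minimal sketch of that workflow (`AudioDecoder`, `VideoDecoder`, `get_all_samples`, and `get_frames_played_at` are torchcodec APIs; the `load_av` helper and its defaults are made up, and the import is guarded since torchcodec and FFmpeg may not be installed):

```python
# Sketch only: decoder classes/methods are torchcodec's; load_av is invented.
try:
    from torchcodec.decoders import AudioDecoder, VideoDecoder
    HAVE_TORCHCODEC = True
except ImportError:  # torchcodec (and its FFmpeg dependency) is optional here
    HAVE_TORCHCODEC = False

def load_av(path, fps=10.0, num_frames=8):
    """Decode the audio track plus `num_frames` video frames sampled at `fps`."""
    if not HAVE_TORCHCODEC:
        return None
    # Audio tensors come straight out of the same MP4 file
    audio = AudioDecoder(path).get_all_samples()  # AudioSamples: .data, .sample_rate
    decoder = VideoDecoder(path)
    # "Resampling" to a target fps on the fly: ask for the frames that are
    # playing at explicit timestamps rather than decoding every frame
    timestamps = [i / fps for i in range(num_frames)]
    frames = decoder.get_frames_played_at(seconds=timestamps)
    return audio.data, frames.data
```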

Your contribution

I’m modifying the Video dataclass to use torchcodec in place of the current backend, starting from a stable commit for a project I’m working on. If it ends up working well, I’m happy to open a PR on main.

TyTodd avatar Jun 11 '25 07:06 TyTodd

Good idea! Let me know if you have any questions or if I can help

lhoestq avatar Jun 11 '25 14:06 lhoestq

@lhoestq Almost finished, but I'm having trouble understanding this test case. This is how it looks originally: the map function is called, and then with_format is called. According to the test case, example["video"] is supposed to be a VideoReader. However, according to the docs, it's supposed to be the type passed into with_format (numpy in this case). My implementation with VideoDecoder currently does the latter. Is that correct, or should it be a VideoDecoder object instead?

@require_torchvision
def test_dataset_with_video_map_and_formatted(shared_datadir):
    from torchvision.io import VideoReader

    video_path = str(shared_datadir / "test_video_66x50.mov")
    data = {"video": [video_path]}
    features = Features({"video": Video()})
    dset = Dataset.from_dict(data, features=features)
    dset = dset.map(lambda x: x).with_format("numpy")
    example = dset[0]
    assert isinstance(example["video"], VideoReader)
    # assert isinstance(example["video"][0], np.ndarray)

    # from bytes
    with open(video_path, "rb") as f:
        data = {"video": [f.read()]}
    dset = Dataset.from_dict(data, features=features)
    dset = dset.map(lambda x: x).with_format("numpy")
    example = dset[0]
    assert isinstance(example["video"], VideoReader)
    # assert isinstance(example["video"][0], np.ndarray)

TyTodd avatar Jun 12 '25 07:06 TyTodd

Hi ! It's maybe more convenient for users to always have a VideoDecoder, since they might only access a few frames and not the full video. So IMO it's fine to always return a VideoDecoder (maybe later we can extend the VideoDecoder to return other types of tensors than numpy arrays though ? 👀 it's not crucial for now though)

lhoestq avatar Jun 12 '25 14:06 lhoestq

@lhoestq ya that makes sense. Looks like this functionality lives in src/datasets/formatting, where an exception is made so VideoReader objects remain as themselves when being formatted. I'll make the necessary changes.

TyTodd avatar Jun 12 '25 14:06 TyTodd

@lhoestq I'm assuming this was also the case for torchaudio objects?

TyTodd avatar Jun 12 '25 14:06 TyTodd

We're not using torchaudio but soundfile. But anyway, we unfortunately decode full audio files instead of returning a Reader, and it could be interesting to fix this. Currently it always returns a dict {"array": np.array(...), "sampling_rate": int(...)}, while it would be cool to return a reader with seek()- and read()-like methods, as for videos.

(there is a way to make the audio change backward compatible anyway by allowing reader["array"] to return the full array)
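To make that backward-compat idea concrete, here is a minimal sketch (not datasets code: `LazyAudio` and `decode_fn` are made-up stand-ins for a torchcodec-backed reader):

```python
class LazyAudio:
    """Defers decoding, but still answers old dict-style access like
    reader["array"], as suggested above."""

    def __init__(self, decode_fn, sampling_rate):
        self._decode_fn = decode_fn      # stand-in for the real backend decode
        self.sampling_rate = sampling_rate
        self._array = None

    def __getitem__(self, key):
        if key == "array":
            if self._array is None:      # decode lazily, exactly once
                self._array = self._decode_fn()
            return self._array
        if key == "sampling_rate":
            return self.sampling_rate
        raise KeyError(key)

# old-style access keeps working; nothing decodes until "array" is asked for
audio = LazyAudio(lambda: [0.0] * 4, sampling_rate=44100)
```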

lhoestq avatar Jun 12 '25 14:06 lhoestq

@lhoestq (sorry for the spam btw) Looks like there's a # TODO to have these returned as np.arrays instead. I'm curious why the authors didn't do it initially. Maybe a performance thing? This is from /src/datasets/formatting/np_formatter.py line 70

if config.TORCHVISION_AVAILABLE and "torchvision" in sys.modules:
    from torchvision.io import VideoReader

    if isinstance(value, VideoReader):
        return value  # TODO(QL): set output to np arrays ?
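One way to read that TODO in a torchcodec world is that the formatter would keep passing decoder objects through untouched so they stay lazy. A rough sketch under assumed names (`format_value` is invented; the `torchcodec_available` flag mirrors the torchvision guard above):

```python
import sys

def format_value(value, torchcodec_available=True):
    # Mirrors the torchvision branch above, but for torchcodec decoders:
    # leave them as-is so users can decode only the frames they need.
    if torchcodec_available and "torchcodec" in sys.modules:
        from torchcodec.decoders import AudioDecoder, VideoDecoder
        if isinstance(value, (VideoDecoder, AudioDecoder)):
            return value  # keep the decoder lazy instead of materializing
    return value  # everything else falls through to normal numpy formatting
```

The `sys.modules` check keeps torchcodec an optional dependency, exactly like the torchvision guard in the snippet above.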

TyTodd avatar Jun 12 '25 15:06 TyTodd

Oh cool ya this is something that I could implement with torchcodec. I can add that to the PR as well.

TyTodd avatar Jun 12 '25 15:06 TyTodd

Looks like there's a # TODO to have these returned as np.arrays instead. I'm curious why the authors didn't do it initially. Maybe a performance thing?

yea that was me, I focused on a simple logic to start with, since I knew there was torchcodec coming and maybe wasn't worth it at the time ^^

but anyway it's fine to start with a logic without formatting and then iterate

lhoestq avatar Jun 12 '25 19:06 lhoestq

Hey @lhoestq I ran into an error with this test case for the Audio feature

@require_sndfile
@require_torchcodec
def test_dataset_with_audio_feature_map_is_decoded(shared_datadir):
    audio_path = str(shared_datadir / "test_audio_44100.wav")
    data = {"audio": [audio_path], "text": ["Hello"]}
    features = Features({"audio": Audio(), "text": Value("string")})
    dset = Dataset.from_dict(data, features=features)

    def process_audio_sampling_rate_by_example(example):
        sample_rate = example["audio"].get_all_samples().sample_rate
        example["double_sampling_rate"] = 2 * sample_rate
        return example

    decoded_dset = dset.map(process_audio_sampling_rate_by_example)
    for item in decoded_dset.cast_column("audio", Audio(decode=False)):
        assert item.keys() == {"audio", "text", "double_sampling_rate"}
        assert item["double_sampling_rate"] == 88200

    def process_audio_sampling_rate_by_batch(batch):
        double_sampling_rates = []
        for audio in batch["audio"]:
            double_sampling_rates.append(2 * audio.get_all_samples().sample_rate)
        batch["double_sampling_rate"] = double_sampling_rates
        return batch

    decoded_dset = dset.map(process_audio_sampling_rate_by_batch, batched=True)
    for item in decoded_dset.cast_column("audio", Audio(decode=False)):
        assert item.keys() == {"audio", "text", "double_sampling_rate"}
        assert item["double_sampling_rate"] == 88200

this is the error below

src/datasets/arrow_writer.py:626: in write_batch
    arrays.append(pa.array(typed_sequence))
.....
FAILED tests/features/test_audio.py::test_dataset_with_audio_feature_map_is_decoded - pyarrow.lib.ArrowInvalid: Could not convert <torchcodec.decoders._audio_decoder.AudioDecoder object at 0x138cdd810> with type AudioDecoder: did not recognize Python value type when inferring an Arrow data type

By the way, I copied the test case and ran it against the original implementation of the Video feature (which uses the torchvision backend), and I got a similar error.

def test_dataset_with_video_feature_map_is_decoded(shared_datadir):
    video_path = str(shared_datadir / "test_video_66x50.mov")
    data = {"video": [video_path], "text": ["Hello"]}
    features = Features({"video": Video(), "text": Value("string")})
    dset = Dataset.from_dict(data, features=features)

    def process_audio_sampling_rate_by_example(example):
        metadata = example["video"].get_metadata()
        example["double_fps"] = 2 * metadata["video"]["fps"][0]
        return example

    decoded_dset = dset.map(process_audio_sampling_rate_by_example)
    for item in decoded_dset.cast_column("video", Video(decode=False)):
        assert item.keys() == {"video", "text", "double_fps"}
        assert item["double_fps"] == 2 * 10  # probably won't pass: 2 * 10 is made up

    def process_audio_sampling_rate_by_batch(batch):
        double_fps = []
        for video in batch["video"]:
            double_fps.append(2 * video.metadata.begin_stream_seconds)
        batch["double_fps"] = double_fps
        return batch

    decoded_dset = dset.map(process_audio_sampling_rate_by_batch, batched=True)
    for item in decoded_dset.cast_column("video", Video(decode=False)):
        assert item.keys() == {"video", "text", "double_fps"}
        assert item["double_fps"] == 2 * 10  # probably won't pass: no reason it should

I was wondering if these errors are expected. They seem to come from the fact that the function _cast_to_python_objects in src/datasets/features/features.py doesn't handle VideoDecoders or AudioDecoders. I was able to fix it and get rid of the error by adding this to the bottom of the function

    elif config.TORCHCODEC_AVAILABLE and "torchcodec" in sys.modules and isinstance(obj, VideoDecoder):
        v = Video()
        return v.encode_example(obj), True
    elif config.TORCHCODEC_AVAILABLE and "torchcodec" in sys.modules and isinstance(obj, AudioDecoder):
        a = Audio()
        return a.encode_example(obj), True

This fixed it, but I just want to make sure I'm not adding things that are messing up the intended functionality.
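For illustration, the shape of that fix reduces to a tiny dispatch sketch (`FakeDecoder` and the dict literal are stand-ins; the real code calls `Video().encode_example` / `Audio().encode_example` as shown above):

```python
class FakeDecoder:
    """Stand-in for torchcodec's VideoDecoder/AudioDecoder."""
    def __init__(self, source):
        self.source = source  # e.g. the original file path

def cast_to_python_objects(obj):
    """Mirrors _cast_to_python_objects' (value, has_changed) contract."""
    if isinstance(obj, FakeDecoder):
        # encode_example turns the decoder back into the {"path", "bytes"}
        # storage form that pyarrow knows how to write, instead of handing
        # pyarrow an object it can't infer an Arrow type for
        return {"path": obj.source, "bytes": None}, True
    return obj, False
```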

TyTodd avatar Jun 13 '25 08:06 TyTodd

This is the right fix ! :)

lhoestq avatar Jun 13 '25 08:06 lhoestq

Btw I just remembered that we were using soundfile because it supports a wide range of audio formats. Is that also the case for torchcodec? Including ogg and opus, for example

lhoestq avatar Jun 13 '25 12:06 lhoestq

Yes, from what I understand, torchcodec supports everything FFmpeg supports.

TyTodd avatar Jun 13 '25 12:06 TyTodd

Okay just finished. However, I wasn't able to pass this test case:

@require_torchcodec
@require_sndfile
@pytest.mark.parametrize("streaming", [False, True])
def test_load_dataset_with_audio_feature(streaming, jsonl_audio_dataset_path, shared_datadir):
    from torchcodec.decoders import AudioDecoder
    audio_path = str(shared_datadir / "test_audio_44100.wav")
    data_files = jsonl_audio_dataset_path
    features = Features({"audio": Audio(), "text": Value("string")})
    dset = load_dataset("json", split="train", data_files=data_files, features=features, streaming=streaming)
    item = dset[0] if not streaming else next(iter(dset))
    assert item.keys() == {"audio", "text"}
    assert isinstance(item["audio"], AudioDecoder)
    samples = item["audio"].get_all_samples()
    assert samples.sample_rate == 44100
    assert samples.data.shape == (1, 202311)

It returned this error

streaming = False, jsonl_audio_dataset_path = '/private/var/folders/47/c7dlgs_n6lx8rtr8f5w5m1m00000gn/T/pytest-of-tytodd/pytest-103/data2/audio_dataset.jsonl'
shared_datadir = PosixPath('/private/var/folders/47/c7dlgs_n6lx8rtr8f5w5m1m00000gn/T/pytest-of-tytodd/pytest-103/test_load_dataset_with_audio_f0/data')

    @require_torchcodec
    @require_sndfile
    @pytest.mark.parametrize("streaming", [False, True])
    def test_load_dataset_with_audio_feature(streaming, jsonl_audio_dataset_path, shared_datadir):
        from torchcodec.decoders import AudioDecoder
        audio_path = str(shared_datadir / "test_audio_44100.wav")
        data_files = jsonl_audio_dataset_path
        features = Features({"audio": Audio(), "text": Value("string")})
>       dset = load_dataset("json", split="train", data_files=data_files, features=features, streaming=streaming)

tests/features/test_audio.py:686: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
src/datasets/load.py:1418: in load_dataset
    builder_instance.download_and_prepare(
src/datasets/builder.py:925: in download_and_prepare
    self._download_and_prepare(
src/datasets/builder.py:1019: in _download_and_prepare
    verify_splits(self.info.splits, split_dict)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

expected_splits = {'train': SplitInfo(name='train', num_bytes=2351563, num_examples=10000, shard_lengths=None, dataset_name=None), 'validation': SplitInfo(name='validation', num_bytes=238418, num_examples=1000, shard_lengths=None, dataset_name=None)}
recorded_splits = {'train': SplitInfo(name='train', num_bytes=167, num_examples=1, shard_lengths=None, dataset_name='json')}

    def verify_splits(expected_splits: Optional[dict], recorded_splits: dict):
        if expected_splits is None:
            logger.info("Unable to verify splits sizes.")
            return
        if len(set(expected_splits) - set(recorded_splits)) > 0:
>           raise ExpectedMoreSplitsError(str(set(expected_splits) - set(recorded_splits)))
E           datasets.exceptions.ExpectedMoreSplitsError: {'validation'}

src/datasets/utils/info_utils.py:68: ExpectedMoreSplitsError

It looks like this test case wasn't passing when I forked the repo, so I assume I didn't do anything to break it. I also added this case to test_video.py, and it fails there as well. If this looks good, I'll go ahead and submit the PR.
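The essence of the check that fails here is just a set difference over split names; a minimal sketch (ValueError stands in for datasets' ExpectedMoreSplitsError):

```python
def verify_splits(expected_splits, recorded_splits):
    # datasets raises ExpectedMoreSplitsError when splits the metadata
    # promised (here "validation") were never actually built
    missing = set(expected_splits) - set(recorded_splits)
    if missing:
        raise ValueError(f"ExpectedMoreSplitsError: {missing}")

# the case from the traceback: metadata expects train + validation,
# but the JSON builder only recorded a train split
try:
    verify_splits({"train", "validation"}, {"train"})
    raised = False
except ValueError:
    raised = True
```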

TyTodd avatar Jun 13 '25 14:06 TyTodd

Awesome! Yes, feel free to submit the PR, and I'll see what I can do for the remaining tests

lhoestq avatar Jun 13 '25 14:06 lhoestq

@lhoestq just submitted it #7616

TyTodd avatar Jun 13 '25 19:06 TyTodd