
Numpy Decoder for Webdataset Source

Open 5had3z opened this issue 10 months ago • 5 comments

Is this a new feature, an improvement, or a change to existing functionality?

Improvement

How would you describe the priority of this feature request

Should have (e.g. Adoption is possible, but the performance shortcomings make the solution inferior).

Please provide a clear description of problem this feature solves

Numpy files stored in a webdataset are tricky to decode.

Feature Description

From an end-user-perspective, I have an object-detection dataset stored in webdataset tar files as image-numpy pairs with the same key. I want to be able to decode both the image and label data (a numpy array of x1,y1,x2,y2,cls) easily and efficiently.

@pipeline_def
def pipe():
    images, bboxes = fn.readers.webdataset(...)
    images = fn.decoders.image(images)
    # Proposed operator; fn.decoders.numpy is the feature being requested.
    bboxes = fn.decoders.numpy(bboxes)
    return (images, bboxes)

Describe your ideal solution

Similar to fn.decoders.image, there could be a fn.decoders.numpy that decodes the raw bytes of a numpy (.npy) file.
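To make "raw numpy file format" concrete, here is a rough, standard-library-only sketch of what such a decoder would have to parse before it can reinterpret the payload: the .npy magic string, the format version, the header length, and the header dict holding `descr`, `fortran_order` and `shape`. The helper names `parse_npy_header` and `make_npy_v1` are made up for this illustration and are not part of DALI or NumPy.

```python
import ast
import struct

NPY_MAGIC = b"\x93NUMPY"


def parse_npy_header(raw: bytes) -> tuple[dict, int]:
    """Return (header dict, offset of the first data byte) of a .npy byte string."""
    if raw[:6] != NPY_MAGIC:
        raise ValueError("not a .npy file")
    major = raw[6]
    if major == 1:
        # Version 1.0 stores the header length as a little-endian uint16.
        (header_len,) = struct.unpack("<H", raw[8:10])
        start = 10
    elif major in (2, 3):
        # Versions 2.0 and 3.0 use a little-endian uint32 instead.
        (header_len,) = struct.unpack("<I", raw[8:12])
        start = 12
    else:
        raise ValueError(f"unsupported .npy version {major}")
    encoding = "utf-8" if major == 3 else "latin1"
    # The header is a Python dict literal, space padded and newline terminated.
    header = ast.literal_eval(raw[start:start + header_len].decode(encoding))
    return header, start + header_len


def make_npy_v1(descr: str, shape: tuple, payload: bytes) -> bytes:
    """Hand-build a version 1.0 .npy byte string (demo helper, no NumPy needed)."""
    header = repr({"descr": descr, "fortran_order": False, "shape": shape})
    # Pad with spaces so that the payload starts on a 64-byte boundary.
    pad = -(10 + len(header) + 1) % 64
    header_bytes = (header + " " * pad + "\n").encode("latin1")
    prefix = NPY_MAGIC + bytes([1, 0]) + struct.pack("<H", len(header_bytes))
    return prefix + header_bytes + payload


# Three boxes of (x1, y1, x2, y2, cls) stored as little-endian float32.
payload = struct.pack("<15f", *range(15))
raw = make_npy_v1("<f4", (3, 5), payload)

header, offset = parse_npy_header(raw)
boxes = struct.unpack_from("<15f", raw, offset)
print(header["descr"], header["shape"], offset)
```

Once the header is known, the remaining bytes only need to be reinterpreted with the given dtype and shape, which is why doing this inside the pipeline looks feasible without a round trip through Python.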

Describe any alternatives you have considered

Reading the raw data into a numpy array is a bit of a mess. This is what I have found to work reliably:

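import io

import numpy as np
import torch
from nvidia.dali import fn
from nvidia.dali.pipeline import pipeline_def
from torch.utils.dlpack import from_dlpack, to_dlpack

@pipeline_def
def pipe():
    images, bboxes = fn.readers.webdataset(...)
    images = fn.decoders.image(images)
    bboxes = fn.dl_tensor_python_function(bboxes, function=read_numpy, batch_processing=True)
    return (images, bboxes)

def read_numpy(raw_data: list):
    """Read raw data that defines a numpy file and return decoded numpy arrays"""
    # Each sample arrives as a DLPack capsule holding the raw .npy file bytes.
    raw_arrays = [from_dlpack(d).numpy() for d in raw_data]
    # Parse the .npy header and payload from an in-memory copy of the bytes.
    decoded: list[np.ndarray] = [
        np.lib.format.read_array(io.BytesIO(d.copy().tobytes())) for d in raw_arrays
    ]
    # Hand the decoded arrays back to DALI as DLPack capsules.
    return [to_dlpack(torch.from_numpy(t)) for t in decoded]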

Additional context

I first tried a normal python_function operating on numpy arrays, to avoid the mess of intermediate PyTorch DLPack APIs, but with that the program often exits and a segfault is logged in dmesg.

[344269.869129] [DALI] Executor[1221509]: segfault at 10 ip 00007a8a793d61be sp 00007a88e67ff590 error 4 in libpython3.12.so.1.0[7a8a79306000+287000] likely on CPU 8 (core 10, socket 0)
[344269.869143] Code: 48 85 f6 0f 84 b3 00 00 00 41 54 55 48 89 f5 53 66 48 8d 3d 64 1d 38 00 66 66 48 e8 6c 24 f3 ff 4c 8b 25 65 1b 38 00 48 8b 00 <48> 8b 58 10 49 3b 5c 24 30 74 0d f6 83 b0 06 00 00 20 0f 85 a2 00

Check for duplicates

  • [x] I have searched the open bugs/issues and have found no duplicates for this bug report

5had3z, Mar 17 '25 05:03

Hi @5had3z,

Thank you for reaching out.

Let's add this to our To-Do list. Do you have a standalone reproduction that we can run to check the crash in the Python function? This behavior is not expected.

Additionally, consider using the DALI proxy. This allows you to use the native webdataset library and ask DALI to accelerate only the relevant parts of the data processing for you.

JanuszL, Mar 18 '25 11:03

Well, I made a minimal example, but it didn't trigger the problem. I then reverted the main codebase to the code that was originally crashing, and it didn't trigger there anymore either, so I've switched back to that code (shown below). It still stands that it would be nice to decode numpy data natively, rather than jumping in and out of Python.

@pipeline_def()
def mypipeline():
    # ....
    pipedata["bboxes"] = fn.python_function(
        pipedata["bboxes"], function=read_numpy, batch_processing=True
    )

def read_numpy(raw_data: list[np.ndarray]):
    """Read raw data that defines a numpy file and return decoded numpy arrays"""
    return [np.lib.format.read_array(io.BytesIO(d.tobytes())) for d in raw_data]
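For reference, a small standalone sketch (no DALI involved) of what a function like read_numpy above sees and returns: each input sample is a 1-D uint8 buffer holding the bytes of a .npy file, and each output is an array whose dtype and shape come from the file header. The helper `as_raw_sample` is hypothetical and only mimics what the reader hands over.

```python
import io

import numpy as np


def read_numpy(raw_data):
    """Decode a batch of raw .npy byte buffers into numpy arrays."""
    return [np.lib.format.read_array(io.BytesIO(d.tobytes())) for d in raw_data]


def as_raw_sample(arr: np.ndarray) -> np.ndarray:
    """Serialize an array to .npy bytes and expose them as a 1-D uint8 array."""
    buf = io.BytesIO()
    np.save(buf, arr)
    return np.frombuffer(buf.getvalue(), dtype=np.uint8)


# A batch of label arrays with a varying number of boxes per sample.
labels = [np.zeros((n, 5), dtype=np.float32) for n in (1, 3, 7)]
raw_batch = [as_raw_sample(a) for a in labels]
decoded = read_numpy(raw_batch)

for raw, out in zip(raw_batch, decoded):
    print(raw.dtype, raw.shape, "->", out.dtype, out.shape)
```

The output differs from the input in dtype, rank and size for every sample, so whatever wraps this function cannot assume that the result matches the input layout.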

The pipeline was working fine before I attempted to decode the numpy data. Adding the decode step consistently caused the errors in the image below, which went away after moving to the dlpack-based function. I thought that might help because of the section in the docs quoted below; maybe the normal python function didn't like the shape and size changing after decoding the array data, or something.

The most universal operator is dl_tensor_python_function. DLPack is an open standard for tensor storage and many frameworks and libraries implement conversion methods to and from DLPack tensors. Internally it is used to implement all the other kinds of Python operators.

Image

5had3z, Mar 18 '25 12:03

Testing code for posterity

import io
from pathlib import Path
from subprocess import run
from tempfile import TemporaryDirectory

import numpy as np
import torch
import webdataset as wds
from nvidia.dali import fn
from nvidia.dali.pipeline import pipeline_def
from nvidia.nvimgcodec import EncodeParams, Encoder
from nvidia.dali.data_node import DataNode
from nvidia.dali.plugin.pytorch import DALIGenericIterator
from torch.utils.dlpack import from_dlpack, to_dlpack

IM_EXT = ".webp"


def generate_images_label_pair(
    resolution: tuple[int, int], num_images: int
) -> tuple[np.ndarray, np.ndarray]:
    """Generates an image of random noise and a set of bounding box labels for the image"""
    images = np.random.rand(num_images, resolution[0], resolution[1], 3)
    images = (images * 255).astype(np.uint8)
    label = np.random.rand(np.random.randint(1, 8), 5).astype(np.float32)
    return images, label


def create_webdataset(
    num_samples: int, resolution: tuple[int, int], imgs: list[str], outfolder: Path
):
    """Creates a webdataset from the generate_images_label_pair function"""
    enc_params = EncodeParams()
    enc_params.quality = 101  # Lossless for webp
    encoder = Encoder()

    with wds.ShardWriter(str(outfolder / "dataset-%06d.tar")) as sink:
        for i in range(num_samples):
            images, label = generate_images_label_pair(resolution, len(imgs))
            label = wds.numpy_dumps(label)
            encoded = encoder.encode(list(images), IM_EXT, enc_params)
            sample = {"__key__": f"{i:08d}", "label.npy": label}
            sample.update(dict(zip([f"{im}{IM_EXT}" for im in imgs], encoded)))
            sink.write(sample)


def create_indices(outfolder: Path):
    for tar in outfolder.glob("*.tar"):
        index = tar.with_suffix(".idx")
        run(["wds2idx", str(tar), str(index)], check=True)


NAME = "wds_reader"


def handle_label(data: DataNode):
    # data = fn.dl_tensor_python_function(
    #     data, function=read_dlpack, batch_processing=True
    # )
    data = fn.python_function(data, function=read_numpy, batch_processing=True)
    data = fn.pad(data, fill_value=-1).gpu()
    return data


@pipeline_def(batch_size=4, num_threads=4, device_id=0)
def data_pipeline(datadir: Path, imgs: list[str]):
    tars = list(map(str, datadir.glob("*.tar")))
    indices = list(map(str, datadir.glob("*.idx")))
    exts = list(map(lambda x: f"{x}{IM_EXT}", imgs))
    exts.append("label.npy")
    wds_data = fn.readers.webdataset(
        paths=tars, index_paths=indices, ext=exts, name=NAME
    )
    pipedata: dict[str, DataNode] = dict(zip(imgs + ["label"], wds_data))
    for img in imgs:
        pipedata[img] = fn.decoders.image(pipedata[img], device="mixed")
    pipedata["label"] = handle_label(pipedata["label"])

    return tuple(pipedata[k] for k in imgs + ["label"])


def read_dlpack(raw_data: list):
    """Read raw data that defines a numpy file and return decoded numpy arrays"""
    raw_arrays = [from_dlpack(d).numpy() for d in raw_data]
    test: list[np.ndarray] = [
        np.lib.format.read_array(io.BytesIO(d.copy().tobytes())) for d in raw_arrays
    ]
    return [to_dlpack(torch.from_numpy(t)) for t in test]


def read_numpy(raw_data: list[np.ndarray]):
    """Read raw data that defines a numpy file and return decoded numpy arrays"""
    test: list[np.ndarray] = [
        np.lib.format.read_array(io.BytesIO(d.copy().tobytes())) for d in raw_data
    ]
    return test


def main():
    with TemporaryDirectory() as tmpdir:
        outfolder = Path(tmpdir)
        imgs = ["RGB", "SWIR", "LWIR"]
        create_webdataset(100, (512, 512), imgs, outfolder)
        create_indices(outfolder)
        dataloader = DALIGenericIterator(
            data_pipeline(outfolder, imgs), imgs + ["label"], reader_name=NAME
        )

        for data in dataloader:
            print(data[0].keys())
            print([d.shape for d in data[0].values()])


if __name__ == "__main__":
    main()

5had3z, Mar 18 '25 12:03

Anyway, looking forward to GTC!

5had3z, Mar 18 '25 12:03

Unfortunately, I was not able to reproduce that. I tried with:

docker run --rm -ti --gpus all nvcr.io/nvidia/pytorch:25.02-py3

and running the script in a loop as below:

pip install webdataset
x=0
while : ; do
    x=$((x+1))
    echo $x
    python test.py
    if [ $? != "0" ]; then
        break;
    fi
done

I reached 150 iterations without any issue.

JanuszL, Mar 18 '25 16:03