DALI icon indicating copy to clipboard operation
DALI copied to clipboard

Handling batch sizes with fn.experimental.inputs.video

Open joey-trigo opened this issue 1 year ago • 6 comments

Describe the question.

I want to crop each frame of a video. I currently have a pipeline that takes JPEG images of each frame and crops them, and I am trying to convert it to take the whole video file at once to save time encoding and decoding JPEG files. For each frame I have a bounding box that defines where I want to crop that video. I am using fn.experimental.inputs.video to parse the video but I am not sure how I can match frames to the proper bounding box. I am basing my code loosely off of the video_decode_remap example in the triton dali backend. My code looks something like this:

def pipeline():
    # Decode video, split into 1 second batches (25 FPS)
    vid = fn.experimental.inputs.video(name="DALI_INPUT_VIDEO", sequence_length=25, device='mixed')
    bbox = dali.fn.external_source(device="cpu", name="DALI_INPUT_BBOX", batch=False)

    # bboxes need to be int32 and of dims [4,1] to work as anchors for slice
    bbox = dali.fn.cast(bbox, dtype=types.INT32)
    bbox = dali.fn.expand_dims(bbox, axes=1)

    bbox_xy = dali.fn.cat(*dali.fn.element_extract(bbox, element_map=[0,1]))
    bbox_wh = dali.fn.cat(*dali.fn.element_extract(bbox, element_map=[2,3]))

    # data, anchor, shape
    crop = dali.fn.slice(images, bbox_xy, bbox_wh)

    return dali.fn.resize(crop, device="gpu", size=(320, 320))

My issue has to do with understanding how batches work. In each batch I am sending a single video file and want to send a single bbox per frame. Calling fn.experimental.inputs.video batches the frames into sequences of length 25, and so I want the bbox variable to be a batch of size 25 of bboxes.

Trying to call the pipeline with DALI_INPUT_VIDEO as a 2 second long video file and DALI_INPUT_BBOX as a dim [50, 4] causes the error:

Exception in CPU stage: [/opt/dali/dali/pipeline/executor/executor.cc:544] Assert on "bsps[0]->NextBatchSize() == bsps[i]->NextBatchSize()" failed: Batch size must be uniform across an iteration
Stacktrace (10 entries):

I also tried to change the sequence_length to 1 but that didn't help. Is there a way to do what I want to do with dali?

Check for duplicates

  • [X] I have searched the open bugs/issues and have found no duplicates for this bug report

joey-trigo avatar Apr 25 '23 15:04 joey-trigo

Assuming you want to crop the whole sequence of frames with a given anchor and size of the cropping window, you can do something like this:

from nvidia.dali import fn, pipeline_def
import numpy as np

@pipeline_def(batch_size=1, num_threads=3, device_id=0)
def pipe():
	vid = np.zeros([25, 200, 300, 3], dtype=np.uint8)  # video-like
	bbox = np.array([10, 20, 150, 250])  # assuming xywh order
	xy = fn.slice(bbox, 0, 2, axes=[0])
	wh = fn.slice(bbox, 2, 2, axes=[0])
	cropped = fn.slice(vid, xy, wh, axes=[1, 2])
	return cropped

p = pipe()
p.build()
print(p.run())

jantonguirao avatar Apr 25 '23 16:04 jantonguirao

If you wanted to apply a different cropping window per frame, currently there's no built-in way to do so, but you can do it by processing each frame independently, like this:

from nvidia.dali import fn, pipeline_def, types
import numpy as np

@pipeline_def(batch_size=1, num_threads=3, device_id=0)
def pipe(seq_len=25):
    vid = np.zeros([seq_len, 300, 200, 3], dtype=np.uint8)  # video-like (seq_len, H, W, 3)
    bboxes = np.zeros([seq_len, 4], dtype=np.int32) # bboxes like (seq_len, 4)
    bboxes[:, 0] = 10  # x
    bboxes[:, 1] = 20  # y
    bboxes[:, 2] = 150  # w
    bboxes[:, 3] = 250  # h
    bboxes_arg = types.Constant(bboxes)  # in your example this comes from external source
    
    frames = []
    for i in range(seq_len):
        # processing each frame separately
        frames.append(
            fn.slice(vid[i], bboxes_arg[i, :2], bboxes_arg[i, 2:], axes=[1, 0])
        )
    # stack all frames back to a sequence
    return fn.stack(*frames)

p = pipe()
p.build()
print(p.run())

Please keep in mind that sequences are expected to have the same frame dimensions, so if your bbox has different size, it will not work

jantonguirao avatar Apr 25 '23 16:04 jantonguirao

Thanks! That solves half of the problem of how to handle copping each frame separately. The other problem is how to handle experimental.inputs.video's streaming functionality.

If I have this code:

def pipeline():
    # Decode video, split into 1 second chunks (25 FPS)
    vid = fn.experimental.inputs.video(name="DALI_INPUT_VIDEO", sequence_length=25, device='mixed')
    return vid

and I call it from python with:

triton_client.async_stream_infer(...)

I will receive multiple responses, each containing 25 frames.

On the other hand if I build this pipeline:

def pipeline():
    # Decode video, split into 1 second chunks (25 FPS)
    vid = fn.experimental.inputs.video(name="DALI_INPUT_VIDEO", sequence_length=25, device='mixed')
    bbox = dali.fn.external_source(device="cpu", name="DALI_INPUT_BBOX")

    frames = []
    for i in range(25):
        frames.append(
            dali.fn.slice(vid[i], bbox[i, :2], bbox[i, 2:])
        )

    return dali.fn.stack(*frames)

I receive only a single response with the first 25 frames.

How does the async happen and what do I need to do to continue an async pipeline?

joey-trigo avatar Apr 27 '23 07:04 joey-trigo

Hi @joey-trigo !

Actually, you did the latter pipeline properly. The reason you're getting single response is that DALI Backend has one missing functionality to properly handle async pipelines, that have both fn.inputs.video and fn.external_source inside.

This PR adds this functionality: https://github.com/triton-inference-server/dali_backend/pull/191 Due to the release cycles, we need to wait until DALI 1.26 release with merging it. If you need this feature urgently, you'd need to build DALI Backend yourself (with DALI nightly release). I expect this feature to be available in Triton 23.06 release.

szalpal avatar Apr 27 '23 11:04 szalpal

Thanks @szalpal! I'm still a bit confused about how this functionality will work though. Let's say I have a video comprising 50 frames and each frame has an equal sized bounding box I want to crop to.

So I send a single batch to my pipeline consisting of 1 video file and 50 bounding boxes.

def pipeline():
    # Decode video, split into 1 second chunks (25 FPS)
    vid = fn.experimental.inputs.video(name="DALI_INPUT_VIDEO", sequence_length=25, device='mixed')
    bbox = dali.fn.external_source(device="cpu", name="DALI_INPUT_BBOX")

    frames = []
    for i in range(25):
        frames.append(
            dali.fn.slice(vid[i], bbox[i, :2], bbox[i, 2:])
        )

    return dali.fn.stack(*frames)

In the first sequence vid[0] will refer to the first frame of the video, and in the second sequence vid[0] will be the 26th frame. Would I be able to match up the first frame to the first bounding box (bbox[0]) and the 26th frame to the 26th (bbox[25])?

joey-trigo avatar Apr 27 '23 12:04 joey-trigo

@joey-trigo ,

My apologies, I didn't fully understand your case earlier.

Unfortunately, at the moment this couldn't be achieved. I would envision two ways of introducing such features:

  1. Enhancing DALI's external_source operator to understand a queue of batches
  2. Allow DALI Backend to accept subset of inputs in the request.

I would prefer to resolve it by solution 2, but I don't know how feasible it is from Triton perspective (this might be a Triton limitation). If this won't be possible this way, we'll try to enhance external_source with the notion of a queue of batches.

szalpal avatar Apr 28 '23 10:04 szalpal