DALI
Handling batch sizes with fn.experimental.inputs.video
Describe the question.
I want to crop each frame of a video. I currently have a pipeline that takes JPEG images of each frame and crops them, and I am trying to convert it to take the whole video file at once, to save the time spent encoding and decoding JPEG files.
For each frame I have a bounding box that defines where I want to crop that frame. I am using fn.experimental.inputs.video to parse the video, but I am not sure how I can match frames to the proper bounding box. I am basing my code loosely on the video_decode_remap example in the Triton DALI backend. My code looks something like this:
def pipeline():
    # Decode video, split into 1-second batches (25 FPS)
    vid = fn.experimental.inputs.video(name="DALI_INPUT_VIDEO", sequence_length=25, device='mixed')
    bbox = dali.fn.external_source(device="cpu", name="DALI_INPUT_BBOX", batch=False)
    # bboxes need to be int32 and of dims [4, 1] to work as anchors for slice
    bbox = dali.fn.cast(bbox, dtype=types.INT32)
    bbox = dali.fn.expand_dims(bbox, axes=1)
    bbox_xy = dali.fn.cat(*dali.fn.element_extract(bbox, element_map=[0, 1]))
    bbox_wh = dali.fn.cat(*dali.fn.element_extract(bbox, element_map=[2, 3]))
    # data, anchor, shape
    crop = dali.fn.slice(vid, bbox_xy, bbox_wh)
    return dali.fn.resize(crop, device="gpu", size=(320, 320))
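For reference, the cast/expand_dims/element_extract/cat chain above just splits each xywh bbox into an anchor and a shape. A plain-NumPy sketch of the same transformation (illustrative only, not DALI code):

```python
import numpy as np

bbox = np.array([10.0, 20.0, 150.0, 250.0])      # one xywh bbox, possibly float
bbox = bbox.astype(np.int32)[:, np.newaxis]      # cast + expand_dims(axes=1) -> shape (4, 1)
bbox_xy = np.concatenate([bbox[0], bbox[1]])     # element_extract [0, 1] + cat -> anchor
bbox_wh = np.concatenate([bbox[2], bbox[3]])     # element_extract [2, 3] + cat -> shape
print(bbox_xy, bbox_wh)  # [10 20] [150 250]
```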
My issue has to do with understanding how batches work. In each batch I am sending a single video file and want to send a single bbox per frame. Calling fn.experimental.inputs.video batches the frames into sequences of length 25, so I want the bbox variable to be a batch of 25 bboxes.
Calling the pipeline with DALI_INPUT_VIDEO as a 2-second-long video file and DALI_INPUT_BBOX as a [50, 4] array causes the error:
Exception in CPU stage: [/opt/dali/dali/pipeline/executor/executor.cc:544] Assert on "bsps[0]->NextBatchSize() == bsps[i]->NextBatchSize()" failed: Batch size must be uniform across an iteration
Stacktrace (10 entries):
I also tried changing the sequence_length to 1, but that didn't help. Is there a way to do what I want with DALI?
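For context, the assert that fires enforces that every input produces the same number of samples in a given iteration. A toy illustration of the mismatch (not DALI code), assuming the video input contributes one 25-frame sequence per iteration while the [50, 4] bbox input gets interpreted as 50 samples:

```python
# The numbers below are assumptions based on the setup described above.
batch_sizes = {
    "DALI_INPUT_VIDEO": 1,   # one 25-frame sequence per iteration
    "DALI_INPUT_BBOX": 50,   # a [50, 4] array interpreted as 50 samples
}

# the invariant behind the assert: all inputs must agree on the batch size
uniform = len(set(batch_sizes.values())) == 1
print(uniform)  # False, hence "Batch size must be uniform across an iteration"
```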
Check for duplicates
- [X] I have searched the open bugs/issues and have found no duplicates for this bug report
Assuming you want to crop the whole sequence of frames with a given anchor and size of the cropping window, you can do something like this:
from nvidia.dali import fn, pipeline_def
import numpy as np

@pipeline_def(batch_size=1, num_threads=3, device_id=0)
def pipe():
    vid = np.zeros([25, 200, 300, 3], dtype=np.uint8)  # video-like (F, H, W, C)
    bbox = np.array([10, 20, 150, 250])  # assuming xywh order
    xy = fn.slice(bbox, 0, 2, axes=[0])
    wh = fn.slice(bbox, 2, 2, axes=[0])
    cropped = fn.slice(vid, xy, wh, axes=[1, 2])
    return cropped

p = pipe()
p.build()
print(p.run())
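To sanity-check the expected output shape, the same crop can be written in plain NumPy. Note that with axes=[1, 2] the first anchor/shape values apply to the H axis and the second to the W axis (a sketch only, not DALI code):

```python
import numpy as np

vid = np.zeros([25, 200, 300, 3], dtype=np.uint8)  # (F, H, W, C)
anchor = [10, 20]    # applied to axes [1, 2] -> (H, W)
shape = [150, 250]

# NumPy equivalent of fn.slice(vid, anchor, shape, axes=[1, 2])
cropped = vid[:, anchor[0]:anchor[0] + shape[0], anchor[1]:anchor[1] + shape[1], :]
print(cropped.shape)  # (25, 150, 250, 3)
```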
If you wanted to apply a different cropping window per frame, there is currently no built-in way to do so, but you can achieve it by processing each frame independently, like this:
from nvidia.dali import fn, pipeline_def, types
import numpy as np

@pipeline_def(batch_size=1, num_threads=3, device_id=0)
def pipe(seq_len=25):
    vid = np.zeros([seq_len, 300, 200, 3], dtype=np.uint8)  # video-like (seq_len, H, W, 3)
    bboxes = np.zeros([seq_len, 4], dtype=np.int32)  # bboxes like (seq_len, 4)
    bboxes[:, 0] = 10   # x
    bboxes[:, 1] = 20   # y
    bboxes[:, 2] = 150  # w
    bboxes[:, 3] = 250  # h
    bboxes_arg = types.Constant(bboxes)  # in your example this comes from external_source
    frames = []
    for i in range(seq_len):
        # process each frame separately
        frames.append(
            fn.slice(vid[i], bboxes_arg[i, :2], bboxes_arg[i, 2:], axes=[1, 0])
        )
    # stack all frames back into a sequence
    return fn.stack(*frames)

p = pipe()
p.build()
print(p.run())
Please keep in mind that all frames in a sequence are expected to have the same dimensions, so if your bounding boxes have different sizes, stacking the cropped frames back into a sequence will not work.
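A plain-NumPy sketch of the same per-frame logic shows why equal crop sizes are required before stacking (illustrative only, using the shapes from the example above):

```python
import numpy as np

seq_len = 25
vid = np.zeros([seq_len, 300, 200, 3], dtype=np.uint8)  # (seq_len, H, W, C)
bboxes = np.tile(np.array([10, 20, 150, 250], dtype=np.int32), (seq_len, 1))  # xywh per frame

frames = []
for i in range(seq_len):
    x, y, w, h = bboxes[i]
    frames.append(vid[i, y:y + h, x:x + w, :])  # crop one frame

# np.stack (like fn.stack) requires every cropped frame to have the same shape
seq = np.stack(frames)
print(seq.shape)  # (25, 250, 150, 3)
```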
Thanks! That solves half of the problem: how to handle cropping each frame separately. The other problem is how to handle the streaming functionality of experimental.inputs.video.
If I have this code:
def pipeline():
    # Decode video, split into 1-second chunks (25 FPS)
    vid = fn.experimental.inputs.video(name="DALI_INPUT_VIDEO", sequence_length=25, device='mixed')
    return vid
and I call it from python with:
triton_client.async_stream_infer(...)
I will receive multiple responses, each containing 25 frames.
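Conceptually, the video input chops the decoded stream into consecutive fixed-length sequences and emits one response per sequence. A plain-NumPy sketch of that chunking, with made-up frame dimensions:

```python
import numpy as np

fps, seconds, sequence_length = 25, 2, 25
video = np.zeros([fps * seconds, 720, 1280, 3], dtype=np.uint8)  # decoded 50-frame video

# sketch of how a 2-second video becomes two streamed responses of 25 frames each
responses = [video[i:i + sequence_length] for i in range(0, len(video), sequence_length)]
print(len(responses), responses[0].shape[0])  # 2 25
```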
On the other hand if I build this pipeline:
def pipeline():
    # Decode video, split into 1-second chunks (25 FPS)
    vid = fn.experimental.inputs.video(name="DALI_INPUT_VIDEO", sequence_length=25, device='mixed')
    bbox = dali.fn.external_source(device="cpu", name="DALI_INPUT_BBOX")
    frames = []
    for i in range(25):
        frames.append(
            dali.fn.slice(vid[i], bbox[i, :2], bbox[i, 2:])
        )
    return dali.fn.stack(*frames)
I receive only a single response with the first 25 frames.
How does the asynchronous streaming work, and what do I need to do to keep an async pipeline producing responses?
Hi @joey-trigo !
Actually, you built the latter pipeline properly. The reason you're getting a single response is that the DALI Backend is missing a piece of functionality needed to properly handle async pipelines that have both fn.inputs.video and fn.external_source inside.
This PR adds the functionality: https://github.com/triton-inference-server/dali_backend/pull/191. Due to the release cycle, merging it has to wait until the DALI 1.26 release. If you need this feature urgently, you would need to build the DALI Backend yourself (with a DALI nightly release). I expect this feature to be available in the Triton 23.06 release.
Thanks @szalpal! I'm still a bit confused about how this functionality will work, though. Let's say I have a video of 50 frames, and each frame has an equal-sized bounding box I want to crop to.
So I send a single batch to my pipeline consisting of 1 video file and 50 bounding boxes.
def pipeline():
    # Decode video, split into 1-second chunks (25 FPS)
    vid = fn.experimental.inputs.video(name="DALI_INPUT_VIDEO", sequence_length=25, device='mixed')
    bbox = dali.fn.external_source(device="cpu", name="DALI_INPUT_BBOX")
    frames = []
    for i in range(25):
        frames.append(
            dali.fn.slice(vid[i], bbox[i, :2], bbox[i, 2:])
        )
    return dali.fn.stack(*frames)
In the first sequence vid[0] will refer to the first frame of the video, and in the second sequence vid[0] will be the 26th frame. Would I be able to match up the first frame to the first bounding box (bbox[0]) and the 26th frame to the 26th (bbox[25])?
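The mapping being asked for is simple index arithmetic: frame i of chunk c should use the global bbox at index c * sequence_length + i (illustrative only; nothing in the pipeline above carries this chunk offset into external_source):

```python
import numpy as np

sequence_length = 25
bboxes = np.arange(50 * 4).reshape(50, 4)  # one bbox per frame of a 50-frame video

# first frame of the second sequence == 26th frame overall
chunk, i = 1, 0
global_idx = chunk * sequence_length + i
print(global_idx)  # 25, i.e. bbox[25] should pair with vid[0] of the second chunk
```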
@joey-trigo ,
My apologies, I didn't fully understand your case earlier.
Unfortunately, this cannot be achieved at the moment. I would envision two ways of introducing such a feature:
- Enhancing DALI's external_source operator to understand a queue of batches
- Allowing the DALI Backend to accept a subset of inputs in a request
I would prefer to resolve it with solution 2, but I don't know how feasible that is from the Triton perspective (this might be a Triton limitation). If it's not possible this way, we'll try to enhance external_source with the notion of a queue of batches.
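One way the "queue of batches" idea (solution 1) could look from the user's side is a callback backed by pre-chunked bbox batches, one chunk per video sequence. This is purely an illustrative Python sketch of the concept, not an existing DALI API:

```python
from collections import deque

class BBoxFeed:
    """Illustrative queue-of-batches callback: each call returns the bbox
    chunk matching the next video sequence (hypothetical helper, not DALI)."""

    def __init__(self, bboxes, sequence_length):
        # split the flat per-frame bbox list into per-sequence chunks
        self.queue = deque(
            bboxes[i:i + sequence_length]
            for i in range(0, len(bboxes), sequence_length)
        )

    def __call__(self):
        # pop one chunk per iteration, staying in sync with the video chunks
        return self.queue.popleft()

feed = BBoxFeed(list(range(50)), 25)
first, second = feed(), feed()
print(len(first), second[0])  # 25 25
```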