DALI icon indicating copy to clipboard operation
DALI copied to clipboard

How to move depths to the same gpu as decoded_imgs?

Open kristinat8 opened this issue 1 year ago • 6 comments

Describe the question.

Hi, I want to load images and depth maps via external_source. The images are in JPG format and the depth maps are in NumPy format (`.npz` files in the code below), and each depth map must stay matched to its image. In the pipeline the decoded images are on the GPU, but the depths are on the CPU; for subsequent processing I need to apply the same cropping to both and stitch (concatenate) them together. The following problem occurs in fn.cat:

# Batch size shared by ExternalInputCallable and combined_pipeline below.
batch_size = 32

class ExternalInputCallable:
    """Callable sample source pairing each JPEG with its depth archive.

    Images are discovered as ``dataset-all/*.jpg`` and ordered by the frame
    number embedded in the file name; the depth file for an image is
    ``depth/<image stem>.npz``.
    """

    def __init__(self, batch_size):
        """Index the image files and derive the matching depth file paths.

        Args:
            batch_size: Number of samples per batch; used to compute how many
                complete batches the file list provides.
        """
        self.img_dir = 'dataset-all'
        self.depth_dir = 'depth'
        self.batch_size = batch_size

        jpg_pattern = os.path.join(self.img_dir, '*.jpg')
        self.img_files = sorted(glob.glob(jpg_pattern), key=self.extract_number)

        # Same stem as the image, '.npz' extension, located in depth_dir.
        self.npz_files = []
        for img_path in self.img_files:
            stem, _ = os.path.splitext(os.path.basename(img_path))
            self.npz_files.append(os.path.join(self.depth_dir, stem + '.npz'))

        # Only whole batches are counted; __call__ stops once the reported
        # iteration reaches this value.
        self.full_iterations = len(self.img_files) // batch_size

    def extract_number(self, filename):
        """Return the integer between the first '_' and the next '.' of the base name."""
        name_parts = os.path.basename(filename).split('_')
        number_text = name_parts[1].split('.')[0]
        return int(number_text)

    def __call__(self, sample_info):
        """Load one (encoded image, depth) pair.

        Args:
            sample_info: Object exposing ``idx_in_epoch`` (index into the file
                lists) and ``iteration`` (compared against the batch count).

        Returns:
            A tuple ``(encoded_img, depth_data)``: the raw JPEG bytes as a 1-D
            ``uint8`` array, and the ``'depth'`` array cast to ``uint8`` with
            a new axis inserted at position 2.

        Raises:
            StopIteration: When ``sample_info.iteration`` is not below the
                number of full batches.
        """
        idx = sample_info.idx_in_epoch
        if sample_info.iteration >= self.full_iterations:
            raise StopIteration

        img_path = self.img_files[idx]
        depth_path = self.npz_files[idx]

        # The image is handed over still encoded; decoding happens later.
        with open(img_path, "rb") as img_file:
            raw_bytes = img_file.read()
        encoded_img = np.frombuffer(raw_bytes, dtype=np.uint8)

        # NOTE(review): allow_pickle=True permits unpickling object arrays;
        # only load archives from a trusted location.
        with np.load(depth_path, allow_pickle=True) as archive:
            depth_u8 = archive['depth'].astype(np.uint8)
            depth_data = np.expand_dims(depth_u8, axis=2)

        return encoded_img, depth_data

    
@pipeline_def(
    batch_size=batch_size, num_threads=16, device_id=0, py_num_workers=32
)
def combined_pipeline():
    """Decode the JPEGs and concatenate the depth map along axis 2.

    Returns:
        The concatenated tensor and the depth input, in that order.
    """
    # Two per-sample outputs (batch=False) from the callable: encoded JPEG
    # bytes and the depth array, both declared as UINT8. parallel=True
    # presumably runs the callable in Python worker processes
    # (py_num_workers above) — confirm against the external_source docs.
    jpegs , depths = fn.external_source(
        source=ExternalInputCallable(batch_size),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.UINT8],
    )
    # NOTE(review): the "mixed" decoder presumably takes CPU input and
    # produces its output on the GPU — confirm against the DALI docs.
    decoded_imgs = fn.decoders.image(jpegs, device="mixed")
    
    # `depths` is passed to fn.cat exactly as it came from external_source;
    # this is the call the reported error points at (input 1 not on the GPU).
    combined = fn.cat(decoded_imgs, depths, axis=2)
    return combined ,depths

# Instantiate the pipeline and build it before the first run.
pipe = combined_pipeline()
pipe.build()

# Measure the wall-clock time of 100 pipeline runs.
start_time = time.time()
for i in range(100):
    # Per the traceback that follows, this is the call that raises the
    # fn.cat device error.
    outputs = pipe.run()

    # Only the most recent outputs are kept; nothing is read from them here.
    combined_data = outputs

end_time = time.time()
print(f"time: {end_time-start_time}")

Traceback (most recent call last):
  File "mashangshan.py", line 70, in <module>
    outputs = pipe.run()
  File "/home/u202320081200023/miniconda3/envs/dora/lib/python3.8/site-packages/nvidia/dali/pipeline.py", line 1328, in run
    return self.outputs()
  File "/home/u202320081200023/miniconda3/envs/dora/lib/python3.8/site-packages/nvidia/dali/pipeline.py", line 1166, in outputs
    return self._outputs()
  File "/home/u202320081200023/miniconda3/envs/dora/lib/python3.8/site-packages/nvidia/dali/pipeline.py", line 1251, in _outputs
    return self._pipe.Outputs()
RuntimeError: Critical error in pipeline: Error in GPU operator nvidia.dali.fn.cat, which was used in the pipeline definition with the following traceback:

File "mashangshan.py", line 62, in combined_pipeline combined = fn.cat(decoded_imgs, depths, axis=2)

encountered:

Assert on "inputs_[idx].device == StorageDevice::GPU" failed: The input 1 is not on the requested device (GPU). C++ context: [/opt/dali/dali/pipeline/workspace/workspace.h:637] Current pipeline object is no longer valid.

Check for duplicates

  • [X] I have searched the open bugs/issues and have found no duplicates for this bug report

kristinat8 avatar Aug 25 '24 11:08 kristinat8

Hi @kristinat8,

Thank you for reaching out. Can you try:

 combined = fn.cat(decoded_imgs, depths.gpu(), axis=2)

JanuszL avatar Aug 26 '24 06:08 JanuszL

> Hi @kristinat8,
>
> Thank you for reaching out. Can you try:
>
>     combined = fn.cat(decoded_imgs, depths.gpu(), axis=2)

If I want to process the depth in the pipeline in CHW format, is there any way I can do it? Thanks!

kristinat8 avatar Aug 26 '24 09:08 kristinat8

Hi @kristinat8,

I'm not sure if I understand your ask.

JanuszL avatar Aug 26 '24 12:08 JanuszL

> Hi @kristinat8,
>
> I'm not sure if I understand your ask.

Hello, when I use distributed computing, I encountered a decoding error with the following code. Could you please tell me what might be the reason?

@pipeline_def(batch_size=80, num_threads=16, enable_conditionals=True)
def VideoPipe(total_picture, file_list, dfile_list, local_crops_number, frame_per_clip):
    """Build paired image/depth crops for one shard of the dataset.

    Args:
        total_picture: Divided by ``frame_per_clip`` to get the number of clips.
        file_list: File list passed to the image reader.
        dfile_list: File list passed to the depth reader.
        local_crops_number: How many times ``process_local`` is called.
        frame_per_clip: Number of consecutive samples that share one crop index.

    Returns:
        ``global1``, ``global1_depth``, then every local crop, then every
        local depth crop, flattened into one tuple.
    """
    # Each rank reads its own shard; both readers use the same shard settings
    # and no shuffling. NOTE(review): image/depth pairing presumably relies on
    # the two file lists being in the same order — confirm.
    rank = utils.get_rank()
    world_size = utils.get_world_size()
    input = fn.readers.file(file_list=file_list, random_shuffle=False, num_shards=world_size, shard_id=rank)
    depth = fn.readers.file(file_list=dfile_list, random_shuffle=False, num_shards=world_size, shard_id=rank)
    # input[0] is presumably the raw encoded file content — confirm against
    # the readers.file docs. Shapes are peeked from the images only.
    shapes = fn.peek_image_shape(input[0])
    # crop_anchor, crop_shape= fn.random_crop_generator(shapes, random_area=[0.2, 1.0])
    # crop_anchor = fn.permute_batch(crop_anchor, indices=batch_size * [0])
    # crop_shape = fn.permute_batch(crop_shape , indices=batch_size * [0])
    # print(crop_shape)
    
    # init_crop
    # indices is [0]*frame_per_clip + [1]*frame_per_clip + ..., i.e. sample j
    # maps to index j // frame_per_clip.
    num_clips = total_picture // frame_per_clip
    indices = np.concatenate([i * np.ones(frame_per_clip, dtype=int) for i in range(num_clips)])
    indices = indices.tolist()

    # One random crop window is generated per sample, then redistributed with
    # permute_batch. NOTE(review): presumably out[j] = in[indices[j]], so all
    # frames of a clip reuse a single window — confirm against the
    # fn.permute_batch docs.
    crop_anchor, crop_shape= fn.random_crop_generator(shapes, random_area=[0.2, 1.0])
    crop_anchor = fn.permute_batch(crop_anchor, indices=indices)
    crop_shape = fn.permute_batch(crop_shape , indices=indices)


    # Images and depths are decoded with the same anchor/shape so the crops
    # line up. NOTE(review): the trailing .gpu() on the "mixed" decoder output
    # looks redundant — confirm.
    images = fn.decoders.image_slice(input[0], crop_anchor, crop_shape, device="mixed", axis_names="HW").gpu() #HWC
    depths = fn.decoders.image_slice(depth[0], crop_anchor, crop_shape, device="mixed", axis_names="HW") #HWC

    images = fn.resize(images, resize_x=300, resize_y=300, device="gpu") #HWC
    depths = fn.resize(depths, resize_x=300, resize_y=300, device="gpu") #HWC

    frames = fn.transpose(images, perm=[2, 0, 1]) #CHW
    depths = fn.transpose(depths, perm=[2, 0, 1]) #CHW
    # frames = fn.normalize(frames, mean=0.0, stddev=255.0, device="gpu") #CHW

    # process_global and process_local are defined outside this snippet.
    global1, global1_depth= process_global(frames, depths, indices)
    # global1_combined = process_global(frames, depths, indices)
    # Each process_local call yields a (crop, depth_crop) pair; zip(*...)
    # regroups the pairs into a list of crops and a list of depth crops.
    # NOTE(review): with local_crops_number == 0 the zip is empty and this
    # two-way unpack would raise ValueError.
    locals, local_depths = map(list, zip(*[process_local(frames, depths) for _ in range(local_crops_number)]))
    return global1, global1_depth, *locals, *local_depths

ERROR] [nvjpeg_cudadecoder] Could not decode jpeg code stream - nvjpeg error #4 (Jpeg not supported) when running nvjpegJpegStreamParse(handle, static_cast<const unsigned char*>(stream_ctx->encoded_streamdata), stream_ctx->encoded_stream_datasize, false, false, p.parsestate.nvjpegstream) at /home/jenkins/agent/workspace/nvimagecodec/helpers/release_v0.3.0/Release_11/build/extensions/nvjpeg/cuda_decoder.cpp:472 [WARNING] [nvimgcodec] [nvjpeg_cuda_decoder] decode #110 fallback Epoch: [0/100]: 1%|█ | 5/974 [00:59<3:06:05, 11.52s/it][ERROR] [libjpeg_turbo_decoder] Could not decode jpeg code stream - Premature end of JPEG data. Stopped at line 962/2061

kristinat8 avatar Aug 27 '24 08:08 kristinat8

> > I'm not sure if I understand your ask.
> > 
> > * if you want to specify the `depths` layout - please use `layout` argument of [the `external source` operator](https://docs.nvidia.com/deeplearning/dali/user-guide/docs/operations/nvidia.dali.fn.external_source.html#nvidia.dali.fn.external_source)
> > * you can use [the `transpose` operator](https://docs.nvidia.com/deeplearning/dali/user-guide/docs/operations/nvidia.dali.fn.transpose.html) to change the layout
> > * you can use [the `crop_mirror_normalize`](https://docs.nvidia.com/deeplearning/dali/user-guide/docs/operations/nvidia.dali.fn.crop_mirror_normalize.html#) operator to change the data layout using `output_layout` argument (and leave other arguments to their defaults)
> 
> Hello, when I use distributed computing, I encountered a decoding error with the following code. Could you please tell me what might be the reason?
> 
> @pipeline_def(batch_size=80, num_threads=16, enable_conditionals=True)
> def VideoPipe(total_picture, file_list, dfile_list, local_crops_number, frame_per_clip):
>     rank = utils.get_rank()
>     world_size = utils.get_world_size()
>     input = fn.readers.file(file_list=file_list, random_shuffle=False, num_shards=world_size, shard_id=rank)
>     depth = fn.readers.file(file_list=dfile_list, random_shuffle=False, num_shards=world_size, shard_id=rank)
>     shapes = fn.peek_image_shape(input[0])
>     # crop_anchor, crop_shape= fn.random_crop_generator(shapes, random_area=[0.2, 1.0])
>     # crop_anchor = fn.permute_batch(crop_anchor, indices=batch_size * [0])
>     # crop_shape = fn.permute_batch(crop_shape , indices=batch_size * [0])
>     # print(crop_shape)
>     
>     # init_crop
>     num_clips = total_picture // frame_per_clip
>     indices = np.concatenate([i * np.ones(frame_per_clip, dtype=int) for i in range(num_clips)])
>     indices = indices.tolist()
> 
>     crop_anchor, crop_shape= fn.random_crop_generator(shapes, random_area=[0.2, 1.0])
>     crop_anchor = fn.permute_batch(crop_anchor, indices=indices)
>     crop_shape = fn.permute_batch(crop_shape , indices=indices)
> 
> 
>     images = fn.decoders.image_slice(input[0], crop_anchor, crop_shape, device="mixed", axis_names="HW").gpu() #HWC
>     depths = fn.decoders.image_slice(depth[0], crop_anchor, crop_shape, device="mixed", axis_names="HW") #HWC
> 
>     images = fn.resize(images, resize_x=300, resize_y=300, device="gpu") #HWC
>     depths = fn.resize(depths, resize_x=300, resize_y=300, device="gpu") #HWC
> 
>     frames = fn.transpose(images, perm=[2, 0, 1]) #CHW
>     depths = fn.transpose(depths, perm=[2, 0, 1]) #CHW
>     # frames = fn.normalize(frames, mean=0.0, stddev=255.0, device="gpu") #CHW
> 
>     global1, global1_depth= process_global(frames, depths, indices)
>     # global1_combined = process_global(frames, depths, indices)
>     locals, local_depths = map(list, zip(*[process_local(frames, depths) for _ in range(local_crops_number)]))
>     return global1, global1_depth, *locals, *local_depths
> ERROR] [nvjpeg_cudadecoder] Could not decode jpeg code stream - nvjpeg error [#4](https://github.com/NVIDIA/DALI/pull/4) (Jpeg not supported) when running nvjpegJpegStreamParse(handle, static_cast<const unsigned char*>(stream_ctx->encoded_streamdata), stream_ctx->encoded_stream_datasize, false, false, p.parsestate.nvjpegstream) at /home/jenkins/agent/workspace/nvimagecodec/helpers/release_v0.3.0/Release_11/build/extensions/nvjpeg/cuda_decoder.cpp:472 [WARNING] [nvimgcodec] [nvjpeg_cuda_decoder] decode [#110](https://github.com/NVIDIA/DALI/pull/110) fallback Epoch: [0/100]: 1%|█ | 5/974 [00:59<3:06:05, 11.52s/it][ERROR] [libjpeg_turbo_decoder] Could not decode jpeg code stream - Premature end of JPEG data. Stopped at line 962/2061

But I didn't encounter any errors when decoding a single file list on its own, using the pipeline below:
```python
@pipeline_def(batch_size=80, num_threads=8 , device_id=0 ,enable_conditionals=True)
def VideoPipe(total_picture, file_list, local_crops_number, frame_per_clip):
    """Single-reader variant: decode, crop, resize and transpose one file list.

    Args:
        total_picture: Divided by ``frame_per_clip`` to get the number of clips.
        file_list: File list passed to the only reader.
        local_crops_number: Only referenced by the commented-out code below.
        frame_per_clip: Number of consecutive samples that share one crop index.

    Returns:
        The cropped, resized (300x300) and transposed tensor.
    """
    # input = fn.readers.file(file_list=file_list, random_shuffle=False)
    # A single reader with no sharding arguments; it reads `file_list`.
    depth = fn.readers.file(file_list=file_list, random_shuffle=False)
    # shapes = fn.peek_image_shape(input[0])
    # print(type(shapes))
    # crop_anchor, crop_shape= fn.random_crop_generator(shapes, random_area=[0.2, 1.0])
    # crop_anchor = fn.permute_batch(crop_anchor, indices=batch_size * [0])
    # crop_shape = fn.permute_batch(crop_shape , indices=batch_size * [0])
    # print(crop_shape)
    
    # init_crop
    # indices is [0]*frame_per_clip + [1]*frame_per_clip + ..., i.e. sample j
    # maps to index j // frame_per_clip.
    num_clips = total_picture // frame_per_clip
    indices = np.concatenate([i * np.ones(frame_per_clip, dtype=int) for i in range(num_clips)])
    indices = indices.tolist()

    # The crop generator is fed a constant shape [2160, 3840, 3] here instead
    # of a shape peeked from the files. NOTE(review): presumably every file
    # has this size — confirm.
    crop_anchor, crop_shape= fn.random_crop_generator(np.array([2160, 3840, 3], dtype=np.int64), random_area=[0.2, 1.0])
    crop_anchor = fn.permute_batch(crop_anchor, indices=indices)
    crop_shape = fn.permute_batch(crop_shape , indices=indices)


    # images = fn.decoders.image_slice(input[0], crop_anchor, crop_shape, device="mixed", axis_names="HW") #HWC
    depths = fn.decoders.image_slice(depth[0], crop_anchor, crop_shape, device="mixed", axis_names="HW") #HWC

    # images = fn.resize(images, resize_x=300, resize_y=300, device="gpu") #HWC
    depths = fn.resize(depths, resize_x=300, resize_y=300, device="gpu") #HWC

    # frames = fn.transpose(images, perm=[2, 0, 1]) #CHW
    depths = fn.transpose(depths, perm=[2, 0, 1]) #CHW
    # frames = fn.normalize(frames, mean=0.0, stddev=255.0, device="gpu") #CHW

    # global1, global1_depth= process_global(frames, depths, indices)
    # # global1_combined = process_global(frames, depths, indices)
    # locals, local_depths = map(list, zip(*[process_local(frames, depths) for _ in range(local_crops_number)]))


    return depths 

kristinat8 avatar Aug 27 '24 08:08 kristinat8

Hi @kristinat8,

Can you provide a full error log as it should print the name of the image that caused this error? It seems that one of the images in your data set is corrupted. Can you check if you can open it in any image viewer? If so then it would help us a lot if you could provide the image for our examination, maybe there is a gap in our decoding support.

JanuszL avatar Aug 27 '24 14:08 JanuszL