
DALI pipeline hangs somewhere?

Open ChangXuSunny opened this issue 2 years ago • 1 comments

Hi, I am using DALI for a fast check for bad images. In my subprocess, I first download images from a website and then invoke DALI to decode them. However, the subprocess sometimes hangs somewhere. The hang is non-deterministic: most of the time everything works well, so the problem is hard to reproduce.

Below is the example code. My questions are:

  • When a DALI pipeline is built, does DALI spawn child processes? If so, what should I do to make sure the child processes exit safely?
  • If the pipeline encounters an error (e.g., an image can't be decoded), is it possible for the process to hang somewhere?
  • When I encounter a bad image, I delete the pipeline and rebuild it. Is my code safe enough?

Thanks a lot! Looking forward to your answers!

import gc

import numpy as np
import nvidia.dali.fn as fn
from nvidia.dali.pipeline import Pipeline


class DaliChecker:
    def __init__(self, batchsize=1, prefetch=1):
        self.batch_size = batchsize
        self.prefetch = prefetch
        self.device_id = None
        if self.device_id is None:
            self.device = "cpu"
        else:
            self.device = "mixed"
        self.make_pipe()
        self.pipe.build()

    def make_pipe(self):
        self.pipe = Pipeline(batch_size=self.batch_size, num_threads=2, prefetch_queue_depth=self.prefetch, device_id=self.device_id)
        with self.pipe:
            self.files = fn.external_source()
            images = fn.decoders.image(self.files, device=self.device)
            self.pipe.set_outputs(images)

    def feed(self, images):
        self.pipe.feed_input(self.files, images)


def check_bad_img_w_dali(dali_decoder, img_path):
    def dali_check(dali_decoder, img_buffer):
        try:
            dali_decoder.feed(img_buffer)
            dali_decoder.pipe.run()
            return True, dali_decoder
        except Exception:
            # Decoding failed: drop the pipeline and build a fresh one.
            del dali_decoder
            gc.collect()
            dali_decoder = DaliChecker()
            return False, dali_decoder
    try:
        with open(img_path, 'rb') as f:
            img_bytes = f.read()
        img_data = np.frombuffer(img_bytes, dtype=np.uint8)
        status, dali_decoder = dali_check(dali_decoder, [img_data])
        return status, dali_decoder
    except Exception:
        # For example, the file could not be read.
        return False, dali_decoder

def download_subprocess(targetlist):
    dali_decoder = DaliChecker()
    for img_url in targetlist:
        #download image from website
        img_path = download(img_url)
        #check whether this image can be decoded by Dali
        status, dali_decoder = check_bad_img_w_dali(dali_decoder, img_path)

if __name__ == '__main__':
    import multiprocessing as mp
    mp.set_start_method('spawn')
    for idx in range(num_workers):
        p = mp.Process(target=download_subprocess, args=(targetlist,))
        p.daemon = True
        p.start()

ChangXuSunny, May 13 '22 02:05

Hi @ChangXuSunny. image_decoder should not hang on bad images; in such a case it should raise an error (an exception). The code you show looks safe to me: it should allow you to recover after an error by deleting the pipeline and creating it again. I can't think of a reason why your pipeline would hang. If you could provide self-contained reproduction instructions (even if non-deterministic), we could run them on our end to figure out what is going on. A couple of questions:

  • Does it hang when hitting a bad image? Does it error out first?
  • What kind of bad images are you checking (data corruption)?

Thank you.
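For reference, the recovery pattern discussed here (catch the exception, drop the decoder, build a fresh one) can be sketched without DALI installed. This is only a minimal illustration under stated assumptions: `RebuildingChecker`, `make_stub_decoder`, and `run_checks` are hypothetical names, and the stub decoder merely checks for a JPEG start-of-image marker. With DALI, the factory would instead build something like the `DaliChecker` from the question.

```python
from typing import Callable, List, Tuple

Decoder = Callable[[bytes], None]


class RebuildingChecker:
    """Wraps a decoder and replaces it with a fresh one after any failure."""

    def __init__(self, make_decoder: Callable[[], Decoder]):
        # make_decoder is a hypothetical factory; with DALI it would build
        # and return a pipeline wrapper such as DaliChecker.
        self._make_decoder = make_decoder
        self._decoder = make_decoder()
        self.rebuilds = 0

    def check(self, data: bytes) -> Tuple[bool, str]:
        """Return (ok, error_text); rebuild the decoder if decoding raised."""
        try:
            self._decoder(data)
            return True, ""
        except Exception as exc:  # KeyboardInterrupt/SystemExit stay fatal
            # Drop the possibly broken decoder and start from a clean one.
            self._decoder = self._make_decoder()
            self.rebuilds += 1
            return False, f"{type(exc).__name__}: {exc}"


def make_stub_decoder() -> Decoder:
    """Stand-in decoder (assumption): accepts only data starting with FF D8."""
    def decode(data: bytes) -> None:
        if not data.startswith(b"\xff\xd8"):
            raise ValueError("not a JPEG stream")
    return decode


def run_checks(samples: List[bytes]) -> List[Tuple[bool, str]]:
    """Check every sample with one checker that rebuilds itself on errors."""
    checker = RebuildingChecker(make_stub_decoder)
    return [checker.check(sample) for sample in samples]
```

Logging the returned error text per file would also show whether an exception was raised before a hang, which bears on the first question above.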

jantonguirao, May 13 '22 07:05

Closing this issue now. If you still need help, please reopen.

jantonguirao, Feb 06 '23 10:02