DALI
DALI Pipeline is hanging somewhere?
Hi, I am using DALI to quickly check for bad images. In my subprocess, I first download images from a website and then invoke DALI to decode them. But I found that my subprocess hangs somewhere. This is non-deterministic: most of the time it works very well, and the problem is hard to reproduce.
Below is the example code. My questions are:
- When the DALI Pipeline is built, will DALI spawn child processes? If yes, what should I do to make sure the child processes exit safely?
- If the Pipeline encounters an error (e.g., some images can't be decoded), is it possible that the process hangs somewhere?
- By the way, when a bad image is encountered, I delete the pipeline and rebuild it. Is my code safe enough?
Thanks a lot! Looking forward to your answers!
import gc
import multiprocessing as mp
from multiprocessing import Process

import numpy as np
import nvidia.dali.fn as fn
from nvidia.dali.pipeline import Pipeline


class DaliChecker:
    def __init__(self, batchsize=1, prefetch=1):
        self.batch_size = batchsize
        self.prefetch = prefetch
        self.device_id = None
        if self.device_id is None:
            self.device = "cpu"
        else:
            self.device = "mixed"
        self.make_pipe()
        self.pipe.build()

    def make_pipe(self):
        self.pipe = Pipeline(batch_size=self.batch_size,
                             num_threads=2,
                             prefetch_queue_depth=self.prefetch,
                             device_id=self.device_id)
        with self.pipe:
            self.files = fn.external_source()
            images = fn.decoders.image(self.files, device=self.device)
            self.pipe.set_outputs(images)

    def feed(self, images):
        self.pipe.feed_input(self.files, images)


def check_bad_img_w_dali(dali_decoder, img_path):
    def dali_check(dali_decoder, img_buffer):
        try:
            dali_decoder.feed(img_buffer)
            dali_decoder.pipe.run()
            return True, dali_decoder
        except Exception:
            # decoding failed: drop the old pipeline and build a fresh one
            del dali_decoder
            gc.collect()
            dali_decoder = DaliChecker()
            return False, dali_decoder

    try:
        with open(img_path, 'rb') as f:
            img_bytes = f.read()
        img_data = np.frombuffer(img_bytes, dtype=np.uint8)
        status, dali_decoder = dali_check(dali_decoder, [img_data])
        return status, dali_decoder
    except Exception:
        return False, dali_decoder


def download_subprocess(targetlist):
    dali_decoder = DaliChecker()
    for img_url in targetlist:
        # download the image from the website
        img_path = download(img_url)
        # check whether this image can be decoded by DALI
        status, dali_decoder = check_bad_img_w_dali(dali_decoder, img_path)


if __name__ == '__main__':
    mp.set_start_method('spawn')
    for idx in range(num_workers):
        p = Process(target=download_subprocess, args=(targetlist,))
        p.daemon = True
        p.start()
Hi @ChangXuSunny. image_decoder should not hang on bad images. In such a case, it should raise an error (exception). The code you show looks safe to me; it should let you recover after an error by deleting the pipeline and creating it again. I can't think of a reason why your pipeline would hang. If you could provide self-contained reproduction instructions (even if non-deterministic), we could give it a run on our end to figure out what is going on.
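For example, a self-contained script roughly along these lines would already be something we could run. This is only a sketch: the garbage buffer is a placeholder standing in for whatever corrupted download triggers the problem on your side, and the iteration count is arbitrary.

import numpy as np
import nvidia.dali.fn as fn
from nvidia.dali.pipeline import Pipeline


def make_checker():
    # same structure as your DaliChecker, CPU-only decoding
    pipe = Pipeline(batch_size=1, num_threads=2, device_id=None)
    with pipe:
        files = fn.external_source(name="raw_files")
        images = fn.decoders.image(files, device="cpu")
        pipe.set_outputs(images)
    pipe.build()
    return pipe


# placeholder: truncated bytes that cannot be decoded; replace with the
# actual contents of one of your problematic downloads
bad_bytes = np.frombuffer(b"\xff\xd8\xff\xe0" + b"\x00" * 64, dtype=np.uint8)

pipe = make_checker()
for i in range(10000):
    pipe.feed_input("raw_files", [bad_bytes])
    try:
        pipe.run()
    except Exception as e:
        # a bad image should surface here as an exception, not hang in run()
        print(f"iteration {i}: decode error: {e}")
        # recover the same way your code does: drop the pipeline and rebuild
        del pipe
        pipe = make_checker()

If a loop like this (fed with your actual bad images, or interleaved with the download step) ever hangs instead of raising, that would be exactly the case we'd like to reproduce.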
A couple of questions:
- Does it hang when it hits a bad image, or does it error out first?
- What kind of bad images are you checking (data corruption)?
Thank you.
Closing this issue now. If you still need help, please reopen.