delayless video detector
As the video frame detector causes playback delay, I modified the "detect_video.py" for delayless video detection. Two parameters introduced, where "buffersize" reading consecutive frames ahead, "delay" processing a future frame within the buffered frames.
`import time from absl import app, flags, logging from absl.flags import FLAGS import cv2 import tensorflow as tf from yolov3_tf2.models import ( YoloV3, YoloV3Tiny ) from yolov3_tf2.dataset import transform_images from yolov3_tf2.utils import draw_outputs import threading import queue queue_img = queue.Queue() buffersize = 20
flags.DEFINE_string('classes', './data/coco.names', 'path to classes file') flags.DEFINE_string('weights', './checkpoints/yolov3.tf', 'path to weights file') flags.DEFINE_boolean('tiny', False, 'yolov3 or yolov3-tiny') flags.DEFINE_integer('size', 416, 'resize images to') flags.DEFINE_string('video', './data/video.mp4', 'path to video file or number for webcam)') flags.DEFINE_integer('timeF', 3, '视频帧计数间隔频率ms)') flags.DEFINE_string('output', None, 'path to output video') flags.DEFINE_string('output_format', 'XVID', 'codec used in VideoWriter when saving video to file') flags.DEFINE_integer('num_classes', 80, 'number of classes in the model') flags.DEFINE_integer('delay', 10, 'delay frames')
def main(_argv): physical_devices = tf.config.experimental.list_physical_devices('GPU') for physical_device in physical_devices: tf.config.experimental.set_memory_growth(physical_device, True)
if FLAGS.tiny:
yolo = YoloV3Tiny(classes=FLAGS.num_classes)
yolo = YoloV3(classes=FLAGS.num_classes)
logging.info('weights loaded')
class_names = [c.strip() for c in open(FLAGS.classes).readlines()]
logging.info('classes loaded')
times = []
vid = cv2.VideoCapture(int(FLAGS.video))
vid = cv2.VideoCapture(FLAGS.video)
out = None
if FLAGS.output:
# by default VideoCapture returns float instead of int
width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(vid.get(cv2.CAP_PROP_FPS))
codec = cv2.VideoWriter_fourcc(*FLAGS.output_format)
out = cv2.VideoWriter(FLAGS.output, codec, fps, (width, height))
boxes, scores, classes, nums, timeslap = None, None, None, None, None
threadrun = MyThread(yolo, queue_img, boxes, scores, classes, nums, timeslap)
while True:
imgs = []
for _ in range(buffersize):
ret, img = vid.read()
if img is None: break
if len(imgs)==0:
logging.warning("Empty Frame")
imgs_in = [cv2.cvtColor(img, cv2.COLOR_BGR2RGB) for img in imgs]
imgs_in = [tf.expand_dims(img_in, 0) for img_in in imgs_in]
imgs_in = [transform_images(img_in, FLAGS.size) for img_in in imgs_in]
for index, img in enumerate(imgs):
if queue_img.empty(): # 线程没有图片处理的时候,输送最新图片给线程
while timeslap==None:
boxes, scores, classes, nums, timeslap = threadrun.get_result()
boxes, scores, classes, nums, timeslap = threadrun.get_result()
times = times[-20:]
img = draw_outputs(img, (boxes, scores, classes, nums), class_names)
img = cv2.putText(img, "Time: {:.2f}ms".format(sum(times) / len(times) * 1000), (0, 30),
cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (0, 0, 255), 2)
if FLAGS.output:
cv2.imshow('output', img)
if cv2.waitKey(1) == ord('q'):
class MyThread(threading.Thread): def init(self, model, queue_img, boxes, scores, classes, nums, timeslap): threading.Thread.init(self) self.model = model self.queue = queue_img self.b,self.s,self.c,self.n,self.t=boxes, scores, classes, nums, timeslap def run(self): while True: try: t1 = time.time() self.boxes, self.scores, self.classes, self.nums = self.model.predict(self.queue.get()) if not self.queue.empty(): self.queue.get() t2 = time.time() self.timeslap = t2-t1 except Exception: pass # signals to queue job is done self.queue.task_done() def get_result(self): try: return self.boxes, self.scores, self.classes, self.nums, self.timeslap except Exception: return self.b,self.s,self.c,self.n,self.t
if name == 'main': try: app.run(main) except SystemExit: pass `