VimbaPython
VimbaPython copied to clipboard
streaming asynchronous_ grab using flask
Hey, all.
I need to use asynchronous mode to streaming frames from camera using flask.
Below is my code, based on 'Examples/asynchronous_grab.py' and the flask code I got this reference: https://towardsdatascience.com/video-streaming-in-web-browsers-with-opencv-flask-93a38846fe00.
import sys
from typing import Optional, Tuple
from vimba import *
#Import necessary libraries
from flask import Flask, Response
import cv2
import threading
#Initialize the Flask app
app = Flask(__name__)
def print_preamble():
print('///////////////////////////////////////////')
print('/// Vimba API Asynchronous Grab Example ///')
print('///////////////////////////////////////////\n')
def print_usage():
print('Usage:')
print(' python asynchronous_grab.py [/x] [-x] [camera_id]')
print(' python asynchronous_grab.py [/h] [-h]')
print()
print('Parameters:')
print(' /x, -x If set, use AllocAndAnnounce mode of buffer allocation')
print(' camera_id ID of the camera to use (using first camera if not specified)')
print()
def abort(reason: str, return_code: int = 1, usage: bool = False):
print(reason + '\n')
if usage:
print_usage()
sys.exit(return_code)
def parse_args() -> Tuple[Optional[str], AllocationMode]:
args = sys.argv[1:]
argc = len(args)
allocation_mode = AllocationMode.AnnounceFrame
cam_id = ""
for arg in args:
if arg in ('/h', '-h'):
print_usage()
sys.exit(0)
elif arg in ('/x', '-x'):
allocation_mode = AllocationMode.AllocAndAnnounceFrame
elif not cam_id:
cam_id = arg
if argc > 2:
abort(reason="Invalid number of arguments. Abort.", return_code=2, usage=True)
return (cam_id if cam_id else None, allocation_mode)
def get_camera(camera_id: Optional[str]) -> Camera:
with Vimba.get_instance() as vimba:
if camera_id:
try:
return vimba.get_camera_by_id(camera_id)
except VimbaCameraError:
abort('Failed to access Camera \'{}\'. Abort.'.format(camera_id))
else:
cams = vimba.get_all_cameras()
if not cams:
abort('No Cameras accessible. Abort.')
return cams[0]
def setup_camera(cam: Camera):
with cam:
# Enable auto exposure time setting if camera supports it
try:
cam.ExposureAuto.set('Continuous')
except (AttributeError, VimbaFeatureError):
pass
# Enable white balancing if camera supports it
try:
cam.BalanceWhiteAuto.set('Continuous')
except (AttributeError, VimbaFeatureError):
pass
# Try to adjust GeV packet size. This Feature is only available for GigE - Cameras.
try:
cam.GVSPAdjustPacketSize.run()
while not cam.GVSPAdjustPacketSize.is_done():
pass
except (AttributeError, VimbaFeatureError):
pass
# Query available, open_cv compatible pixel formats
# prefer color formats over monochrome formats
cv_fmts = intersect_pixel_formats(cam.get_pixel_formats(), OPENCV_PIXEL_FORMATS)
color_fmts = intersect_pixel_formats(cv_fmts, COLOR_PIXEL_FORMATS)
if color_fmts:
cam.set_pixel_format(color_fmts[0])
else:
mono_fmts = intersect_pixel_formats(cv_fmts, MONO_PIXEL_FORMATS)
if mono_fmts:
cam.set_pixel_format(mono_fmts[0])
else:
abort('Camera does not support a OpenCV compatible format natively. Abort')
def gen_frames(frame):
print('hello')
return (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')
def frame_handler(cam: Camera, frame: Frame):
if frame.get_status() == FrameStatus.Complete:
print('{} acquired {}'.format(cam, frame), flush=True)
# ===============
# ===============
img = frame.as_opencv_image()
rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
ret, jpeg = cv2.imencode('.jpg', rgb_img)
fr = jpeg.tobytes()
gen_frames(fr)
cam.queue_frame(frame)
# ===============
# ===============
def main():
print_preamble()
cam_id, allocation_mode = parse_args()
with Vimba.get_instance() as vimba:
cams = vimba.get_all_cameras()
with cams[0] as cam:
setup_camera(cam)
print('Press <enter> to stop Frame acquisition.')
try:
# Start Streaming with a custom a buffer of 10 Frames (defaults to 5)
cam.start_streaming(handler=frame_handler, buffer_count=10, allocation_mode=allocation_mode)
input()
finally:
cam.stop_streaming()
@app.route('/video_feed')
def video_feed():
return Response(frame_handler(), mimetype='multipart/x-mixed-replace; boundary=frame')
if __name__ == '__main__':
threading.Thread(target = main).start()
threading.Thread(target = app.run(debug=True, host="0.0.0.0", port=8888)).start()
I'm facing this error when I open the local server, but I can't find out a way to resolve it. Hope somebody can help me with this issue.
Observations: I'm using a USB-camera model
- Name: Allied Vision 1800 U-1240c
- Model: 1800 U-1240c
Hello Lucas-Mantuan,
my collegue checked your code and was able to execute it after modifying the line here;
def video_feed(): return Response(frame_handler(), mimetype='multipart/x-mixed-replace; boundary=frame')
You just need to give the frame_handler cam and frame as an argument and it should run.
Cheers,
Teresa