VimbaPython
VimbaPython copied to clipboard
Write video to file
Hello!
I am trying to write frames to a file, but it does not work. There are no error messages, but the file I write the frames to does not work. We are using the USB camera (1800 U-240c).
I used the Asynchronous Grab with OpenCV Example to start with, and modified it to write the file with open cv. When I run the script, it does show the current frames (although it is slow) but when I click on the file that was made (open with standard windows media player or similar), the media player gives an error message.
I am just a beginner with open cv and vimba, so I hope someone here knows more.
import threading
import numpy as np
import sys
import cv2
from typing import Optional
from vimba import *
from datetime import datetime
import time
from cv2 import VideoWriter
from cv2 import VideoWriter_fourcc
import pandas as pd
def print_preamble():
print('///////////////////////////////////////////////////////')
print('/// Vimba API Asynchronous Grab with OpenCV Example ///')
print('///////////////////////////////////////////////////////\n')
def print_usage():
print('Usage:')
print(' python asynchronous_grab_opencv.py [camera_id]')
print(' python asynchronous_grab_opencv.py [/h] [-h]')
print()
print('Parameters:')
print(' camera_id ID of the camera to use (using first camera if not specified)')
print()
def abort(reason: str, return_code: int = 1, usage: bool = False):
print(reason + '\n')
if usage:
print_usage()
sys.exit(return_code)
def parse_args() -> Optional[str]:
args = sys.argv[1:]
argc = len(args)
for arg in args:
if arg in ('/h', '-h'):
print_usage()
sys.exit(0)
if argc > 1:
abort(reason="Invalid number of arguments. Abort.", return_code=2, usage=True)
return None if argc == 0 else args[0]
def get_camera(camera_id: Optional[str]) -> Camera:
with Vimba.get_instance() as vimba:
if camera_id:
try:
return vimba.get_camera_by_id(camera_id)
except VimbaCameraError:
abort('Failed to access Camera \'{}\'. Abort.'.format(camera_id))
else:
cams = vimba.get_all_cameras()
if not cams:
abort('No Cameras accessible. Abort.')
return cams[0]
def setup_camera(cam: Camera):
with cam:
# Enable auto exposure time setting if camera supports it
try:
cam.ExposureAuto.set('Continuous')
except (AttributeError, VimbaFeatureError):
pass
# Enable white balancing if camera supports it
try:
cam.BalanceWhiteAuto.set('Continuous')
except (AttributeError, VimbaFeatureError):
pass
# Try to adjust GeV packet size. This Feature is only available for GigE - Cameras.
try:
cam.GVSPAdjustPacketSize.run()
while not cam.GVSPAdjustPacketSize.is_done():
pass
except (AttributeError, VimbaFeatureError):
pass
# Query available, open_cv compatible pixel formats
# prefer color formats over monochrome formats
cv_fmts = intersect_pixel_formats(cam.get_pixel_formats(), OPENCV_PIXEL_FORMATS)
color_fmts = intersect_pixel_formats(cv_fmts, COLOR_PIXEL_FORMATS)
if color_fmts:
cam.set_pixel_format(color_fmts[0])
else:
mono_fmts = intersect_pixel_formats(cv_fmts, MONO_PIXEL_FORMATS)
if mono_fmts:
cam.set_pixel_format(mono_fmts[0])
print(cam.set_pixel_format(mono_fmts[0]))
else:
abort('Camera does not support a OpenCV compatible format natively. Abort.')
def main():
print_preamble()
with Vimba.get_instance() as vimba:
cams = vimba.get_all_cameras()
with cams[0] as cam:
setup_camera(cam)
video = VideoWriter('newvid.avi', VideoWriter_fourcc(*'MJPG'), 25.0, (1216, 1936))
while True:
frame = cam.get_frame()
frame.convert_pixel_format(PixelFormat.Bgra8)
frame = frame.as_opencv_image()
cv2.imshow('show', frame)
video.write(frame)
# check if frame size is correct
# newframe = np.reshape(np.array(frame), (frame.shape[0], frame.shape[1]))
# DF = pd.DataFrame(newframe)
# DF.to_csv('data1.csv')
if cv2.waitKey(1) & 0xFF == ord('d'):
break
cam.stop_streaming()
cv2.destroyAllWindows()
video.release()
if __name__ == '__main__':
main()
With respect tot the above issue I would request you to refer tot the below example code:
import sys import threading from typing import Optional from vimba import * import cv2 import queue
frame_queue = queue.Queue()
writer = None
def print_preamble(): print('///////////////////////////////////////////') print('/// Vimba API Asynchronous Grab Example ///') print('///////////////////////////////////////////\n')
def print_usage(): print('Usage:') print(' python asynchronous_grab.py [camera_id]') print(' python asynchronous_grab.py [/h] [-h]') print() print('Parameters:') print(' camera_id ID of the camera to use (using first camera if not specified)') print()
def abort(reason: str, return_code: int = 1, usage: bool = False): print(reason + '\n')
if usage:
print_usage()
sys.exit(return_code)
def parse_args() -> Optional[str]: args = sys.argv[1:] argc = len(args)
for arg in args:
if arg in ('/h', '-h'):
print_usage()
sys.exit(0)
if argc > 1:
abort(reason="Invalid number of arguments. Abort.", return_code=2, usage=True)
return None if argc == 0 else args[0]
def get_camera(camera_id: Optional[str]) -> Camera: with Vimba.get_instance() as vimba: if camera_id: try: return vimba.get_camera_by_id(camera_id)
except VimbaCameraError:
abort('Failed to access Camera \'{}\'. Abort.'.format(camera_id))
else:
cams = vimba.get_all_cameras()
if not cams:
abort('No Cameras accessible. Abort.')
return cams[0]
def setup_camera(cam: Camera): with cam: # Try to adjust GeV packet size. This Feature is only available for GigE - Cameras. try: cam.GVSPAdjustPacketSize.run()
while not cam.GVSPAdjustPacketSize.is_done():
pass
except (AttributeError, VimbaFeatureError):
pass
# Set acquistion frame rate to 25 fps
try:
cam.TriggerSource.Set("FixedRate")
cam.AcquisitionFrameRateAbs.Set(25)
except (AttributeError, VimbaFeatureError):
pass
def frame_handler(cam: Camera, frame: Frame): print('{} acquired {}'.format(cam, frame), flush=True)
global frame_queue
frame_queue.put(frame.as_opencv_image())
cv_image = frame.as_opencv_image()
cv2.imshow("Display", cv_image)
cv2.waitKey(1)
cam.queue_frame(frame)
def write_to_file(): global frame_queue global writer
while True:
img = frame_queue.get()
print("took image from queue")
if writer is None:
height, width = img.shape[0:2]
size = (width, height)
print(size)
fourcc = cv2.VideoWriter_fourcc(*'MJPG')
writer = cv2.VideoWriter('filename2.avi',
fourcc,
25,
size)
writer.write(cv2.cvtColor(img, cv2.COLOR_GRAY2BGR))
frame_queue.task_done()
def main(): print_preamble() cam_id = parse_args()
with Vimba.get_instance():
with get_camera(cam_id) as cam:
setup_camera(cam)
print('Press <enter> to stop Frame acquisition.')
try:
# Start Streaming with a custom a buffer of 10 Frames (defaults to 5)
threading.Thread(target=write_to_file, daemon=True).start()
cam.start_streaming(handler=frame_handler, buffer_count=10)
input()
finally:
cam.stop_streaming()
frame_queue.join()
writer.release()
if name == 'main': main()
Hi, Im hitting enter after the required number of frames have been captured but it returns an AttributeError: 'numpy.ndarray' object has no attribute 'join'. How do i resolve this?