Efficient Camera Stream With Python
Last Updated on August 24, 2023 by Editorial Team
Author(s): Argo Saakyan
Originally published on Towards AI.
Let's talk about using webcams with Python. I had a simple task of reading frames from the camera and running a neural net on each frame. With one specific webcam, I was having issues with setting the targeted fps (as I now understand — because the camera could run 30 fps with the MJPEG format, but not raw), so I decided to dig into FFmpeg to see if it would help.
I ended up getting both OpenCV and FFmpeg working, but I found out a very interesting thing: FFmpeg performance was superior to OpenCV in my main use case. In fact, with FFmpeg, I had a 15x speedup for reading the frame and a 32% speedup for the whole pipeline. I could not believe the results and rechecked everything several times, but they were consistent.
Note: performance was exactly the same when I just read frame after frame, but FFmpeg was faster when I ran something after reading the frame (which takes time). I'll show exactly what I mean below.
Now, let's take a look at the code. Firstly — a class for reading webcam frames with OpenCV:
class VideoStreamCV:
    """Webcam frame reader built on OpenCV's ``VideoCapture``."""

    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.cap = self._open_camera()
        self.wait_for_cam()

    def _open_camera(self):
        """Open the device and request resolution, MJPG fourcc and fps."""
        capture = cv2.VideoCapture(self.src)
        width, height = self.resolution
        capture.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        capture.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        # MJPEG lets many webcams hit their advertised fps at high resolutions.
        capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
        capture.set(cv2.CAP_PROP_FPS, self.fps)
        return capture

    def read(self):
        """Return the next frame, or ``None`` if the grab failed."""
        ok, frame = self.cap.read()
        return frame if ok else None

    def release(self):
        """Free the underlying capture device."""
        self.cap.release()

    def wait_for_cam(self):
        """Poll up to 30 frames so the camera can warm up.

        Returns True as soon as a frame arrives, False if none did.
        """
        return any(self.read() is not None for _ in range(30))
I use the wait_for_cam
function, as cameras often need time to 'warm up'. The same warmup is used in the FFmpeg class:
class VideoStreamFFmpeg:
    """Webcam frame reader that pipes raw BGR frames from an ffmpeg subprocess."""

    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.pipe = self._open_ffmpeg()
        # (height, width, 3): ffmpeg emits bgr24, i.e. 3 bytes per pixel.
        self.frame_shape = (self.resolution[1], self.resolution[0], 3)
        self.frame_size = np.prod(self.frame_shape)
        self.wait_for_cam()

    def _build_command(self):
        """Assemble the platform-specific ffmpeg command line.

        Raises:
            ValueError: on an OS other than macOS, Linux or Windows.
        """
        os_name = platform.system()
        if os_name == "Darwin":  # macOS
            input_format = "avfoundation"
            video_device = f"{self.src}:none"
        elif os_name == "Linux":
            input_format = "v4l2"
            video_device = f"{self.src}"
        elif os_name == "Windows":
            input_format = "dshow"
            video_device = f"video={self.src}"
        else:
            raise ValueError("Unsupported OS")
        command = [
            'ffmpeg',
            '-f', input_format,
            '-r', str(self.fps),
            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',
            '-i', video_device,
            '-vcodec', 'mjpeg',  # Input codec set to mjpeg
            '-an', '-vcodec', 'rawvideo',  # Decode the MJPEG stream to raw video
            '-pix_fmt', 'bgr24',
            '-vsync', '2',
            '-f', 'image2pipe', '-'
        ]
        if os_name == "Linux":
            # Ask v4l2 to deliver MJPEG. The pair must land AFTER the demuxer
            # name, yielding "-f v4l2 -input_format mjpeg". The original code
            # inserted at indices 2/3, which put "-input_format mjpeg" BETWEEN
            # "-f" and "v4l2" and broke the ffmpeg invocation on Linux.
            command.insert(3, "-input_format")
            command.insert(4, "mjpeg")
        return command

    def _open_ffmpeg(self):
        """Spawn ffmpeg writing raw frames to its stdout pipe."""
        return subprocess.Popen(
            self._build_command(),
            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8
        )

    def read(self):
        """Read exactly one frame from the pipe; ``None`` on short read/EOF."""
        raw_image = self.pipe.stdout.read(self.frame_size)
        if len(raw_image) != self.frame_size:
            return None
        return np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)

    def release(self):
        """Terminate the ffmpeg subprocess."""
        self.pipe.terminate()

    def wait_for_cam(self):
        """Poll up to 30 frames so the camera can warm up.

        Returns True as soon as a frame arrives, False if none did.
        """
        for _ in range(30):
            if self.read() is not None:
                return True
        return False
For timing the run
function, I used a decorator:
def timeit(func):
    """Decorator that prints the wall-clock runtime of each call to *func*."""

    @functools.wraps(func)  # preserve the wrapped function's name/docstring
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        t1 = time.perf_counter()
        print(f"Main function time: {round(t1-t0, 4)}s")
        return result

    return wrapper
As a heavy synthetic task, in place of a neural net, I used this simple function (it could also be just time.sleep
). This is a very important part, as without any task, reading speeds are the same for both OpenCV and FFmpeg:
def computation_task():
    """Synthetic CPU-bound workload standing in for per-frame inference.

    Performs ~5e6 trivial multiplications; the products are deliberately
    discarded — only the elapsed time matters.
    """
    for _step in range(5_000_000):
        9999 * 9999
Now the function with a loop where I read a frame, time it, and run computation_task
:
@timeit
def run(cam: "VideoStreamCV | VideoStreamFFmpeg", run_task: bool):
    """Read 100 frames, timing each read; optionally run the synthetic task.

    Returns the mean per-frame read time in seconds, rounded to 4 places.
    (The annotation is a string: the garbled "U+007C" in the original was a
    mojibake'd "|" pipe and made the line a syntax error.)
    """
    read_times = []
    for _ in range(100):
        t0 = time.perf_counter()
        cam.read()
        read_times.append(time.perf_counter() - t0)
        if run_task:
            computation_task()
    cam.release()
    return round(np.mean(read_times), 4)
And finally the main
function, where I set up a couple of parameters, initialize 2 video streams with OpenCV and FFmpeg, and run them without computation_task
and with it.
def main():
    """Benchmark FFmpeg vs OpenCV frame reading, with and without a CPU task."""
    fps = 30  # was misspelled "fsp" in the original
    resolution = (1920, 1080)
    for run_task in [False, True]:
        # NOTE(review): both streams open device 0 at the same time; some
        # OS/driver combinations reject two simultaneous captures — confirm
        # on the target platform.
        ff_cam = VideoStreamFFmpeg(src=0, fps=fps, resolution=resolution)
        cv_cam = VideoStreamCV(src=0, fps=fps, resolution=resolution)
        print(f"FFMPEG, task {run_task}:")
        print(f"Mean frame read time: {run(cam=ff_cam, run_task=run_task)}s\n")
        print(f"CV2, task {run_task}:")
        print(f"Mean frame read time: {run(cam=cv_cam, run_task=run_task)}s\n")
And here is what I get:
FFMPEG, task False:
Main function time: 3.2334s
Mean frame read time: 0.0323s
CV2, task False:
Main function time: 3.3934s
Mean frame read time: 0.0332s
FFMPEG, task True:
Main function time: 4.461s
Mean frame read time: 0.0014s
CV2, task True:
Main function time: 6.6833s
Mean frame read time: 0.023s
So, without a synthetic task, I get the same reading time: 0.0323s and 0.0332s. But with the synthetic task: 0.0014s and 0.023s — so FFmpeg is significantly faster. The beauty is that I got a real speedup with my neural net application, not only with synthetic tests, so I decided to share the results.
Here is a graph that shows how much time it takes for 1 iteration: read the frame, process it with a yolov8s model (on CPU), and save frames with detected objects:
Here is a full script with synthetic tests:
import functools
import platform
import subprocess
import time
from typing import Tuple

import cv2
import numpy as np
class VideoStreamFFmpeg:
    """Webcam frame reader that pipes raw BGR frames from an ffmpeg subprocess."""

    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.pipe = self._open_ffmpeg()
        # (height, width, 3): ffmpeg emits bgr24, i.e. 3 bytes per pixel.
        self.frame_shape = (self.resolution[1], self.resolution[0], 3)
        self.frame_size = np.prod(self.frame_shape)
        self.wait_for_cam()

    def _build_command(self):
        """Assemble the platform-specific ffmpeg command line.

        Raises:
            ValueError: on an OS other than macOS, Linux or Windows.
        """
        os_name = platform.system()
        if os_name == "Darwin":  # macOS
            input_format = "avfoundation"
            video_device = f"{self.src}:none"
        elif os_name == "Linux":
            input_format = "v4l2"
            video_device = f"{self.src}"
        elif os_name == "Windows":
            input_format = "dshow"
            video_device = f"video={self.src}"
        else:
            raise ValueError("Unsupported OS")
        command = [
            'ffmpeg',
            '-f', input_format,
            '-r', str(self.fps),
            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',
            '-i', video_device,
            '-vcodec', 'mjpeg',  # Input codec set to mjpeg
            '-an', '-vcodec', 'rawvideo',  # Decode the MJPEG stream to raw video
            '-pix_fmt', 'bgr24',
            '-vsync', '2',
            '-f', 'image2pipe', '-'
        ]
        if os_name == "Linux":
            # Ask v4l2 to deliver MJPEG. The pair must land AFTER the demuxer
            # name, yielding "-f v4l2 -input_format mjpeg". The original code
            # inserted at indices 2/3, which put "-input_format mjpeg" BETWEEN
            # "-f" and "v4l2" and broke the ffmpeg invocation on Linux.
            command.insert(3, "-input_format")
            command.insert(4, "mjpeg")
        return command

    def _open_ffmpeg(self):
        """Spawn ffmpeg writing raw frames to its stdout pipe."""
        return subprocess.Popen(
            self._build_command(),
            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8
        )

    def read(self):
        """Read exactly one frame from the pipe; ``None`` on short read/EOF."""
        raw_image = self.pipe.stdout.read(self.frame_size)
        if len(raw_image) != self.frame_size:
            return None
        return np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)

    def release(self):
        """Terminate the ffmpeg subprocess."""
        self.pipe.terminate()

    def wait_for_cam(self):
        """Poll up to 30 frames so the camera can warm up.

        Returns True as soon as a frame arrives, False if none did.
        """
        for _ in range(30):
            if self.read() is not None:
                return True
        return False
class VideoStreamCV:
    """Webcam frame reader built on OpenCV's ``VideoCapture``."""

    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.cap = self._open_camera()
        self.wait_for_cam()

    def _open_camera(self):
        """Open the device and request resolution, MJPG fourcc and fps."""
        capture = cv2.VideoCapture(self.src)
        width, height = self.resolution
        capture.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        capture.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        # MJPEG lets many webcams hit their advertised fps at high resolutions.
        capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
        capture.set(cv2.CAP_PROP_FPS, self.fps)
        return capture

    def read(self):
        """Return the next frame, or ``None`` if the grab failed."""
        ok, frame = self.cap.read()
        return frame if ok else None

    def release(self):
        """Free the underlying capture device."""
        self.cap.release()

    def wait_for_cam(self):
        """Poll up to 30 frames so the camera can warm up.

        Returns True as soon as a frame arrives, False if none did.
        """
        return any(self.read() is not None for _ in range(30))
def timeit(func):
    """Decorator that prints the wall-clock runtime of each call to *func*."""

    @functools.wraps(func)  # preserve the wrapped function's name/docstring
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        t1 = time.perf_counter()
        print(f"Main function time: {round(t1-t0, 4)}s")
        return result

    return wrapper
def computation_task():
    """Synthetic CPU-bound workload standing in for per-frame inference.

    Performs ~5e6 trivial multiplications; the products are deliberately
    discarded — only the elapsed time matters.
    """
    for _step in range(5_000_000):
        9999 * 9999
@timeit
def run(cam: "VideoStreamCV | VideoStreamFFmpeg", run_task: bool):
    """Read 100 frames, timing each read; optionally run the synthetic task.

    Returns the mean per-frame read time in seconds, rounded to 4 places.
    (The annotation is a string: the garbled "U+007C" in the original was a
    mojibake'd "|" pipe and made the line a syntax error.)
    """
    read_times = []
    for _ in range(100):
        t0 = time.perf_counter()
        cam.read()
        read_times.append(time.perf_counter() - t0)
        if run_task:
            computation_task()
    cam.release()
    return round(np.mean(read_times), 4)
def main():
    """Benchmark FFmpeg vs OpenCV frame reading, with and without a CPU task."""
    fps = 30  # was misspelled "fsp" in the original
    resolution = (1920, 1080)
    for run_task in [False, True]:
        # NOTE(review): both streams open device 0 at the same time; some
        # OS/driver combinations reject two simultaneous captures — confirm
        # on the target platform.
        ff_cam = VideoStreamFFmpeg(src=0, fps=fps, resolution=resolution)
        cv_cam = VideoStreamCV(src=0, fps=fps, resolution=resolution)
        print(f"FFMPEG, task {run_task}:")
        print(f"Mean frame read time: {run(cam=ff_cam, run_task=run_task)}s\n")
        print(f"CV2, task {run_task}:")
        print(f"Mean frame read time: {run(cam=cv_cam, run_task=run_task)}s\n")


if __name__ == "__main__":
    main()
Note: This script was tested on an M1 Pro chip from Apple. Hope this was helpful!
Join thousands of data leaders on the AI newsletter. Join over 80,000 subscribers and keep up to date with the latest developments in AI. From research to projects and ideas. If you are building an AI startup, an AI-related product, or a service, we invite you to consider becoming a sponsor.
Published via Towards AI