Efficient Camera Stream With Python

Last Updated on August 24, 2023 by Editorial Team

Author(s): Argo Saakyan

Originally published on Towards AI.

Photo by Rahul Chakraborty on Unsplash

Let’s talk about using webcams with Python. I had a simple task: read frames from the camera and run a neural net on each one. With one particular webcam, I had trouble setting the target FPS (as I now understand — because the camera could deliver 30 fps in MJPEG format, but not raw), so I decided to dig into FFmpeg to see if it would help.

I ended up getting both OpenCV and FFmpeg working, but I discovered something very interesting: FFmpeg performance was superior to OpenCV in my main use case. In fact, with FFmpeg, I got a 15x speedup for reading a frame and a 32% speedup for the whole pipeline. I could not believe the results and rechecked everything several times, but they were consistent.

Note: performance was exactly the same when I just read frame after frame, but FFmpeg was faster when I ran some work after reading each frame (which takes time). I’ll show exactly what I mean below.

Now, let’s take a look at the code. First, a class for reading webcam frames with OpenCV:

class VideoStreamCV:
    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.cap = self._open_camera()
        self.wait_for_cam()

    def _open_camera(self):
        cap = cv2.VideoCapture(self.src)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        cap.set(cv2.CAP_PROP_FOURCC, fourcc)
        cap.set(cv2.CAP_PROP_FPS, self.fps)
        return cap

    def read(self):
        ret, frame = self.cap.read()
        if not ret:
            return None
        return frame

    def release(self):
        self.cap.release()

    def wait_for_cam(self):
        for _ in range(30):
            frame = self.read()
            if frame is not None:
                return True
        return False
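As a side note, the MJPG fourcc requested above is simply four ASCII characters packed into a 32-bit little-endian integer. Here is a pure-Python sketch of my own (an illustration, not OpenCV's actual implementation) of what cv2.VideoWriter_fourcc(*"MJPG") computes:

```python
def fourcc(c1: str, c2: str, c3: str, c4: str) -> int:
    # Pack four ASCII characters into one little-endian 32-bit code,
    # mirroring what cv2.VideoWriter_fourcc produces.
    return ord(c1) | (ord(c2) << 8) | (ord(c3) << 16) | (ord(c4) << 24)

mjpg = fourcc(*"MJPG")
print(mjpg)  # 1196444237
```

Passing this code via CAP_PROP_FOURCC is how we ask the camera driver for the MJPEG stream instead of raw frames.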

I use the wait_for_cam function because cameras often need time to 'warm up'. The same warm-up is used in the FFmpeg class:

class VideoStreamFFmpeg:
    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.pipe = self._open_ffmpeg()
        self.frame_shape = (self.resolution[1], self.resolution[0], 3)
        self.frame_size = np.prod(self.frame_shape)
        self.wait_for_cam()

    def _open_ffmpeg(self):
        os_name = platform.system()
        if os_name == "Darwin":  # macOS
            input_format = "avfoundation"
            video_device = f"{self.src}:none"
        elif os_name == "Linux":
            input_format = "v4l2"
            # On Linux, src should be a device path such as "/dev/video0"
            video_device = f"{self.src}"
        elif os_name == "Windows":
            input_format = "dshow"
            video_device = f"video={self.src}"
        else:
            raise ValueError("Unsupported OS")

        command = [
            'ffmpeg',
            '-f', input_format,
            '-r', str(self.fps),
            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',
            '-i', video_device,
            '-vcodec', 'mjpeg',  # an output option here; overridden by rawvideo below
            '-an', '-vcodec', 'rawvideo',  # no audio; decode to raw video frames
            '-pix_fmt', 'bgr24',
            '-vsync', '2',
            '-f', 'image2pipe', '-'
        ]

        if os_name == "Linux":
            # Request MJPEG from the camera; these must be input options,
            # i.e. inserted after "-f v4l2" and before "-i"
            command.insert(3, "-input_format")
            command.insert(4, "mjpeg")

        return subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8
        )

    def read(self):
        raw_image = self.pipe.stdout.read(self.frame_size)
        if len(raw_image) != self.frame_size:
            return None
        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)
        return image

    def release(self):
        self.pipe.terminate()

    def wait_for_cam(self):
        for _ in range(30):
            frame = self.read()
            if frame is not None:
                return True
        return False
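A quick sanity check on the byte counting in read(): with bgr24, every frame is exactly width × height × 3 bytes, so at 1920x1080 we pull 6,220,800 bytes per frame from the pipe, and anything shorter means the stream ended. This standalone sketch mirrors the class's arithmetic (the zero-filled buffer stands in for real pipe output):

```python
import numpy as np

resolution = (1920, 1080)  # (width, height), as in the classes above
frame_shape = (resolution[1], resolution[0], 3)  # bgr24: 3 bytes per pixel
frame_size = int(np.prod(frame_shape))
print(frame_size)  # 6220800

# Decoding a raw buffer of that size back into an image array,
# the same way read() does with pipe.stdout.read(frame_size):
raw = bytes(frame_size)  # stand-in for actual bytes from the FFmpeg pipe
image = np.frombuffer(raw, dtype=np.uint8).reshape(frame_shape)
print(image.shape)  # (1080, 1920, 3)
```

Note that the array shape is (height, width, channels), matching what cv2.VideoCapture.read() returns, so both classes are drop-in interchangeable downstream.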

To time the run function, I used a decorator:

def timeit(func):
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        t1 = time.perf_counter()
        print(f"Main function time: {round(t1-t0, 4)}s")
        return result

    return wrapper
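For example, wrapping a trivial function shows that the decorator prints the elapsed time while passing the return value through untouched (the function name add here is just for illustration):

```python
import time


def timeit(func):
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        t1 = time.perf_counter()
        print(f"Main function time: {round(t1 - t0, 4)}s")
        return result

    return wrapper


@timeit
def add(a, b):
    return a + b


print(add(2, 3))  # prints the timing line, then 5
```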

As a heavy synthetic task standing in for a neural net, I used this simple function (it could also just be time.sleep). This part is very important, because without any task, reading speeds are the same for both OpenCV and FFmpeg:

def computation_task():
    for _ in range(5000000):
        9999 * 9999
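The busy-loop's duration varies from machine to machine; a time.sleep-based variant (my own stand-in, not from the original) blocks for a deterministic amount of wall-clock time instead, which demonstrates the same effect:

```python
import time


def computation_task_sleep(duration: float = 0.05) -> None:
    # Deterministic stand-in for the busy-loop: block for a fixed time,
    # simulating per-frame inference work.
    time.sleep(duration)


t0 = time.perf_counter()
computation_task_sleep(0.05)
elapsed = time.perf_counter() - t0
print(round(elapsed, 3))
```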

Now, the function with a loop where I read a frame, time the read, and run computation_task:

@timeit
def run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):
    timer = []
    for _ in range(100):
        t0 = time.perf_counter()
        cam.read()
        timer.append(time.perf_counter() - t0)

        if run_task:
            computation_task()

    cam.release()
    return round(np.mean(timer), 4)

And finally, the main function, where I set a couple of parameters, initialize two video streams with OpenCV and FFmpeg, and run them both without computation_task and with it:

def main():
    fps = 30
    resolution = (1920, 1080)

    for run_task in [False, True]:
        ff_cam = VideoStreamFFmpeg(src=0, fps=fps, resolution=resolution)
        cv_cam = VideoStreamCV(src=0, fps=fps, resolution=resolution)

        print(f"FFMPEG, task {run_task}:")
        print(f"Mean frame read time: {run(cam=ff_cam, run_task=run_task)}s\n")
        print(f"CV2, task {run_task}:")
        print(f"Mean frame read time: {run(cam=cv_cam, run_task=run_task)}s\n")

And here is what I get:

FFMPEG, task False:
Main function time: 3.2334s
Mean frame read time: 0.0323s

CV2, task False:
Main function time: 3.3934s
Mean frame read time: 0.0332s

FFMPEG, task True:
Main function time: 4.461s
Mean frame read time: 0.0014s

CV2, task True:
Main function time: 6.6833s
Mean frame read time: 0.023s

So, without the synthetic task, I get practically the same read times: 0.0323s and 0.0332s. But with the synthetic task, the reads take 0.0014s and 0.023s, so FFmpeg is significantly faster. The beauty is that I got a real speedup with my neural-net application, not only in synthetic tests, which is why I decided to share the results.
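The per-frame numbers are consistent with the totals: over 100 frames, the read-time gap of roughly 0.023 − 0.0014 = 0.0216s per frame predicts about 2.16s of extra time for OpenCV, close to the measured total gap of 6.6833 − 4.461 = 2.2223s. A quick check of that arithmetic:

```python
# Measured values from the "task True" runs above
ffmpeg_read, cv2_read = 0.0014, 0.023    # mean per-frame read time, seconds
ffmpeg_total, cv2_total = 4.461, 6.6833  # total loop time, seconds
frames = 100

predicted_gap = (cv2_read - ffmpeg_read) * frames
measured_gap = cv2_total - ffmpeg_total
print(round(predicted_gap, 2), round(measured_gap, 2))  # 2.16 2.22
```

The likely reason FFmpeg wins here is that it decodes in a separate process: while Python is busy with the task, the FFmpeg pipe already holds the next frame, so read() returns almost instantly.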

Here is a graph showing how much time one iteration takes: read a frame, process it with a yolov8s model (on CPU), and save the frame with detected objects:

Here is a full script with synthetic tests:

import platform
import subprocess
import time
from typing import Tuple

import cv2
import numpy as np


class VideoStreamFFmpeg:
    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.pipe = self._open_ffmpeg()
        self.frame_shape = (self.resolution[1], self.resolution[0], 3)
        self.frame_size = np.prod(self.frame_shape)
        self.wait_for_cam()

    def _open_ffmpeg(self):
        os_name = platform.system()
        if os_name == "Darwin":  # macOS
            input_format = "avfoundation"
            video_device = f"{self.src}:none"
        elif os_name == "Linux":
            input_format = "v4l2"
            # On Linux, src should be a device path such as "/dev/video0"
            video_device = f"{self.src}"
        elif os_name == "Windows":
            input_format = "dshow"
            video_device = f"video={self.src}"
        else:
            raise ValueError("Unsupported OS")

        command = [
            'ffmpeg',
            '-f', input_format,
            '-r', str(self.fps),
            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',
            '-i', video_device,
            '-vcodec', 'mjpeg',  # an output option here; overridden by rawvideo below
            '-an', '-vcodec', 'rawvideo',  # no audio; decode to raw video frames
            '-pix_fmt', 'bgr24',
            '-vsync', '2',
            '-f', 'image2pipe', '-'
        ]

        if os_name == "Linux":
            # Request MJPEG from the camera; these must be input options,
            # i.e. inserted after "-f v4l2" and before "-i"
            command.insert(3, "-input_format")
            command.insert(4, "mjpeg")

        return subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8
        )

    def read(self):
        raw_image = self.pipe.stdout.read(self.frame_size)
        if len(raw_image) != self.frame_size:
            return None
        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)
        return image

    def release(self):
        self.pipe.terminate()

    def wait_for_cam(self):
        for _ in range(30):
            frame = self.read()
            if frame is not None:
                return True
        return False


class VideoStreamCV:
    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.cap = self._open_camera()
        self.wait_for_cam()

    def _open_camera(self):
        cap = cv2.VideoCapture(self.src)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        cap.set(cv2.CAP_PROP_FOURCC, fourcc)
        cap.set(cv2.CAP_PROP_FPS, self.fps)
        return cap

    def read(self):
        ret, frame = self.cap.read()
        if not ret:
            return None
        return frame

    def release(self):
        self.cap.release()

    def wait_for_cam(self):
        for _ in range(30):
            frame = self.read()
            if frame is not None:
                return True
        return False


def timeit(func):
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        t1 = time.perf_counter()
        print(f"Main function time: {round(t1-t0, 4)}s")
        return result

    return wrapper


def computation_task():
    for _ in range(5000000):
        9999 * 9999


@timeit
def run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):
    timer = []
    for _ in range(100):
        t0 = time.perf_counter()
        cam.read()
        timer.append(time.perf_counter() - t0)

        if run_task:
            computation_task()

    cam.release()
    return round(np.mean(timer), 4)


def main():
    fps = 30
    resolution = (1920, 1080)

    for run_task in [False, True]:
        ff_cam = VideoStreamFFmpeg(src=0, fps=fps, resolution=resolution)
        cv_cam = VideoStreamCV(src=0, fps=fps, resolution=resolution)

        print(f"FFMPEG, task {run_task}:")
        print(f"Mean frame read time: {run(cam=ff_cam, run_task=run_task)}s\n")
        print(f"CV2, task {run_task}:")
        print(f"Mean frame read time: {run(cam=cv_cam, run_task=run_task)}s\n")


if __name__ == "__main__":
    main()

Note: This script was tested on an M1 Pro chip from Apple. Hope this was helpful!
