Efficient Camera Stream With Python

Last Updated on August 24, 2023 by Editorial Team

Author(s): Argo Saakyan

Originally published on Towards AI.

Photo by Rahul Chakraborty on Unsplash

Let’s talk about using webcams with Python. I had a simple task: read frames from a camera and run a neural net on each frame. With one specific webcam, I had trouble setting the target fps (as I now understand, because the camera could deliver 30 fps in MJPEG format but not raw), so I decided to dig into FFmpeg to see if it would help.

I ended up getting both OpenCV and FFmpeg working, but I found something very interesting: FFmpeg outperformed OpenCV in my main use case. In fact, with FFmpeg I got a 15x speedup for reading a frame and a 32% speedup for the whole pipeline. I could not believe the results and rechecked everything several times, but they were consistent.

Note: performance was practically the same when I just read frame after frame, but FFmpeg was faster when I ran something time-consuming after reading each frame. I’ll show exactly what I mean below.

Now, let’s take a look at the code. First, a class for reading webcam frames with OpenCV:

class VideoStreamCV:
    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.cap = self._open_camera()
        self.wait_for_cam()

    def _open_camera(self):
        cap = cv2.VideoCapture(self.src)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        cap.set(cv2.CAP_PROP_FOURCC, fourcc)
        cap.set(cv2.CAP_PROP_FPS, self.fps)
        return cap

    def read(self):
        ret, frame = self.cap.read()
        if not ret:
            return None
        return frame

    def release(self):
        self.cap.release()

    def wait_for_cam(self):
        for _ in range(30):
            frame = self.read()
            if frame is not None:
                return True
        return False
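
The warm-up loop in wait_for_cam retries read up to 30 times and returns a flag that the constructor does not check. As a minimal sketch of a stricter variant, the helper below raises if no frame ever arrives. The name wait_for_first_frame and its attempts and delay parameters are hypothetical, not part of the classes in this article, and the reader used here is a stand-in rather than a real camera.

```python
import time
from typing import Callable, Optional


def wait_for_first_frame(
    read: Callable[[], Optional[object]],
    attempts: int = 30,
    delay: float = 0.0,
) -> object:
    """Call `read` until it returns something other than None, or raise after `attempts` tries."""
    for _ in range(attempts):
        frame = read()
        if frame is not None:
            return frame
        time.sleep(delay)  # optional pause between retries
    raise RuntimeError(f"camera returned no frame in {attempts} attempts")


# Stand-in reader (hypothetical): fails twice, then delivers a frame.
_results = iter([None, None, "frame"])
first = wait_for_first_frame(lambda: next(_results))
print(first)
```

With a real stream, the call would be wait_for_first_frame(cam.read), so a camera that never starts fails loudly instead of producing None frames later.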

I use the wait_for_cam function because cameras often need time to warm up. The same warm-up is used in the FFmpeg class:

class VideoStreamFFmpeg:
    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.pipe = self._open_ffmpeg()
        # Each raw frame is rows x columns x 3 bytes (bgr24)
        self.frame_shape = (self.resolution[1], self.resolution[0], 3)
        self.frame_size = int(np.prod(self.frame_shape))
        self.wait_for_cam()

    def _open_ffmpeg(self):
        os_name = platform.system()
        if os_name == "Darwin":  # macOS
            input_format = "avfoundation"
            video_device = f"{self.src}:none"
        elif os_name == "Linux":
            input_format = "v4l2"
            video_device = f"/dev/video{self.src}"
        elif os_name == "Windows":
            input_format = "dshow"
            video_device = f"video={self.src}"  # dshow expects a device name here, not an index
        else:
            raise ValueError("Unsupported OS")

        command = [
            'ffmpeg',
            '-f', input_format,
            '-r', str(self.fps),
            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',
            '-i', video_device,
            '-vcodec', 'mjpeg',  # After -i this is an output option; rawvideo below overrides it
            '-an', '-vcodec', 'rawvideo',  # Decode to raw video
            '-pix_fmt', 'bgr24',
            '-vsync', '2',
            '-f', 'image2pipe', '-'
        ]

        if os_name == "Linux":
            # Ask the v4l2 device for MJPEG; must come after "-f v4l2" and before "-i"
            command.insert(3, "-input_format")
            command.insert(4, "mjpeg")

        return subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8
        )

    def read(self):
        # Read exactly one frame's worth of bytes from the FFmpeg pipe
        raw_image = self.pipe.stdout.read(self.frame_size)
        if len(raw_image) != self.frame_size:
            return None
        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)
        return image

    def release(self):
        self.pipe.terminate()

    def wait_for_cam(self):
        for _ in range(30):
            frame = self.read()
            if frame is not None:
                return True
        return False
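
The read method relies on every raw bgr24 frame having a fixed size of height x width x 3 bytes, which is 6,220,800 bytes at 1920x1080. The sketch below shows the same slicing on an in-memory byte stream instead of a real FFmpeg pipe. The read_frame function, the tiny 4x2 frame, and the io.BytesIO stand-in are assumptions for illustration only.

```python
import io

import numpy as np

WIDTH, HEIGHT, CHANNELS = 4, 2, 3          # tiny frame so the demo stays small
FRAME_SHAPE = (HEIGHT, WIDTH, CHANNELS)    # rows first, as in the class above
FRAME_SIZE = int(np.prod(FRAME_SHAPE))     # bytes per frame, as a plain int


def read_frame(stream):
    """Read exactly one frame from a binary stream, or return None on a short read."""
    raw = stream.read(FRAME_SIZE)
    if len(raw) != FRAME_SIZE:
        return None
    return np.frombuffer(raw, dtype=np.uint8).reshape(FRAME_SHAPE)


# Stand-in for the FFmpeg pipe: two full frames followed by a truncated one.
payload = bytes(range(FRAME_SIZE)) * 2 + b"\x00" * 5
stream = io.BytesIO(payload)

first = read_frame(stream)
second = read_frame(stream)
third = read_frame(stream)   # only 5 bytes left, so this is a short read
print(first.shape, third)
```

Arrays created by np.frombuffer from a bytes object are read-only, so call .copy() on a frame before drawing on it in place.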

To time the run function, I used a decorator:

def timeit(func):
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        t1 = time.perf_counter()
        print(f"Main function time: {round(t1-t0, 4)}s")
        return result

    return wrapper
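
A possible refinement, not used for the measurements in this article: wrapping the inner function with functools.wraps keeps the decorated function's name and docstring, and a label makes the printed line easier to attribute. The timed name, its label parameter, and the add function are hypothetical.

```python
import functools
import time


def timed(label: str):
    """Build a decorator that prints how long each call took, prefixed with `label`."""
    def decorator(func):
        @functools.wraps(func)  # keep func.__name__ and func.__doc__ on the wrapper
        def wrapper(*args, **kwargs):
            t0 = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - t0
            print(f"{label}: {elapsed:.4f}s")
            return result
        return wrapper
    return decorator


@timed("demo")
def add(a, b):
    """Add two numbers."""
    return a + b


total = add(2, 3)
```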

As a heavy synthetic task in place of a neural net, I used this simple function (it could also be just time.sleep). This part is very important: without any task, read speeds are the same for both OpenCV and FFmpeg:

def computation_task():
    for _ in range(5000000):
        9999 * 9999
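
The duration of the loop above depends on the machine running it. If a fixed task duration is preferred, one option is to busy-wait until a deadline, which keeps the CPU occupied (unlike time.sleep). This is a sketch, not what the benchmark in this article uses; busy_task and the 20 ms duration are assumptions.

```python
import time


def busy_task(duration_s: float) -> int:
    """Keep the CPU busy for roughly `duration_s` seconds and return the loop count."""
    deadline = time.perf_counter() + duration_s
    iterations = 0
    while time.perf_counter() < deadline:
        iterations += 1
    return iterations


t0 = time.perf_counter()
count = busy_task(0.02)
elapsed = time.perf_counter() - t0
print(f"{count} iterations in {elapsed:.4f}s")
```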

Next comes the function with a loop where I read a frame, time the read, and run computation_task:

@timeit
def run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):
    timer = []
    for _ in range(100):
        t0 = time.perf_counter()
        cam.read()
        timer.append(time.perf_counter() - t0)

        if run_task:
            computation_task()

    cam.release()
    return round(np.mean(timer), 4)
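
Note that run only reaches cam.release() if the loop finishes. If the processing step raises, the camera (or the FFmpeg subprocess) stays open. One way to guard against that is a context manager, sketched here with a hypothetical opened helper and a FakeCam stand-in that only tracks whether release was called.

```python
from contextlib import contextmanager


@contextmanager
def opened(cam):
    """Yield `cam` and always call its release() method afterwards."""
    try:
        yield cam
    finally:
        cam.release()


class FakeCam:
    """Stand-in for VideoStreamCV / VideoStreamFFmpeg; it only tracks release()."""

    def __init__(self):
        self.released = False

    def read(self):
        return "frame"

    def release(self):
        self.released = True


cam = FakeCam()
try:
    with opened(cam) as stream:
        stream.read()
        raise ValueError("simulated failure in the processing step")
except ValueError:
    pass

print(cam.released)
```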

And finally, the main function, where I set a couple of parameters, initialize two video streams (FFmpeg and OpenCV), and run them first without computation_task and then with it:

def main():
    fps = 30
    resolution = (1920, 1080)

    for run_task in [False, True]:
        ff_cam = VideoStreamFFmpeg(src=0, fps=fps, resolution=resolution)
        cv_cam = VideoStreamCV(src=0, fps=fps, resolution=resolution)

        print(f"FFMPEG, task {run_task}:")
        print(f"Mean frame read time: {run(cam=ff_cam, run_task=run_task)}s\n")
        print(f"CV2, task {run_task}:")
        print(f"Mean frame read time: {run(cam=cv_cam, run_task=run_task)}s\n")

And here is what I get:

FFMPEG, task False:
Main function time: 3.2334s
Mean frame read time: 0.0323s

CV2, task False:
Main function time: 3.3934s
Mean frame read time: 0.0332s

FFMPEG, task True:
Main function time: 4.461s
Mean frame read time: 0.0014s

CV2, task True:
Main function time: 6.6833s
Mean frame read time: 0.023s

So, without the synthetic task, the mean read times are nearly the same: 0.0323 s and 0.0332 s. With the synthetic task they are 0.0014 s and 0.023 s, so FFmpeg is significantly faster: roughly 16x on the read itself, and about a third less total time (4.461 s vs. 6.6833 s). The beauty is that I got a real speedup with my neural net application, not only with synthetic tests, so I decided to share the results.
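
One possible explanation, which these measurements alone do not prove, is that the FFmpeg subprocess keeps capturing and decoding into the pipe while the task runs, whereas the OpenCV read waits for the next frame to arrive. The toy model below plays both cases on a virtual clock. The 43 ms task time is an estimate derived from the FFmpeg totals above, and simulate and its parameters are hypothetical.

```python
FRAME_US = 33_333   # frame interval at 30 fps, in microseconds
TASK_US = 43_000    # assumed processing time per frame, in microseconds


def simulate(buffered: bool, task_us: int = TASK_US, iterations: int = 100):
    """Return (mean read wait, total loop time), both in seconds, on a virtual clock."""
    now = 0          # virtual time in microseconds
    total_wait = 0
    for i in range(iterations):
        if buffered:
            # Frame i was produced at (i + 1) * FRAME_US and then waits in the buffer.
            arrival = (i + 1) * FRAME_US
        else:
            # No buffer: the read blocks until the next frame boundary after `now`.
            arrival = (now // FRAME_US + 1) * FRAME_US
        wait = max(0, arrival - now)
        total_wait += wait
        now += wait + task_us
    return total_wait / iterations / 1e6, now / 1e6


for buffered in (True, False):
    mean_wait, total = simulate(buffered)
    print(f"buffered={buffered}: mean read wait {mean_wait:.4f}s, total {total:.2f}s")
```

Under these assumptions the model gives totals of about 4.33 s (buffered) and 6.68 s (unbuffered), close to the measured 4.461 s and 6.6833 s. In this model the buffered reader also falls behind the camera, so it processes frames that are progressively older than the live scene, which is worth checking before relying on it for real-time use.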

Here is a graph that shows how much time it takes for 1 iteration: read the frame, process it with a yolov8s model (on CPU), and save frames with detected objects:

Here is a full script with synthetic tests:

import platform
import subprocess
import time
from typing import Tuple
import cv2
import numpy as np


class VideoStreamFFmpeg:
    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.pipe = self._open_ffmpeg()
        # Each raw frame is rows x columns x 3 bytes (bgr24)
        self.frame_shape = (self.resolution[1], self.resolution[0], 3)
        self.frame_size = int(np.prod(self.frame_shape))
        self.wait_for_cam()

    def _open_ffmpeg(self):
        os_name = platform.system()
        if os_name == "Darwin":  # macOS
            input_format = "avfoundation"
            video_device = f"{self.src}:none"
        elif os_name == "Linux":
            input_format = "v4l2"
            video_device = f"/dev/video{self.src}"
        elif os_name == "Windows":
            input_format = "dshow"
            video_device = f"video={self.src}"  # dshow expects a device name here, not an index
        else:
            raise ValueError("Unsupported OS")

        command = [
            'ffmpeg',
            '-f', input_format,
            '-r', str(self.fps),
            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',
            '-i', video_device,
            '-vcodec', 'mjpeg',  # After -i this is an output option; rawvideo below overrides it
            '-an', '-vcodec', 'rawvideo',  # Decode to raw video
            '-pix_fmt', 'bgr24',
            '-vsync', '2',
            '-f', 'image2pipe', '-'
        ]

        if os_name == "Linux":
            # Ask the v4l2 device for MJPEG; must come after "-f v4l2" and before "-i"
            command.insert(3, "-input_format")
            command.insert(4, "mjpeg")

        return subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8
        )

    def read(self):
        # Read exactly one frame's worth of bytes from the FFmpeg pipe
        raw_image = self.pipe.stdout.read(self.frame_size)
        if len(raw_image) != self.frame_size:
            return None
        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)
        return image

    def release(self):
        self.pipe.terminate()

    def wait_for_cam(self):
        for _ in range(30):
            frame = self.read()
            if frame is not None:
                return True
        return False


class VideoStreamCV:
    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.cap = self._open_camera()
        self.wait_for_cam()

    def _open_camera(self):
        cap = cv2.VideoCapture(self.src)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        cap.set(cv2.CAP_PROP_FOURCC, fourcc)
        cap.set(cv2.CAP_PROP_FPS, self.fps)
        return cap

    def read(self):
        ret, frame = self.cap.read()
        if not ret:
            return None
        return frame

    def release(self):
        self.cap.release()

    def wait_for_cam(self):
        for _ in range(30):
            frame = self.read()
            if frame is not None:
                return True
        return False


def timeit(func):
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        t1 = time.perf_counter()
        print(f"Main function time: {round(t1-t0, 4)}s")
        return result

    return wrapper


def computation_task():
    for _ in range(5000000):
        9999 * 9999


@timeit
def run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):
    timer = []
    for _ in range(100):
        t0 = time.perf_counter()
        cam.read()
        timer.append(time.perf_counter() - t0)

        if run_task:
            computation_task()

    cam.release()
    return round(np.mean(timer), 4)


def main():
    fps = 30
    resolution = (1920, 1080)

    for run_task in [False, True]:
        ff_cam = VideoStreamFFmpeg(src=0, fps=fps, resolution=resolution)
        cv_cam = VideoStreamCV(src=0, fps=fps, resolution=resolution)

        print(f"FFMPEG, task {run_task}:")
        print(f"Mean frame read time: {run(cam=ff_cam, run_task=run_task)}s\n")
        print(f"CV2, task {run_task}:")
        print(f"Mean frame read time: {run(cam=cv_cam, run_task=run_task)}s\n")


if __name__ == "__main__":
    main()

Note: This script was tested on an M1 Pro chip from Apple. Hope this was helpful!
