How to Save Two Cameras' Data Without Affecting Their Capture Speed

The correct approach to recording video without frame drops is to isolate the two tasks (frame acquisition and frame serialization) so that they don't influence each other, specifically so that fluctuations in serialization don't eat into the time available for capturing frames, which has to happen without delays to prevent frame loss.

This can be achieved by delegating the serialization (encoding the frames and writing them into a video file) to separate threads, and using a synchronized queue to feed the data to the worker threads.

The following is a simple example showing how this could be done. Since I have only one camera, and not the kind you have, I will simply use a webcam and duplicate the frames, but the general principle applies to your scenario as well.


Sample Code

In the beginning we have some includes:

#include <opencv2/opencv.hpp>

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
// ============================================================================
using std::chrono::high_resolution_clock;
using std::chrono::duration_cast;
using std::chrono::microseconds;
// ============================================================================

Synchronized Queue

The first step is to define our synchronized queue, which we will use to communicate with the worker threads that write the video.

The primary functionality we need is the ability to:

  • Push new images into the queue.
  • Pop images from the queue, waiting when it's empty.
  • Cancel all pending pops when we're finished.

We use std::queue to hold the cv::Mat instances, and std::mutex to provide synchronization. A std::condition_variable is used to notify the consumer when an image has been inserted into the queue (or the cancellation flag has been set), and a simple boolean flag is used to signal cancellation.

Finally, we use the empty struct cancelled as an exception thrown from pop(), so we can cleanly terminate the worker by cancelling the queue.

// ============================================================================
class frame_queue
{
public:
    struct cancelled {};

public:
    frame_queue();

    void push(cv::Mat const& image);
    cv::Mat pop();

    void cancel();

private:
    std::queue<cv::Mat> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
    bool cancelled_;
};
// ----------------------------------------------------------------------------
frame_queue::frame_queue()
    : cancelled_(false)
{
}
// ----------------------------------------------------------------------------
void frame_queue::cancel()
{
    std::unique_lock<std::mutex> mlock(mutex_);
    cancelled_ = true;
    cond_.notify_all();
}
// ----------------------------------------------------------------------------
void frame_queue::push(cv::Mat const& image)
{
    std::unique_lock<std::mutex> mlock(mutex_);
    queue_.push(image);
    cond_.notify_one();
}
// ----------------------------------------------------------------------------
cv::Mat frame_queue::pop()
{
    std::unique_lock<std::mutex> mlock(mutex_);

    while (queue_.empty()) {
        if (cancelled_) {
            throw cancelled();
        }
        cond_.wait(mlock);
        if (cancelled_) {
            throw cancelled();
        }
    }

    cv::Mat image(queue_.front());
    queue_.pop();
    return image;
}
// ============================================================================

Storage Worker

The next step is to define a simple storage_worker, which will be responsible for taking frames from the synchronized queue and encoding them into a video file until the queue has been cancelled.

I've added simple timing, so we have some idea how much time is spent encoding the frames, as well as simple logging to the console, so we can see what is happening in the program.

// ============================================================================
class storage_worker
{
public:
    storage_worker(frame_queue& queue
        , int32_t id
        , std::string const& file_name
        , int32_t fourcc
        , double fps
        , cv::Size frame_size
        , bool is_color = true);

    void run();

    double total_time_ms() const { return total_time_ / 1000.0; }

private:
    frame_queue& queue_;

    int32_t id_;

    std::string file_name_;
    int32_t fourcc_;
    double fps_;
    cv::Size frame_size_;
    bool is_color_;

    double total_time_;
};
// ----------------------------------------------------------------------------
storage_worker::storage_worker(frame_queue& queue
    , int32_t id
    , std::string const& file_name
    , int32_t fourcc
    , double fps
    , cv::Size frame_size
    , bool is_color)
    : queue_(queue)
    , id_(id)
    , file_name_(file_name)
    , fourcc_(fourcc)
    , fps_(fps)
    , frame_size_(frame_size)
    , is_color_(is_color)
    , total_time_(0.0)
{
}
// ----------------------------------------------------------------------------
void storage_worker::run()
{
    cv::VideoWriter writer(file_name_, fourcc_, fps_, frame_size_, is_color_);

    try {
        int32_t frame_count(0);
        for (;;) {
            cv::Mat image(queue_.pop());
            if (!image.empty()) {
                high_resolution_clock::time_point t1(high_resolution_clock::now());

                ++frame_count;
                writer.write(image);

                high_resolution_clock::time_point t2(high_resolution_clock::now());
                double dt_us(static_cast<double>(duration_cast<microseconds>(t2 - t1).count()));
                total_time_ += dt_us;

                std::cout << "Worker " << id_ << " stored image #" << frame_count
                    << " in " << (dt_us / 1000.0) << " ms" << std::endl;
            }
        }
    } catch (frame_queue::cancelled& /*e*/) {
        // Nothing more to process, we're done
        std::cout << "Queue " << id_ << " cancelled, worker finished." << std::endl;
    }
}
// ============================================================================

Processing

Finally, we can put this all together.

We begin by initializing and configuring our video source. Then we create two frame_queue instances, one for each stream of images. We follow this by creating two instances of storage_worker, one for each queue. To keep things interesting, I've set a different codec for each.

The next step is to create and start the worker threads, which will execute the run() method of each storage_worker. Having our consumers ready, we can start capturing frames from the camera and feed them to the frame_queue instances. As mentioned above, I have only a single source, so I insert copies of the same frame into both queues.

NB: I need to use the clone() method of cv::Mat to make a deep copy; otherwise I would be inserting references to the single buffer that OpenCV's VideoCapture uses for performance reasons. That would mean that the worker threads would be getting references to this single image, with no synchronization for access to this shared image buffer. You need to make sure this does not happen in your scenario as well.
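
To illustrate the difference (a minimal sketch; cv::Mat's copy constructor copies only the header and shares the underlying pixel buffer, while clone() allocates a fresh buffer):

cv::Mat frame;
capture.read(frame);

cv::Mat shallow(frame);       // Header copy only -- shares the pixel buffer
cv::Mat deep(frame.clone());  // Allocates a new buffer and copies the pixels

// A subsequent capture.read() may overwrite the buffer that both `frame`
// and `shallow` refer to; `deep` is unaffected.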

Once we have read the appropriate number of frames (you can implement any other kind of stop-condition you desire), we cancel the work queues, and wait for the worker threads to complete.

Finally we write some statistics about the time required for the different tasks.

// ============================================================================
int main()
{
    // The video source -- for me this is a webcam, you use your specific camera API instead
    // I only have one camera, so I will just duplicate the frames to simulate your scenario
    cv::VideoCapture capture(0);

    // Let's make it decent sized, since my camera defaults to 640x480
    capture.set(CV_CAP_PROP_FRAME_WIDTH, 1920);
    capture.set(CV_CAP_PROP_FRAME_HEIGHT, 1080);
    capture.set(CV_CAP_PROP_FPS, 20.0);

    // And fetch the actual values, so we can create our video correctly
    int32_t frame_width(static_cast<int32_t>(capture.get(CV_CAP_PROP_FRAME_WIDTH)));
    int32_t frame_height(static_cast<int32_t>(capture.get(CV_CAP_PROP_FRAME_HEIGHT)));
    double video_fps(std::max(10.0, capture.get(CV_CAP_PROP_FPS))); // Some default in case it's 0

    std::cout << "Capturing images (" << frame_width << "x" << frame_height
        << ") at " << video_fps << " FPS." << std::endl;

    // The synchronized queues, one per video source/storage worker pair
    std::vector<frame_queue> queue(2);

    // Let's create our storage workers -- let's have two, to simulate your scenario
    // and to keep it interesting, have each one write a different format
    std::vector<storage_worker> storage;
    storage.emplace_back(std::ref(queue[0]), 0
        , std::string("foo_0.avi")
        , CV_FOURCC('I', 'Y', 'U', 'V')
        , video_fps
        , cv::Size(frame_width, frame_height)
        , true);

    storage.emplace_back(std::ref(queue[1]), 1
        , std::string("foo_1.avi")
        , CV_FOURCC('D', 'I', 'V', 'X')
        , video_fps
        , cv::Size(frame_width, frame_height)
        , true);

    // And start the worker threads for each storage worker
    std::vector<std::thread> storage_thread;
    for (auto& s : storage) {
        storage_thread.emplace_back(&storage_worker::run, &s);
    }

    // Now the main capture loop
    int32_t const MAX_FRAME_COUNT(10);
    double total_read_time(0.0);
    int32_t frame_count(0);
    for (; frame_count < MAX_FRAME_COUNT; ++frame_count) {
        high_resolution_clock::time_point t1(high_resolution_clock::now());

        // Try to read a frame
        cv::Mat image;
        if (!capture.read(image)) {
            std::cerr << "Failed to capture image.\n";
            break;
        }

        // Insert a copy into all queues
        for (auto& q : queue) {
            q.push(image.clone());
        }

        high_resolution_clock::time_point t2(high_resolution_clock::now());
        double dt_us(static_cast<double>(duration_cast<microseconds>(t2 - t1).count()));
        total_read_time += dt_us;

        std::cout << "Captured image #" << frame_count << " in "
            << (dt_us / 1000.0) << " ms" << std::endl;
    }

    // We're done reading, cancel all the queues
    for (auto& q : queue) {
        q.cancel();
    }

    // And join all the worker threads, waiting for them to finish
    for (auto& st : storage_thread) {
        st.join();
    }

    if (frame_count == 0) {
        std::cerr << "No frames captured.\n";
        return -1;
    }

    // Report the timings
    total_read_time /= 1000.0;
    double total_write_time_a(storage[0].total_time_ms());
    double total_write_time_b(storage[1].total_time_ms());

    std::cout << "Completed processing " << frame_count << " images:\n"
        << "  average capture time = " << (total_read_time / frame_count) << " ms\n"
        << "  average write time A = " << (total_write_time_a / frame_count) << " ms\n"
        << "  average write time B = " << (total_write_time_b / frame_count) << " ms\n";

    return 0;
}
// ============================================================================

Console Output

Running this little sample, we get the following log output in the console, as well as the two video files on the disk.

NB: Since the encoding was actually a lot faster than the capturing, I've added some artificial wait into the storage_worker to show the separation better.
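
The exact delay used isn't shown above; a minimal sketch of such an artificial wait (the values are made up purely to roughly match the log below, and are not part of the actual solution) could go right after writer.write(image) in storage_worker::run():

// Demonstration only: simulate a slow encoder so the queue visibly buffers.
// The per-worker delay is a made-up value, not part of the real pipeline.
std::this_thread::sleep_for(std::chrono::milliseconds(80 + 90 * id_));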

Capturing images (1920x1080) at 20 FPS.
Captured image #0 in 111.009 ms
Captured image #1 in 67.066 ms
Worker 0 stored image #1 in 94.087 ms
Captured image #2 in 62.059 ms
Worker 1 stored image #1 in 193.186 ms
Captured image #3 in 60.059 ms
Worker 0 stored image #2 in 100.097 ms
Captured image #4 in 78.075 ms
Worker 0 stored image #3 in 87.085 ms
Captured image #5 in 62.061 ms
Worker 0 stored image #4 in 95.092 ms
Worker 1 stored image #2 in 193.187 ms
Captured image #6 in 75.074 ms
Worker 0 stored image #5 in 95.093 ms
Captured image #7 in 63.061 ms
Captured image #8 in 64.061 ms
Worker 0 stored image #6 in 102.098 ms
Worker 1 stored image #3 in 201.195 ms
Captured image #9 in 76.074 ms
Worker 0 stored image #7 in 90.089 ms
Worker 0 stored image #8 in 91.087 ms
Worker 1 stored image #4 in 185.18 ms
Worker 0 stored image #9 in 82.08 ms
Worker 0 stored image #10 in 94.092 ms
Queue 0 cancelled, worker finished.
Worker 1 stored image #5 in 179.174 ms
Worker 1 stored image #6 in 106.102 ms
Worker 1 stored image #7 in 105.104 ms
Worker 1 stored image #8 in 103.101 ms
Worker 1 stored image #9 in 104.102 ms
Worker 1 stored image #10 in 104.1 ms
Queue 1 cancelled, worker finished.
Completed processing 10 images:
  average capture time = 71.8599 ms
  average write time A = 93.09 ms
  average write time B = 147.443 ms

Possible Improvements

Currently there is no protection against the queue growing without bound when the serialization simply can't keep up with the rate at which the camera generates new images. Set some upper limit on the queue size, and check it in the producer before pushing the frame. You will need to decide how exactly you want to handle this situation, for example by dropping the oldest frame, dropping the incoming frame, or blocking the producer, as sketched below.
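
A minimal sketch of such a bounded push, assuming we add a hypothetical std::size_t max_size_ member to frame_queue (dropping the oldest frame is just one possible policy):

// Hypothetical bounded variant of frame_queue::push() -- assumes a new
// max_size_ member; drops the oldest frame when full so the producer
// never blocks (alternatives: drop the incoming frame, or wait on a
// second condition variable until the consumer makes room).
void frame_queue::push(cv::Mat const& image)
{
    std::unique_lock<std::mutex> mlock(mutex_);
    if (queue_.size() >= max_size_) {
        queue_.pop(); // Discard the oldest frame to make room
    }
    queue_.push(image);
    cond_.notify_one();
}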

How to capture multiple camera streams with OpenCV?

[Sample image: the 8-camera GUI described below]

To capture multiple streams with OpenCV, I recommend using threading, which can improve performance by offloading the heavy I/O operations to a separate thread. Since accessing the webcam/IP/RTSP stream using cv2.VideoCapture().read() is a blocking operation, our main program is stalled until a frame is read from the camera device. If you have multiple streams, this latency will definitely be visible. To remedy this problem, we can spawn another thread that retrieves the frames into a deque in parallel, instead of relying on a single thread to obtain the frames in sequential order. Threading allows frames to be continuously read without impacting the performance of our main program. The idea of capturing a single stream using threading and OpenCV is from a previous answer in Python OpenCV multithreading streaming from camera.

But if you want to capture multiple streams, OpenCV alone is not enough. You can use OpenCV in combination with a GUI framework to stitch each image onto a single display. I will use PyQt4 as the framework, qdarkstyle for the GUI CSS, and imutils for OpenCV convenience functions.


Here is a very stripped-down version of the camera GUI I currently use, without the placeholder images, credential admin login page, and camera-switching ability. I've kept the automatic camera reconnect feature in case the internet dies or the camera connection is lost. I only have 8 cameras, as shown in the image above, but it is very simple to add another camera, and it should not impact performance. This camera GUI currently performs at about 60 FPS, so it is real-time. You can easily rearrange the layout using PyQt layouts, so feel free to modify the code! Remember to change the stream links!

from PyQt4 import QtCore, QtGui
import qdarkstyle
from threading import Thread
from collections import deque
from datetime import datetime
import time
import sys
import cv2
import imutils

class CameraWidget(QtGui.QWidget):
    """Independent camera feed
    Uses threading to grab IP camera frames in the background

    @param width - Width of the video frame
    @param height - Height of the video frame
    @param stream_link - IP/RTSP/Webcam link
    @param aspect_ratio - Whether to maintain frame aspect ratio or force into frame
    """

    def __init__(self, width, height, stream_link=0, aspect_ratio=False, parent=None, deque_size=1):
        super(CameraWidget, self).__init__(parent)

        # Initialize deque used to store frames read from the stream
        self.deque = deque(maxlen=deque_size)

        # Slight offset is needed since PyQt layouts have a built in padding
        # So add offset to counter the padding
        self.offset = 16
        self.screen_width = width - self.offset
        self.screen_height = height - self.offset
        self.maintain_aspect_ratio = aspect_ratio

        self.camera_stream_link = stream_link

        # Flag to check if camera is valid/working
        self.online = False
        self.capture = None
        self.video_frame = QtGui.QLabel()

        self.load_network_stream()

        # Start background frame grabbing
        self.get_frame_thread = Thread(target=self.get_frame, args=())
        self.get_frame_thread.daemon = True
        self.get_frame_thread.start()

        # Periodically set video frame to display
        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.set_frame)
        self.timer.start(.5)

        print('Started camera: {}'.format(self.camera_stream_link))

    def load_network_stream(self):
        """Verifies stream link and opens new stream if valid"""

        def load_network_stream_thread():
            if self.verify_network_stream(self.camera_stream_link):
                self.capture = cv2.VideoCapture(self.camera_stream_link)
                self.online = True
        self.load_stream_thread = Thread(target=load_network_stream_thread, args=())
        self.load_stream_thread.daemon = True
        self.load_stream_thread.start()

    def verify_network_stream(self, link):
        """Attempts to receive a frame from given link"""

        cap = cv2.VideoCapture(link)
        if not cap.isOpened():
            return False
        cap.release()
        return True

    def get_frame(self):
        """Reads frame, resizes, and converts image to pixmap"""

        while True:
            try:
                if self.capture.isOpened() and self.online:
                    # Read next frame from stream and insert into deque
                    status, frame = self.capture.read()
                    if status:
                        self.deque.append(frame)
                    else:
                        self.capture.release()
                        self.online = False
                else:
                    # Attempt to reconnect
                    print('attempting to reconnect', self.camera_stream_link)
                    self.load_network_stream()
                    self.spin(2)
                self.spin(.001)
            except AttributeError:
                pass

    def spin(self, seconds):
        """Pause for set amount of seconds, replaces time.sleep so program doesn't stall"""

        time_end = time.time() + seconds
        while time.time() < time_end:
            QtGui.QApplication.processEvents()

    def set_frame(self):
        """Sets pixmap image to video frame"""

        if not self.online:
            self.spin(1)
            return

        if self.deque and self.online:
            # Grab latest frame
            frame = self.deque[-1]

            # Keep frame aspect ratio
            if self.maintain_aspect_ratio:
                self.frame = imutils.resize(frame, width=self.screen_width)
            # Force resize
            else:
                self.frame = cv2.resize(frame, (self.screen_width, self.screen_height))

            # Add timestamp to cameras
            cv2.rectangle(self.frame, (self.screen_width-190,0), (self.screen_width,50), color=(0,0,0), thickness=-1)
            cv2.putText(self.frame, datetime.now().strftime('%H:%M:%S'), (self.screen_width-185,37), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (255,255,255), lineType=cv2.LINE_AA)

            # Convert to pixmap and set to video frame
            self.img = QtGui.QImage(self.frame, self.frame.shape[1], self.frame.shape[0], QtGui.QImage.Format_RGB888).rgbSwapped()
            self.pix = QtGui.QPixmap.fromImage(self.img)
            self.video_frame.setPixmap(self.pix)

    def get_video_frame(self):
        return self.video_frame

def exit_application():
    """Exit program event handler"""

    sys.exit(1)

if __name__ == '__main__':

    # Create main application window
    app = QtGui.QApplication([])
    app.setStyleSheet(qdarkstyle.load_stylesheet_pyqt())
    app.setStyle(QtGui.QStyleFactory.create("Cleanlooks"))
    mw = QtGui.QMainWindow()
    mw.setWindowTitle('Camera GUI')
    mw.setWindowFlags(QtCore.Qt.FramelessWindowHint)

    cw = QtGui.QWidget()
    ml = QtGui.QGridLayout()
    cw.setLayout(ml)
    mw.setCentralWidget(cw)
    mw.showMaximized()

    # Dynamically determine screen width/height
    screen_width = QtGui.QApplication.desktop().screenGeometry().width()
    screen_height = QtGui.QApplication.desktop().screenGeometry().height()

    # Create Camera Widgets
    username = 'Your camera username!'
    password = 'Your camera password!'

    # Stream links
    camera0 = 'rtsp://{}:{}@192.168.1.43:554/cam/realmonitor?channel=1&subtype=0'.format(username, password)
    camera1 = 'rtsp://{}:{}@192.168.1.45/axis-media/media.amp'.format(username, password)
    camera2 = 'rtsp://{}:{}@192.168.1.47:554/cam/realmonitor?channel=1&subtype=0'.format(username, password)
    camera3 = 'rtsp://{}:{}@192.168.1.40:554/cam/realmonitor?channel=1&subtype=0'.format(username, password)
    camera4 = 'rtsp://{}:{}@192.168.1.44:554/cam/realmonitor?channel=1&subtype=0'.format(username, password)
    camera5 = 'rtsp://{}:{}@192.168.1.42:554/cam/realmonitor?channel=1&subtype=0'.format(username, password)
    camera6 = 'rtsp://{}:{}@192.168.1.46:554/cam/realmonitor?channel=1&subtype=0'.format(username, password)
    camera7 = 'rtsp://{}:{}@192.168.1.41:554/cam/realmonitor?channel=1&subtype=0'.format(username, password)

    # Create camera widgets
    print('Creating Camera Widgets...')
    zero = CameraWidget(screen_width//3, screen_height//3, camera0)
    one = CameraWidget(screen_width//3, screen_height//3, camera1)
    two = CameraWidget(screen_width//3, screen_height//3, camera2)
    three = CameraWidget(screen_width//3, screen_height//3, camera3)
    four = CameraWidget(screen_width//3, screen_height//3, camera4)
    five = CameraWidget(screen_width//3, screen_height//3, camera5)
    six = CameraWidget(screen_width//3, screen_height//3, camera6)
    seven = CameraWidget(screen_width//3, screen_height//3, camera7)

    # Add widgets to layout
    print('Adding widgets to layout...')
    ml.addWidget(zero.get_video_frame(),0,0,1,1)
    ml.addWidget(one.get_video_frame(),0,1,1,1)
    ml.addWidget(two.get_video_frame(),0,2,1,1)
    ml.addWidget(three.get_video_frame(),1,0,1,1)
    ml.addWidget(four.get_video_frame(),1,1,1,1)
    ml.addWidget(five.get_video_frame(),1,2,1,1)
    ml.addWidget(six.get_video_frame(),2,0,1,1)
    ml.addWidget(seven.get_video_frame(),2,1,1,1)

    print('Verifying camera credentials...')

    mw.show()

    QtGui.QShortcut(QtGui.QKeySequence('Ctrl+Q'), mw, exit_application)

    if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
        QtGui.QApplication.instance().exec_()

Related camera/IP/RTSP, FPS, video, threading, and multiprocessing posts

  1. Python OpenCV streaming from camera - multithreading, timestamps

  2. Video Streaming from IP Camera in Python Using OpenCV cv2.VideoCapture

  3. How to capture multiple camera streams with OpenCV?

  4. OpenCV real time streaming video capture is slow. How to drop frames or get synced with real time?

  5. Storing RTSP stream as video file with OpenCV VideoWriter

  6. OpenCV video saving

  7. Python OpenCV multiprocessing cv2.VideoCapture mp4


