Initial commit: Vigilar DIY home security system

Phase 1 (Foundation): project skeleton, TOML config + Pydantic validation,
MQTT bus wrapper, SQLite schema (9 tables), Click CLI, process supervisor.

Phase 2 (Camera): RTSP capture via OpenCV, MOG2 motion detection with
configurable sensitivity/zones, adaptive FPS recording (2fps idle/30fps
motion) via FFmpeg subprocess, HLS live streaming, pre-motion ring buffer.

Phase 3 (Web UI): Flask + Bootstrap 5 dark theme, 6 blueprints, Jinja2
templates (dashboard, kiosk 2x2 grid, events, sensors, recordings, settings),
PWA with service worker + Web Push, full admin settings UI with config
persistence.

Remote Access: WireGuard tunnel configs, nginx reverse proxy with HLS
caching + rate limiting, bandwidth-optimized remote HLS stream (426x240
@ 500kbps), DO droplet setup script, certbot TLS.

29 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee
2026-04-02 23:11:27 -04:00
commit 845a85d618
69 changed files with 7061 additions and 0 deletions

View File

245
vigilar/camera/hls.py Normal file
View File

@@ -0,0 +1,245 @@
"""HLS segment generator using FFmpeg.
Produces an HLS live stream per camera by running an FFmpeg process
that reads frames from a pipe and outputs .m3u8 + .ts segment files.
The web UI serves these files for low-latency, bandwidth-efficient
multi-camera viewing via hls.js.
"""
import logging
import shutil
import subprocess
import time
from pathlib import Path
import cv2
import numpy as np
log = logging.getLogger(__name__)
class HLSStreamer:
"""Generates HLS segments from raw video frames via FFmpeg."""
def __init__(
self,
camera_id: str,
hls_dir: str,
fps: int = 15,
resolution: tuple[int, int] = (640, 360),
segment_duration: int = 2,
list_size: int = 5,
):
"""
Args:
camera_id: Camera identifier for directory naming.
hls_dir: Root directory for HLS output.
fps: Frame rate for the HLS stream (lower than capture for bandwidth).
resolution: Output resolution for HLS segments.
segment_duration: Duration of each .ts segment in seconds.
list_size: Number of segments to keep in the playlist.
"""
self._camera_id = camera_id
self._output_dir = Path(hls_dir) / camera_id
self._fps = fps
self._resolution = resolution
self._segment_duration = segment_duration
self._list_size = list_size
self._ffmpeg_path = shutil.which("ffmpeg") or "ffmpeg"
self._process: subprocess.Popen | None = None
self._running = False
@property
def playlist_path(self) -> Path:
"""Path to the HLS playlist file."""
return self._output_dir / "stream.m3u8"
def start(self) -> None:
"""Start the FFmpeg HLS encoding process."""
self._output_dir.mkdir(parents=True, exist_ok=True)
# Clean stale segments
for f in self._output_dir.glob("*.ts"):
f.unlink()
if self.playlist_path.exists():
self.playlist_path.unlink()
w, h = self._resolution
cmd = [
self._ffmpeg_path,
"-y",
"-f", "rawvideo",
"-vcodec", "rawvideo",
"-pix_fmt", "bgr24",
"-s", f"{w}x{h}",
"-r", str(self._fps),
"-i", "pipe:0",
"-c:v", "libx264",
"-preset", "ultrafast",
"-tune", "zerolatency",
"-g", str(self._fps * self._segment_duration), # keyframe interval
"-sc_threshold", "0",
"-pix_fmt", "yuv420p",
"-f", "hls",
"-hls_time", str(self._segment_duration),
"-hls_list_size", str(self._list_size),
"-hls_flags", "delete_segments+append_list",
"-hls_segment_filename", str(self._output_dir / "seg_%03d.ts"),
str(self.playlist_path),
]
try:
self._process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
self._running = True
log.info("HLS streamer started for %s (%dx%d @ %d fps)", self._camera_id, w, h, self._fps)
except FileNotFoundError:
log.error("FFmpeg not found — HLS streaming disabled for %s", self._camera_id)
self._process = None
self._running = False
def write_frame(self, frame: np.ndarray) -> None:
"""Write a frame to the HLS encoder.
Frame should be at capture resolution — it will be downscaled
to the HLS output resolution.
"""
if not self._running or not self._process or not self._process.stdin:
return
try:
h, w = frame.shape[:2]
rw, rh = self._resolution
if w != rw or h != rh:
frame = cv2.resize(frame, (rw, rh), interpolation=cv2.INTER_AREA)
self._process.stdin.write(frame.tobytes())
except BrokenPipeError:
log.warning("HLS FFmpeg pipe broken for %s", self._camera_id)
self._running = False
def stop(self) -> None:
"""Stop the HLS encoding process."""
if not self._process:
return
try:
if self._process.stdin:
self._process.stdin.close()
self._process.wait(timeout=5)
except subprocess.TimeoutExpired:
self._process.kill()
self._process.wait()
self._running = False
self._process = None
log.info("HLS streamer stopped for %s", self._camera_id)
def cleanup(self) -> None:
"""Remove all HLS segments and playlist."""
self.stop()
if self._output_dir.exists():
for f in self._output_dir.iterdir():
f.unlink()
@property
def is_running(self) -> bool:
return self._running
class RemoteHLSStreamer(HLSStreamer):
"""Lower-quality HLS stream optimized for remote access over limited uplink.
Produces a separate stream at reduced resolution/fps/bitrate that gets
served through the WireGuard tunnel → nginx reverse proxy path.
This keeps the home upload bandwidth under control.
At 426x240 @ 10fps @ 500kbps: ~0.5 Mbps per camera.
4 cameras = ~2 Mbps, well within 22 Mbps uplink even with 4 viewers.
"""
def __init__(
self,
camera_id: str,
hls_dir: str,
fps: int = 10,
resolution: tuple[int, int] = (426, 240),
bitrate_kbps: int = 500,
segment_duration: int = 2,
list_size: int = 5,
):
# Remote streams go in a /remote/ subdirectory
super().__init__(
camera_id=camera_id,
hls_dir=hls_dir,
fps=fps,
resolution=resolution,
segment_duration=segment_duration,
list_size=list_size,
)
self._bitrate_kbps = bitrate_kbps
# Override output dir to be under /remote/ subdirectory
self._output_dir = Path(hls_dir) / camera_id / "remote"
@property
def playlist_path(self) -> Path:
return self._output_dir / "stream.m3u8"
def start(self) -> None:
"""Start FFmpeg with constrained bitrate for remote streaming."""
self._output_dir.mkdir(parents=True, exist_ok=True)
for f in self._output_dir.glob("*.ts"):
f.unlink()
if self.playlist_path.exists():
self.playlist_path.unlink()
w, h = self._resolution
bitrate = f"{self._bitrate_kbps}k"
cmd = [
self._ffmpeg_path,
"-y",
"-f", "rawvideo",
"-vcodec", "rawvideo",
"-pix_fmt", "bgr24",
"-s", f"{w}x{h}",
"-r", str(self._fps),
"-i", "pipe:0",
"-c:v", "libx264",
"-preset", "ultrafast",
"-tune", "zerolatency",
"-b:v", bitrate,
"-maxrate", bitrate,
"-bufsize", f"{self._bitrate_kbps * 2}k",
"-g", str(self._fps * self._segment_duration),
"-sc_threshold", "0",
"-pix_fmt", "yuv420p",
"-f", "hls",
"-hls_time", str(self._segment_duration),
"-hls_list_size", str(self._list_size),
"-hls_flags", "delete_segments+append_list",
"-hls_segment_filename", str(self._output_dir / "seg_%03d.ts"),
str(self.playlist_path),
]
try:
self._process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
self._running = True
log.info(
"Remote HLS started for %s (%dx%d @ %d fps, %s)",
self._camera_id, w, h, self._fps, bitrate,
)
except FileNotFoundError:
log.error("FFmpeg not found — remote HLS disabled for %s", self._camera_id)
self._process = None
self._running = False

117
vigilar/camera/manager.py Normal file
View File

@@ -0,0 +1,117 @@
"""Camera manager — spawns and supervises per-camera worker processes."""
import logging
import time
from multiprocessing import Process
from vigilar.camera.worker import run_camera_worker
from vigilar.config import VigilarConfig
log = logging.getLogger(__name__)
class CameraWorkerHandle:
"""Handle for a single camera worker process."""
def __init__(self, camera_id: str, process: Process):
self.camera_id = camera_id
self.process = process
self.restart_count = 0
self.last_restart: float = 0
@property
def is_alive(self) -> bool:
return self.process.is_alive()
class CameraManager:
"""Manages all camera worker processes."""
def __init__(self, config: VigilarConfig):
self._config = config
self._workers: dict[str, CameraWorkerHandle] = {}
def start(self) -> None:
"""Start a worker process for each enabled camera."""
enabled = [c for c in self._config.cameras if c.enabled]
log.info("Starting %d camera workers", len(enabled))
for cam_cfg in enabled:
self._start_worker(cam_cfg.id)
def _start_worker(self, camera_id: str) -> None:
"""Start a single camera worker process."""
cam_cfg = next((c for c in self._config.cameras if c.id == camera_id), None)
if not cam_cfg:
log.error("Camera config not found: %s", camera_id)
return
process = Process(
target=run_camera_worker,
args=(
cam_cfg,
self._config.mqtt,
self._config.system.recordings_dir,
self._config.system.hls_dir,
self._config.remote if self._config.remote.enabled else None,
),
name=f"camera-{camera_id}",
daemon=True,
)
process.start()
self._workers[camera_id] = CameraWorkerHandle(camera_id, process)
log.info("Camera worker started: %s (pid=%d)", camera_id, process.pid)
def check_and_restart(self) -> None:
"""Check for crashed workers and restart them with backoff."""
for camera_id, handle in list(self._workers.items()):
if handle.is_alive:
continue
if handle.restart_count >= 10:
log.error("Camera %s exceeded max restarts, giving up", camera_id)
continue
backoff = min(2 ** handle.restart_count, 60)
elapsed = time.monotonic() - handle.last_restart
if elapsed < backoff:
continue
handle.restart_count += 1
handle.last_restart = time.monotonic()
log.warning("Restarting camera worker %s (attempt %d)", camera_id, handle.restart_count)
self._start_worker(camera_id)
def stop(self) -> None:
"""Stop all camera worker processes."""
log.info("Stopping %d camera workers", len(self._workers))
for handle in self._workers.values():
if handle.process.is_alive():
handle.process.terminate()
for handle in self._workers.values():
handle.process.join(timeout=5)
if handle.process.is_alive():
handle.process.kill()
handle.process.join(timeout=2)
self._workers.clear()
@property
def worker_count(self) -> int:
return len(self._workers)
@property
def alive_count(self) -> int:
return sum(1 for h in self._workers.values() if h.is_alive)
def get_status(self) -> dict:
"""Get status of all camera workers."""
return {
cid: {
"alive": h.is_alive,
"pid": h.process.pid if h.process else None,
"restart_count": h.restart_count,
}
for cid, h in self._workers.items()
}

147
vigilar/camera/motion.py Normal file
View File

@@ -0,0 +1,147 @@
"""Motion detection using OpenCV MOG2 background subtractor.
Supports configurable sensitivity, minimum contour area,
and rectangular zone masking.
"""
import logging
import cv2
import numpy as np
log = logging.getLogger(__name__)
class MotionDetector:
"""MOG2-based motion detector with zone masking."""
def __init__(
self,
sensitivity: float = 0.7,
min_area_px: int = 500,
zones: list[list[int]] | None = None,
resolution: tuple[int, int] = (640, 360),
):
"""
Args:
sensitivity: 0.0 (off) to 1.0 (maximum sensitivity).
Maps to MOG2 varThreshold inversely: high sensitivity = low threshold.
min_area_px: Minimum contour area (in pixels) to count as motion.
zones: List of [x, y, w, h] rectangles to monitor. Empty = whole frame.
resolution: Resolution to downscale frames to for detection.
"""
self._sensitivity = sensitivity
self._min_area = min_area_px
self._zones = zones or []
self._resolution = resolution
# MOG2 threshold: sensitivity 1.0 → threshold 8, sensitivity 0.0 → threshold 128
var_threshold = int(128 - (sensitivity * 120))
self._subtractor = cv2.createBackgroundSubtractorMOG2(
history=500,
varThreshold=var_threshold,
detectShadows=True,
)
# Shadow detection value (127) gets eliminated by threshold
self._kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
self._zone_mask: np.ndarray | None = None
self._motion_active = False
self._warmup_frames = 60 # frames before detection is reliable
self._frame_count = 0
def _build_zone_mask(self, height: int, width: int) -> np.ndarray:
"""Build a binary mask from configured zones."""
if not self._zones:
return np.ones((height, width), dtype=np.uint8) * 255
mask = np.zeros((height, width), dtype=np.uint8)
for zone in self._zones:
if len(zone) == 4:
x, y, w, h = zone
# Scale zone coordinates to detection resolution
sx = width / 1920 # assume zones defined relative to 1920 wide
sy = height / 1080
x, y, w, h = int(x * sx), int(y * sy), int(w * sx), int(h * sy)
cv2.rectangle(mask, (x, y), (x + w, y + h), 255, -1)
return mask
def detect(self, frame: np.ndarray) -> tuple[bool, list[tuple[int, int, int, int]], float]:
"""Run motion detection on a frame.
Args:
frame: BGR frame from camera (any resolution — will be downscaled).
Returns:
(motion_detected, contour_rects, confidence)
- motion_detected: True if significant motion found
- contour_rects: list of (x, y, w, h) bounding boxes in detection coords
- confidence: 0.0-1.0 score based on motion area relative to frame
"""
self._frame_count += 1
# Downscale for detection
det_w, det_h = self._resolution
small = cv2.resize(frame, (det_w, det_h), interpolation=cv2.INTER_AREA)
# Build zone mask lazily
if self._zone_mask is None:
self._zone_mask = self._build_zone_mask(det_h, det_w)
# Apply MOG2
fg_mask = self._subtractor.apply(small)
# Remove shadows (value 127) and noise
_, fg_mask = cv2.threshold(fg_mask, 200, 255, cv2.THRESH_BINARY)
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, self._kernel)
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, self._kernel)
# Apply zone mask
fg_mask = cv2.bitwise_and(fg_mask, self._zone_mask)
# Skip during warmup (MOG2 needs time to build background model)
if self._frame_count < self._warmup_frames:
return False, [], 0.0
# Find contours
contours, _ = cv2.findContours(fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
rects = []
total_area = 0
for contour in contours:
area = cv2.contourArea(contour)
if area >= self._min_area:
rects.append(cv2.boundingRect(contour))
total_area += area
motion_detected = len(rects) > 0
frame_area = det_w * det_h
confidence = min(total_area / frame_area, 1.0) if frame_area > 0 else 0.0
self._motion_active = motion_detected
return motion_detected, rects, confidence
@property
def is_motion_active(self) -> bool:
return self._motion_active
def update_sensitivity(self, sensitivity: float) -> None:
"""Update detection sensitivity at runtime."""
self._sensitivity = max(0.0, min(1.0, sensitivity))
var_threshold = int(128 - (self._sensitivity * 120))
self._subtractor.setVarThreshold(var_threshold)
log.info("Motion sensitivity updated to %.2f (threshold=%d)", sensitivity, var_threshold)
def update_min_area(self, min_area_px: int) -> None:
"""Update minimum contour area at runtime."""
self._min_area = max(0, min_area_px)
def reset(self) -> None:
"""Reset the background model (e.g., after scene change)."""
var_threshold = int(128 - (self._sensitivity * 120))
self._subtractor = cv2.createBackgroundSubtractorMOG2(
history=500,
varThreshold=var_threshold,
detectShadows=True,
)
self._frame_count = 0
self._zone_mask = None

228
vigilar/camera/recorder.py Normal file
View File

@@ -0,0 +1,228 @@
"""Adaptive FPS recorder using FFmpeg subprocess.
Two recording modes:
1. Idle recording: writes frames at 2 FPS (every Nth frame skipped)
2. Motion recording: writes all frames at full 30 FPS, including
pre-motion buffer frames flushed from the ring buffer.
Uses FFmpeg via stdin pipe for frame-accurate control over what
gets written, while letting FFmpeg handle H.264 encoding efficiently.
"""
import logging
import os
import shutil
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import cv2
import numpy as np
from vigilar.camera.ring_buffer import TimestampedFrame
log = logging.getLogger(__name__)
@dataclass
class RecordingSegment:
"""Metadata about a completed recording segment."""
file_path: str
started_at: int # unix ms
ended_at: int # unix ms
duration_s: float
file_size: int
trigger: str # MOTION, CONTINUOUS, MANUAL
fps: int
frame_count: int
class AdaptiveRecorder:
"""Records video segments at adaptive FPS using FFmpeg.
Writes raw frames to FFmpeg via stdin pipe. FFmpeg encodes to H.264 MP4.
This gives us frame-accurate control over which frames get recorded
while keeping CPU usage low via hardware-accelerated encoding when available.
"""
def __init__(
self,
camera_id: str,
recordings_dir: str,
idle_fps: int = 2,
motion_fps: int = 30,
resolution: tuple[int, int] = (1920, 1080),
):
self._camera_id = camera_id
self._recordings_dir = Path(recordings_dir)
self._idle_fps = idle_fps
self._motion_fps = motion_fps
self._resolution = resolution
self._ffmpeg_path = shutil.which("ffmpeg") or "ffmpeg"
self._process: subprocess.Popen | None = None
self._current_path: Path | None = None
self._current_fps: int = idle_fps
self._current_trigger: str = "CONTINUOUS"
self._started_at: int = 0
self._frame_count: int = 0
self._recording = False
def _ensure_dir(self) -> Path:
"""Create date-based subdirectory for recordings."""
now = time.localtime()
subdir = self._recordings_dir / self._camera_id / f"{now.tm_year:04d}" / f"{now.tm_mon:02d}" / f"{now.tm_mday:02d}"
subdir.mkdir(parents=True, exist_ok=True)
return subdir
def _start_ffmpeg(self, fps: int) -> None:
"""Start an FFmpeg process for recording at the given FPS."""
subdir = self._ensure_dir()
timestamp = time.strftime("%H%M%S")
filename = f"{self._camera_id}_{timestamp}_{fps}fps.mp4"
self._current_path = subdir / filename
self._current_fps = fps
w, h = self._resolution
cmd = [
self._ffmpeg_path,
"-y",
"-f", "rawvideo",
"-vcodec", "rawvideo",
"-pix_fmt", "bgr24",
"-s", f"{w}x{h}",
"-r", str(fps),
"-i", "pipe:0",
"-c:v", "libx264",
"-preset", "ultrafast",
"-crf", "23",
"-pix_fmt", "yuv420p",
"-movflags", "+faststart",
str(self._current_path),
]
try:
self._process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
self._started_at = int(time.time() * 1000)
self._frame_count = 0
self._recording = True
log.info("Recording started: %s at %d FPS", self._current_path.name, fps)
except FileNotFoundError:
log.error("FFmpeg not found at %s — recording disabled", self._ffmpeg_path)
self._process = None
self._recording = False
def _stop_ffmpeg(self) -> RecordingSegment | None:
"""Stop the current FFmpeg process and return segment metadata."""
if not self._process or not self._recording:
return None
try:
self._process.stdin.close()
self._process.wait(timeout=10)
except (subprocess.TimeoutExpired, BrokenPipeError):
self._process.kill()
self._process.wait()
self._recording = False
ended_at = int(time.time() * 1000)
duration = (ended_at - self._started_at) / 1000.0
file_size = 0
if self._current_path and self._current_path.exists():
file_size = self._current_path.stat().st_size
segment = RecordingSegment(
file_path=str(self._current_path) if self._current_path else "",
started_at=self._started_at,
ended_at=ended_at,
duration_s=duration,
file_size=file_size,
trigger=self._current_trigger,
fps=self._current_fps,
frame_count=self._frame_count,
)
log.info(
"Recording stopped: %s (%.1fs, %d frames, %d bytes)",
self._current_path.name if self._current_path else "unknown",
duration, self._frame_count, file_size,
)
self._process = None
self._current_path = None
return segment
def write_frame(self, frame: np.ndarray) -> None:
"""Write a single frame to the active recording."""
if not self._recording or not self._process or not self._process.stdin:
return
try:
# Resize to recording resolution if needed
h, w = frame.shape[:2]
rw, rh = self._resolution
if w != rw or h != rh:
frame = cv2.resize(frame, (rw, rh))
self._process.stdin.write(frame.tobytes())
self._frame_count += 1
except BrokenPipeError:
log.warning("FFmpeg pipe broken for %s", self._camera_id)
self._recording = False
def start_idle_recording(self) -> None:
"""Start or switch to idle (low FPS) recording."""
if self._recording and self._current_fps == self._idle_fps:
return # already recording at idle FPS
if self._recording:
self._stop_ffmpeg()
self._current_trigger = "CONTINUOUS"
self._start_ffmpeg(self._idle_fps)
def start_motion_recording(self, pre_buffer: list[TimestampedFrame] | None = None) -> None:
"""Start motion recording at full FPS.
Optionally flush pre-motion buffer frames into the new segment.
"""
if self._recording:
self._stop_ffmpeg()
self._current_trigger = "MOTION"
self._start_ffmpeg(self._motion_fps)
# Write pre-buffer frames
if pre_buffer:
for tsf in pre_buffer:
self.write_frame(tsf.frame)
log.debug("Flushed %d pre-buffer frames for %s", len(pre_buffer), self._camera_id)
def stop_recording(self) -> RecordingSegment | None:
"""Stop the current recording and return segment info."""
return self._stop_ffmpeg()
def start_manual_recording(self) -> None:
"""Start a manually-triggered recording at motion FPS."""
if self._recording:
self._stop_ffmpeg()
self._current_trigger = "MANUAL"
self._start_ffmpeg(self._motion_fps)
@property
def is_recording(self) -> bool:
return self._recording
@property
def current_fps(self) -> int:
return self._current_fps if self._recording else 0
@property
def current_trigger(self) -> str:
return self._current_trigger

View File

@@ -0,0 +1,85 @@
"""Thread-safe ring buffer for pre-motion frame storage.
Keeps a rolling window of frames so that when motion is detected,
we can include the seconds leading up to the trigger in the recording.
"""
import threading
import time
from collections import deque
from dataclasses import dataclass, field
import numpy as np
@dataclass
class TimestampedFrame:
"""A frame with its capture timestamp."""
frame: np.ndarray
timestamp: float # time.monotonic()
frame_number: int = 0
class RingBuffer:
"""Fixed-duration ring buffer for video frames.
Stores the most recent `duration_s * max_fps` frames, automatically
discarding the oldest when full. Thread-safe for single-producer,
single-consumer use (camera worker writes, recorder reads).
"""
def __init__(self, duration_s: int = 5, max_fps: int = 30):
self._max_frames = duration_s * max_fps
self._buffer: deque[TimestampedFrame] = deque(maxlen=self._max_frames)
self._lock = threading.Lock()
self._frame_count = 0
def push(self, frame: np.ndarray) -> None:
"""Add a frame to the buffer."""
with self._lock:
self._frame_count += 1
self._buffer.append(TimestampedFrame(
frame=frame,
timestamp=time.monotonic(),
frame_number=self._frame_count,
))
def flush(self) -> list[TimestampedFrame]:
"""Drain all frames from the buffer and return them in order.
Used when motion is detected to get the pre-motion context.
The buffer is cleared after flush.
"""
with self._lock:
frames = list(self._buffer)
self._buffer.clear()
return frames
def peek_latest(self) -> TimestampedFrame | None:
"""Get the most recent frame without removing it."""
with self._lock:
return self._buffer[-1] if self._buffer else None
@property
def count(self) -> int:
"""Current number of frames in the buffer."""
with self._lock:
return len(self._buffer)
@property
def capacity(self) -> int:
"""Maximum number of frames the buffer can hold."""
return self._max_frames
@property
def duration_s(self) -> float:
"""Actual duration of content in the buffer, in seconds."""
with self._lock:
if len(self._buffer) < 2:
return 0.0
return self._buffer[-1].timestamp - self._buffer[0].timestamp
def clear(self) -> None:
"""Discard all frames."""
with self._lock:
self._buffer.clear()

267
vigilar/camera/worker.py Normal file
View File

@@ -0,0 +1,267 @@
"""Per-camera worker process.
Each camera runs in its own multiprocessing.Process. Responsibilities:
1. Open RTSP stream via OpenCV with reconnect logic
2. Run MOG2 motion detection on every frame (downscaled)
3. Manage adaptive FPS recording (2 FPS idle → 30 FPS on motion)
4. Feed frames to HLS streamer for live viewing
5. Publish motion events and heartbeats via MQTT
"""
import logging
import signal
import time
import cv2
import numpy as np
from vigilar.bus import MessageBus
from vigilar.camera.hls import HLSStreamer, RemoteHLSStreamer
from vigilar.camera.motion import MotionDetector
from vigilar.camera.recorder import AdaptiveRecorder
from vigilar.camera.ring_buffer import RingBuffer
from vigilar.config import CameraConfig, MQTTConfig, RemoteConfig
from vigilar.constants import Topics
log = logging.getLogger(__name__)
class CameraState:
"""Tracks the current state of a camera worker."""
def __init__(self):
self.connected = False
self.motion_active = False
self.motion_start_time: float = 0
self.last_motion_time: float = 0
self.fps_actual: float = 0
self.frame_count: int = 0
self.reconnect_count: int = 0
self.idle_frame_skip: int = 0
def run_camera_worker(
camera_cfg: CameraConfig,
mqtt_cfg: MQTTConfig,
recordings_dir: str,
hls_dir: str,
remote_cfg: RemoteConfig | None = None,
) -> None:
"""Main entry point for a camera worker process."""
camera_id = camera_cfg.id
log.info("Camera worker starting: %s (%s)", camera_id, camera_cfg.rtsp_url)
# Setup logging for this process
logging.basicConfig(
level=logging.INFO,
format=f"%(asctime)s [{camera_id}] %(levelname)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# MQTT bus
bus = MessageBus(mqtt_cfg, client_id=f"camera-{camera_id}")
bus.connect()
# Subscribe to threshold update commands
def on_threshold_update(topic: str, payload: dict) -> None:
if "sensitivity" in payload:
detector.update_sensitivity(payload["sensitivity"])
if "min_area" in payload:
detector.update_min_area(payload["min_area"])
bus.subscribe(f"vigilar/camera/{camera_id}/config", on_threshold_update)
# Components
ring_buf = RingBuffer(
duration_s=camera_cfg.pre_motion_buffer_s,
max_fps=camera_cfg.motion_fps,
)
detector = MotionDetector(
sensitivity=camera_cfg.motion_sensitivity,
min_area_px=camera_cfg.motion_min_area_px,
zones=camera_cfg.motion_zones,
resolution=tuple(camera_cfg.resolution_motion),
)
recorder = AdaptiveRecorder(
camera_id=camera_id,
recordings_dir=recordings_dir,
idle_fps=camera_cfg.idle_fps,
motion_fps=camera_cfg.motion_fps,
resolution=tuple(camera_cfg.resolution_capture),
)
hls = HLSStreamer(
camera_id=camera_id,
hls_dir=hls_dir,
fps=15, # HLS stream at 15fps for bandwidth
resolution=(640, 360),
)
# Remote HLS streamer (lower quality for WireGuard tunnel)
remote_hls: RemoteHLSStreamer | None = None
if remote_cfg and remote_cfg.enabled:
remote_hls = RemoteHLSStreamer(
camera_id=camera_id,
hls_dir=hls_dir,
fps=remote_cfg.remote_hls_fps,
resolution=tuple(remote_cfg.remote_hls_resolution),
bitrate_kbps=remote_cfg.remote_hls_bitrate_kbps,
)
state = CameraState()
shutdown = False
def handle_signal(signum, frame):
nonlocal shutdown
shutdown = True
signal.signal(signal.SIGTERM, handle_signal)
# Camera capture loop
cap = None
last_heartbeat = 0
fps_timer = time.monotonic()
fps_frame_count = 0
native_fps = camera_cfg.motion_fps # assumed until we read from stream
# Idle frame skip: at 30fps native, skip factor 15 gives 2fps written
idle_skip_factor = max(1, native_fps // camera_cfg.idle_fps)
while not shutdown:
# Connect / reconnect
if cap is None or not cap.isOpened():
state.connected = False
backoff = min(2 ** state.reconnect_count, 60)
if state.reconnect_count > 0:
log.info("Reconnecting to %s in %ds...", camera_id, backoff)
bus.publish_event(Topics.camera_error(camera_id), error="disconnected", reconnect_in=backoff)
time.sleep(backoff)
cap = cv2.VideoCapture(camera_cfg.rtsp_url, cv2.CAP_FFMPEG)
if not cap.isOpened():
state.reconnect_count += 1
log.warning("Failed to connect to %s (%s)", camera_id, camera_cfg.rtsp_url)
continue
state.connected = True
state.reconnect_count = 0
native_fps = cap.get(cv2.CAP_PROP_FPS) or camera_cfg.motion_fps
idle_skip_factor = max(1, int(native_fps / camera_cfg.idle_fps))
log.info("Connected to %s (native %d fps, idle skip %d)", camera_id, native_fps, idle_skip_factor)
# Start HLS streamers
hls.start()
if remote_hls:
remote_hls.start()
# Start idle recording if continuous recording is enabled
if camera_cfg.record_continuous:
recorder.start_idle_recording()
# Read frame
ret, frame = cap.read()
if not ret:
log.warning("Frame read failed for %s", camera_id)
cap.release()
cap = None
hls.stop()
if remote_hls:
remote_hls.stop()
recorder.stop_recording()
continue
state.frame_count += 1
fps_frame_count += 1
now = time.monotonic()
# Calculate actual FPS every second
if now - fps_timer >= 1.0:
state.fps_actual = fps_frame_count / (now - fps_timer)
fps_frame_count = 0
fps_timer = now
# Always push to ring buffer (full FPS for pre-motion context)
ring_buf.push(frame.copy())
# Feed HLS streamers
hls.write_frame(frame)
if remote_hls:
remote_hls.write_frame(frame)
# Run motion detection
motion_detected, rects, confidence = detector.detect(frame)
if motion_detected and not state.motion_active:
# --- Motion START ---
state.motion_active = True
state.motion_start_time = now
state.last_motion_time = now
log.info("Motion START on %s (confidence=%.3f, zones=%d)", camera_id, confidence, len(rects))
bus.publish_event(
Topics.camera_motion_start(camera_id),
confidence=confidence,
zones=len(rects),
)
# Switch to motion recording with pre-buffer
if camera_cfg.record_on_motion:
pre_frames = ring_buf.flush()
recorder.start_motion_recording(pre_buffer=pre_frames)
elif motion_detected and state.motion_active:
# Ongoing motion — update timestamp
state.last_motion_time = now
elif not motion_detected and state.motion_active:
# Check if silence period exceeded
silence = now - state.last_motion_time
if silence >= camera_cfg.post_motion_buffer_s:
# --- Motion END ---
duration = now - state.motion_start_time
state.motion_active = False
log.info("Motion END on %s (duration=%.1fs)", camera_id, duration)
segment = recorder.stop_recording()
bus.publish_event(
Topics.camera_motion_end(camera_id),
duration_s=round(duration, 1),
clip_path=segment.file_path if segment else "",
)
# Resume idle recording if continuous mode
if camera_cfg.record_continuous:
recorder.start_idle_recording()
# Write frame to recorder (adaptive FPS)
if recorder.is_recording:
if state.motion_active:
# Motion: write every frame (full FPS)
recorder.write_frame(frame)
else:
# Idle: write every Nth frame
if state.frame_count % idle_skip_factor == 0:
recorder.write_frame(frame)
# Heartbeat every 10 seconds
if now - last_heartbeat >= 10:
last_heartbeat = now
bus.publish_event(
Topics.camera_heartbeat(camera_id),
connected=state.connected,
fps=round(state.fps_actual, 1),
motion=state.motion_active,
recording=recorder.is_recording,
recording_fps=recorder.current_fps,
resolution=list(frame.shape[:2][::-1]), # [w, h]
)
# Shutdown
log.info("Camera worker shutting down: %s", camera_id)
if cap and cap.isOpened():
cap.release()
recorder.stop_recording()
hls.stop()
if remote_hls:
remote_hls.stop()
bus.disconnect()