Phase 1 (Foundation): project skeleton, TOML config + Pydantic validation, MQTT bus wrapper, SQLite schema (9 tables), Click CLI, process supervisor. Phase 2 (Camera): RTSP capture via OpenCV, MOG2 motion detection with configurable sensitivity/zones, adaptive FPS recording (2fps idle/30fps motion) via FFmpeg subprocess, HLS live streaming, pre-motion ring buffer. Phase 3 (Web UI): Flask + Bootstrap 5 dark theme, 6 blueprints, Jinja2 templates (dashboard, kiosk 2x2 grid, events, sensors, recordings, settings), PWA with service worker + Web Push, full admin settings UI with config persistence. Remote Access: WireGuard tunnel configs, nginx reverse proxy with HLS caching + rate limiting, bandwidth-optimized remote HLS stream (426x240 @ 500kbps), DO droplet setup script, certbot TLS. 29 tests passing. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
148 lines
5.5 KiB
Python
148 lines
5.5 KiB
Python
"""Motion detection using OpenCV MOG2 background subtractor.
|
|
|
|
Supports configurable sensitivity, minimum contour area,
|
|
and rectangular zone masking.
|
|
"""
|
|
|
|
import logging
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class MotionDetector:
|
|
"""MOG2-based motion detector with zone masking."""
|
|
|
|
def __init__(
|
|
self,
|
|
sensitivity: float = 0.7,
|
|
min_area_px: int = 500,
|
|
zones: list[list[int]] | None = None,
|
|
resolution: tuple[int, int] = (640, 360),
|
|
):
|
|
"""
|
|
Args:
|
|
sensitivity: 0.0 (off) to 1.0 (maximum sensitivity).
|
|
Maps to MOG2 varThreshold inversely: high sensitivity = low threshold.
|
|
min_area_px: Minimum contour area (in pixels) to count as motion.
|
|
zones: List of [x, y, w, h] rectangles to monitor. Empty = whole frame.
|
|
resolution: Resolution to downscale frames to for detection.
|
|
"""
|
|
self._sensitivity = sensitivity
|
|
self._min_area = min_area_px
|
|
self._zones = zones or []
|
|
self._resolution = resolution
|
|
|
|
# MOG2 threshold: sensitivity 1.0 → threshold 8, sensitivity 0.0 → threshold 128
|
|
var_threshold = int(128 - (sensitivity * 120))
|
|
self._subtractor = cv2.createBackgroundSubtractorMOG2(
|
|
history=500,
|
|
varThreshold=var_threshold,
|
|
detectShadows=True,
|
|
)
|
|
# Shadow detection value (127) gets eliminated by threshold
|
|
self._kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
|
self._zone_mask: np.ndarray | None = None
|
|
self._motion_active = False
|
|
self._warmup_frames = 60 # frames before detection is reliable
|
|
self._frame_count = 0
|
|
|
|
def _build_zone_mask(self, height: int, width: int) -> np.ndarray:
|
|
"""Build a binary mask from configured zones."""
|
|
if not self._zones:
|
|
return np.ones((height, width), dtype=np.uint8) * 255
|
|
|
|
mask = np.zeros((height, width), dtype=np.uint8)
|
|
for zone in self._zones:
|
|
if len(zone) == 4:
|
|
x, y, w, h = zone
|
|
# Scale zone coordinates to detection resolution
|
|
sx = width / 1920 # assume zones defined relative to 1920 wide
|
|
sy = height / 1080
|
|
x, y, w, h = int(x * sx), int(y * sy), int(w * sx), int(h * sy)
|
|
cv2.rectangle(mask, (x, y), (x + w, y + h), 255, -1)
|
|
return mask
|
|
|
|
def detect(self, frame: np.ndarray) -> tuple[bool, list[tuple[int, int, int, int]], float]:
|
|
"""Run motion detection on a frame.
|
|
|
|
Args:
|
|
frame: BGR frame from camera (any resolution — will be downscaled).
|
|
|
|
Returns:
|
|
(motion_detected, contour_rects, confidence)
|
|
- motion_detected: True if significant motion found
|
|
- contour_rects: list of (x, y, w, h) bounding boxes in detection coords
|
|
- confidence: 0.0-1.0 score based on motion area relative to frame
|
|
"""
|
|
self._frame_count += 1
|
|
|
|
# Downscale for detection
|
|
det_w, det_h = self._resolution
|
|
small = cv2.resize(frame, (det_w, det_h), interpolation=cv2.INTER_AREA)
|
|
|
|
# Build zone mask lazily
|
|
if self._zone_mask is None:
|
|
self._zone_mask = self._build_zone_mask(det_h, det_w)
|
|
|
|
# Apply MOG2
|
|
fg_mask = self._subtractor.apply(small)
|
|
|
|
# Remove shadows (value 127) and noise
|
|
_, fg_mask = cv2.threshold(fg_mask, 200, 255, cv2.THRESH_BINARY)
|
|
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, self._kernel)
|
|
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, self._kernel)
|
|
|
|
# Apply zone mask
|
|
fg_mask = cv2.bitwise_and(fg_mask, self._zone_mask)
|
|
|
|
# Skip during warmup (MOG2 needs time to build background model)
|
|
if self._frame_count < self._warmup_frames:
|
|
return False, [], 0.0
|
|
|
|
# Find contours
|
|
contours, _ = cv2.findContours(fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
|
|
|
rects = []
|
|
total_area = 0
|
|
for contour in contours:
|
|
area = cv2.contourArea(contour)
|
|
if area >= self._min_area:
|
|
rects.append(cv2.boundingRect(contour))
|
|
total_area += area
|
|
|
|
motion_detected = len(rects) > 0
|
|
frame_area = det_w * det_h
|
|
confidence = min(total_area / frame_area, 1.0) if frame_area > 0 else 0.0
|
|
|
|
self._motion_active = motion_detected
|
|
return motion_detected, rects, confidence
|
|
|
|
@property
|
|
def is_motion_active(self) -> bool:
|
|
return self._motion_active
|
|
|
|
def update_sensitivity(self, sensitivity: float) -> None:
|
|
"""Update detection sensitivity at runtime."""
|
|
self._sensitivity = max(0.0, min(1.0, sensitivity))
|
|
var_threshold = int(128 - (self._sensitivity * 120))
|
|
self._subtractor.setVarThreshold(var_threshold)
|
|
log.info("Motion sensitivity updated to %.2f (threshold=%d)", sensitivity, var_threshold)
|
|
|
|
def update_min_area(self, min_area_px: int) -> None:
|
|
"""Update minimum contour area at runtime."""
|
|
self._min_area = max(0, min_area_px)
|
|
|
|
def reset(self) -> None:
|
|
"""Reset the background model (e.g., after scene change)."""
|
|
var_threshold = int(128 - (self._sensitivity * 120))
|
|
self._subtractor = cv2.createBackgroundSubtractorMOG2(
|
|
history=500,
|
|
varThreshold=var_threshold,
|
|
detectShadows=True,
|
|
)
|
|
self._frame_count = 0
|
|
self._zone_mask = None
|