Add presence detection, person/vehicle AI detection, health monitoring

Task 1 — Presence: ping family phones, derive household state
(EMPTY/KIDS_HOME/ADULTS_HOME/ALL_HOME), configurable departure delay,
per-member roles, auto-arm actions via MQTT.

Task 2 — Detection: MobileNet-SSD v2 via OpenCV DNN for person/vehicle
classification. Vehicle color/size fingerprinting for known car matching.
Zone-based filtering per camera. Model download script.

Task 3 — Health: periodic disk/MQTT/subsystem checks, auto-prune oldest
non-starred recordings on disk pressure, daily digest builder.

126 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee
2026-04-03 00:06:45 -04:00
parent 8a65ac8c69
commit 8314a61815
19 changed files with 926 additions and 0 deletions

View File

View File

@@ -0,0 +1,76 @@
"""Person detection using MobileNet-SSD v2 via OpenCV DNN."""
import logging
from dataclasses import dataclass
from pathlib import Path
import cv2
import numpy as np
log = logging.getLogger(__name__)
COCO_CLASSES = {
1: "person", 2: "bicycle", 3: "car", 4: "motorcycle", 5: "airplane",
6: "bus", 7: "train", 8: "truck", 9: "boat",
}
DETECT_CLASSES = {1, 3, 8} # person, car, truck
@dataclass
class Detection:
class_name: str
class_id: int
confidence: float
bbox: tuple[int, int, int, int] # x, y, w, h
class PersonDetector:
def __init__(self, model_path: str, config_path: str, confidence_threshold: float = 0.5):
self._threshold = confidence_threshold
self._net = None
self.is_loaded = False
if Path(model_path).exists() and Path(config_path).exists():
try:
self._net = cv2.dnn.readNetFromTensorflow(model_path, config_path)
self.is_loaded = True
log.info("Person detection model loaded from %s", model_path)
except cv2.error as e:
log.error("Failed to load detection model: %s", e)
else:
log.warning("Detection model not found at %s — detection disabled", model_path)
def detect(self, frame: np.ndarray) -> list[Detection]:
if not self.is_loaded or self._net is None:
return []
h, w = frame.shape[:2]
blob = cv2.dnn.blobFromImage(frame, size=(300, 300), swapRB=True, crop=False)
self._net.setInput(blob)
output = self._net.forward()
detections = []
for i in range(output.shape[2]):
confidence = output[0, 0, i, 2]
if confidence < self._threshold:
continue
class_id = int(output[0, 0, i, 1])
if class_id not in DETECT_CLASSES:
continue
x1 = int(output[0, 0, i, 3] * w)
y1 = int(output[0, 0, i, 4] * h)
x2 = int(output[0, 0, i, 5] * w)
y2 = int(output[0, 0, i, 6] * h)
bw, bh = x2 - x1, y2 - y1
if bw <= 0 or bh <= 0:
continue
detections.append(Detection(
class_name=COCO_CLASSES.get(class_id, f"class_{class_id}"),
class_id=class_id,
confidence=float(confidence),
bbox=(x1, y1, bw, bh),
))
return detections

View File

@@ -0,0 +1,76 @@
"""Vehicle color and size fingerprinting."""
import logging
from dataclasses import dataclass
import cv2
import numpy as np
log = logging.getLogger(__name__)
@dataclass
class VehicleProfile:
name: str
color_profile: str
size_class: str
histogram: np.ndarray | None = None
def classify_dominant_color(region: np.ndarray) -> str:
hsv = cv2.cvtColor(region, cv2.COLOR_BGR2HSV)
h, s, v = cv2.split(hsv)
mean_s = float(np.mean(s))
mean_v = float(np.mean(v))
if mean_v < 40:
return "black"
if mean_s < 30 and mean_v > 180:
return "white"
if mean_s < 40:
return "silver"
mean_h = float(np.mean(h))
if mean_h < 10 or mean_h > 170:
return "red"
if 10 <= mean_h < 25:
return "orange"
if 25 <= mean_h < 35:
return "yellow"
if 35 <= mean_h < 85:
return "green"
if 85 <= mean_h < 130:
return "blue"
return "unknown"
def classify_size(bbox_area: int, zone_area: int) -> str:
if zone_area <= 0:
return "unknown"
ratio = bbox_area / zone_area
if ratio < 0.1:
return "compact"
if ratio < 0.25:
return "midsize"
return "large"
class VehicleFingerprint:
def __init__(self, known_vehicles: list[VehicleProfile] | None = None):
self._known = known_vehicles or []
def match(self, region: np.ndarray, bbox_area: int, zone_area: int) -> VehicleProfile | None:
color = classify_dominant_color(region)
size = classify_size(bbox_area, zone_area)
for profile in self._known:
if profile.color_profile == color and profile.size_class == size:
return profile
return None
def add_profile(self, profile: VehicleProfile) -> None:
self._known.append(profile)
@property
def known_count(self) -> int:
return len(self._known)

View File

@@ -0,0 +1,31 @@
"""Zone-based detection filtering."""
from vigilar.detection.person import Detection
def _bbox_center(bbox: tuple[int, int, int, int]) -> tuple[int, int]:
x, y, w, h = bbox
return x + w // 2, y + h // 2
def _point_in_rect(point: tuple[int, int], rect: tuple[int, int, int, int]) -> bool:
px, py = point
rx, ry, rw, rh = rect
return rx <= px <= rx + rw and ry <= py <= ry + rh
def filter_detections_by_zone(
detections: list[Detection],
zone_region: tuple[int, int, int, int] | None,
watch_for: list[str],
) -> list[Detection]:
result = []
for d in detections:
if d.class_name not in watch_for:
continue
if zone_region is not None:
center = _bbox_center(d.bbox)
if not _point_in_rect(center, zone_region):
continue
result.append(d)
return result