diff --git a/docs/superpowers/plans/2026-04-02-daily-use-features.md b/docs/superpowers/plans/2026-04-02-daily-use-features.md new file mode 100644 index 0000000..6dca78f --- /dev/null +++ b/docs/superpowers/plans/2026-04-02-daily-use-features.md @@ -0,0 +1,1783 @@ +# Daily Use Features Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add presence detection, person/vehicle AI detection, smart alert profiles, recording timeline, and health monitoring to make Vigilar a system a household relies on daily. + +**Architecture:** Five independent modules (`presence/`, `detection/`, `health/`, `alerts/profiles.py`, `web/static/js/timeline.js`) that communicate via the existing MQTT bus and SQLite storage. The detection module hooks into the camera worker's existing motion pipeline. Alert profiles consume events from both presence and detection to route notifications by role and time of day. + +**Tech Stack:** Python 3.11+, OpenCV DNN (MobileNet-SSD v2), Flask/Jinja2, Bootstrap 5, Canvas API for timeline, subprocess ping for presence, existing paho-mqtt bus. + +**Dependency graph:** +``` +Task 1 (Presence) ──┐ +Task 2 (Detection) ──┼──→ Task 4 (Alert Profiles) +Task 3 (Health) ──┘ +Task 5 (Timeline) ──────→ (after Task 2 schema change) +``` +Tasks 1, 2, 3 are fully parallelizable. Task 4 after 1+2. Task 5 after 2. + +--- + +## Task 1: Multi-Person Presence Detection + +**Files:** +- Create: `vigilar/presence/__init__.py` +- Create: `vigilar/presence/models.py` +- Create: `vigilar/presence/monitor.py` +- Modify: `vigilar/config.py` (add PresenceConfig, PresenceMember) +- Modify: `vigilar/constants.py` (add HouseholdState enum, presence Topics) +- Modify: `vigilar/main.py` (start PresenceMonitor subprocess) +- Test: `tests/unit/test_presence.py` + +- [ ] **Step 1: Add constants and config models** + +In `vigilar/constants.py`, add after `UPSStatus`: + +```python +# --- Household Presence --- + +class HouseholdState(StrEnum): + EMPTY = "EMPTY" + KIDS_HOME = "KIDS_HOME" + ADULTS_HOME = "ADULTS_HOME" + ALL_HOME = "ALL_HOME" +``` + +Add to `Topics` class: + +```python + # Presence + @staticmethod + def presence_member(name: str) -> str: + return f"vigilar/presence/{name}" + + PRESENCE_STATUS = "vigilar/presence/status" +``` + +In `vigilar/config.py`, add before `# --- Rule Config ---`: + +```python +# --- Presence Config --- + +class PresenceMember(BaseModel): + name: str + ip: str + role: str = "adult" # adult | child + +class PresenceConfig(BaseModel): + enabled: bool = False + ping_interval_s: int = 30 + departure_delay_m: int = 10 + method: str = "icmp" # icmp | arping + members: list[PresenceMember] = Field(default_factory=list) + actions: dict[str, str] = Field(default_factory=lambda: { + "EMPTY": "ARMED_AWAY", + "ADULTS_HOME": "DISARMED", + "KIDS_HOME": "ARMED_HOME", + "ALL_HOME": "DISARMED", + }) +``` + +Add `presence: PresenceConfig = Field(default_factory=PresenceConfig)` to `VigilarConfig` after `remote`. + +- [ ] **Step 2: Write failing tests** + +Create `tests/unit/test_presence.py`: + +```python +"""Tests for presence detection.""" + +import time +from unittest.mock import patch + +from vigilar.config import PresenceConfig, PresenceMember +from vigilar.constants import HouseholdState +from vigilar.presence.models import MemberPresence, derive_household_state + + +class TestDeriveHouseholdState: + def test_empty_when_no_members_home(self): + members = [ + MemberPresence(name="Dad", role="adult", is_home=False, last_seen=0), + MemberPresence(name="Mom", role="adult", is_home=False, last_seen=0), + ] + assert derive_household_state(members) == HouseholdState.EMPTY + + def test_all_home(self): + members = [ + MemberPresence(name="Dad", role="adult", is_home=True, last_seen=0), + MemberPresence(name="Mom", role="adult", is_home=True, last_seen=0), + MemberPresence(name="Kid", role="child", is_home=True, last_seen=0), + ] + assert derive_household_state(members) == HouseholdState.ALL_HOME + + def test_kids_home_only(self): + members = [ + MemberPresence(name="Dad", role="adult", is_home=False, last_seen=0), + MemberPresence(name="Kid", role="child", is_home=True, last_seen=0), + ] + assert derive_household_state(members) == HouseholdState.KIDS_HOME + + def test_adults_home_not_all(self): + members = [ + MemberPresence(name="Dad", role="adult", is_home=True, last_seen=0), + MemberPresence(name="Kid", role="child", is_home=False, last_seen=0), + ] + assert derive_household_state(members) == HouseholdState.ADULTS_HOME + + def test_adults_home_no_children_configured(self): + members = [ + MemberPresence(name="Dad", role="adult", is_home=True, last_seen=0), + ] + assert derive_household_state(members) == HouseholdState.ALL_HOME + + def test_empty_list(self): + assert derive_household_state([]) == HouseholdState.EMPTY + + +class TestPingHost: + @patch("vigilar.presence.monitor.subprocess.run") + def test_ping_success(self, mock_run): + from vigilar.presence.monitor import ping_host + mock_run.return_value = type("R", (), {"returncode": 0})() + assert ping_host("192.168.1.50") is True + + @patch("vigilar.presence.monitor.subprocess.run") + def test_ping_failure(self, mock_run): + from vigilar.presence.monitor import ping_host + mock_run.return_value = type("R", (), {"returncode": 1})() + assert ping_host("192.168.1.50") is False + + @patch("vigilar.presence.monitor.subprocess.run") + def test_ping_timeout(self, mock_run): + import subprocess as sp + from vigilar.presence.monitor import ping_host + mock_run.side_effect = sp.TimeoutExpired(cmd="ping", timeout=2) + assert ping_host("192.168.1.50") is False + + +class TestDepartureDelay: + def test_not_departed_within_delay(self): + from vigilar.presence.monitor import should_mark_away + last_seen = time.monotonic() - 300 # 5 min ago + assert should_mark_away(last_seen, departure_delay_m=10) is False + + def test_departed_after_delay(self): + from vigilar.presence.monitor import should_mark_away + last_seen = time.monotonic() - 700 # 11+ min ago + assert should_mark_away(last_seen, departure_delay_m=10) is True +``` + +- [ ] **Step 3: Run tests to verify they fail** + +Run: `source .venv/bin/activate && python -m pytest tests/unit/test_presence.py -v` +Expected: FAIL (modules don't exist yet) + +- [ ] **Step 4: Implement presence models** + +Create `vigilar/presence/__init__.py` (empty). + +Create `vigilar/presence/models.py`: + +```python +"""Presence detection data models.""" + +from dataclasses import dataclass + +from vigilar.constants import HouseholdState + + +@dataclass +class MemberPresence: + name: str + role: str # "adult" | "child" + is_home: bool + last_seen: float # monotonic timestamp of last successful ping + + +def derive_household_state(members: list[MemberPresence]) -> HouseholdState: + if not members: + return HouseholdState.EMPTY + + home = [m for m in members if m.is_home] + if not home: + return HouseholdState.EMPTY + + all_home = len(home) == len(members) + adults_home = any(m.role == "adult" for m in home) + kids_home = any(m.role == "child" for m in home) + + if all_home: + return HouseholdState.ALL_HOME + if adults_home: + return HouseholdState.ADULTS_HOME + if kids_home: + return HouseholdState.KIDS_HOME + return HouseholdState.EMPTY +``` + +- [ ] **Step 5: Implement presence monitor** + +Create `vigilar/presence/monitor.py`: + +```python +"""Presence monitor — pings family phones to determine who's home.""" + +import logging +import signal +import subprocess +import time + +from vigilar.bus import MessageBus +from vigilar.config import VigilarConfig +from vigilar.constants import HouseholdState, Topics +from vigilar.presence.models import MemberPresence, derive_household_state + +log = logging.getLogger(__name__) + + +def ping_host(ip: str, timeout_s: int = 2) -> bool: + try: + result = subprocess.run( + ["ping", "-c", "1", "-W", str(timeout_s), ip], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + timeout=timeout_s + 1, + ) + return result.returncode == 0 + except subprocess.TimeoutExpired: + return False + except FileNotFoundError: + log.error("ping command not found") + return False + + +def should_mark_away(last_seen: float, departure_delay_m: int) -> bool: + elapsed_m = (time.monotonic() - last_seen) / 60 + return elapsed_m >= departure_delay_m + + +class PresenceMonitor: + def __init__(self, config: VigilarConfig): + self._cfg = config.presence + self._mqtt_cfg = config.mqtt + self._members: list[MemberPresence] = [] + self._last_household_state = HouseholdState.EMPTY + self._bus: MessageBus | None = None + + for m in self._cfg.members: + self._members.append(MemberPresence( + name=m.name, role=m.role, is_home=False, last_seen=0, + )) + + def run(self) -> None: + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [presence] %(levelname)s: %(message)s", + ) + + self._bus = MessageBus(self._mqtt_cfg, client_id="presence-monitor") + self._bus.connect() + + shutdown = False + def handle_signal(signum, frame): + nonlocal shutdown + shutdown = True + signal.signal(signal.SIGTERM, handle_signal) + + log.info("Presence monitor started, tracking %d members", len(self._members)) + + while not shutdown: + self._poll_once() + for _ in range(self._cfg.ping_interval_s): + if shutdown: + break + time.sleep(1) + + if self._bus: + self._bus.disconnect() + log.info("Presence monitor stopped") + + def _poll_once(self) -> None: + for member in self._members: + reachable = ping_host(member.ip) + if reachable: + member.is_home = True + member.last_seen = time.monotonic() + elif member.is_home: + if should_mark_away(member.last_seen, self._cfg.departure_delay_m): + member.is_home = False + log.info("%s departed", member.name) + + if self._bus: + self._bus.publish_event( + Topics.presence_member(member.name), + state="HOME" if member.is_home else "AWAY", + name=member.name, + role=member.role, + ) + + new_state = derive_household_state(self._members) + if new_state != self._last_household_state: + log.info("Household state: %s -> %s", self._last_household_state, new_state) + self._last_household_state = new_state + + if self._bus: + members_dict = {m.name: "HOME" if m.is_home else "AWAY" for m in self._members} + self._bus.publish_event( + Topics.PRESENCE_STATUS, + household=new_state.value, + members=members_dict, + ) + + +def run_presence_monitor(config: VigilarConfig) -> None: + monitor = PresenceMonitor(config) + monitor.run() +``` + +- [ ] **Step 6: Wire into supervisor** + +In `vigilar/main.py`, add after the UPS monitor block: + +```python + # Presence monitor + if cfg.presence.enabled and cfg.presence.members: + from vigilar.presence.monitor import run_presence_monitor + subsystems.append(SubsystemProcess("presence-monitor", run_presence_monitor, (cfg,))) +``` + +- [ ] **Step 7: Run tests to verify they pass** + +Run: `source .venv/bin/activate && python -m pytest tests/unit/test_presence.py -v` +Expected: All 9 tests PASS + +- [ ] **Step 8: Run full suite** + +Run: `source .venv/bin/activate && python -m pytest tests/ -v` +Expected: All tests PASS (96 existing + 9 new = 105) + +- [ ] **Step 9: Commit** + +```bash +git add vigilar/presence/ vigilar/constants.py vigilar/config.py vigilar/main.py tests/unit/test_presence.py +git commit -m "feat: add multi-person presence detection + +Track family phones via ping, derive household state +(EMPTY/KIDS_HOME/ADULTS_HOME/ALL_HOME), publish via MQTT. +Configurable departure delay, per-member roles, auto-arm actions." +``` + +--- + +## Task 2: Person + Vehicle Detection + +**Files:** +- Create: `vigilar/detection/__init__.py` +- Create: `vigilar/detection/person.py` +- Create: `vigilar/detection/vehicle.py` +- Create: `vigilar/detection/zones.py` +- Create: `vigilar/cli/cmd_calibrate.py` +- Create: `scripts/download_model.sh` +- Modify: `vigilar/constants.py` (add detection event types) +- Modify: `vigilar/config.py` (add DetectionConfig, CameraZone, VehicleConfig) +- Modify: `vigilar/storage/schema.py` (add detection_type, starred columns to recordings) +- Modify: `vigilar/camera/worker.py` (add detection stage after motion) +- Modify: `vigilar/cli/main.py` (register calibrate command) +- Test: `tests/unit/test_detection.py` + +- [ ] **Step 1: Add constants and config** + +In `vigilar/constants.py`, add to `EventType`: + +```python + PERSON_DETECTED = "PERSON_DETECTED" + VEHICLE_DETECTED = "VEHICLE_DETECTED" + KNOWN_VEHICLE_ARRIVED = "KNOWN_VEHICLE_ARRIVED" + UNKNOWN_VEHICLE_DETECTED = "UNKNOWN_VEHICLE_DETECTED" +``` + +Add to `RecordingTrigger`: + +```python + PERSON = "PERSON" + VEHICLE = "VEHICLE" +``` + +In `vigilar/config.py`, add before `# --- Presence Config ---`: + +```python +# --- Detection Config --- + +class CameraZone(BaseModel): + name: str + region: list[int] = Field(default_factory=list) # [x, y, w, h] + watch_for: list[str] = Field(default_factory=lambda: ["person", "vehicle"]) + alert_unknown_vehicles: bool = False + +class DetectionConfig(BaseModel): + person_detection: bool = False + model_path: str = "/var/vigilar/models/mobilenet_ssd_v2.pb" + model_config: str = "/var/vigilar/models/mobilenet_ssd_v2.pbtxt" + confidence_threshold: float = 0.5 + cameras: list[str] = Field(default_factory=list) # empty = all + +class KnownVehicle(BaseModel): + name: str + color_profile: str = "" # white, black, silver, red, blue, etc. + size_class: str = "" # compact, midsize, large + calibration_file: str = "" + +class VehicleConfig(BaseModel): + known: list[KnownVehicle] = Field(default_factory=list) +``` + +Add `zones: list[CameraZone] = Field(default_factory=list)` to `CameraConfig`. + +Add to `VigilarConfig`: +```python + detection: DetectionConfig = Field(default_factory=DetectionConfig) + vehicles: VehicleConfig = Field(default_factory=VehicleConfig) +``` + +- [ ] **Step 2: Update schema** + +In `vigilar/storage/schema.py`, add two columns to the `recordings` table: + +```python + Column("detection_type", String), # person, vehicle, motion, None + Column("starred", Integer, nullable=False, default=0), +``` + +Add after the `Column("thumbnail_path", ...)` line and before the closing `)`. + +- [ ] **Step 3: Write failing tests** + +Create `tests/unit/test_detection.py`: + +```python +"""Tests for person and vehicle detection.""" + +import numpy as np + +from vigilar.detection.person import Detection, PersonDetector +from vigilar.detection.vehicle import classify_dominant_color, classify_size +from vigilar.detection.zones import filter_detections_by_zone + + +class TestPersonDetector: + def test_detector_initializes_without_model(self): + detector = PersonDetector(model_path="nonexistent.pb", config_path="nonexistent.pbtxt") + assert not detector.is_loaded + + def test_detect_returns_empty_when_not_loaded(self): + detector = PersonDetector(model_path="nonexistent.pb", config_path="nonexistent.pbtxt") + frame = np.zeros((480, 640, 3), dtype=np.uint8) + detections = detector.detect(frame) + assert detections == [] + + def test_detection_dataclass(self): + d = Detection(class_name="person", class_id=1, confidence=0.85, bbox=(10, 20, 100, 200)) + assert d.class_name == "person" + assert d.confidence == 0.85 + + +class TestVehicleColor: + def test_white_detection(self): + white_region = np.full((100, 100, 3), 240, dtype=np.uint8) + color = classify_dominant_color(white_region) + assert color == "white" + + def test_black_detection(self): + black_region = np.full((100, 100, 3), 15, dtype=np.uint8) + color = classify_dominant_color(black_region) + assert color == "black" + + def test_size_compact(self): + assert classify_size(bbox_area=5000, zone_area=100000) == "compact" + + def test_size_midsize(self): + assert classify_size(bbox_area=15000, zone_area=100000) == "midsize" + + def test_size_large(self): + assert classify_size(bbox_area=30000, zone_area=100000) == "large" + + +class TestZoneFiltering: + def test_detection_inside_zone(self): + detections = [Detection("person", 1, 0.9, (50, 50, 80, 80))] + zone_region = (0, 0, 200, 200) + filtered = filter_detections_by_zone(detections, zone_region, ["person"]) + assert len(filtered) == 1 + + def test_detection_outside_zone(self): + detections = [Detection("person", 1, 0.9, (300, 300, 50, 50))] + zone_region = (0, 0, 200, 200) + filtered = filter_detections_by_zone(detections, zone_region, ["person"]) + assert len(filtered) == 0 + + def test_filter_by_class(self): + detections = [ + Detection("person", 1, 0.9, (50, 50, 80, 80)), + Detection("car", 3, 0.8, (50, 50, 80, 80)), + ] + zone_region = (0, 0, 200, 200) + filtered = filter_detections_by_zone(detections, zone_region, ["person"]) + assert len(filtered) == 1 + assert filtered[0].class_name == "person" + + def test_no_zone_returns_all(self): + detections = [Detection("person", 1, 0.9, (50, 50, 80, 80))] + filtered = filter_detections_by_zone(detections, None, ["person", "car"]) + assert len(filtered) == 1 +``` + +- [ ] **Step 4: Run tests to verify they fail** + +Run: `source .venv/bin/activate && python -m pytest tests/unit/test_detection.py -v` +Expected: FAIL + +- [ ] **Step 5: Implement person detector** + +Create `vigilar/detection/__init__.py` (empty). + +Create `vigilar/detection/person.py`: + +```python +"""Person detection using MobileNet-SSD v2 via OpenCV DNN.""" + +import logging +from dataclasses import dataclass +from pathlib import Path + +import cv2 +import numpy as np + +log = logging.getLogger(__name__) + +COCO_CLASSES = { + 1: "person", 2: "bicycle", 3: "car", 4: "motorcycle", 5: "airplane", + 6: "bus", 7: "train", 8: "truck", 9: "boat", +} +DETECT_CLASSES = {1, 3, 8} # person, car, truck + + +@dataclass +class Detection: + class_name: str + class_id: int + confidence: float + bbox: tuple[int, int, int, int] # x, y, w, h + + +class PersonDetector: + def __init__(self, model_path: str, config_path: str, confidence_threshold: float = 0.5): + self._threshold = confidence_threshold + self._net = None + self.is_loaded = False + + if Path(model_path).exists() and Path(config_path).exists(): + try: + self._net = cv2.dnn.readNetFromTensorflow(model_path, config_path) + self.is_loaded = True + log.info("Person detection model loaded from %s", model_path) + except cv2.error as e: + log.error("Failed to load detection model: %s", e) + else: + log.warning("Detection model not found at %s — detection disabled", model_path) + + def detect(self, frame: np.ndarray) -> list[Detection]: + if not self.is_loaded or self._net is None: + return [] + + h, w = frame.shape[:2] + blob = cv2.dnn.blobFromImage(frame, size=(300, 300), swapRB=True, crop=False) + self._net.setInput(blob) + output = self._net.forward() + + detections = [] + for i in range(output.shape[2]): + confidence = output[0, 0, i, 2] + if confidence < self._threshold: + continue + class_id = int(output[0, 0, i, 1]) + if class_id not in DETECT_CLASSES: + continue + + x1 = int(output[0, 0, i, 3] * w) + y1 = int(output[0, 0, i, 4] * h) + x2 = int(output[0, 0, i, 5] * w) + y2 = int(output[0, 0, i, 6] * h) + bw, bh = x2 - x1, y2 - y1 + if bw <= 0 or bh <= 0: + continue + + detections.append(Detection( + class_name=COCO_CLASSES.get(class_id, f"class_{class_id}"), + class_id=class_id, + confidence=float(confidence), + bbox=(x1, y1, bw, bh), + )) + + return detections +``` + +- [ ] **Step 6: Implement vehicle fingerprinting** + +Create `vigilar/detection/vehicle.py`: + +```python +"""Vehicle color and size fingerprinting.""" + +import logging +from dataclasses import dataclass +from pathlib import Path + +import cv2 +import numpy as np + +log = logging.getLogger(__name__) + + +@dataclass +class VehicleProfile: + name: str + color_profile: str + size_class: str + histogram: np.ndarray | None = None + + +def classify_dominant_color(region: np.ndarray) -> str: + hsv = cv2.cvtColor(region, cv2.COLOR_BGR2HSV) + h, s, v = cv2.split(hsv) + mean_s = float(np.mean(s)) + mean_v = float(np.mean(v)) + + if mean_v < 40: + return "black" + if mean_s < 30 and mean_v > 180: + return "white" + if mean_s < 40: + return "silver" + + mean_h = float(np.mean(h)) + if mean_h < 10 or mean_h > 170: + return "red" + if 10 <= mean_h < 25: + return "orange" + if 25 <= mean_h < 35: + return "yellow" + if 35 <= mean_h < 85: + return "green" + if 85 <= mean_h < 130: + return "blue" + return "unknown" + + +def classify_size(bbox_area: int, zone_area: int) -> str: + if zone_area <= 0: + return "unknown" + ratio = bbox_area / zone_area + if ratio < 0.1: + return "compact" + if ratio < 0.25: + return "midsize" + return "large" + + +class VehicleFingerprint: + def __init__(self, known_vehicles: list[VehicleProfile] | None = None): + self._known = known_vehicles or [] + + def match(self, region: np.ndarray, bbox_area: int, zone_area: int) -> VehicleProfile | None: + color = classify_dominant_color(region) + size = classify_size(bbox_area, zone_area) + + for profile in self._known: + if profile.color_profile == color and profile.size_class == size: + return profile + return None + + def add_profile(self, profile: VehicleProfile) -> None: + self._known.append(profile) + + @property + def known_count(self) -> int: + return len(self._known) +``` + +- [ ] **Step 7: Implement zone filtering** + +Create `vigilar/detection/zones.py`: + +```python +"""Zone-based detection filtering.""" + +from vigilar.detection.person import Detection + + +def _bbox_center(bbox: tuple[int, int, int, int]) -> tuple[int, int]: + x, y, w, h = bbox + return x + w // 2, y + h // 2 + + +def _point_in_rect(point: tuple[int, int], rect: tuple[int, int, int, int]) -> bool: + px, py = point + rx, ry, rw, rh = rect + return rx <= px <= rx + rw and ry <= py <= ry + rh + + +def filter_detections_by_zone( + detections: list[Detection], + zone_region: tuple[int, int, int, int] | None, + watch_for: list[str], +) -> list[Detection]: + result = [] + for d in detections: + if d.class_name not in watch_for: + continue + if zone_region is not None: + center = _bbox_center(d.bbox) + if not _point_in_rect(center, zone_region): + continue + result.append(d) + return result +``` + +- [ ] **Step 8: Create model download script** + +Create `scripts/download_model.sh`: + +```bash +#!/usr/bin/env bash +set -euo pipefail + +MODEL_DIR="${1:-/var/vigilar/models}" +mkdir -p "$MODEL_DIR" + +PB_URL="https://raw.githubusercontent.com/opencv/opencv_extra/master/testdata/dnn/ssd_mobilenet_v2_coco_2018_03_29.pb" +PBTXT_URL="https://raw.githubusercontent.com/opencv/opencv_extra/master/testdata/dnn/ssd_mobilenet_v2_coco_2018_03_29.pbtxt" + +echo "Downloading MobileNet-SSD v2 model..." +if command -v curl &>/dev/null; then + curl -fSL -o "$MODEL_DIR/mobilenet_ssd_v2.pb" "$PB_URL" + curl -fSL -o "$MODEL_DIR/mobilenet_ssd_v2.pbtxt" "$PBTXT_URL" +elif command -v wget &>/dev/null; then + wget -q -O "$MODEL_DIR/mobilenet_ssd_v2.pb" "$PB_URL" + wget -q -O "$MODEL_DIR/mobilenet_ssd_v2.pbtxt" "$PBTXT_URL" +else + echo "ERROR: curl or wget required" + exit 1 +fi + +echo "Model downloaded to $MODEL_DIR" +ls -lh "$MODEL_DIR"/mobilenet_ssd_v2.* +``` + +Run: `chmod +x scripts/download_model.sh` + +- [ ] **Step 9: Wire detection into camera worker** + +In `vigilar/camera/worker.py`, add import at the top: + +```python +from vigilar.detection.person import PersonDetector +from vigilar.detection.vehicle import VehicleFingerprint, VehicleProfile, classify_dominant_color, classify_size +from vigilar.detection.zones import filter_detections_by_zone +from vigilar.config import CameraConfig, DetectionConfig, MQTTConfig, RemoteConfig, VehicleConfig +``` + +After the `hls` and `remote_hls` initialization, add: + +```python + # Person/vehicle detector (second-stage after MOG2) + person_detector: PersonDetector | None = None + vehicle_fp: VehicleFingerprint | None = None + det_cfg = None # will be passed as new parameter + + if det_cfg and det_cfg.person_detection: + if not det_cfg.cameras or camera_id in det_cfg.cameras: + person_detector = PersonDetector( + det_cfg.model_path, det_cfg.model_config, det_cfg.confidence_threshold, + ) + + if veh_cfg and veh_cfg.known: + profiles = [VehicleProfile(v.name, v.color_profile, v.size_class) for v in veh_cfg.known] + vehicle_fp = VehicleFingerprint(profiles) +``` + +In the motion detection block, after `bus.publish_event(Topics.camera_motion_start(...))`, add the detection second-stage: + +```python + # Second-stage: person/vehicle detection + detection_type = "motion" + if person_detector and person_detector.is_loaded: + detections = person_detector.detect(frame) + persons = [d for d in detections if d.class_name == "person"] + vehicles = [d for d in detections if d.class_name in ("car", "truck")] + + if persons: + detection_type = "person" + bus.publish_event( + f"vigilar/camera/{camera_id}/person", + count=len(persons), + confidence=max(d.confidence for d in persons), + ) + elif vehicles: + detection_type = "vehicle" + # Vehicle fingerprinting + if vehicle_fp: + for vd in vehicles: + x, y, w, h = vd.bbox + crop = frame[max(0,y):y+h, max(0,x):x+w] + if crop.size > 0: + match = vehicle_fp.match(crop, w*h, frame.shape[0]*frame.shape[1]) + if match: + bus.publish_event( + f"vigilar/camera/{camera_id}/vehicle/known", + vehicle=match.name, + ) + else: + bus.publish_event( + f"vigilar/camera/{camera_id}/vehicle/unknown", + color=classify_dominant_color(crop), + ) + else: + bus.publish_event( + f"vigilar/camera/{camera_id}/vehicle", + count=len(vehicles), + ) +``` + +Update the `run_camera_worker` function signature to accept `det_cfg` and `veh_cfg` parameters. Update `camera/manager.py` to pass them. + +- [ ] **Step 10: Run tests** + +Run: `source .venv/bin/activate && python -m pytest tests/unit/test_detection.py tests/ -v` +Expected: All PASS + +- [ ] **Step 11: Commit** + +```bash +git add vigilar/detection/ vigilar/constants.py vigilar/config.py vigilar/storage/schema.py vigilar/camera/worker.py vigilar/camera/manager.py scripts/download_model.sh tests/unit/test_detection.py +git commit -m "feat: add person/vehicle detection with zone fencing + +MobileNet-SSD v2 via OpenCV DNN as second-stage filter after MOG2. +Vehicle color/size fingerprinting for known car matching. +Zone-based filtering per camera. Model download script." +``` + +--- + +## Task 3: Health Monitoring + Self-Healing + +**Files:** +- Create: `vigilar/health/__init__.py` +- Create: `vigilar/health/monitor.py` +- Create: `vigilar/health/pruner.py` +- Create: `vigilar/health/digest.py` +- Modify: `vigilar/config.py` (add HealthConfig) +- Modify: `vigilar/main.py` (start HealthMonitor) +- Test: `tests/unit/test_health.py` + +- [ ] **Step 1: Add config** + +In `vigilar/config.py`, add before `# --- Rule Config ---`: + +```python +# --- Health Config --- + +class HealthConfig(BaseModel): + enabled: bool = True + disk_warn_pct: int = 85 + disk_critical_pct: int = 95 + auto_prune: bool = True + auto_prune_target_pct: int = 80 + daily_digest: bool = True + daily_digest_time: str = "08:00" +``` + +Add `health: HealthConfig = Field(default_factory=HealthConfig)` to `VigilarConfig`. + +- [ ] **Step 2: Write failing tests** + +Create `tests/unit/test_health.py`: + +```python +"""Tests for health monitoring.""" + +import shutil +import tempfile +from pathlib import Path +from unittest.mock import patch + +from vigilar.health.pruner import find_prunable_recordings, calculate_disk_usage_pct +from vigilar.health.monitor import HealthCheck, HealthStatus, check_disk, check_mqtt_port + + +class TestDiskCheck: + @patch("vigilar.health.monitor.shutil.disk_usage") + def test_healthy(self, mock_usage): + mock_usage.return_value = type("U", (), {"total": 100_000_000_000, "used": 50_000_000_000, "free": 50_000_000_000})() + result = check_disk("/var/vigilar", warn_pct=85, critical_pct=95) + assert result.status == HealthStatus.HEALTHY + + @patch("vigilar.health.monitor.shutil.disk_usage") + def test_warning(self, mock_usage): + mock_usage.return_value = type("U", (), {"total": 100_000_000_000, "used": 90_000_000_000, "free": 10_000_000_000})() + result = check_disk("/var/vigilar", warn_pct=85, critical_pct=95) + assert result.status == HealthStatus.WARNING + + @patch("vigilar.health.monitor.shutil.disk_usage") + def test_critical(self, mock_usage): + mock_usage.return_value = type("U", (), {"total": 100_000_000_000, "used": 97_000_000_000, "free": 3_000_000_000})() + result = check_disk("/var/vigilar", warn_pct=85, critical_pct=95) + assert result.status == HealthStatus.CRITICAL + + +class TestPruner: + def test_calculate_disk_pct(self): + with tempfile.TemporaryDirectory() as d: + pct = calculate_disk_usage_pct(d) + assert 0 <= pct <= 100 + + def test_find_prunable_empty(self, test_db): + result = find_prunable_recordings(test_db, limit=10) + assert result == [] + + +class TestMQTTCheck: + @patch("vigilar.health.monitor.socket.create_connection") + def test_mqtt_reachable(self, mock_conn): + mock_conn.return_value.__enter__ = lambda s: s + mock_conn.return_value.__exit__ = lambda s, *a: None + result = check_mqtt_port("127.0.0.1", 1883) + assert result.status == HealthStatus.HEALTHY + + @patch("vigilar.health.monitor.socket.create_connection") + def test_mqtt_unreachable(self, mock_conn): + mock_conn.side_effect = ConnectionRefusedError() + result = check_mqtt_port("127.0.0.1", 1883) + assert result.status == HealthStatus.CRITICAL +``` + +- [ ] **Step 3: Run tests to verify they fail** + +Run: `source .venv/bin/activate && python -m pytest tests/unit/test_health.py -v` +Expected: FAIL + +- [ ] **Step 4: Implement health monitor** + +Create `vigilar/health/__init__.py` (empty). + +Create `vigilar/health/monitor.py`: + +```python +"""Health monitoring — periodic subsystem checks.""" + +import logging +import shutil +import signal +import socket +import time +from dataclasses import dataclass, field +from enum import StrEnum + +from vigilar.bus import MessageBus +from vigilar.config import VigilarConfig + +log = logging.getLogger(__name__) + + +class HealthStatus(StrEnum): + HEALTHY = "healthy" + WARNING = "warning" + CRITICAL = "critical" + + +@dataclass +class HealthCheck: + name: str + status: HealthStatus + message: str = "" + value: float = 0 + + +def check_disk(path: str, warn_pct: int, critical_pct: int) -> HealthCheck: + try: + usage = shutil.disk_usage(path) + pct = usage.used / usage.total * 100 + if pct >= critical_pct: + return HealthCheck("disk", HealthStatus.CRITICAL, f"{pct:.0f}% used", pct) + if pct >= warn_pct: + return HealthCheck("disk", HealthStatus.WARNING, f"{pct:.0f}% used", pct) + return HealthCheck("disk", HealthStatus.HEALTHY, f"{pct:.0f}% used", pct) + except OSError as e: + return HealthCheck("disk", HealthStatus.CRITICAL, str(e)) + + +def check_mqtt_port(host: str, port: int) -> HealthCheck: + try: + with socket.create_connection((host, port), timeout=5): + return HealthCheck("mqtt", HealthStatus.HEALTHY, "reachable") + except (ConnectionRefusedError, TimeoutError, OSError) as e: + return HealthCheck("mqtt", HealthStatus.CRITICAL, str(e)) + + +class HealthMonitor: + def __init__(self, config: VigilarConfig): + self._cfg = config + self._bus: MessageBus | None = None + self._checks: list[HealthCheck] = [] + + def run(self) -> None: + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [health] %(levelname)s: %(message)s", + ) + + self._bus = MessageBus(self._cfg.mqtt, client_id="health-monitor") + self._bus.connect() + + shutdown = False + def handle_signal(signum, frame): + nonlocal shutdown + shutdown = True + signal.signal(signal.SIGTERM, handle_signal) + + log.info("Health monitor started") + last_disk_check = 0 + last_mqtt_check = 0 + + while not shutdown: + now = time.monotonic() + + if now - last_disk_check >= 300: # 5 min + disk = check_disk(self._cfg.system.data_dir, self._cfg.health.disk_warn_pct, self._cfg.health.disk_critical_pct) + self._update_check(disk) + last_disk_check = now + + if disk.status == HealthStatus.CRITICAL and self._cfg.health.auto_prune: + from vigilar.health.pruner import auto_prune + auto_prune(self._cfg) + + if now - last_mqtt_check >= 30: + mqtt = check_mqtt_port(self._cfg.mqtt.host, self._cfg.mqtt.port) + self._update_check(mqtt) + last_mqtt_check = now + + self._publish_status() + time.sleep(10) + + if self._bus: + self._bus.disconnect() + + def _update_check(self, check: HealthCheck) -> None: + for i, c in enumerate(self._checks): + if c.name == check.name: + self._checks[i] = check + return + self._checks.append(check) + + def _publish_status(self) -> None: + if not self._bus: + return + overall = HealthStatus.HEALTHY + for c in self._checks: + if c.status == HealthStatus.CRITICAL: + overall = HealthStatus.CRITICAL + break + if c.status == HealthStatus.WARNING: + overall = HealthStatus.WARNING + + self._bus.publish_event( + "vigilar/system/health", + status=overall.value, + checks={c.name: {"status": c.status.value, "message": c.message} for c in self._checks}, + ) + + +def run_health_monitor(config: VigilarConfig) -> None: + monitor = HealthMonitor(config) + monitor.run() +``` + +- [ ] **Step 5: Implement auto-pruner** + +Create `vigilar/health/pruner.py`: + +```python +"""Auto-prune old recordings when disk usage is high.""" + +import logging +import os +import shutil +from pathlib import Path + +from sqlalchemy import select +from sqlalchemy.engine import Engine + +from vigilar.config import VigilarConfig +from vigilar.storage.schema import recordings + +log = logging.getLogger(__name__) + + +def calculate_disk_usage_pct(path: str) -> float: + usage = shutil.disk_usage(path) + return usage.used / usage.total * 100 + + +def find_prunable_recordings(engine: Engine, limit: int = 50) -> list[dict]: + query = ( + select(recordings) + .where(recordings.c.starred == 0) + .order_by(recordings.c.started_at.asc()) + .limit(limit) + ) + with engine.connect() as conn: + return [dict(r) for r in conn.execute(query).mappings().all()] + + +def auto_prune(config: VigilarConfig) -> int: + from vigilar.storage.db import get_db_path, get_engine + + target = config.health.auto_prune_target_pct + data_dir = config.system.data_dir + current_pct = calculate_disk_usage_pct(data_dir) + + if current_pct <= target: + return 0 + + engine = get_engine(get_db_path(data_dir)) + pruned = 0 + total_bytes = 0 + + while current_pct > target: + candidates = find_prunable_recordings(engine, limit=20) + if not candidates: + break + + for rec in candidates: + file_path = rec.get("file_path", "") + if file_path and Path(file_path).exists(): + size = Path(file_path).stat().st_size + Path(file_path).unlink() + total_bytes += size + + thumb = rec.get("thumbnail_path", "") + if thumb and Path(thumb).exists(): + Path(thumb).unlink() + + with engine.begin() as conn: + conn.execute(recordings.delete().where(recordings.c.id == rec["id"])) + pruned += 1 + + current_pct = calculate_disk_usage_pct(data_dir) + + if pruned: + mb = total_bytes / (1024 * 1024) + log.info("Auto-pruned %d recordings (%.1f MB), disk now at %.0f%%", pruned, mb, current_pct) + + return pruned +``` + +- [ ] **Step 6: Implement daily digest** + +Create `vigilar/health/digest.py`: + +```python +"""Daily digest notification builder.""" + +import logging +import shutil +import time + +from sqlalchemy import func, select +from sqlalchemy.engine import Engine + +from vigilar.constants import EventType +from vigilar.storage.schema import events, recordings + +log = logging.getLogger(__name__) + + +def build_digest(engine: Engine, data_dir: str, since_hours: int = 12) -> dict: + since_ts = int((time.time() - since_hours * 3600) * 1000) + + with engine.connect() as conn: + person_count = conn.execute( + select(func.count()).select_from(events) + .where(events.c.type == EventType.PERSON_DETECTED, events.c.ts >= since_ts) + ).scalar() or 0 + + vehicle_count = conn.execute( + select(func.count()).select_from(events) + .where(events.c.type == EventType.UNKNOWN_VEHICLE_DETECTED, events.c.ts >= since_ts) + ).scalar() or 0 + + recording_count = conn.execute( + select(func.count()).select_from(recordings) + .where(recordings.c.started_at >= since_ts // 1000) + ).scalar() or 0 + + usage = shutil.disk_usage(data_dir) + disk_pct = usage.used / usage.total * 100 + disk_gb = usage.used / (1024**3) + + return { + "person_detections": person_count, + "unknown_vehicles": vehicle_count, + "recordings": recording_count, + "disk_used_gb": round(disk_gb, 1), + "disk_used_pct": round(disk_pct, 0), + "since_hours": since_hours, + } + + +def format_digest(data: dict) -> str: + return ( + f"Vigilar Daily Summary\n" + f"Last {data['since_hours']}h: " + f"{data['person_detections']} person detections, " + f"{data['unknown_vehicles']} unknown vehicles, " + f"{data['recordings']} recordings\n" + f"Storage: {data['disk_used_gb']} GB ({data['disk_used_pct']:.0f}%)" + ) +``` + +- [ ] **Step 7: Wire into supervisor** + +In `vigilar/main.py`, add after the presence monitor block: + +```python + # Health monitor + if cfg.health.enabled: + from vigilar.health.monitor import run_health_monitor + subsystems.append(SubsystemProcess("health-monitor", run_health_monitor, (cfg,))) +``` + +- [ ] **Step 8: Run tests** + +Run: `source .venv/bin/activate && python -m pytest tests/unit/test_health.py tests/ -v` +Expected: All PASS + +- [ ] **Step 9: Commit** + +```bash +git add vigilar/health/ vigilar/config.py vigilar/main.py tests/unit/test_health.py +git commit -m "feat: add health monitoring with auto-prune and daily digest + +Periodic disk, MQTT, and subsystem health checks. Auto-prune +oldest non-starred recordings when disk exceeds threshold. +Daily digest notification with overnight summary." +``` + +--- + +## Task 4: Smart Alert Profiles + +**Files:** +- Create: `vigilar/alerts/profiles.py` +- Modify: `vigilar/config.py` (add AlertProfileConfig, AlertProfileRule) +- Modify: `vigilar/alerts/dispatcher.py` (role-based routing) +- Modify: `vigilar/web/templates/settings.html` (profile editor) +- Modify: `vigilar/web/static/js/settings.js` (profile save/load) +- Modify: `vigilar/web/blueprints/system.py` (profile API endpoints) +- Test: `tests/unit/test_profiles.py` + +**Depends on:** Task 1 (HouseholdState) and Task 2 (detection event types) + +- [ ] **Step 1: Add config models** + +In `vigilar/config.py`, add before `# --- Health Config ---`: + +```python +# --- Alert Profile Config --- + +class AlertProfileRule(BaseModel): + detection_type: str # person, unknown_vehicle, known_vehicle, motion + camera_location: str = "any" # any | exterior | common_area | specific camera id + action: str = "record_only" # push_and_record, push_adults, record_only, quiet_log + recipients: str = "all" # all, adults, none + +class AlertProfileConfig(BaseModel): + name: str + enabled: bool = True + presence_states: list[str] = Field(default_factory=list) + time_window: str = "" # "" = all day, "23:00-06:00" = sleep hours + rules: list[AlertProfileRule] = Field(default_factory=list) +``` + +Add `profiles: list[AlertProfileConfig] = Field(default_factory=list)` to `AlertsConfig`. + +Add `sleep_start: str = "23:00"` and `sleep_end: str = "06:00"` to `AlertsConfig`. + +- [ ] **Step 2: Write failing tests** + +Create `tests/unit/test_profiles.py`: + +```python +"""Tests for smart alert profiles.""" + +from vigilar.alerts.profiles import match_profile, is_in_time_window, get_action_for_event +from vigilar.config import AlertProfileConfig, AlertProfileRule +from vigilar.constants import HouseholdState + + +def _make_profile(name, states, time_window="", rules=None): + return AlertProfileConfig( + name=name, + presence_states=states, + time_window=time_window, + rules=rules or [], + ) + + +class TestTimeWindow: + def test_no_window_always_matches(self): + assert is_in_time_window("", "14:30") is True + + def test_inside_window(self): + assert is_in_time_window("23:00-06:00", "02:30") is True + + def test_outside_window(self): + assert is_in_time_window("23:00-06:00", "14:30") is False + + def test_at_start(self): + assert is_in_time_window("23:00-06:00", "23:00") is True + + def test_daytime_window(self): + assert is_in_time_window("06:00-23:00", "14:30") is True + + +class TestProfileMatching: + def test_match_by_presence(self): + profiles = [ + _make_profile("Away", ["EMPTY"]), + _make_profile("Home", ["ADULTS_HOME", "ALL_HOME"]), + ] + result = match_profile(profiles, HouseholdState.EMPTY, "14:00") + assert result is not None + assert result.name == "Away" + + def test_no_match(self): + profiles = [_make_profile("Away", ["EMPTY"])] + result = match_profile(profiles, HouseholdState.ALL_HOME, "14:00") + assert result is None + + def test_time_window_narrows(self): + profiles = [ + _make_profile("Night", ["ALL_HOME"], "23:00-06:00"), + _make_profile("Day", ["ALL_HOME"], "06:00-23:00"), + ] + result = match_profile(profiles, HouseholdState.ALL_HOME, "02:00") + assert result is not None + assert result.name == "Night" + + +class TestActionForEvent: + def test_person_gets_push(self): + rules = [AlertProfileRule(detection_type="person", action="push_and_record", recipients="all")] + profile = _make_profile("Away", ["EMPTY"], rules=rules) + action, recipients = get_action_for_event(profile, "person", "front_door") + assert action == "push_and_record" + assert recipients == "all" + + def test_motion_gets_record_only(self): + rules = [AlertProfileRule(detection_type="motion", action="record_only", recipients="none")] + profile = _make_profile("Home", ["ALL_HOME"], rules=rules) + action, recipients = get_action_for_event(profile, "motion", "front_door") + assert action == "record_only" + + def test_no_matching_rule_defaults_quiet(self): + profile = _make_profile("Home", ["ALL_HOME"], rules=[]) + action, recipients = get_action_for_event(profile, "person", "front_door") + assert action == "quiet_log" + assert recipients == "none" +``` + +- [ ] **Step 3: Run tests to verify fail** + +Run: `source .venv/bin/activate && python -m pytest tests/unit/test_profiles.py -v` +Expected: FAIL + +- [ ] **Step 4: Implement profile engine** + +Create `vigilar/alerts/profiles.py`: + +```python +"""Smart alert profile matching engine.""" + +import logging +from datetime import datetime + +from vigilar.config import AlertProfileConfig, AlertProfileRule +from vigilar.constants import HouseholdState + +log = logging.getLogger(__name__) + + +def is_in_time_window(window: str, current_time: str) -> bool: + if not window: + return True + + parts = window.split("-") + if len(parts) != 2: + return True + + start, end = parts[0].strip(), parts[1].strip() + t = current_time + + if start <= end: + return start <= t < end + else: + return t >= start or t < end + + +def match_profile( + profiles: list[AlertProfileConfig], + household_state: HouseholdState, + current_time: str, +) -> AlertProfileConfig | None: + for profile in profiles: + if not profile.enabled: + continue + if household_state.value not in profile.presence_states: + continue + if not is_in_time_window(profile.time_window, current_time): + continue + return profile + return None + + +def get_action_for_event( + profile: AlertProfileConfig, + detection_type: str, + camera_id: str, +) -> tuple[str, str]: + for rule in profile.rules: + if rule.detection_type != detection_type: + continue + if rule.camera_location not in ("any", camera_id): + continue + return rule.action, rule.recipients + return "quiet_log", "none" +``` + +- [ ] **Step 5: Run tests** + +Run: `source .venv/bin/activate && python -m pytest tests/unit/test_profiles.py -v` +Expected: All 10 tests PASS + +- [ ] **Step 6: Run full suite** + +Run: `source .venv/bin/activate && python -m pytest tests/ -v` +Expected: All PASS + +- [ ] **Step 7: Commit** + +```bash +git add vigilar/alerts/profiles.py vigilar/config.py tests/unit/test_profiles.py +git commit -m "feat: add smart alert profiles with presence/time matching + +Profiles activate based on household state + time of day. +Per-detection-type rules control push/record/quiet behavior. +Role-based recipient filtering (all vs adults-only)." +``` + +--- + +## Task 5: Recording Timeline + +**Files:** +- Create: `vigilar/web/static/js/timeline.js` +- Modify: `vigilar/web/blueprints/recordings.py` (add timeline API) +- Modify: `vigilar/web/templates/recordings.html` (timeline UI) +- Modify: `vigilar/storage/queries.py` (add get_timeline_data) +- Test: `tests/unit/test_timeline.py` + +**Depends on:** Task 2 (detection_type column in recordings) + +- [ ] **Step 1: Add timeline query** + +In `vigilar/storage/queries.py`, add: + +```python +def get_timeline_data( + engine: Engine, + camera_id: str, + day_start_ts: int, + day_end_ts: int, +) -> list[dict[str, Any]]: + query = ( + select( + recordings.c.id, + recordings.c.started_at, + recordings.c.ended_at, + recordings.c.detection_type, + recordings.c.starred, + ) + .where( + recordings.c.camera_id == camera_id, + recordings.c.started_at >= day_start_ts, + recordings.c.started_at < day_end_ts, + ) + .order_by(recordings.c.started_at.asc()) + ) + with engine.connect() as conn: + return [dict(r) for r in conn.execute(query).mappings().all()] +``` + +- [ ] **Step 2: Write failing test** + +Create `tests/unit/test_timeline.py`: + +```python +"""Tests for recording timeline.""" + +import time + +from vigilar.storage.queries import get_timeline_data, insert_recording + + +class TestTimelineQuery: + def test_returns_recordings_for_day(self, test_db): + now = int(time.time()) + insert_recording(test_db, camera_id="cam1", started_at=now, ended_at=now+60, + file_path="/tmp/a.mp4", trigger="MOTION", detection_type="person") + insert_recording(test_db, camera_id="cam1", started_at=now+100, ended_at=now+130, + file_path="/tmp/b.mp4", trigger="MOTION", detection_type="motion") + + results = get_timeline_data(test_db, "cam1", now - 3600, now + 3600) + assert len(results) == 2 + assert results[0]["detection_type"] == "person" + assert results[1]["detection_type"] == "motion" + + def test_filters_by_camera(self, test_db): + now = int(time.time()) + insert_recording(test_db, camera_id="cam1", started_at=now, ended_at=now+60, + file_path="/tmp/a.mp4", trigger="MOTION") + insert_recording(test_db, camera_id="cam2", started_at=now, ended_at=now+60, + file_path="/tmp/b.mp4", trigger="MOTION") + + results = get_timeline_data(test_db, "cam1", now - 3600, now + 3600) + assert len(results) == 1 + + def test_filters_by_time_range(self, test_db): + now = int(time.time()) + insert_recording(test_db, camera_id="cam1", started_at=now - 7200, ended_at=now - 7100, + file_path="/tmp/old.mp4", trigger="MOTION") + insert_recording(test_db, camera_id="cam1", started_at=now, ended_at=now+60, + file_path="/tmp/new.mp4", trigger="MOTION") + + results = get_timeline_data(test_db, "cam1", now - 3600, now + 3600) + assert len(results) == 1 +``` + +- [ ] **Step 3: Run test to verify fail** + +Run: `source .venv/bin/activate && python -m pytest tests/unit/test_timeline.py -v` +Expected: FAIL + +- [ ] **Step 4: Add timeline API endpoint** + +In `vigilar/web/blueprints/recordings.py`, add: + +```python +import time as time_mod +from datetime import datetime, timedelta + +@recordings_bp.route("/api/timeline") +def timeline_api(): + """JSON API: timeline data for a camera on a given day.""" + camera_id = request.args.get("camera") + date_str = request.args.get("date") # YYYY-MM-DD + + if not camera_id: + return jsonify({"error": "camera required"}), 400 + + if date_str: + day = datetime.strptime(date_str, "%Y-%m-%d") + else: + day = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + + day_start = int(day.timestamp()) + day_end = int((day + timedelta(days=1)).timestamp()) + + engine = current_app.config.get("DB_ENGINE") + if engine is None: + return jsonify([]) + + from vigilar.storage.queries import get_timeline_data + data = get_timeline_data(engine, camera_id, day_start, day_end) + + return jsonify([ + { + "id": r["id"], + "start": r["started_at"], + "end": r["ended_at"], + "type": r.get("detection_type", "motion"), + "starred": bool(r.get("starred", 0)), + } + for r in data + ]) +``` + +- [ ] **Step 5: Create timeline.js** + +Create `vigilar/web/static/js/timeline.js`: + +```javascript +/* Vigilar — Recording timeline renderer */ + +(function() { + 'use strict'; + + const COLORS = { + person: '#dc3545', + vehicle: '#0d6efd', + motion: '#6c757d', + default: '#6c757d', + }; + + class Timeline { + constructor(canvas, options = {}) { + this.canvas = canvas; + this.ctx = canvas.getContext('2d'); + this.segments = []; + this.cameraId = options.cameraId || ''; + this.date = options.date || new Date().toISOString().split('T')[0]; + this.onClick = options.onClick || null; + this.dayStartTs = 0; + this.dayEndTs = 0; + this._resize(); + this._bindEvents(); + } + + _resize() { + const rect = this.canvas.parentElement.getBoundingClientRect(); + this.canvas.width = rect.width; + this.canvas.height = this.canvas.dataset.height ? parseInt(this.canvas.dataset.height) : 48; + } + + _bindEvents() { + this.canvas.addEventListener('click', (e) => this._handleClick(e)); + window.addEventListener('resize', () => { this._resize(); this.render(); }); + } + + async load() { + const resp = await fetch(`/recordings/api/timeline?camera=${this.cameraId}&date=${this.date}`); + if (!resp.ok) return; + this.segments = await resp.json(); + + const d = new Date(this.date + 'T00:00:00'); + this.dayStartTs = d.getTime() / 1000; + this.dayEndTs = this.dayStartTs + 86400; + this.render(); + } + + render() { + const w = this.canvas.width; + const h = this.canvas.height; + const ctx = this.ctx; + const dur = this.dayEndTs - this.dayStartTs; + + ctx.clearRect(0, 0, w, h); + ctx.fillStyle = '#161b22'; + ctx.fillRect(0, 0, w, h); + + for (const seg of this.segments) { + const x1 = ((seg.start - this.dayStartTs) / dur) * w; + const x2 = seg.end ? ((seg.end - this.dayStartTs) / dur) * w : x1 + 2; + const segW = Math.max(x2 - x1, 2); + ctx.fillStyle = COLORS[seg.type] || COLORS.default; + ctx.fillRect(x1, 4, segW, h - 8); + } + + // Hour markers + ctx.fillStyle = '#30363d'; + ctx.font = '10px sans-serif'; + for (let hour = 0; hour <= 24; hour += 6) { + const x = (hour / 24) * w; + ctx.fillRect(x, 0, 1, h); + if (hour < 24) { + ctx.fillStyle = '#8b949e'; + ctx.fillText(`${hour}:00`, x + 3, h - 2); + ctx.fillStyle = '#30363d'; + } + } + } + + _handleClick(e) { + const rect = this.canvas.getBoundingClientRect(); + const x = e.clientX - rect.left; + const pct = x / this.canvas.width; + const ts = this.dayStartTs + pct * 86400; + + const clicked = this.segments.find(s => ts >= s.start && ts <= (s.end || s.start + 10)); + if (clicked && this.onClick) { + this.onClick(clicked); + } + } + } + + window.VigilarTimeline = Timeline; +})(); +``` + +- [ ] **Step 6: Update recordings template** + +Replace the table in `vigilar/web/templates/recordings.html` with the timeline view. Add a `` per camera, date picker, filter buttons, and a video player area. Use the `VigilarTimeline` class to render each camera's timeline and handle click-to-play. + +- [ ] **Step 7: Run tests** + +Run: `source .venv/bin/activate && python -m pytest tests/unit/test_timeline.py tests/ -v` +Expected: All PASS + +- [ ] **Step 8: Commit** + +```bash +git add vigilar/web/static/js/timeline.js vigilar/web/blueprints/recordings.py vigilar/web/templates/recordings.html vigilar/storage/queries.py tests/unit/test_timeline.py +git commit -m "feat: add recording timeline with color-coded detection types + +Canvas-based 24h timeline per camera. Color-coded segments for +person (red), vehicle (blue), motion (gray). Click to play. +Date picker, filter buttons, hour markers." +``` + +--- + +## Final: Integration Verification + +- [ ] **Step 1: Run full test suite** + +```bash +source .venv/bin/activate && python -m pytest tests/ -v +``` + +Expected: All tests pass (96 original + ~35 new = ~131 total) + +- [ ] **Step 2: Verify all web routes** + +```bash +source .venv/bin/activate && python -c " +from vigilar.config import load_config +from vigilar.web.app import create_app +cfg = load_config() +app = create_app(cfg) +with app.test_client() as c: + for p in ['/', '/system/settings', '/kiosk/', '/events/', '/sensors/', '/recordings/', '/cameras/']: + r = c.get(p) + print(f'{p}: {r.status_code}') +" +``` + +Expected: All 200 + +- [ ] **Step 3: Commit and push** + +```bash +git push +```