# Daily Use Features Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Add presence detection, person/vehicle AI detection, smart alert profiles, recording timeline, and health monitoring to make Vigilar a system a household relies on daily. **Architecture:** Five independent modules (`presence/`, `detection/`, `health/`, `alerts/profiles.py`, `web/static/js/timeline.js`) that communicate via the existing MQTT bus and SQLite storage. The detection module hooks into the camera worker's existing motion pipeline. Alert profiles consume events from both presence and detection to route notifications by role and time of day. **Tech Stack:** Python 3.11+, OpenCV DNN (MobileNet-SSD v2), Flask/Jinja2, Bootstrap 5, Canvas API for timeline, subprocess ping for presence, existing paho-mqtt bus. **Dependency graph:** ``` Task 1 (Presence) ──┐ Task 2 (Detection) ──┼──→ Task 4 (Alert Profiles) Task 3 (Health) ──┘ Task 5 (Timeline) ──────→ (after Task 2 schema change) ``` Tasks 1, 2, 3 are fully parallelizable. Task 4 after 1+2. Task 5 after 2. --- ## Task 1: Multi-Person Presence Detection **Files:** - Create: `vigilar/presence/__init__.py` - Create: `vigilar/presence/models.py` - Create: `vigilar/presence/monitor.py` - Modify: `vigilar/config.py` (add PresenceConfig, PresenceMember) - Modify: `vigilar/constants.py` (add HouseholdState enum, presence Topics) - Modify: `vigilar/main.py` (start PresenceMonitor subprocess) - Test: `tests/unit/test_presence.py` - [ ] **Step 1: Add constants and config models** In `vigilar/constants.py`, add after `UPSStatus`: ```python # --- Household Presence --- class HouseholdState(StrEnum): EMPTY = "EMPTY" KIDS_HOME = "KIDS_HOME" ADULTS_HOME = "ADULTS_HOME" ALL_HOME = "ALL_HOME" ``` Add to `Topics` class: ```python # Presence @staticmethod def presence_member(name: str) -> str: return f"vigilar/presence/{name}" PRESENCE_STATUS = "vigilar/presence/status" ``` In `vigilar/config.py`, add before `# --- Rule Config ---`: ```python # --- Presence Config --- class PresenceMember(BaseModel): name: str ip: str role: str = "adult" # adult | child class PresenceConfig(BaseModel): enabled: bool = False ping_interval_s: int = 30 departure_delay_m: int = 10 method: str = "icmp" # icmp | arping members: list[PresenceMember] = Field(default_factory=list) actions: dict[str, str] = Field(default_factory=lambda: { "EMPTY": "ARMED_AWAY", "ADULTS_HOME": "DISARMED", "KIDS_HOME": "ARMED_HOME", "ALL_HOME": "DISARMED", }) ``` Add `presence: PresenceConfig = Field(default_factory=PresenceConfig)` to `VigilarConfig` after `remote`. - [ ] **Step 2: Write failing tests** Create `tests/unit/test_presence.py`: ```python """Tests for presence detection.""" import time from unittest.mock import patch from vigilar.config import PresenceConfig, PresenceMember from vigilar.constants import HouseholdState from vigilar.presence.models import MemberPresence, derive_household_state class TestDeriveHouseholdState: def test_empty_when_no_members_home(self): members = [ MemberPresence(name="Dad", role="adult", is_home=False, last_seen=0), MemberPresence(name="Mom", role="adult", is_home=False, last_seen=0), ] assert derive_household_state(members) == HouseholdState.EMPTY def test_all_home(self): members = [ MemberPresence(name="Dad", role="adult", is_home=True, last_seen=0), MemberPresence(name="Mom", role="adult", is_home=True, last_seen=0), MemberPresence(name="Kid", role="child", is_home=True, last_seen=0), ] assert derive_household_state(members) == HouseholdState.ALL_HOME def test_kids_home_only(self): members = [ MemberPresence(name="Dad", role="adult", is_home=False, last_seen=0), MemberPresence(name="Kid", role="child", is_home=True, last_seen=0), ] assert derive_household_state(members) == HouseholdState.KIDS_HOME def test_adults_home_not_all(self): members = [ MemberPresence(name="Dad", role="adult", is_home=True, last_seen=0), MemberPresence(name="Kid", role="child", is_home=False, last_seen=0), ] assert derive_household_state(members) == HouseholdState.ADULTS_HOME def test_adults_home_no_children_configured(self): members = [ MemberPresence(name="Dad", role="adult", is_home=True, last_seen=0), ] assert derive_household_state(members) == HouseholdState.ALL_HOME def test_empty_list(self): assert derive_household_state([]) == HouseholdState.EMPTY class TestPingHost: @patch("vigilar.presence.monitor.subprocess.run") def test_ping_success(self, mock_run): from vigilar.presence.monitor import ping_host mock_run.return_value = type("R", (), {"returncode": 0})() assert ping_host("192.168.1.50") is True @patch("vigilar.presence.monitor.subprocess.run") def test_ping_failure(self, mock_run): from vigilar.presence.monitor import ping_host mock_run.return_value = type("R", (), {"returncode": 1})() assert ping_host("192.168.1.50") is False @patch("vigilar.presence.monitor.subprocess.run") def test_ping_timeout(self, mock_run): import subprocess as sp from vigilar.presence.monitor import ping_host mock_run.side_effect = sp.TimeoutExpired(cmd="ping", timeout=2) assert ping_host("192.168.1.50") is False class TestDepartureDelay: def test_not_departed_within_delay(self): from vigilar.presence.monitor import should_mark_away last_seen = time.monotonic() - 300 # 5 min ago assert should_mark_away(last_seen, departure_delay_m=10) is False def test_departed_after_delay(self): from vigilar.presence.monitor import should_mark_away last_seen = time.monotonic() - 700 # 11+ min ago assert should_mark_away(last_seen, departure_delay_m=10) is True ``` - [ ] **Step 3: Run tests to verify they fail** Run: `source .venv/bin/activate && python -m pytest tests/unit/test_presence.py -v` Expected: FAIL (modules don't exist yet) - [ ] **Step 4: Implement presence models** Create `vigilar/presence/__init__.py` (empty). Create `vigilar/presence/models.py`: ```python """Presence detection data models.""" from dataclasses import dataclass from vigilar.constants import HouseholdState @dataclass class MemberPresence: name: str role: str # "adult" | "child" is_home: bool last_seen: float # monotonic timestamp of last successful ping def derive_household_state(members: list[MemberPresence]) -> HouseholdState: if not members: return HouseholdState.EMPTY home = [m for m in members if m.is_home] if not home: return HouseholdState.EMPTY all_home = len(home) == len(members) adults_home = any(m.role == "adult" for m in home) kids_home = any(m.role == "child" for m in home) if all_home: return HouseholdState.ALL_HOME if adults_home: return HouseholdState.ADULTS_HOME if kids_home: return HouseholdState.KIDS_HOME return HouseholdState.EMPTY ``` - [ ] **Step 5: Implement presence monitor** Create `vigilar/presence/monitor.py`: ```python """Presence monitor — pings family phones to determine who's home.""" import logging import signal import subprocess import time from vigilar.bus import MessageBus from vigilar.config import VigilarConfig from vigilar.constants import HouseholdState, Topics from vigilar.presence.models import MemberPresence, derive_household_state log = logging.getLogger(__name__) def ping_host(ip: str, timeout_s: int = 2) -> bool: try: result = subprocess.run( ["ping", "-c", "1", "-W", str(timeout_s), ip], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=timeout_s + 1, ) return result.returncode == 0 except subprocess.TimeoutExpired: return False except FileNotFoundError: log.error("ping command not found") return False def should_mark_away(last_seen: float, departure_delay_m: int) -> bool: elapsed_m = (time.monotonic() - last_seen) / 60 return elapsed_m >= departure_delay_m class PresenceMonitor: def __init__(self, config: VigilarConfig): self._cfg = config.presence self._mqtt_cfg = config.mqtt self._members: list[MemberPresence] = [] self._last_household_state = HouseholdState.EMPTY self._bus: MessageBus | None = None for m in self._cfg.members: self._members.append(MemberPresence( name=m.name, role=m.role, is_home=False, last_seen=0, )) def run(self) -> None: logging.basicConfig( level=logging.INFO, format="%(asctime)s [presence] %(levelname)s: %(message)s", ) self._bus = MessageBus(self._mqtt_cfg, client_id="presence-monitor") self._bus.connect() shutdown = False def handle_signal(signum, frame): nonlocal shutdown shutdown = True signal.signal(signal.SIGTERM, handle_signal) log.info("Presence monitor started, tracking %d members", len(self._members)) while not shutdown: self._poll_once() for _ in range(self._cfg.ping_interval_s): if shutdown: break time.sleep(1) if self._bus: self._bus.disconnect() log.info("Presence monitor stopped") def _poll_once(self) -> None: for member in self._members: reachable = ping_host(member.ip) if reachable: member.is_home = True member.last_seen = time.monotonic() elif member.is_home: if should_mark_away(member.last_seen, self._cfg.departure_delay_m): member.is_home = False log.info("%s departed", member.name) if self._bus: self._bus.publish_event( Topics.presence_member(member.name), state="HOME" if member.is_home else "AWAY", name=member.name, role=member.role, ) new_state = derive_household_state(self._members) if new_state != self._last_household_state: log.info("Household state: %s -> %s", self._last_household_state, new_state) self._last_household_state = new_state if self._bus: members_dict = {m.name: "HOME" if m.is_home else "AWAY" for m in self._members} self._bus.publish_event( Topics.PRESENCE_STATUS, household=new_state.value, members=members_dict, ) def run_presence_monitor(config: VigilarConfig) -> None: monitor = PresenceMonitor(config) monitor.run() ``` - [ ] **Step 6: Wire into supervisor** In `vigilar/main.py`, add after the UPS monitor block: ```python # Presence monitor if cfg.presence.enabled and cfg.presence.members: from vigilar.presence.monitor import run_presence_monitor subsystems.append(SubsystemProcess("presence-monitor", run_presence_monitor, (cfg,))) ``` - [ ] **Step 7: Run tests to verify they pass** Run: `source .venv/bin/activate && python -m pytest tests/unit/test_presence.py -v` Expected: All 9 tests PASS - [ ] **Step 8: Run full suite** Run: `source .venv/bin/activate && python -m pytest tests/ -v` Expected: All tests PASS (96 existing + 9 new = 105) - [ ] **Step 9: Commit** ```bash git add vigilar/presence/ vigilar/constants.py vigilar/config.py vigilar/main.py tests/unit/test_presence.py git commit -m "feat: add multi-person presence detection Track family phones via ping, derive household state (EMPTY/KIDS_HOME/ADULTS_HOME/ALL_HOME), publish via MQTT. Configurable departure delay, per-member roles, auto-arm actions." ``` --- ## Task 2: Person + Vehicle Detection **Files:** - Create: `vigilar/detection/__init__.py` - Create: `vigilar/detection/person.py` - Create: `vigilar/detection/vehicle.py` - Create: `vigilar/detection/zones.py` - Create: `vigilar/cli/cmd_calibrate.py` - Create: `scripts/download_model.sh` - Modify: `vigilar/constants.py` (add detection event types) - Modify: `vigilar/config.py` (add DetectionConfig, CameraZone, VehicleConfig) - Modify: `vigilar/storage/schema.py` (add detection_type, starred columns to recordings) - Modify: `vigilar/camera/worker.py` (add detection stage after motion) - Modify: `vigilar/cli/main.py` (register calibrate command) - Test: `tests/unit/test_detection.py` - [ ] **Step 1: Add constants and config** In `vigilar/constants.py`, add to `EventType`: ```python PERSON_DETECTED = "PERSON_DETECTED" VEHICLE_DETECTED = "VEHICLE_DETECTED" KNOWN_VEHICLE_ARRIVED = "KNOWN_VEHICLE_ARRIVED" UNKNOWN_VEHICLE_DETECTED = "UNKNOWN_VEHICLE_DETECTED" ``` Add to `RecordingTrigger`: ```python PERSON = "PERSON" VEHICLE = "VEHICLE" ``` In `vigilar/config.py`, add before `# --- Presence Config ---`: ```python # --- Detection Config --- class CameraZone(BaseModel): name: str region: list[int] = Field(default_factory=list) # [x, y, w, h] watch_for: list[str] = Field(default_factory=lambda: ["person", "vehicle"]) alert_unknown_vehicles: bool = False class DetectionConfig(BaseModel): person_detection: bool = False model_path: str = "/var/vigilar/models/mobilenet_ssd_v2.pb" model_config: str = "/var/vigilar/models/mobilenet_ssd_v2.pbtxt" confidence_threshold: float = 0.5 cameras: list[str] = Field(default_factory=list) # empty = all class KnownVehicle(BaseModel): name: str color_profile: str = "" # white, black, silver, red, blue, etc. size_class: str = "" # compact, midsize, large calibration_file: str = "" class VehicleConfig(BaseModel): known: list[KnownVehicle] = Field(default_factory=list) ``` Add `zones: list[CameraZone] = Field(default_factory=list)` to `CameraConfig`. Add to `VigilarConfig`: ```python detection: DetectionConfig = Field(default_factory=DetectionConfig) vehicles: VehicleConfig = Field(default_factory=VehicleConfig) ``` - [ ] **Step 2: Update schema** In `vigilar/storage/schema.py`, add two columns to the `recordings` table: ```python Column("detection_type", String), # person, vehicle, motion, None Column("starred", Integer, nullable=False, default=0), ``` Add after the `Column("thumbnail_path", ...)` line and before the closing `)`. - [ ] **Step 3: Write failing tests** Create `tests/unit/test_detection.py`: ```python """Tests for person and vehicle detection.""" import numpy as np from vigilar.detection.person import Detection, PersonDetector from vigilar.detection.vehicle import classify_dominant_color, classify_size from vigilar.detection.zones import filter_detections_by_zone class TestPersonDetector: def test_detector_initializes_without_model(self): detector = PersonDetector(model_path="nonexistent.pb", config_path="nonexistent.pbtxt") assert not detector.is_loaded def test_detect_returns_empty_when_not_loaded(self): detector = PersonDetector(model_path="nonexistent.pb", config_path="nonexistent.pbtxt") frame = np.zeros((480, 640, 3), dtype=np.uint8) detections = detector.detect(frame) assert detections == [] def test_detection_dataclass(self): d = Detection(class_name="person", class_id=1, confidence=0.85, bbox=(10, 20, 100, 200)) assert d.class_name == "person" assert d.confidence == 0.85 class TestVehicleColor: def test_white_detection(self): white_region = np.full((100, 100, 3), 240, dtype=np.uint8) color = classify_dominant_color(white_region) assert color == "white" def test_black_detection(self): black_region = np.full((100, 100, 3), 15, dtype=np.uint8) color = classify_dominant_color(black_region) assert color == "black" def test_size_compact(self): assert classify_size(bbox_area=5000, zone_area=100000) == "compact" def test_size_midsize(self): assert classify_size(bbox_area=15000, zone_area=100000) == "midsize" def test_size_large(self): assert classify_size(bbox_area=30000, zone_area=100000) == "large" class TestZoneFiltering: def test_detection_inside_zone(self): detections = [Detection("person", 1, 0.9, (50, 50, 80, 80))] zone_region = (0, 0, 200, 200) filtered = filter_detections_by_zone(detections, zone_region, ["person"]) assert len(filtered) == 1 def test_detection_outside_zone(self): detections = [Detection("person", 1, 0.9, (300, 300, 50, 50))] zone_region = (0, 0, 200, 200) filtered = filter_detections_by_zone(detections, zone_region, ["person"]) assert len(filtered) == 0 def test_filter_by_class(self): detections = [ Detection("person", 1, 0.9, (50, 50, 80, 80)), Detection("car", 3, 0.8, (50, 50, 80, 80)), ] zone_region = (0, 0, 200, 200) filtered = filter_detections_by_zone(detections, zone_region, ["person"]) assert len(filtered) == 1 assert filtered[0].class_name == "person" def test_no_zone_returns_all(self): detections = [Detection("person", 1, 0.9, (50, 50, 80, 80))] filtered = filter_detections_by_zone(detections, None, ["person", "car"]) assert len(filtered) == 1 ``` - [ ] **Step 4: Run tests to verify they fail** Run: `source .venv/bin/activate && python -m pytest tests/unit/test_detection.py -v` Expected: FAIL - [ ] **Step 5: Implement person detector** Create `vigilar/detection/__init__.py` (empty). Create `vigilar/detection/person.py`: ```python """Person detection using MobileNet-SSD v2 via OpenCV DNN.""" import logging from dataclasses import dataclass from pathlib import Path import cv2 import numpy as np log = logging.getLogger(__name__) COCO_CLASSES = { 1: "person", 2: "bicycle", 3: "car", 4: "motorcycle", 5: "airplane", 6: "bus", 7: "train", 8: "truck", 9: "boat", } DETECT_CLASSES = {1, 3, 8} # person, car, truck @dataclass class Detection: class_name: str class_id: int confidence: float bbox: tuple[int, int, int, int] # x, y, w, h class PersonDetector: def __init__(self, model_path: str, config_path: str, confidence_threshold: float = 0.5): self._threshold = confidence_threshold self._net = None self.is_loaded = False if Path(model_path).exists() and Path(config_path).exists(): try: self._net = cv2.dnn.readNetFromTensorflow(model_path, config_path) self.is_loaded = True log.info("Person detection model loaded from %s", model_path) except cv2.error as e: log.error("Failed to load detection model: %s", e) else: log.warning("Detection model not found at %s — detection disabled", model_path) def detect(self, frame: np.ndarray) -> list[Detection]: if not self.is_loaded or self._net is None: return [] h, w = frame.shape[:2] blob = cv2.dnn.blobFromImage(frame, size=(300, 300), swapRB=True, crop=False) self._net.setInput(blob) output = self._net.forward() detections = [] for i in range(output.shape[2]): confidence = output[0, 0, i, 2] if confidence < self._threshold: continue class_id = int(output[0, 0, i, 1]) if class_id not in DETECT_CLASSES: continue x1 = int(output[0, 0, i, 3] * w) y1 = int(output[0, 0, i, 4] * h) x2 = int(output[0, 0, i, 5] * w) y2 = int(output[0, 0, i, 6] * h) bw, bh = x2 - x1, y2 - y1 if bw <= 0 or bh <= 0: continue detections.append(Detection( class_name=COCO_CLASSES.get(class_id, f"class_{class_id}"), class_id=class_id, confidence=float(confidence), bbox=(x1, y1, bw, bh), )) return detections ``` - [ ] **Step 6: Implement vehicle fingerprinting** Create `vigilar/detection/vehicle.py`: ```python """Vehicle color and size fingerprinting.""" import logging from dataclasses import dataclass from pathlib import Path import cv2 import numpy as np log = logging.getLogger(__name__) @dataclass class VehicleProfile: name: str color_profile: str size_class: str histogram: np.ndarray | None = None def classify_dominant_color(region: np.ndarray) -> str: hsv = cv2.cvtColor(region, cv2.COLOR_BGR2HSV) h, s, v = cv2.split(hsv) mean_s = float(np.mean(s)) mean_v = float(np.mean(v)) if mean_v < 40: return "black" if mean_s < 30 and mean_v > 180: return "white" if mean_s < 40: return "silver" mean_h = float(np.mean(h)) if mean_h < 10 or mean_h > 170: return "red" if 10 <= mean_h < 25: return "orange" if 25 <= mean_h < 35: return "yellow" if 35 <= mean_h < 85: return "green" if 85 <= mean_h < 130: return "blue" return "unknown" def classify_size(bbox_area: int, zone_area: int) -> str: if zone_area <= 0: return "unknown" ratio = bbox_area / zone_area if ratio < 0.1: return "compact" if ratio < 0.25: return "midsize" return "large" class VehicleFingerprint: def __init__(self, known_vehicles: list[VehicleProfile] | None = None): self._known = known_vehicles or [] def match(self, region: np.ndarray, bbox_area: int, zone_area: int) -> VehicleProfile | None: color = classify_dominant_color(region) size = classify_size(bbox_area, zone_area) for profile in self._known: if profile.color_profile == color and profile.size_class == size: return profile return None def add_profile(self, profile: VehicleProfile) -> None: self._known.append(profile) @property def known_count(self) -> int: return len(self._known) ``` - [ ] **Step 7: Implement zone filtering** Create `vigilar/detection/zones.py`: ```python """Zone-based detection filtering.""" from vigilar.detection.person import Detection def _bbox_center(bbox: tuple[int, int, int, int]) -> tuple[int, int]: x, y, w, h = bbox return x + w // 2, y + h // 2 def _point_in_rect(point: tuple[int, int], rect: tuple[int, int, int, int]) -> bool: px, py = point rx, ry, rw, rh = rect return rx <= px <= rx + rw and ry <= py <= ry + rh def filter_detections_by_zone( detections: list[Detection], zone_region: tuple[int, int, int, int] | None, watch_for: list[str], ) -> list[Detection]: result = [] for d in detections: if d.class_name not in watch_for: continue if zone_region is not None: center = _bbox_center(d.bbox) if not _point_in_rect(center, zone_region): continue result.append(d) return result ``` - [ ] **Step 8: Create model download script** Create `scripts/download_model.sh`: ```bash #!/usr/bin/env bash set -euo pipefail MODEL_DIR="${1:-/var/vigilar/models}" mkdir -p "$MODEL_DIR" PB_URL="https://raw.githubusercontent.com/opencv/opencv_extra/master/testdata/dnn/ssd_mobilenet_v2_coco_2018_03_29.pb" PBTXT_URL="https://raw.githubusercontent.com/opencv/opencv_extra/master/testdata/dnn/ssd_mobilenet_v2_coco_2018_03_29.pbtxt" echo "Downloading MobileNet-SSD v2 model..." if command -v curl &>/dev/null; then curl -fSL -o "$MODEL_DIR/mobilenet_ssd_v2.pb" "$PB_URL" curl -fSL -o "$MODEL_DIR/mobilenet_ssd_v2.pbtxt" "$PBTXT_URL" elif command -v wget &>/dev/null; then wget -q -O "$MODEL_DIR/mobilenet_ssd_v2.pb" "$PB_URL" wget -q -O "$MODEL_DIR/mobilenet_ssd_v2.pbtxt" "$PBTXT_URL" else echo "ERROR: curl or wget required" exit 1 fi echo "Model downloaded to $MODEL_DIR" ls -lh "$MODEL_DIR"/mobilenet_ssd_v2.* ``` Run: `chmod +x scripts/download_model.sh` - [ ] **Step 9: Wire detection into camera worker** In `vigilar/camera/worker.py`, add import at the top: ```python from vigilar.detection.person import PersonDetector from vigilar.detection.vehicle import VehicleFingerprint, VehicleProfile, classify_dominant_color, classify_size from vigilar.detection.zones import filter_detections_by_zone from vigilar.config import CameraConfig, DetectionConfig, MQTTConfig, RemoteConfig, VehicleConfig ``` After the `hls` and `remote_hls` initialization, add: ```python # Person/vehicle detector (second-stage after MOG2) person_detector: PersonDetector | None = None vehicle_fp: VehicleFingerprint | None = None det_cfg = None # will be passed as new parameter if det_cfg and det_cfg.person_detection: if not det_cfg.cameras or camera_id in det_cfg.cameras: person_detector = PersonDetector( det_cfg.model_path, det_cfg.model_config, det_cfg.confidence_threshold, ) if veh_cfg and veh_cfg.known: profiles = [VehicleProfile(v.name, v.color_profile, v.size_class) for v in veh_cfg.known] vehicle_fp = VehicleFingerprint(profiles) ``` In the motion detection block, after `bus.publish_event(Topics.camera_motion_start(...))`, add the detection second-stage: ```python # Second-stage: person/vehicle detection detection_type = "motion" if person_detector and person_detector.is_loaded: detections = person_detector.detect(frame) persons = [d for d in detections if d.class_name == "person"] vehicles = [d for d in detections if d.class_name in ("car", "truck")] if persons: detection_type = "person" bus.publish_event( f"vigilar/camera/{camera_id}/person", count=len(persons), confidence=max(d.confidence for d in persons), ) elif vehicles: detection_type = "vehicle" # Vehicle fingerprinting if vehicle_fp: for vd in vehicles: x, y, w, h = vd.bbox crop = frame[max(0,y):y+h, max(0,x):x+w] if crop.size > 0: match = vehicle_fp.match(crop, w*h, frame.shape[0]*frame.shape[1]) if match: bus.publish_event( f"vigilar/camera/{camera_id}/vehicle/known", vehicle=match.name, ) else: bus.publish_event( f"vigilar/camera/{camera_id}/vehicle/unknown", color=classify_dominant_color(crop), ) else: bus.publish_event( f"vigilar/camera/{camera_id}/vehicle", count=len(vehicles), ) ``` Update the `run_camera_worker` function signature to accept `det_cfg` and `veh_cfg` parameters. Update `camera/manager.py` to pass them. - [ ] **Step 10: Run tests** Run: `source .venv/bin/activate && python -m pytest tests/unit/test_detection.py tests/ -v` Expected: All PASS - [ ] **Step 11: Commit** ```bash git add vigilar/detection/ vigilar/constants.py vigilar/config.py vigilar/storage/schema.py vigilar/camera/worker.py vigilar/camera/manager.py scripts/download_model.sh tests/unit/test_detection.py git commit -m "feat: add person/vehicle detection with zone fencing MobileNet-SSD v2 via OpenCV DNN as second-stage filter after MOG2. Vehicle color/size fingerprinting for known car matching. Zone-based filtering per camera. Model download script." ``` --- ## Task 3: Health Monitoring + Self-Healing **Files:** - Create: `vigilar/health/__init__.py` - Create: `vigilar/health/monitor.py` - Create: `vigilar/health/pruner.py` - Create: `vigilar/health/digest.py` - Modify: `vigilar/config.py` (add HealthConfig) - Modify: `vigilar/main.py` (start HealthMonitor) - Test: `tests/unit/test_health.py` - [ ] **Step 1: Add config** In `vigilar/config.py`, add before `# --- Rule Config ---`: ```python # --- Health Config --- class HealthConfig(BaseModel): enabled: bool = True disk_warn_pct: int = 85 disk_critical_pct: int = 95 auto_prune: bool = True auto_prune_target_pct: int = 80 daily_digest: bool = True daily_digest_time: str = "08:00" ``` Add `health: HealthConfig = Field(default_factory=HealthConfig)` to `VigilarConfig`. - [ ] **Step 2: Write failing tests** Create `tests/unit/test_health.py`: ```python """Tests for health monitoring.""" import shutil import tempfile from pathlib import Path from unittest.mock import patch from vigilar.health.pruner import find_prunable_recordings, calculate_disk_usage_pct from vigilar.health.monitor import HealthCheck, HealthStatus, check_disk, check_mqtt_port class TestDiskCheck: @patch("vigilar.health.monitor.shutil.disk_usage") def test_healthy(self, mock_usage): mock_usage.return_value = type("U", (), {"total": 100_000_000_000, "used": 50_000_000_000, "free": 50_000_000_000})() result = check_disk("/var/vigilar", warn_pct=85, critical_pct=95) assert result.status == HealthStatus.HEALTHY @patch("vigilar.health.monitor.shutil.disk_usage") def test_warning(self, mock_usage): mock_usage.return_value = type("U", (), {"total": 100_000_000_000, "used": 90_000_000_000, "free": 10_000_000_000})() result = check_disk("/var/vigilar", warn_pct=85, critical_pct=95) assert result.status == HealthStatus.WARNING @patch("vigilar.health.monitor.shutil.disk_usage") def test_critical(self, mock_usage): mock_usage.return_value = type("U", (), {"total": 100_000_000_000, "used": 97_000_000_000, "free": 3_000_000_000})() result = check_disk("/var/vigilar", warn_pct=85, critical_pct=95) assert result.status == HealthStatus.CRITICAL class TestPruner: def test_calculate_disk_pct(self): with tempfile.TemporaryDirectory() as d: pct = calculate_disk_usage_pct(d) assert 0 <= pct <= 100 def test_find_prunable_empty(self, test_db): result = find_prunable_recordings(test_db, limit=10) assert result == [] class TestMQTTCheck: @patch("vigilar.health.monitor.socket.create_connection") def test_mqtt_reachable(self, mock_conn): mock_conn.return_value.__enter__ = lambda s: s mock_conn.return_value.__exit__ = lambda s, *a: None result = check_mqtt_port("127.0.0.1", 1883) assert result.status == HealthStatus.HEALTHY @patch("vigilar.health.monitor.socket.create_connection") def test_mqtt_unreachable(self, mock_conn): mock_conn.side_effect = ConnectionRefusedError() result = check_mqtt_port("127.0.0.1", 1883) assert result.status == HealthStatus.CRITICAL ``` - [ ] **Step 3: Run tests to verify they fail** Run: `source .venv/bin/activate && python -m pytest tests/unit/test_health.py -v` Expected: FAIL - [ ] **Step 4: Implement health monitor** Create `vigilar/health/__init__.py` (empty). Create `vigilar/health/monitor.py`: ```python """Health monitoring — periodic subsystem checks.""" import logging import shutil import signal import socket import time from dataclasses import dataclass, field from enum import StrEnum from vigilar.bus import MessageBus from vigilar.config import VigilarConfig log = logging.getLogger(__name__) class HealthStatus(StrEnum): HEALTHY = "healthy" WARNING = "warning" CRITICAL = "critical" @dataclass class HealthCheck: name: str status: HealthStatus message: str = "" value: float = 0 def check_disk(path: str, warn_pct: int, critical_pct: int) -> HealthCheck: try: usage = shutil.disk_usage(path) pct = usage.used / usage.total * 100 if pct >= critical_pct: return HealthCheck("disk", HealthStatus.CRITICAL, f"{pct:.0f}% used", pct) if pct >= warn_pct: return HealthCheck("disk", HealthStatus.WARNING, f"{pct:.0f}% used", pct) return HealthCheck("disk", HealthStatus.HEALTHY, f"{pct:.0f}% used", pct) except OSError as e: return HealthCheck("disk", HealthStatus.CRITICAL, str(e)) def check_mqtt_port(host: str, port: int) -> HealthCheck: try: with socket.create_connection((host, port), timeout=5): return HealthCheck("mqtt", HealthStatus.HEALTHY, "reachable") except (ConnectionRefusedError, TimeoutError, OSError) as e: return HealthCheck("mqtt", HealthStatus.CRITICAL, str(e)) class HealthMonitor: def __init__(self, config: VigilarConfig): self._cfg = config self._bus: MessageBus | None = None self._checks: list[HealthCheck] = [] def run(self) -> None: logging.basicConfig( level=logging.INFO, format="%(asctime)s [health] %(levelname)s: %(message)s", ) self._bus = MessageBus(self._cfg.mqtt, client_id="health-monitor") self._bus.connect() shutdown = False def handle_signal(signum, frame): nonlocal shutdown shutdown = True signal.signal(signal.SIGTERM, handle_signal) log.info("Health monitor started") last_disk_check = 0 last_mqtt_check = 0 while not shutdown: now = time.monotonic() if now - last_disk_check >= 300: # 5 min disk = check_disk(self._cfg.system.data_dir, self._cfg.health.disk_warn_pct, self._cfg.health.disk_critical_pct) self._update_check(disk) last_disk_check = now if disk.status == HealthStatus.CRITICAL and self._cfg.health.auto_prune: from vigilar.health.pruner import auto_prune auto_prune(self._cfg) if now - last_mqtt_check >= 30: mqtt = check_mqtt_port(self._cfg.mqtt.host, self._cfg.mqtt.port) self._update_check(mqtt) last_mqtt_check = now self._publish_status() time.sleep(10) if self._bus: self._bus.disconnect() def _update_check(self, check: HealthCheck) -> None: for i, c in enumerate(self._checks): if c.name == check.name: self._checks[i] = check return self._checks.append(check) def _publish_status(self) -> None: if not self._bus: return overall = HealthStatus.HEALTHY for c in self._checks: if c.status == HealthStatus.CRITICAL: overall = HealthStatus.CRITICAL break if c.status == HealthStatus.WARNING: overall = HealthStatus.WARNING self._bus.publish_event( "vigilar/system/health", status=overall.value, checks={c.name: {"status": c.status.value, "message": c.message} for c in self._checks}, ) def run_health_monitor(config: VigilarConfig) -> None: monitor = HealthMonitor(config) monitor.run() ``` - [ ] **Step 5: Implement auto-pruner** Create `vigilar/health/pruner.py`: ```python """Auto-prune old recordings when disk usage is high.""" import logging import os import shutil from pathlib import Path from sqlalchemy import select from sqlalchemy.engine import Engine from vigilar.config import VigilarConfig from vigilar.storage.schema import recordings log = logging.getLogger(__name__) def calculate_disk_usage_pct(path: str) -> float: usage = shutil.disk_usage(path) return usage.used / usage.total * 100 def find_prunable_recordings(engine: Engine, limit: int = 50) -> list[dict]: query = ( select(recordings) .where(recordings.c.starred == 0) .order_by(recordings.c.started_at.asc()) .limit(limit) ) with engine.connect() as conn: return [dict(r) for r in conn.execute(query).mappings().all()] def auto_prune(config: VigilarConfig) -> int: from vigilar.storage.db import get_db_path, get_engine target = config.health.auto_prune_target_pct data_dir = config.system.data_dir current_pct = calculate_disk_usage_pct(data_dir) if current_pct <= target: return 0 engine = get_engine(get_db_path(data_dir)) pruned = 0 total_bytes = 0 while current_pct > target: candidates = find_prunable_recordings(engine, limit=20) if not candidates: break for rec in candidates: file_path = rec.get("file_path", "") if file_path and Path(file_path).exists(): size = Path(file_path).stat().st_size Path(file_path).unlink() total_bytes += size thumb = rec.get("thumbnail_path", "") if thumb and Path(thumb).exists(): Path(thumb).unlink() with engine.begin() as conn: conn.execute(recordings.delete().where(recordings.c.id == rec["id"])) pruned += 1 current_pct = calculate_disk_usage_pct(data_dir) if pruned: mb = total_bytes / (1024 * 1024) log.info("Auto-pruned %d recordings (%.1f MB), disk now at %.0f%%", pruned, mb, current_pct) return pruned ``` - [ ] **Step 6: Implement daily digest** Create `vigilar/health/digest.py`: ```python """Daily digest notification builder.""" import logging import shutil import time from sqlalchemy import func, select from sqlalchemy.engine import Engine from vigilar.constants import EventType from vigilar.storage.schema import events, recordings log = logging.getLogger(__name__) def build_digest(engine: Engine, data_dir: str, since_hours: int = 12) -> dict: since_ts = int((time.time() - since_hours * 3600) * 1000) with engine.connect() as conn: person_count = conn.execute( select(func.count()).select_from(events) .where(events.c.type == EventType.PERSON_DETECTED, events.c.ts >= since_ts) ).scalar() or 0 vehicle_count = conn.execute( select(func.count()).select_from(events) .where(events.c.type == EventType.UNKNOWN_VEHICLE_DETECTED, events.c.ts >= since_ts) ).scalar() or 0 recording_count = conn.execute( select(func.count()).select_from(recordings) .where(recordings.c.started_at >= since_ts // 1000) ).scalar() or 0 usage = shutil.disk_usage(data_dir) disk_pct = usage.used / usage.total * 100 disk_gb = usage.used / (1024**3) return { "person_detections": person_count, "unknown_vehicles": vehicle_count, "recordings": recording_count, "disk_used_gb": round(disk_gb, 1), "disk_used_pct": round(disk_pct, 0), "since_hours": since_hours, } def format_digest(data: dict) -> str: return ( f"Vigilar Daily Summary\n" f"Last {data['since_hours']}h: " f"{data['person_detections']} person detections, " f"{data['unknown_vehicles']} unknown vehicles, " f"{data['recordings']} recordings\n" f"Storage: {data['disk_used_gb']} GB ({data['disk_used_pct']:.0f}%)" ) ``` - [ ] **Step 7: Wire into supervisor** In `vigilar/main.py`, add after the presence monitor block: ```python # Health monitor if cfg.health.enabled: from vigilar.health.monitor import run_health_monitor subsystems.append(SubsystemProcess("health-monitor", run_health_monitor, (cfg,))) ``` - [ ] **Step 8: Run tests** Run: `source .venv/bin/activate && python -m pytest tests/unit/test_health.py tests/ -v` Expected: All PASS - [ ] **Step 9: Commit** ```bash git add vigilar/health/ vigilar/config.py vigilar/main.py tests/unit/test_health.py git commit -m "feat: add health monitoring with auto-prune and daily digest Periodic disk, MQTT, and subsystem health checks. Auto-prune oldest non-starred recordings when disk exceeds threshold. Daily digest notification with overnight summary." ``` --- ## Task 4: Smart Alert Profiles **Files:** - Create: `vigilar/alerts/profiles.py` - Modify: `vigilar/config.py` (add AlertProfileConfig, AlertProfileRule) - Modify: `vigilar/alerts/dispatcher.py` (role-based routing) - Modify: `vigilar/web/templates/settings.html` (profile editor) - Modify: `vigilar/web/static/js/settings.js` (profile save/load) - Modify: `vigilar/web/blueprints/system.py` (profile API endpoints) - Test: `tests/unit/test_profiles.py` **Depends on:** Task 1 (HouseholdState) and Task 2 (detection event types) - [ ] **Step 1: Add config models** In `vigilar/config.py`, add before `# --- Health Config ---`: ```python # --- Alert Profile Config --- class AlertProfileRule(BaseModel): detection_type: str # person, unknown_vehicle, known_vehicle, motion camera_location: str = "any" # any | exterior | common_area | specific camera id action: str = "record_only" # push_and_record, push_adults, record_only, quiet_log recipients: str = "all" # all, adults, none class AlertProfileConfig(BaseModel): name: str enabled: bool = True presence_states: list[str] = Field(default_factory=list) time_window: str = "" # "" = all day, "23:00-06:00" = sleep hours rules: list[AlertProfileRule] = Field(default_factory=list) ``` Add `profiles: list[AlertProfileConfig] = Field(default_factory=list)` to `AlertsConfig`. Add `sleep_start: str = "23:00"` and `sleep_end: str = "06:00"` to `AlertsConfig`. - [ ] **Step 2: Write failing tests** Create `tests/unit/test_profiles.py`: ```python """Tests for smart alert profiles.""" from vigilar.alerts.profiles import match_profile, is_in_time_window, get_action_for_event from vigilar.config import AlertProfileConfig, AlertProfileRule from vigilar.constants import HouseholdState def _make_profile(name, states, time_window="", rules=None): return AlertProfileConfig( name=name, presence_states=states, time_window=time_window, rules=rules or [], ) class TestTimeWindow: def test_no_window_always_matches(self): assert is_in_time_window("", "14:30") is True def test_inside_window(self): assert is_in_time_window("23:00-06:00", "02:30") is True def test_outside_window(self): assert is_in_time_window("23:00-06:00", "14:30") is False def test_at_start(self): assert is_in_time_window("23:00-06:00", "23:00") is True def test_daytime_window(self): assert is_in_time_window("06:00-23:00", "14:30") is True class TestProfileMatching: def test_match_by_presence(self): profiles = [ _make_profile("Away", ["EMPTY"]), _make_profile("Home", ["ADULTS_HOME", "ALL_HOME"]), ] result = match_profile(profiles, HouseholdState.EMPTY, "14:00") assert result is not None assert result.name == "Away" def test_no_match(self): profiles = [_make_profile("Away", ["EMPTY"])] result = match_profile(profiles, HouseholdState.ALL_HOME, "14:00") assert result is None def test_time_window_narrows(self): profiles = [ _make_profile("Night", ["ALL_HOME"], "23:00-06:00"), _make_profile("Day", ["ALL_HOME"], "06:00-23:00"), ] result = match_profile(profiles, HouseholdState.ALL_HOME, "02:00") assert result is not None assert result.name == "Night" class TestActionForEvent: def test_person_gets_push(self): rules = [AlertProfileRule(detection_type="person", action="push_and_record", recipients="all")] profile = _make_profile("Away", ["EMPTY"], rules=rules) action, recipients = get_action_for_event(profile, "person", "front_door") assert action == "push_and_record" assert recipients == "all" def test_motion_gets_record_only(self): rules = [AlertProfileRule(detection_type="motion", action="record_only", recipients="none")] profile = _make_profile("Home", ["ALL_HOME"], rules=rules) action, recipients = get_action_for_event(profile, "motion", "front_door") assert action == "record_only" def test_no_matching_rule_defaults_quiet(self): profile = _make_profile("Home", ["ALL_HOME"], rules=[]) action, recipients = get_action_for_event(profile, "person", "front_door") assert action == "quiet_log" assert recipients == "none" ``` - [ ] **Step 3: Run tests to verify fail** Run: `source .venv/bin/activate && python -m pytest tests/unit/test_profiles.py -v` Expected: FAIL - [ ] **Step 4: Implement profile engine** Create `vigilar/alerts/profiles.py`: ```python """Smart alert profile matching engine.""" import logging from datetime import datetime from vigilar.config import AlertProfileConfig, AlertProfileRule from vigilar.constants import HouseholdState log = logging.getLogger(__name__) def is_in_time_window(window: str, current_time: str) -> bool: if not window: return True parts = window.split("-") if len(parts) != 2: return True start, end = parts[0].strip(), parts[1].strip() t = current_time if start <= end: return start <= t < end else: return t >= start or t < end def match_profile( profiles: list[AlertProfileConfig], household_state: HouseholdState, current_time: str, ) -> AlertProfileConfig | None: for profile in profiles: if not profile.enabled: continue if household_state.value not in profile.presence_states: continue if not is_in_time_window(profile.time_window, current_time): continue return profile return None def get_action_for_event( profile: AlertProfileConfig, detection_type: str, camera_id: str, ) -> tuple[str, str]: for rule in profile.rules: if rule.detection_type != detection_type: continue if rule.camera_location not in ("any", camera_id): continue return rule.action, rule.recipients return "quiet_log", "none" ``` - [ ] **Step 5: Run tests** Run: `source .venv/bin/activate && python -m pytest tests/unit/test_profiles.py -v` Expected: All 10 tests PASS - [ ] **Step 6: Run full suite** Run: `source .venv/bin/activate && python -m pytest tests/ -v` Expected: All PASS - [ ] **Step 7: Commit** ```bash git add vigilar/alerts/profiles.py vigilar/config.py tests/unit/test_profiles.py git commit -m "feat: add smart alert profiles with presence/time matching Profiles activate based on household state + time of day. Per-detection-type rules control push/record/quiet behavior. Role-based recipient filtering (all vs adults-only)." ``` --- ## Task 5: Recording Timeline **Files:** - Create: `vigilar/web/static/js/timeline.js` - Modify: `vigilar/web/blueprints/recordings.py` (add timeline API) - Modify: `vigilar/web/templates/recordings.html` (timeline UI) - Modify: `vigilar/storage/queries.py` (add get_timeline_data) - Test: `tests/unit/test_timeline.py` **Depends on:** Task 2 (detection_type column in recordings) - [ ] **Step 1: Add timeline query** In `vigilar/storage/queries.py`, add: ```python def get_timeline_data( engine: Engine, camera_id: str, day_start_ts: int, day_end_ts: int, ) -> list[dict[str, Any]]: query = ( select( recordings.c.id, recordings.c.started_at, recordings.c.ended_at, recordings.c.detection_type, recordings.c.starred, ) .where( recordings.c.camera_id == camera_id, recordings.c.started_at >= day_start_ts, recordings.c.started_at < day_end_ts, ) .order_by(recordings.c.started_at.asc()) ) with engine.connect() as conn: return [dict(r) for r in conn.execute(query).mappings().all()] ``` - [ ] **Step 2: Write failing test** Create `tests/unit/test_timeline.py`: ```python """Tests for recording timeline.""" import time from vigilar.storage.queries import get_timeline_data, insert_recording class TestTimelineQuery: def test_returns_recordings_for_day(self, test_db): now = int(time.time()) insert_recording(test_db, camera_id="cam1", started_at=now, ended_at=now+60, file_path="/tmp/a.mp4", trigger="MOTION", detection_type="person") insert_recording(test_db, camera_id="cam1", started_at=now+100, ended_at=now+130, file_path="/tmp/b.mp4", trigger="MOTION", detection_type="motion") results = get_timeline_data(test_db, "cam1", now - 3600, now + 3600) assert len(results) == 2 assert results[0]["detection_type"] == "person" assert results[1]["detection_type"] == "motion" def test_filters_by_camera(self, test_db): now = int(time.time()) insert_recording(test_db, camera_id="cam1", started_at=now, ended_at=now+60, file_path="/tmp/a.mp4", trigger="MOTION") insert_recording(test_db, camera_id="cam2", started_at=now, ended_at=now+60, file_path="/tmp/b.mp4", trigger="MOTION") results = get_timeline_data(test_db, "cam1", now - 3600, now + 3600) assert len(results) == 1 def test_filters_by_time_range(self, test_db): now = int(time.time()) insert_recording(test_db, camera_id="cam1", started_at=now - 7200, ended_at=now - 7100, file_path="/tmp/old.mp4", trigger="MOTION") insert_recording(test_db, camera_id="cam1", started_at=now, ended_at=now+60, file_path="/tmp/new.mp4", trigger="MOTION") results = get_timeline_data(test_db, "cam1", now - 3600, now + 3600) assert len(results) == 1 ``` - [ ] **Step 3: Run test to verify fail** Run: `source .venv/bin/activate && python -m pytest tests/unit/test_timeline.py -v` Expected: FAIL - [ ] **Step 4: Add timeline API endpoint** In `vigilar/web/blueprints/recordings.py`, add: ```python import time as time_mod from datetime import datetime, timedelta @recordings_bp.route("/api/timeline") def timeline_api(): """JSON API: timeline data for a camera on a given day.""" camera_id = request.args.get("camera") date_str = request.args.get("date") # YYYY-MM-DD if not camera_id: return jsonify({"error": "camera required"}), 400 if date_str: day = datetime.strptime(date_str, "%Y-%m-%d") else: day = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) day_start = int(day.timestamp()) day_end = int((day + timedelta(days=1)).timestamp()) engine = current_app.config.get("DB_ENGINE") if engine is None: return jsonify([]) from vigilar.storage.queries import get_timeline_data data = get_timeline_data(engine, camera_id, day_start, day_end) return jsonify([ { "id": r["id"], "start": r["started_at"], "end": r["ended_at"], "type": r.get("detection_type", "motion"), "starred": bool(r.get("starred", 0)), } for r in data ]) ``` - [ ] **Step 5: Create timeline.js** Create `vigilar/web/static/js/timeline.js`: ```javascript /* Vigilar — Recording timeline renderer */ (function() { 'use strict'; const COLORS = { person: '#dc3545', vehicle: '#0d6efd', motion: '#6c757d', default: '#6c757d', }; class Timeline { constructor(canvas, options = {}) { this.canvas = canvas; this.ctx = canvas.getContext('2d'); this.segments = []; this.cameraId = options.cameraId || ''; this.date = options.date || new Date().toISOString().split('T')[0]; this.onClick = options.onClick || null; this.dayStartTs = 0; this.dayEndTs = 0; this._resize(); this._bindEvents(); } _resize() { const rect = this.canvas.parentElement.getBoundingClientRect(); this.canvas.width = rect.width; this.canvas.height = this.canvas.dataset.height ? parseInt(this.canvas.dataset.height) : 48; } _bindEvents() { this.canvas.addEventListener('click', (e) => this._handleClick(e)); window.addEventListener('resize', () => { this._resize(); this.render(); }); } async load() { const resp = await fetch(`/recordings/api/timeline?camera=${this.cameraId}&date=${this.date}`); if (!resp.ok) return; this.segments = await resp.json(); const d = new Date(this.date + 'T00:00:00'); this.dayStartTs = d.getTime() / 1000; this.dayEndTs = this.dayStartTs + 86400; this.render(); } render() { const w = this.canvas.width; const h = this.canvas.height; const ctx = this.ctx; const dur = this.dayEndTs - this.dayStartTs; ctx.clearRect(0, 0, w, h); ctx.fillStyle = '#161b22'; ctx.fillRect(0, 0, w, h); for (const seg of this.segments) { const x1 = ((seg.start - this.dayStartTs) / dur) * w; const x2 = seg.end ? ((seg.end - this.dayStartTs) / dur) * w : x1 + 2; const segW = Math.max(x2 - x1, 2); ctx.fillStyle = COLORS[seg.type] || COLORS.default; ctx.fillRect(x1, 4, segW, h - 8); } // Hour markers ctx.fillStyle = '#30363d'; ctx.font = '10px sans-serif'; for (let hour = 0; hour <= 24; hour += 6) { const x = (hour / 24) * w; ctx.fillRect(x, 0, 1, h); if (hour < 24) { ctx.fillStyle = '#8b949e'; ctx.fillText(`${hour}:00`, x + 3, h - 2); ctx.fillStyle = '#30363d'; } } } _handleClick(e) { const rect = this.canvas.getBoundingClientRect(); const x = e.clientX - rect.left; const pct = x / this.canvas.width; const ts = this.dayStartTs + pct * 86400; const clicked = this.segments.find(s => ts >= s.start && ts <= (s.end || s.start + 10)); if (clicked && this.onClick) { this.onClick(clicked); } } } window.VigilarTimeline = Timeline; })(); ``` - [ ] **Step 6: Update recordings template** Replace the table in `vigilar/web/templates/recordings.html` with the timeline view. Add a `` per camera, date picker, filter buttons, and a video player area. Use the `VigilarTimeline` class to render each camera's timeline and handle click-to-play. - [ ] **Step 7: Run tests** Run: `source .venv/bin/activate && python -m pytest tests/unit/test_timeline.py tests/ -v` Expected: All PASS - [ ] **Step 8: Commit** ```bash git add vigilar/web/static/js/timeline.js vigilar/web/blueprints/recordings.py vigilar/web/templates/recordings.html vigilar/storage/queries.py tests/unit/test_timeline.py git commit -m "feat: add recording timeline with color-coded detection types Canvas-based 24h timeline per camera. Color-coded segments for person (red), vehicle (blue), motion (gray). Click to play. Date picker, filter buttons, hour markers." ``` --- ## Final: Integration Verification - [ ] **Step 1: Run full test suite** ```bash source .venv/bin/activate && python -m pytest tests/ -v ``` Expected: All tests pass (96 original + ~35 new = ~131 total) - [ ] **Step 2: Verify all web routes** ```bash source .venv/bin/activate && python -c " from vigilar.config import load_config from vigilar.web.app import create_app cfg = load_config() app = create_app(cfg) with app.test_client() as c: for p in ['/', '/system/settings', '/kiosk/', '/events/', '/sensors/', '/recordings/', '/cameras/']: r = c.get(p) print(f'{p}: {r.status_code}') " ``` Expected: All 200 - [ ] **Step 3: Commit and push** ```bash git push ```