vigilar/docs/superpowers/plans/2026-04-02-daily-use-features.md
Aaron D. Lee 8a65ac8c69 Add implementation plan for daily use features
5-task plan covering presence detection, person/vehicle AI detection,
smart alert profiles, recording timeline UI, and health monitoring.
Tasks 1-3 parallelizable, 4 depends on 1+2, 5 depends on 2.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 00:01:53 -04:00

Daily Use Features Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add presence detection, person/vehicle AI detection, smart alert profiles, recording timeline, and health monitoring to make Vigilar a system a household relies on daily.

Architecture: Five independent modules (presence/, detection/, health/, alerts/profiles.py, web/static/js/timeline.js) that communicate via the existing MQTT bus and SQLite storage. The detection module hooks into the camera worker's existing motion pipeline. Alert profiles consume events from both presence and detection to route notifications by role and time of day.

Tech Stack: Python 3.11+, OpenCV DNN (MobileNet-SSD v2), Flask/Jinja2, Bootstrap 5, Canvas API for timeline, subprocess ping for presence, existing paho-mqtt bus.

Dependency graph:

Task 1 (Presence)  ──┐
Task 2 (Detection) ──┴──→ Task 4 (Alert Profiles)
Task 2 (Detection) ─────→ Task 5 (Timeline, after Task 2 schema change)
Task 3 (Health)    ─────  no dependents

Tasks 1, 2, 3 are fully parallelizable. Task 4 after 1+2. Task 5 after 2.
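
The dependency rules above can be turned into an execution order mechanically. The following is a minimal sketch using the standard-library graphlib module; the task keys and the execution_waves helper are illustrative names and not part of Vigilar.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it must wait for (taken from the plan text).
TASK_DEPS = {
    "task1_presence": set(),
    "task2_detection": set(),
    "task3_health": set(),
    "task4_alert_profiles": {"task1_presence", "task2_detection"},
    "task5_timeline": {"task2_detection"},
}


def execution_waves(deps):
    """Group tasks into waves; tasks inside one wave can run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose dependencies are done
        waves.append(ready)
        sorter.done(*ready)
    return waves
```

With these dependencies the first wave holds Tasks 1 to 3 and the second wave holds Tasks 4 and 5, which matches the parallelization note above.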


Task 1: Multi-Person Presence Detection

Files:

  • Create: vigilar/presence/__init__.py

  • Create: vigilar/presence/models.py

  • Create: vigilar/presence/monitor.py

  • Modify: vigilar/config.py (add PresenceConfig, PresenceMember)

  • Modify: vigilar/constants.py (add HouseholdState enum, presence Topics)

  • Modify: vigilar/main.py (start PresenceMonitor subprocess)

  • Test: tests/unit/test_presence.py

  • Step 1: Add constants and config models

In vigilar/constants.py, add after UPSStatus:

# --- Household Presence ---

class HouseholdState(StrEnum):
    EMPTY = "EMPTY"
    KIDS_HOME = "KIDS_HOME"
    ADULTS_HOME = "ADULTS_HOME"
    ALL_HOME = "ALL_HOME"

Add to Topics class:

    # Presence
    @staticmethod
    def presence_member(name: str) -> str:
        return f"vigilar/presence/{name}"

    PRESENCE_STATUS = "vigilar/presence/status"

In vigilar/config.py, add before # --- Rule Config ---:

# --- Presence Config ---

class PresenceMember(BaseModel):
    name: str
    ip: str
    role: str = "adult"  # adult | child

class PresenceConfig(BaseModel):
    enabled: bool = False
    ping_interval_s: int = 30
    departure_delay_m: int = 10
    method: str = "icmp"  # icmp | arping
    members: list[PresenceMember] = Field(default_factory=list)
    actions: dict[str, str] = Field(default_factory=lambda: {
        "EMPTY": "ARMED_AWAY",
        "ADULTS_HOME": "DISARMED",
        "KIDS_HOME": "ARMED_HOME",
        "ALL_HOME": "DISARMED",
    })

Add presence: PresenceConfig = Field(default_factory=PresenceConfig) to VigilarConfig after remote.
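
The plan does not show where the actions mapping is consumed. As a hedged illustration only, a consumer could resolve the arm mode for a household state with a plain dictionary lookup; arm_mode_for is a hypothetical helper, and DEFAULT_ACTIONS simply copies the defaults from PresenceConfig.

```python
from typing import Optional

# Copy of the default `actions` mapping from PresenceConfig above.
DEFAULT_ACTIONS = {
    "EMPTY": "ARMED_AWAY",
    "ADULTS_HOME": "DISARMED",
    "KIDS_HOME": "ARMED_HOME",
    "ALL_HOME": "DISARMED",
}


def arm_mode_for(household_state: str, actions: dict) -> Optional[str]:
    """Return the configured arm mode, or None when the state has no action."""
    return actions.get(household_state)
```

Returning None for an unmapped state lets the caller leave the current arm mode untouched instead of guessing a default.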

  • Step 2: Write failing tests

Create tests/unit/test_presence.py:

"""Tests for presence detection."""

import time
from unittest.mock import patch

from vigilar.config import PresenceConfig, PresenceMember
from vigilar.constants import HouseholdState
from vigilar.presence.models import MemberPresence, derive_household_state


class TestDeriveHouseholdState:
    def test_empty_when_no_members_home(self):
        members = [
            MemberPresence(name="Dad", role="adult", is_home=False, last_seen=0),
            MemberPresence(name="Mom", role="adult", is_home=False, last_seen=0),
        ]
        assert derive_household_state(members) == HouseholdState.EMPTY

    def test_all_home(self):
        members = [
            MemberPresence(name="Dad", role="adult", is_home=True, last_seen=0),
            MemberPresence(name="Mom", role="adult", is_home=True, last_seen=0),
            MemberPresence(name="Kid", role="child", is_home=True, last_seen=0),
        ]
        assert derive_household_state(members) == HouseholdState.ALL_HOME

    def test_kids_home_only(self):
        members = [
            MemberPresence(name="Dad", role="adult", is_home=False, last_seen=0),
            MemberPresence(name="Kid", role="child", is_home=True, last_seen=0),
        ]
        assert derive_household_state(members) == HouseholdState.KIDS_HOME

    def test_adults_home_not_all(self):
        members = [
            MemberPresence(name="Dad", role="adult", is_home=True, last_seen=0),
            MemberPresence(name="Kid", role="child", is_home=False, last_seen=0),
        ]
        assert derive_household_state(members) == HouseholdState.ADULTS_HOME

    def test_all_home_when_no_children_configured(self):
        members = [
            MemberPresence(name="Dad", role="adult", is_home=True, last_seen=0),
        ]
        assert derive_household_state(members) == HouseholdState.ALL_HOME

    def test_empty_list(self):
        assert derive_household_state([]) == HouseholdState.EMPTY


class TestPingHost:
    @patch("vigilar.presence.monitor.subprocess.run")
    def test_ping_success(self, mock_run):
        from vigilar.presence.monitor import ping_host
        mock_run.return_value = type("R", (), {"returncode": 0})()
        assert ping_host("192.168.1.50") is True

    @patch("vigilar.presence.monitor.subprocess.run")
    def test_ping_failure(self, mock_run):
        from vigilar.presence.monitor import ping_host
        mock_run.return_value = type("R", (), {"returncode": 1})()
        assert ping_host("192.168.1.50") is False

    @patch("vigilar.presence.monitor.subprocess.run")
    def test_ping_timeout(self, mock_run):
        import subprocess as sp
        from vigilar.presence.monitor import ping_host
        mock_run.side_effect = sp.TimeoutExpired(cmd="ping", timeout=2)
        assert ping_host("192.168.1.50") is False


class TestDepartureDelay:
    def test_not_departed_within_delay(self):
        from vigilar.presence.monitor import should_mark_away
        last_seen = time.monotonic() - 300  # 5 min ago
        assert should_mark_away(last_seen, departure_delay_m=10) is False

    def test_departed_after_delay(self):
        from vigilar.presence.monitor import should_mark_away
        last_seen = time.monotonic() - 700  # 11+ min ago
        assert should_mark_away(last_seen, departure_delay_m=10) is True
  • Step 3: Run tests to verify they fail

Run: source .venv/bin/activate && python -m pytest tests/unit/test_presence.py -v

Expected: FAIL (modules don't exist yet)

  • Step 4: Implement presence models

Create vigilar/presence/__init__.py (empty).

Create vigilar/presence/models.py:

"""Presence detection data models."""

from dataclasses import dataclass

from vigilar.constants import HouseholdState


@dataclass
class MemberPresence:
    name: str
    role: str  # "adult" | "child"
    is_home: bool
    last_seen: float  # monotonic timestamp of last successful ping


def derive_household_state(members: list[MemberPresence]) -> HouseholdState:
    if not members:
        return HouseholdState.EMPTY

    home = [m for m in members if m.is_home]
    if not home:
        return HouseholdState.EMPTY

    all_home = len(home) == len(members)
    adults_home = any(m.role == "adult" for m in home)
    kids_home = any(m.role == "child" for m in home)

    if all_home:
        return HouseholdState.ALL_HOME
    if adults_home:
        return HouseholdState.ADULTS_HOME
    if kids_home:
        return HouseholdState.KIDS_HOME
    return HouseholdState.EMPTY
  • Step 5: Implement presence monitor

Create vigilar/presence/monitor.py:

"""Presence monitor — pings family phones to determine who's home."""

import logging
import signal
import subprocess
import time

from vigilar.bus import MessageBus
from vigilar.config import VigilarConfig
from vigilar.constants import HouseholdState, Topics
from vigilar.presence.models import MemberPresence, derive_household_state

log = logging.getLogger(__name__)


def ping_host(ip: str, timeout_s: int = 2) -> bool:
    try:
        result = subprocess.run(
            ["ping", "-c", "1", "-W", str(timeout_s), ip],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout_s + 1,
        )
        return result.returncode == 0
    except subprocess.TimeoutExpired:
        return False
    except FileNotFoundError:
        log.error("ping command not found")
        return False


def should_mark_away(last_seen: float, departure_delay_m: int) -> bool:
    elapsed_m = (time.monotonic() - last_seen) / 60
    return elapsed_m >= departure_delay_m


class PresenceMonitor:
    def __init__(self, config: VigilarConfig):
        self._cfg = config.presence
        self._mqtt_cfg = config.mqtt
        self._members: list[MemberPresence] = []
        self._last_household_state = HouseholdState.EMPTY
        self._bus: MessageBus | None = None

        for m in self._cfg.members:
            self._members.append(MemberPresence(
                name=m.name, role=m.role, is_home=False, last_seen=0,
            ))

    def run(self) -> None:
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s [presence] %(levelname)s: %(message)s",
        )

        self._bus = MessageBus(self._mqtt_cfg, client_id="presence-monitor")
        self._bus.connect()

        shutdown = False
        def handle_signal(signum, frame):
            nonlocal shutdown
            shutdown = True
        signal.signal(signal.SIGTERM, handle_signal)

        log.info("Presence monitor started, tracking %d members", len(self._members))

        while not shutdown:
            self._poll_once()
            for _ in range(self._cfg.ping_interval_s):
                if shutdown:
                    break
                time.sleep(1)

        if self._bus:
            self._bus.disconnect()
        log.info("Presence monitor stopped")

    def _poll_once(self) -> None:
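        # MemberPresence carries no IP address, so pair each tracked member with
        # its config entry (same order as in __init__) to find the host to ping.
        for member, member_cfg in zip(self._members, self._cfg.members):
            reachable = ping_host(member_cfg.ip)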
            if reachable:
                member.is_home = True
                member.last_seen = time.monotonic()
            elif member.is_home:
                if should_mark_away(member.last_seen, self._cfg.departure_delay_m):
                    member.is_home = False
                    log.info("%s departed", member.name)

            if self._bus:
                self._bus.publish_event(
                    Topics.presence_member(member.name),
                    state="HOME" if member.is_home else "AWAY",
                    name=member.name,
                    role=member.role,
                )

        new_state = derive_household_state(self._members)
        if new_state != self._last_household_state:
            log.info("Household state: %s -> %s", self._last_household_state, new_state)
            self._last_household_state = new_state

        if self._bus:
            members_dict = {m.name: "HOME" if m.is_home else "AWAY" for m in self._members}
            self._bus.publish_event(
                Topics.PRESENCE_STATUS,
                household=new_state.value,
                members=members_dict,
            )


def run_presence_monitor(config: VigilarConfig) -> None:
    monitor = PresenceMonitor(config)
    monitor.run()
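
The departure delay in _poll_once acts as a debounce: a phone that briefly stops answering pings (for example while asleep on Wi-Fi) stays "home" until the delay has elapsed. The sketch below replays that rule with an injected clock so the behavior can be checked without waiting; TrackedPhone, apply_ping and replay are illustrative names, not part of the plan.

```python
from dataclasses import dataclass


@dataclass
class TrackedPhone:
    is_home: bool = False
    last_seen: float = 0.0  # seconds on the injected clock


def apply_ping(phone, reachable, now, departure_delay_m):
    """Apply one ping result and return the resulting is_home flag."""
    if reachable:
        phone.is_home = True
        phone.last_seen = now
    elif phone.is_home and (now - phone.last_seen) / 60 >= departure_delay_m:
        # Only flip to away once the phone has been silent for the full delay.
        phone.is_home = False
    return phone.is_home


def replay(pings, departure_delay_m=10):
    """pings is a list of (timestamp_seconds, reachable) pairs."""
    phone = TrackedPhone()
    return [apply_ping(phone, ok, t, departure_delay_m) for t, ok in pings]
```

Replaying a successful ping at 0 s followed by failures at 300 s, 599 s and 600 s keeps the member home until the 600 s sample, where the 10-minute delay is reached.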
  • Step 6: Wire into supervisor

In vigilar/main.py, add after the UPS monitor block:

    # Presence monitor
    if cfg.presence.enabled and cfg.presence.members:
        from vigilar.presence.monitor import run_presence_monitor
        subsystems.append(SubsystemProcess("presence-monitor", run_presence_monitor, (cfg,)))
  • Step 7: Run tests to verify they pass

Run: source .venv/bin/activate && python -m pytest tests/unit/test_presence.py -v

Expected: All 11 tests PASS

  • Step 8: Run full suite

Run: source .venv/bin/activate && python -m pytest tests/ -v

Expected: All tests PASS (96 existing + 11 new = 107)

  • Step 9: Commit
git add vigilar/presence/ vigilar/constants.py vigilar/config.py vigilar/main.py tests/unit/test_presence.py
git commit -m "feat: add multi-person presence detection

Track family phones via ping, derive household state
(EMPTY/KIDS_HOME/ADULTS_HOME/ALL_HOME), publish via MQTT.
Configurable departure delay, per-member roles, auto-arm actions."

Task 2: Person + Vehicle Detection

Files:

  • Create: vigilar/detection/__init__.py

  • Create: vigilar/detection/person.py

  • Create: vigilar/detection/vehicle.py

  • Create: vigilar/detection/zones.py

  • Create: vigilar/cli/cmd_calibrate.py

  • Create: scripts/download_model.sh

  • Modify: vigilar/constants.py (add detection event types)

  • Modify: vigilar/config.py (add DetectionConfig, CameraZone, VehicleConfig)

  • Modify: vigilar/storage/schema.py (add detection_type, starred columns to recordings)

  • Modify: vigilar/camera/worker.py (add detection stage after motion)

  • Modify: vigilar/cli/main.py (register calibrate command)

  • Test: tests/unit/test_detection.py

  • Step 1: Add constants and config

In vigilar/constants.py, add to EventType:

    PERSON_DETECTED = "PERSON_DETECTED"
    VEHICLE_DETECTED = "VEHICLE_DETECTED"
    KNOWN_VEHICLE_ARRIVED = "KNOWN_VEHICLE_ARRIVED"
    UNKNOWN_VEHICLE_DETECTED = "UNKNOWN_VEHICLE_DETECTED"

Add to RecordingTrigger:

    PERSON = "PERSON"
    VEHICLE = "VEHICLE"

In vigilar/config.py, add before # --- Presence Config ---:

# --- Detection Config ---

class CameraZone(BaseModel):
    name: str
    region: list[int] = Field(default_factory=list)  # [x, y, w, h]
    watch_for: list[str] = Field(default_factory=lambda: ["person", "vehicle"])
    alert_unknown_vehicles: bool = False

class DetectionConfig(BaseModel):
    person_detection: bool = False
    model_path: str = "/var/vigilar/models/mobilenet_ssd_v2.pb"
    model_config: str = "/var/vigilar/models/mobilenet_ssd_v2.pbtxt"
    confidence_threshold: float = 0.5
    cameras: list[str] = Field(default_factory=list)  # empty = all

class KnownVehicle(BaseModel):
    name: str
    color_profile: str = ""  # white, black, silver, red, blue, etc.
    size_class: str = ""  # compact, midsize, large
    calibration_file: str = ""

class VehicleConfig(BaseModel):
    known: list[KnownVehicle] = Field(default_factory=list)

Add zones: list[CameraZone] = Field(default_factory=list) to CameraConfig.

Add to VigilarConfig:

    detection: DetectionConfig = Field(default_factory=DetectionConfig)
    vehicles: VehicleConfig = Field(default_factory=VehicleConfig)
  • Step 2: Update schema

In vigilar/storage/schema.py, add two columns to the recordings table:

    Column("detection_type", String),   # person, vehicle, motion, None
    Column("starred", Integer, nullable=False, default=0),

Add after the Column("thumbnail_path", ...) line and before the closing ).
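
Adding columns to the table definition only affects databases created afterwards; an existing SQLite file keeps its old layout. Whether Vigilar already has a migration mechanism is not stated in this plan, so the following is only a sketch of one way to add the columns in place. It assumes the table is named recordings and uses the standard-library sqlite3 module; add_missing_columns is a hypothetical helper.

```python
import sqlite3

# Column name -> SQLite column definition, mirroring the two new schema columns.
NEW_COLUMNS = {
    "detection_type": "TEXT",
    "starred": "INTEGER NOT NULL DEFAULT 0",
}


def add_missing_columns(conn, table="recordings"):
    """Add any new column the table lacks; return the names that were added."""
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    added = []
    for name, definition in NEW_COLUMNS.items():
        if name not in existing:
            conn.execute(f"ALTER TABLE {table} ADD COLUMN {name} {definition}")
            added.append(name)
    conn.commit()
    return added
```

The helper is idempotent: a second call finds both columns present and changes nothing, so it can run on every startup.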

  • Step 3: Write failing tests

Create tests/unit/test_detection.py:

"""Tests for person and vehicle detection."""

import numpy as np

from vigilar.detection.person import Detection, PersonDetector
from vigilar.detection.vehicle import classify_dominant_color, classify_size
from vigilar.detection.zones import filter_detections_by_zone


class TestPersonDetector:
    def test_detector_initializes_without_model(self):
        detector = PersonDetector(model_path="nonexistent.pb", config_path="nonexistent.pbtxt")
        assert not detector.is_loaded

    def test_detect_returns_empty_when_not_loaded(self):
        detector = PersonDetector(model_path="nonexistent.pb", config_path="nonexistent.pbtxt")
        frame = np.zeros((480, 640, 3), dtype=np.uint8)
        detections = detector.detect(frame)
        assert detections == []

    def test_detection_dataclass(self):
        d = Detection(class_name="person", class_id=1, confidence=0.85, bbox=(10, 20, 100, 200))
        assert d.class_name == "person"
        assert d.confidence == 0.85


class TestVehicleColor:
    def test_white_detection(self):
        white_region = np.full((100, 100, 3), 240, dtype=np.uint8)
        color = classify_dominant_color(white_region)
        assert color == "white"

    def test_black_detection(self):
        black_region = np.full((100, 100, 3), 15, dtype=np.uint8)
        color = classify_dominant_color(black_region)
        assert color == "black"

    def test_size_compact(self):
        assert classify_size(bbox_area=5000, zone_area=100000) == "compact"

    def test_size_midsize(self):
        assert classify_size(bbox_area=15000, zone_area=100000) == "midsize"

    def test_size_large(self):
        assert classify_size(bbox_area=30000, zone_area=100000) == "large"


class TestZoneFiltering:
    def test_detection_inside_zone(self):
        detections = [Detection("person", 1, 0.9, (50, 50, 80, 80))]
        zone_region = (0, 0, 200, 200)
        filtered = filter_detections_by_zone(detections, zone_region, ["person"])
        assert len(filtered) == 1

    def test_detection_outside_zone(self):
        detections = [Detection("person", 1, 0.9, (300, 300, 50, 50))]
        zone_region = (0, 0, 200, 200)
        filtered = filter_detections_by_zone(detections, zone_region, ["person"])
        assert len(filtered) == 0

    def test_filter_by_class(self):
        detections = [
            Detection("person", 1, 0.9, (50, 50, 80, 80)),
            Detection("car", 3, 0.8, (50, 50, 80, 80)),
        ]
        zone_region = (0, 0, 200, 200)
        filtered = filter_detections_by_zone(detections, zone_region, ["person"])
        assert len(filtered) == 1
        assert filtered[0].class_name == "person"

    def test_no_zone_returns_all(self):
        detections = [Detection("person", 1, 0.9, (50, 50, 80, 80))]
        filtered = filter_detections_by_zone(detections, None, ["person", "car"])
        assert len(filtered) == 1
  • Step 4: Run tests to verify they fail

Run: source .venv/bin/activate && python -m pytest tests/unit/test_detection.py -v

Expected: FAIL

  • Step 5: Implement person detector

Create vigilar/detection/__init__.py (empty).

Create vigilar/detection/person.py:

"""Person detection using MobileNet-SSD v2 via OpenCV DNN."""

import logging
from dataclasses import dataclass
from pathlib import Path

import cv2
import numpy as np

log = logging.getLogger(__name__)

COCO_CLASSES = {
    1: "person", 2: "bicycle", 3: "car", 4: "motorcycle", 5: "airplane",
    6: "bus", 7: "train", 8: "truck", 9: "boat",
}
DETECT_CLASSES = {1, 3, 8}  # person, car, truck


@dataclass
class Detection:
    class_name: str
    class_id: int
    confidence: float
    bbox: tuple[int, int, int, int]  # x, y, w, h


class PersonDetector:
    def __init__(self, model_path: str, config_path: str, confidence_threshold: float = 0.5):
        self._threshold = confidence_threshold
        self._net = None
        self.is_loaded = False

        if Path(model_path).exists() and Path(config_path).exists():
            try:
                self._net = cv2.dnn.readNetFromTensorflow(model_path, config_path)
                self.is_loaded = True
                log.info("Person detection model loaded from %s", model_path)
            except cv2.error as e:
                log.error("Failed to load detection model: %s", e)
        else:
            log.warning("Detection model not found at %s — detection disabled", model_path)

    def detect(self, frame: np.ndarray) -> list[Detection]:
        if not self.is_loaded or self._net is None:
            return []

        h, w = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, size=(300, 300), swapRB=True, crop=False)
        self._net.setInput(blob)
        output = self._net.forward()

        detections = []
        for i in range(output.shape[2]):
            confidence = output[0, 0, i, 2]
            if confidence < self._threshold:
                continue
            class_id = int(output[0, 0, i, 1])
            if class_id not in DETECT_CLASSES:
                continue

            x1 = int(output[0, 0, i, 3] * w)
            y1 = int(output[0, 0, i, 4] * h)
            x2 = int(output[0, 0, i, 5] * w)
            y2 = int(output[0, 0, i, 6] * h)
            bw, bh = x2 - x1, y2 - y1
            if bw <= 0 or bh <= 0:
                continue

            detections.append(Detection(
                class_name=COCO_CLASSES.get(class_id, f"class_{class_id}"),
                class_id=class_id,
                confidence=float(confidence),
                bbox=(x1, y1, bw, bh),
            ))

        return detections
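
The loop in detect reads seven values per detection row: image id, class id, confidence, and the four box corners normalized to the 0..1 range. The sketch below isolates that decoding step in plain Python so it can be reasoned about without OpenCV. decode_ssd_row is an illustrative name, and the clamping to the frame is an addition that the detector above does not perform.

```python
def decode_ssd_row(row, frame_w, frame_h):
    """Convert one output row to (class_id, confidence, (x, y, w, h)) or None.

    Row layout: [image_id, class_id, confidence, x1, y1, x2, y2].
    """
    _, class_id, confidence, x1, y1, x2, y2 = row

    def to_pixels(value, size):
        # Scale to pixels, then clamp so boxes spilling past the edge stay croppable.
        return min(max(int(value * size), 0), size)

    px1, py1 = to_pixels(x1, frame_w), to_pixels(y1, frame_h)
    px2, py2 = to_pixels(x2, frame_w), to_pixels(y2, frame_h)
    w, h = px2 - px1, py2 - py1
    if w <= 0 or h <= 0:
        return None  # degenerate box, same rejection rule as the detector
    return int(class_id), float(confidence), (px1, py1, w, h)
```

For a 640x480 frame, a row with corners (0.25, 0.5) and (0.75, 1.0) becomes the pixel box (160, 240, 320, 240) in x, y, w, h form.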
  • Step 6: Implement vehicle fingerprinting

Create vigilar/detection/vehicle.py:

"""Vehicle color and size fingerprinting."""

import logging
from dataclasses import dataclass
from pathlib import Path

import cv2
import numpy as np

log = logging.getLogger(__name__)


@dataclass
class VehicleProfile:
    name: str
    color_profile: str
    size_class: str
    histogram: np.ndarray | None = None


def classify_dominant_color(region: np.ndarray) -> str:
    hsv = cv2.cvtColor(region, cv2.COLOR_BGR2HSV)
    h, s, v = cv2.split(hsv)
    mean_s = float(np.mean(s))
    mean_v = float(np.mean(v))

    if mean_v < 40:
        return "black"
    if mean_s < 30 and mean_v > 180:
        return "white"
    if mean_s < 40:
        return "silver"

    mean_h = float(np.mean(h))
    if mean_h < 10 or mean_h > 170:
        return "red"
    if 10 <= mean_h < 25:
        return "orange"
    if 25 <= mean_h < 35:
        return "yellow"
    if 35 <= mean_h < 85:
        return "green"
    if 85 <= mean_h < 130:
        return "blue"
    return "unknown"


def classify_size(bbox_area: int, zone_area: int) -> str:
    if zone_area <= 0:
        return "unknown"
    ratio = bbox_area / zone_area
    if ratio < 0.1:
        return "compact"
    if ratio < 0.25:
        return "midsize"
    return "large"


class VehicleFingerprint:
    def __init__(self, known_vehicles: list[VehicleProfile] | None = None):
        self._known = known_vehicles or []

    def match(self, region: np.ndarray, bbox_area: int, zone_area: int) -> VehicleProfile | None:
        color = classify_dominant_color(region)
        size = classify_size(bbox_area, zone_area)

        for profile in self._known:
            if profile.color_profile == color and profile.size_class == size:
                return profile
        return None

    def add_profile(self, profile: VehicleProfile) -> None:
        self._known.append(profile)

    @property
    def known_count(self) -> int:
        return len(self._known)
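
One caveat with classify_dominant_color: it averages hue arithmetically, but OpenCV's 8-bit hue wraps around (0 to 179, with red at both ends). A saturated red region whose pixels sit near 2 and 178 averages to about 90, which lands in the "blue" band. A circular mean is one possible remedy; the sketch below is an assumption offered for illustration, not the method this plan specifies, and circular_mean_hue is a hypothetical helper.

```python
import math


def circular_mean_hue(hues, period=180.0):
    """Mean of hue values on a circle of the given period (180 for OpenCV 8-bit)."""
    angles = [2 * math.pi * h / period for h in hues]
    sin_sum = sum(math.sin(a) for a in angles)
    cos_sum = sum(math.cos(a) for a in angles)
    # atan2 of the summed unit vectors gives the mean direction.
    mean_angle = math.atan2(sin_sum, cos_sum) % (2 * math.pi)
    return mean_angle * period / (2 * math.pi)
```

For hues 2 and 178 the result sits at the wrap point (numerically near 0 or near 180), so a red range check that accepts both ends still classifies it as red, while ordinary values such as 10 and 20 still average to 15.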
  • Step 7: Implement zone filtering

Create vigilar/detection/zones.py:

"""Zone-based detection filtering."""

from vigilar.detection.person import Detection


def _bbox_center(bbox: tuple[int, int, int, int]) -> tuple[int, int]:
    x, y, w, h = bbox
    return x + w // 2, y + h // 2


def _point_in_rect(point: tuple[int, int], rect: tuple[int, int, int, int]) -> bool:
    px, py = point
    rx, ry, rw, rh = rect
    return rx <= px <= rx + rw and ry <= py <= ry + rh


def filter_detections_by_zone(
    detections: list[Detection],
    zone_region: tuple[int, int, int, int] | None,
    watch_for: list[str],
) -> list[Detection]:
    result = []
    for d in detections:
        if d.class_name not in watch_for:
            continue
        if zone_region is not None:
            center = _bbox_center(d.bbox)
            if not _point_in_rect(center, zone_region):
                continue
        result.append(d)
    return result
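
Two details of the zone filter are easy to miss: membership is decided by the bounding-box center, so a box that overlaps the zone can still be excluded, and CameraZone.region defaults to an empty list while the filter expects either None or an (x, y, w, h) tuple. The sketch below illustrates both points; zone_from_config and center_in_zone are hypothetical helpers, not part of the plan.

```python
def zone_from_config(region):
    """Map a config region list to the filter's argument: empty means no restriction."""
    if not region:
        return None
    if len(region) != 4:
        raise ValueError("region must be [x, y, w, h]")
    return tuple(region)


def center_in_zone(bbox, zone):
    """True when the center of bbox (x, y, w, h) lies inside zone (x, y, w, h)."""
    x, y, w, h = bbox
    cx, cy = x + w // 2, y + h // 2
    zx, zy, zw, zh = zone
    return zx <= cx <= zx + zw and zy <= cy <= zy + zh
```

A box at (150, 150, 200, 200) overlaps a (0, 0, 200, 200) zone but its center is (250, 250), so it is filtered out, whereas a smaller (150, 150, 80, 80) box with center (190, 190) is kept.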
  • Step 8: Create model download script

Create scripts/download_model.sh:

#!/usr/bin/env bash
set -euo pipefail

MODEL_DIR="${1:-/var/vigilar/models}"
mkdir -p "$MODEL_DIR"

PB_URL="https://raw.githubusercontent.com/opencv/opencv_extra/master/testdata/dnn/ssd_mobilenet_v2_coco_2018_03_29.pb"
PBTXT_URL="https://raw.githubusercontent.com/opencv/opencv_extra/master/testdata/dnn/ssd_mobilenet_v2_coco_2018_03_29.pbtxt"

echo "Downloading MobileNet-SSD v2 model..."
if command -v curl &>/dev/null; then
    curl -fSL -o "$MODEL_DIR/mobilenet_ssd_v2.pb" "$PB_URL"
    curl -fSL -o "$MODEL_DIR/mobilenet_ssd_v2.pbtxt" "$PBTXT_URL"
elif command -v wget &>/dev/null; then
    wget -q -O "$MODEL_DIR/mobilenet_ssd_v2.pb" "$PB_URL"
    wget -q -O "$MODEL_DIR/mobilenet_ssd_v2.pbtxt" "$PBTXT_URL"
else
    echo "ERROR: curl or wget required" >&2
    exit 1
fi

echo "Model downloaded to $MODEL_DIR"
ls -lh "$MODEL_DIR"/mobilenet_ssd_v2.*

Run: chmod +x scripts/download_model.sh

  • Step 9: Wire detection into camera worker

In vigilar/camera/worker.py, add these imports at the top:

from vigilar.detection.person import PersonDetector
from vigilar.detection.vehicle import VehicleFingerprint, VehicleProfile, classify_dominant_color, classify_size
from vigilar.detection.zones import filter_detections_by_zone
from vigilar.config import CameraConfig, DetectionConfig, MQTTConfig, RemoteConfig, VehicleConfig

After the hls and remote_hls initialization, add:

    # Person/vehicle detector (second-stage after MOG2)
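    person_detector: PersonDetector | None = None
    vehicle_fp: VehicleFingerprint | None = None
    # det_cfg and veh_cfg arrive as new function parameters (see the signature
    # change below); do not rebind them here or detection is always disabled.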

    if det_cfg and det_cfg.person_detection:
        if not det_cfg.cameras or camera_id in det_cfg.cameras:
            person_detector = PersonDetector(
                det_cfg.model_path, det_cfg.model_config, det_cfg.confidence_threshold,
            )

    if veh_cfg and veh_cfg.known:
        profiles = [VehicleProfile(v.name, v.color_profile, v.size_class) for v in veh_cfg.known]
        vehicle_fp = VehicleFingerprint(profiles)

In the motion detection block, after bus.publish_event(Topics.camera_motion_start(...)), add the detection second-stage:

            # Second-stage: person/vehicle detection
            detection_type = "motion"
            if person_detector and person_detector.is_loaded:
                detections = person_detector.detect(frame)
                persons = [d for d in detections if d.class_name == "person"]
                vehicles = [d for d in detections if d.class_name in ("car", "truck")]

                if persons:
                    detection_type = "person"
                    bus.publish_event(
                        f"vigilar/camera/{camera_id}/person",
                        count=len(persons),
                        confidence=max(d.confidence for d in persons),
                    )
                elif vehicles:
                    detection_type = "vehicle"
                    # Vehicle fingerprinting
                    if vehicle_fp:
                        for vd in vehicles:
                            x, y, w, h = vd.bbox
                            crop = frame[max(0,y):y+h, max(0,x):x+w]
                            if crop.size > 0:
                                match = vehicle_fp.match(crop, w*h, frame.shape[0]*frame.shape[1])
                                if match:
                                    bus.publish_event(
                                        f"vigilar/camera/{camera_id}/vehicle/known",
                                        vehicle=match.name,
                                    )
                                else:
                                    bus.publish_event(
                                        f"vigilar/camera/{camera_id}/vehicle/unknown",
                                        color=classify_dominant_color(crop),
                                    )
                    else:
                        bus.publish_event(
                            f"vigilar/camera/{camera_id}/vehicle",
                            count=len(vehicles),
                        )

Update the run_camera_worker function signature to accept det_cfg and veh_cfg parameters. Update camera/manager.py to pass them.
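
The second-stage block applies a fixed priority when several classes appear in the same frame: a person wins over a vehicle, and plain motion is the fallback. A compact way to express and unit-test that rule is sketched below; recording_label is an illustrative name and not a function the plan defines.

```python
VEHICLE_CLASSES = ("car", "truck")


def recording_label(class_names):
    """Pick the detection_type for a frame from the detected class names."""
    if "person" in class_names:
        return "person"  # a person outranks any vehicle in the same frame
    if any(name in VEHICLE_CLASSES for name in class_names):
        return "vehicle"
    return "motion"  # nothing recognized: keep the first-stage label
```

Keeping the rule in a pure function makes it testable without a camera frame or a loaded model.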

  • Step 10: Run tests

Run: source .venv/bin/activate && python -m pytest tests/unit/test_detection.py tests/ -v

Expected: All PASS

  • Step 11: Commit
git add vigilar/detection/ vigilar/constants.py vigilar/config.py vigilar/storage/schema.py vigilar/camera/worker.py vigilar/camera/manager.py scripts/download_model.sh tests/unit/test_detection.py
git commit -m "feat: add person/vehicle detection with zone fencing

MobileNet-SSD v2 via OpenCV DNN as second-stage filter after MOG2.
Vehicle color/size fingerprinting for known car matching.
Zone-based filtering per camera. Model download script."

Task 3: Health Monitoring + Self-Healing

Files:

  • Create: vigilar/health/__init__.py

  • Create: vigilar/health/monitor.py

  • Create: vigilar/health/pruner.py

  • Create: vigilar/health/digest.py

  • Modify: vigilar/config.py (add HealthConfig)

  • Modify: vigilar/main.py (start HealthMonitor)

  • Test: tests/unit/test_health.py

  • Step 1: Add config

In vigilar/config.py, add before # --- Rule Config ---:

# --- Health Config ---

class HealthConfig(BaseModel):
    enabled: bool = True
    disk_warn_pct: int = 85
    disk_critical_pct: int = 95
    auto_prune: bool = True
    auto_prune_target_pct: int = 80
    daily_digest: bool = True
    daily_digest_time: str = "08:00"

Add health: HealthConfig = Field(default_factory=HealthConfig) to VigilarConfig.
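
The daily_digest_time field is an "HH:MM" string, so the monitor needs to turn it into a wait time at some point. How the plan's digest module does this is not shown in this section; the sketch below is one straightforward approach using naive local datetimes (it ignores DST shifts), and seconds_until_digest is a hypothetical helper.

```python
from datetime import datetime, timedelta


def seconds_until_digest(digest_time, now):
    """Seconds from `now` until the next occurrence of digest_time ("HH:MM")."""
    hour, minute = (int(part) for part in digest_time.split(":"))
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        # Today's slot has already passed (or is exactly now): use tomorrow's.
        target += timedelta(days=1)
    return (target - now).total_seconds()
```

Passing now explicitly, instead of calling datetime.now() inside, keeps the function deterministic under test.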

  • Step 2: Write failing tests

Create tests/unit/test_health.py:

"""Tests for health monitoring."""

import tempfile
from unittest.mock import patch

from vigilar.health.monitor import HealthStatus, check_disk, check_mqtt_port
from vigilar.health.pruner import calculate_disk_usage_pct, find_prunable_recordings


class TestDiskCheck:
    @patch("vigilar.health.monitor.shutil.disk_usage")
    def test_healthy(self, mock_usage):
        mock_usage.return_value = type("U", (), {"total": 100_000_000_000, "used": 50_000_000_000, "free": 50_000_000_000})()
        result = check_disk("/var/vigilar", warn_pct=85, critical_pct=95)
        assert result.status == HealthStatus.HEALTHY

    @patch("vigilar.health.monitor.shutil.disk_usage")
    def test_warning(self, mock_usage):
        mock_usage.return_value = type("U", (), {"total": 100_000_000_000, "used": 90_000_000_000, "free": 10_000_000_000})()
        result = check_disk("/var/vigilar", warn_pct=85, critical_pct=95)
        assert result.status == HealthStatus.WARNING

    @patch("vigilar.health.monitor.shutil.disk_usage")
    def test_critical(self, mock_usage):
        mock_usage.return_value = type("U", (), {"total": 100_000_000_000, "used": 97_000_000_000, "free": 3_000_000_000})()
        result = check_disk("/var/vigilar", warn_pct=85, critical_pct=95)
        assert result.status == HealthStatus.CRITICAL


class TestPruner:
    def test_calculate_disk_pct(self):
        with tempfile.TemporaryDirectory() as d:
            pct = calculate_disk_usage_pct(d)
            assert 0 <= pct <= 100

    def test_find_prunable_empty(self, test_db):
        result = find_prunable_recordings(test_db, limit=10)
        assert result == []


class TestMQTTCheck:
    @patch("vigilar.health.monitor.socket.create_connection")
    def test_mqtt_reachable(self, mock_conn):
        mock_conn.return_value.__enter__ = lambda s: s
        mock_conn.return_value.__exit__ = lambda s, *a: None
        result = check_mqtt_port("127.0.0.1", 1883)
        assert result.status == HealthStatus.HEALTHY

    @patch("vigilar.health.monitor.socket.create_connection")
    def test_mqtt_unreachable(self, mock_conn):
        mock_conn.side_effect = ConnectionRefusedError()
        result = check_mqtt_port("127.0.0.1", 1883)
        assert result.status == HealthStatus.CRITICAL
  • Step 3: Run tests to verify they fail

Run: source .venv/bin/activate && python -m pytest tests/unit/test_health.py -v

Expected: FAIL

  • Step 4: Implement health monitor

Create vigilar/health/__init__.py (empty).

Create vigilar/health/monitor.py:

"""Health monitoring — periodic subsystem checks."""

import logging
import shutil
import signal
import socket
import time
from dataclasses import dataclass
from enum import StrEnum

from vigilar.bus import MessageBus
from vigilar.config import VigilarConfig

log = logging.getLogger(__name__)


class HealthStatus(StrEnum):
    HEALTHY = "healthy"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class HealthCheck:
    name: str
    status: HealthStatus
    message: str = ""
    value: float = 0


def check_disk(path: str, warn_pct: int, critical_pct: int) -> HealthCheck:
    try:
        usage = shutil.disk_usage(path)
        pct = usage.used / usage.total * 100
        if pct >= critical_pct:
            return HealthCheck("disk", HealthStatus.CRITICAL, f"{pct:.0f}% used", pct)
        if pct >= warn_pct:
            return HealthCheck("disk", HealthStatus.WARNING, f"{pct:.0f}% used", pct)
        return HealthCheck("disk", HealthStatus.HEALTHY, f"{pct:.0f}% used", pct)
    except OSError as e:
        return HealthCheck("disk", HealthStatus.CRITICAL, str(e))


def check_mqtt_port(host: str, port: int) -> HealthCheck:
    try:
        with socket.create_connection((host, port), timeout=5):
            return HealthCheck("mqtt", HealthStatus.HEALTHY, "reachable")
    except (ConnectionRefusedError, TimeoutError, OSError) as e:
        return HealthCheck("mqtt", HealthStatus.CRITICAL, str(e))


class HealthMonitor:
    def __init__(self, config: VigilarConfig):
        self._cfg = config
        self._bus: MessageBus | None = None
        self._checks: list[HealthCheck] = []

    def run(self) -> None:
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s [health] %(levelname)s: %(message)s",
        )

        self._bus = MessageBus(self._cfg.mqtt, client_id="health-monitor")
        self._bus.connect()

        shutdown = False
        def handle_signal(signum, frame):
            nonlocal shutdown
            shutdown = True
        signal.signal(signal.SIGTERM, handle_signal)

        log.info("Health monitor started")
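        # Start at -inf so both checks run on the first pass; time.monotonic()
        # can be smaller than the check interval shortly after boot.
        last_disk_check = float("-inf")
        last_mqtt_check = float("-inf")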

        while not shutdown:
            now = time.monotonic()

            if now - last_disk_check >= 300:  # 5 min
                disk = check_disk(self._cfg.system.data_dir, self._cfg.health.disk_warn_pct, self._cfg.health.disk_critical_pct)
                self._update_check(disk)
                last_disk_check = now

                if disk.status == HealthStatus.CRITICAL and self._cfg.health.auto_prune:
                    from vigilar.health.pruner import auto_prune
                    auto_prune(self._cfg)

            if now - last_mqtt_check >= 30:
                mqtt = check_mqtt_port(self._cfg.mqtt.host, self._cfg.mqtt.port)
                self._update_check(mqtt)
                last_mqtt_check = now

            self._publish_status()
            time.sleep(10)

        if self._bus:
            self._bus.disconnect()

    def _update_check(self, check: HealthCheck) -> None:
        for i, c in enumerate(self._checks):
            if c.name == check.name:
                self._checks[i] = check
                return
        self._checks.append(check)

    def _publish_status(self) -> None:
        if not self._bus:
            return
        overall = HealthStatus.HEALTHY
        for c in self._checks:
            if c.status == HealthStatus.CRITICAL:
                overall = HealthStatus.CRITICAL
                break
            if c.status == HealthStatus.WARNING:
                overall = HealthStatus.WARNING

        self._bus.publish_event(
            "vigilar/system/health",
            status=overall.value,
            checks={c.name: {"status": c.status.value, "message": c.message} for c in self._checks},
        )


def run_health_monitor(config: VigilarConfig) -> None:
    monitor = HealthMonitor(config)
    monitor.run()
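
The aggregation in `_publish_status` amounts to taking the worst status across all checks. A minimal standalone sketch of that rule, assuming a severity ordering of healthy < warning < critical (the `SEVERITY` table and `worst_status` helper are illustrative names, not part of the plan):

```python
from enum import Enum


class HealthStatus(str, Enum):
    # Stand-in for vigilar.health.monitor.HealthStatus so the sketch runs alone.
    HEALTHY = "healthy"
    WARNING = "warning"
    CRITICAL = "critical"


# Hypothetical severity ranking: higher means worse.
SEVERITY = {
    HealthStatus.HEALTHY: 0,
    HealthStatus.WARNING: 1,
    HealthStatus.CRITICAL: 2,
}


def worst_status(statuses):
    """Return the most severe status, or HEALTHY when there are no checks yet."""
    return max(statuses, key=SEVERITY.__getitem__, default=HealthStatus.HEALTHY)
```

Keeping the ranking in one table would make it straightforward to add a further level later without touching the loop in `_publish_status`.
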
  • Step 5: Implement auto-pruner

Create vigilar/health/pruner.py:

"""Auto-prune old recordings when disk usage is high."""

import logging
import shutil
from pathlib import Path

from sqlalchemy import select
from sqlalchemy.engine import Engine

from vigilar.config import VigilarConfig
from vigilar.storage.schema import recordings

log = logging.getLogger(__name__)


def calculate_disk_usage_pct(path: str) -> float:
    usage = shutil.disk_usage(path)
    return usage.used / usage.total * 100


def find_prunable_recordings(engine: Engine, limit: int = 50) -> list[dict]:
    query = (
        select(recordings)
        .where(recordings.c.starred == 0)
        .order_by(recordings.c.started_at.asc())
        .limit(limit)
    )
    with engine.connect() as conn:
        return [dict(r) for r in conn.execute(query).mappings().all()]


def auto_prune(config: VigilarConfig) -> int:
    from vigilar.storage.db import get_db_path, get_engine

    target = config.health.auto_prune_target_pct
    data_dir = config.system.data_dir
    current_pct = calculate_disk_usage_pct(data_dir)

    if current_pct <= target:
        return 0

    engine = get_engine(get_db_path(data_dir))
    pruned = 0
    total_bytes = 0

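    # Prune in batches of 20 until usage drops to the target or nothing prunable is left.
    while current_pct > target:
        candidates = find_prunable_recordings(engine, limit=20)
        if not candidates:
            break

        for rec in candidates:
            # Remove the video file and count the bytes it frees.
            file_path = rec.get("file_path", "")
            if file_path and Path(file_path).exists():
                size = Path(file_path).stat().st_size
                Path(file_path).unlink()
                total_bytes += size

            # Remove the thumbnail, if one was generated.
            thumb = rec.get("thumbnail_path", "")
            if thumb and Path(thumb).exists():
                Path(thumb).unlink()

            # Delete the row even when the file was already missing, so the
            # next batch does not return the same candidates again.
            with engine.begin() as conn:
                conn.execute(recordings.delete().where(recordings.c.id == rec["id"]))
            pruned += 1

        # Re-measure once per batch rather than per file.
        current_pct = calculate_disk_usage_pct(data_dir)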

    if pruned:
        mb = total_bytes / (1024 * 1024)
        log.info("Auto-pruned %d recordings (%.1f MB), disk now at %.0f%%", pruned, mb, current_pct)

    return pruned
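
The selection rule behind `find_prunable_recordings` is oldest first, with starred recordings excluded. A self-contained sketch of that ordering, using the standard-library `sqlite3` module and a simplified stand-in table (the real schema lives in `vigilar.storage.schema` and is not reproduced here; `prunable_ids` is a hypothetical name):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE recordings (id INTEGER PRIMARY KEY, started_at INTEGER, starred INTEGER)"
)
conn.executemany(
    "INSERT INTO recordings (id, started_at, starred) VALUES (?, ?, ?)",
    [(1, 1000, 0), (2, 500, 1), (3, 750, 0), (4, 2000, 0)],
)


def prunable_ids(connection, limit):
    """Oldest non-starred recordings first, capped at `limit` rows."""
    rows = connection.execute(
        "SELECT id FROM recordings WHERE starred = 0 "
        "ORDER BY started_at ASC LIMIT ?",
        (limit,),
    ).fetchall()
    return [row[0] for row in rows]


# Recording 2 is the oldest but starred, so it is never a candidate.
candidates = prunable_ids(conn, limit=2)
```

A test along these lines could complement `test_find_prunable_empty`, which only covers the empty-table case.
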
  • Step 6: Implement daily digest

Create vigilar/health/digest.py:

"""Daily digest notification builder."""

import logging
import shutil
import time

from sqlalchemy import func, select
from sqlalchemy.engine import Engine

from vigilar.constants import EventType
from vigilar.storage.schema import events, recordings

log = logging.getLogger(__name__)


def build_digest(engine: Engine, data_dir: str, since_hours: int = 12) -> dict:
    # events.ts is compared in milliseconds; recordings.started_at in seconds (see // 1000 below).
    since_ts = int((time.time() - since_hours * 3600) * 1000)

    with engine.connect() as conn:
        person_count = conn.execute(
            select(func.count()).select_from(events)
            .where(events.c.type == EventType.PERSON_DETECTED, events.c.ts >= since_ts)
        ).scalar() or 0

        vehicle_count = conn.execute(
            select(func.count()).select_from(events)
            .where(events.c.type == EventType.UNKNOWN_VEHICLE_DETECTED, events.c.ts >= since_ts)
        ).scalar() or 0

        recording_count = conn.execute(
            select(func.count()).select_from(recordings)
            .where(recordings.c.started_at >= since_ts // 1000)
        ).scalar() or 0

    usage = shutil.disk_usage(data_dir)
    disk_pct = usage.used / usage.total * 100
    disk_gb = usage.used / (1024**3)

    return {
        "person_detections": person_count,
        "unknown_vehicles": vehicle_count,
        "recordings": recording_count,
        "disk_used_gb": round(disk_gb, 1),
        "disk_used_pct": round(disk_pct, 0),
        "since_hours": since_hours,
    }


def format_digest(data: dict) -> str:
    return (
        f"Vigilar Daily Summary\n"
        f"Last {data['since_hours']}h: "
        f"{data['person_detections']} person detections, "
        f"{data['unknown_vehicles']} unknown vehicles, "
        f"{data['recordings']} recordings\n"
        f"Storage: {data['disk_used_gb']} GB ({data['disk_used_pct']:.0f}%)"
    )
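
The plan builds and formats the digest but does not say when it is sent. One possible approach, sketched here under the assumption of a fixed local send time, is to compute the delay until the next occurrence and let a scheduler or the monitor loop act on it. The `seconds_until` helper and the "07:00" value are hypothetical and do not appear elsewhere in the plan:

```python
from datetime import datetime, timedelta


def seconds_until(send_at: str, now: datetime) -> float:
    """Seconds from `now` until the next local occurrence of `send_at` ("HH:MM")."""
    hour, minute = (int(part) for part in send_at.split(":"))
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        # Today's slot has already passed (or is right now), so aim for tomorrow.
        target += timedelta(days=1)
    return (target - now).total_seconds()


# One hour before a 07:00 digest.
delay = seconds_until("07:00", datetime(2026, 4, 3, 6, 0))
```

With the default `since_hours=12`, a 07:00 send time would cover roughly 19:00 to 07:00, which fits the "overnight summary" wording in the commit message.
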
  • Step 7: Wire into supervisor

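In vigilar/main.py, add after the presence monitor block from Task 1 (if Task 1 has not been merged yet, add it alongside the other subsystem registrations):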

    # Health monitor
    if cfg.health.enabled:
        from vigilar.health.monitor import run_health_monitor
        subsystems.append(SubsystemProcess("health-monitor", run_health_monitor, (cfg,)))
  • Step 8: Run tests

Run: source .venv/bin/activate && python -m pytest tests/unit/test_health.py tests/ -v

Expected: All PASS

  • Step 9: Commit
git add vigilar/health/ vigilar/config.py vigilar/main.py tests/unit/test_health.py
git commit -m "feat: add health monitoring with auto-prune and daily digest

Periodic disk, MQTT, and subsystem health checks. Auto-prune
oldest non-starred recordings when disk exceeds threshold.
Daily digest notification with overnight summary."

Task 4: Smart Alert Profiles

Files:

  • Create: vigilar/alerts/profiles.py
  • Modify: vigilar/config.py (add AlertProfileConfig, AlertProfileRule)
  • Modify: vigilar/alerts/dispatcher.py (role-based routing)
  • Modify: vigilar/web/templates/settings.html (profile editor)
  • Modify: vigilar/web/static/js/settings.js (profile save/load)
  • Modify: vigilar/web/blueprints/system.py (profile API endpoints)
  • Test: tests/unit/test_profiles.py

Depends on: Task 1 (HouseholdState) and Task 2 (detection event types)

  • Step 1: Add config models

In vigilar/config.py, add before # --- Health Config ---:

# --- Alert Profile Config ---

class AlertProfileRule(BaseModel):
    detection_type: str  # person, unknown_vehicle, known_vehicle, motion
    camera_location: str = "any"  # any | exterior | common_area | specific camera id
    action: str = "record_only"  # push_and_record, push_adults, record_only, quiet_log
    recipients: str = "all"  # all, adults, none

class AlertProfileConfig(BaseModel):
    name: str
    enabled: bool = True
    presence_states: list[str] = Field(default_factory=list)
    time_window: str = ""  # "" = all day, "23:00-06:00" = sleep hours
    rules: list[AlertProfileRule] = Field(default_factory=list)

Add profiles: list[AlertProfileConfig] = Field(default_factory=list) to AlertsConfig.

Add sleep_start: str = "23:00" and sleep_end: str = "06:00" to AlertsConfig.
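
Because `time_window` is a free-form string and the matching engine in Step 4 compares times as strings, a value such as "9:00-17:00" would be accepted by the model and then compared incorrectly. A minimal sketch of a format check, assuming zero-padded 24-hour times; `validate_time_window` is a hypothetical helper, and wiring it into `AlertProfileConfig` as a Pydantic validator is left out:

```python
import re

# Two zero-padded 24-hour times separated by a hyphen, e.g. "23:00-06:00".
_HHMM = r"(?:[01]\d|2[0-3]):[0-5]\d"
_WINDOW_RE = re.compile(rf"^{_HHMM}-{_HHMM}$")


def validate_time_window(value: str) -> str:
    """Return `value` unchanged if it is empty or a well-formed window."""
    if value == "" or _WINDOW_RE.fullmatch(value):
        return value
    raise ValueError(f"time_window must be '' or 'HH:MM-HH:MM', got {value!r}")
```

Rejecting malformed windows at load time would surface a typo in the config immediately, rather than as a profile that silently matches all day.
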

  • Step 2: Write failing tests

Create tests/unit/test_profiles.py:

"""Tests for smart alert profiles."""

from vigilar.alerts.profiles import match_profile, is_in_time_window, get_action_for_event
from vigilar.config import AlertProfileConfig, AlertProfileRule
from vigilar.constants import HouseholdState


def _make_profile(name, states, time_window="", rules=None):
    return AlertProfileConfig(
        name=name,
        presence_states=states,
        time_window=time_window,
        rules=rules or [],
    )


class TestTimeWindow:
    def test_no_window_always_matches(self):
        assert is_in_time_window("", "14:30") is True

    def test_inside_window(self):
        assert is_in_time_window("23:00-06:00", "02:30") is True

    def test_outside_window(self):
        assert is_in_time_window("23:00-06:00", "14:30") is False

    def test_at_start(self):
        assert is_in_time_window("23:00-06:00", "23:00") is True

    def test_daytime_window(self):
        assert is_in_time_window("06:00-23:00", "14:30") is True


class TestProfileMatching:
    def test_match_by_presence(self):
        profiles = [
            _make_profile("Away", ["EMPTY"]),
            _make_profile("Home", ["ADULTS_HOME", "ALL_HOME"]),
        ]
        result = match_profile(profiles, HouseholdState.EMPTY, "14:00")
        assert result is not None
        assert result.name == "Away"

    def test_no_match(self):
        profiles = [_make_profile("Away", ["EMPTY"])]
        result = match_profile(profiles, HouseholdState.ALL_HOME, "14:00")
        assert result is None

    def test_time_window_narrows(self):
        profiles = [
            _make_profile("Night", ["ALL_HOME"], "23:00-06:00"),
            _make_profile("Day", ["ALL_HOME"], "06:00-23:00"),
        ]
        result = match_profile(profiles, HouseholdState.ALL_HOME, "02:00")
        assert result is not None
        assert result.name == "Night"


class TestActionForEvent:
    def test_person_gets_push(self):
        rules = [AlertProfileRule(detection_type="person", action="push_and_record", recipients="all")]
        profile = _make_profile("Away", ["EMPTY"], rules=rules)
        action, recipients = get_action_for_event(profile, "person", "front_door")
        assert action == "push_and_record"
        assert recipients == "all"

    def test_motion_gets_record_only(self):
        rules = [AlertProfileRule(detection_type="motion", action="record_only", recipients="none")]
        profile = _make_profile("Home", ["ALL_HOME"], rules=rules)
        action, recipients = get_action_for_event(profile, "motion", "front_door")
        assert action == "record_only"

    def test_no_matching_rule_defaults_quiet(self):
        profile = _make_profile("Home", ["ALL_HOME"], rules=[])
        action, recipients = get_action_for_event(profile, "person", "front_door")
        assert action == "quiet_log"
        assert recipients == "none"
  • Step 3: Run tests to verify fail

Run: source .venv/bin/activate && python -m pytest tests/unit/test_profiles.py -v

Expected: FAIL

  • Step 4: Implement profile engine

Create vigilar/alerts/profiles.py:

"""Smart alert profile matching engine."""

import logging

from vigilar.config import AlertProfileConfig
from vigilar.constants import HouseholdState

log = logging.getLogger(__name__)


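def is_in_time_window(window: str, current_time: str) -> bool:
    """Return True if current_time ("HH:MM") falls inside window ("HH:MM-HH:MM").

    Times are compared as strings, so both must be zero-padded 24-hour values.
    The start is inclusive and the end exclusive. An empty or malformed window
    matches at any time.
    """
    if not window:
        return True

    parts = window.split("-")
    if len(parts) != 2:
        return True

    start, end = parts[0].strip(), parts[1].strip()
    t = current_time

    if start <= end:
        # Same-day window, e.g. 06:00-23:00.
        return start <= t < end
    # Window wraps past midnight, e.g. 23:00-06:00.
    return t >= start or t < end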


def match_profile(
    profiles: list[AlertProfileConfig],
    household_state: HouseholdState,
    current_time: str,
) -> AlertProfileConfig | None:
    for profile in profiles:
        if not profile.enabled:
            continue
        if household_state.value not in profile.presence_states:
            continue
        if not is_in_time_window(profile.time_window, current_time):
            continue
        return profile
    return None


def get_action_for_event(
    profile: AlertProfileConfig,
    detection_type: str,
    camera_id: str,
) -> tuple[str, str]:
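    # First matching rule wins. Only "any" or an exact camera id match here;
    # location categories such as "exterior" are not resolved by this function.
    for rule in profile.rules:
        if rule.detection_type != detection_type:
            continue
        if rule.camera_location not in ("any", camera_id):
            continue
        return rule.action, rule.recipients
    # No rule matched: log quietly and notify nobody.
    return "quiet_log", "none"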
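
Since `match_profile` returns the first profile that matches, the order of the configured list matters: a profile without a time window shadows any more specific profile listed after it. A condensed, self-contained sketch of that behavior, where `Profile`, `in_window`, and `first_match` are reduced stand-ins for the real classes and functions and the state strings are taken from the tests above:

```python
from dataclasses import dataclass


@dataclass
class Profile:
    # Reduced stand-in for AlertProfileConfig with only the fields used here.
    name: str
    presence_states: list
    time_window: str = ""


def in_window(window: str, now: str) -> bool:
    """Same comparison as is_in_time_window, condensed for the sketch."""
    if not window:
        return True
    start, end = window.split("-")
    return start <= now < end if start <= end else (now >= start or now < end)


def first_match(profiles, state: str, now: str):
    """Return the first profile whose presence state and window both match."""
    for p in profiles:
        if state in p.presence_states and in_window(p.time_window, now):
            return p
    return None


night = Profile("Night", ["ALL_HOME"], "23:00-06:00")
home = Profile("Home", ["ALL_HOME"])  # no window, so it matches all day

specific_first = first_match([night, home], "ALL_HOME", "02:00")  # picks Night
general_first = first_match([home, night], "ALL_HOME", "02:00")   # picks Home
```

Listing time-restricted profiles before all-day ones is one way to get the intended result; the settings UI could also warn when a profile can never be reached.
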
  • Step 5: Run tests

Run: source .venv/bin/activate && python -m pytest tests/unit/test_profiles.py -v

Expected: All 11 tests PASS

  • Step 6: Run full suite

Run: source .venv/bin/activate && python -m pytest tests/ -v

Expected: All PASS

  • Step 7: Commit
git add vigilar/alerts/profiles.py vigilar/config.py tests/unit/test_profiles.py
git commit -m "feat: add smart alert profiles with presence/time matching

Profiles activate based on household state + time of day.
Per-detection-type rules control push/record/quiet behavior.
Role-based recipient filtering (all vs adults-only)."

Task 5: Recording Timeline

Files:

  • Create: vigilar/web/static/js/timeline.js
  • Modify: vigilar/web/blueprints/recordings.py (add timeline API)
  • Modify: vigilar/web/templates/recordings.html (timeline UI)
  • Modify: vigilar/storage/queries.py (add get_timeline_data)
  • Test: tests/unit/test_timeline.py

Depends on: Task 2 (detection_type column in recordings)

  • Step 1: Add timeline query

In vigilar/storage/queries.py, add:

def get_timeline_data(
    engine: Engine,
    camera_id: str,
    day_start_ts: int,
    day_end_ts: int,
) -> list[dict[str, Any]]:
    query = (
        select(
            recordings.c.id,
            recordings.c.started_at,
            recordings.c.ended_at,
            recordings.c.detection_type,
            recordings.c.starred,
        )
        .where(
            recordings.c.camera_id == camera_id,
            recordings.c.started_at >= day_start_ts,
            recordings.c.started_at < day_end_ts,
        )
        .order_by(recordings.c.started_at.asc())
    )
    with engine.connect() as conn:
        return [dict(r) for r in conn.execute(query).mappings().all()]
  • Step 2: Write failing test

Create tests/unit/test_timeline.py:

"""Tests for recording timeline."""

import time

from vigilar.storage.queries import get_timeline_data, insert_recording


class TestTimelineQuery:
    def test_returns_recordings_for_day(self, test_db):
        now = int(time.time())
        insert_recording(test_db, camera_id="cam1", started_at=now, ended_at=now+60,
                         file_path="/tmp/a.mp4", trigger="MOTION", detection_type="person")
        insert_recording(test_db, camera_id="cam1", started_at=now+100, ended_at=now+130,
                         file_path="/tmp/b.mp4", trigger="MOTION", detection_type="motion")

        results = get_timeline_data(test_db, "cam1", now - 3600, now + 3600)
        assert len(results) == 2
        assert results[0]["detection_type"] == "person"
        assert results[1]["detection_type"] == "motion"

    def test_filters_by_camera(self, test_db):
        now = int(time.time())
        insert_recording(test_db, camera_id="cam1", started_at=now, ended_at=now+60,
                         file_path="/tmp/a.mp4", trigger="MOTION")
        insert_recording(test_db, camera_id="cam2", started_at=now, ended_at=now+60,
                         file_path="/tmp/b.mp4", trigger="MOTION")

        results = get_timeline_data(test_db, "cam1", now - 3600, now + 3600)
        assert len(results) == 1

    def test_filters_by_time_range(self, test_db):
        now = int(time.time())
        insert_recording(test_db, camera_id="cam1", started_at=now - 7200, ended_at=now - 7100,
                         file_path="/tmp/old.mp4", trigger="MOTION")
        insert_recording(test_db, camera_id="cam1", started_at=now, ended_at=now+60,
                         file_path="/tmp/new.mp4", trigger="MOTION")

        results = get_timeline_data(test_db, "cam1", now - 3600, now + 3600)
        assert len(results) == 1
  • Step 3: Run test to verify fail

Run: source .venv/bin/activate && python -m pytest tests/unit/test_timeline.py -v

Expected: FAIL

  • Step 4: Add timeline API endpoint

In vigilar/web/blueprints/recordings.py, add:

from datetime import datetime, timedelta

@recordings_bp.route("/api/timeline")
def timeline_api():
    """JSON API: timeline data for a camera on a given day."""
    camera_id = request.args.get("camera")
    date_str = request.args.get("date")  # YYYY-MM-DD

    if not camera_id:
        return jsonify({"error": "camera required"}), 400

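    if date_str:
        try:
            day = datetime.strptime(date_str, "%Y-%m-%d")
        except ValueError:
            # Reject malformed dates instead of letting strptime raise a 500.
            return jsonify({"error": "date must be YYYY-MM-DD"}), 400
    else:
        day = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)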

    day_start = int(day.timestamp())
    day_end = int((day + timedelta(days=1)).timestamp())

    engine = current_app.config.get("DB_ENGINE")
    if engine is None:
        return jsonify([])

    from vigilar.storage.queries import get_timeline_data
    data = get_timeline_data(engine, camera_id, day_start, day_end)

    return jsonify([
        {
            "id": r["id"],
            "start": r["started_at"],
            "end": r["ended_at"],
            # A NULL column comes back as None, so dict.get's default would not apply.
            "type": r.get("detection_type") or "motion",
            "starred": bool(r.get("starred", 0)),
        }
        for r in data
    ])
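
The day-boundary arithmetic in the endpoint can be pulled into a small pure function, which makes it testable without a Flask client. A minimal sketch, assuming the same naive local-time interpretation as the endpoint; `day_bounds` is a hypothetical name:

```python
from datetime import datetime, timedelta


def day_bounds(date_str: str) -> tuple[int, int]:
    """Return (start, end) Unix timestamps for a local calendar day.

    The range is half-open: start is inclusive, end is exclusive.
    """
    day = datetime.strptime(date_str, "%Y-%m-%d")  # naive, interpreted as local time
    start = int(day.timestamp())
    end = int((day + timedelta(days=1)).timestamp())
    return start, end


start, end = day_bounds("2026-04-02")
```

Because the timestamps come from local midnights, the span can be 23 or 25 hours on a daylight-saving transition day, whereas timeline.js assumes a fixed 86400 seconds. Segments near the end of such a day may therefore be drawn slightly off.
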
  • Step 5: Create timeline.js

Create vigilar/web/static/js/timeline.js:

/* Vigilar — Recording timeline renderer */

(function() {
    'use strict';

    const COLORS = {
        person: '#dc3545',
        vehicle: '#0d6efd',
        motion: '#6c757d',
        default: '#6c757d',
    };

    class Timeline {
        constructor(canvas, options = {}) {
            this.canvas = canvas;
            this.ctx = canvas.getContext('2d');
            this.segments = [];
            this.cameraId = options.cameraId || '';
            this.date = options.date || new Date().toISOString().split('T')[0];
            this.onClick = options.onClick || null;
            this.dayStartTs = 0;
            this.dayEndTs = 0;
            this._resize();
            this._bindEvents();
        }

        _resize() {
            const rect = this.canvas.parentElement.getBoundingClientRect();
            this.canvas.width = rect.width;
            this.canvas.height = this.canvas.dataset.height ? parseInt(this.canvas.dataset.height, 10) : 48;
        }

        _bindEvents() {
            this.canvas.addEventListener('click', (e) => this._handleClick(e));
            window.addEventListener('resize', () => { this._resize(); this.render(); });
        }

        async load() {
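            // URLSearchParams percent-encodes the camera id and date.
            const params = new URLSearchParams({ camera: this.cameraId, date: this.date });
            const resp = await fetch(`/recordings/api/timeline?${params}`);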
            if (!resp.ok) return;
            this.segments = await resp.json();

            const d = new Date(this.date + 'T00:00:00');
            this.dayStartTs = d.getTime() / 1000;
            this.dayEndTs = this.dayStartTs + 86400;
            this.render();
        }

        render() {
            const w = this.canvas.width;
            const h = this.canvas.height;
            const ctx = this.ctx;
            const dur = this.dayEndTs - this.dayStartTs;

            ctx.clearRect(0, 0, w, h);
            ctx.fillStyle = '#161b22';
            ctx.fillRect(0, 0, w, h);

            for (const seg of this.segments) {
                const x1 = ((seg.start - this.dayStartTs) / dur) * w;
                const x2 = seg.end ? ((seg.end - this.dayStartTs) / dur) * w : x1 + 2;
                const segW = Math.max(x2 - x1, 2);
                ctx.fillStyle = COLORS[seg.type] || COLORS.default;
                ctx.fillRect(x1, 4, segW, h - 8);
            }

            // Hour markers
            ctx.fillStyle = '#30363d';
            ctx.font = '10px sans-serif';
            for (let hour = 0; hour <= 24; hour += 6) {
                const x = (hour / 24) * w;
                ctx.fillRect(x, 0, 1, h);
                if (hour < 24) {
                    ctx.fillStyle = '#8b949e';
                    ctx.fillText(`${hour}:00`, x + 3, h - 2);
                    ctx.fillStyle = '#30363d';
                }
            }
        }

        _handleClick(e) {
            const rect = this.canvas.getBoundingClientRect();
            const x = e.clientX - rect.left;
            const pct = x / this.canvas.width;
            const ts = this.dayStartTs + pct * 86400;

            const clicked = this.segments.find(s => ts >= s.start && ts <= (s.end || s.start + 10));
            if (clicked && this.onClick) {
                this.onClick(clicked);
            }
        }
    }

    window.VigilarTimeline = Timeline;
})();
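
The mapping from timestamps to canvas pixels in `render()` is a linear scale over the 24-hour day, with a 2 px floor so that short clips stay visible. The same arithmetic in Python, as a worked example (`segment_rect` is an illustrative name, and the case of a missing end timestamp is left out):

```python
SECONDS_PER_DAY = 86400
MIN_WIDTH_PX = 2  # same floor as Math.max(x2 - x1, 2) in render()


def segment_rect(start, end, day_start, canvas_width):
    """Return (x, width) in pixels for a recording on a 24-hour strip."""
    x1 = (start - day_start) / SECONDS_PER_DAY * canvas_width
    x2 = (end - day_start) / SECONDS_PER_DAY * canvas_width
    return x1, max(x2 - x1, MIN_WIDTH_PX)


# A 60-second clip at 06:00 on a 960 px canvas is narrower than 1 px,
# so it is clamped to the minimum width.
short_clip = segment_rect(6 * 3600, 6 * 3600 + 60, 0, 960)

# A one-hour recording at 12:00 spans 1/24 of the canvas, about 40 px.
long_clip = segment_rect(12 * 3600, 13 * 3600, 0, 960)
```

At typical canvas widths most motion clips fall under the 2 px floor. The click hit-test in `_handleClick` works on timestamps rather than pixels, so a short clip's clickable area is narrower than its drawn bar.
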
  • Step 6: Update recordings template

Replace the table in vigilar/web/templates/recordings.html with the timeline view. Add a <canvas> per camera, date picker, filter buttons, and a video player area. Use the VigilarTimeline class to render each camera's timeline and handle click-to-play.

  • Step 7: Run tests

Run: source .venv/bin/activate && python -m pytest tests/unit/test_timeline.py tests/ -v

Expected: All PASS

  • Step 8: Commit
git add vigilar/web/static/js/timeline.js vigilar/web/blueprints/recordings.py vigilar/web/templates/recordings.html vigilar/storage/queries.py tests/unit/test_timeline.py
git commit -m "feat: add recording timeline with color-coded detection types

Canvas-based 24h timeline per camera. Color-coded segments for
person (red), vehicle (blue), motion (gray). Click to play.
Date picker, filter buttons, hour markers."

Final: Integration Verification

  • Step 1: Run full test suite
source .venv/bin/activate && python -m pytest tests/ -v

Expected: All tests pass (96 original + ~35 new = ~131 total)

  • Step 2: Verify all web routes
source .venv/bin/activate && python -c "
from vigilar.config import load_config
from vigilar.web.app import create_app
cfg = load_config()
app = create_app(cfg)
with app.test_client() as c:
    for p in ['/', '/system/settings', '/kiosk/', '/events/', '/sensors/', '/recordings/', '/cameras/']:
        r = c.get(p)
        print(f'{p}: {r.status_code}')
"

Expected: All 200

  • Step 3: Commit and push
git push