vigilar/docs/superpowers/plans/2026-04-02-daily-use-features.md
Aaron D. Lee 8a65ac8c69 Add implementation plan for daily use features
5-task plan covering presence detection, person/vehicle AI detection,
smart alert profiles, recording timeline UI, and health monitoring.
Tasks 1-3 parallelizable, 4 depends on 1+2, 5 depends on 2.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 00:01:53 -04:00

1784 lines
56 KiB
Markdown

# Daily Use Features Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add presence detection, person/vehicle AI detection, smart alert profiles, recording timeline, and health monitoring to make Vigilar a system a household relies on daily.
**Architecture:** Five independent modules (`presence/`, `detection/`, `health/`, `alerts/profiles.py`, `web/static/js/timeline.js`) that communicate via the existing MQTT bus and SQLite storage. The detection module hooks into the camera worker's existing motion pipeline. Alert profiles consume events from both presence and detection to route notifications by role and time of day.
**Tech Stack:** Python 3.11+, OpenCV DNN (MobileNet-SSD v2), Flask/Jinja2, Bootstrap 5, Canvas API for timeline, subprocess ping for presence, existing paho-mqtt bus.
**Dependency graph:**
```
Task 1 (Presence) ──┐
Task 2 (Detection) ──┼──→ Task 4 (Alert Profiles)
Task 3 (Health) ──┘
Task 5 (Timeline) ──────→ (after Task 2 schema change)
```
Tasks 1, 2, 3 are fully parallelizable. Task 4 after 1+2. Task 5 after 2.
---
## Task 1: Multi-Person Presence Detection
**Files:**
- Create: `vigilar/presence/__init__.py`
- Create: `vigilar/presence/models.py`
- Create: `vigilar/presence/monitor.py`
- Modify: `vigilar/config.py` (add PresenceConfig, PresenceMember)
- Modify: `vigilar/constants.py` (add HouseholdState enum, presence Topics)
- Modify: `vigilar/main.py` (start PresenceMonitor subprocess)
- Test: `tests/unit/test_presence.py`
- [ ] **Step 1: Add constants and config models**
In `vigilar/constants.py`, add after `UPSStatus`:
```python
# --- Household Presence ---
class HouseholdState(StrEnum):
EMPTY = "EMPTY"
KIDS_HOME = "KIDS_HOME"
ADULTS_HOME = "ADULTS_HOME"
ALL_HOME = "ALL_HOME"
```
Add to `Topics` class:
```python
# Presence
@staticmethod
def presence_member(name: str) -> str:
return f"vigilar/presence/{name}"
PRESENCE_STATUS = "vigilar/presence/status"
```
In `vigilar/config.py`, add before `# --- Rule Config ---`:
```python
# --- Presence Config ---
class PresenceMember(BaseModel):
name: str
ip: str
role: str = "adult" # adult | child
class PresenceConfig(BaseModel):
enabled: bool = False
ping_interval_s: int = 30
departure_delay_m: int = 10
method: str = "icmp" # icmp | arping
members: list[PresenceMember] = Field(default_factory=list)
actions: dict[str, str] = Field(default_factory=lambda: {
"EMPTY": "ARMED_AWAY",
"ADULTS_HOME": "DISARMED",
"KIDS_HOME": "ARMED_HOME",
"ALL_HOME": "DISARMED",
})
```
Add `presence: PresenceConfig = Field(default_factory=PresenceConfig)` to `VigilarConfig` after `remote`.
- [ ] **Step 2: Write failing tests**
Create `tests/unit/test_presence.py`:
```python
"""Tests for presence detection."""
import time
from unittest.mock import patch
from vigilar.config import PresenceConfig, PresenceMember
from vigilar.constants import HouseholdState
from vigilar.presence.models import MemberPresence, derive_household_state
class TestDeriveHouseholdState:
def test_empty_when_no_members_home(self):
members = [
MemberPresence(name="Dad", role="adult", is_home=False, last_seen=0),
MemberPresence(name="Mom", role="adult", is_home=False, last_seen=0),
]
assert derive_household_state(members) == HouseholdState.EMPTY
def test_all_home(self):
members = [
MemberPresence(name="Dad", role="adult", is_home=True, last_seen=0),
MemberPresence(name="Mom", role="adult", is_home=True, last_seen=0),
MemberPresence(name="Kid", role="child", is_home=True, last_seen=0),
]
assert derive_household_state(members) == HouseholdState.ALL_HOME
def test_kids_home_only(self):
members = [
MemberPresence(name="Dad", role="adult", is_home=False, last_seen=0),
MemberPresence(name="Kid", role="child", is_home=True, last_seen=0),
]
assert derive_household_state(members) == HouseholdState.KIDS_HOME
def test_adults_home_not_all(self):
members = [
MemberPresence(name="Dad", role="adult", is_home=True, last_seen=0),
MemberPresence(name="Kid", role="child", is_home=False, last_seen=0),
]
assert derive_household_state(members) == HouseholdState.ADULTS_HOME
def test_adults_home_no_children_configured(self):
members = [
MemberPresence(name="Dad", role="adult", is_home=True, last_seen=0),
]
assert derive_household_state(members) == HouseholdState.ALL_HOME
def test_empty_list(self):
assert derive_household_state([]) == HouseholdState.EMPTY
class TestPingHost:
@patch("vigilar.presence.monitor.subprocess.run")
def test_ping_success(self, mock_run):
from vigilar.presence.monitor import ping_host
mock_run.return_value = type("R", (), {"returncode": 0})()
assert ping_host("192.168.1.50") is True
@patch("vigilar.presence.monitor.subprocess.run")
def test_ping_failure(self, mock_run):
from vigilar.presence.monitor import ping_host
mock_run.return_value = type("R", (), {"returncode": 1})()
assert ping_host("192.168.1.50") is False
@patch("vigilar.presence.monitor.subprocess.run")
def test_ping_timeout(self, mock_run):
import subprocess as sp
from vigilar.presence.monitor import ping_host
mock_run.side_effect = sp.TimeoutExpired(cmd="ping", timeout=2)
assert ping_host("192.168.1.50") is False
class TestDepartureDelay:
def test_not_departed_within_delay(self):
from vigilar.presence.monitor import should_mark_away
last_seen = time.monotonic() - 300 # 5 min ago
assert should_mark_away(last_seen, departure_delay_m=10) is False
def test_departed_after_delay(self):
from vigilar.presence.monitor import should_mark_away
last_seen = time.monotonic() - 700 # 11+ min ago
assert should_mark_away(last_seen, departure_delay_m=10) is True
```
- [ ] **Step 3: Run tests to verify they fail**
Run: `source .venv/bin/activate && python -m pytest tests/unit/test_presence.py -v`
Expected: FAIL (modules don't exist yet)
- [ ] **Step 4: Implement presence models**
Create `vigilar/presence/__init__.py` (empty).
Create `vigilar/presence/models.py`:
```python
"""Presence detection data models."""
from dataclasses import dataclass
from vigilar.constants import HouseholdState
@dataclass
class MemberPresence:
name: str
role: str # "adult" | "child"
is_home: bool
last_seen: float # monotonic timestamp of last successful ping
def derive_household_state(members: list[MemberPresence]) -> HouseholdState:
if not members:
return HouseholdState.EMPTY
home = [m for m in members if m.is_home]
if not home:
return HouseholdState.EMPTY
all_home = len(home) == len(members)
adults_home = any(m.role == "adult" for m in home)
kids_home = any(m.role == "child" for m in home)
if all_home:
return HouseholdState.ALL_HOME
if adults_home:
return HouseholdState.ADULTS_HOME
if kids_home:
return HouseholdState.KIDS_HOME
return HouseholdState.EMPTY
```
- [ ] **Step 5: Implement presence monitor**
Create `vigilar/presence/monitor.py`:
```python
"""Presence monitor — pings family phones to determine who's home."""
import logging
import signal
import subprocess
import time
from vigilar.bus import MessageBus
from vigilar.config import VigilarConfig
from vigilar.constants import HouseholdState, Topics
from vigilar.presence.models import MemberPresence, derive_household_state
log = logging.getLogger(__name__)
def ping_host(ip: str, timeout_s: int = 2) -> bool:
try:
result = subprocess.run(
["ping", "-c", "1", "-W", str(timeout_s), ip],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=timeout_s + 1,
)
return result.returncode == 0
except subprocess.TimeoutExpired:
return False
except FileNotFoundError:
log.error("ping command not found")
return False
def should_mark_away(last_seen: float, departure_delay_m: int) -> bool:
elapsed_m = (time.monotonic() - last_seen) / 60
return elapsed_m >= departure_delay_m
class PresenceMonitor:
def __init__(self, config: VigilarConfig):
self._cfg = config.presence
self._mqtt_cfg = config.mqtt
self._members: list[MemberPresence] = []
self._last_household_state = HouseholdState.EMPTY
self._bus: MessageBus | None = None
for m in self._cfg.members:
self._members.append(MemberPresence(
name=m.name, role=m.role, is_home=False, last_seen=0,
))
def run(self) -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [presence] %(levelname)s: %(message)s",
)
self._bus = MessageBus(self._mqtt_cfg, client_id="presence-monitor")
self._bus.connect()
shutdown = False
def handle_signal(signum, frame):
nonlocal shutdown
shutdown = True
signal.signal(signal.SIGTERM, handle_signal)
log.info("Presence monitor started, tracking %d members", len(self._members))
while not shutdown:
self._poll_once()
for _ in range(self._cfg.ping_interval_s):
if shutdown:
break
time.sleep(1)
if self._bus:
self._bus.disconnect()
log.info("Presence monitor stopped")
def _poll_once(self) -> None:
for member in self._members:
reachable = ping_host(member.ip)
if reachable:
member.is_home = True
member.last_seen = time.monotonic()
elif member.is_home:
if should_mark_away(member.last_seen, self._cfg.departure_delay_m):
member.is_home = False
log.info("%s departed", member.name)
if self._bus:
self._bus.publish_event(
Topics.presence_member(member.name),
state="HOME" if member.is_home else "AWAY",
name=member.name,
role=member.role,
)
new_state = derive_household_state(self._members)
if new_state != self._last_household_state:
log.info("Household state: %s -> %s", self._last_household_state, new_state)
self._last_household_state = new_state
if self._bus:
members_dict = {m.name: "HOME" if m.is_home else "AWAY" for m in self._members}
self._bus.publish_event(
Topics.PRESENCE_STATUS,
household=new_state.value,
members=members_dict,
)
def run_presence_monitor(config: VigilarConfig) -> None:
monitor = PresenceMonitor(config)
monitor.run()
```
- [ ] **Step 6: Wire into supervisor**
In `vigilar/main.py`, add after the UPS monitor block:
```python
# Presence monitor
if cfg.presence.enabled and cfg.presence.members:
from vigilar.presence.monitor import run_presence_monitor
subsystems.append(SubsystemProcess("presence-monitor", run_presence_monitor, (cfg,)))
```
- [ ] **Step 7: Run tests to verify they pass**
Run: `source .venv/bin/activate && python -m pytest tests/unit/test_presence.py -v`
Expected: All 9 tests PASS
- [ ] **Step 8: Run full suite**
Run: `source .venv/bin/activate && python -m pytest tests/ -v`
Expected: All tests PASS (96 existing + 9 new = 105)
- [ ] **Step 9: Commit**
```bash
git add vigilar/presence/ vigilar/constants.py vigilar/config.py vigilar/main.py tests/unit/test_presence.py
git commit -m "feat: add multi-person presence detection
Track family phones via ping, derive household state
(EMPTY/KIDS_HOME/ADULTS_HOME/ALL_HOME), publish via MQTT.
Configurable departure delay, per-member roles, auto-arm actions."
```
---
## Task 2: Person + Vehicle Detection
**Files:**
- Create: `vigilar/detection/__init__.py`
- Create: `vigilar/detection/person.py`
- Create: `vigilar/detection/vehicle.py`
- Create: `vigilar/detection/zones.py`
- Create: `vigilar/cli/cmd_calibrate.py`
- Create: `scripts/download_model.sh`
- Modify: `vigilar/constants.py` (add detection event types)
- Modify: `vigilar/config.py` (add DetectionConfig, CameraZone, VehicleConfig)
- Modify: `vigilar/storage/schema.py` (add detection_type, starred columns to recordings)
- Modify: `vigilar/camera/worker.py` (add detection stage after motion)
- Modify: `vigilar/cli/main.py` (register calibrate command)
- Test: `tests/unit/test_detection.py`
- [ ] **Step 1: Add constants and config**
In `vigilar/constants.py`, add to `EventType`:
```python
PERSON_DETECTED = "PERSON_DETECTED"
VEHICLE_DETECTED = "VEHICLE_DETECTED"
KNOWN_VEHICLE_ARRIVED = "KNOWN_VEHICLE_ARRIVED"
UNKNOWN_VEHICLE_DETECTED = "UNKNOWN_VEHICLE_DETECTED"
```
Add to `RecordingTrigger`:
```python
PERSON = "PERSON"
VEHICLE = "VEHICLE"
```
In `vigilar/config.py`, add before `# --- Presence Config ---`:
```python
# --- Detection Config ---
class CameraZone(BaseModel):
name: str
region: list[int] = Field(default_factory=list) # [x, y, w, h]
watch_for: list[str] = Field(default_factory=lambda: ["person", "vehicle"])
alert_unknown_vehicles: bool = False
class DetectionConfig(BaseModel):
person_detection: bool = False
model_path: str = "/var/vigilar/models/mobilenet_ssd_v2.pb"
model_config: str = "/var/vigilar/models/mobilenet_ssd_v2.pbtxt"
confidence_threshold: float = 0.5
cameras: list[str] = Field(default_factory=list) # empty = all
class KnownVehicle(BaseModel):
name: str
color_profile: str = "" # white, black, silver, red, blue, etc.
size_class: str = "" # compact, midsize, large
calibration_file: str = ""
class VehicleConfig(BaseModel):
known: list[KnownVehicle] = Field(default_factory=list)
```
Add `zones: list[CameraZone] = Field(default_factory=list)` to `CameraConfig`.
Add to `VigilarConfig`:
```python
detection: DetectionConfig = Field(default_factory=DetectionConfig)
vehicles: VehicleConfig = Field(default_factory=VehicleConfig)
```
- [ ] **Step 2: Update schema**
In `vigilar/storage/schema.py`, add two columns to the `recordings` table:
```python
Column("detection_type", String), # person, vehicle, motion, None
Column("starred", Integer, nullable=False, default=0),
```
Add after the `Column("thumbnail_path", ...)` line and before the closing `)`.
- [ ] **Step 3: Write failing tests**
Create `tests/unit/test_detection.py`:
```python
"""Tests for person and vehicle detection."""
import numpy as np
from vigilar.detection.person import Detection, PersonDetector
from vigilar.detection.vehicle import classify_dominant_color, classify_size
from vigilar.detection.zones import filter_detections_by_zone
class TestPersonDetector:
def test_detector_initializes_without_model(self):
detector = PersonDetector(model_path="nonexistent.pb", config_path="nonexistent.pbtxt")
assert not detector.is_loaded
def test_detect_returns_empty_when_not_loaded(self):
detector = PersonDetector(model_path="nonexistent.pb", config_path="nonexistent.pbtxt")
frame = np.zeros((480, 640, 3), dtype=np.uint8)
detections = detector.detect(frame)
assert detections == []
def test_detection_dataclass(self):
d = Detection(class_name="person", class_id=1, confidence=0.85, bbox=(10, 20, 100, 200))
assert d.class_name == "person"
assert d.confidence == 0.85
class TestVehicleColor:
def test_white_detection(self):
white_region = np.full((100, 100, 3), 240, dtype=np.uint8)
color = classify_dominant_color(white_region)
assert color == "white"
def test_black_detection(self):
black_region = np.full((100, 100, 3), 15, dtype=np.uint8)
color = classify_dominant_color(black_region)
assert color == "black"
def test_size_compact(self):
assert classify_size(bbox_area=5000, zone_area=100000) == "compact"
def test_size_midsize(self):
assert classify_size(bbox_area=15000, zone_area=100000) == "midsize"
def test_size_large(self):
assert classify_size(bbox_area=30000, zone_area=100000) == "large"
class TestZoneFiltering:
def test_detection_inside_zone(self):
detections = [Detection("person", 1, 0.9, (50, 50, 80, 80))]
zone_region = (0, 0, 200, 200)
filtered = filter_detections_by_zone(detections, zone_region, ["person"])
assert len(filtered) == 1
def test_detection_outside_zone(self):
detections = [Detection("person", 1, 0.9, (300, 300, 50, 50))]
zone_region = (0, 0, 200, 200)
filtered = filter_detections_by_zone(detections, zone_region, ["person"])
assert len(filtered) == 0
def test_filter_by_class(self):
detections = [
Detection("person", 1, 0.9, (50, 50, 80, 80)),
Detection("car", 3, 0.8, (50, 50, 80, 80)),
]
zone_region = (0, 0, 200, 200)
filtered = filter_detections_by_zone(detections, zone_region, ["person"])
assert len(filtered) == 1
assert filtered[0].class_name == "person"
def test_no_zone_returns_all(self):
detections = [Detection("person", 1, 0.9, (50, 50, 80, 80))]
filtered = filter_detections_by_zone(detections, None, ["person", "car"])
assert len(filtered) == 1
```
- [ ] **Step 4: Run tests to verify they fail**
Run: `source .venv/bin/activate && python -m pytest tests/unit/test_detection.py -v`
Expected: FAIL
- [ ] **Step 5: Implement person detector**
Create `vigilar/detection/__init__.py` (empty).
Create `vigilar/detection/person.py`:
```python
"""Person detection using MobileNet-SSD v2 via OpenCV DNN."""
import logging
from dataclasses import dataclass
from pathlib import Path
import cv2
import numpy as np
log = logging.getLogger(__name__)
COCO_CLASSES = {
1: "person", 2: "bicycle", 3: "car", 4: "motorcycle", 5: "airplane",
6: "bus", 7: "train", 8: "truck", 9: "boat",
}
DETECT_CLASSES = {1, 3, 8} # person, car, truck
@dataclass
class Detection:
class_name: str
class_id: int
confidence: float
bbox: tuple[int, int, int, int] # x, y, w, h
class PersonDetector:
def __init__(self, model_path: str, config_path: str, confidence_threshold: float = 0.5):
self._threshold = confidence_threshold
self._net = None
self.is_loaded = False
if Path(model_path).exists() and Path(config_path).exists():
try:
self._net = cv2.dnn.readNetFromTensorflow(model_path, config_path)
self.is_loaded = True
log.info("Person detection model loaded from %s", model_path)
except cv2.error as e:
log.error("Failed to load detection model: %s", e)
else:
log.warning("Detection model not found at %s — detection disabled", model_path)
def detect(self, frame: np.ndarray) -> list[Detection]:
if not self.is_loaded or self._net is None:
return []
h, w = frame.shape[:2]
blob = cv2.dnn.blobFromImage(frame, size=(300, 300), swapRB=True, crop=False)
self._net.setInput(blob)
output = self._net.forward()
detections = []
for i in range(output.shape[2]):
confidence = output[0, 0, i, 2]
if confidence < self._threshold:
continue
class_id = int(output[0, 0, i, 1])
if class_id not in DETECT_CLASSES:
continue
x1 = int(output[0, 0, i, 3] * w)
y1 = int(output[0, 0, i, 4] * h)
x2 = int(output[0, 0, i, 5] * w)
y2 = int(output[0, 0, i, 6] * h)
bw, bh = x2 - x1, y2 - y1
if bw <= 0 or bh <= 0:
continue
detections.append(Detection(
class_name=COCO_CLASSES.get(class_id, f"class_{class_id}"),
class_id=class_id,
confidence=float(confidence),
bbox=(x1, y1, bw, bh),
))
return detections
```
- [ ] **Step 6: Implement vehicle fingerprinting**
Create `vigilar/detection/vehicle.py`:
```python
"""Vehicle color and size fingerprinting."""
import logging
from dataclasses import dataclass
from pathlib import Path
import cv2
import numpy as np
log = logging.getLogger(__name__)
@dataclass
class VehicleProfile:
name: str
color_profile: str
size_class: str
histogram: np.ndarray | None = None
def classify_dominant_color(region: np.ndarray) -> str:
hsv = cv2.cvtColor(region, cv2.COLOR_BGR2HSV)
h, s, v = cv2.split(hsv)
mean_s = float(np.mean(s))
mean_v = float(np.mean(v))
if mean_v < 40:
return "black"
if mean_s < 30 and mean_v > 180:
return "white"
if mean_s < 40:
return "silver"
mean_h = float(np.mean(h))
if mean_h < 10 or mean_h > 170:
return "red"
if 10 <= mean_h < 25:
return "orange"
if 25 <= mean_h < 35:
return "yellow"
if 35 <= mean_h < 85:
return "green"
if 85 <= mean_h < 130:
return "blue"
return "unknown"
def classify_size(bbox_area: int, zone_area: int) -> str:
if zone_area <= 0:
return "unknown"
ratio = bbox_area / zone_area
if ratio < 0.1:
return "compact"
if ratio < 0.25:
return "midsize"
return "large"
class VehicleFingerprint:
def __init__(self, known_vehicles: list[VehicleProfile] | None = None):
self._known = known_vehicles or []
def match(self, region: np.ndarray, bbox_area: int, zone_area: int) -> VehicleProfile | None:
color = classify_dominant_color(region)
size = classify_size(bbox_area, zone_area)
for profile in self._known:
if profile.color_profile == color and profile.size_class == size:
return profile
return None
def add_profile(self, profile: VehicleProfile) -> None:
self._known.append(profile)
@property
def known_count(self) -> int:
return len(self._known)
```
- [ ] **Step 7: Implement zone filtering**
Create `vigilar/detection/zones.py`:
```python
"""Zone-based detection filtering."""
from vigilar.detection.person import Detection
def _bbox_center(bbox: tuple[int, int, int, int]) -> tuple[int, int]:
x, y, w, h = bbox
return x + w // 2, y + h // 2
def _point_in_rect(point: tuple[int, int], rect: tuple[int, int, int, int]) -> bool:
px, py = point
rx, ry, rw, rh = rect
return rx <= px <= rx + rw and ry <= py <= ry + rh
def filter_detections_by_zone(
detections: list[Detection],
zone_region: tuple[int, int, int, int] | None,
watch_for: list[str],
) -> list[Detection]:
result = []
for d in detections:
if d.class_name not in watch_for:
continue
if zone_region is not None:
center = _bbox_center(d.bbox)
if not _point_in_rect(center, zone_region):
continue
result.append(d)
return result
```
- [ ] **Step 8: Create model download script**
Create `scripts/download_model.sh`:
```bash
#!/usr/bin/env bash
set -euo pipefail
MODEL_DIR="${1:-/var/vigilar/models}"
mkdir -p "$MODEL_DIR"
PB_URL="https://raw.githubusercontent.com/opencv/opencv_extra/master/testdata/dnn/ssd_mobilenet_v2_coco_2018_03_29.pb"
PBTXT_URL="https://raw.githubusercontent.com/opencv/opencv_extra/master/testdata/dnn/ssd_mobilenet_v2_coco_2018_03_29.pbtxt"
echo "Downloading MobileNet-SSD v2 model..."
if command -v curl &>/dev/null; then
curl -fSL -o "$MODEL_DIR/mobilenet_ssd_v2.pb" "$PB_URL"
curl -fSL -o "$MODEL_DIR/mobilenet_ssd_v2.pbtxt" "$PBTXT_URL"
elif command -v wget &>/dev/null; then
wget -q -O "$MODEL_DIR/mobilenet_ssd_v2.pb" "$PB_URL"
wget -q -O "$MODEL_DIR/mobilenet_ssd_v2.pbtxt" "$PBTXT_URL"
else
echo "ERROR: curl or wget required"
exit 1
fi
echo "Model downloaded to $MODEL_DIR"
ls -lh "$MODEL_DIR"/mobilenet_ssd_v2.*
```
Run: `chmod +x scripts/download_model.sh`
- [ ] **Step 9: Wire detection into camera worker**
In `vigilar/camera/worker.py`, add import at the top:
```python
from vigilar.detection.person import PersonDetector
from vigilar.detection.vehicle import VehicleFingerprint, VehicleProfile, classify_dominant_color, classify_size
from vigilar.detection.zones import filter_detections_by_zone
from vigilar.config import CameraConfig, DetectionConfig, MQTTConfig, RemoteConfig, VehicleConfig
```
After the `hls` and `remote_hls` initialization, add:
```python
# Person/vehicle detector (second-stage after MOG2)
person_detector: PersonDetector | None = None
vehicle_fp: VehicleFingerprint | None = None
det_cfg = None # will be passed as new parameter
if det_cfg and det_cfg.person_detection:
if not det_cfg.cameras or camera_id in det_cfg.cameras:
person_detector = PersonDetector(
det_cfg.model_path, det_cfg.model_config, det_cfg.confidence_threshold,
)
if veh_cfg and veh_cfg.known:
profiles = [VehicleProfile(v.name, v.color_profile, v.size_class) for v in veh_cfg.known]
vehicle_fp = VehicleFingerprint(profiles)
```
In the motion detection block, after `bus.publish_event(Topics.camera_motion_start(...))`, add the detection second-stage:
```python
# Second-stage: person/vehicle detection
detection_type = "motion"
if person_detector and person_detector.is_loaded:
detections = person_detector.detect(frame)
persons = [d for d in detections if d.class_name == "person"]
vehicles = [d for d in detections if d.class_name in ("car", "truck")]
if persons:
detection_type = "person"
bus.publish_event(
f"vigilar/camera/{camera_id}/person",
count=len(persons),
confidence=max(d.confidence for d in persons),
)
elif vehicles:
detection_type = "vehicle"
# Vehicle fingerprinting
if vehicle_fp:
for vd in vehicles:
x, y, w, h = vd.bbox
crop = frame[max(0,y):y+h, max(0,x):x+w]
if crop.size > 0:
match = vehicle_fp.match(crop, w*h, frame.shape[0]*frame.shape[1])
if match:
bus.publish_event(
f"vigilar/camera/{camera_id}/vehicle/known",
vehicle=match.name,
)
else:
bus.publish_event(
f"vigilar/camera/{camera_id}/vehicle/unknown",
color=classify_dominant_color(crop),
)
else:
bus.publish_event(
f"vigilar/camera/{camera_id}/vehicle",
count=len(vehicles),
)
```
Update the `run_camera_worker` function signature to accept `det_cfg` and `veh_cfg` parameters. Update `camera/manager.py` to pass them.
- [ ] **Step 10: Run tests**
Run: `source .venv/bin/activate && python -m pytest tests/unit/test_detection.py tests/ -v`
Expected: All PASS
- [ ] **Step 11: Commit**
```bash
git add vigilar/detection/ vigilar/constants.py vigilar/config.py vigilar/storage/schema.py vigilar/camera/worker.py vigilar/camera/manager.py scripts/download_model.sh tests/unit/test_detection.py
git commit -m "feat: add person/vehicle detection with zone fencing
MobileNet-SSD v2 via OpenCV DNN as second-stage filter after MOG2.
Vehicle color/size fingerprinting for known car matching.
Zone-based filtering per camera. Model download script."
```
---
## Task 3: Health Monitoring + Self-Healing
**Files:**
- Create: `vigilar/health/__init__.py`
- Create: `vigilar/health/monitor.py`
- Create: `vigilar/health/pruner.py`
- Create: `vigilar/health/digest.py`
- Modify: `vigilar/config.py` (add HealthConfig)
- Modify: `vigilar/main.py` (start HealthMonitor)
- Test: `tests/unit/test_health.py`
- [ ] **Step 1: Add config**
In `vigilar/config.py`, add before `# --- Rule Config ---`:
```python
# --- Health Config ---
class HealthConfig(BaseModel):
enabled: bool = True
disk_warn_pct: int = 85
disk_critical_pct: int = 95
auto_prune: bool = True
auto_prune_target_pct: int = 80
daily_digest: bool = True
daily_digest_time: str = "08:00"
```
Add `health: HealthConfig = Field(default_factory=HealthConfig)` to `VigilarConfig`.
- [ ] **Step 2: Write failing tests**
Create `tests/unit/test_health.py`:
```python
"""Tests for health monitoring."""
import shutil
import tempfile
from pathlib import Path
from unittest.mock import patch
from vigilar.health.pruner import find_prunable_recordings, calculate_disk_usage_pct
from vigilar.health.monitor import HealthCheck, HealthStatus, check_disk, check_mqtt_port
class TestDiskCheck:
@patch("vigilar.health.monitor.shutil.disk_usage")
def test_healthy(self, mock_usage):
mock_usage.return_value = type("U", (), {"total": 100_000_000_000, "used": 50_000_000_000, "free": 50_000_000_000})()
result = check_disk("/var/vigilar", warn_pct=85, critical_pct=95)
assert result.status == HealthStatus.HEALTHY
@patch("vigilar.health.monitor.shutil.disk_usage")
def test_warning(self, mock_usage):
mock_usage.return_value = type("U", (), {"total": 100_000_000_000, "used": 90_000_000_000, "free": 10_000_000_000})()
result = check_disk("/var/vigilar", warn_pct=85, critical_pct=95)
assert result.status == HealthStatus.WARNING
@patch("vigilar.health.monitor.shutil.disk_usage")
def test_critical(self, mock_usage):
mock_usage.return_value = type("U", (), {"total": 100_000_000_000, "used": 97_000_000_000, "free": 3_000_000_000})()
result = check_disk("/var/vigilar", warn_pct=85, critical_pct=95)
assert result.status == HealthStatus.CRITICAL
class TestPruner:
def test_calculate_disk_pct(self):
with tempfile.TemporaryDirectory() as d:
pct = calculate_disk_usage_pct(d)
assert 0 <= pct <= 100
def test_find_prunable_empty(self, test_db):
result = find_prunable_recordings(test_db, limit=10)
assert result == []
class TestMQTTCheck:
@patch("vigilar.health.monitor.socket.create_connection")
def test_mqtt_reachable(self, mock_conn):
mock_conn.return_value.__enter__ = lambda s: s
mock_conn.return_value.__exit__ = lambda s, *a: None
result = check_mqtt_port("127.0.0.1", 1883)
assert result.status == HealthStatus.HEALTHY
@patch("vigilar.health.monitor.socket.create_connection")
def test_mqtt_unreachable(self, mock_conn):
mock_conn.side_effect = ConnectionRefusedError()
result = check_mqtt_port("127.0.0.1", 1883)
assert result.status == HealthStatus.CRITICAL
```
- [ ] **Step 3: Run tests to verify they fail**
Run: `source .venv/bin/activate && python -m pytest tests/unit/test_health.py -v`
Expected: FAIL
- [ ] **Step 4: Implement health monitor**
Create `vigilar/health/__init__.py` (empty).
Create `vigilar/health/monitor.py`:
```python
"""Health monitoring — periodic subsystem checks."""
import logging
import shutil
import signal
import socket
import time
from dataclasses import dataclass, field
from enum import StrEnum
from vigilar.bus import MessageBus
from vigilar.config import VigilarConfig
log = logging.getLogger(__name__)
class HealthStatus(StrEnum):
HEALTHY = "healthy"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class HealthCheck:
name: str
status: HealthStatus
message: str = ""
value: float = 0
def check_disk(path: str, warn_pct: int, critical_pct: int) -> HealthCheck:
try:
usage = shutil.disk_usage(path)
pct = usage.used / usage.total * 100
if pct >= critical_pct:
return HealthCheck("disk", HealthStatus.CRITICAL, f"{pct:.0f}% used", pct)
if pct >= warn_pct:
return HealthCheck("disk", HealthStatus.WARNING, f"{pct:.0f}% used", pct)
return HealthCheck("disk", HealthStatus.HEALTHY, f"{pct:.0f}% used", pct)
except OSError as e:
return HealthCheck("disk", HealthStatus.CRITICAL, str(e))
def check_mqtt_port(host: str, port: int) -> HealthCheck:
try:
with socket.create_connection((host, port), timeout=5):
return HealthCheck("mqtt", HealthStatus.HEALTHY, "reachable")
except (ConnectionRefusedError, TimeoutError, OSError) as e:
return HealthCheck("mqtt", HealthStatus.CRITICAL, str(e))
class HealthMonitor:
def __init__(self, config: VigilarConfig):
self._cfg = config
self._bus: MessageBus | None = None
self._checks: list[HealthCheck] = []
def run(self) -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [health] %(levelname)s: %(message)s",
)
self._bus = MessageBus(self._cfg.mqtt, client_id="health-monitor")
self._bus.connect()
shutdown = False
def handle_signal(signum, frame):
nonlocal shutdown
shutdown = True
signal.signal(signal.SIGTERM, handle_signal)
log.info("Health monitor started")
last_disk_check = 0
last_mqtt_check = 0
while not shutdown:
now = time.monotonic()
if now - last_disk_check >= 300: # 5 min
disk = check_disk(self._cfg.system.data_dir, self._cfg.health.disk_warn_pct, self._cfg.health.disk_critical_pct)
self._update_check(disk)
last_disk_check = now
if disk.status == HealthStatus.CRITICAL and self._cfg.health.auto_prune:
from vigilar.health.pruner import auto_prune
auto_prune(self._cfg)
if now - last_mqtt_check >= 30:
mqtt = check_mqtt_port(self._cfg.mqtt.host, self._cfg.mqtt.port)
self._update_check(mqtt)
last_mqtt_check = now
self._publish_status()
time.sleep(10)
if self._bus:
self._bus.disconnect()
def _update_check(self, check: HealthCheck) -> None:
for i, c in enumerate(self._checks):
if c.name == check.name:
self._checks[i] = check
return
self._checks.append(check)
def _publish_status(self) -> None:
if not self._bus:
return
overall = HealthStatus.HEALTHY
for c in self._checks:
if c.status == HealthStatus.CRITICAL:
overall = HealthStatus.CRITICAL
break
if c.status == HealthStatus.WARNING:
overall = HealthStatus.WARNING
self._bus.publish_event(
"vigilar/system/health",
status=overall.value,
checks={c.name: {"status": c.status.value, "message": c.message} for c in self._checks},
)
def run_health_monitor(config: VigilarConfig) -> None:
monitor = HealthMonitor(config)
monitor.run()
```
- [ ] **Step 5: Implement auto-pruner**
Create `vigilar/health/pruner.py`:
```python
"""Auto-prune old recordings when disk usage is high."""
import logging
import os
import shutil
from pathlib import Path
from sqlalchemy import select
from sqlalchemy.engine import Engine
from vigilar.config import VigilarConfig
from vigilar.storage.schema import recordings
log = logging.getLogger(__name__)
def calculate_disk_usage_pct(path: str) -> float:
usage = shutil.disk_usage(path)
return usage.used / usage.total * 100
def find_prunable_recordings(engine: Engine, limit: int = 50) -> list[dict]:
query = (
select(recordings)
.where(recordings.c.starred == 0)
.order_by(recordings.c.started_at.asc())
.limit(limit)
)
with engine.connect() as conn:
return [dict(r) for r in conn.execute(query).mappings().all()]
def auto_prune(config: VigilarConfig) -> int:
from vigilar.storage.db import get_db_path, get_engine
target = config.health.auto_prune_target_pct
data_dir = config.system.data_dir
current_pct = calculate_disk_usage_pct(data_dir)
if current_pct <= target:
return 0
engine = get_engine(get_db_path(data_dir))
pruned = 0
total_bytes = 0
while current_pct > target:
candidates = find_prunable_recordings(engine, limit=20)
if not candidates:
break
for rec in candidates:
file_path = rec.get("file_path", "")
if file_path and Path(file_path).exists():
size = Path(file_path).stat().st_size
Path(file_path).unlink()
total_bytes += size
thumb = rec.get("thumbnail_path", "")
if thumb and Path(thumb).exists():
Path(thumb).unlink()
with engine.begin() as conn:
conn.execute(recordings.delete().where(recordings.c.id == rec["id"]))
pruned += 1
current_pct = calculate_disk_usage_pct(data_dir)
if pruned:
mb = total_bytes / (1024 * 1024)
log.info("Auto-pruned %d recordings (%.1f MB), disk now at %.0f%%", pruned, mb, current_pct)
return pruned
```
- [ ] **Step 6: Implement daily digest**
Create `vigilar/health/digest.py`:
```python
"""Daily digest notification builder."""
import logging
import shutil
import time
from sqlalchemy import func, select
from sqlalchemy.engine import Engine
from vigilar.constants import EventType
from vigilar.storage.schema import events, recordings
log = logging.getLogger(__name__)
def build_digest(engine: Engine, data_dir: str, since_hours: int = 12) -> dict:
since_ts = int((time.time() - since_hours * 3600) * 1000)
with engine.connect() as conn:
person_count = conn.execute(
select(func.count()).select_from(events)
.where(events.c.type == EventType.PERSON_DETECTED, events.c.ts >= since_ts)
).scalar() or 0
vehicle_count = conn.execute(
select(func.count()).select_from(events)
.where(events.c.type == EventType.UNKNOWN_VEHICLE_DETECTED, events.c.ts >= since_ts)
).scalar() or 0
recording_count = conn.execute(
select(func.count()).select_from(recordings)
.where(recordings.c.started_at >= since_ts // 1000)
).scalar() or 0
usage = shutil.disk_usage(data_dir)
disk_pct = usage.used / usage.total * 100
disk_gb = usage.used / (1024**3)
return {
"person_detections": person_count,
"unknown_vehicles": vehicle_count,
"recordings": recording_count,
"disk_used_gb": round(disk_gb, 1),
"disk_used_pct": round(disk_pct, 0),
"since_hours": since_hours,
}
def format_digest(data: dict) -> str:
return (
f"Vigilar Daily Summary\n"
f"Last {data['since_hours']}h: "
f"{data['person_detections']} person detections, "
f"{data['unknown_vehicles']} unknown vehicles, "
f"{data['recordings']} recordings\n"
f"Storage: {data['disk_used_gb']} GB ({data['disk_used_pct']:.0f}%)"
)
```
- [ ] **Step 7: Wire into supervisor**
In `vigilar/main.py`, add after the presence monitor block:
```python
# Health monitor
if cfg.health.enabled:
from vigilar.health.monitor import run_health_monitor
subsystems.append(SubsystemProcess("health-monitor", run_health_monitor, (cfg,)))
```
- [ ] **Step 8: Run tests**
Run: `source .venv/bin/activate && python -m pytest tests/unit/test_health.py tests/ -v`
Expected: All PASS
- [ ] **Step 9: Commit**
```bash
git add vigilar/health/ vigilar/config.py vigilar/main.py tests/unit/test_health.py
git commit -m "feat: add health monitoring with auto-prune and daily digest
Periodic disk, MQTT, and subsystem health checks. Auto-prune
oldest non-starred recordings when disk exceeds threshold.
Daily digest notification with overnight summary."
```
---
## Task 4: Smart Alert Profiles
**Files:**
- Create: `vigilar/alerts/profiles.py`
- Modify: `vigilar/config.py` (add AlertProfileConfig, AlertProfileRule)
- Modify: `vigilar/alerts/dispatcher.py` (role-based routing)
- Modify: `vigilar/web/templates/settings.html` (profile editor)
- Modify: `vigilar/web/static/js/settings.js` (profile save/load)
- Modify: `vigilar/web/blueprints/system.py` (profile API endpoints)
- Test: `tests/unit/test_profiles.py`
**Depends on:** Task 1 (HouseholdState) and Task 2 (detection event types)
- [ ] **Step 1: Add config models**
In `vigilar/config.py`, add before `# --- Health Config ---`:
```python
# --- Alert Profile Config ---
class AlertProfileRule(BaseModel):
detection_type: str # person, unknown_vehicle, known_vehicle, motion
camera_location: str = "any" # any | exterior | common_area | specific camera id
action: str = "record_only" # push_and_record, push_adults, record_only, quiet_log
recipients: str = "all" # all, adults, none
class AlertProfileConfig(BaseModel):
name: str
enabled: bool = True
presence_states: list[str] = Field(default_factory=list)
time_window: str = "" # "" = all day, "23:00-06:00" = sleep hours
rules: list[AlertProfileRule] = Field(default_factory=list)
```
Add `profiles: list[AlertProfileConfig] = Field(default_factory=list)` to `AlertsConfig`.
Add `sleep_start: str = "23:00"` and `sleep_end: str = "06:00"` to `AlertsConfig`.
- [ ] **Step 2: Write failing tests**
Create `tests/unit/test_profiles.py`:
```python
"""Tests for smart alert profiles."""
from vigilar.alerts.profiles import match_profile, is_in_time_window, get_action_for_event
from vigilar.config import AlertProfileConfig, AlertProfileRule
from vigilar.constants import HouseholdState
def _make_profile(name, states, time_window="", rules=None):
return AlertProfileConfig(
name=name,
presence_states=states,
time_window=time_window,
rules=rules or [],
)
class TestTimeWindow:
def test_no_window_always_matches(self):
assert is_in_time_window("", "14:30") is True
def test_inside_window(self):
assert is_in_time_window("23:00-06:00", "02:30") is True
def test_outside_window(self):
assert is_in_time_window("23:00-06:00", "14:30") is False
def test_at_start(self):
assert is_in_time_window("23:00-06:00", "23:00") is True
def test_daytime_window(self):
assert is_in_time_window("06:00-23:00", "14:30") is True
class TestProfileMatching:
def test_match_by_presence(self):
profiles = [
_make_profile("Away", ["EMPTY"]),
_make_profile("Home", ["ADULTS_HOME", "ALL_HOME"]),
]
result = match_profile(profiles, HouseholdState.EMPTY, "14:00")
assert result is not None
assert result.name == "Away"
def test_no_match(self):
profiles = [_make_profile("Away", ["EMPTY"])]
result = match_profile(profiles, HouseholdState.ALL_HOME, "14:00")
assert result is None
def test_time_window_narrows(self):
profiles = [
_make_profile("Night", ["ALL_HOME"], "23:00-06:00"),
_make_profile("Day", ["ALL_HOME"], "06:00-23:00"),
]
result = match_profile(profiles, HouseholdState.ALL_HOME, "02:00")
assert result is not None
assert result.name == "Night"
class TestActionForEvent:
def test_person_gets_push(self):
rules = [AlertProfileRule(detection_type="person", action="push_and_record", recipients="all")]
profile = _make_profile("Away", ["EMPTY"], rules=rules)
action, recipients = get_action_for_event(profile, "person", "front_door")
assert action == "push_and_record"
assert recipients == "all"
def test_motion_gets_record_only(self):
rules = [AlertProfileRule(detection_type="motion", action="record_only", recipients="none")]
profile = _make_profile("Home", ["ALL_HOME"], rules=rules)
action, recipients = get_action_for_event(profile, "motion", "front_door")
assert action == "record_only"
def test_no_matching_rule_defaults_quiet(self):
profile = _make_profile("Home", ["ALL_HOME"], rules=[])
action, recipients = get_action_for_event(profile, "person", "front_door")
assert action == "quiet_log"
assert recipients == "none"
```
- [ ] **Step 3: Run tests to verify fail**
Run: `source .venv/bin/activate && python -m pytest tests/unit/test_profiles.py -v`
Expected: FAIL
- [ ] **Step 4: Implement profile engine**
Create `vigilar/alerts/profiles.py`:
```python
"""Smart alert profile matching engine."""
import logging
from datetime import datetime
from vigilar.config import AlertProfileConfig, AlertProfileRule
from vigilar.constants import HouseholdState
log = logging.getLogger(__name__)
def is_in_time_window(window: str, current_time: str) -> bool:
if not window:
return True
parts = window.split("-")
if len(parts) != 2:
return True
start, end = parts[0].strip(), parts[1].strip()
t = current_time
if start <= end:
return start <= t < end
else:
return t >= start or t < end
def match_profile(
profiles: list[AlertProfileConfig],
household_state: HouseholdState,
current_time: str,
) -> AlertProfileConfig | None:
for profile in profiles:
if not profile.enabled:
continue
if household_state.value not in profile.presence_states:
continue
if not is_in_time_window(profile.time_window, current_time):
continue
return profile
return None
def get_action_for_event(
profile: AlertProfileConfig,
detection_type: str,
camera_id: str,
) -> tuple[str, str]:
for rule in profile.rules:
if rule.detection_type != detection_type:
continue
if rule.camera_location not in ("any", camera_id):
continue
return rule.action, rule.recipients
return "quiet_log", "none"
```
- [ ] **Step 5: Run tests**
Run: `source .venv/bin/activate && python -m pytest tests/unit/test_profiles.py -v`
Expected: All 10 tests PASS
- [ ] **Step 6: Run full suite**
Run: `source .venv/bin/activate && python -m pytest tests/ -v`
Expected: All PASS
- [ ] **Step 7: Commit**
```bash
git add vigilar/alerts/profiles.py vigilar/config.py tests/unit/test_profiles.py
git commit -m "feat: add smart alert profiles with presence/time matching
Profiles activate based on household state + time of day.
Per-detection-type rules control push/record/quiet behavior.
Role-based recipient filtering (all vs adults-only)."
```
---
## Task 5: Recording Timeline
**Files:**
- Create: `vigilar/web/static/js/timeline.js`
- Modify: `vigilar/web/blueprints/recordings.py` (add timeline API)
- Modify: `vigilar/web/templates/recordings.html` (timeline UI)
- Modify: `vigilar/storage/queries.py` (add get_timeline_data)
- Test: `tests/unit/test_timeline.py`
**Depends on:** Task 2 (detection_type column in recordings)
- [ ] **Step 1: Add timeline query**
In `vigilar/storage/queries.py`, add:
```python
def get_timeline_data(
engine: Engine,
camera_id: str,
day_start_ts: int,
day_end_ts: int,
) -> list[dict[str, Any]]:
query = (
select(
recordings.c.id,
recordings.c.started_at,
recordings.c.ended_at,
recordings.c.detection_type,
recordings.c.starred,
)
.where(
recordings.c.camera_id == camera_id,
recordings.c.started_at >= day_start_ts,
recordings.c.started_at < day_end_ts,
)
.order_by(recordings.c.started_at.asc())
)
with engine.connect() as conn:
return [dict(r) for r in conn.execute(query).mappings().all()]
```
- [ ] **Step 2: Write failing test**
Create `tests/unit/test_timeline.py`:
```python
"""Tests for recording timeline."""
import time
from vigilar.storage.queries import get_timeline_data, insert_recording
class TestTimelineQuery:
def test_returns_recordings_for_day(self, test_db):
now = int(time.time())
insert_recording(test_db, camera_id="cam1", started_at=now, ended_at=now+60,
file_path="/tmp/a.mp4", trigger="MOTION", detection_type="person")
insert_recording(test_db, camera_id="cam1", started_at=now+100, ended_at=now+130,
file_path="/tmp/b.mp4", trigger="MOTION", detection_type="motion")
results = get_timeline_data(test_db, "cam1", now - 3600, now + 3600)
assert len(results) == 2
assert results[0]["detection_type"] == "person"
assert results[1]["detection_type"] == "motion"
def test_filters_by_camera(self, test_db):
now = int(time.time())
insert_recording(test_db, camera_id="cam1", started_at=now, ended_at=now+60,
file_path="/tmp/a.mp4", trigger="MOTION")
insert_recording(test_db, camera_id="cam2", started_at=now, ended_at=now+60,
file_path="/tmp/b.mp4", trigger="MOTION")
results = get_timeline_data(test_db, "cam1", now - 3600, now + 3600)
assert len(results) == 1
def test_filters_by_time_range(self, test_db):
now = int(time.time())
insert_recording(test_db, camera_id="cam1", started_at=now - 7200, ended_at=now - 7100,
file_path="/tmp/old.mp4", trigger="MOTION")
insert_recording(test_db, camera_id="cam1", started_at=now, ended_at=now+60,
file_path="/tmp/new.mp4", trigger="MOTION")
results = get_timeline_data(test_db, "cam1", now - 3600, now + 3600)
assert len(results) == 1
```
- [ ] **Step 3: Run test to verify fail**
Run: `source .venv/bin/activate && python -m pytest tests/unit/test_timeline.py -v`
Expected: FAIL
- [ ] **Step 4: Add timeline API endpoint**
In `vigilar/web/blueprints/recordings.py`, add:
```python
import time as time_mod
from datetime import datetime, timedelta
@recordings_bp.route("/api/timeline")
def timeline_api():
"""JSON API: timeline data for a camera on a given day."""
camera_id = request.args.get("camera")
date_str = request.args.get("date") # YYYY-MM-DD
if not camera_id:
return jsonify({"error": "camera required"}), 400
if date_str:
day = datetime.strptime(date_str, "%Y-%m-%d")
else:
day = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
day_start = int(day.timestamp())
day_end = int((day + timedelta(days=1)).timestamp())
engine = current_app.config.get("DB_ENGINE")
if engine is None:
return jsonify([])
from vigilar.storage.queries import get_timeline_data
data = get_timeline_data(engine, camera_id, day_start, day_end)
return jsonify([
{
"id": r["id"],
"start": r["started_at"],
"end": r["ended_at"],
"type": r.get("detection_type", "motion"),
"starred": bool(r.get("starred", 0)),
}
for r in data
])
```
- [ ] **Step 5: Create timeline.js**
Create `vigilar/web/static/js/timeline.js`:
```javascript
/* Vigilar — Recording timeline renderer */
(function() {
'use strict';
const COLORS = {
person: '#dc3545',
vehicle: '#0d6efd',
motion: '#6c757d',
default: '#6c757d',
};
class Timeline {
constructor(canvas, options = {}) {
this.canvas = canvas;
this.ctx = canvas.getContext('2d');
this.segments = [];
this.cameraId = options.cameraId || '';
this.date = options.date || new Date().toISOString().split('T')[0];
this.onClick = options.onClick || null;
this.dayStartTs = 0;
this.dayEndTs = 0;
this._resize();
this._bindEvents();
}
_resize() {
const rect = this.canvas.parentElement.getBoundingClientRect();
this.canvas.width = rect.width;
this.canvas.height = this.canvas.dataset.height ? parseInt(this.canvas.dataset.height) : 48;
}
_bindEvents() {
this.canvas.addEventListener('click', (e) => this._handleClick(e));
window.addEventListener('resize', () => { this._resize(); this.render(); });
}
async load() {
const resp = await fetch(`/recordings/api/timeline?camera=${this.cameraId}&date=${this.date}`);
if (!resp.ok) return;
this.segments = await resp.json();
const d = new Date(this.date + 'T00:00:00');
this.dayStartTs = d.getTime() / 1000;
this.dayEndTs = this.dayStartTs + 86400;
this.render();
}
render() {
const w = this.canvas.width;
const h = this.canvas.height;
const ctx = this.ctx;
const dur = this.dayEndTs - this.dayStartTs;
ctx.clearRect(0, 0, w, h);
ctx.fillStyle = '#161b22';
ctx.fillRect(0, 0, w, h);
for (const seg of this.segments) {
const x1 = ((seg.start - this.dayStartTs) / dur) * w;
const x2 = seg.end ? ((seg.end - this.dayStartTs) / dur) * w : x1 + 2;
const segW = Math.max(x2 - x1, 2);
ctx.fillStyle = COLORS[seg.type] || COLORS.default;
ctx.fillRect(x1, 4, segW, h - 8);
}
// Hour markers
ctx.fillStyle = '#30363d';
ctx.font = '10px sans-serif';
for (let hour = 0; hour <= 24; hour += 6) {
const x = (hour / 24) * w;
ctx.fillRect(x, 0, 1, h);
if (hour < 24) {
ctx.fillStyle = '#8b949e';
ctx.fillText(`${hour}:00`, x + 3, h - 2);
ctx.fillStyle = '#30363d';
}
}
}
_handleClick(e) {
const rect = this.canvas.getBoundingClientRect();
const x = e.clientX - rect.left;
const pct = x / this.canvas.width;
const ts = this.dayStartTs + pct * 86400;
const clicked = this.segments.find(s => ts >= s.start && ts <= (s.end || s.start + 10));
if (clicked && this.onClick) {
this.onClick(clicked);
}
}
}
window.VigilarTimeline = Timeline;
})();
```
- [ ] **Step 6: Update recordings template**
Replace the table in `vigilar/web/templates/recordings.html` with the timeline view. Add a `<canvas>` per camera, date picker, filter buttons, and a video player area. Use the `VigilarTimeline` class to render each camera's timeline and handle click-to-play.
- [ ] **Step 7: Run tests**
Run: `source .venv/bin/activate && python -m pytest tests/unit/test_timeline.py tests/ -v`
Expected: All PASS
- [ ] **Step 8: Commit**
```bash
git add vigilar/web/static/js/timeline.js vigilar/web/blueprints/recordings.py vigilar/web/templates/recordings.html vigilar/storage/queries.py tests/unit/test_timeline.py
git commit -m "feat: add recording timeline with color-coded detection types
Canvas-based 24h timeline per camera. Color-coded segments for
person (red), vehicle (blue), motion (gray). Click to play.
Date picker, filter buttons, hour markers."
```
---
## Final: Integration Verification
- [ ] **Step 1: Run full test suite**
```bash
source .venv/bin/activate && python -m pytest tests/ -v
```
Expected: All tests pass (96 original + ~35 new = ~131 total)
- [ ] **Step 2: Verify all web routes**
```bash
source .venv/bin/activate && python -c "
from vigilar.config import load_config
from vigilar.web.app import create_app
cfg = load_config()
app = create_app(cfg)
with app.test_client() as c:
for p in ['/', '/system/settings', '/kiosk/', '/events/', '/sensors/', '/recordings/', '/cameras/']:
r = c.get(p)
print(f'{p}: {r.status_code}')
"
```
Expected: All 200
- [ ] **Step 3: Commit and push**
```bash
git push
```