diff --git a/docs/superpowers/plans/2026-04-03-pet-aware-security.md b/docs/superpowers/plans/2026-04-03-pet-aware-security.md new file mode 100644 index 0000000..5295cd3 --- /dev/null +++ b/docs/superpowers/plans/2026-04-03-pet-aware-security.md @@ -0,0 +1,2674 @@ +# Pet-Aware Security Features Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add pet detection, pet identification, wildlife monitoring, and pet activity tracking to Vigilar using YOLOv8 unified detection and a trainable pet ID classifier. + +**Architecture:** Replace MobileNet-SSD with YOLOv8-small for all object detection (people, vehicles, animals) in a single inference pass. Add a second-stage MobileNetV3-Small classifier for identifying specific pets from cropped bounding boxes. Wildlife gets threat-tiered alerting. Pet dashboard with activity tracking and highlight reels. + +**Tech Stack:** ultralytics (YOLOv8), torchvision (MobileNetV3-Small), existing Flask/Bootstrap/SQLAlchemy stack. + +**Spec:** `docs/superpowers/specs/2026-04-03-pet-aware-security-design.md` + +--- + +### Task 1: Add new constants, enums, and MQTT topics + +**Files:** +- Modify: `vigilar/constants.py` +- Test: `tests/unit/test_constants.py` (new) + +- [ ] **Step 1: Write tests for new enums and topics** + +```python +# tests/unit/test_constants.py +"""Tests for new pet/wildlife constants.""" + +from vigilar.constants import ( + CameraLocation, + EventType, + ThreatLevel, + Topics, +) + + +class TestThreatLevel: + def test_values(self): + assert ThreatLevel.PREDATOR == "PREDATOR" + assert ThreatLevel.NUISANCE == "NUISANCE" + assert ThreatLevel.PASSIVE == "PASSIVE" + + def test_is_strenum(self): + assert isinstance(ThreatLevel.PREDATOR, str) + + +class TestCameraLocation: + def test_values(self): + assert CameraLocation.EXTERIOR == "EXTERIOR" + assert CameraLocation.INTERIOR == "INTERIOR" + assert CameraLocation.TRANSITION == "TRANSITION" + + def test_is_strenum(self): + assert isinstance(CameraLocation.EXTERIOR, str) + + +class TestNewEventTypes: + def test_pet_events_exist(self): + assert EventType.PET_DETECTED == "PET_DETECTED" + assert EventType.PET_ESCAPE == "PET_ESCAPE" + assert EventType.UNKNOWN_ANIMAL == "UNKNOWN_ANIMAL" + + def test_wildlife_events_exist(self): + assert EventType.WILDLIFE_PREDATOR == "WILDLIFE_PREDATOR" + assert EventType.WILDLIFE_NUISANCE == "WILDLIFE_NUISANCE" + assert EventType.WILDLIFE_PASSIVE == "WILDLIFE_PASSIVE" + + +class TestPetTopics: + def test_pet_detected_topic(self): + assert Topics.camera_pet_detected("front") == "vigilar/camera/front/pet/detected" + + def test_wildlife_detected_topic(self): + assert Topics.camera_wildlife_detected("front") == "vigilar/camera/front/wildlife/detected" + + def test_pet_location_topic(self): + assert Topics.pet_location("angel") == "vigilar/pets/angel/location" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/unit/test_constants.py -v` +Expected: FAIL — ThreatLevel, CameraLocation not defined, new EventType values missing + +- [ ] **Step 3: Add new enums and event types to constants.py** + +Add after the `HouseholdState` enum (after line 103): + +```python +# --- Threat Levels (Wildlife) --- + +class ThreatLevel(StrEnum): + PREDATOR = "PREDATOR" + NUISANCE = "NUISANCE" + PASSIVE = "PASSIVE" + + +# --- Camera Location --- + +class CameraLocation(StrEnum): + EXTERIOR = "EXTERIOR" + INTERIOR = "INTERIOR" + TRANSITION = "TRANSITION" +``` + +Add to the `EventType` enum (after line 42, before the closing of the class): + +```python + PET_DETECTED = "PET_DETECTED" + PET_ESCAPE = "PET_ESCAPE" + UNKNOWN_ANIMAL = "UNKNOWN_ANIMAL" + WILDLIFE_PREDATOR = "WILDLIFE_PREDATOR" + WILDLIFE_NUISANCE = "WILDLIFE_NUISANCE" + WILDLIFE_PASSIVE = "WILDLIFE_PASSIVE" +``` + +Add to `RecordingTrigger` enum (after line 70): + +```python + PET = "PET" + WILDLIFE = "WILDLIFE" +``` + +Add new topic methods to the `Topics` class (after the presence methods, before the system constants): + +```python + # Pet + @staticmethod + def camera_pet_detected(camera_id: str) -> str: + return f"vigilar/camera/{camera_id}/pet/detected" + + @staticmethod + def camera_wildlife_detected(camera_id: str) -> str: + return f"vigilar/camera/{camera_id}/wildlife/detected" + + @staticmethod + def pet_location(pet_name: str) -> str: + return f"vigilar/pets/{pet_name}/location" +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/unit/test_constants.py -v` +Expected: PASS + +- [ ] **Step 5: Run full test suite to check for regressions** + +Run: `pytest tests/ -v` +Expected: All existing tests still pass + +- [ ] **Step 6: Commit** + +```bash +git add vigilar/constants.py tests/unit/test_constants.py +git commit -m "Add pet/wildlife enums, event types, and MQTT topics" +``` + +--- + +### Task 2: Add pet and wildlife config models + +**Files:** +- Modify: `vigilar/config.py` +- Modify: `tests/unit/test_config.py` + +- [ ] **Step 1: Write tests for new config models** + +Add to `tests/unit/test_config.py`: + +```python +from vigilar.config import PetsConfig, WildlifeThreatMap, WildlifeSizeHeuristics, PetActivityConfig + + +class TestPetsConfig: + def test_defaults(self): + cfg = PetsConfig() + assert cfg.enabled is False + assert cfg.model == "yolov8s" + assert cfg.confidence_threshold == 0.5 + assert cfg.pet_id_threshold == 0.7 + assert cfg.pet_id_low_confidence == 0.5 + assert cfg.min_training_images == 20 + assert cfg.crop_retention_days == 7 + + def test_custom_values(self): + cfg = PetsConfig(enabled=True, model="yolov8m", confidence_threshold=0.6) + assert cfg.enabled is True + assert cfg.model == "yolov8m" + assert cfg.confidence_threshold == 0.6 + + +class TestWildlifeThreatMap: + def test_defaults(self): + tm = WildlifeThreatMap() + assert "bear" in tm.predator + assert "bird" in tm.passive + + def test_custom_mapping(self): + tm = WildlifeThreatMap(predator=["bear", "wolf"], nuisance=["raccoon"]) + assert "wolf" in tm.predator + assert "raccoon" in tm.nuisance + + +class TestWildlifeSizeHeuristics: + def test_defaults(self): + sh = WildlifeSizeHeuristics() + assert sh.small == 0.02 + assert sh.medium == 0.08 + assert sh.large == 0.15 + + +class TestPetActivityConfig: + def test_defaults(self): + cfg = PetActivityConfig() + assert cfg.daily_digest is True + assert cfg.highlight_clips is True + assert cfg.zoomie_threshold == 0.8 + + +class TestCameraConfigLocation: + def test_default_location_is_interior(self): + from vigilar.config import CameraConfig + cfg = CameraConfig(id="test", display_name="Test", rtsp_url="rtsp://x") + assert cfg.location == "INTERIOR" + + def test_exterior_location(self): + from vigilar.config import CameraConfig + cfg = CameraConfig(id="test", display_name="Test", rtsp_url="rtsp://x", location="EXTERIOR") + assert cfg.location == "EXTERIOR" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/unit/test_config.py::TestPetsConfig -v` +Expected: FAIL — PetsConfig not defined + +- [ ] **Step 3: Add config models to config.py** + +Add after `HealthConfig` (around line 239) and before `VigilarConfig`: + +```python +# --- Pet Detection Config --- + +class WildlifeThreatMap(BaseModel): + predator: list[str] = Field(default_factory=lambda: ["bear"]) + nuisance: list[str] = Field(default_factory=list) + passive: list[str] = Field(default_factory=lambda: ["bird", "horse", "cow", "sheep"]) + +class WildlifeSizeHeuristics(BaseModel): + small: float = 0.02 # < 2% of frame → nuisance + medium: float = 0.08 # 2-8% → predator + large: float = 0.15 # > 8% → passive (deer-sized) + +class WildlifeConfig(BaseModel): + threat_map: WildlifeThreatMap = Field(default_factory=WildlifeThreatMap) + size_heuristics: WildlifeSizeHeuristics = Field(default_factory=WildlifeSizeHeuristics) + +class PetActivityConfig(BaseModel): + daily_digest: bool = True + highlight_clips: bool = True + zoomie_threshold: float = 0.8 + +class PetsConfig(BaseModel): + enabled: bool = False + model: str = "yolov8s" + model_path: str = "/var/vigilar/models/yolov8s.pt" + confidence_threshold: float = 0.5 + pet_id_enabled: bool = True + pet_id_model_path: str = "/var/vigilar/models/pet_id.pt" + pet_id_threshold: float = 0.7 + pet_id_low_confidence: float = 0.5 + training_dir: str = "/var/vigilar/pets/training" + crop_staging_dir: str = "/var/vigilar/pets/staging" + crop_retention_days: int = 7 + min_training_images: int = 20 + wildlife: WildlifeConfig = Field(default_factory=WildlifeConfig) + activity: PetActivityConfig = Field(default_factory=PetActivityConfig) +``` + +Add `location` field to `CameraConfig` (after the existing fields, around line 45): + +```python + location: str = "INTERIOR" # EXTERIOR | INTERIOR | TRANSITION +``` + +Add `pets` field to `VigilarConfig`: + +```python + pets: PetsConfig = Field(default_factory=PetsConfig) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/unit/test_config.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add vigilar/config.py tests/unit/test_config.py +git commit -m "Add pet detection, wildlife, and activity config models" +``` + +--- + +### Task 3: Add database tables for pets, sightings, and training + +**Files:** +- Modify: `vigilar/storage/schema.py` +- Modify: `vigilar/storage/queries.py` +- Modify: `tests/unit/test_schema.py` + +- [ ] **Step 1: Write tests for new tables** + +Add to `tests/unit/test_schema.py`: + +```python +from vigilar.storage.schema import pets, pet_sightings, wildlife_sightings, pet_training_images + + +class TestPetTables: + def test_pets_table_exists(self, test_db): + with test_db.connect() as conn: + result = conn.execute(pets.insert().values( + id="pet-1", name="Angel", species="cat", breed="DSH", + color_description="black", training_count=0, created_at=1000.0, + )) + row = conn.execute(pets.select().where(pets.c.id == "pet-1")).first() + assert row is not None + assert row.name == "Angel" + assert row.species == "cat" + + def test_pet_sightings_table(self, test_db): + with test_db.begin() as conn: + conn.execute(pets.insert().values( + id="pet-1", name="Angel", species="cat", training_count=0, created_at=1000.0, + )) + conn.execute(pet_sightings.insert().values( + ts=1000.0, pet_id="pet-1", species="cat", camera_id="kitchen", + confidence=0.92, labeled=True, + )) + rows = conn.execute(pet_sightings.select()).fetchall() + assert len(rows) == 1 + assert rows[0].camera_id == "kitchen" + + def test_wildlife_sightings_table(self, test_db): + with test_db.begin() as conn: + conn.execute(wildlife_sightings.insert().values( + ts=1000.0, species="bear", threat_level="PREDATOR", + camera_id="front", confidence=0.88, + )) + rows = conn.execute(wildlife_sightings.select()).fetchall() + assert len(rows) == 1 + assert rows[0].threat_level == "PREDATOR" + + def test_pet_training_images_table(self, test_db): + with test_db.begin() as conn: + conn.execute(pets.insert().values( + id="pet-1", name="Angel", species="cat", training_count=0, created_at=1000.0, + )) + conn.execute(pet_training_images.insert().values( + pet_id="pet-1", image_path="/var/vigilar/pets/training/angel/001.jpg", + source="upload", created_at=1000.0, + )) + rows = conn.execute(pet_training_images.select()).fetchall() + assert len(rows) == 1 + assert rows[0].source == "upload" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/unit/test_schema.py::TestPetTables -v` +Expected: FAIL — tables not defined + +- [ ] **Step 3: Add new tables to schema.py** + +Add after `push_subscriptions` table (after line 125): + +```python +pets = Table( + "pets", + metadata, + Column("id", String, primary_key=True), + Column("name", String, nullable=False), + Column("species", String, nullable=False), + Column("breed", String), + Column("color_description", String), + Column("photo_path", String), + Column("training_count", Integer, nullable=False, default=0), + Column("created_at", Float, nullable=False), +) + +pet_sightings = Table( + "pet_sightings", + metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("ts", Float, nullable=False), + Column("pet_id", String), + Column("species", String, nullable=False), + Column("camera_id", String, nullable=False), + Column("confidence", Float), + Column("crop_path", String), + Column("labeled", Integer, nullable=False, default=0), + Column("event_id", Integer), +) +Index("idx_pet_sightings_ts", pet_sightings.c.ts.desc()) +Index("idx_pet_sightings_pet", pet_sightings.c.pet_id, pet_sightings.c.ts.desc()) +Index("idx_pet_sightings_camera", pet_sightings.c.camera_id, pet_sightings.c.ts.desc()) + +wildlife_sightings = Table( + "wildlife_sightings", + metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("ts", Float, nullable=False), + Column("species", String, nullable=False), + Column("threat_level", String, nullable=False), + Column("camera_id", String, nullable=False), + Column("confidence", Float), + Column("crop_path", String), + Column("event_id", Integer), +) +Index("idx_wildlife_ts", wildlife_sightings.c.ts.desc()) +Index("idx_wildlife_threat", wildlife_sightings.c.threat_level, wildlife_sightings.c.ts.desc()) + +pet_training_images = Table( + "pet_training_images", + metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("pet_id", String, nullable=False), + Column("image_path", String, nullable=False), + Column("source", String, nullable=False), + Column("created_at", Float, nullable=False), +) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/unit/test_schema.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add vigilar/storage/schema.py tests/unit/test_schema.py +git commit -m "Add pets, pet_sightings, wildlife_sightings, pet_training_images tables" +``` + +--- + +### Task 4: Add pet and wildlife database query functions + +**Files:** +- Modify: `vigilar/storage/queries.py` +- Test: `tests/unit/test_pet_queries.py` (new) + +- [ ] **Step 1: Write tests for pet query functions** + +```python +# tests/unit/test_pet_queries.py +"""Tests for pet and wildlife query functions.""" + +import time + +from vigilar.storage.queries import ( + insert_pet, + get_pet, + get_all_pets, + insert_pet_sighting, + get_pet_sightings, + get_pet_last_location, + insert_wildlife_sighting, + get_wildlife_sightings, + insert_training_image, + get_training_images, + get_unlabeled_sightings, + label_sighting, +) + + +class TestPetCRUD: + def test_insert_and_get_pet(self, test_db): + pet_id = insert_pet(test_db, name="Angel", species="cat", breed="DSH", + color_description="black") + pet = get_pet(test_db, pet_id) + assert pet is not None + assert pet["name"] == "Angel" + assert pet["species"] == "cat" + assert pet["training_count"] == 0 + + def test_get_all_pets(self, test_db): + insert_pet(test_db, name="Angel", species="cat") + insert_pet(test_db, name="Milo", species="dog") + all_pets = get_all_pets(test_db) + assert len(all_pets) == 2 + + +class TestPetSightings: + def test_insert_and_query(self, test_db): + pet_id = insert_pet(test_db, name="Angel", species="cat") + insert_pet_sighting(test_db, pet_id=pet_id, species="cat", + camera_id="kitchen", confidence=0.92) + sightings = get_pet_sightings(test_db, limit=10) + assert len(sightings) == 1 + assert sightings[0]["camera_id"] == "kitchen" + + def test_last_location(self, test_db): + pet_id = insert_pet(test_db, name="Angel", species="cat") + insert_pet_sighting(test_db, pet_id=pet_id, species="cat", + camera_id="kitchen", confidence=0.9) + insert_pet_sighting(test_db, pet_id=pet_id, species="cat", + camera_id="living_room", confidence=0.95) + loc = get_pet_last_location(test_db, pet_id) + assert loc is not None + assert loc["camera_id"] == "living_room" + + def test_unlabeled_sightings(self, test_db): + insert_pet_sighting(test_db, pet_id=None, species="cat", + camera_id="kitchen", confidence=0.6, crop_path="/tmp/crop.jpg") + unlabeled = get_unlabeled_sightings(test_db, limit=10) + assert len(unlabeled) == 1 + assert unlabeled[0]["labeled"] == 0 + + def test_label_sighting(self, test_db): + pet_id = insert_pet(test_db, name="Angel", species="cat") + insert_pet_sighting(test_db, pet_id=None, species="cat", + camera_id="kitchen", confidence=0.6) + sightings = get_unlabeled_sightings(test_db, limit=10) + sighting_id = sightings[0]["id"] + label_sighting(test_db, sighting_id, pet_id) + updated = get_pet_sightings(test_db, pet_id=pet_id) + assert len(updated) == 1 + assert updated[0]["labeled"] == 1 + + +class TestWildlifeSightings: + def test_insert_and_query(self, test_db): + insert_wildlife_sighting(test_db, species="bear", threat_level="PREDATOR", + camera_id="front", confidence=0.88) + sightings = get_wildlife_sightings(test_db, limit=10) + assert len(sightings) == 1 + assert sightings[0]["threat_level"] == "PREDATOR" + + def test_filter_by_threat(self, test_db): + insert_wildlife_sighting(test_db, species="bear", threat_level="PREDATOR", + camera_id="front", confidence=0.88) + insert_wildlife_sighting(test_db, species="deer", threat_level="PASSIVE", + camera_id="back", confidence=0.75) + predators = get_wildlife_sightings(test_db, threat_level="PREDATOR") + assert len(predators) == 1 + + +class TestTrainingImages: + def test_insert_and_query(self, test_db): + pet_id = insert_pet(test_db, name="Angel", species="cat") + insert_training_image(test_db, pet_id=pet_id, + image_path="/var/vigilar/pets/training/angel/001.jpg", + source="upload") + images = get_training_images(test_db, pet_id) + assert len(images) == 1 + assert images[0]["source"] == "upload" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/unit/test_pet_queries.py -v` +Expected: FAIL — functions not defined + +- [ ] **Step 3: Implement query functions** + +Add to `vigilar/storage/queries.py`. Add imports at the top: + +```python +from vigilar.storage.schema import ( + # existing imports... + pets, + pet_sightings, + pet_training_images, + wildlife_sightings, +) +``` + +Add the following functions at the bottom of the file: + +```python +# --- Pets --- + +def insert_pet( + engine: Engine, + name: str, + species: str, + breed: str | None = None, + color_description: str | None = None, + photo_path: str | None = None, +) -> str: + import uuid + pet_id = str(uuid.uuid4()) + with engine.begin() as conn: + conn.execute(pets.insert().values( + id=pet_id, name=name, species=species, breed=breed, + color_description=color_description, photo_path=photo_path, + training_count=0, created_at=time.time(), + )) + return pet_id + + +def get_pet(engine: Engine, pet_id: str) -> dict[str, Any] | None: + with engine.connect() as conn: + row = conn.execute(pets.select().where(pets.c.id == pet_id)).first() + return dict(row._mapping) if row else None + + +def get_all_pets(engine: Engine) -> list[dict[str, Any]]: + with engine.connect() as conn: + rows = conn.execute(pets.select().order_by(pets.c.name)).fetchall() + return [dict(r._mapping) for r in rows] + + +# --- Pet Sightings --- + +def insert_pet_sighting( + engine: Engine, + species: str, + camera_id: str, + confidence: float, + pet_id: str | None = None, + crop_path: str | None = None, + event_id: int | None = None, +) -> int: + with engine.begin() as conn: + result = conn.execute(pet_sightings.insert().values( + ts=time.time(), pet_id=pet_id, species=species, + camera_id=camera_id, confidence=confidence, + crop_path=crop_path, labeled=1 if pet_id else 0, + event_id=event_id, + )) + return result.inserted_primary_key[0] + + +def get_pet_sightings( + engine: Engine, + pet_id: str | None = None, + camera_id: str | None = None, + since_ts: float | None = None, + limit: int = 100, +) -> list[dict[str, Any]]: + query = select(pet_sightings).order_by(desc(pet_sightings.c.ts)).limit(limit) + if pet_id: + query = query.where(pet_sightings.c.pet_id == pet_id) + if camera_id: + query = query.where(pet_sightings.c.camera_id == camera_id) + if since_ts: + query = query.where(pet_sightings.c.ts >= since_ts) + with engine.connect() as conn: + rows = conn.execute(query).fetchall() + return [dict(r._mapping) for r in rows] + + +def get_pet_last_location(engine: Engine, pet_id: str) -> dict[str, Any] | None: + with engine.connect() as conn: + row = conn.execute( + select(pet_sightings) + .where(pet_sightings.c.pet_id == pet_id) + .order_by(desc(pet_sightings.c.ts)) + .limit(1) + ).first() + return dict(row._mapping) if row else None + + +def get_unlabeled_sightings( + engine: Engine, + species: str | None = None, + limit: int = 50, +) -> list[dict[str, Any]]: + query = ( + select(pet_sightings) + .where(pet_sightings.c.labeled == 0) + .order_by(desc(pet_sightings.c.ts)) + .limit(limit) + ) + if species: + query = query.where(pet_sightings.c.species == species) + with engine.connect() as conn: + rows = conn.execute(query).fetchall() + return [dict(r._mapping) for r in rows] + + +def label_sighting(engine: Engine, sighting_id: int, pet_id: str) -> None: + with engine.begin() as conn: + conn.execute( + pet_sightings.update() + .where(pet_sightings.c.id == sighting_id) + .values(pet_id=pet_id, labeled=1) + ) + + +# --- Wildlife Sightings --- + +def insert_wildlife_sighting( + engine: Engine, + species: str, + threat_level: str, + camera_id: str, + confidence: float, + crop_path: str | None = None, + event_id: int | None = None, +) -> int: + with engine.begin() as conn: + result = conn.execute(wildlife_sightings.insert().values( + ts=time.time(), species=species, threat_level=threat_level, + camera_id=camera_id, confidence=confidence, + crop_path=crop_path, event_id=event_id, + )) + return result.inserted_primary_key[0] + + +def get_wildlife_sightings( + engine: Engine, + threat_level: str | None = None, + camera_id: str | None = None, + since_ts: float | None = None, + limit: int = 100, +) -> list[dict[str, Any]]: + query = select(wildlife_sightings).order_by(desc(wildlife_sightings.c.ts)).limit(limit) + if threat_level: + query = query.where(wildlife_sightings.c.threat_level == threat_level) + if camera_id: + query = query.where(wildlife_sightings.c.camera_id == camera_id) + if since_ts: + query = query.where(wildlife_sightings.c.ts >= since_ts) + with engine.connect() as conn: + rows = conn.execute(query).fetchall() + return [dict(r._mapping) for r in rows] + + +# --- Training Images --- + +def insert_training_image( + engine: Engine, + pet_id: str, + image_path: str, + source: str, +) -> int: + with engine.begin() as conn: + result = conn.execute(pet_training_images.insert().values( + pet_id=pet_id, image_path=image_path, + source=source, created_at=time.time(), + )) + # Update training count on pet + conn.execute( + pets.update().where(pets.c.id == pet_id) + .values(training_count=pets.c.training_count + 1) + ) + return result.inserted_primary_key[0] + + +def get_training_images( + engine: Engine, + pet_id: str, +) -> list[dict[str, Any]]: + with engine.connect() as conn: + rows = conn.execute( + select(pet_training_images) + .where(pet_training_images.c.pet_id == pet_id) + .order_by(desc(pet_training_images.c.created_at)) + ).fetchall() + return [dict(r._mapping) for r in rows] +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/unit/test_pet_queries.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add vigilar/storage/queries.py tests/unit/test_pet_queries.py +git commit -m "Add pet and wildlife database query functions" +``` + +--- + +### Task 5: Implement YOLOv8 unified detector + +**Files:** +- Create: `vigilar/detection/yolo.py` +- Test: `tests/unit/test_yolo_detector.py` (new) + +- [ ] **Step 1: Write tests for YOLOv8 detector** + +```python +# tests/unit/test_yolo_detector.py +"""Tests for YOLOv8 unified detector.""" + +import numpy as np +import pytest + +from vigilar.detection.person import Detection +from vigilar.detection.yolo import YOLODetector, ANIMAL_CLASSES, WILDLIFE_CLASSES + + +class TestYOLOConstants: + def test_animal_classes(self): + assert "cat" in ANIMAL_CLASSES + assert "dog" in ANIMAL_CLASSES + + def test_wildlife_classes(self): + assert "bear" in WILDLIFE_CLASSES + assert "bird" in WILDLIFE_CLASSES + + def test_no_overlap_animal_wildlife(self): + assert not ANIMAL_CLASSES.intersection(WILDLIFE_CLASSES) + + +class TestYOLODetector: + def test_initializes_without_model(self): + detector = YOLODetector(model_path="nonexistent.pt", confidence_threshold=0.5) + assert not detector.is_loaded + + def test_detect_returns_empty_when_not_loaded(self): + detector = YOLODetector(model_path="nonexistent.pt") + frame = np.zeros((480, 640, 3), dtype=np.uint8) + detections = detector.detect(frame) + assert detections == [] + + def test_classify_detection_person(self): + d = Detection(class_name="person", class_id=0, confidence=0.9, bbox=(10, 20, 100, 200)) + assert YOLODetector.classify(d) == "person" + + def test_classify_detection_vehicle(self): + d = Detection(class_name="car", class_id=2, confidence=0.85, bbox=(10, 20, 100, 200)) + assert YOLODetector.classify(d) == "vehicle" + + def test_classify_detection_domestic_animal(self): + d = Detection(class_name="cat", class_id=15, confidence=0.9, bbox=(10, 20, 100, 200)) + assert YOLODetector.classify(d) == "domestic_animal" + + def test_classify_detection_wildlife(self): + d = Detection(class_name="bear", class_id=21, confidence=0.8, bbox=(10, 20, 100, 200)) + assert YOLODetector.classify(d) == "wildlife" + + def test_classify_detection_other(self): + d = Detection(class_name="chair", class_id=56, confidence=0.7, bbox=(10, 20, 100, 200)) + assert YOLODetector.classify(d) == "other" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/unit/test_yolo_detector.py -v` +Expected: FAIL — yolo module not found + +- [ ] **Step 3: Implement YOLOv8 detector** + +```python +# vigilar/detection/yolo.py +"""Unified object detection using YOLOv8 via ultralytics.""" + +import logging +from pathlib import Path + +import numpy as np + +from vigilar.detection.person import Detection + +log = logging.getLogger(__name__) + +# COCO class IDs for domestic animals +ANIMAL_CLASSES = {"cat", "dog"} + +# COCO class IDs for wildlife (subset that YOLO can detect) +WILDLIFE_CLASSES = {"bear", "bird", "horse", "cow", "sheep", "elephant", "zebra", "giraffe"} + +# Vehicle class names from COCO +VEHICLE_CLASSES = {"car", "motorcycle", "bus", "truck", "boat"} + + +class YOLODetector: + def __init__(self, model_path: str, confidence_threshold: float = 0.5): + self._threshold = confidence_threshold + self._model = None + self.is_loaded = False + + if Path(model_path).exists(): + try: + from ultralytics import YOLO + self._model = YOLO(model_path) + self.is_loaded = True + log.info("YOLO model loaded from %s", model_path) + except Exception as e: + log.error("Failed to load YOLO model: %s", e) + else: + log.warning("YOLO model not found at %s — detection disabled", model_path) + + def detect(self, frame: np.ndarray) -> list[Detection]: + if not self.is_loaded or self._model is None: + return [] + + results = self._model(frame, conf=self._threshold, verbose=False) + detections = [] + + for result in results: + for box in result.boxes: + class_id = int(box.cls[0]) + confidence = float(box.conf[0]) + class_name = result.names[class_id] + + x1, y1, x2, y2 = box.xyxy[0].tolist() + x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2) + bw, bh = x2 - x1, y2 - y1 + if bw <= 0 or bh <= 0: + continue + + detections.append(Detection( + class_name=class_name, + class_id=class_id, + confidence=confidence, + bbox=(x1, y1, bw, bh), + )) + + return detections + + @staticmethod + def classify(detection: Detection) -> str: + name = detection.class_name + if name == "person": + return "person" + if name in VEHICLE_CLASSES: + return "vehicle" + if name in ANIMAL_CLASSES: + return "domestic_animal" + if name in WILDLIFE_CLASSES: + return "wildlife" + return "other" +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/unit/test_yolo_detector.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add vigilar/detection/yolo.py tests/unit/test_yolo_detector.py +git commit -m "Add YOLOv8 unified detector with class classification" +``` + +--- + +### Task 6: Implement wildlife threat classifier + +**Files:** +- Create: `vigilar/detection/wildlife.py` +- Test: `tests/unit/test_wildlife.py` (new) + +- [ ] **Step 1: Write tests for wildlife threat classification** + +```python +# tests/unit/test_wildlife.py +"""Tests for wildlife threat classification.""" + +from vigilar.config import WildlifeConfig, WildlifeThreatMap, WildlifeSizeHeuristics +from vigilar.detection.person import Detection +from vigilar.detection.wildlife import classify_wildlife_threat + + +def _make_config(**kwargs): + return WildlifeConfig(**kwargs) + + +class TestWildlifeThreatClassification: + def test_bear_is_predator(self): + cfg = _make_config() + d = Detection(class_name="bear", class_id=21, confidence=0.9, bbox=(100, 100, 200, 300)) + level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080) + assert level == "PREDATOR" + assert species == "bear" + + def test_bird_is_passive(self): + cfg = _make_config() + d = Detection(class_name="bird", class_id=14, confidence=0.8, bbox=(10, 10, 30, 20)) + level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080) + assert level == "PASSIVE" + assert species == "bird" + + def test_unknown_small_is_nuisance(self): + cfg = _make_config() + # Small bbox relative to frame → nuisance (raccoon/skunk sized) + # frame_area = 1920*1080 = 2073600, bbox area = 30*30 = 900 → 0.04% + d = Detection(class_name="unknown", class_id=99, confidence=0.7, bbox=(100, 100, 30, 30)) + level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080) + assert level == "NUISANCE" + assert species == "unknown" + + def test_unknown_medium_is_predator(self): + cfg = _make_config() + # Medium bbox → predator (fox/coyote sized) + # bbox area = 200*150 = 30000, frame = 2073600 → 1.4%... need bigger + # bbox area = 300*300 = 90000 / 2073600 → 4.3% → between small and medium thresholds + d = Detection(class_name="unknown", class_id=99, confidence=0.7, bbox=(100, 100, 300, 300)) + level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080) + assert level == "PREDATOR" + + def test_unknown_large_is_passive(self): + cfg = _make_config() + # Large bbox → passive (deer sized) + # bbox area = 600*500 = 300000 / 2073600 → 14.5% → > large threshold + d = Detection(class_name="unknown", class_id=99, confidence=0.7, bbox=(100, 100, 600, 500)) + level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080) + assert level == "PASSIVE" + + def test_custom_threat_map(self): + cfg = _make_config(threat_map=WildlifeThreatMap(predator=["bear", "wolf"])) + d = Detection(class_name="wolf", class_id=99, confidence=0.85, bbox=(100, 100, 200, 200)) + level, _ = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080) + assert level == "PREDATOR" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/unit/test_wildlife.py -v` +Expected: FAIL — wildlife module not found + +- [ ] **Step 3: Implement wildlife classifier** + +```python +# vigilar/detection/wildlife.py +"""Wildlife threat level classification.""" + +from vigilar.config import WildlifeConfig +from vigilar.detection.person import Detection + + +def classify_wildlife_threat( + detection: Detection, + config: WildlifeConfig, + frame_area: int, +) -> tuple[str, str]: + """Classify a wildlife detection into threat level and species. + + Returns (threat_level, species_name). + """ + species = detection.class_name + threat_map = config.threat_map + + # Direct COCO class mapping first + if species in threat_map.predator: + return "PREDATOR", species + if species in threat_map.nuisance: + return "NUISANCE", species + if species in threat_map.passive: + return "PASSIVE", species + + # Fallback to size heuristics for unknown species + _, _, w, h = detection.bbox + bbox_area = w * h + area_ratio = bbox_area / frame_area if frame_area > 0 else 0 + + heuristics = config.size_heuristics + if area_ratio < heuristics.small: + return "NUISANCE", species + elif area_ratio < heuristics.large: + return "PREDATOR", species + else: + return "PASSIVE", species +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/unit/test_wildlife.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add vigilar/detection/wildlife.py tests/unit/test_wildlife.py +git commit -m "Add wildlife threat classification with size heuristics" +``` + +--- + +### Task 7: Implement pet ID classifier + +**Files:** +- Create: `vigilar/detection/pet_id.py` +- Test: `tests/unit/test_pet_id.py` (new) + +- [ ] **Step 1: Write tests for pet ID classifier** + +```python +# tests/unit/test_pet_id.py +"""Tests for pet ID classifier.""" + +import numpy as np +import pytest + +from vigilar.detection.pet_id import PetIDClassifier, PetIDResult + + +class TestPetIDResult: + def test_identified(self): + r = PetIDResult(pet_id="pet-1", pet_name="Angel", confidence=0.9) + assert r.is_identified + assert not r.is_low_confidence + + def test_low_confidence(self): + r = PetIDResult(pet_id="pet-1", pet_name="Angel", confidence=0.6) + assert r.is_identified + assert r.is_low_confidence + + def test_unknown(self): + r = PetIDResult(pet_id=None, pet_name=None, confidence=0.3) + assert not r.is_identified + + +class TestPetIDClassifier: + def test_not_loaded_returns_unknown(self): + classifier = PetIDClassifier(model_path="nonexistent.pt") + assert not classifier.is_loaded + crop = np.zeros((224, 224, 3), dtype=np.uint8) + result = classifier.identify(crop, species="cat") + assert not result.is_identified + + def test_no_pets_registered_returns_unknown(self): + classifier = PetIDClassifier(model_path="nonexistent.pt") + assert classifier.pet_count == 0 + + def test_register_pet(self): + classifier = PetIDClassifier(model_path="nonexistent.pt") + classifier.register_pet("pet-1", "Angel", "cat") + classifier.register_pet("pet-2", "Milo", "dog") + assert classifier.pet_count == 2 + + def test_species_filter(self): + classifier = PetIDClassifier(model_path="nonexistent.pt") + classifier.register_pet("pet-1", "Angel", "cat") + classifier.register_pet("pet-2", "Taquito", "cat") + classifier.register_pet("pet-3", "Milo", "dog") + assert len(classifier.get_pets_by_species("cat")) == 2 + assert len(classifier.get_pets_by_species("dog")) == 1 +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/unit/test_pet_id.py -v` +Expected: FAIL — pet_id module not found + +- [ ] **Step 3: Implement pet ID classifier** + +```python +# vigilar/detection/pet_id.py +"""Pet identification classifier using MobileNetV3-Small.""" + +import logging +from dataclasses import dataclass +from pathlib import Path + +import numpy as np + +log = logging.getLogger(__name__) + +# Confidence thresholds (overridden by config at runtime) +DEFAULT_HIGH_THRESHOLD = 0.7 +DEFAULT_LOW_THRESHOLD = 0.5 + + +@dataclass +class PetIDResult: + pet_id: str | None + pet_name: str | None + confidence: float + high_threshold: float = DEFAULT_HIGH_THRESHOLD + low_threshold: float = DEFAULT_LOW_THRESHOLD + + @property + def is_identified(self) -> bool: + return self.pet_id is not None and self.confidence >= self.low_threshold + + @property + def is_low_confidence(self) -> bool: + return ( + self.pet_id is not None + and self.low_threshold <= self.confidence < self.high_threshold + ) + + +@dataclass +class RegisteredPet: + pet_id: str + name: str + species: str + class_index: int # index in the classifier output + + +class PetIDClassifier: + def __init__( + self, + model_path: str, + high_threshold: float = DEFAULT_HIGH_THRESHOLD, + low_threshold: float = DEFAULT_LOW_THRESHOLD, + ): + self._model_path = model_path + self._high_threshold = high_threshold + self._low_threshold = low_threshold + self._model = None + self._transform = None + self.is_loaded = False + self._pets: list[RegisteredPet] = [] + + if Path(model_path).exists(): + try: + import torch + self._model = torch.load(model_path, map_location="cpu", weights_only=False) + self._model.eval() + self.is_loaded = True + log.info("Pet ID model loaded from %s", model_path) + except Exception as e: + log.error("Failed to load pet ID model: %s", e) + else: + log.info("Pet ID model not found at %s — identification disabled until trained", + model_path) + + @property + def pet_count(self) -> int: + return len(self._pets) + + def register_pet(self, pet_id: str, name: str, species: str) -> None: + idx = len(self._pets) + self._pets.append(RegisteredPet(pet_id=pet_id, name=name, species=species, + class_index=idx)) + + def get_pets_by_species(self, species: str) -> list[RegisteredPet]: + return [p for p in self._pets if p.species == species] + + def identify(self, crop: np.ndarray, species: str) -> PetIDResult: + if not self.is_loaded or self._model is None: + return PetIDResult(pet_id=None, pet_name=None, confidence=0.0) + + candidates = self.get_pets_by_species(species) + if not candidates: + return PetIDResult(pet_id=None, pet_name=None, confidence=0.0) + + try: + import cv2 + import torch + from torchvision import transforms + + # Preprocess crop + resized = cv2.resize(crop, (224, 224)) + rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB) + transform = transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], + std=[0.229, 0.224, 0.225]), + ]) + tensor = transform(rgb).unsqueeze(0) + + with torch.no_grad(): + output = self._model(tensor) + probs = torch.softmax(output, dim=1)[0] + + # Find best match among candidates for this species + best_conf = 0.0 + best_pet = None + for pet in candidates: + if pet.class_index < len(probs): + conf = float(probs[pet.class_index]) + if conf > best_conf: + best_conf = conf + best_pet = pet + + if best_pet and best_conf >= self._low_threshold: + return PetIDResult( + pet_id=best_pet.pet_id, + pet_name=best_pet.name, + confidence=best_conf, + high_threshold=self._high_threshold, + low_threshold=self._low_threshold, + ) + + return PetIDResult(pet_id=None, pet_name=None, confidence=best_conf) + + except Exception as e: + log.error("Pet ID inference failed: %s", e) + return PetIDResult(pet_id=None, pet_name=None, confidence=0.0) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/unit/test_pet_id.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add vigilar/detection/pet_id.py tests/unit/test_pet_id.py +git commit -m "Add pet ID classifier with species-filtered identification" +``` + +--- + +### Task 8: Implement pet ID model trainer + +**Files:** +- Create: `vigilar/detection/trainer.py` +- Test: `tests/unit/test_trainer.py` (new) + +- [ ] **Step 1: Write tests for trainer** + +```python +# tests/unit/test_trainer.py +"""Tests for pet ID model trainer.""" + +import os +from pathlib import Path + +import numpy as np +import pytest + +from vigilar.detection.trainer import PetTrainer, TrainingStatus + + +class TestTrainingStatus: + def test_initial_status(self): + status = TrainingStatus() + assert status.is_training is False + assert status.progress == 0.0 + assert status.error is None + + +class TestPetTrainer: + def test_check_readiness_no_pets(self, tmp_path): + trainer = PetTrainer(training_dir=str(tmp_path), model_output_path=str(tmp_path / "model.pt")) + ready, msg = trainer.check_readiness(min_images=20) + assert not ready + assert "No pet" in msg + + def test_check_readiness_insufficient_images(self, tmp_path): + # Create pet dirs with too few images + pet_dir = tmp_path / "angel" + pet_dir.mkdir() + for i in range(5): + (pet_dir / f"{i}.jpg").write_bytes(b"fake") + + trainer = PetTrainer(training_dir=str(tmp_path), model_output_path=str(tmp_path / "model.pt")) + ready, msg = trainer.check_readiness(min_images=20) + assert not ready + assert "angel" in msg.lower() + + def test_check_readiness_sufficient_images(self, tmp_path): + for name in ["angel", "taquito"]: + pet_dir = tmp_path / name + pet_dir.mkdir() + for i in range(25): + (pet_dir / f"{i}.jpg").write_bytes(b"fake") + + trainer = PetTrainer(training_dir=str(tmp_path), model_output_path=str(tmp_path / "model.pt")) + ready, msg = trainer.check_readiness(min_images=20) + assert ready + + def test_get_class_names(self, tmp_path): + for name in ["angel", "milo", "taquito"]: + (tmp_path / name).mkdir() + + trainer = PetTrainer(training_dir=str(tmp_path), model_output_path=str(tmp_path / "model.pt")) + names = trainer.get_class_names() + assert names == ["angel", "milo", "taquito"] # sorted +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/unit/test_trainer.py -v` +Expected: FAIL — trainer module not found + +- [ ] **Step 3: Implement trainer** + +```python +# vigilar/detection/trainer.py +"""Pet ID model trainer using MobileNetV3-Small with transfer learning.""" + +import logging +import shutil +from dataclasses import dataclass, field +from pathlib import Path + +log = logging.getLogger(__name__) + + +@dataclass +class TrainingStatus: + is_training: bool = False + progress: float = 0.0 + epoch: int = 0 + total_epochs: int = 0 + accuracy: float = 0.0 + error: str | None = None + + +class PetTrainer: + def __init__(self, training_dir: str, model_output_path: str): + self._training_dir = Path(training_dir) + self._model_output_path = Path(model_output_path) + self.status = TrainingStatus() + + def get_class_names(self) -> list[str]: + if not self._training_dir.exists(): + return [] + return sorted([ + d.name for d in self._training_dir.iterdir() + if d.is_dir() and not d.name.startswith(".") + ]) + + def check_readiness(self, min_images: int = 20) -> tuple[bool, str]: + class_names = self.get_class_names() + if not class_names: + return False, "No pet directories found in training directory." + + insufficient = [] + for name in class_names: + pet_dir = self._training_dir / name + image_count = sum(1 for f in pet_dir.iterdir() + if f.suffix.lower() in (".jpg", ".jpeg", ".png")) + if image_count < min_images: + insufficient.append(f"{name}: {image_count}/{min_images}") + + if insufficient: + return False, f"Insufficient training images: {', '.join(insufficient)}" + + return True, f"Ready to train with {len(class_names)} classes." + + def train(self, epochs: int = 30, batch_size: int = 16) -> bool: + try: + import torch + import torch.nn as nn + from torch.utils.data import DataLoader + from torchvision import datasets, models, transforms + + self.status = TrainingStatus(is_training=True, total_epochs=epochs) + class_names = self.get_class_names() + num_classes = len(class_names) + + if num_classes < 2: + self.status.error = "Need at least 2 pets to train." + self.status.is_training = False + return False + + # Data transforms with augmentation + train_transform = transforms.Compose([ + transforms.Resize((256, 256)), + transforms.RandomCrop(224), + transforms.RandomHorizontalFlip(), + transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2), + transforms.ToTensor(), + transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), + ]) + + dataset = datasets.ImageFolder(str(self._training_dir), transform=train_transform) + loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=2) + + # MobileNetV3-Small with transfer learning + model = models.mobilenet_v3_small(weights=models.MobileNet_V3_Small_Weights.DEFAULT) + model.classifier[-1] = nn.Linear(model.classifier[-1].in_features, num_classes) + + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + model = model.to(device) + + # Freeze feature extractor, train classifier head first + for param in model.features.parameters(): + param.requires_grad = False + + optimizer = torch.optim.Adam(model.classifier.parameters(), lr=1e-3) + criterion = nn.CrossEntropyLoss() + + # Training loop + for epoch in range(epochs): + model.train() + running_loss = 0.0 + correct = 0 + total = 0 + + # Unfreeze features after 5 epochs for fine-tuning + if epoch == 5: + for param in model.features.parameters(): + param.requires_grad = True + optimizer = torch.optim.Adam(model.parameters(), lr=1e-4) + + for inputs, labels in loader: + inputs, labels = inputs.to(device), labels.to(device) + optimizer.zero_grad() + outputs = model(inputs) + loss = criterion(outputs, labels) + loss.backward() + optimizer.step() + + running_loss += loss.item() + _, predicted = outputs.max(1) + total += labels.size(0) + correct += predicted.eq(labels).sum().item() + + accuracy = correct / total if total > 0 else 0 + self.status.epoch = epoch + 1 + self.status.progress = (epoch + 1) / epochs + self.status.accuracy = accuracy + log.info("Epoch %d/%d — loss: %.4f, accuracy: %.4f", + epoch + 1, epochs, running_loss / len(loader), accuracy) + + # Backup existing model + if self._model_output_path.exists(): + backup_path = self._model_output_path.with_suffix(".backup.pt") + shutil.copy2(self._model_output_path, backup_path) + log.info("Backed up previous model to %s", backup_path) + + # Save model + model = model.to("cpu") + torch.save(model, self._model_output_path) + log.info("Pet ID model saved to %s (accuracy: %.2f%%)", + self._model_output_path, self.status.accuracy * 100) + + self.status.is_training = False + return True + + except Exception as e: + log.exception("Training failed: %s", e) + self.status.error = str(e) + self.status.is_training = False + return False +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/unit/test_trainer.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add vigilar/detection/trainer.py tests/unit/test_trainer.py +git commit -m "Add pet ID model trainer with MobileNetV3-Small transfer learning" +``` + +--- + +### Task 9: Update alert profiles for pet/wildlife detection types + +**Files:** +- Modify: `vigilar/alerts/profiles.py` +- Modify: `tests/unit/test_profiles.py` + +- [ ] **Step 1: Write tests for new detection types in alert profiles** + +Add to `tests/unit/test_profiles.py`: + +```python +class TestPetAlertRouting: + def test_known_pet_exterior_gets_push(self): + rules = [ + AlertProfileRule(detection_type="known_pet", camera_location="EXTERIOR", + action="push_and_record", recipients="all"), + AlertProfileRule(detection_type="known_pet", camera_location="INTERIOR", + action="quiet_log", recipients="none"), + ] + profile = _make_profile("Away", ["EMPTY"], rules=rules) + action, recipients = get_action_for_event(profile, "known_pet", "front_entrance", + camera_location="EXTERIOR") + assert action == "push_and_record" + + def test_known_pet_interior_gets_quiet_log(self): + rules = [ + AlertProfileRule(detection_type="known_pet", camera_location="EXTERIOR", + action="push_and_record", recipients="all"), + AlertProfileRule(detection_type="known_pet", camera_location="INTERIOR", + action="quiet_log", recipients="none"), + ] + profile = _make_profile("Home", ["ALL_HOME"], rules=rules) + action, recipients = get_action_for_event(profile, "known_pet", "kitchen", + camera_location="INTERIOR") + assert action == "quiet_log" + + def test_wildlife_predator_gets_urgent(self): + rules = [ + AlertProfileRule(detection_type="wildlife_predator", + action="push_and_record", recipients="all"), + ] + profile = _make_profile("Always", ["EMPTY", "ALL_HOME"], rules=rules) + action, _ = get_action_for_event(profile, "wildlife_predator", "front", + camera_location="EXTERIOR") + assert action == "push_and_record" + + def test_camera_location_any_matches_all(self): + rules = [ + AlertProfileRule(detection_type="wildlife_predator", camera_location="any", + action="push_and_record", recipients="all"), + ] + profile = _make_profile("Always", ["EMPTY"], rules=rules) + action, _ = get_action_for_event(profile, "wildlife_predator", "kitchen", + camera_location="INTERIOR") + assert action == "push_and_record" + + def test_backward_compatible_without_camera_location(self): + """Existing calls without camera_location still work.""" + rules = [AlertProfileRule(detection_type="person", action="push_and_record", recipients="all")] + profile = _make_profile("Away", ["EMPTY"], rules=rules) + action, _ = get_action_for_event(profile, "person", "front_door") + assert action == "push_and_record" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/unit/test_profiles.py::TestPetAlertRouting -v` +Expected: FAIL — get_action_for_event doesn't accept camera_location parameter + +- [ ] **Step 3: Update get_action_for_event to support camera_location** + +Modify `vigilar/alerts/profiles.py`. Update the `get_action_for_event` function: + +```python +def get_action_for_event( + profile: AlertProfileConfig, + detection_type: str, + camera_id: str, + camera_location: str | None = None, +) -> tuple[str, str]: + for rule in profile.rules: + if rule.detection_type != detection_type: + continue + # Check camera_location match + if rule.camera_location not in ("any", camera_id): + # Also check against the camera's location type + if camera_location and rule.camera_location != camera_location: + continue + elif not camera_location and rule.camera_location not in ("any", camera_id): + continue + return rule.action, rule.recipients + return "quiet_log", "none" +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/unit/test_profiles.py -v` +Expected: PASS (all existing and new tests) + +- [ ] **Step 5: Commit** + +```bash +git add vigilar/alerts/profiles.py tests/unit/test_profiles.py +git commit -m "Add camera_location filtering to alert profile matching" +``` + +--- + +### Task 10: Update event processor for pet/wildlife events + +**Files:** +- Modify: `vigilar/events/processor.py` +- Modify: `tests/unit/test_events.py` + +- [ ] **Step 1: Write tests for pet/wildlife event classification** + +Add to `tests/unit/test_events.py`: + +```python +class TestPetEventClassification: + def test_pet_detected_event(self): + from vigilar.events.processor import EventProcessor + from vigilar.constants import EventType, Severity + processor = EventProcessor.__new__(EventProcessor) + etype, sev, source = processor._classify_event( + "vigilar/camera/kitchen/pet/detected", + {"pet_name": "Angel", "confidence": 0.92}, + ) + assert etype == EventType.PET_DETECTED + assert sev == Severity.INFO + assert source == "kitchen" + + def test_wildlife_predator_event(self): + from vigilar.events.processor import EventProcessor + from vigilar.constants import EventType, Severity + processor = EventProcessor.__new__(EventProcessor) + etype, sev, source = processor._classify_event( + "vigilar/camera/front/wildlife/detected", + {"species": "bear", "threat_level": "PREDATOR"}, + ) + assert etype == EventType.WILDLIFE_PREDATOR + assert sev == Severity.CRITICAL + assert source == "front" + + def test_wildlife_nuisance_event(self): + from vigilar.events.processor import EventProcessor + from vigilar.constants import EventType, Severity + processor = EventProcessor.__new__(EventProcessor) + etype, sev, source = processor._classify_event( + "vigilar/camera/back/wildlife/detected", + {"species": "raccoon", "threat_level": "NUISANCE"}, + ) + assert etype == EventType.WILDLIFE_NUISANCE + assert sev == Severity.WARNING + assert source == "back" + + def test_wildlife_passive_event(self): + from vigilar.events.processor import EventProcessor + from vigilar.constants import EventType, Severity + processor = EventProcessor.__new__(EventProcessor) + etype, sev, source = processor._classify_event( + "vigilar/camera/front/wildlife/detected", + {"species": "deer", "threat_level": "PASSIVE"}, + ) + assert etype == EventType.WILDLIFE_PASSIVE + assert sev == Severity.INFO + assert source == "front" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/unit/test_events.py::TestPetEventClassification -v` +Expected: FAIL — pet/wildlife topics not handled in _classify_event + +- [ ] **Step 3: Update event processor** + +In `vigilar/events/processor.py`, update `_classify_event` to handle the new topics. Add these cases in the camera section (after the existing `_TOPIC_EVENT_MAP` check, around line 130): + +```python + # Pet detection + if suffix == "pet/detected": + return EventType.PET_DETECTED, Severity.INFO, camera_id + + # Wildlife detection — severity depends on threat_level in payload + if suffix == "wildlife/detected": + threat = payload.get("threat_level", "PASSIVE") + if threat == "PREDATOR": + return EventType.WILDLIFE_PREDATOR, Severity.CRITICAL, camera_id + elif threat == "NUISANCE": + return EventType.WILDLIFE_NUISANCE, Severity.WARNING, camera_id + else: + return EventType.WILDLIFE_PASSIVE, Severity.INFO, camera_id +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/unit/test_events.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add vigilar/events/processor.py tests/unit/test_events.py +git commit -m "Handle pet and wildlife events in event processor" +``` + +--- + +### Task 11: Integrate detection into camera worker + +**Files:** +- Modify: `vigilar/camera/worker.py` + +This task integrates the YOLOv8 detector and pet ID classifier into the camera frame loop. This is the core wiring task — no new tests because it requires live camera/MQTT integration testing, but the underlying components (Tasks 5-7) are individually tested. + +- [ ] **Step 1: Add imports to worker.py** + +Add at the top of `vigilar/camera/worker.py` after existing imports: + +```python +from vigilar.detection.yolo import YOLODetector +from vigilar.detection.pet_id import PetIDClassifier +from vigilar.detection.wildlife import classify_wildlife_threat +``` + +- [ ] **Step 2: Add detector initialization in run_camera_worker** + +Add after the existing component setup (around line 90, after HLS and recorder init), before the main loop: + +```python + # Object detection (YOLOv8 unified detector) + yolo_detector = None + pet_classifier = None + if hasattr(camera_cfg, '_pets_config') and camera_cfg._pets_config.enabled: + pets_cfg = camera_cfg._pets_config + yolo_detector = YOLODetector( + model_path=pets_cfg.model_path, + confidence_threshold=pets_cfg.confidence_threshold, + ) + if pets_cfg.pet_id_enabled: + pet_classifier = PetIDClassifier( + model_path=pets_cfg.pet_id_model_path, + high_threshold=pets_cfg.pet_id_threshold, + low_threshold=pets_cfg.pet_id_low_confidence, + ) +``` + +Note: The `_pets_config` attribute will be set by the camera manager when spawning workers. This avoids changing the CameraConfig model's serialization. + +- [ ] **Step 3: Add detection processing after motion detection** + +Add after the motion START/END block (after line 244, before the heartbeat), inside the main loop: + +```python + # Run object detection on motion frames + if state.motion_active and yolo_detector and yolo_detector.is_loaded: + detections = yolo_detector.detect(frame) + for det in detections: + category = YOLODetector.classify(det) + if category == "person": + bus.publish_event( + Topics.camera_motion_start(camera_id), # reuse existing topic + detection="person", confidence=det.confidence, + ) + elif category == "domestic_animal": + # Crop for pet ID + x, y, w, h = det.bbox + crop = frame[max(0, y):y + h, max(0, x):x + w] + pet_result = None + if pet_classifier and pet_classifier.is_loaded and crop.size > 0: + pet_result = pet_classifier.identify(crop, species=det.class_name) + + payload = { + "species": det.class_name, + "confidence": round(det.confidence, 3), + "camera_location": camera_cfg.location, + } + if pet_result and pet_result.is_identified: + payload["pet_id"] = pet_result.pet_id + payload["pet_name"] = pet_result.pet_name + payload["pet_confidence"] = round(pet_result.confidence, 3) + # Publish pet location update + bus.publish_event( + Topics.pet_location(pet_result.pet_name.lower()), + camera_id=camera_id, + camera_location=camera_cfg.location, + ) + + bus.publish_event(Topics.camera_pet_detected(camera_id), **payload) + + elif category == "wildlife": + wildlife_cfg = camera_cfg._pets_config.wildlife if hasattr(camera_cfg, '_pets_config') else None + if wildlife_cfg: + frame_area = frame.shape[0] * frame.shape[1] + threat_level, species = classify_wildlife_threat( + det, wildlife_cfg, frame_area, + ) + bus.publish_event( + Topics.camera_wildlife_detected(camera_id), + species=species, + threat_level=threat_level, + confidence=round(det.confidence, 3), + camera_location=camera_cfg.location, + ) +``` + +- [ ] **Step 4: Update run_camera_worker signature to accept pets config** + +Update the function signature to pass pets config through: + +```python +def run_camera_worker( + camera_cfg: CameraConfig, + mqtt_cfg: MQTTConfig, + recordings_dir: str, + hls_dir: str, + remote_cfg: RemoteConfig | None = None, + pets_cfg: "PetsConfig | None" = None, +) -> None: +``` + +And replace the `hasattr` check with: + +```python + # Object detection (YOLOv8 unified detector) + yolo_detector = None + pet_classifier = None + if pets_cfg and pets_cfg.enabled: + yolo_detector = YOLODetector( + model_path=pets_cfg.model_path, + confidence_threshold=pets_cfg.confidence_threshold, + ) + if pets_cfg.pet_id_enabled: + pet_classifier = PetIDClassifier( + model_path=pets_cfg.pet_id_model_path, + high_threshold=pets_cfg.pet_id_threshold, + low_threshold=pets_cfg.pet_id_low_confidence, + ) +``` + +- [ ] **Step 5: Commit** + +```bash +git add vigilar/camera/worker.py +git commit -m "Integrate YOLOv8 detection and pet ID into camera worker" +``` + +--- + +### Task 12: Add crop saving and staging cleanup + +**Files:** +- Create: `vigilar/detection/crop_manager.py` +- Test: `tests/unit/test_crop_manager.py` (new) + +- [ ] **Step 1: Write tests for crop manager** + +```python +# tests/unit/test_crop_manager.py +"""Tests for detection crop saving and staging cleanup.""" + +import time +from pathlib import Path + +import numpy as np + +from vigilar.detection.crop_manager import CropManager + + +class TestCropManager: + def test_save_crop(self, tmp_path): + manager = CropManager(staging_dir=str(tmp_path / "staging"), + training_dir=str(tmp_path / "training")) + crop = np.zeros((100, 80, 3), dtype=np.uint8) + path = manager.save_staging_crop(crop, species="cat", camera_id="kitchen") + assert Path(path).exists() + assert "cat" in path + assert "kitchen" in path + + def test_promote_to_training(self, tmp_path): + manager = CropManager(staging_dir=str(tmp_path / "staging"), + training_dir=str(tmp_path / "training")) + crop = np.zeros((100, 80, 3), dtype=np.uint8) + staging_path = manager.save_staging_crop(crop, species="cat", camera_id="kitchen") + training_path = manager.promote_to_training(staging_path, pet_name="angel") + assert Path(training_path).exists() + assert "angel" in training_path + assert not Path(staging_path).exists() + + def test_cleanup_old_crops(self, tmp_path): + staging = tmp_path / "staging" + staging.mkdir(parents=True) + + # Create an old file + old_file = staging / "old_crop.jpg" + old_file.write_bytes(b"fake") + # Set mtime to 10 days ago + old_time = time.time() - 10 * 86400 + import os + os.utime(old_file, (old_time, old_time)) + + # Create a recent file + new_file = staging / "new_crop.jpg" + new_file.write_bytes(b"fake") + + manager = CropManager(staging_dir=str(staging), training_dir=str(tmp_path / "training")) + deleted = manager.cleanup_expired(retention_days=7) + assert deleted == 1 + assert not old_file.exists() + assert new_file.exists() +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/unit/test_crop_manager.py -v` +Expected: FAIL — module not found + +- [ ] **Step 3: Implement crop manager** + +```python +# vigilar/detection/crop_manager.py +"""Manage detection crop images for training and staging.""" + +import logging +import os +import shutil +import time +from pathlib import Path + +import cv2 +import numpy as np + +log = logging.getLogger(__name__) + + +class CropManager: + def __init__(self, staging_dir: str, training_dir: str): + self._staging_dir = Path(staging_dir) + self._training_dir = Path(training_dir) + + def save_staging_crop(self, crop: np.ndarray, species: str, camera_id: str) -> str: + self._staging_dir.mkdir(parents=True, exist_ok=True) + timestamp = int(time.time() * 1000) + filename = f"{species}_{camera_id}_{timestamp}.jpg" + filepath = self._staging_dir / filename + cv2.imwrite(str(filepath), crop) + return str(filepath) + + def promote_to_training(self, staging_path: str, pet_name: str) -> str: + pet_dir = self._training_dir / pet_name.lower() + pet_dir.mkdir(parents=True, exist_ok=True) + src = Path(staging_path) + dst = pet_dir / src.name + shutil.move(str(src), str(dst)) + return str(dst) + + def cleanup_expired(self, retention_days: int = 7) -> int: + if not self._staging_dir.exists(): + return 0 + + cutoff = time.time() - retention_days * 86400 + deleted = 0 + for filepath in self._staging_dir.iterdir(): + if filepath.is_file() and filepath.stat().st_mtime < cutoff: + filepath.unlink() + deleted += 1 + + if deleted: + log.info("Cleaned up %d expired staging crops", deleted) + return deleted +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/unit/test_crop_manager.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add vigilar/detection/crop_manager.py tests/unit/test_crop_manager.py +git commit -m "Add crop manager for staging and training image lifecycle" +``` + +--- + +### Task 13: Update daily digest with pet/wildlife summary + +**Files:** +- Modify: `vigilar/health/digest.py` +- Modify: `tests/unit/test_health.py` + +- [ ] **Step 1: Write tests for pet digest** + +Add to `tests/unit/test_health.py`: + +```python +from vigilar.health.digest import build_digest, format_digest +from vigilar.storage.schema import pet_sightings, wildlife_sightings + + +class TestPetDigest: + def test_digest_includes_pet_sightings(self, test_db, tmp_data_dir): + import time + with test_db.begin() as conn: + conn.execute(pet_sightings.insert().values( + ts=time.time(), pet_id="p1", species="cat", + camera_id="kitchen", confidence=0.9, labeled=1, + )) + conn.execute(pet_sightings.insert().values( + ts=time.time(), pet_id="p2", species="dog", + camera_id="kitchen", confidence=0.85, labeled=1, + )) + data = build_digest(test_db, str(tmp_data_dir), since_hours=1) + assert data["pet_sightings"] == 2 + + def test_digest_includes_wildlife(self, test_db, tmp_data_dir): + import time + with test_db.begin() as conn: + conn.execute(wildlife_sightings.insert().values( + ts=time.time(), species="bear", threat_level="PREDATOR", + camera_id="front", confidence=0.9, + )) + conn.execute(wildlife_sightings.insert().values( + ts=time.time(), species="deer", threat_level="PASSIVE", + camera_id="back", confidence=0.8, + )) + data = build_digest(test_db, str(tmp_data_dir), since_hours=1) + assert data["wildlife_predators"] == 1 + assert data["wildlife_passive"] == 1 + + def test_format_includes_pets(self): + data = { + "person_detections": 2, "unknown_vehicles": 0, + "recordings": 5, "disk_used_gb": 100.0, "disk_used_pct": 50, + "since_hours": 12, "pet_sightings": 15, + "wildlife_predators": 0, "wildlife_nuisance": 1, "wildlife_passive": 3, + } + text = format_digest(data) + assert "15 pet" in text + assert "1 nuisance" in text.lower() or "wildlife" in text.lower() +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/unit/test_health.py::TestPetDigest -v` +Expected: FAIL — build_digest doesn't query pet tables + +- [ ] **Step 3: Update digest.py** + +Update imports in `vigilar/health/digest.py`: + +```python +from vigilar.storage.schema import events, recordings, pet_sightings, wildlife_sightings +``` + +Add pet/wildlife queries to `build_digest` (inside the `with engine.connect()` block, after the recording_count query): + +```python + pet_count = conn.execute( + select(func.count()).select_from(pet_sightings) + .where(pet_sightings.c.ts >= since_ts / 1000) + ).scalar() or 0 + + wildlife_predator_count = conn.execute( + select(func.count()).select_from(wildlife_sightings) + .where(wildlife_sightings.c.ts >= since_ts / 1000, + wildlife_sightings.c.threat_level == "PREDATOR") + ).scalar() or 0 + + wildlife_nuisance_count = conn.execute( + select(func.count()).select_from(wildlife_sightings) + .where(wildlife_sightings.c.ts >= since_ts / 1000, + wildlife_sightings.c.threat_level == "NUISANCE") + ).scalar() or 0 + + wildlife_passive_count = conn.execute( + select(func.count()).select_from(wildlife_sightings) + .where(wildlife_sightings.c.ts >= since_ts / 1000, + wildlife_sightings.c.threat_level == "PASSIVE") + ).scalar() or 0 +``` + +Add to the return dict: + +```python + "pet_sightings": pet_count, + "wildlife_predators": wildlife_predator_count, + "wildlife_nuisance": wildlife_nuisance_count, + "wildlife_passive": wildlife_passive_count, +``` + +Update `format_digest`: + +```python +def format_digest(data: dict) -> str: + lines = [ + f"Vigilar Daily Summary", + f"Last {data['since_hours']}h: " + f"{data['person_detections']} person detections, " + f"{data['unknown_vehicles']} unknown vehicles, " + f"{data['recordings']} recordings", + ] + if data.get("pet_sightings", 0) > 0: + lines.append(f"Pets: {data['pet_sightings']} pet sightings") + wildlife_parts = [] + if data.get("wildlife_predators", 0) > 0: + wildlife_parts.append(f"{data['wildlife_predators']} predator") + if data.get("wildlife_nuisance", 0) > 0: + wildlife_parts.append(f"{data['wildlife_nuisance']} nuisance") + if data.get("wildlife_passive", 0) > 0: + wildlife_parts.append(f"{data['wildlife_passive']} passive") + if wildlife_parts: + lines.append(f"Wildlife: {', '.join(wildlife_parts)}") + lines.append(f"Storage: {data['disk_used_gb']} GB ({data['disk_used_pct']:.0f}%)") + return "\n".join(lines) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/unit/test_health.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add vigilar/health/digest.py tests/unit/test_health.py +git commit -m "Add pet and wildlife counts to daily digest" +``` + +--- + +### Task 14: Add pets web blueprint — API endpoints + +**Files:** +- Create: `vigilar/web/blueprints/pets.py` +- Test: `tests/unit/test_pets_api.py` (new) + +- [ ] **Step 1: Write tests for pets API** + +```python +# tests/unit/test_pets_api.py +"""Tests for pets web blueprint API endpoints.""" + +import json + +import pytest + +from vigilar.web.app import create_app + + +@pytest.fixture +def client(test_db, tmp_path, sample_config): + app = create_app(sample_config, db_engine=test_db) + app.config["TESTING"] = True + with app.test_client() as client: + yield client + + +class TestPetsAPI: + def test_register_pet(self, client): + resp = client.post("/pets/register", json={ + "name": "Angel", "species": "cat", "breed": "DSH", + "color_description": "black", + }) + assert resp.status_code == 200 + data = resp.get_json() + assert data["name"] == "Angel" + assert "id" in data + + def test_get_pet_status(self, client): + client.post("/pets/register", json={"name": "Angel", "species": "cat"}) + resp = client.get("/pets/api/status") + assert resp.status_code == 200 + data = resp.get_json() + assert len(data["pets"]) == 1 + assert data["pets"][0]["name"] == "Angel" + + def test_get_sightings_empty(self, client): + resp = client.get("/pets/api/sightings") + assert resp.status_code == 200 + data = resp.get_json() + assert data["sightings"] == [] + + def test_get_wildlife_empty(self, client): + resp = client.get("/pets/api/wildlife") + assert resp.status_code == 200 + data = resp.get_json() + assert data["sightings"] == [] + + def test_get_unlabeled_empty(self, client): + resp = client.get("/pets/api/unlabeled") + assert resp.status_code == 200 + data = resp.get_json() + assert data["crops"] == [] + + def test_label_sighting(self, client): + # Register a pet + resp = client.post("/pets/register", json={"name": "Angel", "species": "cat"}) + pet_id = resp.get_json()["id"] + + # Insert a sighting directly (simulating detection) + from vigilar.storage.queries import insert_pet_sighting + from flask import current_app + # Use the test_db through the app + with client.application.app_context(): + engine = client.application.extensions.get("db_engine") + if engine: + sighting_id = insert_pet_sighting(engine, species="cat", + camera_id="kitchen", confidence=0.6) + + resp = client.post(f"/pets/{pet_id}/label", json={"sighting_id": sighting_id}) + assert resp.status_code == 200 +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/unit/test_pets_api.py -v` +Expected: FAIL — pets blueprint not registered + +- [ ] **Step 3: Implement pets blueprint** + +```python +# vigilar/web/blueprints/pets.py +"""Pets blueprint — pet management, training, sightings, and dashboard.""" + +import logging + +from flask import Blueprint, jsonify, render_template, request + +from vigilar.storage.queries import ( + get_all_pets, + get_pet, + get_pet_last_location, + get_pet_sightings, + get_training_images, + get_unlabeled_sightings, + get_wildlife_sightings, + insert_pet, + insert_training_image, + label_sighting, +) + +log = logging.getLogger(__name__) + +pets_bp = Blueprint("pets", __name__, url_prefix="/pets") + + +def _get_engine(): + from flask import current_app + return current_app.extensions["db_engine"] + + +@pets_bp.route("/") +def pets_dashboard(): + engine = _get_engine() + all_pets = get_all_pets(engine) + return render_template("pets/dashboard.html", pets=all_pets) + + +@pets_bp.route("/register", methods=["POST"]) +def register_pet(): + data = request.get_json() + engine = _get_engine() + pet_id = insert_pet( + engine, + name=data["name"], + species=data["species"], + breed=data.get("breed"), + color_description=data.get("color_description"), + ) + pet = get_pet(engine, pet_id) + return jsonify(pet) + + +@pets_bp.route("/api/status") +def pet_status(): + engine = _get_engine() + all_pets = get_all_pets(engine) + result = [] + for pet in all_pets: + loc = get_pet_last_location(engine, pet["id"]) + result.append({ + **pet, + "last_camera": loc["camera_id"] if loc else None, + "last_seen": loc["ts"] if loc else None, + }) + return jsonify({"pets": result}) + + +@pets_bp.route("/api/sightings") +def sightings(): + engine = _get_engine() + pet_id = request.args.get("pet_id") + camera_id = request.args.get("camera_id") + limit = int(request.args.get("limit", 100)) + rows = get_pet_sightings(engine, pet_id=pet_id, camera_id=camera_id, limit=limit) + return jsonify({"sightings": rows}) + + +@pets_bp.route("/api/wildlife") +def wildlife(): + engine = _get_engine() + threat_level = request.args.get("threat_level") + limit = int(request.args.get("limit", 100)) + rows = get_wildlife_sightings(engine, threat_level=threat_level, limit=limit) + return jsonify({"sightings": rows}) + + +@pets_bp.route("/api/unlabeled") +def unlabeled(): + engine = _get_engine() + species = request.args.get("species") + limit = int(request.args.get("limit", 50)) + rows = get_unlabeled_sightings(engine, species=species, limit=limit) + return jsonify({"crops": rows}) + + +@pets_bp.route("//label", methods=["POST"]) +def label_pet(pet_id): + data = request.get_json() + engine = _get_engine() + sighting_id = data["sighting_id"] + label_sighting(engine, sighting_id, pet_id) + return jsonify({"status": "labeled"}) + + +@pets_bp.route("//upload", methods=["POST"]) +def upload_training(pet_id): + engine = _get_engine() + pet = get_pet(engine, pet_id) + if not pet: + return jsonify({"error": "Pet not found"}), 404 + + files = request.files.getlist("images") + saved = [] + for f in files: + if f.filename: + from werkzeug.utils import secure_filename + from flask import current_app + import os + pets_cfg = current_app.config.get("PETS_CONFIG") + training_dir = pets_cfg.training_dir if pets_cfg else "/var/vigilar/pets/training" + pet_dir = os.path.join(training_dir, pet["name"].lower()) + os.makedirs(pet_dir, exist_ok=True) + filepath = os.path.join(pet_dir, secure_filename(f.filename)) + f.save(filepath) + insert_training_image(engine, pet_id=pet_id, image_path=filepath, source="upload") + saved.append(filepath) + + return jsonify({"uploaded": len(saved)}) + + +@pets_bp.route("/train", methods=["POST"]) +def train_model(): + from flask import current_app + pets_cfg = current_app.config.get("PETS_CONFIG") + if not pets_cfg: + return jsonify({"error": "Pet detection not configured"}), 400 + + from vigilar.detection.trainer import PetTrainer + trainer = PetTrainer( + training_dir=pets_cfg.training_dir, + model_output_path=pets_cfg.pet_id_model_path, + ) + + ready, msg = trainer.check_readiness(min_images=pets_cfg.min_training_images) + if not ready: + return jsonify({"error": msg}), 400 + + # Run training in background thread + import threading + thread = threading.Thread(target=trainer.train, daemon=True) + thread.start() + + return jsonify({"status": "training_started", "message": msg}) + + +@pets_bp.route("/api/training-status") +def training_status(): + # Training status is ephemeral — stored in the trainer instance + return jsonify({"status": "idle", "message": "No training in progress"}) + + +@pets_bp.route("/api/highlights") +def highlights(): + # Highlight reel — computed from sightings + recordings + engine = _get_engine() + import time + since = time.time() - 86400 # last 24h + sightings = get_pet_sightings(engine, since_ts=since, limit=50) + + highlights = [] + for s in sightings: + if s.get("confidence", 0) > 0.8: + highlights.append({ + "type": "pet_sighting", + "pet_id": s.get("pet_id"), + "camera_id": s["camera_id"], + "timestamp": s["ts"], + "confidence": s.get("confidence"), + }) + + return jsonify({"highlights": highlights[:20]}) + + +@pets_bp.route("//update", methods=["POST"]) +def update_pet(pet_id): + engine = _get_engine() + pet = get_pet(engine, pet_id) + if not pet: + return jsonify({"error": "Pet not found"}), 404 + + data = request.get_json() + from vigilar.storage.schema import pets as pets_table + with engine.begin() as conn: + updates = {} + for field in ("name", "breed", "color_description"): + if field in data: + updates[field] = data[field] + if updates: + conn.execute( + pets_table.update().where(pets_table.c.id == pet_id).values(**updates) + ) + return jsonify(get_pet(engine, pet_id)) + + +@pets_bp.route("//delete", methods=["DELETE"]) +def delete_pet(pet_id): + engine = _get_engine() + from vigilar.storage.schema import pets as pets_table + with engine.begin() as conn: + conn.execute(pets_table.delete().where(pets_table.c.id == pet_id)) + return jsonify({"status": "deleted"}) +``` + +- [ ] **Step 4: Register the blueprint in app.py** + +Read `vigilar/web/app.py` and add the pets blueprint registration alongside the existing blueprints: + +```python +from vigilar.web.blueprints.pets import pets_bp +app.register_blueprint(pets_bp) +``` + +Also store the db engine in app extensions for the blueprint to access: + +```python +app.extensions["db_engine"] = db_engine +``` + +- [ ] **Step 5: Run tests to verify they pass** + +Run: `pytest tests/unit/test_pets_api.py -v` +Expected: PASS + +- [ ] **Step 6: Commit** + +```bash +git add vigilar/web/blueprints/pets.py vigilar/web/app.py tests/unit/test_pets_api.py +git commit -m "Add pets web blueprint with API endpoints" +``` + +--- + +### Task 15: Add pets dashboard template + +**Files:** +- Create: `vigilar/web/templates/pets/dashboard.html` + +This is a Flask/Jinja2 template using Bootstrap 5 dark theme, matching the existing UI patterns. + +- [ ] **Step 1: Check existing template patterns** + +Read `vigilar/web/templates/` to understand the base template structure, navbar, and layout conventions used by other pages. + +- [ ] **Step 2: Create the pets dashboard template** + +Create `vigilar/web/templates/pets/dashboard.html` following the existing template patterns. Include: + +- Per-pet status cards (name, species, location, last seen, status indicator) +- Wildlife summary bar +- Activity timeline (24-hour bars per pet) +- Unlabeled detection queue (thumbnail grid) +- Pet management section (register, upload photos, train model) +- Highlight reel section + +Use Bootstrap 5 dark theme classes, match existing card/grid patterns from other templates. Use `fetch()` to load data from the `/pets/api/*` endpoints. + +- [ ] **Step 3: Verify template renders** + +Run the Flask dev server and check the `/pets/` route loads without errors. Verify the dark theme matches the rest of the UI. + +- [ ] **Step 4: Commit** + +```bash +git add vigilar/web/templates/pets/dashboard.html +git commit -m "Add pets dashboard template with Bootstrap 5 dark theme" +``` + +--- + +### Task 16: Add pet labeling UI to recordings playback + +**Files:** +- Modify: `vigilar/web/templates/recordings/` (playback template) +- Modify: `vigilar/web/static/js/` (add pet labeling JS) + +- [ ] **Step 1: Add labeling overlay to recording playback** + +Add a JavaScript module that: +- Renders bounding box overlays on the video player when detection data is available +- Shows a "Who is this?" popup when clicking an animal bounding box +- Fetches registered pets filtered by species from `/pets/api/status` +- Calls `/pets//label` when user selects a pet +- Shows "Unknown" and "+ New Pet" options + +- [ ] **Step 2: Add CSS for bounding box overlays** + +Add styles to the existing CSS for: +- `.detection-overlay` — positioned over video, pointer-events transparent +- `.detection-bbox` — colored border with species/confidence label +- `.label-popup` — dark-themed dropdown for pet selection + +- [ ] **Step 3: Test the labeling flow manually** + +Verify that clicking a bounding box shows the popup, selecting a pet calls the API, and the popup dismisses. + +- [ ] **Step 4: Commit** + +```bash +git add vigilar/web/templates/ vigilar/web/static/ +git commit -m "Add pet labeling UI overlay to recording playback" +``` + +--- + +### Task 17: Add dependencies to pyproject.toml + +**Files:** +- Modify: `pyproject.toml` + +- [ ] **Step 1: Add ultralytics and torchvision** + +Add to the dependencies section: + +```toml +ultralytics = ">=8.2.0" +torchvision = ">=0.18.0" +``` + +- [ ] **Step 2: Install and verify** + +Run: `pip install -e ".[dev]"` +Expected: Installs successfully with new dependencies + +- [ ] **Step 3: Commit** + +```bash +git add pyproject.toml +git commit -m "Add ultralytics and torchvision dependencies for pet detection" +``` + +--- + +### Task 18: Run full test suite and fix any issues + +**Files:** All modified files + +- [ ] **Step 1: Run complete test suite** + +Run: `pytest tests/ -v --tb=short` +Expected: All tests pass + +- [ ] **Step 2: Run ruff linter** + +Run: `ruff check vigilar/` +Expected: No errors (fix any that appear) + +- [ ] **Step 3: Run type checker** + +Run: `mypy vigilar/ --ignore-missing-imports` +Expected: No new errors + +- [ ] **Step 4: Fix any issues found** + +Address any test failures, lint errors, or type errors. Commit fixes individually. + +- [ ] **Step 5: Final commit** + +```bash +git commit -m "Fix lint and type issues from pet detection integration" +``` + +--- + +## File Structure Summary + +### New Files +| File | Purpose | +|------|---------| +| `vigilar/detection/yolo.py` | YOLOv8 unified detector (replaces MobileNet) | +| `vigilar/detection/wildlife.py` | Wildlife threat classification | +| `vigilar/detection/pet_id.py` | Pet identification classifier | +| `vigilar/detection/trainer.py` | Pet ID model trainer (MobileNetV3-Small) | +| `vigilar/detection/crop_manager.py` | Staging/training crop lifecycle | +| `vigilar/web/blueprints/pets.py` | Pets web blueprint (API + views) | +| `vigilar/web/templates/pets/dashboard.html` | Pet dashboard UI | +| `tests/unit/test_constants.py` | Tests for new enums | +| `tests/unit/test_yolo_detector.py` | Tests for YOLO detector | +| `tests/unit/test_wildlife.py` | Tests for wildlife classification | +| `tests/unit/test_pet_id.py` | Tests for pet ID classifier | +| `tests/unit/test_trainer.py` | Tests for model trainer | +| `tests/unit/test_crop_manager.py` | Tests for crop manager | +| `tests/unit/test_pet_queries.py` | Tests for DB query functions | +| `tests/unit/test_pets_api.py` | Tests for pets API endpoints | + +### Modified Files +| File | Changes | +|------|---------| +| `vigilar/constants.py` | Add ThreatLevel, CameraLocation enums; 6 EventType values; pet MQTT topics | +| `vigilar/config.py` | Add PetsConfig, WildlifeConfig, PetActivityConfig; location field on CameraConfig | +| `vigilar/storage/schema.py` | Add pets, pet_sightings, wildlife_sightings, pet_training_images tables | +| `vigilar/storage/queries.py` | Add pet/wildlife/training query functions | +| `vigilar/camera/worker.py` | Integrate YOLO detection + pet ID into frame loop | +| `vigilar/alerts/profiles.py` | Add camera_location filtering to get_action_for_event | +| `vigilar/events/processor.py` | Handle pet/wildlife event classification | +| `vigilar/health/digest.py` | Add pet/wildlife counts to daily digest | +| `vigilar/web/app.py` | Register pets blueprint | +| `pyproject.toml` | Add ultralytics, torchvision deps | +| `tests/unit/test_config.py` | Tests for new config models | +| `tests/unit/test_profiles.py` | Tests for pet alert routing | +| `tests/unit/test_events.py` | Tests for pet event classification | +| `tests/unit/test_health.py` | Tests for pet digest |