# Pet-Aware Security Features Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add pet detection, pet identification, wildlife monitoring, and pet activity tracking to Vigilar using YOLOv8 unified detection and a trainable pet ID classifier.

**Architecture:** Replace MobileNet-SSD with YOLOv8-small for all object detection (people, vehicles, animals) in a single inference pass. Add a second-stage MobileNetV3-Small classifier for identifying specific pets from cropped bounding boxes. Wildlife gets threat-tiered alerting. Pet dashboard with activity tracking and highlight reels.

**Tech Stack:** ultralytics (YOLOv8), torchvision (MobileNetV3-Small), existing Flask/Bootstrap/SQLAlchemy stack.

**Spec:** `docs/superpowers/specs/2026-04-03-pet-aware-security-design.md`

---

### Task 1: Add new constants, enums, and MQTT topics

**Files:**
- Modify: `vigilar/constants.py`
- Test: `tests/unit/test_constants.py` (new)

- [ ] **Step 1: Write tests for new enums and topics**

```python
# tests/unit/test_constants.py
"""Tests for new pet/wildlife constants."""

from vigilar.constants import (
    CameraLocation,
    EventType,
    ThreatLevel,
    Topics,
)


class TestThreatLevel:
    def test_values(self):
        assert ThreatLevel.PREDATOR == "PREDATOR"
        assert ThreatLevel.NUISANCE == "NUISANCE"
        assert ThreatLevel.PASSIVE == "PASSIVE"

    def test_is_strenum(self):
        assert isinstance(ThreatLevel.PREDATOR, str)


class TestCameraLocation:
    def test_values(self):
        assert CameraLocation.EXTERIOR == "EXTERIOR"
        assert CameraLocation.INTERIOR == "INTERIOR"
        assert CameraLocation.TRANSITION == "TRANSITION"

    def test_is_strenum(self):
        assert isinstance(CameraLocation.EXTERIOR, str)


class TestNewEventTypes:
    def test_pet_events_exist(self):
        assert EventType.PET_DETECTED == "PET_DETECTED"
        assert EventType.PET_ESCAPE == "PET_ESCAPE"
        assert EventType.UNKNOWN_ANIMAL == "UNKNOWN_ANIMAL"

    def test_wildlife_events_exist(self):
        assert EventType.WILDLIFE_PREDATOR == "WILDLIFE_PREDATOR"
        assert EventType.WILDLIFE_NUISANCE == "WILDLIFE_NUISANCE"
        assert EventType.WILDLIFE_PASSIVE == "WILDLIFE_PASSIVE"


class TestPetTopics:
    def test_pet_detected_topic(self):
        assert Topics.camera_pet_detected("front") == "vigilar/camera/front/pet/detected"

    def test_wildlife_detected_topic(self):
        assert Topics.camera_wildlife_detected("front") == "vigilar/camera/front/wildlife/detected"

    def test_pet_location_topic(self):
        assert Topics.pet_location("angel") == "vigilar/pets/angel/location"
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/unit/test_constants.py -v`
Expected: FAIL — ThreatLevel, CameraLocation not defined, new EventType values missing

- [ ] **Step 3: Add new enums and event types to constants.py**

Add after the `HouseholdState` enum (after line 103):

```python
# --- Threat Levels (Wildlife) ---

class ThreatLevel(StrEnum):
    PREDATOR = "PREDATOR"
    NUISANCE = "NUISANCE"
    PASSIVE = "PASSIVE"


# --- Camera Location ---

class CameraLocation(StrEnum):
    EXTERIOR = "EXTERIOR"
    INTERIOR = "INTERIOR"
    TRANSITION = "TRANSITION"
```

Add to the `EventType` enum (after line 42, before the closing of the class):

```python
    PET_DETECTED = "PET_DETECTED"
    PET_ESCAPE = "PET_ESCAPE"
    UNKNOWN_ANIMAL = "UNKNOWN_ANIMAL"
    WILDLIFE_PREDATOR = "WILDLIFE_PREDATOR"
    WILDLIFE_NUISANCE = "WILDLIFE_NUISANCE"
    WILDLIFE_PASSIVE = "WILDLIFE_PASSIVE"
```

Add to `RecordingTrigger` enum (after line 70):

```python
    PET = "PET"
    WILDLIFE = "WILDLIFE"
```

Add new topic methods to the `Topics` class (after the presence methods, before the system constants):

```python
    # Pet
    @staticmethod
    def camera_pet_detected(camera_id: str) -> str:
        return f"vigilar/camera/{camera_id}/pet/detected"

    @staticmethod
    def camera_wildlife_detected(camera_id: str) -> str:
        return f"vigilar/camera/{camera_id}/wildlife/detected"

    @staticmethod
    def pet_location(pet_name: str) -> str:
        return f"vigilar/pets/{pet_name}/location"
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest tests/unit/test_constants.py -v`
Expected: PASS

- [ ] **Step 5: Run full test suite to check for regressions**

Run: `pytest tests/ -v`
Expected: All existing tests still pass

- [ ] **Step 6: Commit**

```bash
git add vigilar/constants.py tests/unit/test_constants.py
git commit -m "Add pet/wildlife enums, event types, and MQTT topics"
```

---

### Task 2: Add pet and wildlife config models

**Files:**
- Modify: `vigilar/config.py`
- Modify: `tests/unit/test_config.py`

- [ ] **Step 1: Write tests for new config models**

Add to `tests/unit/test_config.py`:

```python
from vigilar.config import PetsConfig, WildlifeThreatMap, WildlifeSizeHeuristics, PetActivityConfig


class TestPetsConfig:
    def test_defaults(self):
        cfg = PetsConfig()
        assert cfg.enabled is False
        assert cfg.model == "yolov8s"
        assert cfg.confidence_threshold == 0.5
        assert cfg.pet_id_threshold == 0.7
        assert cfg.pet_id_low_confidence == 0.5
        assert cfg.min_training_images == 20
        assert cfg.crop_retention_days == 7

    def test_custom_values(self):
        cfg = PetsConfig(enabled=True, model="yolov8m", confidence_threshold=0.6)
        assert cfg.enabled is True
        assert cfg.model == "yolov8m"
        assert cfg.confidence_threshold == 0.6


class TestWildlifeThreatMap:
    def test_defaults(self):
        tm = WildlifeThreatMap()
        assert "bear" in tm.predator
        assert "bird" in tm.passive

    def test_custom_mapping(self):
        tm = WildlifeThreatMap(predator=["bear", "wolf"], nuisance=["raccoon"])
        assert "wolf" in tm.predator
        assert "raccoon" in tm.nuisance


class TestWildlifeSizeHeuristics:
    def test_defaults(self):
        sh = WildlifeSizeHeuristics()
        assert sh.small == 0.02
        assert sh.medium == 0.08
        assert sh.large == 0.15


class TestPetActivityConfig:
    def test_defaults(self):
        cfg = PetActivityConfig()
        assert cfg.daily_digest is True
        assert cfg.highlight_clips is True
        assert cfg.zoomie_threshold == 0.8


class TestCameraConfigLocation:
    def test_default_location_is_interior(self):
        from vigilar.config import CameraConfig
        cfg = CameraConfig(id="test", display_name="Test", rtsp_url="rtsp://x")
        assert cfg.location == "INTERIOR"

    def test_exterior_location(self):
        from vigilar.config import CameraConfig
        cfg = CameraConfig(id="test", display_name="Test", rtsp_url="rtsp://x", location="EXTERIOR")
        assert cfg.location == "EXTERIOR"
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/unit/test_config.py::TestPetsConfig -v`
Expected: FAIL — PetsConfig not defined

- [ ] **Step 3: Add config models to config.py**

Add after `HealthConfig` (around line 239) and before `VigilarConfig`:

```python
# --- Pet Detection Config ---

class WildlifeThreatMap(BaseModel):
    predator: list[str] = Field(default_factory=lambda: ["bear"])
    nuisance: list[str] = Field(default_factory=list)
    passive: list[str] = Field(default_factory=lambda: ["bird", "horse", "cow", "sheep"])


class WildlifeSizeHeuristics(BaseModel):
    small: float = 0.02  # < 2% of frame → nuisance
    medium: float = 0.08  # 2-8% → predator
    large: float = 0.15  # > 8% → passive (deer-sized)


class WildlifeConfig(BaseModel):
    threat_map: WildlifeThreatMap = Field(default_factory=WildlifeThreatMap)
    size_heuristics: WildlifeSizeHeuristics = Field(default_factory=WildlifeSizeHeuristics)


class PetActivityConfig(BaseModel):
    daily_digest: bool = True
    highlight_clips: bool = True
    zoomie_threshold: float = 0.8


class PetsConfig(BaseModel):
    enabled: bool = False
    model: str = "yolov8s"
    model_path: str = "/var/vigilar/models/yolov8s.pt"
    confidence_threshold: float = 0.5
    pet_id_enabled: bool = True
    pet_id_model_path: str = "/var/vigilar/models/pet_id.pt"
    pet_id_threshold: float = 0.7
    pet_id_low_confidence: float = 0.5
    training_dir: str = "/var/vigilar/pets/training"
    crop_staging_dir: str = "/var/vigilar/pets/staging"
    crop_retention_days: int = 7
    min_training_images: int = 20
    wildlife: WildlifeConfig = Field(default_factory=WildlifeConfig)
    activity: PetActivityConfig = Field(default_factory=PetActivityConfig)
```

Add `location` field to `CameraConfig` (after the existing fields, around line 45):

```python
    location: str = "INTERIOR"  # EXTERIOR | INTERIOR | TRANSITION
```

Add `pets` field to `VigilarConfig`:

```python
    pets: PetsConfig = Field(default_factory=PetsConfig)
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest tests/unit/test_config.py -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add vigilar/config.py tests/unit/test_config.py
git commit -m "Add pet detection, wildlife, and activity config models"
```

---

### Task 3: Add database tables for pets, sightings, and training

**Files:**
- Modify: `vigilar/storage/schema.py`
- Modify: `vigilar/storage/queries.py`
- Modify: `tests/unit/test_schema.py`

- [ ] **Step 1: Write tests for new tables**

Add to `tests/unit/test_schema.py`:

```python
from vigilar.storage.schema import pets, pet_sightings, wildlife_sightings, pet_training_images


class TestPetTables:
    def test_pets_table_exists(self, test_db):
        with test_db.connect() as conn:
            result = conn.execute(pets.insert().values(
                id="pet-1", name="Angel", species="cat", breed="DSH",
                color_description="black", training_count=0, created_at=1000.0,
            ))
            row = conn.execute(pets.select().where(pets.c.id == "pet-1")).first()
            assert row is not None
            assert row.name == "Angel"
            assert row.species == "cat"

    def test_pet_sightings_table(self, test_db):
        with test_db.begin() as conn:
            conn.execute(pets.insert().values(
                id="pet-1", name="Angel", species="cat", training_count=0,
                created_at=1000.0,
            ))
            conn.execute(pet_sightings.insert().values(
                ts=1000.0, pet_id="pet-1", species="cat", camera_id="kitchen",
                confidence=0.92, labeled=True,
            ))
            rows = conn.execute(pet_sightings.select()).fetchall()
            assert len(rows) == 1
            assert rows[0].camera_id == "kitchen"

    def test_wildlife_sightings_table(self, test_db):
        with test_db.begin() as conn:
            conn.execute(wildlife_sightings.insert().values(
                ts=1000.0, species="bear", threat_level="PREDATOR",
                camera_id="front", confidence=0.88,
            ))
            rows = conn.execute(wildlife_sightings.select()).fetchall()
            assert len(rows) == 1
            assert rows[0].threat_level == "PREDATOR"

    def test_pet_training_images_table(self, test_db):
        with test_db.begin() as conn:
            conn.execute(pets.insert().values(
                id="pet-1", name="Angel", species="cat", training_count=0,
                created_at=1000.0,
            ))
            conn.execute(pet_training_images.insert().values(
                pet_id="pet-1", image_path="/var/vigilar/pets/training/angel/001.jpg",
                source="upload", created_at=1000.0,
            ))
            rows = conn.execute(pet_training_images.select()).fetchall()
            assert len(rows) == 1
            assert rows[0].source == "upload"
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/unit/test_schema.py::TestPetTables -v`
Expected: FAIL — tables not defined

- [ ] **Step 3: Add new tables to schema.py**

Add after `push_subscriptions` table (after line 125):

```python
pets = Table(
    "pets",
    metadata,
    Column("id", String, primary_key=True),
    Column("name", String, nullable=False),
    Column("species", String, nullable=False),
    Column("breed", String),
    Column("color_description", String),
    Column("photo_path", String),
    Column("training_count", Integer, nullable=False, default=0),
    Column("created_at", Float, nullable=False),
)

pet_sightings = Table(
    "pet_sightings",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("ts", Float, nullable=False),
    Column("pet_id", String),
    Column("species", String, nullable=False),
    Column("camera_id", String, nullable=False),
    Column("confidence", Float),
    Column("crop_path", String),
    Column("labeled", Integer, nullable=False, default=0),
    Column("event_id", Integer),
)
Index("idx_pet_sightings_ts", pet_sightings.c.ts.desc())
Index("idx_pet_sightings_pet", pet_sightings.c.pet_id, pet_sightings.c.ts.desc())
Index("idx_pet_sightings_camera", pet_sightings.c.camera_id, pet_sightings.c.ts.desc())

wildlife_sightings = Table(
    "wildlife_sightings",
    metadata,
    Column("id", Integer, primary_key=True,
           autoincrement=True),
    Column("ts", Float, nullable=False),
    Column("species", String, nullable=False),
    Column("threat_level", String, nullable=False),
    Column("camera_id", String, nullable=False),
    Column("confidence", Float),
    Column("crop_path", String),
    Column("event_id", Integer),
)
Index("idx_wildlife_ts", wildlife_sightings.c.ts.desc())
Index("idx_wildlife_threat", wildlife_sightings.c.threat_level, wildlife_sightings.c.ts.desc())

pet_training_images = Table(
    "pet_training_images",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("pet_id", String, nullable=False),
    Column("image_path", String, nullable=False),
    Column("source", String, nullable=False),
    Column("created_at", Float, nullable=False),
)
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest tests/unit/test_schema.py -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add vigilar/storage/schema.py tests/unit/test_schema.py
git commit -m "Add pets, pet_sightings, wildlife_sightings, pet_training_images tables"
```

---

### Task 4: Add pet and wildlife database query functions

**Files:**
- Modify: `vigilar/storage/queries.py`
- Test: `tests/unit/test_pet_queries.py` (new)

- [ ] **Step 1: Write tests for pet query functions**

```python
# tests/unit/test_pet_queries.py
"""Tests for pet and wildlife query functions."""

import time

from vigilar.storage.queries import (
    insert_pet,
    get_pet,
    get_all_pets,
    insert_pet_sighting,
    get_pet_sightings,
    get_pet_last_location,
    insert_wildlife_sighting,
    get_wildlife_sightings,
    insert_training_image,
    get_training_images,
    get_unlabeled_sightings,
    label_sighting,
)


class TestPetCRUD:
    def test_insert_and_get_pet(self, test_db):
        pet_id = insert_pet(test_db, name="Angel", species="cat", breed="DSH",
                            color_description="black")
        pet = get_pet(test_db, pet_id)
        assert pet is not None
        assert pet["name"] == "Angel"
        assert pet["species"] == "cat"
        assert pet["training_count"] == 0

    def test_get_all_pets(self, test_db):
        insert_pet(test_db,
                   name="Angel", species="cat")
        insert_pet(test_db, name="Milo", species="dog")
        all_pets = get_all_pets(test_db)
        assert len(all_pets) == 2


class TestPetSightings:
    def test_insert_and_query(self, test_db):
        pet_id = insert_pet(test_db, name="Angel", species="cat")
        insert_pet_sighting(test_db, pet_id=pet_id, species="cat", camera_id="kitchen",
                            confidence=0.92)
        sightings = get_pet_sightings(test_db, limit=10)
        assert len(sightings) == 1
        assert sightings[0]["camera_id"] == "kitchen"

    def test_last_location(self, test_db):
        pet_id = insert_pet(test_db, name="Angel", species="cat")
        insert_pet_sighting(test_db, pet_id=pet_id, species="cat", camera_id="kitchen",
                            confidence=0.9)
        insert_pet_sighting(test_db, pet_id=pet_id, species="cat", camera_id="living_room",
                            confidence=0.95)
        loc = get_pet_last_location(test_db, pet_id)
        assert loc is not None
        assert loc["camera_id"] == "living_room"

    def test_unlabeled_sightings(self, test_db):
        insert_pet_sighting(test_db, pet_id=None, species="cat", camera_id="kitchen",
                            confidence=0.6, crop_path="/tmp/crop.jpg")
        unlabeled = get_unlabeled_sightings(test_db, limit=10)
        assert len(unlabeled) == 1
        assert unlabeled[0]["labeled"] == 0

    def test_label_sighting(self, test_db):
        pet_id = insert_pet(test_db, name="Angel", species="cat")
        insert_pet_sighting(test_db, pet_id=None, species="cat", camera_id="kitchen",
                            confidence=0.6)
        sightings = get_unlabeled_sightings(test_db, limit=10)
        sighting_id = sightings[0]["id"]
        label_sighting(test_db, sighting_id, pet_id)
        updated = get_pet_sightings(test_db, pet_id=pet_id)
        assert len(updated) == 1
        assert updated[0]["labeled"] == 1


class TestWildlifeSightings:
    def test_insert_and_query(self, test_db):
        insert_wildlife_sighting(test_db, species="bear", threat_level="PREDATOR",
                                 camera_id="front", confidence=0.88)
        sightings = get_wildlife_sightings(test_db, limit=10)
        assert len(sightings) == 1
        assert sightings[0]["threat_level"] == "PREDATOR"

    def test_filter_by_threat(self, test_db):
        insert_wildlife_sighting(test_db,
                                 species="bear", threat_level="PREDATOR",
                                 camera_id="front", confidence=0.88)
        insert_wildlife_sighting(test_db, species="deer", threat_level="PASSIVE",
                                 camera_id="back", confidence=0.75)
        predators = get_wildlife_sightings(test_db, threat_level="PREDATOR")
        assert len(predators) == 1


class TestTrainingImages:
    def test_insert_and_query(self, test_db):
        pet_id = insert_pet(test_db, name="Angel", species="cat")
        insert_training_image(test_db, pet_id=pet_id,
                              image_path="/var/vigilar/pets/training/angel/001.jpg",
                              source="upload")
        images = get_training_images(test_db, pet_id)
        assert len(images) == 1
        assert images[0]["source"] == "upload"
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/unit/test_pet_queries.py -v`
Expected: FAIL — functions not defined

- [ ] **Step 3: Implement query functions**

Add to `vigilar/storage/queries.py`. Add imports at the top:

```python
from vigilar.storage.schema import (
    # existing imports...
    pets,
    pet_sightings,
    pet_training_images,
    wildlife_sightings,
)
```

Add the following functions at the bottom of the file:

```python
# --- Pets ---

def insert_pet(
    engine: Engine,
    name: str,
    species: str,
    breed: str | None = None,
    color_description: str | None = None,
    photo_path: str | None = None,
) -> str:
    import uuid
    pet_id = str(uuid.uuid4())
    with engine.begin() as conn:
        conn.execute(pets.insert().values(
            id=pet_id,
            name=name,
            species=species,
            breed=breed,
            color_description=color_description,
            photo_path=photo_path,
            training_count=0,
            created_at=time.time(),
        ))
    return pet_id


def get_pet(engine: Engine, pet_id: str) -> dict[str, Any] | None:
    with engine.connect() as conn:
        row = conn.execute(pets.select().where(pets.c.id == pet_id)).first()
        return dict(row._mapping) if row else None


def get_all_pets(engine: Engine) -> list[dict[str, Any]]:
    with engine.connect() as conn:
        rows = conn.execute(pets.select().order_by(pets.c.name)).fetchall()
        return [dict(r._mapping) for r in rows]


# --- Pet Sightings ---

def insert_pet_sighting(
    engine: Engine,
    species: str,
    camera_id: str,
    confidence: float,
    pet_id: str | None = None,
    crop_path: str | None = None,
    event_id: int | None = None,
) -> int:
    with engine.begin() as conn:
        result = conn.execute(pet_sightings.insert().values(
            ts=time.time(),
            pet_id=pet_id,
            species=species,
            camera_id=camera_id,
            confidence=confidence,
            crop_path=crop_path,
            labeled=1 if pet_id else 0,
            event_id=event_id,
        ))
        return result.inserted_primary_key[0]


def get_pet_sightings(
    engine: Engine,
    pet_id: str | None = None,
    camera_id: str | None = None,
    since_ts: float | None = None,
    limit: int = 100,
) -> list[dict[str, Any]]:
    query = select(pet_sightings).order_by(desc(pet_sightings.c.ts)).limit(limit)
    if pet_id:
        query = query.where(pet_sightings.c.pet_id == pet_id)
    if camera_id:
        query = query.where(pet_sightings.c.camera_id == camera_id)
    if since_ts:
        query = query.where(pet_sightings.c.ts >= since_ts)
    with engine.connect() as conn:
        rows = conn.execute(query).fetchall()
        return [dict(r._mapping) for r in rows]


def get_pet_last_location(engine: Engine, pet_id: str) -> dict[str, Any] | None:
    with engine.connect() as conn:
        row = conn.execute(
            select(pet_sightings)
            .where(pet_sightings.c.pet_id == pet_id)
            .order_by(desc(pet_sightings.c.ts))
            .limit(1)
        ).first()
        return dict(row._mapping) if row else None


def get_unlabeled_sightings(
    engine: Engine,
    species: str | None = None,
    limit: int = 50,
) -> list[dict[str, Any]]:
    query = (
        select(pet_sightings)
        .where(pet_sightings.c.labeled == 0)
        .order_by(desc(pet_sightings.c.ts))
        .limit(limit)
    )
    if species:
        query = query.where(pet_sightings.c.species == species)
    with engine.connect() as conn:
        rows = conn.execute(query).fetchall()
        return [dict(r._mapping) for r in rows]


def label_sighting(engine: Engine, sighting_id: int, pet_id: str) -> None:
    with engine.begin() as conn:
        conn.execute(
            pet_sightings.update()
            .where(pet_sightings.c.id == sighting_id)
            .values(pet_id=pet_id, labeled=1)
        )


# --- Wildlife Sightings ---

def insert_wildlife_sighting(
    engine: Engine,
    species: str,
    threat_level: str,
    camera_id: str,
    confidence: float,
    crop_path: str | None = None,
    event_id: int | None = None,
) -> int:
    with engine.begin() as conn:
        result = conn.execute(wildlife_sightings.insert().values(
            ts=time.time(),
            species=species,
            threat_level=threat_level,
            camera_id=camera_id,
            confidence=confidence,
            crop_path=crop_path,
            event_id=event_id,
        ))
        return result.inserted_primary_key[0]


def get_wildlife_sightings(
    engine: Engine,
    threat_level: str | None = None,
    camera_id: str | None = None,
    since_ts: float | None = None,
    limit: int = 100,
) -> list[dict[str, Any]]:
    query = select(wildlife_sightings).order_by(desc(wildlife_sightings.c.ts)).limit(limit)
    if threat_level:
        query = query.where(wildlife_sightings.c.threat_level == threat_level)
    if camera_id:
        query = query.where(wildlife_sightings.c.camera_id == camera_id)
    if since_ts:
        query = query.where(wildlife_sightings.c.ts >= since_ts)
    with engine.connect() as conn:
        rows = conn.execute(query).fetchall()
        return [dict(r._mapping) for r in rows]


# --- Training Images ---

def insert_training_image(
    engine: Engine,
    pet_id: str,
    image_path: str,
    source: str,
) -> int:
    with engine.begin() as conn:
        result = conn.execute(pet_training_images.insert().values(
            pet_id=pet_id,
            image_path=image_path,
            source=source,
            created_at=time.time(),
        ))
        # Update training count on pet
        conn.execute(
            pets.update().where(pets.c.id == pet_id)
            .values(training_count=pets.c.training_count + 1)
        )
        return result.inserted_primary_key[0]


def get_training_images(
    engine: Engine,
    pet_id: str,
) -> list[dict[str, Any]]:
    with engine.connect() as conn:
        rows = conn.execute(
            select(pet_training_images)
            .where(pet_training_images.c.pet_id == pet_id)
            .order_by(desc(pet_training_images.c.created_at))
        ).fetchall()
        return [dict(r._mapping) for r in rows]
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest tests/unit/test_pet_queries.py -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add vigilar/storage/queries.py tests/unit/test_pet_queries.py
git commit -m "Add pet and wildlife database query functions"
```

---

### Task 5: Implement YOLOv8 unified detector

**Files:**
- Create: `vigilar/detection/yolo.py`
- Test: `tests/unit/test_yolo_detector.py` (new)

- [ ] **Step 1: Write tests for YOLOv8 detector**

```python
# tests/unit/test_yolo_detector.py
"""Tests for YOLOv8 unified detector."""

import numpy as np
import pytest

from vigilar.detection.person import Detection
from vigilar.detection.yolo import YOLODetector, ANIMAL_CLASSES, WILDLIFE_CLASSES


class TestYOLOConstants:
    def test_animal_classes(self):
        assert "cat" in ANIMAL_CLASSES
        assert "dog" in ANIMAL_CLASSES

    def test_wildlife_classes(self):
        assert "bear" in WILDLIFE_CLASSES
        assert "bird" in WILDLIFE_CLASSES

    def test_no_overlap_animal_wildlife(self):
        assert not ANIMAL_CLASSES.intersection(WILDLIFE_CLASSES)


class TestYOLODetector:
    def test_initializes_without_model(self):
        detector = YOLODetector(model_path="nonexistent.pt", confidence_threshold=0.5)
        assert not detector.is_loaded

    def test_detect_returns_empty_when_not_loaded(self):
        detector = YOLODetector(model_path="nonexistent.pt")
        frame = np.zeros((480, 640, 3), dtype=np.uint8)
        detections = detector.detect(frame)
        assert detections == []

    def test_classify_detection_person(self):
        d = Detection(class_name="person", class_id=0, confidence=0.9, bbox=(10, 20, 100, 200))
        assert YOLODetector.classify(d) == "person"

    def test_classify_detection_vehicle(self):
        d = Detection(class_name="car", class_id=2, confidence=0.85, bbox=(10, 20, 100, 200))
        assert YOLODetector.classify(d) == "vehicle"

    def test_classify_detection_domestic_animal(self):
        d = Detection(class_name="cat", class_id=15, confidence=0.9, bbox=(10, 20, 100, 200))
        assert YOLODetector.classify(d) == "domestic_animal"

    def test_classify_detection_wildlife(self):
        d = Detection(class_name="bear", class_id=21, confidence=0.8, bbox=(10, 20, 100, 200))
        assert YOLODetector.classify(d) == "wildlife"

    def test_classify_detection_other(self):
        d = Detection(class_name="chair", class_id=56, confidence=0.7, bbox=(10, 20, 100, 200))
        assert YOLODetector.classify(d) == "other"
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/unit/test_yolo_detector.py -v`
Expected: FAIL — yolo module not found

- [ ] **Step 3: Implement YOLOv8 detector**

```python
# vigilar/detection/yolo.py
"""Unified object detection using YOLOv8 via ultralytics."""

import logging
from pathlib import Path

import numpy as np

from vigilar.detection.person import Detection

log = logging.getLogger(__name__)

# COCO class names for domestic animals
ANIMAL_CLASSES = {"cat", "dog"}

# COCO class names for wildlife (the subset that YOLO can detect)
WILDLIFE_CLASSES = {"bear", "bird", "horse", "cow", "sheep", "elephant", "zebra", "giraffe"}

# Vehicle class names from COCO
VEHICLE_CLASSES = {"car", "motorcycle", "bus", "truck", "boat"}


class YOLODetector:
    def __init__(self, model_path: str, confidence_threshold: float = 0.5):
        self._threshold = confidence_threshold
        self._model = None
        self.is_loaded = False

        if Path(model_path).exists():
            try:
                from ultralytics import YOLO
                self._model = YOLO(model_path)
                self.is_loaded = True
                log.info("YOLO model loaded from %s", model_path)
            except Exception as e:
                log.error("Failed to load YOLO model: %s", e)
        else:
            log.warning("YOLO model not found at %s — detection disabled", model_path)

    def detect(self, frame: np.ndarray) -> list[Detection]:
        if not self.is_loaded or self._model is None:
            return []

        results = self._model(frame, conf=self._threshold, verbose=False)
        detections = []
        for result in results:
            for box in result.boxes:
                class_id = int(box.cls[0])
                confidence = float(box.conf[0])
                class_name = result.names[class_id]
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
                # Detection.bbox is (x, y, width, height); skip degenerate boxes
                bw, bh = x2 - x1, y2 - y1
                if bw <= 0 or bh <= 0:
                    continue
                detections.append(Detection(
                    class_name=class_name,
                    class_id=class_id,
                    confidence=confidence,
                    bbox=(x1, y1, bw,
                          bh),
                ))
        return detections

    @staticmethod
    def classify(detection: Detection) -> str:
        name = detection.class_name
        if name == "person":
            return "person"
        if name in VEHICLE_CLASSES:
            return "vehicle"
        if name in ANIMAL_CLASSES:
            return "domestic_animal"
        if name in WILDLIFE_CLASSES:
            return "wildlife"
        return "other"
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest tests/unit/test_yolo_detector.py -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add vigilar/detection/yolo.py tests/unit/test_yolo_detector.py
git commit -m "Add YOLOv8 unified detector with class classification"
```

---

### Task 6: Implement wildlife threat classifier

**Files:**
- Create: `vigilar/detection/wildlife.py`
- Test: `tests/unit/test_wildlife.py` (new)

- [ ] **Step 1: Write tests for wildlife threat classification**

```python
# tests/unit/test_wildlife.py
"""Tests for wildlife threat classification."""

from vigilar.config import WildlifeConfig, WildlifeThreatMap, WildlifeSizeHeuristics
from vigilar.detection.person import Detection
from vigilar.detection.wildlife import classify_wildlife_threat


def _make_config(**kwargs):
    return WildlifeConfig(**kwargs)


class TestWildlifeThreatClassification:
    def test_bear_is_predator(self):
        cfg = _make_config()
        d = Detection(class_name="bear", class_id=21, confidence=0.9, bbox=(100, 100, 200, 300))
        level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080)
        assert level == "PREDATOR"
        assert species == "bear"

    def test_bird_is_passive(self):
        cfg = _make_config()
        d = Detection(class_name="bird", class_id=14, confidence=0.8, bbox=(10, 10, 30, 20))
        level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080)
        assert level == "PASSIVE"
        assert species == "bird"

    def test_unknown_small_is_nuisance(self):
        cfg = _make_config()
        # Small bbox relative to frame → nuisance (raccoon/skunk sized)
        # frame_area = 1920*1080 = 2073600, bbox area = 30*30 = 900 → 0.04%
        d = Detection(class_name="unknown", class_id=99, confidence=0.7, bbox=(100, 100, 30, 30))
        level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080)
        assert level == "NUISANCE"
        assert species == "unknown"

    def test_unknown_medium_is_predator(self):
        cfg = _make_config()
        # Medium bbox → predator (fox/coyote sized)
        # bbox area = 300*300 = 90000 / 2073600 → 4.3% → between small (2%) and medium (8%) thresholds
        d = Detection(class_name="unknown", class_id=99, confidence=0.7, bbox=(100, 100, 300, 300))
        level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080)
        assert level == "PREDATOR"

    def test_unknown_large_is_passive(self):
        cfg = _make_config()
        # Large bbox → passive (deer sized)
        # bbox area = 600*500 = 300000 / 2073600 → 14.5% → above the medium threshold (8%)
        d = Detection(class_name="unknown", class_id=99, confidence=0.7, bbox=(100, 100, 600, 500))
        level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080)
        assert level == "PASSIVE"

    def test_custom_threat_map(self):
        cfg = _make_config(threat_map=WildlifeThreatMap(predator=["bear", "wolf"]))
        d = Detection(class_name="wolf", class_id=99, confidence=0.85, bbox=(100, 100, 200, 200))
        level, _ = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080)
        assert level == "PREDATOR"
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/unit/test_wildlife.py -v`
Expected: FAIL — wildlife module not found

- [ ] **Step 3: Implement wildlife classifier**

```python
# vigilar/detection/wildlife.py
"""Wildlife threat level classification."""

from vigilar.config import WildlifeConfig
from vigilar.detection.person import Detection


def classify_wildlife_threat(
    detection: Detection,
    config: WildlifeConfig,
    frame_area: int,
) -> tuple[str, str]:
    """Classify a wildlife detection into threat level and species.

    Returns (threat_level, species_name).
    """
    species = detection.class_name
    threat_map = config.threat_map

    # Direct COCO class mapping first
    if species in threat_map.predator:
        return "PREDATOR", species
    if species in threat_map.nuisance:
        return "NUISANCE", species
    if species in threat_map.passive:
        return "PASSIVE", species

    # Fallback to size heuristics for unknown species
    _, _, w, h = detection.bbox
    bbox_area = w * h
    area_ratio = bbox_area / frame_area if frame_area > 0 else 0

    # Bands match the config comments and the tests above:
    # below `small` → nuisance, `small` to `medium` → predator, at or above `medium` → passive.
    # Comparing against `large` here would classify the 14.5% test bbox as PREDATOR.
    heuristics = config.size_heuristics
    if area_ratio < heuristics.small:
        return "NUISANCE", species
    elif area_ratio < heuristics.medium:
        return "PREDATOR", species
    else:
        return "PASSIVE", species
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest tests/unit/test_wildlife.py -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add vigilar/detection/wildlife.py tests/unit/test_wildlife.py
git commit -m "Add wildlife threat classification with size heuristics"
```

---

### Task 7: Implement pet ID classifier

**Files:**
- Create: `vigilar/detection/pet_id.py`
- Test: `tests/unit/test_pet_id.py` (new)

- [ ] **Step 1: Write tests for pet ID classifier**

```python
# tests/unit/test_pet_id.py
"""Tests for pet ID classifier."""

import numpy as np
import pytest

from vigilar.detection.pet_id import PetIDClassifier, PetIDResult


class TestPetIDResult:
    def test_identified(self):
        r = PetIDResult(pet_id="pet-1", pet_name="Angel", confidence=0.9)
        assert r.is_identified
        assert not r.is_low_confidence

    def test_low_confidence(self):
        r = PetIDResult(pet_id="pet-1", pet_name="Angel", confidence=0.6)
        assert r.is_identified
        assert r.is_low_confidence

    def test_unknown(self):
        r = PetIDResult(pet_id=None, pet_name=None, confidence=0.3)
        assert not r.is_identified


class TestPetIDClassifier:
    def test_not_loaded_returns_unknown(self):
        classifier = PetIDClassifier(model_path="nonexistent.pt")
        assert not classifier.is_loaded
        crop = np.zeros((224, 224, 3), dtype=np.uint8)
        result = classifier.identify(crop, species="cat")
        assert not result.is_identified

    def test_no_pets_registered_returns_unknown(self):
        classifier = PetIDClassifier(model_path="nonexistent.pt")
        assert classifier.pet_count == 0

    def test_register_pet(self):
        classifier = PetIDClassifier(model_path="nonexistent.pt")
        classifier.register_pet("pet-1", "Angel", "cat")
        classifier.register_pet("pet-2", "Milo", "dog")
        assert classifier.pet_count == 2

    def test_species_filter(self):
        classifier = PetIDClassifier(model_path="nonexistent.pt")
        classifier.register_pet("pet-1", "Angel", "cat")
        classifier.register_pet("pet-2", "Taquito", "cat")
        classifier.register_pet("pet-3", "Milo", "dog")
        assert len(classifier.get_pets_by_species("cat")) == 2
        assert len(classifier.get_pets_by_species("dog")) == 1
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/unit/test_pet_id.py -v`
Expected: FAIL — pet_id module not found

- [ ] **Step 3: Implement pet ID classifier**

```python
# vigilar/detection/pet_id.py
"""Pet identification classifier using MobileNetV3-Small."""

import logging
from dataclasses import dataclass
from pathlib import Path

import numpy as np

log = logging.getLogger(__name__)

# Confidence thresholds (overridden by config at runtime)
DEFAULT_HIGH_THRESHOLD = 0.7
DEFAULT_LOW_THRESHOLD = 0.5


@dataclass
class PetIDResult:
    pet_id: str | None
    pet_name: str | None
    confidence: float
    high_threshold: float = DEFAULT_HIGH_THRESHOLD
    low_threshold: float = DEFAULT_LOW_THRESHOLD

    @property
    def is_identified(self) -> bool:
        return self.pet_id is not None and self.confidence >= self.low_threshold

    @property
    def is_low_confidence(self) -> bool:
        return (
            self.pet_id is not None
            and self.low_threshold <= self.confidence < self.high_threshold
        )


@dataclass
class RegisteredPet:
    pet_id: str
    name: str
    species: str
    class_index: int  # index in the classifier output


class PetIDClassifier:
    def __init__(
        self,
        model_path: str,
        high_threshold: float = DEFAULT_HIGH_THRESHOLD,
        low_threshold: float = DEFAULT_LOW_THRESHOLD,
    ):
        self._model_path = model_path
        self._high_threshold = high_threshold
        self._low_threshold = low_threshold
        self._model = None
        self._transform = None
        self.is_loaded = False
        self._pets: list[RegisteredPet] = []

        if Path(model_path).exists():
            try:
                import torch
                self._model = torch.load(model_path, map_location="cpu", weights_only=False)
                self._model.eval()
                self.is_loaded = True
                log.info("Pet ID model loaded from %s", model_path)
            except Exception as e:
                log.error("Failed to load pet ID model: %s", e)
        else:
            log.info("Pet ID model not found at %s — identification disabled until trained", model_path)

    @property
    def pet_count(self) -> int:
        return len(self._pets)

    def register_pet(self, pet_id: str, name: str, species: str) -> None:
        idx = len(self._pets)
        self._pets.append(RegisteredPet(pet_id=pet_id, name=name, species=species, class_index=idx))

    def get_pets_by_species(self, species: str) -> list[RegisteredPet]:
        return [p for p in self._pets if p.species == species]

    def identify(self, crop: np.ndarray, species: str) -> PetIDResult:
        if not self.is_loaded or self._model is None:
            return PetIDResult(pet_id=None, pet_name=None, confidence=0.0)

        candidates = self.get_pets_by_species(species)
        if not candidates:
            return PetIDResult(pet_id=None, pet_name=None, confidence=0.0)

        try:
            import cv2
            import torch
            from torchvision import transforms

            # Preprocess crop
            resized = cv2.resize(crop, (224, 224))
            rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
            transform = transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ])
            tensor = transform(rgb).unsqueeze(0)

            with torch.no_grad():
                output = self._model(tensor)
                probs = torch.softmax(output, dim=1)[0]

            # Find best match among candidates for this species
            best_conf = 0.0
            best_pet = None
            for pet in candidates:
                if pet.class_index < len(probs):
                    conf = float(probs[pet.class_index])
                    if conf > best_conf:
                        best_conf = conf
                        best_pet = pet

            if best_pet and best_conf >= self._low_threshold:
                return PetIDResult(
                    pet_id=best_pet.pet_id,
                    pet_name=best_pet.name,
confidence=best_conf, high_threshold=self._high_threshold, low_threshold=self._low_threshold, ) return PetIDResult(pet_id=None, pet_name=None, confidence=best_conf) except Exception as e: log.error("Pet ID inference failed: %s", e) return PetIDResult(pet_id=None, pet_name=None, confidence=0.0) ``` - [ ] **Step 4: Run tests to verify they pass** Run: `pytest tests/unit/test_pet_id.py -v` Expected: PASS - [ ] **Step 5: Commit** ```bash git add vigilar/detection/pet_id.py tests/unit/test_pet_id.py git commit -m "Add pet ID classifier with species-filtered identification" ``` --- ### Task 8: Implement pet ID model trainer **Files:** - Create: `vigilar/detection/trainer.py` - Test: `tests/unit/test_trainer.py` (new) - [ ] **Step 1: Write tests for trainer** ```python # tests/unit/test_trainer.py """Tests for pet ID model trainer.""" import os from pathlib import Path import numpy as np import pytest from vigilar.detection.trainer import PetTrainer, TrainingStatus class TestTrainingStatus: def test_initial_status(self): status = TrainingStatus() assert status.is_training is False assert status.progress == 0.0 assert status.error is None class TestPetTrainer: def test_check_readiness_no_pets(self, tmp_path): trainer = PetTrainer(training_dir=str(tmp_path), model_output_path=str(tmp_path / "model.pt")) ready, msg = trainer.check_readiness(min_images=20) assert not ready assert "No pet" in msg def test_check_readiness_insufficient_images(self, tmp_path): # Create pet dirs with too few images pet_dir = tmp_path / "angel" pet_dir.mkdir() for i in range(5): (pet_dir / f"{i}.jpg").write_bytes(b"fake") trainer = PetTrainer(training_dir=str(tmp_path), model_output_path=str(tmp_path / "model.pt")) ready, msg = trainer.check_readiness(min_images=20) assert not ready assert "angel" in msg.lower() def test_check_readiness_sufficient_images(self, tmp_path): for name in ["angel", "taquito"]: pet_dir = tmp_path / name pet_dir.mkdir() for i in range(25): (pet_dir / 
f"{i}.jpg").write_bytes(b"fake") trainer = PetTrainer(training_dir=str(tmp_path), model_output_path=str(tmp_path / "model.pt")) ready, msg = trainer.check_readiness(min_images=20) assert ready def test_get_class_names(self, tmp_path): for name in ["angel", "milo", "taquito"]: (tmp_path / name).mkdir() trainer = PetTrainer(training_dir=str(tmp_path), model_output_path=str(tmp_path / "model.pt")) names = trainer.get_class_names() assert names == ["angel", "milo", "taquito"] # sorted ``` - [ ] **Step 2: Run tests to verify they fail** Run: `pytest tests/unit/test_trainer.py -v` Expected: FAIL — trainer module not found - [ ] **Step 3: Implement trainer** ```python # vigilar/detection/trainer.py """Pet ID model trainer using MobileNetV3-Small with transfer learning.""" import logging import shutil from dataclasses import dataclass, field from pathlib import Path log = logging.getLogger(__name__) @dataclass class TrainingStatus: is_training: bool = False progress: float = 0.0 epoch: int = 0 total_epochs: int = 0 accuracy: float = 0.0 error: str | None = None class PetTrainer: def __init__(self, training_dir: str, model_output_path: str): self._training_dir = Path(training_dir) self._model_output_path = Path(model_output_path) self.status = TrainingStatus() def get_class_names(self) -> list[str]: if not self._training_dir.exists(): return [] return sorted([ d.name for d in self._training_dir.iterdir() if d.is_dir() and not d.name.startswith(".") ]) def check_readiness(self, min_images: int = 20) -> tuple[bool, str]: class_names = self.get_class_names() if not class_names: return False, "No pet directories found in training directory." 
insufficient = [] for name in class_names: pet_dir = self._training_dir / name image_count = sum(1 for f in pet_dir.iterdir() if f.suffix.lower() in (".jpg", ".jpeg", ".png")) if image_count < min_images: insufficient.append(f"{name}: {image_count}/{min_images}") if insufficient: return False, f"Insufficient training images: {', '.join(insufficient)}" return True, f"Ready to train with {len(class_names)} classes." def train(self, epochs: int = 30, batch_size: int = 16) -> bool: try: import torch import torch.nn as nn from torch.utils.data import DataLoader from torchvision import datasets, models, transforms self.status = TrainingStatus(is_training=True, total_epochs=epochs) class_names = self.get_class_names() num_classes = len(class_names) if num_classes < 2: self.status.error = "Need at least 2 pets to train." self.status.is_training = False return False # Data transforms with augmentation train_transform = transforms.Compose([ transforms.Resize((256, 256)), transforms.RandomCrop(224), transforms.RandomHorizontalFlip(), transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), ]) dataset = datasets.ImageFolder(str(self._training_dir), transform=train_transform) loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=2) # MobileNetV3-Small with transfer learning model = models.mobilenet_v3_small(weights=models.MobileNet_V3_Small_Weights.DEFAULT) model.classifier[-1] = nn.Linear(model.classifier[-1].in_features, num_classes) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model = model.to(device) # Freeze feature extractor, train classifier head first for param in model.features.parameters(): param.requires_grad = False optimizer = torch.optim.Adam(model.classifier.parameters(), lr=1e-3) criterion = nn.CrossEntropyLoss() # Training loop for epoch in range(epochs): model.train() running_loss = 0.0 correct = 0 total = 0 # 
Unfreeze features after 5 epochs for fine-tuning if epoch == 5: for param in model.features.parameters(): param.requires_grad = True optimizer = torch.optim.Adam(model.parameters(), lr=1e-4) for inputs, labels in loader: inputs, labels = inputs.to(device), labels.to(device) optimizer.zero_grad() outputs = model(inputs) loss = criterion(outputs, labels) loss.backward() optimizer.step() running_loss += loss.item() _, predicted = outputs.max(1) total += labels.size(0) correct += predicted.eq(labels).sum().item() accuracy = correct / total if total > 0 else 0 self.status.epoch = epoch + 1 self.status.progress = (epoch + 1) / epochs self.status.accuracy = accuracy log.info("Epoch %d/%d — loss: %.4f, accuracy: %.4f", epoch + 1, epochs, running_loss / len(loader), accuracy) # Backup existing model if self._model_output_path.exists(): backup_path = self._model_output_path.with_suffix(".backup.pt") shutil.copy2(self._model_output_path, backup_path) log.info("Backed up previous model to %s", backup_path) # Save model model = model.to("cpu") torch.save(model, self._model_output_path) log.info("Pet ID model saved to %s (accuracy: %.2f%%)", self._model_output_path, self.status.accuracy * 100) self.status.is_training = False return True except Exception as e: log.exception("Training failed: %s", e) self.status.error = str(e) self.status.is_training = False return False ``` - [ ] **Step 4: Run tests to verify they pass** Run: `pytest tests/unit/test_trainer.py -v` Expected: PASS - [ ] **Step 5: Commit** ```bash git add vigilar/detection/trainer.py tests/unit/test_trainer.py git commit -m "Add pet ID model trainer with MobileNetV3-Small transfer learning" ``` --- ### Task 9: Update alert profiles for pet/wildlife detection types **Files:** - Modify: `vigilar/alerts/profiles.py` - Modify: `tests/unit/test_profiles.py` - [ ] **Step 1: Write tests for new detection types in alert profiles** Add to `tests/unit/test_profiles.py`: ```python class TestPetAlertRouting: def 
test_known_pet_exterior_gets_push(self): rules = [ AlertProfileRule(detection_type="known_pet", camera_location="EXTERIOR", action="push_and_record", recipients="all"), AlertProfileRule(detection_type="known_pet", camera_location="INTERIOR", action="quiet_log", recipients="none"), ] profile = _make_profile("Away", ["EMPTY"], rules=rules) action, recipients = get_action_for_event(profile, "known_pet", "front_entrance", camera_location="EXTERIOR") assert action == "push_and_record" def test_known_pet_interior_gets_quiet_log(self): rules = [ AlertProfileRule(detection_type="known_pet", camera_location="EXTERIOR", action="push_and_record", recipients="all"), AlertProfileRule(detection_type="known_pet", camera_location="INTERIOR", action="quiet_log", recipients="none"), ] profile = _make_profile("Home", ["ALL_HOME"], rules=rules) action, recipients = get_action_for_event(profile, "known_pet", "kitchen", camera_location="INTERIOR") assert action == "quiet_log" def test_wildlife_predator_gets_urgent(self): rules = [ AlertProfileRule(detection_type="wildlife_predator", action="push_and_record", recipients="all"), ] profile = _make_profile("Always", ["EMPTY", "ALL_HOME"], rules=rules) action, _ = get_action_for_event(profile, "wildlife_predator", "front", camera_location="EXTERIOR") assert action == "push_and_record" def test_camera_location_any_matches_all(self): rules = [ AlertProfileRule(detection_type="wildlife_predator", camera_location="any", action="push_and_record", recipients="all"), ] profile = _make_profile("Always", ["EMPTY"], rules=rules) action, _ = get_action_for_event(profile, "wildlife_predator", "kitchen", camera_location="INTERIOR") assert action == "push_and_record" def test_backward_compatible_without_camera_location(self): """Existing calls without camera_location still work.""" rules = [AlertProfileRule(detection_type="person", action="push_and_record", recipients="all")] profile = _make_profile("Away", ["EMPTY"], rules=rules) action, _ = 
get_action_for_event(profile, "person", "front_door") assert action == "push_and_record" ``` - [ ] **Step 2: Run tests to verify they fail** Run: `pytest tests/unit/test_profiles.py::TestPetAlertRouting -v` Expected: FAIL, because `get_action_for_event` doesn't accept the `camera_location` parameter yet - [ ] **Step 3: Update get_action_for_event to support camera_location** Modify `vigilar/alerts/profiles.py`. Update the `get_action_for_event` function: ```python def get_action_for_event( profile: AlertProfileConfig, detection_type: str, camera_id: str, camera_location: str | None = None, ) -> tuple[str, str]: for rule in profile.rules: if rule.detection_type != detection_type: continue # A rule matches when its camera_location is "any", names this camera, or equals the camera's location type if rule.camera_location not in ("any", camera_id, camera_location): continue return rule.action, rule.recipients return "quiet_log", "none" ``` - [ ] **Step 4: Run tests to verify they pass** Run: `pytest tests/unit/test_profiles.py -v` Expected: PASS (all existing and new tests) - [ ] **Step 5: Commit** ```bash git add vigilar/alerts/profiles.py tests/unit/test_profiles.py git commit -m "Add camera_location filtering to alert profile matching" ``` --- ### Task 10: Update event processor for pet/wildlife events **Files:** - Modify: `vigilar/events/processor.py` - Modify: `tests/unit/test_events.py` - [ ] **Step 1: Write tests for pet/wildlife event classification** Add to `tests/unit/test_events.py`: ```python class TestPetEventClassification: def test_pet_detected_event(self): from vigilar.events.processor import EventProcessor from vigilar.constants import EventType, Severity processor = EventProcessor.__new__(EventProcessor) etype, sev, source = processor._classify_event( "vigilar/camera/kitchen/pet/detected", {"pet_name": "Angel", "confidence": 0.92}, ) assert etype == 
EventType.PET_DETECTED assert sev == Severity.INFO assert source == "kitchen" def test_wildlife_predator_event(self): from vigilar.events.processor import EventProcessor from vigilar.constants import EventType, Severity processor = EventProcessor.__new__(EventProcessor) etype, sev, source = processor._classify_event( "vigilar/camera/front/wildlife/detected", {"species": "bear", "threat_level": "PREDATOR"}, ) assert etype == EventType.WILDLIFE_PREDATOR assert sev == Severity.CRITICAL assert source == "front" def test_wildlife_nuisance_event(self): from vigilar.events.processor import EventProcessor from vigilar.constants import EventType, Severity processor = EventProcessor.__new__(EventProcessor) etype, sev, source = processor._classify_event( "vigilar/camera/back/wildlife/detected", {"species": "raccoon", "threat_level": "NUISANCE"}, ) assert etype == EventType.WILDLIFE_NUISANCE assert sev == Severity.WARNING assert source == "back" def test_wildlife_passive_event(self): from vigilar.events.processor import EventProcessor from vigilar.constants import EventType, Severity processor = EventProcessor.__new__(EventProcessor) etype, sev, source = processor._classify_event( "vigilar/camera/front/wildlife/detected", {"species": "deer", "threat_level": "PASSIVE"}, ) assert etype == EventType.WILDLIFE_PASSIVE assert sev == Severity.INFO assert source == "front" ``` - [ ] **Step 2: Run tests to verify they fail** Run: `pytest tests/unit/test_events.py::TestPetEventClassification -v` Expected: FAIL — pet/wildlife topics not handled in _classify_event - [ ] **Step 3: Update event processor** In `vigilar/events/processor.py`, update `_classify_event` to handle the new topics. 
Add these cases in the camera section (after the existing `_TOPIC_EVENT_MAP` check, around line 130): ```python # Pet detection if suffix == "pet/detected": return EventType.PET_DETECTED, Severity.INFO, camera_id # Wildlife detection — severity depends on threat_level in payload if suffix == "wildlife/detected": threat = payload.get("threat_level", "PASSIVE") if threat == "PREDATOR": return EventType.WILDLIFE_PREDATOR, Severity.CRITICAL, camera_id elif threat == "NUISANCE": return EventType.WILDLIFE_NUISANCE, Severity.WARNING, camera_id else: return EventType.WILDLIFE_PASSIVE, Severity.INFO, camera_id ``` - [ ] **Step 4: Run tests to verify they pass** Run: `pytest tests/unit/test_events.py -v` Expected: PASS - [ ] **Step 5: Commit** ```bash git add vigilar/events/processor.py tests/unit/test_events.py git commit -m "Handle pet and wildlife events in event processor" ``` --- ### Task 11: Integrate detection into camera worker **Files:** - Modify: `vigilar/camera/worker.py` This task integrates the YOLOv8 detector and pet ID classifier into the camera frame loop. This is the core wiring task — no new tests because it requires live camera/MQTT integration testing, but the underlying components (Tasks 5-7) are individually tested. 
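Although this task adds no tests, the payload assembly for pet detections does not itself need a camera or an MQTT broker. The following is a minimal sketch, assuming that logic were pulled out of the frame loop into a pure helper; `FakeDetection` and `build_pet_payload` are hypothetical names for illustration and are not part of the plan's codebase. The payload keys mirror the ones used in Step 3 of this task.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeDetection:
    """Hypothetical stand-in for one detector result (not the real Detection type)."""
    class_name: str
    confidence: float


def build_pet_payload(det, camera_location, pet_match=None):
    """Assemble a pet/detected payload with the same keys Step 3 publishes.

    pet_match is an optional (pet_id, pet_name, confidence) tuple standing in
    for an identified PetIDResult; None means the pet was not identified.
    """
    payload = {
        "species": det.class_name,
        "confidence": round(det.confidence, 3),
        "camera_location": camera_location,
    }
    if pet_match is not None:
        pet_id, pet_name, pet_conf = pet_match
        payload["pet_id"] = pet_id
        payload["pet_name"] = pet_name
        payload["pet_confidence"] = round(pet_conf, 3)
    return payload
```

A helper of this shape could be covered by an ordinary unit test, leaving only the camera and MQTT plumbing for manual integration checks.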
- [ ] **Step 1: Add imports to worker.py** Add at the top of `vigilar/camera/worker.py` after existing imports: ```python from vigilar.detection.yolo import YOLODetector from vigilar.detection.pet_id import PetIDClassifier from vigilar.detection.wildlife import classify_wildlife_threat ``` - [ ] **Step 2: Add detector initialization in run_camera_worker** Add after the existing component setup (around line 90, after HLS and recorder init), before the main loop: ```python # Object detection (YOLOv8 unified detector) yolo_detector = None pet_classifier = None if hasattr(camera_cfg, '_pets_config') and camera_cfg._pets_config.enabled: pets_cfg = camera_cfg._pets_config yolo_detector = YOLODetector( model_path=pets_cfg.model_path, confidence_threshold=pets_cfg.confidence_threshold, ) if pets_cfg.pet_id_enabled: pet_classifier = PetIDClassifier( model_path=pets_cfg.pet_id_model_path, high_threshold=pets_cfg.pet_id_threshold, low_threshold=pets_cfg.pet_id_low_confidence, ) ``` Note: The `_pets_config` attribute will be set by the camera manager when spawning workers. This avoids changing the CameraConfig model's serialization. 
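The two thresholds passed to `PetIDClassifier` above split a confidence score into three bands, following the `is_identified` and `is_low_confidence` properties defined in Task 7. The sketch below restates that rule as a standalone function; `confidence_band` is a hypothetical helper name, and the defaults are the 0.7 and 0.5 values from `pet_id.py`.

```python
def confidence_band(confidence: float, high: float = 0.7, low: float = 0.5) -> str:
    """Classify a pet ID confidence score into one of three bands.

    Hypothetical helper: mirrors PetIDResult, where a score at or above
    `low` counts as identified and a score in [low, high) is low confidence.
    """
    if not 0.0 <= low <= high <= 1.0:
        raise ValueError("expected 0 <= low <= high <= 1")
    if confidence >= high:
        return "confident"
    if confidence >= low:
        return "low_confidence"
    return "unknown"
```

The ordering check matters for configuration: if `pet_id_low_confidence` were set above `pet_id_threshold`, the low-confidence band would be empty and no sighting would ever be flagged for review.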
- [ ] **Step 3: Add detection processing after motion detection** Add after the motion START/END block (after line 244, before the heartbeat), inside the main loop: ```python # Run object detection on motion frames if state.motion_active and yolo_detector and yolo_detector.is_loaded: detections = yolo_detector.detect(frame) for det in detections: category = YOLODetector.classify(det) if category == "person": bus.publish_event( Topics.camera_motion_start(camera_id), # reuse existing topic detection="person", confidence=det.confidence, ) elif category == "domestic_animal": # Crop for pet ID x, y, w, h = det.bbox crop = frame[max(0, y):y + h, max(0, x):x + w] pet_result = None if pet_classifier and pet_classifier.is_loaded and crop.size > 0: pet_result = pet_classifier.identify(crop, species=det.class_name) payload = { "species": det.class_name, "confidence": round(det.confidence, 3), "camera_location": camera_cfg.location, } if pet_result and pet_result.is_identified: payload["pet_id"] = pet_result.pet_id payload["pet_name"] = pet_result.pet_name payload["pet_confidence"] = round(pet_result.confidence, 3) # Publish pet location update bus.publish_event( Topics.pet_location(pet_result.pet_name.lower()), camera_id=camera_id, camera_location=camera_cfg.location, ) bus.publish_event(Topics.camera_pet_detected(camera_id), **payload) elif category == "wildlife": wildlife_cfg = camera_cfg._pets_config.wildlife if hasattr(camera_cfg, '_pets_config') else None if wildlife_cfg: frame_area = frame.shape[0] * frame.shape[1] threat_level, species = classify_wildlife_threat( det, wildlife_cfg, frame_area, ) bus.publish_event( Topics.camera_wildlife_detected(camera_id), species=species, threat_level=threat_level, confidence=round(det.confidence, 3), camera_location=camera_cfg.location, ) ``` - [ ] **Step 4: Update run_camera_worker signature to accept pets config** Update the function signature to pass pets config through: ```python def run_camera_worker( camera_cfg: CameraConfig, 
mqtt_cfg: MQTTConfig, recordings_dir: str, hls_dir: str, remote_cfg: RemoteConfig | None = None, pets_cfg: "PetsConfig | None" = None, ) -> None: ``` Then replace the `hasattr`-based initialization from Step 2 with: ```python # Object detection (YOLOv8 unified detector) yolo_detector = None pet_classifier = None if pets_cfg and pets_cfg.enabled: yolo_detector = YOLODetector( model_path=pets_cfg.model_path, confidence_threshold=pets_cfg.confidence_threshold, ) if pets_cfg.pet_id_enabled: pet_classifier = PetIDClassifier( model_path=pets_cfg.pet_id_model_path, high_threshold=pets_cfg.pet_id_threshold, low_threshold=pets_cfg.pet_id_low_confidence, ) ``` The wildlife branch from Step 3 needs the same change: read the config as `wildlife_cfg = pets_cfg.wildlife if pets_cfg else None`, so that nothing in the worker still depends on `camera_cfg._pets_config`. - [ ] **Step 5: Commit** ```bash git add vigilar/camera/worker.py git commit -m "Integrate YOLOv8 detection and pet ID into camera worker" ``` --- ### Task 12: Add crop saving and staging cleanup **Files:** - Create: `vigilar/detection/crop_manager.py` - Test: `tests/unit/test_crop_manager.py` (new) - [ ] **Step 1: Write tests for crop manager** ```python # tests/unit/test_crop_manager.py """Tests for detection crop saving and staging cleanup.""" import time from pathlib import Path import numpy as np from vigilar.detection.crop_manager import CropManager class TestCropManager: def test_save_crop(self, tmp_path): manager = CropManager(staging_dir=str(tmp_path / "staging"), training_dir=str(tmp_path / "training")) crop = np.zeros((100, 80, 3), dtype=np.uint8) path = manager.save_staging_crop(crop, species="cat", camera_id="kitchen") assert Path(path).exists() assert "cat" in path assert "kitchen" in path def test_promote_to_training(self, tmp_path): manager = CropManager(staging_dir=str(tmp_path / "staging"), training_dir=str(tmp_path / "training")) crop = np.zeros((100, 80, 3), dtype=np.uint8) staging_path = manager.save_staging_crop(crop, species="cat", camera_id="kitchen") training_path = manager.promote_to_training(staging_path, pet_name="angel") assert Path(training_path).exists() assert "angel" in training_path assert not 
Path(staging_path).exists() def test_cleanup_old_crops(self, tmp_path): staging = tmp_path / "staging" staging.mkdir(parents=True) # Create an old file old_file = staging / "old_crop.jpg" old_file.write_bytes(b"fake") # Set mtime to 10 days ago old_time = time.time() - 10 * 86400 import os os.utime(old_file, (old_time, old_time)) # Create a recent file new_file = staging / "new_crop.jpg" new_file.write_bytes(b"fake") manager = CropManager(staging_dir=str(staging), training_dir=str(tmp_path / "training")) deleted = manager.cleanup_expired(retention_days=7) assert deleted == 1 assert not old_file.exists() assert new_file.exists() ``` - [ ] **Step 2: Run tests to verify they fail** Run: `pytest tests/unit/test_crop_manager.py -v` Expected: FAIL — module not found - [ ] **Step 3: Implement crop manager** ```python # vigilar/detection/crop_manager.py """Manage detection crop images for training and staging.""" import logging import os import shutil import time from pathlib import Path import cv2 import numpy as np log = logging.getLogger(__name__) class CropManager: def __init__(self, staging_dir: str, training_dir: str): self._staging_dir = Path(staging_dir) self._training_dir = Path(training_dir) def save_staging_crop(self, crop: np.ndarray, species: str, camera_id: str) -> str: self._staging_dir.mkdir(parents=True, exist_ok=True) timestamp = int(time.time() * 1000) filename = f"{species}_{camera_id}_{timestamp}.jpg" filepath = self._staging_dir / filename cv2.imwrite(str(filepath), crop) return str(filepath) def promote_to_training(self, staging_path: str, pet_name: str) -> str: pet_dir = self._training_dir / pet_name.lower() pet_dir.mkdir(parents=True, exist_ok=True) src = Path(staging_path) dst = pet_dir / src.name shutil.move(str(src), str(dst)) return str(dst) def cleanup_expired(self, retention_days: int = 7) -> int: if not self._staging_dir.exists(): return 0 cutoff = time.time() - retention_days * 86400 deleted = 0 for filepath in self._staging_dir.iterdir(): 
if filepath.is_file() and filepath.stat().st_mtime < cutoff: filepath.unlink() deleted += 1 if deleted: log.info("Cleaned up %d expired staging crops", deleted) return deleted ``` - [ ] **Step 4: Run tests to verify they pass** Run: `pytest tests/unit/test_crop_manager.py -v` Expected: PASS - [ ] **Step 5: Commit** ```bash git add vigilar/detection/crop_manager.py tests/unit/test_crop_manager.py git commit -m "Add crop manager for staging and training image lifecycle" ``` --- ### Task 13: Update daily digest with pet/wildlife summary **Files:** - Modify: `vigilar/health/digest.py` - Modify: `tests/unit/test_health.py` - [ ] **Step 1: Write tests for pet digest** Add to `tests/unit/test_health.py`: ```python from vigilar.health.digest import build_digest, format_digest from vigilar.storage.schema import pet_sightings, wildlife_sightings class TestPetDigest: def test_digest_includes_pet_sightings(self, test_db, tmp_data_dir): import time with test_db.begin() as conn: conn.execute(pet_sightings.insert().values( ts=time.time(), pet_id="p1", species="cat", camera_id="kitchen", confidence=0.9, labeled=1, )) conn.execute(pet_sightings.insert().values( ts=time.time(), pet_id="p2", species="dog", camera_id="kitchen", confidence=0.85, labeled=1, )) data = build_digest(test_db, str(tmp_data_dir), since_hours=1) assert data["pet_sightings"] == 2 def test_digest_includes_wildlife(self, test_db, tmp_data_dir): import time with test_db.begin() as conn: conn.execute(wildlife_sightings.insert().values( ts=time.time(), species="bear", threat_level="PREDATOR", camera_id="front", confidence=0.9, )) conn.execute(wildlife_sightings.insert().values( ts=time.time(), species="deer", threat_level="PASSIVE", camera_id="back", confidence=0.8, )) data = build_digest(test_db, str(tmp_data_dir), since_hours=1) assert data["wildlife_predators"] == 1 assert data["wildlife_passive"] == 1 def test_format_includes_pets(self): data = { "person_detections": 2, "unknown_vehicles": 0, "recordings": 5, 
"disk_used_gb": 100.0, "disk_used_pct": 50, "since_hours": 12, "pet_sightings": 15, "wildlife_predators": 0, "wildlife_nuisance": 1, "wildlife_passive": 3, } text = format_digest(data) assert "15 pet" in text assert "1 nuisance" in text.lower() or "wildlife" in text.lower() ``` - [ ] **Step 2: Run tests to verify they fail** Run: `pytest tests/unit/test_health.py::TestPetDigest -v` Expected: FAIL — build_digest doesn't query pet tables - [ ] **Step 3: Update digest.py** Update imports in `vigilar/health/digest.py`: ```python from vigilar.storage.schema import events, recordings, pet_sightings, wildlife_sightings ``` Add pet/wildlife queries to `build_digest` (inside the `with engine.connect()` block, after the recording_count query): ```python pet_count = conn.execute( select(func.count()).select_from(pet_sightings) .where(pet_sightings.c.ts >= since_ts / 1000) ).scalar() or 0 wildlife_predator_count = conn.execute( select(func.count()).select_from(wildlife_sightings) .where(wildlife_sightings.c.ts >= since_ts / 1000, wildlife_sightings.c.threat_level == "PREDATOR") ).scalar() or 0 wildlife_nuisance_count = conn.execute( select(func.count()).select_from(wildlife_sightings) .where(wildlife_sightings.c.ts >= since_ts / 1000, wildlife_sightings.c.threat_level == "NUISANCE") ).scalar() or 0 wildlife_passive_count = conn.execute( select(func.count()).select_from(wildlife_sightings) .where(wildlife_sightings.c.ts >= since_ts / 1000, wildlife_sightings.c.threat_level == "PASSIVE") ).scalar() or 0 ``` Add to the return dict: ```python "pet_sightings": pet_count, "wildlife_predators": wildlife_predator_count, "wildlife_nuisance": wildlife_nuisance_count, "wildlife_passive": wildlife_passive_count, ``` Update `format_digest`: ```python def format_digest(data: dict) -> str: lines = [ f"Vigilar Daily Summary", f"Last {data['since_hours']}h: " f"{data['person_detections']} person detections, " f"{data['unknown_vehicles']} unknown vehicles, " f"{data['recordings']} recordings", 
] if data.get("pet_sightings", 0) > 0: lines.append(f"Pets: {data['pet_sightings']} pet sightings") wildlife_parts = [] if data.get("wildlife_predators", 0) > 0: wildlife_parts.append(f"{data['wildlife_predators']} predator") if data.get("wildlife_nuisance", 0) > 0: wildlife_parts.append(f"{data['wildlife_nuisance']} nuisance") if data.get("wildlife_passive", 0) > 0: wildlife_parts.append(f"{data['wildlife_passive']} passive") if wildlife_parts: lines.append(f"Wildlife: {', '.join(wildlife_parts)}") lines.append(f"Storage: {data['disk_used_gb']} GB ({data['disk_used_pct']:.0f}%)") return "\n".join(lines) ``` - [ ] **Step 4: Run tests to verify they pass** Run: `pytest tests/unit/test_health.py -v` Expected: PASS - [ ] **Step 5: Commit** ```bash git add vigilar/health/digest.py tests/unit/test_health.py git commit -m "Add pet and wildlife counts to daily digest" ``` --- ### Task 14: Add pets web blueprint — API endpoints **Files:** - Create: `vigilar/web/blueprints/pets.py` - Test: `tests/unit/test_pets_api.py` (new) - [ ] **Step 1: Write tests for pets API** ```python # tests/unit/test_pets_api.py """Tests for pets web blueprint API endpoints.""" import json import pytest from vigilar.web.app import create_app @pytest.fixture def client(test_db, tmp_path, sample_config): app = create_app(sample_config, db_engine=test_db) app.config["TESTING"] = True with app.test_client() as client: yield client class TestPetsAPI: def test_register_pet(self, client): resp = client.post("/pets/register", json={ "name": "Angel", "species": "cat", "breed": "DSH", "color_description": "black", }) assert resp.status_code == 200 data = resp.get_json() assert data["name"] == "Angel" assert "id" in data def test_get_pet_status(self, client): client.post("/pets/register", json={"name": "Angel", "species": "cat"}) resp = client.get("/pets/api/status") assert resp.status_code == 200 data = resp.get_json() assert len(data["pets"]) == 1 assert data["pets"][0]["name"] == "Angel" def 
test_get_sightings_empty(self, client): resp = client.get("/pets/api/sightings") assert resp.status_code == 200 data = resp.get_json() assert data["sightings"] == [] def test_get_wildlife_empty(self, client): resp = client.get("/pets/api/wildlife") assert resp.status_code == 200 data = resp.get_json() assert data["sightings"] == [] def test_get_unlabeled_empty(self, client): resp = client.get("/pets/api/unlabeled") assert resp.status_code == 200 data = resp.get_json() assert data["crops"] == [] def test_label_sighting(self, client): # Register a pet resp = client.post("/pets/register", json={"name": "Angel", "species": "cat"}) pet_id = resp.get_json()["id"] # Insert a sighting directly (simulating detection) from vigilar.storage.queries import insert_pet_sighting from flask import current_app # Use the test_db through the app with client.application.app_context(): engine = client.application.extensions.get("db_engine") if engine: sighting_id = insert_pet_sighting(engine, species="cat", camera_id="kitchen", confidence=0.6) resp = client.post(f"/pets/{pet_id}/label", json={"sighting_id": sighting_id}) assert resp.status_code == 200 ``` - [ ] **Step 2: Run tests to verify they fail** Run: `pytest tests/unit/test_pets_api.py -v` Expected: FAIL — pets blueprint not registered - [ ] **Step 3: Implement pets blueprint** ```python # vigilar/web/blueprints/pets.py """Pets blueprint — pet management, training, sightings, and dashboard.""" import logging from flask import Blueprint, jsonify, render_template, request from vigilar.storage.queries import ( get_all_pets, get_pet, get_pet_last_location, get_pet_sightings, get_training_images, get_unlabeled_sightings, get_wildlife_sightings, insert_pet, insert_training_image, label_sighting, ) log = logging.getLogger(__name__) pets_bp = Blueprint("pets", __name__, url_prefix="/pets") def _get_engine(): from flask import current_app return current_app.extensions["db_engine"] @pets_bp.route("/") def pets_dashboard(): engine = 
_get_engine() all_pets = get_all_pets(engine) return render_template("pets/dashboard.html", pets=all_pets) @pets_bp.route("/register", methods=["POST"]) def register_pet(): data = request.get_json() engine = _get_engine() pet_id = insert_pet( engine, name=data["name"], species=data["species"], breed=data.get("breed"), color_description=data.get("color_description"), ) pet = get_pet(engine, pet_id) return jsonify(pet) @pets_bp.route("/api/status") def pet_status(): engine = _get_engine() all_pets = get_all_pets(engine) result = [] for pet in all_pets: loc = get_pet_last_location(engine, pet["id"]) result.append({ **pet, "last_camera": loc["camera_id"] if loc else None, "last_seen": loc["ts"] if loc else None, }) return jsonify({"pets": result}) @pets_bp.route("/api/sightings") def sightings(): engine = _get_engine() pet_id = request.args.get("pet_id") camera_id = request.args.get("camera_id") limit = request.args.get("limit", 100, type=int) rows = get_pet_sightings(engine, pet_id=pet_id, camera_id=camera_id, limit=limit) return jsonify({"sightings": rows}) @pets_bp.route("/api/wildlife") def wildlife(): engine = _get_engine() threat_level = request.args.get("threat_level") limit = request.args.get("limit", 100, type=int) rows = get_wildlife_sightings(engine, threat_level=threat_level, limit=limit) return jsonify({"sightings": rows}) @pets_bp.route("/api/unlabeled") def unlabeled(): engine = _get_engine() species = request.args.get("species") limit = request.args.get("limit", 50, type=int) rows = get_unlabeled_sightings(engine, species=species, limit=limit) return jsonify({"crops": rows}) @pets_bp.route("/<pet_id>/label", methods=["POST"]) def label_pet(pet_id): data = request.get_json() engine = _get_engine() sighting_id = data["sighting_id"] label_sighting(engine, sighting_id, pet_id) return jsonify({"status": "labeled"}) @pets_bp.route("/<pet_id>/upload", methods=["POST"]) def upload_training(pet_id): engine = _get_engine() pet = get_pet(engine, pet_id) if not pet: return jsonify({"error": "Pet 
not found"}), 404 files = request.files.getlist("images") saved = [] for f in files: if f.filename: from werkzeug.utils import secure_filename from flask import current_app import os pets_cfg = current_app.config.get("PETS_CONFIG") training_dir = pets_cfg.training_dir if pets_cfg else "/var/vigilar/pets/training" pet_dir = os.path.join(training_dir, pet["name"].lower()) os.makedirs(pet_dir, exist_ok=True) filepath = os.path.join(pet_dir, secure_filename(f.filename)) f.save(filepath) insert_training_image(engine, pet_id=pet_id, image_path=filepath, source="upload") saved.append(filepath) return jsonify({"uploaded": len(saved)}) @pets_bp.route("/train", methods=["POST"]) def train_model(): from flask import current_app pets_cfg = current_app.config.get("PETS_CONFIG") if not pets_cfg: return jsonify({"error": "Pet detection not configured"}), 400 from vigilar.detection.trainer import PetTrainer trainer = PetTrainer( training_dir=pets_cfg.training_dir, model_output_path=pets_cfg.pet_id_model_path, ) ready, msg = trainer.check_readiness(min_images=pets_cfg.min_training_images) if not ready: return jsonify({"error": msg}), 400 # Run training in background thread import threading thread = threading.Thread(target=trainer.train, daemon=True) thread.start() return jsonify({"status": "training_started", "message": msg}) @pets_bp.route("/api/training-status") def training_status(): # Training status is ephemeral — stored in the trainer instance return jsonify({"status": "idle", "message": "No training in progress"}) @pets_bp.route("/api/highlights") def highlights(): # Highlight reel — computed from sightings + recordings engine = _get_engine() import time since = time.time() - 86400 # last 24h sightings = get_pet_sightings(engine, since_ts=since, limit=50) highlights = [] for s in sightings: if s.get("confidence", 0) > 0.8: highlights.append({ "type": "pet_sighting", "pet_id": s.get("pet_id"), "camera_id": s["camera_id"], "timestamp": s["ts"], "confidence": 
s.get("confidence"), }) return jsonify({"highlights": highlights[:20]}) @pets_bp.route("//update", methods=["POST"]) def update_pet(pet_id): engine = _get_engine() pet = get_pet(engine, pet_id) if not pet: return jsonify({"error": "Pet not found"}), 404 data = request.get_json() from vigilar.storage.schema import pets as pets_table with engine.begin() as conn: updates = {} for field in ("name", "breed", "color_description"): if field in data: updates[field] = data[field] if updates: conn.execute( pets_table.update().where(pets_table.c.id == pet_id).values(**updates) ) return jsonify(get_pet(engine, pet_id)) @pets_bp.route("//delete", methods=["DELETE"]) def delete_pet(pet_id): engine = _get_engine() from vigilar.storage.schema import pets as pets_table with engine.begin() as conn: conn.execute(pets_table.delete().where(pets_table.c.id == pet_id)) return jsonify({"status": "deleted"}) ``` - [ ] **Step 4: Register the blueprint in app.py** Read `vigilar/web/app.py` and add the pets blueprint registration alongside the existing blueprints: ```python from vigilar.web.blueprints.pets import pets_bp app.register_blueprint(pets_bp) ``` Also store the db engine in app extensions for the blueprint to access: ```python app.extensions["db_engine"] = db_engine ``` - [ ] **Step 5: Run tests to verify they pass** Run: `pytest tests/unit/test_pets_api.py -v` Expected: PASS - [ ] **Step 6: Commit** ```bash git add vigilar/web/blueprints/pets.py vigilar/web/app.py tests/unit/test_pets_api.py git commit -m "Add pets web blueprint with API endpoints" ``` --- ### Task 15: Add pets dashboard template **Files:** - Create: `vigilar/web/templates/pets/dashboard.html` This is a Flask/Jinja2 template using Bootstrap 5 dark theme, matching the existing UI patterns. - [ ] **Step 1: Check existing template patterns** Read `vigilar/web/templates/` to understand the base template structure, navbar, and layout conventions used by other pages. 

- [ ] **Step 2: Create the pets dashboard template**

Create `vigilar/web/templates/pets/dashboard.html` following the existing template patterns. Include:
- Per-pet status cards (name, species, location, last seen, status indicator)
- Wildlife summary bar
- Activity timeline (24-hour bars per pet)
- Unlabeled detection queue (thumbnail grid)
- Pet management section (register, upload photos, train model)
- Highlight reel section

Use Bootstrap 5 dark theme classes and match the existing card/grid patterns from other templates. Use `fetch()` to load data from the `/pets/api/*` endpoints.

- [ ] **Step 3: Verify template renders**

Run the Flask dev server and check that the `/pets/` route loads without errors. Verify that the dark theme matches the rest of the UI.

- [ ] **Step 4: Commit**

```bash
git add vigilar/web/templates/pets/dashboard.html
git commit -m "Add pets dashboard template with Bootstrap 5 dark theme"
```

---

### Task 16: Add pet labeling UI to recordings playback

**Files:**
- Modify: `vigilar/web/templates/recordings/` (playback template)
- Modify: `vigilar/web/static/js/` (add pet labeling JS)

- [ ] **Step 1: Add labeling overlay to recording playback**

Add a JavaScript module that:
- Renders bounding box overlays on the video player when detection data is available
- Shows a "Who is this?" popup when an animal bounding box is clicked
- Fetches registered pets from `/pets/api/status` and filters them by the detected species
- Sends a `POST` to `/pets/<pet_id>/label` with the `sighting_id` when the user selects a pet
- Shows "Unknown" and "+ New Pet" options

- [ ] **Step 2: Add CSS for bounding box overlays**

Add styles to the existing CSS for:
- `.detection-overlay`: positioned over the video, transparent to pointer events
- `.detection-bbox`: colored border with a species/confidence label
- `.label-popup`: dark-themed dropdown for pet selection

- [ ] **Step 3: Test the labeling flow manually**

Verify that clicking a bounding box shows the popup, selecting a pet calls the API, and the popup dismisses.
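
The overlay from Step 1 has to map detector coordinates onto the rendered video. The following is a minimal sketch of that mapping, not part of the plan's code. It assumes detections are stored as pixel `(x1, y1, x2, y2)` boxes in the source frame and that the overlay element exactly covers the displayed picture; neither is confirmed by this plan. `CssBox` and `bbox_to_css_percent` are hypothetical names.

```python
from typing import NamedTuple


class CssBox(NamedTuple):
    """Box position and size as percentages of the frame (hypothetical type)."""

    left: float
    top: float
    width: float
    height: float


def _clamp(value: float, upper: float) -> float:
    """Limit value to the range [0, upper]."""
    return max(0.0, min(float(value), float(upper)))


def bbox_to_css_percent(bbox, frame_width, frame_height) -> CssBox:
    """Convert a pixel-space (x1, y1, x2, y2) box to CSS percentages.

    Assumption: bbox is in source-frame pixels, origin at the top-left.
    """
    if frame_width <= 0 or frame_height <= 0:
        raise ValueError("frame dimensions must be positive")
    x1, y1, x2, y2 = bbox
    # Clamp to the frame and order the corners so sizes are never negative.
    left, right = sorted((_clamp(x1, frame_width), _clamp(x2, frame_width)))
    top, bottom = sorted((_clamp(y1, frame_height), _clamp(y2, frame_height)))
    return CssBox(
        left=100.0 * left / frame_width,
        top=100.0 * top / frame_height,
        width=100.0 * (right - left) / frame_width,
        height=100.0 * (bottom - top) / frame_height,
    )
```

For a 1920x1080 frame, the box `(480, 270, 1440, 810)` maps to left 25%, top 25%, width 50%, height 50%. Percentages keep the `.detection-bbox` elements aligned when the player is resized, as long as the overlay matches the picture area.
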

- [ ] **Step 4: Commit**

```bash
git add vigilar/web/templates/ vigilar/web/static/
git commit -m "Add pet labeling UI overlay to recording playback"
```

---

### Task 17: Add dependencies to pyproject.toml

**Files:**
- Modify: `pyproject.toml`

- [ ] **Step 1: Add ultralytics and torchvision**

Add to the dependencies section:

```toml
ultralytics = ">=8.2.0"
torchvision = ">=0.18.0"
```

- [ ] **Step 2: Install and verify**

Run: `pip install -e ".[dev]"`
Expected: Installs successfully with new dependencies

- [ ] **Step 3: Commit**

```bash
git add pyproject.toml
git commit -m "Add ultralytics and torchvision dependencies for pet detection"
```

---

### Task 18: Run full test suite and fix any issues

**Files:** All modified files

- [ ] **Step 1: Run complete test suite**

Run: `pytest tests/ -v --tb=short`
Expected: All tests pass

- [ ] **Step 2: Run ruff linter**

Run: `ruff check vigilar/`
Expected: No errors (fix any that appear)

- [ ] **Step 3: Run type checker**

Run: `mypy vigilar/ --ignore-missing-imports`
Expected: No new errors

- [ ] **Step 4: Fix any issues found**

Address any test failures, lint errors, or type errors. Commit fixes individually.

- [ ] **Step 5: Final commit**

```bash
git commit -m "Fix lint and type issues from pet detection integration"
```

---

## File Structure Summary

### New Files

| File | Purpose |
|------|---------|
| `vigilar/detection/yolo.py` | YOLOv8 unified detector (replaces MobileNet) |
| `vigilar/detection/wildlife.py` | Wildlife threat classification |
| `vigilar/detection/pet_id.py` | Pet identification classifier |
| `vigilar/detection/trainer.py` | Pet ID model trainer (MobileNetV3-Small) |
| `vigilar/detection/crop_manager.py` | Staging/training crop lifecycle |
| `vigilar/web/blueprints/pets.py` | Pets web blueprint (API + views) |
| `vigilar/web/templates/pets/dashboard.html` | Pet dashboard UI |
| `tests/unit/test_constants.py` | Tests for new enums |
| `tests/unit/test_yolo_detector.py` | Tests for YOLO detector |
| `tests/unit/test_wildlife.py` | Tests for wildlife classification |
| `tests/unit/test_pet_id.py` | Tests for pet ID classifier |
| `tests/unit/test_trainer.py` | Tests for model trainer |
| `tests/unit/test_crop_manager.py` | Tests for crop manager |
| `tests/unit/test_pet_queries.py` | Tests for DB query functions |
| `tests/unit/test_pets_api.py` | Tests for pets API endpoints |

### Modified Files

| File | Changes |
|------|---------|
| `vigilar/constants.py` | Add ThreatLevel, CameraLocation enums; 6 EventType values; pet MQTT topics |
| `vigilar/config.py` | Add PetsConfig, WildlifeConfig, PetActivityConfig; location field on CameraConfig |
| `vigilar/storage/schema.py` | Add pets, pet_sightings, wildlife_sightings, pet_training_images tables |
| `vigilar/storage/queries.py` | Add pet/wildlife/training query functions |
| `vigilar/camera/worker.py` | Integrate YOLO detection + pet ID into frame loop |
| `vigilar/alerts/profiles.py` | Add camera_location filtering to get_action_for_event |
| `vigilar/events/processor.py` | Handle pet/wildlife event classification |
| `vigilar/health/digest.py` | Add pet/wildlife counts to daily digest |
| `vigilar/web/app.py` | Register pets blueprint |
| `pyproject.toml` | Add ultralytics, torchvision deps |
| `tests/unit/test_config.py` | Tests for new config models |
| `tests/unit/test_profiles.py` | Tests for pet alert routing |
| `tests/unit/test_events.py` | Tests for pet event classification |
| `tests/unit/test_health.py` | Tests for pet digest |