18 tasks covering: YOLOv8 detector, pet ID classifier, wildlife threat classification, crop management, alert integration, web UI, and training. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2675 lines
88 KiB
Markdown
2675 lines
88 KiB
Markdown
# Pet-Aware Security Features Implementation Plan
|
|
|
|
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
|
|
|
**Goal:** Add pet detection, pet identification, wildlife monitoring, and pet activity tracking to Vigilar using YOLOv8 unified detection and a trainable pet ID classifier.
|
|
|
|
**Architecture:** Replace MobileNet-SSD with YOLOv8-small for all object detection (people, vehicles, animals) in a single inference pass. Add a second-stage MobileNetV3-Small classifier for identifying specific pets from cropped bounding boxes. Wildlife gets threat-tiered alerting. Pet dashboard with activity tracking and highlight reels.
|
|
|
|
**Tech Stack:** ultralytics (YOLOv8), torchvision (MobileNetV3-Small), existing Flask/Bootstrap/SQLAlchemy stack.
|
|
|
|
**Spec:** `docs/superpowers/specs/2026-04-03-pet-aware-security-design.md`
|
|
|
|
---
|
|
|
|
### Task 1: Add new constants, enums, and MQTT topics
|
|
|
|
**Files:**
|
|
- Modify: `vigilar/constants.py`
|
|
- Test: `tests/unit/test_constants.py` (new)
|
|
|
|
- [ ] **Step 1: Write tests for new enums and topics**
|
|
|
|
```python
|
|
# tests/unit/test_constants.py
|
|
"""Tests for new pet/wildlife constants."""
|
|
|
|
from vigilar.constants import (
|
|
CameraLocation,
|
|
EventType,
|
|
ThreatLevel,
|
|
Topics,
|
|
)
|
|
|
|
|
|
class TestThreatLevel:
|
|
def test_values(self):
|
|
assert ThreatLevel.PREDATOR == "PREDATOR"
|
|
assert ThreatLevel.NUISANCE == "NUISANCE"
|
|
assert ThreatLevel.PASSIVE == "PASSIVE"
|
|
|
|
def test_is_strenum(self):
|
|
assert isinstance(ThreatLevel.PREDATOR, str)
|
|
|
|
|
|
class TestCameraLocation:
|
|
def test_values(self):
|
|
assert CameraLocation.EXTERIOR == "EXTERIOR"
|
|
assert CameraLocation.INTERIOR == "INTERIOR"
|
|
assert CameraLocation.TRANSITION == "TRANSITION"
|
|
|
|
def test_is_strenum(self):
|
|
assert isinstance(CameraLocation.EXTERIOR, str)
|
|
|
|
|
|
class TestNewEventTypes:
|
|
def test_pet_events_exist(self):
|
|
assert EventType.PET_DETECTED == "PET_DETECTED"
|
|
assert EventType.PET_ESCAPE == "PET_ESCAPE"
|
|
assert EventType.UNKNOWN_ANIMAL == "UNKNOWN_ANIMAL"
|
|
|
|
def test_wildlife_events_exist(self):
|
|
assert EventType.WILDLIFE_PREDATOR == "WILDLIFE_PREDATOR"
|
|
assert EventType.WILDLIFE_NUISANCE == "WILDLIFE_NUISANCE"
|
|
assert EventType.WILDLIFE_PASSIVE == "WILDLIFE_PASSIVE"
|
|
|
|
|
|
class TestPetTopics:
|
|
def test_pet_detected_topic(self):
|
|
assert Topics.camera_pet_detected("front") == "vigilar/camera/front/pet/detected"
|
|
|
|
def test_wildlife_detected_topic(self):
|
|
assert Topics.camera_wildlife_detected("front") == "vigilar/camera/front/wildlife/detected"
|
|
|
|
def test_pet_location_topic(self):
|
|
assert Topics.pet_location("angel") == "vigilar/pets/angel/location"
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `pytest tests/unit/test_constants.py -v`
|
|
Expected: FAIL — ThreatLevel, CameraLocation not defined, new EventType values missing
|
|
|
|
- [ ] **Step 3: Add new enums and event types to constants.py**
|
|
|
|
Add after the `HouseholdState` enum (after line 103):
|
|
|
|
```python
|
|
# --- Threat Levels (Wildlife) ---
|
|
|
|
class ThreatLevel(StrEnum):
|
|
PREDATOR = "PREDATOR"
|
|
NUISANCE = "NUISANCE"
|
|
PASSIVE = "PASSIVE"
|
|
|
|
|
|
# --- Camera Location ---
|
|
|
|
class CameraLocation(StrEnum):
|
|
EXTERIOR = "EXTERIOR"
|
|
INTERIOR = "INTERIOR"
|
|
TRANSITION = "TRANSITION"
|
|
```
|
|
|
|
Add to the `EventType` enum (after line 42, before the closing of the class):
|
|
|
|
```python
|
|
PET_DETECTED = "PET_DETECTED"
|
|
PET_ESCAPE = "PET_ESCAPE"
|
|
UNKNOWN_ANIMAL = "UNKNOWN_ANIMAL"
|
|
WILDLIFE_PREDATOR = "WILDLIFE_PREDATOR"
|
|
WILDLIFE_NUISANCE = "WILDLIFE_NUISANCE"
|
|
WILDLIFE_PASSIVE = "WILDLIFE_PASSIVE"
|
|
```
|
|
|
|
Add to `RecordingTrigger` enum (after line 70):
|
|
|
|
```python
|
|
PET = "PET"
|
|
WILDLIFE = "WILDLIFE"
|
|
```
|
|
|
|
Add new topic methods to the `Topics` class (after the presence methods, before the system constants):
|
|
|
|
```python
|
|
# Pet
|
|
@staticmethod
|
|
def camera_pet_detected(camera_id: str) -> str:
|
|
return f"vigilar/camera/{camera_id}/pet/detected"
|
|
|
|
@staticmethod
|
|
def camera_wildlife_detected(camera_id: str) -> str:
|
|
return f"vigilar/camera/{camera_id}/wildlife/detected"
|
|
|
|
@staticmethod
|
|
def pet_location(pet_name: str) -> str:
|
|
return f"vigilar/pets/{pet_name}/location"
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `pytest tests/unit/test_constants.py -v`
|
|
Expected: PASS
|
|
|
|
- [ ] **Step 5: Run full test suite to check for regressions**
|
|
|
|
Run: `pytest tests/ -v`
|
|
Expected: All existing tests still pass
|
|
|
|
- [ ] **Step 6: Commit**
|
|
|
|
```bash
|
|
git add vigilar/constants.py tests/unit/test_constants.py
|
|
git commit -m "Add pet/wildlife enums, event types, and MQTT topics"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 2: Add pet and wildlife config models
|
|
|
|
**Files:**
|
|
- Modify: `vigilar/config.py`
|
|
- Modify: `tests/unit/test_config.py`
|
|
|
|
- [ ] **Step 1: Write tests for new config models**
|
|
|
|
Add to `tests/unit/test_config.py`:
|
|
|
|
```python
|
|
from vigilar.config import PetsConfig, WildlifeThreatMap, WildlifeSizeHeuristics, PetActivityConfig
|
|
|
|
|
|
class TestPetsConfig:
|
|
def test_defaults(self):
|
|
cfg = PetsConfig()
|
|
assert cfg.enabled is False
|
|
assert cfg.model == "yolov8s"
|
|
assert cfg.confidence_threshold == 0.5
|
|
assert cfg.pet_id_threshold == 0.7
|
|
assert cfg.pet_id_low_confidence == 0.5
|
|
assert cfg.min_training_images == 20
|
|
assert cfg.crop_retention_days == 7
|
|
|
|
def test_custom_values(self):
|
|
cfg = PetsConfig(enabled=True, model="yolov8m", confidence_threshold=0.6)
|
|
assert cfg.enabled is True
|
|
assert cfg.model == "yolov8m"
|
|
assert cfg.confidence_threshold == 0.6
|
|
|
|
|
|
class TestWildlifeThreatMap:
|
|
def test_defaults(self):
|
|
tm = WildlifeThreatMap()
|
|
assert "bear" in tm.predator
|
|
assert "bird" in tm.passive
|
|
|
|
def test_custom_mapping(self):
|
|
tm = WildlifeThreatMap(predator=["bear", "wolf"], nuisance=["raccoon"])
|
|
assert "wolf" in tm.predator
|
|
assert "raccoon" in tm.nuisance
|
|
|
|
|
|
class TestWildlifeSizeHeuristics:
|
|
def test_defaults(self):
|
|
sh = WildlifeSizeHeuristics()
|
|
assert sh.small == 0.02
|
|
assert sh.medium == 0.08
|
|
assert sh.large == 0.15
|
|
|
|
|
|
class TestPetActivityConfig:
|
|
def test_defaults(self):
|
|
cfg = PetActivityConfig()
|
|
assert cfg.daily_digest is True
|
|
assert cfg.highlight_clips is True
|
|
assert cfg.zoomie_threshold == 0.8
|
|
|
|
|
|
class TestCameraConfigLocation:
|
|
def test_default_location_is_interior(self):
|
|
from vigilar.config import CameraConfig
|
|
cfg = CameraConfig(id="test", display_name="Test", rtsp_url="rtsp://x")
|
|
assert cfg.location == "INTERIOR"
|
|
|
|
def test_exterior_location(self):
|
|
from vigilar.config import CameraConfig
|
|
cfg = CameraConfig(id="test", display_name="Test", rtsp_url="rtsp://x", location="EXTERIOR")
|
|
assert cfg.location == "EXTERIOR"
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `pytest tests/unit/test_config.py::TestPetsConfig -v`
|
|
Expected: FAIL — PetsConfig not defined
|
|
|
|
- [ ] **Step 3: Add config models to config.py**
|
|
|
|
Add after `HealthConfig` (around line 239) and before `VigilarConfig`:
|
|
|
|
```python
|
|
# --- Pet Detection Config ---
|
|
|
|
class WildlifeThreatMap(BaseModel):
|
|
predator: list[str] = Field(default_factory=lambda: ["bear"])
|
|
nuisance: list[str] = Field(default_factory=list)
|
|
passive: list[str] = Field(default_factory=lambda: ["bird", "horse", "cow", "sheep"])
|
|
|
|
class WildlifeSizeHeuristics(BaseModel):
|
|
small: float = 0.02 # < 2% of frame → nuisance
|
|
medium: float = 0.08 # 2-8% → predator
|
|
large: float = 0.15 # > 8% → passive (deer-sized)
|
|
|
|
class WildlifeConfig(BaseModel):
|
|
threat_map: WildlifeThreatMap = Field(default_factory=WildlifeThreatMap)
|
|
size_heuristics: WildlifeSizeHeuristics = Field(default_factory=WildlifeSizeHeuristics)
|
|
|
|
class PetActivityConfig(BaseModel):
|
|
daily_digest: bool = True
|
|
highlight_clips: bool = True
|
|
zoomie_threshold: float = 0.8
|
|
|
|
class PetsConfig(BaseModel):
|
|
enabled: bool = False
|
|
model: str = "yolov8s"
|
|
model_path: str = "/var/vigilar/models/yolov8s.pt"
|
|
confidence_threshold: float = 0.5
|
|
pet_id_enabled: bool = True
|
|
pet_id_model_path: str = "/var/vigilar/models/pet_id.pt"
|
|
pet_id_threshold: float = 0.7
|
|
pet_id_low_confidence: float = 0.5
|
|
training_dir: str = "/var/vigilar/pets/training"
|
|
crop_staging_dir: str = "/var/vigilar/pets/staging"
|
|
crop_retention_days: int = 7
|
|
min_training_images: int = 20
|
|
wildlife: WildlifeConfig = Field(default_factory=WildlifeConfig)
|
|
activity: PetActivityConfig = Field(default_factory=PetActivityConfig)
|
|
```
|
|
|
|
Add `location` field to `CameraConfig` (after the existing fields, around line 45):
|
|
|
|
```python
|
|
location: str = "INTERIOR" # EXTERIOR | INTERIOR | TRANSITION
|
|
```
|
|
|
|
Add `pets` field to `VigilarConfig`:
|
|
|
|
```python
|
|
pets: PetsConfig = Field(default_factory=PetsConfig)
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `pytest tests/unit/test_config.py -v`
|
|
Expected: PASS
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add vigilar/config.py tests/unit/test_config.py
|
|
git commit -m "Add pet detection, wildlife, and activity config models"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 3: Add database tables for pets, sightings, and training
|
|
|
|
**Files:**
|
|
- Modify: `vigilar/storage/schema.py`
|
|
- Modify: `vigilar/storage/queries.py`
|
|
- Modify: `tests/unit/test_schema.py`
|
|
|
|
- [ ] **Step 1: Write tests for new tables**
|
|
|
|
Add to `tests/unit/test_schema.py`:
|
|
|
|
```python
|
|
from vigilar.storage.schema import pets, pet_sightings, wildlife_sightings, pet_training_images
|
|
|
|
|
|
class TestPetTables:
|
|
def test_pets_table_exists(self, test_db):
|
|
with test_db.connect() as conn:
|
|
result = conn.execute(pets.insert().values(
|
|
id="pet-1", name="Angel", species="cat", breed="DSH",
|
|
color_description="black", training_count=0, created_at=1000.0,
|
|
))
|
|
row = conn.execute(pets.select().where(pets.c.id == "pet-1")).first()
|
|
assert row is not None
|
|
assert row.name == "Angel"
|
|
assert row.species == "cat"
|
|
|
|
def test_pet_sightings_table(self, test_db):
|
|
with test_db.begin() as conn:
|
|
conn.execute(pets.insert().values(
|
|
id="pet-1", name="Angel", species="cat", training_count=0, created_at=1000.0,
|
|
))
|
|
conn.execute(pet_sightings.insert().values(
|
|
ts=1000.0, pet_id="pet-1", species="cat", camera_id="kitchen",
|
|
confidence=0.92, labeled=True,
|
|
))
|
|
rows = conn.execute(pet_sightings.select()).fetchall()
|
|
assert len(rows) == 1
|
|
assert rows[0].camera_id == "kitchen"
|
|
|
|
def test_wildlife_sightings_table(self, test_db):
|
|
with test_db.begin() as conn:
|
|
conn.execute(wildlife_sightings.insert().values(
|
|
ts=1000.0, species="bear", threat_level="PREDATOR",
|
|
camera_id="front", confidence=0.88,
|
|
))
|
|
rows = conn.execute(wildlife_sightings.select()).fetchall()
|
|
assert len(rows) == 1
|
|
assert rows[0].threat_level == "PREDATOR"
|
|
|
|
def test_pet_training_images_table(self, test_db):
|
|
with test_db.begin() as conn:
|
|
conn.execute(pets.insert().values(
|
|
id="pet-1", name="Angel", species="cat", training_count=0, created_at=1000.0,
|
|
))
|
|
conn.execute(pet_training_images.insert().values(
|
|
pet_id="pet-1", image_path="/var/vigilar/pets/training/angel/001.jpg",
|
|
source="upload", created_at=1000.0,
|
|
))
|
|
rows = conn.execute(pet_training_images.select()).fetchall()
|
|
assert len(rows) == 1
|
|
assert rows[0].source == "upload"
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `pytest tests/unit/test_schema.py::TestPetTables -v`
|
|
Expected: FAIL — tables not defined
|
|
|
|
- [ ] **Step 3: Add new tables to schema.py**
|
|
|
|
Add after `push_subscriptions` table (after line 125):
|
|
|
|
```python
|
|
pets = Table(
|
|
"pets",
|
|
metadata,
|
|
Column("id", String, primary_key=True),
|
|
Column("name", String, nullable=False),
|
|
Column("species", String, nullable=False),
|
|
Column("breed", String),
|
|
Column("color_description", String),
|
|
Column("photo_path", String),
|
|
Column("training_count", Integer, nullable=False, default=0),
|
|
Column("created_at", Float, nullable=False),
|
|
)
|
|
|
|
pet_sightings = Table(
|
|
"pet_sightings",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True, autoincrement=True),
|
|
Column("ts", Float, nullable=False),
|
|
Column("pet_id", String),
|
|
Column("species", String, nullable=False),
|
|
Column("camera_id", String, nullable=False),
|
|
Column("confidence", Float),
|
|
Column("crop_path", String),
|
|
Column("labeled", Integer, nullable=False, default=0),
|
|
Column("event_id", Integer),
|
|
)
|
|
Index("idx_pet_sightings_ts", pet_sightings.c.ts.desc())
|
|
Index("idx_pet_sightings_pet", pet_sightings.c.pet_id, pet_sightings.c.ts.desc())
|
|
Index("idx_pet_sightings_camera", pet_sightings.c.camera_id, pet_sightings.c.ts.desc())
|
|
|
|
wildlife_sightings = Table(
|
|
"wildlife_sightings",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True, autoincrement=True),
|
|
Column("ts", Float, nullable=False),
|
|
Column("species", String, nullable=False),
|
|
Column("threat_level", String, nullable=False),
|
|
Column("camera_id", String, nullable=False),
|
|
Column("confidence", Float),
|
|
Column("crop_path", String),
|
|
Column("event_id", Integer),
|
|
)
|
|
Index("idx_wildlife_ts", wildlife_sightings.c.ts.desc())
|
|
Index("idx_wildlife_threat", wildlife_sightings.c.threat_level, wildlife_sightings.c.ts.desc())
|
|
|
|
pet_training_images = Table(
|
|
"pet_training_images",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True, autoincrement=True),
|
|
Column("pet_id", String, nullable=False),
|
|
Column("image_path", String, nullable=False),
|
|
Column("source", String, nullable=False),
|
|
Column("created_at", Float, nullable=False),
|
|
)
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `pytest tests/unit/test_schema.py -v`
|
|
Expected: PASS
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add vigilar/storage/schema.py tests/unit/test_schema.py
|
|
git commit -m "Add pets, pet_sightings, wildlife_sightings, pet_training_images tables"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 4: Add pet and wildlife database query functions
|
|
|
|
**Files:**
|
|
- Modify: `vigilar/storage/queries.py`
|
|
- Test: `tests/unit/test_pet_queries.py` (new)
|
|
|
|
- [ ] **Step 1: Write tests for pet query functions**
|
|
|
|
```python
|
|
# tests/unit/test_pet_queries.py
|
|
"""Tests for pet and wildlife query functions."""
|
|
|
|
import time
|
|
|
|
from vigilar.storage.queries import (
|
|
insert_pet,
|
|
get_pet,
|
|
get_all_pets,
|
|
insert_pet_sighting,
|
|
get_pet_sightings,
|
|
get_pet_last_location,
|
|
insert_wildlife_sighting,
|
|
get_wildlife_sightings,
|
|
insert_training_image,
|
|
get_training_images,
|
|
get_unlabeled_sightings,
|
|
label_sighting,
|
|
)
|
|
|
|
|
|
class TestPetCRUD:
|
|
def test_insert_and_get_pet(self, test_db):
|
|
pet_id = insert_pet(test_db, name="Angel", species="cat", breed="DSH",
|
|
color_description="black")
|
|
pet = get_pet(test_db, pet_id)
|
|
assert pet is not None
|
|
assert pet["name"] == "Angel"
|
|
assert pet["species"] == "cat"
|
|
assert pet["training_count"] == 0
|
|
|
|
def test_get_all_pets(self, test_db):
|
|
insert_pet(test_db, name="Angel", species="cat")
|
|
insert_pet(test_db, name="Milo", species="dog")
|
|
all_pets = get_all_pets(test_db)
|
|
assert len(all_pets) == 2
|
|
|
|
|
|
class TestPetSightings:
|
|
def test_insert_and_query(self, test_db):
|
|
pet_id = insert_pet(test_db, name="Angel", species="cat")
|
|
insert_pet_sighting(test_db, pet_id=pet_id, species="cat",
|
|
camera_id="kitchen", confidence=0.92)
|
|
sightings = get_pet_sightings(test_db, limit=10)
|
|
assert len(sightings) == 1
|
|
assert sightings[0]["camera_id"] == "kitchen"
|
|
|
|
def test_last_location(self, test_db):
|
|
pet_id = insert_pet(test_db, name="Angel", species="cat")
|
|
insert_pet_sighting(test_db, pet_id=pet_id, species="cat",
|
|
camera_id="kitchen", confidence=0.9)
|
|
insert_pet_sighting(test_db, pet_id=pet_id, species="cat",
|
|
camera_id="living_room", confidence=0.95)
|
|
loc = get_pet_last_location(test_db, pet_id)
|
|
assert loc is not None
|
|
assert loc["camera_id"] == "living_room"
|
|
|
|
def test_unlabeled_sightings(self, test_db):
|
|
insert_pet_sighting(test_db, pet_id=None, species="cat",
|
|
camera_id="kitchen", confidence=0.6, crop_path="/tmp/crop.jpg")
|
|
unlabeled = get_unlabeled_sightings(test_db, limit=10)
|
|
assert len(unlabeled) == 1
|
|
assert unlabeled[0]["labeled"] == 0
|
|
|
|
def test_label_sighting(self, test_db):
|
|
pet_id = insert_pet(test_db, name="Angel", species="cat")
|
|
insert_pet_sighting(test_db, pet_id=None, species="cat",
|
|
camera_id="kitchen", confidence=0.6)
|
|
sightings = get_unlabeled_sightings(test_db, limit=10)
|
|
sighting_id = sightings[0]["id"]
|
|
label_sighting(test_db, sighting_id, pet_id)
|
|
updated = get_pet_sightings(test_db, pet_id=pet_id)
|
|
assert len(updated) == 1
|
|
assert updated[0]["labeled"] == 1
|
|
|
|
|
|
class TestWildlifeSightings:
|
|
def test_insert_and_query(self, test_db):
|
|
insert_wildlife_sighting(test_db, species="bear", threat_level="PREDATOR",
|
|
camera_id="front", confidence=0.88)
|
|
sightings = get_wildlife_sightings(test_db, limit=10)
|
|
assert len(sightings) == 1
|
|
assert sightings[0]["threat_level"] == "PREDATOR"
|
|
|
|
def test_filter_by_threat(self, test_db):
|
|
insert_wildlife_sighting(test_db, species="bear", threat_level="PREDATOR",
|
|
camera_id="front", confidence=0.88)
|
|
insert_wildlife_sighting(test_db, species="deer", threat_level="PASSIVE",
|
|
camera_id="back", confidence=0.75)
|
|
predators = get_wildlife_sightings(test_db, threat_level="PREDATOR")
|
|
assert len(predators) == 1
|
|
|
|
|
|
class TestTrainingImages:
|
|
def test_insert_and_query(self, test_db):
|
|
pet_id = insert_pet(test_db, name="Angel", species="cat")
|
|
insert_training_image(test_db, pet_id=pet_id,
|
|
image_path="/var/vigilar/pets/training/angel/001.jpg",
|
|
source="upload")
|
|
images = get_training_images(test_db, pet_id)
|
|
assert len(images) == 1
|
|
assert images[0]["source"] == "upload"
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `pytest tests/unit/test_pet_queries.py -v`
|
|
Expected: FAIL — functions not defined
|
|
|
|
- [ ] **Step 3: Implement query functions**
|
|
|
|
Add to `vigilar/storage/queries.py`. Add imports at the top:
|
|
|
|
```python
|
|
from vigilar.storage.schema import (
|
|
# existing imports...
|
|
pets,
|
|
pet_sightings,
|
|
pet_training_images,
|
|
wildlife_sightings,
|
|
)
|
|
```
|
|
|
|
Add the following functions at the bottom of the file:
|
|
|
|
```python
|
|
# --- Pets ---
|
|
|
|
def insert_pet(
|
|
engine: Engine,
|
|
name: str,
|
|
species: str,
|
|
breed: str | None = None,
|
|
color_description: str | None = None,
|
|
photo_path: str | None = None,
|
|
) -> str:
|
|
import uuid
|
|
pet_id = str(uuid.uuid4())
|
|
with engine.begin() as conn:
|
|
conn.execute(pets.insert().values(
|
|
id=pet_id, name=name, species=species, breed=breed,
|
|
color_description=color_description, photo_path=photo_path,
|
|
training_count=0, created_at=time.time(),
|
|
))
|
|
return pet_id
|
|
|
|
|
|
def get_pet(engine: Engine, pet_id: str) -> dict[str, Any] | None:
|
|
with engine.connect() as conn:
|
|
row = conn.execute(pets.select().where(pets.c.id == pet_id)).first()
|
|
return dict(row._mapping) if row else None
|
|
|
|
|
|
def get_all_pets(engine: Engine) -> list[dict[str, Any]]:
|
|
with engine.connect() as conn:
|
|
rows = conn.execute(pets.select().order_by(pets.c.name)).fetchall()
|
|
return [dict(r._mapping) for r in rows]
|
|
|
|
|
|
# --- Pet Sightings ---
|
|
|
|
def insert_pet_sighting(
|
|
engine: Engine,
|
|
species: str,
|
|
camera_id: str,
|
|
confidence: float,
|
|
pet_id: str | None = None,
|
|
crop_path: str | None = None,
|
|
event_id: int | None = None,
|
|
) -> int:
|
|
with engine.begin() as conn:
|
|
result = conn.execute(pet_sightings.insert().values(
|
|
ts=time.time(), pet_id=pet_id, species=species,
|
|
camera_id=camera_id, confidence=confidence,
|
|
crop_path=crop_path, labeled=1 if pet_id else 0,
|
|
event_id=event_id,
|
|
))
|
|
return result.inserted_primary_key[0]
|
|
|
|
|
|
def get_pet_sightings(
|
|
engine: Engine,
|
|
pet_id: str | None = None,
|
|
camera_id: str | None = None,
|
|
since_ts: float | None = None,
|
|
limit: int = 100,
|
|
) -> list[dict[str, Any]]:
|
|
query = select(pet_sightings).order_by(desc(pet_sightings.c.ts)).limit(limit)
|
|
if pet_id:
|
|
query = query.where(pet_sightings.c.pet_id == pet_id)
|
|
if camera_id:
|
|
query = query.where(pet_sightings.c.camera_id == camera_id)
|
|
if since_ts:
|
|
query = query.where(pet_sightings.c.ts >= since_ts)
|
|
with engine.connect() as conn:
|
|
rows = conn.execute(query).fetchall()
|
|
return [dict(r._mapping) for r in rows]
|
|
|
|
|
|
def get_pet_last_location(engine: Engine, pet_id: str) -> dict[str, Any] | None:
|
|
with engine.connect() as conn:
|
|
row = conn.execute(
|
|
select(pet_sightings)
|
|
.where(pet_sightings.c.pet_id == pet_id)
|
|
.order_by(desc(pet_sightings.c.ts))
|
|
.limit(1)
|
|
).first()
|
|
return dict(row._mapping) if row else None
|
|
|
|
|
|
def get_unlabeled_sightings(
|
|
engine: Engine,
|
|
species: str | None = None,
|
|
limit: int = 50,
|
|
) -> list[dict[str, Any]]:
|
|
query = (
|
|
select(pet_sightings)
|
|
.where(pet_sightings.c.labeled == 0)
|
|
.order_by(desc(pet_sightings.c.ts))
|
|
.limit(limit)
|
|
)
|
|
if species:
|
|
query = query.where(pet_sightings.c.species == species)
|
|
with engine.connect() as conn:
|
|
rows = conn.execute(query).fetchall()
|
|
return [dict(r._mapping) for r in rows]
|
|
|
|
|
|
def label_sighting(engine: Engine, sighting_id: int, pet_id: str) -> None:
|
|
with engine.begin() as conn:
|
|
conn.execute(
|
|
pet_sightings.update()
|
|
.where(pet_sightings.c.id == sighting_id)
|
|
.values(pet_id=pet_id, labeled=1)
|
|
)
|
|
|
|
|
|
# --- Wildlife Sightings ---
|
|
|
|
def insert_wildlife_sighting(
|
|
engine: Engine,
|
|
species: str,
|
|
threat_level: str,
|
|
camera_id: str,
|
|
confidence: float,
|
|
crop_path: str | None = None,
|
|
event_id: int | None = None,
|
|
) -> int:
|
|
with engine.begin() as conn:
|
|
result = conn.execute(wildlife_sightings.insert().values(
|
|
ts=time.time(), species=species, threat_level=threat_level,
|
|
camera_id=camera_id, confidence=confidence,
|
|
crop_path=crop_path, event_id=event_id,
|
|
))
|
|
return result.inserted_primary_key[0]
|
|
|
|
|
|
def get_wildlife_sightings(
|
|
engine: Engine,
|
|
threat_level: str | None = None,
|
|
camera_id: str | None = None,
|
|
since_ts: float | None = None,
|
|
limit: int = 100,
|
|
) -> list[dict[str, Any]]:
|
|
query = select(wildlife_sightings).order_by(desc(wildlife_sightings.c.ts)).limit(limit)
|
|
if threat_level:
|
|
query = query.where(wildlife_sightings.c.threat_level == threat_level)
|
|
if camera_id:
|
|
query = query.where(wildlife_sightings.c.camera_id == camera_id)
|
|
if since_ts:
|
|
query = query.where(wildlife_sightings.c.ts >= since_ts)
|
|
with engine.connect() as conn:
|
|
rows = conn.execute(query).fetchall()
|
|
return [dict(r._mapping) for r in rows]
|
|
|
|
|
|
# --- Training Images ---
|
|
|
|
def insert_training_image(
|
|
engine: Engine,
|
|
pet_id: str,
|
|
image_path: str,
|
|
source: str,
|
|
) -> int:
|
|
with engine.begin() as conn:
|
|
result = conn.execute(pet_training_images.insert().values(
|
|
pet_id=pet_id, image_path=image_path,
|
|
source=source, created_at=time.time(),
|
|
))
|
|
# Update training count on pet
|
|
conn.execute(
|
|
pets.update().where(pets.c.id == pet_id)
|
|
.values(training_count=pets.c.training_count + 1)
|
|
)
|
|
return result.inserted_primary_key[0]
|
|
|
|
|
|
def get_training_images(
|
|
engine: Engine,
|
|
pet_id: str,
|
|
) -> list[dict[str, Any]]:
|
|
with engine.connect() as conn:
|
|
rows = conn.execute(
|
|
select(pet_training_images)
|
|
.where(pet_training_images.c.pet_id == pet_id)
|
|
.order_by(desc(pet_training_images.c.created_at))
|
|
).fetchall()
|
|
return [dict(r._mapping) for r in rows]
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `pytest tests/unit/test_pet_queries.py -v`
|
|
Expected: PASS
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add vigilar/storage/queries.py tests/unit/test_pet_queries.py
|
|
git commit -m "Add pet and wildlife database query functions"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 5: Implement YOLOv8 unified detector
|
|
|
|
**Files:**
|
|
- Create: `vigilar/detection/yolo.py`
|
|
- Test: `tests/unit/test_yolo_detector.py` (new)
|
|
|
|
- [ ] **Step 1: Write tests for YOLOv8 detector**
|
|
|
|
```python
|
|
# tests/unit/test_yolo_detector.py
|
|
"""Tests for YOLOv8 unified detector."""
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from vigilar.detection.person import Detection
|
|
from vigilar.detection.yolo import YOLODetector, ANIMAL_CLASSES, WILDLIFE_CLASSES
|
|
|
|
|
|
class TestYOLOConstants:
|
|
def test_animal_classes(self):
|
|
assert "cat" in ANIMAL_CLASSES
|
|
assert "dog" in ANIMAL_CLASSES
|
|
|
|
def test_wildlife_classes(self):
|
|
assert "bear" in WILDLIFE_CLASSES
|
|
assert "bird" in WILDLIFE_CLASSES
|
|
|
|
def test_no_overlap_animal_wildlife(self):
|
|
assert not ANIMAL_CLASSES.intersection(WILDLIFE_CLASSES)
|
|
|
|
|
|
class TestYOLODetector:
|
|
def test_initializes_without_model(self):
|
|
detector = YOLODetector(model_path="nonexistent.pt", confidence_threshold=0.5)
|
|
assert not detector.is_loaded
|
|
|
|
def test_detect_returns_empty_when_not_loaded(self):
|
|
detector = YOLODetector(model_path="nonexistent.pt")
|
|
frame = np.zeros((480, 640, 3), dtype=np.uint8)
|
|
detections = detector.detect(frame)
|
|
assert detections == []
|
|
|
|
def test_classify_detection_person(self):
|
|
d = Detection(class_name="person", class_id=0, confidence=0.9, bbox=(10, 20, 100, 200))
|
|
assert YOLODetector.classify(d) == "person"
|
|
|
|
def test_classify_detection_vehicle(self):
|
|
d = Detection(class_name="car", class_id=2, confidence=0.85, bbox=(10, 20, 100, 200))
|
|
assert YOLODetector.classify(d) == "vehicle"
|
|
|
|
def test_classify_detection_domestic_animal(self):
|
|
d = Detection(class_name="cat", class_id=15, confidence=0.9, bbox=(10, 20, 100, 200))
|
|
assert YOLODetector.classify(d) == "domestic_animal"
|
|
|
|
def test_classify_detection_wildlife(self):
|
|
d = Detection(class_name="bear", class_id=21, confidence=0.8, bbox=(10, 20, 100, 200))
|
|
assert YOLODetector.classify(d) == "wildlife"
|
|
|
|
def test_classify_detection_other(self):
|
|
d = Detection(class_name="chair", class_id=56, confidence=0.7, bbox=(10, 20, 100, 200))
|
|
assert YOLODetector.classify(d) == "other"
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `pytest tests/unit/test_yolo_detector.py -v`
|
|
Expected: FAIL — yolo module not found
|
|
|
|
- [ ] **Step 3: Implement YOLOv8 detector**
|
|
|
|
```python
|
|
# vigilar/detection/yolo.py
|
|
"""Unified object detection using YOLOv8 via ultralytics."""
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
|
|
from vigilar.detection.person import Detection
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# COCO class IDs for domestic animals
|
|
ANIMAL_CLASSES = {"cat", "dog"}
|
|
|
|
# COCO class IDs for wildlife (subset that YOLO can detect)
|
|
WILDLIFE_CLASSES = {"bear", "bird", "horse", "cow", "sheep", "elephant", "zebra", "giraffe"}
|
|
|
|
# Vehicle class names from COCO
|
|
VEHICLE_CLASSES = {"car", "motorcycle", "bus", "truck", "boat"}
|
|
|
|
|
|
class YOLODetector:
|
|
def __init__(self, model_path: str, confidence_threshold: float = 0.5):
|
|
self._threshold = confidence_threshold
|
|
self._model = None
|
|
self.is_loaded = False
|
|
|
|
if Path(model_path).exists():
|
|
try:
|
|
from ultralytics import YOLO
|
|
self._model = YOLO(model_path)
|
|
self.is_loaded = True
|
|
log.info("YOLO model loaded from %s", model_path)
|
|
except Exception as e:
|
|
log.error("Failed to load YOLO model: %s", e)
|
|
else:
|
|
log.warning("YOLO model not found at %s — detection disabled", model_path)
|
|
|
|
def detect(self, frame: np.ndarray) -> list[Detection]:
|
|
if not self.is_loaded or self._model is None:
|
|
return []
|
|
|
|
results = self._model(frame, conf=self._threshold, verbose=False)
|
|
detections = []
|
|
|
|
for result in results:
|
|
for box in result.boxes:
|
|
class_id = int(box.cls[0])
|
|
confidence = float(box.conf[0])
|
|
class_name = result.names[class_id]
|
|
|
|
x1, y1, x2, y2 = box.xyxy[0].tolist()
|
|
x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
|
|
bw, bh = x2 - x1, y2 - y1
|
|
if bw <= 0 or bh <= 0:
|
|
continue
|
|
|
|
detections.append(Detection(
|
|
class_name=class_name,
|
|
class_id=class_id,
|
|
confidence=confidence,
|
|
bbox=(x1, y1, bw, bh),
|
|
))
|
|
|
|
return detections
|
|
|
|
@staticmethod
|
|
def classify(detection: Detection) -> str:
|
|
name = detection.class_name
|
|
if name == "person":
|
|
return "person"
|
|
if name in VEHICLE_CLASSES:
|
|
return "vehicle"
|
|
if name in ANIMAL_CLASSES:
|
|
return "domestic_animal"
|
|
if name in WILDLIFE_CLASSES:
|
|
return "wildlife"
|
|
return "other"
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `pytest tests/unit/test_yolo_detector.py -v`
|
|
Expected: PASS
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add vigilar/detection/yolo.py tests/unit/test_yolo_detector.py
|
|
git commit -m "Add YOLOv8 unified detector with class classification"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 6: Implement wildlife threat classifier
|
|
|
|
**Files:**
|
|
- Create: `vigilar/detection/wildlife.py`
|
|
- Test: `tests/unit/test_wildlife.py` (new)
|
|
|
|
- [ ] **Step 1: Write tests for wildlife threat classification**
|
|
|
|
```python
|
|
# tests/unit/test_wildlife.py
|
|
"""Tests for wildlife threat classification."""
|
|
|
|
from vigilar.config import WildlifeConfig, WildlifeThreatMap, WildlifeSizeHeuristics
|
|
from vigilar.detection.person import Detection
|
|
from vigilar.detection.wildlife import classify_wildlife_threat
|
|
|
|
|
|
def _make_config(**kwargs):
|
|
return WildlifeConfig(**kwargs)
|
|
|
|
|
|
class TestWildlifeThreatClassification:
|
|
def test_bear_is_predator(self):
|
|
cfg = _make_config()
|
|
d = Detection(class_name="bear", class_id=21, confidence=0.9, bbox=(100, 100, 200, 300))
|
|
level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080)
|
|
assert level == "PREDATOR"
|
|
assert species == "bear"
|
|
|
|
def test_bird_is_passive(self):
|
|
cfg = _make_config()
|
|
d = Detection(class_name="bird", class_id=14, confidence=0.8, bbox=(10, 10, 30, 20))
|
|
level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080)
|
|
assert level == "PASSIVE"
|
|
assert species == "bird"
|
|
|
|
def test_unknown_small_is_nuisance(self):
|
|
cfg = _make_config()
|
|
# Small bbox relative to frame → nuisance (raccoon/skunk sized)
|
|
# frame_area = 1920*1080 = 2073600, bbox area = 30*30 = 900 → 0.04%
|
|
d = Detection(class_name="unknown", class_id=99, confidence=0.7, bbox=(100, 100, 30, 30))
|
|
level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080)
|
|
assert level == "NUISANCE"
|
|
assert species == "unknown"
|
|
|
|
def test_unknown_medium_is_predator(self):
|
|
cfg = _make_config()
|
|
# Medium bbox → predator (fox/coyote sized)
|
|
# bbox area = 200*150 = 30000, frame = 2073600 → 1.4%... need bigger
|
|
# bbox area = 300*300 = 90000 / 2073600 → 4.3% → between small and medium thresholds
|
|
d = Detection(class_name="unknown", class_id=99, confidence=0.7, bbox=(100, 100, 300, 300))
|
|
level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080)
|
|
assert level == "PREDATOR"
|
|
|
|
def test_unknown_large_is_passive(self):
|
|
cfg = _make_config()
|
|
# Large bbox → passive (deer sized)
|
|
# bbox area = 600*500 = 300000 / 2073600 → 14.5% → > large threshold
|
|
d = Detection(class_name="unknown", class_id=99, confidence=0.7, bbox=(100, 100, 600, 500))
|
|
level, species = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080)
|
|
assert level == "PASSIVE"
|
|
|
|
def test_custom_threat_map(self):
|
|
cfg = _make_config(threat_map=WildlifeThreatMap(predator=["bear", "wolf"]))
|
|
d = Detection(class_name="wolf", class_id=99, confidence=0.85, bbox=(100, 100, 200, 200))
|
|
level, _ = classify_wildlife_threat(d, cfg, frame_area=1920 * 1080)
|
|
assert level == "PREDATOR"
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `pytest tests/unit/test_wildlife.py -v`
|
|
Expected: FAIL — wildlife module not found
|
|
|
|
- [ ] **Step 3: Implement wildlife classifier**
|
|
|
|
```python
|
|
# vigilar/detection/wildlife.py
|
|
"""Wildlife threat level classification."""
|
|
|
|
from vigilar.config import WildlifeConfig
|
|
from vigilar.detection.person import Detection
|
|
|
|
|
|
def classify_wildlife_threat(
|
|
detection: Detection,
|
|
config: WildlifeConfig,
|
|
frame_area: int,
|
|
) -> tuple[str, str]:
|
|
"""Classify a wildlife detection into threat level and species.
|
|
|
|
Returns (threat_level, species_name).
|
|
"""
|
|
species = detection.class_name
|
|
threat_map = config.threat_map
|
|
|
|
# Direct COCO class mapping first
|
|
if species in threat_map.predator:
|
|
return "PREDATOR", species
|
|
if species in threat_map.nuisance:
|
|
return "NUISANCE", species
|
|
if species in threat_map.passive:
|
|
return "PASSIVE", species
|
|
|
|
# Fallback to size heuristics for unknown species
|
|
_, _, w, h = detection.bbox
|
|
bbox_area = w * h
|
|
area_ratio = bbox_area / frame_area if frame_area > 0 else 0
|
|
|
|
heuristics = config.size_heuristics
|
|
if area_ratio < heuristics.small:
|
|
return "NUISANCE", species
|
|
elif area_ratio < heuristics.large:
|
|
return "PREDATOR", species
|
|
else:
|
|
return "PASSIVE", species
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `pytest tests/unit/test_wildlife.py -v`
|
|
Expected: PASS
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add vigilar/detection/wildlife.py tests/unit/test_wildlife.py
|
|
git commit -m "Add wildlife threat classification with size heuristics"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 7: Implement pet ID classifier
|
|
|
|
**Files:**
|
|
- Create: `vigilar/detection/pet_id.py`
|
|
- Test: `tests/unit/test_pet_id.py` (new)
|
|
|
|
- [ ] **Step 1: Write tests for pet ID classifier**
|
|
|
|
```python
|
|
# tests/unit/test_pet_id.py
|
|
"""Tests for pet ID classifier."""
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from vigilar.detection.pet_id import PetIDClassifier, PetIDResult
|
|
|
|
|
|
class TestPetIDResult:
|
|
def test_identified(self):
|
|
r = PetIDResult(pet_id="pet-1", pet_name="Angel", confidence=0.9)
|
|
assert r.is_identified
|
|
assert not r.is_low_confidence
|
|
|
|
def test_low_confidence(self):
|
|
r = PetIDResult(pet_id="pet-1", pet_name="Angel", confidence=0.6)
|
|
assert r.is_identified
|
|
assert r.is_low_confidence
|
|
|
|
def test_unknown(self):
|
|
r = PetIDResult(pet_id=None, pet_name=None, confidence=0.3)
|
|
assert not r.is_identified
|
|
|
|
|
|
class TestPetIDClassifier:
|
|
def test_not_loaded_returns_unknown(self):
|
|
classifier = PetIDClassifier(model_path="nonexistent.pt")
|
|
assert not classifier.is_loaded
|
|
crop = np.zeros((224, 224, 3), dtype=np.uint8)
|
|
result = classifier.identify(crop, species="cat")
|
|
assert not result.is_identified
|
|
|
|
def test_no_pets_registered_returns_unknown(self):
|
|
classifier = PetIDClassifier(model_path="nonexistent.pt")
|
|
assert classifier.pet_count == 0
|
|
|
|
def test_register_pet(self):
|
|
classifier = PetIDClassifier(model_path="nonexistent.pt")
|
|
classifier.register_pet("pet-1", "Angel", "cat")
|
|
classifier.register_pet("pet-2", "Milo", "dog")
|
|
assert classifier.pet_count == 2
|
|
|
|
def test_species_filter(self):
|
|
classifier = PetIDClassifier(model_path="nonexistent.pt")
|
|
classifier.register_pet("pet-1", "Angel", "cat")
|
|
classifier.register_pet("pet-2", "Taquito", "cat")
|
|
classifier.register_pet("pet-3", "Milo", "dog")
|
|
assert len(classifier.get_pets_by_species("cat")) == 2
|
|
assert len(classifier.get_pets_by_species("dog")) == 1
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `pytest tests/unit/test_pet_id.py -v`
|
|
Expected: FAIL — pet_id module not found
|
|
|
|
- [ ] **Step 3: Implement pet ID classifier**
|
|
|
|
```python
|
|
# vigilar/detection/pet_id.py
|
|
"""Pet identification classifier using MobileNetV3-Small."""
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Confidence thresholds (overridden by config at runtime)
|
|
DEFAULT_HIGH_THRESHOLD = 0.7
|
|
DEFAULT_LOW_THRESHOLD = 0.5
|
|
|
|
|
|
@dataclass
|
|
class PetIDResult:
|
|
pet_id: str | None
|
|
pet_name: str | None
|
|
confidence: float
|
|
high_threshold: float = DEFAULT_HIGH_THRESHOLD
|
|
low_threshold: float = DEFAULT_LOW_THRESHOLD
|
|
|
|
@property
|
|
def is_identified(self) -> bool:
|
|
return self.pet_id is not None and self.confidence >= self.low_threshold
|
|
|
|
@property
|
|
def is_low_confidence(self) -> bool:
|
|
return (
|
|
self.pet_id is not None
|
|
and self.low_threshold <= self.confidence < self.high_threshold
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class RegisteredPet:
|
|
pet_id: str
|
|
name: str
|
|
species: str
|
|
class_index: int # index in the classifier output
|
|
|
|
|
|
class PetIDClassifier:
|
|
def __init__(
|
|
self,
|
|
model_path: str,
|
|
high_threshold: float = DEFAULT_HIGH_THRESHOLD,
|
|
low_threshold: float = DEFAULT_LOW_THRESHOLD,
|
|
):
|
|
self._model_path = model_path
|
|
self._high_threshold = high_threshold
|
|
self._low_threshold = low_threshold
|
|
self._model = None
|
|
self._transform = None
|
|
self.is_loaded = False
|
|
self._pets: list[RegisteredPet] = []
|
|
|
|
if Path(model_path).exists():
|
|
try:
|
|
import torch
|
|
self._model = torch.load(model_path, map_location="cpu", weights_only=False)
|
|
self._model.eval()
|
|
self.is_loaded = True
|
|
log.info("Pet ID model loaded from %s", model_path)
|
|
except Exception as e:
|
|
log.error("Failed to load pet ID model: %s", e)
|
|
else:
|
|
log.info("Pet ID model not found at %s — identification disabled until trained",
|
|
model_path)
|
|
|
|
@property
|
|
def pet_count(self) -> int:
|
|
return len(self._pets)
|
|
|
|
def register_pet(self, pet_id: str, name: str, species: str) -> None:
|
|
idx = len(self._pets)
|
|
self._pets.append(RegisteredPet(pet_id=pet_id, name=name, species=species,
|
|
class_index=idx))
|
|
|
|
def get_pets_by_species(self, species: str) -> list[RegisteredPet]:
|
|
return [p for p in self._pets if p.species == species]
|
|
|
|
def identify(self, crop: np.ndarray, species: str) -> PetIDResult:
|
|
if not self.is_loaded or self._model is None:
|
|
return PetIDResult(pet_id=None, pet_name=None, confidence=0.0)
|
|
|
|
candidates = self.get_pets_by_species(species)
|
|
if not candidates:
|
|
return PetIDResult(pet_id=None, pet_name=None, confidence=0.0)
|
|
|
|
try:
|
|
import cv2
|
|
import torch
|
|
from torchvision import transforms
|
|
|
|
# Preprocess crop
|
|
resized = cv2.resize(crop, (224, 224))
|
|
rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
|
|
transform = transforms.Compose([
|
|
transforms.ToTensor(),
|
|
transforms.Normalize(mean=[0.485, 0.456, 0.406],
|
|
std=[0.229, 0.224, 0.225]),
|
|
])
|
|
tensor = transform(rgb).unsqueeze(0)
|
|
|
|
with torch.no_grad():
|
|
output = self._model(tensor)
|
|
probs = torch.softmax(output, dim=1)[0]
|
|
|
|
# Find best match among candidates for this species
|
|
best_conf = 0.0
|
|
best_pet = None
|
|
for pet in candidates:
|
|
if pet.class_index < len(probs):
|
|
conf = float(probs[pet.class_index])
|
|
if conf > best_conf:
|
|
best_conf = conf
|
|
best_pet = pet
|
|
|
|
if best_pet and best_conf >= self._low_threshold:
|
|
return PetIDResult(
|
|
pet_id=best_pet.pet_id,
|
|
pet_name=best_pet.name,
|
|
confidence=best_conf,
|
|
high_threshold=self._high_threshold,
|
|
low_threshold=self._low_threshold,
|
|
)
|
|
|
|
return PetIDResult(pet_id=None, pet_name=None, confidence=best_conf)
|
|
|
|
except Exception as e:
|
|
log.error("Pet ID inference failed: %s", e)
|
|
return PetIDResult(pet_id=None, pet_name=None, confidence=0.0)
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `pytest tests/unit/test_pet_id.py -v`
|
|
Expected: PASS
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add vigilar/detection/pet_id.py tests/unit/test_pet_id.py
|
|
git commit -m "Add pet ID classifier with species-filtered identification"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 8: Implement pet ID model trainer
|
|
|
|
**Files:**
|
|
- Create: `vigilar/detection/trainer.py`
|
|
- Test: `tests/unit/test_trainer.py` (new)
|
|
|
|
- [ ] **Step 1: Write tests for trainer**
|
|
|
|
```python
|
|
# tests/unit/test_trainer.py
|
|
"""Tests for pet ID model trainer."""
|
|
|
|
import os
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from vigilar.detection.trainer import PetTrainer, TrainingStatus
|
|
|
|
|
|
class TestTrainingStatus:
|
|
def test_initial_status(self):
|
|
status = TrainingStatus()
|
|
assert status.is_training is False
|
|
assert status.progress == 0.0
|
|
assert status.error is None
|
|
|
|
|
|
class TestPetTrainer:
|
|
def test_check_readiness_no_pets(self, tmp_path):
|
|
trainer = PetTrainer(training_dir=str(tmp_path), model_output_path=str(tmp_path / "model.pt"))
|
|
ready, msg = trainer.check_readiness(min_images=20)
|
|
assert not ready
|
|
assert "No pet" in msg
|
|
|
|
def test_check_readiness_insufficient_images(self, tmp_path):
|
|
# Create pet dirs with too few images
|
|
pet_dir = tmp_path / "angel"
|
|
pet_dir.mkdir()
|
|
for i in range(5):
|
|
(pet_dir / f"{i}.jpg").write_bytes(b"fake")
|
|
|
|
trainer = PetTrainer(training_dir=str(tmp_path), model_output_path=str(tmp_path / "model.pt"))
|
|
ready, msg = trainer.check_readiness(min_images=20)
|
|
assert not ready
|
|
assert "angel" in msg.lower()
|
|
|
|
def test_check_readiness_sufficient_images(self, tmp_path):
|
|
for name in ["angel", "taquito"]:
|
|
pet_dir = tmp_path / name
|
|
pet_dir.mkdir()
|
|
for i in range(25):
|
|
(pet_dir / f"{i}.jpg").write_bytes(b"fake")
|
|
|
|
trainer = PetTrainer(training_dir=str(tmp_path), model_output_path=str(tmp_path / "model.pt"))
|
|
ready, msg = trainer.check_readiness(min_images=20)
|
|
assert ready
|
|
|
|
def test_get_class_names(self, tmp_path):
|
|
for name in ["angel", "milo", "taquito"]:
|
|
(tmp_path / name).mkdir()
|
|
|
|
trainer = PetTrainer(training_dir=str(tmp_path), model_output_path=str(tmp_path / "model.pt"))
|
|
names = trainer.get_class_names()
|
|
assert names == ["angel", "milo", "taquito"] # sorted
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `pytest tests/unit/test_trainer.py -v`
|
|
Expected: FAIL — trainer module not found
|
|
|
|
- [ ] **Step 3: Implement trainer**
|
|
|
|
```python
|
|
# vigilar/detection/trainer.py
|
|
"""Pet ID model trainer using MobileNetV3-Small with transfer learning."""
|
|
|
|
import logging
|
|
import shutil
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class TrainingStatus:
|
|
is_training: bool = False
|
|
progress: float = 0.0
|
|
epoch: int = 0
|
|
total_epochs: int = 0
|
|
accuracy: float = 0.0
|
|
error: str | None = None
|
|
|
|
|
|
class PetTrainer:
|
|
def __init__(self, training_dir: str, model_output_path: str):
|
|
self._training_dir = Path(training_dir)
|
|
self._model_output_path = Path(model_output_path)
|
|
self.status = TrainingStatus()
|
|
|
|
def get_class_names(self) -> list[str]:
|
|
if not self._training_dir.exists():
|
|
return []
|
|
return sorted([
|
|
d.name for d in self._training_dir.iterdir()
|
|
if d.is_dir() and not d.name.startswith(".")
|
|
])
|
|
|
|
def check_readiness(self, min_images: int = 20) -> tuple[bool, str]:
|
|
class_names = self.get_class_names()
|
|
if not class_names:
|
|
return False, "No pet directories found in training directory."
|
|
|
|
insufficient = []
|
|
for name in class_names:
|
|
pet_dir = self._training_dir / name
|
|
image_count = sum(1 for f in pet_dir.iterdir()
|
|
if f.suffix.lower() in (".jpg", ".jpeg", ".png"))
|
|
if image_count < min_images:
|
|
insufficient.append(f"{name}: {image_count}/{min_images}")
|
|
|
|
if insufficient:
|
|
return False, f"Insufficient training images: {', '.join(insufficient)}"
|
|
|
|
return True, f"Ready to train with {len(class_names)} classes."
|
|
|
|
def train(self, epochs: int = 30, batch_size: int = 16) -> bool:
|
|
try:
|
|
import torch
|
|
import torch.nn as nn
|
|
from torch.utils.data import DataLoader
|
|
from torchvision import datasets, models, transforms
|
|
|
|
self.status = TrainingStatus(is_training=True, total_epochs=epochs)
|
|
class_names = self.get_class_names()
|
|
num_classes = len(class_names)
|
|
|
|
if num_classes < 2:
|
|
self.status.error = "Need at least 2 pets to train."
|
|
self.status.is_training = False
|
|
return False
|
|
|
|
# Data transforms with augmentation
|
|
train_transform = transforms.Compose([
|
|
transforms.Resize((256, 256)),
|
|
transforms.RandomCrop(224),
|
|
transforms.RandomHorizontalFlip(),
|
|
transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
|
|
transforms.ToTensor(),
|
|
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
|
|
])
|
|
|
|
dataset = datasets.ImageFolder(str(self._training_dir), transform=train_transform)
|
|
loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=2)
|
|
|
|
# MobileNetV3-Small with transfer learning
|
|
model = models.mobilenet_v3_small(weights=models.MobileNet_V3_Small_Weights.DEFAULT)
|
|
model.classifier[-1] = nn.Linear(model.classifier[-1].in_features, num_classes)
|
|
|
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
model = model.to(device)
|
|
|
|
# Freeze feature extractor, train classifier head first
|
|
for param in model.features.parameters():
|
|
param.requires_grad = False
|
|
|
|
optimizer = torch.optim.Adam(model.classifier.parameters(), lr=1e-3)
|
|
criterion = nn.CrossEntropyLoss()
|
|
|
|
# Training loop
|
|
for epoch in range(epochs):
|
|
model.train()
|
|
running_loss = 0.0
|
|
correct = 0
|
|
total = 0
|
|
|
|
# Unfreeze features after 5 epochs for fine-tuning
|
|
if epoch == 5:
|
|
for param in model.features.parameters():
|
|
param.requires_grad = True
|
|
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
|
|
|
|
for inputs, labels in loader:
|
|
inputs, labels = inputs.to(device), labels.to(device)
|
|
optimizer.zero_grad()
|
|
outputs = model(inputs)
|
|
loss = criterion(outputs, labels)
|
|
loss.backward()
|
|
optimizer.step()
|
|
|
|
running_loss += loss.item()
|
|
_, predicted = outputs.max(1)
|
|
total += labels.size(0)
|
|
correct += predicted.eq(labels).sum().item()
|
|
|
|
accuracy = correct / total if total > 0 else 0
|
|
self.status.epoch = epoch + 1
|
|
self.status.progress = (epoch + 1) / epochs
|
|
self.status.accuracy = accuracy
|
|
log.info("Epoch %d/%d — loss: %.4f, accuracy: %.4f",
|
|
epoch + 1, epochs, running_loss / len(loader), accuracy)
|
|
|
|
# Backup existing model
|
|
if self._model_output_path.exists():
|
|
backup_path = self._model_output_path.with_suffix(".backup.pt")
|
|
shutil.copy2(self._model_output_path, backup_path)
|
|
log.info("Backed up previous model to %s", backup_path)
|
|
|
|
# Save model
|
|
model = model.to("cpu")
|
|
torch.save(model, self._model_output_path)
|
|
log.info("Pet ID model saved to %s (accuracy: %.2f%%)",
|
|
self._model_output_path, self.status.accuracy * 100)
|
|
|
|
self.status.is_training = False
|
|
return True
|
|
|
|
except Exception as e:
|
|
log.exception("Training failed: %s", e)
|
|
self.status.error = str(e)
|
|
self.status.is_training = False
|
|
return False
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `pytest tests/unit/test_trainer.py -v`
|
|
Expected: PASS
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add vigilar/detection/trainer.py tests/unit/test_trainer.py
|
|
git commit -m "Add pet ID model trainer with MobileNetV3-Small transfer learning"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 9: Update alert profiles for pet/wildlife detection types
|
|
|
|
**Files:**
|
|
- Modify: `vigilar/alerts/profiles.py`
|
|
- Modify: `tests/unit/test_profiles.py`
|
|
|
|
- [ ] **Step 1: Write tests for new detection types in alert profiles**
|
|
|
|
Add to `tests/unit/test_profiles.py`:
|
|
|
|
```python
|
|
class TestPetAlertRouting:
|
|
def test_known_pet_exterior_gets_push(self):
|
|
rules = [
|
|
AlertProfileRule(detection_type="known_pet", camera_location="EXTERIOR",
|
|
action="push_and_record", recipients="all"),
|
|
AlertProfileRule(detection_type="known_pet", camera_location="INTERIOR",
|
|
action="quiet_log", recipients="none"),
|
|
]
|
|
profile = _make_profile("Away", ["EMPTY"], rules=rules)
|
|
action, recipients = get_action_for_event(profile, "known_pet", "front_entrance",
|
|
camera_location="EXTERIOR")
|
|
assert action == "push_and_record"
|
|
|
|
def test_known_pet_interior_gets_quiet_log(self):
|
|
rules = [
|
|
AlertProfileRule(detection_type="known_pet", camera_location="EXTERIOR",
|
|
action="push_and_record", recipients="all"),
|
|
AlertProfileRule(detection_type="known_pet", camera_location="INTERIOR",
|
|
action="quiet_log", recipients="none"),
|
|
]
|
|
profile = _make_profile("Home", ["ALL_HOME"], rules=rules)
|
|
action, recipients = get_action_for_event(profile, "known_pet", "kitchen",
|
|
camera_location="INTERIOR")
|
|
assert action == "quiet_log"
|
|
|
|
def test_wildlife_predator_gets_urgent(self):
|
|
rules = [
|
|
AlertProfileRule(detection_type="wildlife_predator",
|
|
action="push_and_record", recipients="all"),
|
|
]
|
|
profile = _make_profile("Always", ["EMPTY", "ALL_HOME"], rules=rules)
|
|
action, _ = get_action_for_event(profile, "wildlife_predator", "front",
|
|
camera_location="EXTERIOR")
|
|
assert action == "push_and_record"
|
|
|
|
def test_camera_location_any_matches_all(self):
|
|
rules = [
|
|
AlertProfileRule(detection_type="wildlife_predator", camera_location="any",
|
|
action="push_and_record", recipients="all"),
|
|
]
|
|
profile = _make_profile("Always", ["EMPTY"], rules=rules)
|
|
action, _ = get_action_for_event(profile, "wildlife_predator", "kitchen",
|
|
camera_location="INTERIOR")
|
|
assert action == "push_and_record"
|
|
|
|
def test_backward_compatible_without_camera_location(self):
|
|
"""Existing calls without camera_location still work."""
|
|
rules = [AlertProfileRule(detection_type="person", action="push_and_record", recipients="all")]
|
|
profile = _make_profile("Away", ["EMPTY"], rules=rules)
|
|
action, _ = get_action_for_event(profile, "person", "front_door")
|
|
assert action == "push_and_record"
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `pytest tests/unit/test_profiles.py::TestPetAlertRouting -v`
|
|
Expected: FAIL — get_action_for_event doesn't accept camera_location parameter
|
|
|
|
- [ ] **Step 3: Update get_action_for_event to support camera_location**
|
|
|
|
Modify `vigilar/alerts/profiles.py`. Update the `get_action_for_event` function:
|
|
|
|
```python
|
|
def get_action_for_event(
|
|
profile: AlertProfileConfig,
|
|
detection_type: str,
|
|
camera_id: str,
|
|
camera_location: str | None = None,
|
|
) -> tuple[str, str]:
|
|
for rule in profile.rules:
|
|
if rule.detection_type != detection_type:
|
|
continue
|
|
# Check camera_location match
|
|
if rule.camera_location not in ("any", camera_id):
|
|
# Also check against the camera's location type
|
|
if camera_location and rule.camera_location != camera_location:
|
|
continue
|
|
elif not camera_location and rule.camera_location not in ("any", camera_id):
|
|
continue
|
|
return rule.action, rule.recipients
|
|
return "quiet_log", "none"
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `pytest tests/unit/test_profiles.py -v`
|
|
Expected: PASS (all existing and new tests)
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add vigilar/alerts/profiles.py tests/unit/test_profiles.py
|
|
git commit -m "Add camera_location filtering to alert profile matching"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 10: Update event processor for pet/wildlife events
|
|
|
|
**Files:**
|
|
- Modify: `vigilar/events/processor.py`
|
|
- Modify: `tests/unit/test_events.py`
|
|
|
|
- [ ] **Step 1: Write tests for pet/wildlife event classification**
|
|
|
|
Add to `tests/unit/test_events.py`:
|
|
|
|
```python
|
|
class TestPetEventClassification:
|
|
def test_pet_detected_event(self):
|
|
from vigilar.events.processor import EventProcessor
|
|
from vigilar.constants import EventType, Severity
|
|
processor = EventProcessor.__new__(EventProcessor)
|
|
etype, sev, source = processor._classify_event(
|
|
"vigilar/camera/kitchen/pet/detected",
|
|
{"pet_name": "Angel", "confidence": 0.92},
|
|
)
|
|
assert etype == EventType.PET_DETECTED
|
|
assert sev == Severity.INFO
|
|
assert source == "kitchen"
|
|
|
|
def test_wildlife_predator_event(self):
|
|
from vigilar.events.processor import EventProcessor
|
|
from vigilar.constants import EventType, Severity
|
|
processor = EventProcessor.__new__(EventProcessor)
|
|
etype, sev, source = processor._classify_event(
|
|
"vigilar/camera/front/wildlife/detected",
|
|
{"species": "bear", "threat_level": "PREDATOR"},
|
|
)
|
|
assert etype == EventType.WILDLIFE_PREDATOR
|
|
assert sev == Severity.CRITICAL
|
|
assert source == "front"
|
|
|
|
def test_wildlife_nuisance_event(self):
|
|
from vigilar.events.processor import EventProcessor
|
|
from vigilar.constants import EventType, Severity
|
|
processor = EventProcessor.__new__(EventProcessor)
|
|
etype, sev, source = processor._classify_event(
|
|
"vigilar/camera/back/wildlife/detected",
|
|
{"species": "raccoon", "threat_level": "NUISANCE"},
|
|
)
|
|
assert etype == EventType.WILDLIFE_NUISANCE
|
|
assert sev == Severity.WARNING
|
|
assert source == "back"
|
|
|
|
def test_wildlife_passive_event(self):
|
|
from vigilar.events.processor import EventProcessor
|
|
from vigilar.constants import EventType, Severity
|
|
processor = EventProcessor.__new__(EventProcessor)
|
|
etype, sev, source = processor._classify_event(
|
|
"vigilar/camera/front/wildlife/detected",
|
|
{"species": "deer", "threat_level": "PASSIVE"},
|
|
)
|
|
assert etype == EventType.WILDLIFE_PASSIVE
|
|
assert sev == Severity.INFO
|
|
assert source == "front"
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `pytest tests/unit/test_events.py::TestPetEventClassification -v`
|
|
Expected: FAIL — pet/wildlife topics not handled in _classify_event
|
|
|
|
- [ ] **Step 3: Update event processor**
|
|
|
|
In `vigilar/events/processor.py`, update `_classify_event` to handle the new topics. Add these cases in the camera section (after the existing `_TOPIC_EVENT_MAP` check, around line 130):
|
|
|
|
```python
|
|
# Pet detection
|
|
if suffix == "pet/detected":
|
|
return EventType.PET_DETECTED, Severity.INFO, camera_id
|
|
|
|
# Wildlife detection — severity depends on threat_level in payload
|
|
if suffix == "wildlife/detected":
|
|
threat = payload.get("threat_level", "PASSIVE")
|
|
if threat == "PREDATOR":
|
|
return EventType.WILDLIFE_PREDATOR, Severity.CRITICAL, camera_id
|
|
elif threat == "NUISANCE":
|
|
return EventType.WILDLIFE_NUISANCE, Severity.WARNING, camera_id
|
|
else:
|
|
return EventType.WILDLIFE_PASSIVE, Severity.INFO, camera_id
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `pytest tests/unit/test_events.py -v`
|
|
Expected: PASS
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add vigilar/events/processor.py tests/unit/test_events.py
|
|
git commit -m "Handle pet and wildlife events in event processor"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 11: Integrate detection into camera worker
|
|
|
|
**Files:**
|
|
- Modify: `vigilar/camera/worker.py`
|
|
|
|
This task integrates the YOLOv8 detector and pet ID classifier into the camera frame loop. This is the core wiring task — no new tests because it requires live camera/MQTT integration testing, but the underlying components (Tasks 5-7) are individually tested.
|
|
|
|
- [ ] **Step 1: Add imports to worker.py**
|
|
|
|
Add at the top of `vigilar/camera/worker.py` after existing imports:
|
|
|
|
```python
|
|
from vigilar.detection.yolo import YOLODetector
|
|
from vigilar.detection.pet_id import PetIDClassifier
|
|
from vigilar.detection.wildlife import classify_wildlife_threat
|
|
```
|
|
|
|
- [ ] **Step 2: Add detector initialization in run_camera_worker**
|
|
|
|
Add after the existing component setup (around line 90, after HLS and recorder init), before the main loop:
|
|
|
|
```python
|
|
# Object detection (YOLOv8 unified detector)
|
|
yolo_detector = None
|
|
pet_classifier = None
|
|
if hasattr(camera_cfg, '_pets_config') and camera_cfg._pets_config.enabled:
|
|
pets_cfg = camera_cfg._pets_config
|
|
yolo_detector = YOLODetector(
|
|
model_path=pets_cfg.model_path,
|
|
confidence_threshold=pets_cfg.confidence_threshold,
|
|
)
|
|
if pets_cfg.pet_id_enabled:
|
|
pet_classifier = PetIDClassifier(
|
|
model_path=pets_cfg.pet_id_model_path,
|
|
high_threshold=pets_cfg.pet_id_threshold,
|
|
low_threshold=pets_cfg.pet_id_low_confidence,
|
|
)
|
|
```
|
|
|
|
Note: The `_pets_config` attribute will be set by the camera manager when spawning workers. This avoids changing the CameraConfig model's serialization.
|
|
|
|
- [ ] **Step 3: Add detection processing after motion detection**
|
|
|
|
Add after the motion START/END block (after line 244, before the heartbeat), inside the main loop:
|
|
|
|
```python
|
|
# Run object detection on motion frames
|
|
if state.motion_active and yolo_detector and yolo_detector.is_loaded:
|
|
detections = yolo_detector.detect(frame)
|
|
for det in detections:
|
|
category = YOLODetector.classify(det)
|
|
if category == "person":
|
|
bus.publish_event(
|
|
Topics.camera_motion_start(camera_id), # reuse existing topic
|
|
detection="person", confidence=det.confidence,
|
|
)
|
|
elif category == "domestic_animal":
|
|
# Crop for pet ID
|
|
x, y, w, h = det.bbox
|
|
crop = frame[max(0, y):y + h, max(0, x):x + w]
|
|
pet_result = None
|
|
if pet_classifier and pet_classifier.is_loaded and crop.size > 0:
|
|
pet_result = pet_classifier.identify(crop, species=det.class_name)
|
|
|
|
payload = {
|
|
"species": det.class_name,
|
|
"confidence": round(det.confidence, 3),
|
|
"camera_location": camera_cfg.location,
|
|
}
|
|
if pet_result and pet_result.is_identified:
|
|
payload["pet_id"] = pet_result.pet_id
|
|
payload["pet_name"] = pet_result.pet_name
|
|
payload["pet_confidence"] = round(pet_result.confidence, 3)
|
|
# Publish pet location update
|
|
bus.publish_event(
|
|
Topics.pet_location(pet_result.pet_name.lower()),
|
|
camera_id=camera_id,
|
|
camera_location=camera_cfg.location,
|
|
)
|
|
|
|
bus.publish_event(Topics.camera_pet_detected(camera_id), **payload)
|
|
|
|
elif category == "wildlife":
|
|
wildlife_cfg = camera_cfg._pets_config.wildlife if hasattr(camera_cfg, '_pets_config') else None
|
|
if wildlife_cfg:
|
|
frame_area = frame.shape[0] * frame.shape[1]
|
|
threat_level, species = classify_wildlife_threat(
|
|
det, wildlife_cfg, frame_area,
|
|
)
|
|
bus.publish_event(
|
|
Topics.camera_wildlife_detected(camera_id),
|
|
species=species,
|
|
threat_level=threat_level,
|
|
confidence=round(det.confidence, 3),
|
|
camera_location=camera_cfg.location,
|
|
)
|
|
```
|
|
|
|
- [ ] **Step 4: Update run_camera_worker signature to accept pets config**
|
|
|
|
Update the function signature to pass pets config through:
|
|
|
|
```python
|
|
def run_camera_worker(
|
|
camera_cfg: CameraConfig,
|
|
mqtt_cfg: MQTTConfig,
|
|
recordings_dir: str,
|
|
hls_dir: str,
|
|
remote_cfg: RemoteConfig | None = None,
|
|
pets_cfg: "PetsConfig | None" = None,
|
|
) -> None:
|
|
```
|
|
|
|
And replace the `hasattr` check with:
|
|
|
|
```python
|
|
# Object detection (YOLOv8 unified detector)
|
|
yolo_detector = None
|
|
pet_classifier = None
|
|
if pets_cfg and pets_cfg.enabled:
|
|
yolo_detector = YOLODetector(
|
|
model_path=pets_cfg.model_path,
|
|
confidence_threshold=pets_cfg.confidence_threshold,
|
|
)
|
|
if pets_cfg.pet_id_enabled:
|
|
pet_classifier = PetIDClassifier(
|
|
model_path=pets_cfg.pet_id_model_path,
|
|
high_threshold=pets_cfg.pet_id_threshold,
|
|
low_threshold=pets_cfg.pet_id_low_confidence,
|
|
)
|
|
```
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add vigilar/camera/worker.py
|
|
git commit -m "Integrate YOLOv8 detection and pet ID into camera worker"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 12: Add crop saving and staging cleanup
|
|
|
|
**Files:**
|
|
- Create: `vigilar/detection/crop_manager.py`
|
|
- Test: `tests/unit/test_crop_manager.py` (new)
|
|
|
|
- [ ] **Step 1: Write tests for crop manager**
|
|
|
|
```python
|
|
# tests/unit/test_crop_manager.py
|
|
"""Tests for detection crop saving and staging cleanup."""
|
|
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
|
|
from vigilar.detection.crop_manager import CropManager
|
|
|
|
|
|
class TestCropManager:
|
|
def test_save_crop(self, tmp_path):
|
|
manager = CropManager(staging_dir=str(tmp_path / "staging"),
|
|
training_dir=str(tmp_path / "training"))
|
|
crop = np.zeros((100, 80, 3), dtype=np.uint8)
|
|
path = manager.save_staging_crop(crop, species="cat", camera_id="kitchen")
|
|
assert Path(path).exists()
|
|
assert "cat" in path
|
|
assert "kitchen" in path
|
|
|
|
def test_promote_to_training(self, tmp_path):
|
|
manager = CropManager(staging_dir=str(tmp_path / "staging"),
|
|
training_dir=str(tmp_path / "training"))
|
|
crop = np.zeros((100, 80, 3), dtype=np.uint8)
|
|
staging_path = manager.save_staging_crop(crop, species="cat", camera_id="kitchen")
|
|
training_path = manager.promote_to_training(staging_path, pet_name="angel")
|
|
assert Path(training_path).exists()
|
|
assert "angel" in training_path
|
|
assert not Path(staging_path).exists()
|
|
|
|
def test_cleanup_old_crops(self, tmp_path):
|
|
staging = tmp_path / "staging"
|
|
staging.mkdir(parents=True)
|
|
|
|
# Create an old file
|
|
old_file = staging / "old_crop.jpg"
|
|
old_file.write_bytes(b"fake")
|
|
# Set mtime to 10 days ago
|
|
old_time = time.time() - 10 * 86400
|
|
import os
|
|
os.utime(old_file, (old_time, old_time))
|
|
|
|
# Create a recent file
|
|
new_file = staging / "new_crop.jpg"
|
|
new_file.write_bytes(b"fake")
|
|
|
|
manager = CropManager(staging_dir=str(staging), training_dir=str(tmp_path / "training"))
|
|
deleted = manager.cleanup_expired(retention_days=7)
|
|
assert deleted == 1
|
|
assert not old_file.exists()
|
|
assert new_file.exists()
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `pytest tests/unit/test_crop_manager.py -v`
|
|
Expected: FAIL — module not found
|
|
|
|
- [ ] **Step 3: Implement crop manager**
|
|
|
|
```python
|
|
# vigilar/detection/crop_manager.py
|
|
"""Manage detection crop images for training and staging."""
|
|
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class CropManager:
|
|
def __init__(self, staging_dir: str, training_dir: str):
|
|
self._staging_dir = Path(staging_dir)
|
|
self._training_dir = Path(training_dir)
|
|
|
|
def save_staging_crop(self, crop: np.ndarray, species: str, camera_id: str) -> str:
|
|
self._staging_dir.mkdir(parents=True, exist_ok=True)
|
|
timestamp = int(time.time() * 1000)
|
|
filename = f"{species}_{camera_id}_{timestamp}.jpg"
|
|
filepath = self._staging_dir / filename
|
|
cv2.imwrite(str(filepath), crop)
|
|
return str(filepath)
|
|
|
|
def promote_to_training(self, staging_path: str, pet_name: str) -> str:
|
|
pet_dir = self._training_dir / pet_name.lower()
|
|
pet_dir.mkdir(parents=True, exist_ok=True)
|
|
src = Path(staging_path)
|
|
dst = pet_dir / src.name
|
|
shutil.move(str(src), str(dst))
|
|
return str(dst)
|
|
|
|
def cleanup_expired(self, retention_days: int = 7) -> int:
|
|
if not self._staging_dir.exists():
|
|
return 0
|
|
|
|
cutoff = time.time() - retention_days * 86400
|
|
deleted = 0
|
|
for filepath in self._staging_dir.iterdir():
|
|
if filepath.is_file() and filepath.stat().st_mtime < cutoff:
|
|
filepath.unlink()
|
|
deleted += 1
|
|
|
|
if deleted:
|
|
log.info("Cleaned up %d expired staging crops", deleted)
|
|
return deleted
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `pytest tests/unit/test_crop_manager.py -v`
|
|
Expected: PASS
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add vigilar/detection/crop_manager.py tests/unit/test_crop_manager.py
|
|
git commit -m "Add crop manager for staging and training image lifecycle"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 13: Update daily digest with pet/wildlife summary
|
|
|
|
**Files:**
|
|
- Modify: `vigilar/health/digest.py`
|
|
- Modify: `tests/unit/test_health.py`
|
|
|
|
- [ ] **Step 1: Write tests for pet digest**
|
|
|
|
Add to `tests/unit/test_health.py`:
|
|
|
|
```python
|
|
from vigilar.health.digest import build_digest, format_digest
|
|
from vigilar.storage.schema import pet_sightings, wildlife_sightings
|
|
|
|
|
|
class TestPetDigest:
|
|
def test_digest_includes_pet_sightings(self, test_db, tmp_data_dir):
|
|
import time
|
|
with test_db.begin() as conn:
|
|
conn.execute(pet_sightings.insert().values(
|
|
ts=time.time(), pet_id="p1", species="cat",
|
|
camera_id="kitchen", confidence=0.9, labeled=1,
|
|
))
|
|
conn.execute(pet_sightings.insert().values(
|
|
ts=time.time(), pet_id="p2", species="dog",
|
|
camera_id="kitchen", confidence=0.85, labeled=1,
|
|
))
|
|
data = build_digest(test_db, str(tmp_data_dir), since_hours=1)
|
|
assert data["pet_sightings"] == 2
|
|
|
|
def test_digest_includes_wildlife(self, test_db, tmp_data_dir):
|
|
import time
|
|
with test_db.begin() as conn:
|
|
conn.execute(wildlife_sightings.insert().values(
|
|
ts=time.time(), species="bear", threat_level="PREDATOR",
|
|
camera_id="front", confidence=0.9,
|
|
))
|
|
conn.execute(wildlife_sightings.insert().values(
|
|
ts=time.time(), species="deer", threat_level="PASSIVE",
|
|
camera_id="back", confidence=0.8,
|
|
))
|
|
data = build_digest(test_db, str(tmp_data_dir), since_hours=1)
|
|
assert data["wildlife_predators"] == 1
|
|
assert data["wildlife_passive"] == 1
|
|
|
|
def test_format_includes_pets(self):
|
|
data = {
|
|
"person_detections": 2, "unknown_vehicles": 0,
|
|
"recordings": 5, "disk_used_gb": 100.0, "disk_used_pct": 50,
|
|
"since_hours": 12, "pet_sightings": 15,
|
|
"wildlife_predators": 0, "wildlife_nuisance": 1, "wildlife_passive": 3,
|
|
}
|
|
text = format_digest(data)
|
|
assert "15 pet" in text
|
|
assert "1 nuisance" in text.lower() or "wildlife" in text.lower()
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `pytest tests/unit/test_health.py::TestPetDigest -v`
|
|
Expected: FAIL — build_digest doesn't query pet tables
|
|
|
|
- [ ] **Step 3: Update digest.py**
|
|
|
|
Update imports in `vigilar/health/digest.py`:
|
|
|
|
```python
|
|
from vigilar.storage.schema import events, recordings, pet_sightings, wildlife_sightings
|
|
```
|
|
|
|
Add pet/wildlife queries to `build_digest` (inside the `with engine.connect()` block, after the recording_count query):
|
|
|
|
```python
|
|
pet_count = conn.execute(
|
|
select(func.count()).select_from(pet_sightings)
|
|
.where(pet_sightings.c.ts >= since_ts / 1000)
|
|
).scalar() or 0
|
|
|
|
wildlife_predator_count = conn.execute(
|
|
select(func.count()).select_from(wildlife_sightings)
|
|
.where(wildlife_sightings.c.ts >= since_ts / 1000,
|
|
wildlife_sightings.c.threat_level == "PREDATOR")
|
|
).scalar() or 0
|
|
|
|
wildlife_nuisance_count = conn.execute(
|
|
select(func.count()).select_from(wildlife_sightings)
|
|
.where(wildlife_sightings.c.ts >= since_ts / 1000,
|
|
wildlife_sightings.c.threat_level == "NUISANCE")
|
|
).scalar() or 0
|
|
|
|
wildlife_passive_count = conn.execute(
|
|
select(func.count()).select_from(wildlife_sightings)
|
|
.where(wildlife_sightings.c.ts >= since_ts / 1000,
|
|
wildlife_sightings.c.threat_level == "PASSIVE")
|
|
).scalar() or 0
|
|
```
|
|
|
|
Add to the return dict:
|
|
|
|
```python
|
|
"pet_sightings": pet_count,
|
|
"wildlife_predators": wildlife_predator_count,
|
|
"wildlife_nuisance": wildlife_nuisance_count,
|
|
"wildlife_passive": wildlife_passive_count,
|
|
```
|
|
|
|
Update `format_digest`:
|
|
|
|
```python
|
|
def format_digest(data: dict) -> str:
|
|
lines = [
|
|
f"Vigilar Daily Summary",
|
|
f"Last {data['since_hours']}h: "
|
|
f"{data['person_detections']} person detections, "
|
|
f"{data['unknown_vehicles']} unknown vehicles, "
|
|
f"{data['recordings']} recordings",
|
|
]
|
|
if data.get("pet_sightings", 0) > 0:
|
|
lines.append(f"Pets: {data['pet_sightings']} pet sightings")
|
|
wildlife_parts = []
|
|
if data.get("wildlife_predators", 0) > 0:
|
|
wildlife_parts.append(f"{data['wildlife_predators']} predator")
|
|
if data.get("wildlife_nuisance", 0) > 0:
|
|
wildlife_parts.append(f"{data['wildlife_nuisance']} nuisance")
|
|
if data.get("wildlife_passive", 0) > 0:
|
|
wildlife_parts.append(f"{data['wildlife_passive']} passive")
|
|
if wildlife_parts:
|
|
lines.append(f"Wildlife: {', '.join(wildlife_parts)}")
|
|
lines.append(f"Storage: {data['disk_used_gb']} GB ({data['disk_used_pct']:.0f}%)")
|
|
return "\n".join(lines)
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `pytest tests/unit/test_health.py -v`
|
|
Expected: PASS
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add vigilar/health/digest.py tests/unit/test_health.py
|
|
git commit -m "Add pet and wildlife counts to daily digest"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 14: Add pets web blueprint — API endpoints
|
|
|
|
**Files:**
|
|
- Create: `vigilar/web/blueprints/pets.py`
|
|
- Test: `tests/unit/test_pets_api.py` (new)
|
|
|
|
- [ ] **Step 1: Write tests for pets API**
|
|
|
|
```python
|
|
# tests/unit/test_pets_api.py
|
|
"""Tests for pets web blueprint API endpoints."""
|
|
|
|
import json
|
|
|
|
import pytest
|
|
|
|
from vigilar.web.app import create_app
|
|
|
|
|
|
@pytest.fixture
|
|
def client(test_db, tmp_path, sample_config):
|
|
app = create_app(sample_config, db_engine=test_db)
|
|
app.config["TESTING"] = True
|
|
with app.test_client() as client:
|
|
yield client
|
|
|
|
|
|
class TestPetsAPI:
|
|
def test_register_pet(self, client):
|
|
resp = client.post("/pets/register", json={
|
|
"name": "Angel", "species": "cat", "breed": "DSH",
|
|
"color_description": "black",
|
|
})
|
|
assert resp.status_code == 200
|
|
data = resp.get_json()
|
|
assert data["name"] == "Angel"
|
|
assert "id" in data
|
|
|
|
def test_get_pet_status(self, client):
|
|
client.post("/pets/register", json={"name": "Angel", "species": "cat"})
|
|
resp = client.get("/pets/api/status")
|
|
assert resp.status_code == 200
|
|
data = resp.get_json()
|
|
assert len(data["pets"]) == 1
|
|
assert data["pets"][0]["name"] == "Angel"
|
|
|
|
def test_get_sightings_empty(self, client):
|
|
resp = client.get("/pets/api/sightings")
|
|
assert resp.status_code == 200
|
|
data = resp.get_json()
|
|
assert data["sightings"] == []
|
|
|
|
def test_get_wildlife_empty(self, client):
|
|
resp = client.get("/pets/api/wildlife")
|
|
assert resp.status_code == 200
|
|
data = resp.get_json()
|
|
assert data["sightings"] == []
|
|
|
|
def test_get_unlabeled_empty(self, client):
|
|
resp = client.get("/pets/api/unlabeled")
|
|
assert resp.status_code == 200
|
|
data = resp.get_json()
|
|
assert data["crops"] == []
|
|
|
|
def test_label_sighting(self, client):
|
|
# Register a pet
|
|
resp = client.post("/pets/register", json={"name": "Angel", "species": "cat"})
|
|
pet_id = resp.get_json()["id"]
|
|
|
|
# Insert a sighting directly (simulating detection)
|
|
from vigilar.storage.queries import insert_pet_sighting
|
|
from flask import current_app
|
|
# Use the test_db through the app
|
|
with client.application.app_context():
|
|
engine = client.application.extensions.get("db_engine")
|
|
if engine:
|
|
sighting_id = insert_pet_sighting(engine, species="cat",
|
|
camera_id="kitchen", confidence=0.6)
|
|
|
|
resp = client.post(f"/pets/{pet_id}/label", json={"sighting_id": sighting_id})
|
|
assert resp.status_code == 200
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `pytest tests/unit/test_pets_api.py -v`
|
|
Expected: FAIL — pets blueprint not registered
|
|
|
|
- [ ] **Step 3: Implement pets blueprint**
|
|
|
|
```python
|
|
# vigilar/web/blueprints/pets.py
|
|
"""Pets blueprint — pet management, training, sightings, and dashboard."""
|
|
|
|
import logging
|
|
|
|
from flask import Blueprint, jsonify, render_template, request
|
|
|
|
from vigilar.storage.queries import (
|
|
get_all_pets,
|
|
get_pet,
|
|
get_pet_last_location,
|
|
get_pet_sightings,
|
|
get_training_images,
|
|
get_unlabeled_sightings,
|
|
get_wildlife_sightings,
|
|
insert_pet,
|
|
insert_training_image,
|
|
label_sighting,
|
|
)
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
pets_bp = Blueprint("pets", __name__, url_prefix="/pets")
|
|
|
|
|
|
def _get_engine():
|
|
from flask import current_app
|
|
return current_app.extensions["db_engine"]
|
|
|
|
|
|
@pets_bp.route("/")
|
|
def pets_dashboard():
|
|
engine = _get_engine()
|
|
all_pets = get_all_pets(engine)
|
|
return render_template("pets/dashboard.html", pets=all_pets)
|
|
|
|
|
|
@pets_bp.route("/register", methods=["POST"])
|
|
def register_pet():
|
|
data = request.get_json()
|
|
engine = _get_engine()
|
|
pet_id = insert_pet(
|
|
engine,
|
|
name=data["name"],
|
|
species=data["species"],
|
|
breed=data.get("breed"),
|
|
color_description=data.get("color_description"),
|
|
)
|
|
pet = get_pet(engine, pet_id)
|
|
return jsonify(pet)
|
|
|
|
|
|
@pets_bp.route("/api/status")
|
|
def pet_status():
|
|
engine = _get_engine()
|
|
all_pets = get_all_pets(engine)
|
|
result = []
|
|
for pet in all_pets:
|
|
loc = get_pet_last_location(engine, pet["id"])
|
|
result.append({
|
|
**pet,
|
|
"last_camera": loc["camera_id"] if loc else None,
|
|
"last_seen": loc["ts"] if loc else None,
|
|
})
|
|
return jsonify({"pets": result})
|
|
|
|
|
|
@pets_bp.route("/api/sightings")
|
|
def sightings():
|
|
engine = _get_engine()
|
|
pet_id = request.args.get("pet_id")
|
|
camera_id = request.args.get("camera_id")
|
|
limit = int(request.args.get("limit", 100))
|
|
rows = get_pet_sightings(engine, pet_id=pet_id, camera_id=camera_id, limit=limit)
|
|
return jsonify({"sightings": rows})
|
|
|
|
|
|
@pets_bp.route("/api/wildlife")
|
|
def wildlife():
|
|
engine = _get_engine()
|
|
threat_level = request.args.get("threat_level")
|
|
limit = int(request.args.get("limit", 100))
|
|
rows = get_wildlife_sightings(engine, threat_level=threat_level, limit=limit)
|
|
return jsonify({"sightings": rows})
|
|
|
|
|
|
@pets_bp.route("/api/unlabeled")
|
|
def unlabeled():
|
|
engine = _get_engine()
|
|
species = request.args.get("species")
|
|
limit = int(request.args.get("limit", 50))
|
|
rows = get_unlabeled_sightings(engine, species=species, limit=limit)
|
|
return jsonify({"crops": rows})
|
|
|
|
|
|
@pets_bp.route("/<pet_id>/label", methods=["POST"])
|
|
def label_pet(pet_id):
|
|
data = request.get_json()
|
|
engine = _get_engine()
|
|
sighting_id = data["sighting_id"]
|
|
label_sighting(engine, sighting_id, pet_id)
|
|
return jsonify({"status": "labeled"})
|
|
|
|
|
|
@pets_bp.route("/<pet_id>/upload", methods=["POST"])
|
|
def upload_training(pet_id):
|
|
engine = _get_engine()
|
|
pet = get_pet(engine, pet_id)
|
|
if not pet:
|
|
return jsonify({"error": "Pet not found"}), 404
|
|
|
|
files = request.files.getlist("images")
|
|
saved = []
|
|
for f in files:
|
|
if f.filename:
|
|
from werkzeug.utils import secure_filename
|
|
from flask import current_app
|
|
import os
|
|
pets_cfg = current_app.config.get("PETS_CONFIG")
|
|
training_dir = pets_cfg.training_dir if pets_cfg else "/var/vigilar/pets/training"
|
|
pet_dir = os.path.join(training_dir, pet["name"].lower())
|
|
os.makedirs(pet_dir, exist_ok=True)
|
|
filepath = os.path.join(pet_dir, secure_filename(f.filename))
|
|
f.save(filepath)
|
|
insert_training_image(engine, pet_id=pet_id, image_path=filepath, source="upload")
|
|
saved.append(filepath)
|
|
|
|
return jsonify({"uploaded": len(saved)})
|
|
|
|
|
|
@pets_bp.route("/train", methods=["POST"])
|
|
def train_model():
|
|
from flask import current_app
|
|
pets_cfg = current_app.config.get("PETS_CONFIG")
|
|
if not pets_cfg:
|
|
return jsonify({"error": "Pet detection not configured"}), 400
|
|
|
|
from vigilar.detection.trainer import PetTrainer
|
|
trainer = PetTrainer(
|
|
training_dir=pets_cfg.training_dir,
|
|
model_output_path=pets_cfg.pet_id_model_path,
|
|
)
|
|
|
|
ready, msg = trainer.check_readiness(min_images=pets_cfg.min_training_images)
|
|
if not ready:
|
|
return jsonify({"error": msg}), 400
|
|
|
|
# Run training in background thread
|
|
import threading
|
|
thread = threading.Thread(target=trainer.train, daemon=True)
|
|
thread.start()
|
|
|
|
return jsonify({"status": "training_started", "message": msg})
|
|
|
|
|
|
@pets_bp.route("/api/training-status")
|
|
def training_status():
|
|
# Training status is ephemeral — stored in the trainer instance
|
|
return jsonify({"status": "idle", "message": "No training in progress"})
|
|
|
|
|
|
@pets_bp.route("/api/highlights")
|
|
def highlights():
|
|
# Highlight reel — computed from sightings + recordings
|
|
engine = _get_engine()
|
|
import time
|
|
since = time.time() - 86400 # last 24h
|
|
sightings = get_pet_sightings(engine, since_ts=since, limit=50)
|
|
|
|
highlights = []
|
|
for s in sightings:
|
|
if s.get("confidence", 0) > 0.8:
|
|
highlights.append({
|
|
"type": "pet_sighting",
|
|
"pet_id": s.get("pet_id"),
|
|
"camera_id": s["camera_id"],
|
|
"timestamp": s["ts"],
|
|
"confidence": s.get("confidence"),
|
|
})
|
|
|
|
return jsonify({"highlights": highlights[:20]})
|
|
|
|
|
|
@pets_bp.route("/<pet_id>/update", methods=["POST"])
|
|
def update_pet(pet_id):
|
|
engine = _get_engine()
|
|
pet = get_pet(engine, pet_id)
|
|
if not pet:
|
|
return jsonify({"error": "Pet not found"}), 404
|
|
|
|
data = request.get_json()
|
|
from vigilar.storage.schema import pets as pets_table
|
|
with engine.begin() as conn:
|
|
updates = {}
|
|
for field in ("name", "breed", "color_description"):
|
|
if field in data:
|
|
updates[field] = data[field]
|
|
if updates:
|
|
conn.execute(
|
|
pets_table.update().where(pets_table.c.id == pet_id).values(**updates)
|
|
)
|
|
return jsonify(get_pet(engine, pet_id))
|
|
|
|
|
|
@pets_bp.route("/<pet_id>/delete", methods=["DELETE"])
|
|
def delete_pet(pet_id):
|
|
engine = _get_engine()
|
|
from vigilar.storage.schema import pets as pets_table
|
|
with engine.begin() as conn:
|
|
conn.execute(pets_table.delete().where(pets_table.c.id == pet_id))
|
|
return jsonify({"status": "deleted"})
|
|
```
|
|
|
|
- [ ] **Step 4: Register the blueprint in app.py**
|
|
|
|
Read `vigilar/web/app.py` and add the pets blueprint registration alongside the existing blueprints:
|
|
|
|
```python
|
|
from vigilar.web.blueprints.pets import pets_bp
|
|
app.register_blueprint(pets_bp)
|
|
```
|
|
|
|
Also store the db engine in app extensions for the blueprint to access:
|
|
|
|
```python
|
|
app.extensions["db_engine"] = db_engine
|
|
```
|
|
|
|
- [ ] **Step 5: Run tests to verify they pass**
|
|
|
|
Run: `pytest tests/unit/test_pets_api.py -v`
|
|
Expected: PASS
|
|
|
|
- [ ] **Step 6: Commit**
|
|
|
|
```bash
|
|
git add vigilar/web/blueprints/pets.py vigilar/web/app.py tests/unit/test_pets_api.py
|
|
git commit -m "Add pets web blueprint with API endpoints"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 15: Add pets dashboard template
|
|
|
|
**Files:**
|
|
- Create: `vigilar/web/templates/pets/dashboard.html`
|
|
|
|
This is a Flask/Jinja2 template using Bootstrap 5 dark theme, matching the existing UI patterns.
|
|
|
|
- [ ] **Step 1: Check existing template patterns**
|
|
|
|
Read `vigilar/web/templates/` to understand the base template structure, navbar, and layout conventions used by other pages.
|
|
|
|
- [ ] **Step 2: Create the pets dashboard template**
|
|
|
|
Create `vigilar/web/templates/pets/dashboard.html` following the existing template patterns. Include:
|
|
|
|
- Per-pet status cards (name, species, location, last seen, status indicator)
|
|
- Wildlife summary bar
|
|
- Activity timeline (24-hour bars per pet)
|
|
- Unlabeled detection queue (thumbnail grid)
|
|
- Pet management section (register, upload photos, train model)
|
|
- Highlight reel section
|
|
|
|
Use Bootstrap 5 dark theme classes, match existing card/grid patterns from other templates. Use `fetch()` to load data from the `/pets/api/*` endpoints.
|
|
|
|
- [ ] **Step 3: Verify template renders**
|
|
|
|
Run the Flask dev server and check the `/pets/` route loads without errors. Verify the dark theme matches the rest of the UI.
|
|
|
|
- [ ] **Step 4: Commit**
|
|
|
|
```bash
|
|
git add vigilar/web/templates/pets/dashboard.html
|
|
git commit -m "Add pets dashboard template with Bootstrap 5 dark theme"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 16: Add pet labeling UI to recordings playback
|
|
|
|
**Files:**
|
|
- Modify: `vigilar/web/templates/recordings/` (playback template)
|
|
- Modify: `vigilar/web/static/js/` (add pet labeling JS)
|
|
|
|
- [ ] **Step 1: Add labeling overlay to recording playback**
|
|
|
|
Add a JavaScript module that:
|
|
- Renders bounding box overlays on the video player when detection data is available
|
|
- Shows a "Who is this?" popup when clicking an animal bounding box
|
|
- Fetches registered pets filtered by species from `/pets/api/status`
|
|
- Calls `/pets/<id>/label` when user selects a pet
|
|
- Shows "Unknown" and "+ New Pet" options
|
|
|
|
- [ ] **Step 2: Add CSS for bounding box overlays**
|
|
|
|
Add styles to the existing CSS for:
|
|
- `.detection-overlay` — positioned over video, pointer-events transparent
|
|
- `.detection-bbox` — colored border with species/confidence label
|
|
- `.label-popup` — dark-themed dropdown for pet selection
|
|
|
|
- [ ] **Step 3: Test the labeling flow manually**
|
|
|
|
Verify that clicking a bounding box shows the popup, selecting a pet calls the API, and the popup dismisses.
|
|
|
|
- [ ] **Step 4: Commit**
|
|
|
|
```bash
|
|
git add vigilar/web/templates/ vigilar/web/static/
|
|
git commit -m "Add pet labeling UI overlay to recording playback"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 17: Add dependencies to pyproject.toml
|
|
|
|
**Files:**
|
|
- Modify: `pyproject.toml`
|
|
|
|
- [ ] **Step 1: Add ultralytics and torchvision**
|
|
|
|
Add to the dependencies section:
|
|
|
|
```toml
|
|
ultralytics = ">=8.2.0"
|
|
torchvision = ">=0.18.0"
|
|
```
|
|
|
|
- [ ] **Step 2: Install and verify**
|
|
|
|
Run: `pip install -e ".[dev]"`
|
|
Expected: Installs successfully with new dependencies
|
|
|
|
- [ ] **Step 3: Commit**
|
|
|
|
```bash
|
|
git add pyproject.toml
|
|
git commit -m "Add ultralytics and torchvision dependencies for pet detection"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 18: Run full test suite and fix any issues
|
|
|
|
**Files:** All modified files
|
|
|
|
- [ ] **Step 1: Run complete test suite**
|
|
|
|
Run: `pytest tests/ -v --tb=short`
|
|
Expected: All tests pass
|
|
|
|
- [ ] **Step 2: Run ruff linter**
|
|
|
|
Run: `ruff check vigilar/`
|
|
Expected: No errors (fix any that appear)
|
|
|
|
- [ ] **Step 3: Run type checker**
|
|
|
|
Run: `mypy vigilar/ --ignore-missing-imports`
|
|
Expected: No new errors
|
|
|
|
- [ ] **Step 4: Fix any issues found**
|
|
|
|
Address any test failures, lint errors, or type errors. Commit fixes individually.
|
|
|
|
- [ ] **Step 5: Final commit**
|
|
|
|
```bash
|
|
git commit -m "Fix lint and type issues from pet detection integration"
|
|
```
|
|
|
|
---
|
|
|
|
## File Structure Summary
|
|
|
|
### New Files
|
|
| File | Purpose |
|
|
|------|---------|
|
|
| `vigilar/detection/yolo.py` | YOLOv8 unified detector (replaces MobileNet) |
|
|
| `vigilar/detection/wildlife.py` | Wildlife threat classification |
|
|
| `vigilar/detection/pet_id.py` | Pet identification classifier |
|
|
| `vigilar/detection/trainer.py` | Pet ID model trainer (MobileNetV3-Small) |
|
|
| `vigilar/detection/crop_manager.py` | Staging/training crop lifecycle |
|
|
| `vigilar/web/blueprints/pets.py` | Pets web blueprint (API + views) |
|
|
| `vigilar/web/templates/pets/dashboard.html` | Pet dashboard UI |
|
|
| `tests/unit/test_constants.py` | Tests for new enums |
|
|
| `tests/unit/test_yolo_detector.py` | Tests for YOLO detector |
|
|
| `tests/unit/test_wildlife.py` | Tests for wildlife classification |
|
|
| `tests/unit/test_pet_id.py` | Tests for pet ID classifier |
|
|
| `tests/unit/test_trainer.py` | Tests for model trainer |
|
|
| `tests/unit/test_crop_manager.py` | Tests for crop manager |
|
|
| `tests/unit/test_pet_queries.py` | Tests for DB query functions |
|
|
| `tests/unit/test_pets_api.py` | Tests for pets API endpoints |
|
|
|
|
### Modified Files
|
|
| File | Changes |
|
|
|------|---------|
|
|
| `vigilar/constants.py` | Add ThreatLevel, CameraLocation enums; 6 EventType values; pet MQTT topics |
|
|
| `vigilar/config.py` | Add PetsConfig, WildlifeConfig, PetActivityConfig; location field on CameraConfig |
|
|
| `vigilar/storage/schema.py` | Add pets, pet_sightings, wildlife_sightings, pet_training_images tables |
|
|
| `vigilar/storage/queries.py` | Add pet/wildlife/training query functions |
|
|
| `vigilar/camera/worker.py` | Integrate YOLO detection + pet ID into frame loop |
|
|
| `vigilar/alerts/profiles.py` | Add camera_location filtering to get_action_for_event |
|
|
| `vigilar/events/processor.py` | Handle pet/wildlife event classification |
|
|
| `vigilar/health/digest.py` | Add pet/wildlife counts to daily digest |
|
|
| `vigilar/web/app.py` | Register pets blueprint |
|
|
| `pyproject.toml` | Add ultralytics, torchvision deps |
|
|
| `tests/unit/test_config.py` | Tests for new config models |
|
|
| `tests/unit/test_profiles.py` | Tests for pet alert routing |
|
|
| `tests/unit/test_events.py` | Tests for pet event classification |
|
|
| `tests/unit/test_health.py` | Tests for pet digest |
|