feat(Q5): package delivery state machine with sunset-aware reminders

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee 2026-04-03 18:46:59 -04:00
parent 8bf7900324
commit 31757f410a
2 changed files with 138 additions and 0 deletions

View File

@ -0,0 +1,39 @@
import time
from vigilar.detection.package import PackageTracker, PackageState
def test_initial_state_is_idle():
tracker = PackageTracker(camera_id="front", latitude=45.0, longitude=-85.0)
assert tracker.state == PackageState.IDLE
def test_person_detected_starts_tracking():
tracker = PackageTracker(camera_id="front", latitude=45.0, longitude=-85.0)
tracker.on_person_detected(time.time())
assert tracker._person_last_seen > 0
def test_person_departed_then_package_detected():
tracker = PackageTracker(camera_id="front", latitude=45.0, longitude=-85.0)
now = time.time()
tracker.on_person_detected(now - 60)
tracker._person_last_seen = now - 35
result = tracker.check_for_package(
detections=[{"class_id": 28, "bbox": [100, 200, 50, 50], "confidence": 0.7}], now=now)
assert result is True
assert tracker.state == PackageState.PRESENT
def test_package_collected():
tracker = PackageTracker(camera_id="front", latitude=45.0, longitude=-85.0)
now = time.time()
tracker._state = PackageState.PRESENT
tracker._package_bbox = [100, 200, 50, 50]
tracker._detected_at = now - 3600
tracker.on_person_detected(now)
collected = tracker.check_collected(detections=[], now=now)
assert collected is True
assert tracker.state == PackageState.COLLECTED
def test_reminder_time_calculation():
tracker = PackageTracker(camera_id="front", latitude=45.0, longitude=-85.0)
now = time.time()
reminder = tracker._calculate_reminder_time(now)
assert reminder >= now
assert reminder <= now + 3 * 3600 + 60

View File

@ -0,0 +1,99 @@
"""Package delivery detection state machine."""
import datetime
import logging
import time
from enum import StrEnum
from vigilar.detection.solar import get_sunset
log = logging.getLogger(__name__)
PACKAGE_CLASS_IDS = {24, 26, 28} # backpack, handbag, suitcase
class PackageState(StrEnum):
IDLE = "IDLE"
PRESENT = "PRESENT"
REMINDED = "REMINDED"
COLLECTED = "COLLECTED"
class PackageTracker:
def __init__(self, camera_id: str, latitude: float, longitude: float):
self.camera_id = camera_id
self._latitude = latitude
self._longitude = longitude
self._state = PackageState.IDLE
self._person_last_seen: float = 0
self._person_departure_threshold: float = 30.0
self._package_bbox: list[float] | None = None
self._detected_at: float = 0
self._reminder_at: float = 0
self._person_returned: bool = False
@property
def state(self) -> PackageState:
return self._state
def on_person_detected(self, now: float) -> None:
self._person_last_seen = now
if self._state in (PackageState.PRESENT, PackageState.REMINDED):
self._person_returned = True
def check_for_package(self, detections: list[dict], now: float) -> bool:
if self._state != PackageState.IDLE:
return False
if self._person_last_seen == 0:
return False
if now - self._person_last_seen < self._person_departure_threshold:
return False
for det in detections:
if det.get("class_id") in PACKAGE_CLASS_IDS:
self._state = PackageState.PRESENT
self._package_bbox = det.get("bbox")
self._detected_at = now
self._reminder_at = self._calculate_reminder_time(now)
self._person_returned = False
log.info("Package detected on %s", self.camera_id)
return True
return False
def check_collected(self, detections: list[dict], now: float) -> bool:
if self._state not in (PackageState.PRESENT, PackageState.REMINDED):
return False
if not self._person_returned:
return False
for det in detections:
if det.get("class_id") in PACKAGE_CLASS_IDS:
return False
self._state = PackageState.COLLECTED
return True
def check_reminder(self, now: float) -> bool:
if self._state != PackageState.PRESENT:
return False
if now >= self._reminder_at:
self._state = PackageState.REMINDED
return True
return False
def reset(self) -> None:
self._state = PackageState.IDLE
self._package_bbox = None
self._detected_at = 0
self._reminder_at = 0
self._person_returned = False
def _calculate_reminder_time(self, now: float) -> float:
three_hours = now + 3 * 3600
try:
today = datetime.date.today()
sunset_time = get_sunset(self._latitude, self._longitude, today)
sunset_dt = datetime.datetime.combine(today, sunset_time)
sunset_ts = sunset_dt.timestamp()
if sunset_ts < now:
sunset_ts += 86400
return min(sunset_ts, three_hours)
except Exception:
return three_hours