From 31757f410ae7ee83309cf8ec1a67d26bb9224cab Mon Sep 17 00:00:00 2001 From: "Aaron D. Lee" Date: Fri, 3 Apr 2026 18:46:59 -0400 Subject: [PATCH] feat(Q5): package delivery state machine with sunset-aware reminders Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/unit/test_package.py | 39 ++++++++++++++ vigilar/detection/package.py | 99 ++++++++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+) create mode 100644 tests/unit/test_package.py create mode 100644 vigilar/detection/package.py diff --git a/tests/unit/test_package.py b/tests/unit/test_package.py new file mode 100644 index 0000000..a616d4b --- /dev/null +++ b/tests/unit/test_package.py @@ -0,0 +1,39 @@ +import time +from vigilar.detection.package import PackageTracker, PackageState + +def test_initial_state_is_idle(): + tracker = PackageTracker(camera_id="front", latitude=45.0, longitude=-85.0) + assert tracker.state == PackageState.IDLE + +def test_person_detected_starts_tracking(): + tracker = PackageTracker(camera_id="front", latitude=45.0, longitude=-85.0) + tracker.on_person_detected(time.time()) + assert tracker._person_last_seen > 0 + +def test_person_departed_then_package_detected(): + tracker = PackageTracker(camera_id="front", latitude=45.0, longitude=-85.0) + now = time.time() + tracker.on_person_detected(now - 60) + tracker._person_last_seen = now - 35 + result = tracker.check_for_package( + detections=[{"class_id": 28, "bbox": [100, 200, 50, 50], "confidence": 0.7}], now=now) + assert result is True + assert tracker.state == PackageState.PRESENT + +def test_package_collected(): + tracker = PackageTracker(camera_id="front", latitude=45.0, longitude=-85.0) + now = time.time() + tracker._state = PackageState.PRESENT + tracker._package_bbox = [100, 200, 50, 50] + tracker._detected_at = now - 3600 + tracker.on_person_detected(now) + collected = tracker.check_collected(detections=[], now=now) + assert collected is True + assert tracker.state == PackageState.COLLECTED + +def test_reminder_time_calculation(): + tracker = PackageTracker(camera_id="front", latitude=45.0, longitude=-85.0) + now = time.time() + reminder = tracker._calculate_reminder_time(now) + assert reminder >= now + assert reminder <= now + 3 * 3600 + 60 diff --git a/vigilar/detection/package.py b/vigilar/detection/package.py new file mode 100644 index 0000000..ec309c5 --- /dev/null +++ b/vigilar/detection/package.py @@ -0,0 +1,99 @@ +"""Package delivery detection state machine.""" + +import datetime +import logging +import time +from enum import StrEnum + +from vigilar.detection.solar import get_sunset + +log = logging.getLogger(__name__) + +PACKAGE_CLASS_IDS = {24, 26, 28} # backpack, handbag, suitcase + + +class PackageState(StrEnum): + IDLE = "IDLE" + PRESENT = "PRESENT" + REMINDED = "REMINDED" + COLLECTED = "COLLECTED" + + +class PackageTracker: + def __init__(self, camera_id: str, latitude: float, longitude: float): + self.camera_id = camera_id + self._latitude = latitude + self._longitude = longitude + self._state = PackageState.IDLE + self._person_last_seen: float = 0 + self._person_departure_threshold: float = 30.0 + self._package_bbox: list[float] | None = None + self._detected_at: float = 0 + self._reminder_at: float = 0 + self._person_returned: bool = False + + @property + def state(self) -> PackageState: + return self._state + + def on_person_detected(self, now: float) -> None: + self._person_last_seen = now + if self._state in (PackageState.PRESENT, PackageState.REMINDED): + self._person_returned = True + + def check_for_package(self, detections: list[dict], now: float) -> bool: + if self._state != PackageState.IDLE: + return False + if self._person_last_seen == 0: + return False + if now - self._person_last_seen < self._person_departure_threshold: + return False + for det in detections: + if det.get("class_id") in PACKAGE_CLASS_IDS: + self._state = PackageState.PRESENT + self._package_bbox = det.get("bbox") + self._detected_at = now + self._reminder_at = self._calculate_reminder_time(now) + self._person_returned = False + log.info("Package detected on %s", self.camera_id) + return True + return False + + def check_collected(self, detections: list[dict], now: float) -> bool: + if self._state not in (PackageState.PRESENT, PackageState.REMINDED): + return False + if not self._person_returned: + return False + for det in detections: + if det.get("class_id") in PACKAGE_CLASS_IDS: + return False + self._state = PackageState.COLLECTED + return True + + def check_reminder(self, now: float) -> bool: + if self._state != PackageState.PRESENT: + return False + if now >= self._reminder_at: + self._state = PackageState.REMINDED + return True + return False + + def reset(self) -> None: + self._state = PackageState.IDLE + self._package_bbox = None + self._detected_at = 0 + self._reminder_at = 0 + self._person_returned = False + + def _calculate_reminder_time(self, now: float) -> float: + three_hours = now + 3 * 3600 + try: + today = datetime.date.today() + sunset_time = get_sunset(self._latitude, self._longitude, today) + sunset_dt = datetime.datetime.combine(today, sunset_time) + sunset_ts = sunset_dt.timestamp() + if sunset_ts < now: + sunset_ts += 86400 + return min(sunset_ts, three_hours) + except Exception: + return three_hours