From 931b453ba96d49d2e1d8c916676433e996d51070 Mon Sep 17 00:00:00 2001 From: "Aaron D. Lee" Date: Fri, 3 Apr 2026 18:52:38 -0400 Subject: [PATCH] feat(S5): pet rule engine with condition evaluation and cooldown --- tests/unit/test_pet_rules.py | 41 +++++++++++ vigilar/pets/__init__.py | 0 vigilar/pets/rules.py | 135 +++++++++++++++++++++++++++++++++++ 3 files changed, 176 insertions(+) create mode 100644 tests/unit/test_pet_rules.py create mode 100644 vigilar/pets/__init__.py create mode 100644 vigilar/pets/rules.py diff --git a/tests/unit/test_pet_rules.py b/tests/unit/test_pet_rules.py new file mode 100644 index 0000000..135a4ef --- /dev/null +++ b/tests/unit/test_pet_rules.py @@ -0,0 +1,41 @@ +import time +from vigilar.pets.rules import PetState, evaluate_condition, evaluate_rule + +def test_detected_in_zone_true(): + state = PetState(pet_id="milo", current_zone="EXTERIOR") + assert evaluate_condition({"type": "detected_in_zone", "zone": "EXTERIOR"}, state, time.time()) is True + +def test_detected_in_zone_false(): + state = PetState(pet_id="milo", current_zone="INTERIOR") + assert evaluate_condition({"type": "detected_in_zone", "zone": "EXTERIOR"}, state, time.time()) is False + +def test_not_seen_in_zone_true(): + now = time.time() + state = PetState(pet_id="milo", current_zone="INTERIOR", last_seen_time=now - 3600) + assert evaluate_condition({"type": "not_seen_in_zone", "zone": "EXTERIOR", "minutes": 30}, state, now) is True + +def test_not_seen_anywhere(): + now = time.time() + state = PetState(pet_id="milo", last_seen_time=now - 30000) + assert evaluate_condition({"type": "not_seen_anywhere", "minutes": 480}, state, now) is True + +def test_detected_without_person(): + assert evaluate_condition({"type": "detected_without_person"}, PetState(pet_id="snek", person_present=False), time.time()) is True + assert evaluate_condition({"type": "detected_without_person"}, PetState(pet_id="snek", person_present=True), time.time()) is False + +def test_in_zone_longer_than(): + now = time.time() + state = PetState(pet_id="milo", current_zone="EXTERIOR", zone_entry_time=now - 3000) + assert evaluate_condition({"type": "in_zone_longer_than", "zone": "EXTERIOR", "minutes": 45}, state, now) is True + +def test_evaluate_rule_and_logic(): + now = time.time() + state = PetState(pet_id="milo", current_zone="EXTERIOR", zone_entry_time=now - 3600, person_present=False) + rule = {"conditions": [{"type": "detected_in_zone", "zone": "EXTERIOR"}, {"type": "detected_without_person"}]} + assert evaluate_rule(rule, state, now) is True + +def test_evaluate_rule_one_fails(): + now = time.time() + state = PetState(pet_id="milo", current_zone="INTERIOR", person_present=False) + rule = {"conditions": [{"type": "detected_in_zone", "zone": "EXTERIOR"}, {"type": "detected_without_person"}]} + assert evaluate_rule(rule, state, now) is False diff --git a/vigilar/pets/__init__.py b/vigilar/pets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/vigilar/pets/rules.py b/vigilar/pets/rules.py new file mode 100644 index 0000000..317725e --- /dev/null +++ b/vigilar/pets/rules.py @@ -0,0 +1,135 @@ +"""Pet rule engine — composable conditions with per-pet state tracking.""" + +import json +import logging +import time as time_mod +from dataclasses import dataclass + +from vigilar.alerts.profiles import is_in_time_window + +log = logging.getLogger(__name__) + + +@dataclass +class PetState: + pet_id: str + last_seen_camera: str | None = None + last_seen_time: float = 0 + current_zone: str | None = None + zone_entry_time: float = 0 + person_present: bool = False + person_last_seen: float = 0 + + +def evaluate_condition(condition: dict, pet_state: PetState, now: float) -> bool: + ctype = condition.get("type") + if ctype == "detected_in_zone": + return pet_state.current_zone == condition.get("zone") + elif ctype == "not_seen_in_zone": + if pet_state.current_zone == condition.get("zone"): + return False + return (now - pet_state.last_seen_time) >= condition.get("minutes", 0) * 60 + elif ctype == "not_seen_anywhere": + return (now - pet_state.last_seen_time) >= condition.get("minutes", 0) * 60 + elif ctype == "detected_without_person": + return not pet_state.person_present + elif ctype == "in_zone_longer_than": + if pet_state.current_zone != condition.get("zone"): + return False + return (now - pet_state.zone_entry_time) >= condition.get("minutes", 0) * 60 + elif ctype == "time_of_day": + start = condition.get("start", "00:00") + end = condition.get("end", "23:59") + current_time = time_mod.strftime("%H:%M") + return is_in_time_window(f"{start}-{end}", current_time) + return False + + +def evaluate_rule(rule: dict, pet_state: PetState, now: float) -> bool: + conditions = rule.get("conditions", []) + if isinstance(conditions, str): + conditions = json.loads(conditions) + for condition in conditions: + if not evaluate_condition(condition, pet_state, now): + return False + return True + + +def format_action_message(template: str, pet_state: PetState) -> str: + duration_s = time_mod.time() - pet_state.zone_entry_time if pet_state.zone_entry_time else 0 + duration_min = int(duration_s / 60) + duration_str = f"{duration_min // 60}h {duration_min % 60}m" if duration_min >= 60 else f"{duration_min} minutes" + last_seen_s = time_mod.time() - pet_state.last_seen_time if pet_state.last_seen_time else 0 + last_seen_min = int(last_seen_s / 60) + last_seen_str = f"{last_seen_min}m ago" if last_seen_min < 60 else f"{last_seen_min // 60}h ago" + camera = (pet_state.last_seen_camera or "unknown").replace("_", " ").title() + return template.format( + pet_name=pet_state.pet_id, camera=camera, + zone=pet_state.current_zone or "unknown", + duration=duration_str, last_seen=last_seen_str) + + +class PetRuleEngine: + def __init__(self): + self._pet_states: dict[str, PetState] = {} + self._rules: list[dict] = [] + self._last_fired: dict[int, float] = {} + + def load_rules(self, engine) -> None: + from vigilar.storage.queries import get_all_enabled_rules + self._rules = get_all_enabled_rules(engine) + log.info("Loaded %d pet rules", len(self._rules)) + + def get_or_create_state(self, pet_id: str) -> PetState: + if pet_id not in self._pet_states: + self._pet_states[pet_id] = PetState(pet_id=pet_id) + return self._pet_states[pet_id] + + def on_pet_detection(self, pet_id, camera_id, zone, now) -> list[dict]: + state = self.get_or_create_state(pet_id) + old_zone = state.current_zone + state.last_seen_camera = camera_id + state.last_seen_time = now + if zone != old_zone: + state.current_zone = zone + state.zone_entry_time = now + else: + state.current_zone = zone + return self._evaluate_rules_for_pet(pet_id, now) + + def on_person_detection(self, camera_id, now) -> None: + for state in self._pet_states.values(): + if state.last_seen_camera == camera_id: + state.person_present = True + state.person_last_seen = now + + def tick(self, now) -> list[dict]: + for state in self._pet_states.values(): + if state.person_present and now - state.person_last_seen > 30: + state.person_present = False + triggered = [] + for pet_id in self._pet_states: + triggered.extend(self._evaluate_rules_for_pet(pet_id, now)) + return triggered + + def _evaluate_rules_for_pet(self, pet_id, now) -> list[dict]: + state = self._pet_states.get(pet_id) + if not state: + return [] + triggered = [] + for rule in self._rules: + if rule["pet_id"] != pet_id: + continue + rule_id = rule["id"] + last = self._last_fired.get(rule_id, 0) + cooldown = rule.get("cooldown_minutes", 30) * 60 + if now - last < cooldown: + continue + if evaluate_rule(rule, state, now): + self._last_fired[rule_id] = now + message = format_action_message(rule.get("action_message", ""), state) + triggered.append({ + "rule_id": rule_id, "rule_name": rule["name"], "pet_id": pet_id, + "action": rule["action"], "message": message, + "camera_id": state.last_seen_camera}) + return triggered