feat(S5): pet rule engine with condition evaluation and cooldown
This commit is contained in:
parent
fac51a7c8a
commit
931b453ba9
41
tests/unit/test_pet_rules.py
Normal file
41
tests/unit/test_pet_rules.py
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
import time
|
||||||
|
from vigilar.pets.rules import PetState, evaluate_condition, evaluate_rule
|
||||||
|
|
||||||
|
def test_detected_in_zone_true():
|
||||||
|
state = PetState(pet_id="milo", current_zone="EXTERIOR")
|
||||||
|
assert evaluate_condition({"type": "detected_in_zone", "zone": "EXTERIOR"}, state, time.time()) is True
|
||||||
|
|
||||||
|
def test_detected_in_zone_false():
|
||||||
|
state = PetState(pet_id="milo", current_zone="INTERIOR")
|
||||||
|
assert evaluate_condition({"type": "detected_in_zone", "zone": "EXTERIOR"}, state, time.time()) is False
|
||||||
|
|
||||||
|
def test_not_seen_in_zone_true():
|
||||||
|
now = time.time()
|
||||||
|
state = PetState(pet_id="milo", current_zone="INTERIOR", last_seen_time=now - 3600)
|
||||||
|
assert evaluate_condition({"type": "not_seen_in_zone", "zone": "EXTERIOR", "minutes": 30}, state, now) is True
|
||||||
|
|
||||||
|
def test_not_seen_anywhere():
|
||||||
|
now = time.time()
|
||||||
|
state = PetState(pet_id="milo", last_seen_time=now - 30000)
|
||||||
|
assert evaluate_condition({"type": "not_seen_anywhere", "minutes": 480}, state, now) is True
|
||||||
|
|
||||||
|
def test_detected_without_person():
|
||||||
|
assert evaluate_condition({"type": "detected_without_person"}, PetState(pet_id="snek", person_present=False), time.time()) is True
|
||||||
|
assert evaluate_condition({"type": "detected_without_person"}, PetState(pet_id="snek", person_present=True), time.time()) is False
|
||||||
|
|
||||||
|
def test_in_zone_longer_than():
|
||||||
|
now = time.time()
|
||||||
|
state = PetState(pet_id="milo", current_zone="EXTERIOR", zone_entry_time=now - 3000)
|
||||||
|
assert evaluate_condition({"type": "in_zone_longer_than", "zone": "EXTERIOR", "minutes": 45}, state, now) is True
|
||||||
|
|
||||||
|
def test_evaluate_rule_and_logic():
|
||||||
|
now = time.time()
|
||||||
|
state = PetState(pet_id="milo", current_zone="EXTERIOR", zone_entry_time=now - 3600, person_present=False)
|
||||||
|
rule = {"conditions": [{"type": "detected_in_zone", "zone": "EXTERIOR"}, {"type": "detected_without_person"}]}
|
||||||
|
assert evaluate_rule(rule, state, now) is True
|
||||||
|
|
||||||
|
def test_evaluate_rule_one_fails():
|
||||||
|
now = time.time()
|
||||||
|
state = PetState(pet_id="milo", current_zone="INTERIOR", person_present=False)
|
||||||
|
rule = {"conditions": [{"type": "detected_in_zone", "zone": "EXTERIOR"}, {"type": "detected_without_person"}]}
|
||||||
|
assert evaluate_rule(rule, state, now) is False
|
||||||
0
vigilar/pets/__init__.py
Normal file
0
vigilar/pets/__init__.py
Normal file
135
vigilar/pets/rules.py
Normal file
135
vigilar/pets/rules.py
Normal file
@ -0,0 +1,135 @@
|
|||||||
|
"""Pet rule engine — composable conditions with per-pet state tracking."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import time as time_mod
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from vigilar.alerts.profiles import is_in_time_window
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class PetState:
|
||||||
|
pet_id: str
|
||||||
|
last_seen_camera: str | None = None
|
||||||
|
last_seen_time: float = 0
|
||||||
|
current_zone: str | None = None
|
||||||
|
zone_entry_time: float = 0
|
||||||
|
person_present: bool = False
|
||||||
|
person_last_seen: float = 0
|
||||||
|
|
||||||
|
|
||||||
|
def evaluate_condition(condition: dict, pet_state: PetState, now: float) -> bool:
|
||||||
|
ctype = condition.get("type")
|
||||||
|
if ctype == "detected_in_zone":
|
||||||
|
return pet_state.current_zone == condition.get("zone")
|
||||||
|
elif ctype == "not_seen_in_zone":
|
||||||
|
if pet_state.current_zone == condition.get("zone"):
|
||||||
|
return False
|
||||||
|
return (now - pet_state.last_seen_time) >= condition.get("minutes", 0) * 60
|
||||||
|
elif ctype == "not_seen_anywhere":
|
||||||
|
return (now - pet_state.last_seen_time) >= condition.get("minutes", 0) * 60
|
||||||
|
elif ctype == "detected_without_person":
|
||||||
|
return not pet_state.person_present
|
||||||
|
elif ctype == "in_zone_longer_than":
|
||||||
|
if pet_state.current_zone != condition.get("zone"):
|
||||||
|
return False
|
||||||
|
return (now - pet_state.zone_entry_time) >= condition.get("minutes", 0) * 60
|
||||||
|
elif ctype == "time_of_day":
|
||||||
|
start = condition.get("start", "00:00")
|
||||||
|
end = condition.get("end", "23:59")
|
||||||
|
current_time = time_mod.strftime("%H:%M")
|
||||||
|
return is_in_time_window(f"{start}-{end}", current_time)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def evaluate_rule(rule: dict, pet_state: PetState, now: float) -> bool:
|
||||||
|
conditions = rule.get("conditions", [])
|
||||||
|
if isinstance(conditions, str):
|
||||||
|
conditions = json.loads(conditions)
|
||||||
|
for condition in conditions:
|
||||||
|
if not evaluate_condition(condition, pet_state, now):
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def format_action_message(template: str, pet_state: PetState) -> str:
|
||||||
|
duration_s = time_mod.time() - pet_state.zone_entry_time if pet_state.zone_entry_time else 0
|
||||||
|
duration_min = int(duration_s / 60)
|
||||||
|
duration_str = f"{duration_min // 60}h {duration_min % 60}m" if duration_min >= 60 else f"{duration_min} minutes"
|
||||||
|
last_seen_s = time_mod.time() - pet_state.last_seen_time if pet_state.last_seen_time else 0
|
||||||
|
last_seen_min = int(last_seen_s / 60)
|
||||||
|
last_seen_str = f"{last_seen_min}m ago" if last_seen_min < 60 else f"{last_seen_min // 60}h ago"
|
||||||
|
camera = (pet_state.last_seen_camera or "unknown").replace("_", " ").title()
|
||||||
|
return template.format(
|
||||||
|
pet_name=pet_state.pet_id, camera=camera,
|
||||||
|
zone=pet_state.current_zone or "unknown",
|
||||||
|
duration=duration_str, last_seen=last_seen_str)
|
||||||
|
|
||||||
|
|
||||||
|
class PetRuleEngine:
|
||||||
|
def __init__(self):
|
||||||
|
self._pet_states: dict[str, PetState] = {}
|
||||||
|
self._rules: list[dict] = []
|
||||||
|
self._last_fired: dict[int, float] = {}
|
||||||
|
|
||||||
|
def load_rules(self, engine) -> None:
|
||||||
|
from vigilar.storage.queries import get_all_enabled_rules
|
||||||
|
self._rules = get_all_enabled_rules(engine)
|
||||||
|
log.info("Loaded %d pet rules", len(self._rules))
|
||||||
|
|
||||||
|
def get_or_create_state(self, pet_id: str) -> PetState:
|
||||||
|
if pet_id not in self._pet_states:
|
||||||
|
self._pet_states[pet_id] = PetState(pet_id=pet_id)
|
||||||
|
return self._pet_states[pet_id]
|
||||||
|
|
||||||
|
def on_pet_detection(self, pet_id, camera_id, zone, now) -> list[dict]:
|
||||||
|
state = self.get_or_create_state(pet_id)
|
||||||
|
old_zone = state.current_zone
|
||||||
|
state.last_seen_camera = camera_id
|
||||||
|
state.last_seen_time = now
|
||||||
|
if zone != old_zone:
|
||||||
|
state.current_zone = zone
|
||||||
|
state.zone_entry_time = now
|
||||||
|
else:
|
||||||
|
state.current_zone = zone
|
||||||
|
return self._evaluate_rules_for_pet(pet_id, now)
|
||||||
|
|
||||||
|
def on_person_detection(self, camera_id, now) -> None:
|
||||||
|
for state in self._pet_states.values():
|
||||||
|
if state.last_seen_camera == camera_id:
|
||||||
|
state.person_present = True
|
||||||
|
state.person_last_seen = now
|
||||||
|
|
||||||
|
def tick(self, now) -> list[dict]:
|
||||||
|
for state in self._pet_states.values():
|
||||||
|
if state.person_present and now - state.person_last_seen > 30:
|
||||||
|
state.person_present = False
|
||||||
|
triggered = []
|
||||||
|
for pet_id in self._pet_states:
|
||||||
|
triggered.extend(self._evaluate_rules_for_pet(pet_id, now))
|
||||||
|
return triggered
|
||||||
|
|
||||||
|
def _evaluate_rules_for_pet(self, pet_id, now) -> list[dict]:
|
||||||
|
state = self._pet_states.get(pet_id)
|
||||||
|
if not state:
|
||||||
|
return []
|
||||||
|
triggered = []
|
||||||
|
for rule in self._rules:
|
||||||
|
if rule["pet_id"] != pet_id:
|
||||||
|
continue
|
||||||
|
rule_id = rule["id"]
|
||||||
|
last = self._last_fired.get(rule_id, 0)
|
||||||
|
cooldown = rule.get("cooldown_minutes", 30) * 60
|
||||||
|
if now - last < cooldown:
|
||||||
|
continue
|
||||||
|
if evaluate_rule(rule, state, now):
|
||||||
|
self._last_fired[rule_id] = now
|
||||||
|
message = format_action_message(rule.get("action_message", ""), state)
|
||||||
|
triggered.append({
|
||||||
|
"rule_id": rule_id, "rule_name": rule["name"], "pet_id": pet_id,
|
||||||
|
"action": rule["action"], "message": message,
|
||||||
|
"camera_id": state.last_seen_camera})
|
||||||
|
return triggered
|
||||||
Loading…
Reference in New Issue
Block a user