vigilar/tests/unit/test_events.py
Aaron D. Lee c77f732ac7 Differentiate PET_ESCAPE and UNKNOWN_ANIMAL events by zone and identity
Replace the flat pet/detected handler with context-aware classification:
unknown animals (no pet_id) → UNKNOWN_ANIMAL/WARNING, known pets in
exterior/transition zones → PET_ESCAPE/ALERT, known pets indoors →
PET_DETECTED/INFO. Adds four new unit tests covering all three paths.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 13:45:15 -04:00

432 lines
15 KiB
Python

"""Tests for the Phase 6 events subsystem: rules, arm state FSM, history."""
import hashlib
import time
import pytest
from vigilar.config import RuleCondition, RuleConfig, VigilarConfig
from vigilar.constants import ArmState, EventType, Severity
from vigilar.events.history import ack_event, query_events
from vigilar.events.rules import RuleEngine
from vigilar.events.state import ArmStateFSM
from vigilar.storage.queries import insert_event
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_config(rules=None, pin_hash=""):
    """Build a minimal ``VigilarConfig`` for the tests in this module.

    Args:
        rules: Rule configs to install. Any falsy value (``None`` or an
            empty list) results in an empty rule list.
        pin_hash: Value stored under the ``arm_pin_hash`` key of the
            ``system`` section.

    Returns:
        A ``VigilarConfig`` with empty ``cameras`` and ``sensors`` lists.
    """
    system_section = {"arm_pin_hash": pin_hash}
    rule_list = rules if rules else []
    return VigilarConfig(
        system=system_section,
        cameras=[],
        sensors=[],
        rules=rule_list,
    )
def _pin_hash(pin: str) -> str:
    """Return the hex-encoded SHA-256 digest of *pin*.

    The PIN is encoded with ``str.encode()``'s default encoding before
    hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(pin.encode())
    return hasher.hexdigest()
# ---------------------------------------------------------------------------
# Rule Engine
# ---------------------------------------------------------------------------
class TestRuleEngine:
    """Evaluate single-rule configurations through ``RuleEngine.evaluate``."""

    # Motion-start topic for camera ``cam1``; shared by most tests below.
    CAM1_MOTION = "vigilar/camera/cam1/motion/start"

    @staticmethod
    def _engine(rule_id, conditions, logic, actions, cooldown_s=0):
        """Return a ``RuleEngine`` whose config contains exactly one rule."""
        only_rule = RuleConfig(
            id=rule_id,
            conditions=conditions,
            logic=logic,
            actions=actions,
            cooldown_s=cooldown_s,
        )
        return RuleEngine(_make_config(rules=[only_rule]))

    @staticmethod
    def _armed_away_and_motion():
        """Return fresh conditions: arm state ARMED_AWAY plus any camera motion."""
        return [
            RuleCondition(type="arm_state", value="ARMED_AWAY"),
            RuleCondition(type="camera_motion", value=""),
        ]

    @staticmethod
    def _door1_contact_open():
        """Return a fresh single-condition list: CONTACT_OPEN on sensor door1."""
        return [
            RuleCondition(
                type="sensor_event", sensor_id="door1", event="CONTACT_OPEN"
            ),
        ]

    def test_and_logic_all_match(self):
        engine = self._engine(
            "r1", self._armed_away_and_motion(), "AND", ["alert_all"]
        )
        fired = engine.evaluate(
            self.CAM1_MOTION, {"ts": 123}, ArmState.ARMED_AWAY
        )
        assert fired == ["alert_all"]

    def test_and_logic_partial_match(self):
        engine = self._engine(
            "r1", self._armed_away_and_motion(), "AND", ["alert_all"]
        )
        # The arm state is DISARMED, so the AND rule must not fire.
        fired = engine.evaluate(
            self.CAM1_MOTION, {"ts": 123}, ArmState.DISARMED
        )
        assert fired == []

    def test_or_logic(self):
        engine = self._engine(
            "r2", self._armed_away_and_motion(), "OR", ["record_all_cameras"]
        )
        # Only the camera_motion condition matches; OR is enough to fire.
        fired = engine.evaluate(
            self.CAM1_MOTION, {"ts": 123}, ArmState.DISARMED
        )
        assert fired == ["record_all_cameras"]

    def test_cooldown_prevents_refire(self):
        engine = self._engine(
            "r3",
            [RuleCondition(type="camera_motion", value="")],
            "AND",
            ["alert_all"],
            cooldown_s=300,
        )
        # The first evaluation fires the rule.
        first = engine.evaluate(
            self.CAM1_MOTION, {"ts": 1}, ArmState.ARMED_AWAY
        )
        assert first == ["alert_all"]
        # A second evaluation inside the cooldown window yields nothing.
        second = engine.evaluate(
            self.CAM1_MOTION, {"ts": 2}, ArmState.ARMED_AWAY
        )
        assert second == []

    def test_sensor_event_condition(self):
        engine = self._engine(
            "r4", self._door1_contact_open(), "AND", ["alert_all"]
        )
        fired = engine.evaluate(
            "vigilar/sensor/door1/CONTACT_OPEN", {"ts": 1}, ArmState.DISARMED
        )
        assert fired == ["alert_all"]

    def test_sensor_event_wrong_sensor(self):
        engine = self._engine(
            "r5", self._door1_contact_open(), "AND", ["alert_all"]
        )
        fired = engine.evaluate(
            "vigilar/sensor/door2/CONTACT_OPEN", {"ts": 1}, ArmState.DISARMED
        )
        assert fired == []

    def test_no_conditions_never_fires(self):
        engine = self._engine("r6", [], "AND", ["alert_all"])
        fired = engine.evaluate(
            self.CAM1_MOTION, {"ts": 1}, ArmState.ARMED_AWAY
        )
        assert fired == []

    def test_specific_camera_motion(self):
        engine = self._engine(
            "r7",
            [RuleCondition(type="camera_motion", value="cam2")],
            "AND",
            ["alert_all"],
        )
        # Motion on a different camera is ignored.
        wrong_camera = engine.evaluate(self.CAM1_MOTION, {}, ArmState.DISARMED)
        assert wrong_camera == []
        # Motion on the configured camera fires the rule.
        right_camera = engine.evaluate(
            "vigilar/camera/cam2/motion/start", {}, ArmState.DISARMED
        )
        assert right_camera == ["alert_all"]
# ---------------------------------------------------------------------------
# Arm State FSM
# ---------------------------------------------------------------------------
class TestArmStateFSM:
    """Cover ``ArmStateFSM`` start-up state, transitions, PINs and persistence.

    Every test receives a ``test_db`` fixture that is defined outside this
    module (presumably in a conftest; not visible here).
    """

    @staticmethod
    def _new_fsm(db, pin=None):
        """Return an FSM on *db*; a PIN hash is configured only when *pin* is given."""
        configured_hash = "" if pin is None else _pin_hash(pin)
        return ArmStateFSM(db, _make_config(pin_hash=configured_hash))

    def test_initial_state_disarmed(self, test_db):
        machine = self._new_fsm(test_db)
        assert machine.state == ArmState.DISARMED

    def test_transition_without_pin(self, test_db):
        machine = self._new_fsm(test_db)
        accepted = machine.transition(ArmState.ARMED_HOME, triggered_by="test")
        assert accepted
        assert machine.state == ArmState.ARMED_HOME

    def test_transition_with_valid_pin(self, test_db):
        secret = "1234"
        machine = self._new_fsm(test_db, pin=secret)
        accepted = machine.transition(
            ArmState.ARMED_AWAY, pin=secret, triggered_by="test"
        )
        assert accepted
        assert machine.state == ArmState.ARMED_AWAY

    def test_transition_with_invalid_pin(self, test_db):
        secret = "1234"
        machine = self._new_fsm(test_db, pin=secret)
        accepted = machine.transition(
            ArmState.ARMED_AWAY, pin="wrong", triggered_by="test"
        )
        assert not accepted
        # A rejected transition leaves the FSM in its starting state.
        assert machine.state == ArmState.DISARMED

    def test_transition_same_state_is_noop(self, test_db):
        machine = self._new_fsm(test_db)
        accepted = machine.transition(ArmState.DISARMED, triggered_by="test")
        assert accepted
        assert machine.state == ArmState.DISARMED

    def test_state_persists_across_instances(self, test_db):
        # Both FSMs deliberately share one config object and one database.
        shared_cfg = _make_config()
        writer = ArmStateFSM(test_db, shared_cfg)
        writer.transition(ArmState.ARMED_AWAY, triggered_by="test")
        # A second instance on the same DB must pick up the stored state.
        reader = ArmStateFSM(test_db, shared_cfg)
        assert reader.state == ArmState.ARMED_AWAY

    def test_verify_pin(self, test_db):
        secret = "5678"
        machine = self._new_fsm(test_db, pin=secret)
        assert machine.verify_pin(secret)
        assert not machine.verify_pin("0000")
# ---------------------------------------------------------------------------
# Event History
# ---------------------------------------------------------------------------
class TestEventHistory:
    """Round-trip events through ``insert_event``, ``query_events`` and ``ack_event``.

    Every test receives a ``test_db`` fixture that is defined outside this
    module (presumably in a conftest; not visible here).
    """

    def test_insert_and_query(self, test_db):
        new_id = insert_event(
            test_db,
            event_type=EventType.MOTION_START,
            severity=Severity.WARNING,
            source_id="cam1",
            payload={"zone": "front"},
        )
        assert new_id > 0

        found = query_events(test_db)
        assert len(found) == 1
        stored = found[0]
        assert stored["type"] == EventType.MOTION_START
        assert stored["source_id"] == "cam1"

    def test_query_filter_by_type(self, test_db):
        insert_event(test_db, EventType.MOTION_START, Severity.WARNING, "cam1")
        insert_event(test_db, EventType.CONTACT_OPEN, Severity.WARNING, "door1")

        matching = query_events(test_db, event_type=EventType.MOTION_START)
        assert len(matching) == 1
        assert matching[0]["type"] == EventType.MOTION_START

    def test_query_filter_by_severity(self, test_db):
        insert_event(test_db, EventType.MOTION_START, Severity.WARNING, "cam1")
        insert_event(test_db, EventType.POWER_LOSS, Severity.CRITICAL, "ups")

        matching = query_events(test_db, severity=Severity.CRITICAL)
        assert len(matching) == 1
        assert matching[0]["severity"] == Severity.CRITICAL

    def test_acknowledge_event(self, test_db):
        new_id = insert_event(
            test_db, EventType.MOTION_START, Severity.WARNING, "cam1"
        )
        acked = ack_event(test_db, new_id)
        assert acked

        stored = query_events(test_db)[0]
        assert stored["acknowledged"] == 1
        assert stored["ack_ts"] is not None

    def test_acknowledge_nonexistent(self, test_db):
        acked = ack_event(test_db, 99999)
        assert not acked

    def test_query_limit_and_offset(self, test_db):
        for index in range(5):
            camera = f"cam{index}"
            insert_event(test_db, EventType.MOTION_START, Severity.INFO, camera)

        first_page = query_events(test_db, limit=2)
        assert len(first_page) == 2
        second_page = query_events(test_db, limit=2, offset=2)
        assert len(second_page) == 2
        # The two pages must start with different events.
        assert first_page[0]["id"] != second_page[0]["id"]
# ---------------------------------------------------------------------------
# Pet / Wildlife Event Classification
# ---------------------------------------------------------------------------
class TestPetEventClassification:
    """Check how ``EventProcessor._classify_event`` maps pet and wildlife topics.

    Each test feeds a topic and payload to the classifier and asserts on the
    returned ``(event_type, severity, source)`` triple. ``EventType`` and
    ``Severity`` come from this module's top-level imports.
    """

    @staticmethod
    def _classify(topic, payload):
        """Call ``_classify_event`` on a bare processor and return its result.

        ``EventProcessor`` is imported inside the helper so that importing
        this test module does not import the processor module. The instance
        is created with ``__new__`` so ``__init__`` never runs; this assumes
        ``_classify_event`` needs no instance state for these inputs.
        """
        from vigilar.events.processor import EventProcessor

        processor = EventProcessor.__new__(EventProcessor)
        return processor._classify_event(topic, payload)

    def test_pet_detected_event(self):
        etype, sev, source = self._classify(
            "vigilar/camera/kitchen/pet/detected",
            {
                "pet_id": "p1",
                "pet_name": "Angel",
                "confidence": 0.92,
                "camera_location": "INTERIOR",
            },
        )
        assert etype == EventType.PET_DETECTED
        assert sev == Severity.INFO
        assert source == "kitchen"

    def test_pet_escape_exterior(self):
        """Known pet in exterior zone → PET_ESCAPE/ALERT."""
        # The third element is unpacked only to pin the arity of the result.
        etype, sev, _ = self._classify(
            "vigilar/camera/front/pet/detected",
            {"pet_id": "p1", "pet_name": "Angel", "camera_location": "EXTERIOR"},
        )
        assert etype == EventType.PET_ESCAPE
        assert sev == Severity.ALERT

    def test_pet_escape_transition(self):
        """Known pet in transition zone → PET_ESCAPE/ALERT."""
        etype, sev, _ = self._classify(
            "vigilar/camera/garage/pet/detected",
            {"pet_id": "p1", "pet_name": "Milo", "camera_location": "TRANSITION"},
        )
        assert etype == EventType.PET_ESCAPE
        assert sev == Severity.ALERT

    def test_unknown_animal(self):
        """No pet_id → UNKNOWN_ANIMAL/WARNING."""
        etype, sev, _ = self._classify(
            "vigilar/camera/kitchen/pet/detected",
            {"species": "cat", "camera_location": "INTERIOR"},
        )
        assert etype == EventType.UNKNOWN_ANIMAL
        assert sev == Severity.WARNING

    def test_known_pet_interior(self):
        """Known pet in interior → PET_DETECTED/INFO."""
        etype, sev, _ = self._classify(
            "vigilar/camera/kitchen/pet/detected",
            {"pet_id": "p1", "pet_name": "Angel", "camera_location": "INTERIOR"},
        )
        assert etype == EventType.PET_DETECTED
        assert sev == Severity.INFO

    def test_wildlife_predator_event(self):
        etype, sev, source = self._classify(
            "vigilar/camera/front/wildlife/detected",
            {"species": "bear", "threat_level": "PREDATOR"},
        )
        assert etype == EventType.WILDLIFE_PREDATOR
        assert sev == Severity.CRITICAL
        assert source == "front"

    def test_wildlife_nuisance_event(self):
        etype, sev, source = self._classify(
            "vigilar/camera/back/wildlife/detected",
            {"species": "raccoon", "threat_level": "NUISANCE"},
        )
        assert etype == EventType.WILDLIFE_NUISANCE
        assert sev == Severity.WARNING
        assert source == "back"

    def test_wildlife_passive_event(self):
        etype, sev, source = self._classify(
            "vigilar/camera/front/wildlife/detected",
            {"species": "deer", "threat_level": "PASSIVE"},
        )
        assert etype == EventType.WILDLIFE_PASSIVE
        assert sev == Severity.INFO
        assert source == "front"