"""Tests for the Phase 6 events subsystem: rules, arm state FSM, history.""" import time import pytest from vigilar.config import RuleCondition, RuleConfig, VigilarConfig from vigilar.constants import ArmState, EventType, Severity from vigilar.events.history import ack_event, query_events from vigilar.events.rules import RuleEngine from vigilar.events.state import ArmStateFSM from vigilar.storage.queries import insert_event # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- def _make_config(rules=None, pin_hash=""): return VigilarConfig( security={"pin_hash": pin_hash}, cameras=[], sensors=[], rules=rules or [], ) def _pin_hash(pin: str) -> str: from vigilar.alerts.pin import hash_pin return hash_pin(pin) # --------------------------------------------------------------------------- # Rule Engine # --------------------------------------------------------------------------- class TestRuleEngine: def test_and_logic_all_match(self): rule = RuleConfig( id="r1", conditions=[ RuleCondition(type="arm_state", value="ARMED_AWAY"), RuleCondition(type="camera_motion", value=""), ], logic="AND", actions=["alert_all"], cooldown_s=0, ) cfg = _make_config(rules=[rule]) engine = RuleEngine(cfg) actions = engine.evaluate( "vigilar/camera/cam1/motion/start", {"ts": 123}, ArmState.ARMED_AWAY, ) assert actions == ["alert_all"] def test_and_logic_partial_match(self): rule = RuleConfig( id="r1", conditions=[ RuleCondition(type="arm_state", value="ARMED_AWAY"), RuleCondition(type="camera_motion", value=""), ], logic="AND", actions=["alert_all"], cooldown_s=0, ) cfg = _make_config(rules=[rule]) engine = RuleEngine(cfg) # Arm state is DISARMED, so AND fails actions = engine.evaluate( "vigilar/camera/cam1/motion/start", {"ts": 123}, ArmState.DISARMED, ) assert actions == [] def test_or_logic(self): rule = RuleConfig( id="r2", conditions=[ RuleCondition(type="arm_state", value="ARMED_AWAY"), RuleCondition(type="camera_motion", value=""), ], logic="OR", actions=["record_all_cameras"], cooldown_s=0, ) cfg = _make_config(rules=[rule]) engine = RuleEngine(cfg) # Only camera_motion matches, but OR logic means it fires actions = engine.evaluate( "vigilar/camera/cam1/motion/start", {"ts": 123}, ArmState.DISARMED, ) assert actions == ["record_all_cameras"] def test_cooldown_prevents_refire(self): rule = RuleConfig( id="r3", conditions=[ RuleCondition(type="camera_motion", value=""), ], logic="AND", actions=["alert_all"], cooldown_s=300, ) cfg = _make_config(rules=[rule]) engine = RuleEngine(cfg) # First evaluation fires actions1 = engine.evaluate( "vigilar/camera/cam1/motion/start", {"ts": 1}, ArmState.ARMED_AWAY, ) assert actions1 == ["alert_all"] # Second evaluation within cooldown does NOT fire actions2 = engine.evaluate( "vigilar/camera/cam1/motion/start", {"ts": 2}, ArmState.ARMED_AWAY, ) assert actions2 == [] def test_sensor_event_condition(self): rule = RuleConfig( id="r4", conditions=[ RuleCondition(type="sensor_event", sensor_id="door1", event="CONTACT_OPEN"), ], logic="AND", actions=["alert_all"], cooldown_s=0, ) cfg = _make_config(rules=[rule]) engine = RuleEngine(cfg) actions = engine.evaluate( "vigilar/sensor/door1/CONTACT_OPEN", {"ts": 1}, ArmState.DISARMED, ) assert actions == ["alert_all"] def test_sensor_event_wrong_sensor(self): rule = RuleConfig( id="r5", conditions=[ RuleCondition(type="sensor_event", sensor_id="door1", event="CONTACT_OPEN"), ], logic="AND", actions=["alert_all"], cooldown_s=0, ) cfg = _make_config(rules=[rule]) engine = RuleEngine(cfg) actions = engine.evaluate( "vigilar/sensor/door2/CONTACT_OPEN", {"ts": 1}, ArmState.DISARMED, ) assert actions == [] def test_no_conditions_never_fires(self): rule = RuleConfig( id="r6", conditions=[], logic="AND", actions=["alert_all"], cooldown_s=0, ) cfg = _make_config(rules=[rule]) engine = RuleEngine(cfg) actions = engine.evaluate( "vigilar/camera/cam1/motion/start", {"ts": 1}, ArmState.ARMED_AWAY, ) assert actions == [] def test_specific_camera_motion(self): rule = RuleConfig( id="r7", conditions=[ RuleCondition(type="camera_motion", value="cam2"), ], logic="AND", actions=["alert_all"], cooldown_s=0, ) cfg = _make_config(rules=[rule]) engine = RuleEngine(cfg) # Wrong camera assert engine.evaluate( "vigilar/camera/cam1/motion/start", {}, ArmState.DISARMED ) == [] # Right camera assert engine.evaluate( "vigilar/camera/cam2/motion/start", {}, ArmState.DISARMED ) == ["alert_all"] # --------------------------------------------------------------------------- # Arm State FSM # --------------------------------------------------------------------------- class TestArmStateFSM: def test_initial_state_disarmed(self, test_db): cfg = _make_config() fsm = ArmStateFSM(test_db, cfg) assert fsm.state == ArmState.DISARMED def test_transition_without_pin(self, test_db): cfg = _make_config() fsm = ArmStateFSM(test_db, cfg) assert fsm.transition(ArmState.ARMED_HOME, triggered_by="test") assert fsm.state == ArmState.ARMED_HOME def test_transition_with_valid_pin(self, test_db): pin = "1234" cfg = _make_config(pin_hash=_pin_hash(pin)) fsm = ArmStateFSM(test_db, cfg) assert fsm.transition(ArmState.ARMED_AWAY, pin=pin, triggered_by="test") assert fsm.state == ArmState.ARMED_AWAY def test_transition_with_invalid_pin(self, test_db): pin = "1234" cfg = _make_config(pin_hash=_pin_hash(pin)) fsm = ArmStateFSM(test_db, cfg) assert not fsm.transition(ArmState.ARMED_AWAY, pin="wrong", triggered_by="test") assert fsm.state == ArmState.DISARMED def test_transition_same_state_is_noop(self, test_db): cfg = _make_config() fsm = ArmStateFSM(test_db, cfg) assert fsm.transition(ArmState.DISARMED, triggered_by="test") assert fsm.state == ArmState.DISARMED def test_state_persists_across_instances(self, test_db): cfg = _make_config() fsm1 = ArmStateFSM(test_db, cfg) fsm1.transition(ArmState.ARMED_AWAY, triggered_by="test") # New FSM instance should load from DB fsm2 = ArmStateFSM(test_db, cfg) assert fsm2.state == ArmState.ARMED_AWAY def test_verify_pin(self, test_db): pin = "5678" cfg = _make_config(pin_hash=_pin_hash(pin)) fsm = ArmStateFSM(test_db, cfg) assert fsm.verify_pin(pin) assert not fsm.verify_pin("0000") # --------------------------------------------------------------------------- # Event History # --------------------------------------------------------------------------- class TestEventHistory: def test_insert_and_query(self, test_db): event_id = insert_event( test_db, event_type=EventType.MOTION_START, severity=Severity.WARNING, source_id="cam1", payload={"zone": "front"}, ) assert event_id > 0 rows = query_events(test_db) assert len(rows) == 1 assert rows[0]["type"] == EventType.MOTION_START assert rows[0]["source_id"] == "cam1" def test_query_filter_by_type(self, test_db): insert_event(test_db, EventType.MOTION_START, Severity.WARNING, "cam1") insert_event(test_db, EventType.CONTACT_OPEN, Severity.WARNING, "door1") rows = query_events(test_db, event_type=EventType.MOTION_START) assert len(rows) == 1 assert rows[0]["type"] == EventType.MOTION_START def test_query_filter_by_severity(self, test_db): insert_event(test_db, EventType.MOTION_START, Severity.WARNING, "cam1") insert_event(test_db, EventType.POWER_LOSS, Severity.CRITICAL, "ups") rows = query_events(test_db, severity=Severity.CRITICAL) assert len(rows) == 1 assert rows[0]["severity"] == Severity.CRITICAL def test_acknowledge_event(self, test_db): event_id = insert_event( test_db, EventType.MOTION_START, Severity.WARNING, "cam1" ) assert ack_event(test_db, event_id) rows = query_events(test_db) assert rows[0]["acknowledged"] == 1 assert rows[0]["ack_ts"] is not None def test_acknowledge_nonexistent(self, test_db): assert not ack_event(test_db, 99999) def test_query_limit_and_offset(self, test_db): for i in range(5): insert_event(test_db, EventType.MOTION_START, Severity.INFO, f"cam{i}") rows = query_events(test_db, limit=2) assert len(rows) == 2 rows_offset = query_events(test_db, limit=2, offset=2) assert len(rows_offset) == 2 # Should be different events assert rows[0]["id"] != rows_offset[0]["id"] # --------------------------------------------------------------------------- # Pet / Wildlife Event Classification # --------------------------------------------------------------------------- class TestPetEventClassification: def test_pet_detected_event(self): from vigilar.events.processor import EventProcessor from vigilar.constants import EventType, Severity processor = EventProcessor.__new__(EventProcessor) etype, sev, source = processor._classify_event( "vigilar/camera/kitchen/pet/detected", {"pet_id": "p1", "pet_name": "Angel", "confidence": 0.92, "camera_location": "INTERIOR"}, # noqa: E501 ) assert etype == EventType.PET_DETECTED assert sev == Severity.INFO assert source == "kitchen" def test_pet_escape_exterior(self): """Known pet in exterior zone → PET_ESCAPE/ALERT.""" from vigilar.events.processor import EventProcessor from vigilar.constants import EventType, Severity processor = EventProcessor.__new__(EventProcessor) etype, sev, source = processor._classify_event( "vigilar/camera/front/pet/detected", {"pet_id": "p1", "pet_name": "Angel", "camera_location": "EXTERIOR"}, ) assert etype == EventType.PET_ESCAPE assert sev == Severity.ALERT def test_pet_escape_transition(self): """Known pet in transition zone → PET_ESCAPE/ALERT.""" from vigilar.events.processor import EventProcessor from vigilar.constants import EventType, Severity processor = EventProcessor.__new__(EventProcessor) etype, sev, source = processor._classify_event( "vigilar/camera/garage/pet/detected", {"pet_id": "p1", "pet_name": "Milo", "camera_location": "TRANSITION"}, ) assert etype == EventType.PET_ESCAPE assert sev == Severity.ALERT def test_unknown_animal(self): """No pet_id → UNKNOWN_ANIMAL/WARNING.""" from vigilar.events.processor import EventProcessor from vigilar.constants import EventType, Severity processor = EventProcessor.__new__(EventProcessor) etype, sev, source = processor._classify_event( "vigilar/camera/kitchen/pet/detected", {"species": "cat", "camera_location": "INTERIOR"}, ) assert etype == EventType.UNKNOWN_ANIMAL assert sev == Severity.WARNING def test_known_pet_interior(self): """Known pet in interior → PET_DETECTED/INFO.""" from vigilar.events.processor import EventProcessor from vigilar.constants import EventType, Severity processor = EventProcessor.__new__(EventProcessor) etype, sev, source = processor._classify_event( "vigilar/camera/kitchen/pet/detected", {"pet_id": "p1", "pet_name": "Angel", "camera_location": "INTERIOR"}, ) assert etype == EventType.PET_DETECTED assert sev == Severity.INFO def test_wildlife_predator_event(self): from vigilar.events.processor import EventProcessor from vigilar.constants import EventType, Severity processor = EventProcessor.__new__(EventProcessor) etype, sev, source = processor._classify_event( "vigilar/camera/front/wildlife/detected", {"species": "bear", "threat_level": "PREDATOR"}, ) assert etype == EventType.WILDLIFE_PREDATOR assert sev == Severity.CRITICAL assert source == "front" def test_wildlife_nuisance_event(self): from vigilar.events.processor import EventProcessor from vigilar.constants import EventType, Severity processor = EventProcessor.__new__(EventProcessor) etype, sev, source = processor._classify_event( "vigilar/camera/back/wildlife/detected", {"species": "raccoon", "threat_level": "NUISANCE"}, ) assert etype == EventType.WILDLIFE_NUISANCE assert sev == Severity.WARNING assert source == "back" def test_wildlife_passive_event(self): from vigilar.events.processor import EventProcessor from vigilar.constants import EventType, Severity processor = EventProcessor.__new__(EventProcessor) etype, sev, source = processor._classify_event( "vigilar/camera/front/wildlife/detected", {"species": "deer", "threat_level": "PASSIVE"}, ) assert etype == EventType.WILDLIFE_PASSIVE assert sev == Severity.INFO assert source == "front" # --------------------------------------------------------------------------- # Classified-event broadcast (for SSE bridge) # --------------------------------------------------------------------------- class _RecordingBus: """Minimal bus fake that records publishes instead of sending them.""" def __init__(self): self.published: list[tuple[str, dict]] = [] def publish(self, topic: str, payload: dict, qos: int = 1) -> None: self.published.append((topic, payload)) def publish_event(self, topic: str, **kwargs) -> None: self.publish(topic, kwargs) class _StubFSM: state = ArmState.DISARMED class _StubRuleEngine: def evaluate(self, topic, payload, state): return [] class TestEventsPublishedBroadcast: """EventProcessor should publish a classified-event summary to Topics.EVENTS_PUBLISHED after inserting, so the Flask SSE bridge can forward it to browser clients.""" def test_handle_event_publishes_classified_payload(self, test_db): from vigilar.events.processor import EventProcessor from vigilar.constants import Topics processor = EventProcessor.__new__(EventProcessor) bus = _RecordingBus() processor._handle_event( topic="vigilar/camera/cam1/motion/start", payload={"ts": 12345, "detail": "x"}, engine=test_db, fsm=_StubFSM(), rule_engine=_StubRuleEngine(), bus=bus, ) published_on_bridge_topic = [ p for t, p in bus.published if t == Topics.EVENTS_PUBLISHED ] assert len(published_on_bridge_topic) == 1, ( f"expected exactly one publish to {Topics.EVENTS_PUBLISHED}, " f"got publishes: {bus.published}" ) msg = published_on_bridge_topic[0] assert msg["type"] == EventType.MOTION_START assert msg["severity"] == Severity.WARNING assert msg["source_id"] == "cam1" assert "ts" in msg assert "id" in msg and isinstance(msg["id"], int) def test_unclassified_topic_does_not_publish(self, test_db): """Heartbeats and other non-event topics must not be forwarded.""" from vigilar.events.processor import EventProcessor from vigilar.constants import Topics processor = EventProcessor.__new__(EventProcessor) bus = _RecordingBus() processor._handle_event( topic="vigilar/camera/cam1/heartbeat", payload={"ts": 12345}, engine=test_db, fsm=_StubFSM(), rule_engine=_StubRuleEngine(), bus=bus, ) assert not any(t == Topics.EVENTS_PUBLISHED for t, _ in bus.published)