From 10b0cf4d0ea376c543c2874b7dbab902de9caef1 Mon Sep 17 00:00:00 2001 From: "Aaron D. Lee" Date: Thu, 2 Apr 2026 23:17:53 -0400 Subject: [PATCH] Add events/rules engine, sensor bridge, and UPS monitor (Phases 6-8) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 6 — Events + Rule Engine: - EventProcessor subprocess: subscribes to all MQTT events, logs to DB, evaluates rules, fires alert actions - ArmStateFSM: DISARMED/ARMED_HOME/ARMED_AWAY with PIN verification (HMAC-safe), DB persistence, MQTT state publishing - RuleEngine: AND/OR logic, 4 condition types (arm_state, sensor_event, camera_motion, time_window), per-rule cooldown tracking - SSE event stream with subscriber queue pattern and keepalive - Event acknowledge endpoint Phase 7 — Sensor Bridge: - SensorBridge subprocess: subscribes to Zigbee2MQTT, normalizes payloads (contact, occupancy, temperature, humidity, battery, linkquality) - GPIOHandler: conditional gpiozero import, callbacks for reed switches and PIR sensors - SensorRegistry: maps Zigbee addresses and names to config sensor IDs - SensorEvent/SensorState dataclasses - Web UI now shows real sensor states from DB Phase 8 — UPS Monitor: - UPSMonitor subprocess: polls NUT via pynut2 with reconnect backoff - State transition detection: OL→OB (power_loss), charge/runtime thresholds (low_battery, critical), OB→OL (restored) - ShutdownSequence: ordered shutdown with configurable delay and command - All conditionally imported (pynut2, gpiozero) for non-target platforms Fixed test_db fixture to use isolated engines (no global singleton leak). 96 tests passing. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/conftest.py | 19 +- tests/unit/test_events.py | 329 ++++++++++++++++++++++++++++++ tests/unit/test_schema.py | 8 +- tests/unit/test_sensors.py | 251 +++++++++++++++++++++++ tests/unit/test_ups.py | 324 +++++++++++++++++++++++++++++ vigilar/events/__init__.py | 1 + vigilar/events/history.py | 33 +++ vigilar/events/processor.py | 189 +++++++++++++++++ vigilar/events/rules.py | 127 ++++++++++++ vigilar/events/state.py | 94 +++++++++ vigilar/main.py | 30 ++- vigilar/sensors/__init__.py | 1 + vigilar/sensors/bridge.py | 201 ++++++++++++++++++ vigilar/sensors/gpio_handler.py | 134 ++++++++++++ vigilar/sensors/models.py | 52 +++++ vigilar/sensors/registry.py | 52 +++++ vigilar/ups/monitor.py | 154 ++++++++++++++ vigilar/ups/shutdown.py | 46 +++++ vigilar/web/blueprints/events.py | 91 ++++++++- vigilar/web/blueprints/sensors.py | 39 +++- 20 files changed, 2149 insertions(+), 26 deletions(-) create mode 100644 tests/unit/test_events.py create mode 100644 tests/unit/test_sensors.py create mode 100644 tests/unit/test_ups.py create mode 100644 vigilar/events/history.py create mode 100644 vigilar/events/processor.py create mode 100644 vigilar/events/rules.py create mode 100644 vigilar/events/state.py create mode 100644 vigilar/sensors/bridge.py create mode 100644 vigilar/sensors/gpio_handler.py create mode 100644 vigilar/sensors/models.py create mode 100644 vigilar/sensors/registry.py create mode 100644 vigilar/ups/monitor.py create mode 100644 vigilar/ups/shutdown.py diff --git a/tests/conftest.py b/tests/conftest.py index ef1aa65..21be9a3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,9 +5,18 @@ import tempfile from pathlib import Path import pytest +from sqlalchemy import create_engine, event from vigilar.config import VigilarConfig, load_config -from vigilar.storage.db import init_db +from vigilar.storage.schema import metadata + + +def _create_test_engine(db_path: Path): + """Create a fresh engine for testing (bypasses the global singleton).""" + db_path.parent.mkdir(parents=True, exist_ok=True) + engine = create_engine(f"sqlite:///{db_path}", echo=False) + metadata.create_all(engine) + return engine @pytest.fixture @@ -19,10 +28,10 @@ def tmp_data_dir(tmp_path): @pytest.fixture -def test_db(tmp_data_dir): - """Initialize a test database and return the engine.""" - db_path = tmp_data_dir / "test.db" - return init_db(db_path) +def test_db(tmp_path): + """Create an isolated test database (no shared state between tests).""" + db_path = tmp_path / "data" / "test.db" + return _create_test_engine(db_path) @pytest.fixture diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py new file mode 100644 index 0000000..aadabee --- /dev/null +++ b/tests/unit/test_events.py @@ -0,0 +1,329 @@ +"""Tests for the Phase 6 events subsystem: rules, arm state FSM, history.""" + +import hashlib +import time + +import pytest + +from vigilar.config import RuleCondition, RuleConfig, VigilarConfig +from vigilar.constants import ArmState, EventType, Severity +from vigilar.events.history import ack_event, query_events +from vigilar.events.rules import RuleEngine +from vigilar.events.state import ArmStateFSM +from vigilar.storage.queries import insert_event + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +def _make_config(rules=None, pin_hash=""): + return VigilarConfig( + system={"arm_pin_hash": pin_hash}, + cameras=[], + sensors=[], + rules=rules or [], + ) + + +def _pin_hash(pin: str) -> str: + return hashlib.sha256(pin.encode()).hexdigest() + + +# --------------------------------------------------------------------------- +# Rule Engine +# --------------------------------------------------------------------------- + +class TestRuleEngine: + def test_and_logic_all_match(self): + rule = RuleConfig( + id="r1", + conditions=[ + RuleCondition(type="arm_state", value="ARMED_AWAY"), + RuleCondition(type="camera_motion", value=""), + ], + logic="AND", + actions=["alert_all"], + cooldown_s=0, + ) + cfg = _make_config(rules=[rule]) + engine = RuleEngine(cfg) + + actions = engine.evaluate( + "vigilar/camera/cam1/motion/start", + {"ts": 123}, + ArmState.ARMED_AWAY, + ) + assert actions == ["alert_all"] + + def test_and_logic_partial_match(self): + rule = RuleConfig( + id="r1", + conditions=[ + RuleCondition(type="arm_state", value="ARMED_AWAY"), + RuleCondition(type="camera_motion", value=""), + ], + logic="AND", + actions=["alert_all"], + cooldown_s=0, + ) + cfg = _make_config(rules=[rule]) + engine = RuleEngine(cfg) + + # Arm state is DISARMED, so AND fails + actions = engine.evaluate( + "vigilar/camera/cam1/motion/start", + {"ts": 123}, + ArmState.DISARMED, + ) + assert actions == [] + + def test_or_logic(self): + rule = RuleConfig( + id="r2", + conditions=[ + RuleCondition(type="arm_state", value="ARMED_AWAY"), + RuleCondition(type="camera_motion", value=""), + ], + logic="OR", + actions=["record_all_cameras"], + cooldown_s=0, + ) + cfg = _make_config(rules=[rule]) + engine = RuleEngine(cfg) + + # Only camera_motion matches, but OR logic means it fires + actions = engine.evaluate( + "vigilar/camera/cam1/motion/start", + {"ts": 123}, + ArmState.DISARMED, + ) + assert actions == ["record_all_cameras"] + + def test_cooldown_prevents_refire(self): + rule = RuleConfig( + id="r3", + conditions=[ + RuleCondition(type="camera_motion", value=""), + ], + logic="AND", + actions=["alert_all"], + cooldown_s=300, + ) + cfg = _make_config(rules=[rule]) + engine = RuleEngine(cfg) + + # First evaluation fires + actions1 = engine.evaluate( + "vigilar/camera/cam1/motion/start", + {"ts": 1}, + ArmState.ARMED_AWAY, + ) + assert actions1 == ["alert_all"] + + # Second evaluation within cooldown does NOT fire + actions2 = engine.evaluate( + "vigilar/camera/cam1/motion/start", + {"ts": 2}, + ArmState.ARMED_AWAY, + ) + assert actions2 == [] + + def test_sensor_event_condition(self): + rule = RuleConfig( + id="r4", + conditions=[ + RuleCondition(type="sensor_event", sensor_id="door1", event="CONTACT_OPEN"), + ], + logic="AND", + actions=["alert_all"], + cooldown_s=0, + ) + cfg = _make_config(rules=[rule]) + engine = RuleEngine(cfg) + + actions = engine.evaluate( + "vigilar/sensor/door1/CONTACT_OPEN", + {"ts": 1}, + ArmState.DISARMED, + ) + assert actions == ["alert_all"] + + def test_sensor_event_wrong_sensor(self): + rule = RuleConfig( + id="r5", + conditions=[ + RuleCondition(type="sensor_event", sensor_id="door1", event="CONTACT_OPEN"), + ], + logic="AND", + actions=["alert_all"], + cooldown_s=0, + ) + cfg = _make_config(rules=[rule]) + engine = RuleEngine(cfg) + + actions = engine.evaluate( + "vigilar/sensor/door2/CONTACT_OPEN", + {"ts": 1}, + ArmState.DISARMED, + ) + assert actions == [] + + def test_no_conditions_never_fires(self): + rule = RuleConfig( + id="r6", + conditions=[], + logic="AND", + actions=["alert_all"], + cooldown_s=0, + ) + cfg = _make_config(rules=[rule]) + engine = RuleEngine(cfg) + + actions = engine.evaluate( + "vigilar/camera/cam1/motion/start", + {"ts": 1}, + ArmState.ARMED_AWAY, + ) + assert actions == [] + + def test_specific_camera_motion(self): + rule = RuleConfig( + id="r7", + conditions=[ + RuleCondition(type="camera_motion", value="cam2"), + ], + logic="AND", + actions=["alert_all"], + cooldown_s=0, + ) + cfg = _make_config(rules=[rule]) + engine = RuleEngine(cfg) + + # Wrong camera + assert engine.evaluate( + "vigilar/camera/cam1/motion/start", {}, ArmState.DISARMED + ) == [] + + # Right camera + assert engine.evaluate( + "vigilar/camera/cam2/motion/start", {}, ArmState.DISARMED + ) == ["alert_all"] + + +# --------------------------------------------------------------------------- +# Arm State FSM +# --------------------------------------------------------------------------- + +class TestArmStateFSM: + def test_initial_state_disarmed(self, test_db): + cfg = _make_config() + fsm = ArmStateFSM(test_db, cfg) + assert fsm.state == ArmState.DISARMED + + def test_transition_without_pin(self, test_db): + cfg = _make_config() + fsm = ArmStateFSM(test_db, cfg) + + assert fsm.transition(ArmState.ARMED_HOME, triggered_by="test") + assert fsm.state == ArmState.ARMED_HOME + + def test_transition_with_valid_pin(self, test_db): + pin = "1234" + cfg = _make_config(pin_hash=_pin_hash(pin)) + fsm = ArmStateFSM(test_db, cfg) + + assert fsm.transition(ArmState.ARMED_AWAY, pin=pin, triggered_by="test") + assert fsm.state == ArmState.ARMED_AWAY + + def test_transition_with_invalid_pin(self, test_db): + pin = "1234" + cfg = _make_config(pin_hash=_pin_hash(pin)) + fsm = ArmStateFSM(test_db, cfg) + + assert not fsm.transition(ArmState.ARMED_AWAY, pin="wrong", triggered_by="test") + assert fsm.state == ArmState.DISARMED + + def test_transition_same_state_is_noop(self, test_db): + cfg = _make_config() + fsm = ArmStateFSM(test_db, cfg) + assert fsm.transition(ArmState.DISARMED, triggered_by="test") + assert fsm.state == ArmState.DISARMED + + def test_state_persists_across_instances(self, test_db): + cfg = _make_config() + fsm1 = ArmStateFSM(test_db, cfg) + fsm1.transition(ArmState.ARMED_AWAY, triggered_by="test") + + # New FSM instance should load from DB + fsm2 = ArmStateFSM(test_db, cfg) + assert fsm2.state == ArmState.ARMED_AWAY + + def test_verify_pin(self, test_db): + pin = "5678" + cfg = _make_config(pin_hash=_pin_hash(pin)) + fsm = ArmStateFSM(test_db, cfg) + assert fsm.verify_pin(pin) + assert not fsm.verify_pin("0000") + + +# --------------------------------------------------------------------------- +# Event History +# --------------------------------------------------------------------------- + +class TestEventHistory: + def test_insert_and_query(self, test_db): + event_id = insert_event( + test_db, + event_type=EventType.MOTION_START, + severity=Severity.WARNING, + source_id="cam1", + payload={"zone": "front"}, + ) + assert event_id > 0 + + rows = query_events(test_db) + assert len(rows) == 1 + assert rows[0]["type"] == EventType.MOTION_START + assert rows[0]["source_id"] == "cam1" + + def test_query_filter_by_type(self, test_db): + insert_event(test_db, EventType.MOTION_START, Severity.WARNING, "cam1") + insert_event(test_db, EventType.CONTACT_OPEN, Severity.WARNING, "door1") + + rows = query_events(test_db, event_type=EventType.MOTION_START) + assert len(rows) == 1 + assert rows[0]["type"] == EventType.MOTION_START + + def test_query_filter_by_severity(self, test_db): + insert_event(test_db, EventType.MOTION_START, Severity.WARNING, "cam1") + insert_event(test_db, EventType.POWER_LOSS, Severity.CRITICAL, "ups") + + rows = query_events(test_db, severity=Severity.CRITICAL) + assert len(rows) == 1 + assert rows[0]["severity"] == Severity.CRITICAL + + def test_acknowledge_event(self, test_db): + event_id = insert_event( + test_db, EventType.MOTION_START, Severity.WARNING, "cam1" + ) + assert ack_event(test_db, event_id) + + rows = query_events(test_db) + assert rows[0]["acknowledged"] == 1 + assert rows[0]["ack_ts"] is not None + + def test_acknowledge_nonexistent(self, test_db): + assert not ack_event(test_db, 99999) + + def test_query_limit_and_offset(self, test_db): + for i in range(5): + insert_event(test_db, EventType.MOTION_START, Severity.INFO, f"cam{i}") + + rows = query_events(test_db, limit=2) + assert len(rows) == 2 + + rows_offset = query_events(test_db, limit=2, offset=2) + assert len(rows_offset) == 2 + # Should be different events + assert rows[0]["id"] != rows_offset[0]["id"] diff --git a/tests/unit/test_schema.py b/tests/unit/test_schema.py index 1df333e..8f492aa 100644 --- a/tests/unit/test_schema.py +++ b/tests/unit/test_schema.py @@ -1,15 +1,17 @@ """Tests for database schema creation.""" -from vigilar.storage.db import init_db +from sqlalchemy import create_engine, inspect + from vigilar.storage.schema import metadata def test_tables_created(tmp_path): db_path = tmp_path / "test.db" - engine = init_db(db_path) + engine = create_engine(f"sqlite:///{db_path}", echo=False) + metadata.create_all(engine) + assert db_path.exists() - from sqlalchemy import inspect inspector = inspect(engine) table_names = inspector.get_table_names() diff --git a/tests/unit/test_sensors.py b/tests/unit/test_sensors.py new file mode 100644 index 0000000..18eafbc --- /dev/null +++ b/tests/unit/test_sensors.py @@ -0,0 +1,251 @@ +"""Tests for sensor bridge subsystem — models, registry, payload normalization.""" + +import time + +import pytest + +from vigilar.config import SensorConfig, VigilarConfig +from vigilar.constants import EventType +from vigilar.sensors.bridge import normalize_zigbee_payload +from vigilar.sensors.models import SensorEvent, SensorState +from vigilar.sensors.registry import SensorRegistry + + +# --- Fixtures --- + + +@pytest.fixture +def zigbee_sensor_config(): + return VigilarConfig( + sensors=[ + SensorConfig( + id="front_door", + display_name="Front Door", + type="CONTACT", + protocol="ZIGBEE", + device_address="front_door_sensor", + location="Entrance", + ), + SensorConfig( + id="hallway_motion", + display_name="Hallway Motion", + type="MOTION", + protocol="ZIGBEE", + device_address="hallway_pir", + location="Hallway", + ), + SensorConfig( + id="living_room_temp", + display_name="Living Room Temp", + type="TEMPERATURE", + protocol="ZIGBEE", + device_address="lr_climate", + location="Living Room", + ), + SensorConfig( + id="garage_reed", + display_name="Garage Reed", + type="CONTACT", + protocol="GPIO", + device_address="17", + location="Garage", + ), + SensorConfig( + id="disabled_sensor", + display_name="Disabled", + type="CONTACT", + protocol="ZIGBEE", + device_address="disabled_one", + location="Attic", + enabled=False, + ), + ], + ) + + +# --- Dataclass Tests --- + + +class TestSensorEvent: + def test_create_with_state(self): + event = SensorEvent( + sensor_id="front_door", + event_type=EventType.CONTACT_OPEN, + state="OPEN", + source_protocol="ZIGBEE", + ) + assert event.sensor_id == "front_door" + assert event.event_type == EventType.CONTACT_OPEN + assert event.state == "OPEN" + assert event.value is None + assert event.timestamp > 0 + + def test_create_with_value(self): + event = SensorEvent( + sensor_id="living_room_temp", + event_type="temperature", + value=22.5, + source_protocol="ZIGBEE", + ) + assert event.value == 22.5 + assert event.state is None + + def test_to_dict(self): + event = SensorEvent( + sensor_id="s1", + event_type="temperature", + value=18.0, + timestamp=1000, + source_protocol="ZIGBEE", + ) + d = event.to_dict() + assert d["sensor_id"] == "s1" + assert d["value"] == 18.0 + assert d["ts"] == 1000 + assert "state" not in d + + +class TestSensorState: + def test_create(self): + state = SensorState( + sensor_id="front_door", + states={"CONTACT_OPEN": "OPEN"}, + last_seen=12345, + battery_pct=87, + ) + assert state.sensor_id == "front_door" + assert state.battery_pct == 87 + assert state.signal_strength is None + + def test_to_dict(self): + state = SensorState(sensor_id="s1", states={"temp": 22.5}, last_seen=100) + d = state.to_dict() + assert d["sensor_id"] == "s1" + assert d["states"] == {"temp": 22.5} + assert d["battery_pct"] is None + + +# --- Registry Tests --- + + +class TestSensorRegistry: + def test_lookup_by_zigbee_name(self, zigbee_sensor_config): + reg = SensorRegistry(zigbee_sensor_config) + sensor = reg.get_sensor_by_zigbee_name("front_door_sensor") + assert sensor is not None + assert sensor.id == "front_door" + + def test_lookup_by_address(self, zigbee_sensor_config): + reg = SensorRegistry(zigbee_sensor_config) + sensor = reg.get_sensor_by_address("hallway_pir") + assert sensor is not None + assert sensor.id == "hallway_motion" + + def test_lookup_unknown_returns_none(self, zigbee_sensor_config): + reg = SensorRegistry(zigbee_sensor_config) + assert reg.get_sensor_by_zigbee_name("nonexistent") is None + assert reg.get_sensor_by_address("nonexistent") is None + + def test_disabled_sensors_excluded(self, zigbee_sensor_config): + reg = SensorRegistry(zigbee_sensor_config) + assert reg.get_sensor_by_zigbee_name("disabled_one") is None + assert reg.get_sensor("disabled_sensor") is None + + def test_gpio_not_in_zigbee_lookup(self, zigbee_sensor_config): + reg = SensorRegistry(zigbee_sensor_config) + assert reg.get_sensor_by_zigbee_name("17") is None + + def test_gpio_sensors_list(self, zigbee_sensor_config): + reg = SensorRegistry(zigbee_sensor_config) + gpio = reg.gpio_sensors() + assert len(gpio) == 1 + assert gpio[0].id == "garage_reed" + + def test_zigbee_sensors_list(self, zigbee_sensor_config): + reg = SensorRegistry(zigbee_sensor_config) + zigbee = reg.zigbee_sensors() + assert len(zigbee) == 3 + + def test_all_sensors_excludes_disabled(self, zigbee_sensor_config): + reg = SensorRegistry(zigbee_sensor_config) + assert len(reg.all_sensors()) == 4 + + +# --- Zigbee Payload Normalization Tests --- + + +class TestNormalizeZigbeePayload: + def test_contact_open(self): + results = normalize_zigbee_payload({"contact": False}) + assert len(results) == 1 + r = results[0] + assert r["event_type"] == EventType.CONTACT_OPEN + assert r["state"] == "OPEN" + assert r["source"] == "zigbee" + assert "ts" in r + + def test_contact_closed(self): + results = normalize_zigbee_payload({"contact": True}) + assert len(results) == 1 + assert results[0]["event_type"] == EventType.CONTACT_CLOSED + assert results[0]["state"] == "CLOSED" + + def test_motion_active(self): + results = normalize_zigbee_payload({"occupancy": True}) + assert len(results) == 1 + assert results[0]["event_type"] == EventType.MOTION_START + assert results[0]["state"] == "ACTIVE" + + def test_motion_idle(self): + results = normalize_zigbee_payload({"occupancy": False}) + assert len(results) == 1 + assert results[0]["event_type"] == EventType.MOTION_END + assert results[0]["state"] == "IDLE" + + def test_temperature(self): + results = normalize_zigbee_payload({"temperature": 22.5}) + assert len(results) == 1 + r = results[0] + assert r["event_type"] == "temperature" + assert r["value"] == 22.5 + assert r["unit"] == "C" + + def test_humidity(self): + results = normalize_zigbee_payload({"humidity": 45}) + assert len(results) == 1 + assert results[0]["value"] == 45.0 + + def test_battery(self): + results = normalize_zigbee_payload({"battery": 87}) + assert len(results) == 1 + assert results[0]["event_type"] == "battery" + assert results[0]["pct"] == 87 + + def test_linkquality(self): + results = normalize_zigbee_payload({"linkquality": 120}) + assert len(results) == 1 + assert results[0]["event_type"] == "linkquality" + assert results[0]["value"] == 120 + + def test_multi_field_payload(self): + """A real Zigbee sensor often sends multiple fields at once.""" + results = normalize_zigbee_payload({ + "contact": False, + "battery": 92, + "linkquality": 85, + "temperature": 21.3, + }) + event_types = {r["event_type"] for r in results} + assert EventType.CONTACT_OPEN in event_types + assert "battery" in event_types + assert "temperature" in event_types + assert "linkquality" in event_types + assert len(results) == 4 + + def test_empty_payload(self): + results = normalize_zigbee_payload({}) + assert results == [] + + def test_unknown_fields_ignored(self): + results = normalize_zigbee_payload({"unknown_field": 42, "foo": "bar"}) + assert results == [] diff --git a/tests/unit/test_ups.py b/tests/unit/test_ups.py new file mode 100644 index 0000000..53ea1d8 --- /dev/null +++ b/tests/unit/test_ups.py @@ -0,0 +1,324 @@ +"""Tests for UPS monitor state transitions and shutdown sequence.""" + +from unittest.mock import MagicMock, call, patch + +import pytest + +from vigilar.config import MQTTConfig, SystemConfig, UPSConfig, VigilarConfig +from vigilar.constants import EventType, Severity, Topics, UPSStatus +from vigilar.ups.monitor import UPSMonitor, _parse_status, _safe_float + + +# --- Helpers --- + +def _make_cfg(**ups_overrides) -> VigilarConfig: + return VigilarConfig( + system=SystemConfig(data_dir="/tmp/vigilar-test"), + mqtt=MQTTConfig(), + ups=UPSConfig(**ups_overrides), + ) + + +def _make_nut_vars( + status: str = "OL", + charge: str = "100", + runtime: str = "3600", + voltage: str = "230.0", + load: str = "25", +) -> dict[str, str]: + return { + "ups.status": status, + "battery.charge": charge, + "battery.runtime": runtime, + "input.voltage": voltage, + "ups.load": load, + } + + +# --- _parse_status --- + +class TestParseStatus: + def test_online(self): + assert _parse_status("OL") == UPSStatus.ONLINE + + def test_on_battery(self): + assert _parse_status("OB") == UPSStatus.ON_BATTERY + + def test_low_battery(self): + assert _parse_status("OB LB") == UPSStatus.LOW_BATTERY + + def test_online_charging(self): + assert _parse_status("OL CHRG") == UPSStatus.ONLINE + + def test_unknown(self): + assert _parse_status("BYPASS") == UPSStatus.UNKNOWN + + def test_empty(self): + assert _parse_status("") == UPSStatus.UNKNOWN + + +class TestSafeFloat: + def test_valid(self): + assert _safe_float("42.5") == 42.5 + + def test_none(self): + assert _safe_float(None) == 0.0 + + def test_garbage(self): + assert _safe_float("n/a", -1.0) == -1.0 + + +# --- State Transition Tests --- + +class TestStateTransitions: + """Test UPSMonitor._poll_once for state transition detection.""" + + def _poll(self, monitor: UPSMonitor, bus: MagicMock, engine: MagicMock, nut_vars: dict): + client = MagicMock() + client.GetUPSVars.return_value = nut_vars + monitor._poll_once(client, bus, engine) + + def test_online_to_on_battery_publishes_power_loss(self): + cfg = _make_cfg() + monitor = UPSMonitor(cfg) + monitor._prev_status = UPSStatus.ONLINE + + bus = MagicMock() + engine = MagicMock() + + with patch("vigilar.ups.monitor.insert_event") as mock_ie, \ + patch("vigilar.ups.monitor.insert_system_event") as mock_ise: + self._poll(monitor, bus, engine, _make_nut_vars(status="OB", charge="80", runtime="1800")) + + assert monitor._prev_status == UPSStatus.ON_BATTERY + + # Check power loss was published + topics_published = [c.args[0] for c in bus.publish_event.call_args_list] + assert Topics.UPS_STATUS in topics_published + assert Topics.UPS_POWER_LOSS in topics_published + + def test_on_battery_to_online_publishes_restored(self): + cfg = _make_cfg() + monitor = UPSMonitor(cfg) + monitor._prev_status = UPSStatus.ON_BATTERY + + bus = MagicMock() + engine = MagicMock() + + with patch("vigilar.ups.monitor.insert_event"), \ + patch("vigilar.ups.monitor.insert_system_event"): + self._poll(monitor, bus, engine, _make_nut_vars(status="OL")) + + assert monitor._prev_status == UPSStatus.ONLINE + + topics_published = [c.args[0] for c in bus.publish_event.call_args_list] + assert Topics.UPS_RESTORED in topics_published + + def test_low_battery_to_online_publishes_restored(self): + cfg = _make_cfg() + monitor = UPSMonitor(cfg) + monitor._prev_status = UPSStatus.LOW_BATTERY + + bus = MagicMock() + engine = MagicMock() + + with patch("vigilar.ups.monitor.insert_event"), \ + patch("vigilar.ups.monitor.insert_system_event"): + self._poll(monitor, bus, engine, _make_nut_vars(status="OL")) + + topics_published = [c.args[0] for c in bus.publish_event.call_args_list] + assert Topics.UPS_RESTORED in topics_published + + def test_charge_below_threshold_publishes_low_battery(self): + cfg = _make_cfg(low_battery_threshold_pct=30) + monitor = UPSMonitor(cfg) + monitor._prev_status = UPSStatus.ON_BATTERY + + bus = MagicMock() + engine = MagicMock() + + with patch("vigilar.ups.monitor.insert_event"), \ + patch("vigilar.ups.monitor.insert_system_event"): + self._poll(monitor, bus, engine, _make_nut_vars(status="OB", charge="15", runtime="1800")) + + topics_published = [c.args[0] for c in bus.publish_event.call_args_list] + assert Topics.UPS_LOW_BATTERY in topics_published + + def test_charge_above_threshold_no_low_battery(self): + cfg = _make_cfg(low_battery_threshold_pct=20) + monitor = UPSMonitor(cfg) + monitor._prev_status = UPSStatus.ON_BATTERY + + bus = MagicMock() + engine = MagicMock() + + with patch("vigilar.ups.monitor.insert_event"), \ + patch("vigilar.ups.monitor.insert_system_event"): + self._poll(monitor, bus, engine, _make_nut_vars(status="OB", charge="50", runtime="3600")) + + topics_published = [c.args[0] for c in bus.publish_event.call_args_list] + assert Topics.UPS_LOW_BATTERY not in topics_published + + def test_critical_runtime_triggers_shutdown(self): + cfg = _make_cfg(critical_runtime_threshold_s=300, shutdown_delay_s=0) + monitor = UPSMonitor(cfg) + monitor._prev_status = UPSStatus.ON_BATTERY + + bus = MagicMock() + engine = MagicMock() + + with patch("vigilar.ups.monitor.insert_event"), \ + patch("vigilar.ups.monitor.insert_system_event"), \ + patch("vigilar.ups.shutdown.insert_event"), \ + patch("vigilar.ups.shutdown.insert_system_event"), \ + patch("vigilar.ups.shutdown.os.system") as mock_system, \ + patch("vigilar.ups.shutdown.time.sleep"): + self._poll(monitor, bus, engine, _make_nut_vars(status="OB", charge="10", runtime="60")) + + topics_published = [c.args[0] for c in bus.publish_event.call_args_list] + assert Topics.UPS_CRITICAL in topics_published + assert Topics.SYSTEM_SHUTDOWN in topics_published + assert monitor._shutdown_triggered is True + mock_system.assert_called_once_with("shutdown -h now") + + def test_critical_runtime_only_triggers_once(self): + cfg = _make_cfg(critical_runtime_threshold_s=300, shutdown_delay_s=0) + monitor = UPSMonitor(cfg) + monitor._prev_status = UPSStatus.ON_BATTERY + monitor._shutdown_triggered = True # already triggered + + bus = MagicMock() + engine = MagicMock() + + with patch("vigilar.ups.monitor.insert_event"), \ + patch("vigilar.ups.monitor.insert_system_event"): + self._poll(monitor, bus, engine, _make_nut_vars(status="OB", charge="5", runtime="30")) + + topics_published = [c.args[0] for c in bus.publish_event.call_args_list] + assert Topics.UPS_CRITICAL not in topics_published + + def test_online_no_low_battery_even_if_low_charge(self): + """When on mains, low charge should not trigger low_battery event.""" + cfg = _make_cfg(low_battery_threshold_pct=50) + monitor = UPSMonitor(cfg) + monitor._prev_status = UPSStatus.ONLINE + + bus = MagicMock() + engine = MagicMock() + + with patch("vigilar.ups.monitor.insert_event"), \ + patch("vigilar.ups.monitor.insert_system_event"): + self._poll(monitor, bus, engine, _make_nut_vars(status="OL", charge="10")) + + topics_published = [c.args[0] for c in bus.publish_event.call_args_list] + assert Topics.UPS_LOW_BATTERY not in topics_published + + def test_status_always_published(self): + cfg = _make_cfg() + monitor = UPSMonitor(cfg) + monitor._prev_status = UPSStatus.ONLINE + + bus = MagicMock() + engine = MagicMock() + + with patch("vigilar.ups.monitor.insert_event"), \ + patch("vigilar.ups.monitor.insert_system_event"): + self._poll(monitor, bus, engine, _make_nut_vars()) + + assert bus.publish_event.call_args_list[0].args[0] == Topics.UPS_STATUS + + def test_bytes_vars_decoded(self): + """NUT client may return bytes keys/values.""" + cfg = _make_cfg() + monitor = UPSMonitor(cfg) + monitor._prev_status = UPSStatus.ONLINE + + bus = MagicMock() + engine = MagicMock() + + byte_vars = { + b"ups.status": b"OL", + b"battery.charge": b"95", + b"battery.runtime": b"7200", + b"input.voltage": b"231.2", + b"ups.load": b"18", + } + + client = MagicMock() + client.GetUPSVars.return_value = byte_vars + + with patch("vigilar.ups.monitor.insert_event"), \ + patch("vigilar.ups.monitor.insert_system_event"): + monitor._poll_once(client, bus, engine) + + assert monitor._prev_status == UPSStatus.ONLINE + bus.publish_event.assert_called_once() + + +# --- Shutdown Sequence Tests --- + +class TestShutdownSequence: + def test_execute_order(self): + """Verify: publish -> log -> sleep -> os.system.""" + from vigilar.ups.shutdown import ShutdownSequence + + ups_cfg = UPSConfig(shutdown_delay_s=10) + seq = ShutdownSequence(ups_cfg) + + bus = MagicMock() + engine = MagicMock() + call_order = [] + + bus.publish_event.side_effect = lambda *a, **kw: call_order.append("publish") + + with patch("vigilar.ups.shutdown.insert_system_event", side_effect=lambda *a, **kw: call_order.append("insert_system_event")), \ + patch("vigilar.ups.shutdown.insert_event", side_effect=lambda *a, **kw: call_order.append("insert_event")), \ + patch("vigilar.ups.shutdown.time.sleep", side_effect=lambda s: call_order.append(f"sleep_{s}")) as mock_sleep, \ + patch("vigilar.ups.shutdown.os.system", side_effect=lambda cmd: call_order.append(f"system_{cmd}")) as mock_sys: + seq.execute(bus, engine, reason="test_critical") + + assert call_order[0] == "publish" + assert "insert_system_event" in call_order + assert "insert_event" in call_order + assert "sleep_10" in call_order + assert "system_shutdown -h now" in call_order + # Sleep and system call should come after publish/log + sleep_idx = call_order.index("sleep_10") + system_idx = call_order.index("system_shutdown -h now") + assert sleep_idx < system_idx + + def test_custom_shutdown_command(self): + from vigilar.ups.shutdown import ShutdownSequence + + ups_cfg = UPSConfig(shutdown_delay_s=0) + seq = ShutdownSequence(ups_cfg, shutdown_command="poweroff") + + bus = MagicMock() + engine = MagicMock() + + with patch("vigilar.ups.shutdown.insert_system_event"), \ + patch("vigilar.ups.shutdown.insert_event"), \ + patch("vigilar.ups.shutdown.time.sleep"), \ + patch("vigilar.ups.shutdown.os.system") as mock_sys: + seq.execute(bus, engine, reason="test") + + mock_sys.assert_called_once_with("poweroff") + + def test_publishes_system_shutdown_topic(self): + from vigilar.ups.shutdown import ShutdownSequence + + ups_cfg = UPSConfig(shutdown_delay_s=0) + seq = ShutdownSequence(ups_cfg) + + bus = MagicMock() + engine = MagicMock() + + with patch("vigilar.ups.shutdown.insert_system_event"), \ + patch("vigilar.ups.shutdown.insert_event"), \ + patch("vigilar.ups.shutdown.time.sleep"), \ + patch("vigilar.ups.shutdown.os.system"): + seq.execute(bus, engine, reason="battery_dead") + + bus.publish_event.assert_called_once() + assert bus.publish_event.call_args.args[0] == Topics.SYSTEM_SHUTDOWN + assert bus.publish_event.call_args.kwargs["reason"] == "battery_dead" diff --git a/vigilar/events/__init__.py b/vigilar/events/__init__.py index e69de29..d964127 100644 --- a/vigilar/events/__init__.py +++ b/vigilar/events/__init__.py @@ -0,0 +1 @@ +"""Events subsystem — event processing, rule engine, arm state FSM.""" diff --git a/vigilar/events/history.py b/vigilar/events/history.py new file mode 100644 index 0000000..af827f3 --- /dev/null +++ b/vigilar/events/history.py @@ -0,0 +1,33 @@ +"""Event history helper — thin wrapper around storage queries.""" + +from typing import Any + +from sqlalchemy.engine import Engine + +from vigilar.storage.queries import acknowledge_event, get_events + + +def query_events( + engine: Engine, + event_type: str | None = None, + severity: str | None = None, + source_id: str | None = None, + since_ts: int | None = None, + limit: int = 100, + offset: int = 0, +) -> list[dict[str, Any]]: + """Query events with optional filters.""" + return get_events( + engine, + event_type=event_type, + severity=severity, + source_id=source_id, + since_ts=since_ts, + limit=limit, + offset=offset, + ) + + +def ack_event(engine: Engine, event_id: int) -> bool: + """Acknowledge an event by ID. Returns True if the event existed.""" + return acknowledge_event(engine, event_id) diff --git a/vigilar/events/processor.py b/vigilar/events/processor.py new file mode 100644 index 0000000..0983aa7 --- /dev/null +++ b/vigilar/events/processor.py @@ -0,0 +1,189 @@ +"""Event processor — subscribes to MQTT, logs events, evaluates rules.""" + +import json +import logging +import signal +import time +from typing import Any + +from sqlalchemy.engine import Engine + +from vigilar.bus import MessageBus +from vigilar.config import VigilarConfig +from vigilar.constants import ArmState, EventType, Severity, Topics +from vigilar.events.rules import RuleEngine +from vigilar.events.state import ArmStateFSM +from vigilar.storage.db import get_db_path, init_db +from vigilar.storage.queries import insert_event + +log = logging.getLogger(__name__) + +# Map MQTT topic patterns to (EventType, Severity) +_TOPIC_EVENT_MAP: dict[str, tuple[EventType, Severity]] = { + "motion/start": (EventType.MOTION_START, Severity.WARNING), + "motion/end": (EventType.MOTION_END, Severity.INFO), + "error": (EventType.CAMERA_ERROR, Severity.ALERT), +} + + +class EventProcessor: + """Main event processing loop — runs as a subprocess.""" + + def __init__(self, config: VigilarConfig): + self._config = config + self._shutdown = False + + def run(self) -> None: + """Entry point for the subprocess.""" + logging.basicConfig( + level=getattr(logging, self._config.system.log_level, logging.INFO), + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", + ) + + # Setup signal handling + signal.signal(signal.SIGTERM, self._handle_signal) + signal.signal(signal.SIGINT, self._handle_signal) + + # Init DB + db_path = get_db_path(self._config.system.data_dir) + engine = init_db(db_path) + + # Init components + fsm = ArmStateFSM(engine, self._config) + rule_engine = RuleEngine(self._config) + + # Init MQTT + bus = MessageBus(self._config.mqtt, client_id="vigilar-event-processor") + fsm.set_bus(bus) + bus.connect() + + # Subscribe to all Vigilar topics + def on_message(topic: str, payload: dict[str, Any]) -> None: + self._handle_event(topic, payload, engine, fsm, rule_engine, bus) + + bus.subscribe_all(on_message) + + log.info("Event processor started") + + # Main loop + try: + while not self._shutdown: + time.sleep(0.5) + except KeyboardInterrupt: + pass + finally: + bus.disconnect() + log.info("Event processor stopped") + + def _handle_signal(self, signum: int, frame: Any) -> None: + self._shutdown = True + + def _handle_event( + self, + topic: str, + payload: dict[str, Any], + engine: Engine, + fsm: ArmStateFSM, + rule_engine: RuleEngine, + bus: MessageBus, + ) -> None: + """Process a single MQTT message.""" + try: + event_type, severity, source_id = self._classify_event(topic, payload) + if event_type is None: + return + + # Insert event to DB + event_id = insert_event( + engine, + event_type=event_type, + severity=severity, + source_id=source_id, + payload=payload, + ) + + # Evaluate rules + actions = rule_engine.evaluate(topic, payload, fsm.state) + + # Execute actions + for action in actions: + self._execute_action(action, event_id, bus, payload) + + except Exception: + log.exception("Error processing event on %s", topic) + + def _classify_event( + self, topic: str, payload: dict[str, Any] + ) -> tuple[str | None, str | None, str | None]: + """Classify an MQTT message into event type, severity, and source ID.""" + parts = topic.split("/") + if len(parts) < 3: + return None, None, None + + # vigilar/camera/{id}/motion/start + if parts[1] == "camera" and len(parts) >= 4: + camera_id = parts[2] + suffix = "/".join(parts[3:]) + if suffix in _TOPIC_EVENT_MAP: + etype, sev = _TOPIC_EVENT_MAP[suffix] + return etype, sev, camera_id + # Ignore heartbeats etc. + return None, None, None + + # vigilar/sensor/{id}/{event_type} + if parts[1] == "sensor" and len(parts) >= 4: + sensor_id = parts[2] + sensor_event = parts[3].upper() + if sensor_event in EventType.__members__: + etype = EventType(sensor_event) + elif sensor_event == "CONTACT_OPEN": + etype = EventType.CONTACT_OPEN + elif sensor_event == "CONTACT_CLOSED": + etype = EventType.CONTACT_CLOSED + else: + etype = EventType.CONTACT_OPEN # fallback for unknown sensor events + sev = Severity.WARNING if etype in ( + EventType.CONTACT_OPEN, EventType.MOTION_START + ) else Severity.INFO + return etype, sev, sensor_id + + # vigilar/ups/* + if parts[1] == "ups": + if topic == Topics.UPS_POWER_LOSS: + return EventType.POWER_LOSS, Severity.CRITICAL, "ups" + elif topic == Topics.UPS_LOW_BATTERY: + return EventType.LOW_BATTERY, Severity.CRITICAL, "ups" + elif topic == Topics.UPS_RESTORED: + return EventType.POWER_RESTORED, Severity.INFO, "ups" + return None, None, None + + # vigilar/system/* — ignore (we produce these, not consume) + return None, None, None + + def _execute_action( + self, + action: str, + event_id: int, + bus: MessageBus, + payload: dict[str, Any], + ) -> None: + """Execute a rule action.""" + log.info("Executing action: %s (event_id=%d)", action, event_id) + + if action == "alert_all": + bus.publish(Topics.SYSTEM_ALERT, { + "ts": int(time.time() * 1000), + "event_id": event_id, + "type": "alert", + "payload": payload, + }) + elif action == "record_all_cameras": + # Publish a command for each configured camera to start recording + for cam in self._config.cameras: + bus.publish(f"vigilar/camera/{cam.id}/command/record", { + "ts": int(time.time() * 1000), + "event_id": event_id, + "action": "start_recording", + }) + else: + log.warning("Unknown action: %s", action) diff --git a/vigilar/events/rules.py b/vigilar/events/rules.py new file mode 100644 index 0000000..c34a47c --- /dev/null +++ b/vigilar/events/rules.py @@ -0,0 +1,127 @@ +"""Rule engine for evaluating event-driven automation rules.""" + +import logging +import time +from datetime import datetime, timezone + +from vigilar.config import RuleCondition, RuleConfig, VigilarConfig +from vigilar.constants import ArmState + +log = logging.getLogger(__name__) + + +class RuleEngine: + """Evaluates configured rules against incoming events.""" + + def __init__(self, config: VigilarConfig): + self._rules = config.rules + self._cooldowns: dict[str, float] = {} # rule_id -> last_fired_time + + def _check_cooldown(self, rule: RuleConfig) -> bool: + """Return True if the rule is NOT in cooldown (i.e., can fire).""" + last = self._cooldowns.get(rule.id, 0.0) + return (time.monotonic() - last) >= rule.cooldown_s + + def _record_fire(self, rule: RuleConfig) -> None: + self._cooldowns[rule.id] = time.monotonic() + + def _eval_condition( + self, + cond: RuleCondition, + event_topic: str, + event_payload: dict, + current_arm_state: ArmState, + ) -> bool: + """Evaluate a single condition against the current event context.""" + if cond.type == "arm_state": + return current_arm_state.value == cond.value + + elif cond.type == "sensor_event": + # Match topic pattern: vigilar/sensor/{sensor_id}/{event} + parts = event_topic.split("/") + if len(parts) >= 4 and parts[1] == "sensor": + topic_sensor_id = parts[2] + topic_event = parts[3] + sensor_match = (not cond.sensor_id) or (cond.sensor_id == topic_sensor_id) + event_match = (not cond.event) or (cond.event == topic_event) + return sensor_match and event_match + return False + + elif cond.type == "camera_motion": + # Match topic pattern: vigilar/camera/{camera_id}/motion/start + parts = event_topic.split("/") + if len(parts) >= 4 and parts[1] == "camera" and "motion" in parts: + if cond.value: + # Specific camera + return parts[2] == cond.value + return True # Any camera + return False + + elif cond.type == "time_window": + # Value format: "HH:MM-HH:MM" + return _in_time_window(cond.value) + + log.warning("Unknown condition type: %s", cond.type) + return False + + def evaluate( + self, + event_topic: str, + event_payload: dict, + current_arm_state: ArmState, + ) -> list[str]: + """Evaluate all rules against an event. Returns list of action strings to fire.""" + actions: list[str] = [] + + for rule in self._rules: + if not self._check_cooldown(rule): + continue + + if not rule.conditions: + continue + + results = [ + self._eval_condition(c, event_topic, event_payload, current_arm_state) + for c in rule.conditions + ] + + if rule.logic == "AND": + matched = all(results) + elif rule.logic == "OR": + matched = any(results) + else: + log.warning("Unknown rule logic: %s", rule.logic) + matched = False + + if matched: + log.info("Rule matched: %s -> actions: %s", rule.id, rule.actions) + self._record_fire(rule) + actions.extend(rule.actions) + + return actions + + +def _in_time_window(window: str) -> bool: + """Check if current time falls within a 'HH:MM-HH:MM' window. + + Handles overnight windows (e.g., '22:00-06:00'). + """ + try: + start_str, end_str = window.split("-") + start_h, start_m = map(int, start_str.strip().split(":")) + end_h, end_m = map(int, end_str.strip().split(":")) + except (ValueError, AttributeError): + log.warning("Invalid time_window format: %s", window) + return False + + now = datetime.now(timezone.utc) + current_minutes = now.hour * 60 + now.minute + start_minutes = start_h * 60 + start_m + end_minutes = end_h * 60 + end_m + + if start_minutes <= end_minutes: + # Normal window (e.g., 08:00-17:00) + return start_minutes <= current_minutes < end_minutes + else: + # Overnight window (e.g., 22:00-06:00) + return current_minutes >= start_minutes or current_minutes < end_minutes diff --git a/vigilar/events/state.py b/vigilar/events/state.py new file mode 100644 index 0000000..dad5b1f --- /dev/null +++ b/vigilar/events/state.py @@ -0,0 +1,94 @@ +"""Arm state finite state machine.""" + +import hashlib +import hmac +import logging +import time + +from sqlalchemy.engine import Engine + +from vigilar.config import VigilarConfig +from vigilar.constants import ArmState, EventType, Severity, Topics +from vigilar.storage.queries import get_current_arm_state, insert_arm_state, insert_event + +log = logging.getLogger(__name__) + + +class ArmStateFSM: + """Manages DISARMED / ARMED_HOME / ARMED_AWAY state transitions.""" + + def __init__(self, engine: Engine, config: VigilarConfig): + self._engine = engine + self._pin_hash = config.system.arm_pin_hash + self._state = ArmState.DISARMED + self._bus = None + self._load_initial_state() + + def _load_initial_state(self) -> None: + """Load the most recent arm state from the database.""" + stored = get_current_arm_state(self._engine) + if stored and stored in ArmState.__members__: + self._state = ArmState(stored) + log.info("Loaded arm state from DB: %s", self._state) + else: + self._state = ArmState.DISARMED + log.info("No stored arm state, defaulting to DISARMED") + + def set_bus(self, bus: object) -> None: + """Attach a MessageBus for publishing state changes.""" + self._bus = bus + + @property + def state(self) -> ArmState: + return self._state + + def verify_pin(self, pin: str) -> bool: + """Verify a PIN against the stored hash using HMAC comparison.""" + if not self._pin_hash: + # No PIN configured — allow all transitions + return True + candidate = hashlib.sha256(pin.encode()).hexdigest() + return hmac.compare_digest(candidate, self._pin_hash) + + def transition( + self, + new_state: ArmState, + pin: str = "", + triggered_by: str = "system", + ) -> bool: + """Attempt a state transition. Returns True if successful.""" + if new_state == self._state: + return True + + # PIN required for arming from disarmed or disarming from armed + if self._pin_hash and not self.verify_pin(pin): + log.warning("Arm state change rejected: bad PIN (by %s)", triggered_by) + return False + + old_state = self._state + self._state = new_state + + # Log to database + pin_hash = hashlib.sha256(pin.encode()).hexdigest() if pin else None + insert_arm_state(self._engine, new_state.value, triggered_by, pin_hash) + + # Log event + insert_event( + self._engine, + event_type=EventType.ARM_STATE_CHANGED, + severity=Severity.INFO, + source_id="system", + payload={"old_state": old_state.value, "new_state": new_state.value}, + ) + + # Publish to MQTT + if self._bus is not None: + self._bus.publish(Topics.SYSTEM_ARM_STATE, { + "ts": int(time.time() * 1000), + "state": new_state.value, + "old_state": old_state.value, + "triggered_by": triggered_by, + }) + + log.info("Arm state: %s -> %s (by %s)", old_state, new_state, triggered_by) + return True diff --git a/vigilar/main.py b/vigilar/main.py index 740d324..47dc03a 100644 --- a/vigilar/main.py +++ b/vigilar/main.py @@ -72,6 +72,22 @@ def _run_web(cfg: VigilarConfig) -> None: app.run(host=cfg.web.host, port=cfg.web.port, debug=False, use_reloader=False) +def _run_event_processor(cfg: VigilarConfig) -> None: + """Run the event processor in a subprocess.""" + from vigilar.events.processor import EventProcessor + + processor = EventProcessor(cfg) + processor.run() + + +def _run_ups_monitor(cfg: VigilarConfig) -> None: + """Run the UPS monitor in a subprocess.""" + from vigilar.ups.monitor import UPSMonitor + + monitor = UPSMonitor(cfg) + monitor.run() + + def run_supervisor(cfg: VigilarConfig) -> None: """Main supervisor loop — starts all subsystems, monitors, restarts on crash.""" # Initialize database @@ -93,9 +109,17 @@ def run_supervisor(cfg: VigilarConfig) -> None: from vigilar.camera.manager import CameraManager camera_mgr = CameraManager(cfg) if cfg.cameras else None - # TODO: Phase 6 — Event processor - # TODO: Phase 7 — Sensor bridge - # TODO: Phase 8 — UPS monitor + # Event processor + subsystems.append(SubsystemProcess("event-processor", _run_event_processor, (cfg,))) + + # Phase 7 — Sensor bridge + if cfg.sensors: + from vigilar.sensors.bridge import run_sensor_bridge + subsystems.append(SubsystemProcess("sensor-bridge", run_sensor_bridge, (cfg,))) + + # Phase 8 — UPS monitor + if cfg.ups.enabled: + subsystems.append(SubsystemProcess("ups-monitor", _run_ups_monitor, (cfg,))) # Handle signals for clean shutdown shutdown_requested = False diff --git a/vigilar/sensors/__init__.py b/vigilar/sensors/__init__.py index e69de29..2da48b6 100644 --- a/vigilar/sensors/__init__.py +++ b/vigilar/sensors/__init__.py @@ -0,0 +1 @@ +"""Sensor bridge subsystem — Zigbee2MQTT and GPIO integration.""" diff --git a/vigilar/sensors/bridge.py b/vigilar/sensors/bridge.py new file mode 100644 index 0000000..094d85d --- /dev/null +++ b/vigilar/sensors/bridge.py @@ -0,0 +1,201 @@ +"""Sensor Bridge — subprocess that bridges Zigbee2MQTT to the Vigilar internal bus.""" + +from __future__ import annotations + +import logging +import signal +import time +from typing import Any + +from vigilar.bus import MessageBus +from vigilar.config import MQTTConfig, VigilarConfig +from vigilar.constants import EventType, SensorProtocol, Topics +from vigilar.sensors.models import SensorEvent +from vigilar.sensors.registry import SensorRegistry +from vigilar.storage.db import get_db_path, get_engine +from vigilar.storage.queries import upsert_sensor_state + +log = logging.getLogger(__name__) + + +def normalize_zigbee_payload( + raw: dict[str, Any], +) -> list[dict[str, Any]]: + """Normalize a Zigbee2MQTT payload into a list of internal event dicts. + + Each dict has keys: event_type, and one of (state, value, pct, unit). + """ + results: list[dict[str, Any]] = [] + ts = int(time.time() * 1000) + + # Contact sensor + if "contact" in raw: + contact = raw["contact"] + state = "CLOSED" if contact else "OPEN" + event_type = EventType.CONTACT_CLOSED if contact else EventType.CONTACT_OPEN + results.append({ + "event_type": event_type, + "state": state, + "ts": ts, + "source": "zigbee", + }) + + # Motion / occupancy sensor + if "occupancy" in raw: + occupancy = raw["occupancy"] + state = "ACTIVE" if occupancy else "IDLE" + event_type = EventType.MOTION_START if occupancy else EventType.MOTION_END + results.append({ + "event_type": event_type, + "state": state, + "ts": ts, + "source": "zigbee", + }) + + # Temperature + if "temperature" in raw: + results.append({ + "event_type": "temperature", + "value": float(raw["temperature"]), + "unit": "C", + "ts": ts, + "source": "zigbee", + }) + + # Humidity + if "humidity" in raw: + results.append({ + "event_type": "humidity", + "value": float(raw["humidity"]), + "ts": ts, + "source": "zigbee", + }) + + # Battery + if "battery" in raw: + results.append({ + "event_type": "battery", + "pct": int(raw["battery"]), + "ts": ts, + "source": "zigbee", + }) + + # Link quality / signal strength + if "linkquality" in raw: + results.append({ + "event_type": "linkquality", + "value": int(raw["linkquality"]), + "ts": ts, + "source": "zigbee", + }) + + return results + + +class SensorBridge: + """Bridges Zigbee2MQTT messages to the internal Vigilar MQTT bus.""" + + def __init__(self, config: VigilarConfig) -> None: + self._config = config + self._registry = SensorRegistry(config) + self._z2m_prefix = config.zigbee2mqtt.mqtt_topic_prefix + self._bus = MessageBus(config.mqtt, client_id="vigilar-sensor-bridge") + self._engine = get_engine(get_db_path(config.system.data_dir)) + self._running = False + + def start(self) -> None: + """Connect to MQTT and start processing sensor messages.""" + self._bus.connect() + self._running = True + + # Subscribe to zigbee2mqtt topics + z2m_wildcard = f"{self._z2m_prefix}/#" + self._bus.subscribe(z2m_wildcard, self._handle_z2m_message) + + log.info("Sensor bridge started, subscribed to %s", z2m_wildcard) + + def stop(self) -> None: + self._running = False + self._bus.disconnect() + log.info("Sensor bridge stopped") + + def _handle_z2m_message(self, topic: str, payload: dict[str, Any]) -> None: + """Handle a Zigbee2MQTT message, normalize and republish.""" + # Topic format: zigbee2mqtt/ + # Ignore bridge status messages like zigbee2mqtt/bridge/... + parts = topic.split("/") + if len(parts) < 2: + return + # Skip bridge info topics + if parts[1] == "bridge": + return + + friendly_name = parts[1] + sensor_cfg = self._registry.get_sensor_by_zigbee_name(friendly_name) + if sensor_cfg is None: + log.debug("Unknown zigbee device: %s", friendly_name) + return + + normalized = normalize_zigbee_payload(payload) + for event_data in normalized: + event_type = event_data["event_type"] + + # Create SensorEvent + event = SensorEvent( + sensor_id=sensor_cfg.id, + event_type=event_type, + state=event_data.get("state"), + value=event_data.get("value"), + timestamp=event_data["ts"], + source_protocol=SensorProtocol.ZIGBEE, + ) + + # Publish to internal bus + topic_out = Topics.sensor_event(sensor_cfg.id, event_type) + self._bus.publish(topic_out, event_data) + + # Update DB state + self._persist_state(sensor_cfg.id, event_data) + + def _persist_state(self, sensor_id: str, event_data: dict[str, Any]) -> None: + """Write normalized event data to sensor_states table.""" + try: + event_type = event_data["event_type"] + + if "state" in event_data: + upsert_sensor_state(self._engine, sensor_id, event_type, event_data["state"]) + elif "value" in event_data: + upsert_sensor_state(self._engine, sensor_id, event_type, event_data["value"]) + elif "pct" in event_data: + upsert_sensor_state(self._engine, sensor_id, "battery", event_data["pct"]) + + # Always update last_seen + upsert_sensor_state(self._engine, sensor_id, "last_seen", event_data["ts"]) + except Exception: + log.exception("Failed to persist state for %s", sensor_id) + + +def run_sensor_bridge(cfg: VigilarConfig) -> None: + """Subprocess entry point for the sensor bridge.""" + logging.basicConfig(level=getattr(logging, cfg.system.log_level, logging.INFO)) + + bridge = SensorBridge(cfg) + bridge.start() + + # Block until signal + shutdown = False + + def handle_signal(signum: int, frame: Any) -> None: + nonlocal shutdown + shutdown = True + + signal.signal(signal.SIGTERM, handle_signal) + signal.signal(signal.SIGINT, handle_signal) + + try: + while not shutdown: + time.sleep(1) + except KeyboardInterrupt: + pass + finally: + bridge.stop() diff --git a/vigilar/sensors/gpio_handler.py b/vigilar/sensors/gpio_handler.py new file mode 100644 index 0000000..9393bb9 --- /dev/null +++ b/vigilar/sensors/gpio_handler.py @@ -0,0 +1,134 @@ +"""GPIO sensor handler — reads physical GPIO pins for wired sensors.""" + +from __future__ import annotations + +import logging +import time +from typing import Any + +from vigilar.bus import MessageBus +from vigilar.config import SensorConfig, SensorGPIOConfig, VigilarConfig +from vigilar.constants import EventType, SensorProtocol, Topics +from vigilar.sensors.models import SensorEvent +from vigilar.sensors.registry import SensorRegistry +from vigilar.storage.db import get_db_path, get_engine +from vigilar.storage.queries import upsert_sensor_state + +log = logging.getLogger(__name__) + +# Conditional import — gpiozero is only available on Raspberry Pi +try: + from gpiozero import Button, MotionSensor # type: ignore[import-untyped] + + GPIO_AVAILABLE = True +except ImportError: + GPIO_AVAILABLE = False + log.info("gpiozero not available — GPIO sensors disabled") + + +class GPIOHandler: + """Sets up GPIO pin callbacks for wired sensors.""" + + def __init__(self, config: VigilarConfig) -> None: + self._config = config + self._gpio_config = config.sensor_gpio + self._registry = SensorRegistry(config) + self._bus = MessageBus(config.mqtt, client_id="vigilar-gpio-handler") + self._engine = get_engine(get_db_path(config.system.data_dir)) + self._devices: list[Any] = [] + + def start(self) -> None: + """Initialize GPIO pins and set up callbacks.""" + if not GPIO_AVAILABLE: + log.warning("GPIO not available, skipping GPIO sensor setup") + return + + self._bus.connect() + gpio_sensors = self._registry.gpio_sensors() + + if not gpio_sensors: + log.info("No GPIO sensors configured") + return + + for sensor_cfg in gpio_sensors: + self._setup_sensor(sensor_cfg) + + log.info("GPIO handler started with %d sensors", len(gpio_sensors)) + + def stop(self) -> None: + """Clean up GPIO resources.""" + for device in self._devices: + try: + device.close() + except Exception: + pass + self._devices.clear() + self._bus.disconnect() + log.info("GPIO handler stopped") + + def _setup_sensor(self, sensor_cfg: SensorConfig) -> None: + """Set up a single GPIO sensor with callbacks.""" + pin = int(sensor_cfg.device_address) + bounce_s = self._gpio_config.bounce_time_ms / 1000.0 + sensor_type = sensor_cfg.type.upper() + + if sensor_type == "CONTACT": + device = Button(pin, bounce_time=bounce_s) + device.when_pressed = lambda s=sensor_cfg: self._on_contact_closed(s) + device.when_released = lambda s=sensor_cfg: self._on_contact_open(s) + self._devices.append(device) + log.info("GPIO contact sensor on pin %d: %s", pin, sensor_cfg.id) + + elif sensor_type == "MOTION": + device = MotionSensor(pin) + device.when_motion = lambda s=sensor_cfg: self._on_motion_start(s) + device.when_no_motion = lambda s=sensor_cfg: self._on_motion_end(s) + self._devices.append(device) + log.info("GPIO motion sensor on pin %d: %s", pin, sensor_cfg.id) + + else: + log.warning("Unsupported GPIO sensor type %s for %s", sensor_type, sensor_cfg.id) + + def _publish_event(self, sensor_cfg: SensorConfig, event_type: str, state: str) -> None: + """Publish a GPIO sensor event to the internal bus and persist state.""" + ts = int(time.time() * 1000) + event = SensorEvent( + sensor_id=sensor_cfg.id, + event_type=event_type, + state=state, + timestamp=ts, + source_protocol=SensorProtocol.GPIO, + ) + + payload = { + "event_type": event_type, + "state": state, + "ts": ts, + "source": "gpio", + } + + topic = Topics.sensor_event(sensor_cfg.id, event_type) + self._bus.publish(topic, payload) + + # Persist to DB + try: + upsert_sensor_state(self._engine, sensor_cfg.id, event_type, state) + upsert_sensor_state(self._engine, sensor_cfg.id, "last_seen", ts) + except Exception: + log.exception("Failed to persist GPIO state for %s", sensor_cfg.id) + + def _on_contact_open(self, sensor_cfg: SensorConfig) -> None: + log.info("Contact OPEN: %s", sensor_cfg.id) + self._publish_event(sensor_cfg, EventType.CONTACT_OPEN, "OPEN") + + def _on_contact_closed(self, sensor_cfg: SensorConfig) -> None: + log.info("Contact CLOSED: %s", sensor_cfg.id) + self._publish_event(sensor_cfg, EventType.CONTACT_CLOSED, "CLOSED") + + def _on_motion_start(self, sensor_cfg: SensorConfig) -> None: + log.info("Motion START: %s", sensor_cfg.id) + self._publish_event(sensor_cfg, EventType.MOTION_START, "ACTIVE") + + def _on_motion_end(self, sensor_cfg: SensorConfig) -> None: + log.info("Motion END: %s", sensor_cfg.id) + self._publish_event(sensor_cfg, EventType.MOTION_END, "IDLE") diff --git a/vigilar/sensors/models.py b/vigilar/sensors/models.py new file mode 100644 index 0000000..d57e071 --- /dev/null +++ b/vigilar/sensors/models.py @@ -0,0 +1,52 @@ +"""Sensor data models for the Vigilar sensor bridge.""" + +from __future__ import annotations + +import time +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class SensorEvent: + """A single event from a sensor (contact open, motion detected, etc.).""" + + sensor_id: str + event_type: str + state: str | None = None + value: float | None = None + timestamp: int = field(default_factory=lambda: int(time.time() * 1000)) + source_protocol: str = "" + + def to_dict(self) -> dict[str, Any]: + d: dict[str, Any] = { + "sensor_id": self.sensor_id, + "event_type": self.event_type, + "ts": self.timestamp, + "source": self.source_protocol, + } + if self.state is not None: + d["state"] = self.state + if self.value is not None: + d["value"] = self.value + return d + + +@dataclass +class SensorState: + """Aggregated current state of a sensor.""" + + sensor_id: str + states: dict[str, Any] = field(default_factory=dict) + last_seen: int = 0 + battery_pct: int | None = None + signal_strength: int | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "sensor_id": self.sensor_id, + "states": self.states, + "last_seen": self.last_seen, + "battery_pct": self.battery_pct, + "signal_strength": self.signal_strength, + } diff --git a/vigilar/sensors/registry.py b/vigilar/sensors/registry.py new file mode 100644 index 0000000..066f52b --- /dev/null +++ b/vigilar/sensors/registry.py @@ -0,0 +1,52 @@ +"""Sensor registry — maps external IDs/names to internal sensor configs.""" + +from __future__ import annotations + +import logging + +from vigilar.config import SensorConfig, VigilarConfig + +log = logging.getLogger(__name__) + + +class SensorRegistry: + """Loads sensor configs and provides lookup by address or Zigbee name.""" + + def __init__(self, config: VigilarConfig) -> None: + self._by_id: dict[str, SensorConfig] = {} + self._by_address: dict[str, SensorConfig] = {} + self._by_zigbee_name: dict[str, SensorConfig] = {} + + for sensor in config.sensors: + if not sensor.enabled: + continue + self._by_id[sensor.id] = sensor + if sensor.device_address: + self._by_address[sensor.device_address] = sensor + # For Zigbee sensors, the device_address is the friendly name + if sensor.protocol.upper() == "ZIGBEE": + self._by_zigbee_name[sensor.device_address] = sensor + + log.info( + "Sensor registry loaded: %d sensors (%d zigbee)", + len(self._by_id), + len(self._by_zigbee_name), + ) + + def get_sensor(self, sensor_id: str) -> SensorConfig | None: + return self._by_id.get(sensor_id) + + def get_sensor_by_address(self, address: str) -> SensorConfig | None: + return self._by_address.get(address) + + def get_sensor_by_zigbee_name(self, friendly_name: str) -> SensorConfig | None: + return self._by_zigbee_name.get(friendly_name) + + def all_sensors(self) -> list[SensorConfig]: + return list(self._by_id.values()) + + def zigbee_sensors(self) -> list[SensorConfig]: + return [s for s in self._by_id.values() if s.protocol.upper() == "ZIGBEE"] + + def gpio_sensors(self) -> list[SensorConfig]: + return [s for s in self._by_id.values() if s.protocol.upper() == "GPIO"] diff --git a/vigilar/ups/monitor.py b/vigilar/ups/monitor.py new file mode 100644 index 0000000..e855fa5 --- /dev/null +++ b/vigilar/ups/monitor.py @@ -0,0 +1,154 @@ +"""UPS monitor — polls NUT daemon and publishes power status to MQTT.""" + +import logging +import time +from typing import Any + +from vigilar.bus import MessageBus +from vigilar.config import VigilarConfig +from vigilar.constants import EventType, Severity, Topics, UPSStatus +from vigilar.storage.db import get_db_path, get_engine +from vigilar.storage.queries import insert_event, insert_system_event + +log = logging.getLogger(__name__) + +try: + from PyNUT import PyNUTClient # pynut2 + HAS_PYNUT = True +except ImportError: + HAS_PYNUT = False + PyNUTClient = None # type: ignore[assignment,misc] + + +def _parse_status(raw: str) -> UPSStatus: + """Parse the NUT ups.status string into a UPSStatus enum value.""" + tokens = raw.strip().upper().split() + if "LB" in tokens: + return UPSStatus.LOW_BATTERY + if "OB" in tokens: + return UPSStatus.ON_BATTERY + if "OL" in tokens: + return UPSStatus.ONLINE + return UPSStatus.UNKNOWN + + +def _safe_float(val: Any, default: float = 0.0) -> float: + try: + return float(val) + except (TypeError, ValueError): + return default + + +class UPSMonitor: + """Polls a NUT server for UPS data and publishes events on MQTT.""" + + def __init__(self, cfg: VigilarConfig) -> None: + self._cfg = cfg + self._ups_cfg = cfg.ups + self._prev_status: UPSStatus = UPSStatus.UNKNOWN + self._shutdown_triggered = False + + def run(self) -> None: + """Subprocess entry-point — connects to MQTT and NUT, then loops.""" + logging.basicConfig(level=getattr(logging, self._cfg.system.log_level, logging.INFO)) + + if not HAS_PYNUT: + log.error("pynut2 is not installed — UPS monitoring disabled") + return + + bus = MessageBus(self._cfg.mqtt, client_id="vigilar-ups") + bus.connect() + + engine = get_engine(get_db_path(self._cfg.system.data_dir)) + + backoff = 1 + max_backoff = 120 + + while True: + try: + client = PyNUTClient( + host=self._ups_cfg.nut_host, + port=self._ups_cfg.nut_port, + ) + log.info("Connected to NUT at %s:%d", self._ups_cfg.nut_host, self._ups_cfg.nut_port) + backoff = 1 # reset on success + + while True: + self._poll_once(client, bus, engine) + time.sleep(self._ups_cfg.poll_interval_s) + + except Exception: + log.warning( + "NUT unavailable, retrying in %ds", backoff, exc_info=True + ) + time.sleep(backoff) + backoff = min(backoff * 2, max_backoff) + + def _poll_once(self, client: Any, bus: MessageBus, engine: Any) -> None: + """Read UPS vars, detect transitions, publish status.""" + ups_name = self._ups_cfg.ups_name + raw_vars = client.GetUPSVars(ups_name) + + # Decode bytes values if needed (pynut2 may return bytes) + vars_dict: dict[str, str] = {} + for k, v in raw_vars.items(): + key = k.decode() if isinstance(k, bytes) else k + val = v.decode() if isinstance(v, bytes) else v + vars_dict[key] = val + + status_str = vars_dict.get("ups.status", "UNKNOWN") + status = _parse_status(status_str) + charge = _safe_float(vars_dict.get("battery.charge", 100)) + runtime = _safe_float(vars_dict.get("battery.runtime", 9999)) + input_voltage = _safe_float(vars_dict.get("input.voltage", 0)) + load = _safe_float(vars_dict.get("ups.load", 0)) + + payload = { + "status": str(status), + "battery_charge_pct": charge, + "battery_runtime_s": runtime, + "input_voltage": input_voltage, + "ups_load_pct": load, + } + + # Always publish full status + bus.publish_event(Topics.UPS_STATUS, **payload) + + # --- State transitions --- + prev = self._prev_status + + # OL -> OB: power loss + if prev == UPSStatus.ONLINE and status in (UPSStatus.ON_BATTERY, UPSStatus.LOW_BATTERY): + bus.publish_event(Topics.UPS_POWER_LOSS, **payload) + insert_event(engine, EventType.POWER_LOSS, Severity.ALERT, source_id="ups", payload=payload) + insert_system_event(engine, "ups", "ALERT", "Power loss detected — running on battery") + log.warning("Power loss detected") + + # OB -> OL: restored + if prev in (UPSStatus.ON_BATTERY, UPSStatus.LOW_BATTERY) and status == UPSStatus.ONLINE: + bus.publish_event(Topics.UPS_RESTORED, **payload) + insert_event(engine, EventType.POWER_RESTORED, Severity.INFO, source_id="ups", payload=payload) + insert_system_event(engine, "ups", "INFO", "Power restored — back on mains") + log.info("Power restored") + + # Low battery threshold + if status != UPSStatus.ONLINE and charge < self._ups_cfg.low_battery_threshold_pct: + bus.publish_event(Topics.UPS_LOW_BATTERY, **payload) + insert_event(engine, EventType.LOW_BATTERY, Severity.WARNING, source_id="ups", payload=payload) + log.warning("Low battery: %.0f%%", charge) + + # Critical runtime — trigger shutdown + if ( + status != UPSStatus.ONLINE + and runtime < self._ups_cfg.critical_runtime_threshold_s + and not self._shutdown_triggered + ): + bus.publish_event(Topics.UPS_CRITICAL, **payload) + insert_system_event(engine, "ups", "CRITICAL", f"Critical runtime ({runtime:.0f}s) — initiating shutdown") + log.critical("Critical runtime %.0fs — starting shutdown", runtime) + self._shutdown_triggered = True + + from vigilar.ups.shutdown import ShutdownSequence + ShutdownSequence(self._ups_cfg).execute(bus, engine, reason="critical_runtime") + + self._prev_status = status diff --git a/vigilar/ups/shutdown.py b/vigilar/ups/shutdown.py new file mode 100644 index 0000000..4e0b506 --- /dev/null +++ b/vigilar/ups/shutdown.py @@ -0,0 +1,46 @@ +"""Graceful shutdown sequence triggered by UPS critical state.""" + +import logging +import os +import time +from typing import Any + +from vigilar.bus import MessageBus +from vigilar.config import UPSConfig +from vigilar.constants import EventType, Severity, Topics +from vigilar.storage.queries import insert_event, insert_system_event + +log = logging.getLogger(__name__) + + +class ShutdownSequence: + """Coordinates a clean shutdown when UPS reports critical battery.""" + + def __init__(self, ups_cfg: UPSConfig, shutdown_command: str = "shutdown -h now") -> None: + self._delay = ups_cfg.shutdown_delay_s + self._command = shutdown_command + + def execute(self, bus: MessageBus, engine: Any, reason: str = "ups_critical") -> None: + """Run the shutdown sequence: notify, wait, halt.""" + log.warning("Shutdown sequence started (reason=%s, delay=%ds)", reason, self._delay) + + # 1. Publish shutdown event + bus.publish_event(Topics.SYSTEM_SHUTDOWN, reason=reason, delay_s=self._delay) + + # 2. Log to database + insert_system_event(engine, "ups", "CRITICAL", f"Shutdown initiated: {reason}") + insert_event( + engine, + EventType.SYSTEM_SHUTDOWN, + Severity.CRITICAL, + source_id="ups", + payload={"reason": reason}, + ) + + # 3. Wait for other subsystems to wind down + log.info("Waiting %ds before system halt...", self._delay) + time.sleep(self._delay) + + # 4. Halt the machine + log.warning("Executing: %s", self._command) + os.system(self._command) diff --git a/vigilar/web/blueprints/events.py b/vigilar/web/blueprints/events.py index aae7d57..b347bad 100644 --- a/vigilar/web/blueprints/events.py +++ b/vigilar/web/blueprints/events.py @@ -1,31 +1,108 @@ -"""Events blueprint — event log, SSE stream.""" +"""Events blueprint — event log, SSE stream, acknowledgement.""" -from flask import Blueprint, Response, jsonify, render_template, request +import json +import queue +import threading +import time +from typing import Any + +from flask import Blueprint, Response, current_app, jsonify, render_template, request events_bp = Blueprint("events", __name__, url_prefix="/events") +# Global list of SSE subscriber queues +_sse_subscribers: list[queue.Queue] = [] +_sse_lock = threading.Lock() + + +def broadcast_sse_event(data: dict[str, Any]) -> None: + """Push an event dict to all connected SSE clients.""" + msg = f"data: {json.dumps(data, default=str)}\n\n" + with _sse_lock: + dead: list[queue.Queue] = [] + for q in _sse_subscribers: + try: + q.put_nowait(msg) + except queue.Full: + dead.append(q) + for q in dead: + _sse_subscribers.remove(q) + @events_bp.route("/") def event_list(): - # TODO: Phase 6 — query events from DB - return render_template("events.html", events=[]) + from vigilar.events.history import query_events + + engine = current_app.config.get("DB_ENGINE") + events = [] + if engine is not None: + events = query_events(engine, limit=200) + return render_template("events.html", events=events) @events_bp.route("/api/list") def events_api(): """JSON API: event list with filtering.""" + from vigilar.events.history import query_events + + engine = current_app.config.get("DB_ENGINE") + if engine is None: + return jsonify([]) + event_type = request.args.get("type") severity = request.args.get("severity") + source_id = request.args.get("source_id") limit = request.args.get("limit", 100, type=int) - # TODO: Phase 6 — query from DB - return jsonify([]) + offset = request.args.get("offset", 0, type=int) + + rows = query_events( + engine, + event_type=event_type or None, + severity=severity or None, + source_id=source_id or None, + limit=limit, + offset=offset, + ) + return jsonify(rows) @events_bp.route("/stream") def event_stream(): """SSE endpoint for live events.""" + q: queue.Queue = queue.Queue(maxsize=256) + with _sse_lock: + _sse_subscribers.append(q) + def generate(): - # TODO: Phase 6 — subscribe to MQTT events and yield SSE messages yield "data: {\"type\": \"connected\"}\n\n" + try: + while True: + try: + msg = q.get(timeout=30) + yield msg + except queue.Empty: + # Send keepalive comment + yield ": keepalive\n\n" + except GeneratorExit: + pass + finally: + with _sse_lock: + if q in _sse_subscribers: + _sse_subscribers.remove(q) return Response(generate(), mimetype="text/event-stream") + + +@events_bp.route("//acknowledge", methods=["POST"]) +def acknowledge(event_id: int): + """Acknowledge an event.""" + from vigilar.events.history import ack_event + + engine = current_app.config.get("DB_ENGINE") + if engine is None: + return jsonify({"error": "database not available"}), 503 + + success = ack_event(engine, event_id) + if success: + return jsonify({"status": "ok", "event_id": event_id}) + return jsonify({"error": "event not found"}), 404 diff --git a/vigilar/web/blueprints/sensors.py b/vigilar/web/blueprints/sensors.py index c257b27..5c0f61c 100644 --- a/vigilar/web/blueprints/sensors.py +++ b/vigilar/web/blueprints/sensors.py @@ -2,15 +2,31 @@ from flask import Blueprint, current_app, jsonify, render_template +from vigilar.storage.queries import get_sensor_state + sensors_bp = Blueprint("sensors", __name__, url_prefix="/sensors") +def _get_engine(): + """Get the DB engine from the app config, if available.""" + return current_app.config.get("DB_ENGINE") + + @sensors_bp.route("/") def sensor_list(): cfg = current_app.config.get("VIGILAR_CONFIG") sensors = cfg.sensors if cfg else [] - # TODO: Phase 7 — merge with live state from DB - return render_template("sensors.html", sensors=sensors) + engine = _get_engine() + sensor_data = [] + for s in sensors: + state = {} + if engine is not None: + try: + state = get_sensor_state(engine, s.id) + except Exception: + pass + sensor_data.append({"config": s, "state": state}) + return render_template("sensors.html", sensors=sensor_data) @sensors_bp.route("/api/status") @@ -18,14 +34,21 @@ def sensors_status_api(): """JSON API: all sensor current states.""" cfg = current_app.config.get("VIGILAR_CONFIG") sensors = cfg.sensors if cfg else [] - return jsonify([ - { + engine = _get_engine() + result = [] + for s in sensors: + state = {} + if engine is not None: + try: + state = get_sensor_state(engine, s.id) + except Exception: + pass + result.append({ "id": s.id, "display_name": s.display_name, "type": s.type, "protocol": s.protocol, "location": s.location, - "state": {}, # TODO: get from DB - } - for s in sensors - ]) + "state": state, + }) + return jsonify(result)