Add events/rules engine, sensor bridge, and UPS monitor (Phases 6-8)

Phase 6 — Events + Rule Engine:
- EventProcessor subprocess: subscribes to all MQTT events, logs to DB,
  evaluates rules, fires alert actions
- ArmStateFSM: DISARMED/ARMED_HOME/ARMED_AWAY with PIN verification
  (HMAC-safe), DB persistence, MQTT state publishing
- RuleEngine: AND/OR logic, 4 condition types (arm_state, sensor_event,
  camera_motion, time_window), per-rule cooldown tracking
- SSE event stream with subscriber queue pattern and keepalive
- Event acknowledge endpoint

Phase 7 — Sensor Bridge:
- SensorBridge subprocess: subscribes to Zigbee2MQTT, normalizes payloads
  (contact, occupancy, temperature, humidity, battery, linkquality)
- GPIOHandler: conditional gpiozero import, callbacks for reed switches
  and PIR sensors
- SensorRegistry: maps Zigbee addresses and names to config sensor IDs
- SensorEvent/SensorState dataclasses
- Web UI now shows real sensor states from DB

Phase 8 — UPS Monitor:
- UPSMonitor subprocess: polls NUT via pynut2 with reconnect backoff
- State transition detection: OL→OB (power_loss), charge/runtime
  thresholds (low_battery, critical), OB→OL (restored)
- ShutdownSequence: ordered shutdown with configurable delay and command
- All conditionally imported (pynut2, gpiozero) for non-target platforms

Fixed test_db fixture to use isolated engines (no global singleton leak).
96 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee
2026-04-02 23:17:53 -04:00
parent 845a85d618
commit 10b0cf4d0e
20 changed files with 2149 additions and 26 deletions

329
tests/unit/test_events.py Normal file
View File

@@ -0,0 +1,329 @@
"""Tests for the Phase 6 events subsystem: rules, arm state FSM, history."""
import hashlib
import time
import pytest
from vigilar.config import RuleCondition, RuleConfig, VigilarConfig
from vigilar.constants import ArmState, EventType, Severity
from vigilar.events.history import ack_event, query_events
from vigilar.events.rules import RuleEngine
from vigilar.events.state import ArmStateFSM
from vigilar.storage.queries import insert_event
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_config(rules=None, pin_hash=""):
return VigilarConfig(
system={"arm_pin_hash": pin_hash},
cameras=[],
sensors=[],
rules=rules or [],
)
def _pin_hash(pin: str) -> str:
return hashlib.sha256(pin.encode()).hexdigest()
# ---------------------------------------------------------------------------
# Rule Engine
# ---------------------------------------------------------------------------
class TestRuleEngine:
def test_and_logic_all_match(self):
rule = RuleConfig(
id="r1",
conditions=[
RuleCondition(type="arm_state", value="ARMED_AWAY"),
RuleCondition(type="camera_motion", value=""),
],
logic="AND",
actions=["alert_all"],
cooldown_s=0,
)
cfg = _make_config(rules=[rule])
engine = RuleEngine(cfg)
actions = engine.evaluate(
"vigilar/camera/cam1/motion/start",
{"ts": 123},
ArmState.ARMED_AWAY,
)
assert actions == ["alert_all"]
def test_and_logic_partial_match(self):
rule = RuleConfig(
id="r1",
conditions=[
RuleCondition(type="arm_state", value="ARMED_AWAY"),
RuleCondition(type="camera_motion", value=""),
],
logic="AND",
actions=["alert_all"],
cooldown_s=0,
)
cfg = _make_config(rules=[rule])
engine = RuleEngine(cfg)
# Arm state is DISARMED, so AND fails
actions = engine.evaluate(
"vigilar/camera/cam1/motion/start",
{"ts": 123},
ArmState.DISARMED,
)
assert actions == []
def test_or_logic(self):
rule = RuleConfig(
id="r2",
conditions=[
RuleCondition(type="arm_state", value="ARMED_AWAY"),
RuleCondition(type="camera_motion", value=""),
],
logic="OR",
actions=["record_all_cameras"],
cooldown_s=0,
)
cfg = _make_config(rules=[rule])
engine = RuleEngine(cfg)
# Only camera_motion matches, but OR logic means it fires
actions = engine.evaluate(
"vigilar/camera/cam1/motion/start",
{"ts": 123},
ArmState.DISARMED,
)
assert actions == ["record_all_cameras"]
def test_cooldown_prevents_refire(self):
rule = RuleConfig(
id="r3",
conditions=[
RuleCondition(type="camera_motion", value=""),
],
logic="AND",
actions=["alert_all"],
cooldown_s=300,
)
cfg = _make_config(rules=[rule])
engine = RuleEngine(cfg)
# First evaluation fires
actions1 = engine.evaluate(
"vigilar/camera/cam1/motion/start",
{"ts": 1},
ArmState.ARMED_AWAY,
)
assert actions1 == ["alert_all"]
# Second evaluation within cooldown does NOT fire
actions2 = engine.evaluate(
"vigilar/camera/cam1/motion/start",
{"ts": 2},
ArmState.ARMED_AWAY,
)
assert actions2 == []
def test_sensor_event_condition(self):
rule = RuleConfig(
id="r4",
conditions=[
RuleCondition(type="sensor_event", sensor_id="door1", event="CONTACT_OPEN"),
],
logic="AND",
actions=["alert_all"],
cooldown_s=0,
)
cfg = _make_config(rules=[rule])
engine = RuleEngine(cfg)
actions = engine.evaluate(
"vigilar/sensor/door1/CONTACT_OPEN",
{"ts": 1},
ArmState.DISARMED,
)
assert actions == ["alert_all"]
def test_sensor_event_wrong_sensor(self):
rule = RuleConfig(
id="r5",
conditions=[
RuleCondition(type="sensor_event", sensor_id="door1", event="CONTACT_OPEN"),
],
logic="AND",
actions=["alert_all"],
cooldown_s=0,
)
cfg = _make_config(rules=[rule])
engine = RuleEngine(cfg)
actions = engine.evaluate(
"vigilar/sensor/door2/CONTACT_OPEN",
{"ts": 1},
ArmState.DISARMED,
)
assert actions == []
def test_no_conditions_never_fires(self):
rule = RuleConfig(
id="r6",
conditions=[],
logic="AND",
actions=["alert_all"],
cooldown_s=0,
)
cfg = _make_config(rules=[rule])
engine = RuleEngine(cfg)
actions = engine.evaluate(
"vigilar/camera/cam1/motion/start",
{"ts": 1},
ArmState.ARMED_AWAY,
)
assert actions == []
def test_specific_camera_motion(self):
rule = RuleConfig(
id="r7",
conditions=[
RuleCondition(type="camera_motion", value="cam2"),
],
logic="AND",
actions=["alert_all"],
cooldown_s=0,
)
cfg = _make_config(rules=[rule])
engine = RuleEngine(cfg)
# Wrong camera
assert engine.evaluate(
"vigilar/camera/cam1/motion/start", {}, ArmState.DISARMED
) == []
# Right camera
assert engine.evaluate(
"vigilar/camera/cam2/motion/start", {}, ArmState.DISARMED
) == ["alert_all"]
# ---------------------------------------------------------------------------
# Arm State FSM
# ---------------------------------------------------------------------------
class TestArmStateFSM:
def test_initial_state_disarmed(self, test_db):
cfg = _make_config()
fsm = ArmStateFSM(test_db, cfg)
assert fsm.state == ArmState.DISARMED
def test_transition_without_pin(self, test_db):
cfg = _make_config()
fsm = ArmStateFSM(test_db, cfg)
assert fsm.transition(ArmState.ARMED_HOME, triggered_by="test")
assert fsm.state == ArmState.ARMED_HOME
def test_transition_with_valid_pin(self, test_db):
pin = "1234"
cfg = _make_config(pin_hash=_pin_hash(pin))
fsm = ArmStateFSM(test_db, cfg)
assert fsm.transition(ArmState.ARMED_AWAY, pin=pin, triggered_by="test")
assert fsm.state == ArmState.ARMED_AWAY
def test_transition_with_invalid_pin(self, test_db):
pin = "1234"
cfg = _make_config(pin_hash=_pin_hash(pin))
fsm = ArmStateFSM(test_db, cfg)
assert not fsm.transition(ArmState.ARMED_AWAY, pin="wrong", triggered_by="test")
assert fsm.state == ArmState.DISARMED
def test_transition_same_state_is_noop(self, test_db):
cfg = _make_config()
fsm = ArmStateFSM(test_db, cfg)
assert fsm.transition(ArmState.DISARMED, triggered_by="test")
assert fsm.state == ArmState.DISARMED
def test_state_persists_across_instances(self, test_db):
cfg = _make_config()
fsm1 = ArmStateFSM(test_db, cfg)
fsm1.transition(ArmState.ARMED_AWAY, triggered_by="test")
# New FSM instance should load from DB
fsm2 = ArmStateFSM(test_db, cfg)
assert fsm2.state == ArmState.ARMED_AWAY
def test_verify_pin(self, test_db):
pin = "5678"
cfg = _make_config(pin_hash=_pin_hash(pin))
fsm = ArmStateFSM(test_db, cfg)
assert fsm.verify_pin(pin)
assert not fsm.verify_pin("0000")
# ---------------------------------------------------------------------------
# Event History
# ---------------------------------------------------------------------------
class TestEventHistory:
def test_insert_and_query(self, test_db):
event_id = insert_event(
test_db,
event_type=EventType.MOTION_START,
severity=Severity.WARNING,
source_id="cam1",
payload={"zone": "front"},
)
assert event_id > 0
rows = query_events(test_db)
assert len(rows) == 1
assert rows[0]["type"] == EventType.MOTION_START
assert rows[0]["source_id"] == "cam1"
def test_query_filter_by_type(self, test_db):
insert_event(test_db, EventType.MOTION_START, Severity.WARNING, "cam1")
insert_event(test_db, EventType.CONTACT_OPEN, Severity.WARNING, "door1")
rows = query_events(test_db, event_type=EventType.MOTION_START)
assert len(rows) == 1
assert rows[0]["type"] == EventType.MOTION_START
def test_query_filter_by_severity(self, test_db):
insert_event(test_db, EventType.MOTION_START, Severity.WARNING, "cam1")
insert_event(test_db, EventType.POWER_LOSS, Severity.CRITICAL, "ups")
rows = query_events(test_db, severity=Severity.CRITICAL)
assert len(rows) == 1
assert rows[0]["severity"] == Severity.CRITICAL
def test_acknowledge_event(self, test_db):
event_id = insert_event(
test_db, EventType.MOTION_START, Severity.WARNING, "cam1"
)
assert ack_event(test_db, event_id)
rows = query_events(test_db)
assert rows[0]["acknowledged"] == 1
assert rows[0]["ack_ts"] is not None
def test_acknowledge_nonexistent(self, test_db):
assert not ack_event(test_db, 99999)
def test_query_limit_and_offset(self, test_db):
for i in range(5):
insert_event(test_db, EventType.MOTION_START, Severity.INFO, f"cam{i}")
rows = query_events(test_db, limit=2)
assert len(rows) == 2
rows_offset = query_events(test_db, limit=2, offset=2)
assert len(rows_offset) == 2
# Should be different events
assert rows[0]["id"] != rows_offset[0]["id"]

View File

@@ -1,15 +1,17 @@
"""Tests for database schema creation."""
from vigilar.storage.db import init_db
from sqlalchemy import create_engine, inspect
from vigilar.storage.schema import metadata
def test_tables_created(tmp_path):
db_path = tmp_path / "test.db"
engine = init_db(db_path)
engine = create_engine(f"sqlite:///{db_path}", echo=False)
metadata.create_all(engine)
assert db_path.exists()
from sqlalchemy import inspect
inspector = inspect(engine)
table_names = inspector.get_table_names()

251
tests/unit/test_sensors.py Normal file
View File

@@ -0,0 +1,251 @@
"""Tests for sensor bridge subsystem — models, registry, payload normalization."""
import time
import pytest
from vigilar.config import SensorConfig, VigilarConfig
from vigilar.constants import EventType
from vigilar.sensors.bridge import normalize_zigbee_payload
from vigilar.sensors.models import SensorEvent, SensorState
from vigilar.sensors.registry import SensorRegistry
# --- Fixtures ---
@pytest.fixture
def zigbee_sensor_config():
return VigilarConfig(
sensors=[
SensorConfig(
id="front_door",
display_name="Front Door",
type="CONTACT",
protocol="ZIGBEE",
device_address="front_door_sensor",
location="Entrance",
),
SensorConfig(
id="hallway_motion",
display_name="Hallway Motion",
type="MOTION",
protocol="ZIGBEE",
device_address="hallway_pir",
location="Hallway",
),
SensorConfig(
id="living_room_temp",
display_name="Living Room Temp",
type="TEMPERATURE",
protocol="ZIGBEE",
device_address="lr_climate",
location="Living Room",
),
SensorConfig(
id="garage_reed",
display_name="Garage Reed",
type="CONTACT",
protocol="GPIO",
device_address="17",
location="Garage",
),
SensorConfig(
id="disabled_sensor",
display_name="Disabled",
type="CONTACT",
protocol="ZIGBEE",
device_address="disabled_one",
location="Attic",
enabled=False,
),
],
)
# --- Dataclass Tests ---
class TestSensorEvent:
def test_create_with_state(self):
event = SensorEvent(
sensor_id="front_door",
event_type=EventType.CONTACT_OPEN,
state="OPEN",
source_protocol="ZIGBEE",
)
assert event.sensor_id == "front_door"
assert event.event_type == EventType.CONTACT_OPEN
assert event.state == "OPEN"
assert event.value is None
assert event.timestamp > 0
def test_create_with_value(self):
event = SensorEvent(
sensor_id="living_room_temp",
event_type="temperature",
value=22.5,
source_protocol="ZIGBEE",
)
assert event.value == 22.5
assert event.state is None
def test_to_dict(self):
event = SensorEvent(
sensor_id="s1",
event_type="temperature",
value=18.0,
timestamp=1000,
source_protocol="ZIGBEE",
)
d = event.to_dict()
assert d["sensor_id"] == "s1"
assert d["value"] == 18.0
assert d["ts"] == 1000
assert "state" not in d
class TestSensorState:
def test_create(self):
state = SensorState(
sensor_id="front_door",
states={"CONTACT_OPEN": "OPEN"},
last_seen=12345,
battery_pct=87,
)
assert state.sensor_id == "front_door"
assert state.battery_pct == 87
assert state.signal_strength is None
def test_to_dict(self):
state = SensorState(sensor_id="s1", states={"temp": 22.5}, last_seen=100)
d = state.to_dict()
assert d["sensor_id"] == "s1"
assert d["states"] == {"temp": 22.5}
assert d["battery_pct"] is None
# --- Registry Tests ---
class TestSensorRegistry:
def test_lookup_by_zigbee_name(self, zigbee_sensor_config):
reg = SensorRegistry(zigbee_sensor_config)
sensor = reg.get_sensor_by_zigbee_name("front_door_sensor")
assert sensor is not None
assert sensor.id == "front_door"
def test_lookup_by_address(self, zigbee_sensor_config):
reg = SensorRegistry(zigbee_sensor_config)
sensor = reg.get_sensor_by_address("hallway_pir")
assert sensor is not None
assert sensor.id == "hallway_motion"
def test_lookup_unknown_returns_none(self, zigbee_sensor_config):
reg = SensorRegistry(zigbee_sensor_config)
assert reg.get_sensor_by_zigbee_name("nonexistent") is None
assert reg.get_sensor_by_address("nonexistent") is None
def test_disabled_sensors_excluded(self, zigbee_sensor_config):
reg = SensorRegistry(zigbee_sensor_config)
assert reg.get_sensor_by_zigbee_name("disabled_one") is None
assert reg.get_sensor("disabled_sensor") is None
def test_gpio_not_in_zigbee_lookup(self, zigbee_sensor_config):
reg = SensorRegistry(zigbee_sensor_config)
assert reg.get_sensor_by_zigbee_name("17") is None
def test_gpio_sensors_list(self, zigbee_sensor_config):
reg = SensorRegistry(zigbee_sensor_config)
gpio = reg.gpio_sensors()
assert len(gpio) == 1
assert gpio[0].id == "garage_reed"
def test_zigbee_sensors_list(self, zigbee_sensor_config):
reg = SensorRegistry(zigbee_sensor_config)
zigbee = reg.zigbee_sensors()
assert len(zigbee) == 3
def test_all_sensors_excludes_disabled(self, zigbee_sensor_config):
reg = SensorRegistry(zigbee_sensor_config)
assert len(reg.all_sensors()) == 4
# --- Zigbee Payload Normalization Tests ---
class TestNormalizeZigbeePayload:
def test_contact_open(self):
results = normalize_zigbee_payload({"contact": False})
assert len(results) == 1
r = results[0]
assert r["event_type"] == EventType.CONTACT_OPEN
assert r["state"] == "OPEN"
assert r["source"] == "zigbee"
assert "ts" in r
def test_contact_closed(self):
results = normalize_zigbee_payload({"contact": True})
assert len(results) == 1
assert results[0]["event_type"] == EventType.CONTACT_CLOSED
assert results[0]["state"] == "CLOSED"
def test_motion_active(self):
results = normalize_zigbee_payload({"occupancy": True})
assert len(results) == 1
assert results[0]["event_type"] == EventType.MOTION_START
assert results[0]["state"] == "ACTIVE"
def test_motion_idle(self):
results = normalize_zigbee_payload({"occupancy": False})
assert len(results) == 1
assert results[0]["event_type"] == EventType.MOTION_END
assert results[0]["state"] == "IDLE"
def test_temperature(self):
results = normalize_zigbee_payload({"temperature": 22.5})
assert len(results) == 1
r = results[0]
assert r["event_type"] == "temperature"
assert r["value"] == 22.5
assert r["unit"] == "C"
def test_humidity(self):
results = normalize_zigbee_payload({"humidity": 45})
assert len(results) == 1
assert results[0]["value"] == 45.0
def test_battery(self):
results = normalize_zigbee_payload({"battery": 87})
assert len(results) == 1
assert results[0]["event_type"] == "battery"
assert results[0]["pct"] == 87
def test_linkquality(self):
results = normalize_zigbee_payload({"linkquality": 120})
assert len(results) == 1
assert results[0]["event_type"] == "linkquality"
assert results[0]["value"] == 120
def test_multi_field_payload(self):
"""A real Zigbee sensor often sends multiple fields at once."""
results = normalize_zigbee_payload({
"contact": False,
"battery": 92,
"linkquality": 85,
"temperature": 21.3,
})
event_types = {r["event_type"] for r in results}
assert EventType.CONTACT_OPEN in event_types
assert "battery" in event_types
assert "temperature" in event_types
assert "linkquality" in event_types
assert len(results) == 4
def test_empty_payload(self):
results = normalize_zigbee_payload({})
assert results == []
def test_unknown_fields_ignored(self):
results = normalize_zigbee_payload({"unknown_field": 42, "foo": "bar"})
assert results == []

324
tests/unit/test_ups.py Normal file
View File

@@ -0,0 +1,324 @@
"""Tests for UPS monitor state transitions and shutdown sequence."""
from unittest.mock import MagicMock, call, patch
import pytest
from vigilar.config import MQTTConfig, SystemConfig, UPSConfig, VigilarConfig
from vigilar.constants import EventType, Severity, Topics, UPSStatus
from vigilar.ups.monitor import UPSMonitor, _parse_status, _safe_float
# --- Helpers ---
def _make_cfg(**ups_overrides) -> VigilarConfig:
return VigilarConfig(
system=SystemConfig(data_dir="/tmp/vigilar-test"),
mqtt=MQTTConfig(),
ups=UPSConfig(**ups_overrides),
)
def _make_nut_vars(
status: str = "OL",
charge: str = "100",
runtime: str = "3600",
voltage: str = "230.0",
load: str = "25",
) -> dict[str, str]:
return {
"ups.status": status,
"battery.charge": charge,
"battery.runtime": runtime,
"input.voltage": voltage,
"ups.load": load,
}
# --- _parse_status ---
class TestParseStatus:
def test_online(self):
assert _parse_status("OL") == UPSStatus.ONLINE
def test_on_battery(self):
assert _parse_status("OB") == UPSStatus.ON_BATTERY
def test_low_battery(self):
assert _parse_status("OB LB") == UPSStatus.LOW_BATTERY
def test_online_charging(self):
assert _parse_status("OL CHRG") == UPSStatus.ONLINE
def test_unknown(self):
assert _parse_status("BYPASS") == UPSStatus.UNKNOWN
def test_empty(self):
assert _parse_status("") == UPSStatus.UNKNOWN
class TestSafeFloat:
def test_valid(self):
assert _safe_float("42.5") == 42.5
def test_none(self):
assert _safe_float(None) == 0.0
def test_garbage(self):
assert _safe_float("n/a", -1.0) == -1.0
# --- State Transition Tests ---
class TestStateTransitions:
"""Test UPSMonitor._poll_once for state transition detection."""
def _poll(self, monitor: UPSMonitor, bus: MagicMock, engine: MagicMock, nut_vars: dict):
client = MagicMock()
client.GetUPSVars.return_value = nut_vars
monitor._poll_once(client, bus, engine)
def test_online_to_on_battery_publishes_power_loss(self):
cfg = _make_cfg()
monitor = UPSMonitor(cfg)
monitor._prev_status = UPSStatus.ONLINE
bus = MagicMock()
engine = MagicMock()
with patch("vigilar.ups.monitor.insert_event") as mock_ie, \
patch("vigilar.ups.monitor.insert_system_event") as mock_ise:
self._poll(monitor, bus, engine, _make_nut_vars(status="OB", charge="80", runtime="1800"))
assert monitor._prev_status == UPSStatus.ON_BATTERY
# Check power loss was published
topics_published = [c.args[0] for c in bus.publish_event.call_args_list]
assert Topics.UPS_STATUS in topics_published
assert Topics.UPS_POWER_LOSS in topics_published
def test_on_battery_to_online_publishes_restored(self):
cfg = _make_cfg()
monitor = UPSMonitor(cfg)
monitor._prev_status = UPSStatus.ON_BATTERY
bus = MagicMock()
engine = MagicMock()
with patch("vigilar.ups.monitor.insert_event"), \
patch("vigilar.ups.monitor.insert_system_event"):
self._poll(monitor, bus, engine, _make_nut_vars(status="OL"))
assert monitor._prev_status == UPSStatus.ONLINE
topics_published = [c.args[0] for c in bus.publish_event.call_args_list]
assert Topics.UPS_RESTORED in topics_published
def test_low_battery_to_online_publishes_restored(self):
cfg = _make_cfg()
monitor = UPSMonitor(cfg)
monitor._prev_status = UPSStatus.LOW_BATTERY
bus = MagicMock()
engine = MagicMock()
with patch("vigilar.ups.monitor.insert_event"), \
patch("vigilar.ups.monitor.insert_system_event"):
self._poll(monitor, bus, engine, _make_nut_vars(status="OL"))
topics_published = [c.args[0] for c in bus.publish_event.call_args_list]
assert Topics.UPS_RESTORED in topics_published
def test_charge_below_threshold_publishes_low_battery(self):
cfg = _make_cfg(low_battery_threshold_pct=30)
monitor = UPSMonitor(cfg)
monitor._prev_status = UPSStatus.ON_BATTERY
bus = MagicMock()
engine = MagicMock()
with patch("vigilar.ups.monitor.insert_event"), \
patch("vigilar.ups.monitor.insert_system_event"):
self._poll(monitor, bus, engine, _make_nut_vars(status="OB", charge="15", runtime="1800"))
topics_published = [c.args[0] for c in bus.publish_event.call_args_list]
assert Topics.UPS_LOW_BATTERY in topics_published
def test_charge_above_threshold_no_low_battery(self):
cfg = _make_cfg(low_battery_threshold_pct=20)
monitor = UPSMonitor(cfg)
monitor._prev_status = UPSStatus.ON_BATTERY
bus = MagicMock()
engine = MagicMock()
with patch("vigilar.ups.monitor.insert_event"), \
patch("vigilar.ups.monitor.insert_system_event"):
self._poll(monitor, bus, engine, _make_nut_vars(status="OB", charge="50", runtime="3600"))
topics_published = [c.args[0] for c in bus.publish_event.call_args_list]
assert Topics.UPS_LOW_BATTERY not in topics_published
def test_critical_runtime_triggers_shutdown(self):
cfg = _make_cfg(critical_runtime_threshold_s=300, shutdown_delay_s=0)
monitor = UPSMonitor(cfg)
monitor._prev_status = UPSStatus.ON_BATTERY
bus = MagicMock()
engine = MagicMock()
with patch("vigilar.ups.monitor.insert_event"), \
patch("vigilar.ups.monitor.insert_system_event"), \
patch("vigilar.ups.shutdown.insert_event"), \
patch("vigilar.ups.shutdown.insert_system_event"), \
patch("vigilar.ups.shutdown.os.system") as mock_system, \
patch("vigilar.ups.shutdown.time.sleep"):
self._poll(monitor, bus, engine, _make_nut_vars(status="OB", charge="10", runtime="60"))
topics_published = [c.args[0] for c in bus.publish_event.call_args_list]
assert Topics.UPS_CRITICAL in topics_published
assert Topics.SYSTEM_SHUTDOWN in topics_published
assert monitor._shutdown_triggered is True
mock_system.assert_called_once_with("shutdown -h now")
def test_critical_runtime_only_triggers_once(self):
cfg = _make_cfg(critical_runtime_threshold_s=300, shutdown_delay_s=0)
monitor = UPSMonitor(cfg)
monitor._prev_status = UPSStatus.ON_BATTERY
monitor._shutdown_triggered = True # already triggered
bus = MagicMock()
engine = MagicMock()
with patch("vigilar.ups.monitor.insert_event"), \
patch("vigilar.ups.monitor.insert_system_event"):
self._poll(monitor, bus, engine, _make_nut_vars(status="OB", charge="5", runtime="30"))
topics_published = [c.args[0] for c in bus.publish_event.call_args_list]
assert Topics.UPS_CRITICAL not in topics_published
def test_online_no_low_battery_even_if_low_charge(self):
"""When on mains, low charge should not trigger low_battery event."""
cfg = _make_cfg(low_battery_threshold_pct=50)
monitor = UPSMonitor(cfg)
monitor._prev_status = UPSStatus.ONLINE
bus = MagicMock()
engine = MagicMock()
with patch("vigilar.ups.monitor.insert_event"), \
patch("vigilar.ups.monitor.insert_system_event"):
self._poll(monitor, bus, engine, _make_nut_vars(status="OL", charge="10"))
topics_published = [c.args[0] for c in bus.publish_event.call_args_list]
assert Topics.UPS_LOW_BATTERY not in topics_published
def test_status_always_published(self):
cfg = _make_cfg()
monitor = UPSMonitor(cfg)
monitor._prev_status = UPSStatus.ONLINE
bus = MagicMock()
engine = MagicMock()
with patch("vigilar.ups.monitor.insert_event"), \
patch("vigilar.ups.monitor.insert_system_event"):
self._poll(monitor, bus, engine, _make_nut_vars())
assert bus.publish_event.call_args_list[0].args[0] == Topics.UPS_STATUS
def test_bytes_vars_decoded(self):
"""NUT client may return bytes keys/values."""
cfg = _make_cfg()
monitor = UPSMonitor(cfg)
monitor._prev_status = UPSStatus.ONLINE
bus = MagicMock()
engine = MagicMock()
byte_vars = {
b"ups.status": b"OL",
b"battery.charge": b"95",
b"battery.runtime": b"7200",
b"input.voltage": b"231.2",
b"ups.load": b"18",
}
client = MagicMock()
client.GetUPSVars.return_value = byte_vars
with patch("vigilar.ups.monitor.insert_event"), \
patch("vigilar.ups.monitor.insert_system_event"):
monitor._poll_once(client, bus, engine)
assert monitor._prev_status == UPSStatus.ONLINE
bus.publish_event.assert_called_once()
# --- Shutdown Sequence Tests ---
class TestShutdownSequence:
def test_execute_order(self):
"""Verify: publish -> log -> sleep -> os.system."""
from vigilar.ups.shutdown import ShutdownSequence
ups_cfg = UPSConfig(shutdown_delay_s=10)
seq = ShutdownSequence(ups_cfg)
bus = MagicMock()
engine = MagicMock()
call_order = []
bus.publish_event.side_effect = lambda *a, **kw: call_order.append("publish")
with patch("vigilar.ups.shutdown.insert_system_event", side_effect=lambda *a, **kw: call_order.append("insert_system_event")), \
patch("vigilar.ups.shutdown.insert_event", side_effect=lambda *a, **kw: call_order.append("insert_event")), \
patch("vigilar.ups.shutdown.time.sleep", side_effect=lambda s: call_order.append(f"sleep_{s}")) as mock_sleep, \
patch("vigilar.ups.shutdown.os.system", side_effect=lambda cmd: call_order.append(f"system_{cmd}")) as mock_sys:
seq.execute(bus, engine, reason="test_critical")
assert call_order[0] == "publish"
assert "insert_system_event" in call_order
assert "insert_event" in call_order
assert "sleep_10" in call_order
assert "system_shutdown -h now" in call_order
# Sleep and system call should come after publish/log
sleep_idx = call_order.index("sleep_10")
system_idx = call_order.index("system_shutdown -h now")
assert sleep_idx < system_idx
def test_custom_shutdown_command(self):
from vigilar.ups.shutdown import ShutdownSequence
ups_cfg = UPSConfig(shutdown_delay_s=0)
seq = ShutdownSequence(ups_cfg, shutdown_command="poweroff")
bus = MagicMock()
engine = MagicMock()
with patch("vigilar.ups.shutdown.insert_system_event"), \
patch("vigilar.ups.shutdown.insert_event"), \
patch("vigilar.ups.shutdown.time.sleep"), \
patch("vigilar.ups.shutdown.os.system") as mock_sys:
seq.execute(bus, engine, reason="test")
mock_sys.assert_called_once_with("poweroff")
def test_publishes_system_shutdown_topic(self):
from vigilar.ups.shutdown import ShutdownSequence
ups_cfg = UPSConfig(shutdown_delay_s=0)
seq = ShutdownSequence(ups_cfg)
bus = MagicMock()
engine = MagicMock()
with patch("vigilar.ups.shutdown.insert_system_event"), \
patch("vigilar.ups.shutdown.insert_event"), \
patch("vigilar.ups.shutdown.time.sleep"), \
patch("vigilar.ups.shutdown.os.system"):
seq.execute(bus, engine, reason="battery_dead")
bus.publish_event.assert_called_once()
assert bus.publish_event.call_args.args[0] == Topics.SYSTEM_SHUTDOWN
assert bus.publish_event.call_args.kwargs["reason"] == "battery_dead"