Phase 6 - Events + Rule Engine:
- EventProcessor subprocess: subscribes to all MQTT events, logs to DB, evaluates rules, fires alert actions
- ArmStateFSM: DISARMED/ARMED_HOME/ARMED_AWAY with PIN verification (HMAC-safe), DB persistence, MQTT state publishing
- RuleEngine: AND/OR logic, 4 condition types (arm_state, sensor_event, camera_motion, time_window), per-rule cooldown tracking
- SSE event stream with subscriber queue pattern and keepalive
- Event acknowledge endpoint

Phase 7 - Sensor Bridge:
- SensorBridge subprocess: subscribes to Zigbee2MQTT, normalizes payloads (contact, occupancy, temperature, humidity, battery, linkquality)
- GPIOHandler: conditional gpiozero import, callbacks for reed switches and PIR sensors
- SensorRegistry: maps Zigbee addresses and names to config sensor IDs
- SensorEvent/SensorState dataclasses
- Web UI now shows real sensor states from DB

Phase 8 - UPS Monitor:
- UPSMonitor subprocess: polls NUT via pynut2 with reconnect backoff
- State transition detection: OL→OB (power_loss), charge/runtime thresholds (low_battery, critical), OB→OL (restored)
- ShutdownSequence: ordered shutdown with configurable delay and command
- All conditionally imported (pynut2, gpiozero) for non-target platforms

Fixed test_db fixture to use isolated engines (no global singleton leak).
96 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
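The UPS state transitions listed under Phase 8 can be sketched roughly as follows. This is an illustrative sketch only, not the UPSMonitor code from this commit: the `UPSReading` dataclass, the `detect_events` and `battery_level` helpers, and all threshold values are hypothetical, and it assumes the status has already been reduced to a bare `OL` or `OB` flag.

```python
from dataclasses import dataclass
from typing import List, Optional

# Assumed thresholds for illustration; the real values are configurable
# and are not given in the commit message.
LOW_CHARGE_PCT = 30.0
CRITICAL_CHARGE_PCT = 10.0
CRITICAL_RUNTIME_S = 120.0

# Ordering used to emit a threshold event only when things get worse.
SEVERITY = {"ok": 0, "low_battery": 1, "critical": 2}


@dataclass(frozen=True)
class UPSReading:
    """Hypothetical snapshot of one poll."""
    status: str        # "OL" (on line power) or "OB" (on battery)
    charge_pct: float  # battery charge in percent
    runtime_s: float   # estimated remaining runtime in seconds


def battery_level(reading: UPSReading) -> str:
    """Classify a reading against the assumed charge/runtime thresholds."""
    if (reading.charge_pct <= CRITICAL_CHARGE_PCT
            or reading.runtime_s <= CRITICAL_RUNTIME_S):
        return "critical"
    if reading.charge_pct <= LOW_CHARGE_PCT:
        return "low_battery"
    return "ok"


def detect_events(prev: Optional[UPSReading], cur: UPSReading) -> List[str]:
    """Compare two consecutive polls and return the events to publish."""
    events: List[str] = []
    if prev is not None:
        if (prev.status, cur.status) == ("OL", "OB"):
            events.append("power_loss")
        elif (prev.status, cur.status) == ("OB", "OL"):
            events.append("restored")
    if cur.status == "OB":
        # Treat anything before battery operation as "ok" so the first
        # bad reading after a power loss still raises its event.
        on_battery_before = prev is not None and prev.status == "OB"
        prev_level = battery_level(prev) if on_battery_before else "ok"
        level = battery_level(cur)
        if SEVERITY[level] > SEVERITY[prev_level]:
            events.append(level)
    return events
```

Comparing consecutive readings, rather than looking at the current one alone, keeps a UPS that stays on battery from re-raising the same event on every poll.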
24 lines
668 B
Python
"""Tests for database schema creation."""

from sqlalchemy import create_engine, inspect

from vigilar.storage.schema import metadata


def test_tables_created(tmp_path):
    db_path = tmp_path / "test.db"
    engine = create_engine(f"sqlite:///{db_path}", echo=False)
    metadata.create_all(engine)

    assert db_path.exists()

    inspector = inspect(engine)
    table_names = inspector.get_table_names()

    expected = [
        "cameras", "sensors", "sensor_states", "events", "recordings",
        "system_events", "arm_state_log", "alert_log", "push_subscriptions",
    ]
    for name in expected:
        assert name in table_names, f"Missing table: {name}"
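The commit message mentions that the test_db fixture now uses isolated engines. The actual fixture is not shown here, so the following is only a sketch of the general idea, using the same SQLAlchemy calls as the test above. `demo_metadata`, `make_isolated_engine`, and `table_names_in_fresh_db` are hypothetical names, and the single `events` table stands in for the real `vigilar.storage.schema.metadata`.

```python
import tempfile
from pathlib import Path

from sqlalchemy import Column, Integer, MetaData, Table, create_engine, inspect

# Stand-in for vigilar.storage.schema.metadata (hypothetical, one table only).
demo_metadata = MetaData()
Table("events", demo_metadata, Column("id", Integer, primary_key=True))


def make_isolated_engine(db_dir: Path):
    """Create a fresh engine bound to its own SQLite file under db_dir.

    Nothing is cached at module level, so each caller gets a separate
    engine and a separate database file.
    """
    engine = create_engine(f"sqlite:///{db_dir / 'test.db'}", echo=False)
    demo_metadata.create_all(engine)
    return engine


def table_names_in_fresh_db():
    """Build a throwaway database and report which tables it contains."""
    with tempfile.TemporaryDirectory() as tmp:
        engine = make_isolated_engine(Path(tmp))
        try:
            return inspect(engine).get_table_names()
        finally:
            # Release the connection pool before the directory is removed.
            engine.dispose()
```

In a pytest suite, a helper like this would typically be wrapped in a fixture that receives `tmp_path`, yields the engine, and calls `engine.dispose()` on teardown.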