feat: wire MQTT → SSE bridge so the event timeline updates live (closes #1) #6

Merged
alee merged 2 commits from fix/issue-1-sse-bridge into main 2026-04-05 16:55:27 +00:00
9 changed files with 242 additions and 9 deletions

View File

@@ -40,4 +40,4 @@ All access is read-mostly via `vigilar.storage.queries`. Touches essentially eve
## Notes
Templates use Jinja2 with a Bootstrap 5 dark theme; the camera grid uses `hls.js` for multi-camera HLS playback with an MJPEG fallback on the single-camera page. The app is a PWA (service worker + manifest) with VAPID Web Push for mobile notifications. The event-timeline SSE endpoint lives around line 93 of `blueprints/events.py` and holds a per-client `queue.Queue` fed by a module-level `broadcast_sse_event` function — that function is defined but has no call site in the repo at time of writing, so live SSE updates will only flow once the bridge from MQTT into that queue is wired.
Templates use Jinja2 with a Bootstrap 5 dark theme; the camera grid uses `hls.js` for multi-camera HLS playback with an MJPEG fallback on the single-camera page. The app is a PWA (service worker + manifest) with VAPID Web Push for mobile notifications. The event-timeline SSE endpoint lives around line 93 of `blueprints/events.py` and holds a per-client `queue.Queue` fed by a module-level `broadcast_sse_event` function. A dedicated MQTT → SSE bridge (`vigilar/web/sse_bridge.py`) runs inside the web process, subscribes to `Topics.EVENTS_PUBLISHED`, and calls `broadcast_sse_event` for every classified event emitted by the events subsystem — so the in-browser event timeline updates live without a page refresh.

View File

@@ -585,13 +585,6 @@ Do not expose port `49735` directly on the WAN; require the tunnel.
## Known limitations
- **Event timeline is not live.** The web UI event timeline requires
a page refresh to show new events. `broadcast_sse_event` exists in
`vigilar/web/blueprints/events.py` but has zero call sites today;
events are not pushed to browsers via SSE. Web Push notifications
via VAPID are independent of the timeline and do work: you will
get mobile alerts as motion happens, but the in-page timeline lags
until you reload.
- **Recording integrity is not authenticated.** AES-256-CTR gives you
confidentiality, not tamper-evidence. If an attacker reaches the
recordings directory they can modify ciphertext unnoticed. See the

View File

@@ -11,6 +11,28 @@ from vigilar.config import VigilarConfig, load_config
from vigilar.storage.schema import metadata
@pytest.fixture(autouse=True, scope="session")
def _isolate_vigilar_config(tmp_path_factory):
"""Prevent tests from writing to the real config/vigilar.toml.
Web endpoint handlers that call `_save_and_reload()` read the target
path from the VIGILAR_CONFIG env var, falling back to the relative
`"config/vigilar.toml"`. Without this fixture, any test that exercises
such an endpoint rewrites the repo's committed config file via a
Pydantic round-trip, stripping comments and non-default fields.
"""
tmp_config = tmp_path_factory.mktemp("vigilar-config") / "vigilar.toml"
prev = os.environ.get("VIGILAR_CONFIG")
os.environ["VIGILAR_CONFIG"] = str(tmp_config)
try:
yield
finally:
if prev is None:
os.environ.pop("VIGILAR_CONFIG", None)
else:
os.environ["VIGILAR_CONFIG"] = prev
def _create_test_engine(db_path: Path):
"""Create a fresh engine for testing (bypasses the global singleton)."""
db_path.parent.mkdir(parents=True, exist_ok=True)

View File

@@ -429,3 +429,85 @@ class TestPetEventClassification:
assert etype == EventType.WILDLIFE_PASSIVE
assert sev == Severity.INFO
assert source == "front"
# ---------------------------------------------------------------------------
# Classified-event broadcast (for SSE bridge)
# ---------------------------------------------------------------------------
class _RecordingBus:
"""Minimal bus fake that records publishes instead of sending them."""
def __init__(self):
self.published: list[tuple[str, dict]] = []
def publish(self, topic: str, payload: dict, qos: int = 1) -> None:
self.published.append((topic, payload))
def publish_event(self, topic: str, **kwargs) -> None:
self.publish(topic, kwargs)
class _StubFSM:
state = ArmState.DISARMED
class _StubRuleEngine:
def evaluate(self, topic, payload, state):
return []
class TestEventsPublishedBroadcast:
"""EventProcessor should publish a classified-event summary to
Topics.EVENTS_PUBLISHED after inserting, so the Flask SSE bridge
can forward it to browser clients."""
def test_handle_event_publishes_classified_payload(self, test_db):
from vigilar.events.processor import EventProcessor
from vigilar.constants import Topics
processor = EventProcessor.__new__(EventProcessor)
bus = _RecordingBus()
processor._handle_event(
topic="vigilar/camera/cam1/motion/start",
payload={"ts": 12345, "detail": "x"},
engine=test_db,
fsm=_StubFSM(),
rule_engine=_StubRuleEngine(),
bus=bus,
)
published_on_bridge_topic = [
p for t, p in bus.published if t == Topics.EVENTS_PUBLISHED
]
assert len(published_on_bridge_topic) == 1, (
f"expected exactly one publish to {Topics.EVENTS_PUBLISHED}, "
f"got publishes: {bus.published}"
)
msg = published_on_bridge_topic[0]
assert msg["type"] == EventType.MOTION_START
assert msg["severity"] == Severity.WARNING
assert msg["source_id"] == "cam1"
assert "ts" in msg
assert "id" in msg and isinstance(msg["id"], int)
def test_unclassified_topic_does_not_publish(self, test_db):
"""Heartbeats and other non-event topics must not be forwarded."""
from vigilar.events.processor import EventProcessor
from vigilar.constants import Topics
processor = EventProcessor.__new__(EventProcessor)
bus = _RecordingBus()
processor._handle_event(
topic="vigilar/camera/cam1/heartbeat",
payload={"ts": 12345},
engine=test_db,
fsm=_StubFSM(),
rule_engine=_StubRuleEngine(),
bus=bus,
)
assert not any(t == Topics.EVENTS_PUBLISHED for t, _ in bus.published)

View File

@@ -0,0 +1,62 @@
"""Tests for the MQTT → Server-Sent Events bridge used by the web process."""
import json
import queue
def test_forward_event_delivers_payload_to_sse_subscribers():
"""forward_event should hand the payload to every connected SSE client."""
from vigilar.web.blueprints import events as events_bp
from vigilar.web.sse_bridge import forward_event
q: queue.Queue = queue.Queue(maxsize=10)
events_bp._sse_subscribers.append(q)
try:
payload = {
"type": "MOTION_START",
"source_id": "cam1",
"severity": "WARNING",
"ts": 1234,
}
forward_event("vigilar/events/published", payload)
raw = q.get_nowait()
assert raw.startswith("data: ")
# Strip "data: " prefix and trailing double newline
data = json.loads(raw[len("data: "):].rstrip())
assert data["type"] == "MOTION_START"
assert data["source_id"] == "cam1"
finally:
if q in events_bp._sse_subscribers:
events_bp._sse_subscribers.remove(q)
def test_start_sse_bridge_subscribes_to_events_published(monkeypatch):
"""start_sse_bridge must connect a bus and subscribe forward_event
to Topics.EVENTS_PUBLISHED — that is the single entry point for the
web process to observe classified events from the events subsystem."""
from vigilar.config import VigilarConfig
from vigilar.constants import Topics
from vigilar.web import sse_bridge
class FakeBus:
def __init__(self, config, client_id):
self.client_id = client_id
self.subscriptions: list[tuple[str, object]] = []
self.connected = False
def subscribe(self, topic, handler):
self.subscriptions.append((topic, handler))
def connect(self):
self.connected = True
monkeypatch.setattr(sse_bridge, "MessageBus", FakeBus)
bus = sse_bridge.start_sse_bridge(VigilarConfig())
assert bus.connected is True
assert len(bus.subscriptions) == 1
topic, handler = bus.subscriptions[0]
assert topic == Topics.EVENTS_PUBLISHED
assert handler is sse_bridge.forward_event

View File

@@ -211,6 +211,10 @@ class Topics:
SYSTEM_SHUTDOWN = "vigilar/system/shutdown"
SYSTEM_RULES_UPDATED = "vigilar/system/rules_updated"
# Classified events forwarded to the web SSE bridge (see events.processor
# and web.sse_bridge).
EVENTS_PUBLISHED = "vigilar/events/published"
# Wildcard subscriptions
ALL = "vigilar/#"
ALL_CAMERAS = "vigilar/camera/#"

View File

@@ -103,6 +103,22 @@ class EventProcessor:
payload=payload,
)
# Broadcast a classified summary for the web SSE bridge. This is
# independent of rule actions and DB storage — the web process
# subscribes to Topics.EVENTS_PUBLISHED and forwards each message
# to connected browser clients.
bus.publish(
Topics.EVENTS_PUBLISHED,
{
"id": event_id,
"ts": int(time.time() * 1000),
"type": event_type,
"severity": severity,
"source_id": source_id,
"payload": payload,
},
)
# Insert pet/wildlife sightings
if event_type in (
EventType.PET_DETECTED, EventType.PET_ESCAPE, EventType.UNKNOWN_ANIMAL

View File

@@ -67,9 +67,27 @@ class SubsystemProcess:
def _run_web(cfg: VigilarConfig) -> None:
"""Run the Flask web server in a subprocess."""
from vigilar.web.app import create_app
from vigilar.web.sse_bridge import start_sse_bridge
app = create_app(cfg)
# Forward classified events from MQTT to browser SSE clients. Failure
# here must not kill the web process — the UI still works without
# live updates, it just requires a page refresh.
sse_bus = None
try:
sse_bus = start_sse_bridge(cfg)
except Exception:
log.exception("Failed to start SSE bridge; live event timeline will not update")
try:
app.run(host=cfg.web.host, port=cfg.web.port, debug=False, use_reloader=False)
finally:
if sse_bus is not None:
try:
sse_bus.disconnect()
except Exception:
log.exception("Error disconnecting SSE bridge bus")
def _run_event_processor(cfg: VigilarConfig) -> None:

36
vigilar/web/sse_bridge.py Normal file
View File

@@ -0,0 +1,36 @@
"""Bridge classified events from MQTT to Server-Sent Events subscribers.
The events subsystem (`vigilar.events.processor.EventProcessor`) publishes
classified events to `Topics.EVENTS_PUBLISHED`. The Flask web process runs
in its own OS process, so to make the in-browser event timeline update
live it must subscribe to that topic via its own `MessageBus` client and
forward every message to `broadcast_sse_event`.
"""
import logging
from typing import Any
from vigilar.bus import MessageBus
from vigilar.config import VigilarConfig
from vigilar.constants import Topics
from vigilar.web.blueprints.events import broadcast_sse_event
log = logging.getLogger(__name__)
def forward_event(topic: str, payload: dict[str, Any]) -> None:
"""MQTT handler: forward a classified event to SSE subscribers."""
broadcast_sse_event(payload)
def start_sse_bridge(cfg: VigilarConfig) -> MessageBus:
"""Create an MQTT client that forwards classified events to SSE clients.
Returns the connected `MessageBus` so the caller can disconnect it on
shutdown.
"""
bus = MessageBus(cfg.mqtt, client_id="vigilar-web-sse-bridge")
bus.subscribe(Topics.EVENTS_PUBLISHED, forward_event)
bus.connect()
log.info("SSE bridge started: subscribed to %s", Topics.EVENTS_PUBLISHED)
return bus