feat: wire MQTT → SSE bridge so the event timeline updates live

Closes #1.

The Flask event-timeline was dead: `broadcast_sse_event` existed in
`vigilar/web/blueprints/events.py` but had zero call sites. Clients
subscribed to `/events/stream`, received the initial "connected"
message, and then only keepalives — a page refresh was required to
see new events. (Web Push via VAPID was independent and already worked.)

The root cause was a process-boundary gap: the events subsystem runs
in its own OS process and emits to MQTT, while the Flask app runs in a
separate process with no MQTT client of its own.

This change adds a thin bridge:

- EventProcessor._handle_event now publishes a classified summary
  (id, ts, type, severity, source_id, payload) to a new topic
  `Topics.EVENTS_PUBLISHED = "vigilar/events/published"` right after
  `insert_event()`. Classification logic stays in one place.

- A new module `vigilar/web/sse_bridge.py` provides `forward_event`
  (MQTT handler) and `start_sse_bridge(cfg)` (creates a MessageBus,
  subscribes forward_event to EVENTS_PUBLISHED, connects, returns the
  bus).

- `vigilar/main.py:_run_web` starts the bridge after `create_app(cfg)`
  and disconnects it on shutdown. Bridge failure is logged but does
  not kill the web process — the UI still works without live updates.

- `create_app` is deliberately NOT changed. Keeping the bridge out of
  the app factory means no existing test triggers a real MQTT
  connection, and the bridge stays a production-only concern wired by
  the supervisor.

Tests (all added with TDD, RED verified before GREEN):

- tests/unit/test_events.py::TestEventsPublishedBroadcast — asserts
  `_handle_event` publishes the classified payload for a motion event
  and does NOT publish for unclassified topics (heartbeats).
- tests/unit/test_sse_bridge.py — asserts `forward_event` reaches SSE
  subscribers, and `start_sse_bridge` wires the handler to
  `Topics.EVENTS_PUBLISHED` on a connected bus (fake bus, no real
  MQTT in tests).

Also refreshes the docs that previously flagged the dead SSE as a
known limitation (operator guide, web architecture doc).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit was merged in pull request #6.
This commit is contained in:
adlee-was-taken
2026-04-05 11:07:25 -04:00
committed by A.D.Lee
parent 9f959f8c78
commit 09b59e3bb5
8 changed files with 220 additions and 9 deletions

View File

@@ -40,4 +40,4 @@ All access is read-mostly via `vigilar.storage.queries`. Touches essentially eve
## Notes ## Notes
Templates use Jinja2 with a Bootstrap 5 dark theme; the camera grid uses `hls.js` for multi-camera HLS playback with an MJPEG fallback on the single-camera page. The app is a PWA (service worker + manifest) with VAPID Web Push for mobile notifications. The event-timeline SSE endpoint lives around line 93 of `blueprints/events.py` and holds a per-client `queue.Queue` fed by a module-level `broadcast_sse_event` function — that function is defined but has no call site in the repo at time of writing, so live SSE updates will only flow once the bridge from MQTT into that queue is wired. Templates use Jinja2 with a Bootstrap 5 dark theme; the camera grid uses `hls.js` for multi-camera HLS playback with an MJPEG fallback on the single-camera page. The app is a PWA (service worker + manifest) with VAPID Web Push for mobile notifications. The event-timeline SSE endpoint lives around line 93 of `blueprints/events.py` and holds a per-client `queue.Queue` fed by a module-level `broadcast_sse_event` function. A dedicated MQTT → SSE bridge (`vigilar/web/sse_bridge.py`) runs inside the web process, subscribes to `Topics.EVENTS_PUBLISHED`, and calls `broadcast_sse_event` for every classified event emitted by the events subsystem — so the in-browser event timeline updates live without a page refresh.

View File

@@ -585,13 +585,6 @@ Do not expose port `49735` directly on the WAN; require the tunnel.
## Known limitations ## Known limitations
- **Event timeline is not live.** The web UI event timeline requires
a page refresh to show new events. `broadcast_sse_event` exists in
`vigilar/web/blueprints/events.py` but has zero call sites today;
events are not pushed to browsers via SSE. Web Push notifications
via VAPID are independent of the timeline and do work: you will
get mobile alerts as motion happens, but the in-page timeline lags
until you reload.
- **Recording integrity is not authenticated.** AES-256-CTR gives you - **Recording integrity is not authenticated.** AES-256-CTR gives you
confidentiality, not tamper-evidence. If an attacker reaches the confidentiality, not tamper-evidence. If an attacker reaches the
recordings directory they can modify ciphertext unnoticed. See the recordings directory they can modify ciphertext unnoticed. See the

View File

@@ -429,3 +429,85 @@ class TestPetEventClassification:
assert etype == EventType.WILDLIFE_PASSIVE assert etype == EventType.WILDLIFE_PASSIVE
assert sev == Severity.INFO assert sev == Severity.INFO
assert source == "front" assert source == "front"
# ---------------------------------------------------------------------------
# Classified-event broadcast (for SSE bridge)
# ---------------------------------------------------------------------------
class _RecordingBus:
"""Minimal bus fake that records publishes instead of sending them."""
def __init__(self):
self.published: list[tuple[str, dict]] = []
def publish(self, topic: str, payload: dict, qos: int = 1) -> None:
self.published.append((topic, payload))
def publish_event(self, topic: str, **kwargs) -> None:
self.publish(topic, kwargs)
class _StubFSM:
state = ArmState.DISARMED
class _StubRuleEngine:
def evaluate(self, topic, payload, state):
return []
class TestEventsPublishedBroadcast:
"""EventProcessor should publish a classified-event summary to
Topics.EVENTS_PUBLISHED after inserting, so the Flask SSE bridge
can forward it to browser clients."""
def test_handle_event_publishes_classified_payload(self, test_db):
from vigilar.events.processor import EventProcessor
from vigilar.constants import Topics
processor = EventProcessor.__new__(EventProcessor)
bus = _RecordingBus()
processor._handle_event(
topic="vigilar/camera/cam1/motion/start",
payload={"ts": 12345, "detail": "x"},
engine=test_db,
fsm=_StubFSM(),
rule_engine=_StubRuleEngine(),
bus=bus,
)
published_on_bridge_topic = [
p for t, p in bus.published if t == Topics.EVENTS_PUBLISHED
]
assert len(published_on_bridge_topic) == 1, (
f"expected exactly one publish to {Topics.EVENTS_PUBLISHED}, "
f"got publishes: {bus.published}"
)
msg = published_on_bridge_topic[0]
assert msg["type"] == EventType.MOTION_START
assert msg["severity"] == Severity.WARNING
assert msg["source_id"] == "cam1"
assert "ts" in msg
assert "id" in msg and isinstance(msg["id"], int)
def test_unclassified_topic_does_not_publish(self, test_db):
"""Heartbeats and other non-event topics must not be forwarded."""
from vigilar.events.processor import EventProcessor
from vigilar.constants import Topics
processor = EventProcessor.__new__(EventProcessor)
bus = _RecordingBus()
processor._handle_event(
topic="vigilar/camera/cam1/heartbeat",
payload={"ts": 12345},
engine=test_db,
fsm=_StubFSM(),
rule_engine=_StubRuleEngine(),
bus=bus,
)
assert not any(t == Topics.EVENTS_PUBLISHED for t, _ in bus.published)

View File

@@ -0,0 +1,62 @@
"""Tests for the MQTT → Server-Sent Events bridge used by the web process."""
import json
import queue
def test_forward_event_delivers_payload_to_sse_subscribers():
"""forward_event should hand the payload to every connected SSE client."""
from vigilar.web.blueprints import events as events_bp
from vigilar.web.sse_bridge import forward_event
q: queue.Queue = queue.Queue(maxsize=10)
events_bp._sse_subscribers.append(q)
try:
payload = {
"type": "MOTION_START",
"source_id": "cam1",
"severity": "WARNING",
"ts": 1234,
}
forward_event("vigilar/events/published", payload)
raw = q.get_nowait()
assert raw.startswith("data: ")
# Strip "data: " prefix and trailing double newline
data = json.loads(raw[len("data: "):].rstrip())
assert data["type"] == "MOTION_START"
assert data["source_id"] == "cam1"
finally:
if q in events_bp._sse_subscribers:
events_bp._sse_subscribers.remove(q)
def test_start_sse_bridge_subscribes_to_events_published(monkeypatch):
"""start_sse_bridge must connect a bus and subscribe forward_event
to Topics.EVENTS_PUBLISHED — that is the single entry point for the
web process to observe classified events from the events subsystem."""
from vigilar.config import VigilarConfig
from vigilar.constants import Topics
from vigilar.web import sse_bridge
class FakeBus:
def __init__(self, config, client_id):
self.client_id = client_id
self.subscriptions: list[tuple[str, object]] = []
self.connected = False
def subscribe(self, topic, handler):
self.subscriptions.append((topic, handler))
def connect(self):
self.connected = True
monkeypatch.setattr(sse_bridge, "MessageBus", FakeBus)
bus = sse_bridge.start_sse_bridge(VigilarConfig())
assert bus.connected is True
assert len(bus.subscriptions) == 1
topic, handler = bus.subscriptions[0]
assert topic == Topics.EVENTS_PUBLISHED
assert handler is sse_bridge.forward_event

View File

@@ -211,6 +211,10 @@ class Topics:
SYSTEM_SHUTDOWN = "vigilar/system/shutdown" SYSTEM_SHUTDOWN = "vigilar/system/shutdown"
SYSTEM_RULES_UPDATED = "vigilar/system/rules_updated" SYSTEM_RULES_UPDATED = "vigilar/system/rules_updated"
# Classified events forwarded to the web SSE bridge (see events.processor
# and web.sse_bridge).
EVENTS_PUBLISHED = "vigilar/events/published"
# Wildcard subscriptions # Wildcard subscriptions
ALL = "vigilar/#" ALL = "vigilar/#"
ALL_CAMERAS = "vigilar/camera/#" ALL_CAMERAS = "vigilar/camera/#"

View File

@@ -103,6 +103,22 @@ class EventProcessor:
payload=payload, payload=payload,
) )
# Broadcast a classified summary for the web SSE bridge. This is
# independent of rule actions and DB storage — the web process
# subscribes to Topics.EVENTS_PUBLISHED and forwards each message
# to connected browser clients.
bus.publish(
Topics.EVENTS_PUBLISHED,
{
"id": event_id,
"ts": int(time.time() * 1000),
"type": event_type,
"severity": severity,
"source_id": source_id,
"payload": payload,
},
)
# Insert pet/wildlife sightings # Insert pet/wildlife sightings
if event_type in ( if event_type in (
EventType.PET_DETECTED, EventType.PET_ESCAPE, EventType.UNKNOWN_ANIMAL EventType.PET_DETECTED, EventType.PET_ESCAPE, EventType.UNKNOWN_ANIMAL

View File

@@ -67,9 +67,27 @@ class SubsystemProcess:
def _run_web(cfg: VigilarConfig) -> None: def _run_web(cfg: VigilarConfig) -> None:
"""Run the Flask web server in a subprocess.""" """Run the Flask web server in a subprocess."""
from vigilar.web.app import create_app from vigilar.web.app import create_app
from vigilar.web.sse_bridge import start_sse_bridge
app = create_app(cfg) app = create_app(cfg)
app.run(host=cfg.web.host, port=cfg.web.port, debug=False, use_reloader=False)
# Forward classified events from MQTT to browser SSE clients. Failure
# here must not kill the web process — the UI still works without
# live updates, it just requires a page refresh.
sse_bus = None
try:
sse_bus = start_sse_bridge(cfg)
except Exception:
log.exception("Failed to start SSE bridge; live event timeline will not update")
try:
app.run(host=cfg.web.host, port=cfg.web.port, debug=False, use_reloader=False)
finally:
if sse_bus is not None:
try:
sse_bus.disconnect()
except Exception:
log.exception("Error disconnecting SSE bridge bus")
def _run_event_processor(cfg: VigilarConfig) -> None: def _run_event_processor(cfg: VigilarConfig) -> None:

36
vigilar/web/sse_bridge.py Normal file
View File

@@ -0,0 +1,36 @@
"""Bridge classified events from MQTT to Server-Sent Events subscribers.
The events subsystem (`vigilar.events.processor.EventProcessor`) publishes
classified events to `Topics.EVENTS_PUBLISHED`. The Flask web process runs
in its own OS process, so to make the in-browser event timeline update
live it must subscribe to that topic via its own `MessageBus` client and
forward every message to `broadcast_sse_event`.
"""
import logging
from typing import Any
from vigilar.bus import MessageBus
from vigilar.config import VigilarConfig
from vigilar.constants import Topics
from vigilar.web.blueprints.events import broadcast_sse_event
log = logging.getLogger(__name__)
def forward_event(topic: str, payload: dict[str, Any]) -> None:
"""MQTT handler: forward a classified event to SSE subscribers."""
broadcast_sse_event(payload)
def start_sse_bridge(cfg: VigilarConfig) -> MessageBus:
"""Create an MQTT client that forwards classified events to SSE clients.
Returns the connected `MessageBus` so the caller can disconnect it on
shutdown.
"""
bus = MessageBus(cfg.mqtt, client_id="vigilar-web-sse-bridge")
bus.subscribe(Topics.EVENTS_PUBLISHED, forward_event)
bus.connect()
log.info("SSE bridge started: subscribed to %s", Topics.EVENTS_PUBLISHED)
return bus