vigilar/tests/unit/test_sse_bridge.py
adlee-was-taken 09b59e3bb5 feat: wire MQTT → SSE bridge so the event timeline updates live
Closes #1.

The Flask event-timeline was dead: `broadcast_sse_event` existed in
`vigilar/web/blueprints/events.py` but had zero call sites. Clients
subscribed to `/events/stream`, received the initial "connected"
message, and then only keepalives — a page refresh was required to
see new events. (Web Push via VAPID was independent and already worked.)

The root cause was a process-boundary gap: the events subsystem runs
in its own OS process and emits to MQTT, while the Flask app runs in a
separate process with no MQTT client of its own.

This change adds a thin bridge:

- EventProcessor._handle_event now publishes a classified summary
  (id, ts, type, severity, source_id, payload) to a new topic
  `Topics.EVENTS_PUBLISHED = "vigilar/events/published"` right after
  `insert_event()`. Classification logic stays in one place.

- A new module `vigilar/web/sse_bridge.py` provides `forward_event`
  (MQTT handler) and `start_sse_bridge(cfg)` (creates a MessageBus,
  subscribes forward_event to EVENTS_PUBLISHED, connects, returns the
  bus).

- `vigilar/main.py:_run_web` starts the bridge after `create_app(cfg)`
  and disconnects it on shutdown. Bridge failure is logged but does
  not kill the web process — the UI still works without live updates.

- `create_app` is deliberately NOT changed. Keeping the bridge out of
  the app factory means no existing test triggers a real MQTT
  connection, and the bridge stays a production-only concern wired by
  the supervisor.

Tests (all added with TDD, RED verified before GREEN):

- tests/unit/test_events.py::TestEventsPublishedBroadcast — asserts
  `_handle_event` publishes the classified payload for a motion event
  and does NOT publish for unclassified topics (heartbeats).
- tests/unit/test_sse_bridge.py — asserts `forward_event` reaches SSE
  subscribers, and `start_sse_bridge` wires the handler to
  `Topics.EVENTS_PUBLISHED` on a connected bus (fake bus, no real
  MQTT in tests).
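
The publisher-side contract that the first test pins down (publish after the insert, stay silent for unclassified topics) can be sketched as below. This is an assumption-laden illustration, not the real `EventProcessor._handle_event`: `classify`, `RecordingBus`, the `handle_event` signature, the `insert_event` arguments and the example topic names are all hypothetical. Only the summary fields and the topic value come from this commit message.

```python
import time

# Topic value as given in this commit message.
EVENTS_PUBLISHED = "vigilar/events/published"


def classify(topic: str, payload: dict):
    """Hypothetical classifier: (type, severity), or None if unclassified."""
    if topic.endswith("/motion/start"):
        return "MOTION_START", "WARNING"
    return None  # e.g. heartbeats


class RecordingBus:
    """Hypothetical bus double that records what would be published."""

    def __init__(self):
        self.published = []

    def publish(self, topic, payload):
        self.published.append((topic, payload))


def handle_event(bus, insert_event, topic, payload, source_id):
    """Classify, persist, then publish a summary of the stored event."""
    classified = classify(topic, payload)
    if classified is None:
        return None  # nothing stored, nothing published
    event_type, severity = classified
    ts = int(time.time() * 1000)
    event_id = insert_event(event_type, severity, source_id, payload)
    summary = {
        "id": event_id,
        "ts": ts,
        "type": event_type,
        "severity": severity,
        "source_id": source_id,
        "payload": payload,
    }
    # Publish only after the insert, so subscribers see stored events.
    bus.publish(EVENTS_PUBLISHED, summary)
    return summary
```

Publishing the already-classified summary keeps the web process free of classification logic: it only forwards what it receives.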

Also refreshes the docs that previously flagged the dead SSE as a
known limitation (operator guide, web architecture doc).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 16:55:27 +00:00

"""Tests for the MQTT → Server-Sent Events bridge used by the web process."""

import json
import queue


def test_forward_event_delivers_payload_to_sse_subscribers():
    """forward_event should hand the payload to every connected SSE client."""
    from vigilar.web.blueprints import events as events_bp
    from vigilar.web.sse_bridge import forward_event

    q: queue.Queue = queue.Queue(maxsize=10)
    events_bp._sse_subscribers.append(q)
    try:
        payload = {
            "type": "MOTION_START",
            "source_id": "cam1",
            "severity": "WARNING",
            "ts": 1234,
        }
        forward_event("vigilar/events/published", payload)

        raw = q.get_nowait()
        assert raw.startswith("data: ")
        # Strip "data: " prefix and trailing double newline
        data = json.loads(raw[len("data: "):].rstrip())
        assert data["type"] == "MOTION_START"
        assert data["source_id"] == "cam1"
    finally:
        if q in events_bp._sse_subscribers:
            events_bp._sse_subscribers.remove(q)


def test_start_sse_bridge_subscribes_to_events_published(monkeypatch):
    """start_sse_bridge must connect a bus and subscribe forward_event
    to Topics.EVENTS_PUBLISHED; that is the single entry point for the
    web process to observe classified events from the events subsystem."""
    from vigilar.config import VigilarConfig
    from vigilar.constants import Topics
    from vigilar.web import sse_bridge

    class FakeBus:
        def __init__(self, config, client_id):
            self.client_id = client_id
            self.subscriptions: list[tuple[str, object]] = []
            self.connected = False

        def subscribe(self, topic, handler):
            self.subscriptions.append((topic, handler))

        def connect(self):
            self.connected = True

    monkeypatch.setattr(sse_bridge, "MessageBus", FakeBus)

    bus = sse_bridge.start_sse_bridge(VigilarConfig())

    assert bus.connected is True
    assert len(bus.subscriptions) == 1
    topic, handler = bus.subscriptions[0]
    assert topic == Topics.EVENTS_PUBLISHED
    assert handler is sse_bridge.forward_event