Closes #1.

The Flask event-timeline was dead: `broadcast_sse_event` existed in `vigilar/web/blueprints/events.py` but had zero call sites. Clients subscribed to `/events/stream`, received the initial "connected" message, and then only keepalives — a page refresh was required to see new events. (Web Push via VAPID was independent and already worked.)

The root cause was a process-boundary gap: the events subsystem runs in its own OS process and emits to MQTT, while the Flask app runs in a separate process with no MQTT client of its own.

This change adds a thin bridge:

- EventProcessor._handle_event now publishes a classified summary (id, ts, type, severity, source_id, payload) to a new topic `Topics.EVENTS_PUBLISHED = "vigilar/events/published"` right after `insert_event()`. Classification logic stays in one place.
- A new module `vigilar/web/sse_bridge.py` provides `forward_event` (MQTT handler) and `start_sse_bridge(cfg)` (creates a MessageBus, subscribes forward_event to EVENTS_PUBLISHED, connects, returns the bus).
- `vigilar/main.py:_run_web` starts the bridge after `create_app(cfg)` and disconnects it on shutdown. Bridge failure is logged but does not kill the web process — the UI still works without live updates.
- `create_app` is deliberately NOT changed. Keeping the bridge out of the app factory means no existing test triggers a real MQTT connection, and the bridge stays a production-only concern wired by the supervisor.

Tests (all added with TDD, RED verified before GREEN):

- tests/unit/test_events.py::TestEventsPublishedBroadcast — asserts `_handle_event` publishes the classified payload for a motion event and does NOT publish for unclassified topics (heartbeats).
- tests/unit/test_sse_bridge.py — asserts `forward_event` reaches SSE subscribers, and `start_sse_bridge` wires the handler to `Topics.EVENTS_PUBLISHED` on a connected bus (fake bus, no real MQTT in tests).
Also refreshes the docs that previously flagged the dead SSE as a known limitation (operator guide, web architecture doc). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
63 lines
2.1 KiB
Python
63 lines
2.1 KiB
Python
"""Tests for the MQTT → Server-Sent Events bridge used by the web process."""
|
|
|
|
import json
|
|
import queue
|
|
|
|
|
|
def test_forward_event_delivers_payload_to_sse_subscribers():
    """Check that forward_event pushes an SSE-framed payload to a subscriber.

    A bounded queue is registered in the events blueprint's subscriber list,
    forward_event is invoked with a sample motion event, and the frame taken
    from the queue is decoded and compared against the input.
    """
    from vigilar.web.blueprints import events as events_bp
    from vigilar.web.sse_bridge import forward_event

    sse_prefix = "data: "

    subscriber: queue.Queue = queue.Queue(maxsize=10)
    events_bp._sse_subscribers.append(subscriber)
    try:
        event = {
            "type": "MOTION_START",
            "source_id": "cam1",
            "severity": "WARNING",
            "ts": 1234,
        }
        forward_event("vigilar/events/published", event)

        # get_nowait raises queue.Empty if nothing was delivered, failing the test.
        frame = subscriber.get_nowait()
        assert frame.startswith(sse_prefix)

        # Drop the "data: " prefix and any trailing whitespace before decoding.
        body = frame[len(sse_prefix):].rstrip()
        decoded = json.loads(body)
        assert decoded["type"] == "MOTION_START"
        assert decoded["source_id"] == "cam1"
    finally:
        # Always unregister the queue so the module-level list is left clean.
        if subscriber in events_bp._sse_subscribers:
            events_bp._sse_subscribers.remove(subscriber)
|
|
|
|
|
|
def test_start_sse_bridge_subscribes_to_events_published(monkeypatch):
    """Check that start_sse_bridge connects a bus subscribed to EVENTS_PUBLISHED.

    MessageBus in the sse_bridge module is replaced with a recording stand-in,
    so no real MQTT connection is made. The returned bus must be connected and
    must hold exactly one subscription: forward_event on
    Topics.EVENTS_PUBLISHED.
    """
    from vigilar.config import VigilarConfig
    from vigilar.constants import Topics
    from vigilar.web import sse_bridge

    class RecordingBus:
        """Stand-in for MessageBus that records subscribe/connect calls."""

        def __init__(self, config, client_id):
            self.connected = False
            self.subscriptions: list[tuple[str, object]] = []
            self.client_id = client_id

        def subscribe(self, topic, handler):
            self.subscriptions.append((topic, handler))

        def connect(self):
            self.connected = True

    monkeypatch.setattr(sse_bridge, "MessageBus", RecordingBus)

    bridge_bus = sse_bridge.start_sse_bridge(VigilarConfig())

    assert bridge_bus.connected is True

    recorded = bridge_bus.subscriptions
    assert len(recorded) == 1

    subscribed_topic, subscribed_handler = recorded[0]
    assert subscribed_topic == Topics.EVENTS_PUBLISHED
    # Identity check: the bridge must register the module's own handler.
    assert subscribed_handler is sse_bridge.forward_event
|