feat: wire MQTT → SSE bridge so the event timeline updates live
Closes #1. The Flask event-timeline was dead: `broadcast_sse_event` existed in `vigilar/web/blueprints/events.py` but had zero call sites. Clients subscribed to `/events/stream`, received the initial "connected" message, and then only keepalives — a page refresh was required to see new events. (Web Push via VAPID was independent and already worked.) The root cause was a process-boundary gap: the events subsystem runs in its own OS process and emits to MQTT, while the Flask app runs in a separate process with no MQTT client of its own. This change adds a thin bridge: - EventProcessor._handle_event now publishes a classified summary (id, ts, type, severity, source_id, payload) to a new topic `Topics.EVENTS_PUBLISHED = "vigilar/events/published"` right after `insert_event()`. Classification logic stays in one place. - A new module `vigilar/web/sse_bridge.py` provides `forward_event` (MQTT handler) and `start_sse_bridge(cfg)` (creates a MessageBus, subscribes forward_event to EVENTS_PUBLISHED, connects, returns the bus). - `vigilar/main.py:_run_web` starts the bridge after `create_app(cfg)` and disconnects it on shutdown. Bridge failure is logged but does not kill the web process — the UI still works without live updates. - `create_app` is deliberately NOT changed. Keeping the bridge out of the app factory means no existing test triggers a real MQTT connection, and the bridge stays a production-only concern wired by the supervisor. Tests (all added with TDD, RED verified before GREEN): - tests/unit/test_events.py::TestEventsPublishedBroadcast — asserts `_handle_event` publishes the classified payload for a motion event and does NOT publish for unclassified topics (heartbeats). - tests/unit/test_sse_bridge.py — asserts `forward_event` reaches SSE subscribers, and `start_sse_bridge` wires the handler to `Topics.EVENTS_PUBLISHED` on a connected bus (fake bus, no real MQTT in tests). Also refreshes the docs that previously flagged the dead SSE as a known limitation (operator guide, web architecture doc). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -40,4 +40,4 @@ All access is read-mostly via `vigilar.storage.queries`. Touches essentially eve
|
||||
|
||||
## Notes
|
||||
|
||||
Templates use Jinja2 with a Bootstrap 5 dark theme; the camera grid uses `hls.js` for multi-camera HLS playback with an MJPEG fallback on the single-camera page. The app is a PWA (service worker + manifest) with VAPID Web Push for mobile notifications. The event-timeline SSE endpoint lives around line 93 of `blueprints/events.py` and holds a per-client `queue.Queue` fed by a module-level `broadcast_sse_event` function — that function is defined but has no call site in the repo at time of writing, so live SSE updates will only flow once the bridge from MQTT into that queue is wired.
|
||||
Templates use Jinja2 with a Bootstrap 5 dark theme; the camera grid uses `hls.js` for multi-camera HLS playback with an MJPEG fallback on the single-camera page. The app is a PWA (service worker + manifest) with VAPID Web Push for mobile notifications. The event-timeline SSE endpoint lives around line 93 of `blueprints/events.py` and holds a per-client `queue.Queue` fed by a module-level `broadcast_sse_event` function. A dedicated MQTT → SSE bridge (`vigilar/web/sse_bridge.py`) runs inside the web process, subscribes to `Topics.EVENTS_PUBLISHED`, and calls `broadcast_sse_event` for every classified event emitted by the events subsystem — so the in-browser event timeline updates live without a page refresh.
|
||||
|
||||
@@ -585,13 +585,6 @@ Do not expose port `49735` directly on the WAN; require the tunnel.
|
||||
|
||||
## Known limitations
|
||||
|
||||
- **Event timeline is not live.** The web UI event timeline requires
|
||||
a page refresh to show new events. `broadcast_sse_event` exists in
|
||||
`vigilar/web/blueprints/events.py` but has zero call sites today;
|
||||
events are not pushed to browsers via SSE. Web Push notifications
|
||||
via VAPID are independent of the timeline and do work: you will
|
||||
get mobile alerts as motion happens, but the in-page timeline lags
|
||||
until you reload.
|
||||
- **Recording integrity is not authenticated.** AES-256-CTR gives you
|
||||
confidentiality, not tamper-evidence. If an attacker reaches the
|
||||
recordings directory they can modify ciphertext unnoticed. See the
|
||||
|
||||
@@ -429,3 +429,85 @@ class TestPetEventClassification:
|
||||
assert etype == EventType.WILDLIFE_PASSIVE
|
||||
assert sev == Severity.INFO
|
||||
assert source == "front"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Classified-event broadcast (for SSE bridge)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _RecordingBus:
|
||||
"""Minimal bus fake that records publishes instead of sending them."""
|
||||
|
||||
def __init__(self):
|
||||
self.published: list[tuple[str, dict]] = []
|
||||
|
||||
def publish(self, topic: str, payload: dict, qos: int = 1) -> None:
|
||||
self.published.append((topic, payload))
|
||||
|
||||
def publish_event(self, topic: str, **kwargs) -> None:
|
||||
self.publish(topic, kwargs)
|
||||
|
||||
|
||||
class _StubFSM:
|
||||
state = ArmState.DISARMED
|
||||
|
||||
|
||||
class _StubRuleEngine:
|
||||
def evaluate(self, topic, payload, state):
|
||||
return []
|
||||
|
||||
|
||||
class TestEventsPublishedBroadcast:
|
||||
"""EventProcessor should publish a classified-event summary to
|
||||
Topics.EVENTS_PUBLISHED after inserting, so the Flask SSE bridge
|
||||
can forward it to browser clients."""
|
||||
|
||||
def test_handle_event_publishes_classified_payload(self, test_db):
|
||||
from vigilar.events.processor import EventProcessor
|
||||
from vigilar.constants import Topics
|
||||
|
||||
processor = EventProcessor.__new__(EventProcessor)
|
||||
bus = _RecordingBus()
|
||||
|
||||
processor._handle_event(
|
||||
topic="vigilar/camera/cam1/motion/start",
|
||||
payload={"ts": 12345, "detail": "x"},
|
||||
engine=test_db,
|
||||
fsm=_StubFSM(),
|
||||
rule_engine=_StubRuleEngine(),
|
||||
bus=bus,
|
||||
)
|
||||
|
||||
published_on_bridge_topic = [
|
||||
p for t, p in bus.published if t == Topics.EVENTS_PUBLISHED
|
||||
]
|
||||
assert len(published_on_bridge_topic) == 1, (
|
||||
f"expected exactly one publish to {Topics.EVENTS_PUBLISHED}, "
|
||||
f"got publishes: {bus.published}"
|
||||
)
|
||||
|
||||
msg = published_on_bridge_topic[0]
|
||||
assert msg["type"] == EventType.MOTION_START
|
||||
assert msg["severity"] == Severity.WARNING
|
||||
assert msg["source_id"] == "cam1"
|
||||
assert "ts" in msg
|
||||
assert "id" in msg and isinstance(msg["id"], int)
|
||||
|
||||
def test_unclassified_topic_does_not_publish(self, test_db):
|
||||
"""Heartbeats and other non-event topics must not be forwarded."""
|
||||
from vigilar.events.processor import EventProcessor
|
||||
from vigilar.constants import Topics
|
||||
|
||||
processor = EventProcessor.__new__(EventProcessor)
|
||||
bus = _RecordingBus()
|
||||
|
||||
processor._handle_event(
|
||||
topic="vigilar/camera/cam1/heartbeat",
|
||||
payload={"ts": 12345},
|
||||
engine=test_db,
|
||||
fsm=_StubFSM(),
|
||||
rule_engine=_StubRuleEngine(),
|
||||
bus=bus,
|
||||
)
|
||||
|
||||
assert not any(t == Topics.EVENTS_PUBLISHED for t, _ in bus.published)
|
||||
|
||||
62
tests/unit/test_sse_bridge.py
Normal file
62
tests/unit/test_sse_bridge.py
Normal file
@@ -0,0 +1,62 @@
|
||||
"""Tests for the MQTT → Server-Sent Events bridge used by the web process."""
|
||||
|
||||
import json
|
||||
import queue
|
||||
|
||||
|
||||
def test_forward_event_delivers_payload_to_sse_subscribers():
|
||||
"""forward_event should hand the payload to every connected SSE client."""
|
||||
from vigilar.web.blueprints import events as events_bp
|
||||
from vigilar.web.sse_bridge import forward_event
|
||||
|
||||
q: queue.Queue = queue.Queue(maxsize=10)
|
||||
events_bp._sse_subscribers.append(q)
|
||||
try:
|
||||
payload = {
|
||||
"type": "MOTION_START",
|
||||
"source_id": "cam1",
|
||||
"severity": "WARNING",
|
||||
"ts": 1234,
|
||||
}
|
||||
forward_event("vigilar/events/published", payload)
|
||||
|
||||
raw = q.get_nowait()
|
||||
assert raw.startswith("data: ")
|
||||
# Strip "data: " prefix and trailing double newline
|
||||
data = json.loads(raw[len("data: "):].rstrip())
|
||||
assert data["type"] == "MOTION_START"
|
||||
assert data["source_id"] == "cam1"
|
||||
finally:
|
||||
if q in events_bp._sse_subscribers:
|
||||
events_bp._sse_subscribers.remove(q)
|
||||
|
||||
|
||||
def test_start_sse_bridge_subscribes_to_events_published(monkeypatch):
|
||||
"""start_sse_bridge must connect a bus and subscribe forward_event
|
||||
to Topics.EVENTS_PUBLISHED — that is the single entry point for the
|
||||
web process to observe classified events from the events subsystem."""
|
||||
from vigilar.config import VigilarConfig
|
||||
from vigilar.constants import Topics
|
||||
from vigilar.web import sse_bridge
|
||||
|
||||
class FakeBus:
|
||||
def __init__(self, config, client_id):
|
||||
self.client_id = client_id
|
||||
self.subscriptions: list[tuple[str, object]] = []
|
||||
self.connected = False
|
||||
|
||||
def subscribe(self, topic, handler):
|
||||
self.subscriptions.append((topic, handler))
|
||||
|
||||
def connect(self):
|
||||
self.connected = True
|
||||
|
||||
monkeypatch.setattr(sse_bridge, "MessageBus", FakeBus)
|
||||
|
||||
bus = sse_bridge.start_sse_bridge(VigilarConfig())
|
||||
|
||||
assert bus.connected is True
|
||||
assert len(bus.subscriptions) == 1
|
||||
topic, handler = bus.subscriptions[0]
|
||||
assert topic == Topics.EVENTS_PUBLISHED
|
||||
assert handler is sse_bridge.forward_event
|
||||
@@ -211,6 +211,10 @@ class Topics:
|
||||
SYSTEM_SHUTDOWN = "vigilar/system/shutdown"
|
||||
SYSTEM_RULES_UPDATED = "vigilar/system/rules_updated"
|
||||
|
||||
# Classified events forwarded to the web SSE bridge (see events.processor
|
||||
# and web.sse_bridge).
|
||||
EVENTS_PUBLISHED = "vigilar/events/published"
|
||||
|
||||
# Wildcard subscriptions
|
||||
ALL = "vigilar/#"
|
||||
ALL_CAMERAS = "vigilar/camera/#"
|
||||
|
||||
@@ -103,6 +103,22 @@ class EventProcessor:
|
||||
payload=payload,
|
||||
)
|
||||
|
||||
# Broadcast a classified summary for the web SSE bridge. This is
|
||||
# independent of rule actions and DB storage — the web process
|
||||
# subscribes to Topics.EVENTS_PUBLISHED and forwards each message
|
||||
# to connected browser clients.
|
||||
bus.publish(
|
||||
Topics.EVENTS_PUBLISHED,
|
||||
{
|
||||
"id": event_id,
|
||||
"ts": int(time.time() * 1000),
|
||||
"type": event_type,
|
||||
"severity": severity,
|
||||
"source_id": source_id,
|
||||
"payload": payload,
|
||||
},
|
||||
)
|
||||
|
||||
# Insert pet/wildlife sightings
|
||||
if event_type in (
|
||||
EventType.PET_DETECTED, EventType.PET_ESCAPE, EventType.UNKNOWN_ANIMAL
|
||||
|
||||
@@ -67,9 +67,27 @@ class SubsystemProcess:
|
||||
def _run_web(cfg: VigilarConfig) -> None:
|
||||
"""Run the Flask web server in a subprocess."""
|
||||
from vigilar.web.app import create_app
|
||||
from vigilar.web.sse_bridge import start_sse_bridge
|
||||
|
||||
app = create_app(cfg)
|
||||
|
||||
# Forward classified events from MQTT to browser SSE clients. Failure
|
||||
# here must not kill the web process — the UI still works without
|
||||
# live updates, it just requires a page refresh.
|
||||
sse_bus = None
|
||||
try:
|
||||
sse_bus = start_sse_bridge(cfg)
|
||||
except Exception:
|
||||
log.exception("Failed to start SSE bridge; live event timeline will not update")
|
||||
|
||||
try:
|
||||
app.run(host=cfg.web.host, port=cfg.web.port, debug=False, use_reloader=False)
|
||||
finally:
|
||||
if sse_bus is not None:
|
||||
try:
|
||||
sse_bus.disconnect()
|
||||
except Exception:
|
||||
log.exception("Error disconnecting SSE bridge bus")
|
||||
|
||||
|
||||
def _run_event_processor(cfg: VigilarConfig) -> None:
|
||||
|
||||
36
vigilar/web/sse_bridge.py
Normal file
36
vigilar/web/sse_bridge.py
Normal file
@@ -0,0 +1,36 @@
|
||||
"""Bridge classified events from MQTT to Server-Sent Events subscribers.
|
||||
|
||||
The events subsystem (`vigilar.events.processor.EventProcessor`) publishes
|
||||
classified events to `Topics.EVENTS_PUBLISHED`. The Flask web process runs
|
||||
in its own OS process, so to make the in-browser event timeline update
|
||||
live it must subscribe to that topic via its own `MessageBus` client and
|
||||
forward every message to `broadcast_sse_event`.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from vigilar.bus import MessageBus
|
||||
from vigilar.config import VigilarConfig
|
||||
from vigilar.constants import Topics
|
||||
from vigilar.web.blueprints.events import broadcast_sse_event
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def forward_event(topic: str, payload: dict[str, Any]) -> None:
|
||||
"""MQTT handler: forward a classified event to SSE subscribers."""
|
||||
broadcast_sse_event(payload)
|
||||
|
||||
|
||||
def start_sse_bridge(cfg: VigilarConfig) -> MessageBus:
|
||||
"""Create an MQTT client that forwards classified events to SSE clients.
|
||||
|
||||
Returns the connected `MessageBus` so the caller can disconnect it on
|
||||
shutdown.
|
||||
"""
|
||||
bus = MessageBus(cfg.mqtt, client_id="vigilar-web-sse-bridge")
|
||||
bus.subscribe(Topics.EVENTS_PUBLISHED, forward_event)
|
||||
bus.connect()
|
||||
log.info("SSE bridge started: subscribed to %s", Topics.EVENTS_PUBLISHED)
|
||||
return bus
|
||||
Reference in New Issue
Block a user