"""Tests for the MQTT → Server-Sent Events bridge used by the web process."""

import json
import queue


def test_forward_event_delivers_payload_to_sse_subscribers():
    """A payload given to forward_event must arrive on a registered SSE queue.

    The test registers its own queue in the events blueprint's subscriber
    list, forwards one event, and checks that the queued frame is a
    ``data: ``-prefixed JSON document carrying the original fields.
    """
    from vigilar.web.blueprints import events as events_bp
    from vigilar.web.sse_bridge import forward_event

    sse_prefix = "data: "
    event = {
        "type": "MOTION_START",
        "source_id": "cam1",
        "severity": "WARNING",
        "ts": 1234,
    }

    subscriber: queue.Queue = queue.Queue(maxsize=10)
    events_bp._sse_subscribers.append(subscriber)
    try:
        forward_event("vigilar/events/published", event)

        # get_nowait raises queue.Empty if nothing was delivered, failing the test
        frame = subscriber.get_nowait()
        assert frame.startswith(sse_prefix)

        # Drop the prefix and any trailing whitespace, leaving only the JSON body
        decoded = json.loads(frame[len(sse_prefix):].rstrip())
        assert decoded["type"] == "MOTION_START"
        assert decoded["source_id"] == "cam1"
    finally:
        # Always unregister so the module-level subscriber list is left as it
        # was found; the queue may already be gone, which is fine.
        try:
            events_bp._sse_subscribers.remove(subscriber)
        except ValueError:
            pass


def test_start_sse_bridge_subscribes_to_events_published(monkeypatch):
    """start_sse_bridge must return a connected bus with one subscription.

    That single subscription has to bind forward_event to
    Topics.EVENTS_PUBLISHED — the one entry point through which the web
    process observes classified events from the events subsystem.
    """
    from vigilar.config import VigilarConfig
    from vigilar.constants import Topics
    from vigilar.web import sse_bridge

    class FakeBus:
        """Stand-in for MessageBus that only records how it was used."""

        def __init__(self, config, client_id):
            self.connected = False
            self.subscriptions: list[tuple[str, object]] = []
            self.client_id = client_id

        def connect(self):
            self.connected = True

        def subscribe(self, topic, handler):
            self.subscriptions.append((topic, handler))

    # Swap the real bus for the recording fake before the bridge is started
    monkeypatch.setattr(sse_bridge, "MessageBus", FakeBus)

    bridge_bus = sse_bridge.start_sse_bridge(VigilarConfig())

    assert bridge_bus.connected is True
    assert len(bridge_bus.subscriptions) == 1

    subscribed_topic, subscribed_handler = bridge_bus.subscriptions[0]
    assert subscribed_topic == Topics.EVENTS_PUBLISHED
    assert subscribed_handler is sse_bridge.forward_event