Compare commits
2 Commits
main
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d17186f466 | ||
|
|
e657f2bfbc |
@@ -40,4 +40,4 @@ All access is read-mostly via `vigilar.storage.queries`. Touches essentially eve
|
||||
|
||||
## Notes
|
||||
|
||||
Templates use Jinja2 with a Bootstrap 5 dark theme; the camera grid uses `hls.js` for multi-camera HLS playback with an MJPEG fallback on the single-camera page. The app is a PWA (service worker + manifest) with VAPID Web Push for mobile notifications. The event-timeline SSE endpoint lives around line 93 of `blueprints/events.py` and holds a per-client `queue.Queue` fed by a module-level `broadcast_sse_event` function — that function is defined but has no call site in the repo at time of writing, so live SSE updates will only flow once the bridge from MQTT into that queue is wired.
|
||||
Templates use Jinja2 with a Bootstrap 5 dark theme; the camera grid uses `hls.js` for multi-camera HLS playback with an MJPEG fallback on the single-camera page. The app is a PWA (service worker + manifest) with VAPID Web Push for mobile notifications. The event-timeline SSE endpoint lives around line 93 of `blueprints/events.py` and holds a per-client `queue.Queue` fed by a module-level `broadcast_sse_event` function. A dedicated MQTT → SSE bridge (`vigilar/web/sse_bridge.py`) runs inside the web process, subscribes to `Topics.EVENTS_PUBLISHED`, and calls `broadcast_sse_event` for every classified event emitted by the events subsystem — so the in-browser event timeline updates live without a page refresh.
|
||||
|
||||
@@ -585,13 +585,6 @@ Do not expose port `49735` directly on the WAN; require the tunnel.
|
||||
|
||||
## Known limitations
|
||||
|
||||
- **Event timeline is not live.** The web UI event timeline requires
|
||||
a page refresh to show new events. `broadcast_sse_event` exists in
|
||||
`vigilar/web/blueprints/events.py` but has zero call sites today;
|
||||
events are not pushed to browsers via SSE. Web Push notifications
|
||||
via VAPID are independent of the timeline and do work: you will
|
||||
get mobile alerts as motion happens, but the in-page timeline lags
|
||||
until you reload.
|
||||
- **Recording integrity is not authenticated.** AES-256-CTR gives you
|
||||
confidentiality, not tamper-evidence. If an attacker reaches the
|
||||
recordings directory they can modify ciphertext unnoticed. See the
|
||||
|
||||
@@ -11,6 +11,28 @@ from vigilar.config import VigilarConfig, load_config
|
||||
from vigilar.storage.schema import metadata
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True, scope="session")
|
||||
def _isolate_vigilar_config(tmp_path_factory):
|
||||
"""Prevent tests from writing to the real config/vigilar.toml.
|
||||
|
||||
Web endpoint handlers that call `_save_and_reload()` read the target
|
||||
path from the VIGILAR_CONFIG env var, falling back to the relative
|
||||
`"config/vigilar.toml"`. Without this fixture, any test that exercises
|
||||
such an endpoint rewrites the repo's committed config file via a
|
||||
Pydantic round-trip, stripping comments and non-default fields.
|
||||
"""
|
||||
tmp_config = tmp_path_factory.mktemp("vigilar-config") / "vigilar.toml"
|
||||
prev = os.environ.get("VIGILAR_CONFIG")
|
||||
os.environ["VIGILAR_CONFIG"] = str(tmp_config)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
if prev is None:
|
||||
os.environ.pop("VIGILAR_CONFIG", None)
|
||||
else:
|
||||
os.environ["VIGILAR_CONFIG"] = prev
|
||||
|
||||
|
||||
def _create_test_engine(db_path: Path):
|
||||
"""Create a fresh engine for testing (bypasses the global singleton)."""
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
@@ -429,3 +429,85 @@ class TestPetEventClassification:
|
||||
assert etype == EventType.WILDLIFE_PASSIVE
|
||||
assert sev == Severity.INFO
|
||||
assert source == "front"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Classified-event broadcast (for SSE bridge)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _RecordingBus:
|
||||
"""Minimal bus fake that records publishes instead of sending them."""
|
||||
|
||||
def __init__(self):
|
||||
self.published: list[tuple[str, dict]] = []
|
||||
|
||||
def publish(self, topic: str, payload: dict, qos: int = 1) -> None:
|
||||
self.published.append((topic, payload))
|
||||
|
||||
def publish_event(self, topic: str, **kwargs) -> None:
|
||||
self.publish(topic, kwargs)
|
||||
|
||||
|
||||
class _StubFSM:
|
||||
state = ArmState.DISARMED
|
||||
|
||||
|
||||
class _StubRuleEngine:
|
||||
def evaluate(self, topic, payload, state):
|
||||
return []
|
||||
|
||||
|
||||
class TestEventsPublishedBroadcast:
|
||||
"""EventProcessor should publish a classified-event summary to
|
||||
Topics.EVENTS_PUBLISHED after inserting, so the Flask SSE bridge
|
||||
can forward it to browser clients."""
|
||||
|
||||
def test_handle_event_publishes_classified_payload(self, test_db):
|
||||
from vigilar.events.processor import EventProcessor
|
||||
from vigilar.constants import Topics
|
||||
|
||||
processor = EventProcessor.__new__(EventProcessor)
|
||||
bus = _RecordingBus()
|
||||
|
||||
processor._handle_event(
|
||||
topic="vigilar/camera/cam1/motion/start",
|
||||
payload={"ts": 12345, "detail": "x"},
|
||||
engine=test_db,
|
||||
fsm=_StubFSM(),
|
||||
rule_engine=_StubRuleEngine(),
|
||||
bus=bus,
|
||||
)
|
||||
|
||||
published_on_bridge_topic = [
|
||||
p for t, p in bus.published if t == Topics.EVENTS_PUBLISHED
|
||||
]
|
||||
assert len(published_on_bridge_topic) == 1, (
|
||||
f"expected exactly one publish to {Topics.EVENTS_PUBLISHED}, "
|
||||
f"got publishes: {bus.published}"
|
||||
)
|
||||
|
||||
msg = published_on_bridge_topic[0]
|
||||
assert msg["type"] == EventType.MOTION_START
|
||||
assert msg["severity"] == Severity.WARNING
|
||||
assert msg["source_id"] == "cam1"
|
||||
assert "ts" in msg
|
||||
assert "id" in msg and isinstance(msg["id"], int)
|
||||
|
||||
def test_unclassified_topic_does_not_publish(self, test_db):
|
||||
"""Heartbeats and other non-event topics must not be forwarded."""
|
||||
from vigilar.events.processor import EventProcessor
|
||||
from vigilar.constants import Topics
|
||||
|
||||
processor = EventProcessor.__new__(EventProcessor)
|
||||
bus = _RecordingBus()
|
||||
|
||||
processor._handle_event(
|
||||
topic="vigilar/camera/cam1/heartbeat",
|
||||
payload={"ts": 12345},
|
||||
engine=test_db,
|
||||
fsm=_StubFSM(),
|
||||
rule_engine=_StubRuleEngine(),
|
||||
bus=bus,
|
||||
)
|
||||
|
||||
assert not any(t == Topics.EVENTS_PUBLISHED for t, _ in bus.published)
|
||||
|
||||
62
tests/unit/test_sse_bridge.py
Normal file
62
tests/unit/test_sse_bridge.py
Normal file
@@ -0,0 +1,62 @@
|
||||
"""Tests for the MQTT → Server-Sent Events bridge used by the web process."""
|
||||
|
||||
import json
|
||||
import queue
|
||||
|
||||
|
||||
def test_forward_event_delivers_payload_to_sse_subscribers():
|
||||
"""forward_event should hand the payload to every connected SSE client."""
|
||||
from vigilar.web.blueprints import events as events_bp
|
||||
from vigilar.web.sse_bridge import forward_event
|
||||
|
||||
q: queue.Queue = queue.Queue(maxsize=10)
|
||||
events_bp._sse_subscribers.append(q)
|
||||
try:
|
||||
payload = {
|
||||
"type": "MOTION_START",
|
||||
"source_id": "cam1",
|
||||
"severity": "WARNING",
|
||||
"ts": 1234,
|
||||
}
|
||||
forward_event("vigilar/events/published", payload)
|
||||
|
||||
raw = q.get_nowait()
|
||||
assert raw.startswith("data: ")
|
||||
# Strip "data: " prefix and trailing double newline
|
||||
data = json.loads(raw[len("data: "):].rstrip())
|
||||
assert data["type"] == "MOTION_START"
|
||||
assert data["source_id"] == "cam1"
|
||||
finally:
|
||||
if q in events_bp._sse_subscribers:
|
||||
events_bp._sse_subscribers.remove(q)
|
||||
|
||||
|
||||
def test_start_sse_bridge_subscribes_to_events_published(monkeypatch):
|
||||
"""start_sse_bridge must connect a bus and subscribe forward_event
|
||||
to Topics.EVENTS_PUBLISHED — that is the single entry point for the
|
||||
web process to observe classified events from the events subsystem."""
|
||||
from vigilar.config import VigilarConfig
|
||||
from vigilar.constants import Topics
|
||||
from vigilar.web import sse_bridge
|
||||
|
||||
class FakeBus:
|
||||
def __init__(self, config, client_id):
|
||||
self.client_id = client_id
|
||||
self.subscriptions: list[tuple[str, object]] = []
|
||||
self.connected = False
|
||||
|
||||
def subscribe(self, topic, handler):
|
||||
self.subscriptions.append((topic, handler))
|
||||
|
||||
def connect(self):
|
||||
self.connected = True
|
||||
|
||||
monkeypatch.setattr(sse_bridge, "MessageBus", FakeBus)
|
||||
|
||||
bus = sse_bridge.start_sse_bridge(VigilarConfig())
|
||||
|
||||
assert bus.connected is True
|
||||
assert len(bus.subscriptions) == 1
|
||||
topic, handler = bus.subscriptions[0]
|
||||
assert topic == Topics.EVENTS_PUBLISHED
|
||||
assert handler is sse_bridge.forward_event
|
||||
@@ -211,6 +211,10 @@ class Topics:
|
||||
SYSTEM_SHUTDOWN = "vigilar/system/shutdown"
|
||||
SYSTEM_RULES_UPDATED = "vigilar/system/rules_updated"
|
||||
|
||||
# Classified events forwarded to the web SSE bridge (see events.processor
|
||||
# and web.sse_bridge).
|
||||
EVENTS_PUBLISHED = "vigilar/events/published"
|
||||
|
||||
# Wildcard subscriptions
|
||||
ALL = "vigilar/#"
|
||||
ALL_CAMERAS = "vigilar/camera/#"
|
||||
|
||||
@@ -103,6 +103,22 @@ class EventProcessor:
|
||||
payload=payload,
|
||||
)
|
||||
|
||||
# Broadcast a classified summary for the web SSE bridge. This is
|
||||
# independent of rule actions and DB storage — the web process
|
||||
# subscribes to Topics.EVENTS_PUBLISHED and forwards each message
|
||||
# to connected browser clients.
|
||||
bus.publish(
|
||||
Topics.EVENTS_PUBLISHED,
|
||||
{
|
||||
"id": event_id,
|
||||
"ts": int(time.time() * 1000),
|
||||
"type": event_type,
|
||||
"severity": severity,
|
||||
"source_id": source_id,
|
||||
"payload": payload,
|
||||
},
|
||||
)
|
||||
|
||||
# Insert pet/wildlife sightings
|
||||
if event_type in (
|
||||
EventType.PET_DETECTED, EventType.PET_ESCAPE, EventType.UNKNOWN_ANIMAL
|
||||
|
||||
@@ -67,9 +67,27 @@ class SubsystemProcess:
|
||||
def _run_web(cfg: VigilarConfig) -> None:
|
||||
"""Run the Flask web server in a subprocess."""
|
||||
from vigilar.web.app import create_app
|
||||
from vigilar.web.sse_bridge import start_sse_bridge
|
||||
|
||||
app = create_app(cfg)
|
||||
|
||||
# Forward classified events from MQTT to browser SSE clients. Failure
|
||||
# here must not kill the web process — the UI still works without
|
||||
# live updates, it just requires a page refresh.
|
||||
sse_bus = None
|
||||
try:
|
||||
sse_bus = start_sse_bridge(cfg)
|
||||
except Exception:
|
||||
log.exception("Failed to start SSE bridge; live event timeline will not update")
|
||||
|
||||
try:
|
||||
app.run(host=cfg.web.host, port=cfg.web.port, debug=False, use_reloader=False)
|
||||
finally:
|
||||
if sse_bus is not None:
|
||||
try:
|
||||
sse_bus.disconnect()
|
||||
except Exception:
|
||||
log.exception("Error disconnecting SSE bridge bus")
|
||||
|
||||
|
||||
def _run_event_processor(cfg: VigilarConfig) -> None:
|
||||
|
||||
36
vigilar/web/sse_bridge.py
Normal file
36
vigilar/web/sse_bridge.py
Normal file
@@ -0,0 +1,36 @@
|
||||
"""Bridge classified events from MQTT to Server-Sent Events subscribers.
|
||||
|
||||
The events subsystem (`vigilar.events.processor.EventProcessor`) publishes
|
||||
classified events to `Topics.EVENTS_PUBLISHED`. The Flask web process runs
|
||||
in its own OS process, so to make the in-browser event timeline update
|
||||
live it must subscribe to that topic via its own `MessageBus` client and
|
||||
forward every message to `broadcast_sse_event`.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from vigilar.bus import MessageBus
|
||||
from vigilar.config import VigilarConfig
|
||||
from vigilar.constants import Topics
|
||||
from vigilar.web.blueprints.events import broadcast_sse_event
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def forward_event(topic: str, payload: dict[str, Any]) -> None:
|
||||
"""MQTT handler: forward a classified event to SSE subscribers."""
|
||||
broadcast_sse_event(payload)
|
||||
|
||||
|
||||
def start_sse_bridge(cfg: VigilarConfig) -> MessageBus:
|
||||
"""Create an MQTT client that forwards classified events to SSE clients.
|
||||
|
||||
Returns the connected `MessageBus` so the caller can disconnect it on
|
||||
shutdown.
|
||||
"""
|
||||
bus = MessageBus(cfg.mqtt, client_id="vigilar-web-sse-bridge")
|
||||
bus.subscribe(Topics.EVENTS_PUBLISHED, forward_event)
|
||||
bus.connect()
|
||||
log.info("SSE bridge started: subscribed to %s", Topics.EVENTS_PUBLISHED)
|
||||
return bus
|
||||
Reference in New Issue
Block a user