Files
vigilar/vigilar/web/blueprints/system.py
adlee-was-taken 08c689e6dd fix(web): raise on MQTT connect timeout in _publish_arm_request
Code review on 9f203d8 caught a silent-failure mode: MessageBus.connect
logs and returns without raising when the MQTT handshake times out, so
an overloaded broker would let bus.publish() enqueue into paho's outbox
only to be discarded by the immediate disconnect(). The web endpoint
would return 202 even though the FSM never received the request.

Guard with 'if not bus.connected: raise RuntimeError'. The existing
try/except in arm_system/disarm_system catches the exception and turns
it into a 503 with the same log message as other bus failures.
2026-04-05 12:07:20 -04:00

392 lines
13 KiB
Python

"""System blueprint — health, arm/disarm, UPS, admin settings."""
import os
from flask import Blueprint, current_app, jsonify, render_template, request
from vigilar.alerts.pin import hash_pin, verify_pin
from vigilar.config import VigilarConfig
from vigilar.config_writer import (
save_config,
update_alert_config,
update_camera_config,
update_config_section,
)
system_bp = Blueprint("system", __name__, url_prefix="/system")
def _get_cfg() -> VigilarConfig:
    """Return the app's live config, or a fresh default ``VigilarConfig``.

    The default is used when ``VIGILAR_CONFIG`` is absent from the Flask app
    config or holds a falsy value.
    """
    live_cfg = current_app.config.get("VIGILAR_CONFIG")
    if live_cfg:
        return live_cfg
    return VigilarConfig()
def _get_config_path() -> str:
    """Return the config file path.

    The ``VIGILAR_CONFIG`` environment variable wins when it is set (even to
    an empty string); otherwise the default relative path is returned.
    """
    try:
        return os.environ["VIGILAR_CONFIG"]
    except KeyError:
        return "config/vigilar.toml"
def _save_and_reload(new_cfg: VigilarConfig) -> None:
    """Write *new_cfg* to the config file, then install it as the live config.

    The in-memory config is only replaced after ``save_config`` returns, so
    an exception while saving leaves the running app on its previous config.
    """
    target_path = _get_config_path()
    save_config(new_cfg, target_path)
    current_app.config["VIGILAR_CONFIG"] = new_cfg
# --- Pages ---
@system_bp.route("/status")
def system_status():
    """JSON API: overall system health.

    Only the camera and sensor totals are read from the config; the arm
    state, UPS status and online counts are fixed placeholder values here.
    """
    cfg = _get_cfg()
    health = {
        "arm_state": "DISARMED",
        "ups": {"status": "UNKNOWN"},
        "cameras_online": 0,
        "cameras_total": len(cfg.cameras),
        "sensors_online": 0,
        "sensors_total": len(cfg.sensors),
    }
    return jsonify(health)
@system_bp.route("/settings")
def settings_page():
    """Render ``settings.html`` with the camera list and the full config."""
    current = _get_cfg()
    template_context = {"cameras": current.cameras, "config": current}
    return render_template("settings.html", **template_context)
# --- Arm/Disarm ---
def _publish_arm_request(cfg: VigilarConfig, payload: dict) -> None:
    """Publish an arm/disarm request on MQTT for the event processor to pick up.

    Args:
        cfg: Config whose ``mqtt`` section is used to build the bus client.
        payload: Message body published to ``Topics.SYSTEM_ARM_REQUEST``.

    Raises:
        RuntimeError: If the bus reports it is not connected after
            ``connect()`` returns.
        Exception: Anything the bus raises while connecting, publishing or
            disconnecting.
    """
    from vigilar.bus import MessageBus
    from vigilar.constants import Topics

    bus = MessageBus(cfg.mqtt, client_id="vigilar-web-arm-request")
    bus.connect()
    try:
        # connect() can return without an established session, so check
        # explicitly instead of publishing into a client about to be torn down.
        if not bus.connected:
            raise RuntimeError("MQTT broker did not accept connection within timeout")
        bus.publish(Topics.SYSTEM_ARM_REQUEST, payload)
    finally:
        # Runs on the timeout path too, so a client whose connection attempt
        # is still pending is not left behind after the request finishes.
        bus.disconnect()
@system_bp.route("/api/arm", methods=["POST"])
def arm_system():
    """Forward an arm request from the web UI to the message bus.

    Reads ``mode`` (default ``"ARMED_AWAY"``) and ``pin`` (default ``""``)
    from the JSON body. Responds 202 once the request has been published, or
    503 when publishing raises.
    """
    body = request.get_json() or {}
    mode = body.get("mode", "ARMED_AWAY")
    arm_request = {
        "mode": mode,
        "pin": body.get("pin", ""),
        "triggered_by": "web",
    }
    try:
        _publish_arm_request(_get_cfg(), arm_request)
    except Exception:
        current_app.logger.exception("Failed to publish arm request")
        return jsonify({"error": "bus unavailable"}), 503
    else:
        return jsonify({"ok": True, "mode": mode}), 202
@system_bp.route("/api/disarm", methods=["POST"])
def disarm_system():
    """Forward a disarm request from the web UI to the message bus.

    Reads ``pin`` (default ``""``) from the JSON body and publishes it with
    mode ``"DISARMED"``. Responds 202 once published, or 503 when publishing
    raises.
    """
    body = request.get_json() or {}
    disarm_request = {
        "mode": "DISARMED",
        "pin": body.get("pin", ""),
        "triggered_by": "web",
    }
    try:
        _publish_arm_request(_get_cfg(), disarm_request)
    except Exception:
        current_app.logger.exception("Failed to publish arm request")
        return jsonify({"error": "bus unavailable"}), 503
    else:
        return jsonify({"ok": True, "mode": "DISARMED"}), 202
@system_bp.route("/api/reset-pin", methods=["POST"])
def reset_pin():
    """Replace the PIN hash after verifying the recovery passphrase.

    Expects a JSON body with ``recovery_passphrase`` and ``new_pin``.

    Returns:
        401 if the passphrase does not verify, 400 if ``new_pin`` is missing
        or empty, otherwise ``{"ok": true}``. A failure to persist the new
        config is logged but still answered with ``{"ok": true}`` because the
        new PIN is applied to the in-memory config.
    """
    data = request.get_json() or {}
    recovery_passphrase = data.get("recovery_passphrase", "")
    new_pin = data.get("new_pin", "")
    cfg = _get_cfg()
    if not verify_pin(recovery_passphrase, cfg.security.recovery_passphrase_hash):
        return jsonify({"error": "Invalid recovery passphrase"}), 401
    # Checked after authentication so an unauthenticated caller learns
    # nothing about request validation. Without this guard an omitted
    # new_pin would be hashed as "" and installed as the PIN.
    if new_pin is None or new_pin == "":
        return jsonify({"error": "Missing new_pin"}), 400
    new_security = cfg.security.model_copy(update={"pin_hash": hash_pin(new_pin)})
    new_cfg = cfg.model_copy(update={"security": new_security})
    try:
        _save_and_reload(new_cfg)
    except Exception:
        # Best effort: keep the new PIN active in memory even though it could
        # not be saved, but leave a trace that the saved config is stale.
        current_app.logger.exception(
            "Failed to persist PIN reset; new PIN applied in memory only"
        )
        current_app.config["VIGILAR_CONFIG"] = new_cfg
    return jsonify({"ok": True})
# --- Config Read API ---
def _redact_config_secrets(data: dict) -> dict:
    """Remove secret-bearing fields from a dumped config dict and return it.

    The dict is modified in place. Both read endpoints go through this helper
    so the full dump and the per-section view cannot drift apart. Sections
    that are missing or not dicts are skipped rather than raising.
    """
    secret_fields = (
        (("web",), "password_hash"),
        (("system",), "arm_pin_hash"),
        (("alerts", "webhook"), "secret"),
        (("storage",), "key_file"),
        (("mqtt",), "password"),
    )
    for path, field in secret_fields:
        node = data
        for key in path:
            node = node.get(key) if isinstance(node, dict) else None
        if isinstance(node, dict):
            node.pop(field, None)
    # The security section is dropped entirely rather than field by field.
    # NOTE(review): other sections (e.g. the email alert channel, camera
    # rtsp_url) may also carry credentials — confirm against the config model.
    data.pop("security", None)
    return data


@system_bp.route("/api/config")
def get_config_api():
    """Return full config (secrets redacted)."""
    cfg = _get_cfg()
    data = _redact_config_secrets(cfg.model_dump(by_alias=False))
    return jsonify(data)


@system_bp.route("/api/config/<section>")
def get_config_section_api(section: str):
    """Return a specific config section (secrets redacted).

    The same redaction as the full dump is applied first, so a section that
    is removed wholesale (``security``) answers 404 like an unknown one.
    ``cameras``, ``sensors`` and ``rules`` fall back to an empty list when
    absent from the dump.
    """
    cfg = _get_cfg()
    data = _redact_config_secrets(cfg.model_dump(by_alias=False))
    if section in ("cameras", "sensors", "rules"):
        return jsonify(data.get(section, []))
    if section in data:
        return jsonify(data[section])
    return jsonify({"error": f"Unknown section: {section}"}), 404
# --- System Settings ---
@system_bp.route("/api/config/system", methods=["POST"])
def update_system_config():
    """Update system settings (timezone, log level, dirs).

    Keys outside the permitted set are ignored; a body with no permitted
    keys is answered with 400. Accepted changes are saved and made live.
    """
    body = request.get_json() or {}
    permitted = {"timezone", "log_level", "data_dir", "recordings_dir", "hls_dir", "name"}
    changes = {}
    for field, value in body.items():
        if field in permitted:
            changes[field] = value
    if not changes:
        return jsonify({"error": "No valid fields to update"}), 400
    current = _get_cfg()
    _save_and_reload(update_config_section(current, "system", changes))
    return jsonify({"ok": True, "updated": list(changes)})
# --- Web Settings ---
@system_bp.route("/api/config/web", methods=["POST"])
def update_web_config():
    """Update the ``web`` config section from the JSON body.

    Keys outside the permitted set are ignored; a body with no permitted
    keys is answered with 400. Accepted changes are saved and made live.
    """
    body = request.get_json() or {}
    permitted = {"host", "port", "username", "session_timeout"}
    changes = {}
    for field, value in body.items():
        if field in permitted:
            changes[field] = value
    if not changes:
        return jsonify({"error": "No valid fields"}), 400
    current = _get_cfg()
    _save_and_reload(update_config_section(current, "web", changes))
    reply = {
        "ok": True,
        "updated": list(changes),
        "note": "Restart required for port changes",
    }
    return jsonify(reply)
# --- Remote Access Settings ---
@system_bp.route("/api/config/remote", methods=["POST"])
def update_remote_config():
    """Update the ``remote`` (remote access) config section from the JSON body.

    Keys outside the permitted set are ignored; a body with no permitted
    keys is answered with 400. Accepted changes are saved and made live.
    """
    body = request.get_json() or {}
    permitted = {
        "enabled",
        "upload_bandwidth_mbps",
        "remote_hls_resolution",
        "remote_hls_fps",
        "remote_hls_bitrate_kbps",
        "max_remote_viewers",
        "tunnel_ip",
    }
    changes = {}
    for field, value in body.items():
        if field in permitted:
            changes[field] = value
    if not changes:
        return jsonify({"error": "No valid fields"}), 400
    current = _get_cfg()
    _save_and_reload(update_config_section(current, "remote", changes))
    reply = {
        "ok": True,
        "updated": list(changes),
        "note": "Restart required to apply stream changes",
    }
    return jsonify(reply)
# --- MQTT Settings ---
@system_bp.route("/api/config/mqtt", methods=["POST"])
def update_mqtt_config():
    """Update the ``mqtt`` config section from the JSON body.

    Keys outside the permitted set are ignored; a body with no permitted
    keys is answered with 400. Accepted changes are saved and made live.
    """
    body = request.get_json() or {}
    permitted = {"host", "port", "username", "password"}
    changes = {}
    for field, value in body.items():
        if field in permitted:
            changes[field] = value
    if not changes:
        return jsonify({"error": "No valid fields"}), 400
    current = _get_cfg()
    _save_and_reload(update_config_section(current, "mqtt", changes))
    reply = {"ok": True, "updated": list(changes), "note": "Restart required"}
    return jsonify(reply)
# --- Camera Settings ---
@system_bp.route("/api/config/camera/<camera_id>", methods=["POST"])
def update_camera_config_api(camera_id: str):
    """Update settings for a specific camera.

    Keys outside the allowed set are ignored. Answers 400 when no allowed key
    is present and 404 when ``camera_id`` is not in the config. After saving,
    the runtime-changeable fields are pushed on MQTT; a failed push is logged
    and does not fail the request.
    """
    data = request.get_json() or {}
    allowed = {
        "display_name", "rtsp_url", "enabled",
        "record_continuous", "record_on_motion",
        "motion_sensitivity", "motion_min_area_px", "motion_zones",
        "pre_motion_buffer_s", "post_motion_buffer_s",
        "idle_fps", "motion_fps", "retention_days",
        "resolution_capture", "resolution_motion",
    }
    updates = {k: v for k, v in data.items() if k in allowed}
    if not updates:
        return jsonify({"error": "No valid fields"}), 400
    cfg = _get_cfg()
    # Verify camera exists
    if not any(c.id == camera_id for c in cfg.cameras):
        return jsonify({"error": f"Camera not found: {camera_id}"}), 404
    new_cfg = update_camera_config(cfg, camera_id, updates)
    _save_and_reload(new_cfg)
    # Notify camera worker of runtime-changeable settings via MQTT
    runtime_fields = {"motion_sensitivity", "motion_min_area_px"}
    runtime_updates = {k: v for k, v in updates.items() if k in runtime_fields}
    if runtime_updates:
        try:
            from vigilar.bus import MessageBus

            bus = MessageBus(cfg.mqtt, client_id="web-config-push")
            bus.connect()
            try:
                # Same guard as the arm-request publisher: do not publish
                # into a client that never finished connecting.
                if not bus.connected:
                    raise RuntimeError("MQTT broker did not accept connection within timeout")
                bus.publish(f"vigilar/camera/{camera_id}/config", runtime_updates)
            finally:
                # Release the client even when publish() raises.
                bus.disconnect()
        except Exception:
            # Non-critical — settings are already saved, so log instead of
            # failing the request.
            current_app.logger.warning(
                "Failed to push runtime config to camera %s", camera_id, exc_info=True
            )
    return jsonify({"ok": True, "camera_id": camera_id, "updated": list(updates.keys())})
# --- Camera threshold shortcut (used by settings sliders) ---
@system_bp.route("/api/camera/<camera_id>/threshold", methods=["POST"])
def update_camera_threshold(camera_id: str):
    """Quick endpoint for motion sensitivity slider.

    Reads ``sensitivity`` from the JSON body, clamps it to ``[0.0, 1.0]``,
    saves it as the camera's ``motion_sensitivity`` and pushes it on MQTT.
    Answers 400 when the value is missing, non-numeric or NaN, and 404 when
    ``camera_id`` is not in the config. A failed MQTT push is logged and does
    not fail the request.
    """
    import math

    data = request.get_json() or {}
    sensitivity = data.get("sensitivity")
    if sensitivity is None:
        return jsonify({"error": "Missing sensitivity"}), 400
    try:
        sensitivity = float(sensitivity)
    except (TypeError, ValueError):
        return jsonify({"error": "Invalid sensitivity"}), 400
    # NaN compares false against everything, so the clamp below would turn it
    # into 1.0 instead of rejecting it.
    if math.isnan(sensitivity):
        return jsonify({"error": "Invalid sensitivity"}), 400
    sensitivity = max(0.0, min(1.0, sensitivity))
    cfg = _get_cfg()
    if not any(c.id == camera_id for c in cfg.cameras):
        return jsonify({"error": f"Camera not found: {camera_id}"}), 404
    new_cfg = update_camera_config(cfg, camera_id, {"motion_sensitivity": sensitivity})
    _save_and_reload(new_cfg)
    # Push to live worker
    try:
        from vigilar.bus import MessageBus

        bus = MessageBus(cfg.mqtt, client_id="web-threshold-push")
        bus.connect()
        try:
            # Same guard as the arm-request publisher: do not publish into a
            # client that never finished connecting.
            if not bus.connected:
                raise RuntimeError("MQTT broker did not accept connection within timeout")
            # NOTE(review): this payload key is "sensitivity" while the stored
            # config field is "motion_sensitivity" — confirm which key the
            # camera worker expects on this topic.
            bus.publish(f"vigilar/camera/{camera_id}/config", {"sensitivity": sensitivity})
        finally:
            # Release the client even when publish() raises.
            bus.disconnect()
    except Exception:
        # Non-critical — the value is already saved, so log instead of
        # failing the request.
        current_app.logger.warning(
            "Failed to push sensitivity to camera %s", camera_id, exc_info=True
        )
    return jsonify({"ok": True, "camera_id": camera_id, "sensitivity": sensitivity})
# --- UPS Settings ---
@system_bp.route("/api/config/ups", methods=["POST"])
def update_ups_config():
    """Update the ``ups`` config section from the JSON body.

    Keys outside the permitted set are ignored; a body with no permitted
    keys is answered with 400. Accepted changes are saved and made live.
    """
    body = request.get_json() or {}
    permitted = {
        "enabled",
        "nut_host",
        "nut_port",
        "ups_name",
        "poll_interval_s",
        "low_battery_threshold_pct",
        "critical_runtime_threshold_s",
        "shutdown_delay_s",
    }
    changes = {}
    for field, value in body.items():
        if field in permitted:
            changes[field] = value
    if not changes:
        return jsonify({"error": "No valid fields"}), 400
    current = _get_cfg()
    _save_and_reload(update_config_section(current, "ups", changes))
    return jsonify({"ok": True, "updated": list(changes)})
# --- Storage Settings ---
@system_bp.route("/api/config/storage", methods=["POST"])
def update_storage_config():
    """Update the ``storage`` config section from the JSON body.

    Keys outside the permitted set are ignored; a body with no permitted
    keys is answered with 400. Accepted changes are saved and made live.
    """
    body = request.get_json() or {}
    permitted = {"encrypt_recordings", "max_disk_usage_gb", "free_space_floor_gb"}
    changes = {}
    for field, value in body.items():
        if field in permitted:
            changes[field] = value
    if not changes:
        return jsonify({"error": "No valid fields"}), 400
    current = _get_cfg()
    _save_and_reload(update_config_section(current, "storage", changes))
    return jsonify({"ok": True, "updated": list(changes)})
# --- Alert Settings ---
@system_bp.route("/api/config/alerts/<channel>", methods=["POST"])
def update_alert_config_api(channel: str):
    """Update settings for a specific alert channel.

    Answers 404 for a channel outside ``local``, ``web_push``, ``email`` and
    ``webhook``. The JSON body is handed to ``update_alert_config`` without
    key filtering, and the result is saved and made live.
    """
    body = request.get_json() or {}
    if channel not in {"local", "web_push", "email", "webhook"}:
        return jsonify({"error": f"Invalid channel: {channel}"}), 404
    updated_cfg = update_alert_config(_get_cfg(), channel, body)
    _save_and_reload(updated_cfg)
    reply = {"ok": True, "channel": channel, "updated": [*body.keys()]}
    return jsonify(reply)
# --- VAPID Key ---
@system_bp.route("/api/vapid-key")
def vapid_public_key():
    """Return the VAPID public key for push subscription.

    Returns:
        ``{"publicKey": <unpadded URL-safe base64>}`` on success, 404 when no
        key file is configured or the file does not exist, and 500 with the
        error text when the key cannot be loaded or encoded.
    """
    cfg = _get_cfg()
    key_file = cfg.alerts.web_push.vapid_private_key_file
    # Guard the unset case first: os.path.exists(None) raises TypeError
    # instead of returning False.
    if not key_file or not os.path.exists(key_file):
        return jsonify({"error": "VAPID keys not configured"}), 404
    try:
        import base64

        from py_vapid import Vapid

        vapid = Vapid.from_file(key_file)
        public_key = vapid.public_key
        export_raw = getattr(public_key, "public_bytes_raw", None)
        if export_raw is not None:
            raw = export_raw()
        else:
            # NOTE(review): elliptic-curve public keys in `cryptography`
            # appear to offer no public_bytes_raw(); confirm against the
            # installed version. Fall back to the uncompressed point
            # encoding (0x04 || X || Y) built from the public numbers.
            numbers = public_key.public_numbers()
            coord_len = (public_key.curve.key_size + 7) // 8
            raw = (
                b"\x04"
                + numbers.x.to_bytes(coord_len, "big")
                + numbers.y.to_bytes(coord_len, "big")
            )
        # Encode as URL-safe base64 without padding
        b64 = base64.urlsafe_b64encode(raw).rstrip(b"=").decode()
        return jsonify({"publicKey": b64})
    except Exception as e:
        current_app.logger.exception("Failed to load VAPID public key")
        return jsonify({"error": str(e)}), 500
# --- Push Subscription ---
@system_bp.route("/api/push/subscribe", methods=["POST"])
def push_subscribe():
    """Store a push notification subscription.

    Expects a JSON body with ``endpoint`` and a ``keys`` object holding
    ``p256dh`` and ``auth``.

    Returns:
        ``{"ok": true}`` once stored, 400 when any field is missing or
        ``keys`` is not an object, and 500 with the error text when storing
        raises.
    """
    data = request.get_json() or {}
    endpoint = data.get("endpoint")
    # "keys": null or a non-object value would otherwise crash on .get()
    # below and surface as a 500 instead of a client error.
    keys = data.get("keys") or {}
    if not isinstance(keys, dict):
        return jsonify({"error": "Missing subscription fields"}), 400
    p256dh = keys.get("p256dh", "")
    auth = keys.get("auth", "")
    if not endpoint or not p256dh or not auth:
        return jsonify({"error": "Missing subscription fields"}), 400
    try:
        from vigilar.storage.db import get_db_path, get_engine
        from vigilar.storage.queries import save_push_subscription

        cfg = _get_cfg()
        engine = get_engine(get_db_path(cfg.system.data_dir))
        save_push_subscription(
            engine, endpoint, p256dh, auth,
            user_agent=request.headers.get("User-Agent", ""),
        )
        return jsonify({"ok": True})
    except Exception as e:
        current_app.logger.exception("Failed to save push subscription")
        return jsonify({"error": str(e)}), 500