Phase 1 (Foundation): project skeleton, TOML config + Pydantic validation, MQTT bus wrapper, SQLite schema (9 tables), Click CLI, process supervisor. Phase 2 (Camera): RTSP capture via OpenCV, MOG2 motion detection with configurable sensitivity/zones, adaptive FPS recording (2fps idle/30fps motion) via FFmpeg subprocess, HLS live streaming, pre-motion ring buffer. Phase 3 (Web UI): Flask + Bootstrap 5 dark theme, 6 blueprints, Jinja2 templates (dashboard, kiosk 2x2 grid, events, sensors, recordings, settings), PWA with service worker + Web Push, full admin settings UI with config persistence. Remote Access: WireGuard tunnel configs, nginx reverse proxy with HLS caching + rate limiting, bandwidth-optimized remote HLS stream (426x240 @ 500kbps), DO droplet setup script, certbot TLS. 29 tests passing. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
227 lines
6.9 KiB
Python
227 lines
6.9 KiB
Python
"""Write configuration changes back to the TOML file.
|
|
|
|
Python's tomllib is read-only, so this module takes a simple approach:
serialize the Pydantic config to a dict and write it with a small manual
TOML serializer (rather than a third-party writer such as tomli_w). This
preserves the configured values but not the comments from the original file.
|
|
"""
|
|
|
|
import copy
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from vigilar.config import VigilarConfig
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def _serialize_value(v: Any) -> Any:
    """Reduce a value to types that can be written to TOML.

    Booleans, numbers and strings pass through untouched; lists and dicts
    are rebuilt with every element converted recursively; any other object
    is replaced by its ``str()`` form.
    """
    # bool is checked together with the other scalars: all are returned as-is.
    if isinstance(v, (bool, int, float, str)):
        return v
    if isinstance(v, dict):
        return {name: _serialize_value(item) for name, item in v.items()}
    if isinstance(v, list):
        return list(map(_serialize_value, v))
    return str(v)
|
|
|
|
|
|
def _to_toml_string(data: dict[str, Any], indent: int = 0) -> str:
    """Convert a config dict to TOML text.

    The document is written in this order: top-level scalar keys,
    ``[table]`` sections for dict values (nested dicts become
    ``[table.sub]`` sections at any depth), ``[[name]]`` entries for
    non-empty lists of dicts, and finally the ``_sensor_gpio`` entry as a
    ``[sensors.gpio]`` section.

    TOML has no null, so keys whose value is ``None`` are omitted instead
    of being written as the string ``"None"``. Empty top-level lists are
    skipped because they would conflict with array-of-tables notation.

    Args:
        data: Mapping of top-level keys to values. Keys are written as
            bare TOML keys.
        indent: Accepted for interface compatibility; not used.

    Returns:
        The TOML document, terminated by a single newline.
    """
    gpio_key = "_sensor_gpio"
    lines: list[str] = []

    def is_dict_list(value: Any) -> bool:
        # A non-empty list whose first element is a dict is a list of tables.
        return isinstance(value, list) and bool(value) and isinstance(value[0], dict)

    def inline(value: Any) -> str:
        """Render a right-hand-side value; dicts become inline tables."""
        if isinstance(value, dict):
            # Only None is dropped: False, 0 and "" are real values and must
            # survive a save/load round trip.
            members = ", ".join(
                f"{k} = {inline(v)}" for k, v in value.items() if v is not None
            )
            return f"{{{members}}}"
        if is_dict_list(value):
            return f"[{', '.join(inline(item) for item in value)}]"
        return _format_toml_value(value)

    def emit_table(header: str, table: dict[str, Any]) -> None:
        """Write ``[header]``, its scalar keys, then its sub-tables."""
        lines.append(f"[{header}]")
        nested: dict[str, Any] = {}
        for k, v in table.items():
            if v is None:
                continue
            if isinstance(v, dict):
                # Deferred: a key written after a [header.sub] line would
                # belong to the sub-table, not to this table.
                nested[k] = v
            else:
                lines.append(f"{k} = {inline(v)}")
        for k, sub in nested.items():
            lines.append("")
            emit_table(f"{header}.{k}", sub)

    # Separate simple key-value pairs from tables and arrays of tables.
    tables: dict[str, Any] = {}
    array_tables: dict[str, Any] = {}
    for key, value in data.items():
        if key == gpio_key or value is None:
            continue
        if isinstance(value, dict):
            tables[key] = value
        elif is_dict_list(value):
            array_tables[key] = value
        elif isinstance(value, list) and not value:
            # Skip empty arrays — they conflict with array-of-tables notation.
            continue
        else:
            lines.append(f"{key} = {_format_toml_value(value)}")

    for key, table in tables.items():
        if lines:
            lines.append("")
        emit_table(key, table)

    for key, items in array_tables.items():
        for item in items:
            lines.append("")
            lines.append(f"[[{key}]]")
            for k, v in item.items():
                if v is None:
                    continue
                if is_dict_list(v):
                    # Multi-line array of inline tables (e.g., rule conditions).
                    lines.append(f"{k} = [")
                    lines.extend(f"    {inline(entry)}," for entry in v)
                    lines.append("]")
                else:
                    lines.append(f"{k} = {inline(v)}")

    # Sensor GPIO config is written as [sensors.gpio] after the [[sensors]]
    # entries.
    # NOTE(review): per the TOML spec a [sensors.gpio] header that follows
    # [[sensors]] entries nests under the last sensors element — confirm the
    # config loader expects that layout.
    gpio_cfg = data.get(gpio_key) or {}
    if gpio_cfg:
        lines.append("")
        lines.append("[sensors.gpio]")
        for k, v in gpio_cfg.items():
            if v is None:
                continue
            lines.append(f"{k} = {inline(v)}")

    return "\n".join(lines) + "\n"
|
|
|
|
|
|
# Short escapes defined for TOML basic strings.
_TOML_STRING_ESCAPES = {
    "\\": "\\\\",
    '"': '\\"',
    "\b": "\\b",
    "\t": "\\t",
    "\n": "\\n",
    "\f": "\\f",
    "\r": "\\r",
}

# Characters allowed in an unquoted (bare) TOML key.
_TOML_BARE_KEY_CHARS = frozenset(
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_"
)


def _toml_basic_string(text: str) -> str:
    """Return *text* as a double-quoted, fully escaped TOML basic string."""
    pieces: list[str] = []
    for ch in text:
        if ch in _TOML_STRING_ESCAPES:
            pieces.append(_TOML_STRING_ESCAPES[ch])
        elif ord(ch) < 0x20 or ord(ch) == 0x7F:
            # Remaining control characters may not appear raw in a basic string.
            pieces.append(f"\\u{ord(ch):04X}")
        else:
            pieces.append(ch)
    return '"' + "".join(pieces) + '"'


def _toml_key(key: Any) -> str:
    """Return *key* as a bare key when possible, otherwise as a quoted key."""
    text = str(key)
    if text and all(ch in _TOML_BARE_KEY_CHARS for ch in text):
        return text
    return _toml_basic_string(text)


def _format_toml_value(value: Any) -> str:
    """Format a single value for TOML output.

    Args:
        value: A bool, int, float, str, list or dict; any other object is
            written as the quoted, escaped form of ``str(value)`` (so
            ``None`` becomes ``"None"`` — callers that want to omit nulls
            must skip them before calling).

    Returns:
        The TOML source text for the value: ``true``/``false`` for bools,
        the ``str()`` form for numbers, an escaped basic string for text,
        ``[a, b, ...]`` for lists (elements formatted recursively), and an
        inline table ``{k = v, ...}`` for dicts (``None`` members omitted).
    """
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, str):
        return _toml_basic_string(value)
    if isinstance(value, list):
        # Every element goes through this function, so bools inside a list
        # are written as true/false rather than Python's True/False.
        return f"[{', '.join(_format_toml_value(item) for item in value)}]"
    if isinstance(value, dict):
        members = ", ".join(
            f"{_toml_key(k)} = {_format_toml_value(v)}"
            for k, v in value.items()
            if v is not None
        )
        return f"{{{members}}}"
    return _toml_basic_string(str(value))
|
|
|
|
|
|
def config_to_dict(cfg: VigilarConfig) -> dict[str, Any]:
    """Build the dict that is handed to the TOML writer from a VigilarConfig.

    The config is dumped with ``model_dump(by_alias=False)`` and its
    sections are copied into a new dict in a fixed order. A section missing
    from the dump falls back to an empty dict (table sections) or an empty
    list (cameras, sensors, rules).
    """
    dumped = cfg.model_dump(by_alias=False)

    # Plain table sections, in the order they should appear in the file.
    table_sections = (
        "system",
        "mqtt",
        "web",
        "zigbee2mqtt",
        "ups",
        "storage",
        "alerts",
        "remote",
    )
    layout: dict[str, Any] = {name: dumped.get(name, {}) for name in table_sections}

    layout["cameras"] = dumped.get("cameras", [])

    # Sensors go out as an array of tables; the GPIO settings travel under
    # the internal "_sensor_gpio" key.
    layout["sensors"] = dumped.get("sensors", [])
    layout["_sensor_gpio"] = dumped.get("sensor_gpio", {})

    layout["rules"] = dumped.get("rules", [])
    return layout
|
|
|
|
|
|
def save_config(cfg: VigilarConfig, path: str | Path) -> None:
    """Save config to a TOML file, creating a backup of the original.

    If *path* already exists it is first copied to a sibling file whose
    suffix is ``.toml.bak.<unix-timestamp>``. The new content is written to
    a sibling ``.toml.tmp`` file and then moved over *path*, so a failed
    write does not leave a half-written config behind.

    Args:
        cfg: Config to serialize.
        path: Destination TOML file.

    Raises:
        OSError: If the backup, the write or the replace fails. The
            temporary file is removed before the error propagates.
    """
    path = Path(path)
    existed = path.exists()

    # Backup existing config
    if existed:
        backup = path.with_suffix(f".toml.bak.{int(time.time())}")
        shutil.copy2(path, backup)
        log.info("Config backup: %s", backup.name)

    toml_str = _to_toml_string(config_to_dict(cfg))

    # Atomic write via temp file
    tmp_path = path.with_suffix(".toml.tmp")
    try:
        # TOML documents must be UTF-8; do not rely on the locale default.
        tmp_path.write_text(toml_str, encoding="utf-8")
        if existed:
            # Keep the original permission bits so a config that was
            # restricted (it may hold credentials) stays restricted.
            shutil.copymode(path, tmp_path)
        tmp_path.replace(path)
    except OSError:
        tmp_path.unlink(missing_ok=True)
        raise

    log.info("Config saved: %s", path)
|
|
|
|
|
|
def update_config_section(
    cfg: VigilarConfig,
    section: str,
    updates: dict[str, Any],
) -> VigilarConfig:
    """Return a new VigilarConfig with one top-level section changed.

    Args:
        cfg: Current config; it is dumped to a dict and not modified.
        section: Top-level section name (e.g., 'system', 'web', 'ups').
        updates: Field names mapped to their new values.

    Returns:
        A VigilarConfig built from the dumped data after the change. When
        the section currently dumps as a dict, ``updates`` is merged into
        it (top-level keys only); otherwise the section is set to
        ``updates`` itself.
    """
    dumped = cfg.model_dump(by_alias=False)

    current = dumped.get(section)
    if isinstance(current, dict):
        current.update(updates)
    else:
        # Missing section, or a section that is not a table: replace it.
        dumped[section] = updates

    return VigilarConfig(**dumped)
|
|
|
|
|
|
def update_camera_config(
    cfg: VigilarConfig,
    camera_id: str,
    updates: dict[str, Any],
) -> VigilarConfig:
    """Update a specific camera's config.

    Args:
        cfg: Current config; it is dumped to a dict and not modified.
        camera_id: Value of the ``id`` field of the camera to change.
        updates: Field names mapped to their new values; merged into the
            first camera entry whose ``id`` matches.

    Returns:
        A VigilarConfig built from the dumped data. If no camera matches,
        a warning is logged and the returned config carries no changes.
    """
    data = cfg.model_dump(by_alias=False)

    for cam in data.get("cameras") or []:
        if cam.get("id") == camera_id:
            cam.update(updates)
            break
    else:
        # Make a no-op visible instead of silently dropping the updates.
        log.warning("Camera %r not found in config; no changes applied", camera_id)

    return VigilarConfig(**data)
|
|
|
|
|
|
def update_alert_config(
    cfg: VigilarConfig,
    channel: str,
    updates: dict[str, Any],
) -> VigilarConfig:
    """Update a specific alert channel config.

    Args:
        cfg: Current config; it is dumped to a dict and not modified.
        channel: Key of the channel inside the ``alerts`` section.
        updates: Field names mapped to their new values.

    Returns:
        A VigilarConfig built from the dumped data. If the channel dumps
        as a dict, ``updates`` is merged into it. If the channel key exists
        but dumps as ``None``, it is set to a copy of ``updates``. In any
        other case a warning is logged and no changes are applied.
    """
    data = cfg.model_dump(by_alias=False)

    alerts = data.get("alerts")
    if not isinstance(alerts, dict):
        # Missing (or null) alerts section: start from an empty table.
        alerts = data["alerts"] = {}

    current = alerts.get(channel)
    if isinstance(current, dict):
        current.update(updates)
    elif channel in alerts and current is None:
        # presumably an optional channel that is currently unset — creating
        # it from the updates avoids calling .update() on None; verify
        # against the VigilarConfig schema.
        alerts[channel] = dict(updates)
    else:
        log.warning(
            "Alert channel %r not found in config; no changes applied", channel
        )

    return VigilarConfig(**data)
|