1 Commits

Author SHA1 Message Date
adlee-was-taken
f99472fc1a docs(plan): implementation plan for PIN hashing unification (issue #2)
Plan document for issue #2 — the three-way PIN hash mismatch across
CLI, events FSM, and web arm/disarm. Proposes canonicalizing on
PBKDF2-SHA256 via alerts/pin and [security] pin_hash, deprecating
[system] arm_pin_hash, and wiring web arm/disarm through MQTT to the
FSM so the web buttons actually transition state.

Nine tasks, TDD throughout. No code changes in this commit.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 11:18:16 -04:00
19 changed files with 90 additions and 689 deletions

View File

@@ -40,4 +40,4 @@ All access is read-mostly via `vigilar.storage.queries`. Touches essentially eve
## Notes
Templates use Jinja2 with a Bootstrap 5 dark theme; the camera grid uses `hls.js` for multi-camera HLS playback with an MJPEG fallback on the single-camera page. The app is a PWA (service worker + manifest) with VAPID Web Push for mobile notifications. The event-timeline SSE endpoint lives around line 93 of `blueprints/events.py` and holds a per-client `queue.Queue` fed by a module-level `broadcast_sse_event` function. A dedicated MQTT → SSE bridge (`vigilar/web/sse_bridge.py`) runs inside the web process, subscribes to `Topics.EVENTS_PUBLISHED`, and calls `broadcast_sse_event` for every classified event emitted by the events subsystem — so the in-browser event timeline updates live without a page refresh.
Templates use Jinja2 with a Bootstrap 5 dark theme; the camera grid uses `hls.js` for multi-camera HLS playback with an MJPEG fallback on the single-camera page. The app is a PWA (service worker + manifest) with VAPID Web Push for mobile notifications. The event-timeline SSE endpoint lives around line 93 of `blueprints/events.py` and holds a per-client `queue.Queue` fed by a module-level `broadcast_sse_event` function — that function is defined but has no call site in the repo at time of writing, so live SSE updates will only flow once the bridge from MQTT into that queue is wired.

View File

@@ -113,9 +113,8 @@ omitted sections behave sensibly.
`.vge` files.
- `hls_dir` (default `/var/vigilar/hls`): HLS segment output.
- `log_level` (default `"INFO"`): one of DEBUG, INFO, WARNING, ERROR.
- `arm_pin_hash` (default `""`): **deprecated.** Still parsed but
ignored at runtime. Use `[security] pin_hash` instead; run
`vigilar config set-pin` to generate the canonical hash.
- `arm_pin_hash` (default `""`): commented out in the sample; set via
`vigilar config set-pin`.
### `[mqtt]`
@@ -292,15 +291,11 @@ enabled = false`, `[visitors] enabled = false`, `[highlights] enabled
- `[location] latitude`, `longitude` (default `0.0`): used for sunrise
and sunset lookups.
- `[security] pin_hash` (canonical arm/disarm PIN store): populated by
`vigilar config set-pin`, which emits a PBKDF2-SHA256 hash to paste
into the `[security]` section. The legacy `[system] arm_pin_hash`
field is deprecated; see the `[system]` section above.
- `[security] recovery_passphrase_hash`: used by the web
`/system/api/reset-pin` endpoint to authenticate PIN-reset requests.
There is no CLI helper for this field today — set it by hashing a
passphrase manually with `vigilar.alerts.pin.hash_pin` and pasting
the result into `[security]`, or leave it unset to disable recovery.
- `[security] pin_hash` and `recovery_passphrase_hash`: populated by
`vigilar config set-pin` (the same hash is also stored under
`[system] arm_pin_hash` on the `system` model; both fields exist
because the web UI uses `[security]` while the CLI helper prints a
`[system]` line — pick one location and stick with it).
## CLI reference
@@ -349,9 +344,9 @@ sudo -u vigilar /opt/vigilar/venv/bin/vigilar config show \
```
Dumps the parsed config as JSON with `web.password_hash`,
`security.pin_hash`, `security.recovery_passphrase_hash`, and
`alerts.webhook.secret` redacted. Useful for confirming which
defaults Pydantic applied for keys you did not set.
`system.arm_pin_hash`, and `alerts.webhook.secret` redacted. Useful
for confirming which defaults Pydantic applied for keys you did not
set.
### `vigilar config set-password`
@@ -370,12 +365,10 @@ prints a `password_hash = "salt_hex:key_hex"` line to paste into
sudo -u vigilar /opt/vigilar/venv/bin/vigilar config set-pin
```
Prompts for an arm/disarm PIN, derives a salted PBKDF2-SHA256 hash
(600,000 iterations) via `vigilar.alerts.pin.hash_pin`, and prints a
`pin_hash = "pbkdf2_sha256$salt$dk"` line to paste into `[security]`.
Again, no file write. The same hash format is verified identically by
the web arm/disarm endpoint and by `ArmStateFSM` in the event
processor — there is one canonical PIN store.
Prompts for an arm/disarm PIN, generates a random 32-byte HMAC key,
computes `HMAC-SHA256(key, pin)`, and prints an `arm_pin_hash =
"secret_hex:mac_hex"` line to paste into `[system]`. Again, no file
write.
## Secrets and security
@@ -395,13 +388,9 @@ processor — there is one canonical PIN store.
volume on integrity-verified storage (dm-integrity, ZFS with
checksums) or mirror to write-once media.
- The web UI password is a scrypt hash set by `vigilar config
set-password` and stored at `[web] password_hash`. The arm/disarm
PIN is a PBKDF2-SHA256 hash (600k iterations, salted) set by
`vigilar config set-pin` and stored at `[security] pin_hash`.
A legacy `[system] arm_pin_hash` field is still parsed but ignored
at runtime; if it's set and `[security] pin_hash` is empty, the
service logs a deprecation warning at startup and arm/disarm will
behave as if no PIN were configured until you re-run `set-pin`.
set-password` and stored at `[web] password_hash`. The arm PIN is
an HMAC stored at `[system] arm_pin_hash` (and/or `[security]
pin_hash`).
- TLS: `gen_cert.sh` uses `mkcert` if present, otherwise an `openssl`
ECDSA P-256 self-signed certificate valid for 3650 days with SANs
for `vigilar.local`, `localhost`, `127.0.0.1`, and the detected LAN
@@ -596,6 +585,13 @@ Do not expose port `49735` directly on the WAN; require the tunnel.
## Known limitations
- **Event timeline is not live.** The web UI event timeline requires
a page refresh to show new events. `broadcast_sse_event` exists in
`vigilar/web/blueprints/events.py` but has zero call sites today;
events are not pushed to browsers via SSE. Web Push notifications
via VAPID are independent of the timeline and do work: you will
get mobile alerts as motion happens, but the in-page timeline lags
until you reload.
- **Recording integrity is not authenticated.** AES-256-CTR gives you
confidentiality, not tamper-evidence. If an attacker reaches the
recordings directory they can modify ciphertext unnoticed. See the
@@ -613,6 +609,10 @@ Do not expose port `49735` directly on the WAN; require the tunnel.
`[health]` for real disk policy.
- **No schema migrations.** There is no Alembic (or equivalent) in
the tree. Rollbacks rely on your backup discipline.
- **Duplicate PIN fields.** `vigilar config set-pin` writes to
`[system] arm_pin_hash`, while the web arm/disarm flow reads from
`[security] pin_hash`. Both models exist. If you set one and the
other side does not behave as expected, mirror the value manually.
## Troubleshooting

View File

@@ -11,28 +11,6 @@ from vigilar.config import VigilarConfig, load_config
from vigilar.storage.schema import metadata
@pytest.fixture(autouse=True, scope="session")
def _isolate_vigilar_config(tmp_path_factory):
"""Prevent tests from writing to the real config/vigilar.toml.
Web endpoint handlers that call `_save_and_reload()` read the target
path from the VIGILAR_CONFIG env var, falling back to the relative
`"config/vigilar.toml"`. Without this fixture, any test that exercises
such an endpoint rewrites the repo's committed config file via a
Pydantic round-trip, stripping comments and non-default fields.
"""
tmp_config = tmp_path_factory.mktemp("vigilar-config") / "vigilar.toml"
prev = os.environ.get("VIGILAR_CONFIG")
os.environ["VIGILAR_CONFIG"] = str(tmp_config)
try:
yield
finally:
if prev is None:
os.environ.pop("VIGILAR_CONFIG", None)
else:
os.environ["VIGILAR_CONFIG"] = prev
def _create_test_engine(db_path: Path):
"""Create a fresh engine for testing (bypasses the global singleton)."""
db_path.parent.mkdir(parents=True, exist_ok=True)

View File

@@ -1,31 +0,0 @@
"""Tests for `vigilar config set-pin`."""
from click.testing import CliRunner
from vigilar.alerts.pin import verify_pin
from vigilar.cli.cmd_config import config_cmd
def test_set_pin_outputs_pbkdf2_hash_under_security_section():
"""The CLI must emit a hash that alerts.pin.verify_pin can validate,
and direct the user to [security] pin_hash (not [system] arm_pin_hash)."""
runner = CliRunner()
result = runner.invoke(config_cmd, ["set-pin"], input="1234\n1234\n")
assert result.exit_code == 0, result.output
# The output must direct the user to the [security] section
assert "[security]" in result.output
assert "arm_pin_hash" not in result.output
assert "pin_hash" in result.output
# Extract the emitted hash (line starts with `pin_hash = "..."`)
hash_line = next(
line for line in result.output.splitlines() if line.strip().startswith("pin_hash")
)
hash_value = hash_line.split('"')[1]
# Round-trip: the emitted hash must accept the plaintext PIN
assert verify_pin("1234", hash_value) is True
assert verify_pin("0000", hash_value) is False
# And it must be in PBKDF2 format (not the legacy HMAC "secret:mac" format)
assert hash_value.startswith("pbkdf2_sha256$")

View File

@@ -138,49 +138,3 @@ class TestCameraConfigLocation:
from vigilar.config import CameraConfig
cfg = CameraConfig(id="test", display_name="Test", rtsp_url="rtsp://x", location="EXTERIOR")
assert cfg.location == "EXTERIOR"
def test_deprecation_warning_for_arm_pin_hash(tmp_path, caplog):
"""Loading a config that still uses the legacy [system] arm_pin_hash
must log a clear warning pointing the user at `vigilar config set-pin`."""
import logging
cfg_path = tmp_path / "legacy.toml"
cfg_path.write_text(
'[system]\n'
'arm_pin_hash = "pbkdf2_sha256$abc$def"\n'
)
with caplog.at_level(logging.WARNING):
from vigilar.config import load_config
load_config(str(cfg_path))
messages = [r.message for r in caplog.records if r.levelno >= logging.WARNING]
assert any("arm_pin_hash" in m and "deprecated" in m.lower() for m in messages), (
f"expected deprecation warning mentioning arm_pin_hash, got: {messages}"
)
def test_no_deprecation_warning_when_security_pin_hash_set(tmp_path, caplog):
"""No warning should fire if [security] pin_hash is populated,
regardless of whether [system] arm_pin_hash is also still present.
The warning is specifically for un-migrated configs."""
import logging
cfg_path = tmp_path / "migrated.toml"
cfg_path.write_text(
'[system]\n'
'arm_pin_hash = "pbkdf2_sha256$legacy$value"\n'
'\n'
'[security]\n'
'pin_hash = "pbkdf2_sha256$current$value"\n'
)
with caplog.at_level(logging.WARNING):
from vigilar.config import load_config
load_config(str(cfg_path))
deprecation_messages = [
r.message for r in caplog.records
if r.levelno >= logging.WARNING and "arm_pin_hash" in r.message
]
assert deprecation_messages == [], (
f"deprecation warning should not fire on migrated configs, "
f"got: {deprecation_messages}"
)

View File

@@ -1,5 +1,6 @@
"""Tests for the Phase 6 events subsystem: rules, arm state FSM, history."""
import hashlib
import time
import pytest
@@ -18,7 +19,7 @@ from vigilar.storage.queries import insert_event
def _make_config(rules=None, pin_hash=""):
return VigilarConfig(
security={"pin_hash": pin_hash},
system={"arm_pin_hash": pin_hash},
cameras=[],
sensors=[],
rules=rules or [],
@@ -26,8 +27,7 @@ def _make_config(rules=None, pin_hash=""):
def _pin_hash(pin: str) -> str:
from vigilar.alerts.pin import hash_pin
return hash_pin(pin)
return hashlib.sha256(pin.encode()).hexdigest()
# ---------------------------------------------------------------------------
@@ -429,158 +429,3 @@ class TestPetEventClassification:
assert etype == EventType.WILDLIFE_PASSIVE
assert sev == Severity.INFO
assert source == "front"
# ---------------------------------------------------------------------------
# Classified-event broadcast (for SSE bridge)
# ---------------------------------------------------------------------------
class _RecordingBus:
"""Minimal bus fake that records publishes instead of sending them."""
def __init__(self):
self.published: list[tuple[str, dict]] = []
def publish(self, topic: str, payload: dict, qos: int = 1) -> None:
self.published.append((topic, payload))
def publish_event(self, topic: str, **kwargs) -> None:
self.publish(topic, kwargs)
class _StubFSM:
state = ArmState.DISARMED
class _StubRuleEngine:
def evaluate(self, topic, payload, state):
return []
class TestEventsPublishedBroadcast:
"""EventProcessor should publish a classified-event summary to
Topics.EVENTS_PUBLISHED after inserting, so the Flask SSE bridge
can forward it to browser clients."""
def test_handle_event_publishes_classified_payload(self, test_db):
from vigilar.events.processor import EventProcessor
from vigilar.constants import Topics
processor = EventProcessor.__new__(EventProcessor)
bus = _RecordingBus()
processor._handle_event(
topic="vigilar/camera/cam1/motion/start",
payload={"ts": 12345, "detail": "x"},
engine=test_db,
fsm=_StubFSM(),
rule_engine=_StubRuleEngine(),
bus=bus,
)
published_on_bridge_topic = [
p for t, p in bus.published if t == Topics.EVENTS_PUBLISHED
]
assert len(published_on_bridge_topic) == 1, (
f"expected exactly one publish to {Topics.EVENTS_PUBLISHED}, "
f"got publishes: {bus.published}"
)
msg = published_on_bridge_topic[0]
assert msg["type"] == EventType.MOTION_START
assert msg["severity"] == Severity.WARNING
assert msg["source_id"] == "cam1"
assert "ts" in msg
assert "id" in msg and isinstance(msg["id"], int)
def test_unclassified_topic_does_not_publish(self, test_db):
"""Heartbeats and other non-event topics must not be forwarded."""
from vigilar.events.processor import EventProcessor
from vigilar.constants import Topics
processor = EventProcessor.__new__(EventProcessor)
bus = _RecordingBus()
processor._handle_event(
topic="vigilar/camera/cam1/heartbeat",
payload={"ts": 12345},
engine=test_db,
fsm=_StubFSM(),
rule_engine=_StubRuleEngine(),
bus=bus,
)
assert not any(t == Topics.EVENTS_PUBLISHED for t, _ in bus.published)
# ---------------------------------------------------------------------------
# Arm Request Dispatch
# ---------------------------------------------------------------------------
class TestArmRequestDispatch:
"""SYSTEM_ARM_REQUEST messages must reach ArmStateFSM.transition."""
def test_arm_request_calls_fsm_transition(self, test_db):
from vigilar.events.processor import EventProcessor
processor = EventProcessor.__new__(EventProcessor)
calls = []
class FakeFSM:
state = ArmState.DISARMED
def transition(self, new_state, pin="", triggered_by="system"):
calls.append((new_state, pin, triggered_by))
return True
processor._handle_arm_request(
payload={"mode": "ARMED_AWAY", "pin": "1234", "triggered_by": "web"},
fsm=FakeFSM(),
)
assert len(calls) == 1
new_state, pin, triggered_by = calls[0]
assert new_state == ArmState.ARMED_AWAY
assert pin == "1234"
assert triggered_by == "web"
def test_arm_request_ignores_bad_mode(self, test_db):
from vigilar.events.processor import EventProcessor
processor = EventProcessor.__new__(EventProcessor)
calls = []
class FakeFSM:
def transition(self, *a, **kw):
calls.append((a, kw))
return True
processor._handle_arm_request(
payload={"mode": "NONSENSE", "pin": "1234"},
fsm=FakeFSM(),
)
assert calls == []
def test_arm_request_default_triggered_by(self, test_db):
"""Omitting triggered_by must default to 'unknown' (audit-log value)."""
from vigilar.events.processor import EventProcessor
processor = EventProcessor.__new__(EventProcessor)
calls = []
class FakeFSM:
state = ArmState.DISARMED
def transition(self, new_state, pin="", triggered_by="system"):
calls.append((new_state, pin, triggered_by))
return True
processor._handle_arm_request(
payload={"mode": "DISARMED", "pin": ""},
fsm=FakeFSM(),
)
assert len(calls) == 1
assert calls[0][2] == "unknown"

View File

@@ -37,14 +37,3 @@ def test_verify_pin_handles_unicode():
stored = hash_pin("p@ss!")
assert verify_pin("p@ss!", stored) is True
assert verify_pin("p@ss?", stored) is False
def test_verify_pin_rejects_malformed_hash():
"""verify_pin must return False (not raise) on malformed stored hashes.
Fail-closed is load-bearing: a misconfigured or partially-migrated
[security] pin_hash must lock out transitions, not grant access."""
assert verify_pin("1234", "sha256:deadbeef") is False
assert verify_pin("1234", "garbage") is False
assert verify_pin("1234", "pbkdf2_sha256$only$two$extra") is False
# Wrong algo prefix
assert verify_pin("1234", "argon2id$salt$dk") is False

View File

@@ -1,45 +0,0 @@
"""End-to-end test: the CLI, FSM, and web arm flow all accept the same PIN.
Regression guard for issue #2 — the three layers previously used three
incompatible hash schemes under two different config keys."""
from click.testing import CliRunner
from vigilar.alerts.pin import hash_pin, verify_pin
from vigilar.cli.cmd_config import config_cmd
from vigilar.config import SecurityConfig, VigilarConfig
from vigilar.events.state import ArmStateFSM
from vigilar.constants import ArmState
def test_cli_output_is_accepted_by_fsm(test_db):
"""Hash produced by `vigilar config set-pin` must verify against
ArmStateFSM.verify_pin, same config key, same format."""
runner = CliRunner()
result = runner.invoke(config_cmd, ["set-pin"], input="9876\n9876\n")
assert result.exit_code == 0, result.output
hash_line = next(
line for line in result.output.splitlines()
if line.strip().startswith("pin_hash")
)
hash_value = hash_line.split('"')[1]
cfg = VigilarConfig(security=SecurityConfig(pin_hash=hash_value))
fsm = ArmStateFSM(test_db, cfg)
assert fsm.verify_pin("9876") is True
assert fsm.verify_pin("0000") is False
def test_fsm_transitions_with_pin_from_alerts_module(test_db):
"""The alerts.pin module and ArmStateFSM agree on the hash format."""
stored = hash_pin("4242")
cfg = VigilarConfig(security=SecurityConfig(pin_hash=stored))
fsm = ArmStateFSM(test_db, cfg)
assert fsm.transition(ArmState.ARMED_AWAY, pin="4242", triggered_by="test") is True
assert fsm.state == ArmState.ARMED_AWAY
# Same stored hash rejects the wrong PIN
assert verify_pin("0000", stored) is False

View File

@@ -1,62 +0,0 @@
"""Tests for the MQTT → Server-Sent Events bridge used by the web process."""
import json
import queue
def test_forward_event_delivers_payload_to_sse_subscribers():
"""forward_event should hand the payload to every connected SSE client."""
from vigilar.web.blueprints import events as events_bp
from vigilar.web.sse_bridge import forward_event
q: queue.Queue = queue.Queue(maxsize=10)
events_bp._sse_subscribers.append(q)
try:
payload = {
"type": "MOTION_START",
"source_id": "cam1",
"severity": "WARNING",
"ts": 1234,
}
forward_event("vigilar/events/published", payload)
raw = q.get_nowait()
assert raw.startswith("data: ")
# Strip "data: " prefix and trailing double newline
data = json.loads(raw[len("data: "):].rstrip())
assert data["type"] == "MOTION_START"
assert data["source_id"] == "cam1"
finally:
if q in events_bp._sse_subscribers:
events_bp._sse_subscribers.remove(q)
def test_start_sse_bridge_subscribes_to_events_published(monkeypatch):
"""start_sse_bridge must connect a bus and subscribe forward_event
to Topics.EVENTS_PUBLISHED — that is the single entry point for the
web process to observe classified events from the events subsystem."""
from vigilar.config import VigilarConfig
from vigilar.constants import Topics
from vigilar.web import sse_bridge
class FakeBus:
def __init__(self, config, client_id):
self.client_id = client_id
self.subscriptions: list[tuple[str, object]] = []
self.connected = False
def subscribe(self, topic, handler):
self.subscriptions.append((topic, handler))
def connect(self):
self.connected = True
monkeypatch.setattr(sse_bridge, "MessageBus", FakeBus)
bus = sse_bridge.start_sse_bridge(VigilarConfig())
assert bus.connected is True
assert len(bus.subscriptions) == 1
topic, handler = bus.subscriptions[0]
assert topic == Topics.EVENTS_PUBLISHED
assert handler is sse_bridge.forward_event

View File

@@ -1,7 +1,6 @@
"""Tests for PIN verification on arm/disarm endpoints."""
import pytest
from unittest.mock import patch
from vigilar.alerts.pin import hash_pin
from vigilar.config import VigilarConfig, SecurityConfig
from vigilar.web.app import create_app
@@ -30,55 +29,35 @@ def app_no_pin():
def test_arm_without_pin_set(app_no_pin):
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_no_pin.test_client() as c:
rv = c.post("/system/api/arm", json={"mode": "ARMED_AWAY"})
assert rv.status_code == 202
pub.assert_called_once()
payload = pub.call_args.args[1] if len(pub.call_args.args) > 1 else pub.call_args.kwargs["payload"]
assert payload["mode"] == "ARMED_AWAY"
assert payload["pin"] == ""
with app_no_pin.test_client() as c:
rv = c.post("/system/api/arm", json={"mode": "ARMED_AWAY"})
assert rv.status_code == 200
assert rv.get_json()["ok"] is True
def test_arm_correct_pin(app_with_pin):
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post("/system/api/arm", json={"mode": "ARMED_AWAY", "pin": "1234"})
assert rv.status_code == 202
pub.assert_called_once()
payload = pub.call_args.args[1] if len(pub.call_args.args) > 1 else pub.call_args.kwargs["payload"]
assert payload["pin"] == "1234"
with app_with_pin.test_client() as c:
rv = c.post("/system/api/arm", json={"mode": "ARMED_AWAY", "pin": "1234"})
assert rv.status_code == 200
assert rv.get_json()["ok"] is True
def test_arm_wrong_pin_still_accepted_by_web_fsm_rejects(app_with_pin):
"""HTTP layer no longer pre-checks the PIN — it forwards to the FSM
unconditionally. The FSM verifies and, on mismatch, logs a warning
and leaves the state unchanged."""
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post("/system/api/arm", json={"mode": "ARMED_AWAY", "pin": "0000"})
assert rv.status_code == 202
pub.assert_called_once()
payload = pub.call_args.args[1] if len(pub.call_args.args) > 1 else pub.call_args.kwargs["payload"]
assert payload["pin"] == "0000" # forwarded verbatim — FSM will reject
def test_arm_wrong_pin(app_with_pin):
with app_with_pin.test_client() as c:
rv = c.post("/system/api/arm", json={"mode": "ARMED_AWAY", "pin": "0000"})
assert rv.status_code == 401
def test_disarm_correct_pin(app_with_pin):
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post("/system/api/disarm", json={"pin": "1234"})
assert rv.status_code == 202
pub.assert_called_once()
with app_with_pin.test_client() as c:
rv = c.post("/system/api/disarm", json={"pin": "1234"})
assert rv.status_code == 200
def test_disarm_wrong_pin_still_accepted_by_web_fsm_rejects(app_with_pin):
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post("/system/api/disarm", json={"pin": "9999"})
assert rv.status_code == 202
pub.assert_called_once()
payload = pub.call_args.args[1] if len(pub.call_args.args) > 1 else pub.call_args.kwargs["payload"]
assert payload["pin"] == "9999" # forwarded verbatim — FSM will reject
def test_disarm_wrong_pin(app_with_pin):
with app_with_pin.test_client() as c:
rv = c.post("/system/api/disarm", json={"pin": "9999"})
assert rv.status_code == 401
def test_reset_pin_correct_passphrase(app_with_pin):
@@ -98,35 +77,3 @@ def test_reset_pin_wrong_passphrase(app_with_pin):
"new_pin": "5678",
})
assert rv.status_code == 401
def test_arm_publishes_arm_request_on_mqtt(app_with_pin):
"""POST /system/api/arm must publish a SYSTEM_ARM_REQUEST message
carrying the mode, pin, and a 'web' triggered_by tag."""
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post(
"/system/api/arm",
json={"mode": "ARMED_AWAY", "pin": "1234"},
)
assert rv.status_code == 202
assert rv.get_json()["ok"] is True
pub.assert_called_once()
call_args = pub.call_args
# _publish_arm_request(cfg, payload) — payload is args[1] or kwargs["payload"]
payload = call_args.args[1] if len(call_args.args) > 1 else call_args.kwargs["payload"]
assert payload["mode"] == "ARMED_AWAY"
assert payload["pin"] == "1234"
assert payload["triggered_by"] == "web"
def test_disarm_publishes_arm_request(app_with_pin):
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post("/system/api/disarm", json={"pin": "1234"})
assert rv.status_code == 202
pub.assert_called_once()
payload = pub.call_args.args[1] if len(pub.call_args.args) > 1 else pub.call_args.kwargs["payload"]
assert payload["mode"] == "DISARMED"

View File

@@ -102,36 +102,3 @@ def test_recordings_page_loads():
with app.test_client() as client:
resp = client.get("/recordings/")
assert resp.status_code == 200
def test_system_status_reflects_fsm_arm_state(tmp_path, monkeypatch):
"""system_status must read the current arm state from the DB,
not return a hardcoded stub. Regression guard for the web-to-FSM
async flow introduced in issue #2."""
from vigilar.config import SystemConfig, VigilarConfig
import vigilar.storage.db as db_module
from vigilar.storage.db import get_db_path
from vigilar.storage.schema import metadata
from vigilar.storage.queries import insert_arm_state
from vigilar.web.app import create_app
from sqlalchemy import create_engine
data_dir = tmp_path / "data"
data_dir.mkdir()
cfg = VigilarConfig(system=SystemConfig(data_dir=str(data_dir)))
# Build an isolated engine (bypass the module-level singleton)
db_path = get_db_path(str(data_dir))
isolated_engine = create_engine(f"sqlite:///{db_path}", echo=False)
metadata.create_all(isolated_engine)
insert_arm_state(isolated_engine, "ARMED_AWAY", "test", None)
# Patch the singleton so the blueprint's get_engine() returns our engine
monkeypatch.setattr(db_module, "_engine", isolated_engine)
app = create_app(cfg)
with app.test_client() as c:
resp = c.get("/system/status")
assert resp.status_code == 200
assert resp.get_json()["arm_state"] == "ARMED_AWAY"

View File

@@ -47,10 +47,8 @@ def show_cmd(config_path: str | None) -> None:
# Redact sensitive fields
if data.get("web", {}).get("password_hash"):
data["web"]["password_hash"] = "***"
if data.get("security", {}).get("pin_hash"):
data["security"]["pin_hash"] = "***"
if data.get("security", {}).get("recovery_passphrase_hash"):
data["security"]["recovery_passphrase_hash"] = "***"
if data.get("system", {}).get("arm_pin_hash"):
data["system"]["arm_pin_hash"] = "***"
if data.get("alerts", {}).get("webhook", {}).get("secret"):
data["alerts"]["webhook"]["secret"] = "***"
click.echo(json.dumps(data, indent=2))
@@ -62,9 +60,12 @@ def show_cmd(config_path: str | None) -> None:
@config_cmd.command("set-password")
@click.option("--config", "-c", "config_path", default=None, help="Path to vigilar.toml.")
def set_password_cmd(config_path: str | None) -> None:
"""Generate a scrypt hash for the web UI password."""
"""Generate a bcrypt hash for the web UI password."""
try:
import hashlib
password = click.prompt("Enter web UI password", hide_input=True, confirmation_prompt=True)
# Use SHA-256 hash (bcrypt requires external dep, but cryptography is available)
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
import os
@@ -81,10 +82,14 @@ def set_password_cmd(config_path: str | None) -> None:
@config_cmd.command("set-pin")
def set_pin_cmd() -> None:
"""Generate a PBKDF2 hash for the arm/disarm PIN."""
from vigilar.alerts.pin import hash_pin
"""Generate an HMAC hash for the arm/disarm PIN."""
import hashlib
import hmac
import os
pin = click.prompt("Enter arm/disarm PIN", hide_input=True, confirmation_prompt=True)
hash_str = hash_pin(pin)
click.echo("\nAdd this to your vigilar.toml [security] section:")
click.echo(f'pin_hash = "{hash_str}"')
secret = os.urandom(32)
mac = hmac.new(secret, pin.encode(), hashlib.sha256).hexdigest()
hash_str = secret.hex() + ":" + mac
click.echo(f"\nAdd this to your vigilar.toml [system] section:")
click.echo(f'arm_pin_hash = "{hash_str}"')

View File

@@ -1,6 +1,5 @@
"""Configuration loading and validation via TOML + Pydantic."""
import logging
import sys
import tomllib
from pathlib import Path
@@ -24,8 +23,6 @@ from vigilar.constants import (
CameraLocation,
)
log = logging.getLogger(__name__)
# --- Camera Config ---
class CameraConfig(BaseModel):
@@ -438,13 +435,4 @@ def load_config(path: str | Path | None = None) -> VigilarConfig:
raw["sensors.gpio"] = gpio_config
# The [[sensors]] array items remain as 'sensors' key from TOML parsing
cfg = VigilarConfig(**raw)
if cfg.system.arm_pin_hash and not cfg.security.pin_hash:
log.warning(
"DEPRECATED: [system] arm_pin_hash is ignored; the arm/disarm "
"PIN lives under [security] pin_hash. Run `vigilar config "
"set-pin` and paste the output into [security]."
)
return cfg
return VigilarConfig(**raw)

View File

@@ -210,12 +210,6 @@ class Topics:
SYSTEM_ALERT = "vigilar/system/alert"
SYSTEM_SHUTDOWN = "vigilar/system/shutdown"
SYSTEM_RULES_UPDATED = "vigilar/system/rules_updated"
# Web-to-FSM arm/disarm request (FSM verifies the PIN and transitions)
SYSTEM_ARM_REQUEST = "vigilar/system/arm_request"
# Classified events forwarded to the web SSE bridge (see events.processor
# and web.sse_bridge).
EVENTS_PUBLISHED = "vigilar/events/published"
# Wildcard subscriptions
ALL = "vigilar/#"

View File

@@ -9,7 +9,7 @@ from sqlalchemy.engine import Engine
from vigilar.bus import MessageBus
from vigilar.config import VigilarConfig
from vigilar.constants import ArmState, EventType, Severity, Topics
from vigilar.constants import EventType, Severity, Topics
from vigilar.events.rules import RuleEngine
from vigilar.events.state import ArmStateFSM
from vigilar.storage.db import get_db_path, init_db
@@ -58,20 +58,12 @@ class EventProcessor:
fsm.set_bus(bus)
bus.connect()
# Subscribe to all Vigilar topics (events/motion/sensors/etc.)
# Subscribe to all Vigilar topics
def on_message(topic: str, payload: dict[str, Any]) -> None:
self._handle_event(topic, payload, engine, fsm, rule_engine, bus)
bus.subscribe_all(on_message)
# Dedicated subscription for web-originated arm/disarm requests.
# Kept separate from on_message because these are commands, not
# classifiable events.
def on_arm_request(topic: str, payload: dict[str, Any]) -> None:
self._handle_arm_request(payload, fsm)
bus.subscribe(Topics.SYSTEM_ARM_REQUEST, on_arm_request)
log.info("Event processor started")
# Main loop
@@ -111,22 +103,6 @@ class EventProcessor:
payload=payload,
)
# Broadcast a classified summary for the web SSE bridge. This is
# independent of rule actions and DB storage — the web process
# subscribes to Topics.EVENTS_PUBLISHED and forwards each message
# to connected browser clients.
bus.publish(
Topics.EVENTS_PUBLISHED,
{
"id": event_id,
"ts": int(time.time() * 1000),
"type": event_type,
"severity": severity,
"source_id": source_id,
"payload": payload,
},
)
# Insert pet/wildlife sightings
if event_type in (
EventType.PET_DETECTED, EventType.PET_ESCAPE, EventType.UNKNOWN_ANIMAL
@@ -168,26 +144,6 @@ class EventProcessor:
except Exception:
log.exception("Error processing event on %s", topic)
def _handle_arm_request(
self,
payload: dict[str, Any],
fsm: ArmStateFSM,
) -> None:
"""Handle an arm/disarm request received over MQTT.
Payload fields:
- mode: str — desired ArmState ("DISARMED", "ARMED_HOME", "ARMED_AWAY")
- pin: str — plaintext PIN (FSM verifies against security.pin_hash)
- triggered_by: str — origin tag for the audit log (e.g. "web")
"""
mode = payload.get("mode", "")
if mode not in ArmState.__members__:
log.warning("Ignoring arm request with invalid mode: %r", mode)
return
pin = payload.get("pin", "")
triggered_by = payload.get("triggered_by", "unknown")
fsm.transition(ArmState(mode), pin=pin, triggered_by=triggered_by)
def _classify_event(
self, topic: str, payload: dict[str, Any]
) -> tuple[str | None, str | None, str | None]:

View File

@@ -1,11 +1,12 @@
"""Arm state finite state machine."""
import hashlib
import hmac
import logging
import time
from sqlalchemy.engine import Engine
from vigilar.alerts.pin import verify_pin
from vigilar.config import VigilarConfig
from vigilar.constants import ArmState, EventType, Severity, Topics
from vigilar.storage.queries import get_current_arm_state, insert_arm_state, insert_event
@@ -18,7 +19,7 @@ class ArmStateFSM:
def __init__(self, engine: Engine, config: VigilarConfig):
self._engine = engine
self._pin_hash = config.security.pin_hash
self._pin_hash = config.system.arm_pin_hash
self._state = ArmState.DISARMED
self._bus = None
self._load_initial_state()
@@ -42,11 +43,12 @@ class ArmStateFSM:
return self._state
def verify_pin(self, pin: str) -> bool:
"""Verify a PIN against the stored PBKDF2 hash."""
"""Verify a PIN against the stored hash using HMAC comparison."""
if not self._pin_hash:
# No PIN configured — allow all transitions
return True
return verify_pin(pin, self._pin_hash)
candidate = hashlib.sha256(pin.encode()).hexdigest()
return hmac.compare_digest(candidate, self._pin_hash)
def transition(
self,
@@ -66,10 +68,9 @@ class ArmStateFSM:
old_state = self._state
self._state = new_state
# pin_hash is always None here: PBKDF2 uses a random salt per call, so
# re-hashing the pin now would produce a value unrelated to the stored
# hash, making the column useless for audit correlation. See issue #2.
insert_arm_state(self._engine, new_state.value, triggered_by, None)
# Log to database
pin_hash = hashlib.sha256(pin.encode()).hexdigest() if pin else None
insert_arm_state(self._engine, new_state.value, triggered_by, pin_hash)
# Log event
insert_event(

View File

@@ -67,27 +67,9 @@ class SubsystemProcess:
def _run_web(cfg: VigilarConfig) -> None:
"""Run the Flask web server in a subprocess."""
from vigilar.web.app import create_app
from vigilar.web.sse_bridge import start_sse_bridge
app = create_app(cfg)
# Forward classified events from MQTT to browser SSE clients. Failure
# here must not kill the web process — the UI still works without
# live updates, it just requires a page refresh.
sse_bus = None
try:
sse_bus = start_sse_bridge(cfg)
except Exception:
log.exception("Failed to start SSE bridge; live event timeline will not update")
try:
app.run(host=cfg.web.host, port=cfg.web.port, debug=False, use_reloader=False)
finally:
if sse_bus is not None:
try:
sse_bus.disconnect()
except Exception:
log.exception("Error disconnecting SSE bridge bus")
app.run(host=cfg.web.host, port=cfg.web.port, debug=False, use_reloader=False)
def _run_event_processor(cfg: VigilarConfig) -> None:

View File

@@ -36,20 +36,8 @@ def _save_and_reload(new_cfg: VigilarConfig) -> None:
def system_status():
"""JSON API: overall system health."""
cfg = _get_cfg()
arm_state = "DISARMED"
try:
from vigilar.storage.db import get_db_path, get_engine
from vigilar.storage.queries import get_current_arm_state
engine = get_engine(get_db_path(cfg.system.data_dir))
stored = get_current_arm_state(engine)
if stored:
arm_state = stored
except Exception:
current_app.logger.exception("Failed to read arm state from DB")
return jsonify({
"arm_state": arm_state,
"arm_state": "DISARMED",
"ups": {"status": "UNKNOWN"},
"cameras_online": 0,
"cameras_total": len(cfg.cameras),
@@ -66,46 +54,27 @@ def settings_page():
# --- Arm/Disarm ---
def _publish_arm_request(cfg: VigilarConfig, payload: dict) -> None:
"""Publish an arm/disarm request on MQTT for the event processor to pick up."""
from vigilar.bus import MessageBus
from vigilar.constants import Topics
bus = MessageBus(cfg.mqtt, client_id="vigilar-web-arm-request")
bus.connect()
if not bus.connected:
raise RuntimeError("MQTT broker did not accept connection within timeout")
try:
bus.publish(Topics.SYSTEM_ARM_REQUEST, payload)
finally:
bus.disconnect()
@system_bp.route("/api/arm", methods=["POST"])
def arm_system():
data = request.get_json() or {}
mode = data.get("mode", "ARMED_AWAY")
pin = data.get("pin", "")
payload = {"mode": mode, "pin": pin, "triggered_by": "web"}
try:
_publish_arm_request(_get_cfg(), payload)
except Exception:
current_app.logger.exception("Failed to publish arm request")
return jsonify({"error": "bus unavailable"}), 503
return jsonify({"ok": True, "mode": mode}), 202
cfg = _get_cfg()
pin_hash = cfg.security.pin_hash
if pin_hash and not verify_pin(pin, pin_hash):
return jsonify({"error": "Invalid PIN"}), 401
return jsonify({"ok": True, "state": mode})
@system_bp.route("/api/disarm", methods=["POST"])
def disarm_system():
data = request.get_json() or {}
pin = data.get("pin", "")
payload = {"mode": "DISARMED", "pin": pin, "triggered_by": "web"}
try:
_publish_arm_request(_get_cfg(), payload)
except Exception:
current_app.logger.exception("Failed to publish arm request")
return jsonify({"error": "bus unavailable"}), 503
return jsonify({"ok": True, "mode": "DISARMED"}), 202
cfg = _get_cfg()
pin_hash = cfg.security.pin_hash
if pin_hash and not verify_pin(pin, pin_hash):
return jsonify({"error": "Invalid PIN"}), 401
return jsonify({"ok": True, "state": "DISARMED"})
@system_bp.route("/api/reset-pin", methods=["POST"])

View File

@@ -1,36 +0,0 @@
"""Bridge classified events from MQTT to Server-Sent Events subscribers.
The events subsystem (`vigilar.events.processor.EventProcessor`) publishes
classified events to `Topics.EVENTS_PUBLISHED`. The Flask web process runs
in its own OS process, so to make the in-browser event timeline update
live it must subscribe to that topic via its own `MessageBus` client and
forward every message to `broadcast_sse_event`.
"""
import logging
from typing import Any
from vigilar.bus import MessageBus
from vigilar.config import VigilarConfig
from vigilar.constants import Topics
from vigilar.web.blueprints.events import broadcast_sse_event
log = logging.getLogger(__name__)
def forward_event(topic: str, payload: dict[str, Any]) -> None:
"""MQTT handler: forward a classified event to SSE subscribers."""
broadcast_sse_event(payload)
def start_sse_bridge(cfg: VigilarConfig) -> MessageBus:
"""Create an MQTT client that forwards classified events to SSE clients.
Returns the connected `MessageBus` so the caller can disconnect it on
shutdown.
"""
bus = MessageBus(cfg.mqtt, client_id="vigilar-web-sse-bridge")
bus.subscribe(Topics.EVENTS_PUBLISHED, forward_event)
bus.connect()
log.info("SSE bridge started: subscribed to %s", Topics.EVENTS_PUBLISHED)
return bus