fix: unify PIN hashing across CLI, FSM, and web (closes #2) #7

Merged
alee merged 16 commits from fix/issue-2-pin-unification into main 2026-04-05 16:59:51 +00:00
15 changed files with 1444 additions and 77 deletions

View File

@@ -113,8 +113,9 @@ omitted sections behave sensibly.
`.vge` files.
- `hls_dir` (default `/var/vigilar/hls`): HLS segment output.
- `log_level` (default `"INFO"`): one of DEBUG, INFO, WARNING, ERROR.
- `arm_pin_hash` (default `""`): commented out in the sample; set via
`vigilar config set-pin`.
- `arm_pin_hash` (default `""`): **deprecated.** Still parsed but
ignored at runtime. Use `[security] pin_hash` instead; run
`vigilar config set-pin` to generate the canonical hash.
### `[mqtt]`
@@ -291,11 +292,15 @@ enabled = false`, `[visitors] enabled = false`, `[highlights] enabled
- `[location] latitude`, `longitude` (default `0.0`): used for sunrise
and sunset lookups.
- `[security] pin_hash` and `recovery_passphrase_hash`: populated by
`vigilar config set-pin` (the same hash is also stored under
`[system] arm_pin_hash` on the `system` model; both fields exist
because the web UI uses `[security]` while the CLI helper prints a
`[system]` line — pick one location and stick with it).
- `[security] pin_hash` (canonical arm/disarm PIN store): populated by
`vigilar config set-pin`, which emits a PBKDF2-SHA256 hash to paste
into the `[security]` section. The legacy `[system] arm_pin_hash`
field is deprecated; see the `[system]` section above.
- `[security] recovery_passphrase_hash`: used by the web
`/system/api/reset-pin` endpoint to authenticate PIN-reset requests.
There is no CLI helper for this field today — set it by hashing a
passphrase manually with `vigilar.alerts.pin.hash_pin` and pasting
the result into `[security]`, or leave it unset to disable recovery.
## CLI reference
@@ -344,9 +349,9 @@ sudo -u vigilar /opt/vigilar/venv/bin/vigilar config show \
```
Dumps the parsed config as JSON with `web.password_hash`,
`system.arm_pin_hash`, and `alerts.webhook.secret` redacted. Useful
for confirming which defaults Pydantic applied for keys you did not
set.
`security.pin_hash`, `security.recovery_passphrase_hash`, and
`alerts.webhook.secret` redacted. Useful for confirming which
defaults Pydantic applied for keys you did not set.
### `vigilar config set-password`
@@ -365,10 +370,12 @@ prints a `password_hash = "salt_hex:key_hex"` line to paste into
sudo -u vigilar /opt/vigilar/venv/bin/vigilar config set-pin
```
Prompts for an arm/disarm PIN, generates a random 32-byte HMAC key,
computes `HMAC-SHA256(key, pin)`, and prints an `arm_pin_hash =
"secret_hex:mac_hex"` line to paste into `[system]`. Again, no file
write.
Prompts for an arm/disarm PIN, derives a salted PBKDF2-SHA256 hash
(600,000 iterations) via `vigilar.alerts.pin.hash_pin`, and prints a
`pin_hash = "pbkdf2_sha256$salt$dk"` line to paste into `[security]`.
Again, no file write. The same hash format is verified identically by
the web arm/disarm endpoint and by `ArmStateFSM` in the event
processor — there is one canonical PIN store.
## Secrets and security
@@ -388,9 +395,13 @@ write.
volume on integrity-verified storage (dm-integrity, ZFS with
checksums) or mirror to write-once media.
- The web UI password is a scrypt hash set by `vigilar config
set-password` and stored at `[web] password_hash`. The arm PIN is
an HMAC stored at `[system] arm_pin_hash` (and/or `[security]
pin_hash`).
set-password` and stored at `[web] password_hash`. The arm/disarm
PIN is a PBKDF2-SHA256 hash (600k iterations, salted) set by
`vigilar config set-pin` and stored at `[security] pin_hash`.
A legacy `[system] arm_pin_hash` field is still parsed but ignored
at runtime; if it's set and `[security] pin_hash` is empty, the
service logs a deprecation warning at startup and arm/disarm will
behave as if no PIN were configured until you re-run `set-pin`.
- TLS: `gen_cert.sh` uses `mkcert` if present, otherwise an `openssl`
ECDSA P-256 self-signed certificate valid for 3650 days with SANs
for `vigilar.local`, `localhost`, `127.0.0.1`, and the detected LAN
@@ -602,10 +613,6 @@ Do not expose port `49735` directly on the WAN; require the tunnel.
`[health]` for real disk policy.
- **No schema migrations.** There is no Alembic (or equivalent) in
the tree. Rollbacks rely on your backup discipline.
- **Duplicate PIN fields.** `vigilar config set-pin` writes to
`[system] arm_pin_hash`, while the web arm/disarm flow reads from
`[security] pin_hash`. Both models exist. If you set one and the
other side does not behave as expected, mirror the value manually.
## Troubleshooting

View File

@@ -0,0 +1,998 @@
# PIN Hashing Unification Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Unify the three incompatible PIN hashing schemes in Vigilar onto `alerts/pin.py`'s PBKDF2-SHA256 implementation, and make the web arm/disarm buttons actually transition system state. Closes Gitea issue #2.
**Architecture:** One canonical hash (`alerts.pin.hash_pin` / `verify_pin`, PBKDF2-SHA256 600k iterations, salted, format `pbkdf2_sha256$salt$dk`). One canonical config key (`[security] pin_hash`). `[system] arm_pin_hash` becomes a deprecated field with a migration warning. The web arm/disarm endpoints publish a new `vigilar/system/arm_request` MQTT topic; the event processor subscribes and hands requests to `ArmStateFSM.transition()`, which verifies the PIN via `alerts.pin.verify_pin` and performs the state transition. Live state is reported back to the web on the existing `vigilar/system/arm_state` topic (already published by the FSM) and read by the web via the DB for HTTP responses.
**Tech Stack:** Python 3.11+, Pydantic v2 (config), paho-mqtt (bus), SQLAlchemy Core (storage), pytest.
---
## Background (read before starting)
Three incompatible schemes exist today:
| Component | File:line | Hash scheme | Config key |
|---|---|---|---|
| Web arm/disarm/reset | `web/blueprints/system.py:63-64, 74-75, 88` via `alerts/pin.py` | PBKDF2-SHA256, 600k iter, salted | `[security] pin_hash` |
| `ArmStateFSM` | `events/state.py:50` | Unsalted SHA-256(pin) | `[system] arm_pin_hash` |
| CLI `vigilar config set-pin` | `cli/cmd_config.py:86-95` | HMAC-SHA256(random, pin) | `[system] arm_pin_hash` |
Consequences:
1. `vigilar config set-pin` produces output no verifier understands (HMAC-format into a key the FSM reads as raw SHA-256).
2. The web arm endpoint checks the PIN but never calls the FSM — it returns `{"ok": true}` without transitioning state. The real FSM lives in the events process and is only reachable via MQTT.
3. The FSM uses unsalted SHA-256, trivially brute-forceable for 4-digit PINs.
4. `events/state.py:46` calls `hmac.compare_digest` a "HMAC comparison" — misleading; it's timing-safe equality.
The winning scheme is `alerts/pin.py`'s PBKDF2-SHA256. `[security] pin_hash` is the winning config key (security-scoped section, already used by the web path).
## File Structure
**New:**
- None. All changes modify existing files.
**Modified:**
- `vigilar/events/state.py` — rewrite `verify_pin` to delegate to `alerts/pin.verify_pin`; change `_pin_hash` source to `config.security.pin_hash`; stop re-hashing the PIN on the audit-log row.
- `vigilar/cli/cmd_config.py` — rewrite `set_pin_cmd` to use `alerts.pin.hash_pin` and emit a `[security] pin_hash` line.
- `vigilar/config.py` — add migration warning in `load_config` when `[system] arm_pin_hash` is set but `[security] pin_hash` is empty. Field stays on `SystemConfig` for backward-compat parsing but is no longer read by any runtime code.
- `vigilar/constants.py` — add `Topics.SYSTEM_ARM_REQUEST = "vigilar/system/arm_request"`.
- `vigilar/events/processor.py` — add a dedicated subscription on `Topics.SYSTEM_ARM_REQUEST` that calls `fsm.transition()` with the verified mode, pin, and triggered_by.
- `vigilar/web/blueprints/system.py``arm_system`/`disarm_system` publish on `SYSTEM_ARM_REQUEST` (without the PIN pre-check; the FSM verifies) and return 202 (accepted) with the requested mode. A thin helper wraps bus publish.
- `tests/unit/test_events.py``_make_config` moves `pin_hash` onto `security`, not `system`. Existing FSM tests continue passing against PBKDF2 hashes.
- `tests/unit/test_system_pin.py` — web tests extend to verify an arm-request message is published on MQTT.
- `docs/operator-guide.md` — update the PIN section to describe the single canonical key, the CLI output format, and the migration note.
**New tests:**
- `tests/unit/test_pin_unification.py` — end-to-end: set a PIN via the CLI helper (`hash_pin`), verify the FSM accepts it, and verify the web arm endpoint publishes the right MQTT message.
---
## Task 1: Add `Topics.SYSTEM_ARM_REQUEST` constant
**Files:**
- Modify: `vigilar/constants.py` around line 212 (the `# System` block in `class Topics`)
- Test: covered by later tasks' tests referencing the constant
- [ ] **Step 1: Add the constant**
Edit `vigilar/constants.py`. After `SYSTEM_RULES_UPDATED = "vigilar/system/rules_updated"` and before `# Classified events forwarded to the web SSE bridge ...`, add:
```python
# Web-to-FSM arm/disarm request (FSM verifies the PIN and transitions)
SYSTEM_ARM_REQUEST = "vigilar/system/arm_request"
```
- [ ] **Step 2: Verify the constant is importable**
Run: `.venv/bin/python -c "from vigilar.constants import Topics; print(Topics.SYSTEM_ARM_REQUEST)"`
Expected: `vigilar/system/arm_request`
- [ ] **Step 3: Commit**
```bash
git add vigilar/constants.py
git commit -m "feat(constants): add Topics.SYSTEM_ARM_REQUEST
Topic for web-originated arm/disarm requests that the event processor
will subscribe to and dispatch to ArmStateFSM.transition. Part of the
PIN unification work (issue #2)."
```
---
## Task 2: Rewrite `ArmStateFSM.verify_pin` to use `alerts.pin.verify_pin` (TDD)
**Files:**
- Modify: `vigilar/events/state.py` (lines 1-52 especially)
- Test: `tests/unit/test_events.py::TestArmStateFSM`
- Helper update: `tests/unit/test_events.py::_make_config` and `_pin_hash`
The FSM currently reads `config.system.arm_pin_hash` and verifies with unsalted SHA-256. We replace both: read `config.security.pin_hash`, verify via `alerts.pin.verify_pin`.
- [ ] **Step 1: Write the failing test (PBKDF2 verify path)**
Edit `tests/unit/test_events.py`. Replace the existing `_make_config` and `_pin_hash` helpers (around lines 20-30) with:
```python
def _make_config(rules=None, pin_hash=""):
return VigilarConfig(
security={"pin_hash": pin_hash},
cameras=[],
sensors=[],
rules=rules or [],
)
def _pin_hash(pin: str) -> str:
from vigilar.alerts.pin import hash_pin
return hash_pin(pin)
```
Also remove the `import hashlib` line at the top of the file if it is no longer used anywhere else in the file (grep first — `grep -n hashlib tests/unit/test_events.py` — leave it if any other test still calls `hashlib.*`).
- [ ] **Step 2: Run FSM tests to verify they now fail**
Run: `.venv/bin/pytest -q tests/unit/test_events.py::TestArmStateFSM`
Expected: Multiple failures in `test_transition_with_valid_pin`, `test_transition_with_invalid_pin`, `test_verify_pin` — failing because `ArmStateFSM` still reads `config.system.arm_pin_hash` and computes unsalted SHA-256, so it cannot validate a PBKDF2 hash.
- [ ] **Step 3: Rewrite `ArmStateFSM.verify_pin` and the constructor**
Edit `vigilar/events/state.py`. Replace the existing imports and the top of the class. Target state of lines 1-52:
```python
"""Arm state finite state machine."""
import logging
import time
from sqlalchemy.engine import Engine
from vigilar.alerts.pin import verify_pin as _verify_pin_hash
from vigilar.config import VigilarConfig
from vigilar.constants import ArmState, EventType, Severity, Topics
from vigilar.storage.queries import get_current_arm_state, insert_arm_state, insert_event
log = logging.getLogger(__name__)
class ArmStateFSM:
"""Manages DISARMED / ARMED_HOME / ARMED_AWAY state transitions."""
def __init__(self, engine: Engine, config: VigilarConfig):
self._engine = engine
self._pin_hash = config.security.pin_hash
self._state = ArmState.DISARMED
self._bus = None
self._load_initial_state()
def _load_initial_state(self) -> None:
"""Load the most recent arm state from the database."""
stored = get_current_arm_state(self._engine)
if stored and stored in ArmState.__members__:
self._state = ArmState(stored)
log.info("Loaded arm state from DB: %s", self._state)
else:
self._state = ArmState.DISARMED
log.info("No stored arm state, defaulting to DISARMED")
def set_bus(self, bus: object) -> None:
"""Attach a MessageBus for publishing state changes."""
self._bus = bus
@property
def state(self) -> ArmState:
return self._state
def verify_pin(self, pin: str) -> bool:
"""Verify a PIN against the stored PBKDF2 hash."""
if not self._pin_hash:
# No PIN configured — allow all transitions
return True
return _verify_pin_hash(pin, self._pin_hash)
```
Also update `transition()` — the audit-log call at line 72 currently re-hashes the pin with unsalted SHA-256 and passes it as `pin_hash`. Replace that whole line with `None` since a fresh PBKDF2 salt would produce a different hash each time, making the audit column useless for traceability:
```python
# Log to database (pin_hash column is no longer populated — see #2)
insert_arm_state(self._engine, new_state.value, triggered_by, None)
```
Also remove the now-unused `import hashlib` and `import hmac` at the top of the file.
- [ ] **Step 4: Run FSM tests to verify they pass**
Run: `.venv/bin/pytest -q tests/unit/test_events.py::TestArmStateFSM`
Expected: all 7 tests pass.
- [ ] **Step 5: Run the full suite**
Run: `.venv/bin/pytest -q`
Expected: 364 passing (same as baseline). If any other test fails because it constructed `VigilarConfig(system={"arm_pin_hash": ...})`, update it to `security={"pin_hash": ...}` — see Task 2a.
- [ ] **Step 6: Commit**
```bash
git add vigilar/events/state.py tests/unit/test_events.py
git commit -m "fix(events): ArmStateFSM uses PBKDF2 via alerts.pin (issue #2)
Was: unsalted SHA-256 read from [system] arm_pin_hash.
Now: PBKDF2-SHA256 600k iterations read from [security] pin_hash,
matching the web arm/disarm path and the alerts/pin module.
Also drops the redundant pin re-hash on the arm_state_log audit row
(a fresh PBKDF2 salt made the column valueless for traceability).
Part of issue #2 PIN hashing unification."
```
### Task 2a: Fix any other tests referencing the old config key
- [ ] **Step 1: Find stragglers**
Run: `grep -rn "arm_pin_hash" tests/`
- [ ] **Step 2: Update each matched test**
For every match, replace the pydantic construction pattern:
```python
VigilarConfig(system={"arm_pin_hash": ...})
```
with:
```python
VigilarConfig(security={"pin_hash": ...})
```
And if the value was a plain SHA-256 hex string, replace it with `hash_pin(pin)` from `vigilar.alerts.pin`.
- [ ] **Step 3: Run the full suite**
Run: `.venv/bin/pytest -q`
Expected: 364 passing.
- [ ] **Step 4: Commit if changes were made**
```bash
git add tests/
git commit -m "test: migrate remaining PIN hash test fixtures to security.pin_hash
Sweep after ArmStateFSM rewrite — any straggling test that still
passed arm_pin_hash via SystemConfig now uses SecurityConfig.pin_hash."
```
---
## Task 3: Rewrite `vigilar config set-pin` to output PBKDF2 under `[security] pin_hash` (TDD)
**Files:**
- Modify: `vigilar/cli/cmd_config.py` (the `set_pin_cmd` function around lines 83-95)
- Modify: `vigilar/cli/cmd_config.py` (the `show_cmd` redaction around lines 50-51, so it redacts `security.pin_hash` instead of `system.arm_pin_hash`)
- Test: `tests/unit/test_cli_set_pin.py` (new file)
The CLI currently computes an HMAC with a random secret and tells the user to paste it into `[system] arm_pin_hash`. Both the format and the destination are wrong.
- [ ] **Step 1: Create the failing test**
Create `tests/unit/test_cli_set_pin.py`:
```python
"""Tests for `vigilar config set-pin`."""
from click.testing import CliRunner
from vigilar.alerts.pin import verify_pin
from vigilar.cli.cmd_config import config_cmd
def test_set_pin_outputs_pbkdf2_hash_under_security_section():
"""The CLI must emit a hash that alerts.pin.verify_pin can validate,
and direct the user to [security] pin_hash (not [system] arm_pin_hash)."""
runner = CliRunner()
result = runner.invoke(config_cmd, ["set-pin"], input="1234\n1234\n")
assert result.exit_code == 0, result.output
# The output must direct the user to the [security] section
assert "[security]" in result.output
assert "arm_pin_hash" not in result.output
assert "pin_hash" in result.output
# Extract the emitted hash (line starts with `pin_hash = "..."`)
hash_line = next(
line for line in result.output.splitlines() if line.strip().startswith("pin_hash")
)
hash_value = hash_line.split('"')[1]
# Round-trip: the emitted hash must accept the plaintext PIN
assert verify_pin("1234", hash_value) is True
assert verify_pin("0000", hash_value) is False
# And it must be in PBKDF2 format (not the legacy HMAC "secret:mac" format)
assert hash_value.startswith("pbkdf2_sha256$")
```
- [ ] **Step 2: Run the test to verify it fails**
Run: `.venv/bin/pytest -q tests/unit/test_cli_set_pin.py`
Expected: FAIL — either `"[security]"` not in output (the CLI still prints `[system]`), or `hash_value` starts with raw hex (HMAC format) not `pbkdf2_sha256$`.
- [ ] **Step 3: Rewrite `set_pin_cmd`**
Edit `vigilar/cli/cmd_config.py`. Replace the entire `set_pin_cmd` function (currently lines 83-95) with:
```python
@config_cmd.command("set-pin")
def set_pin_cmd() -> None:
"""Generate a PBKDF2 hash for the arm/disarm PIN."""
from vigilar.alerts.pin import hash_pin
pin = click.prompt("Enter arm/disarm PIN", hide_input=True, confirmation_prompt=True)
hash_str = hash_pin(pin)
click.echo("\nAdd this to your vigilar.toml [security] section:")
click.echo(f'pin_hash = "{hash_str}"')
```
Remove any now-unused imports at the top of the function (the old `hashlib`, `hmac`, `os` locals).
- [ ] **Step 4: Update the show_cmd redaction**
Still in `vigilar/cli/cmd_config.py`, in `show_cmd` (around lines 47-54), replace:
```python
if data.get("system", {}).get("arm_pin_hash"):
data["system"]["arm_pin_hash"] = "***"
```
with:
```python
if data.get("security", {}).get("pin_hash"):
data["security"]["pin_hash"] = "***"
```
- [ ] **Step 5: Run the CLI test to verify it passes**
Run: `.venv/bin/pytest -q tests/unit/test_cli_set_pin.py`
Expected: PASS.
- [ ] **Step 6: Run the full suite**
Run: `.venv/bin/pytest -q`
Expected: 365 passing (+1 from the new test).
- [ ] **Step 7: Commit**
```bash
git add vigilar/cli/cmd_config.py tests/unit/test_cli_set_pin.py
git commit -m "fix(cli): set-pin emits PBKDF2 under [security] pin_hash (issue #2)
Was: HMAC-SHA256(random, pin) written to [system] arm_pin_hash —
no verifier in the codebase accepted this output.
Now: PBKDF2-SHA256 via alerts.pin.hash_pin written to [security]
pin_hash, matching what the web and FSM paths verify against.
Also fixes show_cmd to redact the new location."
```
---
## Task 4: Deprecate `[system] arm_pin_hash` with a migration warning (TDD)
**Files:**
- Modify: `vigilar/config.py` — keep the field on `SystemConfig` but emit a warning in `load_config` when the old key is set and the new one isn't
- Test: `tests/unit/test_config.py` (new test)
We do NOT remove the field — existing deployments may have it set, and deleting it from the schema would cause `load_config` to raise a validation error on old configs. Instead: parse it, log a loud warning, ignore it at runtime.
- [ ] **Step 1: Write the failing test**
Edit `tests/unit/test_config.py`. Add a new test at the end:
```python
def test_deprecation_warning_for_arm_pin_hash(tmp_path, caplog):
"""Loading a config that still uses the legacy [system] arm_pin_hash
must log a clear warning pointing the user at `vigilar config set-pin`."""
import logging
cfg_path = tmp_path / "legacy.toml"
cfg_path.write_text(
'[system]\n'
'arm_pin_hash = "pbkdf2_sha256$abc$def"\n'
)
with caplog.at_level(logging.WARNING):
from vigilar.config import load_config
load_config(str(cfg_path))
messages = [r.message for r in caplog.records if r.levelno >= logging.WARNING]
assert any("arm_pin_hash" in m and "deprecated" in m.lower() for m in messages), (
f"expected deprecation warning mentioning arm_pin_hash, got: {messages}"
)
```
- [ ] **Step 2: Run the test to verify it fails**
Run: `.venv/bin/pytest -q tests/unit/test_config.py::test_deprecation_warning_for_arm_pin_hash`
Expected: FAIL — no warning is currently emitted.
- [ ] **Step 3: Add the warning in `load_config`**
Open `vigilar/config.py`. Find `load_config` (starts near line 410). Immediately after the Pydantic parse step and before returning the config object, add:
```python
if cfg.system.arm_pin_hash and not cfg.security.pin_hash:
log.warning(
"DEPRECATED: [system] arm_pin_hash is ignored; the arm/disarm "
"PIN lives under [security] pin_hash. Run `vigilar config "
"set-pin` and paste the output into [security]."
)
```
Ensure `log = logging.getLogger(__name__)` exists at module top (add `import logging` and the log declaration if missing).
- [ ] **Step 4: Run the deprecation test to verify it passes**
Run: `.venv/bin/pytest -q tests/unit/test_config.py::test_deprecation_warning_for_arm_pin_hash`
Expected: PASS.
- [ ] **Step 5: Run the full suite**
Run: `.venv/bin/pytest -q`
Expected: 366 passing (+1).
- [ ] **Step 6: Commit**
```bash
git add vigilar/config.py tests/unit/test_config.py
git commit -m "feat(config): deprecation warning for [system] arm_pin_hash
If a config still has the legacy [system] arm_pin_hash set but no
[security] pin_hash, load_config logs a WARNING telling the operator
to re-run 'vigilar config set-pin'. The legacy field is still parsed
(so old configs don't fail validation) but ignored at runtime.
Part of issue #2 PIN hashing unification."
```
---
## Task 5: Processor subscribes to `SYSTEM_ARM_REQUEST` and dispatches to FSM (TDD)
**Files:**
- Modify: `vigilar/events/processor.py` (the `run` method around lines 56-65, and add a new private method `_handle_arm_request`)
- Test: `tests/unit/test_events.py::TestArmRequestDispatch` (new test class)
Today the FSM only transitions when someone inside the event-processor process calls `fsm.transition()` directly. There is no MQTT entry point. This task adds one: a dedicated subscription on `Topics.SYSTEM_ARM_REQUEST` that dispatches to a new `_handle_arm_request` method.
- [ ] **Step 1: Write the failing test**
Append to `tests/unit/test_events.py`:
```python
class TestArmRequestDispatch:
"""SYSTEM_ARM_REQUEST messages must reach ArmStateFSM.transition."""
def test_arm_request_calls_fsm_transition(self, test_db):
from vigilar.events.processor import EventProcessor
processor = EventProcessor.__new__(EventProcessor)
calls = []
class FakeFSM:
state = ArmState.DISARMED
def transition(self, new_state, pin="", triggered_by="system"):
calls.append((new_state, pin, triggered_by))
return True
processor._handle_arm_request(
payload={"mode": "ARMED_AWAY", "pin": "1234", "triggered_by": "web"},
fsm=FakeFSM(),
)
assert len(calls) == 1
new_state, pin, triggered_by = calls[0]
assert new_state == ArmState.ARMED_AWAY
assert pin == "1234"
assert triggered_by == "web"
def test_arm_request_ignores_bad_mode(self, test_db):
from vigilar.events.processor import EventProcessor
processor = EventProcessor.__new__(EventProcessor)
calls = []
class FakeFSM:
def transition(self, *a, **kw):
calls.append((a, kw))
return True
processor._handle_arm_request(
payload={"mode": "NONSENSE", "pin": "1234"},
fsm=FakeFSM(),
)
assert calls == []
```
- [ ] **Step 2: Run the test to verify it fails**
Run: `.venv/bin/pytest -q tests/unit/test_events.py::TestArmRequestDispatch`
Expected: FAIL — `AttributeError: 'EventProcessor' object has no attribute '_handle_arm_request'`.
- [ ] **Step 3: Implement `_handle_arm_request`**
Edit `vigilar/events/processor.py`. After the existing `_handle_event` method (before `_classify_event`), add:
```python
def _handle_arm_request(
self,
payload: dict[str, Any],
fsm: "ArmStateFSM",
) -> None:
"""Handle an arm/disarm request received over MQTT.
Payload fields:
- mode: str — desired ArmState ("DISARMED", "ARMED_HOME", "ARMED_AWAY")
- pin: str — plaintext PIN (FSM verifies against security.pin_hash)
- triggered_by: str — origin tag for the audit log (e.g. "web")
"""
mode = payload.get("mode", "")
if mode not in ArmState.__members__:
log.warning("Ignoring arm request with invalid mode: %r", mode)
return
pin = payload.get("pin", "")
triggered_by = payload.get("triggered_by", "unknown")
fsm.transition(ArmState(mode), pin=pin, triggered_by=triggered_by)
```
Also add `ArmState` to the imports at the top of the file — change line 12 from:
```python
from vigilar.constants import EventType, Severity, Topics
```
to:
```python
from vigilar.constants import ArmState, EventType, Severity, Topics
```
- [ ] **Step 4: Run the test to verify it passes**
Run: `.venv/bin/pytest -q tests/unit/test_events.py::TestArmRequestDispatch`
Expected: both tests pass.
- [ ] **Step 5: Wire the subscription in `run`**
Still in `vigilar/events/processor.py`, inside `run()` (around the existing `bus.subscribe_all(on_message)` call on line 65), add a dedicated subscription for arm requests. Replace the block:
```python
# Subscribe to all Vigilar topics
def on_message(topic: str, payload: dict[str, Any]) -> None:
self._handle_event(topic, payload, engine, fsm, rule_engine, bus)
bus.subscribe_all(on_message)
```
with:
```python
# Subscribe to all Vigilar topics (events/motion/sensors/etc.)
def on_message(topic: str, payload: dict[str, Any]) -> None:
self._handle_event(topic, payload, engine, fsm, rule_engine, bus)
bus.subscribe_all(on_message)
# Dedicated subscription for web-originated arm/disarm requests.
# Kept separate from on_message because these are commands, not
# classifiable events.
def on_arm_request(topic: str, payload: dict[str, Any]) -> None:
self._handle_arm_request(payload, fsm)
bus.subscribe(Topics.SYSTEM_ARM_REQUEST, on_arm_request)
```
- [ ] **Step 6: Run the full suite**
Run: `.venv/bin/pytest -q`
Expected: 368 passing (+2 from the new test class).
- [ ] **Step 7: Commit**
```bash
git add vigilar/events/processor.py tests/unit/test_events.py
git commit -m "feat(events): processor handles SYSTEM_ARM_REQUEST over MQTT
Adds _handle_arm_request and a dedicated bus.subscribe on
Topics.SYSTEM_ARM_REQUEST. Payload {mode, pin, triggered_by} is
dispatched to ArmStateFSM.transition, which verifies the PIN via
alerts.pin.verify_pin and performs the state change.
This is the missing link for web /system/api/arm to actually move
the system into an armed state. Part of issue #2."
```
---
## Task 6: Web arm/disarm endpoints publish arm requests (TDD)
**Files:**
- Modify: `vigilar/web/blueprints/system.py``arm_system` and `disarm_system` publish `SYSTEM_ARM_REQUEST` via a bus client; remove the PIN pre-check (FSM verifies)
- Test: `tests/unit/test_system_pin.py` — verify the publish happens
The web endpoint currently has two bugs: (a) it verifies the PIN but never transitions state, and (b) if the PIN is wrong it returns 401 with no audit trail. We fix both by removing the local PIN check and forwarding the request to the FSM over MQTT. The FSM verifies and logs. The web response becomes 202 Accepted regardless of PIN — the client polls `/system/status` to see the actual state (if no transition occurred, the FSM logged a warning and the state is unchanged).
**Design note:** we rely on the local Mosquitto broker being trusted (127.0.0.1 only, no exposure). The PIN travels over localhost MQTT. This matches the existing convention.
- [ ] **Step 1: Extend the existing test fixtures**
Edit `tests/unit/test_system_pin.py`. At the top of the file add:
```python
from unittest.mock import MagicMock, patch
```
- [ ] **Step 2: Write the failing test**
Append to `tests/unit/test_system_pin.py`:
```python
def test_arm_publishes_arm_request_on_mqtt(app_with_pin):
"""POST /system/api/arm must publish a SYSTEM_ARM_REQUEST message
carrying the mode, pin, and a 'web' triggered_by tag."""
from vigilar.constants import Topics
fake_bus = MagicMock()
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post(
"/system/api/arm",
json={"mode": "ARMED_AWAY", "pin": "1234"},
)
assert rv.status_code == 202
assert rv.get_json()["accepted"] is True
pub.assert_called_once()
call_args = pub.call_args
# call_args.args[0] is the cfg, args[1] is the payload
payload = call_args.args[1] if len(call_args.args) > 1 else call_args.kwargs["payload"]
assert payload["mode"] == "ARMED_AWAY"
assert payload["pin"] == "1234"
assert payload["triggered_by"] == "web"
def test_disarm_publishes_arm_request(app_with_pin):
from vigilar.constants import Topics
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post("/system/api/disarm", json={"pin": "1234"})
assert rv.status_code == 202
pub.assert_called_once()
payload = pub.call_args.args[1] if len(pub.call_args.args) > 1 else pub.call_args.kwargs["payload"]
assert payload["mode"] == "DISARMED"
```
Also **update** the existing tests that previously asserted the 200/401 PIN-check behavior to match the new 202 semantics. Specifically replace the bodies of `test_arm_without_pin_set`, `test_arm_correct_pin`, `test_arm_wrong_pin`, `test_disarm_correct_pin`, `test_disarm_wrong_pin` so they mock `_publish_arm_request` and assert only that the endpoint returns 202 and forwards the payload verbatim. The "wrong pin" case no longer returns 401 at the HTTP layer — the FSM logs and ignores. Example replacement for `test_arm_wrong_pin`:
```python
def test_arm_wrong_pin_still_accepted_by_web_fsm_rejects(app_with_pin):
"""HTTP layer no longer pre-checks the PIN — it forwards to the FSM
unconditionally. The FSM verifies and, on mismatch, logs a warning
and leaves the state unchanged."""
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post("/system/api/arm", json={"mode": "ARMED_AWAY", "pin": "0000"})
assert rv.status_code == 202
pub.assert_called_once()
payload = pub.call_args.args[1] if len(pub.call_args.args) > 1 else pub.call_args.kwargs["payload"]
assert payload["pin"] == "0000" # forwarded verbatim — FSM will reject
```
- [ ] **Step 3: Run the new test to verify it fails**
Run: `.venv/bin/pytest -q tests/unit/test_system_pin.py::test_arm_publishes_arm_request_on_mqtt`
Expected: FAIL — `AttributeError` or `ImportError`, `_publish_arm_request` does not exist.
- [ ] **Step 4: Rewrite `arm_system`, `disarm_system`, add `_publish_arm_request` helper**
Edit `vigilar/web/blueprints/system.py`. Replace the `arm_system` and `disarm_system` handlers (the block around lines 57-77) with:
```python
def _publish_arm_request(cfg: VigilarConfig, payload: dict) -> None:
"""Publish an arm/disarm request on MQTT for the event processor to pick up."""
from vigilar.bus import MessageBus
from vigilar.constants import Topics
bus = MessageBus(cfg.mqtt, client_id="vigilar-web-arm-request")
bus.connect()
try:
bus.publish(Topics.SYSTEM_ARM_REQUEST, payload)
finally:
bus.disconnect()
@system_bp.route("/api/arm", methods=["POST"])
def arm_system():
data = request.get_json() or {}
mode = data.get("mode", "ARMED_AWAY")
pin = data.get("pin", "")
payload = {"mode": mode, "pin": pin, "triggered_by": "web"}
try:
_publish_arm_request(_get_cfg(), payload)
except Exception:
current_app.logger.exception("Failed to publish arm request")
return jsonify({"error": "bus unavailable"}), 503
return jsonify({"accepted": True, "mode": mode}), 202
@system_bp.route("/api/disarm", methods=["POST"])
def disarm_system():
data = request.get_json() or {}
pin = data.get("pin", "")
payload = {"mode": "DISARMED", "pin": pin, "triggered_by": "web"}
try:
_publish_arm_request(_get_cfg(), payload)
except Exception:
current_app.logger.exception("Failed to publish arm request")
return jsonify({"error": "bus unavailable"}), 503
return jsonify({"accepted": True, "mode": "DISARMED"}), 202
```
The `from vigilar.alerts.pin import hash_pin, verify_pin` import at the top of the file is still needed for `reset_pin`; leave it alone.
- [ ] **Step 5: Run the web PIN tests to verify they pass**
Run: `.venv/bin/pytest -q tests/unit/test_system_pin.py`
Expected: all tests in that file pass.
- [ ] **Step 6: Run the full suite**
Run: `.venv/bin/pytest -q`
Expected: all tests passing (count depends on how many existing tests you rewrote in step 2; the delta should be +2 new publish tests, same or +3 on net).
- [ ] **Step 7: Commit**
```bash
git add vigilar/web/blueprints/system.py tests/unit/test_system_pin.py
git commit -m "fix(web): arm/disarm actually transition the FSM via MQTT (issue #2)
Was: /system/api/arm verified the PIN against [security] pin_hash and
returned {ok: true} without ever calling the FSM. State never changed.
Now: the endpoint publishes a SYSTEM_ARM_REQUEST message to the local
MQTT broker. The event processor (see previous commit) picks it up,
ArmStateFSM verifies the PIN via alerts.pin.verify_pin and performs
the transition. Response is 202 Accepted; clients poll /system/status
for the new state.
Design: PIN travels over localhost-only MQTT, which matches the
existing trust boundary for the internal bus."
```
---
## Task 7: End-to-end unification test
**Files:**
- Test: `tests/unit/test_pin_unification.py` (new file)
A single test that walks the full flow using only the public API of each layer, proving that the three formerly-incompatible paths now interoperate.
- [ ] **Step 1: Write the test**
Create `tests/unit/test_pin_unification.py`:
```python
"""End-to-end test: the CLI, FSM, and web arm flow all accept the same PIN.
Regression guard for issue #2 — the three layers previously used three
incompatible hash schemes under two different config keys."""
from click.testing import CliRunner
from vigilar.alerts.pin import hash_pin, verify_pin
from vigilar.cli.cmd_config import config_cmd
from vigilar.config import SecurityConfig, VigilarConfig
from vigilar.events.state import ArmStateFSM
from vigilar.constants import ArmState
def test_cli_output_is_accepted_by_fsm(test_db):
"""Hash produced by `vigilar config set-pin` must verify against
ArmStateFSM.verify_pin, same config key, same format."""
runner = CliRunner()
result = runner.invoke(config_cmd, ["set-pin"], input="9876\n9876\n")
assert result.exit_code == 0, result.output
hash_line = next(
line for line in result.output.splitlines()
if line.strip().startswith("pin_hash")
)
hash_value = hash_line.split('"')[1]
cfg = VigilarConfig(security=SecurityConfig(pin_hash=hash_value))
fsm = ArmStateFSM(test_db, cfg)
assert fsm.verify_pin("9876") is True
assert fsm.verify_pin("0000") is False
def test_fsm_transitions_with_pin_from_alerts_module(test_db):
"""The alerts.pin module and ArmStateFSM agree on the hash format."""
stored = hash_pin("4242")
cfg = VigilarConfig(security=SecurityConfig(pin_hash=stored))
fsm = ArmStateFSM(test_db, cfg)
assert fsm.transition(ArmState.ARMED_AWAY, pin="4242", triggered_by="test") is True
assert fsm.state == ArmState.ARMED_AWAY
# Same stored hash rejects the wrong PIN
assert verify_pin("0000", stored) is False
```
- [ ] **Step 2: Run it**
Run: `.venv/bin/pytest -q tests/unit/test_pin_unification.py`
Expected: both tests pass.
- [ ] **Step 3: Run the full suite**
Run: `.venv/bin/pytest -q`
Expected: all tests passing.
- [ ] **Step 4: Commit**
```bash
git add tests/unit/test_pin_unification.py
git commit -m "test: end-to-end PIN unification regression guard (issue #2)"
```
---
## Task 8: Update the operator guide
**Files:**
- Modify: `docs/operator-guide.md` — replace stale sections about `arm_pin_hash` and the three-way mismatch note
There are four stale passages to rewrite. Each `- [ ]` step below shows the exact old text and the replacement. Line numbers are approximate; match on the text.
- [ ] **Step 1: Rewrite the `vigilar config set-pin` description**
Find this paragraph (around line 368):
```
Prompts for an arm/disarm PIN, generates a random 32-byte HMAC key,
computes `HMAC-SHA256(key, pin)`, and prints an `arm_pin_hash =
"secret_hex:mac_hex"` line to paste into `[system]`. Again, no file
write.
```
Replace with:
```
Prompts for an arm/disarm PIN, derives a salted PBKDF2-SHA256 hash
(600,000 iterations) via `vigilar.alerts.pin.hash_pin`, and prints a
`pin_hash = "pbkdf2_sha256$salt$dk"` line to paste into `[security]`.
Again, no file write. The same hash format is verified identically by
the web arm/disarm endpoint and by `ArmStateFSM` in the event
processor — there is one canonical PIN store.
```
- [ ] **Step 2: Fix the security-section summary of PIN storage**
Find this bullet (around line 390):
```
- The web UI password is a scrypt hash set by `vigilar config
set-password` and stored at `[web] password_hash`. The arm PIN is
an HMAC stored at `[system] arm_pin_hash` (and/or `[security]
pin_hash`).
```
Replace with:
```
- The web UI password is a scrypt hash set by `vigilar config
set-password` and stored at `[web] password_hash`. The arm/disarm
PIN is a PBKDF2-SHA256 hash (600k iterations, salted) set by
`vigilar config set-pin` and stored at `[security] pin_hash`.
A legacy `[system] arm_pin_hash` field is still parsed but ignored
at runtime; if it's set and `[security] pin_hash` is empty, the
service logs a deprecation warning at startup and arm/disarm will
behave as if no PIN were configured until you re-run `set-pin`.
```
- [ ] **Step 3: Remove the "Duplicate PIN fields" known limitation**
Find this bullet in the `## Known limitations` section (around line 605):
```
- **Duplicate PIN fields.** `vigilar config set-pin` writes to
`[system] arm_pin_hash`, while the web arm/disarm flow reads from
`[security] pin_hash`. Both models exist. If you set one and the
other side does not behave as expected, mirror the value manually.
```
Delete it entirely (including the blank line it shares with the surrounding bullets — re-check the section to ensure no double-blank is left behind).
- [ ] **Step 4: Grep for any remaining `arm_pin_hash` references in operator-guide.md**
Run: `grep -n "arm_pin_hash" docs/operator-guide.md`
For any remaining hits that describe the field as the primary PIN store (e.g. in the `## Configuration reference` section around line 116, 294-296, 347), rewrite them to describe `[security] pin_hash` as the primary key with a one-line note that `[system] arm_pin_hash` is deprecated and ignored. Leave any reference that's explicitly about the legacy migration path intact.
- [ ] **Step 5: Commit**
```bash
git add docs/operator-guide.md
git commit -m "docs(operator-guide): PIN hashing is unified (issue #2)
Describes the canonical [security] pin_hash key, the PBKDF2 format
emitted by 'vigilar config set-pin', and the deprecation warning for
the legacy [system] arm_pin_hash. Drops the three-way mismatch
known-limitation."
```
---
## Task 9: Open the PR
**Files:** none
- [ ] **Step 1: Push the branch**
```bash
git push -u origin fix/issue-2-pin-unification
```
- [ ] **Step 2: Open the PR via tea**
```bash
tea pulls create --login alee --repo alee/vigilar \
--head fix/issue-2-pin-unification --base main \
--title "fix: unify PIN hashing across CLI, FSM, and web (closes #2)" \
--description "$(cat <<'EOF'
Closes #2.
## Problem (recap from the issue)
Three incompatible PIN hashing schemes across two config keys:
| Component | Hash scheme | Config key |
|---|---|---|
| Web arm/disarm/reset (via alerts/pin.py) | PBKDF2-SHA256, 600k iterations, salted | [security] pin_hash |
| ArmStateFSM (events/state.py) | Unsalted SHA-256(pin) | [system] arm_pin_hash |
| CLI set-pin (cli/cmd_config.py) | HMAC-SHA256(random, pin) | [system] arm_pin_hash |
Consequences: set-pin output no verifier understood; web arm endpoint verified the PIN but never transitioned state; FSM used trivially brute-forceable unsalted SHA-256.
## Resolution
- **One canonical scheme:** PBKDF2-SHA256 via alerts/pin (hash_pin/verify_pin).
- **One canonical key:** [security] pin_hash.
- **Legacy field deprecated:** [system] arm_pin_hash is still parsed (old configs don't fail) but ignored at runtime. A WARNING logs at startup if it's set and the new key isn't.
- **Web arm/disarm now actually works:** POST /system/api/arm publishes a SYSTEM_ARM_REQUEST on localhost MQTT; the event processor subscribes, ArmStateFSM verifies the PIN and transitions. HTTP response is 202 Accepted; clients poll /system/status for the new state.
## Commits
- feat(constants): add Topics.SYSTEM_ARM_REQUEST
- fix(events): ArmStateFSM uses PBKDF2 via alerts.pin
- fix(cli): set-pin emits PBKDF2 under [security] pin_hash
- feat(config): deprecation warning for [system] arm_pin_hash
- feat(events): processor handles SYSTEM_ARM_REQUEST over MQTT
- fix(web): arm/disarm actually transition the FSM via MQTT
- test: end-to-end PIN unification regression guard
- docs(operator-guide): PIN hashing is unified
## Test plan
- [x] ArmStateFSM PBKDF2 verify path (tests/unit/test_events.py::TestArmStateFSM)
- [x] CLI set-pin emits PBKDF2 under [security] (tests/unit/test_cli_set_pin.py)
- [x] Deprecation warning fires for legacy key (tests/unit/test_config.py)
- [x] Processor dispatches SYSTEM_ARM_REQUEST to FSM (tests/unit/test_events.py::TestArmRequestDispatch)
- [x] Web arm/disarm publish SYSTEM_ARM_REQUEST, return 202 (tests/unit/test_system_pin.py)
- [x] End-to-end: CLI → FSM round-trip (tests/unit/test_pin_unification.py)
- [x] Full suite green; config/vigilar.toml unmodified after pytest (session fixture from #3)
## Migration note for operators
If you previously ran `vigilar config set-pin`, you will see a deprecation warning on the next start. Re-run the command and paste the new `pin_hash = "pbkdf2_sha256$..."` line into `[security]` (not `[system]`). The old `[system] arm_pin_hash` can then be deleted.
EOF
)"
```
- [ ] **Step 3: Verify no stray diffs**
Run: `git status && git diff origin/main -- config/vigilar.toml`
Expected: clean. `config/vigilar.toml` must not be modified.
---
## Success Criteria
- `vigilar config set-pin` output can be pasted into `[security] pin_hash` and verified by both `alerts.pin.verify_pin` and `ArmStateFSM.verify_pin`.
- A `POST /system/api/arm` with a correct PIN actually transitions the FSM (verifiable via `/system/status` or the `arm_state_log` table) — a `POST` with a wrong PIN leaves state unchanged and emits a WARNING log on the event processor side.
- Starting with a config that has only the legacy `[system] arm_pin_hash` set logs a deprecation WARNING.
- Full test suite passes; `config/vigilar.toml` stays clean (session fixture from #3 protects this).
- No file references `arm_pin_hash` outside of (a) the `SystemConfig` pydantic field definition, (b) the deprecation warning, and (c) historical spec docs under `docs/superpowers/specs/`.

View File

@@ -0,0 +1,31 @@
"""Tests for `vigilar config set-pin`."""
from click.testing import CliRunner
from vigilar.alerts.pin import verify_pin
from vigilar.cli.cmd_config import config_cmd
def test_set_pin_outputs_pbkdf2_hash_under_security_section():
"""The CLI must emit a hash that alerts.pin.verify_pin can validate,
and direct the user to [security] pin_hash (not [system] arm_pin_hash)."""
runner = CliRunner()
result = runner.invoke(config_cmd, ["set-pin"], input="1234\n1234\n")
assert result.exit_code == 0, result.output
# The output must direct the user to the [security] section
assert "[security]" in result.output
assert "arm_pin_hash" not in result.output
assert "pin_hash" in result.output
# Extract the emitted hash (line starts with `pin_hash = "..."`)
hash_line = next(
line for line in result.output.splitlines() if line.strip().startswith("pin_hash")
)
hash_value = hash_line.split('"')[1]
# Round-trip: the emitted hash must accept the plaintext PIN
assert verify_pin("1234", hash_value) is True
assert verify_pin("0000", hash_value) is False
# And it must be in PBKDF2 format (not the legacy HMAC "secret:mac" format)
assert hash_value.startswith("pbkdf2_sha256$")

View File

@@ -138,3 +138,49 @@ class TestCameraConfigLocation:
from vigilar.config import CameraConfig
cfg = CameraConfig(id="test", display_name="Test", rtsp_url="rtsp://x", location="EXTERIOR")
assert cfg.location == "EXTERIOR"
def test_deprecation_warning_for_arm_pin_hash(tmp_path, caplog):
"""Loading a config that still uses the legacy [system] arm_pin_hash
must log a clear warning pointing the user at `vigilar config set-pin`."""
import logging
cfg_path = tmp_path / "legacy.toml"
cfg_path.write_text(
'[system]\n'
'arm_pin_hash = "pbkdf2_sha256$abc$def"\n'
)
with caplog.at_level(logging.WARNING):
from vigilar.config import load_config
load_config(str(cfg_path))
messages = [r.message for r in caplog.records if r.levelno >= logging.WARNING]
assert any("arm_pin_hash" in m and "deprecated" in m.lower() for m in messages), (
f"expected deprecation warning mentioning arm_pin_hash, got: {messages}"
)
def test_no_deprecation_warning_when_security_pin_hash_set(tmp_path, caplog):
"""No warning should fire if [security] pin_hash is populated,
regardless of whether [system] arm_pin_hash is also still present.
The warning is specifically for un-migrated configs."""
import logging
cfg_path = tmp_path / "migrated.toml"
cfg_path.write_text(
'[system]\n'
'arm_pin_hash = "pbkdf2_sha256$legacy$value"\n'
'\n'
'[security]\n'
'pin_hash = "pbkdf2_sha256$current$value"\n'
)
with caplog.at_level(logging.WARNING):
from vigilar.config import load_config
load_config(str(cfg_path))
deprecation_messages = [
r.message for r in caplog.records
if r.levelno >= logging.WARNING and "arm_pin_hash" in r.message
]
assert deprecation_messages == [], (
f"deprecation warning should not fire on migrated configs, "
f"got: {deprecation_messages}"
)

View File

@@ -1,6 +1,5 @@
"""Tests for the Phase 6 events subsystem: rules, arm state FSM, history."""
import hashlib
import time
import pytest
@@ -19,7 +18,7 @@ from vigilar.storage.queries import insert_event
def _make_config(rules=None, pin_hash=""):
return VigilarConfig(
system={"arm_pin_hash": pin_hash},
security={"pin_hash": pin_hash},
cameras=[],
sensors=[],
rules=rules or [],
@@ -27,7 +26,8 @@ def _make_config(rules=None, pin_hash=""):
def _pin_hash(pin: str) -> str:
return hashlib.sha256(pin.encode()).hexdigest()
from vigilar.alerts.pin import hash_pin
return hash_pin(pin)
# ---------------------------------------------------------------------------
@@ -511,3 +511,76 @@ class TestEventsPublishedBroadcast:
)
assert not any(t == Topics.EVENTS_PUBLISHED for t, _ in bus.published)
# ---------------------------------------------------------------------------
# Arm Request Dispatch
# ---------------------------------------------------------------------------
class TestArmRequestDispatch:
"""SYSTEM_ARM_REQUEST messages must reach ArmStateFSM.transition."""
def test_arm_request_calls_fsm_transition(self, test_db):
from vigilar.events.processor import EventProcessor
processor = EventProcessor.__new__(EventProcessor)
calls = []
class FakeFSM:
state = ArmState.DISARMED
def transition(self, new_state, pin="", triggered_by="system"):
calls.append((new_state, pin, triggered_by))
return True
processor._handle_arm_request(
payload={"mode": "ARMED_AWAY", "pin": "1234", "triggered_by": "web"},
fsm=FakeFSM(),
)
assert len(calls) == 1
new_state, pin, triggered_by = calls[0]
assert new_state == ArmState.ARMED_AWAY
assert pin == "1234"
assert triggered_by == "web"
def test_arm_request_ignores_bad_mode(self, test_db):
from vigilar.events.processor import EventProcessor
processor = EventProcessor.__new__(EventProcessor)
calls = []
class FakeFSM:
def transition(self, *a, **kw):
calls.append((a, kw))
return True
processor._handle_arm_request(
payload={"mode": "NONSENSE", "pin": "1234"},
fsm=FakeFSM(),
)
assert calls == []
def test_arm_request_default_triggered_by(self, test_db):
"""Omitting triggered_by must default to 'unknown' (audit-log value)."""
from vigilar.events.processor import EventProcessor
processor = EventProcessor.__new__(EventProcessor)
calls = []
class FakeFSM:
state = ArmState.DISARMED
def transition(self, new_state, pin="", triggered_by="system"):
calls.append((new_state, pin, triggered_by))
return True
processor._handle_arm_request(
payload={"mode": "DISARMED", "pin": ""},
fsm=FakeFSM(),
)
assert len(calls) == 1
assert calls[0][2] == "unknown"

View File

@@ -37,3 +37,14 @@ def test_verify_pin_handles_unicode():
stored = hash_pin("p@ss!")
assert verify_pin("p@ss!", stored) is True
assert verify_pin("p@ss?", stored) is False
def test_verify_pin_rejects_malformed_hash():
"""verify_pin must return False (not raise) on malformed stored hashes.
Fail-closed is load-bearing: a misconfigured or partially-migrated
[security] pin_hash must lock out transitions, not grant access."""
assert verify_pin("1234", "sha256:deadbeef") is False
assert verify_pin("1234", "garbage") is False
assert verify_pin("1234", "pbkdf2_sha256$only$two$extra") is False
# Wrong algo prefix
assert verify_pin("1234", "argon2id$salt$dk") is False

View File

@@ -0,0 +1,45 @@
"""End-to-end test: the CLI, FSM, and web arm flow all accept the same PIN.
Regression guard for issue #2 — the three layers previously used three
incompatible hash schemes under two different config keys."""
from click.testing import CliRunner
from vigilar.alerts.pin import hash_pin, verify_pin
from vigilar.cli.cmd_config import config_cmd
from vigilar.config import SecurityConfig, VigilarConfig
from vigilar.events.state import ArmStateFSM
from vigilar.constants import ArmState
def test_cli_output_is_accepted_by_fsm(test_db):
"""Hash produced by `vigilar config set-pin` must verify against
ArmStateFSM.verify_pin, same config key, same format."""
runner = CliRunner()
result = runner.invoke(config_cmd, ["set-pin"], input="9876\n9876\n")
assert result.exit_code == 0, result.output
hash_line = next(
line for line in result.output.splitlines()
if line.strip().startswith("pin_hash")
)
hash_value = hash_line.split('"')[1]
cfg = VigilarConfig(security=SecurityConfig(pin_hash=hash_value))
fsm = ArmStateFSM(test_db, cfg)
assert fsm.verify_pin("9876") is True
assert fsm.verify_pin("0000") is False
def test_fsm_transitions_with_pin_from_alerts_module(test_db):
"""The alerts.pin module and ArmStateFSM agree on the hash format."""
stored = hash_pin("4242")
cfg = VigilarConfig(security=SecurityConfig(pin_hash=stored))
fsm = ArmStateFSM(test_db, cfg)
assert fsm.transition(ArmState.ARMED_AWAY, pin="4242", triggered_by="test") is True
assert fsm.state == ArmState.ARMED_AWAY
# Same stored hash rejects the wrong PIN
assert verify_pin("0000", stored) is False

View File

@@ -1,6 +1,7 @@
"""Tests for PIN verification on arm/disarm endpoints."""
import pytest
from unittest.mock import patch
from vigilar.alerts.pin import hash_pin
from vigilar.config import VigilarConfig, SecurityConfig
from vigilar.web.app import create_app
@@ -29,35 +30,55 @@ def app_no_pin():
def test_arm_without_pin_set(app_no_pin):
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_no_pin.test_client() as c:
rv = c.post("/system/api/arm", json={"mode": "ARMED_AWAY"})
assert rv.status_code == 200
assert rv.get_json()["ok"] is True
assert rv.status_code == 202
pub.assert_called_once()
payload = pub.call_args.args[1] if len(pub.call_args.args) > 1 else pub.call_args.kwargs["payload"]
assert payload["mode"] == "ARMED_AWAY"
assert payload["pin"] == ""
def test_arm_correct_pin(app_with_pin):
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post("/system/api/arm", json={"mode": "ARMED_AWAY", "pin": "1234"})
assert rv.status_code == 200
assert rv.get_json()["ok"] is True
assert rv.status_code == 202
pub.assert_called_once()
payload = pub.call_args.args[1] if len(pub.call_args.args) > 1 else pub.call_args.kwargs["payload"]
assert payload["pin"] == "1234"
def test_arm_wrong_pin(app_with_pin):
def test_arm_wrong_pin_still_accepted_by_web_fsm_rejects(app_with_pin):
"""HTTP layer no longer pre-checks the PIN — it forwards to the FSM
unconditionally. The FSM verifies and, on mismatch, logs a warning
and leaves the state unchanged."""
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post("/system/api/arm", json={"mode": "ARMED_AWAY", "pin": "0000"})
assert rv.status_code == 401
assert rv.status_code == 202
pub.assert_called_once()
payload = pub.call_args.args[1] if len(pub.call_args.args) > 1 else pub.call_args.kwargs["payload"]
assert payload["pin"] == "0000" # forwarded verbatim — FSM will reject
def test_disarm_correct_pin(app_with_pin):
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post("/system/api/disarm", json={"pin": "1234"})
assert rv.status_code == 200
assert rv.status_code == 202
pub.assert_called_once()
def test_disarm_wrong_pin(app_with_pin):
def test_disarm_wrong_pin_still_accepted_by_web_fsm_rejects(app_with_pin):
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post("/system/api/disarm", json={"pin": "9999"})
assert rv.status_code == 401
assert rv.status_code == 202
pub.assert_called_once()
payload = pub.call_args.args[1] if len(pub.call_args.args) > 1 else pub.call_args.kwargs["payload"]
assert payload["pin"] == "9999" # forwarded verbatim — FSM will reject
def test_reset_pin_correct_passphrase(app_with_pin):
@@ -77,3 +98,35 @@ def test_reset_pin_wrong_passphrase(app_with_pin):
"new_pin": "5678",
})
assert rv.status_code == 401
def test_arm_publishes_arm_request_on_mqtt(app_with_pin):
"""POST /system/api/arm must publish a SYSTEM_ARM_REQUEST message
carrying the mode, pin, and a 'web' triggered_by tag."""
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post(
"/system/api/arm",
json={"mode": "ARMED_AWAY", "pin": "1234"},
)
assert rv.status_code == 202
assert rv.get_json()["ok"] is True
pub.assert_called_once()
call_args = pub.call_args
# _publish_arm_request(cfg, payload) — payload is args[1] or kwargs["payload"]
payload = call_args.args[1] if len(call_args.args) > 1 else call_args.kwargs["payload"]
assert payload["mode"] == "ARMED_AWAY"
assert payload["pin"] == "1234"
assert payload["triggered_by"] == "web"
def test_disarm_publishes_arm_request(app_with_pin):
with patch("vigilar.web.blueprints.system._publish_arm_request") as pub:
with app_with_pin.test_client() as c:
rv = c.post("/system/api/disarm", json={"pin": "1234"})
assert rv.status_code == 202
pub.assert_called_once()
payload = pub.call_args.args[1] if len(pub.call_args.args) > 1 else pub.call_args.kwargs["payload"]
assert payload["mode"] == "DISARMED"

View File

@@ -102,3 +102,36 @@ def test_recordings_page_loads():
with app.test_client() as client:
resp = client.get("/recordings/")
assert resp.status_code == 200
def test_system_status_reflects_fsm_arm_state(tmp_path, monkeypatch):
"""system_status must read the current arm state from the DB,
not return a hardcoded stub. Regression guard for the web-to-FSM
async flow introduced in issue #2."""
from vigilar.config import SystemConfig, VigilarConfig
import vigilar.storage.db as db_module
from vigilar.storage.db import get_db_path
from vigilar.storage.schema import metadata
from vigilar.storage.queries import insert_arm_state
from vigilar.web.app import create_app
from sqlalchemy import create_engine
data_dir = tmp_path / "data"
data_dir.mkdir()
cfg = VigilarConfig(system=SystemConfig(data_dir=str(data_dir)))
# Build an isolated engine (bypass the module-level singleton)
db_path = get_db_path(str(data_dir))
isolated_engine = create_engine(f"sqlite:///{db_path}", echo=False)
metadata.create_all(isolated_engine)
insert_arm_state(isolated_engine, "ARMED_AWAY", "test", None)
# Patch the singleton so the blueprint's get_engine() returns our engine
monkeypatch.setattr(db_module, "_engine", isolated_engine)
app = create_app(cfg)
with app.test_client() as c:
resp = c.get("/system/status")
assert resp.status_code == 200
assert resp.get_json()["arm_state"] == "ARMED_AWAY"

View File

@@ -47,8 +47,10 @@ def show_cmd(config_path: str | None) -> None:
# Redact sensitive fields
if data.get("web", {}).get("password_hash"):
data["web"]["password_hash"] = "***"
if data.get("system", {}).get("arm_pin_hash"):
data["system"]["arm_pin_hash"] = "***"
if data.get("security", {}).get("pin_hash"):
data["security"]["pin_hash"] = "***"
if data.get("security", {}).get("recovery_passphrase_hash"):
data["security"]["recovery_passphrase_hash"] = "***"
if data.get("alerts", {}).get("webhook", {}).get("secret"):
data["alerts"]["webhook"]["secret"] = "***"
click.echo(json.dumps(data, indent=2))
@@ -79,14 +81,10 @@ def set_password_cmd(config_path: str | None) -> None:
@config_cmd.command("set-pin")
def set_pin_cmd() -> None:
"""Generate an HMAC hash for the arm/disarm PIN."""
import hashlib
import hmac
import os
"""Generate a PBKDF2 hash for the arm/disarm PIN."""
from vigilar.alerts.pin import hash_pin
pin = click.prompt("Enter arm/disarm PIN", hide_input=True, confirmation_prompt=True)
secret = os.urandom(32)
mac = hmac.new(secret, pin.encode(), hashlib.sha256).hexdigest()
hash_str = secret.hex() + ":" + mac
click.echo(f"\nAdd this to your vigilar.toml [system] section:")
click.echo(f'arm_pin_hash = "{hash_str}"')
hash_str = hash_pin(pin)
click.echo("\nAdd this to your vigilar.toml [security] section:")
click.echo(f'pin_hash = "{hash_str}"')

View File

@@ -1,5 +1,6 @@
"""Configuration loading and validation via TOML + Pydantic."""
import logging
import sys
import tomllib
from pathlib import Path
@@ -23,6 +24,8 @@ from vigilar.constants import (
CameraLocation,
)
log = logging.getLogger(__name__)
# --- Camera Config ---
class CameraConfig(BaseModel):
@@ -435,4 +438,13 @@ def load_config(path: str | Path | None = None) -> VigilarConfig:
raw["sensors.gpio"] = gpio_config
# The [[sensors]] array items remain as 'sensors' key from TOML parsing
return VigilarConfig(**raw)
cfg = VigilarConfig(**raw)
if cfg.system.arm_pin_hash and not cfg.security.pin_hash:
log.warning(
"DEPRECATED: [system] arm_pin_hash is ignored; the arm/disarm "
"PIN lives under [security] pin_hash. Run `vigilar config "
"set-pin` and paste the output into [security]."
)
return cfg

View File

@@ -210,6 +210,8 @@ class Topics:
SYSTEM_ALERT = "vigilar/system/alert"
SYSTEM_SHUTDOWN = "vigilar/system/shutdown"
SYSTEM_RULES_UPDATED = "vigilar/system/rules_updated"
# Web-to-FSM arm/disarm request (FSM verifies the PIN and transitions)
SYSTEM_ARM_REQUEST = "vigilar/system/arm_request"
# Classified events forwarded to the web SSE bridge (see events.processor
# and web.sse_bridge).

View File

@@ -9,7 +9,7 @@ from sqlalchemy.engine import Engine
from vigilar.bus import MessageBus
from vigilar.config import VigilarConfig
from vigilar.constants import EventType, Severity, Topics
from vigilar.constants import ArmState, EventType, Severity, Topics
from vigilar.events.rules import RuleEngine
from vigilar.events.state import ArmStateFSM
from vigilar.storage.db import get_db_path, init_db
@@ -58,12 +58,20 @@ class EventProcessor:
fsm.set_bus(bus)
bus.connect()
# Subscribe to all Vigilar topics
# Subscribe to all Vigilar topics (events/motion/sensors/etc.)
def on_message(topic: str, payload: dict[str, Any]) -> None:
self._handle_event(topic, payload, engine, fsm, rule_engine, bus)
bus.subscribe_all(on_message)
# Dedicated subscription for web-originated arm/disarm requests.
# Kept separate from on_message because these are commands, not
# classifiable events.
def on_arm_request(topic: str, payload: dict[str, Any]) -> None:
self._handle_arm_request(payload, fsm)
bus.subscribe(Topics.SYSTEM_ARM_REQUEST, on_arm_request)
log.info("Event processor started")
# Main loop
@@ -160,6 +168,26 @@ class EventProcessor:
except Exception:
log.exception("Error processing event on %s", topic)
def _handle_arm_request(
self,
payload: dict[str, Any],
fsm: ArmStateFSM,
) -> None:
"""Handle an arm/disarm request received over MQTT.
Payload fields:
- mode: str — desired ArmState ("DISARMED", "ARMED_HOME", "ARMED_AWAY")
- pin: str — plaintext PIN (FSM verifies against security.pin_hash)
- triggered_by: str — origin tag for the audit log (e.g. "web")
"""
mode = payload.get("mode", "")
if mode not in ArmState.__members__:
log.warning("Ignoring arm request with invalid mode: %r", mode)
return
pin = payload.get("pin", "")
triggered_by = payload.get("triggered_by", "unknown")
fsm.transition(ArmState(mode), pin=pin, triggered_by=triggered_by)
def _classify_event(
self, topic: str, payload: dict[str, Any]
) -> tuple[str | None, str | None, str | None]:

View File

@@ -1,12 +1,11 @@
"""Arm state finite state machine."""
import hashlib
import hmac
import logging
import time
from sqlalchemy.engine import Engine
from vigilar.alerts.pin import verify_pin
from vigilar.config import VigilarConfig
from vigilar.constants import ArmState, EventType, Severity, Topics
from vigilar.storage.queries import get_current_arm_state, insert_arm_state, insert_event
@@ -19,7 +18,7 @@ class ArmStateFSM:
def __init__(self, engine: Engine, config: VigilarConfig):
self._engine = engine
self._pin_hash = config.system.arm_pin_hash
self._pin_hash = config.security.pin_hash
self._state = ArmState.DISARMED
self._bus = None
self._load_initial_state()
@@ -43,12 +42,11 @@ class ArmStateFSM:
return self._state
def verify_pin(self, pin: str) -> bool:
"""Verify a PIN against the stored hash using HMAC comparison."""
"""Verify a PIN against the stored PBKDF2 hash."""
if not self._pin_hash:
# No PIN configured — allow all transitions
return True
candidate = hashlib.sha256(pin.encode()).hexdigest()
return hmac.compare_digest(candidate, self._pin_hash)
return verify_pin(pin, self._pin_hash)
def transition(
self,
@@ -68,9 +66,10 @@ class ArmStateFSM:
old_state = self._state
self._state = new_state
# Log to database
pin_hash = hashlib.sha256(pin.encode()).hexdigest() if pin else None
insert_arm_state(self._engine, new_state.value, triggered_by, pin_hash)
# pin_hash is always None here: PBKDF2 uses a random salt per call, so
# re-hashing the pin now would produce a value unrelated to the stored
# hash, making the column useless for audit correlation. See issue #2.
insert_arm_state(self._engine, new_state.value, triggered_by, None)
# Log event
insert_event(

View File

@@ -36,8 +36,20 @@ def _save_and_reload(new_cfg: VigilarConfig) -> None:
def system_status():
"""JSON API: overall system health."""
cfg = _get_cfg()
arm_state = "DISARMED"
try:
from vigilar.storage.db import get_db_path, get_engine
from vigilar.storage.queries import get_current_arm_state
engine = get_engine(get_db_path(cfg.system.data_dir))
stored = get_current_arm_state(engine)
if stored:
arm_state = stored
except Exception:
current_app.logger.exception("Failed to read arm state from DB")
return jsonify({
"arm_state": "DISARMED",
"arm_state": arm_state,
"ups": {"status": "UNKNOWN"},
"cameras_online": 0,
"cameras_total": len(cfg.cameras),
@@ -54,27 +66,46 @@ def settings_page():
# --- Arm/Disarm ---
def _publish_arm_request(cfg: VigilarConfig, payload: dict) -> None:
"""Publish an arm/disarm request on MQTT for the event processor to pick up."""
from vigilar.bus import MessageBus
from vigilar.constants import Topics
bus = MessageBus(cfg.mqtt, client_id="vigilar-web-arm-request")
bus.connect()
if not bus.connected:
raise RuntimeError("MQTT broker did not accept connection within timeout")
try:
bus.publish(Topics.SYSTEM_ARM_REQUEST, payload)
finally:
bus.disconnect()
@system_bp.route("/api/arm", methods=["POST"])
def arm_system():
data = request.get_json() or {}
mode = data.get("mode", "ARMED_AWAY")
pin = data.get("pin", "")
cfg = _get_cfg()
pin_hash = cfg.security.pin_hash
if pin_hash and not verify_pin(pin, pin_hash):
return jsonify({"error": "Invalid PIN"}), 401
return jsonify({"ok": True, "state": mode})
payload = {"mode": mode, "pin": pin, "triggered_by": "web"}
try:
_publish_arm_request(_get_cfg(), payload)
except Exception:
current_app.logger.exception("Failed to publish arm request")
return jsonify({"error": "bus unavailable"}), 503
return jsonify({"ok": True, "mode": mode}), 202
@system_bp.route("/api/disarm", methods=["POST"])
def disarm_system():
data = request.get_json() or {}
pin = data.get("pin", "")
cfg = _get_cfg()
pin_hash = cfg.security.pin_hash
if pin_hash and not verify_pin(pin, pin_hash):
return jsonify({"error": "Invalid PIN"}), 401
return jsonify({"ok": True, "state": "DISARMED"})
payload = {"mode": "DISARMED", "pin": pin, "triggered_by": "web"}
try:
_publish_arm_request(_get_cfg(), payload)
except Exception:
current_app.logger.exception("Failed to publish arm request")
return jsonify({"error": "bus unavailable"}), 503
return jsonify({"ok": True, "mode": "DISARMED"}), 202
@system_bp.route("/api/reset-pin", methods=["POST"])