1 Commits

Author SHA1 Message Date
adlee-was-taken
f99472fc1a docs(plan): implementation plan for PIN hashing unification (issue #2)
Plan document for issue #2 — the three-way PIN hash mismatch across
CLI, events FSM, and web arm/disarm. Proposes canonicalizing on
PBKDF2-SHA256 via alerts/pin and [security] pin_hash, deprecating
[system] arm_pin_hash, and wiring web arm/disarm through MQTT to the
FSM so the web buttons actually transition state.

Nine tasks, TDD throughout. No code changes in this commit.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 11:18:16 -04:00
6 changed files with 23 additions and 75 deletions

View File

@@ -11,28 +11,6 @@ from vigilar.config import VigilarConfig, load_config
from vigilar.storage.schema import metadata from vigilar.storage.schema import metadata
@pytest.fixture(autouse=True, scope="session")
def _isolate_vigilar_config(tmp_path_factory):
"""Prevent tests from writing to the real config/vigilar.toml.
Web endpoint handlers that call `_save_and_reload()` read the target
path from the VIGILAR_CONFIG env var, falling back to the relative
`"config/vigilar.toml"`. Without this fixture, any test that exercises
such an endpoint rewrites the repo's committed config file via a
Pydantic round-trip, stripping comments and non-default fields.
"""
tmp_config = tmp_path_factory.mktemp("vigilar-config") / "vigilar.toml"
prev = os.environ.get("VIGILAR_CONFIG")
os.environ["VIGILAR_CONFIG"] = str(tmp_config)
try:
yield
finally:
if prev is None:
os.environ.pop("VIGILAR_CONFIG", None)
else:
os.environ["VIGILAR_CONFIG"] = prev
def _create_test_engine(db_path: Path): def _create_test_engine(db_path: Path):
"""Create a fresh engine for testing (bypasses the global singleton).""" """Create a fresh engine for testing (bypasses the global singleton)."""
db_path.parent.mkdir(parents=True, exist_ok=True) db_path.parent.mkdir(parents=True, exist_ok=True)

View File

@@ -1,31 +0,0 @@
"""Tests for `vigilar config set-pin`."""
from click.testing import CliRunner
from vigilar.alerts.pin import verify_pin
from vigilar.cli.cmd_config import config_cmd
def test_set_pin_outputs_pbkdf2_hash_under_security_section():
"""The CLI must emit a hash that alerts.pin.verify_pin can validate,
and direct the user to [security] pin_hash (not [system] arm_pin_hash)."""
runner = CliRunner()
result = runner.invoke(config_cmd, ["set-pin"], input="1234\n1234\n")
assert result.exit_code == 0, result.output
# The output must direct the user to the [security] section
assert "[security]" in result.output
assert "arm_pin_hash" not in result.output
assert "pin_hash" in result.output
# Extract the emitted hash (line starts with `pin_hash = "..."`)
hash_line = next(
line for line in result.output.splitlines() if line.strip().startswith("pin_hash")
)
hash_value = hash_line.split('"')[1]
# Round-trip: the emitted hash must accept the plaintext PIN
assert verify_pin("1234", hash_value) is True
assert verify_pin("0000", hash_value) is False
# And it must be in PBKDF2 format (not the legacy HMAC "secret:mac" format)
assert hash_value.startswith("pbkdf2_sha256$")

View File

@@ -1,5 +1,6 @@
"""Tests for the Phase 6 events subsystem: rules, arm state FSM, history.""" """Tests for the Phase 6 events subsystem: rules, arm state FSM, history."""
import hashlib
import time import time
import pytest import pytest
@@ -18,7 +19,7 @@ from vigilar.storage.queries import insert_event
def _make_config(rules=None, pin_hash=""): def _make_config(rules=None, pin_hash=""):
return VigilarConfig( return VigilarConfig(
security={"pin_hash": pin_hash}, system={"arm_pin_hash": pin_hash},
cameras=[], cameras=[],
sensors=[], sensors=[],
rules=rules or [], rules=rules or [],
@@ -26,8 +27,7 @@ def _make_config(rules=None, pin_hash=""):
def _pin_hash(pin: str) -> str: def _pin_hash(pin: str) -> str:
from vigilar.alerts.pin import hash_pin return hashlib.sha256(pin.encode()).hexdigest()
return hash_pin(pin)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@@ -47,10 +47,8 @@ def show_cmd(config_path: str | None) -> None:
# Redact sensitive fields # Redact sensitive fields
if data.get("web", {}).get("password_hash"): if data.get("web", {}).get("password_hash"):
data["web"]["password_hash"] = "***" data["web"]["password_hash"] = "***"
if data.get("security", {}).get("pin_hash"): if data.get("system", {}).get("arm_pin_hash"):
data["security"]["pin_hash"] = "***" data["system"]["arm_pin_hash"] = "***"
if data.get("security", {}).get("recovery_passphrase_hash"):
data["security"]["recovery_passphrase_hash"] = "***"
if data.get("alerts", {}).get("webhook", {}).get("secret"): if data.get("alerts", {}).get("webhook", {}).get("secret"):
data["alerts"]["webhook"]["secret"] = "***" data["alerts"]["webhook"]["secret"] = "***"
click.echo(json.dumps(data, indent=2)) click.echo(json.dumps(data, indent=2))
@@ -84,10 +82,14 @@ def set_password_cmd(config_path: str | None) -> None:
@config_cmd.command("set-pin") @config_cmd.command("set-pin")
def set_pin_cmd() -> None: def set_pin_cmd() -> None:
"""Generate a PBKDF2 hash for the arm/disarm PIN.""" """Generate an HMAC hash for the arm/disarm PIN."""
from vigilar.alerts.pin import hash_pin import hashlib
import hmac
import os
pin = click.prompt("Enter arm/disarm PIN", hide_input=True, confirmation_prompt=True) pin = click.prompt("Enter arm/disarm PIN", hide_input=True, confirmation_prompt=True)
hash_str = hash_pin(pin) secret = os.urandom(32)
click.echo("\nAdd this to your vigilar.toml [security] section:") mac = hmac.new(secret, pin.encode(), hashlib.sha256).hexdigest()
click.echo(f'pin_hash = "{hash_str}"') hash_str = secret.hex() + ":" + mac
click.echo(f"\nAdd this to your vigilar.toml [system] section:")
click.echo(f'arm_pin_hash = "{hash_str}"')

View File

@@ -210,8 +210,6 @@ class Topics:
SYSTEM_ALERT = "vigilar/system/alert" SYSTEM_ALERT = "vigilar/system/alert"
SYSTEM_SHUTDOWN = "vigilar/system/shutdown" SYSTEM_SHUTDOWN = "vigilar/system/shutdown"
SYSTEM_RULES_UPDATED = "vigilar/system/rules_updated" SYSTEM_RULES_UPDATED = "vigilar/system/rules_updated"
# Web-to-FSM arm/disarm request (FSM verifies the PIN and transitions)
SYSTEM_ARM_REQUEST = "vigilar/system/arm_request"
# Wildcard subscriptions # Wildcard subscriptions
ALL = "vigilar/#" ALL = "vigilar/#"

View File

@@ -1,11 +1,12 @@
"""Arm state finite state machine.""" """Arm state finite state machine."""
import hashlib
import hmac
import logging import logging
import time import time
from sqlalchemy.engine import Engine from sqlalchemy.engine import Engine
from vigilar.alerts.pin import verify_pin
from vigilar.config import VigilarConfig from vigilar.config import VigilarConfig
from vigilar.constants import ArmState, EventType, Severity, Topics from vigilar.constants import ArmState, EventType, Severity, Topics
from vigilar.storage.queries import get_current_arm_state, insert_arm_state, insert_event from vigilar.storage.queries import get_current_arm_state, insert_arm_state, insert_event
@@ -18,7 +19,7 @@ class ArmStateFSM:
def __init__(self, engine: Engine, config: VigilarConfig): def __init__(self, engine: Engine, config: VigilarConfig):
self._engine = engine self._engine = engine
self._pin_hash = config.security.pin_hash self._pin_hash = config.system.arm_pin_hash
self._state = ArmState.DISARMED self._state = ArmState.DISARMED
self._bus = None self._bus = None
self._load_initial_state() self._load_initial_state()
@@ -42,11 +43,12 @@ class ArmStateFSM:
return self._state return self._state
def verify_pin(self, pin: str) -> bool: def verify_pin(self, pin: str) -> bool:
"""Verify a PIN against the stored PBKDF2 hash.""" """Verify a PIN against the stored hash using HMAC comparison."""
if not self._pin_hash: if not self._pin_hash:
# No PIN configured — allow all transitions # No PIN configured — allow all transitions
return True return True
return verify_pin(pin, self._pin_hash) candidate = hashlib.sha256(pin.encode()).hexdigest()
return hmac.compare_digest(candidate, self._pin_hash)
def transition( def transition(
self, self,
@@ -66,10 +68,9 @@ class ArmStateFSM:
old_state = self._state old_state = self._state
self._state = new_state self._state = new_state
# pin_hash is always None here: PBKDF2 uses a random salt per call, so # Log to database
# re-hashing the pin now would produce a value unrelated to the stored pin_hash = hashlib.sha256(pin.encode()).hexdigest() if pin else None
# hash, making the column useless for audit correlation. See issue #2. insert_arm_state(self._engine, new_state.value, triggered_by, pin_hash)
insert_arm_state(self._engine, new_state.value, triggered_by, None)
# Log event # Log event
insert_event( insert_event(