Compare commits
1 Commits
94cc206fb4
...
plan/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f99472fc1a |
@@ -11,28 +11,6 @@ from vigilar.config import VigilarConfig, load_config
|
|||||||
from vigilar.storage.schema import metadata
|
from vigilar.storage.schema import metadata
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True, scope="session")
|
|
||||||
def _isolate_vigilar_config(tmp_path_factory):
|
|
||||||
"""Prevent tests from writing to the real config/vigilar.toml.
|
|
||||||
|
|
||||||
Web endpoint handlers that call `_save_and_reload()` read the target
|
|
||||||
path from the VIGILAR_CONFIG env var, falling back to the relative
|
|
||||||
`"config/vigilar.toml"`. Without this fixture, any test that exercises
|
|
||||||
such an endpoint rewrites the repo's committed config file via a
|
|
||||||
Pydantic round-trip, stripping comments and non-default fields.
|
|
||||||
"""
|
|
||||||
tmp_config = tmp_path_factory.mktemp("vigilar-config") / "vigilar.toml"
|
|
||||||
prev = os.environ.get("VIGILAR_CONFIG")
|
|
||||||
os.environ["VIGILAR_CONFIG"] = str(tmp_config)
|
|
||||||
try:
|
|
||||||
yield
|
|
||||||
finally:
|
|
||||||
if prev is None:
|
|
||||||
os.environ.pop("VIGILAR_CONFIG", None)
|
|
||||||
else:
|
|
||||||
os.environ["VIGILAR_CONFIG"] = prev
|
|
||||||
|
|
||||||
|
|
||||||
def _create_test_engine(db_path: Path):
|
def _create_test_engine(db_path: Path):
|
||||||
"""Create a fresh engine for testing (bypasses the global singleton)."""
|
"""Create a fresh engine for testing (bypasses the global singleton)."""
|
||||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|||||||
@@ -1,31 +0,0 @@
|
|||||||
"""Tests for `vigilar config set-pin`."""
|
|
||||||
|
|
||||||
from click.testing import CliRunner
|
|
||||||
|
|
||||||
from vigilar.alerts.pin import verify_pin
|
|
||||||
from vigilar.cli.cmd_config import config_cmd
|
|
||||||
|
|
||||||
|
|
||||||
def test_set_pin_outputs_pbkdf2_hash_under_security_section():
|
|
||||||
"""The CLI must emit a hash that alerts.pin.verify_pin can validate,
|
|
||||||
and direct the user to [security] pin_hash (not [system] arm_pin_hash)."""
|
|
||||||
runner = CliRunner()
|
|
||||||
result = runner.invoke(config_cmd, ["set-pin"], input="1234\n1234\n")
|
|
||||||
|
|
||||||
assert result.exit_code == 0, result.output
|
|
||||||
# The output must direct the user to the [security] section
|
|
||||||
assert "[security]" in result.output
|
|
||||||
assert "arm_pin_hash" not in result.output
|
|
||||||
assert "pin_hash" in result.output
|
|
||||||
|
|
||||||
# Extract the emitted hash (line starts with `pin_hash = "..."`)
|
|
||||||
hash_line = next(
|
|
||||||
line for line in result.output.splitlines() if line.strip().startswith("pin_hash")
|
|
||||||
)
|
|
||||||
hash_value = hash_line.split('"')[1]
|
|
||||||
|
|
||||||
# Round-trip: the emitted hash must accept the plaintext PIN
|
|
||||||
assert verify_pin("1234", hash_value) is True
|
|
||||||
assert verify_pin("0000", hash_value) is False
|
|
||||||
# And it must be in PBKDF2 format (not the legacy HMAC "secret:mac" format)
|
|
||||||
assert hash_value.startswith("pbkdf2_sha256$")
|
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
"""Tests for the Phase 6 events subsystem: rules, arm state FSM, history."""
|
"""Tests for the Phase 6 events subsystem: rules, arm state FSM, history."""
|
||||||
|
|
||||||
|
import hashlib
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
@@ -18,7 +19,7 @@ from vigilar.storage.queries import insert_event
|
|||||||
|
|
||||||
def _make_config(rules=None, pin_hash=""):
|
def _make_config(rules=None, pin_hash=""):
|
||||||
return VigilarConfig(
|
return VigilarConfig(
|
||||||
security={"pin_hash": pin_hash},
|
system={"arm_pin_hash": pin_hash},
|
||||||
cameras=[],
|
cameras=[],
|
||||||
sensors=[],
|
sensors=[],
|
||||||
rules=rules or [],
|
rules=rules or [],
|
||||||
@@ -26,8 +27,7 @@ def _make_config(rules=None, pin_hash=""):
|
|||||||
|
|
||||||
|
|
||||||
def _pin_hash(pin: str) -> str:
|
def _pin_hash(pin: str) -> str:
|
||||||
from vigilar.alerts.pin import hash_pin
|
return hashlib.sha256(pin.encode()).hexdigest()
|
||||||
return hash_pin(pin)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -47,10 +47,8 @@ def show_cmd(config_path: str | None) -> None:
|
|||||||
# Redact sensitive fields
|
# Redact sensitive fields
|
||||||
if data.get("web", {}).get("password_hash"):
|
if data.get("web", {}).get("password_hash"):
|
||||||
data["web"]["password_hash"] = "***"
|
data["web"]["password_hash"] = "***"
|
||||||
if data.get("security", {}).get("pin_hash"):
|
if data.get("system", {}).get("arm_pin_hash"):
|
||||||
data["security"]["pin_hash"] = "***"
|
data["system"]["arm_pin_hash"] = "***"
|
||||||
if data.get("security", {}).get("recovery_passphrase_hash"):
|
|
||||||
data["security"]["recovery_passphrase_hash"] = "***"
|
|
||||||
if data.get("alerts", {}).get("webhook", {}).get("secret"):
|
if data.get("alerts", {}).get("webhook", {}).get("secret"):
|
||||||
data["alerts"]["webhook"]["secret"] = "***"
|
data["alerts"]["webhook"]["secret"] = "***"
|
||||||
click.echo(json.dumps(data, indent=2))
|
click.echo(json.dumps(data, indent=2))
|
||||||
@@ -84,10 +82,14 @@ def set_password_cmd(config_path: str | None) -> None:
|
|||||||
|
|
||||||
@config_cmd.command("set-pin")
|
@config_cmd.command("set-pin")
|
||||||
def set_pin_cmd() -> None:
|
def set_pin_cmd() -> None:
|
||||||
"""Generate a PBKDF2 hash for the arm/disarm PIN."""
|
"""Generate an HMAC hash for the arm/disarm PIN."""
|
||||||
from vigilar.alerts.pin import hash_pin
|
import hashlib
|
||||||
|
import hmac
|
||||||
|
import os
|
||||||
|
|
||||||
pin = click.prompt("Enter arm/disarm PIN", hide_input=True, confirmation_prompt=True)
|
pin = click.prompt("Enter arm/disarm PIN", hide_input=True, confirmation_prompt=True)
|
||||||
hash_str = hash_pin(pin)
|
secret = os.urandom(32)
|
||||||
click.echo("\nAdd this to your vigilar.toml [security] section:")
|
mac = hmac.new(secret, pin.encode(), hashlib.sha256).hexdigest()
|
||||||
click.echo(f'pin_hash = "{hash_str}"')
|
hash_str = secret.hex() + ":" + mac
|
||||||
|
click.echo(f"\nAdd this to your vigilar.toml [system] section:")
|
||||||
|
click.echo(f'arm_pin_hash = "{hash_str}"')
|
||||||
|
|||||||
@@ -210,8 +210,6 @@ class Topics:
|
|||||||
SYSTEM_ALERT = "vigilar/system/alert"
|
SYSTEM_ALERT = "vigilar/system/alert"
|
||||||
SYSTEM_SHUTDOWN = "vigilar/system/shutdown"
|
SYSTEM_SHUTDOWN = "vigilar/system/shutdown"
|
||||||
SYSTEM_RULES_UPDATED = "vigilar/system/rules_updated"
|
SYSTEM_RULES_UPDATED = "vigilar/system/rules_updated"
|
||||||
# Web-to-FSM arm/disarm request (FSM verifies the PIN and transitions)
|
|
||||||
SYSTEM_ARM_REQUEST = "vigilar/system/arm_request"
|
|
||||||
|
|
||||||
# Wildcard subscriptions
|
# Wildcard subscriptions
|
||||||
ALL = "vigilar/#"
|
ALL = "vigilar/#"
|
||||||
|
|||||||
@@ -1,11 +1,12 @@
|
|||||||
"""Arm state finite state machine."""
|
"""Arm state finite state machine."""
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from sqlalchemy.engine import Engine
|
from sqlalchemy.engine import Engine
|
||||||
|
|
||||||
from vigilar.alerts.pin import verify_pin
|
|
||||||
from vigilar.config import VigilarConfig
|
from vigilar.config import VigilarConfig
|
||||||
from vigilar.constants import ArmState, EventType, Severity, Topics
|
from vigilar.constants import ArmState, EventType, Severity, Topics
|
||||||
from vigilar.storage.queries import get_current_arm_state, insert_arm_state, insert_event
|
from vigilar.storage.queries import get_current_arm_state, insert_arm_state, insert_event
|
||||||
@@ -18,7 +19,7 @@ class ArmStateFSM:
|
|||||||
|
|
||||||
def __init__(self, engine: Engine, config: VigilarConfig):
|
def __init__(self, engine: Engine, config: VigilarConfig):
|
||||||
self._engine = engine
|
self._engine = engine
|
||||||
self._pin_hash = config.security.pin_hash
|
self._pin_hash = config.system.arm_pin_hash
|
||||||
self._state = ArmState.DISARMED
|
self._state = ArmState.DISARMED
|
||||||
self._bus = None
|
self._bus = None
|
||||||
self._load_initial_state()
|
self._load_initial_state()
|
||||||
@@ -42,11 +43,12 @@ class ArmStateFSM:
|
|||||||
return self._state
|
return self._state
|
||||||
|
|
||||||
def verify_pin(self, pin: str) -> bool:
|
def verify_pin(self, pin: str) -> bool:
|
||||||
"""Verify a PIN against the stored PBKDF2 hash."""
|
"""Verify a PIN against the stored hash using HMAC comparison."""
|
||||||
if not self._pin_hash:
|
if not self._pin_hash:
|
||||||
# No PIN configured — allow all transitions
|
# No PIN configured — allow all transitions
|
||||||
return True
|
return True
|
||||||
return verify_pin(pin, self._pin_hash)
|
candidate = hashlib.sha256(pin.encode()).hexdigest()
|
||||||
|
return hmac.compare_digest(candidate, self._pin_hash)
|
||||||
|
|
||||||
def transition(
|
def transition(
|
||||||
self,
|
self,
|
||||||
@@ -66,10 +68,9 @@ class ArmStateFSM:
|
|||||||
old_state = self._state
|
old_state = self._state
|
||||||
self._state = new_state
|
self._state = new_state
|
||||||
|
|
||||||
# pin_hash is always None here: PBKDF2 uses a random salt per call, so
|
# Log to database
|
||||||
# re-hashing the pin now would produce a value unrelated to the stored
|
pin_hash = hashlib.sha256(pin.encode()).hexdigest() if pin else None
|
||||||
# hash, making the column useless for audit correlation. See issue #2.
|
insert_arm_state(self._engine, new_state.value, triggered_by, pin_hash)
|
||||||
insert_arm_state(self._engine, new_state.value, triggered_by, None)
|
|
||||||
|
|
||||||
# Log event
|
# Log event
|
||||||
insert_event(
|
insert_event(
|
||||||
|
|||||||
Reference in New Issue
Block a user