fix: unify PIN hashing across CLI, FSM, and web (closes #2) #7
@@ -292,10 +292,15 @@ enabled = false`, `[visitors] enabled = false`, `[highlights] enabled
|
|||||||
|
|
||||||
- `[location] latitude`, `longitude` (default `0.0`): used for sunrise
|
- `[location] latitude`, `longitude` (default `0.0`): used for sunrise
|
||||||
and sunset lookups.
|
and sunset lookups.
|
||||||
- `[security] pin_hash` (canonical arm/disarm PIN store) and
|
- `[security] pin_hash` (canonical arm/disarm PIN store): populated by
|
||||||
`recovery_passphrase_hash`: both populated by
|
`vigilar config set-pin`, which emits a PBKDF2-SHA256 hash to paste
|
||||||
`vigilar config set-pin`. The `[system] arm_pin_hash` field is
|
into the `[security]` section. The legacy `[system] arm_pin_hash`
|
||||||
deprecated; see the `[system]` section above.
|
field is deprecated; see the `[system]` section above.
|
||||||
|
- `[security] recovery_passphrase_hash`: used by the web
|
||||||
|
`/system/api/reset-pin` endpoint to authenticate PIN-reset requests.
|
||||||
|
There is no CLI helper for this field today — set it by hashing a
|
||||||
|
passphrase manually with `vigilar.alerts.pin.hash_pin` and pasting
|
||||||
|
the result into `[security]`, or leave it unset to disable recovery.
|
||||||
|
|
||||||
## CLI reference
|
## CLI reference
|
||||||
|
|
||||||
|
|||||||
@@ -157,3 +157,30 @@ def test_deprecation_warning_for_arm_pin_hash(tmp_path, caplog):
|
|||||||
assert any("arm_pin_hash" in m and "deprecated" in m.lower() for m in messages), (
|
assert any("arm_pin_hash" in m and "deprecated" in m.lower() for m in messages), (
|
||||||
f"expected deprecation warning mentioning arm_pin_hash, got: {messages}"
|
f"expected deprecation warning mentioning arm_pin_hash, got: {messages}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_no_deprecation_warning_when_security_pin_hash_set(tmp_path, caplog):
|
||||||
|
"""No warning should fire if [security] pin_hash is populated,
|
||||||
|
regardless of whether [system] arm_pin_hash is also still present.
|
||||||
|
The warning is specifically for un-migrated configs."""
|
||||||
|
import logging
|
||||||
|
cfg_path = tmp_path / "migrated.toml"
|
||||||
|
cfg_path.write_text(
|
||||||
|
'[system]\n'
|
||||||
|
'arm_pin_hash = "pbkdf2_sha256$legacy$value"\n'
|
||||||
|
'\n'
|
||||||
|
'[security]\n'
|
||||||
|
'pin_hash = "pbkdf2_sha256$current$value"\n'
|
||||||
|
)
|
||||||
|
with caplog.at_level(logging.WARNING):
|
||||||
|
from vigilar.config import load_config
|
||||||
|
load_config(str(cfg_path))
|
||||||
|
|
||||||
|
deprecation_messages = [
|
||||||
|
r.message for r in caplog.records
|
||||||
|
if r.levelno >= logging.WARNING and "arm_pin_hash" in r.message
|
||||||
|
]
|
||||||
|
assert deprecation_messages == [], (
|
||||||
|
f"deprecation warning should not fire on migrated configs, "
|
||||||
|
f"got: {deprecation_messages}"
|
||||||
|
)
|
||||||
|
|||||||
@@ -37,3 +37,14 @@ def test_verify_pin_handles_unicode():
|
|||||||
stored = hash_pin("p@ss!")
|
stored = hash_pin("p@ss!")
|
||||||
assert verify_pin("p@ss!", stored) is True
|
assert verify_pin("p@ss!", stored) is True
|
||||||
assert verify_pin("p@ss?", stored) is False
|
assert verify_pin("p@ss?", stored) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_verify_pin_rejects_malformed_hash():
|
||||||
|
"""verify_pin must return False (not raise) on malformed stored hashes.
|
||||||
|
Fail-closed is load-bearing: a misconfigured or partially-migrated
|
||||||
|
[security] pin_hash must lock out transitions, not grant access."""
|
||||||
|
assert verify_pin("1234", "sha256:deadbeef") is False
|
||||||
|
assert verify_pin("1234", "garbage") is False
|
||||||
|
assert verify_pin("1234", "pbkdf2_sha256$only$two$extra") is False
|
||||||
|
# Wrong algo prefix
|
||||||
|
assert verify_pin("1234", "argon2id$salt$dk") is False
|
||||||
|
|||||||
@@ -102,3 +102,36 @@ def test_recordings_page_loads():
|
|||||||
with app.test_client() as client:
|
with app.test_client() as client:
|
||||||
resp = client.get("/recordings/")
|
resp = client.get("/recordings/")
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
def test_system_status_reflects_fsm_arm_state(tmp_path, monkeypatch):
|
||||||
|
"""system_status must read the current arm state from the DB,
|
||||||
|
not return a hardcoded stub. Regression guard for the web-to-FSM
|
||||||
|
async flow introduced in issue #2."""
|
||||||
|
from vigilar.config import SystemConfig, VigilarConfig
|
||||||
|
import vigilar.storage.db as db_module
|
||||||
|
from vigilar.storage.db import get_db_path
|
||||||
|
from vigilar.storage.schema import metadata
|
||||||
|
from vigilar.storage.queries import insert_arm_state
|
||||||
|
from vigilar.web.app import create_app
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
|
||||||
|
data_dir = tmp_path / "data"
|
||||||
|
data_dir.mkdir()
|
||||||
|
cfg = VigilarConfig(system=SystemConfig(data_dir=str(data_dir)))
|
||||||
|
|
||||||
|
# Build an isolated engine (bypass the module-level singleton)
|
||||||
|
db_path = get_db_path(str(data_dir))
|
||||||
|
isolated_engine = create_engine(f"sqlite:///{db_path}", echo=False)
|
||||||
|
metadata.create_all(isolated_engine)
|
||||||
|
|
||||||
|
insert_arm_state(isolated_engine, "ARMED_AWAY", "test", None)
|
||||||
|
|
||||||
|
# Patch the singleton so the blueprint's get_engine() returns our engine
|
||||||
|
monkeypatch.setattr(db_module, "_engine", isolated_engine)
|
||||||
|
|
||||||
|
app = create_app(cfg)
|
||||||
|
with app.test_client() as c:
|
||||||
|
resp = c.get("/system/status")
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert resp.get_json()["arm_state"] == "ARMED_AWAY"
|
||||||
|
|||||||
@@ -36,8 +36,20 @@ def _save_and_reload(new_cfg: VigilarConfig) -> None:
|
|||||||
def system_status():
|
def system_status():
|
||||||
"""JSON API: overall system health."""
|
"""JSON API: overall system health."""
|
||||||
cfg = _get_cfg()
|
cfg = _get_cfg()
|
||||||
|
|
||||||
|
arm_state = "DISARMED"
|
||||||
|
try:
|
||||||
|
from vigilar.storage.db import get_db_path, get_engine
|
||||||
|
from vigilar.storage.queries import get_current_arm_state
|
||||||
|
engine = get_engine(get_db_path(cfg.system.data_dir))
|
||||||
|
stored = get_current_arm_state(engine)
|
||||||
|
if stored:
|
||||||
|
arm_state = stored
|
||||||
|
except Exception:
|
||||||
|
current_app.logger.exception("Failed to read arm state from DB")
|
||||||
|
|
||||||
return jsonify({
|
return jsonify({
|
||||||
"arm_state": "DISARMED",
|
"arm_state": arm_state,
|
||||||
"ups": {"status": "UNKNOWN"},
|
"ups": {"status": "UNKNOWN"},
|
||||||
"cameras_online": 0,
|
"cameras_online": 0,
|
||||||
"cameras_total": len(cfg.cameras),
|
"cameras_total": len(cfg.cameras),
|
||||||
|
|||||||
Reference in New Issue
Block a user