Was: /system/api/arm verified the PIN against [security] pin_hash and
returned {ok: true} without ever calling the FSM. State never changed.
Now: the endpoint publishes a SYSTEM_ARM_REQUEST message to the local
MQTT broker. The event processor (see previous commit) picks it up,
ArmStateFSM verifies the PIN via alerts.pin.verify_pin and performs
the transition. Response is 202 Accepted; clients poll /system/status
for the new state.
Design: PIN travels over localhost-only MQTT, which matches the
existing trust boundary for the internal bus.
133 lines
5.0 KiB
Python
133 lines
5.0 KiB
Python
"""Tests for PIN verification on arm/disarm endpoints."""
|
|
|
|
import pytest
|
|
from unittest.mock import patch
|
|
from vigilar.alerts.pin import hash_pin
|
|
from vigilar.config import VigilarConfig, SecurityConfig
|
|
from vigilar.web.app import create_app
|
|
|
|
|
|
@pytest.fixture
def app_with_pin():
    """Return a test-mode app whose security config holds hashed credentials.

    The PIN hash is derived from "1234" and the recovery passphrase hash
    from "recover123".
    """
    hashed_pin = hash_pin("1234")
    hashed_recovery = hash_pin("recover123")
    security = SecurityConfig(
        pin_hash=hashed_pin,
        recovery_passphrase_hash=hashed_recovery,
    )
    application = create_app(VigilarConfig(security=security))
    application.config["TESTING"] = True
    return application
|
|
|
|
|
|
@pytest.fixture
def app_no_pin():
    """Return a test-mode app built from a default-constructed ``VigilarConfig``."""
    application = create_app(VigilarConfig())
    application.config["TESTING"] = True
    return application
|
|
|
|
|
|
def test_arm_without_pin_set(app_no_pin):
    """An arm request without a PIN is accepted (202) and published with pin ""."""
    target = "vigilar.web.blueprints.system._publish_arm_request"
    with patch(target) as publish, app_no_pin.test_client() as client:
        response = client.post("/system/api/arm", json={"mode": "ARMED_AWAY"})
        assert response.status_code == 202
        publish.assert_called_once()

        # The payload is the second positional argument, or the "payload" keyword.
        call = publish.call_args
        if len(call.args) > 1:
            payload = call.args[1]
        else:
            payload = call.kwargs["payload"]

        assert payload["mode"] == "ARMED_AWAY"
        assert payload["pin"] == ""
|
|
|
|
|
|
def test_arm_correct_pin(app_with_pin):
    """Arming with PIN "1234" returns 202 and publishes that PIN in the payload."""
    target = "vigilar.web.blueprints.system._publish_arm_request"
    request_body = {"mode": "ARMED_AWAY", "pin": "1234"}
    with patch(target) as publish, app_with_pin.test_client() as client:
        response = client.post("/system/api/arm", json=request_body)
        assert response.status_code == 202
        publish.assert_called_once()

        # The payload is the second positional argument, or the "payload" keyword.
        call = publish.call_args
        payload = call.args[1] if len(call.args) > 1 else call.kwargs["payload"]
        assert payload["pin"] == "1234"
|
|
|
|
|
|
def test_arm_wrong_pin_still_accepted_by_web_fsm_rejects(app_with_pin):
    """A wrong PIN on arm still yields 202 and is forwarded as-is.

    The HTTP layer does not pre-check the PIN; it forwards the request to
    the FSM unconditionally. Verification, the warning log on mismatch,
    and leaving the state unchanged are the FSM's responsibility.
    """
    target = "vigilar.web.blueprints.system._publish_arm_request"
    request_body = {"mode": "ARMED_AWAY", "pin": "0000"}
    with patch(target) as publish, app_with_pin.test_client() as client:
        response = client.post("/system/api/arm", json=request_body)
        assert response.status_code == 202
        publish.assert_called_once()

        call = publish.call_args
        if len(call.args) > 1:
            payload = call.args[1]
        else:
            payload = call.kwargs["payload"]

        # Forwarded verbatim; rejecting it is left to the FSM.
        assert payload["pin"] == "0000"
|
|
|
|
|
|
def test_disarm_correct_pin(app_with_pin):
    """Disarming with PIN "1234" returns 202 and publishes that PIN.

    Besides checking that exactly one request was published, this also
    verifies the PIN reached the payload, matching what the arm
    counterpart and the disarm wrong-PIN test assert.
    """
    target = "vigilar.web.blueprints.system._publish_arm_request"
    with patch(target) as pub, app_with_pin.test_client() as c:
        rv = c.post("/system/api/disarm", json={"pin": "1234"})
        assert rv.status_code == 202
        pub.assert_called_once()

        # The payload is the second positional argument, or the "payload" keyword.
        call = pub.call_args
        payload = call.args[1] if len(call.args) > 1 else call.kwargs["payload"]
        assert payload["pin"] == "1234"
|
|
|
|
|
|
def test_disarm_wrong_pin_still_accepted_by_web_fsm_rejects(app_with_pin):
    """A wrong PIN on disarm still yields 202 and is forwarded unchanged."""
    target = "vigilar.web.blueprints.system._publish_arm_request"
    with patch(target) as publish, app_with_pin.test_client() as client:
        response = client.post("/system/api/disarm", json={"pin": "9999"})
        assert response.status_code == 202
        publish.assert_called_once()

        call = publish.call_args
        if len(call.args) > 1:
            payload = call.args[1]
        else:
            payload = call.kwargs["payload"]

        # Forwarded verbatim; rejecting it is left to the FSM.
        assert payload["pin"] == "9999"
|
|
|
|
|
|
def test_reset_pin_correct_passphrase(app_with_pin):
    """Resetting the PIN with the matching recovery passphrase returns 200 and ok."""
    request_body = {"recovery_passphrase": "recover123", "new_pin": "5678"}
    with app_with_pin.test_client() as client:
        response = client.post("/system/api/reset-pin", json=request_body)
        assert response.status_code == 200
        assert response.get_json()["ok"] is True
|
|
|
|
|
|
def test_reset_pin_wrong_passphrase(app_with_pin):
    """Resetting the PIN with a non-matching recovery passphrase returns 401."""
    request_body = {"recovery_passphrase": "wrong", "new_pin": "5678"}
    with app_with_pin.test_client() as client:
        response = client.post("/system/api/reset-pin", json=request_body)
        assert response.status_code == 401
|
|
|
|
|
|
def test_arm_publishes_arm_request_on_mqtt(app_with_pin):
    """POST /system/api/arm publishes a SYSTEM_ARM_REQUEST message.

    The response must be 202 with ``accepted`` true, and the published
    payload must carry the mode, the PIN, and ``triggered_by`` == "web".
    """
    target = "vigilar.web.blueprints.system._publish_arm_request"
    request_body = {"mode": "ARMED_AWAY", "pin": "1234"}
    with patch(target) as publish, app_with_pin.test_client() as client:
        response = client.post("/system/api/arm", json=request_body)
        assert response.status_code == 202
        assert response.get_json()["accepted"] is True

        publish.assert_called_once()
        # _publish_arm_request(cfg, payload): payload is args[1] or kwargs["payload"].
        recorded = publish.call_args
        if len(recorded.args) > 1:
            payload = recorded.args[1]
        else:
            payload = recorded.kwargs["payload"]

        assert payload["mode"] == "ARMED_AWAY"
        assert payload["pin"] == "1234"
        assert payload["triggered_by"] == "web"
|
|
|
|
|
|
def test_disarm_publishes_arm_request(app_with_pin):
    """POST /system/api/disarm publishes one request whose mode is "DISARMED"."""
    target = "vigilar.web.blueprints.system._publish_arm_request"
    with patch(target) as publish, app_with_pin.test_client() as client:
        response = client.post("/system/api/disarm", json={"pin": "1234"})
        assert response.status_code == 202

        publish.assert_called_once()
        # The payload is the second positional argument, or the "payload" keyword.
        recorded = publish.call_args
        if len(recorded.args) > 1:
            payload = recorded.args[1]
        else:
            payload = recorded.kwargs["payload"]

        assert payload["mode"] == "DISARMED"
|