Files
vigilar/vigilar/events/state.py
adlee-was-taken 7fda351c02 fix(events): ArmStateFSM uses PBKDF2 via alerts.pin (issue #2)
Was: unsalted SHA-256 read from [system] arm_pin_hash.
Now: PBKDF2-SHA256 600k iterations read from [security] pin_hash,
matching the web arm/disarm path and the alerts/pin module.

Also drops the redundant pin re-hash on the arm_state_log audit row
(a fresh PBKDF2 salt made the column valueless for traceability).

Part of issue #2 PIN hashing unification.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 11:26:07 -04:00

92 lines
3.0 KiB
Python

"""Arm state finite state machine."""
import logging
import time
from sqlalchemy.engine import Engine
from vigilar.alerts.pin import verify_pin as _verify_pin_hash
from vigilar.config import VigilarConfig
from vigilar.constants import ArmState, EventType, Severity, Topics
from vigilar.storage.queries import get_current_arm_state, insert_arm_state, insert_event
log = logging.getLogger(__name__)
class ArmStateFSM:
"""Manages DISARMED / ARMED_HOME / ARMED_AWAY state transitions."""
def __init__(self, engine: Engine, config: VigilarConfig):
self._engine = engine
self._pin_hash = config.security.pin_hash
self._state = ArmState.DISARMED
self._bus = None
self._load_initial_state()
def _load_initial_state(self) -> None:
"""Load the most recent arm state from the database."""
stored = get_current_arm_state(self._engine)
if stored and stored in ArmState.__members__:
self._state = ArmState(stored)
log.info("Loaded arm state from DB: %s", self._state)
else:
self._state = ArmState.DISARMED
log.info("No stored arm state, defaulting to DISARMED")
def set_bus(self, bus: object) -> None:
"""Attach a MessageBus for publishing state changes."""
self._bus = bus
@property
def state(self) -> ArmState:
return self._state
def verify_pin(self, pin: str) -> bool:
"""Verify a PIN against the stored PBKDF2 hash."""
if not self._pin_hash:
# No PIN configured — allow all transitions
return True
return _verify_pin_hash(pin, self._pin_hash)
def transition(
self,
new_state: ArmState,
pin: str = "",
triggered_by: str = "system",
) -> bool:
"""Attempt a state transition. Returns True if successful."""
if new_state == self._state:
return True
# PIN required for arming from disarmed or disarming from armed
if self._pin_hash and not self.verify_pin(pin):
log.warning("Arm state change rejected: bad PIN (by %s)", triggered_by)
return False
old_state = self._state
self._state = new_state
# Log to database (pin_hash column is no longer populated — see #2)
insert_arm_state(self._engine, new_state.value, triggered_by, None)
# Log event
insert_event(
self._engine,
event_type=EventType.ARM_STATE_CHANGED,
severity=Severity.INFO,
source_id="system",
payload={"old_state": old_state.value, "new_state": new_state.value},
)
# Publish to MQTT
if self._bus is not None:
self._bus.publish(Topics.SYSTEM_ARM_STATE, {
"ts": int(time.time() * 1000),
"state": new_state.value,
"old_state": old_state.value,
"triggered_by": triggered_by,
})
log.info("Arm state: %s -> %s (by %s)", old_state, new_state, triggered_by)
return True