vigilar/vigilar/presence/monitor.py
Aaron D. Lee 8314a61815 Add presence detection, person/vehicle AI detection, health monitoring
Task 1 — Presence: ping family phones, derive household state
(EMPTY/KIDS_HOME/ADULTS_HOME/ALL_HOME), configurable departure delay,
per-member roles, auto-arm actions via MQTT.

Task 2 — Detection: MobileNet-SSD v2 via OpenCV DNN for person/vehicle
classification. Vehicle color/size fingerprinting for known car matching.
Zone-based filtering per camera. Model download script.

Task 3 — Health: periodic disk/MQTT/subsystem checks, auto-prune oldest
non-starred recordings on disk pressure, daily digest builder.

126 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 00:06:45 -04:00

114 lines
3.6 KiB
Python

"""Presence monitor — pings family phones to determine who's home."""
import logging
import signal
import subprocess
import time
from vigilar.bus import MessageBus
from vigilar.config import VigilarConfig
from vigilar.constants import HouseholdState, Topics
from vigilar.presence.models import MemberPresence, derive_household_state
log = logging.getLogger(__name__)
def ping_host(ip: str, timeout_s: int = 2) -> bool:
    """Send one ICMP echo request to *ip* and report whether it was answered.

    Runs the system ``ping`` binary a single time (``-c 1``) with
    ``-W timeout_s`` and discards everything it prints.

    Args:
        ip: Target handed to ``ping`` as its final argument.
        timeout_s: Value passed to ``ping -W``. The subprocess itself is
            allowed one extra second before it is abandoned.

    Returns:
        ``True`` when ``ping`` exits with status 0. ``False`` when it exits
        non-zero, when the subprocess overruns its time budget, or when the
        ``ping`` executable cannot be found (that last case is also logged).
    """
    command = ["ping", "-c", "1", "-W", str(timeout_s), ip]
    try:
        completed = subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # Hard ceiling in case ping ignores its own -W deadline.
            timeout=timeout_s + 1,
        )
    except subprocess.TimeoutExpired:
        return False
    except FileNotFoundError:
        log.error("ping command not found")
        return False
    return completed.returncode == 0
def should_mark_away(last_seen: float, departure_delay_m: int) -> bool:
    """Tell whether the departure grace period has run out.

    Args:
        last_seen: A ``time.monotonic()`` reading to measure from.
        departure_delay_m: Grace period, in minutes.

    Returns:
        ``True`` once at least ``departure_delay_m`` minutes separate
        ``last_seen`` from the current monotonic clock reading.
    """
    seconds_absent = time.monotonic() - last_seen
    minutes_absent = seconds_absent / 60
    return departure_delay_m <= minutes_absent
class PresenceMonitor:
    """Ping each configured household member and publish presence over the bus.

    One ``MemberPresence`` record is kept per entry in
    ``config.presence.members``. Every poll pings each member, publishes that
    member's HOME/AWAY state, and publishes the household state (as computed
    by ``derive_household_state``) whenever it differs from the previous poll.
    """

    def __init__(self, config: VigilarConfig):
        """Build the member list from *config*; no network activity happens here.

        Args:
            config: Application config; only ``config.presence`` and
                ``config.mqtt`` are read.
        """
        self._cfg = config.presence
        self._mqtt_cfg = config.mqtt
        self._members: list[MemberPresence] = []
        self._last_household_state = HouseholdState.EMPTY
        # Created in run(); stays None until then so _poll_once can be
        # exercised without a broker.
        self._bus: MessageBus | None = None
        for m in self._cfg.members:
            # Everyone starts as away; the first successful ping flips them.
            self._members.append(MemberPresence(
                name=m.name, role=m.role, is_home=False, last_seen=0,
            ))

    def run(self) -> None:
        """Connect to the bus and poll until SIGTERM is received.

        Configures root logging, connects the message bus, installs a SIGTERM
        handler and then alternates between ``_poll_once()`` and a sleep of
        ``ping_interval_s`` seconds. The bus is disconnected on every exit
        path, including exceptions and ``KeyboardInterrupt``.
        """
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s [presence] %(levelname)s: %(message)s",
        )
        self._bus = MessageBus(self._mqtt_cfg, client_id="presence-monitor")
        self._bus.connect()

        shutdown = False

        def handle_signal(signum, frame):
            nonlocal shutdown
            shutdown = True

        try:
            signal.signal(signal.SIGTERM, handle_signal)
            log.info("Presence monitor started, tracking %d members", len(self._members))
            while not shutdown:
                self._poll_once()
                # Sleep in one-second slices so a SIGTERM is honoured within
                # about a second instead of after a full ping interval.
                for _ in range(self._cfg.ping_interval_s):
                    if shutdown:
                        break
                    time.sleep(1)
        finally:
            # Always release the bus connection, even when the loop is left
            # through an exception rather than a clean shutdown request.
            if self._bus:
                self._bus.disconnect()
            log.info("Presence monitor stopped")

    def _poll_once(self) -> None:
        """Ping every member once and publish the resulting states.

        A reachable member is marked home immediately. An unreachable member
        who was home is only marked away once ``should_mark_away`` says the
        configured departure delay has elapsed since their last successful
        ping. Per-member events are published on every poll; the household
        event is published only when the derived state changes.
        """
        for member in self._members:
            # NOTE(review): MemberPresence is constructed in __init__ without
            # an ip argument; confirm the model provides ``ip``, otherwise
            # this attribute lookup fails.
            reachable = ping_host(member.ip)
            if reachable:
                member.is_home = True
                member.last_seen = time.monotonic()
            elif member.is_home:
                if should_mark_away(member.last_seen, self._cfg.departure_delay_m):
                    member.is_home = False
                    log.info("%s departed", member.name)
            if self._bus:
                self._bus.publish_event(
                    Topics.presence_member(member.name),
                    state="HOME" if member.is_home else "AWAY",
                    name=member.name,
                    role=member.role,
                )

        new_state = derive_household_state(self._members)
        if new_state != self._last_household_state:
            log.info("Household state: %s -> %s", self._last_household_state, new_state)
            self._last_household_state = new_state
            if self._bus:
                members_dict = {m.name: "HOME" if m.is_home else "AWAY" for m in self._members}
                self._bus.publish_event(
                    Topics.PRESENCE_STATUS,
                    household=new_state.value,
                    members=members_dict,
                )
def run_presence_monitor(config: VigilarConfig) -> None:
    """Build a ``PresenceMonitor`` from *config* and run it.

    Returns only when the monitor's ``run()`` method returns.
    """
    PresenceMonitor(config).run()