Files
vigilar/vigilar/alerts/sender.py
Aaron D. Lee cdc13e05f6 feat(Q5): package event queries and tracker state machine
Add insert/get/update queries for package_events table, and
notification content for PACKAGE_DELIVERED and PACKAGE_REMINDER events.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 18:47:40 -04:00

100 lines
3.7 KiB
Python

"""Web Push notification sender with content mapping."""
import json
import logging
import os
import time
from sqlalchemy.engine import Engine
from vigilar.constants import AlertChannel, AlertStatus, EventType, Severity
# Module-level logger registered under the "vigilar.alerts" name.
log = logging.getLogger("vigilar.alerts")

# Notification content per event type: event type -> (title, body template).
# Body templates are str.format strings; build_notification() supplies the
# {source}, {pet_name} and {species} placeholders. Event types that are not
# listed here fall back to ("Vigilar Alert", "{source}") in build_notification().
_CONTENT_MAP: dict[str, tuple[str, str]] = {
    EventType.PERSON_DETECTED: ("Person Detected", "Person on {source}"),
    EventType.PET_ESCAPE: ("Pet Alert", "{pet_name} detected on {source} (exterior)"),
    EventType.UNKNOWN_ANIMAL: ("Unknown Animal", "Unknown {species} on {source}"),
    EventType.WILDLIFE_PREDATOR: ("Wildlife Alert", "{species} detected — {source}"),
    EventType.WILDLIFE_NUISANCE: ("Wildlife", "{species} on {source}"),
    EventType.UNKNOWN_VEHICLE_DETECTED: ("Unknown Vehicle", "Unknown vehicle on {source}"),
    # Power events carry no placeholders; the body is a fixed string.
    EventType.POWER_LOSS: ("Power Alert", "UPS on battery"),
    EventType.LOW_BATTERY: ("Battery Critical", "UPS battery low"),
    EventType.PACKAGE_DELIVERED: ("Package Delivered", "Package delivered — {source}"),
    EventType.PACKAGE_REMINDER: ("Package Reminder", "Package still on porch — {source}"),
}
def _format_source(source_id: str) -> str:
    """Turn a source identifier into a human-readable label.

    Underscores are replaced with spaces and every word is title-cased.
    A falsy identifier (empty string or ``None``) yields ``"Unknown"``.
    """
    if not source_id:
        return "Unknown"
    spaced = source_id.replace("_", " ")
    return spaced.title()
def build_notification(event_type: str, severity: str, source_id: str, payload: dict) -> dict:
    """Build the notification dict that is pushed to subscribers.

    Args:
        event_type: Event type used to look up the title and body template in
            ``_CONTENT_MAP``; unknown types get a generic "Vigilar Alert".
        severity: Passed through unchanged under the ``"severity"`` key.
        source_id: Identifier of the event source; shown in the body in its
            formatted form and embedded verbatim in the ``"url"`` value.
        payload: Optional event details. ``pet_name`` and ``species`` are read
            from it; missing, ``None`` or empty values fall back to
            ``"Pet"`` / ``"animal"``.

    Returns:
        A dict with the keys ``title``, ``body``, ``url`` and ``severity``.
    """
    title, body_template = _CONTENT_MAP.get(event_type, ("Vigilar Alert", "{source}"))

    # Tolerate a missing payload and keys that are present but None: the
    # previous `payload.get("species", "animal").title()` raised AttributeError
    # for {"species": None} and rendered a None pet name as the text "None".
    details = payload or {}
    pet_name = details.get("pet_name") or "Pet"
    # str() guards against non-string species values before title-casing.
    species = str(details.get("species") or "animal").title()

    body = body_template.format(
        source=_format_source(source_id),
        pet_name=pet_name,
        species=species,
    )
    return {
        "title": title,
        "body": body,
        "url": f"/events?source={source_id}",
        "severity": severity,
    }
def _get_vapid_private_key(config) -> str | None:
key = os.environ.get("VIGILAR_VAPID_PRIVATE_KEY")
if key:
return key
key_file = config.alerts.web_push.vapid_private_key_file
if os.path.exists(key_file):
with open(key_file) as f:
return f.read().strip()
return None
def _is_subscription_gone(exc: Exception, error_msg: str) -> bool:
    """Tell whether a push failure means the subscription no longer exists."""
    # Prefer the HTTP status on the exception's response, when there is one
    # (pywebpush's WebPushException carries it), over guessing from the text.
    response = getattr(exc, "response", None)
    status = getattr(response, "status_code", None)
    if status is not None:
        # 404 and 410 are the statuses a push service uses for expired or
        # removed subscriptions.
        return status in (404, 410)
    # No structured status available: fall back to matching the message text.
    return "410" in error_msg or "Gone" in error_msg


def send_alert(
    engine: Engine,
    event_type: str,
    severity: str,
    source_id: str,
    payload: dict,
    config=None,
    event_id: int | None = None,
) -> int:
    """Send a Web Push notification for an event to every stored subscription.

    Each delivery attempt is recorded through ``insert_alert_log`` as SENT or
    FAILED, and subscriptions the push service reports as gone are deleted.

    Args:
        engine: Database engine passed to the storage query helpers.
        event_type: Event type; selects the notification title and body.
        severity: Severity value included in the notification.
        source_id: Identifier of the event source.
        payload: Event details used to fill the notification body.
        config: Application config providing the VAPID key location and claim
            e-mail. Without it no notification can be sent.
        event_id: Optional event id stored with each alert-log entry.

    Returns:
        The number of subscriptions the notification was delivered to; 0 when
        there are no subscriptions or no VAPID private key is available.
    """
    from vigilar.storage.queries import delete_push_subscription, get_push_subscriptions, insert_alert_log

    notification = build_notification(event_type, severity, source_id, payload)
    subscriptions = get_push_subscriptions(engine)
    if not subscriptions:
        log.info("No push subscriptions — skipping notification for %s", event_type)
        return 0

    vapid_key = _get_vapid_private_key(config) if config else None
    if not vapid_key:
        log.warning("VAPID private key not available — cannot send push notifications")
        return 0

    # A key can only have been obtained when config is set, so it is safe to
    # read the claim e-mail from it directly here.
    claim_email = config.alerts.web_push.vapid_claim_email

    sent_count = 0
    for sub in subscriptions:
        try:
            # Imported inside the try so that a missing pywebpush install is
            # recorded as a FAILED delivery instead of aborting the loop.
            from pywebpush import webpush

            webpush(
                subscription_info={
                    "endpoint": sub["endpoint"],
                    "keys": {"p256dh": sub["p256dh_key"], "auth": sub["auth_key"]},
                },
                data=json.dumps(notification),
                vapid_private_key=vapid_key,
                # A fresh claims dict per call, so nothing is shared between
                # deliveries to different subscriptions.
                vapid_claims={"sub": claim_email},
            )
            insert_alert_log(engine, event_id, AlertChannel.WEB_PUSH, AlertStatus.SENT)
            sent_count += 1
        except Exception as e:
            # Best-effort delivery: one failing subscription must not stop the
            # others, but the failure is surfaced in the log and the alert log.
            error_msg = str(e)
            log.warning("Web Push delivery failed for %s: %s", event_type, error_msg)
            insert_alert_log(engine, event_id, AlertChannel.WEB_PUSH, AlertStatus.FAILED, error_msg)
            if _is_subscription_gone(e, error_msg):
                delete_push_subscription(engine, sub["endpoint"])
    return sent_count