feat(F1): notification content mapping and Web Push sender

Add vigilar/alerts/sender.py with _CONTENT_MAP for human-readable push
notification titles/bodies, build_notification(), and send_alert() which
retrieves VAPID key, iterates push subscriptions, calls pywebpush, and
logs results to alert_log with auto-cleanup of expired (410) endpoints.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee 2026-04-03 18:01:07 -04:00
parent 602945e99d
commit 2c79e0c044
2 changed files with 135 additions and 0 deletions

38
tests/unit/test_sender.py Normal file
View File

@ -0,0 +1,38 @@
"""Tests for notification content builder and Web Push sender."""
from vigilar.alerts.sender import build_notification
from vigilar.constants import EventType, Severity
def test_build_notification_person_detected():
result = build_notification(EventType.PERSON_DETECTED, Severity.WARNING, "front_entrance", {})
assert result["title"] == "Person Detected"
assert "Front Entrance" in result["body"]
def test_build_notification_pet_escape():
result = build_notification(EventType.PET_ESCAPE, Severity.ALERT, "back_deck", {"pet_name": "Angel"})
assert result["title"] == "Pet Alert"
assert "Angel" in result["body"]
def test_build_notification_wildlife_predator():
result = build_notification(EventType.WILDLIFE_PREDATOR, Severity.CRITICAL, "front_entrance", {"species": "bear"})
assert result["title"] == "Wildlife Alert"
assert "bear" in result["body"].lower()
def test_build_notification_power_loss():
result = build_notification(EventType.POWER_LOSS, Severity.CRITICAL, "ups", {})
assert result["title"] == "Power Alert"
assert "battery" in result["body"].lower()
def test_build_notification_unknown_event_type():
result = build_notification(EventType.MOTION_START, Severity.WARNING, "cam1", {})
assert result["title"] == "Vigilar Alert"
def test_build_notification_has_url():
result = build_notification(EventType.PERSON_DETECTED, Severity.WARNING, "front", {})
assert "url" in result

97
vigilar/alerts/sender.py Normal file
View File

@ -0,0 +1,97 @@
"""Web Push notification sender with content mapping."""
import json
import logging
import os
import time
from sqlalchemy.engine import Engine
from vigilar.constants import AlertChannel, AlertStatus, EventType, Severity
log = logging.getLogger("vigilar.alerts")
_CONTENT_MAP: dict[str, tuple[str, str]] = {
EventType.PERSON_DETECTED: ("Person Detected", "Person on {source}"),
EventType.PET_ESCAPE: ("Pet Alert", "{pet_name} detected on {source} (exterior)"),
EventType.UNKNOWN_ANIMAL: ("Unknown Animal", "Unknown {species} on {source}"),
EventType.WILDLIFE_PREDATOR: ("Wildlife Alert", "{species} detected — {source}"),
EventType.WILDLIFE_NUISANCE: ("Wildlife", "{species} on {source}"),
EventType.UNKNOWN_VEHICLE_DETECTED: ("Unknown Vehicle", "Unknown vehicle on {source}"),
EventType.POWER_LOSS: ("Power Alert", "UPS on battery"),
EventType.LOW_BATTERY: ("Battery Critical", "UPS battery low"),
}
def _format_source(source_id: str) -> str:
return source_id.replace("_", " ").title() if source_id else "Unknown"
def build_notification(event_type: str, severity: str, source_id: str, payload: dict) -> dict:
title, body_template = _CONTENT_MAP.get(event_type, ("Vigilar Alert", "{source}"))
source = _format_source(source_id)
body = body_template.format(
source=source,
pet_name=payload.get("pet_name", "Pet"),
species=payload.get("species", "animal").title(),
)
return {"title": title, "body": body, "url": f"/events?source={source_id}", "severity": severity}
def _get_vapid_private_key(config) -> str | None:
key = os.environ.get("VIGILAR_VAPID_PRIVATE_KEY")
if key:
return key
key_file = config.alerts.web_push.vapid_private_key_file
if os.path.exists(key_file):
with open(key_file) as f:
return f.read().strip()
return None
def send_alert(
engine: Engine,
event_type: str,
severity: str,
source_id: str,
payload: dict,
config=None,
event_id: int | None = None,
) -> int:
from vigilar.storage.queries import delete_push_subscription, get_push_subscriptions, insert_alert_log
notification = build_notification(event_type, severity, source_id, payload)
subscriptions = get_push_subscriptions(engine)
if not subscriptions:
log.info("No push subscriptions — skipping notification for %s", event_type)
return 0
vapid_key = _get_vapid_private_key(config) if config else None
if not vapid_key:
log.warning("VAPID private key not available — cannot send push notifications")
return 0
claim_email = config.alerts.web_push.vapid_claim_email if config else "mailto:admin@vigilar.local"
sent_count = 0
for sub in subscriptions:
try:
from pywebpush import webpush
webpush(
subscription_info={
"endpoint": sub["endpoint"],
"keys": {"p256dh": sub["p256dh_key"], "auth": sub["auth_key"]},
},
data=json.dumps(notification),
vapid_private_key=vapid_key,
vapid_claims={"sub": claim_email},
)
insert_alert_log(engine, event_id, AlertChannel.WEB_PUSH, AlertStatus.SENT)
sent_count += 1
except Exception as e:
error_msg = str(e)
insert_alert_log(engine, event_id, AlertChannel.WEB_PUSH, AlertStatus.FAILED, error_msg)
if "410" in error_msg or "Gone" in error_msg:
delete_push_subscription(engine, sub["endpoint"])
return sent_count