feat(Q1): highlight reel event scoring and FFmpeg clip assembly

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee 2026-04-03 19:07:12 -04:00
parent b4dbb41624
commit 622af22642
3 changed files with 204 additions and 0 deletions

36
tests/unit/test_reel.py Normal file
View File

@ -0,0 +1,36 @@
import datetime
import time
import pytest
from vigilar.highlights.reel import score_event, select_top_events
from vigilar.constants import EventType
from vigilar.storage.queries import insert_event
from vigilar.config import HighlightsConfig
def test_score_pet_escape():
assert score_event(EventType.PET_ESCAPE, {}) == 10
def test_score_wildlife_predator():
assert score_event(EventType.WILDLIFE_PREDATOR, {}) == 9
def test_score_person_detected():
assert score_event(EventType.PERSON_DETECTED, {}) == 4
def test_score_unknown_returns_zero():
assert score_event(EventType.MOTION_START, {}) == 0
def test_score_pet_high_motion():
assert score_event(EventType.PET_DETECTED, {"motion_confidence": 0.9}) == 6
def test_score_pet_normal():
assert score_event(EventType.PET_DETECTED, {"motion_confidence": 0.3}) == 1
def test_select_top_events(test_db):
today = datetime.date.today()
insert_event(test_db, EventType.PET_ESCAPE, "ALERT", source_id="cam1", payload={"pet_name": "Angel"})
insert_event(test_db, EventType.PERSON_DETECTED, "WARNING", source_id="cam1")
insert_event(test_db, EventType.WILDLIFE_PREDATOR, "CRITICAL", source_id="cam2", payload={"species": "bear"})
insert_event(test_db, EventType.MOTION_START, "WARNING", source_id="cam1")
config = HighlightsConfig(max_clips=3)
events_list = select_top_events(test_db, today, config)
assert len(events_list) <= 3
assert events_list[0]["type"] in (EventType.PET_ESCAPE, EventType.WILDLIFE_PREDATOR)

View File

168
vigilar/highlights/reel.py Normal file
View File

@ -0,0 +1,168 @@
"""Daily highlight reel generator — event scoring and clip assembly."""
import datetime
import json
import logging
import subprocess
import time
from pathlib import Path
from sqlalchemy import desc, select
from sqlalchemy.engine import Engine
from vigilar.config import HighlightsConfig
from vigilar.constants import EventType, RecordingTrigger
from vigilar.storage.schema import events, recordings
log = logging.getLogger(__name__)
_SCORE_MAP = {
EventType.PET_ESCAPE: 10,
EventType.WILDLIFE_PREDATOR: 9,
EventType.UNKNOWN_ANIMAL: 7,
EventType.WILDLIFE_NUISANCE: 5,
EventType.PERSON_DETECTED: 4,
EventType.UNKNOWN_VEHICLE_DETECTED: 4,
EventType.WILDLIFE_PASSIVE: 3,
EventType.VEHICLE_DETECTED: 2,
EventType.PET_DETECTED: 1,
}
ZOOMIE_THRESHOLD = 0.8
def score_event(event_type: str, payload: dict) -> int:
base = _SCORE_MAP.get(event_type, 0)
if event_type == EventType.PET_DETECTED:
if payload.get("motion_confidence", 0) >= ZOOMIE_THRESHOLD:
return 6
return base
def select_top_events(engine: Engine, date: datetime.date, config: HighlightsConfig) -> list[dict]:
day_start_ms = int(datetime.datetime.combine(date, datetime.time.min).timestamp() * 1000)
day_end_ms = int(datetime.datetime.combine(
date + datetime.timedelta(days=1), datetime.time.min).timestamp() * 1000)
query = select(events).where(
events.c.ts >= day_start_ms, events.c.ts < day_end_ms
).order_by(desc(events.c.ts)).limit(500)
if config.cameras:
query = query.where(events.c.source_id.in_(config.cameras))
if config.event_types:
query = query.where(events.c.type.in_(config.event_types))
with engine.connect() as conn:
rows = [dict(r) for r in conn.execute(query).mappings().all()]
scored = []
for row in rows:
payload = json.loads(row["payload"]) if row["payload"] else {}
s = score_event(row["type"], payload)
if s > 0:
scored.append({**row, "_score": s, "_payload": payload})
scored.sort(key=lambda x: x["_score"], reverse=True)
return scored[:config.max_clips]
def generate_daily_reel(
engine: Engine, recordings_dir: str, date: datetime.date, config: HighlightsConfig,
) -> str | None:
top_events = select_top_events(engine, date, config)
if not top_events:
log.info("No events to highlight for %s", date)
return None
highlights_dir = Path(recordings_dir) / "highlights"
highlights_dir.mkdir(parents=True, exist_ok=True)
output_path = highlights_dir / f"{date.isoformat()}.mp4"
clips = []
for evt in top_events:
clip_path = _extract_clip(engine, evt, config.clip_duration_s, recordings_dir)
if clip_path:
clips.append(clip_path)
if not clips:
log.warning("No clips extracted for %s", date)
return None
_concatenate_clips(clips, str(output_path))
for clip in clips:
Path(clip).unlink(missing_ok=True)
if not output_path.exists():
return None
from vigilar.storage.queries import insert_recording
insert_recording(
engine, camera_id="all",
started_at=int(datetime.datetime.combine(date, datetime.time.min).timestamp()),
ended_at=int(datetime.datetime.combine(date, datetime.time.max).timestamp()),
duration_s=len(clips) * config.clip_duration_s,
file_path=str(output_path), file_size=output_path.stat().st_size,
trigger=RecordingTrigger.HIGHLIGHT, encrypted=0, starred=0,
)
log.info("Highlight reel: %s (%d clips)", output_path, len(clips))
return str(output_path)
def _extract_clip(engine, event, duration_s, recordings_dir):
event_ts = event["ts"] / 1000
source_id = event.get("source_id", "")
with engine.connect() as conn:
row = conn.execute(
select(recordings).where(
recordings.c.camera_id == source_id,
recordings.c.started_at <= int(event_ts),
recordings.c.ended_at >= int(event_ts),
).limit(1)
).mappings().first()
if not row or not Path(row["file_path"]).exists():
return None
offset = max(0, event_ts - row["started_at"] - duration_s / 2)
clip_path = Path(recordings_dir) / "highlights" / f"clip_{int(event_ts)}.mp4"
source = source_id.replace("_", " ").title()
ts_str = datetime.datetime.fromtimestamp(event_ts).strftime("%I:%M %p")
watermark = f"{source}{ts_str}"
cmd = [
"ffmpeg", "-y", "-ss", str(offset), "-t", str(duration_s),
"-i", row["file_path"],
"-vf", (f"drawtext=text='{watermark}':fontsize=24:fontcolor=white:"
f"x=20:y=h-50:shadowcolor=black:shadowx=2:shadowy=2,"
f"scale=1280:720:force_original_aspect_ratio=decrease,"
f"pad=1280:720:(ow-iw)/2:(oh-ih)/2"),
"-c:v", "libx264", "-preset", "ultrafast", "-crf", "23", "-an", str(clip_path),
]
try:
subprocess.run(cmd, capture_output=True, timeout=30, check=True)
return str(clip_path)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
return None
def _concatenate_clips(clips, output_path):
import tempfile
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
for clip in clips:
f.write(f"file '{clip}'\n")
concat_file = f.name
cmd = [
"ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", concat_file,
"-c:v", "libx264", "-preset", "ultrafast", "-crf", "23",
"-movflags", "+faststart", output_path,
]
try:
subprocess.run(cmd, capture_output=True, timeout=120, check=True)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
log.error("Reel concatenation failed")
finally:
Path(concat_file).unlink(missing_ok=True)