vigilar/vigilar/camera/recorder.py
Aaron D. Lee 1b7f77b298 feat(F2): integrate AES-256-CTR encryption into AdaptiveRecorder
After FFmpeg finishes writing, _stop_ffmpeg() now reads VIGILAR_ENCRYPTION_KEY
and encrypts the MP4 to .vge format via encrypt_file(), updating the returned
RecordingSegment to reflect the encrypted file path and size.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 17:41:39 -04:00

250 lines
8.4 KiB
Python

"""Adaptive FPS recorder using FFmpeg subprocess.
Two recording modes:
1. Idle recording: writes frames at 2 FPS (every Nth frame skipped)
2. Motion recording: writes all frames at full 30 FPS, including
pre-motion buffer frames flushed from the ring buffer.
Uses FFmpeg via stdin pipe for frame-accurate control over what
gets written, while letting FFmpeg handle H.264 encoding efficiently.
"""
import logging
import os
import shutil
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import cv2
import numpy as np
from vigilar.camera.ring_buffer import TimestampedFrame
log = logging.getLogger(__name__)
@dataclass
class RecordingSegment:
"""Metadata about a completed recording segment."""
file_path: str
started_at: int # unix ms
ended_at: int # unix ms
duration_s: float
file_size: int
trigger: str # MOTION, CONTINUOUS, MANUAL
fps: int
frame_count: int
class AdaptiveRecorder:
"""Records video segments at adaptive FPS using FFmpeg.
Writes raw frames to FFmpeg via stdin pipe. FFmpeg encodes to H.264 MP4.
This gives us frame-accurate control over which frames get recorded
while keeping CPU usage low via hardware-accelerated encoding when available.
"""
def __init__(
self,
camera_id: str,
recordings_dir: str,
idle_fps: int = 2,
motion_fps: int = 30,
resolution: tuple[int, int] = (1920, 1080),
):
self._camera_id = camera_id
self._recordings_dir = Path(recordings_dir)
self._idle_fps = idle_fps
self._motion_fps = motion_fps
self._resolution = resolution
self._ffmpeg_path = shutil.which("ffmpeg") or "ffmpeg"
self._process: subprocess.Popen | None = None
self._current_path: Path | None = None
self._current_fps: int = idle_fps
self._current_trigger: str = "CONTINUOUS"
self._started_at: int = 0
self._frame_count: int = 0
self._recording = False
def _ensure_dir(self) -> Path:
"""Create date-based subdirectory for recordings."""
now = time.localtime()
subdir = self._recordings_dir / self._camera_id / f"{now.tm_year:04d}" / f"{now.tm_mon:02d}" / f"{now.tm_mday:02d}"
subdir.mkdir(parents=True, exist_ok=True)
return subdir
def _start_ffmpeg(self, fps: int) -> None:
"""Start an FFmpeg process for recording at the given FPS."""
subdir = self._ensure_dir()
timestamp = time.strftime("%H%M%S")
filename = f"{self._camera_id}_{timestamp}_{fps}fps.mp4"
self._current_path = subdir / filename
self._current_fps = fps
w, h = self._resolution
cmd = [
self._ffmpeg_path,
"-y",
"-f", "rawvideo",
"-vcodec", "rawvideo",
"-pix_fmt", "bgr24",
"-s", f"{w}x{h}",
"-r", str(fps),
"-i", "pipe:0",
"-c:v", "libx264",
"-preset", "ultrafast",
"-crf", "23",
"-pix_fmt", "yuv420p",
"-movflags", "+faststart",
str(self._current_path),
]
try:
self._process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
self._started_at = int(time.time() * 1000)
self._frame_count = 0
self._recording = True
log.info("Recording started: %s at %d FPS", self._current_path.name, fps)
except FileNotFoundError:
log.error("FFmpeg not found at %s — recording disabled", self._ffmpeg_path)
self._process = None
self._recording = False
def _stop_ffmpeg(self) -> RecordingSegment | None:
"""Stop the current FFmpeg process and return segment metadata."""
if not self._process or not self._recording:
return None
try:
self._process.stdin.close()
self._process.wait(timeout=10)
except (subprocess.TimeoutExpired, BrokenPipeError):
self._process.kill()
self._process.wait()
self._recording = False
ended_at = int(time.time() * 1000)
duration = (ended_at - self._started_at) / 1000.0
file_size = 0
if self._current_path and self._current_path.exists():
file_size = self._current_path.stat().st_size
segment = RecordingSegment(
file_path=str(self._current_path) if self._current_path else "",
started_at=self._started_at,
ended_at=ended_at,
duration_s=duration,
file_size=file_size,
trigger=self._current_trigger,
fps=self._current_fps,
frame_count=self._frame_count,
)
log.info(
"Recording stopped: %s (%.1fs, %d frames, %d bytes)",
self._current_path.name if self._current_path else "unknown",
duration, self._frame_count, file_size,
)
# Encrypt if key is available
encryption_key = os.environ.get("VIGILAR_ENCRYPTION_KEY")
if encryption_key and self._current_path and self._current_path.exists():
try:
from vigilar.storage.encryption import encrypt_file
vge_path = encrypt_file(str(self._current_path), encryption_key)
segment = RecordingSegment(
file_path=vge_path,
started_at=self._started_at,
ended_at=ended_at,
duration_s=duration,
file_size=Path(vge_path).stat().st_size,
trigger=self._current_trigger,
fps=self._current_fps,
frame_count=self._frame_count,
)
log.info("Encrypted recording: %s", Path(vge_path).name)
except Exception:
log.exception("Encryption failed for %s, keeping plain MP4",
self._current_path.name)
self._process = None
self._current_path = None
return segment
def write_frame(self, frame: np.ndarray) -> None:
"""Write a single frame to the active recording."""
if not self._recording or not self._process or not self._process.stdin:
return
try:
# Resize to recording resolution if needed
h, w = frame.shape[:2]
rw, rh = self._resolution
if w != rw or h != rh:
frame = cv2.resize(frame, (rw, rh))
self._process.stdin.write(frame.tobytes())
self._frame_count += 1
except BrokenPipeError:
log.warning("FFmpeg pipe broken for %s", self._camera_id)
self._recording = False
def start_idle_recording(self) -> None:
"""Start or switch to idle (low FPS) recording."""
if self._recording and self._current_fps == self._idle_fps:
return # already recording at idle FPS
if self._recording:
self._stop_ffmpeg()
self._current_trigger = "CONTINUOUS"
self._start_ffmpeg(self._idle_fps)
def start_motion_recording(self, pre_buffer: list[TimestampedFrame] | None = None) -> None:
"""Start motion recording at full FPS.
Optionally flush pre-motion buffer frames into the new segment.
"""
if self._recording:
self._stop_ffmpeg()
self._current_trigger = "MOTION"
self._start_ffmpeg(self._motion_fps)
# Write pre-buffer frames
if pre_buffer:
for tsf in pre_buffer:
self.write_frame(tsf.frame)
log.debug("Flushed %d pre-buffer frames for %s", len(pre_buffer), self._camera_id)
def stop_recording(self) -> RecordingSegment | None:
"""Stop the current recording and return segment info."""
return self._stop_ffmpeg()
def start_manual_recording(self) -> None:
"""Start a manually-triggered recording at motion FPS."""
if self._recording:
self._stop_ffmpeg()
self._current_trigger = "MANUAL"
self._start_ffmpeg(self._motion_fps)
@property
def is_recording(self) -> bool:
return self._recording
@property
def current_fps(self) -> int:
return self._current_fps if self._recording else 0
@property
def current_trigger(self) -> str:
return self._current_trigger