vigilar/tests/unit/test_recordings_api.py
Aaron D. Lee 602945e99d feat(F2): recording list, download (decrypt), and delete API
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 17:42:35 -04:00

121 lines
4.5 KiB
Python

"""Tests for recording list, download, and delete API."""
import os
import time
from pathlib import Path
from unittest.mock import patch
import pytest
from vigilar.config import VigilarConfig
from vigilar.storage.encryption import encrypt_file
from vigilar.storage.queries import insert_recording
from vigilar.web.app import create_app
@pytest.fixture
def app_with_db(test_db):
    """Build the web app from a default config and attach the test database.

    The app returned by ``create_app`` has ``TESTING`` enabled and the
    ``test_db`` fixture stored under the ``DB_ENGINE`` config key.
    """
    web_app = create_app(VigilarConfig())
    web_app.config.update(TESTING=True, DB_ENGINE=test_db)
    return web_app
@pytest.fixture
def seeded_app(app_with_db, test_db):
    """Return ``app_with_db`` after inserting three unencrypted recordings.

    Two recordings use camera ``front`` and one uses ``back``. Only the
    ``PERSON``-triggered recording is starred and carries
    ``detection_type="person"``. Every recording lasts 100 seconds and its
    start time is expressed relative to the current time.
    """
    reference = int(time.time())
    # (camera, seconds before now, file path, size, trigger, starred, extra kwargs)
    seeds = [
        ("front", 3600, "/tmp/r1.mp4", 1000, "MOTION", 0, {}),
        ("front", 1800, "/tmp/r2.mp4", 2000, "PERSON", 1, {"detection_type": "person"}),
        ("back", 900, "/tmp/r3.mp4", 3000, "MOTION", 0, {}),
    ]
    for camera, age_s, path, size, trigger, starred, extra in seeds:
        begin = reference - age_s
        insert_recording(
            test_db,
            camera_id=camera,
            started_at=begin,
            ended_at=begin + 100,
            duration_s=100,
            file_path=path,
            file_size=size,
            trigger=trigger,
            encrypted=0,
            starred=starred,
            **extra,
        )
    return app_with_db
def test_recordings_api_list_all(seeded_app):
    """An unfiltered list request succeeds and returns three recordings."""
    with seeded_app.test_client() as client:
        response = client.get("/recordings/api/list")
        assert response.status_code == 200
        listed = response.get_json()
        assert len(listed) == 3
def test_recordings_api_filter_camera(seeded_app):
    """Filtering by ``camera_id=front`` returns two recordings, all from ``front``."""
    with seeded_app.test_client() as client:
        response = client.get("/recordings/api/list?camera_id=front")
        # Check the status before touching the payload so a failing endpoint
        # is reported as a status mismatch rather than an error on the body.
        assert response.status_code == 200
        rows = response.get_json()
        assert len(rows) == 2
        assert all(row["camera_id"] == "front" for row in rows)
def test_recordings_api_filter_starred(seeded_app):
    """Filtering by ``starred=1`` returns exactly one recording, flagged as starred."""
    with seeded_app.test_client() as client:
        response = client.get("/recordings/api/list?starred=1")
        # Check the status before touching the payload so a failing endpoint
        # is reported as a status mismatch rather than an error on the body.
        assert response.status_code == 200
        rows = response.get_json()
        assert len(rows) == 1
        assert rows[0]["starred"] is True
def test_recordings_api_filter_detection_type(seeded_app):
    """Filtering by ``detection_type=person`` returns exactly one recording."""
    with seeded_app.test_client() as client:
        response = client.get("/recordings/api/list?detection_type=person")
        # Check the status before touching the payload so a failing endpoint
        # is reported as a status mismatch rather than an error on the body.
        assert response.status_code == 200
        rows = response.get_json()
        assert len(rows) == 1
def test_download_plain_mp4(app_with_db, test_db, tmp_path):
    """Downloading an unencrypted recording returns the file's bytes unchanged."""
    payload = b"fake mp4 content"
    source = tmp_path / "plain.mp4"
    source.write_bytes(payload)
    recording_id = insert_recording(
        test_db,
        camera_id="front",
        started_at=1000,
        ended_at=1100,
        duration_s=100,
        file_path=str(source),
        file_size=len(payload),
        trigger="MOTION",
        encrypted=0,
        starred=0,
    )
    with app_with_db.test_client() as client:
        response = client.get(f"/recordings/{recording_id}/download")
        assert response.status_code == 200
        assert response.data == payload
def test_download_encrypted_vge(app_with_db, test_db, tmp_path):
    """Downloading an encrypted recording returns the original plaintext bytes.

    The source file is encrypted with a random 32-byte key (hex-encoded), and
    that key is placed in ``VIGILAR_ENCRYPTION_KEY`` while the request runs.
    """
    plaintext = b"secret recording data" * 100
    key_hex = os.urandom(32).hex()
    source = tmp_path / "encrypted.mp4"
    source.write_bytes(plaintext)
    encrypted_path = encrypt_file(str(source), key_hex)
    recording_id = insert_recording(
        test_db,
        camera_id="front",
        started_at=2000,
        ended_at=2100,
        duration_s=100,
        file_path=encrypted_path,
        file_size=len(plaintext) + 16,
        trigger="MOTION",
        encrypted=1,
        starred=0,
    )
    env_override = {"VIGILAR_ENCRYPTION_KEY": key_hex}
    # The environment patch must be active while the request is handled.
    with patch.dict(os.environ, env_override), app_with_db.test_client() as client:
        response = client.get(f"/recordings/{recording_id}/download")
        assert response.status_code == 200
        assert response.data == plaintext
def test_download_not_found(app_with_db):
    """Requesting a download for recording id 99999 yields HTTP 404."""
    missing_url = "/recordings/99999/download"
    with app_with_db.test_client() as client:
        response = client.get(missing_url)
        assert response.status_code == 404
def test_delete_recording(app_with_db, test_db, tmp_path):
    """Deleting a recording reports ``ok``, removes its file, and removes its row."""
    # Imports stay function-local, as in the original test.
    from sqlalchemy import select
    from vigilar.storage.schema import recordings

    target = tmp_path / "to_delete.mp4"
    target.write_bytes(b"content")
    recording_id = insert_recording(
        test_db,
        camera_id="front",
        started_at=3000,
        ended_at=3100,
        duration_s=100,
        file_path=str(target),
        file_size=7,
        trigger="MOTION",
        encrypted=0,
        starred=0,
    )

    with app_with_db.test_client() as client:
        response = client.delete(f"/recordings/{recording_id}")
        assert response.status_code == 200
        assert response.get_json()["ok"] is True

    # The file on disk must be gone after the delete request.
    assert not target.exists()

    # The database row must be gone as well.
    lookup = select(recordings).where(recordings.c.id == recording_id)
    with test_db.connect() as connection:
        remaining = connection.execute(lookup).first()
    assert remaining is None
def test_delete_recording_not_found(app_with_db):
    """Deleting recording id 99999 yields HTTP 404."""
    missing_url = "/recordings/99999"
    with app_with_db.test_client() as client:
        response = client.delete(missing_url)
        assert response.status_code == 404