From 602945e99d910b122cc340885d56ec0197cb32c3 Mon Sep 17 00:00:00 2001 From: "Aaron D. Lee" Date: Fri, 3 Apr 2026 17:42:35 -0400 Subject: [PATCH] feat(F2): recording list, download (decrypt), and delete API Co-Authored-By: Claude Sonnet 4.6 --- tests/unit/test_recordings_api.py | 120 +++++++++++++++++++++++++++ vigilar/web/blueprints/recordings.py | 109 ++++++++++++++++++++++-- 2 files changed, 221 insertions(+), 8 deletions(-) create mode 100644 tests/unit/test_recordings_api.py diff --git a/tests/unit/test_recordings_api.py b/tests/unit/test_recordings_api.py new file mode 100644 index 0000000..791569f --- /dev/null +++ b/tests/unit/test_recordings_api.py @@ -0,0 +1,120 @@ +"""Tests for recording list, download, and delete API.""" + +import os +import time +from pathlib import Path +from unittest.mock import patch + +import pytest +from vigilar.config import VigilarConfig +from vigilar.storage.encryption import encrypt_file +from vigilar.storage.queries import insert_recording +from vigilar.web.app import create_app + + +@pytest.fixture +def app_with_db(test_db): + cfg = VigilarConfig() + app = create_app(cfg) + app.config["TESTING"] = True + app.config["DB_ENGINE"] = test_db + return app + + +@pytest.fixture +def seeded_app(app_with_db, test_db): + now = int(time.time()) + insert_recording(test_db, camera_id="front", started_at=now - 3600, ended_at=now - 3500, + duration_s=100, file_path="/tmp/r1.mp4", file_size=1000, trigger="MOTION", encrypted=0, starred=0) + insert_recording(test_db, camera_id="front", started_at=now - 1800, ended_at=now - 1700, + duration_s=100, file_path="/tmp/r2.mp4", file_size=2000, trigger="PERSON", encrypted=0, + starred=1, detection_type="person") + insert_recording(test_db, camera_id="back", started_at=now - 900, ended_at=now - 800, + duration_s=100, file_path="/tmp/r3.mp4", file_size=3000, trigger="MOTION", encrypted=0, starred=0) + return app_with_db + + +def test_recordings_api_list_all(seeded_app): + with seeded_app.test_client() as c: + rv = c.get("/recordings/api/list") + assert rv.status_code == 200 + assert len(rv.get_json()) == 3 + + +def test_recordings_api_filter_camera(seeded_app): + with seeded_app.test_client() as c: + rv = c.get("/recordings/api/list?camera_id=front") + data = rv.get_json() + assert len(data) == 2 + assert all(r["camera_id"] == "front" for r in data) + + +def test_recordings_api_filter_starred(seeded_app): + with seeded_app.test_client() as c: + rv = c.get("/recordings/api/list?starred=1") + data = rv.get_json() + assert len(data) == 1 + assert data[0]["starred"] is True + + +def test_recordings_api_filter_detection_type(seeded_app): + with seeded_app.test_client() as c: + rv = c.get("/recordings/api/list?detection_type=person") + data = rv.get_json() + assert len(data) == 1 + + +def test_download_plain_mp4(app_with_db, test_db, tmp_path): + mp4_file = tmp_path / "plain.mp4" + mp4_file.write_bytes(b"fake mp4 content") + rec_id = insert_recording(test_db, camera_id="front", started_at=1000, ended_at=1100, + duration_s=100, file_path=str(mp4_file), file_size=16, trigger="MOTION", encrypted=0, starred=0) + with app_with_db.test_client() as c: + rv = c.get(f"/recordings/{rec_id}/download") + assert rv.status_code == 200 + assert rv.data == b"fake mp4 content" + + +def test_download_encrypted_vge(app_with_db, test_db, tmp_path): + key_hex = os.urandom(32).hex() + mp4_file = tmp_path / "encrypted.mp4" + original_content = b"secret recording data" * 100 + mp4_file.write_bytes(original_content) + vge_path = encrypt_file(str(mp4_file), key_hex) + rec_id = insert_recording(test_db, camera_id="front", started_at=2000, ended_at=2100, + duration_s=100, file_path=vge_path, file_size=len(original_content) + 16, + trigger="MOTION", encrypted=1, starred=0) + with patch.dict(os.environ, {"VIGILAR_ENCRYPTION_KEY": key_hex}): + with app_with_db.test_client() as c: + rv = c.get(f"/recordings/{rec_id}/download") + assert rv.status_code == 200 + assert rv.data == original_content + + +def test_download_not_found(app_with_db): + with app_with_db.test_client() as c: + rv = c.get("/recordings/99999/download") + assert rv.status_code == 404 + + +def test_delete_recording(app_with_db, test_db, tmp_path): + mp4_file = tmp_path / "to_delete.mp4" + mp4_file.write_bytes(b"content") + rec_id = insert_recording(test_db, camera_id="front", started_at=3000, ended_at=3100, + duration_s=100, file_path=str(mp4_file), file_size=7, trigger="MOTION", encrypted=0, starred=0) + with app_with_db.test_client() as c: + rv = c.delete(f"/recordings/{rec_id}") + assert rv.status_code == 200 + assert rv.get_json()["ok"] is True + assert not mp4_file.exists() + from sqlalchemy import select + from vigilar.storage.schema import recordings + with test_db.connect() as conn: + row = conn.execute(select(recordings).where(recordings.c.id == rec_id)).first() + assert row is None + + +def test_delete_recording_not_found(app_with_db): + with app_with_db.test_client() as c: + rv = c.delete("/recordings/99999") + assert rv.status_code == 404 diff --git a/vigilar/web/blueprints/recordings.py b/vigilar/web/blueprints/recordings.py index c93be35..fd2467b 100644 --- a/vigilar/web/blueprints/recordings.py +++ b/vigilar/web/blueprints/recordings.py @@ -3,7 +3,7 @@ import time as time_mod from datetime import datetime, timedelta -from flask import Blueprint, current_app, jsonify, render_template, request +from flask import Blueprint, Response, current_app, jsonify, render_template, request recordings_bp = Blueprint("recordings", __name__, url_prefix="/recordings") @@ -18,10 +18,46 @@ def recordings_list(): @recordings_bp.route("/api/list") def recordings_api(): """JSON API: recording list.""" - camera_id = request.args.get("camera") - limit = request.args.get("limit", 50, type=int) - # TODO: query from DB - return jsonify([]) + engine = current_app.config.get("DB_ENGINE") + if engine is None: + return jsonify([]) + + from sqlalchemy import desc, select + from vigilar.storage.schema import recordings + + query = select(recordings).order_by(desc(recordings.c.started_at)) + + camera_id = request.args.get("camera_id") + if camera_id: + query = query.where(recordings.c.camera_id == camera_id) + + date_str = request.args.get("date") + if date_str: + day = datetime.strptime(date_str, "%Y-%m-%d") + day_start = int(day.timestamp()) + day_end = int((day + timedelta(days=1)).timestamp()) + query = query.where(recordings.c.started_at >= day_start, recordings.c.started_at < day_end) + + detection_type = request.args.get("detection_type") + if detection_type: + query = query.where(recordings.c.detection_type == detection_type) + + starred = request.args.get("starred") + if starred is not None: + query = query.where(recordings.c.starred == int(starred)) + + limit = min(request.args.get("limit", 50, type=int), 500) + query = query.limit(limit) + + with engine.connect() as conn: + rows = conn.execute(query).mappings().all() + + return jsonify([{ + "id": r["id"], "camera_id": r["camera_id"], + "started_at": r["started_at"], "ended_at": r["ended_at"], + "duration_s": r["duration_s"], "detection_type": r["detection_type"], + "starred": bool(r["starred"]), "file_size": r["file_size"], "trigger": r["trigger"], + } for r in rows]) @recordings_bp.route("/api/timeline") @@ -63,12 +99,69 @@ def timeline_api(): @recordings_bp.route("//download") def recording_download(recording_id: int): """Stream decrypted recording for download/playback.""" - # TODO: Phase 6 — decrypt and stream - return "Recording not available", 503 + engine = current_app.config.get("DB_ENGINE") + if engine is None: + return "Database not available", 503 + + from pathlib import Path + from sqlalchemy import select + from vigilar.storage.schema import recordings + + with engine.connect() as conn: + row = conn.execute(select(recordings).where(recordings.c.id == recording_id)).mappings().first() + + if not row: + return "Recording not found", 404 + + file_path = row["file_path"] + if not Path(file_path).exists(): + return "Recording file not found", 404 + + is_encrypted = row["encrypted"] and file_path.endswith(".vge") + + if is_encrypted: + import os + key_hex = os.environ.get("VIGILAR_ENCRYPTION_KEY") + if not key_hex: + return "Encryption key not configured", 500 + from vigilar.storage.encryption import decrypt_stream + return Response(decrypt_stream(file_path, key_hex), mimetype="video/mp4", + headers={"Content-Disposition": f"inline; filename=recording_{recording_id}.mp4"}) + else: + return Response(_read_file_chunks(file_path), mimetype="video/mp4", + headers={"Content-Disposition": f"inline; filename=recording_{recording_id}.mp4"}) + + +def _read_file_chunks(path: str, chunk_size: int = 64 * 1024): + with open(path, "rb") as f: + while True: + chunk = f.read(chunk_size) + if not chunk: + break + yield chunk @recordings_bp.route("/", methods=["DELETE"]) def recording_delete(recording_id: int): """Delete a recording.""" - # TODO: delete from DB + filesystem + engine = current_app.config.get("DB_ENGINE") + if engine is None: + return jsonify({"error": "Database not available"}), 503 + + from pathlib import Path + from sqlalchemy import select + from vigilar.storage.schema import recordings + + with engine.connect() as conn: + row = conn.execute(select(recordings).where(recordings.c.id == recording_id)).mappings().first() + + if not row: + return jsonify({"error": "Recording not found"}), 404 + + file_path = Path(row["file_path"]) + if file_path.exists(): + file_path.unlink() + + from vigilar.storage.queries import delete_recording + delete_recording(engine, recording_id) return jsonify({"ok": True})