feat(Q6): timelapse generator, schedules, and web routes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee 2026-04-03 19:08:29 -04:00
parent 622af22642
commit bdfadbb829
5 changed files with 236 additions and 0 deletions

View File

@ -0,0 +1,49 @@
import datetime
import time
import pytest
from vigilar.config import VigilarConfig, CameraConfig
from vigilar.storage.queries import insert_timelapse_schedule, get_timelapse_schedules, delete_timelapse_schedule
from vigilar.highlights.timelapse import generate_timelapse
from vigilar.web.app import create_app
def test_insert_schedule(test_db):
sid = insert_timelapse_schedule(test_db, "front", "Daily", 6, 20, "20:00")
assert sid > 0
def test_get_schedules(test_db):
insert_timelapse_schedule(test_db, "front", "A", 6, 12, "12:00")
insert_timelapse_schedule(test_db, "front", "B", 12, 20, "20:00")
assert len(get_timelapse_schedules(test_db, "front")) == 2
def test_delete_schedule(test_db):
sid = insert_timelapse_schedule(test_db, "front", "Test", 6, 20, "20:00")
delete_timelapse_schedule(test_db, sid)
assert len(get_timelapse_schedules(test_db, "front")) == 0
def test_generate_timelapse_no_recordings(test_db, tmp_path):
result = generate_timelapse("front", datetime.date.today(), 6, 20, 30, str(tmp_path), test_db)
assert result is None
@pytest.fixture
def timelapse_app(test_db):
cfg = VigilarConfig(cameras=[CameraConfig(id="front", display_name="Front", rtsp_url="rtsp://x")])
app = create_app(cfg)
app.config["TESTING"] = True
app.config["DB_ENGINE"] = test_db
return app
def test_post_timelapse(timelapse_app):
with timelapse_app.test_client() as c:
rv = c.post("/cameras/front/timelapse", json={"date": "2026-04-02"})
assert rv.status_code in (200, 202)
def test_get_schedules_route(timelapse_app, test_db):
insert_timelapse_schedule(test_db, "front", "Test", 6, 20, "20:00")
with timelapse_app.test_client() as c:
rv = c.get("/cameras/front/timelapse/schedules")
assert len(rv.get_json()) == 1
def test_create_schedule_route(timelapse_app):
with timelapse_app.test_client() as c:
rv = c.post("/cameras/front/timelapse/schedule", json={"name": "Daily", "time": "20:00"})
assert rv.status_code == 200

View File

@ -0,0 +1,96 @@
"""Time-lapse video generator with scheduling."""
import datetime
import logging
import shutil
import subprocess
import time
from pathlib import Path
from sqlalchemy import select
from sqlalchemy.engine import Engine
from vigilar.constants import RecordingTrigger
from vigilar.storage.schema import recordings
log = logging.getLogger(__name__)
def generate_timelapse(
camera_id, date, start_hour, end_hour, fps, recordings_dir, engine,
) -> str | None:
day_start = int(datetime.datetime.combine(date, datetime.time(start_hour)).timestamp())
day_end = int(datetime.datetime.combine(date, datetime.time(end_hour)).timestamp())
with engine.connect() as conn:
rows = conn.execute(
select(recordings).where(
recordings.c.camera_id == camera_id,
recordings.c.started_at >= day_start,
recordings.c.started_at < day_end,
).order_by(recordings.c.started_at.asc())
).mappings().all()
if not rows:
log.info("No recordings for timelapse: %s on %s", camera_id, date)
return None
frames_dir = Path(recordings_dir) / "timelapse_tmp" / camera_id
frames_dir.mkdir(parents=True, exist_ok=True)
frame_idx = 0
for row in rows:
src_path = row["file_path"]
if not Path(src_path).exists():
continue
duration_s = row.get("duration_s", 60) or 60
for offset in range(0, int(duration_s), 60):
frame_path = frames_dir / f"frame_{frame_idx:06d}.jpg"
cmd = ["ffmpeg", "-y", "-ss", str(offset), "-i", src_path,
"-frames:v", "1", "-q:v", "2", str(frame_path)]
try:
subprocess.run(cmd, capture_output=True, timeout=10, check=True)
frame_idx += 1
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
continue
if frame_idx == 0:
return None
output_dir = Path(recordings_dir) / "timelapses"
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / f"{camera_id}_{date.isoformat()}_{start_hour}-{end_hour}.mp4"
cmd = ["ffmpeg", "-y", "-framerate", str(fps),
"-i", str(frames_dir / "frame_%06d.jpg"),
"-c:v", "libx264", "-preset", "ultrafast", "-crf", "23",
"-pix_fmt", "yuv420p", "-movflags", "+faststart", str(output_path)]
try:
subprocess.run(cmd, capture_output=True, timeout=120, check=True)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
return None
finally:
shutil.rmtree(frames_dir, ignore_errors=True)
if not output_path.exists():
return None
from vigilar.storage.queries import insert_recording
insert_recording(engine, camera_id=camera_id, started_at=day_start, ended_at=day_end,
duration_s=frame_idx / fps, file_path=str(output_path),
file_size=output_path.stat().st_size,
trigger=RecordingTrigger.TIMELAPSE, encrypted=0, starred=0)
return str(output_path)
def check_schedules(engine, recordings_dir):
from vigilar.storage.queries import get_timelapse_schedules
now = datetime.datetime.now()
current_time = now.strftime("%H:%M")
for sched in get_timelapse_schedules(engine):
if not sched.get("enabled"):
continue
if sched["generate_time"] != current_time:
continue
generate_timelapse(sched["camera_id"], now.date(), sched["start_hour"],
sched["end_hour"], 30, recordings_dir, engine)

View File

@ -686,6 +686,31 @@ def get_visits(engine, profile_id=None, camera_id=None, limit=50) -> list[dict]:
return [dict(r) for r in conn.execute(query).mappings().all()]
def insert_timelapse_schedule(engine, camera_id, name, start_hour, end_hour, generate_time) -> int:
from vigilar.storage.schema import timelapse_schedules
with engine.begin() as conn:
result = conn.execute(timelapse_schedules.insert().values(
camera_id=camera_id, name=name, start_hour=start_hour, end_hour=end_hour,
generate_time=generate_time, enabled=1, created_at=time.time()))
return result.inserted_primary_key[0]
def get_timelapse_schedules(engine, camera_id=None) -> list[dict]:
from vigilar.storage.schema import timelapse_schedules
query = select(timelapse_schedules)
if camera_id:
query = query.where(timelapse_schedules.c.camera_id == camera_id)
with engine.connect() as conn:
return [dict(r) for r in conn.execute(query).mappings().all()]
def delete_timelapse_schedule(engine, schedule_id) -> bool:
from vigilar.storage.schema import timelapse_schedules
with engine.begin() as conn:
return conn.execute(timelapse_schedules.delete().where(
timelapse_schedules.c.id == schedule_id)).rowcount > 0
def get_active_visits(engine) -> list[dict]:
from vigilar.storage.schema import visits
with engine.connect() as conn:

View File

@ -249,3 +249,15 @@ visits = Table(
)
Index("idx_visits_profile_ts", visits.c.profile_id, visits.c.arrived_at.desc())
Index("idx_visits_ts", visits.c.arrived_at.desc())
timelapse_schedules = Table(
"timelapse_schedules", metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("camera_id", String, nullable=False),
Column("name", String, nullable=False),
Column("start_hour", Integer, nullable=False),
Column("end_hour", Integer, nullable=False),
Column("generate_time", String, nullable=False),
Column("enabled", Integer, nullable=False, default=1),
Column("created_at", Float, nullable=False),
)

View File

@ -88,6 +88,60 @@ def camera_heatmap(camera_id: str):
return Response(png_bytes, mimetype="image/png")
@cameras_bp.route("/<camera_id>/timelapse", methods=["POST"])
def start_timelapse(camera_id):
data = request.get_json() or {}
date_str = data.get("date")
if not date_str:
return jsonify({"error": "date required"}), 400
import datetime
import threading
date = datetime.date.fromisoformat(date_str)
engine = current_app.config.get("DB_ENGINE")
cfg = current_app.config.get("VIGILAR_CONFIG")
recordings_dir = cfg.system.recordings_dir if cfg else "/var/vigilar/recordings"
def run():
from vigilar.highlights.timelapse import generate_timelapse
generate_timelapse(camera_id, date, data.get("start_hour", 6),
data.get("end_hour", 20), data.get("fps", 30), recordings_dir, engine)
threading.Thread(target=run, daemon=True).start()
return jsonify({"ok": True, "status": "generating"}), 202
@cameras_bp.route("/<camera_id>/timelapse/status")
def timelapse_status(camera_id):
return jsonify({"status": "idle"})
@cameras_bp.route("/<camera_id>/timelapse/schedules")
def timelapse_schedules(camera_id):
engine = current_app.config.get("DB_ENGINE")
if engine is None:
return jsonify([])
from vigilar.storage.queries import get_timelapse_schedules
return jsonify(get_timelapse_schedules(engine, camera_id))
@cameras_bp.route("/<camera_id>/timelapse/schedule", methods=["POST"])
def create_timelapse_schedule(camera_id):
data = request.get_json() or {}
if not data.get("name"):
return jsonify({"error": "name required"}), 400
engine = current_app.config.get("DB_ENGINE")
if engine is None:
return jsonify({"error": "database not available"}), 503
from vigilar.storage.queries import insert_timelapse_schedule
sid = insert_timelapse_schedule(engine, camera_id, data["name"],
data.get("start_hour", 6), data.get("end_hour", 20), data.get("time", "20:00"))
return jsonify({"ok": True, "id": sid})
@cameras_bp.route("/<camera_id>/timelapse/schedule/<int:schedule_id>", methods=["DELETE"])
def delete_timelapse_schedule_route(camera_id, schedule_id):
engine = current_app.config.get("DB_ENGINE")
if engine is None:
return jsonify({"error": "database not available"}), 503
from vigilar.storage.queries import delete_timelapse_schedule
delete_timelapse_schedule(engine, schedule_id)
return jsonify({"ok": True})
@cameras_bp.route("/api/status")
def cameras_status_api():
"""JSON API: all camera statuses."""