vigilar/vigilar/storage/queries.py
Aaron D. Lee d83710b839 Add pet and wildlife database query functions
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 13:13:56 -04:00

461 lines
13 KiB
Python

"""Named query functions for Vigilar database operations."""
import json
import time
from typing import Any
from sqlalchemy import desc, select
from sqlalchemy.engine import Engine
from vigilar.storage.schema import (
alert_log,
arm_state_log,
events,
pet_sightings,
pet_training_images,
pets,
push_subscriptions,
recordings,
sensor_states,
system_events,
wildlife_sightings,
)
# --- Events ---
def insert_event(
engine: Engine,
event_type: str,
severity: str,
source_id: str | None = None,
payload: dict[str, Any] | None = None,
) -> int:
ts = int(time.time() * 1000)
with engine.begin() as conn:
result = conn.execute(
events.insert().values(
ts=ts,
type=event_type,
source_id=source_id,
severity=severity,
payload=json.dumps(payload) if payload else None,
)
)
return result.inserted_primary_key[0]
def get_events(
engine: Engine,
event_type: str | None = None,
severity: str | None = None,
source_id: str | None = None,
since_ts: int | None = None,
limit: int = 100,
offset: int = 0,
) -> list[dict[str, Any]]:
query = select(events).order_by(desc(events.c.ts)).limit(limit).offset(offset)
if event_type:
query = query.where(events.c.type == event_type)
if severity:
query = query.where(events.c.severity == severity)
if source_id:
query = query.where(events.c.source_id == source_id)
if since_ts:
query = query.where(events.c.ts >= since_ts)
with engine.connect() as conn:
rows = conn.execute(query).mappings().all()
return [dict(r) for r in rows]
def acknowledge_event(engine: Engine, event_id: int) -> bool:
ts = int(time.time() * 1000)
with engine.begin() as conn:
result = conn.execute(
events.update()
.where(events.c.id == event_id)
.values(acknowledged=1, ack_ts=ts)
)
return result.rowcount > 0
# --- Recordings ---
def insert_recording(engine: Engine, **kwargs: Any) -> int:
with engine.begin() as conn:
result = conn.execute(recordings.insert().values(**kwargs))
return result.inserted_primary_key[0]
def get_recordings(
engine: Engine,
camera_id: str | None = None,
since_ts: int | None = None,
limit: int = 50,
) -> list[dict[str, Any]]:
query = select(recordings).order_by(desc(recordings.c.started_at)).limit(limit)
if camera_id:
query = query.where(recordings.c.camera_id == camera_id)
if since_ts:
query = query.where(recordings.c.started_at >= since_ts)
with engine.connect() as conn:
return [dict(r) for r in conn.execute(query).mappings().all()]
def get_timeline_data(
engine: Engine,
camera_id: str,
day_start_ts: int,
day_end_ts: int,
) -> list[dict[str, Any]]:
query = (
select(
recordings.c.id,
recordings.c.started_at,
recordings.c.ended_at,
recordings.c.detection_type,
recordings.c.starred,
)
.where(
recordings.c.camera_id == camera_id,
recordings.c.started_at >= day_start_ts,
recordings.c.started_at < day_end_ts,
)
.order_by(recordings.c.started_at.asc())
)
with engine.connect() as conn:
return [dict(r) for r in conn.execute(query).mappings().all()]
def delete_recording(engine: Engine, recording_id: int) -> bool:
with engine.begin() as conn:
result = conn.execute(
recordings.delete().where(recordings.c.id == recording_id)
)
return result.rowcount > 0
# --- Sensor States ---
def upsert_sensor_state(
engine: Engine, sensor_id: str, state_key: str, value: Any
) -> None:
ts = int(time.time() * 1000)
value_str = json.dumps(value) if not isinstance(value, str) else value
with engine.begin() as conn:
existing = conn.execute(
select(sensor_states).where(
sensor_states.c.sensor_id == sensor_id,
sensor_states.c.state_key == state_key,
)
).first()
if existing:
conn.execute(
sensor_states.update()
.where(
sensor_states.c.sensor_id == sensor_id,
sensor_states.c.state_key == state_key,
)
.values(value=value_str, updated_at=ts)
)
else:
conn.execute(
sensor_states.insert().values(
sensor_id=sensor_id,
state_key=state_key,
value=value_str,
updated_at=ts,
)
)
def get_sensor_state(engine: Engine, sensor_id: str) -> dict[str, Any]:
with engine.connect() as conn:
rows = conn.execute(
select(sensor_states).where(sensor_states.c.sensor_id == sensor_id)
).mappings().all()
return {r["state_key"]: json.loads(r["value"]) for r in rows}
# --- Arm State ---
def insert_arm_state(
engine: Engine, state: str, triggered_by: str, pin_hash: str | None = None
) -> None:
ts = int(time.time() * 1000)
with engine.begin() as conn:
conn.execute(
arm_state_log.insert().values(
ts=ts, state=state, triggered_by=triggered_by, pin_hash=pin_hash
)
)
def get_current_arm_state(engine: Engine) -> str | None:
with engine.connect() as conn:
row = conn.execute(
select(arm_state_log.c.state)
.order_by(desc(arm_state_log.c.ts))
.limit(1)
).first()
return row[0] if row else None
# --- Alert Log ---
def insert_alert_log(
engine: Engine,
event_id: int | None,
channel: str,
status: str,
error: str | None = None,
) -> None:
ts = int(time.time() * 1000)
with engine.begin() as conn:
conn.execute(
alert_log.insert().values(
ts=ts, event_id=event_id, channel=channel, status=status, error=error
)
)
# --- System Events ---
def insert_system_event(
engine: Engine, component: str, level: str, message: str
) -> None:
ts = int(time.time() * 1000)
with engine.begin() as conn:
conn.execute(
system_events.insert().values(
ts=ts, component=component, level=level, message=message
)
)
# --- Push Subscriptions ---
def save_push_subscription(
engine: Engine, endpoint: str, p256dh_key: str, auth_key: str, user_agent: str = ""
) -> None:
ts = int(time.time() * 1000)
with engine.begin() as conn:
existing = conn.execute(
select(push_subscriptions).where(push_subscriptions.c.endpoint == endpoint)
).first()
if existing:
conn.execute(
push_subscriptions.update()
.where(push_subscriptions.c.endpoint == endpoint)
.values(p256dh_key=p256dh_key, auth_key=auth_key, last_used_at=ts)
)
else:
conn.execute(
push_subscriptions.insert().values(
endpoint=endpoint,
p256dh_key=p256dh_key,
auth_key=auth_key,
created_at=ts,
user_agent=user_agent,
)
)
def get_push_subscriptions(engine: Engine) -> list[dict[str, Any]]:
with engine.connect() as conn:
rows = conn.execute(select(push_subscriptions)).mappings().all()
return [dict(r) for r in rows]
def delete_push_subscription(engine: Engine, endpoint: str) -> bool:
with engine.begin() as conn:
result = conn.execute(
push_subscriptions.delete().where(push_subscriptions.c.endpoint == endpoint)
)
return result.rowcount > 0
# --- Pets ---
def insert_pet(
engine: Engine,
name: str,
species: str,
breed: str | None = None,
color_description: str | None = None,
photo_path: str | None = None,
) -> str:
import uuid
pet_id = str(uuid.uuid4())
with engine.begin() as conn:
conn.execute(pets.insert().values(
id=pet_id, name=name, species=species, breed=breed,
color_description=color_description, photo_path=photo_path,
training_count=0, created_at=time.time(),
))
return pet_id
def get_pet(engine: Engine, pet_id: str) -> dict[str, Any] | None:
with engine.connect() as conn:
row = conn.execute(pets.select().where(pets.c.id == pet_id)).first()
return dict(row._mapping) if row else None
def get_all_pets(engine: Engine) -> list[dict[str, Any]]:
with engine.connect() as conn:
rows = conn.execute(pets.select().order_by(pets.c.name)).fetchall()
return [dict(r._mapping) for r in rows]
# --- Pet Sightings ---
def insert_pet_sighting(
engine: Engine,
species: str,
camera_id: str,
confidence: float,
pet_id: str | None = None,
crop_path: str | None = None,
event_id: int | None = None,
) -> int:
with engine.begin() as conn:
result = conn.execute(pet_sightings.insert().values(
ts=time.time(), pet_id=pet_id, species=species,
camera_id=camera_id, confidence=confidence,
crop_path=crop_path, labeled=1 if pet_id else 0,
event_id=event_id,
))
return result.inserted_primary_key[0]
def get_pet_sightings(
engine: Engine,
pet_id: str | None = None,
camera_id: str | None = None,
since_ts: float | None = None,
limit: int = 100,
) -> list[dict[str, Any]]:
query = select(pet_sightings).order_by(desc(pet_sightings.c.ts)).limit(limit)
if pet_id:
query = query.where(pet_sightings.c.pet_id == pet_id)
if camera_id:
query = query.where(pet_sightings.c.camera_id == camera_id)
if since_ts:
query = query.where(pet_sightings.c.ts >= since_ts)
with engine.connect() as conn:
rows = conn.execute(query).fetchall()
return [dict(r._mapping) for r in rows]
def get_pet_last_location(engine: Engine, pet_id: str) -> dict[str, Any] | None:
with engine.connect() as conn:
row = conn.execute(
select(pet_sightings)
.where(pet_sightings.c.pet_id == pet_id)
.order_by(desc(pet_sightings.c.ts))
.limit(1)
).first()
return dict(row._mapping) if row else None
def get_unlabeled_sightings(
engine: Engine,
species: str | None = None,
limit: int = 50,
) -> list[dict[str, Any]]:
query = (
select(pet_sightings)
.where(pet_sightings.c.labeled == 0)
.order_by(desc(pet_sightings.c.ts))
.limit(limit)
)
if species:
query = query.where(pet_sightings.c.species == species)
with engine.connect() as conn:
rows = conn.execute(query).fetchall()
return [dict(r._mapping) for r in rows]
def label_sighting(engine: Engine, sighting_id: int, pet_id: str) -> None:
with engine.begin() as conn:
conn.execute(
pet_sightings.update()
.where(pet_sightings.c.id == sighting_id)
.values(pet_id=pet_id, labeled=1)
)
# --- Wildlife Sightings ---
def insert_wildlife_sighting(
engine: Engine,
species: str,
threat_level: str,
camera_id: str,
confidence: float,
crop_path: str | None = None,
event_id: int | None = None,
) -> int:
with engine.begin() as conn:
result = conn.execute(wildlife_sightings.insert().values(
ts=time.time(), species=species, threat_level=threat_level,
camera_id=camera_id, confidence=confidence,
crop_path=crop_path, event_id=event_id,
))
return result.inserted_primary_key[0]
def get_wildlife_sightings(
engine: Engine,
threat_level: str | None = None,
camera_id: str | None = None,
since_ts: float | None = None,
limit: int = 100,
) -> list[dict[str, Any]]:
query = select(wildlife_sightings).order_by(desc(wildlife_sightings.c.ts)).limit(limit)
if threat_level:
query = query.where(wildlife_sightings.c.threat_level == threat_level)
if camera_id:
query = query.where(wildlife_sightings.c.camera_id == camera_id)
if since_ts:
query = query.where(wildlife_sightings.c.ts >= since_ts)
with engine.connect() as conn:
rows = conn.execute(query).fetchall()
return [dict(r._mapping) for r in rows]
# --- Training Images ---
def insert_training_image(
engine: Engine,
pet_id: str,
image_path: str,
source: str,
) -> int:
with engine.begin() as conn:
result = conn.execute(pet_training_images.insert().values(
pet_id=pet_id, image_path=image_path,
source=source, created_at=time.time(),
))
conn.execute(
pets.update().where(pets.c.id == pet_id)
.values(training_count=pets.c.training_count + 1)
)
return result.inserted_primary_key[0]
def get_training_images(
engine: Engine,
pet_id: str,
) -> list[dict[str, Any]]:
with engine.connect() as conn:
rows = conn.execute(
select(pet_training_images)
.where(pet_training_images.c.pet_id == pet_id)
.order_by(desc(pet_training_images.c.created_at))
).fetchall()
return [dict(r._mapping) for r in rows]