feat(Q3): wildlife journal query functions

Add get_wildlife_sightings_paginated, get_wildlife_stats, and
get_wildlife_frequency to queries.py with full test coverage.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee
2026-04-03 18:46:25 -04:00
parent 66a53f0cd8
commit 8c2a8ea1c5
2 changed files with 97 additions and 0 deletions

View File

@@ -426,6 +426,65 @@ def get_wildlife_sightings(
return [dict(r._mapping) for r in rows]
# --- Wildlife Journal Queries ---
def get_wildlife_sightings_paginated(
engine: Engine,
species: str | None = None,
threat_level: str | None = None,
camera_id: str | None = None,
since_ts: float | None = None,
until_ts: float | None = None,
limit: int = 50,
offset: int = 0,
) -> list[dict[str, Any]]:
query = (
select(wildlife_sightings).order_by(desc(wildlife_sightings.c.ts))
.limit(limit).offset(offset)
)
if species:
query = query.where(wildlife_sightings.c.species == species)
if threat_level:
query = query.where(wildlife_sightings.c.threat_level == threat_level)
if camera_id:
query = query.where(wildlife_sightings.c.camera_id == camera_id)
if since_ts:
query = query.where(wildlife_sightings.c.ts >= since_ts)
if until_ts:
query = query.where(wildlife_sightings.c.ts < until_ts)
with engine.connect() as conn:
return [dict(r) for r in conn.execute(query).mappings().all()]
def get_wildlife_stats(engine: Engine) -> dict[str, Any]:
from sqlalchemy import func
with engine.connect() as conn:
total = conn.execute(
select(func.count()).select_from(wildlife_sightings)
).scalar() or 0
species_rows = conn.execute(
select(wildlife_sightings.c.species, func.count().label("cnt"))
.group_by(wildlife_sightings.c.species)
).mappings().all()
per_species = {r["species"]: r["cnt"] for r in species_rows}
return {"total": total, "species_count": len(per_species), "per_species": per_species}
def get_wildlife_frequency(engine: Engine) -> dict[str, dict[str, int]]:
import datetime
buckets: dict[str, dict[str, int]] = {
"00-04": {}, "04-08": {}, "08-12": {}, "12-16": {}, "16-20": {}, "20-24": {}
}
with engine.connect() as conn:
rows = conn.execute(select(wildlife_sightings)).mappings().all()
for r in rows:
dt = datetime.datetime.fromtimestamp(r["ts"])
bucket_key = list(buckets.keys())[dt.hour // 4]
species = r["species"]
buckets[bucket_key][species] = buckets[bucket_key].get(species, 0) + 1
return buckets
# --- Training Images ---
def insert_training_image(