fieldwitness/frontends/web/blueprints/attest.py
Aaron D. Lee 067c4073ee Add Verisoo attest/verify web MVP — full attestation lifecycle
Attest page (/attest):
- Image upload with optional caption and location
- EXIF auto-extraction toggle
- Creates Ed25519-signed attestation record
- Stores in verisoo append-only binary log + LMDB index
- Displays: record ID, attestor fingerprint, timestamp, image hashes

Verify page (/verify):
- Image upload for verification against local attestation log
- SHA-256 exact matching + perceptual hash matching (pHash, dHash)
- Shows match type (exact/perceptual), hash distances, attestor info
- Color-coded distance badges (green=0, info<5, warning<10, danger>=10)

Attestation log (/attest/log):
- Lists recent attestations with short ID, attestor, timestamp, SHA-256
- Shows total record count

Verified: full lifecycle works — attest image → verify same image → exact match found

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 17:02:49 -04:00

242 lines
8.8 KiB
Python

"""
Attestation blueprint — attest and verify images via Verisoo.
Wraps verisoo's attestation and verification libraries to provide:
- Image attestation: upload → hash → sign → store in append-only log
- Image verification: upload → hash → search log → display matches
"""
from __future__ import annotations
import io
from datetime import UTC, datetime
from pathlib import Path
from flask import Blueprint, flash, redirect, render_template, request, url_for
bp = Blueprint("attest", __name__)
def _get_storage():
"""Get verisoo LocalStorage pointed at soosef's attestation directory."""
from verisoo.storage import LocalStorage
from soosef.paths import ATTESTATIONS_DIR
return LocalStorage(base_path=ATTESTATIONS_DIR)
def _get_private_key():
"""Load the Ed25519 private key from soosef identity directory."""
from verisoo.crypto import load_private_key
from soosef.paths import IDENTITY_PRIVATE_KEY
if not IDENTITY_PRIVATE_KEY.exists():
return None
return load_private_key(IDENTITY_PRIVATE_KEY)
def _allowed_image(filename: str) -> bool:
if not filename or "." not in filename:
return False
return filename.rsplit(".", 1)[1].lower() in {"png", "jpg", "jpeg", "bmp", "gif", "webp", "tiff", "tif"}
@bp.route("/attest", methods=["GET", "POST"])
def attest():
"""Create a provenance attestation for an image."""
from auth import login_required as _lr
# Check identity exists
private_key = _get_private_key()
has_identity = private_key is not None
if request.method == "POST":
if not has_identity:
flash("No identity configured. Run 'soosef init' or generate one from the Keys page.", "error")
return redirect(url_for("attest.attest"))
image_file = request.files.get("image")
if not image_file or not image_file.filename:
flash("Please select an image to attest.", "error")
return redirect(url_for("attest.attest"))
if not _allowed_image(image_file.filename):
flash("Unsupported image format. Use PNG, JPG, WebP, TIFF, or BMP.", "error")
return redirect(url_for("attest.attest"))
try:
image_data = image_file.read()
# Build optional metadata
metadata = {}
caption = request.form.get("caption", "").strip()
location_name = request.form.get("location_name", "").strip()
if caption:
metadata["caption"] = caption
if location_name:
metadata["location_name"] = location_name
auto_exif = request.form.get("auto_exif", "on") == "on"
# Create the attestation
from verisoo.attestation import create_attestation
attestation = create_attestation(
image_data=image_data,
private_key=private_key,
metadata=metadata if metadata else None,
auto_exif=auto_exif,
)
# Store in the append-only log
storage = _get_storage()
index = storage.append_record(attestation.record)
# Save our own identity so we can look it up during verification
from verisoo.models import Identity
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
pub_key = private_key.public_key()
pub_bytes = pub_key.public_bytes(Encoding.Raw, PublicFormat.Raw)
identity = Identity(
public_key=pub_bytes,
fingerprint=attestation.record.attestor_fingerprint,
metadata={"name": "SooSeF Local Identity"},
)
try:
storage.save_identity(identity)
except Exception:
pass # May already exist
record = attestation.record
hashes = record.image_hashes
return render_template(
"attest/result.html",
success=True,
record_id=record.record_id,
short_id=record.short_id,
attestor=record.attestor_fingerprint,
timestamp=record.timestamp.strftime("%Y-%m-%d %H:%M:%S UTC"),
sha256=hashes.sha256,
phash=hashes.phash,
dhash=hashes.dhash,
caption=metadata.get("caption", ""),
location_name=metadata.get("location_name", ""),
exif_metadata=record.metadata,
index=index,
filename=image_file.filename,
)
except Exception as e:
flash(f"Attestation failed: {e}", "error")
return redirect(url_for("attest.attest"))
return render_template("attest/attest.html", has_identity=has_identity)
@bp.route("/verify", methods=["GET", "POST"])
def verify():
"""Verify an image against attestation records."""
if request.method == "POST":
image_file = request.files.get("image")
if not image_file or not image_file.filename:
flash("Please select an image to verify.", "error")
return redirect(url_for("attest.verify"))
if not _allowed_image(image_file.filename):
flash("Unsupported image format.", "error")
return redirect(url_for("attest.verify"))
try:
image_data = image_file.read()
from verisoo.hashing import hash_image, compute_all_distances, is_same_image
# Compute hashes of the uploaded image
query_hashes = hash_image(image_data)
# Search the attestation log
storage = _get_storage()
stats = storage.get_stats()
if stats.record_count == 0:
return render_template(
"attest/verify_result.html",
found=False,
message="No attestations in the local log yet.",
query_hashes=query_hashes,
filename=image_file.filename,
matches=[],
)
# Search by SHA-256 first (exact match)
matches = []
exact_records = storage.get_records_by_image_sha256(query_hashes.sha256)
for record in exact_records:
matches.append({
"record": record,
"match_type": "exact",
"distances": {},
})
# Then search by perceptual hash if no exact match
if not matches and query_hashes.phash:
all_records = [storage.get_record(i) for i in range(stats.record_count)]
for record in all_records:
same, match_type = is_same_image(
query_hashes, record.image_hashes, perceptual_threshold=10
)
if same:
distances = compute_all_distances(query_hashes, record.image_hashes)
matches.append({
"record": record,
"match_type": match_type or "perceptual",
"distances": distances,
})
# Resolve attestor identities
for match in matches:
record = match["record"]
try:
identity = storage.load_identity(record.attestor_fingerprint)
match["attestor_name"] = identity.metadata.get("name", "Unknown") if identity else "Unknown"
except Exception:
match["attestor_name"] = "Unknown"
return render_template(
"attest/verify_result.html",
found=len(matches) > 0,
message=f"Found {len(matches)} matching attestation(s)." if matches else "No matching attestations found.",
query_hashes=query_hashes,
filename=image_file.filename,
matches=matches,
)
except Exception as e:
flash(f"Verification failed: {e}", "error")
return redirect(url_for("attest.verify"))
return render_template("attest/verify.html")
@bp.route("/attest/log")
def log():
"""List recent attestations."""
try:
storage = _get_storage()
stats = storage.get_stats()
records = []
# Show last 50 records, newest first
start = max(0, stats.record_count - 50)
for i in range(stats.record_count - 1, start - 1, -1):
try:
record = storage.get_record(i)
records.append({"index": i, "record": record})
except Exception:
continue
return render_template("attest/log.html", records=records, total=stats.record_count)
except Exception as e:
flash(f"Could not read attestation log: {e}", "error")
return render_template("attest/log.html", records=[], total=0)