""" Attestation blueprint — attest and verify images via Verisoo. Wraps verisoo's attestation and verification libraries to provide: - Image attestation: upload → hash → sign → store in append-only log - Image verification: upload → hash → search log → display matches """ from __future__ import annotations import io from datetime import UTC, datetime from pathlib import Path from flask import Blueprint, flash, redirect, render_template, request, url_for bp = Blueprint("attest", __name__) def _get_storage(): """Get verisoo LocalStorage pointed at soosef's attestation directory.""" from verisoo.storage import LocalStorage from soosef.paths import ATTESTATIONS_DIR return LocalStorage(base_path=ATTESTATIONS_DIR) def _get_private_key(): """Load the Ed25519 private key from soosef identity directory.""" from verisoo.crypto import load_private_key from soosef.paths import IDENTITY_PRIVATE_KEY if not IDENTITY_PRIVATE_KEY.exists(): return None return load_private_key(IDENTITY_PRIVATE_KEY) def _allowed_image(filename: str) -> bool: if not filename or "." not in filename: return False return filename.rsplit(".", 1)[1].lower() in {"png", "jpg", "jpeg", "bmp", "gif", "webp", "tiff", "tif"} @bp.route("/attest", methods=["GET", "POST"]) def attest(): """Create a provenance attestation for an image.""" from auth import login_required as _lr # Check identity exists private_key = _get_private_key() has_identity = private_key is not None if request.method == "POST": if not has_identity: flash("No identity configured. Run 'soosef init' or generate one from the Keys page.", "error") return redirect(url_for("attest.attest")) image_file = request.files.get("image") if not image_file or not image_file.filename: flash("Please select an image to attest.", "error") return redirect(url_for("attest.attest")) if not _allowed_image(image_file.filename): flash("Unsupported image format. Use PNG, JPG, WebP, TIFF, or BMP.", "error") return redirect(url_for("attest.attest")) try: image_data = image_file.read() # Build optional metadata metadata = {} caption = request.form.get("caption", "").strip() location_name = request.form.get("location_name", "").strip() if caption: metadata["caption"] = caption if location_name: metadata["location_name"] = location_name auto_exif = request.form.get("auto_exif", "on") == "on" # Create the attestation from verisoo.attestation import create_attestation attestation = create_attestation( image_data=image_data, private_key=private_key, metadata=metadata if metadata else None, auto_exif=auto_exif, ) # Store in the append-only log storage = _get_storage() index = storage.append_record(attestation.record) # Save our own identity so we can look it up during verification from verisoo.models import Identity from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat pub_key = private_key.public_key() pub_bytes = pub_key.public_bytes(Encoding.Raw, PublicFormat.Raw) identity = Identity( public_key=pub_bytes, fingerprint=attestation.record.attestor_fingerprint, metadata={"name": "SooSeF Local Identity"}, ) try: storage.save_identity(identity) except Exception: pass # May already exist record = attestation.record hashes = record.image_hashes return render_template( "attest/result.html", success=True, record_id=record.record_id, short_id=record.short_id, attestor=record.attestor_fingerprint, timestamp=record.timestamp.strftime("%Y-%m-%d %H:%M:%S UTC"), sha256=hashes.sha256, phash=hashes.phash, dhash=hashes.dhash, caption=metadata.get("caption", ""), location_name=metadata.get("location_name", ""), exif_metadata=record.metadata, index=index, filename=image_file.filename, ) except Exception as e: flash(f"Attestation failed: {e}", "error") return redirect(url_for("attest.attest")) return render_template("attest/attest.html", has_identity=has_identity) @bp.route("/verify", methods=["GET", "POST"]) def verify(): """Verify an image against attestation records.""" if request.method == "POST": image_file = request.files.get("image") if not image_file or not image_file.filename: flash("Please select an image to verify.", "error") return redirect(url_for("attest.verify")) if not _allowed_image(image_file.filename): flash("Unsupported image format.", "error") return redirect(url_for("attest.verify")) try: image_data = image_file.read() from verisoo.hashing import hash_image, compute_all_distances, is_same_image # Compute hashes of the uploaded image query_hashes = hash_image(image_data) # Search the attestation log storage = _get_storage() stats = storage.get_stats() if stats.record_count == 0: return render_template( "attest/verify_result.html", found=False, message="No attestations in the local log yet.", query_hashes=query_hashes, filename=image_file.filename, matches=[], ) # Search by SHA-256 first (exact match) matches = [] exact_records = storage.get_records_by_image_sha256(query_hashes.sha256) for record in exact_records: matches.append({ "record": record, "match_type": "exact", "distances": {}, }) # Then search by perceptual hash if no exact match if not matches and query_hashes.phash: all_records = [storage.get_record(i) for i in range(stats.record_count)] for record in all_records: same, match_type = is_same_image( query_hashes, record.image_hashes, perceptual_threshold=10 ) if same: distances = compute_all_distances(query_hashes, record.image_hashes) matches.append({ "record": record, "match_type": match_type or "perceptual", "distances": distances, }) # Resolve attestor identities for match in matches: record = match["record"] try: identity = storage.load_identity(record.attestor_fingerprint) match["attestor_name"] = identity.metadata.get("name", "Unknown") if identity else "Unknown" except Exception: match["attestor_name"] = "Unknown" return render_template( "attest/verify_result.html", found=len(matches) > 0, message=f"Found {len(matches)} matching attestation(s)." if matches else "No matching attestations found.", query_hashes=query_hashes, filename=image_file.filename, matches=matches, ) except Exception as e: flash(f"Verification failed: {e}", "error") return redirect(url_for("attest.verify")) return render_template("attest/verify.html") @bp.route("/attest/log") def log(): """List recent attestations.""" try: storage = _get_storage() stats = storage.get_stats() records = [] # Show last 50 records, newest first start = max(0, stats.record_count - 50) for i in range(stats.record_count - 1, start - 1, -1): try: record = storage.get_record(i) records.append({"index": i, "record": record}) except Exception: continue return render_template("attest/log.html", records=records, total=stats.record_count) except Exception as e: flash(f"Could not read attestation log: {e}", "error") return render_template("attest/log.html", records=[], total=0)