""" Attestation blueprint — attest and verify images via Verisoo. Wraps verisoo's attestation and verification libraries to provide: - Image attestation: upload → hash → sign → store in append-only log - Image verification: upload → hash → search log → display matches - Verification receipt: same as verify but returns a downloadable JSON file """ from __future__ import annotations import hashlib import json import socket from datetime import UTC, datetime from auth import login_required from flask import Blueprint, Response, flash, redirect, render_template, request, url_for bp = Blueprint("attest", __name__) def _get_storage(): """Get verisoo LocalStorage pointed at soosef's attestation directory.""" from soosef.verisoo.storage import LocalStorage from soosef.paths import ATTESTATIONS_DIR return LocalStorage(base_path=ATTESTATIONS_DIR) def _get_private_key(): """Load the Ed25519 private key from soosef identity directory.""" from soosef.verisoo.crypto import load_private_key from soosef.paths import IDENTITY_PRIVATE_KEY if not IDENTITY_PRIVATE_KEY.exists(): return None return load_private_key(IDENTITY_PRIVATE_KEY) def _wrap_in_chain(verisoo_record, private_key, metadata: dict | None = None): """Wrap a Verisoo attestation record in the hash chain. Returns the chain record, or None if chain is disabled. """ import hashlib from cryptography.hazmat.primitives.serialization import load_pem_private_key from soosef.config import SoosefConfig from soosef.federation.chain import ChainStore from soosef.paths import CHAIN_DIR, IDENTITY_PRIVATE_KEY config = SoosefConfig.load() if not config.chain_enabled or not config.chain_auto_wrap: return None # Hash the verisoo record bytes as chain content record_bytes = ( verisoo_record.to_bytes() if hasattr(verisoo_record, "to_bytes") else str(verisoo_record).encode() ) content_hash = hashlib.sha256(record_bytes).digest() # Load Ed25519 key for chain signing (need the cryptography key, not verisoo's) priv_pem = IDENTITY_PRIVATE_KEY.read_bytes() chain_private_key = load_pem_private_key(priv_pem, password=None) chain_metadata = {} if metadata: if "caption" in metadata: chain_metadata["caption"] = metadata["caption"] if "location_name" in metadata: chain_metadata["location"] = metadata["location_name"] store = ChainStore(CHAIN_DIR) return store.append( content_hash=content_hash, content_type="verisoo/attestation-v1", private_key=chain_private_key, metadata=chain_metadata, ) def _allowed_image(filename: str) -> bool: if not filename or "." not in filename: return False return filename.rsplit(".", 1)[1].lower() in { "png", "jpg", "jpeg", "bmp", "gif", "webp", "tiff", "tif", } @bp.route("/attest", methods=["GET", "POST"]) @login_required def attest(): """Create a provenance attestation for an image.""" # Check identity exists private_key = _get_private_key() has_identity = private_key is not None if request.method == "POST": if not has_identity: flash( "No identity configured. Run 'soosef init' or generate one from the Keys page.", "error", ) return redirect(url_for("attest.attest")) image_file = request.files.get("image") if not image_file or not image_file.filename: flash("Please select an image to attest.", "error") return redirect(url_for("attest.attest")) if not _allowed_image(image_file.filename): flash("Unsupported image format. Use PNG, JPG, WebP, TIFF, or BMP.", "error") return redirect(url_for("attest.attest")) try: image_data = image_file.read() # Build optional metadata metadata = {} caption = request.form.get("caption", "").strip() location_name = request.form.get("location_name", "").strip() if caption: metadata["caption"] = caption if location_name: metadata["location_name"] = location_name auto_exif = request.form.get("auto_exif", "on") == "on" # Create the attestation from soosef.verisoo.attestation import create_attestation attestation = create_attestation( image_data=image_data, private_key=private_key, metadata=metadata if metadata else None, auto_exif=auto_exif, ) # Store in the append-only log storage = _get_storage() index = storage.append_record(attestation.record) # Wrap in hash chain if enabled chain_record = None try: chain_record = _wrap_in_chain(attestation.record, private_key, metadata) except Exception as e: import logging logging.getLogger(__name__).warning("Chain wrapping failed: %s", e) flash( "Attestation saved, but chain wrapping failed. " "Check chain configuration.", "warning", ) # Save our own identity so we can look it up during verification from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from soosef.verisoo.models import Identity pub_key = private_key.public_key() pub_bytes = pub_key.public_bytes(Encoding.Raw, PublicFormat.Raw) identity = Identity( public_key=pub_bytes, fingerprint=attestation.record.attestor_fingerprint, metadata={"name": "SooSeF Local Identity"}, ) try: storage.save_identity(identity) except Exception: pass # May already exist record = attestation.record hashes = record.image_hashes return render_template( "attest/result.html", success=True, record_id=record.record_id, short_id=record.short_id, attestor=record.attestor_fingerprint, timestamp=record.timestamp.strftime("%Y-%m-%d %H:%M:%S UTC"), sha256=hashes.sha256, phash=hashes.phash, dhash=hashes.dhash, caption=metadata.get("caption", ""), location_name=metadata.get("location_name", ""), exif_metadata=record.metadata, index=index, filename=image_file.filename, chain_index=chain_record.chain_index if chain_record else None, ) except Exception as e: flash(f"Attestation failed: {e}", "error") return redirect(url_for("attest.attest")) return render_template("attest/attest.html", has_identity=has_identity) @bp.route("/attest/batch", methods=["POST"]) @login_required def attest_batch(): """Batch attestation — accepts multiple image files. Returns JSON with results for each file (success/skip/error). Skips images already attested (by SHA-256 match). """ import hashlib from soosef.verisoo.hashing import hash_image private_key = _get_private_key() if private_key is None: return {"error": "No identity key. Run soosef init first."}, 400 files = request.files.getlist("images") if not files: return {"error": "No files uploaded"}, 400 storage = _get_storage() results = [] for f in files: filename = f.filename or "unknown" try: image_data = f.read() sha256 = hashlib.sha256(image_data).hexdigest() # Skip already-attested images existing = storage.get_records_by_image_sha256(sha256) if existing: results.append({"file": filename, "status": "skipped", "reason": "already attested"}) continue from soosef.verisoo.attestation import create_attestation attestation = create_attestation(image_data, private_key) index = storage.append_record(attestation.record) # Wrap in chain if enabled chain_index = None config = request.app.config.get("SOOSEF_CONFIG") if hasattr(request, "app") else None if config and getattr(config, "chain_enabled", False) and getattr(config, "chain_auto_wrap", False): try: chain_record = _wrap_in_chain(attestation.record, private_key, {}) chain_index = chain_record.chain_index except Exception: pass results.append({ "file": filename, "status": "attested", "record_id": attestation.record.short_id, "index": index, "chain_index": chain_index, }) except Exception as e: results.append({"file": filename, "status": "error", "error": str(e)}) attested = sum(1 for r in results if r["status"] == "attested") skipped = sum(1 for r in results if r["status"] == "skipped") errors = sum(1 for r in results if r["status"] == "error") return { "total": len(results), "attested": attested, "skipped": skipped, "errors": errors, "results": results, } @bp.route("/verify/batch", methods=["POST"]) @login_required def verify_batch(): """Batch verification — accepts multiple image files. Returns JSON with per-file verification results. Uses SHA-256 fast path before falling back to perceptual scan. """ files = request.files.getlist("images") if not files: return {"error": "No files uploaded"}, 400 results = [] for f in files: filename = f.filename or "unknown" try: image_data = f.read() result = _verify_image(image_data) if result["matches"]: best = result["matches"][0] results.append({ "file": filename, "status": "verified", "match_type": best["match_type"], "record_id": best["record"].short_id if hasattr(best["record"], "short_id") else "unknown", "matches": len(result["matches"]), }) else: results.append({"file": filename, "status": "unverified", "matches": 0}) except Exception as e: results.append({"file": filename, "status": "error", "error": str(e)}) verified = sum(1 for r in results if r["status"] == "verified") unverified = sum(1 for r in results if r["status"] == "unverified") errors = sum(1 for r in results if r["status"] == "error") # Count by match type exact = sum(1 for r in results if r.get("match_type") == "exact") perceptual = verified - exact return { "total": len(results), "verified": verified, "verified_exact": exact, "verified_perceptual": perceptual, "unverified": unverified, "errors": errors, "results": results, } def _verify_image(image_data: bytes) -> dict: """Run the full verification pipeline against the attestation log. Returns a dict with keys: query_hashes — ImageHashes object from verisoo matches — list of match dicts (record, match_type, distances, attestor_name) record_count — total records searched """ from soosef.verisoo.hashing import compute_all_distances, hash_image, is_same_image query_hashes = hash_image(image_data) storage = _get_storage() stats = storage.get_stats() if stats.record_count == 0: return {"query_hashes": query_hashes, "matches": [], "record_count": 0} # Exact SHA-256 match first matches = [] exact_records = storage.get_records_by_image_sha256(query_hashes.sha256) for record in exact_records: matches.append({"record": record, "match_type": "exact", "distances": {}}) # Perceptual fallback if not matches and query_hashes.phash: all_records = [storage.get_record(i) for i in range(stats.record_count)] for record in all_records: same, match_type = is_same_image( query_hashes, record.image_hashes, perceptual_threshold=10 ) if same: distances = compute_all_distances(query_hashes, record.image_hashes) matches.append( { "record": record, "match_type": match_type or "perceptual", "distances": distances, } ) # Resolve attestor identities for match in matches: try: identity = storage.load_identity(match["record"].attestor_fingerprint) match["attestor_name"] = ( identity.metadata.get("name", "Unknown") if identity else "Unknown" ) except Exception: match["attestor_name"] = "Unknown" return {"query_hashes": query_hashes, "matches": matches, "record_count": stats.record_count} @bp.route("/verify", methods=["GET", "POST"]) def verify(): """Verify an image against attestation records. Intentionally unauthenticated: third parties (editors, fact-checkers, courts) must be able to verify provenance without having an account on this instance. The log read here is read-only and reveals no key material. """ if request.method == "POST": image_file = request.files.get("image") if not image_file or not image_file.filename: flash("Please select an image to verify.", "error") return redirect(url_for("attest.verify")) if not _allowed_image(image_file.filename): flash("Unsupported image format.", "error") return redirect(url_for("attest.verify")) try: result = _verify_image(image_file.read()) query_hashes = result["query_hashes"] matches = result["matches"] if result["record_count"] == 0: return render_template( "attest/verify_result.html", found=False, message="No attestations in the local log yet.", query_hashes=query_hashes, filename=image_file.filename, matches=[], ) return render_template( "attest/verify_result.html", found=len(matches) > 0, message=( f"Found {len(matches)} matching attestation(s)." if matches else "No matching attestations found." ), query_hashes=query_hashes, filename=image_file.filename, matches=matches, ) except Exception as e: flash(f"Verification failed: {e}", "error") return redirect(url_for("attest.verify")) return render_template("attest/verify.html") @bp.route("/verify/receipt", methods=["POST"]) def verify_receipt(): """Return a downloadable JSON verification receipt for court or legal use. Accepts the same image upload as /verify. Returns a JSON file attachment containing image hashes, all matching attestation records with full metadata, the verification timestamp, and the verifier hostname. Intentionally unauthenticated — same access policy as /verify. """ image_file = request.files.get("image") if not image_file or not image_file.filename: return Response( json.dumps({"error": "No image provided"}), status=400, mimetype="application/json", ) if not _allowed_image(image_file.filename): return Response( json.dumps({"error": "Unsupported image format"}), status=400, mimetype="application/json", ) try: result = _verify_image(image_file.read()) except Exception as e: return Response( json.dumps({"error": f"Verification failed: {e}"}), status=500, mimetype="application/json", ) query_hashes = result["query_hashes"] matches = result["matches"] verification_ts = datetime.now(UTC).isoformat() try: verifier_instance = socket.gethostname() except Exception: verifier_instance = "unknown" matching_records = [] for match in matches: record = match["record"] rec_entry: dict = { "match_type": match["match_type"], "attestor_fingerprint": record.attestor_fingerprint, "attestor_name": match.get("attestor_name", "Unknown"), "attested_at": record.timestamp.isoformat() if record.timestamp else None, "record_id": str(record.record_id), "short_id": str(record.short_id) if hasattr(record, "short_id") else None, } # Include perceptual hash distances when present (perceptual matches only) if match.get("distances"): rec_entry["hash_distances"] = {k: int(v) for k, v in match["distances"].items()} # Optional fields if getattr(record, "captured_at", None): rec_entry["captured_at"] = record.captured_at.isoformat() if getattr(record, "location", None): rec_entry["location"] = record.location if getattr(record, "metadata", None): # Exclude any key material — only human-readable metadata safe_meta = { k: v for k, v in record.metadata.items() if k in ("caption", "location_name", "device", "software") } if safe_meta: rec_entry["metadata"] = safe_meta # Chain position proof — look up this attestation in the hash chain try: from soosef.config import SoosefConfig from soosef.federation.chain import ChainStore from soosef.federation.serialization import compute_record_hash from soosef.paths import CHAIN_DIR chain_config = SoosefConfig.load() if chain_config.chain_enabled: chain_store = ChainStore(CHAIN_DIR) # Search chain for a record whose content_hash matches this attestation content_hash_hex = getattr(record, "image_hashes", None) if content_hash_hex and hasattr(content_hash_hex, "sha256"): target_sha = content_hash_hex.sha256 for chain_rec in chain_store: if chain_rec.content_hash.hex() == target_sha or chain_rec.metadata.get("attestor") == record.attestor_fingerprint: rec_entry["chain_proof"] = { "chain_id": chain_store.state().chain_id.hex() if chain_store.state() else None, "chain_index": chain_rec.chain_index, "prev_hash": chain_rec.prev_hash.hex(), "record_hash": compute_record_hash(chain_rec).hex(), "content_type": chain_rec.content_type, "claimed_ts": chain_rec.claimed_ts, } break except Exception: pass # Chain proof is optional — don't fail the receipt matching_records.append(rec_entry) receipt = { "schema_version": "3", "verification_timestamp": verification_ts, "verifier_instance": verifier_instance, "queried_filename": image_file.filename, "image_hash": { "sha256": query_hashes.sha256, "phash": query_hashes.phash, "dhash": getattr(query_hashes, "dhash", None), }, "records_searched": result["record_count"], "matches_found": len(matching_records), "matching_records": matching_records, } # Sign the receipt with the instance's Ed25519 identity key private_key = _get_private_key() if private_key is not None: from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat pub_bytes = private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) receipt["verifier_fingerprint"] = hashlib.sha256(pub_bytes).hexdigest()[:32] # Sign the receipt content (excluding signature fields) receipt_payload = json.dumps(receipt, sort_keys=True, ensure_ascii=False).encode() sig = private_key.sign(receipt_payload) receipt["signature"] = sig.hex() receipt["verifier_pubkey"] = pub_bytes.hex() receipt_json = json.dumps(receipt, indent=2, ensure_ascii=False) safe_filename = ( image_file.filename.rsplit(".", 1)[0] if "." in image_file.filename else image_file.filename ) download_name = f"receipt_{safe_filename}_{datetime.now(UTC).strftime('%Y%m%dT%H%M%SZ')}.json" return Response( receipt_json, status=200, mimetype="application/json", headers={"Content-Disposition": f'attachment; filename="{download_name}"'}, ) @bp.route("/attest/log") @login_required def log(): """List recent attestations.""" try: storage = _get_storage() stats = storage.get_stats() records = [] # Show last 50 records, newest first start = max(0, stats.record_count - 50) for i in range(stats.record_count - 1, start - 1, -1): try: record = storage.get_record(i) records.append({"index": i, "record": record}) except Exception: continue return render_template("attest/log.html", records=records, total=stats.record_count) except Exception as e: flash(f"Could not read attestation log: {e}", "error") return render_template("attest/log.html", records=[], total=0)