Attest page (/attest): - Image upload with optional caption and location - EXIF auto-extraction toggle - Creates Ed25519-signed attestation record - Stores in verisoo append-only binary log + LMDB index - Displays: record ID, attestor fingerprint, timestamp, image hashes Verify page (/verify): - Image upload for verification against local attestation log - SHA-256 exact matching + perceptual hash matching (pHash, dHash) - Shows match type (exact/perceptual), hash distances, attestor info - Color-coded distance badges (green=0, info<5, warning<10, danger>=10) Attestation log (/attest/log): - Lists recent attestations with short ID, attestor, timestamp, SHA-256 - Shows total record count Verified: full lifecycle works — attest image → verify same image → exact match found Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
242 lines
8.8 KiB
Python
242 lines
8.8 KiB
Python
"""
|
|
Attestation blueprint — attest and verify images via Verisoo.
|
|
|
|
Wraps verisoo's attestation and verification libraries to provide:
|
|
- Image attestation: upload → hash → sign → store in append-only log
|
|
- Image verification: upload → hash → search log → display matches
|
|
"""
|
|
|
|
from __future__ import annotations

import io
import logging
from datetime import UTC, datetime
from pathlib import Path

from flask import Blueprint, flash, redirect, render_template, request, url_for
|
|
|
|
# Flask blueprint registered under the name "attest"; the /attest, /verify and
# /attest/log views in this module attach to it, so their endpoints are
# addressed as "attest.<view>" in url_for().
bp = Blueprint("attest", __name__)
|
|
|
|
|
|
def _get_storage():
    """Build a verisoo ``LocalStorage`` rooted at soosef's attestation directory.

    Both imports are resolved at call time rather than at module import.
    """
    from verisoo.storage import LocalStorage
    from soosef.paths import ATTESTATIONS_DIR as attestations_root

    storage = LocalStorage(base_path=attestations_root)
    return storage
|
|
|
|
|
|
def _get_private_key():
    """Return the Ed25519 private key from soosef's identity directory.

    Returns ``None`` when the key file does not exist; otherwise the result
    of verisoo's ``load_private_key`` for that path.
    """
    from verisoo.crypto import load_private_key
    from soosef.paths import IDENTITY_PRIVATE_KEY as key_path

    return load_private_key(key_path) if key_path.exists() else None
|
|
|
|
|
|
def _allowed_image(filename: str) -> bool:
|
|
if not filename or "." not in filename:
|
|
return False
|
|
return filename.rsplit(".", 1)[1].lower() in {"png", "jpg", "jpeg", "bmp", "gif", "webp", "tiff", "tif"}
|
|
|
|
|
|
def _collect_metadata(form) -> dict:
    """Build the optional attestation metadata from submitted form fields.

    Reads ``caption`` and ``location_name`` from *form*, strips surrounding
    whitespace, and keeps only the fields that end up non-empty.
    """
    metadata = {}
    for field in ("caption", "location_name"):
        value = form.get(field, "").strip()
        if value:
            metadata[field] = value
    return metadata


def _store_local_identity(storage, private_key, fingerprint) -> None:
    """Save the signing key's public identity so verification can look it up.

    Only the ``save_identity`` call is best-effort: a failure there is logged
    at debug level and otherwise ignored. Errors while importing or building
    the identity propagate to the caller.
    """
    from verisoo.models import Identity
    from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat

    public_bytes = private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
    identity = Identity(
        public_key=public_bytes,
        fingerprint=fingerprint,
        metadata={"name": "SooSeF Local Identity"},
    )
    try:
        storage.save_identity(identity)
    except Exception:
        # Presumably raised when the identity is already stored; keep the
        # attestation going but leave a trace instead of swallowing silently.
        logging.getLogger(__name__).debug(
            "Could not save local identity %s", fingerprint, exc_info=True
        )


@bp.route("/attest", methods=["GET", "POST"])
def attest():
    """Create a provenance attestation for an uploaded image.

    Non-POST requests render the upload form, telling the template whether a
    signing identity is available. POST requests validate the upload, create
    an attestation with the local private key, append its record to the
    attestation storage and render the result page.

    Validation problems and any exception raised while attesting are reported
    with a flashed error message followed by a redirect back to the form.
    """
    # NOTE(review): this view previously imported auth.login_required without
    # applying it. Confirm that authentication is enforced elsewhere (e.g.
    # app-wide) or decorate this route explicitly.
    logger = logging.getLogger(__name__)

    # Check identity exists
    private_key = _get_private_key()
    has_identity = private_key is not None

    if request.method != "POST":
        return render_template("attest/attest.html", has_identity=has_identity)

    if not has_identity:
        flash("No identity configured. Run 'soosef init' or generate one from the Keys page.", "error")
        return redirect(url_for("attest.attest"))

    image_file = request.files.get("image")
    if not image_file or not image_file.filename:
        flash("Please select an image to attest.", "error")
        return redirect(url_for("attest.attest"))

    if not _allowed_image(image_file.filename):
        flash("Unsupported image format. Use PNG, JPG, WebP, TIFF, or BMP.", "error")
        return redirect(url_for("attest.attest"))

    # NOTE(review): this try still covers everything after append_record, so a
    # failure while saving the identity or rendering the result is reported as
    # "Attestation failed" even though the record was already appended.
    try:
        image_data = image_file.read()

        # Build optional metadata
        metadata = _collect_metadata(request.form)

        # NOTE(review): an unchecked HTML checkbox is omitted from the form
        # data, so with a default of "on" this is True unless the client sends
        # an explicit non-"on" value. Confirm the template does so.
        auto_exif = request.form.get("auto_exif", "on") == "on"

        # Create the attestation
        from verisoo.attestation import create_attestation

        attestation = create_attestation(
            image_data=image_data,
            private_key=private_key,
            metadata=metadata if metadata else None,
            auto_exif=auto_exif,
        )
        record = attestation.record

        # Store in the append-only log
        storage = _get_storage()
        index = storage.append_record(record)

        # Save our own identity so we can look it up during verification
        _store_local_identity(storage, private_key, record.attestor_fingerprint)

        hashes = record.image_hashes
        return render_template(
            "attest/result.html",
            success=True,
            record_id=record.record_id,
            short_id=record.short_id,
            attestor=record.attestor_fingerprint,
            timestamp=record.timestamp.strftime("%Y-%m-%d %H:%M:%S UTC"),
            sha256=hashes.sha256,
            phash=hashes.phash,
            dhash=hashes.dhash,
            caption=metadata.get("caption", ""),
            location_name=metadata.get("location_name", ""),
            exif_metadata=record.metadata,
            index=index,
            filename=image_file.filename,
        )
    except Exception as e:
        # Keep the traceback for operators; the user only sees the flash text.
        logger.exception("Attestation failed")
        flash(f"Attestation failed: {e}", "error")
        return redirect(url_for("attest.attest"))
|
|
|
|
|
|
@bp.route("/verify", methods=["GET", "POST"])
def verify():
    """Verify an uploaded image against the local attestation records.

    Non-POST requests render the upload form. POST requests hash the upload
    and look for records with the same SHA-256; only when none are found (and
    a perceptual hash is available) is every stored record compared via
    ``is_same_image``. Each match is annotated with the attestor's stored
    name, falling back to "Unknown".

    Validation problems and any exception raised while verifying are reported
    with a flashed error message followed by a redirect back to the form.
    """
    if request.method != "POST":
        return render_template("attest/verify.html")

    logger = logging.getLogger(__name__)

    image_file = request.files.get("image")
    if not image_file or not image_file.filename:
        flash("Please select an image to verify.", "error")
        return redirect(url_for("attest.verify"))

    if not _allowed_image(image_file.filename):
        flash("Unsupported image format.", "error")
        return redirect(url_for("attest.verify"))

    try:
        image_data = image_file.read()

        from verisoo.hashing import hash_image, compute_all_distances, is_same_image

        # Compute hashes of the uploaded image
        query_hashes = hash_image(image_data)

        # Search the attestation log
        storage = _get_storage()
        stats = storage.get_stats()

        if stats.record_count == 0:
            return render_template(
                "attest/verify_result.html",
                found=False,
                message="No attestations in the local log yet.",
                query_hashes=query_hashes,
                filename=image_file.filename,
                matches=[],
            )

        # Search by SHA-256 first (exact match)
        matches = [
            {"record": record, "match_type": "exact", "distances": {}}
            for record in storage.get_records_by_image_sha256(query_hashes.sha256)
        ]

        # Then search by perceptual hash if no exact match
        if not matches and query_hashes.phash:
            # Fetch one record at a time instead of materializing the whole
            # log in memory before comparing.
            for position in range(stats.record_count):
                record = storage.get_record(position)
                same, match_type = is_same_image(
                    query_hashes, record.image_hashes, perceptual_threshold=10
                )
                if same:
                    matches.append({
                        "record": record,
                        "match_type": match_type or "perceptual",
                        "distances": compute_all_distances(query_hashes, record.image_hashes),
                    })

        # Resolve attestor identities
        for match in matches:
            fingerprint = match["record"].attestor_fingerprint
            try:
                identity = storage.load_identity(fingerprint)
                match["attestor_name"] = identity.metadata.get("name", "Unknown") if identity else "Unknown"
            except Exception:
                # A failed lookup must not hide the match itself.
                logger.debug("Could not load identity %s", fingerprint, exc_info=True)
                match["attestor_name"] = "Unknown"

        return render_template(
            "attest/verify_result.html",
            found=bool(matches),
            message=f"Found {len(matches)} matching attestation(s)." if matches else "No matching attestations found.",
            query_hashes=query_hashes,
            filename=image_file.filename,
            matches=matches,
        )
    except Exception as e:
        # Keep the traceback for operators; the user only sees the flash text.
        logger.exception("Verification failed")
        flash(f"Verification failed: {e}", "error")
        return redirect(url_for("attest.verify"))
|
|
|
|
|
|
@bp.route("/attest/log")
def log():
    """List the most recent attestations, newest first.

    Renders up to the last 50 records from storage together with the total
    record count. A record that cannot be read is skipped and logged; if the
    storage itself cannot be read, an error is flashed and an empty log is
    rendered.
    """
    logger = logging.getLogger(__name__)
    try:
        storage = _get_storage()
        total = storage.get_stats().record_count

        # Show last 50 records, newest first
        first = max(0, total - 50)
        records = []
        for position in reversed(range(first, total)):
            try:
                record = storage.get_record(position)
            except Exception:
                # One unreadable record should not blank the whole page, but
                # it should not vanish without a trace either.
                logger.warning(
                    "Skipping unreadable attestation record %d", position, exc_info=True
                )
                continue
            records.append({"index": position, "record": record})

        return render_template("attest/log.html", records=records, total=total)
    except Exception as e:
        logger.exception("Could not read attestation log")
        flash(f"Could not read attestation log: {e}", "error")
        return render_template("attest/log.html", records=[], total=0)
|