fieldwitness/src/soosef/verisoo/models.py
Aaron D. Lee e3bc1cce1f Consolidate stegasoo and verisoo into soosef monorepo
Merge stegasoo (v4.3.0, steganography) and verisoo (v0.1.0, attestation)
as subpackages under soosef.stegasoo and soosef.verisoo. This eliminates
cross-repo coordination and enables atomic changes across the full stack.

- Copy stegasoo (34 modules) and verisoo (15 modules) into src/soosef/
- Convert all verisoo absolute imports to relative imports
- Rewire ~50 import sites across soosef code (cli, web, keystore, tests)
- Replace stegasoo/verisoo pip deps with inlined code + pip extras
  (stego-dct, stego-audio, attest, web, api, cli, fieldkit, all, dev)
- Add _availability.py for runtime feature detection
- Add unified FastAPI mount point at soosef.api
- Copy and adapt tests from both repos (155 pass, 1 skip)
- Drop standalone CLI/web frontends; keep FastAPI as optional modules
- Both source repos tagged pre-monorepo-consolidation on GitHub

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 19:06:14 -04:00

459 lines
14 KiB
Python

"""
Core data models for Verisoo.
Designed for the photographer provenance use case:
"I took this photo in Ukraine, not San Francisco - here's my cryptographic proof."
"""
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
@dataclass(frozen=True)
class Identity:
    """
    An attestor identity backed by Ed25519 keypair.

    The fingerprint is the first 16 bytes of SHA-256(public_key), hex-encoded.
    This provides a short, recognizable identifier without exposing the full key.
    The class stores the fingerprint as given; it does not derive or verify it.
    """

    public_key: bytes
    fingerprint: str
    # Defaults to the current UTC time when the instance is constructed.
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    # Free-form attributes; the "name" key is read by display_name.
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def display_name(self) -> str:
        """Human-readable name if set, otherwise short fingerprint.

        A missing, ``None`` or empty ``"name"`` entry in ``metadata`` is
        treated as "not set", so the result is always a non-empty label:
        the first 8 fingerprint characters followed by ``...``.
        """
        name = self.metadata.get("name")
        if name:
            return name
        return f"{self.fingerprint[:8]}..."

    def __str__(self) -> str:
        return f"Identity({self.display_name})"
@dataclass(frozen=True)
class GeoLocation:
    """
    Geographic location where image was captured.

    Holds GPS coordinates plus optional accuracy, altitude and a
    human-readable place name.
    """

    latitude: float  # -90 to 90
    longitude: float  # -180 to 180
    accuracy_meters: float | None = None  # GPS accuracy
    altitude_meters: float | None = None
    location_name: str | None = None  # "Kyiv, Ukraine" - human readable

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a compact dict; unset optional fields are omitted."""
        payload: dict[str, Any] = {"lat": self.latitude, "lon": self.longitude}
        # Numeric optionals are kept whenever they are not None (0.0 is valid).
        numeric_optionals = (
            ("accuracy", self.accuracy_meters),
            ("altitude", self.altitude_meters),
        )
        payload.update(
            {key: value for key, value in numeric_optionals if value is not None}
        )
        # The name is dropped when it is None or an empty string.
        if self.location_name:
            payload["name"] = self.location_name
        return payload

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> GeoLocation:
        """Rebuild from the dict form; "lat" and "lon" are required keys."""
        lat, lon = d["lat"], d["lon"]
        return cls(
            latitude=lat,
            longitude=lon,
            accuracy_meters=d.get("accuracy"),
            altitude_meters=d.get("altitude"),
            location_name=d.get("name"),
        )

    def __str__(self) -> str:
        coordinates = f"({self.latitude:.4f}, {self.longitude:.4f})"
        if not self.location_name:
            return coordinates
        return f"{self.location_name} {coordinates}"
@dataclass(frozen=True)
class CaptureDevice:
    """
    Information about the device that captured the image.

    Helps establish authenticity - "taken with iPhone 15 Pro" vs "photoshopped".
    Every field is optional.
    """

    make: str | None = None  # "Apple"
    model: str | None = None  # "iPhone 15 Pro"
    software: str | None = None  # "iOS 17.4"
    serial_hash: str | None = None  # Hash of device serial (privacy-preserving)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a dict containing only the fields that are set (truthy)."""
        candidates = (
            ("make", self.make),
            ("model", self.model),
            ("software", self.software),
            ("serial_hash", self.serial_hash),
        )
        return {key: value for key, value in candidates if value}

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> CaptureDevice:
        """Rebuild from the dict form; absent keys become None."""
        field_names = ("make", "model", "software", "serial_hash")
        return cls(**{name: d.get(name) for name in field_names})

    def __str__(self) -> str:
        # Join whichever of make/model are set; fall back to a fixed label.
        label = " ".join(filter(None, (self.make, self.model)))
        return label or "Unknown device"
@dataclass(frozen=True)
class CaptureMetadata:
    """
    Rich metadata about image capture for provenance.

    This is what lets a photographer say "I took this in Kyiv, not San Francisco."
    Every field is optional; unset fields are omitted from the dict form.
    """

    # When was it actually captured (from device clock/EXIF, not attestation time)
    captured_at: datetime | None = None
    # Where was it captured
    location: GeoLocation | None = None
    # What device captured it
    device: CaptureDevice | None = None
    # Photographer's notes at capture time
    caption: str | None = None
    # Image technical details
    width: int | None = None
    height: int | None = None
    mime_type: str | None = None
    # Original filename
    filename: str | None = None
    # Free-form tags
    tags: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict, omitting fields that are unset (falsy).

        ``captured_at`` is written as an ISO-8601 string and ``location`` /
        ``device`` via their own ``to_dict``. The returned ``tags`` list is a
        copy, so mutating the result cannot alter this frozen instance.
        """
        d: dict[str, Any] = {}
        if self.captured_at:
            d["captured_at"] = self.captured_at.isoformat()
        if self.location:
            d["location"] = self.location.to_dict()
        if self.device:
            d["device"] = self.device.to_dict()
        if self.caption:
            d["caption"] = self.caption
        if self.width:
            d["width"] = self.width
        if self.height:
            d["height"] = self.height
        if self.mime_type:
            d["mime_type"] = self.mime_type
        if self.filename:
            d["filename"] = self.filename
        if self.tags:
            # Copy: never hand out the list owned by the instance.
            d["tags"] = list(self.tags)
        return d

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> CaptureMetadata:
        """Rebuild from the dict form produced by ``to_dict``.

        Missing or falsy entries become ``None`` (``[]`` for ``tags``).

        Raises:
            ValueError: if ``captured_at`` is not a valid ISO-8601 string.
            KeyError: if a nested ``location`` lacks "lat" or "lon".
        """
        raw_captured_at = d.get("captured_at")
        raw_location = d.get("location")
        raw_device = d.get("device")
        return cls(
            captured_at=datetime.fromisoformat(raw_captured_at) if raw_captured_at else None,
            location=GeoLocation.from_dict(raw_location) if raw_location else None,
            device=CaptureDevice.from_dict(raw_device) if raw_device else None,
            caption=d.get("caption"),
            width=d.get("width"),
            height=d.get("height"),
            mime_type=d.get("mime_type"),
            filename=d.get("filename"),
            # Copy the caller's list, and treat an explicit null as "no tags".
            tags=list(d.get("tags") or []),
        )
@dataclass(frozen=True)
class ImageHashes:
    """
    Multi-algorithm image fingerprinting for robust matching.

    Designed to survive social media mangling:
    - JPEG recompression
    - Resizing
    - Format conversion
    - Cropping
    - Color adjustments

    Match if ANY hash is within threshold - defense in depth.
    """

    sha256: str  # Exact match only - rarely survives sharing
    phash: str  # DCT-based perceptual hash - survives compression
    dhash: str  # Difference hash - survives resizing
    ahash: str | None = None  # Average hash - very tolerant
    colorhash: str | None = None  # Color distribution - survives crops
    crop_resistant: str | None = None  # Center-region hash

    def matches_exactly(self, other: ImageHashes) -> bool:
        """Return True when both SHA-256 digests are equal."""
        return other.sha256 == self.sha256

    def matches_perceptually(
        self,
        other: ImageHashes,
        threshold: int = 10,
    ) -> tuple[bool, str | None]:
        """
        Check if images match perceptually.

        Algorithms are tried in the order phash, dhash, ahash, colorhash,
        crop_resistant. An algorithm is only tried when both sides carry a
        non-empty value for it. The first one whose hamming distance is
        <= ``threshold`` wins.

        Returns (matches, name_of_first_matching_algorithm); the name is
        None when nothing matched.
        """
        # phash and dhash are always candidates; the rest only when both sides have them.
        candidates = [
            ("phash", self.phash, other.phash),
            ("dhash", self.dhash, other.dhash),
        ]
        for algorithm in ("ahash", "colorhash", "crop_resistant"):
            mine = getattr(self, algorithm)
            theirs = getattr(other, algorithm)
            if mine and theirs:
                candidates.append((algorithm, mine, theirs))

        for algorithm, mine, theirs in candidates:
            # Skip empty mandatory hashes rather than comparing them.
            if not (mine and theirs):
                continue
            if _hamming_distance(mine, theirs) <= threshold:
                return True, algorithm
        return False, None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a dict; optional hashes are included only when set."""
        payload = {
            "sha256": self.sha256,
            "phash": self.phash,
            "dhash": self.dhash,
        }
        optional_hashes = (
            ("ahash", self.ahash),
            ("colorhash", self.colorhash),
            ("crop_resistant", self.crop_resistant),
        )
        for key, value in optional_hashes:
            if value:
                payload[key] = value
        return payload

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> ImageHashes:
        """Rebuild from the dict form; sha256, phash and dhash are required keys."""
        required = {name: d[name] for name in ("sha256", "phash", "dhash")}
        optional = {name: d.get(name) for name in ("ahash", "colorhash", "crop_resistant")}
        return cls(**required, **optional)
@dataclass(frozen=True)
class AttestationRecord:
    """
    The core attestation record stored in the append-only log.

    This is the cryptographic proof that a specific image existed
    at a specific time, attested by a specific identity, with specific metadata.
    Once in the log, it cannot be modified or deleted.
    """

    image_hashes: ImageHashes
    signature: bytes
    attestor_fingerprint: str
    timestamp: datetime  # When attestation was created
    metadata: dict[str, Any] = field(default_factory=dict)  # CaptureMetadata.to_dict()

    @property
    def record_id(self) -> str:
        """
        Unique identifier for this record.

        First 32 hex characters of SHA-256 over
        "sha256|attestor_fingerprint|timestamp_iso".
        Deterministic, collision-resistant, URL-safe.
        """
        content = f"{self.image_hashes.sha256}|{self.attestor_fingerprint}|{self.timestamp.isoformat()}"
        return hashlib.sha256(content.encode()).hexdigest()[:32]

    @property
    def short_id(self) -> str:
        """Short ID for display/URLs (first 12 chars)."""
        return self.record_id[:12]

    @property
    def capture_metadata(self) -> CaptureMetadata | None:
        """Parse metadata as CaptureMetadata if present.

        Returns None when ``metadata`` is empty or cannot be parsed.
        Malformed content must not raise here, because ``location`` and
        ``captured_at`` are built on top of this property.
        """
        if not self.metadata:
            return None
        try:
            return CaptureMetadata.from_dict(self.metadata)
        except (KeyError, TypeError, ValueError, AttributeError):
            # KeyError: nested location without lat/lon.
            # TypeError: a nested value of the wrong type.
            # ValueError: captured_at is not a valid ISO-8601 string.
            # AttributeError: a nested location/device that is not a dict.
            return None

    @property
    def location(self) -> GeoLocation | None:
        """Shortcut to capture location."""
        cm = self.capture_metadata
        return cm.location if cm else None

    @property
    def captured_at(self) -> datetime | None:
        """Shortcut to capture time (may differ from attestation time)."""
        cm = self.capture_metadata
        return cm.captured_at if cm else None

    def to_bytes(self) -> bytes:
        """Serialize for signing/hashing.

        Produces compact JSON with sorted keys, so equal records yield the
        same bytes. The signature itself is not part of the payload.
        """
        data = {
            "hashes": self.image_hashes.to_dict(),
            "attestor": self.attestor_fingerprint,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
        }
        return json.dumps(data, sort_keys=True, separators=(",", ":")).encode()

    @classmethod
    def from_bytes(cls, data: bytes, signature: bytes) -> AttestationRecord:
        """Deserialize from wire format.

        ``data`` is the JSON payload produced by ``to_bytes``; the signature
        is supplied separately because it is not embedded in that payload.
        """
        obj = json.loads(data.decode())
        return cls(
            image_hashes=ImageHashes.from_dict(obj["hashes"]),
            signature=signature,
            attestor_fingerprint=obj["attestor"],
            timestamp=datetime.fromisoformat(obj["timestamp"]),
            metadata=obj.get("metadata", {}),
        )
@dataclass
class Attestation:
    """
    Full attestation including the image data (for creation/verification).

    This is the mutable "working" object; AttestationRecord is what gets
    stored/transmitted.
    """

    image_data: bytes
    image_hashes: ImageHashes
    record: AttestationRecord | None = None

    @property
    def is_attested(self) -> bool:
        """True once a record has been attached."""
        if self.record is None:
            return False
        return True
@dataclass(frozen=True)
class VerificationResult:
    """
    Result of verifying an image against attestation records.

    Tells you: "Yes, this image was attested by @photographer in Kyiv on 2024-03-15"
    """

    is_valid: bool
    match_type: str | None  # "exact", "phash", "dhash", etc.
    attestor: Identity | None
    record: AttestationRecord | None
    error: str | None = None
    # Match quality details
    hash_distances: dict[str, int] = field(default_factory=dict)

    @property
    def location(self) -> GeoLocation | None:
        """Where was the original image taken?"""
        if not self.record:
            return None
        return self.record.location

    @property
    def captured_at(self) -> datetime | None:
        """When was the original image captured?"""
        if not self.record:
            return None
        return self.record.captured_at

    @property
    def attested_at(self) -> datetime | None:
        """When was the attestation created?"""
        if not self.record:
            return None
        return self.record.timestamp

    def summary(self) -> str:
        """Human-readable summary for display."""
        if not self.is_valid:
            reason = self.error or "No matching attestation found"
            return f"Not verified: {reason}"

        # Collect the available facts in reading order: who, when, where, how.
        fragments: list[str] = []
        if self.attestor:
            fragments.append(f"Attested by {self.attestor.display_name}")
        when = self.attested_at
        if when:
            fragments.append(f"on {when.strftime('%Y-%m-%d')}")
        where = self.location
        if where:
            fragments.append(f"in {where}")
        # Only call out the algorithm when the match was not byte-exact.
        if self.match_type and self.match_type != "exact":
            fragments.append(f"({self.match_type} match)")
        return " ".join(fragments) or "Verified"
@dataclass(frozen=True)
class ProofLink:
    """
    A shareable link to an attestation proof.

    Photographers can share these to prove provenance:
    "Here's proof I took this photo: verisoo.io/v/a8f3c2d1e9b7"
    """

    record_id: str
    base_url: str = "https://verisoo.io"

    @property
    def _root(self) -> str:
        """Base URL without trailing slashes, so joined paths never contain '//'."""
        return self.base_url.rstrip("/")

    @property
    def short_id(self) -> str:
        """First 12 characters of the record ID."""
        return self.record_id[:12]

    @property
    def url(self) -> str:
        """Short link of the form ``<base_url>/v/<short_id>``."""
        return f"{self._root}/v/{self.short_id}"

    @property
    def full_url(self) -> str:
        """Long link of the form ``<base_url>/verify/<record_id>``."""
        return f"{self._root}/verify/{self.record_id}"

    def __str__(self) -> str:
        return self.url
def _hamming_distance(hash1: str, hash2: str) -> int:
"""Compute hamming distance between two hex-encoded hashes."""
# Handle different length hashes by padding shorter one
if len(hash1) != len(hash2):
max_len = max(len(hash1), len(hash2))
hash1 = hash1.zfill(max_len)
hash2 = hash2.zfill(max_len)
# Convert hex to int, XOR, count bits
val1 = int(hash1, 16)
val2 = int(hash2, 16)
return bin(val1 ^ val2).count("1")