diff --git a/frontends/web/blueprints/attest.py b/frontends/web/blueprints/attest.py index d266945..06daa7b 100644 --- a/frontends/web/blueprints/attest.py +++ b/frontends/web/blueprints/attest.py @@ -141,6 +141,24 @@ def attest(): metadata["investigation"] = investigation auto_exif = request.form.get("auto_exif", "on") == "on" + strip_device = request.form.get("strip_device", "on") == "on" + + # Extract-then-classify: get evidentiary metadata before attestation + # so user can control what's included + if auto_exif and strip_device: + from soosef.metadata import extract_and_classify + + extraction = extract_and_classify(image_data) + # Merge evidentiary fields (GPS, timestamp) but exclude + # dangerous device fields (serial, firmware version) + for key, value in extraction.evidentiary.items(): + if key not in metadata: # User metadata takes precedence + if hasattr(value, "isoformat"): + metadata[f"exif_{key}"] = value.isoformat() + elif isinstance(value, dict): + metadata[f"exif_{key}"] = value + else: + metadata[f"exif_{key}"] = str(value) # Create the attestation from soosef.verisoo.attestation import create_attestation @@ -149,7 +167,7 @@ def attest(): image_data=image_data, private_key=private_key, metadata=metadata if metadata else None, - auto_exif=auto_exif, + auto_exif=auto_exif and not strip_device, # Full EXIF only if not stripping device ) # Store in the append-only log diff --git a/frontends/web/blueprints/dropbox.py b/frontends/web/blueprints/dropbox.py index 05c802b..e9bd4a5 100644 --- a/frontends/web/blueprints/dropbox.py +++ b/frontends/web/blueprints/dropbox.py @@ -120,42 +120,55 @@ def upload(token): if token_data["used"] >= token_data["max_files"]: break - file_data = f.read() - if not file_data: + raw_data = f.read() + if not raw_data: continue - # Strip EXIF metadata - try: - import io + # Extract-then-strip pipeline: + # 1. Extract EXIF into attestation metadata (evidentiary fields) + # 2. 
Attest the ORIGINAL bytes (hash matches what source submitted) + # 3. Strip metadata from the stored copy (protect source device info) + from soosef.metadata import extract_strip_pipeline - from PIL import Image + extraction, stripped_data = extract_strip_pipeline(raw_data) - img = Image.open(io.BytesIO(file_data)) - clean = io.BytesIO() - img.save(clean, format=img.format or "PNG") - file_data = clean.getvalue() - except Exception: - pass # Not an image, or Pillow can't handle it — keep as-is + # SHA-256 of what the source actually submitted + sha256 = extraction.original_sha256 - # Compute SHA-256 - sha256 = hashlib.sha256(file_data).hexdigest() - - # Save file + # Save the stripped copy for display/storage (no device fingerprint on disk) dest = _TOKEN_DIR / f"{sha256[:16]}_{f.filename}" - dest.write_bytes(file_data) + dest.write_bytes(stripped_data) - # Auto-attest - chain_index = None + # Auto-attest the ORIGINAL bytes so the attestation hash matches + # what the source submitted. Evidentiary EXIF (GPS, timestamp) + # is preserved in the attestation metadata; dangerous fields + # (device serial) are excluded. 
try: from soosef.verisoo.attestation import create_attestation - from soosef.verisoo.storage import LocalStorage from blueprints.attest import _get_private_key, _get_storage + attest_metadata = { + "source": "dropbox", + "label": token_data["label"], + "stripped_sha256": extraction.stripped_sha256, + } + # Include evidentiary EXIF in attestation (GPS, timestamp) + for key, value in extraction.evidentiary.items(): + if hasattr(value, "isoformat"): + attest_metadata[key] = value.isoformat() + elif hasattr(value, "__dataclass_fields__"): + from dataclasses import asdict + attest_metadata[key] = asdict(value) + elif isinstance(value, dict): + attest_metadata[key] = value + else: + attest_metadata[key] = str(value) + private_key = _get_private_key() if private_key: attestation = create_attestation( - file_data, private_key, metadata={"source": "dropbox", "label": token_data["label"]} + raw_data, private_key, metadata=attest_metadata ) storage = _get_storage() storage.append_record(attestation.record) diff --git a/src/soosef/metadata.py b/src/soosef/metadata.py new file mode 100644 index 0000000..2fbff67 --- /dev/null +++ b/src/soosef/metadata.py @@ -0,0 +1,149 @@ +""" +Metadata handling pipeline: extract, classify, strip. + +Implements the extract-then-strip model for image metadata: +1. Extract all EXIF/metadata from raw bytes +2. Classify fields as evidentiary (GPS, timestamp) or dangerous (device serial) +3. Preserve evidentiary fields in a sidecar record +4. Strip all metadata from the stored/display copy + +This resolves the tension between steganography (strip everything to protect +sources) and attestation (preserve everything to prove provenance). 
from __future__ import annotations

import hashlib
import io
import logging
from dataclasses import asdict, dataclass, field, is_dataclass

logger = logging.getLogger(__name__)


# Fields classified by risk level:
# - EVIDENTIARY: valuable for proving provenance (location, time)
# - DANGEROUS: could identify the source's device or person
# - NEUTRAL: neither helpful nor harmful

EVIDENTIARY_FIELDS = {
    "captured_at",  # When the photo was taken
    "location",  # GPS coordinates (proves location)
    "width",  # Image dimensions
    "height",
}

DANGEROUS_FIELDS = {
    "device",  # Camera make/model/software/serial — fingerprints the source
}

# Granular device sub-fields
DANGEROUS_DEVICE_FIELDS = {
    "serial_hash",  # Device serial number hash
    "software",  # Firmware version (narrows device population)
}

SAFE_DEVICE_FIELDS = {
    "make",  # Broad manufacturer (e.g., "Apple") — low risk
    "model",  # Device model — moderate risk, useful for sensor analysis
}


@dataclass
class MetadataExtraction:
    """Result of extracting and classifying image metadata."""

    # Raw EXIF data as returned by the verisoo extractor
    raw_exif: dict = field(default_factory=dict)

    # Classified subsets
    evidentiary: dict = field(default_factory=dict)  # Safe to include in attestation
    dangerous: dict = field(default_factory=dict)  # Should be redacted or stored separately
    all_fields: dict = field(default_factory=dict)  # Everything (for secure sidecar)

    # Hashes
    original_sha256: str = ""  # Hash of original bytes (with EXIF)
    stripped_sha256: str = ""  # Hash of stripped bytes (without EXIF)


def classify_device_fields(device: object) -> tuple[dict, dict]:
    """Split device information into ``(safe, dangerous)`` sub-field dicts.

    Args:
        device: A dict or dataclass instance describing the capture device.
            Any other type is treated as carrying no sub-fields.

    Returns:
        A ``(safe, dangerous)`` tuple of dicts. Sub-fields whose value is
        ``None`` are dropped from both.

    Classification is default-deny: only names listed in
    ``SAFE_DEVICE_FIELDS`` (and not also in ``DANGEROUS_DEVICE_FIELDS``) are
    considered safe. Every other name, including ones this module has never
    seen, is treated as dangerous so that a new extractor field cannot end up
    in attestation metadata without being reviewed first.
    """
    # is_dataclass() is also true for dataclass *classes*, which asdict()
    # rejects, so require an instance.
    if is_dataclass(device) and not isinstance(device, type):
        device_dict = asdict(device)
    elif isinstance(device, dict):
        device_dict = device
    else:
        device_dict = {}

    safe: dict = {}
    dangerous: dict = {}
    for name, value in device_dict.items():
        if value is None:
            continue
        if name in SAFE_DEVICE_FIELDS and name not in DANGEROUS_DEVICE_FIELDS:
            safe[name] = value
        else:
            dangerous[name] = value
    return safe, dangerous


def extract_and_classify(image_data: bytes) -> MetadataExtraction:
    """Extract EXIF metadata from image bytes and classify it by risk level.

    Args:
        image_data: The raw bytes exactly as submitted.

    Returns:
        A MetadataExtraction whose ``original_sha256`` is always set. If the
        verisoo extractor cannot be imported, the classified dicts are left
        empty. Otherwise ``evidentiary`` holds the fields considered safe for
        attestation and ``dangerous`` holds the device sub-fields that should
        be redacted.
    """
    result = MetadataExtraction(
        original_sha256=hashlib.sha256(image_data).hexdigest(),
    )

    # Extract raw EXIF via verisoo's extractor. The call stays inside the try
    # so an ImportError raised while the extractor runs is handled the same
    # way as a missing module.
    # NOTE(review): any other exception from the extractor propagates to the
    # caller — confirm the extractor tolerates non-image input.
    try:
        from soosef.verisoo.attestation import extract_exif_metadata

        raw = extract_exif_metadata(image_data) or {}
    except ImportError:
        return result

    result.raw_exif = raw
    result.all_fields = dict(raw)

    # Sorted iteration keeps the key order of `evidentiary` stable; iterating
    # the set directly gives an order that depends on string hash
    # randomization and so can differ between processes.
    for key in sorted(EVIDENTIARY_FIELDS):
        if key in raw:
            result.evidentiary[key] = raw[key]

    # Handle device fields granularly
    device = raw.get("device")
    if device:
        safe_device, dangerous_device = classify_device_fields(device)
        if safe_device:
            result.evidentiary["device"] = safe_device
        if dangerous_device:
            result.dangerous["device"] = dangerous_device

    return result


def strip_metadata(image_data: bytes) -> bytes:
    """Re-encode an image without passing its metadata to the encoder.

    Args:
        image_data: Raw bytes of the submitted file.

    Returns:
        The re-encoded image bytes. If Pillow is unavailable or cannot handle
        the input, the input is returned unchanged and a warning is logged,
        because in that case nothing has been stripped.
    """
    try:
        from PIL import Image

        with Image.open(io.BytesIO(image_data)) as img:
            clean = io.BytesIO()
            # Re-save without passing exif/info to the encoder.
            # NOTE(review): some Pillow writers may still carry entries from
            # `img.info` (e.g. ICC profiles, comments) into the output —
            # TODO confirm per format for the Pillow version in use.
            img.save(clean, format=img.format or "PNG")
            return clean.getvalue()
    except Exception as exc:  # Best-effort by design: never reject the file
        # Not an image or Pillow can't handle it — return as-is
        logger.warning(
            "Metadata strip skipped (%s); returning input unchanged",
            type(exc).__name__,
        )
        return image_data


def extract_strip_pipeline(image_data: bytes) -> tuple[MetadataExtraction, bytes]:
    """Run the full extract-then-strip pipeline.

    Args:
        image_data: Raw bytes of the submitted file.

    Returns:
        ``(extraction, stripped_bytes)`` — ``extraction`` contains the
        classified metadata plus the SHA-256 of both the original and the
        stripped bytes; ``stripped_bytes`` is the copy intended for
        storage/display.
    """
    extraction = extract_and_classify(image_data)
    stripped = strip_metadata(image_data)
    extraction.stripped_sha256 = hashlib.sha256(stripped).hexdigest()
    return extraction, stripped