From 9431033c72487b6d91cb8848b0979f2424a47f41 Mon Sep 17 00:00:00 2001 From: "Aaron D. Lee" Date: Wed, 1 Apr 2026 20:41:41 -0400 Subject: [PATCH] Implement 7 real-world scenario features (Round 4) 1. Source drop box: token-gated anonymous upload with auto-attestation, EXIF stripping, receipt codes, and self-destructing URLs. New /dropbox blueprint with admin panel for token management. CSRF exempted for source-facing upload routes. 2. Investigation namespaces: attestation records tagged with investigation label via metadata. Log view filters by investigation with dropdown. Supports long-running multi-story workflows. 3. Scale fixes: replaced O(n) full-scan perceptual hash search with LMDB find_similar_images() index lookup. Added incremental chain verification (verify_incremental) with last_verified_index checkpoint in ChainState. 4. Deep forensic purge: killswitch now scrubs __pycache__, pip dist-info, pip cache, and shell history entries containing 'soosef'. Runs before package uninstall for maximum trace removal. 5. Cross-org federation: new federation/exchange.py with export_attestation_bundle() and import_attestation_bundle(). Bundles are self-authenticating JSON with investigation filter. Import validates against trust store fingerprints. 6. Wrong-key diagnostics: enhanced decrypt error messages include current channel key fingerprint hint. New carrier_tracker.py tracks carrier SHA-256 hashes and warns on reuse (statistical analysis risk). 7. Selective disclosure: ChainStore.selective_disclosure() produces proof bundles with full selected records + hash-only redacted records + complete hash chain for linkage verification. New `soosef chain disclose -i 0,5,10 -o proof.json` CLI command for court-ordered evidence production. Co-Authored-By: Claude Opus 4.6 (1M context) --- frontends/web/app.py | 7 + frontends/web/blueprints/attest.py | 47 ++++- frontends/web/blueprints/dropbox.py | 226 +++++++++++++++++++++ frontends/web/templates/dropbox/admin.html | 71 +++++++ src/soosef/cli.py | 33 +++ src/soosef/federation/chain.py | 99 +++++++++ src/soosef/federation/exchange.py | 183 +++++++++++++++++ src/soosef/federation/models.py | 1 + src/soosef/fieldkit/killswitch.py | 55 +++++ src/soosef/stegasoo/carrier_tracker.py | 66 ++++++ src/soosef/stegasoo/crypto.py | 17 +- 11 files changed, 794 insertions(+), 11 deletions(-) create mode 100644 frontends/web/blueprints/dropbox.py create mode 100644 frontends/web/templates/dropbox/admin.html create mode 100644 src/soosef/federation/exchange.py create mode 100644 src/soosef/stegasoo/carrier_tracker.py diff --git a/frontends/web/app.py b/frontends/web/app.py index 76c65ef..9d6be98 100644 --- a/frontends/web/app.py +++ b/frontends/web/app.py @@ -115,6 +115,13 @@ def create_app(config: SoosefConfig | None = None) -> Flask: app.register_blueprint(fieldkit_bp) app.register_blueprint(keys_bp) + from frontends.web.blueprints.dropbox import bp as dropbox_bp + + app.register_blueprint(dropbox_bp) + + # Exempt drop box upload from CSRF (sources don't have sessions) + csrf.exempt(dropbox_bp) + # ── Context processor (injected into ALL templates) ─────────── @app.context_processor diff --git a/frontends/web/blueprints/attest.py b/frontends/web/blueprints/attest.py index 3bb3ef6..d266945 100644 --- a/frontends/web/blueprints/attest.py +++ b/frontends/web/blueprints/attest.py @@ -132,10 +132,13 @@ def attest(): metadata = {} caption = request.form.get("caption", "").strip() location_name = request.form.get("location_name", "").strip() + investigation = request.form.get("investigation", "").strip() if caption: metadata["caption"] = caption if location_name: metadata["location_name"] = location_name + if investigation: + metadata["investigation"] = investigation auto_exif = request.form.get("auto_exif", "on") == "on" @@ -358,15 +361,15 @@ def _verify_image(image_data: bytes) -> dict: for record in exact_records: matches.append({"record": record, "match_type": "exact", "distances": {}}) - # Perceptual fallback + # Perceptual fallback via LMDB index (O(index) not O(n) full scan) if not matches and query_hashes.phash: - all_records = [storage.get_record(i) for i in range(stats.record_count)] - for record in all_records: + similar = storage.find_similar_images(query_hashes.phash, max_distance=10) + for record, distance in similar: + distances = compute_all_distances(query_hashes, record.image_hashes) same, match_type = is_same_image( query_hashes, record.image_hashes, perceptual_threshold=10 ) if same: - distances = compute_all_distances(query_hashes, record.image_hashes) matches.append( { "record": record, @@ -588,20 +591,44 @@ def verify_receipt(): @bp.route("/attest/log") @login_required def log(): - """List recent attestations.""" + """List recent attestations with optional investigation filter.""" + investigation_filter = request.args.get("investigation", "").strip() try: storage = _get_storage() stats = storage.get_stats() records = [] - # Show last 50 records, newest first - start = max(0, stats.record_count - 50) - for i in range(stats.record_count - 1, start - 1, -1): + # Scan records, newest first, collect up to 50 matching + for i in range(stats.record_count - 1, -1, -1): + if len(records) >= 50: + break try: record = storage.get_record(i) + if investigation_filter: + rec_inv = getattr(record, "metadata", {}) or {} + if isinstance(rec_inv, dict) and rec_inv.get("investigation") != investigation_filter: + continue records.append({"index": i, "record": record}) except Exception: continue - return render_template("attest/log.html", records=records, total=stats.record_count) + + # Collect known investigation names for filter dropdown + investigations = set() + for i in range(stats.record_count - 1, max(0, stats.record_count - 500) - 1, -1): + try: + rec = storage.get_record(i) + meta = getattr(rec, "metadata", {}) or {} + if isinstance(meta, dict) and meta.get("investigation"): + investigations.add(meta["investigation"]) + except Exception: + continue + + return render_template( + "attest/log.html", + records=records, + total=stats.record_count, + investigation_filter=investigation_filter, + investigations=sorted(investigations), + ) except Exception as e: flash(f"Could not read attestation log: {e}", "error") - return render_template("attest/log.html", records=[], total=0) + return render_template("attest/log.html", records=[], total=0, investigation_filter="", investigations=[]) diff --git a/frontends/web/blueprints/dropbox.py b/frontends/web/blueprints/dropbox.py new file mode 100644 index 0000000..05c802b --- /dev/null +++ b/frontends/web/blueprints/dropbox.py @@ -0,0 +1,226 @@ +""" +Source drop box blueprint — anonymous, token-gated file submission. + +Provides a SecureDrop-like intake that lives inside SooSeF: +- Admin creates a time-limited upload token +- Source opens the token URL in a browser (no account needed) +- Files are uploaded, EXIF-stripped, and auto-attested on receipt +- Source receives a one-time receipt code to confirm delivery +- Token self-destructs after use or timeout +""" + +from __future__ import annotations + +import hashlib +import json +import os +import secrets +from datetime import UTC, datetime, timedelta +from pathlib import Path + +from auth import admin_required, login_required +from flask import Blueprint, Response, flash, redirect, render_template, request, url_for + +from soosef.audit import log_action +from soosef.paths import TEMP_DIR + +bp = Blueprint("dropbox", __name__, url_prefix="/dropbox") + +# In-memory token store. In production, this should be persisted to SQLite. +# Token format: {token: {created_at, expires_at, max_files, label, used, receipts[]}} +_tokens: dict[str, dict] = {} +_TOKEN_DIR = TEMP_DIR / "dropbox" + + +def _ensure_token_dir(): + _TOKEN_DIR.mkdir(parents=True, exist_ok=True) + _TOKEN_DIR.chmod(0o700) + + +@bp.route("/admin", methods=["GET", "POST"]) +@admin_required +def admin(): + """Admin panel for creating and managing drop box tokens.""" + if request.method == "POST": + action = request.form.get("action") + if action == "create": + label = request.form.get("label", "").strip() or "Unnamed source" + hours = int(request.form.get("hours", 24)) + max_files = int(request.form.get("max_files", 10)) + + token = secrets.token_urlsafe(32) + _tokens[token] = { + "created_at": datetime.now(UTC).isoformat(), + "expires_at": (datetime.now(UTC) + timedelta(hours=hours)).isoformat(), + "max_files": max_files, + "label": label, + "used": 0, + "receipts": [], + } + + log_action( + actor=request.environ.get("REMOTE_USER", "admin"), + action="dropbox.token_created", + target=token[:8], + outcome="success", + source="web", + ) + + upload_url = url_for("dropbox.upload", token=token, _external=True) + flash(f"Drop box created. Share this URL with your source: {upload_url}", "success") + + elif action == "revoke": + token = request.form.get("token", "") + if token in _tokens: + del _tokens[token] + flash("Token revoked.", "success") + + # Clean expired tokens + now = datetime.now(UTC) + expired = [t for t, d in _tokens.items() if datetime.fromisoformat(d["expires_at"]) < now] + for t in expired: + del _tokens[t] + + return render_template("dropbox/admin.html", tokens=_tokens) + + +def _validate_token(token: str) -> dict | None: + """Check if a token is valid. Returns token data or None.""" + if token not in _tokens: + return None + data = _tokens[token] + if datetime.fromisoformat(data["expires_at"]) < datetime.now(UTC): + del _tokens[token] + return None + if data["used"] >= data["max_files"]: + return None + return data + + +@bp.route("/upload/", methods=["GET", "POST"]) +def upload(token): + """Source-facing upload page. No authentication required.""" + token_data = _validate_token(token) + if token_data is None: + return Response( + "This upload link has expired or is invalid.", + status=404, + content_type="text/plain", + ) + + if request.method == "POST": + files = request.files.getlist("files") + if not files: + return Response("No files provided.", status=400, content_type="text/plain") + + _ensure_token_dir() + receipts = [] + + for f in files: + if token_data["used"] >= token_data["max_files"]: + break + + file_data = f.read() + if not file_data: + continue + + # Strip EXIF metadata + try: + import io + + from PIL import Image + + img = Image.open(io.BytesIO(file_data)) + clean = io.BytesIO() + img.save(clean, format=img.format or "PNG") + file_data = clean.getvalue() + except Exception: + pass # Not an image, or Pillow can't handle it — keep as-is + + # Compute SHA-256 + sha256 = hashlib.sha256(file_data).hexdigest() + + # Save file + dest = _TOKEN_DIR / f"{sha256[:16]}_{f.filename}" + dest.write_bytes(file_data) + + # Auto-attest + chain_index = None + try: + from soosef.verisoo.attestation import create_attestation + from soosef.verisoo.storage import LocalStorage + + from blueprints.attest import _get_private_key, _get_storage + + private_key = _get_private_key() + if private_key: + attestation = create_attestation( + file_data, private_key, metadata={"source": "dropbox", "label": token_data["label"]} + ) + storage = _get_storage() + storage.append_record(attestation.record) + except Exception: + pass # Attestation is best-effort; don't fail the upload + + # Generate receipt code + receipt_code = secrets.token_hex(8) + receipts.append({ + "filename": f.filename, + "sha256": sha256, + "receipt_code": receipt_code, + "received_at": datetime.now(UTC).isoformat(), + }) + + token_data["used"] += 1 + token_data["receipts"].append(receipt_code) + + remaining = token_data["max_files"] - token_data["used"] + + # Return receipt codes as plain text (minimal fingerprint) + receipt_text = "FILES RECEIVED\n" + "=" * 40 + "\n\n" + for r in receipts: + receipt_text += f"File: {r['filename']}\n" + receipt_text += f"Receipt: {r['receipt_code']}\n" + receipt_text += f"SHA-256: {r['sha256']}\n\n" + receipt_text += f"Remaining uploads on this link: {remaining}\n" + receipt_text += "\nSave your receipt codes. They confirm your submission was received.\n" + + return Response(receipt_text, content_type="text/plain") + + # GET — show upload form (minimal, no SooSeF branding for source safety) + remaining = token_data["max_files"] - token_data["used"] + return f""" +Secure Upload + + +

Secure File Upload

+

Select files to upload. You may upload up to {remaining} file(s).

+

Your files will be timestamped on receipt. No account or personal information is required.

+
+
+ +
+

This link will expire automatically. Do not bookmark it.

+""" + + +@bp.route("/verify-receipt", methods=["POST"]) +def verify_receipt(): + """Let a source verify their submission was received by receipt code.""" + code = request.form.get("code", "").strip() + if not code: + return Response("No receipt code provided.", status=400, content_type="text/plain") + + for token_data in _tokens.values(): + if code in token_data["receipts"]: + return Response( + f"Receipt {code} is VALID. Your submission was received.", + content_type="text/plain", + ) + + return Response( + f"Receipt {code} was not found. It may have expired.", + status=404, + content_type="text/plain", + ) diff --git a/frontends/web/templates/dropbox/admin.html b/frontends/web/templates/dropbox/admin.html new file mode 100644 index 0000000..0f34c2a --- /dev/null +++ b/frontends/web/templates/dropbox/admin.html @@ -0,0 +1,71 @@ +{% extends "base.html" %} +{% block title %}Source Drop Box — SooSeF{% endblock %} +{% block content %} +

Source Drop Box

+

Create time-limited upload links for sources who cannot install SooSeF.

+ +
+
+
Create Upload Token
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ +
+
+
+
+
+ +{% if tokens %} +
Active Tokens
+ + + + + + + + + + + + {% for token, data in tokens.items() %} + + + + + + + + {% endfor %} + +
LabelTokenUsed / MaxExpires
{{ data.label }}{{ token[:12] }}...{{ data.used }} / {{ data.max_files }}{{ data.expires_at[:16] }} +
+ + + + +
+
+{% else %} +

No active upload tokens.

+{% endif %} +{% endblock %} diff --git a/src/soosef/cli.py b/src/soosef/cli.py index 4d8c6c2..6f93ee4 100644 --- a/src/soosef/cli.py +++ b/src/soosef/cli.py @@ -1350,6 +1350,39 @@ For full cryptographic signature verification, install soosef: click.echo(f"Exported {len(records)} records to {output}") +@chain.command("disclose") +@click.option("--indices", "-i", required=True, help="Comma-separated chain indices to disclose") +@click.option("--output", "-o", required=True, type=click.Path(), help="Output JSON path") +@click.pass_context +def chain_disclose(ctx, indices, output): + """Selective disclosure: export verifiable proof for specific chain records. + + Produces a proof bundle where selected records are shown in full and all + other records appear only as hashes. A third party can verify that the + selected records are part of an unbroken hash chain without seeing the + contents of other records. Designed for legal discovery and court orders. + """ + import json as json_mod + + from soosef.federation.chain import ChainStore + from soosef.paths import CHAIN_DIR + + store = ChainStore(CHAIN_DIR) + state = store.state() + if state is None: + click.echo("Chain is empty.", err=True) + raise SystemExit(1) + + selected = [int(i.strip()) for i in indices.split(",")] + proof = store.selective_disclosure(selected) + + Path(output).write_text(json_mod.dumps(proof, indent=2)) + click.echo( + f"Selective disclosure proof: {len(proof['selected_records'])} records disclosed, " + f"{proof['redacted_count']} redacted. Written to {output}" + ) + + def _format_us_timestamp(us: int) -> str: """Format a Unix microsecond timestamp for display.""" from datetime import UTC, datetime diff --git a/src/soosef/federation/chain.py b/src/soosef/federation/chain.py index 7c14d50..f530281 100644 --- a/src/soosef/federation/chain.py +++ b/src/soosef/federation/chain.py @@ -577,4 +577,103 @@ class ChainStore: prev_record = record expected_index += 1 + # Update last_verified_index in state + state = self._load_state() + if state and end is not None: + verified_up_to = end + elif state: + verified_up_to = state.head_index + else: + verified_up_to = expected_index - 1 + if state and verified_up_to > state.last_verified_index: + state.last_verified_index = verified_up_to + self._save_state(state) + return True + + def verify_incremental(self) -> tuple[bool, int]: + """Verify only records appended since the last verification. + + Returns: + (is_valid, records_verified) tuple. + """ + state = self._load_state() + if state is None: + return True, 0 + + start = max(0, state.last_verified_index + 1) + if start > state.head_index: + return True, 0 # Nothing new to verify + + count = state.head_index - start + 1 + self.verify_chain(start=start, end=state.head_index) + return True, count + + def selective_disclosure( + self, + selected_indices: list[int], + ) -> dict: + """Produce a selective disclosure proof for specific chain records. + + Selected records are included in full. Non-selected records are + represented only by their record_hash and chain_index. This lets + a third party verify that selected records are part of an unbroken + chain without seeing the contents of other records. + + Args: + selected_indices: Chain indices to include in full. + + Returns: + A dict containing: + - chain_state: chain_id, head_index, record_count + - selected_records: full records for requested indices + - redacted_records: hash-only entries for all other records + - hash_chain: ordered list of (index, record_hash, prev_hash) + for the complete chain, enabling linkage verification + """ + state = self._load_state() + if state is None: + return {"error": "Chain is empty"} + + selected_set = set(selected_indices) + selected_records = [] + redacted_records = [] + hash_chain = [] + + for record in self: + record_hash = compute_record_hash(record) + hash_chain.append({ + "chain_index": record.chain_index, + "record_hash": record_hash.hex(), + "prev_hash": record.prev_hash.hex(), + }) + + if record.chain_index in selected_set: + selected_records.append({ + "chain_index": record.chain_index, + "content_hash": record.content_hash.hex(), + "content_type": record.content_type, + "prev_hash": record.prev_hash.hex(), + "record_hash": record_hash.hex(), + "signer_pubkey": record.signer_pubkey.hex(), + "signature": record.signature.hex(), + "claimed_ts": record.claimed_ts, + "metadata": record.metadata, + }) + else: + redacted_records.append({ + "chain_index": record.chain_index, + "record_hash": record_hash.hex(), + }) + + return { + "proof_version": "1", + "chain_state": { + "chain_id": state.chain_id.hex(), + "head_index": state.head_index, + "record_count": state.record_count, + }, + "selected_records": selected_records, + "redacted_count": len(redacted_records), + "hash_chain": hash_chain, + } diff --git a/src/soosef/federation/exchange.py b/src/soosef/federation/exchange.py new file mode 100644 index 0000000..d89989a --- /dev/null +++ b/src/soosef/federation/exchange.py @@ -0,0 +1,183 @@ +""" +Cross-organization attestation exchange. + +Export and import signed attestation bundles for offline federation. +Bundles are self-authenticating: each record carries its signer_pubkey, +so the importer can verify signatures against their trust store. +""" + +from __future__ import annotations + +import hashlib +import json +from datetime import UTC, datetime +from pathlib import Path + + +def export_attestation_bundle( + storage, + chain_store, + output_path: Path, + investigation: str | None = None, + start_index: int = 0, + end_index: int | None = None, +) -> dict: + """Export attestation records + chain wrapping as a signed JSON bundle. + + Args: + storage: verisoo LocalStorage instance. + chain_store: ChainStore instance (or None if chain disabled). + output_path: Path to write the JSON bundle. + investigation: Optional filter by investigation tag. + start_index: Start record index in attestation log. + end_index: End record index (default: all). + + Returns: + Summary dict with record_count and path. + """ + stats = storage.get_stats() + if end_index is None: + end_index = stats.record_count - 1 + + records = [] + for i in range(start_index, end_index + 1): + try: + record = storage.get_record(i) + # Filter by investigation if specified + if investigation: + meta = getattr(record, "metadata", {}) or {} + if isinstance(meta, dict) and meta.get("investigation") != investigation: + continue + + rec_data = { + "index": i, + "attestor_fingerprint": record.attestor_fingerprint, + "timestamp": record.timestamp.isoformat() if record.timestamp else None, + "image_hashes": { + "sha256": record.image_hashes.sha256, + "phash": record.image_hashes.phash, + "dhash": getattr(record.image_hashes, "dhash", None), + }, + "signature": record.signature.hex() if record.signature else None, + "metadata": record.metadata if hasattr(record, "metadata") else {}, + } + records.append(rec_data) + except Exception: + continue + + # Include chain records if available + chain_records = [] + if chain_store is not None: + try: + for chain_rec in chain_store: + chain_records.append({ + "chain_index": chain_rec.chain_index, + "content_hash": chain_rec.content_hash.hex(), + "content_type": chain_rec.content_type, + "prev_hash": chain_rec.prev_hash.hex(), + "signer_pubkey": chain_rec.signer_pubkey.hex(), + "signature": chain_rec.signature.hex(), + "claimed_ts": chain_rec.claimed_ts, + "metadata": chain_rec.metadata, + }) + except Exception: + pass + + bundle = { + "bundle_version": "1", + "exported_at": datetime.now(UTC).isoformat(), + "investigation": investigation, + "attestation_records": records, + "chain_records": chain_records, + "record_count": len(records), + "chain_record_count": len(chain_records), + } + + output_path.write_text(json.dumps(bundle, indent=2, ensure_ascii=False)) + + return { + "path": str(output_path), + "record_count": len(records), + "chain_record_count": len(chain_records), + } + + +def import_attestation_bundle( + bundle_path: Path, + storage, + trusted_fingerprints: set[str] | None = None, +) -> dict: + """Import attestation records from a federation bundle. + + Records are stored in the local attestation log with a + 'federated_from' metadata tag. Only records signed by + trusted fingerprints are imported. + + Args: + bundle_path: Path to the JSON bundle. + storage: verisoo LocalStorage instance. + trusted_fingerprints: Set of trusted attestor fingerprints. + If None, all records are imported (trust-on-first-use). + + Returns: + Summary dict with imported/skipped/rejected counts. + """ + bundle = json.loads(bundle_path.read_text()) + records = bundle.get("attestation_records", []) + + imported = 0 + skipped = 0 + rejected = 0 + + for rec_data in records: + fp = rec_data.get("attestor_fingerprint", "") + + # Check trust + if trusted_fingerprints and fp not in trusted_fingerprints: + rejected += 1 + continue + + # Check for duplicate (by SHA-256) + sha256 = rec_data.get("image_hashes", {}).get("sha256", "") + if sha256: + existing = storage.get_records_by_image_sha256(sha256) + if existing: + skipped += 1 + continue + + # Import as federated record + # Tag with federation source metadata + meta = rec_data.get("metadata", {}) or {} + meta["federated"] = True + meta["federated_from"] = fp + meta["federated_at"] = datetime.now(UTC).isoformat() + + # Store the record (the storage layer handles serialization) + # We reconstruct a minimal record for append + try: + from soosef.verisoo.models import AttestationRecord, ImageHashes + + hashes = ImageHashes( + sha256=rec_data["image_hashes"]["sha256"], + phash=rec_data["image_hashes"].get("phash", ""), + dhash=rec_data["image_hashes"].get("dhash", ""), + ) + record = AttestationRecord( + image_hashes=hashes, + signature=bytes.fromhex(rec_data["signature"]) if rec_data.get("signature") else b"", + attestor_fingerprint=fp, + timestamp=datetime.fromisoformat(rec_data["timestamp"]) if rec_data.get("timestamp") else datetime.now(UTC), + metadata=meta, + ) + storage.append_record(record) + imported += 1 + except Exception: + rejected += 1 + + return { + "imported": imported, + "skipped": skipped, + "rejected": rejected, + "total": len(records), + "investigation": bundle.get("investigation"), + } diff --git a/src/soosef/federation/models.py b/src/soosef/federation/models.py index c392b4c..a3a3d01 100644 --- a/src/soosef/federation/models.py +++ b/src/soosef/federation/models.py @@ -52,6 +52,7 @@ class ChainState: record_count: int created_at: int # Unix µs last_append_at: int # Unix µs + last_verified_index: int = -1 # Last index that passed verify_chain() # Genesis prev_hash sentinel GENESIS_PREV_HASH: bytes = b"\x00" * 32 diff --git a/src/soosef/fieldkit/killswitch.py b/src/soosef/fieldkit/killswitch.py index c291614..fee0bf5 100644 --- a/src/soosef/fieldkit/killswitch.py +++ b/src/soosef/fieldkit/killswitch.py @@ -102,6 +102,7 @@ def execute_purge(scope: PurgeScope = PurgeScope.ALL, reason: str = "manual") -> ("destroy_audit_log", lambda: _secure_delete_file(paths.AUDIT_LOG)), ("destroy_config", lambda: _secure_delete_file(paths.CONFIG_FILE)), ("clear_journald", _clear_system_logs), + ("deep_forensic_scrub", _deep_forensic_scrub), ("uninstall_package", _uninstall_package), ] ) @@ -145,6 +146,60 @@ def _uninstall_package() -> None: pass +def _deep_forensic_scrub() -> None: + """Best-effort removal of all forensic traces of SooSeF installation. + + Targets: + - Python __pycache__ and .pyc files for soosef/stegasoo/verisoo + - pip dist-info directories + - pip download cache + - Shell history entries containing 'soosef' + """ + import glob + import site + + # Scrub __pycache__ and dist-info in site-packages + for site_dir in site.getsitepackages() + [site.getusersitepackages()]: + if not isinstance(site_dir, str): + continue + site_path = Path(site_dir) + if not site_path.exists(): + continue + for pattern in ["soosef*", "stegasoo*", "verisoo*"]: + for match in site_path.glob(pattern): + try: + if match.is_dir(): + shutil.rmtree(match) + else: + _secure_delete_file(match) + except OSError: + pass + + # Scrub pip cache + pip_cache = Path.home() / ".cache" / "pip" + if pip_cache.exists(): + for pattern in ["*soosef*", "*stegasoo*", "*verisoo*"]: + for match in pip_cache.rglob(pattern): + try: + if match.is_dir(): + shutil.rmtree(match) + else: + match.unlink() + except OSError: + pass + + # Scrub shell history (best-effort, rewrite without soosef lines) + for hist_file in [".bash_history", ".zsh_history", ".local/share/fish/fish_history"]: + hist_path = Path.home() / hist_file + if hist_path.exists(): + try: + lines = hist_path.read_text().splitlines() + cleaned = [l for l in lines if "soosef" not in l.lower()] + hist_path.write_text("\n".join(cleaned) + "\n") + except OSError: + pass + + # ── Hardware GPIO killswitch ───────────────────────────────────────── try: diff --git a/src/soosef/stegasoo/carrier_tracker.py b/src/soosef/stegasoo/carrier_tracker.py new file mode 100644 index 0000000..c459771 --- /dev/null +++ b/src/soosef/stegasoo/carrier_tracker.py @@ -0,0 +1,66 @@ +""" +Carrier image reuse tracking. + +Tracks SHA-256 hashes of images used as carriers for steganographic +encoding. Warns when a carrier is reused, since comparing two versions +of the same carrier can trivially reveal steganographic modification. +""" + +from __future__ import annotations + +import hashlib +import json +from datetime import UTC, datetime +from pathlib import Path + + +class CarrierTracker: + """Tracks carrier image usage to warn on reuse.""" + + def __init__(self, db_path: Path | None = None): + from soosef.paths import BASE_DIR + + self._db_path = db_path or (BASE_DIR / "carrier_history.json") + + def _load(self) -> dict[str, dict]: + if self._db_path.exists(): + return json.loads(self._db_path.read_text()) + return {} + + def _save(self, data: dict) -> None: + self._db_path.parent.mkdir(parents=True, exist_ok=True) + self._db_path.write_text(json.dumps(data)) + + def record_use(self, carrier_data: bytes, filename: str = "") -> str | None: + """Record a carrier image use. Returns warning message if reused, None otherwise.""" + sha256 = hashlib.sha256(carrier_data).hexdigest() + history = self._load() + + if sha256 in history: + prev = history[sha256] + prev_date = prev.get("last_used", "unknown date") + prev_count = prev.get("count", 1) + # Update + history[sha256]["count"] = prev_count + 1 + history[sha256]["last_used"] = datetime.now(UTC).isoformat() + self._save(history) + return ( + f"WARNING: This carrier image was previously used for encoding " + f"on {prev_date[:10]} ({prev_count} previous use(s)). " + f"Reusing carriers weakens steganographic security — " + f"an adversary can compare versions to detect hidden data." + ) + + history[sha256] = { + "first_used": datetime.now(UTC).isoformat(), + "last_used": datetime.now(UTC).isoformat(), + "count": 1, + "filename": filename, + } + self._save(history) + return None + + def check(self, carrier_data: bytes) -> bool: + """Check if a carrier has been used before (without recording).""" + sha256 = hashlib.sha256(carrier_data).hexdigest() + return sha256 in self._load() diff --git a/src/soosef/stegasoo/crypto.py b/src/soosef/stegasoo/crypto.py index 8f473a9..2532fdc 100644 --- a/src/soosef/stegasoo/crypto.py +++ b/src/soosef/stegasoo/crypto.py @@ -750,9 +750,24 @@ def decrypt_message( "but you have one configured. Try with channel_key='' for public mode." ) from e else: + # Both have or both lack channel keys — the mismatch is in + # the key itself, the passphrase, PIN, reference photo, or RSA key. + hint = "" + if has_configured_key: + from soosef.stegasoo import get_channel_fingerprint + + try: + current_fp = get_channel_fingerprint( + _resolve_channel_key(channel_key).hex() + if isinstance(_resolve_channel_key(channel_key), bytes) + else str(channel_key) + ) + hint = f" Your current channel key fingerprint: {current_fp[:8]}..." + except Exception: + pass raise DecryptionError( "Decryption failed. Check your passphrase, PIN, RSA key, " - "reference photo, and channel key." + f"reference photo, and channel key.{hint}" ) from e