From 51c9b0a99abd2f9d980ebea33311c322986a4437 Mon Sep 17 00:00:00 2001 From: "Aaron D. Lee" Date: Wed, 1 Apr 2026 17:06:33 -0400 Subject: [PATCH] Fix 14 bugs and add features from power-user security audit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Critical fixes: - Fix admin_delete_user missing current_user_id argument (TypeError on every delete) - Fix self-signed cert OOM: bytes(2130706433) → IPv4Address("127.0.0.1") - Add @login_required to attestation routes (attest, log); verify stays public - Add auth guards to fieldkit (@admin_required on killswitch) and keys blueprints - Fix cleanup_temp_files NameError in generate() route Security hardening: - Unify temp storage to ~/.soosef/temp/ so killswitch purge covers web uploads - Replace Path.unlink() with secure deletion (shred fallback) in temp_storage - Add structured audit log (audit.jsonl) for admin, key, and killswitch actions New features: - Dead man's switch background enforcement thread in serve + check-deadman CLI - Key rotation: soosef keys rotate-identity/rotate-channel with archiving - Batch attestation: soosef attest batch with progress and error handling - Geofence CLI: set/check/clear commands with config persistence - USB CLI: snapshot/check commands against device whitelist - Verification receipt download (/verify/receipt JSON endpoint + UI button) - IdentityInfo.created_at populated from sidecar meta.json (mtime fallback) Data layer: - ChainStore.get() now O(1) via byte-offset index built during state rebuild - Add federation module (chain, models, serialization, entropy) Includes 45+ new tests across chain, deadman, key rotation, killswitch, and serialization modules. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- frontends/web/app.py | 43 +- frontends/web/blueprints/attest.py | 305 +++++-- frontends/web/blueprints/fieldkit.py | 21 + frontends/web/blueprints/keys.py | 50 +- frontends/web/temp_storage.py | 70 +- .../web/templates/attest/verify_result.html | 27 +- pyproject.toml | 2 + src/soosef/audit.py | 108 +++ src/soosef/cli.py | 803 +++++++++++++++++- src/soosef/config.py | 4 + src/soosef/exceptions.py | 12 + src/soosef/federation/__init__.py | 17 + src/soosef/federation/chain.py | 465 ++++++++++ src/soosef/federation/entropy.py | 81 ++ src/soosef/federation/models.py | 57 ++ src/soosef/federation/serialization.py | 97 +++ src/soosef/fieldkit/geofence.py | 49 ++ src/soosef/fieldkit/killswitch.py | 26 +- src/soosef/keystore/manager.py | 175 +++- src/soosef/keystore/models.py | 10 + src/soosef/paths.py | 132 +-- tests/conftest.py | 37 + tests/test_chain.py | 287 +++++++ tests/test_chain_security.py | 162 ++++ tests/test_deadman_enforcement.py | 292 +++++++ tests/test_key_rotation.py | 328 +++++++ tests/test_killswitch.py | 134 +++ tests/test_serialization.py | 123 +++ 28 files changed, 3749 insertions(+), 168 deletions(-) create mode 100644 src/soosef/audit.py create mode 100644 src/soosef/federation/__init__.py create mode 100644 src/soosef/federation/chain.py create mode 100644 src/soosef/federation/entropy.py create mode 100644 src/soosef/federation/models.py create mode 100644 src/soosef/federation/serialization.py create mode 100644 tests/conftest.py create mode 100644 tests/test_chain.py create mode 100644 tests/test_chain_security.py create mode 100644 tests/test_deadman_enforcement.py create mode 100644 tests/test_key_rotation.py create mode 100644 tests/test_killswitch.py create mode 100644 tests/test_serialization.py diff --git a/frontends/web/app.py b/frontends/web/app.py index 9f03675..cf531ac 100644 --- a/frontends/web/app.py +++ b/frontends/web/app.py @@ -75,6 +75,13 @@ def create_app(config: SoosefConfig | None 
= None) -> Flask: app.config["HTTPS_ENABLED"] = config.https_enabled app.config["SOOSEF_CONFIG"] = config + # Point temp_storage at ~/.soosef/temp/ before any routes run, so all + # uploaded files land where the killswitch's destroy_temp_files step + # expects them. Must happen after ensure_dirs() so the directory exists. + import temp_storage as _ts + + _ts.init(TEMP_DIR) + # Persist secret key so sessions survive restarts _load_secret_key(app) @@ -239,6 +246,7 @@ def _register_stegasoo_routes(app: Flask) -> None: The stegasoo templates are in templates/stego/ and extend our base.html. """ import temp_storage + from soosef.audit import log_action from subprocess_stego import ( SubprocessStego, cleanup_progress_file, @@ -460,6 +468,14 @@ def _register_stegasoo_routes(app: Flask) -> None: username = request.form.get("username", "") temp_password = generate_temp_password() success, message = create_user(username, temp_password) + log_action( + actor=get_username(), + action="user.create", + target=f"user:{username}", + outcome="success" if success else "failure", + source="web", + detail=None if success else message, + ) if success: flash(f"User '{username}' created with temporary password: {temp_password}", "success") else: @@ -470,7 +486,17 @@ def _register_stegasoo_routes(app: Flask) -> None: @app.route("/admin/users//delete", methods=["POST"]) @admin_required def admin_delete_user(user_id): - success, message = delete_user(user_id) + target_user = get_user_by_id(user_id) + target_name = target_user.username if target_user else str(user_id) + success, message = delete_user(user_id, get_current_user().id) + log_action( + actor=get_username(), + action="user.delete", + target=f"user:{target_name}", + outcome="success" if success else "failure", + source="web", + detail=None if success else message, + ) flash(message, "success" if success else "error") return redirect(url_for("admin_users")) @@ -479,9 +505,18 @@ def _register_stegasoo_routes(app: Flask) -> None: def 
admin_reset_password(user_id): temp_password = generate_temp_password() success, message = reset_user_password(user_id, temp_password) + target_user = get_user_by_id(user_id) + target_name = target_user.username if target_user else str(user_id) + log_action( + actor=get_username(), + action="user.password_reset", + target=f"user:{target_name}", + outcome="success" if success else "failure", + source="web", + detail=None if success else message, + ) if success: - user = get_user_by_id(user_id) - flash(f"Password for '{user.username}' reset to: {temp_password}", "success") + flash(f"Password for '{target_name}' reset to: {temp_password}", "success") else: flash(message, "error") return redirect(url_for("admin_users")) @@ -530,7 +565,7 @@ def _register_stegasoo_routes(app: Flask) -> None: if not qr_too_large: qr_token = secrets.token_urlsafe(16) - cleanup_temp_files() + temp_storage.cleanup_expired(TEMP_FILE_EXPIRY) temp_storage.save_temp_file( qr_token, creds.rsa_key_pem.encode(), diff --git a/frontends/web/blueprints/attest.py b/frontends/web/blueprints/attest.py index 29655fe..d229214 100644 --- a/frontends/web/blueprints/attest.py +++ b/frontends/web/blueprints/attest.py @@ -4,15 +4,18 @@ Attestation blueprint — attest and verify images via Verisoo. 
Wraps verisoo's attestation and verification libraries to provide: - Image attestation: upload → hash → sign → store in append-only log - Image verification: upload → hash → search log → display matches +- Verification receipt: same as verify but returns a downloadable JSON file """ from __future__ import annotations -import io +import json +import socket from datetime import UTC, datetime -from pathlib import Path -from flask import Blueprint, flash, redirect, render_template, request, url_for +from flask import Blueprint, Response, flash, redirect, render_template, request, url_for + +from auth import login_required bp = Blueprint("attest", __name__) @@ -35,24 +38,80 @@ def _get_private_key(): return load_private_key(IDENTITY_PRIVATE_KEY) +def _wrap_in_chain(verisoo_record, private_key, metadata: dict | None = None): + """Wrap a Verisoo attestation record in the hash chain. + + Returns the chain record, or None if chain is disabled. + """ + import hashlib + + from cryptography.hazmat.primitives.serialization import load_pem_private_key + + from soosef.config import SoosefConfig + from soosef.federation.chain import ChainStore + from soosef.paths import CHAIN_DIR, IDENTITY_PRIVATE_KEY + + config = SoosefConfig.load() + if not config.chain_enabled or not config.chain_auto_wrap: + return None + + # Hash the verisoo record bytes as chain content + record_bytes = ( + verisoo_record.to_bytes() + if hasattr(verisoo_record, "to_bytes") + else str(verisoo_record).encode() + ) + content_hash = hashlib.sha256(record_bytes).digest() + + # Load Ed25519 key for chain signing (need the cryptography key, not verisoo's) + priv_pem = IDENTITY_PRIVATE_KEY.read_bytes() + chain_private_key = load_pem_private_key(priv_pem, password=None) + + chain_metadata = {} + if metadata: + if "caption" in metadata: + chain_metadata["caption"] = metadata["caption"] + if "location_name" in metadata: + chain_metadata["location"] = metadata["location_name"] + + store = ChainStore(CHAIN_DIR) + return 
store.append( + content_hash=content_hash, + content_type="verisoo/attestation-v1", + private_key=chain_private_key, + metadata=chain_metadata, + ) + + def _allowed_image(filename: str) -> bool: if not filename or "." not in filename: return False - return filename.rsplit(".", 1)[1].lower() in {"png", "jpg", "jpeg", "bmp", "gif", "webp", "tiff", "tif"} + return filename.rsplit(".", 1)[1].lower() in { + "png", + "jpg", + "jpeg", + "bmp", + "gif", + "webp", + "tiff", + "tif", + } @bp.route("/attest", methods=["GET", "POST"]) +@login_required def attest(): """Create a provenance attestation for an image.""" - from auth import login_required as _lr - # Check identity exists private_key = _get_private_key() has_identity = private_key is not None if request.method == "POST": if not has_identity: - flash("No identity configured. Run 'soosef init' or generate one from the Keys page.", "error") + flash( + "No identity configured. Run 'soosef init' or generate one from the Keys page.", + "error", + ) return redirect(url_for("attest.attest")) image_file = request.files.get("image") @@ -92,6 +151,19 @@ def attest(): storage = _get_storage() index = storage.append_record(attestation.record) + # Wrap in hash chain if enabled + chain_record = None + try: + chain_record = _wrap_in_chain(attestation.record, private_key, metadata) + except Exception as e: + import logging + + logging.getLogger(__name__).warning("Chain wrapping failed: %s", e) + flash( + "Attestation saved, but chain wrapping failed. 
" "Check chain configuration.", + "warning", + ) + # Save our own identity so we can look it up during verification from verisoo.models import Identity from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat @@ -126,6 +198,7 @@ def attest(): exif_metadata=record.metadata, index=index, filename=image_file.filename, + chain_index=chain_record.chain_index if chain_record else None, ) except Exception as e: @@ -135,9 +208,67 @@ def attest(): return render_template("attest/attest.html", has_identity=has_identity) +def _verify_image(image_data: bytes) -> dict: + """Run the full verification pipeline against the attestation log. + + Returns a dict with keys: + query_hashes — ImageHashes object from verisoo + matches — list of match dicts (record, match_type, distances, attestor_name) + record_count — total records searched + """ + from verisoo.hashing import compute_all_distances, hash_image, is_same_image + + query_hashes = hash_image(image_data) + storage = _get_storage() + stats = storage.get_stats() + + if stats.record_count == 0: + return {"query_hashes": query_hashes, "matches": [], "record_count": 0} + + # Exact SHA-256 match first + matches = [] + exact_records = storage.get_records_by_image_sha256(query_hashes.sha256) + for record in exact_records: + matches.append({"record": record, "match_type": "exact", "distances": {}}) + + # Perceptual fallback + if not matches and query_hashes.phash: + all_records = [storage.get_record(i) for i in range(stats.record_count)] + for record in all_records: + same, match_type = is_same_image( + query_hashes, record.image_hashes, perceptual_threshold=10 + ) + if same: + distances = compute_all_distances(query_hashes, record.image_hashes) + matches.append( + { + "record": record, + "match_type": match_type or "perceptual", + "distances": distances, + } + ) + + # Resolve attestor identities + for match in matches: + try: + identity = storage.load_identity(match["record"].attestor_fingerprint) + 
match["attestor_name"] = ( + identity.metadata.get("name", "Unknown") if identity else "Unknown" + ) + except Exception: + match["attestor_name"] = "Unknown" + + return {"query_hashes": query_hashes, "matches": matches, "record_count": stats.record_count} + + @bp.route("/verify", methods=["GET", "POST"]) def verify(): - """Verify an image against attestation records.""" + """Verify an image against attestation records. + + Intentionally unauthenticated: third parties (editors, fact-checkers, courts) + must be able to verify provenance without having an account on this instance. + The log read here is read-only and reveals no key material. + """ if request.method == "POST": image_file = request.files.get("image") if not image_file or not image_file.filename: @@ -149,18 +280,11 @@ def verify(): return redirect(url_for("attest.verify")) try: - image_data = image_file.read() + result = _verify_image(image_file.read()) + query_hashes = result["query_hashes"] + matches = result["matches"] - from verisoo.hashing import hash_image, compute_all_distances, is_same_image - - # Compute hashes of the uploaded image - query_hashes = hash_image(image_data) - - # Search the attestation log - storage = _get_storage() - stats = storage.get_stats() - - if stats.record_count == 0: + if result["record_count"] == 0: return render_template( "attest/verify_result.html", found=False, @@ -170,44 +294,14 @@ def verify(): matches=[], ) - # Search by SHA-256 first (exact match) - matches = [] - exact_records = storage.get_records_by_image_sha256(query_hashes.sha256) - for record in exact_records: - matches.append({ - "record": record, - "match_type": "exact", - "distances": {}, - }) - - # Then search by perceptual hash if no exact match - if not matches and query_hashes.phash: - all_records = [storage.get_record(i) for i in range(stats.record_count)] - for record in all_records: - same, match_type = is_same_image( - query_hashes, record.image_hashes, perceptual_threshold=10 - ) - if same: - 
distances = compute_all_distances(query_hashes, record.image_hashes) - matches.append({ - "record": record, - "match_type": match_type or "perceptual", - "distances": distances, - }) - - # Resolve attestor identities - for match in matches: - record = match["record"] - try: - identity = storage.load_identity(record.attestor_fingerprint) - match["attestor_name"] = identity.metadata.get("name", "Unknown") if identity else "Unknown" - except Exception: - match["attestor_name"] = "Unknown" - return render_template( "attest/verify_result.html", found=len(matches) > 0, - message=f"Found {len(matches)} matching attestation(s)." if matches else "No matching attestations found.", + message=( + f"Found {len(matches)} matching attestation(s)." + if matches + else "No matching attestations found." + ), query_hashes=query_hashes, filename=image_file.filename, matches=matches, @@ -220,7 +314,110 @@ def verify(): return render_template("attest/verify.html") +@bp.route("/verify/receipt", methods=["POST"]) +def verify_receipt(): + """Return a downloadable JSON verification receipt for court or legal use. + + Accepts the same image upload as /verify. Returns a JSON file attachment + containing image hashes, all matching attestation records with full metadata, + the verification timestamp, and the verifier hostname. + + Intentionally unauthenticated — same access policy as /verify. 
+ """ + image_file = request.files.get("image") + if not image_file or not image_file.filename: + return Response( + json.dumps({"error": "No image provided"}), + status=400, + mimetype="application/json", + ) + + if not _allowed_image(image_file.filename): + return Response( + json.dumps({"error": "Unsupported image format"}), + status=400, + mimetype="application/json", + ) + + try: + result = _verify_image(image_file.read()) + except Exception as e: + return Response( + json.dumps({"error": f"Verification failed: {e}"}), + status=500, + mimetype="application/json", + ) + + query_hashes = result["query_hashes"] + matches = result["matches"] + verification_ts = datetime.now(UTC).isoformat() + + try: + verifier_instance = socket.gethostname() + except Exception: + verifier_instance = "unknown" + + matching_records = [] + for match in matches: + record = match["record"] + rec_entry: dict = { + "match_type": match["match_type"], + "attestor_fingerprint": record.attestor_fingerprint, + "attestor_name": match.get("attestor_name", "Unknown"), + "attested_at": record.timestamp.isoformat() if record.timestamp else None, + "record_id": str(record.record_id), + "short_id": str(record.short_id) if hasattr(record, "short_id") else None, + } + # Include perceptual hash distances when present (perceptual matches only) + if match.get("distances"): + rec_entry["hash_distances"] = {k: int(v) for k, v in match["distances"].items()} + # Optional fields + if getattr(record, "captured_at", None): + rec_entry["captured_at"] = record.captured_at.isoformat() + if getattr(record, "location", None): + rec_entry["location"] = record.location + if getattr(record, "metadata", None): + # Exclude any key material — only human-readable metadata + safe_meta = { + k: v + for k, v in record.metadata.items() + if k in ("caption", "location_name", "device", "software") + } + if safe_meta: + rec_entry["metadata"] = safe_meta + matching_records.append(rec_entry) + + receipt = { + "schema_version": "1", 
+ "verification_timestamp": verification_ts, + "verifier_instance": verifier_instance, + "queried_filename": image_file.filename, + "image_hash": { + "sha256": query_hashes.sha256, + "phash": query_hashes.phash, + "dhash": getattr(query_hashes, "dhash", None), + }, + "records_searched": result["record_count"], + "matches_found": len(matching_records), + "matching_records": matching_records, + } + + receipt_json = json.dumps(receipt, indent=2, ensure_ascii=False) + safe_filename = ( + image_file.filename.rsplit(".", 1)[0] if "." in image_file.filename else image_file.filename + ) + download_name = f"receipt_{safe_filename}_{datetime.now(UTC).strftime('%Y%m%dT%H%M%SZ')}.json" + + return Response( + receipt_json, + status=200, + mimetype="application/json", + headers={"Content-Disposition": f'attachment; filename="{download_name}"'}, + ) + + @bp.route("/attest/log") +@login_required def log(): """List recent attestations.""" try: diff --git a/frontends/web/blueprints/fieldkit.py b/frontends/web/blueprints/fieldkit.py index f2d9a76..c4a2ada 100644 --- a/frontends/web/blueprints/fieldkit.py +++ b/frontends/web/blueprints/fieldkit.py @@ -4,10 +4,14 @@ Fieldkit blueprint — killswitch, dead man's switch, status dashboard. 
from flask import Blueprint, flash, redirect, render_template, request, url_for +from auth import admin_required, get_username, login_required +from soosef.audit import log_action + bp = Blueprint("fieldkit", __name__, url_prefix="/fieldkit") @bp.route("/") +@login_required def status(): """Fieldkit status dashboard — all monitors and system health.""" from soosef.fieldkit.deadman import DeadmanSwitch @@ -20,6 +24,7 @@ def status(): @bp.route("/killswitch", methods=["GET", "POST"]) +@admin_required def killswitch(): """Killswitch arming and firing UI.""" if request.method == "POST": @@ -27,7 +32,22 @@ def killswitch(): if action == "fire" and request.form.get("confirm") == "CONFIRM-PURGE": from soosef.fieldkit.killswitch import PurgeScope, execute_purge + actor = get_username() result = execute_purge(PurgeScope.ALL, reason="web_ui") + outcome = "success" if result.fully_purged else "failure" + failed_steps = ", ".join(name for name, _ in result.steps_failed) + log_action( + actor=actor, + action="killswitch.fire", + target="all", + outcome=outcome, + source="web", + detail=( + f"steps_completed={len(result.steps_completed)} " + f"steps_failed={len(result.steps_failed)}" + + (f" failed={failed_steps}" if failed_steps else "") + ), + ) flash( f"Purge executed: {len(result.steps_completed)} steps completed, " f"{len(result.steps_failed)} failed", @@ -39,6 +59,7 @@ def killswitch(): @bp.route("/deadman/checkin", methods=["POST"]) +@login_required def deadman_checkin(): """Record a dead man's switch check-in.""" from soosef.fieldkit.deadman import DeadmanSwitch diff --git a/frontends/web/blueprints/keys.py b/frontends/web/blueprints/keys.py index c0f7248..be2346e 100644 --- a/frontends/web/blueprints/keys.py +++ b/frontends/web/blueprints/keys.py @@ -4,10 +4,14 @@ Key management blueprint — unified view of all key material. 
from flask import Blueprint, flash, redirect, render_template, request, url_for +from auth import get_username, login_required +from soosef.audit import log_action + bp = Blueprint("keys", __name__, url_prefix="/keys") @bp.route("/") +@login_required def index(): """Key management dashboard.""" from soosef.keystore import KeystoreManager @@ -17,22 +21,60 @@ def index(): @bp.route("/channel/generate", methods=["POST"]) +@login_required def generate_channel(): """Generate a new channel key.""" from soosef.keystore import KeystoreManager ks = KeystoreManager() - key = ks.generate_channel_key() - flash(f"Channel key generated: {key[:8]}...", "success") + try: + key = ks.generate_channel_key() + log_action( + actor=get_username(), + action="key.channel.generate", + target=f"channel:{key[:8]}", + outcome="success", + source="web", + ) + flash(f"Channel key generated: {key[:8]}...", "success") + except Exception as exc: + log_action( + actor=get_username(), + action="key.channel.generate", + target="channel", + outcome="failure", + source="web", + detail=str(exc), + ) + flash(f"Channel key generation failed: {exc}", "error") return redirect(url_for("keys.index")) @bp.route("/identity/generate", methods=["POST"]) +@login_required def generate_identity(): """Generate a new Ed25519 identity.""" from soosef.keystore import KeystoreManager ks = KeystoreManager() - info = ks.generate_identity() - flash(f"Identity generated: {info.fingerprint[:16]}...", "success") + try: + info = ks.generate_identity() + log_action( + actor=get_username(), + action="key.identity.generate", + target=f"identity:{info.fingerprint[:16]}", + outcome="success", + source="web", + ) + flash(f"Identity generated: {info.fingerprint[:16]}...", "success") + except Exception as exc: + log_action( + actor=get_username(), + action="key.identity.generate", + target="identity", + outcome="failure", + source="web", + detail=str(exc), + ) + flash(f"Identity generation failed: {exc}", "error") return 
redirect(url_for("keys.index")) diff --git a/frontends/web/temp_storage.py b/frontends/web/temp_storage.py index 6426876..ca66b8e 100644 --- a/frontends/web/temp_storage.py +++ b/frontends/web/temp_storage.py @@ -9,17 +9,26 @@ Files are stored in a temp directory with: - {file_id}.data - The actual file data - {file_id}.json - Metadata (filename, timestamp, mime_type, etc.) -IMPORTANT: This module ONLY manages files in the temp_files/ directory. +IMPORTANT: This module ONLY manages files in the temp directory. It does NOT touch instance/ (auth database) or any other directories. + +All temp files are written to ~/.soosef/temp/ (soosef.paths.TEMP_DIR) so +that the killswitch's destroy_temp_files step covers them. """ import json +import os +import platform +import subprocess import time from pathlib import Path from threading import Lock -# Default temp directory (can be overridden) -DEFAULT_TEMP_DIR = Path(__file__).parent / "temp_files" +import soosef.paths as paths + +# Default temp directory — always under ~/.soosef/temp/ so the killswitch +# (which purges paths.TEMP_DIR) can reach every file written here. +DEFAULT_TEMP_DIR: Path = paths.TEMP_DIR # Lock for thread-safe operations _lock = Lock() @@ -28,7 +37,7 @@ _lock = Lock() _temp_dir: Path = DEFAULT_TEMP_DIR -def init(temp_dir: Path | str | None = None): +def init(temp_dir: Path | str | None = None) -> None: """Initialize temp storage with optional custom directory.""" global _temp_dir _temp_dir = Path(temp_dir) if temp_dir else DEFAULT_TEMP_DIR @@ -50,6 +59,35 @@ def _thumb_path(thumb_id: str) -> Path: return _temp_dir / f"{thumb_id}.thumb" +def _secure_delete(path: Path) -> None: + """Overwrite and delete a file. 
Best-effort on flash storage.""" + if not path.exists(): + return + + if platform.system() == "Linux": + try: + subprocess.run( + ["shred", "-u", "-z", "-n", "3", str(path)], + timeout=30, + capture_output=True, + ) + return + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + + # Fallback: overwrite with zeros then delete + try: + size = path.stat().st_size + with open(path, "r+b") as f: + f.write(b"\x00" * size) + f.flush() + os.fsync(f.fileno()) + path.unlink() + except OSError: + # Last resort: plain unlink so we don't leave data stranded + path.unlink(missing_ok=True) + + def save_temp_file(file_id: str, data: bytes, metadata: dict) -> None: """ Save a temp file with its metadata. @@ -103,12 +141,12 @@ def has_temp_file(file_id: str) -> bool: def delete_temp_file(file_id: str) -> None: - """Delete a temp file and its metadata.""" + """Securely delete a temp file and its metadata.""" init() with _lock: - _data_path(file_id).unlink(missing_ok=True) - _meta_path(file_id).unlink(missing_ok=True) + _secure_delete(_data_path(file_id)) + _secure_delete(_meta_path(file_id)) def save_thumbnail(thumb_id: str, data: bytes) -> None: @@ -134,16 +172,16 @@ def get_thumbnail(thumb_id: str) -> bytes | None: def delete_thumbnail(thumb_id: str) -> None: - """Delete a thumbnail.""" + """Securely delete a thumbnail.""" init() with _lock: - _thumb_path(thumb_id).unlink(missing_ok=True) + _secure_delete(_thumb_path(thumb_id)) def cleanup_expired(max_age_seconds: float) -> int: """ - Delete expired temp files. + Securely delete expired temp files. 
Args: max_age_seconds: Maximum age in seconds before expiry @@ -165,14 +203,14 @@ def cleanup_expired(max_age_seconds: float) -> int: if now - timestamp > max_age_seconds: file_id = meta_file.stem - _data_path(file_id).unlink(missing_ok=True) - meta_file.unlink(missing_ok=True) + _secure_delete(_data_path(file_id)) + _secure_delete(meta_file) # Also delete thumbnail if exists - _thumb_path(f"{file_id}_thumb").unlink(missing_ok=True) + _secure_delete(_thumb_path(f"{file_id}_thumb")) deleted += 1 except (OSError, json.JSONDecodeError): # Remove corrupted files - meta_file.unlink(missing_ok=True) + _secure_delete(meta_file) deleted += 1 return deleted @@ -180,7 +218,7 @@ def cleanup_expired(max_age_seconds: float) -> int: def cleanup_all() -> int: """ - Delete all temp files. Call on service start/stop. + Securely delete all temp files. Call on service start/stop. Returns: Number of files deleted @@ -192,7 +230,7 @@ def cleanup_all() -> int: with _lock: for f in _temp_dir.iterdir(): if f.is_file(): - f.unlink(missing_ok=True) + _secure_delete(f) deleted += 1 return deleted diff --git a/frontends/web/templates/attest/verify_result.html b/frontends/web/templates/attest/verify_result.html index b3b7715..ebb97ae 100644 --- a/frontends/web/templates/attest/verify_result.html +++ b/frontends/web/templates/attest/verify_result.html @@ -95,12 +95,35 @@ {% endfor %} + {% if found %} +
+
+
Download Verification Receipt
+
+
+

+ Generate a signed JSON receipt for legal or archival use. + Re-upload the same image to produce the downloadable file. +

+
+
+ +
+ +
+
+
+ {% endif %} + diff --git a/pyproject.toml b/pyproject.toml index a1ba002..1e46a22 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,8 @@ dependencies = [ "pillow>=10.0.0", "cryptography>=41.0.0", "argon2-cffi>=23.0.0", + "cbor2>=5.6.0", + "uuid-utils>=0.9.0", ] [project.optional-dependencies] diff --git a/src/soosef/audit.py b/src/soosef/audit.py new file mode 100644 index 0000000..9cd1971 --- /dev/null +++ b/src/soosef/audit.py @@ -0,0 +1,108 @@ +""" +Structured audit log for administrative and security-critical actions. + +Writes append-only JSON-lines to ~/.soosef/audit.jsonl. Each line is a +self-contained JSON object so the file can be tailed, grepped, or ingested +by any log aggregator without a parser. + +Entry schema +------------ +{ + "timestamp": "2026-04-01T12:34:56.789012+00:00", # ISO-8601 UTC + "actor": "alice", # username or "cli" for CLI invocations + "action": "user.delete", # dotted hierarchical action name + "target": "user:3", # affected resource (id, fingerprint, path …) + "outcome": "success", # "success" | "failure" + "source": "web", # "web" | "cli" + "detail": "optional extra" # omitted when None +} + +Actions used by SooSeF +---------------------- + user.create Admin created a new user account + user.delete Admin deleted a user account + user.password_reset Admin issued a temporary password for a user + key.channel.generate New channel key generated + key.identity.generate New Ed25519 identity generated + killswitch.fire Emergency purge executed + +The log is intentionally destroyed by the killswitch (AUDIT_LOG is under +BASE_DIR). That is correct for this threat model: if you're wiping the +device you want the log gone too. 
+""" + +from __future__ import annotations + +import json +import logging +import threading +from datetime import datetime, timezone +from pathlib import Path +from typing import Literal + +import soosef.paths as paths + +logger = logging.getLogger(__name__) + +# Serialisation lock — multiple Gunicorn workers write to the same file; +# the lock only protects within a single process, but append writes to a +# local filesystem are atomic at the OS level for small payloads, so the +# worst case across workers is interleaved bytes within a single line, +# which is extremely unlikely given the small line sizes here. A proper +# multi-process solution would use a logging socket handler; this is +# acceptable for the offline-first threat model. +_lock = threading.Lock() + +Outcome = Literal["success", "failure"] +Source = Literal["web", "cli"] + + +def log_action( + actor: str, + action: str, + target: str, + outcome: Outcome, + source: Source, + detail: str | None = None, +) -> None: + """ + Append one audit entry to ~/.soosef/audit.jsonl. + + This function never raises — a failed write is logged to stderr and + silently swallowed so that audit log failures do not block user-facing + operations. + + Args: + actor: Username performing the action, or ``"cli"`` for CLI calls. + action: Dotted action name, e.g. ``"user.delete"``. + target: Affected resource identifier, e.g. ``"user:3"`` or a key + fingerprint prefix. + outcome: ``"success"`` or ``"failure"``. + source: ``"web"`` or ``"cli"``. + detail: Optional free-text annotation (avoid PII where possible). 
+ """ + entry: dict[str, str] = { + "timestamp": datetime.now(tz=timezone.utc).isoformat(), + "actor": actor, + "action": action, + "target": target, + "outcome": outcome, + "source": source, + } + if detail is not None: + entry["detail"] = detail + + line = json.dumps(entry, ensure_ascii=False) + "\n" + + try: + log_path: Path = paths.AUDIT_LOG + # Ensure parent directory exists (BASE_DIR should already exist after + # ensure_dirs(), but be defensive in case audit is called early). + log_path.parent.mkdir(parents=True, exist_ok=True) + + with _lock: + with log_path.open("a", encoding="utf-8") as fh: + fh.write(line) + except OSError as exc: + # Never crash a user-facing request because audit logging failed. + logger.error("audit: failed to write entry — %s", exc) diff --git a/src/soosef/cli.py b/src/soosef/cli.py index 1f590ef..4e63d3f 100644 --- a/src/soosef/cli.py +++ b/src/soosef/cli.py @@ -7,10 +7,16 @@ plus native SooSeF commands for init, fieldkit, keys, and serve. from __future__ import annotations +import logging +import threading +import time +from ipaddress import IPv4Address from pathlib import Path import click +logger = logging.getLogger(__name__) + @click.group() @click.option( @@ -105,10 +111,66 @@ def serve(host, port, no_https, debug): _generate_self_signed_cert(SSL_CERT, SSL_KEY) ssl_context = (str(SSL_CERT), str(SSL_KEY)) + # Start the dead man's switch enforcement background thread. + # The thread checks every 60 seconds and fires the killswitch if overdue. + # It is a daemon thread — it dies automatically when the Flask process exits. + # We always start it; the loop itself only acts when the switch is armed, + # so it is safe to run even when the switch has never been configured. 
+ _start_deadman_thread(interval_seconds=60) + click.echo(f"Starting SooSeF on {'https' if ssl_context else 'http'}://{host}:{port}") app.run(host=host, port=port, debug=debug, ssl_context=ssl_context) +def _deadman_enforcement_loop(interval_seconds: int = 60) -> None: + """ + Background enforcement loop for the dead man's switch. + + Runs in a daemon thread started by ``serve``. Calls ``DeadmanSwitch.check()`` + every *interval_seconds*. If the switch fires, ``check()`` calls + ``execute_purge`` internally and the process will lose its key material; + the thread then exits because there is nothing left to guard. + + The loop re-evaluates ``is_armed()`` on every tick so it activates + automatically if the switch is armed after the server starts. + """ + from soosef.fieldkit.deadman import DeadmanSwitch + + dm = DeadmanSwitch() + logger.debug("Dead man's switch enforcement loop started (interval=%ds)", interval_seconds) + + while True: + time.sleep(interval_seconds) + try: + if dm.is_armed(): + fired = dm.should_fire() + dm.check() + if fired: + # Killswitch has been triggered; no point continuing. + logger.warning("Dead man's switch fired — enforcement loop exiting") + return + except Exception: + logger.exception("Dead man's switch enforcement loop encountered an error") + + +def _start_deadman_thread(interval_seconds: int = 60) -> threading.Thread | None: + """ + Start the dead man's switch enforcement daemon thread. + + Returns the thread object, or None if the thread could not be started. + The thread is a daemon so it will not block process exit. 
+ """ + t = threading.Thread( + target=_deadman_enforcement_loop, + args=(interval_seconds,), + name="deadman-enforcement", + daemon=True, + ) + t.start() + logger.info("Dead man's switch enforcement thread started (interval=%ds)", interval_seconds) + return t + + def _generate_self_signed_cert(cert_path: Path, key_path: Path) -> None: """Generate a self-signed certificate for development/local use.""" from cryptography import x509 @@ -118,9 +180,11 @@ def _generate_self_signed_cert(cert_path: Path, key_path: Path) -> None: from datetime import datetime, timedelta, UTC key = rsa.generate_private_key(public_exponent=65537, key_size=2048) - subject = issuer = x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, "SooSeF Local"), - ]) + subject = issuer = x509.Name( + [ + x509.NameAttribute(NameOID.COMMON_NAME, "SooSeF Local"), + ] + ) cert = ( x509.CertificateBuilder() .subject_name(subject) @@ -130,16 +194,22 @@ def _generate_self_signed_cert(cert_path: Path, key_path: Path) -> None: .not_valid_before(datetime.now(UTC)) .not_valid_after(datetime.now(UTC) + timedelta(days=365)) .add_extension( - x509.SubjectAlternativeName([ - x509.DNSName("localhost"), - x509.IPAddress(b"\x7f\x00\x00\x01".__class__(0x7F000001)), - ]), + x509.SubjectAlternativeName( + [ + x509.DNSName("localhost"), + x509.IPAddress(IPv4Address("127.0.0.1")), + ] + ), critical=False, ) .sign(key, hashes.SHA256()) ) key_path.write_bytes( - key.private_bytes(serialization.Encoding.PEM, serialization.PrivateFormat.PKCS8, serialization.NoEncryption()) + key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) ) key_path.chmod(0o600) cert_path.write_bytes(cert.public_bytes(serialization.Encoding.PEM)) @@ -161,6 +231,7 @@ try: for name, cmd in stegasoo_cli.commands.items(): stego.add_command(cmd, name) except ImportError: + @stego.command() def unavailable(): """Stegasoo is not installed.""" @@ -182,12 +253,189 @@ try: for name, cmd in 
verisoo_cli.commands.items(): attest.add_command(cmd, name) except ImportError: + @attest.command() def unavailable(): """Verisoo is not installed.""" click.echo("Error: verisoo package not found. Install with: pip install verisoo") +def _attest_file( + file_path: Path, + private_key, + storage, + caption: str | None, + auto_exif: bool = True, +) -> None: + """Attest a single file and store the result. + + Shared by ``attest batch``. Raises on failure so the caller can decide + whether to abort or continue. + + Args: + file_path: Path to the image file to attest. + private_key: Ed25519 private key loaded via verisoo.crypto. + storage: verisoo LocalStorage instance. + caption: Optional caption to embed in metadata. + auto_exif: Whether to extract EXIF metadata from the image. + """ + import hashlib + + from cryptography.hazmat.primitives.serialization import ( + Encoding, + PublicFormat, + load_pem_private_key, + ) + from verisoo.attestation import create_attestation + from verisoo.models import Identity + + from soosef.config import SoosefConfig + from soosef.federation.chain import ChainStore + from soosef.paths import CHAIN_DIR, IDENTITY_PRIVATE_KEY + + image_data = file_path.read_bytes() + + metadata: dict = {} + if caption: + metadata["caption"] = caption + + attestation = create_attestation( + image_data=image_data, + private_key=private_key, + metadata=metadata if metadata else None, + auto_exif=auto_exif, + ) + + storage.append_record(attestation.record) + + # Persist the local identity so verification can resolve the attestor name. + pub_bytes = private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) + identity = Identity( + public_key=pub_bytes, + fingerprint=attestation.record.attestor_fingerprint, + metadata={"name": "SooSeF Local Identity"}, + ) + try: + storage.save_identity(identity) + except Exception: + pass # Already exists — safe to ignore. + + # Wrap in the hash chain if enabled. 
+ config = SoosefConfig.load() + if config.chain_enabled and config.chain_auto_wrap and IDENTITY_PRIVATE_KEY.exists(): + record_bytes = ( + attestation.record.to_bytes() + if hasattr(attestation.record, "to_bytes") + else str(attestation.record).encode() + ) + content_hash = hashlib.sha256(record_bytes).digest() + + priv_pem = IDENTITY_PRIVATE_KEY.read_bytes() + chain_key = load_pem_private_key(priv_pem, password=None) + + chain_metadata: dict = {} + if caption: + chain_metadata["caption"] = caption + + ChainStore(CHAIN_DIR).append( + content_hash=content_hash, + content_type="verisoo/attestation-v1", + private_key=chain_key, + metadata=chain_metadata, + ) + + +# ── Default extensions for batch attestation ────────────────────────────────── + +_DEFAULT_EXTENSIONS: tuple[str, ...] = ("jpg", "jpeg", "png", "tiff", "tif", "webp") + + +@attest.command("batch") +@click.argument("directory", type=click.Path(exists=True, file_okay=False, path_type=Path)) +@click.option("--caption", default=None, help="Shared caption to embed in every attestation.") +@click.option( + "--extensions", + default=",".join(_DEFAULT_EXTENSIONS), + show_default=True, + help="Comma-separated list of file extensions to include (without leading dot).", +) +@click.option( + "--no-exif", + is_flag=True, + help="Disable automatic EXIF extraction.", +) +def batch(directory: Path, caption: str | None, extensions: str, no_exif: bool) -> None: + """Attest all matching images in DIRECTORY. + + Iterates over every file whose extension matches --extensions, attests + each one, and prints a running progress line. Failures are noted and + reported in the final summary — the batch continues on individual errors. 
+ + Example: + + soosef attest batch ./field-photos --caption "Kyiv, 2026-04-01" + + soosef attest batch ./docs --extensions pdf,png --no-exif + """ + from verisoo.crypto import load_private_key + from verisoo.storage import LocalStorage + + from soosef.paths import ATTESTATIONS_DIR, IDENTITY_PRIVATE_KEY + + # Validate identity. + if not IDENTITY_PRIVATE_KEY.exists(): + click.echo( + "Error: No identity configured. Run 'soosef init' first.", + err=True, + ) + raise SystemExit(1) + + private_key = load_private_key(IDENTITY_PRIVATE_KEY) + storage = LocalStorage(base_path=ATTESTATIONS_DIR) + auto_exif = not no_exif + + # Collect matching files. + exts = {e.strip().lower().lstrip(".") for e in extensions.split(",") if e.strip()} + files: list[Path] = sorted( + f for f in directory.iterdir() if f.is_file() and f.suffix.lstrip(".").lower() in exts + ) + + if not files: + click.echo( + f"No matching files found in {directory} (extensions: {', '.join(sorted(exts))})" + ) + return + + total = len(files) + failures: list[tuple[str, str]] = [] + + for i, file_path in enumerate(files, start=1): + click.echo(f"Attesting {i}/{total}: {file_path.name} ... ", nl=False) + try: + _attest_file( + file_path=file_path, + private_key=private_key, + storage=storage, + caption=caption, + auto_exif=auto_exif, + ) + click.echo("done") + except Exception as exc: + click.echo("FAILED") + logger.debug("Attestation failed for %s: %s", file_path.name, exc, exc_info=True) + failures.append((file_path.name, str(exc))) + + # Summary. 
+ succeeded = total - len(failures) + click.echo() + click.echo(f"{succeeded} file(s) attested, {len(failures)} failure(s).") + if failures: + click.echo("Failures:", err=True) + for name, reason in failures: + click.echo(f" {name}: {reason}", err=True) + raise SystemExit(1) + + # ── Fieldkit sub-commands ─────────────────────────────────────────── @@ -207,12 +455,18 @@ def status(): ks_status = ks.status() click.echo("=== SooSeF Fieldkit Status ===") - click.echo(f"Identity: {'Active (' + ks_status.identity_fingerprint[:16] + '...)' if ks_status.has_identity else 'None'}") - click.echo(f"Channel Key: {'Active (' + ks_status.channel_fingerprint[:16] + '...)' if ks_status.has_channel_key else 'None'}") + click.echo( + f"Identity: {'Active (' + ks_status.identity_fingerprint[:16] + '...)' if ks_status.has_identity else 'None'}" + ) + click.echo( + f"Channel Key: {'Active (' + ks_status.channel_fingerprint[:16] + '...)' if ks_status.has_channel_key else 'None'}" + ) dm = DeadmanSwitch() dm_status = dm.status() - click.echo(f"Dead Man: {'Armed (overdue!)' if dm_status['overdue'] else 'Armed' if dm_status['armed'] else 'Disarmed'}") + click.echo( + f"Dead Man: {'Armed (overdue!)' if dm_status['overdue'] else 'Armed' if dm_status['armed'] else 'Disarmed'}" + ) @fieldkit.command() @@ -244,6 +498,220 @@ def checkin(): click.echo("Check-in recorded.") +@fieldkit.command("check-deadman") +def check_deadman(): + """Run the dead man's switch check — fires killswitch if overdue. + + Safe to call from cron or systemd. Exits with status 0 if the switch + is disarmed or not yet overdue. Exits with status 2 if the switch fired + and the killswitch was triggered (so cron/systemd can alert on it). + Exits with status 1 on unexpected errors. 
+ """ + from soosef.fieldkit.deadman import DeadmanSwitch + + dm = DeadmanSwitch() + + if not dm.is_armed(): + click.echo("Dead man's switch is not armed — nothing to do.") + return + + fired = dm.should_fire() + try: + dm.check() + except Exception as exc: + click.echo(f"Error running dead man's check: {exc}", err=True) + raise SystemExit(1) + + if fired: + click.echo( + "DEAD MAN'S SWITCH EXPIRED — killswitch triggered.", + err=True, + ) + raise SystemExit(2) + + s = dm.status() + if s["overdue"]: + click.echo( + f"Dead man's switch is OVERDUE (last check-in: {s['last_checkin']}) " + f"— grace period in effect, will fire soon.", + err=True, + ) + else: + click.echo(f"Dead man's switch OK. Next due: {s.get('next_due', 'unknown')}") + + +# ── Fieldkit: geofence sub-commands ───────────────────────────── + + +@fieldkit.group() +def geofence(): + """Geofence configuration and checks.""" + pass + + +@geofence.command("set") +@click.option("--lat", required=True, type=float, help="Fence center latitude") +@click.option("--lon", required=True, type=float, help="Fence center longitude") +@click.option("--radius", required=True, type=float, help="Fence radius in meters") +@click.option("--name", default="default", show_default=True, help="Human-readable fence name") +def geofence_set(lat, lon, radius, name): + """Set the geofence — saves center and radius to ~/.soosef/fieldkit/geofence.json.""" + from soosef.fieldkit.geofence import GeoCircle, save_fence + + if radius <= 0: + click.echo("Error: --radius must be a positive number of meters.", err=True) + raise SystemExit(1) + if not (-90.0 <= lat <= 90.0): + click.echo("Error: --lat must be between -90 and 90.", err=True) + raise SystemExit(1) + if not (-180.0 <= lon <= 180.0): + click.echo("Error: --lon must be between -180 and 180.", err=True) + raise SystemExit(1) + + fence = GeoCircle(lat=lat, lon=lon, radius_m=radius, name=name) + save_fence(fence) + click.echo(f"Geofence '{name}' set: center ({lat}, {lon}), radius 
{radius} m") + + +@geofence.command("check") +@click.option("--lat", required=True, type=float, help="Current latitude to check") +@click.option("--lon", required=True, type=float, help="Current longitude to check") +def geofence_check(lat, lon): + """Check whether a point is inside the configured geofence. + + Exit codes: 0 = inside fence, 1 = outside fence, 2 = no fence configured. + """ + from soosef.fieldkit.geofence import haversine_distance, is_inside, load_fence + + fence = load_fence() + if fence is None: + click.echo("No geofence configured. Run 'soosef fieldkit geofence set' first.", err=True) + raise SystemExit(2) + + inside = is_inside(fence, lat, lon) + distance = haversine_distance(fence.lat, fence.lon, lat, lon) + status = "INSIDE" if inside else "OUTSIDE" + click.echo( + f"{status} fence '{fence.name}' " + f"(distance: {distance:.1f} m, radius: {fence.radius_m} m)" + ) + raise SystemExit(0 if inside else 1) + + +@geofence.command("clear") +def geofence_clear(): + """Remove the geofence configuration.""" + from soosef.fieldkit.geofence import clear_fence + + removed = clear_fence() + if removed: + click.echo("Geofence cleared.") + else: + click.echo("No geofence was configured.") + + +# ── Fieldkit: USB sub-commands ──────────────────────────────────── + + +@fieldkit.group() +def usb(): + """USB device whitelist management.""" + pass + + +def _enumerate_usb_devices() -> list[dict[str, str]]: + """Return a list of currently connected USB devices. + + Each dict has keys: device_id (vid:pid), vendor, model. + Requires pyudev (Linux only). 
+ """ + try: + import pyudev + except ImportError: + raise RuntimeError("pyudev not available — USB commands require Linux + pyudev") + + context = pyudev.Context() + devices = [] + seen: set[str] = set() + for device in context.list_devices(subsystem="usb"): + vid = device.get("ID_VENDOR_ID", "") + pid = device.get("ID_MODEL_ID", "") + if not vid or not pid: + continue + device_id = f"{vid}:{pid}" + if device_id in seen: + continue + seen.add(device_id) + devices.append( + { + "device_id": device_id, + "vendor": device.get("ID_VENDOR", "unknown"), + "model": device.get("ID_MODEL", "unknown"), + } + ) + return devices + + +@usb.command("snapshot") +def usb_snapshot(): + """Save currently connected USB devices as the whitelist. + + Overwrites ~/.soosef/fieldkit/usb/whitelist.json with all USB devices + currently visible on the system. Run this once on a known-good machine. + """ + from soosef.fieldkit.usb_monitor import save_whitelist + + try: + devices = _enumerate_usb_devices() + except RuntimeError as exc: + click.echo(f"Error: {exc}", err=True) + raise SystemExit(1) + + device_ids = {d["device_id"] for d in devices} + save_whitelist(device_ids) + + click.echo(f"Saved {len(device_ids)} device(s) to USB whitelist:") + for d in sorted(devices, key=lambda x: x["device_id"]): + click.echo(f" {d['device_id']} {d['vendor']} {d['model']}") + + +@usb.command("check") +def usb_check(): + """Compare connected USB devices against the whitelist. + + Exit codes: 0 = all devices known, 1 = unknown device(s) detected, + 2 = no whitelist configured (run 'soosef fieldkit usb snapshot' first). + """ + from soosef.fieldkit.usb_monitor import load_whitelist + + whitelist = load_whitelist() + if not whitelist: + from soosef.paths import USB_WHITELIST + + if not USB_WHITELIST.exists(): + click.echo( + "No USB whitelist found. 
Run 'soosef fieldkit usb snapshot' first.", err=True + ) + raise SystemExit(2) + + try: + devices = _enumerate_usb_devices() + except RuntimeError as exc: + click.echo(f"Error: {exc}", err=True) + raise SystemExit(1) + + unknown = [d for d in devices if d["device_id"] not in whitelist] + + if not unknown: + click.echo(f"All {len(devices)} connected device(s) are whitelisted.") + raise SystemExit(0) + + click.echo(f"WARNING: {len(unknown)} unknown device(s) detected:", err=True) + for d in unknown: + click.echo(f" {d['device_id']} {d['vendor']} {d['model']}", err=True) + raise SystemExit(1) + + # ── Keys sub-commands ─────────────────────────────────────────────── @@ -286,3 +754,316 @@ def import_keys(bundle, password): imported = import_bundle(bundle, IDENTITY_DIR, CHANNEL_KEY_FILE, password.encode()) click.echo(f"Imported: {', '.join(imported.keys())}") + + +@keys.command("rotate-identity") +@click.confirmation_option( + prompt="This will archive the current identity and generate a new keypair. Continue?" +) +def rotate_identity(): + """Rotate the Ed25519 identity keypair — archive old, generate new. + + The current private and public key are preserved in a timestamped + archive directory under ~/.soosef/identity/archived/ so that + previously signed attestations can still be verified with the old key. + + After rotation, notify all collaborators of the new fingerprint so + they can update their trusted-key lists. 
+ """ + from soosef.exceptions import KeystoreError + from soosef.keystore.manager import KeystoreManager + + ks = KeystoreManager() + try: + result = ks.rotate_identity() + except KeystoreError as exc: + click.echo(f"Error: {exc}", err=True) + raise SystemExit(1) + + click.echo("Identity rotated successfully.") + click.echo(f" Old fingerprint: {result.old_fingerprint}") + click.echo(f" New fingerprint: {result.new_fingerprint}") + click.echo(f" Archive: {result.archive_path}") + click.echo() + click.echo( + "IMPORTANT: Notify all collaborators of your new fingerprint so they can " + "update their trusted-key lists. Attestations signed with the old key " + "remain verifiable using the archived public key." + ) + + +@keys.command("rotate-channel") +@click.confirmation_option( + prompt="This will archive the current channel key and generate a new one. Continue?" +) +def rotate_channel(): + """Rotate the Stegasoo channel key — archive old, generate new. + + The current channel key is preserved in a timestamped archive directory + under ~/.soosef/stegasoo/archived/ before the new key is generated. + + After rotation, all parties sharing this channel must receive the new + key out-of-band before they can decode new messages. + """ + from soosef.exceptions import KeystoreError + from soosef.keystore.manager import KeystoreManager + + ks = KeystoreManager() + try: + result = ks.rotate_channel_key() + except KeystoreError as exc: + click.echo(f"Error: {exc}", err=True) + raise SystemExit(1) + + click.echo("Channel key rotated successfully.") + click.echo(f" Old fingerprint: {result.old_fingerprint}") + click.echo(f" New fingerprint: {result.new_fingerprint}") + click.echo(f" Archive: {result.archive_path}") + click.echo() + click.echo( + "IMPORTANT: Distribute the new channel key to all channel participants " + "out-of-band. Messages encoded with the old key cannot be decoded " + "with the new one." 
+ ) + + +# ── Chain sub-commands ───────────────────────────────────────────── + + +@main.group() +def chain(): + """Attestation hash chain operations.""" + pass + + +@chain.command() +@click.pass_context +def status(ctx): + """Show chain status — head index, chain ID, record count.""" + from soosef.federation.chain import ChainStore + from soosef.paths import CHAIN_DIR + + store = ChainStore(CHAIN_DIR) + state = store.state() + + if state is None: + click.echo("Chain is empty — no records yet.") + click.echo("Attest an image or run 'soosef chain backfill' to populate.") + return + + json_out = ctx.obj.get("json", False) + if json_out: + import json + + click.echo( + json.dumps( + { + "chain_id": state.chain_id.hex(), + "head_index": state.head_index, + "head_hash": state.head_hash.hex(), + "record_count": state.record_count, + "created_at": state.created_at, + "last_append_at": state.last_append_at, + } + ) + ) + else: + click.echo("=== Attestation Chain ===") + click.echo(f"Chain ID: {state.chain_id.hex()[:32]}...") + click.echo(f"Records: {state.record_count}") + click.echo(f"Head index: {state.head_index}") + click.echo(f"Head hash: {state.head_hash.hex()[:32]}...") + click.echo(f"Created: {_format_us_timestamp(state.created_at)}") + click.echo(f"Last append: {_format_us_timestamp(state.last_append_at)}") + + +@chain.command() +def verify(): + """Verify chain integrity — check all hashes and signatures.""" + from soosef.federation.chain import ChainStore + from soosef.paths import CHAIN_DIR + + store = ChainStore(CHAIN_DIR) + state = store.state() + + if state is None: + click.echo("Chain is empty — nothing to verify.") + return + + click.echo(f"Verifying {state.record_count} records...") + try: + store.verify_chain() + click.echo("Chain integrity OK — all hashes and signatures valid.") + except Exception as e: + click.echo(f"INTEGRITY VIOLATION: {e}", err=True) + raise SystemExit(1) + + +@chain.command() +@click.argument("index", type=int) +@click.pass_context 
+def show(ctx, index): + """Show a specific chain record by index.""" + from soosef.exceptions import ChainError + from soosef.federation.chain import ChainStore + from soosef.federation.serialization import compute_record_hash + from soosef.paths import CHAIN_DIR + + store = ChainStore(CHAIN_DIR) + try: + record = store.get(index) + except ChainError as e: + click.echo(f"Error: {e}", err=True) + raise SystemExit(1) + + json_out = ctx.obj.get("json", False) + if json_out: + import json + + click.echo( + json.dumps( + { + "version": record.version, + "record_id": record.record_id.hex(), + "chain_index": record.chain_index, + "prev_hash": record.prev_hash.hex(), + "content_hash": record.content_hash.hex(), + "content_type": record.content_type, + "metadata": record.metadata, + "claimed_ts": record.claimed_ts, + "signer_pubkey": record.signer_pubkey.hex(), + "record_hash": compute_record_hash(record).hex(), + } + ) + ) + else: + click.echo(f"=== Record #{record.chain_index} ===") + click.echo(f"Record ID: {record.record_id.hex()}") + click.echo(f"Record hash: {compute_record_hash(record).hex()[:32]}...") + click.echo(f"Prev hash: {record.prev_hash.hex()[:32]}...") + click.echo(f"Content hash: {record.content_hash.hex()[:32]}...") + click.echo(f"Content type: {record.content_type}") + click.echo(f"Timestamp: {_format_us_timestamp(record.claimed_ts)}") + click.echo(f"Signer: {record.signer_pubkey.hex()[:32]}...") + if record.metadata: + click.echo(f"Metadata: {record.metadata}") + if record.entropy_witnesses: + ew = record.entropy_witnesses + click.echo( + f"Entropy: uptime={ew.sys_uptime:.1f}s " + f"entropy_avail={ew.proc_entropy} " + f"boot_id={ew.boot_id[:16]}..." 
+ ) + + +@chain.command() +@click.option("-n", "--count", default=20, help="Number of records to show") +@click.pass_context +def log(ctx, count): + """Show recent chain records (newest first).""" + from soosef.federation.chain import ChainStore + from soosef.federation.serialization import compute_record_hash + from soosef.paths import CHAIN_DIR + + store = ChainStore(CHAIN_DIR) + state = store.state() + + if state is None: + click.echo("Chain is empty.") + return + + start = max(0, state.head_index - count + 1) + records = list(store.iter_records(start, state.head_index)) + records.reverse() # newest first + + click.echo(f"=== Last {len(records)} of {state.record_count} records ===") + click.echo() + for r in records: + ts = _format_us_timestamp(r.claimed_ts) + rhash = compute_record_hash(r).hex()[:16] + caption = r.metadata.get("caption", "") + label = f" — {caption}" if caption else "" + click.echo(f" #{r.chain_index:>5} {ts} {rhash}... {r.content_type}{label}") + + +@chain.command() +@click.confirmation_option(prompt="Backfill existing Verisoo attestations into the chain?") +def backfill(): + """Import existing Verisoo attestations into the hash chain. + + Reads all records from the Verisoo attestation log and wraps each one + in a chain record. Backfilled records are marked with metadata + backfilled=true and entropy witnesses reflect migration time. + """ + import hashlib + + from cryptography.hazmat.primitives.serialization import load_pem_private_key + + from soosef.federation.chain import ChainStore + from soosef.paths import ATTESTATIONS_DIR, CHAIN_DIR, IDENTITY_PRIVATE_KEY + + if not IDENTITY_PRIVATE_KEY.exists(): + click.echo("Error: No identity found. 
Run 'soosef init' first.", err=True) + raise SystemExit(1) + + priv_pem = IDENTITY_PRIVATE_KEY.read_bytes() + private_key = load_pem_private_key(priv_pem, password=None) + + try: + from verisoo.storage import LocalStorage + + storage = LocalStorage(base_path=ATTESTATIONS_DIR) + stats = storage.get_stats() + except Exception as e: + click.echo(f"Error reading Verisoo log: {e}", err=True) + raise SystemExit(1) + + if stats.record_count == 0: + click.echo("No Verisoo attestations to backfill.") + return + + store = ChainStore(CHAIN_DIR) + existing = store.state() + if existing and existing.record_count > 0: + click.echo( + f"Warning: chain already has {existing.record_count} records. " + f"Backfill will append after index {existing.head_index}." + ) + + count = 0 + for i in range(stats.record_count): + try: + record = storage.get_record(i) + record_bytes = ( + record.to_bytes() if hasattr(record, "to_bytes") else str(record).encode() + ) + content_hash = hashlib.sha256(record_bytes).digest() + + original_ts = int(record.timestamp.timestamp() * 1_000_000) if record.timestamp else 0 + metadata = { + "backfilled": True, + "original_ts": original_ts, + "verisoo_index": i, + } + if hasattr(record, "attestor_fingerprint"): + metadata["attestor"] = record.attestor_fingerprint + + store.append( + content_hash=content_hash, + content_type="verisoo/attestation-v1", + private_key=private_key, + metadata=metadata, + ) + count += 1 + except Exception as e: + click.echo(f" Warning: skipped record {i}: {e}") + + click.echo(f"Backfilled {count} attestation(s) into the chain.") + + +def _format_us_timestamp(us: int) -> str: + """Format a Unix microsecond timestamp for display.""" + from datetime import UTC, datetime + + dt = datetime.fromtimestamp(us / 1_000_000, tz=UTC) + return dt.strftime("%Y-%m-%d %H:%M:%S UTC") diff --git a/src/soosef/config.py b/src/soosef/config.py index e556a55..5dccc29 100644 --- a/src/soosef/config.py +++ b/src/soosef/config.py @@ -34,6 +34,10 @@ class 
SoosefConfig: usb_monitoring_enabled: bool = False tamper_monitoring_enabled: bool = False + # Attestation chain + chain_enabled: bool = True + chain_auto_wrap: bool = True # Auto-wrap verisoo attestations in chain records + # Hardware (RPi) gpio_killswitch_pin: int = 17 gpio_killswitch_hold_seconds: float = 5.0 diff --git a/src/soosef/exceptions.py b/src/soosef/exceptions.py index 409687f..219d09f 100644 --- a/src/soosef/exceptions.py +++ b/src/soosef/exceptions.py @@ -28,3 +28,15 @@ class KillswitchError(FieldkitError): class InitError(SoosefError): """Initialization/setup error.""" + + +class ChainError(SoosefError): + """Hash chain error.""" + + +class ChainIntegrityError(ChainError): + """Chain integrity violation — tampered or corrupted records.""" + + +class ChainAppendError(ChainError): + """Failed to append to chain.""" diff --git a/src/soosef/federation/__init__.py b/src/soosef/federation/__init__.py new file mode 100644 index 0000000..0fa23b5 --- /dev/null +++ b/src/soosef/federation/__init__.py @@ -0,0 +1,17 @@ +""" +Federated attestation system for SooSeF. + +Provides hash-chained attestation records with tamper-evident ordering, +encrypted export bundles, and a Certificate Transparency-inspired +federated append-only log for distributing attestations across an air gap. +""" + +from soosef.federation.chain import ChainStore +from soosef.federation.models import AttestationChainRecord, ChainState, EntropyWitnesses + +__all__ = [ + "AttestationChainRecord", + "ChainState", + "ChainStore", + "EntropyWitnesses", +] diff --git a/src/soosef/federation/chain.py b/src/soosef/federation/chain.py new file mode 100644 index 0000000..af5799c --- /dev/null +++ b/src/soosef/federation/chain.py @@ -0,0 +1,465 @@ +""" +Append-only hash chain store for attestation records. 
"""Append-only attestation hash chain storage.

Storage format:
- chain.bin: length-prefixed CBOR records (uint32 BE + serialized record)
- state.cbor: chain state checkpoint (performance optimization)

The canonical state is always derivable from chain.bin. If state.cbor is
corrupted or missing, it is rebuilt by scanning the log.
"""

from __future__ import annotations

import fcntl
import hashlib
import os
import struct
import time
from collections.abc import Iterator
from pathlib import Path

import cbor2
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat

from soosef.exceptions import ChainAppendError, ChainError, ChainIntegrityError
from soosef.federation.entropy import collect_entropy_witnesses
from soosef.federation.models import AttestationChainRecord, ChainState
from soosef.federation.serialization import (
    canonical_bytes,
    compute_record_hash,
    deserialize_record,
    serialize_record,
)

# Length prefix: 4 bytes, big-endian unsigned 32-bit
_LEN_STRUCT = struct.Struct(">I")

# Maximum record size: 1 MiB. Far larger than any valid record (~200-500 bytes
# typically). Prevents OOM from corrupted length prefixes in chain.bin.
MAX_RECORD_SIZE = 1_048_576


def _now_us() -> int:
    """Current time as Unix microseconds."""
    return int(time.time() * 1_000_000)


class ChainStore:
    """Manages an append-only hash chain of attestation records.

    Thread safety: single-writer via fcntl.flock. Multiple readers are safe.

    Offset index: ``_offsets`` maps chain_index (int) to the byte offset of
    that record's length prefix in chain.bin. It is built lazily during
    ``_rebuild_state()`` and kept up-to-date by ``append()``. The index is
    in-memory only — it is reconstructed on every cold load, which is fast
    because it is done in the same single pass that already must read every
    record to compute the chain state.
    """

    def __init__(self, chain_dir: Path):
        """Bind the store to *chain_dir*, creating the directory if needed."""
        self._dir = chain_dir
        self._chain_file = chain_dir / "chain.bin"
        self._state_file = chain_dir / "state.cbor"
        self._dir.mkdir(parents=True, exist_ok=True)
        self._state: ChainState | None = None
        # chain_index → byte offset of the record's 4-byte length prefix.
        # None means the index has not been built yet (cold start).
        self._offsets: dict[int, int] | None = None

    def _load_state(self) -> ChainState | None:
        """Load cached state from state.cbor, rebuilding from the log on corruption.

        Returns None only when the chain is genuinely empty. A corrupt or
        truncated state.cbor is treated as disposable (per the module
        contract) and is regenerated by scanning chain.bin.
        """
        if self._state is not None:
            return self._state
        if self._state_file.exists():
            try:
                m = cbor2.loads(self._state_file.read_bytes())
                self._state = ChainState(
                    chain_id=m["chain_id"],
                    head_index=m["head_index"],
                    head_hash=m["head_hash"],
                    record_count=m["record_count"],
                    created_at=m["created_at"],
                    last_append_at=m["last_append_at"],
                )
                return self._state
            except (KeyError, TypeError, ValueError):
                # cbor2.CBORDecodeError subclasses ValueError. The checkpoint
                # is only a cache — fall through and rebuild from chain.bin
                # instead of failing the whole store.
                self._state = None
        # No usable state file — rebuild if chain.bin has content.
        if self._chain_file.exists() and self._chain_file.stat().st_size > 0:
            return self._rebuild_state()
        return None

    def _save_state(self, state: ChainState) -> None:
        """Atomically write the state checkpoint (write temp file, then swap)."""
        m = {
            "chain_id": state.chain_id,
            "head_index": state.head_index,
            "head_hash": state.head_hash,
            "record_count": state.record_count,
            "created_at": state.created_at,
            "last_append_at": state.last_append_at,
        }
        tmp = self._state_file.with_suffix(".tmp")
        tmp.write_bytes(cbor2.dumps(m, canonical=True))
        # Path.replace() (os.replace) overwrites atomically on POSIX *and*
        # Windows; Path.rename() would raise on Windows if the target exists.
        tmp.replace(self._state_file)
        self._state = state

    def _rebuild_state(self) -> ChainState:
        """Rebuild state by scanning chain.bin. Used on corruption or first load.

        Also builds the in-memory offset index in the same pass so that no
        second scan is ever needed.

        Raises:
            ChainError: If chain.bin exists but holds no decodable records.
        """
        genesis = None
        last = None
        count = 0
        offsets: dict[int, int] = {}
        for offset, record in self._iter_raw_with_offsets():
            offsets[record.chain_index] = offset
            if count == 0:
                genesis = record
            last = record
            count += 1

        if genesis is None or last is None:
            raise ChainError("Chain file exists but contains no valid records.")

        self._offsets = offsets

        state = ChainState(
            chain_id=hashlib.sha256(canonical_bytes(genesis)).digest(),
            head_index=last.chain_index,
            head_hash=compute_record_hash(last),
            record_count=count,
            created_at=genesis.claimed_ts,
            last_append_at=last.claimed_ts,
        )
        self._save_state(state)
        return state

    def _iter_raw_with_offsets(self) -> Iterator[tuple[int, AttestationChainRecord]]:
        """Iterate all records, yielding (byte_offset, record) pairs.

        ``byte_offset`` is the position of the record's 4-byte length prefix
        within chain.bin. Used internally to build and exploit the offset index.
        """
        if not self._chain_file.exists():
            return
        with open(self._chain_file, "rb") as f:
            while True:
                offset = f.tell()
                len_bytes = f.read(4)
                if len(len_bytes) < 4:
                    break  # clean EOF or truncated prefix — stop scanning
                (record_len,) = _LEN_STRUCT.unpack(len_bytes)
                if record_len > MAX_RECORD_SIZE:
                    raise ChainError(
                        f"Record length {record_len} exceeds maximum {MAX_RECORD_SIZE} — "
                        f"chain file may be corrupted"
                    )
                record_bytes = f.read(record_len)
                if len(record_bytes) < record_len:
                    break  # truncated final record (e.g. crash mid-write)
                yield offset, deserialize_record(record_bytes)

    def _iter_raw(self) -> Iterator[AttestationChainRecord]:
        """Iterate all records from chain.bin without state checks."""
        for _offset, record in self._iter_raw_with_offsets():
            yield record

    def _ensure_offsets(self) -> dict[int, int]:
        """Return the offset index, building it if necessary."""
        if self._offsets is None:
            # Trigger a full scan; _rebuild_state populates self._offsets.
            if self._chain_file.exists() and self._chain_file.stat().st_size > 0:
                self._rebuild_state()
            else:
                self._offsets = {}
        return self._offsets  # type: ignore[return-value]

    def _read_record_at(self, offset: int) -> AttestationChainRecord:
        """Read and deserialize the single record whose length prefix is at *offset*."""
        with open(self._chain_file, "rb") as f:
            f.seek(offset)
            len_bytes = f.read(4)
            if len(len_bytes) < 4:
                raise ChainError(f"Truncated length prefix at offset {offset}.")
            (record_len,) = _LEN_STRUCT.unpack(len_bytes)
            if record_len > MAX_RECORD_SIZE:
                raise ChainError(
                    f"Record length {record_len} exceeds maximum {MAX_RECORD_SIZE} — "
                    f"chain file may be corrupted"
                )
            record_bytes = f.read(record_len)
            if len(record_bytes) < record_len:
                raise ChainError(f"Truncated record body at offset {offset}.")
            return deserialize_record(record_bytes)

    def state(self) -> ChainState | None:
        """Get current chain state, or None if chain is empty."""
        return self._load_state()

    def is_empty(self) -> bool:
        """True if the chain has no records."""
        return self._load_state() is None

    def head(self) -> AttestationChainRecord | None:
        """Return the most recent record, or None if chain is empty."""
        state = self._load_state()
        if state is None:
            return None
        return self.get(state.head_index)

    def get(self, index: int) -> AttestationChainRecord:
        """Get a record by chain index. O(1) via offset index. Raises ChainError if not found."""
        offsets = self._ensure_offsets()
        if index not in offsets:
            raise ChainError(f"Record at index {index} not found.")
        return self._read_record_at(offsets[index])

    def iter_records(
        self, start: int = 0, end: int | None = None
    ) -> Iterator[AttestationChainRecord]:
        """Iterate records in [start, end] range (inclusive).

        Seeks directly to the first record in range via the offset index, so
        records before *start* are never read or deserialized.
        """
        offsets = self._ensure_offsets()
        if not offsets:
            return

        # Determine the byte offset to start reading from.
        if start in offsets:
            seek_offset = offsets[start]
        elif start == 0:
            seek_offset = 0
        else:
            # start index not in chain — find the nearest offset above start.
            candidates = [off for idx, off in offsets.items() if idx >= start]
            if not candidates:
                return
            seek_offset = min(candidates)

        with open(self._chain_file, "rb") as f:
            f.seek(seek_offset)
            while True:
                len_bytes = f.read(4)
                if len(len_bytes) < 4:
                    break
                (record_len,) = _LEN_STRUCT.unpack(len_bytes)
                if record_len > MAX_RECORD_SIZE:
                    raise ChainError(
                        f"Record length {record_len} exceeds maximum {MAX_RECORD_SIZE} — "
                        f"chain file may be corrupted"
                    )
                record_bytes = f.read(record_len)
                if len(record_bytes) < record_len:
                    break
                record = deserialize_record(record_bytes)
                if end is not None and record.chain_index > end:
                    break
                yield record

    def append(
        self,
        content_hash: bytes,
        content_type: str,
        private_key: Ed25519PrivateKey,
        metadata: dict | None = None,
    ) -> AttestationChainRecord:
        """Create, sign, and append a new record to the chain.

        The entire read-compute-write cycle runs under an exclusive file lock
        to prevent concurrent writers from forking the chain (TOCTOU defense).

        Args:
            content_hash: SHA-256 of the content being attested.
            content_type: MIME-like type identifier for the content.
            private_key: Ed25519 private key for signing.
            metadata: Optional extensible key-value metadata.

        Returns:
            The newly created and appended AttestationChainRecord.

        Raises:
            ChainAppendError: If the record cannot be written to chain.bin.
        """
        from uuid_utils import uuid7

        # Pre-compute values that don't depend on chain state
        public_key = private_key.public_key()
        pub_bytes = public_key.public_bytes(Encoding.Raw, PublicFormat.Raw)

        try:
            with open(self._chain_file, "ab") as f:
                fcntl.flock(f, fcntl.LOCK_EX)
                try:
                    # Re-read state INSIDE the lock to prevent TOCTOU races.
                    # Also invalidate the offset index so that any records
                    # written by another process since our last read are picked
                    # up during the ensuing offset rebuild.
                    self._state = None
                    self._offsets = None
                    state = self._load_state()
                    # Ensure the offset index reflects the current file contents
                    # (including any records appended by other processes).
                    self._ensure_offsets()
                    now = _now_us()

                    if state is None:
                        chain_index = 0
                        prev_hash = ChainState.GENESIS_PREV_HASH
                    else:
                        chain_index = state.head_index + 1
                        prev_hash = state.head_hash

                    entropy = collect_entropy_witnesses(self._chain_file)

                    # Build unsigned record
                    record = AttestationChainRecord(
                        version=1,
                        record_id=uuid7().bytes,
                        chain_index=chain_index,
                        prev_hash=prev_hash,
                        content_hash=content_hash,
                        content_type=content_type,
                        metadata=metadata or {},
                        claimed_ts=now,
                        entropy_witnesses=entropy,
                        signer_pubkey=pub_bytes,
                        signature=b"",  # placeholder
                    )

                    # Sign canonical bytes
                    sig = private_key.sign(canonical_bytes(record))

                    # Replace with signed record (frozen dataclass)
                    record = AttestationChainRecord(
                        version=record.version,
                        record_id=record.record_id,
                        chain_index=record.chain_index,
                        prev_hash=record.prev_hash,
                        content_hash=record.content_hash,
                        content_type=record.content_type,
                        metadata=record.metadata,
                        claimed_ts=record.claimed_ts,
                        entropy_witnesses=record.entropy_witnesses,
                        signer_pubkey=record.signer_pubkey,
                        signature=sig,
                    )

                    # Serialize and write
                    record_bytes = serialize_record(record)
                    length_prefix = _LEN_STRUCT.pack(len(record_bytes))
                    # Record the byte offset before writing so it can be added
                    # to the in-memory offset index without a second file scan.
                    new_record_offset = f.seek(0, os.SEEK_CUR)
                    f.write(length_prefix)
                    f.write(record_bytes)
                    f.flush()
                    os.fsync(f.fileno())

                    # Update state inside the lock
                    record_hash = compute_record_hash(record)
                    if state is None:
                        chain_id = hashlib.sha256(canonical_bytes(record)).digest()
                        new_state = ChainState(
                            chain_id=chain_id,
                            head_index=0,
                            head_hash=record_hash,
                            record_count=1,
                            created_at=now,
                            last_append_at=now,
                        )
                    else:
                        new_state = ChainState(
                            chain_id=state.chain_id,
                            head_index=chain_index,
                            head_hash=record_hash,
                            record_count=state.record_count + 1,
                            created_at=state.created_at,
                            last_append_at=now,
                        )
                    self._save_state(new_state)

                    # Keep the offset index consistent so subsequent get() /
                    # iter_records() calls on this instance remain O(1).
                    if self._offsets is not None:
                        self._offsets[chain_index] = new_record_offset
                finally:
                    fcntl.flock(f, fcntl.LOCK_UN)
        except OSError as e:
            raise ChainAppendError(f"Failed to write to chain: {e}") from e

        return record

    def verify_chain(self, start: int = 0, end: int | None = None) -> bool:
        """Verify hash chain integrity and signatures over a range.

        Args:
            start: First record index to verify (default 0).
            end: Last record index to verify (default: head).

        Returns:
            True if the chain is valid.

        Raises:
            ChainIntegrityError: If any integrity check fails.
        """
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey

        prev_record: AttestationChainRecord | None = None
        expected_index = start

        # If starting past 0, load the record before start so the first
        # in-range record's prev_hash can still be checked.
        if start > 0:
            try:
                prev_record = self.get(start - 1)
            except ChainError:
                pass  # Can't verify prev_hash of first record in range

        signer_pubkey: bytes | None = None

        for record in self.iter_records(start, end):
            # Check index continuity
            if record.chain_index != expected_index:
                raise ChainIntegrityError(
                    f"Expected index {expected_index}, got {record.chain_index}"
                )

            # Check prev_hash linkage
            if prev_record is not None:
                expected_hash = compute_record_hash(prev_record)
                if record.prev_hash != expected_hash:
                    raise ChainIntegrityError(
                        f"Record {record.chain_index}: prev_hash mismatch"
                    )
            elif record.chain_index == 0:
                if record.prev_hash != ChainState.GENESIS_PREV_HASH:
                    raise ChainIntegrityError("Genesis record has non-zero prev_hash")

            # Check signature
            try:
                pub = Ed25519PublicKey.from_public_bytes(record.signer_pubkey)
                pub.verify(record.signature, canonical_bytes(record))
            except Exception as e:
                raise ChainIntegrityError(
                    f"Record {record.chain_index}: signature verification failed: {e}"
                ) from e

            # Check single-signer invariant
            if signer_pubkey is None:
                signer_pubkey = record.signer_pubkey
            elif record.signer_pubkey != signer_pubkey:
                raise ChainIntegrityError(
                    f"Record {record.chain_index}: signer changed "
                    f"(expected {signer_pubkey.hex()[:16]}..., "
                    f"got {record.signer_pubkey.hex()[:16]}...)"
                )

            prev_record = record
            expected_index += 1

        return True
plausibility witnesses.""" + +from __future__ import annotations + +import hashlib +import os +import time +import uuid +from pathlib import Path + +from soosef.federation.models import EntropyWitnesses + +# Cache boot_id for the lifetime of the process (fallback only) +_cached_boot_id: str | None = None + + +def _read_proc_file(path: str) -> str | None: + """Read a /proc file, returning None if unavailable.""" + try: + return Path(path).read_text().strip() + except OSError: + return None + + +def _get_boot_id() -> str: + """Get the kernel boot ID (Linux) or a per-process fallback.""" + global _cached_boot_id + boot_id = _read_proc_file("/proc/sys/kernel/random/boot_id") + if boot_id: + return boot_id + # Non-Linux fallback: stable per process lifetime + if _cached_boot_id is None: + _cached_boot_id = str(uuid.uuid4()) + return _cached_boot_id + + +def _get_proc_entropy() -> int: + """Get kernel entropy pool availability (Linux) or a fallback marker.""" + value = _read_proc_file("/proc/sys/kernel/random/entropy_avail") + if value is not None: + try: + return int(value) + except ValueError: + pass + # Non-Linux fallback: always 32 (marker value) + return len(os.urandom(32)) + + +def _get_fs_snapshot(path: Path) -> bytes: + """Hash filesystem metadata of the given path, truncated to 16 bytes. + + Includes mtime, ctime, size, and inode to capture any filesystem change. + """ + try: + st = path.stat() + data = f"{st.st_mtime_ns}:{st.st_ctime_ns}:{st.st_size}:{st.st_ino}".encode() + except OSError: + # Path doesn't exist yet (first record) — hash the parent dir + try: + st = path.parent.stat() + data = f"{st.st_mtime_ns}:{st.st_ctime_ns}:{st.st_size}:{st.st_ino}".encode() + except OSError: + data = b"no-fs-state" + return hashlib.sha256(data).digest()[:16] + + +def collect_entropy_witnesses(chain_db_path: Path) -> EntropyWitnesses: + """Gather system entropy witnesses for an attestation chain record. + + Args: + chain_db_path: Path to chain.bin, used for fs_snapshot. 
+ + Returns: + EntropyWitnesses with current system state. + """ + return EntropyWitnesses( + sys_uptime=time.monotonic(), + fs_snapshot=_get_fs_snapshot(chain_db_path), + proc_entropy=_get_proc_entropy(), + boot_id=_get_boot_id(), + ) diff --git a/src/soosef/federation/models.py b/src/soosef/federation/models.py new file mode 100644 index 0000000..c392b4c --- /dev/null +++ b/src/soosef/federation/models.py @@ -0,0 +1,57 @@ +"""Data models for the attestation chain.""" + +from __future__ import annotations + +from dataclasses import dataclass, field + + +@dataclass(frozen=True) +class EntropyWitnesses: + """System-state snapshot collected at record creation time. + + Serves as soft evidence that the claimed timestamp is plausible. + Fabricating convincing witnesses for a backdated record requires + simulating the full system state at the claimed time. + """ + + sys_uptime: float + fs_snapshot: bytes # 16 bytes, truncated SHA-256 + proc_entropy: int + boot_id: str + + +@dataclass(frozen=True) +class AttestationChainRecord: + """A single record in the attestation hash chain. + + Each record wraps content (typically a Verisoo attestation) with + a hash link to the previous record, entropy witnesses, and an + Ed25519 signature. 
+ """ + + version: int + record_id: bytes # UUID v7, 16 bytes + chain_index: int + prev_hash: bytes # SHA-256, 32 bytes + content_hash: bytes # SHA-256 of wrapped content, 32 bytes + content_type: str + metadata: dict = field(default_factory=dict) + claimed_ts: int = 0 # Unix microseconds + entropy_witnesses: EntropyWitnesses | None = None + signer_pubkey: bytes = b"" # Ed25519 raw public key, 32 bytes + signature: bytes = b"" # Ed25519 signature, 64 bytes + + +@dataclass +class ChainState: + """Checkpoint of chain state, persisted to state.cbor.""" + + chain_id: bytes # SHA-256 of genesis record + head_index: int + head_hash: bytes + record_count: int + created_at: int # Unix µs + last_append_at: int # Unix µs + + # Genesis prev_hash sentinel + GENESIS_PREV_HASH: bytes = b"\x00" * 32 diff --git a/src/soosef/federation/serialization.py b/src/soosef/federation/serialization.py new file mode 100644 index 0000000..7c4c08d --- /dev/null +++ b/src/soosef/federation/serialization.py @@ -0,0 +1,97 @@ +"""CBOR serialization for attestation chain records. + +Uses canonical CBOR encoding (RFC 8949 §4.2) for deterministic hashing +and signing. Integer keys are used in CBOR maps for compactness. +""" + +from __future__ import annotations + +import hashlib + +import cbor2 + +from soosef.federation.models import AttestationChainRecord, EntropyWitnesses + + +def _entropy_to_cbor_map(ew: EntropyWitnesses) -> dict: + """Convert EntropyWitnesses to a CBOR-ready map with integer keys.""" + return { + 0: ew.sys_uptime, + 1: ew.fs_snapshot, + 2: ew.proc_entropy, + 3: ew.boot_id, + } + + +def _cbor_map_to_entropy(m: dict) -> EntropyWitnesses: + """Convert a CBOR map back to EntropyWitnesses.""" + return EntropyWitnesses( + sys_uptime=m[0], + fs_snapshot=m[1], + proc_entropy=m[2], + boot_id=m[3], + ) + + +def canonical_bytes(record: AttestationChainRecord) -> bytes: + """Produce deterministic CBOR bytes for hashing and signing. + + Includes all fields except signature. 
def _record_fields(record: AttestationChainRecord) -> dict:
    """Integer-keyed CBOR map of every signed field (all but the signature)."""
    witnesses = record.entropy_witnesses
    return {
        0: record.version,
        1: record.record_id,
        2: record.chain_index,
        3: record.prev_hash,
        4: record.content_hash,
        5: record.content_type,
        6: record.metadata,
        7: record.claimed_ts,
        8: _entropy_to_cbor_map(witnesses) if witnesses else {},
        9: record.signer_pubkey,
    }


def canonical_bytes(record: AttestationChainRecord) -> bytes:
    """Produce deterministic CBOR bytes for hashing and signing.

    Covers all fields except the signature; this is the input to both
    Ed25519_Sign and SHA-256 for chain linking.
    """
    return cbor2.dumps(_record_fields(record), canonical=True)


def compute_record_hash(record: AttestationChainRecord) -> bytes:
    """SHA-256 of canonical_bytes(record). Used as prev_hash in next record."""
    return hashlib.sha256(canonical_bytes(record)).digest()


def serialize_record(record: AttestationChainRecord) -> bytes:
    """Full CBOR serialization including signature. Used for storage."""
    payload = _record_fields(record)
    payload[10] = record.signature
    # Canonical encoding sorts map keys, so adding key 10 after construction
    # yields byte-identical output to building the full map up front.
    return cbor2.dumps(payload, canonical=True)


def deserialize_record(data: bytes) -> AttestationChainRecord:
    """Deserialize CBOR bytes to an AttestationChainRecord."""
    m = cbor2.loads(data)
    raw_entropy = m.get(8, {})
    return AttestationChainRecord(
        version=m[0],
        record_id=m[1],
        chain_index=m[2],
        prev_hash=m[3],
        content_hash=m[4],
        content_type=m[5],
        metadata=m.get(6, {}),
        claimed_ts=m.get(7, 0),
        entropy_witnesses=_cbor_map_to_entropy(raw_entropy) if raw_entropy else None,
        signer_pubkey=m.get(9, b""),
        signature=m.get(10, b""),
    )
def load_fence(path: Path | None = None) -> GeoCircle | None:
    """Load a saved geofence from disk. Returns None if no fence is configured."""
    from soosef.paths import GEOFENCE_CONFIG

    target = path or GEOFENCE_CONFIG
    if not target.exists():
        return None
    data = json.loads(target.read_text())
    return GeoCircle(
        lat=data["lat"],
        lon=data["lon"],
        radius_m=data["radius_m"],
        name=data.get("name", "default"),
    )


def save_fence(fence: GeoCircle, path: Path | None = None) -> None:
    """Persist a geofence to disk."""
    from soosef.paths import GEOFENCE_CONFIG

    target = path or GEOFENCE_CONFIG
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "lat": fence.lat,
        "lon": fence.lon,
        "radius_m": fence.radius_m,
        "name": fence.name,
    }
    target.write_text(json.dumps(payload, indent=2))


def clear_fence(path: Path | None = None) -> bool:
    """Remove the saved geofence. Returns True if a fence was present and removed."""
    from soosef.paths import GEOFENCE_CONFIG

    target = path or GEOFENCE_CONFIG
    if not target.exists():
        return False
    target.unlink()
    return True
("destroy_temp_files", lambda: _secure_delete_dir(paths.TEMP_DIR)), + ("destroy_config", lambda: _secure_delete_file(paths.CONFIG_FILE)), ("clear_journald", _clear_system_logs), ]) diff --git a/src/soosef/keystore/manager.py b/src/soosef/keystore/manager.py index 07e10ef..b773d25 100644 --- a/src/soosef/keystore/manager.py +++ b/src/soosef/keystore/manager.py @@ -10,17 +10,19 @@ from __future__ import annotations import os from pathlib import Path +import soosef.paths as _paths from soosef.exceptions import KeystoreError -from soosef.keystore.models import IdentityInfo, KeystoreStatus -from soosef.paths import CHANNEL_KEY_FILE, IDENTITY_DIR, IDENTITY_PRIVATE_KEY, IDENTITY_PUBLIC_KEY +from soosef.keystore.models import IdentityInfo, KeystoreStatus, RotationResult class KeystoreManager: """Manages all key material for a SooSeF instance.""" def __init__(self, identity_dir: Path | None = None, channel_key_file: Path | None = None): - self._identity_dir = identity_dir or IDENTITY_DIR - self._channel_key_file = channel_key_file or CHANNEL_KEY_FILE + # Use lazy path resolution so that --data-dir / SOOSEF_DATA_DIR overrides + # propagate correctly when paths.BASE_DIR is changed at runtime. + self._identity_dir = identity_dir or _paths.IDENTITY_DIR + self._channel_key_file = channel_key_file or _paths.CHANNEL_KEY_FILE # ── Verisoo Identity (Ed25519) ────────────────────────────────── @@ -28,6 +30,10 @@ class KeystoreManager: """Check if an Ed25519 identity exists.""" return (self._identity_dir / "private.pem").exists() + def _identity_meta_path(self) -> Path: + """Path to the identity creation-timestamp sidecar file.""" + return self._identity_dir / "identity.meta.json" + def get_identity(self) -> IdentityInfo: """Get identity info. Raises KeystoreError if no identity exists.""" pub_path = self._identity_dir / "public.pem" @@ -36,8 +42,11 @@ class KeystoreManager: if not pub_path.exists(): raise KeystoreError("No identity found. 
    def _archive_dir_for(self, parent: Path) -> Path:
        """Return a timestamped archive subdirectory under *parent*/archived/.

        The timestamp uses an ISO-8601-like pattern with no colons (safe on
        all filesystems) and includes microseconds to avoid collisions when
        rotations happen within the same second, e.g.
        ``archived/2026-04-01T120000_123456Z``.
        """
        from datetime import UTC, datetime

        ts = datetime.now(UTC).strftime("%Y-%m-%dT%H%M%S_%fZ")
        return parent / "archived" / ts
+ This is intentional — the killswitch (secure deletion) is the + primary defense for at-risk users, not key encryption. A password- + protected key would require prompting on every attestation and + chain operation, which is unworkable in field conditions. The + key file is protected by 0o600 permissions. + """ from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from cryptography.hazmat.primitives.serialization import ( BestAvailableEncryption, @@ -81,8 +129,68 @@ class KeystoreManager: pub_path = self._identity_dir / "public.pem" pub_path.write_bytes(pub_pem) + # Write creation timestamp sidecar so get_identity() always returns an + # authoritative created_at without relying on filesystem mtime. + import json + from datetime import UTC, datetime + + meta_path = self._identity_meta_path() + meta_path.write_text( + json.dumps({"created_at": datetime.now(UTC).isoformat()}, indent=None) + ) + return self.get_identity() + def rotate_identity(self, password: bytes | None = None) -> RotationResult: + """Rotate the Ed25519 identity keypair. + + The current private and public keys are copied verbatim to a + timestamped archive directory before the new keypair is generated. + Both old and new fingerprints are returned so the caller can report + them and prompt the user to notify collaborators. + + Raises KeystoreError if no identity exists yet (use generate_identity + for initial setup). + """ + import shutil + + if not self.has_identity(): + raise KeystoreError("No identity to rotate. 
Run 'soosef init' first.") + + old_info = self.get_identity() + + # Archive the current keypair under identity/archived// + archive_dir = self._archive_dir_for(self._identity_dir) + archive_dir.mkdir(parents=True, exist_ok=True) + archive_dir.chmod(0o700) + + priv_src = self._identity_dir / "private.pem" + pub_src = self._identity_dir / "public.pem" + meta_src = self._identity_meta_path() + + shutil.copy2(priv_src, archive_dir / "private.pem") + (archive_dir / "private.pem").chmod(0o600) + shutil.copy2(pub_src, archive_dir / "public.pem") + if meta_src.exists(): + shutil.copy2(meta_src, archive_dir / "identity.meta.json") + + # Write a small provenance note alongside the archived key so an + # operator can reconstruct the rotation timeline without tooling. + from datetime import UTC, datetime + + (archive_dir / "rotation.txt").write_text( + f"Rotated at: {datetime.now(UTC).isoformat()}\n" + f"Old fingerprint: {old_info.fingerprint}\n" + ) + + new_info = self.generate_identity(password=password) + + return RotationResult( + old_fingerprint=old_info.fingerprint, + new_fingerprint=new_info.fingerprint, + archive_path=archive_dir, + ) + # ── Stegasoo Channel Key ──────────────────────────────────────── def has_channel_key(self) -> bool: @@ -115,6 +223,59 @@ class KeystoreManager: self.set_channel_key(key) return key + def rotate_channel_key(self) -> RotationResult: + """Rotate the Stegasoo channel key. + + The current key is copied to a timestamped archive directory before + the new key is generated. Both old and new channel fingerprints are + returned. + + Raises KeystoreError if no channel key exists yet (use + generate_channel_key for initial setup). + """ + import shutil + + if not self.has_channel_key(): + raise KeystoreError("No channel key to rotate. Run 'soosef init' first.") + + # Only file-based keys can be archived; env-var keys have no on-disk + # representation to back up, so we refuse rather than silently skip. 
+ if not self._channel_key_file.exists(): + raise KeystoreError( + "Channel key is set via STEGASOO_CHANNEL_KEY environment variable " + "and cannot be rotated through soosef. Unset the variable and store " + "the key in the keystore first." + ) + + from stegasoo.crypto import get_channel_fingerprint + + old_key = self._channel_key_file.read_text().strip() + old_fp = get_channel_fingerprint(old_key) + + # Archive under stegasoo/archived//channel.key + archive_dir = self._archive_dir_for(self._channel_key_file.parent) + archive_dir.mkdir(parents=True, exist_ok=True) + archive_dir.chmod(0o700) + + shutil.copy2(self._channel_key_file, archive_dir / "channel.key") + (archive_dir / "channel.key").chmod(0o600) + + from datetime import UTC, datetime + + (archive_dir / "rotation.txt").write_text( + f"Rotated at: {datetime.now(UTC).isoformat()}\n" + f"Old fingerprint: {old_fp}\n" + ) + + new_key = self.generate_channel_key() + new_fp = get_channel_fingerprint(new_key) + + return RotationResult( + old_fingerprint=old_fp, + new_fingerprint=new_fp, + archive_path=archive_dir, + ) + # ── Unified Status ────────────────────────────────────────────── def status(self) -> KeystoreStatus: diff --git a/src/soosef/keystore/models.py b/src/soosef/keystore/models.py index 3a7d66a..354a4b5 100644 --- a/src/soosef/keystore/models.py +++ b/src/soosef/keystore/models.py @@ -2,6 +2,7 @@ from dataclasses import dataclass from datetime import datetime +from pathlib import Path @dataclass @@ -22,3 +23,12 @@ class KeystoreStatus: identity_fingerprint: str | None has_channel_key: bool channel_fingerprint: str | None + + +@dataclass +class RotationResult: + """Result of a key rotation operation.""" + + old_fingerprint: str + new_fingerprint: str + archive_path: Path diff --git a/src/soosef/paths.py b/src/soosef/paths.py index 35c080c..2be7f12 100644 --- a/src/soosef/paths.py +++ b/src/soosef/paths.py @@ -4,8 +4,12 @@ Centralized path constants for SooSeF. All ~/.soosef/* paths are defined here. 
Every module that needs a path imports from this module — no hardcoded paths anywhere else. -The base directory can be overridden via SOOSEF_DATA_DIR environment variable -for multi-instance deployments or testing. +The base directory can be overridden via: +- SOOSEF_DATA_DIR environment variable (before import) +- Setting paths.BASE_DIR at runtime (e.g., CLI --data-dir flag) + +All derived paths (IDENTITY_DIR, CHAIN_DIR, etc.) are computed lazily +from BASE_DIR so that runtime overrides propagate correctly. """ import os @@ -14,68 +18,90 @@ from pathlib import Path # Allow override for testing or multi-instance deployments BASE_DIR = Path(os.environ.get("SOOSEF_DATA_DIR", Path.home() / ".soosef")) -# Ed25519 identity keypair (verisoo signing) -IDENTITY_DIR = BASE_DIR / "identity" -IDENTITY_PRIVATE_KEY = IDENTITY_DIR / "private.pem" -IDENTITY_PUBLIC_KEY = IDENTITY_DIR / "public.pem" +# Path definitions relative to BASE_DIR. These are resolved lazily via +# __getattr__ so that changes to BASE_DIR propagate to all derived paths. +_PATH_DEFS: dict[str, tuple[str, ...]] = { + # Ed25519 identity keypair (verisoo signing) + "IDENTITY_DIR": ("identity",), + "IDENTITY_PRIVATE_KEY": ("identity", "private.pem"), + "IDENTITY_PUBLIC_KEY": ("identity", "public.pem"), + # Sidecar metadata written by generate_identity(); stores creation timestamp + # so get_identity() can return an authoritative created_at without relying + # on fragile filesystem mtime. 
+ "IDENTITY_META": ("identity", "identity.meta.json"), + # Stegasoo state + "STEGASOO_DIR": ("stegasoo",), + "CHANNEL_KEY_FILE": ("stegasoo", "channel.key"), + # Verisoo attestation storage + "ATTESTATIONS_DIR": ("attestations",), + "ATTESTATION_LOG": ("attestations", "log.bin"), + "ATTESTATION_INDEX": ("attestations", "index"), + "PEERS_FILE": ("attestations", "peers.json"), + # Web UI auth database + "AUTH_DIR": ("auth",), + "AUTH_DB": ("auth", "soosef.db"), + # SSL certificates + "CERTS_DIR": ("certs",), + "SSL_CERT": ("certs", "cert.pem"), + "SSL_KEY": ("certs", "key.pem"), + # Fieldkit state + "FIELDKIT_DIR": ("fieldkit",), + "FIELDKIT_CONFIG": ("fieldkit", "config.json"), + "DEADMAN_STATE": ("fieldkit", "deadman.json"), + "TAMPER_DIR": ("fieldkit", "tamper"), + "TAMPER_BASELINE": ("fieldkit", "tamper", "baseline.json"), + "USB_DIR": ("fieldkit", "usb"), + "USB_WHITELIST": ("fieldkit", "usb", "whitelist.json"), + "GEOFENCE_CONFIG": ("fieldkit", "geofence.json"), + # Attestation hash chain + "CHAIN_DIR": ("chain",), + "CHAIN_DB": ("chain", "chain.bin"), + "CHAIN_STATE": ("chain", "state.cbor"), + # Ephemeral + "TEMP_DIR": ("temp",), + # Structured audit trail (append-only JSON-lines). + # Lives directly under BASE_DIR so it is destroyed by the killswitch along + # with everything else — intentional, per the security model. 
+ "AUDIT_LOG": ("audit.jsonl",), + # Flask instance path (sessions, secret key) + "INSTANCE_DIR": ("instance",), + "SECRET_KEY_FILE": ("instance", ".secret_key"), + # Unified config + "CONFIG_FILE": ("config.json",), +} -# Stegasoo state -STEGASOO_DIR = BASE_DIR / "stegasoo" -CHANNEL_KEY_FILE = STEGASOO_DIR / "channel.key" -# Verisoo attestation storage -ATTESTATIONS_DIR = BASE_DIR / "attestations" -ATTESTATION_LOG = ATTESTATIONS_DIR / "log.bin" -ATTESTATION_INDEX = ATTESTATIONS_DIR / "index" -PEERS_FILE = ATTESTATIONS_DIR / "peers.json" - -# Web UI auth database -AUTH_DIR = BASE_DIR / "auth" -AUTH_DB = AUTH_DIR / "soosef.db" - -# SSL certificates -CERTS_DIR = BASE_DIR / "certs" -SSL_CERT = CERTS_DIR / "cert.pem" -SSL_KEY = CERTS_DIR / "key.pem" - -# Fieldkit state -FIELDKIT_DIR = BASE_DIR / "fieldkit" -FIELDKIT_CONFIG = FIELDKIT_DIR / "config.json" -DEADMAN_STATE = FIELDKIT_DIR / "deadman.json" -TAMPER_DIR = FIELDKIT_DIR / "tamper" -TAMPER_BASELINE = TAMPER_DIR / "baseline.json" -USB_DIR = FIELDKIT_DIR / "usb" -USB_WHITELIST = USB_DIR / "whitelist.json" - -# Ephemeral -TEMP_DIR = BASE_DIR / "temp" - -# Flask instance path (sessions, secret key) -INSTANCE_DIR = BASE_DIR / "instance" -SECRET_KEY_FILE = INSTANCE_DIR / ".secret_key" - -# Unified config -CONFIG_FILE = BASE_DIR / "config.json" +def __getattr__(name: str) -> Path: + """Resolve derived paths lazily from current BASE_DIR.""" + if name in _PATH_DEFS: + return Path(BASE_DIR, *_PATH_DEFS[name]) + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") def ensure_dirs() -> None: """Create all required directories with appropriate permissions.""" dirs = [ BASE_DIR, - IDENTITY_DIR, - STEGASOO_DIR, - ATTESTATIONS_DIR, - AUTH_DIR, - CERTS_DIR, - FIELDKIT_DIR, - TAMPER_DIR, - USB_DIR, - TEMP_DIR, - INSTANCE_DIR, + __getattr__("IDENTITY_DIR"), + __getattr__("STEGASOO_DIR"), + __getattr__("ATTESTATIONS_DIR"), + __getattr__("CHAIN_DIR"), + __getattr__("AUTH_DIR"), + __getattr__("CERTS_DIR"), + 
__getattr__("FIELDKIT_DIR"), + __getattr__("TAMPER_DIR"), + __getattr__("USB_DIR"), + __getattr__("TEMP_DIR"), + __getattr__("INSTANCE_DIR"), ] for d in dirs: d.mkdir(parents=True, exist_ok=True) # Restrict permissions on sensitive directories - for d in [BASE_DIR, IDENTITY_DIR, AUTH_DIR, CERTS_DIR]: + for d in [ + BASE_DIR, + __getattr__("IDENTITY_DIR"), + __getattr__("AUTH_DIR"), + __getattr__("CERTS_DIR"), + ]: d.chmod(0o700) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..03b0b8c --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,37 @@ +"""Shared test fixtures for SooSeF tests.""" + +from __future__ import annotations + +import os +from pathlib import Path + +import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + + +@pytest.fixture() +def tmp_soosef_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: + """Set SOOSEF_DATA_DIR to a temporary directory. + + This must be used before importing any module that reads soosef.paths + at import time. For modules that read paths lazily (most of them), + monkeypatching the paths module directly is more reliable. 
+ """ + data_dir = tmp_path / ".soosef" + data_dir.mkdir() + monkeypatch.setenv("SOOSEF_DATA_DIR", str(data_dir)) + return data_dir + + +@pytest.fixture() +def chain_dir(tmp_path: Path) -> Path: + """A temporary chain directory.""" + d = tmp_path / "chain" + d.mkdir() + return d + + +@pytest.fixture() +def private_key() -> Ed25519PrivateKey: + """A fresh Ed25519 private key for testing.""" + return Ed25519PrivateKey.generate() diff --git a/tests/test_chain.py b/tests/test_chain.py new file mode 100644 index 0000000..bfc1e06 --- /dev/null +++ b/tests/test_chain.py @@ -0,0 +1,287 @@ +"""Tests for the attestation hash chain store.""" + +from __future__ import annotations + +import hashlib +from pathlib import Path + +import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, +) + +from soosef.exceptions import ChainError, ChainIntegrityError +from soosef.federation.chain import ChainStore +from soosef.federation.models import ChainState +from soosef.federation.serialization import canonical_bytes, compute_record_hash + + +def test_genesis_record(chain_dir: Path, private_key: Ed25519PrivateKey): + """First record has chain_index=0 and prev_hash=0x00*32.""" + store = ChainStore(chain_dir) + content_hash = hashlib.sha256(b"genesis content").digest() + + record = store.append(content_hash, "test/plain", private_key) + + assert record.chain_index == 0 + assert record.prev_hash == ChainState.GENESIS_PREV_HASH + assert record.version == 1 + assert record.content_hash == content_hash + assert record.content_type == "test/plain" + assert len(record.record_id) == 16 + assert len(record.signer_pubkey) == 32 + assert len(record.signature) == 64 + + +def test_chain_state_after_genesis(chain_dir: Path, private_key: Ed25519PrivateKey): + """Chain state is correctly initialized after the first record.""" + store = ChainStore(chain_dir) + record = store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key) + + 
state = store.state() + assert state is not None + assert state.record_count == 1 + assert state.head_index == 0 + assert state.head_hash == compute_record_hash(record) + assert state.chain_id == hashlib.sha256(canonical_bytes(record)).digest() + + +def test_chain_append_multiple(chain_dir: Path, private_key: Ed25519PrivateKey): + """Appending multiple records produces monotonically increasing indices.""" + store = ChainStore(chain_dir) + + records = [] + for i in range(5): + content = hashlib.sha256(f"content-{i}".encode()).digest() + record = store.append(content, "test/plain", private_key) + records.append(record) + + for i, record in enumerate(records): + assert record.chain_index == i + + state = store.state() + assert state is not None + assert state.record_count == 5 + assert state.head_index == 4 + + +def test_chain_hash_linkage(chain_dir: Path, private_key: Ed25519PrivateKey): + """Each record's prev_hash matches the hash of the previous record.""" + store = ChainStore(chain_dir) + + r0 = store.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key) + r1 = store.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key) + r2 = store.append(hashlib.sha256(b"r2").digest(), "test/plain", private_key) + + assert r1.prev_hash == compute_record_hash(r0) + assert r2.prev_hash == compute_record_hash(r1) + + +def test_signature_verification(chain_dir: Path, private_key: Ed25519PrivateKey): + """Each record's Ed25519 signature is valid over canonical bytes.""" + store = ChainStore(chain_dir) + record = store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key) + + pub = Ed25519PublicKey.from_public_bytes(record.signer_pubkey) + # This will raise if invalid + pub.verify(record.signature, canonical_bytes(record)) + + +def test_verify_chain_valid(chain_dir: Path, private_key: Ed25519PrivateKey): + """verify_chain returns True on a valid chain.""" + store = ChainStore(chain_dir) + for i in range(5): + 
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key) + + assert store.verify_chain() is True + + +def test_verify_chain_detects_tamper(chain_dir: Path, private_key: Ed25519PrivateKey): + """verify_chain detects a tampered record in the middle.""" + store = ChainStore(chain_dir) + for i in range(3): + store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key) + + # Corrupt chain.bin by flipping a byte in the middle + chain_file = chain_dir / "chain.bin" + data = bytearray(chain_file.read_bytes()) + midpoint = len(data) // 2 + data[midpoint] ^= 0xFF + chain_file.write_bytes(bytes(data)) + + # Force state reload + store._state = None + + with pytest.raises((ChainIntegrityError, ChainError)): + store.verify_chain() + + +def test_entropy_witnesses_populated(chain_dir: Path, private_key: Ed25519PrivateKey): + """Entropy witnesses are populated with non-trivial values.""" + store = ChainStore(chain_dir) + record = store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key) + + ew = record.entropy_witnesses + assert ew is not None + assert ew.sys_uptime > 0 + assert len(ew.fs_snapshot) == 16 + assert ew.proc_entropy > 0 + assert len(ew.boot_id) > 0 + + +def test_chain_persistence(chain_dir: Path, private_key: Ed25519PrivateKey): + """Chain survives close and reopen.""" + store1 = ChainStore(chain_dir) + r0 = store1.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key) + r1 = store1.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key) + + # Open a new store instance (simulates process restart) + store2 = ChainStore(chain_dir) + + state = store2.state() + assert state is not None + assert state.record_count == 2 + assert state.head_index == 1 + assert state.head_hash == compute_record_hash(r1) + + # Can read records back + loaded_r0 = store2.get(0) + assert loaded_r0.content_hash == r0.content_hash + assert loaded_r0.signature == r0.signature + + +def 
test_chain_state_rebuild(chain_dir: Path, private_key: Ed25519PrivateKey): + """Chain state is rebuilt from chain.bin if state.cbor is missing.""" + store = ChainStore(chain_dir) + store.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key) + r1 = store.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key) + + # Delete state file + state_file = chain_dir / "state.cbor" + state_file.unlink() + + # Reopen — should rebuild + store2 = ChainStore(chain_dir) + state = store2.state() + assert state is not None + assert state.record_count == 2 + assert state.head_index == 1 + assert state.head_hash == compute_record_hash(r1) + + +def test_empty_chain(chain_dir: Path): + """Empty chain reports correct state.""" + store = ChainStore(chain_dir) + assert store.is_empty() is True + assert store.state() is None + assert store.head() is None + + +def test_get_nonexistent_index(chain_dir: Path, private_key: Ed25519PrivateKey): + """Accessing a nonexistent index raises ChainError.""" + store = ChainStore(chain_dir) + store.append(hashlib.sha256(b"only").digest(), "test/plain", private_key) + + with pytest.raises(ChainError, match="not found"): + store.get(99) + + +def test_iter_records_range(chain_dir: Path, private_key: Ed25519PrivateKey): + """iter_records respects start and end bounds.""" + store = ChainStore(chain_dir) + for i in range(10): + store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key) + + records = list(store.iter_records(start=3, end=6)) + assert len(records) == 4 + assert records[0].chain_index == 3 + assert records[-1].chain_index == 6 + + +def test_metadata_in_chain(chain_dir: Path, private_key: Ed25519PrivateKey): + """Metadata is preserved through append and retrieval.""" + store = ChainStore(chain_dir) + meta = {"caption": "evidence photo", "backfilled": True} + record = store.append( + hashlib.sha256(b"test").digest(), "test/plain", private_key, metadata=meta + ) + + loaded = store.get(0) + assert 
loaded.metadata == meta + + +def test_head_returns_latest(chain_dir: Path, private_key: Ed25519PrivateKey): + """head() returns the most recently appended record.""" + store = ChainStore(chain_dir) + for i in range(3): + store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key) + + head = store.head() + assert head is not None + assert head.chain_index == 2 + + +def test_verify_chain_detects_signer_change(chain_dir: Path): + """verify_chain flags a different signer as integrity violation.""" + store = ChainStore(chain_dir) + key1 = Ed25519PrivateKey.generate() + key2 = Ed25519PrivateKey.generate() + + store.append(hashlib.sha256(b"r0").digest(), "test/plain", key1) + + # Manually bypass normal append to inject a record signed by key2. + # We need to build the record with correct prev_hash but wrong signer. + import struct + import fcntl + from soosef.federation.serialization import serialize_record + from soosef.federation.models import AttestationChainRecord + from soosef.federation.entropy import collect_entropy_witnesses + from uuid_utils import uuid7 + from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat + from soosef.federation.serialization import canonical_bytes as cb + + state = store.state() + prev_hash = state.head_hash + pub2 = key2.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) + entropy = collect_entropy_witnesses(chain_dir / "chain.bin") + + record = AttestationChainRecord( + version=1, + record_id=uuid7().bytes, + chain_index=1, + prev_hash=prev_hash, + content_hash=hashlib.sha256(b"r1").digest(), + content_type="test/plain", + metadata={}, + claimed_ts=int(__import__("time").time() * 1_000_000), + entropy_witnesses=entropy, + signer_pubkey=pub2, + signature=b"", + ) + sig = key2.sign(cb(record)) + record = AttestationChainRecord( + version=record.version, + record_id=record.record_id, + chain_index=record.chain_index, + prev_hash=record.prev_hash, + content_hash=record.content_hash, + 
content_type=record.content_type, + metadata=record.metadata, + claimed_ts=record.claimed_ts, + entropy_witnesses=record.entropy_witnesses, + signer_pubkey=record.signer_pubkey, + signature=sig, + ) + + record_bytes = serialize_record(record) + with open(chain_dir / "chain.bin", "ab") as f: + fcntl.flock(f, fcntl.LOCK_EX) + f.write(struct.pack(">I", len(record_bytes))) + f.write(record_bytes) + fcntl.flock(f, fcntl.LOCK_UN) + + store._state = None + + with pytest.raises(ChainIntegrityError, match="signer changed"): + store.verify_chain() diff --git a/tests/test_chain_security.py b/tests/test_chain_security.py new file mode 100644 index 0000000..c909e3e --- /dev/null +++ b/tests/test_chain_security.py @@ -0,0 +1,162 @@ +"""Security-focused tests for the attestation chain. + +Tests concurrent access, oversized records, and edge cases that could +compromise chain integrity. +""" + +from __future__ import annotations + +import hashlib +import struct +import threading +from pathlib import Path + +import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + +from soosef.exceptions import ChainError +from soosef.federation.chain import ChainStore, MAX_RECORD_SIZE +from soosef.federation.serialization import compute_record_hash + + +def test_concurrent_append_no_fork(chain_dir: Path): + """Concurrent appends must not fork the chain — indices must be unique.""" + private_key = Ed25519PrivateKey.generate() + num_threads = 8 + records_per_thread = 5 + results: list[list] = [[] for _ in range(num_threads)] + errors: list[Exception] = [] + + def worker(thread_id: int): + try: + store = ChainStore(chain_dir) + for i in range(records_per_thread): + content = hashlib.sha256(f"t{thread_id}-r{i}".encode()).digest() + record = store.append(content, "test/plain", private_key) + results[thread_id].append(record.chain_index) + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=worker, args=(t,)) for t in 
range(num_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, f"Thread errors: {errors}" + + # Collect all indices + all_indices = [] + for r in results: + all_indices.extend(r) + + # Every index must be unique (no fork) + assert len(all_indices) == len(set(all_indices)), ( + f"Duplicate chain indices detected — chain forked! " + f"Indices: {sorted(all_indices)}" + ) + + # Indices should be 0..N-1 contiguous + total = num_threads * records_per_thread + assert sorted(all_indices) == list(range(total)) + + # Full chain verification should pass + store = ChainStore(chain_dir) + assert store.verify_chain() is True + assert store.state().record_count == total + + +def test_oversized_record_rejected(chain_dir: Path): + """A corrupted length prefix exceeding MAX_RECORD_SIZE must raise ChainError.""" + chain_file = chain_dir / "chain.bin" + + # Write a length prefix claiming a 100 MB record + bogus_length = 100 * 1024 * 1024 + chain_file.write_bytes(struct.pack(">I", bogus_length) + b"\x00" * 100) + + store = ChainStore(chain_dir) + + with pytest.raises(ChainError, match="exceeds maximum"): + list(store._iter_raw()) + + +def test_max_record_size_boundary(chain_dir: Path): + """Records larger than MAX_RECORD_SIZE must be rejected (real records are <1KB).""" + chain_file = chain_dir / "chain.bin" + + chain_file.write_bytes(struct.pack(">I", MAX_RECORD_SIZE + 1) + b"\x00" * 100) + + store = ChainStore(chain_dir) + with pytest.raises(ChainError, match="exceeds maximum"): + list(store._iter_raw()) + + +def test_truncated_chain_file(chain_dir: Path, private_key: Ed25519PrivateKey): + """A truncated chain.bin still yields complete records before the truncation.""" + store = ChainStore(chain_dir) + for i in range(3): + store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key) + + # Truncate the file mid-record + chain_file = chain_dir / "chain.bin" + data = chain_file.read_bytes() + 
chain_file.write_bytes(data[:len(data) - 50]) + + store2 = ChainStore(chain_dir) + records = list(store2._iter_raw()) + # Should get at least the first 2 complete records + assert len(records) >= 2 + assert records[0].chain_index == 0 + assert records[1].chain_index == 1 + + +def test_empty_chain_file(chain_dir: Path): + """An empty chain.bin (0 bytes) yields no records without error.""" + chain_file = chain_dir / "chain.bin" + chain_file.write_bytes(b"") + + store = ChainStore(chain_dir) + records = list(store._iter_raw()) + assert records == [] + + +def test_concurrent_read_during_write(chain_dir: Path): + """Reading the chain while appending should not crash.""" + private_key = Ed25519PrivateKey.generate() + store = ChainStore(chain_dir) + + # Seed with some records + for i in range(5): + store.append(hashlib.sha256(f"seed-{i}".encode()).digest(), "test/plain", private_key) + + read_errors: list[Exception] = [] + write_errors: list[Exception] = [] + + def reader(): + try: + s = ChainStore(chain_dir) + for _ in range(20): + list(s.iter_records()) + except Exception as e: + read_errors.append(e) + + def writer(): + try: + s = ChainStore(chain_dir) + for i in range(10): + s.append(hashlib.sha256(f"w-{i}".encode()).digest(), "test/plain", private_key) + except Exception as e: + write_errors.append(e) + + threads = [ + threading.Thread(target=reader), + threading.Thread(target=reader), + threading.Thread(target=writer), + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not read_errors, f"Read errors during concurrent access: {read_errors}" + assert not write_errors, f"Write errors during concurrent access: {write_errors}" diff --git a/tests/test_deadman_enforcement.py b/tests/test_deadman_enforcement.py new file mode 100644 index 0000000..6a2f117 --- /dev/null +++ b/tests/test_deadman_enforcement.py @@ -0,0 +1,292 @@ +""" +Tests for dead man's switch background enforcement and CLI command. 
+ +Covers: +- _deadman_enforcement_loop: does not call execute_purge when not armed +- _deadman_enforcement_loop: calls execute_purge when armed and overdue +- _deadman_enforcement_loop: exits after firing so execute_purge is not called twice +- _start_deadman_thread: returns a live daemon thread +- check-deadman CLI: exits 0 when disarmed +- check-deadman CLI: exits 0 and prints OK when armed but not overdue +- check-deadman CLI: exits 0 and prints OVERDUE warning when past interval but in grace +- check-deadman CLI: exits 2 when fully expired (past interval + grace) +""" + +from __future__ import annotations + +import json +import time +from datetime import UTC, datetime, timedelta +from pathlib import Path + +import pytest +from click.testing import CliRunner + + +# ── Fixtures ──────────────────────────────────────────────────────────────── + + +@pytest.fixture() +def soosef_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: + """Redirect soosef paths to a tmp directory.""" + import soosef.paths as paths + + data_dir = tmp_path / ".soosef" + data_dir.mkdir() + monkeypatch.setattr(paths, "BASE_DIR", data_dir) + return data_dir + + +def _write_deadman_state( + state_file: Path, + *, + armed: bool, + last_checkin: datetime, + interval_hours: int = 24, + grace_hours: int = 2, +) -> None: + state_file.parent.mkdir(parents=True, exist_ok=True) + state = { + "armed": armed, + "last_checkin": last_checkin.isoformat(), + "interval_hours": interval_hours, + "grace_hours": grace_hours, + } + state_file.write_text(json.dumps(state)) + + +# ── Unit tests: enforcement loop ───────────────────────────────────────────── + + +def test_enforcement_loop_no_op_when_disarmed(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Loop should not call check() when the switch is not armed.""" + from soosef.cli import _deadman_enforcement_loop + from soosef.fieldkit import deadman as deadman_mod + + # Redirect the module-level DEADMAN_STATE constant so DeadmanSwitch() 
default is our tmp file + state_file = tmp_path / "deadman.json" + monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + + check_calls = [] + + def fake_check(self): + check_calls.append("fired") + + monkeypatch.setattr(deadman_mod.DeadmanSwitch, "check", fake_check) + + iterations = [0] + + def one_shot_sleep(n): + iterations[0] += 1 + if iterations[0] >= 2: + raise StopIteration("stop test loop") + + monkeypatch.setattr(time, "sleep", one_shot_sleep) + + with pytest.raises(StopIteration): + _deadman_enforcement_loop(interval_seconds=0) + + assert check_calls == [] + + +def test_enforcement_loop_fires_when_overdue(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Loop must call DeadmanSwitch.check() when armed and past interval + grace.""" + from soosef.cli import _deadman_enforcement_loop + from soosef.fieldkit import deadman as deadman_mod + + state_file = tmp_path / "deadman.json" + monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + + last_checkin = datetime.now(UTC) - timedelta(hours=100) + _write_deadman_state( + state_file, + armed=True, + last_checkin=last_checkin, + interval_hours=24, + grace_hours=2, + ) + + check_calls = [] + + def fake_check(self): + check_calls.append("fired") + + monkeypatch.setattr(deadman_mod.DeadmanSwitch, "check", fake_check) + monkeypatch.setattr(time, "sleep", lambda n: None) + + _deadman_enforcement_loop(interval_seconds=0) + + assert len(check_calls) == 1 + + +def test_enforcement_loop_exits_after_firing(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """After firing, the loop must return and not call check() a second time.""" + from soosef.cli import _deadman_enforcement_loop + from soosef.fieldkit import deadman as deadman_mod + + state_file = tmp_path / "deadman.json" + monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + + last_checkin = datetime.now(UTC) - timedelta(hours=100) + _write_deadman_state(state_file, armed=True, last_checkin=last_checkin) + + check_calls = [] + + def 
fake_check(self): + check_calls.append("fired") + + monkeypatch.setattr(deadman_mod.DeadmanSwitch, "check", fake_check) + monkeypatch.setattr(time, "sleep", lambda n: None) + + _deadman_enforcement_loop(interval_seconds=0) + + # Called exactly once — loop exited after firing + assert len(check_calls) == 1 + + +def test_enforcement_loop_tolerates_exceptions(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Transient errors in check() must not kill the loop.""" + from soosef.cli import _deadman_enforcement_loop + from soosef.fieldkit import deadman as deadman_mod + + state_file = tmp_path / "deadman.json" + monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + + call_count = [0] + + def counting_sleep(n): + call_count[0] += 1 + if call_count[0] >= 3: + raise StopIteration("stop test loop") + + monkeypatch.setattr(time, "sleep", counting_sleep) + + error_calls = [0] + + def flaky_is_armed(self): + error_calls[0] += 1 + if error_calls[0] == 1: + raise OSError("state file temporarily unreadable") + return False # not armed — loop just skips + + monkeypatch.setattr(deadman_mod.DeadmanSwitch, "is_armed", flaky_is_armed) + + with pytest.raises(StopIteration): + _deadman_enforcement_loop(interval_seconds=0) + + # Should have survived the first exception and continued + assert call_count[0] >= 2 + + +# ── Unit tests: _start_deadman_thread ──────────────────────────────────────── + + +def test_start_deadman_thread_is_daemon(monkeypatch: pytest.MonkeyPatch): + """Thread must be a daemon so it dies with the process.""" + from soosef.cli import _start_deadman_thread + + # Patch the loop to exit immediately so the thread doesn't hang in tests + import soosef.cli as cli_mod + + monkeypatch.setattr(cli_mod, "_deadman_enforcement_loop", lambda interval_seconds: None) + + t = _start_deadman_thread(interval_seconds=60) + assert t is not None + assert t.daemon is True + assert t.name == "deadman-enforcement" + t.join(timeout=2) + + +# ── CLI integration: check-deadman 
─────────────────────────────────────────── + + +@pytest.fixture() +def cli_runner(): + return CliRunner() + + +def test_check_deadman_disarmed(tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch): + """check-deadman exits 0 and prints helpful message when not armed.""" + from soosef.fieldkit import deadman as deadman_mod + from soosef.cli import main + + # Point at an empty tmp dir so the real ~/.soosef/fieldkit/deadman.json isn't read + state_file = tmp_path / "deadman.json" + monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + + result = cli_runner.invoke(main, ["fieldkit", "check-deadman"]) + assert result.exit_code == 0 + assert "not armed" in result.output + + +def test_check_deadman_armed_ok(tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch): + """check-deadman exits 0 when armed and check-in is current.""" + from soosef.fieldkit import deadman as deadman_mod + from soosef.cli import main + + state_file = tmp_path / "deadman.json" + monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + + last_checkin = datetime.now(UTC) - timedelta(hours=1) + _write_deadman_state( + state_file, + armed=True, + last_checkin=last_checkin, + interval_hours=24, + grace_hours=2, + ) + + result = cli_runner.invoke(main, ["fieldkit", "check-deadman"]) + assert result.exit_code == 0 + assert "OK" in result.output + + +def test_check_deadman_overdue_in_grace(tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch): + """check-deadman exits 0 but prints OVERDUE warning when past interval but in grace.""" + from soosef.fieldkit import deadman as deadman_mod + from soosef.cli import main + + state_file = tmp_path / "deadman.json" + monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + + # Past 24h interval but within 26h total (grace=2) + last_checkin = datetime.now(UTC) - timedelta(hours=25) + _write_deadman_state( + state_file, + armed=True, + last_checkin=last_checkin, + interval_hours=24, + 
grace_hours=2, + ) + + result = cli_runner.invoke(main, ["fieldkit", "check-deadman"]) + # Not yet fired (grace not expired), so exit code is 0 + assert result.exit_code == 0 + assert "OVERDUE" in result.output + + +def test_check_deadman_fires_when_expired( + tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch +): + """check-deadman exits 2 when the switch has fully expired.""" + from soosef.fieldkit import deadman as deadman_mod + from soosef.cli import main + + state_file = tmp_path / "deadman.json" + monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + + last_checkin = datetime.now(UTC) - timedelta(hours=100) + _write_deadman_state( + state_file, + armed=True, + last_checkin=last_checkin, + interval_hours=24, + grace_hours=2, + ) + + # Patch check() so we don't invoke the real killswitch during tests + monkeypatch.setattr(deadman_mod.DeadmanSwitch, "check", lambda self: None) + + result = cli_runner.invoke(main, ["fieldkit", "check-deadman"]) + assert result.exit_code == 2 + assert "killswitch triggered" in result.output.lower() or "expired" in result.output.lower() diff --git a/tests/test_key_rotation.py b/tests/test_key_rotation.py new file mode 100644 index 0000000..eef90ca --- /dev/null +++ b/tests/test_key_rotation.py @@ -0,0 +1,328 @@ +"""Tests for key rotation (rotate_identity, rotate_channel_key).""" + +from __future__ import annotations + +from pathlib import Path + +import pytest +from click.testing import CliRunner + +import soosef.paths as _paths +from soosef.cli import main +from soosef.exceptions import KeystoreError +from soosef.keystore.manager import KeystoreManager +from soosef.keystore.models import RotationResult + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_manager(tmp_path: Path) -> KeystoreManager: + """Return a KeystoreManager pointing at isolated temp directories.""" + 
identity_dir = tmp_path / "identity" + channel_key_file = tmp_path / "stegasoo" / "channel.key" + return KeystoreManager(identity_dir=identity_dir, channel_key_file=channel_key_file) + + +# --------------------------------------------------------------------------- +# rotate_identity +# --------------------------------------------------------------------------- + + +class TestRotateIdentity: + def test_raises_when_no_identity(self, tmp_path: Path): + ks = _make_manager(tmp_path) + with pytest.raises(KeystoreError, match="No identity to rotate"): + ks.rotate_identity() + + def test_returns_rotation_result(self, tmp_path: Path): + ks = _make_manager(tmp_path) + ks.generate_identity() + + result = ks.rotate_identity() + + assert isinstance(result, RotationResult) + assert result.old_fingerprint + assert result.new_fingerprint + assert result.old_fingerprint != result.new_fingerprint + + def test_old_keys_archived(self, tmp_path: Path): + ks = _make_manager(tmp_path) + original = ks.generate_identity() + + result = ks.rotate_identity() + + archive = result.archive_path + assert archive.is_dir() + assert (archive / "private.pem").exists() + assert (archive / "public.pem").exists() + assert (archive / "rotation.txt").exists() + + # The archived public key must correspond to the *old* fingerprint + import hashlib + + from cryptography.hazmat.primitives.serialization import ( + Encoding, + PublicFormat, + load_pem_public_key, + ) + + pub_pem = (archive / "public.pem").read_bytes() + pub_key = load_pem_public_key(pub_pem) + pub_raw = pub_key.public_bytes(Encoding.Raw, PublicFormat.Raw) + archived_fp = hashlib.sha256(pub_raw).hexdigest()[:32] + assert archived_fp == original.fingerprint + + def test_archive_private_key_permissions(self, tmp_path: Path): + ks = _make_manager(tmp_path) + ks.generate_identity() + + result = ks.rotate_identity() + + priv = result.archive_path / "private.pem" + assert oct(priv.stat().st_mode & 0o777) == oct(0o600) + + def 
test_archive_dir_permissions(self, tmp_path: Path): + ks = _make_manager(tmp_path) + ks.generate_identity() + + result = ks.rotate_identity() + + assert oct(result.archive_path.stat().st_mode & 0o777) == oct(0o700) + + def test_new_identity_active_after_rotation(self, tmp_path: Path): + ks = _make_manager(tmp_path) + ks.generate_identity() + + result = ks.rotate_identity() + + current = ks.get_identity() + assert current.fingerprint == result.new_fingerprint + + def test_rotation_txt_contains_old_fingerprint(self, tmp_path: Path): + ks = _make_manager(tmp_path) + original = ks.generate_identity() + + result = ks.rotate_identity() + + txt = (result.archive_path / "rotation.txt").read_text() + assert original.fingerprint in txt + + def test_multiple_rotations_create_separate_archives(self, tmp_path: Path): + ks = _make_manager(tmp_path) + ks.generate_identity() + + r1 = ks.rotate_identity() + r2 = ks.rotate_identity() + + assert r1.archive_path != r2.archive_path + assert r1.archive_path.is_dir() + assert r2.archive_path.is_dir() + + def test_rotation_fingerprints_are_distinct_from_each_other(self, tmp_path: Path): + ks = _make_manager(tmp_path) + ks.generate_identity() + + r1 = ks.rotate_identity() + r2 = ks.rotate_identity() + + # Each rotation produces a unique new key + assert r1.new_fingerprint == r2.old_fingerprint + assert r1.new_fingerprint != r2.new_fingerprint + + +# --------------------------------------------------------------------------- +# rotate_channel_key +# --------------------------------------------------------------------------- + + +class TestRotateChannelKey: + def test_raises_when_no_channel_key(self, tmp_path: Path): + ks = _make_manager(tmp_path) + with pytest.raises(KeystoreError, match="No channel key to rotate"): + ks.rotate_channel_key() + + def test_raises_for_env_var_key(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + monkeypatch.setenv("STEGASOO_CHANNEL_KEY", "a" * 64) + ks = _make_manager(tmp_path) + # has_channel_key() 
returns True (env), but file doesn't exist + with pytest.raises(KeystoreError, match="environment variable"): + ks.rotate_channel_key() + + def test_returns_rotation_result(self, tmp_path: Path): + ks = _make_manager(tmp_path) + ks.generate_channel_key() + + result = ks.rotate_channel_key() + + assert isinstance(result, RotationResult) + assert result.old_fingerprint + assert result.new_fingerprint + assert result.old_fingerprint != result.new_fingerprint + + def test_old_key_archived(self, tmp_path: Path): + ks = _make_manager(tmp_path) + ks.generate_channel_key() + + result = ks.rotate_channel_key() + + archive = result.archive_path + assert archive.is_dir() + assert (archive / "channel.key").exists() + assert (archive / "rotation.txt").exists() + + def test_archive_channel_key_permissions(self, tmp_path: Path): + ks = _make_manager(tmp_path) + ks.generate_channel_key() + + result = ks.rotate_channel_key() + + key_file = result.archive_path / "channel.key" + assert oct(key_file.stat().st_mode & 0o777) == oct(0o600) + + def test_archived_key_matches_old_fingerprint(self, tmp_path: Path): + from stegasoo.crypto import get_channel_fingerprint + + ks = _make_manager(tmp_path) + ks.generate_channel_key() + # Get fingerprint before rotation + old_key = ks.get_channel_key() + old_fp = get_channel_fingerprint(old_key) + + result = ks.rotate_channel_key() + + archived_key = (result.archive_path / "channel.key").read_text().strip() + assert get_channel_fingerprint(archived_key) == old_fp + assert old_fp == result.old_fingerprint + + def test_new_channel_key_active_after_rotation(self, tmp_path: Path): + from stegasoo.crypto import get_channel_fingerprint + + ks = _make_manager(tmp_path) + ks.generate_channel_key() + + result = ks.rotate_channel_key() + + current_key = ks.get_channel_key() + current_fp = get_channel_fingerprint(current_key) + assert current_fp == result.new_fingerprint + + def test_multiple_rotations_create_separate_archives(self, tmp_path: Path): + ks = 
_make_manager(tmp_path) + ks.generate_channel_key() + + r1 = ks.rotate_channel_key() + r2 = ks.rotate_channel_key() + + assert r1.archive_path != r2.archive_path + assert (r1.archive_path / "channel.key").exists() + assert (r2.archive_path / "channel.key").exists() + + +# --------------------------------------------------------------------------- +# CLI integration +# --------------------------------------------------------------------------- + + +class TestRotateCLI: + def _init_soosef(self, tmp_path: Path) -> Path: + """Create the minimal directory + key structure for CLI tests. + + Temporarily sets paths.BASE_DIR so the lazy-resolved KeystoreManager + writes keys to the same location the CLI will read from when invoked + with --data-dir pointing at the same directory. + """ + data_dir = tmp_path / ".soosef" + original_base = _paths.BASE_DIR + try: + _paths.BASE_DIR = data_dir + ks = KeystoreManager() # uses lazy _paths resolution + ks.generate_identity() + ks.generate_channel_key() + finally: + _paths.BASE_DIR = original_base + return data_dir + + def test_rotate_identity_cli_success(self, tmp_path: Path): + data_dir = self._init_soosef(tmp_path) + runner = CliRunner() + + result = runner.invoke( + main, + ["--data-dir", str(data_dir), "keys", "rotate-identity", "--yes"], + catch_exceptions=False, + ) + + assert result.exit_code == 0, result.output + assert "rotated successfully" in result.output + assert "Old fingerprint:" in result.output + assert "New fingerprint:" in result.output + assert "IMPORTANT:" in result.output + + def test_rotate_identity_cli_no_identity(self, tmp_path: Path): + data_dir = tmp_path / ".soosef" + data_dir.mkdir() + runner = CliRunner() + + result = runner.invoke( + main, + ["--data-dir", str(data_dir), "keys", "rotate-identity", "--yes"], + ) + + assert result.exit_code != 0 + assert "Error" in result.output + + def test_rotate_channel_cli_success(self, tmp_path: Path): + data_dir = self._init_soosef(tmp_path) + runner = CliRunner() 
+ + result = runner.invoke( + main, + ["--data-dir", str(data_dir), "keys", "rotate-channel", "--yes"], + catch_exceptions=False, + ) + + assert result.exit_code == 0, result.output + assert "rotated successfully" in result.output + assert "Old fingerprint:" in result.output + assert "New fingerprint:" in result.output + assert "IMPORTANT:" in result.output + + def test_rotate_channel_cli_no_key(self, tmp_path: Path): + data_dir = tmp_path / ".soosef" + data_dir.mkdir() + runner = CliRunner() + + result = runner.invoke( + main, + ["--data-dir", str(data_dir), "keys", "rotate-channel", "--yes"], + ) + + assert result.exit_code != 0 + assert "Error" in result.output + + def test_rotate_identity_aborts_without_confirmation(self, tmp_path: Path): + data_dir = self._init_soosef(tmp_path) + runner = CliRunner() + + # Simulate the user typing "n" at the confirmation prompt + result = runner.invoke( + main, + ["--data-dir", str(data_dir), "keys", "rotate-identity"], + input="n\n", + ) + + assert result.exit_code != 0 + + def test_rotate_channel_aborts_without_confirmation(self, tmp_path: Path): + data_dir = self._init_soosef(tmp_path) + runner = CliRunner() + + result = runner.invoke( + main, + ["--data-dir", str(data_dir), "keys", "rotate-channel"], + input="n\n", + ) + + assert result.exit_code != 0 diff --git a/tests/test_killswitch.py b/tests/test_killswitch.py new file mode 100644 index 0000000..1126bef --- /dev/null +++ b/tests/test_killswitch.py @@ -0,0 +1,134 @@ +"""Tests for killswitch — verifies emergency purge destroys all sensitive data.""" + +from __future__ import annotations + +import hashlib +from pathlib import Path + +import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey +from cryptography.hazmat.primitives.serialization import ( + Encoding, + NoEncryption, + PrivateFormat, + PublicFormat, +) + + +@pytest.fixture() +def populated_soosef(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: + """Create a 
@pytest.fixture()
def populated_soosef(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
    """Create a populated ~/.soosef directory with identity, chain, attestations, etc."""
    import soosef.paths as paths

    data_dir = tmp_path / ".soosef"
    data_dir.mkdir()
    # Point the application at the throwaway directory for this test.
    monkeypatch.setattr(paths, "BASE_DIR", data_dir)

    # Identity key pair (Ed25519, PEM encoded).
    identity_dir = data_dir / "identity"
    identity_dir.mkdir()
    signing_key = Ed25519PrivateKey.generate()
    (identity_dir / "private.pem").write_bytes(
        signing_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
    )
    (identity_dir / "public.pem").write_bytes(
        signing_key.public_key().public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
    )

    # Channel key.
    stegasoo_dir = data_dir / "stegasoo"
    stegasoo_dir.mkdir()
    (stegasoo_dir / "channel.key").write_text("test-channel-key")

    # A short chain so the purge has real chain data to destroy.
    from soosef.federation.chain import ChainStore

    store = ChainStore(data_dir / "chain")
    for i in range(3):
        store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", signing_key)

    # Attestation log.
    att_dir = data_dir / "attestations"
    att_dir.mkdir()
    (att_dir / "log.bin").write_bytes(b"dummy attestation data")

    # Remaining auxiliary files and directories the purge must cover.
    (data_dir / "auth").mkdir()
    (data_dir / "auth" / "soosef.db").write_bytes(b"dummy db")
    (data_dir / "temp").mkdir()
    (data_dir / "temp" / "file.tmp").write_bytes(b"tmp")
    (data_dir / "instance").mkdir()
    (data_dir / "instance" / ".secret_key").write_bytes(b"secret")
    (data_dir / "config.json").write_text("{}")

    return data_dir


def test_purge_all_destroys_chain_data(populated_soosef: Path):
    """CRITICAL: execute_purge(ALL) must destroy chain directory."""
    from soosef.fieldkit.killswitch import PurgeScope, execute_purge

    chain_dir = populated_soosef / "chain"
    assert chain_dir.exists()
    assert (chain_dir / "chain.bin").exists()

    outcome = execute_purge(PurgeScope.ALL, reason="test")

    assert not chain_dir.exists(), "Chain directory must be destroyed by killswitch"
    assert "destroy_chain_data" in outcome.steps_completed


def test_purge_all_destroys_identity(populated_soosef: Path):
    """execute_purge(ALL) must destroy identity keys."""
    from soosef.fieldkit.killswitch import PurgeScope, execute_purge

    assert (populated_soosef / "identity" / "private.pem").exists()

    outcome = execute_purge(PurgeScope.ALL, reason="test")

    assert not (populated_soosef / "identity").exists()
    assert "destroy_identity_keys" in outcome.steps_completed


def test_purge_all_destroys_attestation_log(populated_soosef: Path):
    """execute_purge(ALL) must destroy the Verisoo attestation log."""
    from soosef.fieldkit.killswitch import PurgeScope, execute_purge

    outcome = execute_purge(PurgeScope.ALL, reason="test")

    assert not (populated_soosef / "attestations").exists()
    assert "destroy_attestation_log" in outcome.steps_completed


def test_purge_keys_only_preserves_chain(populated_soosef: Path):
    """KEYS_ONLY purge destroys keys but preserves chain and attestation data."""
    from soosef.fieldkit.killswitch import PurgeScope, execute_purge

    outcome = execute_purge(PurgeScope.KEYS_ONLY, reason="test")

    # Keys gone
    assert not (populated_soosef / "identity").exists()
    assert "destroy_identity_keys" in outcome.steps_completed

    # Chain and attestations preserved (KEYS_ONLY doesn't touch data)
    assert (populated_soosef / "chain" / "chain.bin").exists()
    assert (populated_soosef / "attestations" / "log.bin").exists()


def test_purge_reports_all_steps(populated_soosef: Path):
    """execute_purge(ALL) reports all expected steps including chain."""
    from soosef.fieldkit.killswitch import PurgeScope, execute_purge

    outcome = execute_purge(PurgeScope.ALL, reason="test")

    expected_steps = (
        "destroy_identity_keys",
        "destroy_channel_key",
        "destroy_flask_secret",
        "destroy_auth_db",
        "destroy_attestation_log",
        "destroy_chain_data",
        "destroy_temp_files",
        "destroy_config",
    )
    for step in expected_steps:
        assert step in outcome.steps_completed, f"Missing purge step: {step}"
# =========================================================================
# tests/test_serialization.py
# =========================================================================
# Tests for CBOR serialization of chain records.
# NOTE: the real module begins with `from __future__ import annotations`.

import hashlib

from soosef.federation.models import AttestationChainRecord, ChainState, EntropyWitnesses
from soosef.federation.serialization import (
    canonical_bytes,
    compute_record_hash,
    deserialize_record,
    serialize_record,
)


def _make_record(**overrides) -> AttestationChainRecord:
    """Create a minimal test record with sensible defaults."""
    fields = {
        "version": 1,
        "record_id": b"\x01" * 16,
        "chain_index": 0,
        "prev_hash": ChainState.GENESIS_PREV_HASH,
        "content_hash": hashlib.sha256(b"test content").digest(),
        "content_type": "test/plain",
        "metadata": {},
        "claimed_ts": 1_700_000_000_000_000,
        "entropy_witnesses": EntropyWitnesses(
            sys_uptime=12345.678,
            fs_snapshot=b"\xab" * 16,
            proc_entropy=256,
            boot_id="test-boot-id",
        ),
        "signer_pubkey": b"\x02" * 32,
        "signature": b"\x03" * 64,
    }
    fields.update(overrides)
    return AttestationChainRecord(**fields)


def test_canonical_bytes_deterministic():
    """Same record always produces the same canonical bytes."""
    rec = _make_record()
    assert canonical_bytes(rec) == canonical_bytes(rec)


def test_canonical_bytes_excludes_signature():
    """Canonical bytes must not include the signature field."""
    signed_a = _make_record(signature=b"\x03" * 64)
    signed_b = _make_record(signature=b"\x04" * 64)
    assert canonical_bytes(signed_a) == canonical_bytes(signed_b)


def test_canonical_bytes_sensitive_to_content():
    """Different content_hash must produce different canonical bytes."""
    rec_a = _make_record(content_hash=hashlib.sha256(b"a").digest())
    rec_b = _make_record(content_hash=hashlib.sha256(b"b").digest())
    assert canonical_bytes(rec_a) != canonical_bytes(rec_b)


def test_serialize_deserialize_round_trip():
    """A record survives serialization and deserialization intact."""
    original = _make_record()
    restored = deserialize_record(serialize_record(original))

    for attr in (
        "version",
        "record_id",
        "chain_index",
        "prev_hash",
        "content_hash",
        "content_type",
        "metadata",
        "claimed_ts",
        "signer_pubkey",
        "signature",
    ):
        assert getattr(restored, attr) == getattr(original, attr)

    # Entropy witnesses
    assert restored.entropy_witnesses is not None
    for attr in ("sys_uptime", "fs_snapshot", "proc_entropy", "boot_id"):
        assert getattr(restored.entropy_witnesses, attr) == getattr(
            original.entropy_witnesses, attr
        )


def test_serialize_includes_signature():
    """Full serialization must include the signature."""
    rec = _make_record(signature=b"\xaa" * 64)
    restored = deserialize_record(serialize_record(rec))
    assert restored.signature == b"\xaa" * 64


def test_compute_record_hash():
    """Record hash is SHA-256 of canonical bytes."""
    rec = _make_record()
    assert compute_record_hash(rec) == hashlib.sha256(canonical_bytes(rec)).digest()


def test_record_hash_changes_with_content():
    """Different records produce different hashes."""
    rec_a = _make_record(content_hash=hashlib.sha256(b"a").digest())
    rec_b = _make_record(content_hash=hashlib.sha256(b"b").digest())
    assert compute_record_hash(rec_a) != compute_record_hash(rec_b)


def test_metadata_preserved():
    """Arbitrary metadata survives round-trip."""
    meta = {"backfilled": True, "caption": "test photo", "tags": ["evidence", "urgent"]}
    restored = deserialize_record(serialize_record(_make_record(metadata=meta)))
    assert restored.metadata == meta


def test_empty_entropy_witnesses():
    """Record with no entropy witnesses round-trips correctly."""
    restored = deserialize_record(serialize_record(_make_record(entropy_witnesses=None)))
    assert restored.entropy_witnesses is None