""" Emergency data destruction — the killswitch. Ordered destruction to maximize what's gone before any interruption. Priority: keys first (most sensitive), then data, then logs. On SSDs/flash where shred is less effective, key destruction is the real win — without keys, the encrypted data is unrecoverable. """ from __future__ import annotations import enum import logging import platform import shutil import subprocess import sys from collections.abc import Callable from dataclasses import dataclass, field from pathlib import Path import soosef.paths as paths logger = logging.getLogger(__name__) class PurgeScope(enum.Enum): """What to destroy.""" KEYS_ONLY = "keys_only" # Just key material ALL = "all" # Everything @dataclass class PurgeResult: """Report of what was destroyed.""" steps_completed: list[str] = field(default_factory=list) steps_failed: list[tuple[str, str]] = field(default_factory=list) fully_purged: bool = False def _secure_delete_file(path: Path) -> None: """Overwrite and delete a file. Best-effort on flash storage.""" if not path.exists(): return if platform.system() == "Linux": try: result = subprocess.run( ["shred", "-u", "-z", "-n", "3", str(path)], timeout=30, capture_output=True, ) if result.returncode == 0: return # shred failed (permissions, read-only FS, etc.) — fall through to overwrite except (subprocess.TimeoutExpired, FileNotFoundError): pass # Fallback: overwrite with zeros then delete size = path.stat().st_size with open(path, "wb") as f: f.write(b"\x00" * size) f.flush() path.unlink() def _secure_delete_dir(path: Path) -> None: """Recursively secure-delete all files in a directory, then remove it.""" if not path.exists(): return for child in path.rglob("*"): if child.is_file(): _secure_delete_file(child) shutil.rmtree(path, ignore_errors=True) def execute_purge(scope: PurgeScope = PurgeScope.ALL, reason: str = "manual") -> PurgeResult: """ Execute emergency purge. Destruction order is intentional — keys go first because they're the smallest and most critical. Even if the purge is interrupted after step 1, the remaining data is cryptographically useless. """ result = PurgeResult() # Disable all logging BEFORE activation to prevent the audit log # from recording killswitch activity that could survive an interrupted purge. logging.disable(logging.CRITICAL) steps: list[tuple[str, Callable]] = [ ("destroy_identity_keys", lambda: _secure_delete_dir(paths.IDENTITY_DIR)), ("destroy_channel_key", lambda: _secure_delete_file(paths.CHANNEL_KEY_FILE)), ("destroy_flask_secret", lambda: _secure_delete_file(paths.INSTANCE_DIR / ".secret_key")), ] if scope == PurgeScope.ALL: steps.extend( [ # Audit log destroyed EARLY — before other data — to minimize # forensic evidence if the purge is interrupted. ("destroy_audit_log", lambda: _secure_delete_file(paths.AUDIT_LOG)), ("destroy_auth_db", lambda: _secure_delete_file(paths.AUTH_DB)), ("destroy_attestation_log", lambda: _secure_delete_dir(paths.ATTESTATIONS_DIR)), ("destroy_chain_data", lambda: _secure_delete_dir(paths.CHAIN_DIR)), ("destroy_temp_files", lambda: _secure_delete_dir(paths.TEMP_DIR)), ("destroy_config", lambda: _secure_delete_file(paths.CONFIG_FILE)), ("clear_journald", _clear_system_logs), ("deep_forensic_scrub", _deep_forensic_scrub), ("uninstall_package", _uninstall_package), ] ) for name, action in steps: try: action() result.steps_completed.append(name) logger.info("Purge step completed: %s", name) except Exception as e: result.steps_failed.append((name, str(e))) logger.error("Purge step failed: %s — %s", name, e) result.fully_purged = len(result.steps_failed) == 0 return result def _clear_system_logs() -> None: """Best-effort clearing of system journal entries for soosef.""" if platform.system() != "Linux": return try: subprocess.run( ["journalctl", "--vacuum-time=1s", "--unit=soosef*"], timeout=10, capture_output=True, ) except (subprocess.TimeoutExpired, FileNotFoundError): pass def _uninstall_package() -> None: """Best-effort self-uninstall of the soosef pip package.""" try: subprocess.run( [sys.executable, "-m", "pip", "uninstall", "-y", "soosef"], timeout=30, capture_output=True, ) except (subprocess.TimeoutExpired, FileNotFoundError): pass def _deep_forensic_scrub() -> None: """Best-effort removal of all forensic traces of SooSeF installation. Targets: - Python __pycache__ and .pyc files for soosef/stegasoo/verisoo - pip dist-info directories - pip download cache - Shell history entries containing 'soosef' """ import glob import site # Scrub __pycache__ and dist-info in site-packages for site_dir in site.getsitepackages() + [site.getusersitepackages()]: if not isinstance(site_dir, str): continue site_path = Path(site_dir) if not site_path.exists(): continue for pattern in ["soosef*", "stegasoo*", "verisoo*"]: for match in site_path.glob(pattern): try: if match.is_dir(): shutil.rmtree(match) else: _secure_delete_file(match) except OSError: pass # Scrub pip cache pip_cache = Path.home() / ".cache" / "pip" if pip_cache.exists(): for pattern in ["*soosef*", "*stegasoo*", "*verisoo*"]: for match in pip_cache.rglob(pattern): try: if match.is_dir(): shutil.rmtree(match) else: match.unlink() except OSError: pass # Scrub shell history (best-effort, rewrite without soosef lines) for hist_file in [".bash_history", ".zsh_history", ".local/share/fish/fish_history"]: hist_path = Path.home() / hist_file if hist_path.exists(): try: lines = hist_path.read_text().splitlines() cleaned = [l for l in lines if "soosef" not in l.lower()] hist_path.write_text("\n".join(cleaned) + "\n") except OSError: pass # ── Hardware GPIO killswitch ───────────────────────────────────────── try: from gpiozero import Button HAS_GPIO = True except ImportError: HAS_GPIO = False def watch_hardware_button( pin: int = 17, hold_seconds: float = 5.0, callback: Callable | None = None, ) -> None: """ Monitor GPIO pin for physical killswitch button. Requires holding for hold_seconds to prevent accidental trigger. """ if not HAS_GPIO: logger.warning("gpiozero not available — hardware killswitch disabled") return def _on_held(): logger.warning("Hardware killswitch button held for %ss — firing", hold_seconds) if callback: callback() else: execute_purge(PurgeScope.ALL, reason="hardware_button") btn = Button(pin, hold_time=hold_seconds) btn.when_held = _on_held logger.info("Hardware killswitch armed on GPIO pin %d (hold %ss to trigger)", pin, hold_seconds)