""" Emergency data destruction — the killswitch. Ordered destruction to maximize what's gone before any interruption. Priority: keys first (most sensitive), then data, then logs. On SSDs/flash where shred is less effective, key destruction is the real win — without keys, the encrypted data is unrecoverable. """ from __future__ import annotations import enum import logging import platform import shutil import subprocess from dataclasses import dataclass, field from pathlib import Path import soosef.paths as paths logger = logging.getLogger(__name__) class PurgeScope(enum.Enum): """What to destroy.""" KEYS_ONLY = "keys_only" # Just key material ALL = "all" # Everything @dataclass class PurgeResult: """Report of what was destroyed.""" steps_completed: list[str] = field(default_factory=list) steps_failed: list[tuple[str, str]] = field(default_factory=list) fully_purged: bool = False def _secure_delete_file(path: Path) -> None: """Overwrite and delete a file. Best-effort on flash storage.""" if not path.exists(): return if platform.system() == "Linux": try: subprocess.run( ["shred", "-u", "-z", "-n", "3", str(path)], timeout=30, capture_output=True, ) return except (subprocess.TimeoutExpired, FileNotFoundError): pass # Fallback: overwrite with zeros then delete size = path.stat().st_size with open(path, "wb") as f: f.write(b"\x00" * size) f.flush() path.unlink() def _secure_delete_dir(path: Path) -> None: """Recursively secure-delete all files in a directory, then remove it.""" if not path.exists(): return for child in path.rglob("*"): if child.is_file(): _secure_delete_file(child) shutil.rmtree(path, ignore_errors=True) def execute_purge(scope: PurgeScope = PurgeScope.ALL, reason: str = "manual") -> PurgeResult: """ Execute emergency purge. Destruction order is intentional — keys go first because they're the smallest and most critical. Even if the purge is interrupted after step 1, the remaining data is cryptographically useless. """ result = PurgeResult() logger.warning("KILLSWITCH ACTIVATED — reason: %s, scope: %s", reason, scope.value) steps: list[tuple[str, callable]] = [ ("destroy_identity_keys", lambda: _secure_delete_dir(paths.IDENTITY_DIR)), ("destroy_channel_key", lambda: _secure_delete_file(paths.CHANNEL_KEY_FILE)), ("destroy_flask_secret", lambda: _secure_delete_file(paths.INSTANCE_DIR / ".secret_key")), ] if scope == PurgeScope.ALL: steps.extend( [ ("destroy_auth_db", lambda: _secure_delete_file(paths.AUTH_DB)), ("destroy_attestation_log", lambda: _secure_delete_dir(paths.ATTESTATIONS_DIR)), ("destroy_chain_data", lambda: _secure_delete_dir(paths.CHAIN_DIR)), ("destroy_temp_files", lambda: _secure_delete_dir(paths.TEMP_DIR)), ("destroy_config", lambda: _secure_delete_file(paths.CONFIG_FILE)), ("clear_journald", _clear_system_logs), ] ) for name, action in steps: try: action() result.steps_completed.append(name) logger.info("Purge step completed: %s", name) except Exception as e: result.steps_failed.append((name, str(e))) logger.error("Purge step failed: %s — %s", name, e) result.fully_purged = len(result.steps_failed) == 0 return result def _clear_system_logs() -> None: """Best-effort clearing of system journal entries for soosef.""" if platform.system() != "Linux": return try: subprocess.run( ["journalctl", "--vacuum-time=1s", "--unit=soosef*"], timeout=10, capture_output=True, ) except (subprocess.TimeoutExpired, FileNotFoundError): pass # ── Hardware GPIO killswitch ───────────────────────────────────────── try: from gpiozero import Button HAS_GPIO = True except ImportError: HAS_GPIO = False def watch_hardware_button( pin: int = 17, hold_seconds: float = 5.0, callback: callable | None = None, ) -> None: """ Monitor GPIO pin for physical killswitch button. Requires holding for hold_seconds to prevent accidental trigger. """ if not HAS_GPIO: logger.warning("gpiozero not available — hardware killswitch disabled") return def _on_held(): logger.warning("Hardware killswitch button held for %ss — firing", hold_seconds) if callback: callback() else: execute_purge(PurgeScope.ALL, reason="hardware_button") btn = Button(pin, hold_time=hold_seconds) btn.when_held = _on_held logger.info("Hardware killswitch armed on GPIO pin %d (hold %ss to trigger)", pin, hold_seconds)