""" Unified key management for SooSeF. Owns all key material across both Stegasoo (channel keys) and Verisoo (Ed25519 identity). Single entry point for key generation, retrieval, rotation, and export. """ from __future__ import annotations import os from pathlib import Path import soosef.paths as _paths from soosef.exceptions import KeystoreError from soosef.keystore.models import IdentityInfo, KeystoreStatus, RotationResult class KeystoreManager: """Manages all key material for a SooSeF instance.""" def __init__(self, identity_dir: Path | None = None, channel_key_file: Path | None = None): # Use lazy path resolution so that --data-dir / SOOSEF_DATA_DIR overrides # propagate correctly when paths.BASE_DIR is changed at runtime. self._identity_dir = identity_dir or _paths.IDENTITY_DIR self._channel_key_file = channel_key_file or _paths.CHANNEL_KEY_FILE # ── Verisoo Identity (Ed25519) ────────────────────────────────── def has_identity(self) -> bool: """Check if an Ed25519 identity exists.""" return (self._identity_dir / "private.pem").exists() def _identity_meta_path(self) -> Path: """Path to the identity creation-timestamp sidecar file.""" return self._identity_dir / "identity.meta.json" def get_identity(self) -> IdentityInfo: """Get identity info. Raises KeystoreError if no identity exists.""" pub_path = self._identity_dir / "public.pem" priv_path = self._identity_dir / "private.pem" if not pub_path.exists(): raise KeystoreError("No identity found. Run 'soosef init' to generate one.") from cryptography.hazmat.primitives.serialization import ( Encoding, PublicFormat, load_pem_public_key, ) pub_pem = pub_path.read_bytes() public_key = load_pem_public_key(pub_pem) pub_raw = public_key.public_bytes(Encoding.Raw, PublicFormat.Raw) import hashlib fingerprint = hashlib.sha256(pub_raw).hexdigest()[:32] # Resolve created_at from the sidecar written by generate_identity(). # Fall back to private key mtime for keys generated before the sidecar # was introduced (legacy compatibility). from datetime import UTC, datetime created_at: datetime | None = None meta_path = self._identity_meta_path() if meta_path.exists(): try: import json meta = json.loads(meta_path.read_text()) created_at = datetime.fromisoformat(meta["created_at"]) except Exception: pass # malformed sidecar — fall through to mtime if created_at is None and priv_path.exists(): created_at = datetime.fromtimestamp(priv_path.stat().st_mtime, tz=UTC) return IdentityInfo( fingerprint=fingerprint, public_key_pem=pub_pem.decode(), created_at=created_at, has_private_key=priv_path.exists(), ) def _archive_dir_for(self, parent: Path) -> Path: """Return a timestamped archive subdirectory under *parent*/archived/. The timestamp uses ISO-8601 basic format (no colons) so the directory name is safe on all filesystems: ``archived/2026-04-01T120000Z``. """ from datetime import UTC, datetime ts = datetime.now(UTC).strftime("%Y-%m-%dT%H%M%S_%fZ") return parent / "archived" / ts def generate_identity(self, password: bytes | None = None) -> IdentityInfo: """Generate a new Ed25519 keypair. Security note: the private key is stored unencrypted by default. This is intentional — the killswitch (secure deletion) is the primary defense for at-risk users, not key encryption. A password- protected key would require prompting on every attestation and chain operation, which is unworkable in field conditions. The key file is protected by 0o600 permissions. """ from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from cryptography.hazmat.primitives.serialization import ( BestAvailableEncryption, Encoding, NoEncryption, PrivateFormat, PublicFormat, ) self._identity_dir.mkdir(parents=True, exist_ok=True) private_key = Ed25519PrivateKey.generate() public_key = private_key.public_key() # Save private key encryption = BestAvailableEncryption(password) if password else NoEncryption() priv_pem = private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, encryption) priv_path = self._identity_dir / "private.pem" priv_path.write_bytes(priv_pem) priv_path.chmod(0o600) # Save public key pub_pem = public_key.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo) pub_path = self._identity_dir / "public.pem" pub_path.write_bytes(pub_pem) # Write creation timestamp sidecar so get_identity() always returns an # authoritative created_at without relying on filesystem mtime. import json from datetime import UTC, datetime meta_path = self._identity_meta_path() meta_path.write_text(json.dumps({"created_at": datetime.now(UTC).isoformat()}, indent=None)) return self.get_identity() def rotate_identity(self, password: bytes | None = None) -> RotationResult: """Rotate the Ed25519 identity keypair. The current private and public keys are copied verbatim to a timestamped archive directory before the new keypair is generated. Both old and new fingerprints are returned so the caller can report them and prompt the user to notify collaborators. Raises KeystoreError if no identity exists yet (use generate_identity for initial setup). """ import shutil if not self.has_identity(): raise KeystoreError("No identity to rotate. Run 'soosef init' first.") old_info = self.get_identity() # Archive the current keypair under identity/archived// archive_dir = self._archive_dir_for(self._identity_dir) archive_dir.mkdir(parents=True, exist_ok=True) archive_dir.chmod(0o700) priv_src = self._identity_dir / "private.pem" pub_src = self._identity_dir / "public.pem" meta_src = self._identity_meta_path() shutil.copy2(priv_src, archive_dir / "private.pem") (archive_dir / "private.pem").chmod(0o600) shutil.copy2(pub_src, archive_dir / "public.pem") if meta_src.exists(): shutil.copy2(meta_src, archive_dir / "identity.meta.json") # Write a small provenance note alongside the archived key so an # operator can reconstruct the rotation timeline without tooling. from datetime import UTC, datetime (archive_dir / "rotation.txt").write_text( f"Rotated at: {datetime.now(UTC).isoformat()}\n" f"Old fingerprint: {old_info.fingerprint}\n" ) new_info = self.generate_identity(password=password) return RotationResult( old_fingerprint=old_info.fingerprint, new_fingerprint=new_info.fingerprint, archive_path=archive_dir, ) # ── Stegasoo Channel Key ──────────────────────────────────────── def has_channel_key(self) -> bool: """Check if a channel key is configured.""" return bool(os.environ.get("STEGASOO_CHANNEL_KEY")) or self._channel_key_file.exists() def get_channel_key(self) -> str | None: """Get the channel key, or None if not configured.""" env_key = os.environ.get("STEGASOO_CHANNEL_KEY") if env_key: return env_key if self._channel_key_file.exists(): return self._channel_key_file.read_text().strip() return None def set_channel_key(self, key: str) -> None: """Store a channel key.""" from stegasoo import validate_channel_key validate_channel_key(key) self._channel_key_file.parent.mkdir(parents=True, exist_ok=True) self._channel_key_file.write_text(key) self._channel_key_file.chmod(0o600) def generate_channel_key(self) -> str: """Generate and store a new channel key.""" from stegasoo import generate_channel_key key: str = generate_channel_key() self.set_channel_key(key) return key def rotate_channel_key(self) -> RotationResult: """Rotate the Stegasoo channel key. The current key is copied to a timestamped archive directory before the new key is generated. Both old and new channel fingerprints are returned. Raises KeystoreError if no channel key exists yet (use generate_channel_key for initial setup). """ import shutil if not self.has_channel_key(): raise KeystoreError("No channel key to rotate. Run 'soosef init' first.") # Only file-based keys can be archived; env-var keys have no on-disk # representation to back up, so we refuse rather than silently skip. if not self._channel_key_file.exists(): raise KeystoreError( "Channel key is set via STEGASOO_CHANNEL_KEY environment variable " "and cannot be rotated through soosef. Unset the variable and store " "the key in the keystore first." ) from stegasoo.crypto import get_channel_fingerprint old_key = self._channel_key_file.read_text().strip() old_fp = get_channel_fingerprint(old_key) # Archive under stegasoo/archived//channel.key archive_dir = self._archive_dir_for(self._channel_key_file.parent) archive_dir.mkdir(parents=True, exist_ok=True) archive_dir.chmod(0o700) shutil.copy2(self._channel_key_file, archive_dir / "channel.key") (archive_dir / "channel.key").chmod(0o600) from datetime import UTC, datetime (archive_dir / "rotation.txt").write_text( f"Rotated at: {datetime.now(UTC).isoformat()}\n" f"Old fingerprint: {old_fp}\n" ) new_key = self.generate_channel_key() new_fp = get_channel_fingerprint(new_key) return RotationResult( old_fingerprint=old_fp, new_fingerprint=new_fp, archive_path=archive_dir, ) # ── Unified Status ────────────────────────────────────────────── def status(self) -> KeystoreStatus: """Get an overview of all managed key material.""" channel_fp = None if self.has_channel_key(): key = self.get_channel_key() if key: from stegasoo.crypto import get_channel_fingerprint channel_fp = get_channel_fingerprint(key) identity_fp = None if self.has_identity(): identity_fp = self.get_identity().fingerprint return KeystoreStatus( has_identity=self.has_identity(), identity_fingerprint=identity_fp, has_channel_key=self.has_channel_key(), channel_fingerprint=channel_fp, )