Add core modules, web frontend, CLI, keystore, and fieldkit

Core:
- paths.py: centralized ~/.soosef/ path constants
- config.py: JSON config loader with dataclass defaults
- exceptions.py: SoosefError hierarchy
- cli.py: unified Click CLI wrapping stegasoo + verisoo + native commands

Keystore:
- manager.py: unified key management (Ed25519 identity + channel keys)
- models.py: IdentityInfo, KeystoreStatus dataclasses
- export.py: encrypted key bundle export/import for USB transfer

Fieldkit:
- killswitch.py: ordered emergency data destruction (keys first)
- deadman.py: dead man's switch with check-in timer
- tamper.py: SHA-256 file integrity baseline + checking
- usb_monitor.py: pyudev USB whitelist enforcement
- geofence.py: haversine-based GPS boundary checking

Web frontend (Flask app factory + blueprints):
- app.py: create_app() factory with context processor
- blueprints: stego, attest, fieldkit, keys, admin
- templates: base.html (dark theme, unified nav), dashboard, all section pages
- static: CSS, favicon

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee
2026-03-31 14:30:13 -04:00
parent 06485879d2
commit b8d4eb5933
41 changed files with 2193 additions and 0 deletions

288
src/soosef/cli.py Normal file
View File

@@ -0,0 +1,288 @@
"""
SooSeF unified CLI.
Wraps Stegasoo and Verisoo CLIs as sub-command groups,
plus native SooSeF commands for init, fieldkit, keys, and serve.
"""
from __future__ import annotations
from pathlib import Path
import click
@click.group()
@click.option(
"--data-dir",
envvar="SOOSEF_DATA_DIR",
type=click.Path(path_type=Path),
help="Override data directory (default: ~/.soosef)",
)
@click.option("--json", "json_output", is_flag=True, help="JSON output mode")
@click.version_option(package_name="soosef")
@click.pass_context
def main(ctx, data_dir, json_output):
"""SooSeF — Soo Security Fieldkit"""
ctx.ensure_object(dict)
ctx.obj["json"] = json_output
if data_dir:
import soosef.paths as paths
paths.BASE_DIR = data_dir
# ── Init ────────────────────────────────────────────────────────────
@main.command()
@click.option("--no-identity", is_flag=True, help="Skip Ed25519 identity generation")
@click.option("--no-channel", is_flag=True, help="Skip channel key generation")
@click.pass_context
def init(ctx, no_identity, no_channel):
"""Initialize a new SooSeF instance — generate keys and create directory structure."""
from soosef.paths import ensure_dirs
from soosef.keystore.manager import KeystoreManager
from soosef.config import SoosefConfig
click.echo("Initializing SooSeF...")
ensure_dirs()
click.echo(" Created directory structure")
config = SoosefConfig()
config.save()
click.echo(" Created default config")
ks = KeystoreManager()
if not no_identity:
if ks.has_identity():
click.echo(" Identity already exists — skipping")
else:
info = ks.generate_identity()
click.echo(f" Generated Ed25519 identity: {info.fingerprint[:16]}...")
if not no_channel:
if ks.has_channel_key():
click.echo(" Channel key already exists — skipping")
else:
key = ks.generate_channel_key()
click.echo(f" Generated channel key: {key[:8]}...")
click.echo("Done. Run 'soosef serve' to start the web UI.")
# ── Serve ───────────────────────────────────────────────────────────
@main.command()
@click.option("--host", default="127.0.0.1", help="Bind address")
@click.option("--port", default=5000, type=int, help="Port")
@click.option("--no-https", is_flag=True, help="Disable HTTPS")
@click.option("--debug", is_flag=True, help="Debug mode")
def serve(host, port, no_https, debug):
"""Start the SooSeF web UI."""
from soosef.config import SoosefConfig
config = SoosefConfig.load()
config.host = host
config.port = port
if no_https:
config.https_enabled = False
from frontends.web.app import create_app
app = create_app(config)
ssl_context = None
if config.https_enabled:
from soosef.paths import SSL_CERT, SSL_KEY, CERTS_DIR
CERTS_DIR.mkdir(parents=True, exist_ok=True)
if not SSL_CERT.exists():
click.echo("Generating self-signed SSL certificate...")
_generate_self_signed_cert(SSL_CERT, SSL_KEY)
ssl_context = (str(SSL_CERT), str(SSL_KEY))
click.echo(f"Starting SooSeF on {'https' if ssl_context else 'http'}://{host}:{port}")
app.run(host=host, port=port, debug=debug, ssl_context=ssl_context)
def _generate_self_signed_cert(cert_path: Path, key_path: Path) -> None:
"""Generate a self-signed certificate for development/local use."""
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from datetime import datetime, timedelta, UTC
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, "SooSeF Local"),
])
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(UTC))
.not_valid_after(datetime.now(UTC) + timedelta(days=365))
.add_extension(
x509.SubjectAlternativeName([
x509.DNSName("localhost"),
x509.IPAddress(b"\x7f\x00\x00\x01".__class__(0x7F000001)),
]),
critical=False,
)
.sign(key, hashes.SHA256())
)
key_path.write_bytes(
key.private_bytes(serialization.Encoding.PEM, serialization.PrivateFormat.PKCS8, serialization.NoEncryption())
)
key_path.chmod(0o600)
cert_path.write_bytes(cert.public_bytes(serialization.Encoding.PEM))
# ── Stegasoo sub-commands ───────────────────────────────────────────
@main.group()
def stego():
"""Steganography operations (Stegasoo)."""
pass
try:
from stegasoo.cli import main as stegasoo_cli
# Re-register stegasoo commands under the 'stego' group
for name, cmd in stegasoo_cli.commands.items():
stego.add_command(cmd, name)
except ImportError:
@stego.command()
def unavailable():
"""Stegasoo is not installed."""
click.echo("Error: stegasoo package not found. Install with: pip install stegasoo")
# ── Verisoo sub-commands ────────────────────────────────────────────
@main.group()
def attest():
"""Provenance attestation (Verisoo)."""
pass
try:
from verisoo.cli import main as verisoo_cli
for name, cmd in verisoo_cli.commands.items():
attest.add_command(cmd, name)
except ImportError:
@attest.command()
def unavailable():
"""Verisoo is not installed."""
click.echo("Error: verisoo package not found. Install with: pip install verisoo")
# ── Fieldkit sub-commands ───────────────────────────────────────────
@main.group()
def fieldkit():
"""Field security features."""
pass
@fieldkit.command()
def status():
"""Show fieldkit status."""
from soosef.fieldkit.deadman import DeadmanSwitch
from soosef.keystore.manager import KeystoreManager
ks = KeystoreManager()
ks_status = ks.status()
click.echo("=== SooSeF Fieldkit Status ===")
click.echo(f"Identity: {'Active (' + ks_status.identity_fingerprint[:16] + '...)' if ks_status.has_identity else 'None'}")
click.echo(f"Channel Key: {'Active (' + ks_status.channel_fingerprint[:16] + '...)' if ks_status.has_channel_key else 'None'}")
dm = DeadmanSwitch()
dm_status = dm.status()
click.echo(f"Dead Man: {'Armed (overdue!)' if dm_status['overdue'] else 'Armed' if dm_status['armed'] else 'Disarmed'}")
@fieldkit.command()
@click.option("--confirm", required=True, help="Type CONFIRM-PURGE to confirm")
def purge(confirm):
"""Execute emergency purge — destroy all keys and data."""
if confirm != "CONFIRM-PURGE":
click.echo("Error: must pass --confirm CONFIRM-PURGE")
raise SystemExit(1)
from soosef.fieldkit.killswitch import PurgeScope, execute_purge
click.echo("EXECUTING EMERGENCY PURGE...")
result = execute_purge(PurgeScope.ALL, reason="cli")
click.echo(f"Completed: {len(result.steps_completed)} steps")
if result.steps_failed:
click.echo(f"Failed: {len(result.steps_failed)} steps")
for name, err in result.steps_failed:
click.echo(f" - {name}: {err}")
@fieldkit.command()
def checkin():
"""Record a dead man's switch check-in."""
from soosef.fieldkit.deadman import DeadmanSwitch
dm = DeadmanSwitch()
dm.checkin()
click.echo("Check-in recorded.")
# ── Keys sub-commands ───────────────────────────────────────────────
@main.group()
def keys():
"""Key management."""
pass
@keys.command()
def show():
"""Show all key status."""
from soosef.keystore.manager import KeystoreManager
ks = KeystoreManager()
s = ks.status()
click.echo(f"Identity: {s.identity_fingerprint or 'Not configured'}")
click.echo(f"Channel Key: {s.channel_fingerprint or 'Not configured'}")
@keys.command("export")
@click.argument("output", type=click.Path(path_type=Path))
@click.option("--password", prompt=True, hide_input=True, confirmation_prompt=True)
def export_keys(output, password):
"""Export all keys to an encrypted bundle file."""
from soosef.keystore.export import export_bundle
from soosef.paths import IDENTITY_DIR, CHANNEL_KEY_FILE
export_bundle(IDENTITY_DIR, CHANNEL_KEY_FILE, output, password.encode())
click.echo(f"Key bundle exported to: {output}")
@keys.command("import")
@click.argument("bundle", type=click.Path(exists=True, path_type=Path))
@click.option("--password", prompt=True, hide_input=True)
def import_keys(bundle, password):
"""Import keys from an encrypted bundle file."""
from soosef.keystore.export import import_bundle
from soosef.paths import IDENTITY_DIR, CHANNEL_KEY_FILE
imported = import_bundle(bundle, IDENTITY_DIR, CHANNEL_KEY_FILE, password.encode())
click.echo(f"Imported: {', '.join(imported.keys())}")

58
src/soosef/config.py Normal file
View File

@@ -0,0 +1,58 @@
"""
Unified configuration for SooSeF.
Loads from ~/.soosef/config.json with environment variable overrides.
Config is intentionally simple — a flat JSON file with sensible defaults.
"""
import json
from dataclasses import dataclass, field
from pathlib import Path
from soosef.paths import CONFIG_FILE
@dataclass
class SoosefConfig:
"""Runtime configuration for SooSeF."""
# Web UI
host: str = "127.0.0.1"
port: int = 5000
https_enabled: bool = True
auth_enabled: bool = True
max_upload_mb: int = 50
# Stegasoo defaults
default_embed_mode: str = "auto"
# Fieldkit
killswitch_enabled: bool = False
deadman_enabled: bool = False
deadman_interval_hours: int = 24
deadman_grace_hours: int = 2
usb_monitoring_enabled: bool = False
tamper_monitoring_enabled: bool = False
# Hardware (RPi)
gpio_killswitch_pin: int = 17
gpio_killswitch_hold_seconds: float = 5.0
@classmethod
def load(cls, path: Path | None = None) -> "SoosefConfig":
"""Load config from JSON file, falling back to defaults."""
config_path = path or CONFIG_FILE
if config_path.exists():
with open(config_path) as f:
data = json.load(f)
return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})
return cls()
def save(self, path: Path | None = None) -> None:
"""Persist config to JSON file."""
config_path = path or CONFIG_FILE
config_path.parent.mkdir(parents=True, exist_ok=True)
from dataclasses import asdict
with open(config_path, "w") as f:
json.dump(asdict(self), f, indent=2)

30
src/soosef/exceptions.py Normal file
View File

@@ -0,0 +1,30 @@
"""
Exception hierarchy for SooSeF.
SooSeF-specific exceptions. Stegasoo and Verisoo exceptions pass through
from their respective packages — no wrapping needed.
"""
class SoosefError(Exception):
"""Base exception for all SooSeF errors."""
class ConfigError(SoosefError):
"""Configuration error."""
class KeystoreError(SoosefError):
"""Key management error."""
class FieldkitError(SoosefError):
"""Fieldkit operation error."""
class KillswitchError(FieldkitError):
"""Killswitch operation error."""
class InitError(SoosefError):
"""Initialization/setup error."""

View File

@@ -0,0 +1,11 @@
"""
SooSeF Fieldkit — field security features.
Killswitch, dead man's switch, tamper detection, USB monitoring.
All features are opt-in and disabled by default.
"""
from soosef.fieldkit.killswitch import PurgeScope, execute_purge
from soosef.fieldkit.deadman import DeadmanSwitch
__all__ = ["PurgeScope", "execute_purge", "DeadmanSwitch"]

View File

@@ -0,0 +1,113 @@
"""
Dead man's switch.
Check in periodically. If you don't, the system executes the killswitch
after a grace period. State persisted to ~/.soosef/fieldkit/deadman.json.
"""
from __future__ import annotations
import json
import logging
from datetime import UTC, datetime, timedelta
from pathlib import Path
from soosef.paths import DEADMAN_STATE
logger = logging.getLogger(__name__)
class DeadmanSwitch:
"""Timer-based auto-purge if operator fails to check in."""
def __init__(self, state_file: Path | None = None):
self._state_file = state_file or DEADMAN_STATE
def _load_state(self) -> dict:
if self._state_file.exists():
with open(self._state_file) as f:
return json.load(f)
return {
"armed": False,
"last_checkin": None,
"interval_hours": 24,
"grace_hours": 2,
}
def _save_state(self, state: dict) -> None:
self._state_file.parent.mkdir(parents=True, exist_ok=True)
with open(self._state_file, "w") as f:
json.dump(state, f, indent=2)
def arm(self, interval_hours: int = 24, grace_hours: int = 2) -> None:
"""Arm the dead man's switch and record initial check-in."""
state = self._load_state()
state["armed"] = True
state["interval_hours"] = interval_hours
state["grace_hours"] = grace_hours
state["last_checkin"] = datetime.now(UTC).isoformat()
self._save_state(state)
logger.info(
"Dead man's switch armed: %dh interval, %dh grace", interval_hours, grace_hours
)
def disarm(self) -> None:
"""Disarm the dead man's switch."""
state = self._load_state()
state["armed"] = False
self._save_state(state)
logger.info("Dead man's switch disarmed")
def checkin(self) -> None:
"""Record a check-in, resetting the timer."""
state = self._load_state()
state["last_checkin"] = datetime.now(UTC).isoformat()
self._save_state(state)
logger.info("Dead man's switch check-in recorded")
def is_armed(self) -> bool:
return self._load_state()["armed"]
def is_overdue(self) -> bool:
"""Check if the switch has expired (past interval, ignoring grace)."""
state = self._load_state()
if not state["armed"] or not state["last_checkin"]:
return False
last = datetime.fromisoformat(state["last_checkin"])
deadline = last + timedelta(hours=state["interval_hours"])
return datetime.now(UTC) > deadline
def should_fire(self) -> bool:
"""Check if the switch should fire (past interval + grace)."""
state = self._load_state()
if not state["armed"] or not state["last_checkin"]:
return False
last = datetime.fromisoformat(state["last_checkin"])
deadline = last + timedelta(
hours=state["interval_hours"] + state["grace_hours"]
)
return datetime.now(UTC) > deadline
def status(self) -> dict:
"""Get current status for display."""
state = self._load_state()
result = {
"armed": state["armed"],
"interval_hours": state["interval_hours"],
"grace_hours": state["grace_hours"],
"last_checkin": state["last_checkin"],
"overdue": self.is_overdue(),
}
if state["last_checkin"]:
last = datetime.fromisoformat(state["last_checkin"])
next_due = last + timedelta(hours=state["interval_hours"])
result["next_due"] = next_due.isoformat()
return result
def check(self) -> None:
"""Run the check loop (called by systemd timer or background thread)."""
if self.should_fire():
logger.warning("DEAD MAN'S SWITCH EXPIRED — firing killswitch")
from soosef.fieldkit.killswitch import PurgeScope, execute_purge
execute_purge(PurgeScope.ALL, reason="deadman_expired")

View File

@@ -0,0 +1,41 @@
"""
GPS-based geographic boundary enforcement.
Define a geographic boundary. If the device leaves it, trigger an action.
Requires GPS hardware or location services.
"""
from __future__ import annotations
import logging
import math
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class GeoCircle:
"""A circular geofence defined by center + radius in meters."""
lat: float
lon: float
radius_m: float
name: str = "default"
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Distance in meters between two lat/lon points."""
R = 6371000 # Earth radius in meters
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlambda = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def is_inside(fence: GeoCircle, lat: float, lon: float) -> bool:
"""Check if a point is inside the geofence."""
return haversine_distance(fence.lat, fence.lon, lat, lon) <= fence.radius_m

View File

@@ -0,0 +1,171 @@
"""
Emergency data destruction — the killswitch.
Ordered destruction to maximize what's gone before any interruption.
Priority: keys first (most sensitive), then data, then logs.
On SSDs/flash where shred is less effective, key destruction is the real win —
without keys, the encrypted data is unrecoverable.
"""
from __future__ import annotations
import enum
import logging
import platform
import shutil
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from soosef.exceptions import KillswitchError
from soosef.paths import (
ATTESTATIONS_DIR,
AUTH_DB,
BASE_DIR,
CHANNEL_KEY_FILE,
CONFIG_FILE,
IDENTITY_DIR,
INSTANCE_DIR,
TEMP_DIR,
)
logger = logging.getLogger(__name__)
class PurgeScope(enum.Enum):
"""What to destroy."""
KEYS_ONLY = "keys_only" # Just key material
ALL = "all" # Everything
@dataclass
class PurgeResult:
"""Report of what was destroyed."""
steps_completed: list[str] = field(default_factory=list)
steps_failed: list[tuple[str, str]] = field(default_factory=list)
fully_purged: bool = False
def _secure_delete_file(path: Path) -> None:
"""Overwrite and delete a file. Best-effort on flash storage."""
if not path.exists():
return
if platform.system() == "Linux":
try:
subprocess.run(
["shred", "-u", "-z", "-n", "3", str(path)],
timeout=30,
capture_output=True,
)
return
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
# Fallback: overwrite with zeros then delete
size = path.stat().st_size
with open(path, "wb") as f:
f.write(b"\x00" * size)
f.flush()
path.unlink()
def _secure_delete_dir(path: Path) -> None:
"""Recursively secure-delete all files in a directory, then remove it."""
if not path.exists():
return
for child in path.rglob("*"):
if child.is_file():
_secure_delete_file(child)
shutil.rmtree(path, ignore_errors=True)
def execute_purge(scope: PurgeScope = PurgeScope.ALL, reason: str = "manual") -> PurgeResult:
"""
Execute emergency purge.
Destruction order is intentional — keys go first because they're
the smallest and most critical. Even if the purge is interrupted
after step 1, the remaining data is cryptographically useless.
"""
result = PurgeResult()
logger.warning("KILLSWITCH ACTIVATED — reason: %s, scope: %s", reason, scope.value)
steps: list[tuple[str, callable]] = [
("destroy_identity_keys", lambda: _secure_delete_dir(IDENTITY_DIR)),
("destroy_channel_key", lambda: _secure_delete_file(CHANNEL_KEY_FILE)),
("destroy_flask_secret", lambda: _secure_delete_file(INSTANCE_DIR / ".secret_key")),
]
if scope == PurgeScope.ALL:
steps.extend([
("destroy_auth_db", lambda: _secure_delete_file(AUTH_DB)),
("destroy_attestation_log", lambda: _secure_delete_dir(ATTESTATIONS_DIR)),
("destroy_temp_files", lambda: _secure_delete_dir(TEMP_DIR)),
("destroy_config", lambda: _secure_delete_file(CONFIG_FILE)),
("clear_journald", _clear_system_logs),
])
for name, action in steps:
try:
action()
result.steps_completed.append(name)
logger.info("Purge step completed: %s", name)
except Exception as e:
result.steps_failed.append((name, str(e)))
logger.error("Purge step failed: %s%s", name, e)
result.fully_purged = len(result.steps_failed) == 0
return result
def _clear_system_logs() -> None:
"""Best-effort clearing of system journal entries for soosef."""
if platform.system() != "Linux":
return
try:
subprocess.run(
["journalctl", "--vacuum-time=1s", "--unit=soosef*"],
timeout=10,
capture_output=True,
)
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
# ── Hardware GPIO killswitch ─────────────────────────────────────────
try:
from gpiozero import Button
HAS_GPIO = True
except ImportError:
HAS_GPIO = False
def watch_hardware_button(
pin: int = 17,
hold_seconds: float = 5.0,
callback: callable | None = None,
) -> None:
"""
Monitor GPIO pin for physical killswitch button.
Requires holding for hold_seconds to prevent accidental trigger.
"""
if not HAS_GPIO:
logger.warning("gpiozero not available — hardware killswitch disabled")
return
def _on_held():
logger.warning("Hardware killswitch button held for %ss — firing", hold_seconds)
if callback:
callback()
else:
execute_purge(PurgeScope.ALL, reason="hardware_button")
btn = Button(pin, hold_time=hold_seconds)
btn.when_held = _on_held
logger.info("Hardware killswitch armed on GPIO pin %d (hold %ss to trigger)", pin, hold_seconds)

View File

@@ -0,0 +1,69 @@
"""
File integrity monitoring.
Computes SHA-256 baseline of critical files and watches for unexpected changes.
Uses watchdog for real-time filesystem event monitoring when available.
"""
from __future__ import annotations
import hashlib
import json
import logging
from pathlib import Path
from soosef.paths import TAMPER_BASELINE
logger = logging.getLogger(__name__)
def _hash_file(path: Path) -> str:
"""SHA-256 hash of a file."""
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
def create_baseline(watch_paths: list[Path], baseline_file: Path | None = None) -> dict:
"""Create a SHA-256 baseline of watched files."""
baseline_file = baseline_file or TAMPER_BASELINE
baseline = {}
for path in watch_paths:
if path.is_file():
baseline[str(path)] = _hash_file(path)
elif path.is_dir():
for child in path.rglob("*"):
if child.is_file():
baseline[str(child)] = _hash_file(child)
baseline_file.parent.mkdir(parents=True, exist_ok=True)
with open(baseline_file, "w") as f:
json.dump(baseline, f, indent=2)
logger.info("Tamper baseline created: %d files", len(baseline))
return baseline
def check_baseline(baseline_file: Path | None = None) -> list[dict]:
"""Check current files against baseline. Returns list of violations."""
baseline_file = baseline_file or TAMPER_BASELINE
if not baseline_file.exists():
return [{"type": "error", "message": "No baseline exists"}]
with open(baseline_file) as f:
baseline = json.load(f)
violations = []
for path_str, expected_hash in baseline.items():
path = Path(path_str)
if not path.exists():
violations.append({"type": "deleted", "path": path_str})
else:
current = _hash_file(path)
if current != expected_hash:
violations.append({"type": "modified", "path": path_str})
return violations

View File

@@ -0,0 +1,89 @@
"""
USB device whitelist enforcement (Linux only).
Monitors USB events via pyudev. On unrecognized device connection,
triggers a configurable callback (default: log warning).
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from soosef.paths import USB_WHITELIST
logger = logging.getLogger(__name__)
try:
import pyudev
HAS_PYUDEV = True
except ImportError:
HAS_PYUDEV = False
def load_whitelist(path: Path | None = None) -> set[str]:
"""Load USB whitelist as set of 'vendor_id:product_id' strings."""
wl_path = path or USB_WHITELIST
if wl_path.exists():
with open(wl_path) as f:
data = json.load(f)
return set(data.get("allowed", []))
return set()
def save_whitelist(devices: set[str], path: Path | None = None) -> None:
"""Save USB whitelist."""
wl_path = path or USB_WHITELIST
wl_path.parent.mkdir(parents=True, exist_ok=True)
with open(wl_path, "w") as f:
json.dump({"allowed": sorted(devices)}, f, indent=2)
class USBMonitor:
"""Watch for USB device connections and check against whitelist."""
def __init__(self, on_violation: callable | None = None, whitelist_path: Path | None = None):
if not HAS_PYUDEV:
raise RuntimeError("pyudev not available — USB monitoring requires Linux + pyudev")
self.whitelist = load_whitelist(whitelist_path)
self.on_violation = on_violation or self._default_violation
self._observer = None
def start(self) -> None:
"""Start monitoring USB events in a background thread."""
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem="usb")
self._observer = pyudev.MonitorObserver(monitor, self._handle_event)
self._observer.start()
logger.info("USB monitoring started (whitelist: %d devices)", len(self.whitelist))
def stop(self) -> None:
"""Stop monitoring."""
if self._observer:
self._observer.stop()
self._observer = None
def _handle_event(self, action: str, device: pyudev.Device) -> None:
if action != "add":
return
vid = device.get("ID_VENDOR_ID", "")
pid = device.get("ID_MODEL_ID", "")
if not vid or not pid:
return
device_id = f"{vid}:{pid}"
if device_id not in self.whitelist:
vendor = device.get("ID_VENDOR", "unknown")
model = device.get("ID_MODEL", "unknown")
logger.warning("USB VIOLATION: %s (%s %s)", device_id, vendor, model)
self.on_violation(device_id, vendor, model)
@staticmethod
def _default_violation(device_id: str, vendor: str, model: str) -> None:
logger.warning("Unrecognized USB device: %s (%s %s)", device_id, vendor, model)

View File

@@ -0,0 +1,6 @@
"""Unified key management for SooSeF."""
from soosef.keystore.manager import KeystoreManager
from soosef.keystore.models import IdentityInfo, KeystoreStatus
__all__ = ["KeystoreManager", "IdentityInfo", "KeystoreStatus"]

View File

@@ -0,0 +1,140 @@
"""
Key bundle export/import for USB key transfer between devices.
Bundles contain all key material encrypted with a user-provided password.
Format: AES-256-GCM encrypted JSON, key derived via Argon2id.
"""
from __future__ import annotations
import json
import os
import struct
from pathlib import Path
from soosef.exceptions import KeystoreError
# Bundle file magic bytes
BUNDLE_MAGIC = b"SOOBNDL\x00"
BUNDLE_VERSION = 1
def export_bundle(
identity_dir: Path,
channel_key_file: Path,
output_path: Path,
password: bytes,
) -> None:
"""Export all key material to an encrypted bundle file."""
from argon2.low_level import Type, hash_secret_raw
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
payload = {}
# Collect identity keys
priv_path = identity_dir / "private.pem"
pub_path = identity_dir / "public.pem"
if priv_path.exists():
payload["identity_private_pem"] = priv_path.read_text()
if pub_path.exists():
payload["identity_public_pem"] = pub_path.read_text()
# Collect channel key
if channel_key_file.exists():
payload["channel_key"] = channel_key_file.read_text().strip()
if not payload:
raise KeystoreError("No key material to export.")
# Encrypt
salt = os.urandom(32)
key = hash_secret_raw(
secret=password,
salt=salt,
time_cost=4,
memory_cost=262144, # 256 MB
parallelism=4,
hash_len=32,
type=Type.ID,
)
nonce = os.urandom(12)
plaintext = json.dumps(payload).encode()
aesgcm = AESGCM(key)
ciphertext = aesgcm.encrypt(nonce, plaintext, None)
# Write bundle: magic + version + salt + nonce + ciphertext
with open(output_path, "wb") as f:
f.write(BUNDLE_MAGIC)
f.write(struct.pack("<B", BUNDLE_VERSION))
f.write(salt)
f.write(nonce)
f.write(ciphertext)
def import_bundle(
bundle_path: Path,
identity_dir: Path,
channel_key_file: Path,
password: bytes,
) -> dict[str, bool]:
"""Import key material from an encrypted bundle. Returns what was imported."""
from argon2.low_level import Type, hash_secret_raw
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
data = bundle_path.read_bytes()
if not data.startswith(BUNDLE_MAGIC):
raise KeystoreError("Not a valid SooSeF key bundle.")
offset = len(BUNDLE_MAGIC)
version = struct.unpack_from("<B", data, offset)[0]
offset += 1
if version != BUNDLE_VERSION:
raise KeystoreError(f"Unsupported bundle version: {version}")
salt = data[offset : offset + 32]
offset += 32
nonce = data[offset : offset + 12]
offset += 12
ciphertext = data[offset:]
key = hash_secret_raw(
secret=password,
salt=salt,
time_cost=4,
memory_cost=262144,
parallelism=4,
hash_len=32,
type=Type.ID,
)
aesgcm = AESGCM(key)
try:
plaintext = aesgcm.decrypt(nonce, ciphertext, None)
except Exception:
raise KeystoreError("Decryption failed — wrong password or corrupted bundle.")
payload = json.loads(plaintext)
imported = {}
if "identity_private_pem" in payload:
identity_dir.mkdir(parents=True, exist_ok=True)
priv_path = identity_dir / "private.pem"
priv_path.write_text(payload["identity_private_pem"])
priv_path.chmod(0o600)
imported["identity_private"] = True
if "identity_public_pem" in payload:
identity_dir.mkdir(parents=True, exist_ok=True)
(identity_dir / "public.pem").write_text(payload["identity_public_pem"])
imported["identity_public"] = True
if "channel_key" in payload:
channel_key_file.parent.mkdir(parents=True, exist_ok=True)
channel_key_file.write_text(payload["channel_key"])
channel_key_file.chmod(0o600)
imported["channel_key"] = True
return imported

View File

@@ -0,0 +1,139 @@
"""
Unified key management for SooSeF.
Owns all key material across both Stegasoo (channel keys) and Verisoo (Ed25519 identity).
Single entry point for key generation, retrieval, rotation, and export.
"""
from __future__ import annotations
import os
from pathlib import Path
from soosef.exceptions import KeystoreError
from soosef.keystore.models import IdentityInfo, KeystoreStatus
from soosef.paths import CHANNEL_KEY_FILE, IDENTITY_DIR, IDENTITY_PRIVATE_KEY, IDENTITY_PUBLIC_KEY
class KeystoreManager:
"""Manages all key material for a SooSeF instance."""
def __init__(self, identity_dir: Path | None = None, channel_key_file: Path | None = None):
self._identity_dir = identity_dir or IDENTITY_DIR
self._channel_key_file = channel_key_file or CHANNEL_KEY_FILE
# ── Verisoo Identity (Ed25519) ──────────────────────────────────
def has_identity(self) -> bool:
"""Check if an Ed25519 identity exists."""
return (self._identity_dir / "private.pem").exists()
def get_identity(self) -> IdentityInfo:
"""Get identity info. Raises KeystoreError if no identity exists."""
pub_path = self._identity_dir / "public.pem"
priv_path = self._identity_dir / "private.pem"
if not pub_path.exists():
raise KeystoreError("No identity found. Run 'soosef init' to generate one.")
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from cryptography.hazmat.primitives.serialization import load_pem_public_key
pub_pem = pub_path.read_bytes()
public_key = load_pem_public_key(pub_pem)
pub_raw = public_key.public_bytes(Encoding.Raw, PublicFormat.Raw)
import hashlib
fingerprint = hashlib.sha256(pub_raw).hexdigest()[:32]
return IdentityInfo(
fingerprint=fingerprint,
public_key_pem=pub_pem.decode(),
has_private_key=priv_path.exists(),
)
def generate_identity(self, password: bytes | None = None) -> IdentityInfo:
"""Generate a new Ed25519 keypair."""
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import (
BestAvailableEncryption,
Encoding,
NoEncryption,
PrivateFormat,
PublicFormat,
)
self._identity_dir.mkdir(parents=True, exist_ok=True)
private_key = Ed25519PrivateKey.generate()
public_key = private_key.public_key()
# Save private key
encryption = BestAvailableEncryption(password) if password else NoEncryption()
priv_pem = private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, encryption)
priv_path = self._identity_dir / "private.pem"
priv_path.write_bytes(priv_pem)
priv_path.chmod(0o600)
# Save public key
pub_pem = public_key.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
pub_path = self._identity_dir / "public.pem"
pub_path.write_bytes(pub_pem)
return self.get_identity()
# ── Stegasoo Channel Key ────────────────────────────────────────
def has_channel_key(self) -> bool:
"""Check if a channel key is configured."""
return bool(os.environ.get("STEGASOO_CHANNEL_KEY")) or self._channel_key_file.exists()
def get_channel_key(self) -> str | None:
"""Get the channel key, or None if not configured."""
env_key = os.environ.get("STEGASOO_CHANNEL_KEY")
if env_key:
return env_key
if self._channel_key_file.exists():
return self._channel_key_file.read_text().strip()
return None
def set_channel_key(self, key: str) -> None:
"""Store a channel key."""
from stegasoo import validate_channel_key
validate_channel_key(key)
self._channel_key_file.parent.mkdir(parents=True, exist_ok=True)
self._channel_key_file.write_text(key)
self._channel_key_file.chmod(0o600)
def generate_channel_key(self) -> str:
"""Generate and store a new channel key."""
from stegasoo import generate_channel_key
key = generate_channel_key()
self.set_channel_key(key)
return key
# ── Unified Status ──────────────────────────────────────────────
def status(self) -> KeystoreStatus:
"""Get an overview of all managed key material."""
channel_fp = None
if self.has_channel_key():
key = self.get_channel_key()
if key:
from stegasoo.crypto import get_channel_fingerprint
channel_fp = get_channel_fingerprint(key)
identity_fp = None
if self.has_identity():
identity_fp = self.get_identity().fingerprint
return KeystoreStatus(
has_identity=self.has_identity(),
identity_fingerprint=identity_fp,
has_channel_key=self.has_channel_key(),
channel_fingerprint=channel_fp,
)

View File

@@ -0,0 +1,24 @@
"""Data models for keystore."""
from dataclasses import dataclass
from datetime import datetime
@dataclass
class IdentityInfo:
"""Verisoo Ed25519 identity summary."""
fingerprint: str
public_key_pem: str
created_at: datetime | None = None
has_private_key: bool = False
@dataclass
class KeystoreStatus:
"""Overview of all managed key material."""
has_identity: bool
identity_fingerprint: str | None
has_channel_key: bool
channel_fingerprint: str | None

81
src/soosef/paths.py Normal file
View File

@@ -0,0 +1,81 @@
"""
Centralized path constants for SooSeF.
All ~/.soosef/* paths are defined here. Every module that needs a path
imports from this module — no hardcoded paths anywhere else.
The base directory can be overridden via SOOSEF_DATA_DIR environment variable
for multi-instance deployments or testing.
"""
import os
from pathlib import Path
# Allow override for testing or multi-instance deployments
BASE_DIR = Path(os.environ.get("SOOSEF_DATA_DIR", Path.home() / ".soosef"))
# Ed25519 identity keypair (verisoo signing)
IDENTITY_DIR = BASE_DIR / "identity"
IDENTITY_PRIVATE_KEY = IDENTITY_DIR / "private.pem"
IDENTITY_PUBLIC_KEY = IDENTITY_DIR / "public.pem"
# Stegasoo state
STEGASOO_DIR = BASE_DIR / "stegasoo"
CHANNEL_KEY_FILE = STEGASOO_DIR / "channel.key"
# Verisoo attestation storage
ATTESTATIONS_DIR = BASE_DIR / "attestations"
ATTESTATION_LOG = ATTESTATIONS_DIR / "log.bin"
ATTESTATION_INDEX = ATTESTATIONS_DIR / "index"
PEERS_FILE = ATTESTATIONS_DIR / "peers.json"
# Web UI auth database
AUTH_DIR = BASE_DIR / "auth"
AUTH_DB = AUTH_DIR / "soosef.db"
# SSL certificates
CERTS_DIR = BASE_DIR / "certs"
SSL_CERT = CERTS_DIR / "cert.pem"
SSL_KEY = CERTS_DIR / "key.pem"
# Fieldkit state
FIELDKIT_DIR = BASE_DIR / "fieldkit"
FIELDKIT_CONFIG = FIELDKIT_DIR / "config.json"
DEADMAN_STATE = FIELDKIT_DIR / "deadman.json"
TAMPER_DIR = FIELDKIT_DIR / "tamper"
TAMPER_BASELINE = TAMPER_DIR / "baseline.json"
USB_DIR = FIELDKIT_DIR / "usb"
USB_WHITELIST = USB_DIR / "whitelist.json"
# Ephemeral
TEMP_DIR = BASE_DIR / "temp"
# Flask instance path (sessions, secret key)
INSTANCE_DIR = BASE_DIR / "instance"
SECRET_KEY_FILE = INSTANCE_DIR / ".secret_key"
# Unified config
CONFIG_FILE = BASE_DIR / "config.json"
def ensure_dirs() -> None:
"""Create all required directories with appropriate permissions."""
dirs = [
BASE_DIR,
IDENTITY_DIR,
STEGASOO_DIR,
ATTESTATIONS_DIR,
AUTH_DIR,
CERTS_DIR,
FIELDKIT_DIR,
TAMPER_DIR,
USB_DIR,
TEMP_DIR,
INSTANCE_DIR,
]
for d in dirs:
d.mkdir(parents=True, exist_ok=True)
# Restrict permissions on sensitive directories
for d in [BASE_DIR, IDENTITY_DIR, AUTH_DIR, CERTS_DIR]:
d.chmod(0o700)