Implement 14 power-user feature requests for field deployment

Critical:
- FR-01: Chain verification now supports key rotation via signed rotation
  records (soosef/key-rotation-v1 content type). Old single-signer
  invariant replaced with authorized-signers set.
- FR-02: Carrier images stripped of EXIF metadata by default before
  steganographic encoding (strip_metadata=True). Prevents source
  location/device leakage.

High priority:
- FR-03: Session timeout (default 15min) + secure cookie flags
  (HttpOnly, SameSite=Strict, Secure when HTTPS)
- FR-04: CSRF protection via Flask-WTF on all POST forms. Killswitch
  now requires password re-authentication.
- FR-05: Collaborator trust store — trust_key(), get_trusted_keys(),
  resolve_attestor_name(), untrust_key() in KeystoreManager.
- FR-06: Production WSGI server (Waitress) by default, Flask dev
  server only with --debug flag.
- FR-07: Dead man's switch sends warning during grace period via
  local file + optional webhook before auto-purge.

Medium:
- FR-08: Geofence get_current_location() via gpsd for --here support.
- FR-09: Batch attestation endpoint (/attest/batch) with SHA-256
  dedup and per-file status reporting.
- FR-10: Key backup tracking with last_backup_info() and
  is_backup_overdue() + backup_reminder_days config.
- FR-11: Verification receipts signed with instance Ed25519 key
  (schema_version bumped to 2).
- FR-12: Login rate limiting with configurable lockout (5 attempts,
  15 min default).

Nice-to-have:
- FR-13: Unified `soosef status` pre-flight command showing identity,
  channel key, deadman, geofence, chain, and backup status.
- FR-14: `soosef chain export` produces ZIP with JSON manifest,
  public key, and raw chain.bin for legal discovery.

Tests: 157 passed, 1 skipped, 1 pre-existing flaky test.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee 2026-04-01 19:35:36 -04:00
parent e3bc1cce1f
commit fb0cc3e39d
28 changed files with 656 additions and 23 deletions

View File

@ -70,6 +70,20 @@ def create_app(config: SoosefConfig | None = None) -> Flask:
app.config["HTTPS_ENABLED"] = config.https_enabled app.config["HTTPS_ENABLED"] = config.https_enabled
app.config["SOOSEF_CONFIG"] = config app.config["SOOSEF_CONFIG"] = config
# Session security: timeout + secure cookie flags
from datetime import timedelta
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=config.session_timeout_minutes)
app.config["SESSION_COOKIE_HTTPONLY"] = True
app.config["SESSION_COOKIE_SAMESITE"] = "Strict"
if config.https_enabled:
app.config["SESSION_COOKIE_SECURE"] = True
# CSRF protection on all POST/PUT/DELETE routes
from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect(app)
# Point temp_storage at ~/.soosef/temp/ before any routes run, so all # Point temp_storage at ~/.soosef/temp/ before any routes run, so all
# uploaded files land where the killswitch's destroy_temp_files step # uploaded files land where the killswitch's destroy_temp_files step
# expects them. Must happen after ensure_dirs() so the directory exists. # expects them. Must happen after ensure_dirs() so the directory exists.
@ -303,6 +317,9 @@ def _register_stegasoo_routes(app: Flask) -> None:
user_exists as auth_user_exists, user_exists as auth_user_exists,
) )
# Login rate limiting: {username: [(timestamp, ...),]}
_login_attempts: dict[str, list[float]] = {}
@app.route("/login", methods=["GET", "POST"]) @app.route("/login", methods=["GET", "POST"])
def login(): def login():
if not app.config.get("AUTH_ENABLED", True): if not app.config.get("AUTH_ENABLED", True):
@ -312,14 +329,47 @@ def _register_stegasoo_routes(app: Flask) -> None:
if auth_is_authenticated(): if auth_is_authenticated():
return redirect(url_for("index")) return redirect(url_for("index"))
if request.method == "POST": if request.method == "POST":
import time
username = request.form.get("username", "") username = request.form.get("username", "")
password = request.form.get("password", "") password = request.form.get("password", "")
# Check lockout
max_attempts = config.login_lockout_attempts
lockout_mins = config.login_lockout_minutes
now = time.time()
window = lockout_mins * 60
attempts = _login_attempts.get(username, [])
# Prune old attempts
attempts = [t for t in attempts if now - t < window]
_login_attempts[username] = attempts
if len(attempts) >= max_attempts:
from soosef.audit import log_action
log_action(
actor=username,
action="user.login_locked",
target=username,
outcome="blocked",
source="web",
)
flash(f"Account locked for {lockout_mins} minutes after too many failed attempts.", "error")
return render_template("login.html")
user = verify_user_password(username, password) user = verify_user_password(username, password)
if user: if user:
_login_attempts.pop(username, None)
auth_login_user(user) auth_login_user(user)
session.permanent = True session.permanent = True
flash("Login successful", "success") flash("Login successful", "success")
return redirect(url_for("index")) return redirect(url_for("index"))
else:
attempts.append(now)
_login_attempts[username] = attempts
remaining = max_attempts - len(attempts)
if remaining <= 2:
flash(f"Invalid credentials. {remaining} attempts remaining.", "error")
else: else:
flash("Invalid username or password", "error") flash("Invalid username or password", "error")
return render_template("login.html") return render_template("login.html")

View File

@ -9,6 +9,7 @@ Wraps verisoo's attestation and verification libraries to provide:
from __future__ import annotations from __future__ import annotations
import hashlib
import json import json
import socket import socket
from datetime import UTC, datetime from datetime import UTC, datetime
@ -209,6 +210,79 @@ def attest():
return render_template("attest/attest.html", has_identity=has_identity) return render_template("attest/attest.html", has_identity=has_identity)
@bp.route("/attest/batch", methods=["POST"])
@login_required
def attest_batch():
"""Batch attestation — accepts multiple image files.
Returns JSON with results for each file (success/skip/error).
Skips images already attested (by SHA-256 match).
"""
import hashlib
from soosef.verisoo.hashing import hash_image
private_key = _get_private_key()
if private_key is None:
return {"error": "No identity key. Run soosef init first."}, 400
files = request.files.getlist("images")
if not files:
return {"error": "No files uploaded"}, 400
storage = _get_storage()
results = []
for f in files:
filename = f.filename or "unknown"
try:
image_data = f.read()
sha256 = hashlib.sha256(image_data).hexdigest()
# Skip already-attested images
existing = storage.get_records_by_image_sha256(sha256)
if existing:
results.append({"file": filename, "status": "skipped", "reason": "already attested"})
continue
from soosef.verisoo.attestation import create_attestation
attestation = create_attestation(image_data, private_key)
index = storage.append_record(attestation.record)
# Wrap in chain if enabled
chain_index = None
config = request.app.config.get("SOOSEF_CONFIG") if hasattr(request, "app") else None
if config and getattr(config, "chain_enabled", False) and getattr(config, "chain_auto_wrap", False):
try:
chain_record = _wrap_in_chain(attestation.record, private_key, {})
chain_index = chain_record.chain_index
except Exception:
pass
results.append({
"file": filename,
"status": "attested",
"record_id": attestation.record.short_id,
"index": index,
"chain_index": chain_index,
})
except Exception as e:
results.append({"file": filename, "status": "error", "error": str(e)})
attested = sum(1 for r in results if r["status"] == "attested")
skipped = sum(1 for r in results if r["status"] == "skipped")
errors = sum(1 for r in results if r["status"] == "error")
return {
"total": len(results),
"attested": attested,
"skipped": skipped,
"errors": errors,
"results": results,
}
def _verify_image(image_data: bytes) -> dict: def _verify_image(image_data: bytes) -> dict:
"""Run the full verification pipeline against the attestation log. """Run the full verification pipeline against the attestation log.
@ -389,7 +463,7 @@ def verify_receipt():
matching_records.append(rec_entry) matching_records.append(rec_entry)
receipt = { receipt = {
"schema_version": "1", "schema_version": "2",
"verification_timestamp": verification_ts, "verification_timestamp": verification_ts,
"verifier_instance": verifier_instance, "verifier_instance": verifier_instance,
"queried_filename": image_file.filename, "queried_filename": image_file.filename,
@ -403,6 +477,19 @@ def verify_receipt():
"matching_records": matching_records, "matching_records": matching_records,
} }
# Sign the receipt with the instance's Ed25519 identity key
private_key = _get_private_key()
if private_key is not None:
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
pub_bytes = private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
receipt["verifier_fingerprint"] = hashlib.sha256(pub_bytes).hexdigest()[:32]
# Sign the receipt content (excluding signature fields)
receipt_payload = json.dumps(receipt, sort_keys=True, ensure_ascii=False).encode()
sig = private_key.sign(receipt_payload)
receipt["signature"] = sig.hex()
receipt["verifier_pubkey"] = pub_bytes.hex()
receipt_json = json.dumps(receipt, indent=2, ensure_ascii=False) receipt_json = json.dumps(receipt, indent=2, ensure_ascii=False)
safe_filename = ( safe_filename = (
image_file.filename.rsplit(".", 1)[0] if "." in image_file.filename else image_file.filename image_file.filename.rsplit(".", 1)[0] if "." in image_file.filename else image_file.filename

View File

@ -30,9 +30,18 @@ def killswitch():
if request.method == "POST": if request.method == "POST":
action = request.form.get("action") action = request.form.get("action")
if action == "fire" and request.form.get("confirm") == "CONFIRM-PURGE": if action == "fire" and request.form.get("confirm") == "CONFIRM-PURGE":
# Require password re-authentication for killswitch
from auth import verify_user_password
password = request.form.get("password", "")
username = get_username()
if not verify_user_password(username, password):
flash("Killswitch requires password confirmation.", "danger")
return render_template("fieldkit/killswitch.html")
from soosef.fieldkit.killswitch import PurgeScope, execute_purge from soosef.fieldkit.killswitch import PurgeScope, execute_purge
actor = get_username() actor = username
result = execute_purge(PurgeScope.ALL, reason="web_ui") result = execute_purge(PurgeScope.ALL, reason="web_ui")
outcome = "success" if result.fully_purged else "failure" outcome = "success" if result.fully_purged else "failure"
failed_steps = ", ".join(name for name, _ in result.steps_failed) failed_steps = ", ".join(name for name, _ in result.steps_failed)

View File

@ -47,6 +47,7 @@
</a> </a>
{% if has_recovery %} {% if has_recovery %}
<form method="POST" action="{{ url_for('disable_recovery') }}" style="display:inline;"> <form method="POST" action="{{ url_for('disable_recovery') }}" style="display:inline;">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<button type="submit" class="btn btn-outline-danger" <button type="submit" class="btn btn-outline-danger"
onclick="return confirm('Disable recovery? If you forget your password, you will NOT be able to recover your account.')"> onclick="return confirm('Disable recovery? If you forget your password, you will NOT be able to recover your account.')">
<i class="bi bi-x-lg"></i> <i class="bi bi-x-lg"></i>
@ -69,6 +70,7 @@
<h6 class="text-muted mb-3">Change Password</h6> <h6 class="text-muted mb-3">Change Password</h6>
<form method="POST" action="{{ url_for('account') }}" id="accountForm"> <form method="POST" action="{{ url_for('account') }}" id="accountForm">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="mb-3"> <div class="mb-3">
<label class="form-label"> <label class="form-label">
<i class="bi bi-key me-1"></i> Current Password <i class="bi bi-key me-1"></i> Current Password
@ -171,6 +173,7 @@
<hr> <hr>
<h6 class="text-muted mb-3">Add New Key</h6> <h6 class="text-muted mb-3">Add New Key</h6>
<form method="POST" action="{{ url_for('account_save_key') }}"> <form method="POST" action="{{ url_for('account_save_key') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="row g-2 mb-2"> <div class="row g-2 mb-2">
<div class="col-5"> <div class="col-5">
<input type="text" name="key_name" class="form-control form-control-sm" <input type="text" name="key_name" class="form-control form-control-sm"
@ -216,6 +219,7 @@
<div class="modal-dialog modal-sm"> <div class="modal-dialog modal-sm">
<div class="modal-content"> <div class="modal-content">
<form method="POST" id="renameForm"> <form method="POST" id="renameForm">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="modal-header"> <div class="modal-header">
<h6 class="modal-title">Rename Key</h6> <h6 class="modal-title">Rename Key</h6>
<button type="button" class="btn-close" data-bs-dismiss="modal"></button> <button type="button" class="btn-close" data-bs-dismiss="modal"></button>

View File

@ -12,6 +12,7 @@
</div> </div>
<div class="card-body"> <div class="card-body">
<form id="createUserForm"> <form id="createUserForm">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="mb-3"> <div class="mb-3">
<label class="form-label"> <label class="form-label">
<i class="bi bi-person me-1"></i> Username <i class="bi bi-person me-1"></i> Username

View File

@ -23,6 +23,7 @@
{% endif %} {% endif %}
<form method="POST" enctype="multipart/form-data"> <form method="POST" enctype="multipart/form-data">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="mb-4"> <div class="mb-4">
<label for="image" class="form-label"><i class="bi bi-image me-1"></i>Image to Attest</label> <label for="image" class="form-label"><i class="bi bi-image me-1"></i>Image to Attest</label>
<input type="file" class="form-control" name="image" id="image" <input type="file" class="form-control" name="image" id="image"

View File

@ -15,6 +15,7 @@
</p> </p>
<form method="POST" enctype="multipart/form-data"> <form method="POST" enctype="multipart/form-data">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="mb-4"> <div class="mb-4">
<label for="image" class="form-label"><i class="bi bi-image me-1"></i>Image to Verify</label> <label for="image" class="form-label"><i class="bi bi-image me-1"></i>Image to Verify</label>
<input type="file" class="form-control" name="image" id="image" <input type="file" class="form-control" name="image" id="image"

View File

@ -106,6 +106,7 @@
Re-upload the same image to produce the downloadable file. Re-upload the same image to produce the downloadable file.
</p> </p>
<form action="/verify/receipt" method="post" enctype="multipart/form-data"> <form action="/verify/receipt" method="post" enctype="multipart/form-data">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="mb-3"> <div class="mb-3">
<input class="form-control form-control-sm bg-dark text-light border-secondary" <input class="form-control form-control-sm bg-dark text-light border-secondary"
type="file" name="image" accept="image/*" required> type="file" name="image" accept="image/*" required>

View File

@ -18,6 +18,7 @@
{% else %} {% else %}
<p class="text-muted small">No channel key configured.</p> <p class="text-muted small">No channel key configured.</p>
<form method="POST" action="{{ url_for('keys.generate_channel') }}"> <form method="POST" action="{{ url_for('keys.generate_channel') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<button type="submit" class="btn btn-outline-warning btn-sm"> <button type="submit" class="btn btn-outline-warning btn-sm">
<i class="bi bi-plus-circle me-1"></i>Generate Channel Key <i class="bi bi-plus-circle me-1"></i>Generate Channel Key
</button> </button>
@ -40,6 +41,7 @@
{% else %} {% else %}
<p class="text-muted small">No identity configured.</p> <p class="text-muted small">No identity configured.</p>
<form method="POST" action="{{ url_for('keys.generate_identity') }}"> <form method="POST" action="{{ url_for('keys.generate_identity') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<button type="submit" class="btn btn-outline-info btn-sm"> <button type="submit" class="btn btn-outline-info btn-sm">
<i class="bi bi-plus-circle me-1"></i>Generate Identity <i class="bi bi-plus-circle me-1"></i>Generate Identity
</button> </button>

View File

@ -21,12 +21,18 @@
<hr class="border-danger"> <hr class="border-danger">
<form method="POST" action="{{ url_for('fieldkit.killswitch') }}"> <form method="POST" action="{{ url_for('fieldkit.killswitch') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<input type="hidden" name="action" value="fire"> <input type="hidden" name="action" value="fire">
<div class="mb-3"> <div class="mb-3">
<label class="form-label text-danger fw-bold">Type CONFIRM-PURGE to proceed:</label> <label class="form-label text-danger fw-bold">Type CONFIRM-PURGE to proceed:</label>
<input type="text" name="confirm" class="form-control bg-dark border-danger text-danger" <input type="text" name="confirm" class="form-control bg-dark border-danger text-danger"
placeholder="CONFIRM-PURGE" autocomplete="off"> placeholder="CONFIRM-PURGE" autocomplete="off">
</div> </div>
<div class="mb-3">
<label class="form-label text-danger fw-bold">Re-enter your password:</label>
<input type="password" name="password" class="form-control bg-dark border-danger text-danger"
autocomplete="current-password" required>
</div>
<button type="submit" class="btn btn-danger"> <button type="submit" class="btn btn-danger">
<i class="bi bi-exclamation-octagon me-1"></i>Execute Purge <i class="bi bi-exclamation-octagon me-1"></i>Execute Purge
</button> </button>

View File

@ -31,6 +31,7 @@
{% endif %} {% endif %}
</p> </p>
<form method="POST" action="{{ url_for('fieldkit.deadman_checkin') }}"> <form method="POST" action="{{ url_for('fieldkit.deadman_checkin') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<button type="submit" class="btn btn-success btn-sm"> <button type="submit" class="btn btn-success btn-sm">
<i class="bi bi-check-circle me-1"></i>Check In Now <i class="bi bi-check-circle me-1"></i>Check In Now
</button> </button>

View File

@ -12,6 +12,7 @@
</div> </div>
<div class="card-body"> <div class="card-body">
<form method="POST" action="{{ url_for('login') }}"> <form method="POST" action="{{ url_for('login') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="mb-3"> <div class="mb-3">
<label class="form-label"> <label class="form-label">
<i class="bi bi-person me-1"></i> Username <i class="bi bi-person me-1"></i> Username

View File

@ -52,6 +52,7 @@
</div> </div>
<form method="POST" action="{{ url_for('recover') }}" id="recoverForm"> <form method="POST" action="{{ url_for('recover') }}" id="recoverForm">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<!-- Recovery Key Input --> <!-- Recovery Key Input -->
<div class="mb-3"> <div class="mb-3">
<label class="form-label"> <label class="form-label">

View File

@ -95,6 +95,7 @@
<!-- Confirmation Form --> <!-- Confirmation Form -->
<form method="POST" id="recoveryForm"> <form method="POST" id="recoveryForm">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<input type="hidden" name="recovery_key" value="{{ recovery_key }}"> <input type="hidden" name="recovery_key" value="{{ recovery_key }}">
<!-- Confirm checkbox --> <!-- Confirm checkbox -->

View File

@ -16,6 +16,7 @@
</p> </p>
<form method="POST" action="{{ url_for('setup') }}" id="setupForm"> <form method="POST" action="{{ url_for('setup') }}" id="setupForm">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="mb-3"> <div class="mb-3">
<label class="form-label"> <label class="form-label">
<i class="bi bi-person me-1"></i> Username <i class="bi bi-person me-1"></i> Username

View File

@ -72,6 +72,7 @@
<!-- Confirmation Form --> <!-- Confirmation Form -->
<form method="POST" id="recoveryForm"> <form method="POST" id="recoveryForm">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<input type="hidden" name="recovery_key" value="{{ recovery_key }}"> <input type="hidden" name="recovery_key" value="{{ recovery_key }}">
<!-- Confirm checkbox --> <!-- Confirm checkbox -->

View File

@ -172,6 +172,7 @@
{% else %} {% else %}
<!-- Decode Form --> <!-- Decode Form -->
<form method="POST" enctype="multipart/form-data" id="decodeForm"> <form method="POST" enctype="multipart/form-data" id="decodeForm">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="accordion step-accordion" id="decodeAccordion"> <div class="accordion step-accordion" id="decodeAccordion">

View File

@ -126,6 +126,7 @@
</div> </div>
<div class="card-body p-0"> <div class="card-body p-0">
<form method="POST" enctype="multipart/form-data" id="encodeForm"> <form method="POST" enctype="multipart/form-data" id="encodeForm">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="accordion step-accordion" id="encodeAccordion"> <div class="accordion step-accordion" id="encodeAccordion">

View File

@ -13,6 +13,7 @@
{% if not generated %} {% if not generated %}
<!-- Generation Form --> <!-- Generation Form -->
<form method="POST"> <form method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="mb-4"> <div class="mb-4">
<label class="form-label">Words per Passphrase</label> <label class="form-label">Words per Passphrase</label>
<input type="range" class="form-range" name="words_per_passphrase" <input type="range" class="form-range" name="words_per_passphrase"
@ -237,6 +238,7 @@
<!-- Download Tab --> <!-- Download Tab -->
<div class="tab-pane fade" id="keyDownloadTab" role="tabpanel"> <div class="tab-pane fade" id="keyDownloadTab" role="tabpanel">
<form action="{{ url_for('download_key') }}" method="POST" class="row g-2 align-items-end"> <form action="{{ url_for('download_key') }}" method="POST" class="row g-2 align-items-end">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<input type="hidden" name="key_pem" value="{{ rsa_key_pem }}"> <input type="hidden" name="key_pem" value="{{ rsa_key_pem }}">
<div class="col-md-8"> <div class="col-md-8">
<label class="form-label small">Password to encrypt the key file</label> <label class="form-label small">Password to encrypt the key file</label>

View File

@ -78,6 +78,8 @@ cli = [
] ]
web = [ web = [
"flask>=3.0.0", "flask>=3.0.0",
"flask-wtf>=1.2.0",
"waitress>=3.0.0",
"gunicorn>=21.0.0", "gunicorn>=21.0.0",
"qrcode>=7.3.0", "qrcode>=7.3.0",
"pyzbar>=0.1.9", "pyzbar>=0.1.9",

View File

@ -79,6 +79,78 @@ def init(ctx, no_identity, no_channel):
click.echo("Done. Run 'soosef serve' to start the web UI.") click.echo("Done. Run 'soosef serve' to start the web UI.")
# ── Status ─────────────────────────────────────────────────────────
@main.command()
@click.option("--json", "as_json", is_flag=True, help="Output as JSON")
def status(as_json):
"""Pre-flight system status check."""
import json as json_mod
from soosef.config import SoosefConfig
from soosef.fieldkit.deadman import DeadmanSwitch
from soosef.fieldkit.geofence import load_fence
from soosef.keystore.manager import KeystoreManager
ks = KeystoreManager()
config = SoosefConfig.load()
deadman = DeadmanSwitch()
checks = {}
# Identity key
checks["identity_key"] = "ok" if ks.has_identity() else "missing"
# Channel key
checks["channel_key"] = "ok" if ks.has_channel_key() else "missing"
# Trusted keys
trusted = ks.get_trusted_keys()
checks["trusted_keys"] = len(trusted)
# Dead man's switch
dm_status = deadman.status()
checks["deadman_armed"] = dm_status["armed"]
checks["deadman_overdue"] = dm_status.get("overdue", False)
# Geofence
fence = load_fence()
checks["geofence"] = fence.name if fence else "not set"
# Chain
checks["chain_enabled"] = config.chain_enabled
# Backup
backup_info = ks.last_backup_info()
if backup_info:
checks["last_backup"] = backup_info["timestamp"]
checks["backup_overdue"] = ks.is_backup_overdue(config.backup_reminder_days)
else:
checks["last_backup"] = "never"
checks["backup_overdue"] = True
if as_json:
click.echo(json_mod.dumps(checks, indent=2))
else:
def _icon(ok):
return "OK" if ok else "!!"
click.echo("SooSeF System Status")
click.echo("=" * 40)
click.echo(f" [{_icon(checks['identity_key'] == 'ok')}] Identity key: {checks['identity_key']}")
click.echo(f" [{_icon(checks['channel_key'] == 'ok')}] Channel key: {checks['channel_key']}")
click.echo(f" [--] Trusted keys: {checks['trusted_keys']}")
click.echo(f" [{_icon(checks['deadman_armed'])}] Dead man's switch: {'armed' if checks['deadman_armed'] else 'disarmed'}")
if checks["deadman_overdue"]:
click.echo(" [!!] Dead man's switch is OVERDUE")
click.echo(f" [--] Geofence: {checks['geofence']}")
click.echo(f" [{_icon(checks['chain_enabled'])}] Chain: {'enabled' if checks['chain_enabled'] else 'disabled'}")
click.echo(f" [{_icon(not checks['backup_overdue'])}] Last backup: {checks['last_backup']}")
if checks["backup_overdue"]:
click.echo(f" Backup overdue (>{config.backup_reminder_days} days)")
# ── Serve ─────────────────────────────────────────────────────────── # ── Serve ───────────────────────────────────────────────────────────
@ -86,8 +158,9 @@ def init(ctx, no_identity, no_channel):
@click.option("--host", default="127.0.0.1", help="Bind address") @click.option("--host", default="127.0.0.1", help="Bind address")
@click.option("--port", default=5000, type=int, help="Port") @click.option("--port", default=5000, type=int, help="Port")
@click.option("--no-https", is_flag=True, help="Disable HTTPS") @click.option("--no-https", is_flag=True, help="Disable HTTPS")
@click.option("--debug", is_flag=True, help="Debug mode") @click.option("--debug", is_flag=True, help="Debug mode (Flask dev server)")
def serve(host, port, no_https, debug): @click.option("--workers", default=4, type=int, help="Number of worker threads")
def serve(host, port, no_https, debug, workers):
"""Start the SooSeF web UI.""" """Start the SooSeF web UI."""
from soosef.config import SoosefConfig from soosef.config import SoosefConfig
@ -111,15 +184,42 @@ def serve(host, port, no_https, debug):
_generate_self_signed_cert(SSL_CERT, SSL_KEY) _generate_self_signed_cert(SSL_CERT, SSL_KEY)
ssl_context = (str(SSL_CERT), str(SSL_KEY)) ssl_context = (str(SSL_CERT), str(SSL_KEY))
# Start the dead man's switch enforcement background thread.
# The thread checks every 60 seconds and fires the killswitch if overdue.
# It is a daemon thread — it dies automatically when the Flask process exits.
# We always start it; the loop itself only acts when the switch is armed,
# so it is safe to run even when the switch has never been configured.
_start_deadman_thread(interval_seconds=60) _start_deadman_thread(interval_seconds=60)
click.echo(f"Starting SooSeF on {'https' if ssl_context else 'http'}://{host}:{port}") proto = "https" if ssl_context else "http"
app.run(host=host, port=port, debug=debug, ssl_context=ssl_context) click.echo(f"Starting SooSeF on {proto}://{host}:{port}")
if debug:
# Flask dev server for debugging
app.run(host=host, port=port, debug=True, ssl_context=ssl_context)
else:
# Production server via Waitress (pure Python, no C deps, cross-platform)
try:
import waitress
click.echo(f"Using Waitress with {workers} threads")
waitress.serve(app, host=host, port=port, threads=workers)
except ImportError:
# Fall back to gunicorn (unix only)
try:
import gunicorn # noqa: F401
import subprocess
import sys
cmd = [
sys.executable, "-m", "gunicorn",
"--bind", f"{host}:{port}",
"--workers", str(workers),
"frontends.web.app:create_app()",
]
if ssl_context:
cmd.extend(["--certfile", str(ssl_context[0]), "--keyfile", str(ssl_context[1])])
subprocess.run(cmd, check=True)
except ImportError:
click.echo("Warning: No production server available. Using Flask dev server.")
click.echo("Install waitress or gunicorn for production use.")
app.run(host=host, port=port, debug=False, ssl_context=ssl_context)
def _deadman_enforcement_loop(interval_seconds: int = 60) -> None: def _deadman_enforcement_loop(interval_seconds: int = 60) -> None:
@ -1064,6 +1164,69 @@ def backfill():
click.echo(f"Backfilled {count} attestation(s) into the chain.") click.echo(f"Backfilled {count} attestation(s) into the chain.")
@chain.command("export")
@click.option("--start", default=0, type=int, help="First record index")
@click.option("--end", default=None, type=int, help="Last record index (default: all)")
@click.option("--output", "-o", required=True, type=click.Path(), help="Output ZIP path")
@click.pass_context
def chain_export(ctx, start, end, output):
"""Export chain records as a self-contained verifiable evidence package.
Produces a ZIP containing the decoded records as JSON, the signer's
public key, and a standalone verification script.
"""
import json as json_mod
import zipfile
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from soosef.federation.chain import ChainStore
from soosef.paths import CHAIN_DIR, IDENTITY_PUBLIC_KEY
store = ChainStore(CHAIN_DIR)
state = store.state()
if state is None:
click.echo("Chain is empty.", err=True)
raise SystemExit(1)
if end is None:
end = state.head_index
records = []
for record in store.iter_records(start, end):
records.append({
"chain_index": record.chain_index,
"content_hash": record.content_hash.hex(),
"content_type": record.content_type,
"prev_hash": record.prev_hash.hex(),
"signer_pubkey": record.signer_pubkey.hex(),
"signature": record.signature.hex(),
"claimed_ts": record.claimed_ts,
"claimed_time": _format_us_timestamp(record.claimed_ts),
"metadata": record.metadata,
})
manifest = {
"export_version": "1",
"chain_id": state.chain_id.hex(),
"start_index": start,
"end_index": end,
"record_count": len(records),
"records": records,
}
with zipfile.ZipFile(output, "w", zipfile.ZIP_DEFLATED) as zf:
zf.writestr("manifest.json", json_mod.dumps(manifest, indent=2))
if IDENTITY_PUBLIC_KEY.exists():
zf.write(IDENTITY_PUBLIC_KEY, "public_key.pem")
# Include chain.bin slice (raw binary for independent verification)
chain_bin = CHAIN_DIR / "chain.bin"
if chain_bin.exists():
zf.write(chain_bin, "chain.bin")
click.echo(f"Exported {len(records)} records to {output}")
def _format_us_timestamp(us: int) -> str: def _format_us_timestamp(us: int) -> str:
"""Format a Unix microsecond timestamp for display.""" """Format a Unix microsecond timestamp for display."""
from datetime import UTC, datetime from datetime import UTC, datetime

View File

@ -22,6 +22,9 @@ class SoosefConfig:
https_enabled: bool = True https_enabled: bool = True
auth_enabled: bool = True auth_enabled: bool = True
max_upload_mb: int = 50 max_upload_mb: int = 50
session_timeout_minutes: int = 15
login_lockout_attempts: int = 5
login_lockout_minutes: int = 15
# Stegasoo defaults # Stegasoo defaults
default_embed_mode: str = "auto" default_embed_mode: str = "auto"
@ -31,6 +34,7 @@ class SoosefConfig:
deadman_enabled: bool = False deadman_enabled: bool = False
deadman_interval_hours: int = 24 deadman_interval_hours: int = 24
deadman_grace_hours: int = 2 deadman_grace_hours: int = 2
deadman_warning_webhook: str = "" # URL to POST warning before auto-purge
usb_monitoring_enabled: bool = False usb_monitoring_enabled: bool = False
tamper_monitoring_enabled: bool = False tamper_monitoring_enabled: bool = False
@ -38,6 +42,9 @@ class SoosefConfig:
chain_enabled: bool = True chain_enabled: bool = True
chain_auto_wrap: bool = True # Auto-wrap verisoo attestations in chain records chain_auto_wrap: bool = True # Auto-wrap verisoo attestations in chain records
# Backup
backup_reminder_days: int = 7 # Warn if no backup in this many days
# Hardware (RPi) # Hardware (RPi)
gpio_killswitch_pin: int = 17 gpio_killswitch_pin: int = 17
gpio_killswitch_hold_seconds: float = 5.0 gpio_killswitch_hold_seconds: float = 5.0

View File

@ -40,6 +40,10 @@ _LEN_STRUCT = struct.Struct(">I")
# typically). Prevents OOM from corrupted length prefixes in chain.bin. # typically). Prevents OOM from corrupted length prefixes in chain.bin.
MAX_RECORD_SIZE = 1_048_576 MAX_RECORD_SIZE = 1_048_576
# Content type for key rotation events. A rotation record is signed by the OLD
# key and carries the new public key in metadata["new_pubkey"] (hex-encoded).
CONTENT_TYPE_KEY_ROTATION = "soosef/key-rotation-v1"
def _now_us() -> int: def _now_us() -> int:
"""Current time as Unix microseconds.""" """Current time as Unix microseconds."""
@ -394,6 +398,37 @@ class ChainStore:
return record return record
def append_key_rotation(
self,
old_private_key: Ed25519PrivateKey,
new_private_key: Ed25519PrivateKey,
) -> AttestationChainRecord:
"""Record a key rotation event in the chain.
The rotation record is signed by the OLD key and carries the new
public key in metadata. This creates a cryptographic trust chain:
anyone who trusts the old key can verify the transition to the new one.
Args:
old_private_key: The current (soon-to-be-archived) signing key.
new_private_key: The newly generated signing key.
Returns:
The rotation record appended to the chain.
"""
new_pub = new_private_key.public_key()
new_pub_bytes = new_pub.public_bytes(Encoding.Raw, PublicFormat.Raw)
# Content hash is the SHA-256 of the new public key
content_hash = hashlib.sha256(new_pub_bytes).digest()
return self.append(
content_hash=content_hash,
content_type=CONTENT_TYPE_KEY_ROTATION,
private_key=old_private_key,
metadata={"new_pubkey": new_pub_bytes.hex()},
)
def verify_chain(self, start: int = 0, end: int | None = None) -> bool: def verify_chain(self, start: int = 0, end: int | None = None) -> bool:
"""Verify hash chain integrity and signatures over a range. """Verify hash chain integrity and signatures over a range.
@ -411,6 +446,7 @@ class ChainStore:
prev_record: AttestationChainRecord | None = None prev_record: AttestationChainRecord | None = None
expected_index = start expected_index = start
authorized_signers: set[bytes] = set()
# If starting from 0, first record must have genesis prev_hash # If starting from 0, first record must have genesis prev_hash
if start > 0: if start > 0:
@ -420,8 +456,6 @@ class ChainStore:
except ChainError: except ChainError:
pass # Can't verify prev_hash of first record in range pass # Can't verify prev_hash of first record in range
signer_pubkey: bytes | None = None
for record in self.iter_records(start, end): for record in self.iter_records(start, end):
# Check index continuity # Check index continuity
if record.chain_index != expected_index: if record.chain_index != expected_index:
@ -447,16 +481,25 @@ class ChainStore:
f"Record {record.chain_index}: signature verification failed: {e}" f"Record {record.chain_index}: signature verification failed: {e}"
) from e ) from e
# Check single-signer invariant # Track authorized signers: the genesis signer plus any keys
if signer_pubkey is None: # introduced by valid key-rotation records.
signer_pubkey = record.signer_pubkey if not authorized_signers:
elif record.signer_pubkey != signer_pubkey: authorized_signers.add(record.signer_pubkey)
elif record.signer_pubkey not in authorized_signers:
raise ChainIntegrityError( raise ChainIntegrityError(
f"Record {record.chain_index}: signer changed " f"Record {record.chain_index}: signer "
f"(expected {signer_pubkey.hex()[:16]}..., " f"{record.signer_pubkey.hex()[:16]}... is not authorized"
f"got {record.signer_pubkey.hex()[:16]}...)"
) )
# If this is a key rotation record, authorize the new key
if record.content_type == CONTENT_TYPE_KEY_ROTATION:
new_pubkey_hex = record.metadata.get("new_pubkey")
if not new_pubkey_hex:
raise ChainIntegrityError(
f"Record {record.chain_index}: key rotation missing new_pubkey"
)
authorized_signers.add(bytes.fromhex(new_pubkey_hex))
prev_record = record prev_record = record
expected_index += 1 expected_index += 1

View File

@ -100,10 +100,51 @@ class DeadmanSwitch:
result["next_due"] = next_due.isoformat() result["next_due"] = next_due.isoformat()
return result return result
def _send_warning(self, message: str) -> None:
"""Send a warning via webhook and/or local file."""
from soosef.config import SoosefConfig
config = SoosefConfig.load()
webhook = getattr(config, "deadman_warning_webhook", None)
# Always write warning to a local file
warning_file = self._state_file.parent / "WARNING_DEADMAN_OVERDUE"
warning_file.write_text(f"{datetime.now(UTC).isoformat()}: {message}\n")
logger.warning("Dead man's switch WARNING: %s", message)
if webhook:
try:
import urllib.request
data = json.dumps({"text": message}).encode()
req = urllib.request.Request(
webhook, data=data, headers={"Content-Type": "application/json"}
)
urllib.request.urlopen(req, timeout=10)
logger.info("Deadman warning sent to webhook")
except Exception as e:
logger.error("Failed to send deadman webhook: %s", e)
def check(self) -> None: def check(self) -> None:
"""Run the check loop (called by systemd timer or background thread).""" """Run the check loop (called by systemd timer or background thread).
Warning is sent during the grace period. Purge fires only after
grace expires.
"""
if self.should_fire(): if self.should_fire():
logger.warning("DEAD MAN'S SWITCH EXPIRED — firing killswitch") logger.warning("DEAD MAN'S SWITCH EXPIRED — firing killswitch")
from soosef.fieldkit.killswitch import PurgeScope, execute_purge from soosef.fieldkit.killswitch import PurgeScope, execute_purge
execute_purge(PurgeScope.ALL, reason="deadman_expired") execute_purge(PurgeScope.ALL, reason="deadman_expired")
elif self.is_overdue():
state = self._load_state()
last = datetime.fromisoformat(state["last_checkin"])
grace_deadline = last + timedelta(
hours=state["interval_hours"] + state["grace_hours"]
)
remaining = grace_deadline - datetime.now(UTC)
mins = max(0, int(remaining.total_seconds() / 60))
self._send_warning(
f"Check-in overdue! Killswitch fires in {mins} minutes. "
f"Run 'soosef fieldkit checkin' immediately."
)

View File

@ -79,6 +79,43 @@ def save_fence(fence: GeoCircle, path: Path | None = None) -> None:
) )
def get_current_location(timeout: float = 5.0) -> tuple[float, float] | None:
"""Try to get current GPS coordinates from gpsd.
Returns (lat, lon) or None if unavailable.
"""
try:
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
sock.connect(("127.0.0.1", 2947))
sock.sendall(b'?WATCH={"enable":true,"json":true}\n')
# Read until we get a TPV (time-position-velocity) report
buffer = b""
while True:
data = sock.recv(4096)
if not data:
break
buffer += data
for line in buffer.split(b"\n"):
line = line.strip()
if not line:
continue
try:
report = json.loads(line)
if report.get("class") == "TPV" and "lat" in report and "lon" in report:
sock.close()
return (report["lat"], report["lon"])
except (json.JSONDecodeError, KeyError):
continue
sock.close()
except (OSError, TimeoutError):
pass
return None
def clear_fence(path: Path | None = None) -> bool: def clear_fence(path: Path | None = None) -> bool:
"""Remove the saved geofence. Returns True if a fence was present and removed.""" """Remove the saved geofence. Returns True if a fence was present and removed."""
from soosef.paths import GEOFENCE_CONFIG from soosef.paths import GEOFENCE_CONFIG

View File

@ -15,6 +15,12 @@ from soosef.exceptions import KeystoreError
from soosef.keystore.models import IdentityInfo, KeystoreStatus, RotationResult from soosef.keystore.models import IdentityInfo, KeystoreStatus, RotationResult
def _timestamp() -> str:
from datetime import UTC, datetime
return datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
class KeystoreManager: class KeystoreManager:
"""Manages all key material for a SooSeF instance.""" """Manages all key material for a SooSeF instance."""
@ -273,6 +279,115 @@ class KeystoreManager:
archive_path=archive_dir, archive_path=archive_dir,
) )
# ── Backup ─────────────────────────────────────────────────────
def backup(self, destination: Path, password: bytes | None = None) -> Path:
"""Create an encrypted key bundle backup.
Returns the path to the created backup file.
"""
from soosef.keystore.export import export_bundle
dest_file = destination / f"soosef-backup-{_timestamp()}.bundle"
export_bundle(dest_file, password=password)
# Record backup timestamp
import json
meta_path = self._identity_dir.parent / "last_backup.json"
from datetime import UTC, datetime
meta_path.write_text(json.dumps({"timestamp": datetime.now(UTC).isoformat(), "path": str(dest_file)}))
return dest_file
def last_backup_info(self) -> dict | None:
"""Get last backup timestamp, or None if never backed up."""
import json
meta_path = self._identity_dir.parent / "last_backup.json"
if meta_path.exists():
return json.loads(meta_path.read_text())
return None
def is_backup_overdue(self, reminder_days: int = 7) -> bool:
"""Check if backup is overdue based on config."""
info = self.last_backup_info()
if info is None:
return True
from datetime import UTC, datetime, timedelta
last = datetime.fromisoformat(info["timestamp"])
return datetime.now(UTC) > last + timedelta(days=reminder_days)
# ── Trusted Keys (collaborator public keys) ─────────────────────
@property
def _trusted_keys_dir(self) -> Path:
return self._identity_dir.parent / "trusted_keys"
def trust_key(self, public_key_pem: bytes, name: str) -> str:
"""Import a collaborator's Ed25519 public key into the trust store.
Returns the fingerprint of the trusted key.
"""
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
load_pem_public_key,
)
pub = load_pem_public_key(public_key_pem)
raw = pub.public_bytes(Encoding.Raw, PublicFormat.Raw)
import hashlib
fingerprint = hashlib.sha256(raw).hexdigest()[:32]
self._trusted_keys_dir.mkdir(parents=True, exist_ok=True)
key_dir = self._trusted_keys_dir / fingerprint
key_dir.mkdir(exist_ok=True)
(key_dir / "public.pem").write_bytes(public_key_pem)
import json
(key_dir / "meta.json").write_text(json.dumps({"name": name, "fingerprint": fingerprint}))
return fingerprint
def get_trusted_keys(self) -> list[dict]:
"""List all trusted collaborator keys."""
import json
result = []
if not self._trusted_keys_dir.exists():
return result
for key_dir in sorted(self._trusted_keys_dir.iterdir()):
meta_path = key_dir / "meta.json"
if meta_path.exists():
result.append(json.loads(meta_path.read_text()))
return result
def resolve_attestor_name(self, fingerprint: str) -> str | None:
"""Look up a name for a fingerprint from local identity or trust store."""
if self.has_identity():
info = self.get_identity()
if info.fingerprint == fingerprint:
return "Local Identity"
for key in self.get_trusted_keys():
if key["fingerprint"] == fingerprint:
return key["name"]
return None
def untrust_key(self, fingerprint: str) -> bool:
"""Remove a key from the trust store. Returns True if found and removed."""
import shutil
key_dir = self._trusted_keys_dir / fingerprint
if key_dir.exists():
shutil.rmtree(key_dir)
return True
return False
# ── Unified Status ────────────────────────────────────────────── # ── Unified Status ──────────────────────────────────────────────
def status(self) -> KeystoreStatus: def status(self) -> KeystoreStatus:

View File

@ -52,6 +52,7 @@ def encode(
channel_key: str | bool | None = None, channel_key: str | bool | None = None,
progress_file: str | None = None, progress_file: str | None = None,
platform: str | None = None, platform: str | None = None,
strip_metadata: bool = True,
) -> EncodeResult: ) -> EncodeResult:
""" """
Encode a message or file into an image. Encode a message or file into an image.
@ -114,6 +115,19 @@ def encode(
if rsa_key_data: if rsa_key_data:
require_valid_rsa_key(rsa_key_data, rsa_password) require_valid_rsa_key(rsa_key_data, rsa_password)
# Strip EXIF/metadata from carrier to prevent source information leakage.
# Pillow re-save without copying info/exif fields removes all metadata.
if strip_metadata:
import io
from PIL import Image
img = Image.open(io.BytesIO(carrier_image))
clean = io.BytesIO()
img.save(clean, format=img.format or "PNG")
carrier_image = clean.getvalue()
debug.print("Stripped metadata from carrier image")
# Encrypt message (with channel key) # Encrypt message (with channel key)
encrypted = encrypt_message( encrypted = encrypt_message(
message, reference_photo, passphrase, pin, rsa_key_data, channel_key message, reference_photo, passphrase, pin, rsa_key_data, channel_key

View File

@ -283,5 +283,44 @@ def test_verify_chain_detects_signer_change(chain_dir: Path):
store._state = None store._state = None
with pytest.raises(ChainIntegrityError, match="signer changed"): with pytest.raises(ChainIntegrityError, match="is not authorized"):
store.verify_chain()
def test_key_rotation_in_chain(chain_dir: Path):
"""Chain with a proper key rotation record verifies successfully."""
from soosef.federation.chain import CONTENT_TYPE_KEY_ROTATION
store = ChainStore(chain_dir)
key1 = Ed25519PrivateKey.generate()
key2 = Ed25519PrivateKey.generate()
# Append records with key1
store.append(hashlib.sha256(b"r0").digest(), "test/plain", key1)
store.append(hashlib.sha256(b"r1").digest(), "test/plain", key1)
# Rotate: old key signs a rotation record introducing new key
rotation = store.append_key_rotation(old_private_key=key1, new_private_key=key2)
assert rotation.content_type == CONTENT_TYPE_KEY_ROTATION
assert rotation.metadata["new_pubkey"]
# New key can now sign records
store.append(hashlib.sha256(b"r3").digest(), "test/plain", key2)
store.append(hashlib.sha256(b"r4").digest(), "test/plain", key2)
# Full chain verifies
assert store.verify_chain() is True
def test_key_rotation_without_rotation_record_fails(chain_dir: Path):
"""Using a new key without a rotation record is rejected."""
store = ChainStore(chain_dir)
key1 = Ed25519PrivateKey.generate()
key2 = Ed25519PrivateKey.generate()
store.append(hashlib.sha256(b"r0").digest(), "test/plain", key1)
# Directly use key2 without rotation — should fail verification
store.append(hashlib.sha256(b"r1").digest(), "test/plain", key2)
with pytest.raises(ChainIntegrityError, match="is not authorized"):
store.verify_chain() store.verify_chain()