fieldwitness/frontends/web/app.py
Aaron D. Lee 9431033c72
Some checks failed
CI / lint (push) Failing after 52s
CI / typecheck (push) Failing after 30s
Implement 7 real-world scenario features (Round 4)
1. Source drop box: token-gated anonymous upload with auto-attestation,
   EXIF stripping, receipt codes, and self-destructing URLs. New
   /dropbox blueprint with admin panel for token management. CSRF
   exempted for source-facing upload routes.

2. Investigation namespaces: attestation records tagged with
   investigation label via metadata. Log view filters by investigation
   with dropdown. Supports long-running multi-story workflows.

3. Scale fixes: replaced O(n) full-scan perceptual hash search with
   LMDB find_similar_images() index lookup. Added incremental chain
   verification (verify_incremental) with last_verified_index
   checkpoint in ChainState.

4. Deep forensic purge: killswitch now scrubs __pycache__, pip
   dist-info, pip cache, and shell history entries containing 'soosef'.
   Runs before package uninstall for maximum trace removal.

5. Cross-org federation: new federation/exchange.py with
   export_attestation_bundle() and import_attestation_bundle().
   Bundles are self-authenticating JSON with investigation filter.
   Import validates against trust store fingerprints.

6. Wrong-key diagnostics: enhanced decrypt error messages include
   current channel key fingerprint hint. New carrier_tracker.py
   tracks carrier SHA-256 hashes and warns on reuse (statistical
   analysis risk).

7. Selective disclosure: ChainStore.selective_disclosure() produces
   proof bundles with full selected records + hash-only redacted
   records + complete hash chain for linkage verification. New
   `soosef chain disclose -i 0,5,10 -o proof.json` CLI command
   for court-ordered evidence production.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 20:41:41 -04:00

959 lines
35 KiB
Python

"""
SooSeF Web Frontend
Flask application factory that unifies Stegasoo (steganography) and Verisoo
(provenance attestation) into a single web UI with fieldkit security features.
ARCHITECTURE
============
The stegasoo web UI (3,600+ lines, 60 routes) is mounted wholesale via
_register_stegasoo_routes() rather than being rewritten into a blueprint.
This preserves the battle-tested subprocess isolation, async job management,
and all existing route logic without modification.
SooSeF-native features (attest, fieldkit, keys) are clean blueprints.
Stegasoo routes (mounted at root):
/encode, /decode, /generate, /tools, /api/*
SooSeF blueprints:
/attest, /verify → attest blueprint
/fieldkit/* → fieldkit blueprint
/keys/* → keys blueprint
/admin/* → admin blueprint (extends stegasoo's)
"""
import io
import os
import secrets
import sys
from pathlib import Path
from flask import (
Flask,
flash,
jsonify,
redirect,
render_template,
request,
send_file,
session,
url_for,
)
import soosef
from soosef.config import SoosefConfig
from soosef.paths import INSTANCE_DIR, SECRET_KEY_FILE, TEMP_DIR, ensure_dirs
# Process-wide environment defaults, set at import time so that any child
# process started later inherits them.
# NOTE(review): presumably these quiet numpy/scipy hugepage warnings and keep
# OpenMP-backed libraries single-threaded in the stego subprocesses — confirm.
os.environ["NUMPY_MADVISE_HUGEPAGE"] = "0"
os.environ["OMP_NUM_THREADS"] = "1"
def create_app(config: SoosefConfig | None = None) -> Flask:
    """Build and configure the SooSeF Flask application.

    Args:
        config: Settings to use. When omitted (or falsy), ``SoosefConfig.load()``
            supplies them.

    Returns:
        The configured ``Flask`` app with the stegasoo routes, the attest /
        fieldkit / keys / dropbox blueprints, the template context processor
        and the ``/`` and ``/health`` routes registered.
    """
    config = config or SoosefConfig.load()
    ensure_dirs()

    web_dir = Path(__file__).parent

    app = Flask(
        __name__,
        instance_path=str(INSTANCE_DIR),
        template_folder=str(web_dir / "templates"),
        static_folder=str(web_dir / "static"),
    )

    app.config["MAX_CONTENT_LENGTH"] = config.max_upload_mb * 1024 * 1024
    app.config["AUTH_ENABLED"] = config.auth_enabled
    app.config["HTTPS_ENABLED"] = config.https_enabled
    app.config["SOOSEF_CONFIG"] = config

    # Session security: timeout + secure cookie flags
    from datetime import timedelta

    app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=config.session_timeout_minutes)
    app.config["SESSION_COOKIE_HTTPONLY"] = True
    app.config["SESSION_COOKIE_SAMESITE"] = "Strict"
    if config.https_enabled:
        app.config["SESSION_COOKIE_SECURE"] = True

    # CSRF protection on all POST/PUT/DELETE routes
    from flask_wtf.csrf import CSRFProtect

    csrf = CSRFProtect(app)

    # Add the web dir to sys.path so auth.py and the other flat support
    # modules are importable. This is done before the first such import, and
    # only once, so repeated factory calls do not pile up duplicate entries.
    # NOTE(review): temp_storage is assumed to be one of those support
    # modules — confirm.
    web_dir_str = str(web_dir)
    if web_dir_str not in sys.path:
        sys.path.insert(0, web_dir_str)

    # Point temp_storage at TEMP_DIR before any routes run, so all uploaded
    # files land where the killswitch's destroy_temp_files step expects them.
    # Must happen after ensure_dirs() so the directory exists.
    import temp_storage as _ts

    _ts.init(TEMP_DIR)

    # Persist secret key so sessions survive restarts
    _load_secret_key(app)

    # ── Initialize auth ───────────────────────────────────────────
    from auth import get_username, is_admin, is_authenticated
    from auth import init_app as init_auth

    init_auth(app)

    # ── Register stegasoo routes ──────────────────────────────────
    _register_stegasoo_routes(app)

    # ── Register SooSeF-native blueprints ─────────────────────────
    from frontends.web.blueprints.attest import bp as attest_bp
    from frontends.web.blueprints.dropbox import bp as dropbox_bp
    from frontends.web.blueprints.fieldkit import bp as fieldkit_bp
    from frontends.web.blueprints.keys import bp as keys_bp

    app.register_blueprint(attest_bp)
    app.register_blueprint(fieldkit_bp)
    app.register_blueprint(keys_bp)
    app.register_blueprint(dropbox_bp)

    # Exempt drop box upload from CSRF (sources don't have sessions).
    # NOTE(review): this exempts every route on the dropbox blueprint, not
    # only the upload endpoint — confirm that no session-authenticated
    # (admin) views live on it.
    csrf.exempt(dropbox_bp)

    # ── Context processor (injected into ALL templates) ───────────
    @app.context_processor
    def inject_globals():
        """Return the variables made available to every rendered template."""
        from soosef.keystore import KeystoreManager

        ks_status = KeystoreManager().status()

        # Fieldkit alert level
        fieldkit_status = "ok"
        if config.deadman_enabled:
            from soosef.fieldkit.deadman import DeadmanSwitch

            dm = DeadmanSwitch()
            if dm.should_fire():
                fieldkit_status = "alarm"
            elif dm.is_overdue():
                fieldkit_status = "warn"

        # Stegasoo capabilities; templates get no stego vars when the
        # stegasoo package cannot be imported.
        stego_vars = {}
        try:
            from soosef.stegasoo import HAS_AUDIO_SUPPORT, get_channel_status, has_dct_support
            from soosef.stegasoo.constants import (
                DEFAULT_PASSPHRASE_WORDS,
                MAX_FILE_PAYLOAD_SIZE,
                MAX_MESSAGE_CHARS,
                MAX_PIN_LENGTH,
                MAX_UPLOAD_SIZE,
                MIN_PASSPHRASE_WORDS,
                MIN_PIN_LENGTH,
                RECOMMENDED_PASSPHRASE_WORDS,
                TEMP_FILE_EXPIRY_MINUTES,
            )

            has_dct = has_dct_support()
            has_audio = HAS_AUDIO_SUPPORT
            channel_status = get_channel_status()

            # Stegasoo-specific template vars (needed by stego templates)
            stego_vars = {
                "has_dct": has_dct,
                "has_audio": has_audio,
                "max_message_chars": MAX_MESSAGE_CHARS,
                "max_payload_kb": MAX_FILE_PAYLOAD_SIZE // 1024,
                "max_upload_mb": MAX_UPLOAD_SIZE // (1024 * 1024),
                "temp_file_expiry_minutes": TEMP_FILE_EXPIRY_MINUTES,
                "min_pin_length": MIN_PIN_LENGTH,
                "max_pin_length": MAX_PIN_LENGTH,
                "min_passphrase_words": MIN_PASSPHRASE_WORDS,
                "recommended_passphrase_words": RECOMMENDED_PASSPHRASE_WORDS,
                "default_passphrase_words": DEFAULT_PASSPHRASE_WORDS,
                "channel_mode": channel_status["mode"],
                "channel_source": channel_status.get("source"),
                "supported_audio_formats": "WAV, FLAC, MP3, OGG, AAC, M4A" if has_audio else "",
            }
        except ImportError:
            stego_vars = {}

        # Verisoo availability
        try:
            import soosef.verisoo  # noqa: F401

            has_verisoo = True
        except ImportError:
            has_verisoo = False

        authenticated = is_authenticated()

        # Saved channel keys for authenticated users. Best effort: a failure
        # here must not break page rendering, but it is logged, not hidden.
        saved_channel_keys = []
        if authenticated:
            try:
                from auth import get_current_user, get_user_channel_keys

                current_user = get_current_user()
                if current_user:
                    saved_channel_keys = get_user_channel_keys(current_user.id)
            except Exception:
                app.logger.debug("Could not load saved channel keys", exc_info=True)

        base_vars = {
            "version": soosef.__version__,
            "has_verisoo": has_verisoo,
            "has_fieldkit": config.killswitch_enabled or config.deadman_enabled,
            "fieldkit_status": fieldkit_status,
            "channel_configured": ks_status.has_channel_key,
            "channel_fingerprint": ks_status.channel_fingerprint or "",
            "identity_configured": ks_status.has_identity,
            "identity_fingerprint": ks_status.identity_fingerprint or "",
            "auth_enabled": app.config["AUTH_ENABLED"],
            "is_authenticated": authenticated,
            "is_admin": is_admin(),
            "username": get_username() if authenticated else None,
            "saved_channel_keys": saved_channel_keys,
            # QR support flags
            "has_qrcode": _HAS_QRCODE,
            "has_qrcode_read": _HAS_QRCODE_READ,
        }
        return {**base_vars, **stego_vars}

    # ── Root routes ───────────────────────────────────────────────
    @app.route("/")
    def index():
        return render_template("index.html")

    # ── Health check ─────────────────────────────────────────────
    @app.route("/health")
    def health():
        """System health and capability report.

        Unauthenticated — returns what's installed, what's missing,
        and what's degraded. No secrets or key material exposed.
        """
        import importlib
        import platform

        from soosef.keystore.manager import KeystoreManager

        ks = KeystoreManager()

        # Core modules
        modules = {}
        for name, import_path in (
            ("stegasoo", "soosef.stegasoo"),
            ("verisoo", "soosef.verisoo"),
        ):
            try:
                mod = importlib.import_module(import_path)
            except ImportError as e:
                modules[name] = {"status": "missing", "error": str(e)}
            else:
                modules[name] = {
                    "status": "ok",
                    "version": getattr(mod, "__version__", "unknown"),
                }

        # Optional capabilities
        capabilities = {}

        # DCT steganography
        try:
            from soosef.stegasoo import has_dct_support

            dct_ok = has_dct_support()
            capabilities["stego_dct"] = {
                "status": "ok" if dct_ok else "unavailable",
                "hint": None if dct_ok else "Install soosef[stego-dct] (scipy, jpeglib, reedsolo)",
            }
        except ImportError:
            capabilities["stego_dct"] = {"status": "missing", "hint": "Install soosef[stego-dct]"}

        # Audio steganography
        try:
            from soosef.stegasoo import HAS_AUDIO_SUPPORT

            capabilities["stego_audio"] = {
                "status": "ok" if HAS_AUDIO_SUPPORT else "unavailable",
                "hint": None if HAS_AUDIO_SUPPORT else "Install soosef[stego-audio] (soundfile, numpy)",
            }
        except ImportError:
            capabilities["stego_audio"] = {"status": "missing", "hint": "Install soosef[stego-audio]"}

        # Video steganography
        try:
            from soosef.stegasoo.constants import VIDEO_ENABLED

            capabilities["stego_video"] = {
                "status": "ok" if VIDEO_ENABLED else "unavailable",
                "hint": None if VIDEO_ENABLED else "Requires ffmpeg in PATH",
            }
        except (ImportError, AttributeError):
            capabilities["stego_video"] = {"status": "missing", "hint": "Requires ffmpeg"}

        # Third-party packages probed by import only:
        # (capability key, module name, status when absent, install hint)
        optional_packages = (
            ("lmdb", "lmdb", "missing", "Install soosef[attest]"),  # verisoo storage
            ("imagehash", "imagehash", "missing", "Install soosef[attest]"),  # perceptual hashing
            ("usb_monitor", "pyudev", "unavailable", "Install soosef[fieldkit] (Linux only)"),
            ("gpio", "gpiozero", "unavailable", "Install soosef[rpi] (Raspberry Pi only)"),
        )
        for cap_name, module_name, absent_status, hint in optional_packages:
            try:
                importlib.import_module(module_name)
            except ImportError:
                capabilities[cap_name] = {"status": absent_status, "hint": hint}
            else:
                capabilities[cap_name] = {"status": "ok"}

        # Key status (existence only, no material)
        keys = {
            "identity": "ok" if ks.has_identity() else "missing",
            "channel_key": "ok" if ks.has_channel_key() else "missing",
            "trusted_keys": len(ks.get_trusted_keys()),
        }

        # Backup status
        backup_info = ks.last_backup_info()
        keys["last_backup"] = backup_info["timestamp"] if backup_info else "never"
        keys["backup_overdue"] = ks.is_backup_overdue(config.backup_reminder_days)

        # Fieldkit status
        fieldkit = {
            "killswitch_enabled": config.killswitch_enabled,
            "deadman_enabled": config.deadman_enabled,
            "chain_enabled": config.chain_enabled,
        }
        if config.deadman_enabled:
            from soosef.fieldkit.deadman import DeadmanSwitch

            dm_status = DeadmanSwitch().status()
            fieldkit["deadman_armed"] = dm_status["armed"]
            fieldkit["deadman_overdue"] = dm_status.get("overdue", False)

        # System info (no secrets)
        system = {
            "python": sys.version.split()[0],
            "platform": platform.machine(),
            "os": platform.system(),
        }

        # Memory (for Argon2 sizing awareness). os.sysconf does not exist on
        # every platform, so AttributeError is treated like an unknown value.
        try:
            mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
            system["memory_mb"] = mem_bytes // (1024 * 1024)
        except (AttributeError, ValueError, OSError):
            pass

        # Overall status: every core module importable and both keys present
        all_ok = (
            all(m["status"] == "ok" for m in modules.values())
            and keys["identity"] == "ok"
            and keys["channel_key"] == "ok"
        )

        return jsonify(
            {
                "status": "ok" if all_ok else "degraded",
                "version": soosef.__version__,
                "modules": modules,
                "capabilities": capabilities,
                "keys": keys,
                "fieldkit": fieldkit,
                "system": system,
            }
        )

    return app
# ── QR support detection ─────────────────────────────────────────────
# Each flag records whether the optional QR dependency imported cleanly;
# the imported names stay bound at module level when the import succeeds.
try:
    import qrcode  # noqa: F401
except ImportError:
    _HAS_QRCODE = False
else:
    _HAS_QRCODE = True

try:
    from pyzbar.pyzbar import decode as pyzbar_decode  # noqa: F401
except ImportError:
    _HAS_QRCODE_READ = False
else:
    _HAS_QRCODE_READ = True
# ── Stegasoo route mounting ──────────────────────────────────────────
def _register_stegasoo_routes(app: Flask) -> None:
    """Mount the stegasoo web routes onto ``app``.

    The auth, admin, credential-generation and API routes are defined here
    directly on the app; the encode/decode/tools routes are delegated to
    ``stego_routes.register_stego_routes``. Templates for the stego pages
    are looked up under ``stego/``.

    Args:
        app: The Flask application to register routes on. Its
            ``SOOSEF_CONFIG`` entry is used for the login lockout settings;
            when absent, ``SoosefConfig.load()`` is used instead.
    """
    import time

    import temp_storage
    from auth import (
        MAX_CHANNEL_KEYS,
        MAX_USERS,
        admin_required,
        can_create_user,
        can_save_channel_key,
        change_password,
        create_admin_user,
        create_user,
        delete_user,
        generate_temp_password,
        get_all_users,
        get_current_user,
        get_recovery_key_hash,
        get_user_by_id,
        get_user_channel_keys,
        get_username,
        has_recovery_key,
        login_required,
        reset_user_password,
        verify_and_reset_admin_password,
        verify_user_password,
    )
    from auth import is_authenticated as auth_is_authenticated
    from auth import login_user as auth_login_user
    from auth import logout_user as auth_logout_user
    from auth import user_exists as auth_user_exists
    from soosef.audit import log_action
    from soosef.stegasoo import (
        export_rsa_key_pem,
        generate_credentials,
        get_channel_status,
        load_rsa_key,
    )
    from soosef.stegasoo.constants import (
        DEFAULT_PASSPHRASE_WORDS,
        MAX_PIN_LENGTH,
        MIN_PASSPHRASE_WORDS,
        MIN_PIN_LENGTH,
        TEMP_FILE_EXPIRY,
        VALID_RSA_SIZES,
    )
    from soosef.stegasoo.qr_utils import can_fit_in_qr, generate_qr_code
    from subprocess_stego import SubprocessStego

    # The routes below need the lockout settings; `config` is a local of
    # create_app() and is not visible here, so read it back from the app.
    config = app.config.get("SOOSEF_CONFIG") or SoosefConfig.load()

    # Initialize subprocess wrapper
    subprocess_stego = SubprocessStego(timeout=180)

    # ── Auth routes (setup, login, logout, account) ────────────────

    # Login rate limiting: {username: [failed-attempt timestamps]}
    _login_attempts: dict[str, list[float]] = {}

    @app.route("/login", methods=["GET", "POST"])
    def login():
        if not app.config.get("AUTH_ENABLED", True):
            return redirect(url_for("index"))
        if not auth_user_exists():
            return redirect(url_for("setup"))
        if auth_is_authenticated():
            return redirect(url_for("index"))

        if request.method == "POST":
            username = request.form.get("username", "")
            password = request.form.get("password", "")

            # Check lockout
            max_attempts = config.login_lockout_attempts
            lockout_mins = config.login_lockout_minutes
            now = time.time()
            window = lockout_mins * 60

            # Prune old attempts for every tracked name and drop names with
            # nothing left, so the table cannot grow without bound when
            # arbitrary usernames are submitted.
            for name in list(_login_attempts):
                fresh = [t for t in _login_attempts[name] if now - t < window]
                if fresh:
                    _login_attempts[name] = fresh
                else:
                    del _login_attempts[name]
            attempts = _login_attempts.get(username, [])

            if len(attempts) >= max_attempts:
                log_action(
                    actor=username,
                    action="user.login_locked",
                    target=username,
                    outcome="blocked",
                    source="web",
                )
                flash(
                    f"Account locked for {lockout_mins} minutes after too many failed attempts.",
                    "error",
                )
                return render_template("login.html")

            user = verify_user_password(username, password)
            if user:
                _login_attempts.pop(username, None)
                auth_login_user(user)
                session.permanent = True
                flash("Login successful", "success")
                return redirect(url_for("index"))

            attempts.append(now)
            _login_attempts[username] = attempts
            remaining = max_attempts - len(attempts)
            if remaining <= 2:
                flash(f"Invalid credentials. {remaining} attempts remaining.", "error")
            else:
                flash("Invalid username or password", "error")

        return render_template("login.html")

    @app.route("/logout", methods=["POST"])
    def logout():
        auth_logout_user()
        flash("Logged out successfully", "success")
        return redirect(url_for("index"))

    @app.route("/setup", methods=["GET", "POST"])
    def setup():
        if not app.config.get("AUTH_ENABLED", True):
            return redirect(url_for("index"))
        if auth_user_exists():
            return redirect(url_for("login"))

        if request.method == "POST":
            username = request.form.get("username", "admin")
            password = request.form.get("password", "")
            password_confirm = request.form.get("password_confirm", "")

            if password != password_confirm:
                flash("Passwords do not match", "error")
            else:
                success, message = create_admin_user(username, password)
                if success:
                    user = verify_user_password(username, password)
                    if user:
                        auth_login_user(user)
                        session.permanent = True
                    flash("Setup complete!", "success")
                    return redirect(url_for("index"))
                else:
                    flash(message, "error")

        return render_template("setup.html")

    @app.route("/recover", methods=["GET", "POST"])
    def recover():
        if not get_recovery_key_hash():
            flash("No recovery key configured", "error")
            return redirect(url_for("login"))

        if request.method == "POST":
            recovery_key = request.form.get("recovery_key", "").strip()
            new_password = request.form.get("new_password", "")
            new_password_confirm = request.form.get("new_password_confirm", "")

            if not recovery_key:
                flash("Please enter your recovery key", "error")
            elif new_password != new_password_confirm:
                flash("Passwords do not match", "error")
            elif len(new_password) < 8:
                flash("Password must be at least 8 characters", "error")
            else:
                success, message = verify_and_reset_admin_password(recovery_key, new_password)
                if success:
                    flash("Password reset. Please login.", "success")
                    return redirect(url_for("login"))
                else:
                    flash(message, "error")

        return render_template("recover.html")

    @app.route("/account", methods=["GET", "POST"])
    @login_required
    def account():
        current_user = get_current_user()

        if request.method == "POST":
            current_password = request.form.get("current_password", "")
            new_password = request.form.get("new_password", "")
            confirm = request.form.get("confirm_password", "")

            if new_password != confirm:
                flash("Passwords do not match", "error")
            else:
                success, message = change_password(current_user.id, current_password, new_password)
                flash(message, "success" if success else "error")

        saved_keys = get_user_channel_keys(current_user.id) if current_user else []
        return render_template(
            "account.html",
            current_user=current_user,
            saved_channel_keys=saved_keys,
            max_channel_keys=MAX_CHANNEL_KEYS,
            can_save_keys=can_save_channel_key(current_user.id) if current_user else False,
            has_recovery=has_recovery_key(),
        )

    # ── Admin routes ─────────────────────────────────────────────

    @app.route("/admin/users")
    @admin_required
    def admin_users():
        users = get_all_users()
        return render_template(
            "admin/users.html",
            users=users,
            user_count=len(users),
            max_users=MAX_USERS,
            can_create=can_create_user(),
            current_user=get_current_user(),
        )

    @app.route("/admin/users/new", methods=["GET", "POST"])
    @admin_required
    def admin_new_user():
        if request.method == "POST":
            username = request.form.get("username", "")
            temp_password = generate_temp_password()
            success, message = create_user(username, temp_password)
            log_action(
                actor=get_username(),
                action="user.create",
                target=f"user:{username}",
                outcome="success" if success else "failure",
                source="web",
                detail=None if success else message,
            )
            if success:
                flash(
                    f"User '{username}' created with temporary password: {temp_password}", "success"
                )
            else:
                flash(message, "error")
            return redirect(url_for("admin_users"))

        return render_template("admin/user_new.html")

    @app.route("/admin/users/<int:user_id>/delete", methods=["POST"])
    @admin_required
    def admin_delete_user(user_id):
        # Resolve the name before deleting so the audit entry can carry it.
        target_user = get_user_by_id(user_id)
        target_name = target_user.username if target_user else str(user_id)
        success, message = delete_user(user_id, get_current_user().id)
        log_action(
            actor=get_username(),
            action="user.delete",
            target=f"user:{target_name}",
            outcome="success" if success else "failure",
            source="web",
            detail=None if success else message,
        )
        flash(message, "success" if success else "error")
        return redirect(url_for("admin_users"))

    @app.route("/admin/users/<int:user_id>/reset", methods=["POST"])
    @admin_required
    def admin_reset_password(user_id):
        temp_password = generate_temp_password()
        success, message = reset_user_password(user_id, temp_password)
        target_user = get_user_by_id(user_id)
        target_name = target_user.username if target_user else str(user_id)
        log_action(
            actor=get_username(),
            action="user.password_reset",
            target=f"user:{target_name}",
            outcome="success" if success else "failure",
            source="web",
            detail=None if success else message,
        )
        if success:
            flash(f"Password for '{target_name}' reset to: {temp_password}", "success")
        else:
            flash(message, "error")
        return redirect(url_for("admin_users"))

    # ── Generate routes ───────────────────────────────────────────

    @app.route("/generate", methods=["GET", "POST"])
    @login_required
    def generate():
        if request.method == "POST":
            # type=int makes a non-numeric field fall back to the default
            # instead of raising ValueError (which surfaced as a 500).
            words_per_passphrase = request.form.get(
                "words_per_passphrase", DEFAULT_PASSPHRASE_WORDS, type=int
            )
            use_pin = request.form.get("use_pin") == "on"
            use_rsa = request.form.get("use_rsa") == "on"

            if not use_pin and not use_rsa:
                flash("You must select at least one security factor (PIN or RSA Key)", "error")
                return render_template(
                    "stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE
                )

            pin_length = request.form.get("pin_length", 6, type=int)
            rsa_bits = request.form.get("rsa_bits", 2048, type=int)

            # Clamp to the supported ranges
            words_per_passphrase = max(MIN_PASSPHRASE_WORDS, min(12, words_per_passphrase))
            pin_length = max(MIN_PIN_LENGTH, min(MAX_PIN_LENGTH, pin_length))
            if rsa_bits not in VALID_RSA_SIZES:
                rsa_bits = 2048

            try:
                creds = generate_credentials(
                    use_pin=use_pin,
                    use_rsa=use_rsa,
                    pin_length=pin_length,
                    rsa_bits=rsa_bits,
                    passphrase_words=words_per_passphrase,
                )

                qr_token = None
                qr_needs_compression = False
                qr_too_large = False

                if creds.rsa_key_pem and _HAS_QRCODE:
                    if can_fit_in_qr(creds.rsa_key_pem, compress=True):
                        qr_needs_compression = True
                        # Park the key in temp storage; the QR routes fetch
                        # it back by this token.
                        qr_token = secrets.token_urlsafe(16)
                        temp_storage.cleanup_expired(TEMP_FILE_EXPIRY)
                        temp_storage.save_temp_file(
                            qr_token,
                            creds.rsa_key_pem.encode(),
                            {
                                "filename": "rsa_key.pem",
                                "type": "rsa_key",
                                "compress": qr_needs_compression,
                            },
                        )
                    else:
                        qr_too_large = True

                return render_template(
                    "stego/generate.html",
                    passphrase=creds.passphrase,
                    pin=creds.pin,
                    generated=True,
                    words_per_passphrase=words_per_passphrase,
                    pin_length=pin_length if use_pin else None,
                    use_pin=use_pin,
                    use_rsa=use_rsa,
                    rsa_bits=rsa_bits,
                    rsa_key_pem=creds.rsa_key_pem,
                    passphrase_entropy=creds.passphrase_entropy,
                    pin_entropy=creds.pin_entropy,
                    rsa_entropy=creds.rsa_entropy,
                    total_entropy=creds.total_entropy,
                    has_qrcode=_HAS_QRCODE,
                    qr_token=qr_token,
                    qr_needs_compression=qr_needs_compression,
                    qr_too_large=qr_too_large,
                )
            except Exception as e:
                flash(f"Error generating credentials: {e}", "error")
                return render_template(
                    "stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE
                )

        return render_template("stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE)

    # ── Generate QR + Key download routes ───────────────────────

    @app.route("/generate/qr/<token>")
    @login_required
    def generate_qr(token):
        if not _HAS_QRCODE:
            return "QR code support not available", 501
        file_info = temp_storage.get_temp_file(token)
        if not file_info:
            return "Token expired or invalid", 404
        if file_info.get("type") != "rsa_key":
            return "Invalid token type", 400
        try:
            key_pem = file_info["data"].decode("utf-8")
            compress = file_info.get("compress", False)
            qr_png = generate_qr_code(key_pem, compress=compress)
            return send_file(io.BytesIO(qr_png), mimetype="image/png", as_attachment=False)
        except Exception as e:
            return f"Error generating QR code: {e}", 500

    @app.route("/generate/qr-download/<token>")
    @login_required
    def generate_qr_download(token):
        if not _HAS_QRCODE:
            return "QR code support not available", 501
        file_info = temp_storage.get_temp_file(token)
        if not file_info:
            return "Token expired or invalid", 404
        if file_info.get("type") != "rsa_key":
            return "Invalid token type", 400
        try:
            key_pem = file_info["data"].decode("utf-8")
            compress = file_info.get("compress", False)
            qr_png = generate_qr_code(key_pem, compress=compress)
            return send_file(
                io.BytesIO(qr_png),
                mimetype="image/png",
                as_attachment=True,
                download_name="soosef_rsa_key_qr.png",
            )
        except Exception as e:
            return f"Error generating QR code: {e}", 500

    @app.route("/generate/download-key", methods=["POST"])
    @login_required
    def download_key():
        key_pem = request.form.get("key_pem", "")
        password = request.form.get("key_password", "")
        if not key_pem:
            flash("No key to download", "error")
            return redirect(url_for("generate"))
        if not password or len(password) < 8:
            flash("Password must be at least 8 characters", "error")
            return redirect(url_for("generate"))
        try:
            private_key = load_rsa_key(key_pem.encode("utf-8"))
            encrypted_pem = export_rsa_key_pem(private_key, password=password)
            key_id = secrets.token_hex(4)
            filename = f"soosef_key_{private_key.key_size}_{key_id}.pem"
            return send_file(
                io.BytesIO(encrypted_pem),
                mimetype="application/x-pem-file",
                as_attachment=True,
                download_name=filename,
            )
        except Exception as e:
            flash(f"Error creating key file: {e}", "error")
            return redirect(url_for("generate"))

    # ── Encode/Decode/Tools routes (from stego_routes.py) ────────
    from stego_routes import register_stego_routes

    register_stego_routes(
        app,
        login_required=login_required,
        subprocess_stego=subprocess_stego,
        temp_storage=temp_storage,
        has_qrcode_read=_HAS_QRCODE_READ,
    )

    # /about route is in stego_routes.py

    # ── API routes (capacity, channel, download) ──────────────────

    @app.route("/api/channel/status")
    @login_required
    def api_channel_status():
        result = subprocess_stego.get_channel_status(reveal=False)
        if result.success:
            return jsonify(
                {
                    "success": True,
                    "mode": result.mode,
                    "configured": result.configured,
                    "fingerprint": result.fingerprint,
                    "source": result.source,
                }
            )

        # Subprocess query failed: fall back to the in-process status.
        status = get_channel_status()
        return jsonify(
            {
                "success": True,
                "mode": status["mode"],
                "configured": status["configured"],
                "fingerprint": status.get("fingerprint"),
                "source": status.get("source"),
            }
        )

    @app.route("/api/compare-capacity", methods=["POST"])
    @login_required
    def api_compare_capacity():
        carrier = request.files.get("carrier")
        if not carrier:
            return jsonify({"error": "No carrier image provided"}), 400
        try:
            carrier_data = carrier.read()
            result = subprocess_stego.compare_modes(carrier_data)
            if not result.success:
                return jsonify({"error": result.error or "Comparison failed"}), 500
            return jsonify(
                {
                    "success": True,
                    "width": result.width,
                    "height": result.height,
                    "lsb": {
                        "capacity_bytes": result.lsb["capacity_bytes"],
                        "capacity_kb": round(result.lsb["capacity_kb"], 1),
                        "output": result.lsb.get("output", "PNG"),
                    },
                    "dct": {
                        "capacity_bytes": result.dct["capacity_bytes"],
                        "capacity_kb": round(result.dct["capacity_kb"], 1),
                        "output": result.dct.get("output", "JPEG"),
                        "available": result.dct.get("available", True),
                        "ratio": round(result.dct.get("ratio_vs_lsb", 0), 1),
                    },
                }
            )
        except Exception as e:
            return jsonify({"error": str(e)}), 500

    @app.route("/api/download/<file_id>")
    @login_required
    def api_download(file_id):
        file_info = temp_storage.get_temp_file(file_id)
        if not file_info:
            return jsonify({"error": "File not found or expired"}), 404
        filename = file_info.get("filename", "download")
        mime_type = file_info.get("mime_type", "application/octet-stream")
        return send_file(
            io.BytesIO(file_info["data"]),
            mimetype=mime_type,
            as_attachment=True,
            download_name=filename,
        )

    @app.route("/api/generate/credentials", methods=["POST"])
    @login_required
    def api_generate_credentials():
        try:
            creds = generate_credentials(use_pin=True, use_rsa=False)
            return jsonify(
                {
                    "success": True,
                    "passphrase": creds.passphrase,
                    "pin": creds.pin,
                }
            )
        except Exception as e:
            return jsonify({"error": str(e)}), 500
def _load_secret_key(app: Flask) -> None:
    """Load or generate the persistent secret key for Flask sessions.

    Reads the key from ``SECRET_KEY_FILE`` when that file exists and is
    non-empty; otherwise generates 32 random bytes, stores them there and
    uses them. Either way the result is assigned to ``app.secret_key``.

    Args:
        app: The Flask application whose ``secret_key`` is set.
    """
    SECRET_KEY_FILE.parent.mkdir(parents=True, exist_ok=True)

    if SECRET_KEY_FILE.exists():
        stored = SECRET_KEY_FILE.read_bytes()
        # An empty file (e.g. from an interrupted first write) is not a
        # usable key; fall through and regenerate it.
        if stored:
            app.secret_key = stored
            return

    key = secrets.token_bytes(32)
    # Create the file with owner-only permissions from the start, so the key
    # is never on disk under the default (umask-derived) mode.
    fd = os.open(SECRET_KEY_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "wb") as fh:
        fh.write(key)
    # The mode passed to os.open only applies to newly created files; tighten
    # a pre-existing (empty) file as well.
    SECRET_KEY_FILE.chmod(0o600)
    app.secret_key = key