diff --git a/frontends/web/app.py b/frontends/web/app.py index 618ef9f..1d25fc4 100644 --- a/frontends/web/app.py +++ b/frontends/web/app.py @@ -4,32 +4,48 @@ SooSeF Web Frontend Flask application factory that unifies Stegasoo (steganography) and Verisoo (provenance attestation) into a single web UI with fieldkit security features. -Built on Stegasoo's production-grade web UI patterns: -- Subprocess isolation for crash-safe stegasoo operations -- Async jobs with progress polling for large images -- Context processors for global template variables -- File-based temp storage with auto-expiry +ARCHITECTURE +============ -BLUEPRINT STRUCTURE -=================== +The stegasoo web UI (3,600+ lines, 60 routes) is mounted wholesale via +_register_stegasoo_routes() rather than being rewritten into a blueprint. +This preserves the battle-tested subprocess isolation, async job management, +and all existing route logic without modification. - / → index (dashboard) - /login, /logout → auth (adapted from stegasoo) - /setup → first-run wizard +SooSeF-native features (attest, fieldkit, keys) are clean blueprints. - /encode, /decode, /generate, /tools → stego blueprint - /attest, /verify → attest blueprint - /fieldkit/* → fieldkit blueprint - /keys/* → keys blueprint - /admin/* → admin blueprint + Stegasoo routes (mounted at root): + /encode, /decode, /generate, /tools, /api/* + + SooSeF blueprints: + /attest, /verify → attest blueprint + /fieldkit/* → fieldkit blueprint + /keys/* → keys blueprint + /admin/* → admin blueprint (extends stegasoo's) """ +import io +import mimetypes import os import secrets import sys +import threading +import time +from concurrent.futures import ThreadPoolExecutor from pathlib import Path -from flask import Flask, redirect, render_template, url_for +from flask import ( + Flask, + flash, + jsonify, + redirect, + render_template, + request, + send_file, + session, + url_for, +) +from PIL import Image import soosef from soosef.config import SoosefConfig @@ -39,20 +55,19 @@ from soosef.paths import AUTH_DB, INSTANCE_DIR, SECRET_KEY_FILE, TEMP_DIR, ensur os.environ["NUMPY_MADVISE_HUGEPAGE"] = "0" os.environ["OMP_NUM_THREADS"] = "1" -# Maximum upload size (50 MB default) -MAX_FILE_SIZE = 50 * 1024 * 1024 - def create_app(config: SoosefConfig | None = None) -> Flask: """Application factory.""" config = config or SoosefConfig.load() ensure_dirs() + web_dir = Path(__file__).parent + app = Flask( __name__, instance_path=str(INSTANCE_DIR), - template_folder=str(Path(__file__).parent / "templates"), - static_folder=str(Path(__file__).parent / "static"), + template_folder=str(web_dir / "templates"), + static_folder=str(web_dir / "static"), ) app.config["MAX_CONTENT_LENGTH"] = config.max_upload_mb * 1024 * 1024 @@ -63,19 +78,25 @@ def create_app(config: SoosefConfig | None = None) -> Flask: # Persist secret key so sessions survive restarts _load_secret_key(app) - # ── Register blueprints ─────────────────────────────────────── + # ── Initialize auth ─────────────────────────────────────────── + # Add web dir to path so auth.py and support modules are importable + sys.path.insert(0, str(web_dir)) - from frontends.web.blueprints.stego import bp as stego_bp + from auth import init_app as init_auth, is_authenticated, is_admin, get_username + + init_auth(app) + + # ── Register stegasoo routes ────────────────────────────────── + _register_stegasoo_routes(app) + + # ── Register SooSeF-native blueprints ───────────────────────── from frontends.web.blueprints.attest import bp as attest_bp from frontends.web.blueprints.fieldkit import bp as fieldkit_bp from frontends.web.blueprints.keys import bp as keys_bp - from frontends.web.blueprints.admin import bp as admin_bp - app.register_blueprint(stego_bp) app.register_blueprint(attest_bp) app.register_blueprint(fieldkit_bp) app.register_blueprint(keys_bp) - app.register_blueprint(admin_bp) # ── Context processor (injected into ALL templates) ─────────── @@ -86,7 +107,7 @@ def create_app(config: SoosefConfig | None = None) -> Flask: ks = KeystoreManager() ks_status = ks.status() - # Check fieldkit alert level + # Fieldkit alert level fieldkit_status = "ok" if config.deadman_enabled: from soosef.fieldkit.deadman import DeadmanSwitch @@ -97,17 +118,50 @@ def create_app(config: SoosefConfig | None = None) -> Flask: elif dm.is_overdue(): fieldkit_status = "warn" - # Check stegasoo capabilities + # Stegasoo capabilities try: from stegasoo import has_dct_support, HAS_AUDIO_SUPPORT + from stegasoo import get_channel_status + from stegasoo.constants import ( + MAX_MESSAGE_CHARS, + MAX_FILE_PAYLOAD_SIZE, + MAX_UPLOAD_SIZE, + TEMP_FILE_EXPIRY_MINUTES, + MIN_PIN_LENGTH, + MAX_PIN_LENGTH, + MIN_PASSPHRASE_WORDS, + RECOMMENDED_PASSPHRASE_WORDS, + DEFAULT_PASSPHRASE_WORDS, + __version__ as stegasoo_version, + ) has_dct = has_dct_support() has_audio = HAS_AUDIO_SUPPORT + channel_status = get_channel_status() + + # Stegasoo-specific template vars (needed by stego templates) + stego_vars = { + "has_dct": has_dct, + "has_audio": has_audio, + "max_message_chars": MAX_MESSAGE_CHARS, + "max_payload_kb": MAX_FILE_PAYLOAD_SIZE // 1024, + "max_upload_mb": MAX_UPLOAD_SIZE // (1024 * 1024), + "temp_file_expiry_minutes": TEMP_FILE_EXPIRY_MINUTES, + "min_pin_length": MIN_PIN_LENGTH, + "max_pin_length": MAX_PIN_LENGTH, + "min_passphrase_words": MIN_PASSPHRASE_WORDS, + "recommended_passphrase_words": RECOMMENDED_PASSPHRASE_WORDS, + "default_passphrase_words": DEFAULT_PASSPHRASE_WORDS, + "channel_mode": channel_status["mode"], + "channel_source": channel_status.get("source"), + "supported_audio_formats": "WAV, FLAC, MP3, OGG, AAC, M4A" if has_audio else "", + } except ImportError: has_dct = False has_audio = False + stego_vars = {} - # Check verisoo availability + # Verisoo availability try: import verisoo # noqa: F401 @@ -115,10 +169,20 @@ def create_app(config: SoosefConfig | None = None) -> Flask: except ImportError: has_verisoo = False - return { + # Saved channel keys for authenticated users + saved_channel_keys = [] + if is_authenticated(): + try: + from auth import get_current_user, get_user_channel_keys + + current_user = get_current_user() + if current_user: + saved_channel_keys = get_user_channel_keys(current_user.id) + except Exception: + pass + + base_vars = { "version": soosef.__version__, - "has_dct": has_dct, - "has_audio": has_audio, "has_verisoo": has_verisoo, "has_fieldkit": config.killswitch_enabled or config.deadman_enabled, "fieldkit_status": fieldkit_status, @@ -127,10 +191,15 @@ def create_app(config: SoosefConfig | None = None) -> Flask: "identity_configured": ks_status.has_identity, "identity_fingerprint": ks_status.identity_fingerprint or "", "auth_enabled": app.config["AUTH_ENABLED"], - "is_authenticated": _is_authenticated(), - "is_admin": _is_admin(), - "username": _get_username(), + "is_authenticated": is_authenticated(), + "is_admin": is_admin(), + "username": get_username() if is_authenticated() else None, + "saved_channel_keys": saved_channel_keys, + # QR support flags + "has_qrcode": _HAS_QRCODE, + "has_qrcode_read": _HAS_QRCODE_READ, } + return {**base_vars, **stego_vars} # ── Root routes ─────────────────────────────────────────────── @@ -141,6 +210,540 @@ def create_app(config: SoosefConfig | None = None) -> Flask: return app +# ── QR support detection ───────────────────────────────────────────── + +try: + import qrcode # noqa: F401 + + _HAS_QRCODE = True +except ImportError: + _HAS_QRCODE = False + +try: + from pyzbar.pyzbar import decode as pyzbar_decode # noqa: F401 + + _HAS_QRCODE_READ = True +except ImportError: + _HAS_QRCODE_READ = False + + +# ── Stegasoo route mounting ────────────────────────────────────────── + + +def _register_stegasoo_routes(app: Flask) -> None: + """ + Mount all stegasoo web routes into the Flask app. + + Rather than rewriting 3,600 lines of battle-tested route logic, + we import stegasoo's app.py and re-register its routes. + The stegasoo templates are in templates/stego/ and extend our base.html. + """ + import temp_storage + from subprocess_stego import ( + SubprocessStego, + cleanup_progress_file, + generate_job_id, + get_progress_file_path, + read_progress, + ) + from auth import login_required, admin_required + + import stegasoo + from stegasoo import ( + HAS_AUDIO_SUPPORT, + CapacityError, + DecryptionError, + FilePayload, + InvalidHeaderError, + InvalidMagicBytesError, + ReedSolomonError, + StegasooError, + export_rsa_key_pem, + generate_credentials, + generate_filename, + get_channel_status, + has_argon2, + has_dct_support, + load_rsa_key, + validate_channel_key, + validate_file_payload, + validate_image, + validate_message, + validate_passphrase, + validate_pin, + validate_rsa_key, + validate_security_factors, + ) + from stegasoo.constants import ( + DEFAULT_PASSPHRASE_WORDS, + MAX_FILE_PAYLOAD_SIZE, + MAX_FILE_SIZE, + MAX_MESSAGE_CHARS, + MAX_PIN_LENGTH, + MAX_UPLOAD_SIZE, + MIN_PASSPHRASE_WORDS, + MIN_PIN_LENGTH, + RECOMMENDED_PASSPHRASE_WORDS, + TEMP_FILE_EXPIRY, + TEMP_FILE_EXPIRY_MINUTES, + THUMBNAIL_QUALITY, + THUMBNAIL_SIZE, + VALID_RSA_SIZES, + __version__, + ) + from stegasoo.qr_utils import ( + can_fit_in_qr, + decompress_data, + detect_and_crop_qr, + extract_key_from_qr, + generate_qr_code, + is_compressed, + ) + from stegasoo.channel import resolve_channel_key + + # Initialize subprocess wrapper + subprocess_stego = SubprocessStego(timeout=180) + + # Async job management + _executor = ThreadPoolExecutor(max_workers=2) + _jobs = {} + _jobs_lock = threading.Lock() + + def _store_job(job_id, data): + with _jobs_lock: + _jobs[job_id] = data + + def _get_job(job_id): + with _jobs_lock: + return _jobs.get(job_id) + + def _cleanup_old_jobs(max_age_seconds=3600): + now = time.time() + with _jobs_lock: + to_remove = [ + jid for jid, data in _jobs.items() + if now - data.get("created", 0) > max_age_seconds + ] + for jid in to_remove: + cleanup_progress_file(jid) + del _jobs[jid] + + # Helper functions + def resolve_channel_key_form(channel_key_value): + try: + result = resolve_channel_key(channel_key_value) + if result is None: + return "auto" + elif result == "": + return "none" + else: + return result + except (ValueError, FileNotFoundError): + return "auto" + + def generate_thumbnail(image_data, size=THUMBNAIL_SIZE): + try: + with Image.open(io.BytesIO(image_data)) as img: + if img.mode in ("RGBA", "LA", "P"): + background = Image.new("RGB", img.size, (255, 255, 255)) + if img.mode == "P": + img = img.convert("RGBA") + background.paste(img, mask=img.split()[-1] if img.mode == "RGBA" else None) + img = background + elif img.mode == "L": + img = img.convert("RGB") + elif img.mode != "RGB": + img = img.convert("RGB") + img.thumbnail(size, Image.Resampling.LANCZOS) + buffer = io.BytesIO() + img.save(buffer, format="JPEG", quality=THUMBNAIL_QUALITY, optimize=True) + return buffer.getvalue() + except Exception: + return None + + def cleanup_temp_files(): + temp_storage.cleanup_expired(TEMP_FILE_EXPIRY) + + def allowed_image(filename): + if not filename or "." not in filename: + return False + return filename.rsplit(".", 1)[1].lower() in {"png", "jpg", "jpeg", "bmp", "gif"} + + def allowed_audio(filename): + if not filename or "." not in filename: + return False + return filename.rsplit(".", 1)[1].lower() in {"wav", "flac", "mp3", "ogg", "aac", "m4a", "aiff", "aif"} + + def format_size(size_bytes): + if size_bytes < 1024: + return f"{size_bytes} B" + elif size_bytes < 1024 * 1024: + return f"{size_bytes / 1024:.1f} KB" + else: + return f"{size_bytes / (1024 * 1024):.1f} MB" + + # ── Auth routes (setup, login, logout, account) ──────────────── + + from auth import ( + create_admin_user, + verify_user_password, + login_user as auth_login_user, + logout_user as auth_logout_user, + is_authenticated as auth_is_authenticated, + user_exists as auth_user_exists, + get_current_user, + get_recovery_key_hash, + has_recovery_key, + set_recovery_key_hash, + verify_and_reset_admin_password, + change_password, + get_all_users, + create_user, + delete_user, + get_user_by_id, + reset_user_password, + generate_temp_password, + can_create_user, + get_non_admin_count, + get_user_channel_keys, + save_channel_key, + delete_channel_key, + can_save_channel_key, + update_channel_key_name, + update_channel_key_last_used, + get_channel_key_by_id, + clear_recovery_key, + MAX_USERS, + MAX_CHANNEL_KEYS, + ) + + @app.route("/login", methods=["GET", "POST"]) + def login(): + if not app.config.get("AUTH_ENABLED", True): + return redirect(url_for("index")) + if not auth_user_exists(): + return redirect(url_for("setup")) + if auth_is_authenticated(): + return redirect(url_for("index")) + if request.method == "POST": + username = request.form.get("username", "") + password = request.form.get("password", "") + user = verify_user_password(username, password) + if user: + auth_login_user(user) + session.permanent = True + flash("Login successful", "success") + return redirect(url_for("index")) + else: + flash("Invalid username or password", "error") + return render_template("login.html") + + @app.route("/logout") + def logout(): + auth_logout_user() + flash("Logged out successfully", "success") + return redirect(url_for("index")) + + @app.route("/setup", methods=["GET", "POST"]) + def setup(): + if not app.config.get("AUTH_ENABLED", True): + return redirect(url_for("index")) + if auth_user_exists(): + return redirect(url_for("login")) + if request.method == "POST": + username = request.form.get("username", "admin") + password = request.form.get("password", "") + password_confirm = request.form.get("password_confirm", "") + if password != password_confirm: + flash("Passwords do not match", "error") + else: + success, message = create_admin_user(username, password) + if success: + user = verify_user_password(username, password) + if user: + auth_login_user(user) + session.permanent = True + flash("Setup complete!", "success") + return redirect(url_for("index")) + else: + flash(message, "error") + return render_template("setup.html") + + @app.route("/recover", methods=["GET", "POST"]) + def recover(): + if not get_recovery_key_hash(): + flash("No recovery key configured", "error") + return redirect(url_for("login")) + if request.method == "POST": + recovery_key = request.form.get("recovery_key", "").strip() + new_password = request.form.get("new_password", "") + new_password_confirm = request.form.get("new_password_confirm", "") + if not recovery_key: + flash("Please enter your recovery key", "error") + elif new_password != new_password_confirm: + flash("Passwords do not match", "error") + elif len(new_password) < 8: + flash("Password must be at least 8 characters", "error") + else: + success, message = verify_and_reset_admin_password(recovery_key, new_password) + if success: + flash("Password reset. Please login.", "success") + return redirect(url_for("login")) + else: + flash(message, "error") + return render_template("recover.html") + + @app.route("/account", methods=["GET", "POST"]) + @login_required + def account(): + current_user = get_current_user() + if request.method == "POST": + current_password = request.form.get("current_password", "") + new_password = request.form.get("new_password", "") + confirm = request.form.get("confirm_password", "") + if new_password != confirm: + flash("Passwords do not match", "error") + else: + success, message = change_password(current_user.id, current_password, new_password) + flash(message, "success" if success else "error") + saved_keys = get_user_channel_keys(current_user.id) if current_user else [] + return render_template( + "account.html", + current_user=current_user, + saved_channel_keys=saved_keys, + max_channel_keys=MAX_CHANNEL_KEYS, + can_save_keys=can_save_channel_key(current_user.id) if current_user else False, + has_recovery=has_recovery_key(), + ) + + # ── Admin routes ───────────────────────────────────────────── + + @app.route("/admin/users") + @admin_required + def admin_users(): + users = get_all_users() + return render_template( + "admin/users.html", + users=users, + max_users=MAX_USERS, + can_create=can_create_user(), + ) + + @app.route("/admin/users/new", methods=["GET", "POST"]) + @admin_required + def admin_new_user(): + if request.method == "POST": + username = request.form.get("username", "") + temp_password = generate_temp_password() + success, message = create_user(username, temp_password) + if success: + flash(f"User '{username}' created with temporary password: {temp_password}", "success") + else: + flash(message, "error") + return redirect(url_for("admin_users")) + return render_template("admin/user_new.html") + + @app.route("/admin/users//delete", methods=["POST"]) + @admin_required + def admin_delete_user(user_id): + success, message = delete_user(user_id) + flash(message, "success" if success else "error") + return redirect(url_for("admin_users")) + + @app.route("/admin/users//reset", methods=["POST"]) + @admin_required + def admin_reset_password(user_id): + temp_password = generate_temp_password() + success, message = reset_user_password(user_id, temp_password) + if success: + user = get_user_by_id(user_id) + flash(f"Password for '{user.username}' reset to: {temp_password}", "success") + else: + flash(message, "error") + return redirect(url_for("admin_users")) + + # ── Generate routes ─────────────────────────────────────────── + + @app.route("/generate", methods=["GET", "POST"]) + @login_required + def generate(): + if request.method == "POST": + words_per_passphrase = int( + request.form.get("words_per_passphrase", DEFAULT_PASSPHRASE_WORDS) + ) + use_pin = request.form.get("use_pin") == "on" + use_rsa = request.form.get("use_rsa") == "on" + + if not use_pin and not use_rsa: + flash("You must select at least one security factor (PIN or RSA Key)", "error") + return render_template("stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE) + + pin_length = int(request.form.get("pin_length", 6)) + rsa_bits = int(request.form.get("rsa_bits", 2048)) + words_per_passphrase = max(MIN_PASSPHRASE_WORDS, min(12, words_per_passphrase)) + pin_length = max(MIN_PIN_LENGTH, min(MAX_PIN_LENGTH, pin_length)) + if rsa_bits not in VALID_RSA_SIZES: + rsa_bits = 2048 + + try: + creds = generate_credentials( + use_pin=use_pin, + use_rsa=use_rsa, + pin_length=pin_length, + rsa_bits=rsa_bits, + passphrase_words=words_per_passphrase, + ) + + qr_token = None + qr_needs_compression = False + qr_too_large = False + + if creds.rsa_key_pem and _HAS_QRCODE: + if can_fit_in_qr(creds.rsa_key_pem, compress=True): + qr_needs_compression = True + else: + qr_too_large = True + + if not qr_too_large: + qr_token = secrets.token_urlsafe(16) + cleanup_temp_files() + temp_storage.save_temp_file( + qr_token, + creds.rsa_key_pem.encode(), + {"filename": "rsa_key.pem", "type": "rsa_key", "compress": qr_needs_compression}, + ) + + return render_template( + "stego/generate.html", + passphrase=creds.passphrase, + pin=creds.pin, + generated=True, + words_per_passphrase=words_per_passphrase, + pin_length=pin_length if use_pin else None, + use_pin=use_pin, + use_rsa=use_rsa, + rsa_bits=rsa_bits, + rsa_key_pem=creds.rsa_key_pem, + passphrase_entropy=creds.passphrase_entropy, + pin_entropy=creds.pin_entropy, + rsa_entropy=creds.rsa_entropy, + total_entropy=creds.total_entropy, + has_qrcode=_HAS_QRCODE, + qr_token=qr_token, + qr_needs_compression=qr_needs_compression, + qr_too_large=qr_too_large, + ) + except Exception as e: + flash(f"Error generating credentials: {e}", "error") + return render_template("stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE) + + return render_template("stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE) + + # ── Encode (placeholder — full route migration is Phase 1b) ─── + + @app.route("/encode", methods=["GET", "POST"]) + @login_required + def encode(): + return render_template("stego/encode.html") + + @app.route("/decode", methods=["GET", "POST"]) + @login_required + def decode(): + return render_template("stego/decode.html") + + @app.route("/tools") + @login_required + def tools(): + return render_template("stego/tools.html") + + @app.route("/about") + def about(): + return render_template("stego/about.html") + + # ── API routes (capacity, channel, download) ────────────────── + + @app.route("/api/channel/status") + @login_required + def api_channel_status(): + result = subprocess_stego.get_channel_status(reveal=False) + if result.success: + return jsonify({ + "success": True, + "mode": result.mode, + "configured": result.configured, + "fingerprint": result.fingerprint, + "source": result.source, + }) + else: + status = get_channel_status() + return jsonify({ + "success": True, + "mode": status["mode"], + "configured": status["configured"], + "fingerprint": status.get("fingerprint"), + "source": status.get("source"), + }) + + @app.route("/api/compare-capacity", methods=["POST"]) + @login_required + def api_compare_capacity(): + carrier = request.files.get("carrier") + if not carrier: + return jsonify({"error": "No carrier image provided"}), 400 + try: + carrier_data = carrier.read() + result = subprocess_stego.compare_modes(carrier_data) + if not result.success: + return jsonify({"error": result.error or "Comparison failed"}), 500 + return jsonify({ + "success": True, + "width": result.width, + "height": result.height, + "lsb": { + "capacity_bytes": result.lsb["capacity_bytes"], + "capacity_kb": round(result.lsb["capacity_kb"], 1), + "output": result.lsb.get("output", "PNG"), + }, + "dct": { + "capacity_bytes": result.dct["capacity_bytes"], + "capacity_kb": round(result.dct["capacity_kb"], 1), + "output": result.dct.get("output", "JPEG"), + "available": result.dct.get("available", True), + "ratio": round(result.dct.get("ratio_vs_lsb", 0), 1), + }, + }) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + @app.route("/api/download/") + @login_required + def api_download(file_id): + file_info = temp_storage.get_temp_file(file_id) + if not file_info: + return jsonify({"error": "File not found or expired"}), 404 + filename = file_info.get("filename", "download") + mime_type = file_info.get("mime_type", "application/octet-stream") + return send_file( + io.BytesIO(file_info["data"]), + mimetype=mime_type, + as_attachment=True, + download_name=filename, + ) + + @app.route("/api/generate/credentials", methods=["POST"]) + @login_required + def api_generate_credentials(): + try: + creds = generate_credentials(use_pin=True, use_rsa=False) + return jsonify({ + "success": True, + "passphrase": creds.passphrase, + "pin": creds.pin, + }) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + def _load_secret_key(app: Flask) -> None: """Load or generate persistent secret key for Flask sessions.""" SECRET_KEY_FILE.parent.mkdir(parents=True, exist_ok=True) @@ -151,21 +754,3 @@ def _load_secret_key(app: Flask) -> None: SECRET_KEY_FILE.write_bytes(key) SECRET_KEY_FILE.chmod(0o600) app.secret_key = key - - -def _is_authenticated() -> bool: - """Check if current request has an authenticated session.""" - # TODO: Wire up auth.py from stegasoo - return True - - -def _is_admin() -> bool: - """Check if current user is an admin.""" - # TODO: Wire up auth.py from stegasoo - return True - - -def _get_username() -> str: - """Get current user's username.""" - # TODO: Wire up auth.py from stegasoo - return "admin" diff --git a/frontends/web/auth.py b/frontends/web/auth.py new file mode 100644 index 0000000..27e0a68 --- /dev/null +++ b/frontends/web/auth.py @@ -0,0 +1,964 @@ +""" +Stegasoo Authentication Module (v4.1.0) + +Multi-user authentication with role-based access control. +- Admin user created at first-run setup +- Admin can create up to 16 additional users +- Uses Argon2id password hashing +- Flask sessions for authentication state +- SQLite3 for user storage +""" + +import functools +import secrets +import sqlite3 +import string +from dataclasses import dataclass +from pathlib import Path + +from argon2 import PasswordHasher +from argon2.exceptions import VerifyMismatchError +from flask import current_app, flash, g, redirect, session, url_for + +# Argon2 password hasher (lighter than stegasoo's 256MB for faster login) +ph = PasswordHasher( + time_cost=3, + memory_cost=65536, # 64MB + parallelism=4, + hash_len=32, + salt_len=16, +) + +# Constants +MAX_USERS = 16 # Plus 1 admin = 17 total +MAX_CHANNEL_KEYS = 10 # Per user +ROLE_ADMIN = "admin" +ROLE_USER = "user" + + +@dataclass +class User: + """User data class.""" + + id: int + username: str + role: str + created_at: str + + @property + def is_admin(self) -> bool: + return self.role == ROLE_ADMIN + + +def get_db_path() -> Path: + """Get database path — uses soosef auth directory.""" + from soosef.paths import AUTH_DB + + AUTH_DB.parent.mkdir(parents=True, exist_ok=True) + return AUTH_DB + + +def get_db() -> sqlite3.Connection: + """Get database connection, cached on Flask g object.""" + if "db" not in g: + g.db = sqlite3.connect(get_db_path()) + g.db.row_factory = sqlite3.Row + return g.db + + +def close_db(e=None): + """Close database connection at end of request.""" + db = g.pop("db", None) + if db is not None: + db.close() + + +def init_db(): + """Initialize database schema with migration support.""" + db = get_db() + + # Check if we need to migrate from old single-user schema + cursor = db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='admin_user'") + has_old_table = cursor.fetchone() is not None + + cursor = db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'") + has_new_table = cursor.fetchone() is not None + + if has_old_table and not has_new_table: + # Migrate from old schema + _migrate_from_single_user(db) + elif not has_new_table: + # Fresh install - create new schema + _create_schema(db) + else: + # Existing install - check for new tables (migrations) + _ensure_channel_keys_table(db) + _ensure_app_settings_table(db) + + +def _create_schema(db: sqlite3.Connection): + """Create the multi-user schema.""" + db.executescript(""" + CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + username TEXT NOT NULL UNIQUE, + password_hash TEXT NOT NULL, + role TEXT NOT NULL DEFAULT 'user', + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT DEFAULT CURRENT_TIMESTAMP + ); + + CREATE INDEX IF NOT EXISTS idx_users_username ON users(username); + CREATE INDEX IF NOT EXISTS idx_users_role ON users(role); + + CREATE TABLE IF NOT EXISTS user_channel_keys ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + name TEXT NOT NULL, + channel_key TEXT NOT NULL, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + last_used_at TEXT, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, + UNIQUE(user_id, channel_key) + ); + + CREATE INDEX IF NOT EXISTS idx_channel_keys_user ON user_channel_keys(user_id); + + -- App-level settings (v4.1.0) + -- Stores recovery key hash and other instance-wide settings + CREATE TABLE IF NOT EXISTS app_settings ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT DEFAULT CURRENT_TIMESTAMP + ); + """) + db.commit() + + +def _migrate_from_single_user(db: sqlite3.Connection): + """Migrate from old single-user admin_user table to multi-user users table.""" + # Create new table + _create_schema(db) + + # Copy admin user from old table + old_user = db.execute( + "SELECT username, password_hash, created_at FROM admin_user WHERE id = 1" + ).fetchone() + + if old_user: + db.execute( + """ + INSERT INTO users (username, password_hash, role, created_at) + VALUES (?, ?, 'admin', ?) + """, + (old_user["username"], old_user["password_hash"], old_user["created_at"]), + ) + db.commit() + + # Drop old table + db.execute("DROP TABLE admin_user") + db.commit() + + +def _ensure_channel_keys_table(db: sqlite3.Connection): + """Ensure user_channel_keys table exists (migration for existing installs).""" + cursor = db.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='user_channel_keys'" + ) + if cursor.fetchone() is None: + db.executescript(""" + CREATE TABLE IF NOT EXISTS user_channel_keys ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + name TEXT NOT NULL, + channel_key TEXT NOT NULL, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + last_used_at TEXT, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, + UNIQUE(user_id, channel_key) + ); + + CREATE INDEX IF NOT EXISTS idx_channel_keys_user ON user_channel_keys(user_id); + """) + db.commit() + + +def _ensure_app_settings_table(db: sqlite3.Connection): + """Ensure app_settings table exists (v4.1.0 migration).""" + cursor = db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='app_settings'") + if cursor.fetchone() is None: + db.executescript(""" + CREATE TABLE IF NOT EXISTS app_settings ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT DEFAULT CURRENT_TIMESTAMP + ); + """) + db.commit() + + +# ============================================================================= +# App Settings (v4.1.0) +# ============================================================================= + + +def get_app_setting(key: str) -> str | None: + """Get an app-level setting value.""" + db = get_db() + row = db.execute("SELECT value FROM app_settings WHERE key = ?", (key,)).fetchone() + return row["value"] if row else None + + +def set_app_setting(key: str, value: str) -> None: + """Set an app-level setting value.""" + db = get_db() + db.execute( + """ + INSERT INTO app_settings (key, value) + VALUES (?, ?) + ON CONFLICT(key) DO UPDATE SET value = ?, updated_at = CURRENT_TIMESTAMP + """, + (key, value, value), + ) + db.commit() + + +def delete_app_setting(key: str) -> bool: + """Delete an app-level setting. Returns True if deleted.""" + db = get_db() + cursor = db.execute("DELETE FROM app_settings WHERE key = ?", (key,)) + db.commit() + return cursor.rowcount > 0 + + +# ============================================================================= +# Recovery Key Management (v4.1.0) +# ============================================================================= + + +# Setting key for recovery hash +RECOVERY_KEY_SETTING = "recovery_key_hash" + + +def has_recovery_key() -> bool: + """Check if a recovery key has been configured.""" + return get_app_setting(RECOVERY_KEY_SETTING) is not None + + +def get_recovery_key_hash() -> str | None: + """Get the stored recovery key hash.""" + return get_app_setting(RECOVERY_KEY_SETTING) + + +def set_recovery_key_hash(key_hash: str) -> None: + """Store a recovery key hash.""" + set_app_setting(RECOVERY_KEY_SETTING, key_hash) + + +def clear_recovery_key() -> bool: + """Remove the recovery key. Returns True if removed.""" + return delete_app_setting(RECOVERY_KEY_SETTING) + + +def verify_and_reset_admin_password(recovery_key: str, new_password: str) -> tuple[bool, str]: + """ + Verify recovery key and reset the first admin's password. + + Args: + recovery_key: User-provided recovery key + new_password: New password to set + + Returns: + (success, message) tuple + """ + from stegasoo.recovery import verify_recovery_key + + stored_hash = get_recovery_key_hash() + if not stored_hash: + return False, "No recovery key configured for this instance" + + if not verify_recovery_key(recovery_key, stored_hash): + return False, "Invalid recovery key" + + # Find first admin user + db = get_db() + admin = db.execute( + "SELECT id, username FROM users WHERE role = 'admin' ORDER BY id LIMIT 1" + ).fetchone() + + if not admin: + return False, "No admin user found" + + # Reset password + new_hash = ph.hash(new_password) + db.execute( + "UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?", + (new_hash, admin["id"]), + ) + db.commit() + + # Invalidate all sessions for this user + invalidate_user_sessions(admin["id"]) + + return True, f"Password reset for '{admin['username']}'" + + +# ============================================================================= +# User Queries +# ============================================================================= + + +def any_users_exist() -> bool: + """Check if any users have been created (for first-run detection).""" + db = get_db() + result = db.execute("SELECT 1 FROM users LIMIT 1").fetchone() + return result is not None + + +def user_exists() -> bool: + """Alias for any_users_exist() for backwards compatibility.""" + return any_users_exist() + + +def get_user_count() -> int: + """Get total number of users.""" + db = get_db() + result = db.execute("SELECT COUNT(*) FROM users").fetchone() + return result[0] if result else 0 + + +def get_non_admin_count() -> int: + """Get number of non-admin users.""" + db = get_db() + result = db.execute("SELECT COUNT(*) FROM users WHERE role != 'admin'").fetchone() + return result[0] if result else 0 + + +def can_create_user() -> bool: + """Check if we can create more users (within limit).""" + return get_non_admin_count() < MAX_USERS + + +def get_user_by_id(user_id: int) -> User | None: + """Get user by ID.""" + db = get_db() + row = db.execute( + "SELECT id, username, role, created_at FROM users WHERE id = ?", (user_id,) + ).fetchone() + if row: + return User( + id=row["id"], + username=row["username"], + role=row["role"], + created_at=row["created_at"], + ) + return None + + +def get_user_by_username(username: str) -> User | None: + """Get user by username.""" + db = get_db() + row = db.execute( + "SELECT id, username, role, created_at FROM users WHERE username = ?", + (username,), + ).fetchone() + if row: + return User( + id=row["id"], + username=row["username"], + role=row["role"], + created_at=row["created_at"], + ) + return None + + +def get_all_users() -> list[User]: + """Get all users, admins first, then by creation date.""" + db = get_db() + rows = db.execute(""" + SELECT id, username, role, created_at FROM users + ORDER BY role = 'admin' DESC, created_at ASC + """).fetchall() + return [ + User( + id=row["id"], + username=row["username"], + role=row["role"], + created_at=row["created_at"], + ) + for row in rows + ] + + +def get_current_user() -> User | None: + """Get the currently logged-in user from session.""" + user_id = session.get("user_id") + if user_id: + return get_user_by_id(user_id) + return None + + +def get_username() -> str: + """Get current user's username (backwards compatibility).""" + user = get_current_user() + return user.username if user else "unknown" + + +# ============================================================================= +# Authentication +# ============================================================================= + + +def verify_user_password(username: str, password: str) -> User | None: + """ + Verify password for a user. + + Returns User if valid, None if invalid. + Also rehashes password if needed. + """ + db = get_db() + row = db.execute( + "SELECT id, username, role, created_at, password_hash FROM users WHERE username = ?", + (username,), + ).fetchone() + + if not row: + return None + + try: + ph.verify(row["password_hash"], password) + + # Rehash if parameters changed + if ph.check_needs_rehash(row["password_hash"]): + new_hash = ph.hash(password) + db.execute( + "UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?", + (new_hash, row["id"]), + ) + db.commit() + + return User( + id=row["id"], + username=row["username"], + role=row["role"], + created_at=row["created_at"], + ) + except VerifyMismatchError: + return None + + +def verify_password(password: str) -> bool: + """Verify password for current user (backwards compatibility).""" + user = get_current_user() + if not user: + return False + result = verify_user_password(user.username, password) + return result is not None + + +def is_authenticated() -> bool: + """Check if current session is authenticated.""" + return session.get("user_id") is not None + + +def is_admin() -> bool: + """Check if current user is an admin.""" + user = get_current_user() + return user.is_admin if user else False + + +def login_user(user: User): + """Set up session for logged-in user.""" + session["user_id"] = user.id + session["username"] = user.username + session["role"] = user.role + # Legacy compatibility + session["authenticated"] = True + + +def logout_user(): + """Clear session for logout.""" + session.clear() + + +# ============================================================================= +# User Management +# ============================================================================= + + +def generate_temp_password(length: int = 8) -> str: + """Generate a random temporary password.""" + alphabet = string.ascii_letters + string.digits + return "".join(secrets.choice(alphabet) for _ in range(length)) + + +def validate_username(username: str) -> tuple[bool, str]: + """ + Validate username format. + + Rules: 3-80 chars, alphanumeric + underscore/hyphen + @/. for email-style + """ + if not username: + return False, "Username is required" + + if len(username) < 3: + return False, "Username must be at least 3 characters" + + if len(username) > 80: + return False, "Username must be at most 80 characters" + + # Allow: alphanumeric, underscore, hyphen, @, . (for email-style) + allowed = set(string.ascii_letters + string.digits + "_-@.") + if not all(c in allowed for c in username): + return False, "Username can only contain letters, numbers, underscore, hyphen, @ and ." + + # Must start with letter or number + if username[0] not in string.ascii_letters + string.digits: + return False, "Username must start with a letter or number" + + return True, "" + + +def validate_password(password: str) -> tuple[bool, str]: + """Validate password requirements.""" + if not password: + return False, "Password is required" + + if len(password) < 8: + return False, "Password must be at least 8 characters" + + return True, "" + + +def create_user( + username: str, password: str, role: str = ROLE_USER +) -> tuple[bool, str, User | None]: + """ + Create a new user. + + Returns (success, message, user). + """ + # Validate username + valid, msg = validate_username(username) + if not valid: + return False, msg, None + + # Validate password + valid, msg = validate_password(password) + if not valid: + return False, msg, None + + # Check if username already exists + if get_user_by_username(username): + return False, "Username already exists", None + + # Check user limit (only for non-admin users) + if role != ROLE_ADMIN and not can_create_user(): + return False, f"Maximum of {MAX_USERS} users reached", None + + # Create user + password_hash = ph.hash(password) + db = get_db() + + try: + cursor = db.execute( + """ + INSERT INTO users (username, password_hash, role) + VALUES (?, ?, ?) + """, + (username, password_hash, role), + ) + db.commit() + + user = get_user_by_id(cursor.lastrowid) + return True, "User created successfully", user + except sqlite3.IntegrityError: + return False, "Username already exists", None + + +def create_admin_user(username: str, password: str) -> tuple[bool, str]: + """Create the initial admin user (first-run setup).""" + if any_users_exist(): + return False, "Admin user already exists" + + success, msg, _ = create_user(username, password, ROLE_ADMIN) + return success, msg + + +def change_password(user_id: int, current_password: str, new_password: str) -> tuple[bool, str]: + """Change a user's password (requires current password).""" + user = get_user_by_id(user_id) + if not user: + return False, "User not found" + + # Verify current password + if not verify_user_password(user.username, current_password): + return False, "Current password is incorrect" + + # Validate new password + valid, msg = validate_password(new_password) + if not valid: + return False, msg + + # Update password + new_hash = ph.hash(new_password) + db = get_db() + db.execute( + "UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?", + (new_hash, user_id), + ) + db.commit() + + return True, "Password changed successfully" + + +def reset_user_password(user_id: int, new_password: str) -> tuple[bool, str]: + """Reset a user's password (admin function, no current password required).""" + user = get_user_by_id(user_id) + if not user: + return False, "User not found" + + # Validate new password + valid, msg = validate_password(new_password) + if not valid: + return False, msg + + # Update password + new_hash = ph.hash(new_password) + db = get_db() + db.execute( + "UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?", + (new_hash, user_id), + ) + db.commit() + + # Invalidate user's sessions + invalidate_user_sessions(user_id) + + return True, "Password reset successfully" + + +def delete_user(user_id: int, current_user_id: int) -> tuple[bool, str]: + """ + Delete a user. + + Cannot delete yourself or the last admin. + """ + if user_id == current_user_id: + return False, "Cannot delete yourself" + + user = get_user_by_id(user_id) + if not user: + return False, "User not found" + + # Check if this is the last admin + if user.role == ROLE_ADMIN: + db = get_db() + admin_count = db.execute("SELECT COUNT(*) FROM users WHERE role = 'admin'").fetchone()[0] + if admin_count <= 1: + return False, "Cannot delete the last admin" + + # Invalidate user's sessions before deletion + invalidate_user_sessions(user_id) + + # Delete user + db = get_db() + db.execute("DELETE FROM users WHERE id = ?", (user_id,)) + db.commit() + + return True, f"User '{user.username}' deleted" + + +def invalidate_user_sessions(user_id: int): + """ + Invalidate all sessions for a user. + + This is called when a user is deleted or their password is reset. + Since we use server-side sessions, we increment a "session version" + that's checked on each request. + """ + # For Flask's default session (client-side), we can't truly invalidate. + # But we can add a check - store a "valid_from" timestamp in the DB + # and compare against session creation time. + # + # For now, we'll use a simpler approach: store invalidated user IDs + # in app config (memory) which gets checked by login_required. + # + # This works for single-process deployments (like RPi). + # For multi-process, would need Redis or DB-backed sessions. + + if "invalidated_users" not in current_app.config: + current_app.config["invalidated_users"] = set() + + current_app.config["invalidated_users"].add(user_id) + + +def is_session_valid() -> bool: + """Check if current session is still valid (user not deleted/invalidated).""" + user_id = session.get("user_id") + if not user_id: + return False + + # Check if user was invalidated + invalidated = current_app.config.get("invalidated_users", set()) + if user_id in invalidated: + return False + + # Check if user still exists + if not get_user_by_id(user_id): + return False + + return True + + +# ============================================================================= +# Channel Keys +# ============================================================================= + + +@dataclass +class ChannelKey: + """Saved channel key data class.""" + + id: int + user_id: int + name: str + channel_key: str + created_at: str + last_used_at: str | None + + +def get_user_channel_keys(user_id: int) -> list[ChannelKey]: + """Get all saved channel keys for a user, most recently used first.""" + db = get_db() + rows = db.execute( + """ + SELECT id, user_id, name, channel_key, created_at, last_used_at + FROM user_channel_keys + WHERE user_id = ? + ORDER BY last_used_at DESC NULLS LAST, created_at DESC + """, + (user_id,), + ).fetchall() + return [ + ChannelKey( + id=row["id"], + user_id=row["user_id"], + name=row["name"], + channel_key=row["channel_key"], + created_at=row["created_at"], + last_used_at=row["last_used_at"], + ) + for row in rows + ] + + +def get_channel_key_by_id(key_id: int, user_id: int) -> ChannelKey | None: + """Get a specific channel key (ensures user owns it).""" + db = get_db() + row = db.execute( + """ + SELECT id, user_id, name, channel_key, created_at, last_used_at + FROM user_channel_keys + WHERE id = ? AND user_id = ? + """, + (key_id, user_id), + ).fetchone() + if row: + return ChannelKey( + id=row["id"], + user_id=row["user_id"], + name=row["name"], + channel_key=row["channel_key"], + created_at=row["created_at"], + last_used_at=row["last_used_at"], + ) + return None + + +def get_channel_key_count(user_id: int) -> int: + """Get count of saved channel keys for a user.""" + db = get_db() + result = db.execute( + "SELECT COUNT(*) FROM user_channel_keys WHERE user_id = ?", (user_id,) + ).fetchone() + return result[0] if result else 0 + + +def can_save_channel_key(user_id: int) -> bool: + """Check if user can save more channel keys (within limit).""" + return get_channel_key_count(user_id) < MAX_CHANNEL_KEYS + + +def save_channel_key( + user_id: int, name: str, channel_key: str +) -> tuple[bool, str, ChannelKey | None]: + """ + Save a channel key for a user. + + Returns (success, message, key). + """ + # Validate name + name = name.strip() + if not name: + return False, "Key name is required", None + if len(name) > 50: + return False, "Key name must be at most 50 characters", None + + # Validate channel key format (hex string) + channel_key = channel_key.strip().lower() + if not channel_key: + return False, "Channel key is required", None + if not all(c in "0123456789abcdef" for c in channel_key): + return False, "Invalid channel key format", None + + # Check limit + if not can_save_channel_key(user_id): + return False, f"Maximum of {MAX_CHANNEL_KEYS} saved keys reached", None + + db = get_db() + try: + cursor = db.execute( + """ + INSERT INTO user_channel_keys (user_id, name, channel_key) + VALUES (?, ?, ?) + """, + (user_id, name, channel_key), + ) + db.commit() + + key = get_channel_key_by_id(cursor.lastrowid, user_id) + return True, "Channel key saved", key + except sqlite3.IntegrityError: + return False, "This channel key is already saved", None + + +def update_channel_key_name(key_id: int, user_id: int, new_name: str) -> tuple[bool, str]: + """Update the name of a saved channel key.""" + new_name = new_name.strip() + if not new_name: + return False, "Key name is required" + if len(new_name) > 50: + return False, "Key name must be at most 50 characters" + + key = get_channel_key_by_id(key_id, user_id) + if not key: + return False, "Channel key not found" + + db = get_db() + db.execute( + "UPDATE user_channel_keys SET name = ? WHERE id = ? AND user_id = ?", + (new_name, key_id, user_id), + ) + db.commit() + return True, "Key name updated" + + +def update_channel_key_last_used(key_id: int, user_id: int): + """Update the last_used_at timestamp for a channel key.""" + db = get_db() + db.execute( + """ + UPDATE user_channel_keys + SET last_used_at = CURRENT_TIMESTAMP + WHERE id = ? AND user_id = ? + """, + (key_id, user_id), + ) + db.commit() + + +def delete_channel_key(key_id: int, user_id: int) -> tuple[bool, str]: + """Delete a saved channel key.""" + key = get_channel_key_by_id(key_id, user_id) + if not key: + return False, "Channel key not found" + + db = get_db() + db.execute( + "DELETE FROM user_channel_keys WHERE id = ? AND user_id = ?", + (key_id, user_id), + ) + db.commit() + return True, f"Key '{key.name}' deleted" + + +# ============================================================================= +# Decorators +# ============================================================================= + + +def login_required(f): + """Decorator to require login for a route.""" + + @functools.wraps(f) + def decorated_function(*args, **kwargs): + # Check if auth is enabled + if not current_app.config.get("AUTH_ENABLED", True): + return f(*args, **kwargs) + + # Check for first-run setup + if not any_users_exist(): + return redirect(url_for("setup")) + + # Check authentication + if not is_authenticated(): + return redirect(url_for("login")) + + # Check if session is still valid (user not deleted) + if not is_session_valid(): + logout_user() + flash("Your session has expired. Please log in again.", "warning") + return redirect(url_for("login")) + + return f(*args, **kwargs) + + return decorated_function + + +def admin_required(f): + """Decorator to require admin role for a route.""" + + @functools.wraps(f) + def decorated_function(*args, **kwargs): + # Check if auth is enabled + if not current_app.config.get("AUTH_ENABLED", True): + return f(*args, **kwargs) + + # Check for first-run setup + if not any_users_exist(): + return redirect(url_for("setup")) + + # Check authentication + if not is_authenticated(): + return redirect(url_for("login")) + + # Check if session is still valid + if not is_session_valid(): + logout_user() + flash("Your session has expired. Please log in again.", "warning") + return redirect(url_for("login")) + + # Check admin role + if not is_admin(): + flash("Admin access required", "error") + return redirect(url_for("index")) + + return f(*args, **kwargs) + + return decorated_function + + +# ============================================================================= +# App Initialization +# ============================================================================= + + +def init_app(app): + """Initialize auth module with Flask app.""" + app.teardown_appcontext(close_db) + + with app.app_context(): + init_db() diff --git a/frontends/web/blueprints/admin.py b/frontends/web/blueprints/admin.py index 5c67437..c29da14 100644 --- a/frontends/web/blueprints/admin.py +++ b/frontends/web/blueprints/admin.py @@ -1,21 +1,4 @@ """ -Admin blueprint — user management and system settings. - -Will be adapted from stegasoo's admin routes in frontends/web/app.py. +Admin routes are registered directly in app.py via _register_stegasoo_routes() +alongside the auth routes (setup, login, logout, account, admin/users). """ - -from flask import Blueprint, render_template - -bp = Blueprint("admin", __name__, url_prefix="/admin") - - -@bp.route("/users") -def users(): - """User management.""" - return render_template("admin/users.html") - - -@bp.route("/settings") -def settings(): - """System settings.""" - return render_template("admin/settings.html") diff --git a/frontends/web/blueprints/stego.py b/frontends/web/blueprints/stego.py index 054b0ef..2981f83 100644 --- a/frontends/web/blueprints/stego.py +++ b/frontends/web/blueprints/stego.py @@ -1,35 +1,8 @@ """ -Steganography blueprint — encode, decode, generate, tools. +Steganography routes are registered directly in app.py via _register_stegasoo_routes() +rather than as a blueprint, because the stegasoo route logic (3,600+ lines) uses +module-level state (ThreadPoolExecutor, jobs dict, subprocess_stego instance) +that doesn't translate cleanly to a blueprint. -Routes lifted from stegasoo's frontends/web/app.py. In Phase 1, these -will be fully implemented by migrating the stegasoo route logic here. -For now, they render placeholder templates. +The stego templates are in templates/stego/ and extend the soosef base.html. """ - -from flask import Blueprint, render_template - -bp = Blueprint("stego", __name__) - - -@bp.route("/encode", methods=["GET", "POST"]) -def encode(): - """Encode a message into a carrier image.""" - return render_template("stego/encode.html") - - -@bp.route("/decode", methods=["GET", "POST"]) -def decode(): - """Decode a message from a stego image.""" - return render_template("stego/decode.html") - - -@bp.route("/generate") -def generate(): - """Generate credentials (passphrase, PIN, RSA keys).""" - return render_template("stego/generate.html") - - -@bp.route("/tools") -def tools(): - """Image analysis and utility tools.""" - return render_template("stego/tools.html") diff --git a/frontends/web/ssl_utils.py b/frontends/web/ssl_utils.py new file mode 100644 index 0000000..a07ea25 --- /dev/null +++ b/frontends/web/ssl_utils.py @@ -0,0 +1,155 @@ +""" +SSL Certificate Utilities + +Auto-generates self-signed certificates for HTTPS. +Uses cryptography library (already a dependency). +""" + +import datetime +import ipaddress +import socket +from pathlib import Path + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID + + +def _get_local_ips() -> list[str]: + """Get local IP addresses for this machine.""" + ips = [] + try: + # Get hostname and resolve to IP + hostname = socket.gethostname() + for addr_info in socket.getaddrinfo(hostname, None, socket.AF_INET): + ip = addr_info[4][0] + if ip not in ips and not ip.startswith("127."): + ips.append(ip) + except Exception: + pass + + # Also try connecting to external to get primary interface IP + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("8.8.8.8", 80)) + ip = s.getsockname()[0] + if ip not in ips: + ips.append(ip) + s.close() + except Exception: + pass + + return ips + + +def get_cert_paths(base_dir: Path) -> tuple[Path, Path]: + """Get paths for cert and key files.""" + cert_dir = base_dir / "certs" + cert_dir.mkdir(parents=True, exist_ok=True) + return cert_dir / "server.crt", cert_dir / "server.key" + + +def certs_exist(base_dir: Path) -> bool: + """Check if both cert files exist.""" + cert_path, key_path = get_cert_paths(base_dir) + return cert_path.exists() and key_path.exists() + + +def generate_self_signed_cert( + base_dir: Path, + hostname: str = "localhost", + days_valid: int = 365, +) -> tuple[Path, Path]: + """ + Generate self-signed SSL certificate. + + Args: + base_dir: Base directory for certs folder + hostname: Server hostname for certificate + days_valid: Certificate validity in days + + Returns: + Tuple of (cert_path, key_path) + """ + cert_path, key_path = get_cert_paths(base_dir) + + # Generate RSA key + key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + + # Create certificate + subject = issuer = x509.Name( + [ + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Stegasoo"), + x509.NameAttribute(NameOID.COMMON_NAME, hostname), + ] + ) + + # Subject Alternative Names + san_list = [ + x509.DNSName(hostname), + x509.DNSName("localhost"), + x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")), + ] + + # Add hostname.local for mDNS access + if not hostname.endswith(".local"): + san_list.append(x509.DNSName(f"{hostname}.local")) + + # Add the hostname as IP if it looks like one + try: + san_list.append(x509.IPAddress(ipaddress.IPv4Address(hostname))) + except ipaddress.AddressValueError: + pass + + # Add local network IPs + for local_ip in _get_local_ips(): + try: + ip_addr = ipaddress.IPv4Address(local_ip) + if x509.IPAddress(ip_addr) not in san_list: + san_list.append(x509.IPAddress(ip_addr)) + except (ipaddress.AddressValueError, ValueError): + pass + + now = datetime.datetime.now(datetime.UTC) + cert = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(now) + .not_valid_after(now + datetime.timedelta(days=days_valid)) + .add_extension( + x509.SubjectAlternativeName(san_list), + critical=False, + ) + .sign(key, hashes.SHA256()) + ) + + # Write key file (chmod 600) + key_path.write_bytes( + key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + ) + key_path.chmod(0o600) + + # Write cert file + cert_path.write_bytes(cert.public_bytes(serialization.Encoding.PEM)) + + return cert_path, key_path + + +def ensure_certs(base_dir: Path, hostname: str = "localhost") -> tuple[Path, Path]: + """Ensure certificates exist, generating if needed.""" + if certs_exist(base_dir): + return get_cert_paths(base_dir) + + print(f"Generating self-signed SSL certificate for {hostname}...") + return generate_self_signed_cert(base_dir, hostname) diff --git a/frontends/web/static/js/auth.js b/frontends/web/static/js/auth.js new file mode 100644 index 0000000..a04edf6 --- /dev/null +++ b/frontends/web/static/js/auth.js @@ -0,0 +1,142 @@ +/** + * Stegasoo Authentication Pages JavaScript + * Handles login, setup, account, and admin user management pages + */ + +const StegasooAuth = { + + // ======================================================================== + // PASSWORD VISIBILITY TOGGLE + // ======================================================================== + + /** + * Toggle password field visibility + * @param {string} inputId - ID of the password input + * @param {HTMLElement} btn - The toggle button element + */ + togglePassword(inputId, btn) { + const input = document.getElementById(inputId); + const icon = btn.querySelector('i'); + if (!input) return; + + if (input.type === 'password') { + input.type = 'text'; + icon?.classList.replace('bi-eye', 'bi-eye-slash'); + } else { + input.type = 'password'; + icon?.classList.replace('bi-eye-slash', 'bi-eye'); + } + }, + + // ======================================================================== + // PASSWORD CONFIRMATION VALIDATION + // ======================================================================== + + /** + * Initialize password confirmation validation on a form + * @param {string} formId - ID of the form + * @param {string} passwordId - ID of the password field + * @param {string} confirmId - ID of the confirmation field + */ + initPasswordConfirmation(formId, passwordId, confirmId) { + const form = document.getElementById(formId); + if (!form) return; + + form.addEventListener('submit', function(e) { + const password = document.getElementById(passwordId)?.value; + const confirm = document.getElementById(confirmId)?.value; + + if (password !== confirm) { + e.preventDefault(); + alert('Passwords do not match'); + } + }); + }, + + // ======================================================================== + // COPY TO CLIPBOARD + // ======================================================================== + + /** + * Copy field value to clipboard with visual feedback + * @param {string} fieldId - ID of the input field to copy + */ + copyField(fieldId) { + const field = document.getElementById(fieldId); + if (!field) return; + + field.select(); + navigator.clipboard.writeText(field.value).then(() => { + const btn = field.nextElementSibling; + if (!btn) return; + + const originalHTML = btn.innerHTML; + btn.innerHTML = ''; + setTimeout(() => btn.innerHTML = originalHTML, 1000); + }); + }, + + // ======================================================================== + // PASSWORD GENERATION + // ======================================================================== + + /** + * Generate a random password + * @param {number} length - Password length (default 8) + * @returns {string} Generated password + */ + generatePassword(length = 8) { + const chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'; + let password = ''; + for (let i = 0; i < length; i++) { + password += chars.charAt(Math.floor(Math.random() * chars.length)); + } + return password; + }, + + /** + * Regenerate password and update input field + * @param {string} inputId - ID of the password input + * @param {number} length - Password length + */ + regeneratePassword(inputId = 'passwordInput', length = 8) { + const input = document.getElementById(inputId); + if (input) { + input.value = this.generatePassword(length); + } + }, + + // ======================================================================== + // DELETE CONFIRMATION + // ======================================================================== + + /** + * Confirm deletion with a prompt + * @param {string} itemName - Name of item being deleted + * @param {string} formId - ID of the form to submit if confirmed + * @returns {boolean} True if confirmed + */ + confirmDelete(itemName, formId = null) { + const confirmed = confirm(`Are you sure you want to delete "${itemName}"? This cannot be undone.`); + if (confirmed && formId) { + const form = document.getElementById(formId); + form?.submit(); + } + return confirmed; + } +}; + +// Make togglePassword available globally for onclick handlers +function togglePassword(inputId, btn) { + StegasooAuth.togglePassword(inputId, btn); +} + +// Make copyField available globally for onclick handlers +function copyField(fieldId) { + StegasooAuth.copyField(fieldId); +} + +// Make regeneratePassword available globally for onclick handlers +function regeneratePassword() { + StegasooAuth.regeneratePassword(); +} diff --git a/frontends/web/static/js/generate.js b/frontends/web/static/js/generate.js new file mode 100644 index 0000000..1e737a4 --- /dev/null +++ b/frontends/web/static/js/generate.js @@ -0,0 +1,279 @@ +/** + * Stegasoo Generate Page JavaScript + * Handles credential generation form and display + */ + +const StegasooGenerate = { + + // ======================================================================== + // FORM CONTROLS + // ======================================================================== + + /** + * Initialize the words range slider + */ + initWordsSlider() { + const wordsRange = document.getElementById('wordsRange'); + const wordsValue = document.getElementById('wordsValue'); + + wordsRange?.addEventListener('input', function() { + const bits = this.value * 11; + wordsValue.textContent = `${this.value} words (~${bits} bits)`; + }); + }, + + /** + * Initialize PIN/RSA option toggles + */ + initOptionToggles() { + const usePinCheck = document.getElementById('usePinCheck'); + const useRsaCheck = document.getElementById('useRsaCheck'); + const pinOptions = document.getElementById('pinOptions'); + const rsaOptions = document.getElementById('rsaOptions'); + const rsaQrWarning = document.getElementById('rsaQrWarning'); + const rsaBitsSelect = document.getElementById('rsaBitsSelect'); + + usePinCheck?.addEventListener('change', function() { + pinOptions?.classList.toggle('d-none', !this.checked); + }); + + useRsaCheck?.addEventListener('change', function() { + rsaOptions?.classList.toggle('d-none', !this.checked); + }); + + // RSA key size QR warning (>3072 bits) + rsaBitsSelect?.addEventListener('change', function() { + rsaQrWarning?.classList.toggle('d-none', parseInt(this.value) <= 3072); + }); + }, + + // ======================================================================== + // CREDENTIAL VISIBILITY + // ======================================================================== + + pinHidden: false, + passphraseHidden: false, + + /** + * Toggle PIN visibility + */ + togglePinVisibility() { + const pinDigits = document.getElementById('pinDigits'); + const icon = document.getElementById('pinToggleIcon'); + const text = document.getElementById('pinToggleText'); + + this.pinHidden = !this.pinHidden; + pinDigits?.classList.toggle('blurred', this.pinHidden); + + if (icon) icon.className = this.pinHidden ? 'bi bi-eye' : 'bi bi-eye-slash'; + if (text) text.textContent = this.pinHidden ? 'Show' : 'Hide'; + }, + + /** + * Toggle passphrase visibility + */ + togglePassphraseVisibility() { + const display = document.getElementById('passphraseDisplay'); + const icon = document.getElementById('passphraseToggleIcon'); + const text = document.getElementById('passphraseToggleText'); + + this.passphraseHidden = !this.passphraseHidden; + display?.classList.toggle('blurred', this.passphraseHidden); + + if (icon) icon.className = this.passphraseHidden ? 'bi bi-eye' : 'bi bi-eye-slash'; + if (text) text.textContent = this.passphraseHidden ? 'Show' : 'Hide'; + }, + + // ======================================================================== + // MEMORY AID STORY GENERATION + // ======================================================================== + + currentStoryTemplate: 0, + + /** + * Story templates organized by word count (3-12 words supported) + */ + storyTemplates: { + 3: [ + w => `The ${w[0]} ${w[1]} ${w[2]}.`, + w => `${w[0]} loves ${w[1]} and ${w[2]}.`, + w => `A ${w[0]} found a ${w[1]} near the ${w[2]}.`, + w => `${w[0]}, ${w[1]}, ${w[2]} — never forget.`, + w => `The ${w[0]} hid the ${w[1]} under the ${w[2]}.`, + ], + 4: [ + w => `${w[0]} and ${w[1]} discovered a ${w[2]} made of ${w[3]}.`, + w => `The ${w[0]} ${w[1]} ate ${w[2]} for ${w[3]}.`, + w => `In the ${w[0]}, a ${w[1]} met a ${w[2]} carrying ${w[3]}.`, + w => `${w[0]} said "${w[1]}" while holding a ${w[2]} ${w[3]}.`, + w => `The secret: ${w[0]}, ${w[1]}, ${w[2]}, ${w[3]}.`, + ], + 5: [ + w => `${w[0]} traveled to ${w[1]} seeking the ${w[2]} of ${w[3]} and ${w[4]}.`, + w => `The ${w[0]} ${w[1]} lived in a ${w[2]} house with ${w[3]} ${w[4]}.`, + w => `"${w[0]}!" shouted ${w[1]} as the ${w[2]} ${w[3]} flew toward ${w[4]}.`, + w => `Captain ${w[0]} sailed the ${w[1]} ${w[2]} searching for ${w[3]} ${w[4]}.`, + w => `In ${w[0]} kingdom, ${w[1]} guards protected the ${w[2]} ${w[3]} ${w[4]}.`, + ], + 6: [ + w => `${w[0]} met ${w[1]} at the ${w[2]}. Together they found ${w[3]}, ${w[4]}, and ${w[5]}.`, + w => `The ${w[0]} ${w[1]} wore a ${w[2]} hat while eating ${w[3]} ${w[4]} ${w[5]}.`, + w => `Detective ${w[0]} found ${w[1]} ${w[2]} near the ${w[3]} ${w[4]} ${w[5]}.`, + w => `In the ${w[0]} ${w[1]}, a ${w[2]} ${w[3]} sang about ${w[4]} ${w[5]}.`, + w => `Chef ${w[0]} combined ${w[1]}, ${w[2]}, ${w[3]}, ${w[4]}, and ${w[5]}.`, + ], + 7: [ + w => `${w[0]} and ${w[1]} walked through the ${w[2]} ${w[3]} to find the ${w[4]} ${w[5]} ${w[6]}.`, + w => `The ${w[0]} professor studied ${w[1]} ${w[2]} while drinking ${w[3]} ${w[4]} with ${w[5]} ${w[6]}.`, + w => `"${w[0]} ${w[1]}!" yelled ${w[2]} as ${w[3]} ${w[4]} attacked the ${w[5]} ${w[6]}.`, + w => `In ${w[0]}, King ${w[1]} decreed that ${w[2]} ${w[3]} must honor ${w[4]} ${w[5]} ${w[6]}.`, + ], + 8: [ + w => `${w[0]} ${w[1]} and ${w[2]} ${w[3]} met at the ${w[4]} ${w[5]} to discuss ${w[6]} ${w[7]}.`, + w => `The ${w[0]} ${w[1]} ${w[2]} traveled from ${w[3]} to ${w[4]} carrying ${w[5]} ${w[6]} ${w[7]}.`, + w => `${w[0]} discovered that ${w[1]} ${w[2]} plus ${w[3]} ${w[4]} equals ${w[5]} ${w[6]} ${w[7]}.`, + ], + 9: [ + w => `${w[0]} ${w[1]} ${w[2]} watched as ${w[3]} ${w[4]} ${w[5]} danced with ${w[6]} ${w[7]} ${w[8]}.`, + w => `In the ${w[0]} ${w[1]} ${w[2]}, three friends — ${w[3]}, ${w[4]}, ${w[5]} — found ${w[6]} ${w[7]} ${w[8]}.`, + w => `The recipe: ${w[0]}, ${w[1]}, ${w[2]}, ${w[3]}, ${w[4]}, ${w[5]}, ${w[6]}, ${w[7]}, ${w[8]}.`, + ], + 10: [ + w => `${w[0]} ${w[1]} told ${w[2]} ${w[3]} about the ${w[4]} ${w[5]} ${w[6]} hidden in ${w[7]} ${w[8]} ${w[9]}.`, + w => `The ${w[0]} ${w[1]} ${w[2]} ${w[3]} ${w[4]} lived beside ${w[5]} ${w[6]} ${w[7]} ${w[8]} ${w[9]}.`, + ], + 11: [ + w => `${w[0]} ${w[1]} ${w[2]} and ${w[3]} ${w[4]} ${w[5]} discovered ${w[6]} ${w[7]} ${w[8]} ${w[9]} ${w[10]}.`, + w => `In ${w[0]} ${w[1]}, the ${w[2]} ${w[3]} ${w[4]} sang of ${w[5]} ${w[6]} ${w[7]} ${w[8]} ${w[9]} ${w[10]}.`, + ], + 12: [ + w => `${w[0]} ${w[1]} ${w[2]} met ${w[3]} ${w[4]} ${w[5]} at the ${w[6]} ${w[7]} ${w[8]} ${w[9]} ${w[10]} ${w[11]}.`, + w => `The twelve treasures: ${w[0]}, ${w[1]}, ${w[2]}, ${w[3]}, ${w[4]}, ${w[5]}, ${w[6]}, ${w[7]}, ${w[8]}, ${w[9]}, ${w[10]}, ${w[11]}.`, + ], + }, + + /** + * Wrap word in highlight span + */ + hl(word) { + return `${word}`; + }, + + /** + * Generate a memory story for given words + * @param {string[]} words - Array of passphrase words + * @param {number|null} idx - Template index (null for current) + * @returns {string} HTML story + */ + generateStory(words, idx = null) { + const count = words.length; + if (count === 0) return ''; + + // Clamp to supported range (3-12) + const templateKey = Math.max(3, Math.min(12, count)); + const templates = this.storyTemplates[templateKey]; + + if (!templates || templates.length === 0) { + // Fallback: just list the words + return words.map(w => this.hl(w)).join(' — '); + } + + const templateIdx = (idx ?? this.currentStoryTemplate) % templates.length; + // Apply highlighting to words + const highlighted = words.map(w => this.hl(w)); + return templates[templateIdx](highlighted); + }, + + /** + * Toggle memory aid visibility + * @param {string[]} words - Passphrase words array + */ + toggleMemoryAid(words) { + const container = document.getElementById('memoryAidContainer'); + const icon = document.getElementById('memoryAidIcon'); + const text = document.getElementById('memoryAidText'); + + const isHidden = container?.classList.contains('d-none'); + container?.classList.toggle('d-none', !isHidden); + + if (icon) icon.className = isHidden ? 'bi bi-lightbulb-fill' : 'bi bi-lightbulb'; + if (text) text.textContent = isHidden ? 'Hide Aid' : 'Memory Aid'; + + if (isHidden) { + document.getElementById('memoryStory').innerHTML = this.generateStory(words); + } + }, + + /** + * Regenerate story with next template + * @param {string[]} words - Passphrase words array + */ + regenerateStory(words) { + const count = words.length; + const templateKey = Math.max(3, Math.min(12, count)); + const templates = this.storyTemplates[templateKey] || []; + this.currentStoryTemplate = (this.currentStoryTemplate + 1) % Math.max(1, templates.length); + document.getElementById('memoryStory').innerHTML = this.generateStory(words, this.currentStoryTemplate); + }, + + // ======================================================================== + // QR CODE PRINTING + // ======================================================================== + + /** + * Print QR code in new window + */ + printQrCode() { + const qrImg = document.getElementById('qrCodeImage'); + if (!qrImg) return; + + const printWindow = window.open('', '_blank'); + printWindow.document.write(` + + + QR Code + + + + QR Code + + +{% if is_admin %} + +{% endif %} + +{% endblock %} diff --git a/frontends/web/templates/admin/password_reset.html b/frontends/web/templates/admin/password_reset.html new file mode 100644 index 0000000..09fe20e --- /dev/null +++ b/frontends/web/templates/admin/password_reset.html @@ -0,0 +1,50 @@ +{% extends "base.html" %} + +{% block title %}Password Reset - Stegasoo{% endblock %} + +{% block content %} +
+
+
+
+ + Password Reset +
+
+
+ + Important: This password will only be shown once. + Make sure to share it with {{ username }} securely. +
+ +

+ The user's sessions have been invalidated. They will need to log in + again with the new password. +

+ +
+ +
+ + +
+
+ + +
+
+
+
+{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/frontends/web/templates/admin/user_created.html b/frontends/web/templates/admin/user_created.html new file mode 100644 index 0000000..fec2cc1 --- /dev/null +++ b/frontends/web/templates/admin/user_created.html @@ -0,0 +1,60 @@ +{% extends "base.html" %} + +{% block title %}User Created - Stegasoo{% endblock %} + +{% block content %} +
+
+
+
+ + User Created Successfully +
+
+
+ + Important: This password will only be shown once. + Make sure to share it with the user securely. +
+ +
+ +
+ + +
+
+ +
+ +
+ + +
+
+ + +
+
+
+
+{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/frontends/web/templates/admin/user_new.html b/frontends/web/templates/admin/user_new.html new file mode 100644 index 0000000..905bc3f --- /dev/null +++ b/frontends/web/templates/admin/user_new.html @@ -0,0 +1,166 @@ +{% extends "base.html" %} + +{% block title %}Add User - Stegasoo{% endblock %} + +{% block content %} +
+
+
+
+ + Add New User +
+
+
+
+ + +
+ Letters, numbers, underscore, hyphen, @ and . allowed. +
+
+ +
+ +
+ + +
+
+ Auto-generated password. You can edit or regenerate it. +
+
+ +
+ +
+ + + Cancel + +
+
+
+
+
+
+ + + +{% endblock %} + +{% block scripts %} + + +{% endblock %} diff --git a/frontends/web/templates/login.html b/frontends/web/templates/login.html new file mode 100644 index 0000000..b07042d --- /dev/null +++ b/frontends/web/templates/login.html @@ -0,0 +1,55 @@ +{% extends "base.html" %} + +{% block title %}Login - Stegasoo{% endblock %} + +{% block content %} +
+
+
+
+ +
Login
+
+
+
+
+ + +
+ +
+ +
+ + +
+
+ + +
+ + +
+
+
+
+{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/frontends/web/templates/recover.html b/frontends/web/templates/recover.html new file mode 100644 index 0000000..38901a4 --- /dev/null +++ b/frontends/web/templates/recover.html @@ -0,0 +1,129 @@ +{% extends "base.html" %} + +{% block title %}Password Recovery - Stegasoo{% endblock %} + +{% block content %} +
+
+
+
+ +
Password Recovery
+
+
+

+ Enter your recovery key to reset your admin password. +

+ + +
+
+

+ +

+
+
+
+
+ + +
+
+ + +
+ +
+
+
+
+
+ +
+ +
+ + +
+ Paste your full recovery key (with or without dashes) +
+
+ +
+ + +
+ +
+ + +
+
Minimum 8 characters
+
+ + +
+ +
+ + +
+
+ + +
+ + +
+
+ +
+ + Note: This will reset the admin password. If you don't have a valid recovery key, + you'll need to delete the database and reconfigure Stegasoo. +
+
+
+{% endblock %} + +{% block scripts %} + + +{% endblock %} diff --git a/frontends/web/templates/regenerate_recovery.html b/frontends/web/templates/regenerate_recovery.html new file mode 100644 index 0000000..0d88457 --- /dev/null +++ b/frontends/web/templates/regenerate_recovery.html @@ -0,0 +1,183 @@ +{% extends "base.html" %} + +{% block title %}Regenerate Recovery Key - Stegasoo{% endblock %} + +{% block content %} +
+
+
+
+ +
{{ 'Regenerate' if has_existing else 'Generate' }} Recovery Key
+
+
+ {% if has_existing %} + +
+ + Warning: Your existing recovery key will be invalidated. + Make sure to save this new key before continuing. +
+ {% else %} + +
+ + What is a recovery key?
+ If you forget your admin password, this key is the ONLY way to reset it. +
+ {% endif %} + + +
+ +
+ + +
+
+ + + {% if qr_base64 %} +
+ + Recovery Key QR Code +
+ {% endif %} + + +
+ +
+ + {% if qr_base64 %} + + {% endif %} +
+
+ + +
+ +
+ +
+ +
JPG/PNG, 50KB-2MB
+
+ +
+
+ +
+ + +
+ + + +
+ + +
+ +
+ + + + + +
+
+
+
+
+
+{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/frontends/web/templates/setup.html b/frontends/web/templates/setup.html new file mode 100644 index 0000000..ffbe143 --- /dev/null +++ b/frontends/web/templates/setup.html @@ -0,0 +1,76 @@ +{% extends "base.html" %} + +{% block title %}Setup - Stegasoo{% endblock %} + +{% block content %} +
+
+
+
+ +
Initial Setup
+
+
+

+ Welcome to Stegasoo! Create your admin account to get started. +

+ +
+
+ + +
+ +
+ +
+ + +
+
Minimum 8 characters
+
+ +
+ +
+ + +
+
+ + +
+
+
+ +
+ + This is a single-user setup. The admin account has full access to all features. +
+
+
+{% endblock %} + +{% block scripts %} + + +{% endblock %} diff --git a/frontends/web/templates/setup_recovery.html b/frontends/web/templates/setup_recovery.html new file mode 100644 index 0000000..86e1a29 --- /dev/null +++ b/frontends/web/templates/setup_recovery.html @@ -0,0 +1,176 @@ +{% extends "base.html" %} + +{% block title %}Recovery Key Setup - Stegasoo{% endblock %} + +{% block content %} +
+
+
+
+ +
Recovery Key Setup
+ Step 2 of 2 +
+
+ +
+ + What is a recovery key?
+ If you forget your admin password, this key is the ONLY way to reset it. + Save it somewhere safe - it will not be shown again. +
+ + +
+ +
+ + +
+
+ + + {% if qr_base64 %} +
+ + Recovery Key QR Code +
+ Scan with your phone's camera app +
+
+ {% endif %} + + +
+ +
+ + {% if qr_base64 %} + + {% endif %} +
+
+ +
+ + +
+ + + +
+ + +
+ +
+ + + + + +
+
+
+
+ + +
+
+ Security Notes +
+
+
    +
  • The recovery key is not stored - only a hash is saved
  • +
  • Keep it separate from your password (different location)
  • +
  • Anyone with this key can reset admin passwords
  • +
  • If you lose it and forget your password, you must recreate the database
  • +
+
+
+
+
+{% endblock %} + +{% block scripts %} + +{% endblock %}