#!/usr/bin/env python3
"""
Stegasoo Web Frontend (v4.0.0)

A production Flask application demonstrating proper web architecture patterns.
This isn't just a quick demo - it's built to run on a Raspberry Pi 24/7.

ARCHITECTURE OVERVIEW
=====================

    ┌─────────────────────────────────────────────────────────────────────┐
    │                         FLASK APPLICATION                           │
    ├─────────────────────────────────────────────────────────────────────┤
    │                                                                     │
    │   Routes (/encode, /decode, /api/*)                                 │
    │       │                                                             │
    │       ├── auth.py             # Session management, user accounts   │
    │       ├── temp_storage.py     # File-based temp storage with expiry │
    │       ├── subprocess_stego.py # Isolated encode/decode workers      │
    │       └── ssl_utils.py        # Self-signed cert generation         │
    │                                                                     │
    │   Templates (Jinja2)                                                │
    │       └── base.html → encode.html, decode.html, etc.                │
    │                                                                     │
    │   Static assets (CSS, JS)                                           │
    │       └── Vanilla JS, no framework (keeps it simple)                │
    │                                                                     │
    └─────────────────────────────────────────────────────────────────────┘

KEY PATTERNS
============

1. SUBPROCESS ISOLATION
   Stegasoo's DCT mode uses scipy/jpeglib which can crash on malformed input.
   We run encode/decode in subprocesses so crashes don't take down the server:

       subprocess_stego = SubprocessStego(timeout=180)
       result = subprocess_stego.encode(carrier, ref, message, ...)

   If the subprocess crashes, we catch it and return an error gracefully.

2. ASYNC JOBS WITH PROGRESS
   Encoding large images can take 30+ seconds. We use ThreadPoolExecutor
   to run jobs in background threads with progress reporting:

       job_id = generate_job_id()
       _executor.submit(_run_encode_job, job_id, params)
       # Client polls /api/encode/progress/<job_id> for updates

3. CONTEXT PROCESSORS
   @app.context_processor injects variables into ALL templates:

       return {"version": __version__, "has_dct": has_dct_support()}

   Now every template can use {{ version }} without passing it explicitly.

4. BEFORE_REQUEST HOOKS
   @app.before_request runs before every request. We use it for:
   - First-run setup redirect (no users → /setup)
   - Session validation
   - Cleanup of old temp files

5. 
SECURE SECRET KEY Flask sessions need a secret key. We persist it to a file so sessions survive server restarts (otherwise everyone gets logged out). CHANGES in v4.0.0: - Added channel key support for deployment/group isolation - New /api/channel/status endpoint - Channel key selector on encode/decode pages CHANGES in v3.2.0: - Removed date dependency from all operations - Simplified user experience for asynchronous communications """ import io import mimetypes import os import secrets import sys import threading import time from concurrent.futures import ThreadPoolExecutor from pathlib import Path import temp_storage from auth import ( MAX_CHANNEL_KEYS, MAX_USERS, admin_required, can_create_user, can_save_channel_key, change_password, clear_recovery_key, create_admin_user, create_user, delete_channel_key, delete_user, generate_temp_password, get_all_users, get_channel_key_by_id, get_current_user, get_non_admin_count, get_recovery_key_hash, get_user_by_id, get_user_channel_keys, get_username, has_recovery_key, is_admin, is_authenticated, login_required, login_user, logout_user, reset_user_password, save_channel_key, set_recovery_key_hash, update_channel_key_last_used, update_channel_key_name, user_exists, verify_and_reset_admin_password, verify_user_password, ) from auth import ( init_app as init_auth, ) from flask import ( Flask, flash, jsonify, redirect, render_template, request, send_file, session, url_for, ) from PIL import Image from ssl_utils import ensure_certs os.environ["NUMPY_MADVISE_HUGEPAGE"] = "0" os.environ["OMP_NUM_THREADS"] = "1" # Add parent to path for development sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src")) import stegasoo from stegasoo import ( CapacityError, DecryptionError, FilePayload, InvalidHeaderError, InvalidMagicBytesError, ReedSolomonError, StegasooError, export_rsa_key_pem, generate_credentials, generate_filename, get_channel_status, has_argon2, # Channel key functions (v4.0.0) has_dct_support, load_rsa_key, 
validate_channel_key, validate_file_payload, validate_image, validate_message, validate_passphrase, validate_pin, validate_rsa_key, validate_security_factors, ) from stegasoo.constants import ( DEFAULT_PASSPHRASE_WORDS, MAX_FILE_PAYLOAD_SIZE, MAX_FILE_SIZE, MAX_MESSAGE_CHARS, MAX_PIN_LENGTH, MAX_UPLOAD_SIZE, MIN_PASSPHRASE_WORDS, MIN_PIN_LENGTH, RECOMMENDED_PASSPHRASE_WORDS, TEMP_FILE_EXPIRY, TEMP_FILE_EXPIRY_MINUTES, THUMBNAIL_QUALITY, THUMBNAIL_SIZE, VALID_RSA_SIZES, __version__, ) # QR Code support try: import qrcode # noqa: F401 from qrcode.constants import ERROR_CORRECT_L, ERROR_CORRECT_M # noqa: F401 HAS_QRCODE = True except ImportError: HAS_QRCODE = False # QR Code reading try: from pyzbar.pyzbar import decode as pyzbar_decode # noqa: F401 HAS_QRCODE_READ = True except ImportError: HAS_QRCODE_READ = False # Import QR utilities # ============================================================================ # SUBPROCESS ISOLATION FOR STEGASOO OPERATIONS # ============================================================================ # # This is a critical reliability pattern. Here's the problem: # # scipy's DCT and jpeglib can crash (segfault) on: # - Malformed JPEG files # - Very large images that exhaust memory # - Certain edge cases in coefficient manipulation # # If these crash in the main Flask process, your whole server dies. # Users get a connection reset, and the service goes down. # # The solution: Run stegasoo operations in separate Python processes. # # Main Flask process Worker subprocess # ┌─────────────────┐ ┌─────────────────┐ # │ │ spawn │ │ # │ /api/encode │──────────────>│ encode() │ # │ │ │ │ # │ wait for │<──────────────│ return result │ # │ result │ or crash │ (or crash) │ # │ │ │ │ # │ handle error │ │ (process dies) │ # └─────────────────┘ └─────────────────┘ # # If the subprocess crashes, we catch the error and return a friendly message. # The main server keeps running. Users can try again with different input. 
# # The subprocess_stego module handles all the pickling/unpickling of data. from subprocess_stego import ( SubprocessStego, cleanup_progress_file, generate_job_id, get_progress_file_path, read_progress, ) from stegasoo.qr_utils import ( can_fit_in_qr, decompress_data, detect_and_crop_qr, extract_key_from_qr, generate_qr_code, is_compressed, ) # Initialize subprocess wrapper (worker script must be in same directory) subprocess_stego = SubprocessStego(timeout=180) # 3 minute timeout for large images # ============================================================================ # FLASK APP CONFIGURATION # ============================================================================ # # Flask configuration demonstrates several production patterns: # # 1. SECRET KEY PERSISTENCE # Flask uses secret_key to sign session cookies. If it changes, all users # get logged out. We save it to a file so it survives restarts. # # 2. CONTENT LENGTH LIMITS # MAX_CONTENT_LENGTH prevents DoS via huge uploads. Flask will reject # requests that exceed this before loading them into memory. # # 3. ENVIRONMENT-BASED CONFIG # Settings come from environment variables, allowing: # - Different settings per deployment (dev/staging/prod) # - Docker/systemd to inject config without code changes # - 12-factor app compliance # # 4. INSTANCE FOLDER # Flask's instance_path is for per-deployment data (databases, keys). # It's .gitignored by default - perfect for secrets. 
app = Flask(__name__)

# Persist secret key so sessions survive restarts
# Without this, every restart = everyone gets logged out
_instance_path = Path(app.instance_path)
_instance_path.mkdir(parents=True, exist_ok=True)
_secret_key_file = _instance_path / ".secret_key"


def _load_or_create_secret_key(key_file: Path) -> str:
    """Return the persisted Flask secret key, generating one on first run.

    An existing, non-empty key file is reused as-is. Otherwise a fresh
    256-bit key is generated and written to ``key_file``.

    Args:
        key_file: Location of the key file inside the instance folder.

    Returns:
        The secret key as a hex string.
    """
    if key_file.exists():
        stored_key = key_file.read_text().strip()
        if stored_key:
            return stored_key
        # An empty file would hand Flask an empty secret key, so fall
        # through and regenerate instead of returning "".

    new_key = secrets.token_hex(32)  # 256 bits of randomness
    # Create the file with owner-only permissions from the start. Writing
    # first and chmod-ing afterwards leaves a window in which the key is
    # readable according to the process umask.
    fd = os.open(key_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as key_handle:
        key_handle.write(new_key)
    # The mode given to os.open() only applies to newly created files, so
    # tighten a pre-existing (empty) file explicitly as well.
    key_file.chmod(0o600)  # Only owner can read
    return new_key


app.secret_key = _load_or_create_secret_key(_secret_key_file)

# Reject uploads larger than this (prevents memory exhaustion)
app.config["MAX_CONTENT_LENGTH"] = MAX_FILE_SIZE

# Auth configuration from environment
# STEGASOO_AUTH_ENABLED=false disables login (for local/dev use)
app.config["AUTH_ENABLED"] = os.environ.get("STEGASOO_AUTH_ENABLED", "true").lower() == "true"
app.config["HTTPS_ENABLED"] = os.environ.get("STEGASOO_HTTPS_ENABLED", "false").lower() == "true"

# Initialize auth module (sets up session handling, user DB)
init_auth(app)

# ============================================================================
# ASYNC JOB MANAGEMENT (v4.1.2)
# ============================================================================
#
# Problem: DCT encoding a large image can take 30-60 seconds.
# Solution: Run it in a background thread, let the client poll for progress.
#
# The flow:
#
#   Client                            Server
#   ──────                            ──────
#   POST /api/encode/async ──────>    Start background job
#                          <──────    Return job_id
#
#   GET /api/encode/progress/123 ─>   Check job status
#                          <──────    {"progress": 45, "phase": "embedding"}
#
#   GET /api/encode/progress/123 ─>   Check again
#                          <──────    {"status": "complete", "file_id": "abc"}
#
#   GET /api/download/abc ────────>   Download result
#                          <──────    Encoded image
#
# Why ThreadPoolExecutor instead of Celery/Redis?
#   - This runs on a Raspberry Pi with 1GB RAM
#   - We don't need distributed workers
#   - Keep it simple - threads are fine for 2 concurrent jobs
#
# The thread pool is limited to 2 workers because:
#   - Each encode loads the full image into memory
#   - Too many concurrent jobs = OOM on the Pi

_executor = ThreadPoolExecutor(max_workers=2)

# Job storage: job_id -> {status, result, error, file_id, created, ...}
# We use a dict with a lock because threads access it concurrently
_jobs = {}
_jobs_lock = threading.Lock()


def _store_job(job_id: str, data: dict) -> None:
    """Record ``data`` as the current state of ``job_id`` while holding the lock."""
    with _jobs_lock:
        _jobs.update({job_id: data})


def _get_job(job_id: str) -> dict | None:
    """Return the stored state for ``job_id``, or ``None`` if the id is unknown."""
    with _jobs_lock:
        job_state = _jobs.get(job_id)
    return job_state


def _cleanup_old_jobs(max_age_seconds: int = 3600) -> None:
    """Drop jobs whose "created" timestamp is more than ``max_age_seconds`` old.

    For each expired job the progress file is cleaned up first, then the
    entry is removed from the job table. Jobs without a "created" field are
    treated as created at time 0.
    """
    now = time.time()
    with _jobs_lock:
        # Collect ids first: the dict must not change size while iterating.
        expired_ids = []
        for jid, info in _jobs.items():
            if now - info.get("created", 0) > max_age_seconds:
                expired_ids.append(jid)

        for jid in expired_ids:
            cleanup_progress_file(jid)
            del _jobs[jid]


@app.before_request
def require_setup():
    """Send every request to the setup page until a first user exists.

    Returns ``None`` (continue normally) when auth is disabled, when the
    request targets static files or a setup route (or has no endpoint), or
    when a user already exists.
    """
    exempt_endpoints = ("static", "setup", "setup_recovery", None)
    auth_enabled = app.config.get("AUTH_ENABLED", True)

    # Short-circuit order matters: the user lookup only runs for requests
    # that are actually subject to the redirect.
    if not auth_enabled or request.endpoint in exempt_endpoints:
        return None

    return None if user_exists() else redirect(url_for("setup"))


# DEPRECATED: In-memory storage replaced by file-based temp_storage module
# Kept for backwards compatibility during transition
TEMP_FILES: dict[str, dict] = {}  # Not used - see temp_storage.py
THUMBNAIL_FILES: dict[str, bytes] = {}  # Not used - see temp_storage.py

# ============================================================================
# TEMPLATE CONTEXT PROCESSOR
# ============================================================================
#
# Context processors inject variables into EVERY template automatically.
# Instead of passing the same data to every render_template() call:
#
#     # Bad: repetitive and error-prone
#     return render_template("page.html", version=__version__, has_dct=...)
#
# We define it once here and it's available everywhere:
#
#     # In any template:
#     <p>Version: {{ version }}</p>
#     {% if has_dct %}DCT mode available{% endif %}
#
# This is great for:
#   - Version numbers (show in footer)
#   - Feature flags (has_dct, auth_enabled)
#   - User info (username, is_admin)
#   - Global config (max sizes, limits)
#
# The function runs on EVERY request, so keep it fast.
# Don't do expensive database queries here.


@app.context_processor
def inject_globals():
    """Inject global variables into all templates.

    Returns:
        A dict of template variables: version and size limits, PIN and
        passphrase bounds, DCT availability, channel key status, auth/admin
        state and the current user's saved channel keys (an empty list when
        nobody is logged in).
    """
    # Get channel status (v4.0.0)
    channel_status = get_channel_status()

    # Evaluate the auth state once so every value derived from it below is
    # consistent within a single render (and the check is not repeated).
    authenticated = is_authenticated()

    # Get saved channel keys for authenticated users (v4.2.0)
    saved_channel_keys = []
    if authenticated:
        current_user = get_current_user()
        if current_user:
            saved_channel_keys = get_user_channel_keys(current_user.id)

    return {
        "version": __version__,
        "max_message_chars": MAX_MESSAGE_CHARS,
        "max_payload_kb": MAX_FILE_PAYLOAD_SIZE // 1024,
        "max_upload_mb": MAX_UPLOAD_SIZE // (1024 * 1024),
        "temp_file_expiry_minutes": TEMP_FILE_EXPIRY_MINUTES,
        "min_pin_length": MIN_PIN_LENGTH,
        "max_pin_length": MAX_PIN_LENGTH,
        # NEW in v3.2.0
        "min_passphrase_words": MIN_PASSPHRASE_WORDS,
        "recommended_passphrase_words": RECOMMENDED_PASSPHRASE_WORDS,
        "default_passphrase_words": DEFAULT_PASSPHRASE_WORDS,
        # NEW in v3.0
        "has_dct": has_dct_support(),
        # NEW in v4.0.0 - Channel key status
        "channel_mode": channel_status["mode"],
        "channel_configured": channel_status["configured"],
        "channel_fingerprint": channel_status.get("fingerprint"),
        "channel_source": channel_status.get("source"),
        # NEW in v4.0.2 - Auth state
        "auth_enabled": app.config.get("AUTH_ENABLED", True),
        "is_authenticated": authenticated,
        "username": get_username() if authenticated else None,
        # NEW in v4.1.0 - Admin state
        "is_admin": is_admin(),
        # NEW in v4.2.0 - Saved channel keys
        "saved_channel_keys": saved_channel_keys,
    }


# ============================================================================
# CONFIGURATION
# ============================================================================

# Startup banner plus payload-limit override. Everything is best-effort: any
# failure is reported on stdout and startup continues.
try:
    print(f"Stegasoo v{__version__} - Web Frontend")
    print(f"Current MAX_FILE_SIZE: {MAX_FILE_SIZE}")
    print(f"Current MAX_FILE_PAYLOAD_SIZE: {MAX_FILE_PAYLOAD_SIZE}")
    print(f"DCT support: {has_dct_support()}")
    print(f"QR code support: write={HAS_QRCODE}, read={HAS_QRCODE_READ}")

    # Channel key status (v4.0.0)
    channel_status = get_channel_status()
    print(f"Channel key: {channel_status['mode']} mode")
    if channel_status["configured"]:
        print(f" Fingerprint: {channel_status.get('fingerprint')}")
        print(f" Source: {channel_status.get('source')}")

    DESIRED_PAYLOAD_SIZE = 2 * 1024 * 1024  # 2MB

    # NOTE(review): this rebinds the attribute on the ``stegasoo`` package
    # only. The MAX_FILE_PAYLOAD_SIZE name this module imported from
    # stegasoo.constants keeps its original value - confirm which of the two
    # the library's validators actually read.
    if hasattr(stegasoo, "MAX_FILE_PAYLOAD_SIZE"):
        print(f"Overriding MAX_FILE_PAYLOAD_SIZE to {DESIRED_PAYLOAD_SIZE}")
        stegasoo.MAX_FILE_PAYLOAD_SIZE = DESIRED_PAYLOAD_SIZE
except Exception as e:
    print(f"Could not override stegasoo limits: {e}")
# ============================================================================
# CHANNEL KEY HELPER (v4.0.0)
# ============================================================================


def resolve_channel_key_form(channel_key_value: str) -> str:
    """Resolve the channel key submitted in a form to a subprocess-safe string.

    Wraps the library's ``resolve_channel_key``. Its ``None`` result maps to
    ``"auto"``, its empty-string result to ``"none"``, and any other value is
    returned unchanged as the explicit key.

    Args:
        channel_key_value: Raw value of the channel key form field.

    Returns:
        ``"auto"``, ``"none"`` or an explicit key string. Input the library
        rejects with ``ValueError`` or ``FileNotFoundError`` falls back to
        ``"auto"``.
    """
    from stegasoo.channel import resolve_channel_key

    try:
        resolved = resolve_channel_key(channel_key_value)
    except (ValueError, FileNotFoundError):
        # Invalid format, fall back to auto
        return "auto"

    if resolved is None:
        return "auto"
    if resolved == "":
        return "none"
    return resolved


def generate_thumbnail(image_data: bytes, size: tuple = THUMBNAIL_SIZE) -> bytes | None:
    """Generate a JPEG thumbnail from raw image bytes.

    Images with transparency (modes "RGBA", "LA" and "P") are flattened onto
    a white background, because JPEG has no alpha channel. All other non-RGB
    modes are converted to RGB.

    Args:
        image_data: Encoded source image.
        size: Maximum (width, height) of the thumbnail.

    Returns:
        JPEG bytes, or ``None`` if the image could not be processed (the
        error is printed to stdout).
    """
    try:
        with Image.open(io.BytesIO(image_data)) as img:
            if img.mode in ("RGBA", "LA", "P"):
                # Normalise to RGBA so the alpha band can always be used as
                # the paste mask. Previously "LA" images were pasted with no
                # mask, which silently discarded their transparency.
                rgba = img.convert("RGBA")
                background = Image.new("RGB", rgba.size, (255, 255, 255))
                background.paste(rgba, mask=rgba.split()[-1])
                img = background
            elif img.mode != "RGB":
                # Grayscale, CMYK, etc.
                img = img.convert("RGB")

            # Create thumbnail (resizes in place, preserving aspect ratio)
            img.thumbnail(size, Image.Resampling.LANCZOS)

            # Save to bytes
            buffer = io.BytesIO()
            img.save(buffer, format="JPEG", quality=THUMBNAIL_QUALITY, optimize=True)
            return buffer.getvalue()
    except Exception as e:
        # Best-effort: a missing thumbnail must not fail the request.
        print(f"Thumbnail generation error: {e}")
        return None


def cleanup_temp_files():
    """Remove expired temporary files."""
    temp_storage.cleanup_expired(TEMP_FILE_EXPIRY)


def allowed_image(filename: str) -> bool:
    """Check whether ``filename`` ends in an allowed image extension.

    The comparison is case-insensitive. Empty or missing names, and names
    without a dot, are rejected.
    """
    if not filename:
        return False
    _, dot, extension = filename.rpartition(".")
    if not dot:
        return False
    return extension.lower() in {"png", "jpg", "jpeg", "bmp", "gif"}
def format_size(size_bytes: int) -> str:
    """Format a byte count for display as B, KB or MB (one decimal place)."""
    kib = 1024
    mib = kib * kib
    if size_bytes < kib:
        return f"{size_bytes} B"
    if size_bytes < mib:
        return f"{size_bytes / 1024:.1f} KB"
    return f"{size_bytes / (1024 * 1024):.1f} MB"


# ============================================================================
# ROUTES
# ============================================================================


@app.route("/")
def index():
    """Render the landing page."""
    return render_template("index.html")


# ============================================================================
# CHANNEL KEY API (v4.0.0)
# ============================================================================


@app.route("/api/channel/status")
@login_required
def api_channel_status():
    """
    Get current channel key status (v4.0.0).

    Returns JSON with mode, fingerprint, and source.
    """
    # Use subprocess for isolation
    result = subprocess_stego.get_channel_status(reveal=False)
    if result.success:
        return jsonify(
            {
                "success": True,
                "mode": result.mode,
                "configured": result.configured,
                "fingerprint": result.fingerprint,
                "source": result.source,
            }
        )

    # Fallback to direct call if subprocess fails
    status = get_channel_status()
    return jsonify(
        {
            "success": True,
            "mode": status["mode"],
            "configured": status["configured"],
            "fingerprint": status.get("fingerprint"),
            "source": status.get("source"),
        }
    )


@app.route("/api/channel/validate", methods=["POST"])
@login_required
def api_channel_validate():
    """
    Validate a channel key format (v4.0.0).

    The key is read from the ``key`` form field or, failing that, from the
    ``key`` member of a JSON object body.

    Returns JSON with validation result: ``{"valid": True, "fingerprint": ...}``
    or ``{"valid": False, "error": ...}``.
    """
    invalid_format = {
        "valid": False,
        "error": "Invalid format. Expected: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX",
    }

    key = request.form.get("key", "")
    if not key and request.is_json:
        # silent=True: a malformed body is treated like a missing key instead
        # of aborting. The isinstance check covers valid JSON that is not an
        # object (e.g. a list), which has no .get().
        body = request.get_json(silent=True)
        if isinstance(body, dict):
            key = body.get("key") or ""

    if not key:
        return jsonify({"valid": False, "error": "No key provided"})

    # A JSON client can send a number or list as "key"; the slicing below
    # needs a string.
    if not isinstance(key, str):
        return jsonify(invalid_format)

    if not validate_channel_key(key):
        return jsonify(invalid_format)

    # NOTE(review): the fingerprint is built from the raw input - confirm
    # validate_channel_key() only accepts keys already in canonical form.
    fingerprint = f"{key[:4]}-••••-••••-••••-••••-••••-••••-{key[-4:]}"
    return jsonify(
        {
            "valid": True,
            "fingerprint": fingerprint,
        }
    )


# ============================================================================
# GENERATE
# ============================================================================


@app.route("/generate", methods=["GET", "POST"])
@login_required
def generate():
    """Show the credential generator and, on POST, generate new credentials.

    The POST form selects the security factors (``use_pin`` / ``use_rsa``)
    and their sizes. Out-of-range sizes are clamped; non-numeric ones fall
    back to their defaults. When an RSA key is generated and QR support is
    available, the key is stored temporarily under a random token so the QR
    routes can render it.
    """

    def _blank_form():
        return render_template("generate.html", generated=False, has_qrcode=HAS_QRCODE)

    if request.method != "POST":
        return _blank_form()

    use_pin = request.form.get("use_pin") == "on"
    use_rsa = request.form.get("use_rsa") == "on"

    if not use_pin and not use_rsa:
        flash("You must select at least one security factor (PIN or RSA Key)", "error")
        return _blank_form()

    # type=int makes the form lookup return the default when the submitted
    # value is not a valid integer, instead of raising ValueError here.
    # v3.2.0: Changed from words_per_phrase to words_per_passphrase, default increased to 4
    words_per_passphrase = request.form.get(
        "words_per_passphrase", DEFAULT_PASSPHRASE_WORDS, type=int
    )
    pin_length = request.form.get("pin_length", 6, type=int)
    rsa_bits = request.form.get("rsa_bits", 2048, type=int)

    # Clamp values
    words_per_passphrase = max(MIN_PASSPHRASE_WORDS, min(12, words_per_passphrase))
    pin_length = max(MIN_PIN_LENGTH, min(MAX_PIN_LENGTH, pin_length))
    if rsa_bits not in VALID_RSA_SIZES:
        rsa_bits = 2048

    try:
        # v3.2.0 FIX: Use correct parameter name 'passphrase_words'
        creds = generate_credentials(
            use_pin=use_pin,
            use_rsa=use_rsa,
            pin_length=pin_length,
            rsa_bits=rsa_bits,
            passphrase_words=words_per_passphrase,
        )

        # Store RSA key temporarily for QR generation
        qr_token = None
        qr_needs_compression = False
        qr_too_large = False

        if creds.rsa_key_pem and HAS_QRCODE:
            # The key is only offered as a QR code if it fits when compressed
            if can_fit_in_qr(creds.rsa_key_pem, compress=True):
                qr_needs_compression = True
            else:
                qr_too_large = True

            if not qr_too_large:
                qr_token = secrets.token_urlsafe(16)
                cleanup_temp_files()
                temp_storage.save_temp_file(
                    qr_token,
                    creds.rsa_key_pem.encode(),
                    {
                        "filename": "rsa_key.pem",
                        "type": "rsa_key",
                        "compress": qr_needs_compression,
                    },
                )

        # v3.2.0: Single passphrase instead of daily phrases
        return render_template(
            "generate.html",
            passphrase=creds.passphrase,
            pin=creds.pin,
            generated=True,
            words_per_passphrase=words_per_passphrase,
            pin_length=pin_length if use_pin else None,
            use_pin=use_pin,
            use_rsa=use_rsa,
            rsa_bits=rsa_bits,
            rsa_key_pem=creds.rsa_key_pem,
            passphrase_entropy=creds.passphrase_entropy,
            pin_entropy=creds.pin_entropy,
            rsa_entropy=creds.rsa_entropy,
            total_entropy=creds.total_entropy,
            has_qrcode=HAS_QRCODE,
            qr_token=qr_token,
            qr_needs_compression=qr_needs_compression,
            qr_too_large=qr_too_large,
        )
    except Exception as e:
        flash(f"Error generating credentials: {e}", "error")
        return _blank_form()
def _rsa_key_qr_png(token: str):
    """Render the QR code PNG for the RSA key stored under ``token``.

    Returns:
        ``(png_bytes, None)`` on success, or ``(None, (message, status))``
        describing the plain-text error response to send.
    """
    if not HAS_QRCODE:
        return None, ("QR code support not available", 501)

    file_info = temp_storage.get_temp_file(token)
    if not file_info:
        return None, ("Token expired or invalid", 404)

    # Tokens are shared with other temp files; only RSA key entries qualify.
    if file_info.get("type") != "rsa_key":
        return None, ("Invalid token type", 400)

    try:
        key_pem = file_info["data"].decode("utf-8")
        compress = file_info.get("compress", False)
        return generate_qr_code(key_pem, compress=compress), None
    except Exception as e:
        return None, (f"Error generating QR code: {e}", 500)


# The <token> URL variable is required: both views take ``token`` as an
# argument, so a rule without it can never call them successfully.
@app.route("/generate/qr/<token>")
@login_required
def generate_qr(token):
    """Generate QR code for RSA key (served inline as PNG)."""
    qr_png, error = _rsa_key_qr_png(token)
    if error is not None:
        return error
    return send_file(io.BytesIO(qr_png), mimetype="image/png", as_attachment=False)


@app.route("/generate/qr-download/<token>")
@login_required
def generate_qr_download(token):
    """Download QR code as PNG file."""
    qr_png, error = _rsa_key_qr_png(token)
    if error is not None:
        return error
    return send_file(
        io.BytesIO(qr_png),
        mimetype="image/png",
        as_attachment=True,
        download_name="stegasoo_rsa_key_qr.png",
    )
@app.route("/qr/crop", methods=["POST"])
@login_required
def qr_crop():
    """
    Detect and crop QR code from an image.

    Expects the upload in the ``image`` file field. The cropped QR code is
    returned as PNG, inline by default or as an attachment when the
    ``download`` query parameter is "1", "true" or "yes".
    """
    if not HAS_QRCODE_READ:
        return jsonify({"error": "QR code reading not available (install pyzbar)"}), 501

    upload = request.files.get("image")
    if not upload:
        return jsonify({"error": "No image provided"}), 400

    try:
        cropped_png = detect_and_crop_qr(upload.read())
        if cropped_png is None:
            return jsonify({"error": "No QR code detected in image"}), 404

        # Return as downloadable PNG or inline based on query param
        download_flag = request.args.get("download", "").lower()
        return send_file(
            io.BytesIO(cropped_png),
            mimetype="image/png",
            as_attachment=download_flag in ("1", "true", "yes"),
            download_name="cropped_qr.png",
        )
    except Exception as exc:
        return jsonify({"error": f"Error processing image: {exc}"}), 500


@app.route("/generate/download-key", methods=["POST"])
@login_required
def download_key():
    """Download RSA key as password-protected PEM file.

    Reads the key from the ``key_pem`` form field and the password from
    ``key_password`` (at least 8 characters). Any problem is flashed and the
    user is redirected back to the generate page.
    """
    key_pem = request.form.get("key_pem", "")
    password = request.form.get("key_password", "")

    problem = None
    if not key_pem:
        problem = "No key to download"
    elif not password or len(password) < 8:
        problem = "Password must be at least 8 characters"

    if problem is not None:
        flash(problem, "error")
        return redirect(url_for("generate"))

    try:
        private_key = load_rsa_key(key_pem.encode("utf-8"))
        encrypted_pem = export_rsa_key_pem(private_key, password=password)

        # A short random suffix keeps repeated downloads from colliding.
        suffix = secrets.token_hex(4)
        return send_file(
            io.BytesIO(encrypted_pem),
            mimetype="application/x-pem-file",
            as_attachment=True,
            download_name=f"stegasoo_key_{private_key.key_size}_{suffix}.pem",
        )
    except Exception as exc:
        flash(f"Error creating key file: {exc}", "error")
        return redirect(url_for("generate"))
@app.route("/extract-key-from-qr", methods=["POST"])
@login_required
def extract_key_from_qr_route():
    """
    Extract RSA key from uploaded QR code image.

    Expects the image in the ``qr_image`` file field.
    Returns JSON with the extracted key or error.
    """

    def _failure(message, status):
        return jsonify({"success": False, "error": message}), status

    if not HAS_QRCODE_READ:
        return _failure("QR code reading not available. Install pyzbar and libzbar.", 501)

    qr_image = request.files.get("qr_image")
    if not qr_image:
        return _failure("No QR image provided", 400)

    try:
        key_pem = extract_key_from_qr(qr_image.read())
        if not key_pem:
            return _failure("No valid RSA key found in QR code", 400)
        return jsonify({"success": True, "key_pem": key_pem})
    except Exception as exc:
        return _failure(str(exc), 500)


# ============================================================================
# NEW in v3.0 - CAPACITY COMPARISON API
# ============================================================================


@app.route("/api/compare-capacity", methods=["POST"])
@login_required
def api_compare_capacity():
    """
    Compare LSB and DCT capacity for an uploaded carrier image.

    Expects the image in the ``carrier`` file field.
    Returns JSON with capacity info for both modes.
    Uses subprocess isolation to prevent crashes.
    """
    carrier = request.files.get("carrier")
    if not carrier:
        return jsonify({"error": "No carrier image provided"}), 400

    try:
        # Use subprocess-isolated compare_modes
        comparison = subprocess_stego.compare_modes(carrier.read())
        if not comparison.success:
            return jsonify({"error": comparison.error or "Comparison failed"}), 500

        lsb_stats = comparison.lsb
        dct_stats = comparison.dct

        lsb_info = {
            "capacity_bytes": lsb_stats["capacity_bytes"],
            "capacity_kb": round(lsb_stats["capacity_kb"], 1),
            "output": lsb_stats.get("output", "PNG"),
        }
        dct_info = {
            "capacity_bytes": dct_stats["capacity_bytes"],
            "capacity_kb": round(dct_stats["capacity_kb"], 1),
            "output": dct_stats.get("output", "JPEG"),
            "available": dct_stats.get("available", True),
            "ratio": round(dct_stats.get("ratio_vs_lsb", 0), 1),
        }
        return jsonify(
            {
                "success": True,
                "width": comparison.width,
                "height": comparison.height,
                "lsb": lsb_info,
                "dct": dct_info,
            }
        )
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500


@app.route("/api/check-fit", methods=["POST"])
@login_required
def api_check_fit():
    """
    Check if a payload will fit in the carrier with selected mode.

    Form fields: ``carrier`` (file), ``payload_size`` (integer) and
    ``embed_mode`` ("lsb" or "dct", default "lsb").
    Returns JSON with fit status and details.
    Uses subprocess isolation to prevent crashes.
    """

    def _bad_request(message):
        return jsonify({"error": message}), 400

    carrier = request.files.get("carrier")
    # type=int yields None for a missing or non-numeric value
    payload_size = request.form.get("payload_size", type=int)
    embed_mode = request.form.get("embed_mode", "lsb")

    if not carrier or payload_size is None:
        return _bad_request("Missing carrier or payload_size")
    if embed_mode not in ("lsb", "dct"):
        return _bad_request("Invalid embed_mode")
    if embed_mode == "dct" and not has_dct_support():
        return _bad_request("DCT mode requires scipy")

    try:
        # Use subprocess-isolated capacity check
        check = subprocess_stego.check_capacity(
            carrier_data=carrier.read(),
            payload_size=payload_size,
            embed_mode=embed_mode,
        )
        if not check.success:
            return jsonify({"error": check.error or "Capacity check failed"}), 500

        return jsonify(
            {
                "success": True,
                "fits": check.fits,
                "payload_size": check.payload_size,
                "capacity": check.capacity,
                "usage_percent": round(check.usage_percent, 1),
                "headroom": check.headroom,
                "mode": check.mode,
            }
        )
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500


# ============================================================================
# ENCODE
# ============================================================================


def _run_encode_job(job_id: str, encode_params: dict) -> None:
    """Background thread function for async encode.

    Marks the job as running, encodes through ``subprocess_stego`` (file
    payload when ``encode_params`` carries non-empty ``file_data``, text
    message otherwise), saves the output via ``temp_storage`` and records
    the final job state: ``complete`` with a ``file_id``, or ``error`` with
    a message. The progress file is cleaned up in every case.

    Args:
        job_id: Identifier under which status and progress are tracked.
        encode_params: Encode inputs and options collected by the caller.
    """
    progress_file = get_progress_file_path(job_id)

    def _record_error(reason: str) -> None:
        _store_job(job_id, {"status": "error", "error": reason, "created": time.time()})

    try:
        _store_job(job_id, {"status": "running", "created": time.time()})

        # Assemble the encode() arguments once; only the payload part differs
        # between file mode and message mode.
        encode_args = {
            "carrier_data": encode_params["carrier_data"],
            "reference_data": encode_params["ref_data"],
        }
        if encode_params.get("file_data"):
            encode_args["file_data"] = encode_params["file_data"]
            encode_args["file_name"] = encode_params["file_name"]
            encode_args["file_mime"] = encode_params["file_mime"]
        else:
            encode_args["message"] = encode_params["message"]
        encode_args["passphrase"] = encode_params["passphrase"]
        encode_args["pin"] = encode_params.get("pin")
        encode_args["rsa_key_data"] = encode_params.get("rsa_key_data")
        encode_args["rsa_password"] = encode_params.get("key_password")
        encode_args["embed_mode"] = encode_params["embed_mode"]
        encode_args["dct_output_format"] = encode_params.get("dct_output_format", "png")
        encode_args["dct_color_mode"] = encode_params.get("dct_color_mode", "color")
        encode_args["channel_key"] = encode_params.get("channel_key")
        encode_args["progress_file"] = progress_file

        # Run encode with progress file
        encode_result = subprocess_stego.encode(**encode_args)

        if not encode_result.success:
            _record_error(encode_result.error or "Encoding failed")
            return

        # Determine output format
        embed_mode = encode_params["embed_mode"]
        dct_output_format = encode_params.get("dct_output_format", "png")
        dct_color_mode = encode_params.get("dct_color_mode", "color")
        is_dct = embed_mode == "dct"
        wants_jpeg = is_dct and dct_output_format == "jpeg"

        if wants_jpeg:
            output_ext, output_mime = ".jpg", "image/jpeg"
        else:
            output_ext, output_mime = ".png", "image/png"

        filename = encode_result.filename
        if not filename:
            filename = generate_filename("stego", output_ext)
        elif wants_jpeg and filename.endswith(".png"):
            # Swap a .png name for .jpg when the output is actually JPEG
            filename = filename[:-4] + ".jpg"

        # Store result
        file_id = secrets.token_urlsafe(16)
        temp_storage.save_temp_file(
            file_id,
            encode_result.stego_data,
            {
                "filename": filename,
                "embed_mode": embed_mode,
                "output_format": dct_output_format if is_dct else "png",
                "color_mode": dct_color_mode if is_dct else None,
                "mime_type": output_mime,
                "channel_mode": encode_result.channel_mode,
                "channel_fingerprint": encode_result.channel_fingerprint,
            },
        )

        _store_job(
            job_id,
            {
                "status": "complete",
                "file_id": file_id,
                "created": time.time(),
            },
        )
    except Exception as exc:
        _record_error(str(exc))
    finally:
        cleanup_progress_file(job_id)
file_id, "created": time.time(), }, ) except Exception as e: _store_job( job_id, { "status": "error", "error": str(e), "created": time.time(), }, ) finally: cleanup_progress_file(job_id) @app.route("/encode", methods=["GET", "POST"]) @login_required def encode_page(): if request.method == "POST": # Check if async mode requested is_async = request.form.get("async") == "true" or request.headers.get("X-Async") == "true" def _error_response(msg): """Return error as JSON (async) or HTML flash (sync).""" if is_async: return jsonify({"error": msg}), 400 flash(msg, "error") return render_template("encode.html", has_qrcode_read=HAS_QRCODE_READ) try: # Get files ref_photo = request.files.get("reference_photo") carrier = request.files.get("carrier") rsa_key_file = request.files.get("rsa_key") payload_file = request.files.get("payload_file") if not ref_photo or not carrier: return _error_response("Both reference photo and carrier image are required") if not allowed_image(ref_photo.filename) or not allowed_image(carrier.filename): return _error_response("Invalid file type. 
Use PNG, JPG, or BMP") # Get form data - v3.2.0: renamed from day_phrase to passphrase message = request.form.get("message", "") passphrase = request.form.get("passphrase", "") # v3.2.0: Renamed pin = request.form.get("pin", "").strip() rsa_password = request.form.get("rsa_password", "") payload_type = request.form.get("payload_type", "text") # NEW in v3.0 - Embedding mode embed_mode = request.form.get("embed_mode", "lsb") if embed_mode not in ("lsb", "dct"): embed_mode = "lsb" # NEW in v3.0.1 - DCT output format dct_output_format = request.form.get("dct_output_format", "png") if dct_output_format not in ("png", "jpeg"): dct_output_format = "png" # NEW in v3.0.1 - DCT color mode dct_color_mode = request.form.get("dct_color_mode", "color") if dct_color_mode not in ("grayscale", "color"): dct_color_mode = "color" # NEW in v4.0.0 - Channel key channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto")) # Check DCT availability if embed_mode == "dct" and not has_dct_support(): return _error_response("DCT mode requires scipy. 
Install with: pip install scipy") # Determine payload if payload_type == "file" and payload_file and payload_file.filename: # File payload file_data = payload_file.read() result = validate_file_payload(file_data, payload_file.filename) if not result.is_valid: return _error_response(result.error_message) mime_type, _ = mimetypes.guess_type(payload_file.filename) payload = FilePayload( data=file_data, filename=payload_file.filename, mime_type=mime_type ) else: # Text message result = validate_message(message) if not result.is_valid: return _error_response(result.error_message) payload = message # v3.2.0: Renamed from day_phrase if not passphrase: return _error_response("Passphrase is required") # v3.2.0: Validate passphrase result = validate_passphrase(passphrase) if not result.is_valid: return _error_response(result.error_message) # Show warning if passphrase is short if result.warning: flash(result.warning, "warning") # Read files ref_data = ref_photo.read() carrier_data = carrier.read() # Handle RSA key - can come from .pem file, QR code image, or webcam-scanned PEM (v4.1.5) rsa_key_data = None rsa_key_pem = request.form.get("rsa_key_pem", "").strip() rsa_key_qr = request.files.get("rsa_key_qr") rsa_key_from_qr = False if rsa_key_pem: # Webcam-scanned PEM key (v4.1.5+) - may be compressed (zlib or zstd) if is_compressed(rsa_key_pem): rsa_key_pem = decompress_data(rsa_key_pem) rsa_key_data = rsa_key_pem.encode("utf-8") rsa_key_from_qr = True elif rsa_key_file and rsa_key_file.filename: rsa_key_data = rsa_key_file.read() elif rsa_key_qr and rsa_key_qr.filename and HAS_QRCODE_READ: qr_image_data = rsa_key_qr.read() key_pem = extract_key_from_qr(qr_image_data) if key_pem: rsa_key_data = key_pem.encode("utf-8") rsa_key_from_qr = True else: return _error_response("Could not extract RSA key from QR code image.") # Validate security factors result = validate_security_factors(pin, rsa_key_data) if not result.is_valid: return _error_response(result.error_message) # Validate 
PIN if provided if pin: result = validate_pin(pin) if not result.is_valid: return _error_response(result.error_message) # Determine key password key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None) # Validate RSA key if provided if rsa_key_data: result = validate_rsa_key(rsa_key_data, key_password) if not result.is_valid: return _error_response(result.error_message) # Validate carrier image result = validate_image(carrier_data, "Carrier image") if not result.is_valid: return _error_response(result.error_message) # Pre-check payload capacity BEFORE encode (fail fast) from stegasoo.steganography import will_fit_by_mode payload_size = ( len(payload.data) if hasattr(payload, "data") else len(payload.encode("utf-8")) ) fit_check = will_fit_by_mode(payload_size, carrier_data, embed_mode=embed_mode) if not fit_check.get("fits", True): error_msg = ( f"Payload too large for {embed_mode.upper()} mode. " f"Payload: {payload_size:,} bytes, " f"Capacity: {fit_check.get('capacity', 0):,} bytes" ) # Suggest alternative mode if embed_mode == "dct": alt_check = will_fit_by_mode(payload_size, carrier_data, embed_mode="lsb") if alt_check.get("fits"): error_msg += " - Try LSB mode instead." 
return _error_response(error_msg) # Build encode params for either sync or async encode_params = { "carrier_data": carrier_data, "ref_data": ref_data, "passphrase": passphrase, "pin": pin if pin else None, "rsa_key_data": rsa_key_data, "key_password": key_password, "embed_mode": embed_mode, "dct_output_format": dct_output_format if embed_mode == "dct" else "png", "dct_color_mode": dct_color_mode if embed_mode == "dct" else "color", "channel_key": channel_key, } if payload_type == "file" and payload_file and payload_file.filename: encode_params["file_data"] = payload.data encode_params["file_name"] = payload.filename encode_params["file_mime"] = payload.mime_type else: encode_params["message"] = payload # ASYNC MODE: Start background job and return JSON if is_async: job_id = generate_job_id() _store_job(job_id, {"status": "pending", "created": time.time()}) _executor.submit(_run_encode_job, job_id, encode_params) return jsonify({"job_id": job_id, "status": "pending"}) # SYNC MODE: Run inline (original behavior) if payload_type == "file" and payload_file and payload_file.filename: encode_result = subprocess_stego.encode( carrier_data=carrier_data, reference_data=ref_data, file_data=payload.data, file_name=payload.filename, file_mime=payload.mime_type, passphrase=passphrase, pin=pin if pin else None, rsa_key_data=rsa_key_data, rsa_password=key_password, embed_mode=embed_mode, dct_output_format=dct_output_format if embed_mode == "dct" else "png", dct_color_mode=dct_color_mode if embed_mode == "dct" else "color", channel_key=channel_key, ) else: encode_result = subprocess_stego.encode( carrier_data=carrier_data, reference_data=ref_data, message=payload, passphrase=passphrase, pin=pin if pin else None, rsa_key_data=rsa_key_data, rsa_password=key_password, embed_mode=embed_mode, dct_output_format=dct_output_format if embed_mode == "dct" else "png", dct_color_mode=dct_color_mode if embed_mode == "dct" else "color", channel_key=channel_key, ) # Check for subprocess errors 
if not encode_result.success: error_msg = encode_result.error or "Encoding failed" if "capacity" in error_msg.lower(): raise CapacityError(error_msg) raise StegasooError(error_msg) # Determine actual output format for filename and storage if embed_mode == "dct" and dct_output_format == "jpeg": output_ext = ".jpg" output_mime = "image/jpeg" else: output_ext = ".png" output_mime = "image/png" # Use filename from result or generate one filename = encode_result.filename if not filename: filename = generate_filename("stego", output_ext) elif embed_mode == "dct" and dct_output_format == "jpeg" and filename.endswith(".png"): filename = filename[:-4] + ".jpg" # Store temporarily file_id = secrets.token_urlsafe(16) cleanup_temp_files() temp_storage.save_temp_file(file_id, encode_result.stego_data, { "filename": filename, "embed_mode": embed_mode, "output_format": dct_output_format if embed_mode == "dct" else "png", "color_mode": dct_color_mode if embed_mode == "dct" else None, "mime_type": output_mime, # Channel info (v4.0.0) "channel_mode": encode_result.channel_mode, "channel_fingerprint": encode_result.channel_fingerprint, }) return redirect(url_for("encode_result", file_id=file_id)) except CapacityError as e: return _error_response(str(e)) except StegasooError as e: return _error_response(str(e)) except Exception as e: return _error_response(f"Error: {e}") return render_template("encode.html", has_qrcode_read=HAS_QRCODE_READ) # ============================================================================ # ENCODE PROGRESS ENDPOINTS (v4.1.2) # ============================================================================ @app.route("/encode/status/") @login_required def encode_status(job_id): """Get the status of an async encode job.""" job = _get_job(job_id) if not job: return jsonify({"error": "Job not found"}), 404 response = {"status": job.get("status", "unknown")} if job["status"] == "complete": response["file_id"] = job.get("file_id") elif job["status"] == "error": 
response["error"] = job.get("error", "Unknown error") return jsonify(response) @app.route("/encode/progress/") @login_required def encode_progress(job_id): """Get the progress of an async encode job.""" progress = read_progress(job_id) if progress: return jsonify(progress) # No progress file yet - check job status job = _get_job(job_id) if not job: return jsonify({"error": "Job not found"}), 404 if job["status"] == "complete": return jsonify({"percent": 100, "phase": "complete"}) elif job["status"] == "error": return jsonify({"percent": 0, "phase": "error", "error": job.get("error")}) elif job["status"] == "pending": return jsonify({"percent": 0, "phase": "starting"}) # Running but no progress file yet return jsonify({"percent": 0, "phase": "initializing"}) @app.route("/encode/result/") @login_required def encode_result(file_id): file_info = temp_storage.get_temp_file(file_id) if not file_info: flash("File expired or not found. Please encode again.", "error") return redirect(url_for("encode_page")) # Generate thumbnail thumbnail_data = generate_thumbnail(file_info["data"]) thumbnail_id = None if thumbnail_data: thumbnail_id = f"{file_id}_thumb" temp_storage.save_thumbnail(thumbnail_id, thumbnail_data) return render_template( "encode_result.html", file_id=file_id, filename=file_info["filename"], thumbnail_url=url_for("encode_thumbnail", thumb_id=thumbnail_id) if thumbnail_id else None, embed_mode=file_info.get("embed_mode", "lsb"), output_format=file_info.get("output_format", "png"), color_mode=file_info.get("color_mode"), # Channel info (v4.0.0) channel_mode=file_info.get("channel_mode", "public"), channel_fingerprint=file_info.get("channel_fingerprint"), ) @app.route("/encode/thumbnail/") @login_required def encode_thumbnail(thumb_id): """Serve thumbnail image.""" thumb_data = temp_storage.get_thumbnail(thumb_id) if not thumb_data: return "Thumbnail not found", 404 return send_file( io.BytesIO(thumb_data), mimetype="image/jpeg", as_attachment=False ) 
@app.route("/encode/download/") @login_required def encode_download(file_id): file_info = temp_storage.get_temp_file(file_id) if not file_info: flash("File expired or not found.", "error") return redirect(url_for("encode_page")) mime_type = file_info.get("mime_type", "image/png") return send_file( io.BytesIO(file_info["data"]), mimetype=mime_type, as_attachment=True, download_name=file_info["filename"], ) @app.route("/encode/file/") @login_required def encode_file_route(file_id): """Serve file for Web Share API.""" file_info = temp_storage.get_temp_file(file_id) if not file_info: return "Not found", 404 mime_type = file_info.get("mime_type", "image/png") return send_file( io.BytesIO(file_info["data"]), mimetype=mime_type, as_attachment=False, download_name=file_info["filename"], ) @app.route("/encode/cleanup/", methods=["POST"]) @login_required def encode_cleanup(file_id): """Manually cleanup a file after sharing.""" temp_storage.delete_temp_file(file_id) # Also cleanup thumbnail if exists thumb_id = f"{file_id}_thumb" temp_storage.delete_thumbnail(thumb_id) return jsonify({"status": "ok"}) # ============================================================================ # DECODE # ============================================================================ def _run_decode_job(job_id: str, decode_params: dict) -> None: """Background thread function for async decode.""" progress_file = get_progress_file_path(job_id) try: _store_job(job_id, {"status": "running", "created": time.time()}) # Run decode with progress file decode_result = subprocess_stego.decode( stego_data=decode_params["stego_data"], reference_data=decode_params["ref_data"], passphrase=decode_params["passphrase"], pin=decode_params.get("pin"), rsa_key_data=decode_params.get("rsa_key_data"), rsa_password=decode_params.get("rsa_password"), embed_mode=decode_params.get("embed_mode", "auto"), channel_key=decode_params.get("channel_key"), progress_file=progress_file, ) if not decode_result.success: _store_job( 
job_id, { "status": "error", "error": decode_result.error or "Decoding failed", "error_type": decode_result.error_type, "created": time.time(), }, ) return # Store result based on type if decode_result.is_file: file_id = secrets.token_urlsafe(16) filename = decode_result.filename or "decoded_file" temp_storage.save_temp_file(file_id, decode_result.file_data, { "filename": filename, "mime_type": decode_result.mime_type, }) _store_job( job_id, { "status": "complete", "file_id": file_id, "is_file": True, "filename": filename, "file_size": len(decode_result.file_data), "mime_type": decode_result.mime_type, "created": time.time(), }, ) else: _store_job( job_id, { "status": "complete", "is_file": False, "message": decode_result.message, "created": time.time(), }, ) except Exception as e: _store_job( job_id, { "status": "error", "error": str(e), "created": time.time(), }, ) finally: cleanup_progress_file(job_id) @app.route("/decode", methods=["GET", "POST"]) @login_required def decode_page(): if request.method == "POST": try: # Get files ref_photo = request.files.get("reference_photo") stego_image = request.files.get("stego_image") rsa_key_file = request.files.get("rsa_key") if not ref_photo or not stego_image: flash("Both reference photo and stego image are required", "error") return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) # Get form data - v3.2.0: renamed from day_phrase to passphrase passphrase = request.form.get("passphrase", "") # v3.2.0: Renamed pin = request.form.get("pin", "").strip() rsa_password = request.form.get("rsa_password", "") # NEW in v3.0 - Extraction mode embed_mode = request.form.get("embed_mode", "auto") if embed_mode not in ("auto", "lsb", "dct"): embed_mode = "auto" # NEW in v4.0.0 - Channel key channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto")) # Check DCT availability if embed_mode == "dct" and not has_dct_support(): flash("DCT mode requires scipy. 
Install with: pip install scipy", "error") return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) # v3.2.0: Removed date handling (no stego_date needed) # v3.2.0: Renamed from day_phrase if not passphrase: flash("Passphrase is required", "error") return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) # Read files ref_data = ref_photo.read() stego_data = stego_image.read() # Handle RSA key - can come from .pem file, QR code image, or webcam-scanned PEM (v4.1.5) rsa_key_data = None rsa_key_pem = request.form.get("rsa_key_pem", "").strip() rsa_key_qr = request.files.get("rsa_key_qr") rsa_key_from_qr = False if rsa_key_pem: # Webcam-scanned PEM key (v4.1.5+) - may be compressed (zlib or zstd) if is_compressed(rsa_key_pem): rsa_key_pem = decompress_data(rsa_key_pem) rsa_key_data = rsa_key_pem.encode("utf-8") rsa_key_from_qr = True elif rsa_key_file and rsa_key_file.filename: rsa_key_data = rsa_key_file.read() elif rsa_key_qr and rsa_key_qr.filename and HAS_QRCODE_READ: qr_image_data = rsa_key_qr.read() key_pem = extract_key_from_qr(qr_image_data) if key_pem: rsa_key_data = key_pem.encode("utf-8") rsa_key_from_qr = True else: flash("Could not extract RSA key from QR code image.", "error") return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) # Validate security factors result = validate_security_factors(pin, rsa_key_data) if not result.is_valid: flash(result.error_message, "error") return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) # Validate PIN if provided if pin: result = validate_pin(pin) if not result.is_valid: flash(result.error_message, "error") return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) # Determine key password key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None) # Validate RSA key if provided if rsa_key_data: result = validate_rsa_key(rsa_key_data, key_password) if not result.is_valid: flash(result.error_message, "error") return 
render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) # Check for async mode (v4.1.5) is_async = request.form.get("async") == "true" or request.headers.get("X-Async") == "true" # Build decode params decode_params = { "stego_data": stego_data, "ref_data": ref_data, "passphrase": passphrase, "pin": pin if pin else None, "rsa_key_data": rsa_key_data, "rsa_password": key_password, "embed_mode": embed_mode, "channel_key": channel_key, } # ASYNC MODE: Start background job and return JSON if is_async: job_id = generate_job_id() _store_job(job_id, {"status": "pending", "created": time.time()}) _executor.submit(_run_decode_job, job_id, decode_params) return jsonify({"job_id": job_id, "status": "pending"}) # SYNC MODE: Run inline (original behavior) # v4.0.0: Include channel_key parameter # Use subprocess-isolated decode to prevent crashes decode_result = subprocess_stego.decode( stego_data=stego_data, reference_data=ref_data, passphrase=passphrase, pin=pin if pin else None, rsa_key_data=rsa_key_data, rsa_password=key_password, embed_mode=embed_mode, channel_key=channel_key, # v4.0.0 ) # Check for subprocess errors if not decode_result.success: error_msg = decode_result.error or "Decoding failed" # Check for channel key related errors if "channel key" in error_msg.lower(): flash(error_msg, "error") return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) if "decrypt" in error_msg.lower() or decode_result.error_type == "DecryptionError": raise DecryptionError(error_msg) raise StegasooError(error_msg) if decode_result.is_file: # File content - store temporarily for download file_id = secrets.token_urlsafe(16) cleanup_temp_files() filename = decode_result.filename or "decoded_file" temp_storage.save_temp_file(file_id, decode_result.file_data, { "filename": filename, "mime_type": decode_result.mime_type, }) return render_template( "decode.html", decoded_file=True, file_id=file_id, filename=filename, file_size=format_size(len(decode_result.file_data)), 
mime_type=decode_result.mime_type, has_qrcode_read=HAS_QRCODE_READ, ) else: # Text content return render_template( "decode.html", decoded_message=decode_result.message, has_qrcode_read=HAS_QRCODE_READ, ) except InvalidMagicBytesError: flash( "This doesn't appear to be a Stegasoo image. Try a different mode (LSB/DCT).", "warning", ) return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) except ReedSolomonError: flash( "Image too corrupted to decode. It may have been re-saved or compressed.", "error", ) return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) except InvalidHeaderError: flash( "Invalid or corrupted header. The image may have been modified.", "error", ) return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) except DecryptionError: flash( "Wrong credentials. Double-check your reference photo, passphrase, PIN, and channel key.", "warning", ) return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) except StegasooError as e: flash(str(e), "error") return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) except Exception as e: flash(f"Error: {e}", "error") return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ) @app.route("/decode/download/") @login_required def decode_download(file_id): """Download decoded file.""" file_info = temp_storage.get_temp_file(file_id) if not file_info: flash("File expired or not found.", "error") return redirect(url_for("decode_page")) mime_type = file_info.get("mime_type", "application/octet-stream") return send_file( io.BytesIO(file_info["data"]), mimetype=mime_type, as_attachment=True, download_name=file_info["filename"], ) # ============================================================================ # DECODE PROGRESS ENDPOINTS (v4.1.5) # ============================================================================ @app.route("/decode/status/") @login_required def 
decode_status(job_id): """Get the status of an async decode job.""" job = _get_job(job_id) if not job: return jsonify({"error": "Job not found"}), 404 response = {"status": job.get("status", "unknown")} if job["status"] == "complete": response["is_file"] = job.get("is_file", False) if job.get("is_file"): response["file_id"] = job.get("file_id") response["filename"] = job.get("filename") response["file_size"] = job.get("file_size") response["mime_type"] = job.get("mime_type") else: response["message"] = job.get("message") elif job["status"] == "error": response["error"] = job.get("error", "Unknown error") response["error_type"] = job.get("error_type") return jsonify(response) @app.route("/decode/progress/") @login_required def decode_progress(job_id): """Get the progress of an async decode job.""" progress = read_progress(job_id) if progress: return jsonify(progress) # No progress file yet - check job status job = _get_job(job_id) if not job: return jsonify({"error": "Job not found"}), 404 if job["status"] == "complete": return jsonify({"percent": 100, "phase": "complete"}) elif job["status"] == "error": return jsonify({"percent": 0, "phase": "error", "error": job.get("error")}) elif job["status"] == "pending": return jsonify({"percent": 0, "phase": "starting"}) # Running but no progress file yet return jsonify({"percent": 5, "phase": "reading"}) @app.route("/decode/result/") @login_required def decode_result(job_id): """Get the result page for an async decode job.""" job = _get_job(job_id) if not job: flash("Job not found or expired.", "error") return redirect(url_for("decode_page")) if job["status"] != "complete": flash("Decode not complete.", "error") return redirect(url_for("decode_page")) if job.get("is_file"): return render_template( "decode.html", decoded_file=True, file_id=job.get("file_id"), filename=job.get("filename"), file_size=format_size(job.get("file_size", 0)), mime_type=job.get("mime_type"), has_qrcode_read=HAS_QRCODE_READ, ) else: return 
render_template( "decode.html", decoded_message=job.get("message"), has_qrcode_read=HAS_QRCODE_READ, ) @app.route("/about") def about(): from stegasoo.channel import get_channel_status channel_status = get_channel_status() # Check if user is admin (for QR sharing) current_user = get_current_user() is_admin = current_user.is_admin if current_user else False return render_template( "about.html", has_argon2=has_argon2(), has_qrcode_read=HAS_QRCODE_READ, # Channel info (bugfix - was not being passed) channel_configured=channel_status["configured"], channel_fingerprint=channel_status.get("fingerprint"), channel_source=channel_status.get("source"), # Admin check for QR sharing is_admin=is_admin, ) # ============================================================================ # TOOLS ROUTES (v4.1.0) # ============================================================================ @app.route("/tools") @login_required def tools(): """Advanced tools page.""" return render_template("tools.html", has_dct=has_dct_support()) @app.route("/api/tools/capacity", methods=["POST"]) @login_required def api_tools_capacity(): """Calculate image capacity for steganography.""" from stegasoo.dct_steganography import estimate_capacity_comparison carrier = request.files.get("image") if not carrier: return jsonify({"success": False, "error": "No image provided"}), 400 try: image_data = carrier.read() result = estimate_capacity_comparison(image_data) result["success"] = True result["filename"] = carrier.filename result["megapixels"] = round((result["width"] * result["height"]) / 1_000_000, 2) return jsonify(result) except Exception as e: return jsonify({"success": False, "error": str(e)}), 400 @app.route("/api/tools/strip-metadata", methods=["POST"]) @login_required def api_tools_strip_metadata(): """Strip EXIF/metadata from image.""" import io from stegasoo.utils import strip_image_metadata image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No 
image provided"}), 400 try: image_data = image_file.read() clean_data = strip_image_metadata(image_data, output_format="PNG") buffer = io.BytesIO(clean_data) filename = image_file.filename.rsplit(".", 1)[0] + "_clean.png" return send_file(buffer, mimetype="image/png", as_attachment=True, download_name=filename) except Exception as e: return jsonify({"success": False, "error": str(e)}), 400 @app.route("/api/tools/exif", methods=["POST"]) @login_required def api_tools_exif(): """Read EXIF metadata from image.""" from stegasoo.utils import read_image_exif image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 try: image_data = image_file.read() exif = read_image_exif(image_data) # Check if it's a JPEG (editable) or not is_jpeg = image_data[:2] == b"\xff\xd8" return jsonify( { "success": True, "filename": image_file.filename, "exif": exif, "editable": is_jpeg, "field_count": len(exif), } ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 400 @app.route("/api/tools/exif/update", methods=["POST"]) @login_required def api_tools_exif_update(): """Update EXIF fields in image.""" from stegasoo.utils import write_image_exif image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 # Get updates from form data updates_json = request.form.get("updates", "{}") try: import json updates = json.loads(updates_json) except json.JSONDecodeError: return jsonify({"success": False, "error": "Invalid updates JSON"}), 400 if not updates: return jsonify({"success": False, "error": "No updates provided"}), 400 try: image_data = image_file.read() updated_data = write_image_exif(image_data, updates) # Return as downloadable file buffer = io.BytesIO(updated_data) return send_file( buffer, mimetype="image/jpeg", as_attachment=True, download_name=f"exif_{image_file.filename}", ) except ValueError as e: return 
jsonify({"success": False, "error": str(e)}), 400 except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route("/api/tools/exif/clear", methods=["POST"]) @login_required def api_tools_exif_clear(): """Remove all EXIF metadata from image.""" from stegasoo.utils import strip_image_metadata image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 # Get desired output format (default to PNG for lossless) output_format = request.form.get("format", "PNG").upper() if output_format not in ("PNG", "JPEG", "BMP"): output_format = "PNG" try: image_data = image_file.read() clean_data = strip_image_metadata(image_data, output_format=output_format) # Determine extension and mimetype ext_map = { "PNG": ("png", "image/png"), "JPEG": ("jpg", "image/jpeg"), "BMP": ("bmp", "image/bmp"), } ext, mimetype = ext_map.get(output_format, ("png", "image/png")) # Return as downloadable file stem = ( image_file.filename.rsplit(".", 1)[0] if "." 
in image_file.filename else image_file.filename ) buffer = io.BytesIO(clean_data) return send_file( buffer, mimetype=mimetype, as_attachment=True, download_name=f"{stem}_clean.{ext}", ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route("/api/tools/rotate", methods=["POST"]) @login_required def api_tools_rotate(): """Rotate and/or flip an image.""" from PIL import Image image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 rotation = int(request.form.get("rotation", 0)) flip_h = request.form.get("flip_h", "false").lower() == "true" flip_v = request.form.get("flip_v", "false").lower() == "true" try: img = Image.open(io.BytesIO(image_file.read())) # Apply rotation (PIL rotates counter-clockwise, so negate) if rotation: img = img.rotate(-rotation, expand=True) # Apply flips if flip_h: img = img.transpose(Image.FLIP_LEFT_RIGHT) if flip_v: img = img.transpose(Image.FLIP_TOP_BOTTOM) # Output as PNG (lossless) buffer = io.BytesIO() img.save(buffer, format="PNG") buffer.seek(0) stem = ( image_file.filename.rsplit(".", 1)[0] if "." 
in image_file.filename else image_file.filename ) return send_file( buffer, mimetype="image/png", as_attachment=True, download_name=f"{stem}_transformed.png", ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route("/api/tools/compress", methods=["POST"]) @login_required def api_tools_compress(): """Compress image to JPEG at specified quality.""" from PIL import Image image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 quality = int(request.form.get("quality", 85)) quality = max(10, min(100, quality)) # Clamp to valid range try: img = Image.open(io.BytesIO(image_file.read())) # Convert to RGB if necessary (JPEG doesn't support alpha) if img.mode in ("RGBA", "LA", "P"): img = img.convert("RGB") buffer = io.BytesIO() img.save(buffer, format="JPEG", quality=quality) buffer.seek(0) stem = ( image_file.filename.rsplit(".", 1)[0] if "." in image_file.filename else image_file.filename ) return send_file( buffer, mimetype="image/jpeg", as_attachment=True, download_name=f"{stem}_q{quality}.jpg", ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route("/api/tools/convert", methods=["POST"]) @login_required def api_tools_convert(): """Convert image to different format.""" from PIL import Image image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 output_format = request.form.get("format", "PNG").upper() quality = int(request.form.get("quality", 90)) quality = max(10, min(100, quality)) # Validate format format_map = { "PNG": ("png", "image/png"), "JPEG": ("jpg", "image/jpeg"), "WEBP": ("webp", "image/webp"), } if output_format not in format_map: return jsonify({"success": False, "error": f"Unsupported format: {output_format}"}), 400 try: img = Image.open(io.BytesIO(image_file.read())) # Convert to RGB for JPEG (no alpha) if output_format == "JPEG" and 
img.mode in ("RGBA", "LA", "P"): img = img.convert("RGB") buffer = io.BytesIO() save_kwargs = {"format": output_format} if output_format in ("JPEG", "WEBP"): save_kwargs["quality"] = quality img.save(buffer, **save_kwargs) buffer.seek(0) ext, mimetype = format_map[output_format] stem = ( image_file.filename.rsplit(".", 1)[0] if "." in image_file.filename else image_file.filename ) return send_file( buffer, mimetype=mimetype, as_attachment=True, download_name=f"{stem}.{ext}", ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 # Add these two test routes anywhere in app.py after the app = Flask(...) line: @app.route("/test-capacity", methods=["POST"]) def test_capacity(): """Minimal capacity test - no stegasoo code, just PIL.""" carrier = request.files.get("carrier") if not carrier: return jsonify({"error": "No carrier image provided"}), 400 try: carrier_data = carrier.read() buffer = io.BytesIO(carrier_data) img = Image.open(buffer) width, height = img.size fmt = img.format img.close() buffer.close() pixels = width * height lsb_bytes = (pixels * 3) // 8 dct_bytes = ((width // 8) * (height // 8) * 16) // 8 - 10 return jsonify( { "success": True, "width": width, "height": height, "format": fmt, "lsb_kb": round(lsb_bytes / 1024, 1), "dct_kb": round(dct_bytes / 1024, 1), } ) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/test-capacity-nopil", methods=["POST"]) def test_capacity_nopil(): """Ultra-minimal test - no PIL, no stegasoo.""" carrier = request.files.get("carrier") if not carrier: return jsonify({"error": "No carrier image provided"}), 400 carrier_data = carrier.read() return jsonify( { "success": True, "data_size": len(carrier_data), } ) # ============================================================================ # AUTHENTICATION ROUTES (v4.0.2) # ============================================================================ @app.route("/login", methods=["GET", "POST"]) def login(): """Login page.""" if 
not app.config.get("AUTH_ENABLED", True): return redirect(url_for("index")) if not user_exists(): return redirect(url_for("setup")) if is_authenticated(): return redirect(url_for("index")) if request.method == "POST": username = request.form.get("username", "") password = request.form.get("password", "") user = verify_user_password(username, password) if user: login_user(user) session.permanent = True flash("Login successful", "success") return redirect(url_for("index")) else: flash("Invalid username or password", "error") return render_template("login.html") @app.route("/logout") def logout(): """Logout and clear session.""" logout_user() flash("Logged out successfully", "success") return redirect(url_for("index")) @app.route("/setup", methods=["GET", "POST"]) def setup(): """First-run setup page - create admin account (Step 1).""" if not app.config.get("AUTH_ENABLED", True): return redirect(url_for("index")) if user_exists(): return redirect(url_for("login")) if request.method == "POST": username = request.form.get("username", "admin") password = request.form.get("password", "") password_confirm = request.form.get("password_confirm", "") if password != password_confirm: flash("Passwords do not match", "error") else: success, message = create_admin_user(username, password) if success: # Auto-login the new admin user = verify_user_password(username, password) if user: login_user(user) session.permanent = True # Redirect to recovery key setup (Step 2) return redirect(url_for("setup_recovery")) else: flash(message, "error") return render_template("setup.html") @app.route("/setup/recovery", methods=["GET", "POST"]) @login_required def setup_recovery(): """Recovery key setup page (Step 2 of initial setup).""" import base64 from stegasoo.recovery import generate_recovery_key, generate_recovery_qr, hash_recovery_key # Only allow during initial setup (no recovery key yet, first admin) if has_recovery_key(): return redirect(url_for("index")) current_user = 
get_current_user() if current_user.role != "admin": return redirect(url_for("index")) if request.method == "POST": action = request.form.get("action") if action == "skip": # No recovery key - most secure but no way to recover flash("Setup complete. No recovery key configured.", "warning") return redirect(url_for("index")) elif action == "save": # User confirmed they saved the key recovery_key = request.form.get("recovery_key") if recovery_key: key_hash = hash_recovery_key(recovery_key) set_recovery_key_hash(key_hash) flash("Setup complete. Recovery key saved.", "success") return redirect(url_for("index")) # Generate a new key to show recovery_key = generate_recovery_key() # Generate QR code as base64 try: qr_bytes = generate_recovery_qr(recovery_key) qr_base64 = base64.b64encode(qr_bytes).decode("utf-8") except ImportError: qr_base64 = None return render_template( "setup_recovery.html", recovery_key=recovery_key, qr_base64=qr_base64, ) @app.route("/recover", methods=["GET", "POST"]) def recover(): """Password recovery page - reset password using recovery key.""" # Don't show if no recovery key configured if not get_recovery_key_hash(): flash("No recovery key configured for this instance", "error") return redirect(url_for("login")) if request.method == "POST": recovery_key = request.form.get("recovery_key", "").strip() new_password = request.form.get("new_password", "") new_password_confirm = request.form.get("new_password_confirm", "") if not recovery_key: flash("Please enter your recovery key", "error") elif new_password != new_password_confirm: flash("Passwords do not match", "error") elif len(new_password) < 8: flash("Password must be at least 8 characters", "error") else: success, message = verify_and_reset_admin_password(recovery_key, new_password) if success: flash("Password reset successfully. 
Please login.", "success") return redirect(url_for("login")) else: flash(message, "error") return render_template("recover.html") @app.route("/account/recovery/regenerate", methods=["GET", "POST"]) @login_required @admin_required def regenerate_recovery(): """Generate a new recovery key (replaces existing one).""" import base64 from stegasoo.recovery import generate_recovery_key, generate_recovery_qr, hash_recovery_key if request.method == "POST": action = request.form.get("action") if action == "cancel": flash("Recovery key generation cancelled", "warning") return redirect(url_for("account")) elif action == "save": # User confirmed they saved the key recovery_key = request.form.get("recovery_key") if recovery_key: key_hash = hash_recovery_key(recovery_key) set_recovery_key_hash(key_hash) flash("New recovery key saved successfully", "success") return redirect(url_for("account")) # Generate a new key to show recovery_key = generate_recovery_key() # Generate QR code as base64 try: qr_bytes = generate_recovery_qr(recovery_key) qr_base64 = base64.b64encode(qr_bytes).decode("utf-8") except ImportError: qr_base64 = None return render_template( "regenerate_recovery.html", recovery_key=recovery_key, qr_base64=qr_base64, has_existing=has_recovery_key(), ) @app.route("/account/recovery/disable", methods=["POST"]) @login_required @admin_required def disable_recovery(): """Disable recovery key (no password reset possible).""" if clear_recovery_key(): flash("Recovery key disabled. 
Password reset is no longer possible.", "warning") else: flash("No recovery key was configured", "error") return redirect(url_for("account")) @app.route("/account/recovery/stego-backup", methods=["POST"]) @login_required @admin_required def create_stego_backup(): """Create stego backup - hide recovery key in an image.""" from stegasoo.recovery import create_stego_backup as make_backup recovery_key = request.form.get("recovery_key", "") if not recovery_key: flash("No recovery key provided", "error") return redirect(url_for("regenerate_recovery")) if "carrier_image" not in request.files: flash("No image uploaded", "error") return redirect(url_for("regenerate_recovery")) carrier_file = request.files["carrier_image"] if not carrier_file.filename: flash("No image selected", "error") return redirect(url_for("regenerate_recovery")) try: carrier_data = carrier_file.read() stego_data = make_backup(recovery_key, carrier_data) # Return as downloadable PNG buffer = io.BytesIO(stego_data) return send_file( buffer, mimetype="image/png", as_attachment=True, download_name="stegasoo-recovery-backup.png", ) except ValueError as e: flash(str(e), "error") return redirect(url_for("regenerate_recovery")) @app.route("/recover/stego", methods=["POST"]) def recover_from_stego(): """Extract recovery key from stego backup image.""" from stegasoo.recovery import extract_stego_backup if "stego_image" not in request.files or "reference_image" not in request.files: flash("Both stego image and reference image are required", "error") return redirect(url_for("recover")) stego_file = request.files["stego_image"] reference_file = request.files["reference_image"] if not stego_file.filename or not reference_file.filename: flash("Both images must be selected", "error") return redirect(url_for("recover")) try: stego_data = stego_file.read() reference_data = reference_file.read() extracted_key = extract_stego_backup(stego_data, reference_data) if extracted_key: # Return the key to pre-fill the recovery 
form return render_template("recover.html", prefilled_key=extracted_key) else: flash("Could not extract recovery key. Check images are correct.", "error") return redirect(url_for("recover")) except Exception as e: flash(f"Extraction failed: {e}", "error") return redirect(url_for("recover")) @app.route("/account", methods=["GET", "POST"]) @login_required def account(): """Account management page.""" current_user = get_current_user() if request.method == "POST": current = request.form.get("current_password", "") new = request.form.get("new_password", "") new_confirm = request.form.get("new_password_confirm", "") if new != new_confirm: flash("New passwords do not match", "error") else: success, message = change_password(current_user.id, current, new) flash(message, "success" if success else "error") # Get saved channel keys channel_keys = get_user_channel_keys(current_user.id) return render_template( "account.html", username=current_user.username, user=current_user, is_admin=current_user.is_admin, has_recovery=has_recovery_key(), channel_keys=channel_keys, max_channel_keys=MAX_CHANNEL_KEYS, can_save_key=can_save_channel_key(current_user.id), ) # ============================================================================ # CHANNEL KEY MANAGEMENT ROUTES (v4.2.0) # ============================================================================ @app.route("/account/keys/save", methods=["POST"]) @login_required def account_save_key(): """Save a new channel key.""" current_user = get_current_user() name = request.form.get("key_name", "").strip() channel_key = request.form.get("channel_key", "").strip() # Normalize key format (remove dashes if present) channel_key = channel_key.replace("-", "").lower() success, message, key = save_channel_key(current_user.id, name, channel_key) flash(message, "success" if success else "error") return redirect(url_for("account")) @app.route("/account/keys//delete", methods=["POST"]) @login_required def account_delete_key(key_id): """Delete a 
saved channel key.""" current_user = get_current_user() success, message = delete_channel_key(key_id, current_user.id) flash(message, "success" if success else "error") return redirect(url_for("account")) @app.route("/account/keys//rename", methods=["POST"]) @login_required def account_rename_key(key_id): """Rename a saved channel key.""" current_user = get_current_user() new_name = request.form.get("new_name", "").strip() success, message = update_channel_key_name(key_id, current_user.id, new_name) flash(message, "success" if success else "error") return redirect(url_for("account")) @app.route("/api/channel/keys") @login_required def api_channel_keys(): """Get saved channel keys for current user (JSON API).""" current_user = get_current_user() keys = get_user_channel_keys(current_user.id) return jsonify( { "success": True, "keys": [ { "id": k.id, "name": k.name, "fingerprint": f"{k.channel_key[:4]}...{k.channel_key[-4:]}", "channel_key": k.channel_key, "last_used_at": k.last_used_at, } for k in keys ], "can_save": can_save_channel_key(current_user.id), "max_keys": MAX_CHANNEL_KEYS, } ) @app.route("/api/channel/keys//use", methods=["POST"]) @login_required def api_channel_key_use(key_id): """Mark a channel key as used (updates last_used_at).""" current_user = get_current_user() key = get_channel_key_by_id(key_id, current_user.id) if not key: return jsonify({"success": False, "error": "Key not found"}), 404 update_channel_key_last_used(key_id, current_user.id) return jsonify({"success": True}) # ============================================================================ # ADMIN ROUTES (v4.1.0) # ============================================================================ @app.route("/admin/settings") @admin_required def admin_settings(): """System settings page (admin only).""" import platform import sys from stegasoo import __version__ from stegasoo.channel import get_channel_status channel_status = get_channel_status() return render_template( 
"admin/settings.html", # Channel info (key hidden until password verified) channel_configured=channel_status["configured"], channel_fingerprint=channel_status.get("fingerprint"), channel_source=channel_status.get("source"), # Server config hostname=os.environ.get("STEGASOO_HOSTNAME") or socket.gethostname(), port=os.environ.get("STEGASOO_PORT", "5000"), https_enabled=app.config.get("HTTPS_ENABLED", False), auth_enabled=app.config.get("AUTH_ENABLED", True), max_payload_kb=MAX_FILE_PAYLOAD_SIZE // 1024, max_upload_mb=MAX_FILE_SIZE // (1024 * 1024), dct_available=has_dct_support(), qr_available=HAS_QRCODE_READ, # Environment version=__version__, python_version=f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}", platform=platform.system(), kdf_type="Argon2id" if has_argon2() else "PBKDF2", ) @app.route("/admin/settings/unlock", methods=["POST"]) @admin_required def admin_settings_unlock(): """Verify password and return channel key (AJAX).""" from stegasoo.channel import get_channel_status data = request.get_json() or {} password = data.get("password", "") if not password: return jsonify({"success": False, "error": "Password required"}) # Get current user and verify password username = get_username() user = verify_user_password(username, password) if not user: return jsonify({"success": False, "error": "Incorrect password"}) # Password verified - return channel key channel_status = get_channel_status() channel_key = channel_status.get("key") if channel_status["configured"] else "" return jsonify({ "success": True, "channel_key": channel_key }) @app.route("/admin/users") @admin_required def admin_users(): """User management page (admin only).""" users = get_all_users() current_user = get_current_user() return render_template( "admin/users.html", users=users, current_user=current_user, user_count=get_non_admin_count(), max_users=MAX_USERS, can_create=can_create_user(), ) @app.route("/admin/users/new", methods=["GET", "POST"]) @admin_required def 
admin_user_new(): """Create new user (admin only).""" if request.method == "POST": username = request.form.get("username", "") password = request.form.get("password", "") success, message, user = create_user(username, password) # Check if AJAX request if request.headers.get("X-Requested-With") == "XMLHttpRequest": if success: return jsonify({"success": True, "username": username, "password": password}) else: return jsonify({"success": False, "error": message}) # Regular form submission fallback if success: flash(f"User '{username}' created successfully", "success") session["temp_password"] = password session["temp_username"] = username return redirect(url_for("admin_user_created")) else: flash(message, "error") # Generate a temp password for the form temp_password = generate_temp_password() return render_template("admin/user_new.html", temp_password=temp_password) @app.route("/admin/users/created") @admin_required def admin_user_created(): """Show created user confirmation with password.""" username = session.pop("temp_username", None) password = session.pop("temp_password", None) if not username or not password: return redirect(url_for("admin_users")) return render_template( "admin/user_created.html", username=username, password=password, ) @app.route("/admin/users//delete", methods=["POST"]) @admin_required def admin_user_delete(user_id): """Delete a user (admin only).""" current_user = get_current_user() success, message = delete_user(user_id, current_user.id) flash(message, "success" if success else "error") return redirect(url_for("admin_users")) @app.route("/admin/users//reset-password", methods=["POST"]) @admin_required def admin_user_reset_password(user_id): """Reset a user's password (admin only).""" user = get_user_by_id(user_id) if not user: flash("User not found", "error") return redirect(url_for("admin_users")) # Generate new password new_password = generate_temp_password() success, message = reset_user_password(user_id, new_password) if success: # 
Store for display session["temp_password"] = new_password session["temp_username"] = user.username return redirect(url_for("admin_user_password_reset")) else: flash(message, "error") return redirect(url_for("admin_users")) @app.route("/admin/users/password-reset") @admin_required def admin_user_password_reset(): """Show password reset confirmation.""" username = session.pop("temp_username", None) password = session.pop("temp_password", None) if not username or not password: return redirect(url_for("admin_users")) return render_template( "admin/password_reset.html", username=username, password=password, ) # ============================================================================ # MAIN # ============================================================================ if __name__ == "__main__": base_dir = Path(__file__).parent # Clean up any leftover temp files from previous runs temp_storage.init(base_dir / "temp_files") cleaned = temp_storage.cleanup_all() if cleaned > 0: print(f"Cleaned up {cleaned} leftover temp files from previous run") # HTTPS configuration ssl_context = None if app.config.get("HTTPS_ENABLED", False): import socket hostname = os.environ.get("STEGASOO_HOSTNAME") or socket.gethostname() try: cert_path, key_path = ensure_certs(base_dir, hostname) if cert_path.exists() and key_path.exists(): ssl_context = (str(cert_path), str(key_path)) print(f"HTTPS enabled with self-signed certificate for {hostname}") else: print("ERROR: SSL certificates not found after generation attempt") print(f" Expected: {cert_path}, {key_path}") print(" Falling back to HTTP (INSECURE)") except Exception as e: print(f"ERROR: Failed to generate SSL certificates: {e}") print(" Falling back to HTTP (INSECURE)") print(" To fix: mkdir -p certs && openssl req -x509 -newkey rsa:2048 \\") print(" -keyout certs/server.key -out certs/server.crt -days 365 -nodes \\") print(" -subj '/CN=localhost'") # Auth status if app.config.get("AUTH_ENABLED", True): print("Authentication enabled") 
else: print("Authentication disabled") port = int(os.environ.get("STEGASOO_PORT", "5000")) app.run( host="0.0.0.0", port=port, debug=False, ssl_context=ssl_context, )