""" Stegasoo encode/decode/tools routes. Ported from stegasoo's frontends/web/app.py. These routes handle: - Image encode with async progress tracking - Audio encode (v4.3.0) - Image/audio decode - Encode result display - Encode/decode progress polling - Tools API (capacity, EXIF, rotate, compress, convert) All routes use subprocess isolation via SubprocessStego for crash safety. """ import io import mimetypes import os import secrets import threading import time from concurrent.futures import ThreadPoolExecutor from flask import ( flash, jsonify, redirect, render_template, request, send_file, url_for, ) from PIL import Image def register_stego_routes(app, **deps): """Register all stegasoo encode/decode routes on the Flask app.""" # Unpack dependencies passed from app.py login_required = deps["login_required"] subprocess_stego = deps["subprocess_stego"] temp_storage = deps["temp_storage"] _has_qrcode_read = deps.get("has_qrcode_read", False) from soosef.stegasoo import ( HAS_AUDIO_SUPPORT, CapacityError, DecryptionError, FilePayload, InvalidHeaderError, InvalidMagicBytesError, ReedSolomonError, StegasooError, generate_filename, has_dct_support, validate_file_payload, validate_image, validate_message, validate_passphrase, validate_pin, validate_rsa_key, validate_security_factors, ) from soosef.stegasoo.channel import resolve_channel_key from soosef.stegasoo.constants import ( TEMP_FILE_EXPIRY, THUMBNAIL_QUALITY, THUMBNAIL_SIZE, ) from soosef.stegasoo.qr_utils import ( decompress_data, extract_key_from_qr, is_compressed, ) from subprocess_stego import ( cleanup_progress_file, generate_job_id, get_progress_file_path, read_progress, ) # Async job management _executor = ThreadPoolExecutor(max_workers=2) _jobs = {} _jobs_lock = threading.Lock() def _store_job(job_id, data): with _jobs_lock: _jobs[job_id] = data def _get_job(job_id): with _jobs_lock: return _jobs.get(job_id) def _cleanup_old_jobs(max_age_seconds=3600): now = time.time() with _jobs_lock: to_remove = [ jid for jid, data in _jobs.items() if now - data.get("created", 0) > max_age_seconds ] for jid in to_remove: cleanup_progress_file(jid) del _jobs[jid] def resolve_channel_key_form(value): try: result = resolve_channel_key(value) return "auto" if result is None else ("none" if result == "" else result) except (ValueError, FileNotFoundError): return "auto" def generate_thumbnail(image_data, size=THUMBNAIL_SIZE): try: with Image.open(io.BytesIO(image_data)) as img: if img.mode in ("RGBA", "LA", "P"): bg = Image.new("RGB", img.size, (255, 255, 255)) if img.mode == "P": img = img.convert("RGBA") bg.paste(img, mask=img.split()[-1] if img.mode == "RGBA" else None) img = bg elif img.mode != "RGB": img = img.convert("RGB") img.thumbnail(size, Image.Resampling.LANCZOS) buf = io.BytesIO() img.save(buf, format="JPEG", quality=THUMBNAIL_QUALITY, optimize=True) return buf.getvalue() except Exception: return None def cleanup_temp_files(): temp_storage.cleanup_expired(TEMP_FILE_EXPIRY) def allowed_image(fn): return bool( fn and "." in fn and fn.rsplit(".", 1)[1].lower() in {"png", "jpg", "jpeg", "bmp", "gif"} ) def allowed_audio(fn): return bool( fn and "." in fn and fn.rsplit(".", 1)[1].lower() in {"wav", "flac", "mp3", "ogg", "aac", "m4a", "aiff", "aif"} ) def format_size(n): if n < 1024: return f"{n} B" elif n < 1024 * 1024: return f"{n/1024:.1f} KB" else: return f"{n/(1024*1024):.1f} MB" # ── Routes below are extracted from stegasoo app.py ── def _run_encode_job(job_id: str, encode_params: dict) -> None: """Background thread function for async encode.""" progress_file = get_progress_file_path(job_id) try: _store_job(job_id, {"status": "running", "created": time.time()}) # Run encode with progress file if encode_params.get("file_data"): encode_result = subprocess_stego.encode( carrier_data=encode_params["carrier_data"], reference_data=encode_params["ref_data"], file_data=encode_params["file_data"], file_name=encode_params["file_name"], file_mime=encode_params["file_mime"], passphrase=encode_params["passphrase"], pin=encode_params.get("pin"), rsa_key_data=encode_params.get("rsa_key_data"), rsa_password=encode_params.get("key_password"), embed_mode=encode_params["embed_mode"], dct_output_format=encode_params.get("dct_output_format", "png"), dct_color_mode=encode_params.get("dct_color_mode", "color"), channel_key=encode_params.get("channel_key"), progress_file=progress_file, ) else: encode_result = subprocess_stego.encode( carrier_data=encode_params["carrier_data"], reference_data=encode_params["ref_data"], message=encode_params["message"], passphrase=encode_params["passphrase"], pin=encode_params.get("pin"), rsa_key_data=encode_params.get("rsa_key_data"), rsa_password=encode_params.get("key_password"), embed_mode=encode_params["embed_mode"], dct_output_format=encode_params.get("dct_output_format", "png"), dct_color_mode=encode_params.get("dct_color_mode", "color"), channel_key=encode_params.get("channel_key"), progress_file=progress_file, ) if not encode_result.success: _store_job( job_id, { "status": "error", "error": encode_result.error or "Encoding failed", "created": time.time(), }, ) return # Determine output format embed_mode = encode_params["embed_mode"] dct_output_format = encode_params.get("dct_output_format", "png") dct_color_mode = encode_params.get("dct_color_mode", "color") if embed_mode == "dct" and dct_output_format == "jpeg": output_ext = ".jpg" output_mime = "image/jpeg" else: output_ext = ".png" output_mime = "image/png" filename = encode_result.filename if not filename: filename = generate_filename("stego", output_ext) elif embed_mode == "dct" and dct_output_format == "jpeg" and filename.endswith(".png"): filename = filename[:-4] + ".jpg" # Store result file_id = secrets.token_urlsafe(16) temp_storage.save_temp_file( file_id, encode_result.stego_data, { "filename": filename, "embed_mode": embed_mode, "output_format": dct_output_format if embed_mode == "dct" else "png", "color_mode": dct_color_mode if embed_mode == "dct" else None, "mime_type": output_mime, "channel_mode": encode_result.channel_mode, "channel_fingerprint": encode_result.channel_fingerprint, }, ) _store_job( job_id, { "status": "complete", "file_id": file_id, "created": time.time(), }, ) except Exception as e: _store_job( job_id, { "status": "error", "error": str(e), "created": time.time(), }, ) finally: cleanup_progress_file(job_id) def _run_encode_audio_job(job_id: str, encode_params: dict) -> None: """Background thread function for async audio encode (v4.3.0).""" progress_file = get_progress_file_path(job_id) try: _store_job(job_id, {"status": "running", "created": time.time()}) if encode_params.get("file_data"): encode_result = subprocess_stego.encode_audio( carrier_data=encode_params["carrier_data"], reference_data=encode_params["ref_data"], file_data=encode_params["file_data"], file_name=encode_params["file_name"], file_mime=encode_params["file_mime"], passphrase=encode_params["passphrase"], pin=encode_params.get("pin"), rsa_key_data=encode_params.get("rsa_key_data"), rsa_password=encode_params.get("key_password"), embed_mode=encode_params["embed_mode"], channel_key=encode_params.get("channel_key"), progress_file=progress_file, chip_tier=encode_params.get("chip_tier"), ) else: encode_result = subprocess_stego.encode_audio( carrier_data=encode_params["carrier_data"], reference_data=encode_params["ref_data"], message=encode_params.get("message"), passphrase=encode_params["passphrase"], pin=encode_params.get("pin"), rsa_key_data=encode_params.get("rsa_key_data"), rsa_password=encode_params.get("key_password"), embed_mode=encode_params["embed_mode"], channel_key=encode_params.get("channel_key"), progress_file=progress_file, chip_tier=encode_params.get("chip_tier"), ) if not encode_result.success: _store_job( job_id, { "status": "error", "error": encode_result.error or "Audio encoding failed", "created": time.time(), }, ) return filename = generate_filename("stego_audio", ".wav") file_id = secrets.token_urlsafe(16) temp_storage.save_temp_file( file_id, encode_result.stego_data, { "filename": filename, "embed_mode": encode_params["embed_mode"], "carrier_type": "audio", "mime_type": "audio/wav", "channel_mode": encode_result.channel_mode, "channel_fingerprint": encode_result.channel_fingerprint, }, ) _store_job( job_id, { "status": "complete", "file_id": file_id, "created": time.time(), }, ) except Exception as e: _store_job( job_id, { "status": "error", "error": str(e), "created": time.time(), }, ) finally: cleanup_progress_file(job_id) @app.route("/encode", methods=["GET", "POST"]) @login_required def encode(): if request.method == "POST": # Check if async mode requested is_async = ( request.form.get("async") == "true" or request.headers.get("X-Async") == "true" ) def _error_response(msg): """Return error as JSON (async) or HTML flash (sync).""" if is_async: return jsonify({"error": msg}), 400 flash(msg, "error") return render_template("stego/encode.html", has_qrcode_read=_has_qrcode_read) try: # Get files ref_photo = request.files.get("reference_photo") carrier = request.files.get("carrier") rsa_key_file = request.files.get("rsa_key") payload_file = request.files.get("payload_file") # Determine carrier type (v4.3.0) carrier_type = request.form.get("carrier_type", "image") if carrier_type == "audio": # ========== AUDIO ENCODE PATH (v4.3.0) ========== if not HAS_AUDIO_SUPPORT: return _error_response( "Audio steganography is not available. Install audio dependencies." ) if not ref_photo or not carrier: return _error_response( "Both reference photo and audio carrier are required" ) if not allowed_image(ref_photo.filename): return _error_response("Reference must be an image (PNG, JPG, BMP)") if not allowed_audio(carrier.filename): return _error_response( "Invalid audio format. Use WAV, FLAC, MP3, OGG, AAC, or M4A" ) # Get form data message = request.form.get("message", "") passphrase = request.form.get("passphrase", "") pin = request.form.get("pin", "").strip() rsa_password = request.form.get("rsa_password", "") payload_type = request.form.get("payload_type", "text") embed_mode = request.form.get("embed_mode", "audio_lsb") if embed_mode not in ("audio_lsb", "audio_spread"): embed_mode = "audio_lsb" # Chip tier for spread spectrum (None = default) chip_tier_str = request.form.get("chip_tier") chip_tier = None if chip_tier_str and chip_tier_str.isdigit(): chip_tier = int(chip_tier_str) if chip_tier not in (0, 1, 2): chip_tier = None channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto")) # Determine payload if payload_type == "file" and payload_file and payload_file.filename: file_data = payload_file.read() result = validate_file_payload(file_data, payload_file.filename) if not result.is_valid: return _error_response(result.error_message) mime_type, _ = mimetypes.guess_type(payload_file.filename) payload = FilePayload( data=file_data, filename=payload_file.filename, mime_type=mime_type, ) else: result = validate_message(message) if not result.is_valid: return _error_response(result.error_message) payload = message if not passphrase: return _error_response("Passphrase is required") result = validate_passphrase(passphrase) if not result.is_valid: return _error_response(result.error_message) if result.warning: flash(result.warning, "warning") ref_data = ref_photo.read() carrier_data = carrier.read() # Handle RSA key (same as image path) rsa_key_data = None rsa_key_pem = request.form.get("rsa_key_pem", "").strip() rsa_key_qr = request.files.get("rsa_key_qr") rsa_key_from_qr = False if rsa_key_pem: if is_compressed(rsa_key_pem): rsa_key_pem = decompress_data(rsa_key_pem) rsa_key_data = rsa_key_pem.encode("utf-8") rsa_key_from_qr = True elif rsa_key_file and rsa_key_file.filename: rsa_key_data = rsa_key_file.read() elif rsa_key_qr and rsa_key_qr.filename and _has_qrcode_read: qr_image_data = rsa_key_qr.read() key_pem = extract_key_from_qr(qr_image_data) if key_pem: rsa_key_data = key_pem.encode("utf-8") rsa_key_from_qr = True else: return _error_response("Could not extract RSA key from QR code image.") result = validate_security_factors(pin, rsa_key_data) if not result.is_valid: return _error_response(result.error_message) if pin: result = validate_pin(pin) if not result.is_valid: return _error_response(result.error_message) key_password = ( None if rsa_key_from_qr else (rsa_password if rsa_password else None) ) if rsa_key_data: result = validate_rsa_key(rsa_key_data, key_password) if not result.is_valid: return _error_response(result.error_message) # Build audio encode params encode_params = { "carrier_data": carrier_data, "ref_data": ref_data, "passphrase": passphrase, "pin": pin if pin else None, "rsa_key_data": rsa_key_data, "key_password": key_password, "embed_mode": embed_mode, "channel_key": channel_key, "carrier_type": "audio", "chip_tier": chip_tier, } if payload_type == "file" and payload_file and payload_file.filename: encode_params["file_data"] = payload.data encode_params["file_name"] = payload.filename encode_params["file_mime"] = payload.mime_type else: encode_params["message"] = payload if is_async: job_id = generate_job_id() _store_job(job_id, {"status": "pending", "created": time.time()}) _executor.submit(_run_encode_audio_job, job_id, encode_params) return jsonify({"job_id": job_id, "status": "pending"}) # Sync audio encode if encode_params.get("file_data"): encode_result = subprocess_stego.encode_audio( carrier_data=carrier_data, reference_data=ref_data, file_data=encode_params["file_data"], file_name=encode_params["file_name"], file_mime=encode_params["file_mime"], passphrase=passphrase, pin=pin if pin else None, rsa_key_data=rsa_key_data, rsa_password=key_password, embed_mode=embed_mode, channel_key=channel_key, chip_tier=chip_tier, ) else: encode_result = subprocess_stego.encode_audio( carrier_data=carrier_data, reference_data=ref_data, message=payload, passphrase=passphrase, pin=pin if pin else None, rsa_key_data=rsa_key_data, rsa_password=key_password, embed_mode=embed_mode, channel_key=channel_key, chip_tier=chip_tier, ) if not encode_result.success: error_msg = encode_result.error or "Audio encoding failed" return _error_response(error_msg) filename = generate_filename("stego_audio", ".wav") file_id = secrets.token_urlsafe(16) cleanup_temp_files() temp_storage.save_temp_file( file_id, encode_result.stego_data, { "filename": filename, "embed_mode": embed_mode, "carrier_type": "audio", "mime_type": "audio/wav", "channel_mode": encode_result.channel_mode, "channel_fingerprint": encode_result.channel_fingerprint, }, ) return redirect(url_for("encode_result", file_id=file_id)) # ========== IMAGE ENCODE PATH (original) ========== if not ref_photo or not carrier: return _error_response("Both reference photo and carrier image are required") if not allowed_image(ref_photo.filename) or not allowed_image(carrier.filename): return _error_response("Invalid file type. Use PNG, JPG, or BMP") # Get form data - v3.2.0: renamed from day_phrase to passphrase message = request.form.get("message", "") passphrase = request.form.get("passphrase", "") # v3.2.0: Renamed pin = request.form.get("pin", "").strip() rsa_password = request.form.get("rsa_password", "") payload_type = request.form.get("payload_type", "text") # NEW in v3.0 - Embedding mode embed_mode = request.form.get("embed_mode", "lsb") if embed_mode not in ("lsb", "dct"): embed_mode = "lsb" # NEW in v3.0.1 - DCT output format dct_output_format = request.form.get("dct_output_format", "png") if dct_output_format not in ("png", "jpeg"): dct_output_format = "png" # NEW in v3.0.1 - DCT color mode dct_color_mode = request.form.get("dct_color_mode", "color") if dct_color_mode not in ("grayscale", "color"): dct_color_mode = "color" # NEW in v4.0.0 - Channel key channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto")) # Check DCT availability if embed_mode == "dct" and not has_dct_support(): return _error_response( "DCT mode requires scipy. Install with: pip install scipy" ) # Determine payload if payload_type == "file" and payload_file and payload_file.filename: # File payload file_data = payload_file.read() result = validate_file_payload(file_data, payload_file.filename) if not result.is_valid: return _error_response(result.error_message) mime_type, _ = mimetypes.guess_type(payload_file.filename) payload = FilePayload( data=file_data, filename=payload_file.filename, mime_type=mime_type ) else: # Text message result = validate_message(message) if not result.is_valid: return _error_response(result.error_message) payload = message # v3.2.0: Renamed from day_phrase if not passphrase: return _error_response("Passphrase is required") # v3.2.0: Validate passphrase result = validate_passphrase(passphrase) if not result.is_valid: return _error_response(result.error_message) # Show warning if passphrase is short if result.warning: flash(result.warning, "warning") # Read files ref_data = ref_photo.read() carrier_data = carrier.read() # Handle RSA key - can come from .pem file, QR code image, or webcam-scanned PEM (v4.1.5) rsa_key_data = None rsa_key_pem = request.form.get("rsa_key_pem", "").strip() rsa_key_qr = request.files.get("rsa_key_qr") rsa_key_from_qr = False if rsa_key_pem: # Webcam-scanned PEM key (v4.1.5+) - may be compressed (zlib or zstd) if is_compressed(rsa_key_pem): rsa_key_pem = decompress_data(rsa_key_pem) rsa_key_data = rsa_key_pem.encode("utf-8") rsa_key_from_qr = True elif rsa_key_file and rsa_key_file.filename: rsa_key_data = rsa_key_file.read() elif rsa_key_qr and rsa_key_qr.filename and _has_qrcode_read: qr_image_data = rsa_key_qr.read() key_pem = extract_key_from_qr(qr_image_data) if key_pem: rsa_key_data = key_pem.encode("utf-8") rsa_key_from_qr = True else: return _error_response("Could not extract RSA key from QR code image.") # Validate security factors result = validate_security_factors(pin, rsa_key_data) if not result.is_valid: return _error_response(result.error_message) # Validate PIN if provided if pin: result = validate_pin(pin) if not result.is_valid: return _error_response(result.error_message) # Determine key password key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None) # Validate RSA key if provided if rsa_key_data: result = validate_rsa_key(rsa_key_data, key_password) if not result.is_valid: return _error_response(result.error_message) # Validate carrier image result = validate_image(carrier_data, "Carrier image") if not result.is_valid: return _error_response(result.error_message) # Pre-check payload capacity BEFORE encode (fail fast) from soosef.stegasoo.steganography import will_fit_by_mode payload_size = ( len(payload.data) if hasattr(payload, "data") else len(payload.encode("utf-8")) ) fit_check = will_fit_by_mode(payload_size, carrier_data, embed_mode=embed_mode) if not fit_check.get("fits", True): error_msg = ( f"Payload too large for {embed_mode.upper()} mode. " f"Payload: {payload_size:,} bytes, " f"Capacity: {fit_check.get('capacity', 0):,} bytes" ) # Suggest alternative mode if embed_mode == "dct": alt_check = will_fit_by_mode(payload_size, carrier_data, embed_mode="lsb") if alt_check.get("fits"): error_msg += " - Try LSB mode instead." return _error_response(error_msg) # Build encode params for either sync or async encode_params = { "carrier_data": carrier_data, "ref_data": ref_data, "passphrase": passphrase, "pin": pin if pin else None, "rsa_key_data": rsa_key_data, "key_password": key_password, "embed_mode": embed_mode, "dct_output_format": dct_output_format if embed_mode == "dct" else "png", "dct_color_mode": dct_color_mode if embed_mode == "dct" else "color", "channel_key": channel_key, } if payload_type == "file" and payload_file and payload_file.filename: encode_params["file_data"] = payload.data encode_params["file_name"] = payload.filename encode_params["file_mime"] = payload.mime_type else: encode_params["message"] = payload # ASYNC MODE: Start background job and return JSON if is_async: job_id = generate_job_id() _store_job(job_id, {"status": "pending", "created": time.time()}) _executor.submit(_run_encode_job, job_id, encode_params) return jsonify({"job_id": job_id, "status": "pending"}) # SYNC MODE: Run inline (original behavior) if payload_type == "file" and payload_file and payload_file.filename: encode_result = subprocess_stego.encode( carrier_data=carrier_data, reference_data=ref_data, file_data=payload.data, file_name=payload.filename, file_mime=payload.mime_type, passphrase=passphrase, pin=pin if pin else None, rsa_key_data=rsa_key_data, rsa_password=key_password, embed_mode=embed_mode, dct_output_format=dct_output_format if embed_mode == "dct" else "png", dct_color_mode=dct_color_mode if embed_mode == "dct" else "color", channel_key=channel_key, ) else: encode_result = subprocess_stego.encode( carrier_data=carrier_data, reference_data=ref_data, message=payload, passphrase=passphrase, pin=pin if pin else None, rsa_key_data=rsa_key_data, rsa_password=key_password, embed_mode=embed_mode, dct_output_format=dct_output_format if embed_mode == "dct" else "png", dct_color_mode=dct_color_mode if embed_mode == "dct" else "color", channel_key=channel_key, ) # Check for subprocess errors if not encode_result.success: error_msg = encode_result.error or "Encoding failed" if "capacity" in error_msg.lower(): raise CapacityError(error_msg) raise StegasooError(error_msg) # Determine actual output format for filename and storage if embed_mode == "dct" and dct_output_format == "jpeg": output_ext = ".jpg" output_mime = "image/jpeg" else: output_ext = ".png" output_mime = "image/png" # Use filename from result or generate one filename = encode_result.filename if not filename: filename = generate_filename("stego", output_ext) elif ( embed_mode == "dct" and dct_output_format == "jpeg" and filename.endswith(".png") ): filename = filename[:-4] + ".jpg" # Store temporarily file_id = secrets.token_urlsafe(16) cleanup_temp_files() temp_storage.save_temp_file( file_id, encode_result.stego_data, { "filename": filename, "embed_mode": embed_mode, "output_format": dct_output_format if embed_mode == "dct" else "png", "color_mode": dct_color_mode if embed_mode == "dct" else None, "mime_type": output_mime, # Channel info (v4.0.0) "channel_mode": encode_result.channel_mode, "channel_fingerprint": encode_result.channel_fingerprint, }, ) return redirect(url_for("encode_result", file_id=file_id)) except CapacityError as e: return _error_response(str(e)) except StegasooError as e: return _error_response(str(e)) except Exception as e: return _error_response(f"Error: {e}") return render_template("stego/encode.html", has_qrcode_read=_has_qrcode_read) # ============================================================================ # ENCODE PROGRESS ENDPOINTS (v4.1.2) # ============================================================================ @app.route("/encode/status/") @login_required def encode_status(job_id): """Get the status of an async encode job.""" job = _get_job(job_id) if not job: return jsonify({"error": "Job not found"}), 404 response = {"status": job.get("status", "unknown")} if job["status"] == "complete": response["file_id"] = job.get("file_id") elif job["status"] == "error": response["error"] = job.get("error", "Unknown error") return jsonify(response) @app.route("/encode/progress/") @login_required def encode_progress(job_id): """Get the progress of an async encode job.""" progress = read_progress(job_id) if progress: return jsonify(progress) # No progress file yet - check job status job = _get_job(job_id) if not job: return jsonify({"error": "Job not found"}), 404 if job["status"] == "complete": return jsonify({"percent": 100, "phase": "complete"}) elif job["status"] == "error": return jsonify({"percent": 0, "phase": "error", "error": job.get("error")}) elif job["status"] == "pending": return jsonify({"percent": 0, "phase": "starting"}) # Running but no progress file yet return jsonify({"percent": 0, "phase": "initializing"}) @app.route("/encode/result/") @login_required def encode_result(file_id): file_info = temp_storage.get_temp_file(file_id) if not file_info: flash("File expired or not found. Please encode again.", "error") return redirect(url_for("encode")) carrier_type = file_info.get("carrier_type", "image") # Generate thumbnail only for images thumbnail_data = None thumbnail_id = None if carrier_type != "audio": thumbnail_data = generate_thumbnail(file_info["data"]) if thumbnail_data: thumbnail_id = f"{file_id}_thumb" temp_storage.save_thumbnail(thumbnail_id, thumbnail_data) return render_template( "encode_result.html", file_id=file_id, filename=file_info["filename"], thumbnail_url=( url_for("encode_thumbnail", thumb_id=thumbnail_id) if thumbnail_id else None ), embed_mode=file_info.get("embed_mode", "lsb"), output_format=file_info.get("output_format", "png"), color_mode=file_info.get("color_mode"), carrier_type=carrier_type, # Channel info (v4.0.0) channel_mode=file_info.get("channel_mode", "public"), channel_fingerprint=file_info.get("channel_fingerprint"), ) @app.route("/encode/thumbnail/") @login_required def encode_thumbnail(thumb_id): """Serve thumbnail image.""" thumb_data = temp_storage.get_thumbnail(thumb_id) if not thumb_data: return "Thumbnail not found", 404 return send_file(io.BytesIO(thumb_data), mimetype="image/jpeg", as_attachment=False) @app.route("/encode/download/") @login_required def encode_download(file_id): file_info = temp_storage.get_temp_file(file_id) if not file_info: flash("File expired or not found.", "error") return redirect(url_for("encode")) mime_type = file_info.get("mime_type", "image/png") return send_file( io.BytesIO(file_info["data"]), mimetype=mime_type, as_attachment=True, download_name=file_info["filename"], ) @app.route("/encode/file/") @login_required def encode_file_route(file_id): """Serve file for Web Share API.""" file_info = temp_storage.get_temp_file(file_id) if not file_info: return "Not found", 404 mime_type = file_info.get("mime_type", "image/png") return send_file( io.BytesIO(file_info["data"]), mimetype=mime_type, as_attachment=False, download_name=file_info["filename"], ) @app.route("/encode/cleanup/", methods=["POST"]) @login_required def encode_cleanup(file_id): """Manually cleanup a file after sharing.""" temp_storage.delete_temp_file(file_id) # Also cleanup thumbnail if exists thumb_id = f"{file_id}_thumb" temp_storage.delete_thumbnail(thumb_id) return jsonify({"status": "ok"}) # ============================================================================ # DECODE # ============================================================================ def _run_decode_job(job_id: str, decode_params: dict) -> None: """Background thread function for async decode.""" progress_file = get_progress_file_path(job_id) try: _store_job(job_id, {"status": "running", "created": time.time()}) # Run decode with progress file decode_result = subprocess_stego.decode( stego_data=decode_params["stego_data"], reference_data=decode_params["ref_data"], passphrase=decode_params["passphrase"], pin=decode_params.get("pin"), rsa_key_data=decode_params.get("rsa_key_data"), rsa_password=decode_params.get("rsa_password"), embed_mode=decode_params.get("embed_mode", "auto"), channel_key=decode_params.get("channel_key"), progress_file=progress_file, ) if not decode_result.success: _store_job( job_id, { "status": "error", "error": decode_result.error or "Decoding failed", "error_type": decode_result.error_type, "created": time.time(), }, ) return # Store result based on type if decode_result.is_file: file_id = secrets.token_urlsafe(16) filename = decode_result.filename or "decoded_file" temp_storage.save_temp_file( file_id, decode_result.file_data, { "filename": filename, "mime_type": decode_result.mime_type, }, ) _store_job( job_id, { "status": "complete", "file_id": file_id, "is_file": True, "filename": filename, "file_size": len(decode_result.file_data), "mime_type": decode_result.mime_type, "created": time.time(), }, ) else: _store_job( job_id, { "status": "complete", "is_file": False, "message": decode_result.message, "created": time.time(), }, ) except Exception as e: _store_job( job_id, { "status": "error", "error": str(e), "created": time.time(), }, ) finally: cleanup_progress_file(job_id) def _run_decode_audio_job(job_id: str, decode_params: dict) -> None: """Background thread function for async audio decode (v4.3.0).""" progress_file = get_progress_file_path(job_id) try: _store_job(job_id, {"status": "running", "created": time.time()}) decode_result = subprocess_stego.decode_audio( stego_data=decode_params["stego_data"], reference_data=decode_params["ref_data"], passphrase=decode_params["passphrase"], pin=decode_params.get("pin"), rsa_key_data=decode_params.get("rsa_key_data"), rsa_password=decode_params.get("rsa_password"), embed_mode=decode_params.get("embed_mode", "audio_auto"), channel_key=decode_params.get("channel_key"), progress_file=progress_file, ) if not decode_result.success: _store_job( job_id, { "status": "error", "error": decode_result.error or "Audio decoding failed", "error_type": decode_result.error_type, "created": time.time(), }, ) return if decode_result.is_file: file_id = secrets.token_urlsafe(16) filename = decode_result.filename or "decoded_file" temp_storage.save_temp_file( file_id, decode_result.file_data, { "filename": filename, "mime_type": decode_result.mime_type, }, ) _store_job( job_id, { "status": "complete", "file_id": file_id, "is_file": True, "filename": filename, "file_size": len(decode_result.file_data), "mime_type": decode_result.mime_type, "created": time.time(), }, ) else: _store_job( job_id, { "status": "complete", "is_file": False, "message": decode_result.message, "created": time.time(), }, ) except Exception as e: _store_job( job_id, { "status": "error", "error": str(e), "created": time.time(), }, ) finally: cleanup_progress_file(job_id) @app.route("/decode", methods=["GET", "POST"]) @login_required def decode_page(): if request.method == "POST": try: # Get files ref_photo = request.files.get("reference_photo") stego_image = request.files.get("stego_image") rsa_key_file = request.files.get("rsa_key") # Determine carrier type (v4.3.0) carrier_type = request.form.get("carrier_type", "image") if carrier_type == "audio": # ========== AUDIO DECODE PATH (v4.3.0) ========== if not HAS_AUDIO_SUPPORT: flash("Audio steganography is not available.", "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) if not ref_photo or not stego_image: flash("Both reference photo and stego audio are required", "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) if not allowed_image(ref_photo.filename): flash("Reference must be an image", "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) if not allowed_audio(stego_image.filename): flash("Invalid audio format", "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) passphrase = request.form.get("passphrase", "") pin = request.form.get("pin", "").strip() rsa_password = request.form.get("rsa_password", "") embed_mode = request.form.get("embed_mode", "audio_auto") if embed_mode not in ("audio_auto", "audio_lsb", "audio_spread"): embed_mode = "audio_auto" channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto")) if not passphrase: flash("Passphrase is required", "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) ref_data = ref_photo.read() stego_data = stego_image.read() # Handle RSA key (same as image path) rsa_key_data = None rsa_key_pem = request.form.get("rsa_key_pem", "").strip() rsa_key_qr = request.files.get("rsa_key_qr") rsa_key_from_qr = False if rsa_key_pem: if is_compressed(rsa_key_pem): rsa_key_pem = decompress_data(rsa_key_pem) rsa_key_data = rsa_key_pem.encode("utf-8") rsa_key_from_qr = True elif rsa_key_file and rsa_key_file.filename: rsa_key_data = rsa_key_file.read() elif rsa_key_qr and rsa_key_qr.filename and _has_qrcode_read: qr_image_data = rsa_key_qr.read() key_pem = extract_key_from_qr(qr_image_data) if key_pem: rsa_key_data = key_pem.encode("utf-8") rsa_key_from_qr = True else: flash("Could not extract RSA key from QR code image.", "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) result = validate_security_factors(pin, rsa_key_data) if not result.is_valid: flash(result.error_message, "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) if pin: result = validate_pin(pin) if not result.is_valid: flash(result.error_message, "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) key_password = ( None if rsa_key_from_qr else (rsa_password if rsa_password else None) ) if rsa_key_data: result = validate_rsa_key(rsa_key_data, key_password) if not result.is_valid: flash(result.error_message, "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) is_async = ( request.form.get("async") == "true" or request.headers.get("X-Async") == "true" ) decode_params = { "stego_data": stego_data, "ref_data": ref_data, "passphrase": passphrase, "pin": pin if pin else None, "rsa_key_data": rsa_key_data, "rsa_password": key_password, "embed_mode": embed_mode, "channel_key": channel_key, } if is_async: job_id = generate_job_id() _store_job(job_id, {"status": "pending", "created": time.time()}) _executor.submit(_run_decode_audio_job, job_id, decode_params) return jsonify({"job_id": job_id, "status": "pending"}) # Sync audio decode decode_result = subprocess_stego.decode_audio( stego_data=stego_data, reference_data=ref_data, passphrase=passphrase, pin=pin if pin else None, rsa_key_data=rsa_key_data, rsa_password=key_password, embed_mode=embed_mode, channel_key=channel_key, ) if not decode_result.success: error_msg = decode_result.error or "Audio decoding failed" if ( "decrypt" in error_msg.lower() or decode_result.error_type == "DecryptionError" ): flash( "Wrong credentials. Double-check your reference photo, " "passphrase, PIN, and channel key.", "warning", ) else: flash(error_msg, "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) if decode_result.is_file: file_id = secrets.token_urlsafe(16) cleanup_temp_files() filename = decode_result.filename or "decoded_file" temp_storage.save_temp_file( file_id, decode_result.file_data, { "filename": filename, "mime_type": decode_result.mime_type, }, ) return render_template( "decode.html", decoded_file=True, file_id=file_id, filename=filename, file_size=format_size(len(decode_result.file_data)), mime_type=decode_result.mime_type, has_qrcode_read=_has_qrcode_read, ) else: return render_template( "decode.html", decoded_message=decode_result.message, has_qrcode_read=_has_qrcode_read, ) # ========== IMAGE DECODE PATH (original) ========== if not ref_photo or not stego_image: flash("Both reference photo and stego image are required", "error") return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) # Get form data - v3.2.0: renamed from day_phrase to passphrase passphrase = request.form.get("passphrase", "") # v3.2.0: Renamed pin = request.form.get("pin", "").strip() rsa_password = request.form.get("rsa_password", "") # NEW in v3.0 - Extraction mode embed_mode = request.form.get("embed_mode", "auto") if embed_mode not in ("auto", "lsb", "dct"): embed_mode = "auto" # NEW in v4.0.0 - Channel key channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto")) # Check DCT availability if embed_mode == "dct" and not has_dct_support(): flash("DCT mode requires scipy. Install with: pip install scipy", "error") return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) # v3.2.0: Removed date handling (no stego_date needed) # v3.2.0: Renamed from day_phrase if not passphrase: flash("Passphrase is required", "error") return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) # Read files ref_data = ref_photo.read() stego_data = stego_image.read() # Handle RSA key - can come from .pem file, QR code image, or webcam-scanned PEM (v4.1.5) rsa_key_data = None rsa_key_pem = request.form.get("rsa_key_pem", "").strip() rsa_key_qr = request.files.get("rsa_key_qr") rsa_key_from_qr = False if rsa_key_pem: # Webcam-scanned PEM key (v4.1.5+) - may be compressed (zlib or zstd) if is_compressed(rsa_key_pem): rsa_key_pem = decompress_data(rsa_key_pem) rsa_key_data = rsa_key_pem.encode("utf-8") rsa_key_from_qr = True elif rsa_key_file and rsa_key_file.filename: rsa_key_data = rsa_key_file.read() elif rsa_key_qr and rsa_key_qr.filename and _has_qrcode_read: qr_image_data = rsa_key_qr.read() key_pem = extract_key_from_qr(qr_image_data) if key_pem: rsa_key_data = key_pem.encode("utf-8") rsa_key_from_qr = True else: flash("Could not extract RSA key from QR code image.", "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) # Validate security factors result = validate_security_factors(pin, rsa_key_data) if not result.is_valid: flash(result.error_message, "error") return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) # Validate PIN if provided if pin: result = validate_pin(pin) if not result.is_valid: flash(result.error_message, "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) # Determine key password key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None) # Validate RSA key if provided if rsa_key_data: result = validate_rsa_key(rsa_key_data, key_password) if not result.is_valid: flash(result.error_message, "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) # Check for async mode (v4.1.5) is_async = ( request.form.get("async") == "true" or request.headers.get("X-Async") == "true" ) # Build decode params decode_params = { "stego_data": stego_data, "ref_data": ref_data, "passphrase": passphrase, "pin": pin if pin else None, "rsa_key_data": rsa_key_data, "rsa_password": key_password, "embed_mode": embed_mode, "channel_key": channel_key, } # ASYNC MODE: Start background job and return JSON if is_async: job_id = generate_job_id() _store_job(job_id, {"status": "pending", "created": time.time()}) _executor.submit(_run_decode_job, job_id, decode_params) return jsonify({"job_id": job_id, "status": "pending"}) # SYNC MODE: Run inline (original behavior) # v4.0.0: Include channel_key parameter # Use subprocess-isolated decode to prevent crashes decode_result = subprocess_stego.decode( stego_data=stego_data, reference_data=ref_data, passphrase=passphrase, pin=pin if pin else None, rsa_key_data=rsa_key_data, rsa_password=key_password, embed_mode=embed_mode, channel_key=channel_key, # v4.0.0 ) # Check for subprocess errors if not decode_result.success: error_msg = decode_result.error or "Decoding failed" # Check for channel key related errors if "channel key" in error_msg.lower(): flash(error_msg, "error") return render_template( "stego/decode.html", has_qrcode_read=_has_qrcode_read ) if ( "decrypt" in error_msg.lower() or decode_result.error_type == "DecryptionError" ): raise DecryptionError(error_msg) raise StegasooError(error_msg) if decode_result.is_file: # File content - store temporarily for download file_id = secrets.token_urlsafe(16) cleanup_temp_files() filename = decode_result.filename or "decoded_file" temp_storage.save_temp_file( file_id, decode_result.file_data, { "filename": filename, "mime_type": decode_result.mime_type, }, ) return render_template( "decode.html", decoded_file=True, file_id=file_id, filename=filename, file_size=format_size(len(decode_result.file_data)), mime_type=decode_result.mime_type, has_qrcode_read=_has_qrcode_read, ) else: # Text content return render_template( "decode.html", decoded_message=decode_result.message, has_qrcode_read=_has_qrcode_read, ) except InvalidMagicBytesError: flash( "This doesn't appear to be a Stegasoo image. Try a different mode (LSB/DCT).", "warning", ) return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) except ReedSolomonError: flash( "Image too corrupted to decode. It may have been re-saved or compressed.", "error", ) return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) except InvalidHeaderError: flash( "Invalid or corrupted header. The image may have been modified.", "error", ) return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) except DecryptionError: flash( "Wrong credentials. Double-check your reference photo, passphrase, PIN, and channel key.", "warning", ) return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) except StegasooError as e: flash(str(e), "error") return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) except Exception as e: flash(f"Error: {e}", "error") return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) @app.route("/decode/download/") @login_required def decode_download(file_id): """Download decoded file.""" file_info = temp_storage.get_temp_file(file_id) if not file_info: flash("File expired or not found.", "error") return redirect(url_for("decode_page")) mime_type = file_info.get("mime_type", "application/octet-stream") return send_file( io.BytesIO(file_info["data"]), mimetype=mime_type, as_attachment=True, download_name=file_info["filename"], ) # ============================================================================ # DECODE PROGRESS ENDPOINTS (v4.1.5) # ============================================================================ @app.route("/decode/status/") @login_required def decode_status(job_id): """Get the status of an async decode job.""" job = _get_job(job_id) if not job: return jsonify({"error": "Job not found"}), 404 response = {"status": job.get("status", "unknown")} if job["status"] == "complete": response["is_file"] = job.get("is_file", False) if job.get("is_file"): response["file_id"] = job.get("file_id") response["filename"] = job.get("filename") response["file_size"] = job.get("file_size") response["mime_type"] = job.get("mime_type") else: response["message"] = job.get("message") elif job["status"] == "error": response["error"] = job.get("error", "Unknown error") response["error_type"] = job.get("error_type") return jsonify(response) @app.route("/decode/progress/") @login_required def decode_progress(job_id): """Get the progress of an async decode job.""" progress = read_progress(job_id) if progress: return jsonify(progress) # No progress file yet - check job status job = _get_job(job_id) if not job: return jsonify({"error": "Job not found"}), 404 if job["status"] == "complete": return jsonify({"percent": 100, "phase": "complete"}) elif job["status"] == "error": return jsonify({"percent": 0, "phase": "error", "error": job.get("error")}) elif job["status"] == "pending": return jsonify({"percent": 0, "phase": "starting"}) # Running but no progress file yet return jsonify({"percent": 5, "phase": "reading"}) @app.route("/decode/result/") @login_required def decode_result(job_id): """Get the result page for an async decode job.""" job = _get_job(job_id) if not job: flash("Job not found or expired.", "error") return redirect(url_for("decode_page")) if job["status"] != "complete": flash("Decode not complete.", "error") return redirect(url_for("decode_page")) if job.get("is_file"): return render_template( "decode.html", decoded_file=True, file_id=job.get("file_id"), filename=job.get("filename"), file_size=format_size(job.get("file_size", 0)), mime_type=job.get("mime_type"), has_qrcode_read=_has_qrcode_read, ) else: return render_template( "decode.html", decoded_message=job.get("message"), has_qrcode_read=_has_qrcode_read, ) @app.route("/about") def about(): from auth import get_current_user from soosef.stegasoo import has_argon2 from soosef.stegasoo.channel import get_channel_status channel_status = get_channel_status() current_user = get_current_user() is_admin_user = current_user.is_admin if current_user else False return render_template( "stego/about.html", has_argon2=has_argon2(), has_qrcode_read=_has_qrcode_read, channel_configured=channel_status["configured"], channel_fingerprint=channel_status.get("fingerprint"), channel_source=channel_status.get("source"), is_admin=is_admin_user, ) # ============================================================================ # TOOLS ROUTES (v4.1.0) # ============================================================================ @app.route("/tools") @login_required def tools(): """Advanced tools page.""" return render_template("stego/tools.html", has_dct=has_dct_support()) @app.route("/api/tools/capacity", methods=["POST"]) @login_required def api_tools_capacity(): """Calculate image capacity for steganography.""" from soosef.stegasoo.dct_steganography import estimate_capacity_comparison carrier = request.files.get("image") if not carrier: return jsonify({"success": False, "error": "No image provided"}), 400 try: image_data = carrier.read() result = estimate_capacity_comparison(image_data) result["success"] = True result["filename"] = carrier.filename result["megapixels"] = round((result["width"] * result["height"]) / 1_000_000, 2) return jsonify(result) except Exception as e: return jsonify({"success": False, "error": str(e)}), 400 @app.route("/api/tools/strip-metadata", methods=["POST"]) @login_required def api_tools_strip_metadata(): """Strip EXIF/metadata from image.""" import io from soosef.stegasoo.utils import strip_image_metadata image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 try: image_data = image_file.read() clean_data = strip_image_metadata(image_data, output_format="PNG") buffer = io.BytesIO(clean_data) filename = image_file.filename.rsplit(".", 1)[0] + "_clean.png" return send_file( buffer, mimetype="image/png", as_attachment=True, download_name=filename ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 400 @app.route("/api/tools/exif", methods=["POST"]) @login_required def api_tools_exif(): """Read EXIF metadata from image.""" from soosef.stegasoo.utils import read_image_exif image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 try: image_data = image_file.read() exif = read_image_exif(image_data) # Check if it's a JPEG (editable) or not is_jpeg = image_data[:2] == b"\xff\xd8" return jsonify( { "success": True, "filename": image_file.filename, "exif": exif, "editable": is_jpeg, "field_count": len(exif), } ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 400 @app.route("/api/tools/exif/update", methods=["POST"]) @login_required def api_tools_exif_update(): """Update EXIF fields in image.""" from soosef.stegasoo.utils import write_image_exif image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 # Get updates from form data updates_json = request.form.get("updates", "{}") try: import json updates = json.loads(updates_json) except json.JSONDecodeError: return jsonify({"success": False, "error": "Invalid updates JSON"}), 400 if not updates: return jsonify({"success": False, "error": "No updates provided"}), 400 try: image_data = image_file.read() updated_data = write_image_exif(image_data, updates) # Return as downloadable file buffer = io.BytesIO(updated_data) return send_file( buffer, mimetype="image/jpeg", as_attachment=True, download_name=f"exif_{image_file.filename}", ) except ValueError as e: return jsonify({"success": False, "error": str(e)}), 400 except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route("/api/tools/exif/clear", methods=["POST"]) @login_required def api_tools_exif_clear(): """Remove all EXIF metadata from image.""" from soosef.stegasoo.utils import strip_image_metadata image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 # Get desired output format (default to PNG for lossless) output_format = request.form.get("format", "PNG").upper() if output_format not in ("PNG", "JPEG", "BMP"): output_format = "PNG" try: image_data = image_file.read() clean_data = strip_image_metadata(image_data, output_format=output_format) # Determine extension and mimetype ext_map = { "PNG": ("png", "image/png"), "JPEG": ("jpg", "image/jpeg"), "BMP": ("bmp", "image/bmp"), } ext, mimetype = ext_map.get(output_format, ("png", "image/png")) # Return as downloadable file stem = ( image_file.filename.rsplit(".", 1)[0] if "." in image_file.filename else image_file.filename ) buffer = io.BytesIO(clean_data) return send_file( buffer, mimetype=mimetype, as_attachment=True, download_name=f"{stem}_clean.{ext}", ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route("/api/tools/rotate", methods=["POST"]) @login_required def api_tools_rotate(): """Rotate and/or flip an image, using lossless jpegtran for JPEGs.""" import shutil import subprocess import tempfile from PIL import Image image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 rotation = int(request.form.get("rotation", 0)) flip_h = request.form.get("flip_h", "false").lower() == "true" flip_v = request.form.get("flip_v", "false").lower() == "true" try: image_data = image_file.read() img = Image.open(io.BytesIO(image_data)) original_format = img.format # JPEG, PNG, etc. img.close() # For JPEGs, use jpegtran for lossless rotation/flip (preserves DCT stego) has_jpegtran = shutil.which("jpegtran") is not None use_jpegtran = ( original_format == "JPEG" and has_jpegtran and (rotation or flip_h or flip_v) ) if use_jpegtran: # Chain jpegtran operations for lossless transformation current_data = image_data # Apply rotation first if rotation in (90, 180, 270): with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: f.write(current_data) input_path = f.name output_path = tempfile.mktemp(suffix=".jpg") try: result = subprocess.run( [ "jpegtran", "-rotate", str(rotation), "-copy", "all", "-outfile", output_path, input_path, ], capture_output=True, timeout=30, ) if result.returncode == 0: with open(output_path, "rb") as f: current_data = f.read() finally: for p in [input_path, output_path]: try: os.unlink(p) except OSError: pass # Apply horizontal flip if flip_h: with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: f.write(current_data) input_path = f.name output_path = tempfile.mktemp(suffix=".jpg") try: result = subprocess.run( [ "jpegtran", "-flip", "horizontal", "-copy", "all", "-outfile", output_path, input_path, ], capture_output=True, timeout=30, ) if result.returncode == 0: with open(output_path, "rb") as f: current_data = f.read() finally: for p in [input_path, output_path]: try: os.unlink(p) except OSError: pass # Apply vertical flip if flip_v: with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: f.write(current_data) input_path = f.name output_path = tempfile.mktemp(suffix=".jpg") try: result = subprocess.run( [ "jpegtran", "-flip", "vertical", "-copy", "all", "-outfile", output_path, input_path, ], capture_output=True, timeout=30, ) if result.returncode == 0: with open(output_path, "rb") as f: current_data = f.read() finally: for p in [input_path, output_path]: try: os.unlink(p) except OSError: pass buffer = io.BytesIO(current_data) mimetype = "image/jpeg" ext = "jpg" else: # Fallback to PIL for non-JPEGs or when jpegtran unavailable img = Image.open(io.BytesIO(image_data)) # Apply rotation (PIL rotates counter-clockwise, so negate) if rotation: img = img.rotate(-rotation, expand=True) # Apply flips if flip_h: img = img.transpose(Image.FLIP_LEFT_RIGHT) if flip_v: img = img.transpose(Image.FLIP_TOP_BOTTOM) # Preserve original format buffer = io.BytesIO() if original_format == "JPEG": if img.mode in ("RGBA", "P"): img = img.convert("RGB") img.save(buffer, format="JPEG", quality=95) mimetype = "image/jpeg" ext = "jpg" else: img.save(buffer, format="PNG") mimetype = "image/png" ext = "png" buffer.seek(0) stem = ( image_file.filename.rsplit(".", 1)[0] if "." in image_file.filename else image_file.filename ) return send_file( buffer, mimetype=mimetype, as_attachment=True, download_name=f"{stem}_transformed.{ext}", ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route("/api/tools/compress", methods=["POST"]) @login_required def api_tools_compress(): """Compress image to JPEG at specified quality.""" from PIL import Image image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 quality = int(request.form.get("quality", 85)) quality = max(10, min(100, quality)) # Clamp to valid range try: img = Image.open(io.BytesIO(image_file.read())) # Convert to RGB if necessary (JPEG doesn't support alpha) if img.mode in ("RGBA", "LA", "P"): img = img.convert("RGB") buffer = io.BytesIO() img.save(buffer, format="JPEG", quality=quality) buffer.seek(0) stem = ( image_file.filename.rsplit(".", 1)[0] if "." in image_file.filename else image_file.filename ) return send_file( buffer, mimetype="image/jpeg", as_attachment=True, download_name=f"{stem}_q{quality}.jpg", ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route("/api/tools/convert", methods=["POST"]) @login_required def api_tools_convert(): """Convert image to different format.""" from PIL import Image image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 output_format = request.form.get("format", "PNG").upper() quality = int(request.form.get("quality", 90)) quality = max(10, min(100, quality)) # Validate format format_map = { "PNG": ("png", "image/png"), "JPEG": ("jpg", "image/jpeg"), "WEBP": ("webp", "image/webp"), } if output_format not in format_map: return jsonify({"success": False, "error": f"Unsupported format: {output_format}"}), 400 try: img = Image.open(io.BytesIO(image_file.read())) # Convert to RGB for JPEG (no alpha) if output_format == "JPEG" and img.mode in ("RGBA", "LA", "P"): img = img.convert("RGB") buffer = io.BytesIO() save_kwargs = {"format": output_format} if output_format in ("JPEG", "WEBP"): save_kwargs["quality"] = quality img.save(buffer, **save_kwargs) buffer.seek(0) ext, mimetype = format_map[output_format] stem = ( image_file.filename.rsplit(".", 1)[0] if "." in image_file.filename else image_file.filename ) return send_file( buffer, mimetype=mimetype, as_attachment=True, download_name=f"{stem}.{ext}", ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 # Add these two test routes anywhere in app.py after the app = Flask(...) line: @app.route("/test-capacity", methods=["POST"]) def test_capacity(): """Minimal capacity test - no stegasoo code, just PIL.""" carrier = request.files.get("carrier") if not carrier: return jsonify({"error": "No carrier image provided"}), 400 try: carrier_data = carrier.read() buffer = io.BytesIO(carrier_data) img = Image.open(buffer) width, height = img.size fmt = img.format img.close() buffer.close() pixels = width * height lsb_bytes = (pixels * 3) // 8 dct_bytes = ((width // 8) * (height // 8) * 16) // 8 - 10 return jsonify( { "success": True, "width": width, "height": height, "format": fmt, "lsb_kb": round(lsb_bytes / 1024, 1), "dct_kb": round(dct_bytes / 1024, 1), } ) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/test-capacity-nopil", methods=["POST"]) def test_capacity_nopil(): """Ultra-minimal test - no PIL, no stegasoo.""" carrier = request.files.get("carrier") if not carrier: return jsonify({"error": "No carrier image provided"}), 400 carrier_data = carrier.read() return jsonify( { "success": True, "data_size": len(carrier_data), } ) # ============================================================================ # AUTHENTICATION ROUTES (v4.0.2) # ============================================================================