diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml index 9d821ca..bd8c686 100644 --- a/.gitea/workflows/ci.yml +++ b/.gitea/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: git clone --depth=1 --branch="${GITHUB_REF_NAME}" https://git.golfcards.club/alee/soosef.git "$GITHUB_WORKSPACE" || git clone --depth=1 https://git.golfcards.club/alee/soosef.git "$GITHUB_WORKSPACE" - run: pip install ruff black - name: Check formatting - run: black --check src/ tests/ frontends/ + run: black --check --target-version py312 src/ tests/ frontends/ - name: Lint run: ruff check src/ tests/ frontends/ diff --git a/frontends/web/app.py b/frontends/web/app.py index cf531ac..34289ee 100644 --- a/frontends/web/app.py +++ b/frontends/web/app.py @@ -477,7 +477,9 @@ def _register_stegasoo_routes(app: Flask) -> None: detail=None if success else message, ) if success: - flash(f"User '{username}' created with temporary password: {temp_password}", "success") + flash( + f"User '{username}' created with temporary password: {temp_password}", "success" + ) else: flash(message, "error") return redirect(url_for("admin_users")) @@ -535,7 +537,9 @@ def _register_stegasoo_routes(app: Flask) -> None: if not use_pin and not use_rsa: flash("You must select at least one security factor (PIN or RSA Key)", "error") - return render_template("stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE) + return render_template( + "stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE + ) pin_length = int(request.form.get("pin_length", 6)) rsa_bits = int(request.form.get("rsa_bits", 2048)) @@ -569,7 +573,11 @@ def _register_stegasoo_routes(app: Flask) -> None: temp_storage.save_temp_file( qr_token, creds.rsa_key_pem.encode(), - {"filename": "rsa_key.pem", "type": "rsa_key", "compress": qr_needs_compression}, + { + "filename": "rsa_key.pem", + "type": "rsa_key", + "compress": qr_needs_compression, + }, ) return render_template( @@ -594,7 +602,9 @@ def _register_stegasoo_routes(app: Flask) -> None: ) except Exception as e: flash(f"Error generating credentials: {e}", "error") - return render_template("stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE) + return render_template( + "stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE + ) return render_template("stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE) @@ -633,8 +643,10 @@ def _register_stegasoo_routes(app: Flask) -> None: compress = file_info.get("compress", False) qr_png = generate_qr_code(key_pem, compress=compress) return send_file( - io.BytesIO(qr_png), mimetype="image/png", - as_attachment=True, download_name="soosef_rsa_key_qr.png", + io.BytesIO(qr_png), + mimetype="image/png", + as_attachment=True, + download_name="soosef_rsa_key_qr.png", ) except Exception as e: return f"Error generating QR code: {e}", 500 @@ -656,8 +668,10 @@ def _register_stegasoo_routes(app: Flask) -> None: key_id = secrets.token_hex(4) filename = f"soosef_key_{private_key.key_size}_{key_id}.pem" return send_file( - io.BytesIO(encrypted_pem), mimetype="application/x-pem-file", - as_attachment=True, download_name=filename, + io.BytesIO(encrypted_pem), + mimetype="application/x-pem-file", + as_attachment=True, + download_name=filename, ) except Exception as e: flash(f"Error creating key file: {e}", "error") @@ -667,12 +681,15 @@ def _register_stegasoo_routes(app: Flask) -> None: from stego_routes import register_stego_routes - register_stego_routes(app, **{ - "login_required": login_required, - "subprocess_stego": subprocess_stego, - "temp_storage": temp_storage, - "has_qrcode_read": _HAS_QRCODE_READ, - }) + register_stego_routes( + app, + **{ + "login_required": login_required, + "subprocess_stego": subprocess_stego, + "temp_storage": temp_storage, + "has_qrcode_read": _HAS_QRCODE_READ, + }, + ) # /about route is in stego_routes.py @@ -683,22 +700,26 @@ def _register_stegasoo_routes(app: Flask) -> None: def api_channel_status(): result = subprocess_stego.get_channel_status(reveal=False) if result.success: - return jsonify({ - "success": True, - "mode": result.mode, - "configured": result.configured, - "fingerprint": result.fingerprint, - "source": result.source, - }) + return jsonify( + { + "success": True, + "mode": result.mode, + "configured": result.configured, + "fingerprint": result.fingerprint, + "source": result.source, + } + ) else: status = get_channel_status() - return jsonify({ - "success": True, - "mode": status["mode"], - "configured": status["configured"], - "fingerprint": status.get("fingerprint"), - "source": status.get("source"), - }) + return jsonify( + { + "success": True, + "mode": status["mode"], + "configured": status["configured"], + "fingerprint": status.get("fingerprint"), + "source": status.get("source"), + } + ) @app.route("/api/compare-capacity", methods=["POST"]) @login_required @@ -711,23 +732,25 @@ def _register_stegasoo_routes(app: Flask) -> None: result = subprocess_stego.compare_modes(carrier_data) if not result.success: return jsonify({"error": result.error or "Comparison failed"}), 500 - return jsonify({ - "success": True, - "width": result.width, - "height": result.height, - "lsb": { - "capacity_bytes": result.lsb["capacity_bytes"], - "capacity_kb": round(result.lsb["capacity_kb"], 1), - "output": result.lsb.get("output", "PNG"), - }, - "dct": { - "capacity_bytes": result.dct["capacity_bytes"], - "capacity_kb": round(result.dct["capacity_kb"], 1), - "output": result.dct.get("output", "JPEG"), - "available": result.dct.get("available", True), - "ratio": round(result.dct.get("ratio_vs_lsb", 0), 1), - }, - }) + return jsonify( + { + "success": True, + "width": result.width, + "height": result.height, + "lsb": { + "capacity_bytes": result.lsb["capacity_bytes"], + "capacity_kb": round(result.lsb["capacity_kb"], 1), + "output": result.lsb.get("output", "PNG"), + }, + "dct": { + "capacity_bytes": result.dct["capacity_bytes"], + "capacity_kb": round(result.dct["capacity_kb"], 1), + "output": result.dct.get("output", "JPEG"), + "available": result.dct.get("available", True), + "ratio": round(result.dct.get("ratio_vs_lsb", 0), 1), + }, + } + ) except Exception as e: return jsonify({"error": str(e)}), 500 @@ -751,11 +774,13 @@ def _register_stegasoo_routes(app: Flask) -> None: def api_generate_credentials(): try: creds = generate_credentials(use_pin=True, use_rsa=False) - return jsonify({ - "success": True, - "passphrase": creds.passphrase, - "pin": creds.pin, - }) + return jsonify( + { + "success": True, + "passphrase": creds.passphrase, + "pin": creds.pin, + } + ) except Exception as e: return jsonify({"error": str(e)}), 500 diff --git a/frontends/web/stego_routes.py b/frontends/web/stego_routes.py index a97822d..98250ea 100644 --- a/frontends/web/stego_routes.py +++ b/frontends/web/stego_routes.py @@ -95,7 +95,9 @@ def register_stego_routes(app, **deps): def _cleanup_old_jobs(max_age_seconds=3600): now = time.time() with _jobs_lock: - to_remove = [jid for jid, data in _jobs.items() if now - data.get("created", 0) > max_age_seconds] + to_remove = [ + jid for jid, data in _jobs.items() if now - data.get("created", 0) > max_age_seconds + ] for jid in to_remove: cleanup_progress_file(jid) del _jobs[jid] @@ -112,7 +114,8 @@ def register_stego_routes(app, **deps): with Image.open(io.BytesIO(image_data)) as img: if img.mode in ("RGBA", "LA", "P"): bg = Image.new("RGB", img.size, (255, 255, 255)) - if img.mode == "P": img = img.convert("RGBA") + if img.mode == "P": + img = img.convert("RGBA") bg.paste(img, mask=img.split()[-1] if img.mode == "RGBA" else None) img = bg elif img.mode != "RGB": @@ -128,25 +131,37 @@ def register_stego_routes(app, **deps): temp_storage.cleanup_expired(TEMP_FILE_EXPIRY) def allowed_image(fn): - return bool(fn and "." in fn and fn.rsplit(".", 1)[1].lower() in {"png", "jpg", "jpeg", "bmp", "gif"}) + return bool( + fn + and "." in fn + and fn.rsplit(".", 1)[1].lower() in {"png", "jpg", "jpeg", "bmp", "gif"} + ) def allowed_audio(fn): - return bool(fn and "." in fn and fn.rsplit(".", 1)[1].lower() in {"wav", "flac", "mp3", "ogg", "aac", "m4a", "aiff", "aif"}) + return bool( + fn + and "." in fn + and fn.rsplit(".", 1)[1].lower() + in {"wav", "flac", "mp3", "ogg", "aac", "m4a", "aiff", "aif"} + ) def format_size(n): - if n < 1024: return f"{n} B" - elif n < 1024*1024: return f"{n/1024:.1f} KB" - else: return f"{n/(1024*1024):.1f} MB" + if n < 1024: + return f"{n} B" + elif n < 1024 * 1024: + return f"{n/1024:.1f} KB" + else: + return f"{n/(1024*1024):.1f} MB" # ── Routes below are extracted from stegasoo app.py ── def _run_encode_job(job_id: str, encode_params: dict) -> None: """Background thread function for async encode.""" progress_file = get_progress_file_path(job_id) - + try: _store_job(job_id, {"status": "running", "created": time.time()}) - + # Run encode with progress file if encode_params.get("file_data"): encode_result = subprocess_stego.encode( @@ -180,7 +195,7 @@ def register_stego_routes(app, **deps): channel_key=encode_params.get("channel_key"), progress_file=progress_file, ) - + if not encode_result.success: _store_job( job_id, @@ -191,25 +206,25 @@ def register_stego_routes(app, **deps): }, ) return - + # Determine output format embed_mode = encode_params["embed_mode"] dct_output_format = encode_params.get("dct_output_format", "png") dct_color_mode = encode_params.get("dct_color_mode", "color") - + if embed_mode == "dct" and dct_output_format == "jpeg": output_ext = ".jpg" output_mime = "image/jpeg" else: output_ext = ".png" output_mime = "image/png" - + filename = encode_result.filename if not filename: filename = generate_filename("stego", output_ext) elif embed_mode == "dct" and dct_output_format == "jpeg" and filename.endswith(".png"): filename = filename[:-4] + ".jpg" - + # Store result file_id = secrets.token_urlsafe(16) temp_storage.save_temp_file( @@ -225,7 +240,7 @@ def register_stego_routes(app, **deps): "channel_fingerprint": encode_result.channel_fingerprint, }, ) - + _store_job( job_id, { @@ -234,7 +249,7 @@ def register_stego_routes(app, **deps): "created": time.time(), }, ) - + except Exception as e: _store_job( job_id, @@ -246,15 +261,14 @@ def register_stego_routes(app, **deps): ) finally: cleanup_progress_file(job_id) - - + def _run_encode_audio_job(job_id: str, encode_params: dict) -> None: """Background thread function for async audio encode (v4.3.0).""" progress_file = get_progress_file_path(job_id) - + try: _store_job(job_id, {"status": "running", "created": time.time()}) - + if encode_params.get("file_data"): encode_result = subprocess_stego.encode_audio( carrier_data=encode_params["carrier_data"], @@ -285,7 +299,7 @@ def register_stego_routes(app, **deps): progress_file=progress_file, chip_tier=encode_params.get("chip_tier"), ) - + if not encode_result.success: _store_job( job_id, @@ -296,7 +310,7 @@ def register_stego_routes(app, **deps): }, ) return - + filename = generate_filename("stego_audio", ".wav") file_id = secrets.token_urlsafe(16) temp_storage.save_temp_file( @@ -311,7 +325,7 @@ def register_stego_routes(app, **deps): "channel_fingerprint": encode_result.channel_fingerprint, }, ) - + _store_job( job_id, { @@ -320,7 +334,7 @@ def register_stego_routes(app, **deps): "created": time.time(), }, ) - + except Exception as e: _store_job( job_id, @@ -332,61 +346,64 @@ def register_stego_routes(app, **deps): ) finally: cleanup_progress_file(job_id) - - + @app.route("/encode", methods=["GET", "POST"]) @login_required def encode(): if request.method == "POST": # Check if async mode requested - is_async = request.form.get("async") == "true" or request.headers.get("X-Async") == "true" - + is_async = ( + request.form.get("async") == "true" or request.headers.get("X-Async") == "true" + ) + def _error_response(msg): """Return error as JSON (async) or HTML flash (sync).""" if is_async: return jsonify({"error": msg}), 400 flash(msg, "error") return render_template("stego/encode.html", has_qrcode_read=_HAS_QRCODE_READ) - + try: # Get files ref_photo = request.files.get("reference_photo") carrier = request.files.get("carrier") rsa_key_file = request.files.get("rsa_key") payload_file = request.files.get("payload_file") - + # Determine carrier type (v4.3.0) carrier_type = request.form.get("carrier_type", "image") - + if carrier_type == "audio": # ========== AUDIO ENCODE PATH (v4.3.0) ========== if not HAS_AUDIO_SUPPORT: return _error_response( "Audio steganography is not available. Install audio dependencies." ) - + if not ref_photo or not carrier: - return _error_response("Both reference photo and audio carrier are required") - + return _error_response( + "Both reference photo and audio carrier are required" + ) + if not allowed_image(ref_photo.filename): return _error_response("Reference must be an image (PNG, JPG, BMP)") - + if not allowed_audio(carrier.filename): return _error_response( "Invalid audio format. Use WAV, FLAC, MP3, OGG, AAC, or M4A" ) - + # Get form data message = request.form.get("message", "") passphrase = request.form.get("passphrase", "") pin = request.form.get("pin", "").strip() rsa_password = request.form.get("rsa_password", "") payload_type = request.form.get("payload_type", "text") - + embed_mode = request.form.get("embed_mode", "audio_lsb") if embed_mode not in ("audio_lsb", "audio_spread"): embed_mode = "audio_lsb" - + # Chip tier for spread spectrum (None = default) chip_tier_str = request.form.get("chip_tier") chip_tier = None @@ -394,9 +411,9 @@ def register_stego_routes(app, **deps): chip_tier = int(chip_tier_str) if chip_tier not in (0, 1, 2): chip_tier = None - + channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto")) - + # Determine payload if payload_type == "file" and payload_file and payload_file.filename: file_data = payload_file.read() @@ -414,25 +431,25 @@ def register_stego_routes(app, **deps): if not result.is_valid: return _error_response(result.error_message) payload = message - + if not passphrase: return _error_response("Passphrase is required") - + result = validate_passphrase(passphrase) if not result.is_valid: return _error_response(result.error_message) if result.warning: flash(result.warning, "warning") - + ref_data = ref_photo.read() carrier_data = carrier.read() - + # Handle RSA key (same as image path) rsa_key_data = None rsa_key_pem = request.form.get("rsa_key_pem", "").strip() rsa_key_qr = request.files.get("rsa_key_qr") rsa_key_from_qr = False - + if rsa_key_pem: if is_compressed(rsa_key_pem): rsa_key_pem = decompress_data(rsa_key_pem) @@ -448,23 +465,25 @@ def register_stego_routes(app, **deps): rsa_key_from_qr = True else: return _error_response("Could not extract RSA key from QR code image.") - + result = validate_security_factors(pin, rsa_key_data) if not result.is_valid: return _error_response(result.error_message) - + if pin: result = validate_pin(pin) if not result.is_valid: return _error_response(result.error_message) - - key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None) - + + key_password = ( + None if rsa_key_from_qr else (rsa_password if rsa_password else None) + ) + if rsa_key_data: result = validate_rsa_key(rsa_key_data, key_password) if not result.is_valid: return _error_response(result.error_message) - + # Build audio encode params encode_params = { "carrier_data": carrier_data, @@ -478,20 +497,20 @@ def register_stego_routes(app, **deps): "carrier_type": "audio", "chip_tier": chip_tier, } - + if payload_type == "file" and payload_file and payload_file.filename: encode_params["file_data"] = payload.data encode_params["file_name"] = payload.filename encode_params["file_mime"] = payload.mime_type else: encode_params["message"] = payload - + if is_async: job_id = generate_job_id() _store_job(job_id, {"status": "pending", "created": time.time()}) _executor.submit(_run_encode_audio_job, job_id, encode_params) return jsonify({"job_id": job_id, "status": "pending"}) - + # Sync audio encode if encode_params.get("file_data"): encode_result = subprocess_stego.encode_audio( @@ -521,11 +540,11 @@ def register_stego_routes(app, **deps): channel_key=channel_key, chip_tier=chip_tier, ) - + if not encode_result.success: error_msg = encode_result.error or "Audio encoding failed" return _error_response(error_msg) - + filename = generate_filename("stego_audio", ".wav") file_id = secrets.token_urlsafe(16) cleanup_temp_files() @@ -541,54 +560,56 @@ def register_stego_routes(app, **deps): "channel_fingerprint": encode_result.channel_fingerprint, }, ) - + return redirect(url_for("encode_result", file_id=file_id)) - + # ========== IMAGE ENCODE PATH (original) ========== if not ref_photo or not carrier: return _error_response("Both reference photo and carrier image are required") - + if not allowed_image(ref_photo.filename) or not allowed_image(carrier.filename): return _error_response("Invalid file type. Use PNG, JPG, or BMP") - + # Get form data - v3.2.0: renamed from day_phrase to passphrase message = request.form.get("message", "") passphrase = request.form.get("passphrase", "") # v3.2.0: Renamed pin = request.form.get("pin", "").strip() rsa_password = request.form.get("rsa_password", "") payload_type = request.form.get("payload_type", "text") - + # NEW in v3.0 - Embedding mode embed_mode = request.form.get("embed_mode", "lsb") if embed_mode not in ("lsb", "dct"): embed_mode = "lsb" - + # NEW in v3.0.1 - DCT output format dct_output_format = request.form.get("dct_output_format", "png") if dct_output_format not in ("png", "jpeg"): dct_output_format = "png" - + # NEW in v3.0.1 - DCT color mode dct_color_mode = request.form.get("dct_color_mode", "color") if dct_color_mode not in ("grayscale", "color"): dct_color_mode = "color" - + # NEW in v4.0.0 - Channel key channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto")) - + # Check DCT availability if embed_mode == "dct" and not has_dct_support(): - return _error_response("DCT mode requires scipy. Install with: pip install scipy") - + return _error_response( + "DCT mode requires scipy. Install with: pip install scipy" + ) + # Determine payload if payload_type == "file" and payload_file and payload_file.filename: # File payload file_data = payload_file.read() - + result = validate_file_payload(file_data, payload_file.filename) if not result.is_valid: return _error_response(result.error_message) - + mime_type, _ = mimetypes.guess_type(payload_file.filename) payload = FilePayload( data=file_data, filename=payload_file.filename, mime_type=mime_type @@ -599,30 +620,30 @@ def register_stego_routes(app, **deps): if not result.is_valid: return _error_response(result.error_message) payload = message - + # v3.2.0: Renamed from day_phrase if not passphrase: return _error_response("Passphrase is required") - + # v3.2.0: Validate passphrase result = validate_passphrase(passphrase) if not result.is_valid: return _error_response(result.error_message) - + # Show warning if passphrase is short if result.warning: flash(result.warning, "warning") - + # Read files ref_data = ref_photo.read() carrier_data = carrier.read() - + # Handle RSA key - can come from .pem file, QR code image, or webcam-scanned PEM (v4.1.5) rsa_key_data = None rsa_key_pem = request.form.get("rsa_key_pem", "").strip() rsa_key_qr = request.files.get("rsa_key_qr") rsa_key_from_qr = False - + if rsa_key_pem: # Webcam-scanned PEM key (v4.1.5+) - may be compressed (zlib or zstd) if is_compressed(rsa_key_pem): @@ -639,35 +660,35 @@ def register_stego_routes(app, **deps): rsa_key_from_qr = True else: return _error_response("Could not extract RSA key from QR code image.") - + # Validate security factors result = validate_security_factors(pin, rsa_key_data) if not result.is_valid: return _error_response(result.error_message) - + # Validate PIN if provided if pin: result = validate_pin(pin) if not result.is_valid: return _error_response(result.error_message) - + # Determine key password key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None) - + # Validate RSA key if provided if rsa_key_data: result = validate_rsa_key(rsa_key_data, key_password) if not result.is_valid: return _error_response(result.error_message) - + # Validate carrier image result = validate_image(carrier_data, "Carrier image") if not result.is_valid: return _error_response(result.error_message) - + # Pre-check payload capacity BEFORE encode (fail fast) from stegasoo.steganography import will_fit_by_mode - + payload_size = ( len(payload.data) if hasattr(payload, "data") else len(payload.encode("utf-8")) ) @@ -684,7 +705,7 @@ def register_stego_routes(app, **deps): if alt_check.get("fits"): error_msg += " - Try LSB mode instead." return _error_response(error_msg) - + # Build encode params for either sync or async encode_params = { "carrier_data": carrier_data, @@ -698,21 +719,21 @@ def register_stego_routes(app, **deps): "dct_color_mode": dct_color_mode if embed_mode == "dct" else "color", "channel_key": channel_key, } - + if payload_type == "file" and payload_file and payload_file.filename: encode_params["file_data"] = payload.data encode_params["file_name"] = payload.filename encode_params["file_mime"] = payload.mime_type else: encode_params["message"] = payload - + # ASYNC MODE: Start background job and return JSON if is_async: job_id = generate_job_id() _store_job(job_id, {"status": "pending", "created": time.time()}) _executor.submit(_run_encode_job, job_id, encode_params) return jsonify({"job_id": job_id, "status": "pending"}) - + # SYNC MODE: Run inline (original behavior) if payload_type == "file" and payload_file and payload_file.filename: encode_result = subprocess_stego.encode( @@ -744,14 +765,14 @@ def register_stego_routes(app, **deps): dct_color_mode=dct_color_mode if embed_mode == "dct" else "color", channel_key=channel_key, ) - + # Check for subprocess errors if not encode_result.success: error_msg = encode_result.error or "Encoding failed" if "capacity" in error_msg.lower(): raise CapacityError(error_msg) raise StegasooError(error_msg) - + # Determine actual output format for filename and storage if embed_mode == "dct" and dct_output_format == "jpeg": output_ext = ".jpg" @@ -759,14 +780,18 @@ def register_stego_routes(app, **deps): else: output_ext = ".png" output_mime = "image/png" - + # Use filename from result or generate one filename = encode_result.filename if not filename: filename = generate_filename("stego", output_ext) - elif embed_mode == "dct" and dct_output_format == "jpeg" and filename.endswith(".png"): + elif ( + embed_mode == "dct" + and dct_output_format == "jpeg" + and filename.endswith(".png") + ): filename = filename[:-4] + ".jpg" - + # Store temporarily file_id = secrets.token_urlsafe(16) cleanup_temp_files() @@ -784,24 +809,22 @@ def register_stego_routes(app, **deps): "channel_fingerprint": encode_result.channel_fingerprint, }, ) - + return redirect(url_for("encode_result", file_id=file_id)) - + except CapacityError as e: return _error_response(str(e)) except StegasooError as e: return _error_response(str(e)) except Exception as e: return _error_response(f"Error: {e}") - + return render_template("stego/encode.html", has_qrcode_read=_HAS_QRCODE_READ) - - + # ============================================================================ # ENCODE PROGRESS ENDPOINTS (v4.1.2) # ============================================================================ - - + @app.route("/encode/status/") @login_required def encode_status(job_id): @@ -809,17 +832,16 @@ def register_stego_routes(app, **deps): job = _get_job(job_id) if not job: return jsonify({"error": "Job not found"}), 404 - + response = {"status": job.get("status", "unknown")} - + if job["status"] == "complete": response["file_id"] = job.get("file_id") elif job["status"] == "error": response["error"] = job.get("error", "Unknown error") - + return jsonify(response) - - + @app.route("/encode/progress/") @login_required def encode_progress(job_id): @@ -827,23 +849,22 @@ def register_stego_routes(app, **deps): progress = read_progress(job_id) if progress: return jsonify(progress) - + # No progress file yet - check job status job = _get_job(job_id) if not job: return jsonify({"error": "Job not found"}), 404 - + if job["status"] == "complete": return jsonify({"percent": 100, "phase": "complete"}) elif job["status"] == "error": return jsonify({"percent": 0, "phase": "error", "error": job.get("error")}) elif job["status"] == "pending": return jsonify({"percent": 0, "phase": "starting"}) - + # Running but no progress file yet return jsonify({"percent": 0, "phase": "initializing"}) - - + @app.route("/encode/result/") @login_required def encode_result(file_id): @@ -851,9 +872,9 @@ def register_stego_routes(app, **deps): if not file_info: flash("File expired or not found. Please encode again.", "error") return redirect(url_for("encode")) - + carrier_type = file_info.get("carrier_type", "image") - + # Generate thumbnail only for images thumbnail_data = None thumbnail_id = None @@ -862,12 +883,14 @@ def register_stego_routes(app, **deps): if thumbnail_data: thumbnail_id = f"{file_id}_thumb" temp_storage.save_thumbnail(thumbnail_id, thumbnail_data) - + return render_template( "encode_result.html", file_id=file_id, filename=file_info["filename"], - thumbnail_url=url_for("encode_thumbnail", thumb_id=thumbnail_id) if thumbnail_id else None, + thumbnail_url=( + url_for("encode_thumbnail", thumb_id=thumbnail_id) if thumbnail_id else None + ), embed_mode=file_info.get("embed_mode", "lsb"), output_format=file_info.get("output_format", "png"), color_mode=file_info.get("color_mode"), @@ -876,8 +899,7 @@ def register_stego_routes(app, **deps): channel_mode=file_info.get("channel_mode", "public"), channel_fingerprint=file_info.get("channel_fingerprint"), ) - - + @app.route("/encode/thumbnail/") @login_required def encode_thumbnail(thumb_id): @@ -885,10 +907,9 @@ def register_stego_routes(app, **deps): thumb_data = temp_storage.get_thumbnail(thumb_id) if not thumb_data: return "Thumbnail not found", 404 - + return send_file(io.BytesIO(thumb_data), mimetype="image/jpeg", as_attachment=False) - - + @app.route("/encode/download/") @login_required def encode_download(file_id): @@ -896,17 +917,16 @@ def register_stego_routes(app, **deps): if not file_info: flash("File expired or not found.", "error") return redirect(url_for("encode")) - + mime_type = file_info.get("mime_type", "image/png") - + return send_file( io.BytesIO(file_info["data"]), mimetype=mime_type, as_attachment=True, download_name=file_info["filename"], ) - - + @app.route("/encode/file/") @login_required def encode_file_route(file_id): @@ -914,42 +934,39 @@ def register_stego_routes(app, **deps): file_info = temp_storage.get_temp_file(file_id) if not file_info: return "Not found", 404 - + mime_type = file_info.get("mime_type", "image/png") - + return send_file( io.BytesIO(file_info["data"]), mimetype=mime_type, as_attachment=False, download_name=file_info["filename"], ) - - + @app.route("/encode/cleanup/", methods=["POST"]) @login_required def encode_cleanup(file_id): """Manually cleanup a file after sharing.""" temp_storage.delete_temp_file(file_id) - + # Also cleanup thumbnail if exists thumb_id = f"{file_id}_thumb" temp_storage.delete_thumbnail(thumb_id) - + return jsonify({"status": "ok"}) - - + # ============================================================================ # DECODE # ============================================================================ - - + def _run_decode_job(job_id: str, decode_params: dict) -> None: """Background thread function for async decode.""" progress_file = get_progress_file_path(job_id) - + try: _store_job(job_id, {"status": "running", "created": time.time()}) - + # Run decode with progress file decode_result = subprocess_stego.decode( stego_data=decode_params["stego_data"], @@ -962,7 +979,7 @@ def register_stego_routes(app, **deps): channel_key=decode_params.get("channel_key"), progress_file=progress_file, ) - + if not decode_result.success: _store_job( job_id, @@ -974,7 +991,7 @@ def register_stego_routes(app, **deps): }, ) return - + # Store result based on type if decode_result.is_file: file_id = secrets.token_urlsafe(16) @@ -1009,7 +1026,7 @@ def register_stego_routes(app, **deps): "created": time.time(), }, ) - + except Exception as e: _store_job( job_id, @@ -1021,15 +1038,14 @@ def register_stego_routes(app, **deps): ) finally: cleanup_progress_file(job_id) - - + def _run_decode_audio_job(job_id: str, decode_params: dict) -> None: """Background thread function for async audio decode (v4.3.0).""" progress_file = get_progress_file_path(job_id) - + try: _store_job(job_id, {"status": "running", "created": time.time()}) - + decode_result = subprocess_stego.decode_audio( stego_data=decode_params["stego_data"], reference_data=decode_params["ref_data"], @@ -1041,7 +1057,7 @@ def register_stego_routes(app, **deps): channel_key=decode_params.get("channel_key"), progress_file=progress_file, ) - + if not decode_result.success: _store_job( job_id, @@ -1053,7 +1069,7 @@ def register_stego_routes(app, **deps): }, ) return - + if decode_result.is_file: file_id = secrets.token_urlsafe(16) filename = decode_result.filename or "decoded_file" @@ -1087,7 +1103,7 @@ def register_stego_routes(app, **deps): "created": time.time(), }, ) - + except Exception as e: _store_job( job_id, @@ -1099,8 +1115,7 @@ def register_stego_routes(app, **deps): ) finally: cleanup_progress_file(job_id) - - + @app.route("/decode", methods=["GET", "POST"]) @login_required def decode_page(): @@ -1110,51 +1125,61 @@ def register_stego_routes(app, **deps): ref_photo = request.files.get("reference_photo") stego_image = request.files.get("stego_image") rsa_key_file = request.files.get("rsa_key") - + # Determine carrier type (v4.3.0) carrier_type = request.form.get("carrier_type", "image") - + if carrier_type == "audio": # ========== AUDIO DECODE PATH (v4.3.0) ========== if not HAS_AUDIO_SUPPORT: flash("Audio steganography is not available.", "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + if not ref_photo or not stego_image: flash("Both reference photo and stego audio are required", "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + if not allowed_image(ref_photo.filename): flash("Reference must be an image", "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + if not allowed_audio(stego_image.filename): flash("Invalid audio format", "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + passphrase = request.form.get("passphrase", "") pin = request.form.get("pin", "").strip() rsa_password = request.form.get("rsa_password", "") - + embed_mode = request.form.get("embed_mode", "audio_auto") if embed_mode not in ("audio_auto", "audio_lsb", "audio_spread"): embed_mode = "audio_auto" - + channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto")) - + if not passphrase: flash("Passphrase is required", "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + ref_data = ref_photo.read() stego_data = stego_image.read() - + # Handle RSA key (same as image path) rsa_key_data = None rsa_key_pem = request.form.get("rsa_key_pem", "").strip() rsa_key_qr = request.files.get("rsa_key_qr") rsa_key_from_qr = False - + if rsa_key_pem: if is_compressed(rsa_key_pem): rsa_key_pem = decompress_data(rsa_key_pem) @@ -1170,31 +1195,42 @@ def register_stego_routes(app, **deps): rsa_key_from_qr = True else: flash("Could not extract RSA key from QR code image.", "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + result = validate_security_factors(pin, rsa_key_data) if not result.is_valid: flash(result.error_message, "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + if pin: result = validate_pin(pin) if not result.is_valid: flash(result.error_message, "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - - key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None) - + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + + key_password = ( + None if rsa_key_from_qr else (rsa_password if rsa_password else None) + ) + if rsa_key_data: result = validate_rsa_key(rsa_key_data, key_password) if not result.is_valid: flash(result.error_message, "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + is_async = ( - request.form.get("async") == "true" or request.headers.get("X-Async") == "true" + request.form.get("async") == "true" + or request.headers.get("X-Async") == "true" ) - + decode_params = { "stego_data": stego_data, "ref_data": ref_data, @@ -1205,13 +1241,13 @@ def register_stego_routes(app, **deps): "embed_mode": embed_mode, "channel_key": channel_key, } - + if is_async: job_id = generate_job_id() _store_job(job_id, {"status": "pending", "created": time.time()}) _executor.submit(_run_decode_audio_job, job_id, decode_params) return jsonify({"job_id": job_id, "status": "pending"}) - + # Sync audio decode decode_result = subprocess_stego.decode_audio( stego_data=stego_data, @@ -1223,7 +1259,7 @@ def register_stego_routes(app, **deps): embed_mode=embed_mode, channel_key=channel_key, ) - + if not decode_result.success: error_msg = decode_result.error or "Audio decoding failed" if ( @@ -1237,8 +1273,10 @@ def register_stego_routes(app, **deps): ) else: flash(error_msg, "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + if decode_result.is_file: file_id = secrets.token_urlsafe(16) cleanup_temp_files() @@ -1266,47 +1304,47 @@ def register_stego_routes(app, **deps): decoded_message=decode_result.message, has_qrcode_read=_HAS_QRCODE_READ, ) - + # ========== IMAGE DECODE PATH (original) ========== if not ref_photo or not stego_image: flash("Both reference photo and stego image are required", "error") return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + # Get form data - v3.2.0: renamed from day_phrase to passphrase passphrase = request.form.get("passphrase", "") # v3.2.0: Renamed pin = request.form.get("pin", "").strip() rsa_password = request.form.get("rsa_password", "") - + # NEW in v3.0 - Extraction mode embed_mode = request.form.get("embed_mode", "auto") if embed_mode not in ("auto", "lsb", "dct"): embed_mode = "auto" - + # NEW in v4.0.0 - Channel key channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto")) - + # Check DCT availability if embed_mode == "dct" and not has_dct_support(): flash("DCT mode requires scipy. Install with: pip install scipy", "error") return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + # v3.2.0: Removed date handling (no stego_date needed) - + # v3.2.0: Renamed from day_phrase if not passphrase: flash("Passphrase is required", "error") return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + # Read files ref_data = ref_photo.read() stego_data = stego_image.read() - + # Handle RSA key - can come from .pem file, QR code image, or webcam-scanned PEM (v4.1.5) rsa_key_data = None rsa_key_pem = request.form.get("rsa_key_pem", "").strip() rsa_key_qr = request.files.get("rsa_key_qr") rsa_key_from_qr = False - + if rsa_key_pem: # Webcam-scanned PEM key (v4.1.5+) - may be compressed (zlib or zstd) if is_compressed(rsa_key_pem): @@ -1323,36 +1361,42 @@ def register_stego_routes(app, **deps): rsa_key_from_qr = True else: flash("Could not extract RSA key from QR code image.", "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + # Validate security factors result = validate_security_factors(pin, rsa_key_data) if not result.is_valid: flash(result.error_message, "error") return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + # Validate PIN if provided if pin: result = validate_pin(pin) if not result.is_valid: flash(result.error_message, "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + # Determine key password key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None) - + # Validate RSA key if provided if rsa_key_data: result = validate_rsa_key(rsa_key_data, key_password) if not result.is_valid: flash(result.error_message, "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + # Check for async mode (v4.1.5) is_async = ( request.form.get("async") == "true" or request.headers.get("X-Async") == "true" ) - + # Build decode params decode_params = { "stego_data": stego_data, @@ -1364,14 +1408,14 @@ def register_stego_routes(app, **deps): "embed_mode": embed_mode, "channel_key": channel_key, } - + # ASYNC MODE: Start background job and return JSON if is_async: job_id = generate_job_id() _store_job(job_id, {"status": "pending", "created": time.time()}) _executor.submit(_run_decode_job, job_id, decode_params) return jsonify({"job_id": job_id, "status": "pending"}) - + # SYNC MODE: Run inline (original behavior) # v4.0.0: Include channel_key parameter # Use subprocess-isolated decode to prevent crashes @@ -1385,23 +1429,28 @@ def register_stego_routes(app, **deps): embed_mode=embed_mode, channel_key=channel_key, # v4.0.0 ) - + # Check for subprocess errors if not decode_result.success: error_msg = decode_result.error or "Decoding failed" # Check for channel key related errors if "channel key" in error_msg.lower(): flash(error_msg, "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - if "decrypt" in error_msg.lower() or decode_result.error_type == "DecryptionError": + return render_template( + "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + ) + if ( + "decrypt" in error_msg.lower() + or decode_result.error_type == "DecryptionError" + ): raise DecryptionError(error_msg) raise StegasooError(error_msg) - + if decode_result.is_file: # File content - store temporarily for download file_id = secrets.token_urlsafe(16) cleanup_temp_files() - + filename = decode_result.filename or "decoded_file" temp_storage.save_temp_file( file_id, @@ -1411,7 +1460,7 @@ def register_stego_routes(app, **deps): "mime_type": decode_result.mime_type, }, ) - + return render_template( "decode.html", decoded_file=True, @@ -1428,7 +1477,7 @@ def register_stego_routes(app, **deps): decoded_message=decode_result.message, has_qrcode_read=_HAS_QRCODE_READ, ) - + except InvalidMagicBytesError: flash( "This doesn't appear to be a Stegasoo image. Try a different mode (LSB/DCT).", @@ -1459,10 +1508,9 @@ def register_stego_routes(app, **deps): except Exception as e: flash(f"Error: {e}", "error") return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - + return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) - - + @app.route("/decode/download/") @login_required def decode_download(file_id): @@ -1471,22 +1519,20 @@ def register_stego_routes(app, **deps): if not file_info: flash("File expired or not found.", "error") return redirect(url_for("decode_page")) - + mime_type = file_info.get("mime_type", "application/octet-stream") - + return send_file( io.BytesIO(file_info["data"]), mimetype=mime_type, as_attachment=True, download_name=file_info["filename"], ) - - + # ============================================================================ # DECODE PROGRESS ENDPOINTS (v4.1.5) # ============================================================================ - - + @app.route("/decode/status/") @login_required def decode_status(job_id): @@ -1494,9 +1540,9 @@ def register_stego_routes(app, **deps): job = _get_job(job_id) if not job: return jsonify({"error": "Job not found"}), 404 - + response = {"status": job.get("status", "unknown")} - + if job["status"] == "complete": response["is_file"] = job.get("is_file", False) if job.get("is_file"): @@ -1509,10 +1555,9 @@ def register_stego_routes(app, **deps): elif job["status"] == "error": response["error"] = job.get("error", "Unknown error") response["error_type"] = job.get("error_type") - + return jsonify(response) - - + @app.route("/decode/progress/") @login_required def decode_progress(job_id): @@ -1520,23 +1565,22 @@ def register_stego_routes(app, **deps): progress = read_progress(job_id) if progress: return jsonify(progress) - + # No progress file yet - check job status job = _get_job(job_id) if not job: return jsonify({"error": "Job not found"}), 404 - + if job["status"] == "complete": return jsonify({"percent": 100, "phase": "complete"}) elif job["status"] == "error": return jsonify({"percent": 0, "phase": "error", "error": job.get("error")}) elif job["status"] == "pending": return jsonify({"percent": 0, "phase": "starting"}) - + # Running but no progress file yet return jsonify({"percent": 5, "phase": "reading"}) - - + @app.route("/decode/result/") @login_required def decode_result(job_id): @@ -1545,11 +1589,11 @@ def register_stego_routes(app, **deps): if not job: flash("Job not found or expired.", "error") return redirect(url_for("decode_page")) - + if job["status"] != "complete": flash("Decode not complete.", "error") return redirect(url_for("decode_page")) - + if job.get("is_file"): return render_template( "decode.html", @@ -1566,8 +1610,7 @@ def register_stego_routes(app, **deps): decoded_message=job.get("message"), has_qrcode_read=_HAS_QRCODE_READ, ) - - + @app.route("/about") def about(): from stegasoo.channel import get_channel_status @@ -1587,30 +1630,27 @@ def register_stego_routes(app, **deps): channel_source=channel_status.get("source"), is_admin=is_admin_user, ) - - + # ============================================================================ # TOOLS ROUTES (v4.1.0) # ============================================================================ - - + @app.route("/tools") @login_required def tools(): """Advanced tools page.""" return render_template("stego/tools.html", has_dct=has_dct_support()) - - + @app.route("/api/tools/capacity", methods=["POST"]) @login_required def api_tools_capacity(): """Calculate image capacity for steganography.""" from stegasoo.dct_steganography import estimate_capacity_comparison - + carrier = request.files.get("image") if not carrier: return jsonify({"success": False, "error": "No image provided"}), 400 - + try: image_data = carrier.read() result = estimate_capacity_comparison(image_data) @@ -1620,49 +1660,49 @@ def register_stego_routes(app, **deps): return jsonify(result) except Exception as e: return jsonify({"success": False, "error": str(e)}), 400 - - + @app.route("/api/tools/strip-metadata", methods=["POST"]) @login_required def api_tools_strip_metadata(): """Strip EXIF/metadata from image.""" import io - + from stegasoo.utils import strip_image_metadata - + image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 - + try: image_data = image_file.read() clean_data = strip_image_metadata(image_data, output_format="PNG") - + buffer = io.BytesIO(clean_data) filename = image_file.filename.rsplit(".", 1)[0] + "_clean.png" - - return send_file(buffer, mimetype="image/png", as_attachment=True, download_name=filename) + + return send_file( + buffer, mimetype="image/png", as_attachment=True, download_name=filename + ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 400 - - + @app.route("/api/tools/exif", methods=["POST"]) @login_required def api_tools_exif(): """Read EXIF metadata from image.""" from stegasoo.utils import read_image_exif - + image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 - + try: image_data = image_file.read() exif = read_image_exif(image_data) - + # Check if it's a JPEG (editable) or not is_jpeg = image_data[:2] == b"\xff\xd8" - + return jsonify( { "success": True, @@ -1674,34 +1714,33 @@ def register_stego_routes(app, **deps): ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 400 - - + @app.route("/api/tools/exif/update", methods=["POST"]) @login_required def api_tools_exif_update(): """Update EXIF fields in image.""" from stegasoo.utils import write_image_exif - + image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 - + # Get updates from form data updates_json = request.form.get("updates", "{}") try: import json - + updates = json.loads(updates_json) except json.JSONDecodeError: return jsonify({"success": False, "error": "Invalid updates JSON"}), 400 - + if not updates: return jsonify({"success": False, "error": "No updates provided"}), 400 - + try: image_data = image_file.read() updated_data = write_image_exif(image_data, updates) - + # Return as downloadable file buffer = io.BytesIO(updated_data) return send_file( @@ -1714,27 +1753,26 @@ def register_stego_routes(app, **deps): return jsonify({"success": False, "error": str(e)}), 400 except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 - - + @app.route("/api/tools/exif/clear", methods=["POST"]) @login_required def api_tools_exif_clear(): """Remove all EXIF metadata from image.""" from stegasoo.utils import strip_image_metadata - + image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 - + # Get desired output format (default to PNG for lossless) output_format = request.form.get("format", "PNG").upper() if output_format not in ("PNG", "JPEG", "BMP"): output_format = "PNG" - + try: image_data = image_file.read() clean_data = strip_image_metadata(image_data, output_format=output_format) - + # Determine extension and mimetype ext_map = { "PNG": ("png", "image/png"), @@ -1742,7 +1780,7 @@ def register_stego_routes(app, **deps): "BMP": ("bmp", "image/bmp"), } ext, mimetype = ext_map.get(output_format, ("png", "image/png")) - + # Return as downloadable file stem = ( image_file.filename.rsplit(".", 1)[0] @@ -1758,8 +1796,7 @@ def register_stego_routes(app, **deps): ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 - - + @app.route("/api/tools/rotate", methods=["POST"]) @login_required def api_tools_rotate(): @@ -1767,31 +1804,33 @@ def register_stego_routes(app, **deps): import shutil import subprocess import tempfile - + from PIL import Image - + image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 - + rotation = int(request.form.get("rotation", 0)) flip_h = request.form.get("flip_h", "false").lower() == "true" flip_v = request.form.get("flip_v", "false").lower() == "true" - + try: image_data = image_file.read() img = Image.open(io.BytesIO(image_data)) original_format = img.format # JPEG, PNG, etc. img.close() - + # For JPEGs, use jpegtran for lossless rotation/flip (preserves DCT stego) has_jpegtran = shutil.which("jpegtran") is not None - use_jpegtran = original_format == "JPEG" and has_jpegtran and (rotation or flip_h or flip_v) - + use_jpegtran = ( + original_format == "JPEG" and has_jpegtran and (rotation or flip_h or flip_v) + ) + if use_jpegtran: # Chain jpegtran operations for lossless transformation current_data = image_data - + # Apply rotation first if rotation in (90, 180, 270): with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: @@ -1822,7 +1861,7 @@ def register_stego_routes(app, **deps): os.unlink(p) except OSError: pass - + # Apply horizontal flip if flip_h: with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: @@ -1853,7 +1892,7 @@ def register_stego_routes(app, **deps): os.unlink(p) except OSError: pass - + # Apply vertical flip if flip_v: with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: @@ -1884,24 +1923,24 @@ def register_stego_routes(app, **deps): os.unlink(p) except OSError: pass - + buffer = io.BytesIO(current_data) mimetype = "image/jpeg" ext = "jpg" else: # Fallback to PIL for non-JPEGs or when jpegtran unavailable img = Image.open(io.BytesIO(image_data)) - + # Apply rotation (PIL rotates counter-clockwise, so negate) if rotation: img = img.rotate(-rotation, expand=True) - + # Apply flips if flip_h: img = img.transpose(Image.FLIP_LEFT_RIGHT) if flip_v: img = img.transpose(Image.FLIP_TOP_BOTTOM) - + # Preserve original format buffer = io.BytesIO() if original_format == "JPEG": @@ -1915,7 +1954,7 @@ def register_stego_routes(app, **deps): mimetype = "image/png" ext = "png" buffer.seek(0) - + stem = ( image_file.filename.rsplit(".", 1)[0] if "." in image_file.filename @@ -1929,32 +1968,31 @@ def register_stego_routes(app, **deps): ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 - - + @app.route("/api/tools/compress", methods=["POST"]) @login_required def api_tools_compress(): """Compress image to JPEG at specified quality.""" from PIL import Image - + image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 - + quality = int(request.form.get("quality", 85)) quality = max(10, min(100, quality)) # Clamp to valid range - + try: img = Image.open(io.BytesIO(image_file.read())) - + # Convert to RGB if necessary (JPEG doesn't support alpha) if img.mode in ("RGBA", "LA", "P"): img = img.convert("RGB") - + buffer = io.BytesIO() img.save(buffer, format="JPEG", quality=quality) buffer.seek(0) - + stem = ( image_file.filename.rsplit(".", 1)[0] if "." in image_file.filename @@ -1968,22 +2006,21 @@ def register_stego_routes(app, **deps): ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 - - + @app.route("/api/tools/convert", methods=["POST"]) @login_required def api_tools_convert(): """Convert image to different format.""" from PIL import Image - + image_file = request.files.get("image") if not image_file: return jsonify({"success": False, "error": "No image provided"}), 400 - + output_format = request.form.get("format", "PNG").upper() quality = int(request.form.get("quality", 90)) quality = max(10, min(100, quality)) - + # Validate format format_map = { "PNG": ("png", "image/png"), @@ -1992,21 +2029,21 @@ def register_stego_routes(app, **deps): } if output_format not in format_map: return jsonify({"success": False, "error": f"Unsupported format: {output_format}"}), 400 - + try: img = Image.open(io.BytesIO(image_file.read())) - + # Convert to RGB for JPEG (no alpha) if output_format == "JPEG" and img.mode in ("RGBA", "LA", "P"): img = img.convert("RGB") - + buffer = io.BytesIO() save_kwargs = {"format": output_format} if output_format in ("JPEG", "WEBP"): save_kwargs["quality"] = quality img.save(buffer, **save_kwargs) buffer.seek(0) - + ext, mimetype = format_map[output_format] stem = ( image_file.filename.rsplit(".", 1)[0] @@ -2021,18 +2058,16 @@ def register_stego_routes(app, **deps): ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 - - + # Add these two test routes anywhere in app.py after the app = Flask(...) line: - - + @app.route("/test-capacity", methods=["POST"]) def test_capacity(): """Minimal capacity test - no stegasoo code, just PIL.""" carrier = request.files.get("carrier") if not carrier: return jsonify({"error": "No carrier image provided"}), 400 - + try: carrier_data = carrier.read() buffer = io.BytesIO(carrier_data) @@ -2041,11 +2076,11 @@ def register_stego_routes(app, **deps): fmt = img.format img.close() buffer.close() - + pixels = width * height lsb_bytes = (pixels * 3) // 8 dct_bytes = ((width // 8) * (height // 8) * 16) // 8 - 10 - + return jsonify( { "success": True, @@ -2058,15 +2093,14 @@ def register_stego_routes(app, **deps): ) except Exception as e: return jsonify({"error": str(e)}), 500 - - + @app.route("/test-capacity-nopil", methods=["POST"]) def test_capacity_nopil(): """Ultra-minimal test - no PIL, no stegasoo.""" carrier = request.files.get("carrier") if not carrier: return jsonify({"error": "No carrier image provided"}), 400 - + carrier_data = carrier.read() return jsonify( { @@ -2074,9 +2108,7 @@ def register_stego_routes(app, **deps): "data_size": len(carrier_data), } ) - - + # ============================================================================ # AUTHENTICATION ROUTES (v4.0.2) # ============================================================================ - diff --git a/src/soosef/federation/chain.py b/src/soosef/federation/chain.py index af5799c..7bb47b4 100644 --- a/src/soosef/federation/chain.py +++ b/src/soosef/federation/chain.py @@ -433,9 +433,7 @@ class ChainStore: if prev_record is not None: expected_hash = compute_record_hash(prev_record) if record.prev_hash != expected_hash: - raise ChainIntegrityError( - f"Record {record.chain_index}: prev_hash mismatch" - ) + raise ChainIntegrityError(f"Record {record.chain_index}: prev_hash mismatch") elif record.chain_index == 0: if record.prev_hash != ChainState.GENESIS_PREV_HASH: raise ChainIntegrityError("Genesis record has non-zero prev_hash") diff --git a/src/soosef/fieldkit/deadman.py b/src/soosef/fieldkit/deadman.py index 2036483..f88ad6a 100644 --- a/src/soosef/fieldkit/deadman.py +++ b/src/soosef/fieldkit/deadman.py @@ -47,9 +47,7 @@ class DeadmanSwitch: state["grace_hours"] = grace_hours state["last_checkin"] = datetime.now(UTC).isoformat() self._save_state(state) - logger.info( - "Dead man's switch armed: %dh interval, %dh grace", interval_hours, grace_hours - ) + logger.info("Dead man's switch armed: %dh interval, %dh grace", interval_hours, grace_hours) def disarm(self) -> None: """Disarm the dead man's switch.""" @@ -83,9 +81,7 @@ class DeadmanSwitch: if not state["armed"] or not state["last_checkin"]: return False last = datetime.fromisoformat(state["last_checkin"]) - deadline = last + timedelta( - hours=state["interval_hours"] + state["grace_hours"] - ) + deadline = last + timedelta(hours=state["interval_hours"] + state["grace_hours"]) return datetime.now(UTC) > deadline def status(self) -> dict: diff --git a/src/soosef/fieldkit/killswitch.py b/src/soosef/fieldkit/killswitch.py index 9f54368..9289921 100644 --- a/src/soosef/fieldkit/killswitch.py +++ b/src/soosef/fieldkit/killswitch.py @@ -92,14 +92,16 @@ def execute_purge(scope: PurgeScope = PurgeScope.ALL, reason: str = "manual") -> ] if scope == PurgeScope.ALL: - steps.extend([ - ("destroy_auth_db", lambda: _secure_delete_file(paths.AUTH_DB)), - ("destroy_attestation_log", lambda: _secure_delete_dir(paths.ATTESTATIONS_DIR)), - ("destroy_chain_data", lambda: _secure_delete_dir(paths.CHAIN_DIR)), - ("destroy_temp_files", lambda: _secure_delete_dir(paths.TEMP_DIR)), - ("destroy_config", lambda: _secure_delete_file(paths.CONFIG_FILE)), - ("clear_journald", _clear_system_logs), - ]) + steps.extend( + [ + ("destroy_auth_db", lambda: _secure_delete_file(paths.AUTH_DB)), + ("destroy_attestation_log", lambda: _secure_delete_dir(paths.ATTESTATIONS_DIR)), + ("destroy_chain_data", lambda: _secure_delete_dir(paths.CHAIN_DIR)), + ("destroy_temp_files", lambda: _secure_delete_dir(paths.TEMP_DIR)), + ("destroy_config", lambda: _secure_delete_file(paths.CONFIG_FILE)), + ("clear_journald", _clear_system_logs), + ] + ) for name, action in steps: try: diff --git a/src/soosef/keystore/manager.py b/src/soosef/keystore/manager.py index b773d25..aafd64b 100644 --- a/src/soosef/keystore/manager.py +++ b/src/soosef/keystore/manager.py @@ -135,9 +135,7 @@ class KeystoreManager: from datetime import UTC, datetime meta_path = self._identity_meta_path() - meta_path.write_text( - json.dumps({"created_at": datetime.now(UTC).isoformat()}, indent=None) - ) + meta_path.write_text(json.dumps({"created_at": datetime.now(UTC).isoformat()}, indent=None)) return self.get_identity() @@ -263,8 +261,7 @@ class KeystoreManager: from datetime import UTC, datetime (archive_dir / "rotation.txt").write_text( - f"Rotated at: {datetime.now(UTC).isoformat()}\n" - f"Old fingerprint: {old_fp}\n" + f"Rotated at: {datetime.now(UTC).isoformat()}\n" f"Old fingerprint: {old_fp}\n" ) new_key = self.generate_channel_key() diff --git a/tests/test_chain_security.py b/tests/test_chain_security.py index c909e3e..78a7ca0 100644 --- a/tests/test_chain_security.py +++ b/tests/test_chain_security.py @@ -52,8 +52,7 @@ def test_concurrent_append_no_fork(chain_dir: Path): # Every index must be unique (no fork) assert len(all_indices) == len(set(all_indices)), ( - f"Duplicate chain indices detected — chain forked! " - f"Indices: {sorted(all_indices)}" + f"Duplicate chain indices detected — chain forked! " f"Indices: {sorted(all_indices)}" ) # Indices should be 0..N-1 contiguous @@ -100,7 +99,7 @@ def test_truncated_chain_file(chain_dir: Path, private_key: Ed25519PrivateKey): # Truncate the file mid-record chain_file = chain_dir / "chain.bin" data = chain_file.read_bytes() - chain_file.write_bytes(data[:len(data) - 50]) + chain_file.write_bytes(data[: len(data) - 50]) store2 = ChainStore(chain_dir) records = list(store2._iter_raw()) diff --git a/tests/test_deadman_enforcement.py b/tests/test_deadman_enforcement.py index 6a2f117..d2697ac 100644 --- a/tests/test_deadman_enforcement.py +++ b/tests/test_deadman_enforcement.py @@ -22,7 +22,6 @@ from pathlib import Path import pytest from click.testing import CliRunner - # ── Fixtures ──────────────────────────────────────────────────────────────── @@ -205,7 +204,9 @@ def cli_runner(): return CliRunner() -def test_check_deadman_disarmed(tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch): +def test_check_deadman_disarmed( + tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch +): """check-deadman exits 0 and prints helpful message when not armed.""" from soosef.fieldkit import deadman as deadman_mod from soosef.cli import main @@ -219,7 +220,9 @@ def test_check_deadman_disarmed(tmp_path: Path, cli_runner: CliRunner, monkeypat assert "not armed" in result.output -def test_check_deadman_armed_ok(tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch): +def test_check_deadman_armed_ok( + tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch +): """check-deadman exits 0 when armed and check-in is current.""" from soosef.fieldkit import deadman as deadman_mod from soosef.cli import main @@ -241,7 +244,9 @@ def test_check_deadman_armed_ok(tmp_path: Path, cli_runner: CliRunner, monkeypat assert "OK" in result.output -def test_check_deadman_overdue_in_grace(tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch): +def test_check_deadman_overdue_in_grace( + tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch +): """check-deadman exits 0 but prints OVERDUE warning when past interval but in grace.""" from soosef.fieldkit import deadman as deadman_mod from soosef.cli import main