Merge stegasoo (v4.3.0, steganography) and verisoo (v0.1.0, attestation) as subpackages under soosef.stegasoo and soosef.verisoo. This eliminates cross-repo coordination and enables atomic changes across the full stack. - Copy stegasoo (34 modules) and verisoo (15 modules) into src/soosef/ - Convert all verisoo absolute imports to relative imports - Rewire ~50 import sites across soosef code (cli, web, keystore, tests) - Replace stegasoo/verisoo pip deps with inlined code + pip extras (stego-dct, stego-audio, attest, web, api, cli, fieldkit, all, dev) - Add _availability.py for runtime feature detection - Add unified FastAPI mount point at soosef.api - Copy and adapt tests from both repos (155 pass, 1 skip) - Drop standalone CLI/web frontends; keep FastAPI as optional modules - Both source repos tagged pre-monorepo-consolidation on GitHub Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2114 lines
85 KiB
Python
2114 lines
85 KiB
Python
"""
|
|
Stegasoo encode/decode/tools routes.
|
|
|
|
Ported from stegasoo's frontends/web/app.py. These routes handle:
|
|
- Image encode with async progress tracking
|
|
- Audio encode (v4.3.0)
|
|
- Image/audio decode
|
|
- Encode result display
|
|
- Encode/decode progress polling
|
|
- Tools API (capacity, EXIF, rotate, compress, convert)
|
|
|
|
All routes use subprocess isolation via SubprocessStego for crash safety.
|
|
"""
|
|
|
|
import io
|
|
import mimetypes
|
|
import os
|
|
import secrets
|
|
import threading
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
from flask import (
|
|
flash,
|
|
jsonify,
|
|
redirect,
|
|
render_template,
|
|
request,
|
|
send_file,
|
|
url_for,
|
|
)
|
|
from PIL import Image
|
|
|
|
|
|
def register_stego_routes(app, **deps):
|
|
"""Register all stegasoo encode/decode routes on the Flask app."""
|
|
|
|
# Unpack dependencies passed from app.py
|
|
login_required = deps["login_required"]
|
|
subprocess_stego = deps["subprocess_stego"]
|
|
temp_storage = deps["temp_storage"]
|
|
_has_qrcode_read = deps.get("has_qrcode_read", False)
|
|
|
|
from soosef.stegasoo import (
|
|
HAS_AUDIO_SUPPORT,
|
|
CapacityError,
|
|
DecryptionError,
|
|
FilePayload,
|
|
InvalidHeaderError,
|
|
InvalidMagicBytesError,
|
|
ReedSolomonError,
|
|
StegasooError,
|
|
generate_filename,
|
|
has_dct_support,
|
|
validate_file_payload,
|
|
validate_image,
|
|
validate_message,
|
|
validate_passphrase,
|
|
validate_pin,
|
|
validate_rsa_key,
|
|
validate_security_factors,
|
|
)
|
|
from soosef.stegasoo.channel import resolve_channel_key
|
|
from soosef.stegasoo.constants import (
|
|
TEMP_FILE_EXPIRY,
|
|
THUMBNAIL_QUALITY,
|
|
THUMBNAIL_SIZE,
|
|
)
|
|
from soosef.stegasoo.qr_utils import (
|
|
decompress_data,
|
|
extract_key_from_qr,
|
|
is_compressed,
|
|
)
|
|
from subprocess_stego import (
|
|
cleanup_progress_file,
|
|
generate_job_id,
|
|
get_progress_file_path,
|
|
read_progress,
|
|
)
|
|
|
|
# Async job management
|
|
_executor = ThreadPoolExecutor(max_workers=2)
|
|
_jobs = {}
|
|
_jobs_lock = threading.Lock()
|
|
|
|
def _store_job(job_id, data):
|
|
with _jobs_lock:
|
|
_jobs[job_id] = data
|
|
|
|
def _get_job(job_id):
|
|
with _jobs_lock:
|
|
return _jobs.get(job_id)
|
|
|
|
def _cleanup_old_jobs(max_age_seconds=3600):
|
|
now = time.time()
|
|
with _jobs_lock:
|
|
to_remove = [
|
|
jid for jid, data in _jobs.items() if now - data.get("created", 0) > max_age_seconds
|
|
]
|
|
for jid in to_remove:
|
|
cleanup_progress_file(jid)
|
|
del _jobs[jid]
|
|
|
|
def resolve_channel_key_form(value):
|
|
try:
|
|
result = resolve_channel_key(value)
|
|
return "auto" if result is None else ("none" if result == "" else result)
|
|
except (ValueError, FileNotFoundError):
|
|
return "auto"
|
|
|
|
def generate_thumbnail(image_data, size=THUMBNAIL_SIZE):
|
|
try:
|
|
with Image.open(io.BytesIO(image_data)) as img:
|
|
if img.mode in ("RGBA", "LA", "P"):
|
|
bg = Image.new("RGB", img.size, (255, 255, 255))
|
|
if img.mode == "P":
|
|
img = img.convert("RGBA")
|
|
bg.paste(img, mask=img.split()[-1] if img.mode == "RGBA" else None)
|
|
img = bg
|
|
elif img.mode != "RGB":
|
|
img = img.convert("RGB")
|
|
img.thumbnail(size, Image.Resampling.LANCZOS)
|
|
buf = io.BytesIO()
|
|
img.save(buf, format="JPEG", quality=THUMBNAIL_QUALITY, optimize=True)
|
|
return buf.getvalue()
|
|
except Exception:
|
|
return None
|
|
|
|
def cleanup_temp_files():
|
|
temp_storage.cleanup_expired(TEMP_FILE_EXPIRY)
|
|
|
|
def allowed_image(fn):
|
|
return bool(
|
|
fn
|
|
and "." in fn
|
|
and fn.rsplit(".", 1)[1].lower() in {"png", "jpg", "jpeg", "bmp", "gif"}
|
|
)
|
|
|
|
def allowed_audio(fn):
|
|
return bool(
|
|
fn
|
|
and "." in fn
|
|
and fn.rsplit(".", 1)[1].lower()
|
|
in {"wav", "flac", "mp3", "ogg", "aac", "m4a", "aiff", "aif"}
|
|
)
|
|
|
|
def format_size(n):
|
|
if n < 1024:
|
|
return f"{n} B"
|
|
elif n < 1024 * 1024:
|
|
return f"{n/1024:.1f} KB"
|
|
else:
|
|
return f"{n/(1024*1024):.1f} MB"
|
|
|
|
# ── Routes below are extracted from stegasoo app.py ──
|
|
|
|
def _run_encode_job(job_id: str, encode_params: dict) -> None:
|
|
"""Background thread function for async encode."""
|
|
progress_file = get_progress_file_path(job_id)
|
|
|
|
try:
|
|
_store_job(job_id, {"status": "running", "created": time.time()})
|
|
|
|
# Run encode with progress file
|
|
if encode_params.get("file_data"):
|
|
encode_result = subprocess_stego.encode(
|
|
carrier_data=encode_params["carrier_data"],
|
|
reference_data=encode_params["ref_data"],
|
|
file_data=encode_params["file_data"],
|
|
file_name=encode_params["file_name"],
|
|
file_mime=encode_params["file_mime"],
|
|
passphrase=encode_params["passphrase"],
|
|
pin=encode_params.get("pin"),
|
|
rsa_key_data=encode_params.get("rsa_key_data"),
|
|
rsa_password=encode_params.get("key_password"),
|
|
embed_mode=encode_params["embed_mode"],
|
|
dct_output_format=encode_params.get("dct_output_format", "png"),
|
|
dct_color_mode=encode_params.get("dct_color_mode", "color"),
|
|
channel_key=encode_params.get("channel_key"),
|
|
progress_file=progress_file,
|
|
)
|
|
else:
|
|
encode_result = subprocess_stego.encode(
|
|
carrier_data=encode_params["carrier_data"],
|
|
reference_data=encode_params["ref_data"],
|
|
message=encode_params["message"],
|
|
passphrase=encode_params["passphrase"],
|
|
pin=encode_params.get("pin"),
|
|
rsa_key_data=encode_params.get("rsa_key_data"),
|
|
rsa_password=encode_params.get("key_password"),
|
|
embed_mode=encode_params["embed_mode"],
|
|
dct_output_format=encode_params.get("dct_output_format", "png"),
|
|
dct_color_mode=encode_params.get("dct_color_mode", "color"),
|
|
channel_key=encode_params.get("channel_key"),
|
|
progress_file=progress_file,
|
|
)
|
|
|
|
if not encode_result.success:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "error",
|
|
"error": encode_result.error or "Encoding failed",
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
return
|
|
|
|
# Determine output format
|
|
embed_mode = encode_params["embed_mode"]
|
|
dct_output_format = encode_params.get("dct_output_format", "png")
|
|
dct_color_mode = encode_params.get("dct_color_mode", "color")
|
|
|
|
if embed_mode == "dct" and dct_output_format == "jpeg":
|
|
output_ext = ".jpg"
|
|
output_mime = "image/jpeg"
|
|
else:
|
|
output_ext = ".png"
|
|
output_mime = "image/png"
|
|
|
|
filename = encode_result.filename
|
|
if not filename:
|
|
filename = generate_filename("stego", output_ext)
|
|
elif embed_mode == "dct" and dct_output_format == "jpeg" and filename.endswith(".png"):
|
|
filename = filename[:-4] + ".jpg"
|
|
|
|
# Store result
|
|
file_id = secrets.token_urlsafe(16)
|
|
temp_storage.save_temp_file(
|
|
file_id,
|
|
encode_result.stego_data,
|
|
{
|
|
"filename": filename,
|
|
"embed_mode": embed_mode,
|
|
"output_format": dct_output_format if embed_mode == "dct" else "png",
|
|
"color_mode": dct_color_mode if embed_mode == "dct" else None,
|
|
"mime_type": output_mime,
|
|
"channel_mode": encode_result.channel_mode,
|
|
"channel_fingerprint": encode_result.channel_fingerprint,
|
|
},
|
|
)
|
|
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "complete",
|
|
"file_id": file_id,
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
|
|
except Exception as e:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "error",
|
|
"error": str(e),
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
finally:
|
|
cleanup_progress_file(job_id)
|
|
|
|
def _run_encode_audio_job(job_id: str, encode_params: dict) -> None:
|
|
"""Background thread function for async audio encode (v4.3.0)."""
|
|
progress_file = get_progress_file_path(job_id)
|
|
|
|
try:
|
|
_store_job(job_id, {"status": "running", "created": time.time()})
|
|
|
|
if encode_params.get("file_data"):
|
|
encode_result = subprocess_stego.encode_audio(
|
|
carrier_data=encode_params["carrier_data"],
|
|
reference_data=encode_params["ref_data"],
|
|
file_data=encode_params["file_data"],
|
|
file_name=encode_params["file_name"],
|
|
file_mime=encode_params["file_mime"],
|
|
passphrase=encode_params["passphrase"],
|
|
pin=encode_params.get("pin"),
|
|
rsa_key_data=encode_params.get("rsa_key_data"),
|
|
rsa_password=encode_params.get("key_password"),
|
|
embed_mode=encode_params["embed_mode"],
|
|
channel_key=encode_params.get("channel_key"),
|
|
progress_file=progress_file,
|
|
chip_tier=encode_params.get("chip_tier"),
|
|
)
|
|
else:
|
|
encode_result = subprocess_stego.encode_audio(
|
|
carrier_data=encode_params["carrier_data"],
|
|
reference_data=encode_params["ref_data"],
|
|
message=encode_params.get("message"),
|
|
passphrase=encode_params["passphrase"],
|
|
pin=encode_params.get("pin"),
|
|
rsa_key_data=encode_params.get("rsa_key_data"),
|
|
rsa_password=encode_params.get("key_password"),
|
|
embed_mode=encode_params["embed_mode"],
|
|
channel_key=encode_params.get("channel_key"),
|
|
progress_file=progress_file,
|
|
chip_tier=encode_params.get("chip_tier"),
|
|
)
|
|
|
|
if not encode_result.success:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "error",
|
|
"error": encode_result.error or "Audio encoding failed",
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
return
|
|
|
|
filename = generate_filename("stego_audio", ".wav")
|
|
file_id = secrets.token_urlsafe(16)
|
|
temp_storage.save_temp_file(
|
|
file_id,
|
|
encode_result.stego_data,
|
|
{
|
|
"filename": filename,
|
|
"embed_mode": encode_params["embed_mode"],
|
|
"carrier_type": "audio",
|
|
"mime_type": "audio/wav",
|
|
"channel_mode": encode_result.channel_mode,
|
|
"channel_fingerprint": encode_result.channel_fingerprint,
|
|
},
|
|
)
|
|
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "complete",
|
|
"file_id": file_id,
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
|
|
except Exception as e:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "error",
|
|
"error": str(e),
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
finally:
|
|
cleanup_progress_file(job_id)
|
|
|
|
@app.route("/encode", methods=["GET", "POST"])
|
|
@login_required
|
|
def encode():
|
|
if request.method == "POST":
|
|
# Check if async mode requested
|
|
is_async = (
|
|
request.form.get("async") == "true" or request.headers.get("X-Async") == "true"
|
|
)
|
|
|
|
def _error_response(msg):
|
|
"""Return error as JSON (async) or HTML flash (sync)."""
|
|
if is_async:
|
|
return jsonify({"error": msg}), 400
|
|
flash(msg, "error")
|
|
return render_template("stego/encode.html", has_qrcode_read=_has_qrcode_read)
|
|
|
|
try:
|
|
# Get files
|
|
ref_photo = request.files.get("reference_photo")
|
|
carrier = request.files.get("carrier")
|
|
rsa_key_file = request.files.get("rsa_key")
|
|
payload_file = request.files.get("payload_file")
|
|
|
|
# Determine carrier type (v4.3.0)
|
|
carrier_type = request.form.get("carrier_type", "image")
|
|
|
|
if carrier_type == "audio":
|
|
# ========== AUDIO ENCODE PATH (v4.3.0) ==========
|
|
if not HAS_AUDIO_SUPPORT:
|
|
return _error_response(
|
|
"Audio steganography is not available. Install audio dependencies."
|
|
)
|
|
|
|
if not ref_photo or not carrier:
|
|
return _error_response(
|
|
"Both reference photo and audio carrier are required"
|
|
)
|
|
|
|
if not allowed_image(ref_photo.filename):
|
|
return _error_response("Reference must be an image (PNG, JPG, BMP)")
|
|
|
|
if not allowed_audio(carrier.filename):
|
|
return _error_response(
|
|
"Invalid audio format. Use WAV, FLAC, MP3, OGG, AAC, or M4A"
|
|
)
|
|
|
|
# Get form data
|
|
message = request.form.get("message", "")
|
|
passphrase = request.form.get("passphrase", "")
|
|
pin = request.form.get("pin", "").strip()
|
|
rsa_password = request.form.get("rsa_password", "")
|
|
payload_type = request.form.get("payload_type", "text")
|
|
|
|
embed_mode = request.form.get("embed_mode", "audio_lsb")
|
|
if embed_mode not in ("audio_lsb", "audio_spread"):
|
|
embed_mode = "audio_lsb"
|
|
|
|
# Chip tier for spread spectrum (None = default)
|
|
chip_tier_str = request.form.get("chip_tier")
|
|
chip_tier = None
|
|
if chip_tier_str and chip_tier_str.isdigit():
|
|
chip_tier = int(chip_tier_str)
|
|
if chip_tier not in (0, 1, 2):
|
|
chip_tier = None
|
|
|
|
channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto"))
|
|
|
|
# Determine payload
|
|
if payload_type == "file" and payload_file and payload_file.filename:
|
|
file_data = payload_file.read()
|
|
result = validate_file_payload(file_data, payload_file.filename)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
mime_type, _ = mimetypes.guess_type(payload_file.filename)
|
|
payload = FilePayload(
|
|
data=file_data,
|
|
filename=payload_file.filename,
|
|
mime_type=mime_type,
|
|
)
|
|
else:
|
|
result = validate_message(message)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
payload = message
|
|
|
|
if not passphrase:
|
|
return _error_response("Passphrase is required")
|
|
|
|
result = validate_passphrase(passphrase)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
if result.warning:
|
|
flash(result.warning, "warning")
|
|
|
|
ref_data = ref_photo.read()
|
|
carrier_data = carrier.read()
|
|
|
|
# Handle RSA key (same as image path)
|
|
rsa_key_data = None
|
|
rsa_key_pem = request.form.get("rsa_key_pem", "").strip()
|
|
rsa_key_qr = request.files.get("rsa_key_qr")
|
|
rsa_key_from_qr = False
|
|
|
|
if rsa_key_pem:
|
|
if is_compressed(rsa_key_pem):
|
|
rsa_key_pem = decompress_data(rsa_key_pem)
|
|
rsa_key_data = rsa_key_pem.encode("utf-8")
|
|
rsa_key_from_qr = True
|
|
elif rsa_key_file and rsa_key_file.filename:
|
|
rsa_key_data = rsa_key_file.read()
|
|
elif rsa_key_qr and rsa_key_qr.filename and _has_qrcode_read:
|
|
qr_image_data = rsa_key_qr.read()
|
|
key_pem = extract_key_from_qr(qr_image_data)
|
|
if key_pem:
|
|
rsa_key_data = key_pem.encode("utf-8")
|
|
rsa_key_from_qr = True
|
|
else:
|
|
return _error_response("Could not extract RSA key from QR code image.")
|
|
|
|
result = validate_security_factors(pin, rsa_key_data)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
if pin:
|
|
result = validate_pin(pin)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
key_password = (
|
|
None if rsa_key_from_qr else (rsa_password if rsa_password else None)
|
|
)
|
|
|
|
if rsa_key_data:
|
|
result = validate_rsa_key(rsa_key_data, key_password)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
# Build audio encode params
|
|
encode_params = {
|
|
"carrier_data": carrier_data,
|
|
"ref_data": ref_data,
|
|
"passphrase": passphrase,
|
|
"pin": pin if pin else None,
|
|
"rsa_key_data": rsa_key_data,
|
|
"key_password": key_password,
|
|
"embed_mode": embed_mode,
|
|
"channel_key": channel_key,
|
|
"carrier_type": "audio",
|
|
"chip_tier": chip_tier,
|
|
}
|
|
|
|
if payload_type == "file" and payload_file and payload_file.filename:
|
|
encode_params["file_data"] = payload.data
|
|
encode_params["file_name"] = payload.filename
|
|
encode_params["file_mime"] = payload.mime_type
|
|
else:
|
|
encode_params["message"] = payload
|
|
|
|
if is_async:
|
|
job_id = generate_job_id()
|
|
_store_job(job_id, {"status": "pending", "created": time.time()})
|
|
_executor.submit(_run_encode_audio_job, job_id, encode_params)
|
|
return jsonify({"job_id": job_id, "status": "pending"})
|
|
|
|
# Sync audio encode
|
|
if encode_params.get("file_data"):
|
|
encode_result = subprocess_stego.encode_audio(
|
|
carrier_data=carrier_data,
|
|
reference_data=ref_data,
|
|
file_data=encode_params["file_data"],
|
|
file_name=encode_params["file_name"],
|
|
file_mime=encode_params["file_mime"],
|
|
passphrase=passphrase,
|
|
pin=pin if pin else None,
|
|
rsa_key_data=rsa_key_data,
|
|
rsa_password=key_password,
|
|
embed_mode=embed_mode,
|
|
channel_key=channel_key,
|
|
chip_tier=chip_tier,
|
|
)
|
|
else:
|
|
encode_result = subprocess_stego.encode_audio(
|
|
carrier_data=carrier_data,
|
|
reference_data=ref_data,
|
|
message=payload,
|
|
passphrase=passphrase,
|
|
pin=pin if pin else None,
|
|
rsa_key_data=rsa_key_data,
|
|
rsa_password=key_password,
|
|
embed_mode=embed_mode,
|
|
channel_key=channel_key,
|
|
chip_tier=chip_tier,
|
|
)
|
|
|
|
if not encode_result.success:
|
|
error_msg = encode_result.error or "Audio encoding failed"
|
|
return _error_response(error_msg)
|
|
|
|
filename = generate_filename("stego_audio", ".wav")
|
|
file_id = secrets.token_urlsafe(16)
|
|
cleanup_temp_files()
|
|
temp_storage.save_temp_file(
|
|
file_id,
|
|
encode_result.stego_data,
|
|
{
|
|
"filename": filename,
|
|
"embed_mode": embed_mode,
|
|
"carrier_type": "audio",
|
|
"mime_type": "audio/wav",
|
|
"channel_mode": encode_result.channel_mode,
|
|
"channel_fingerprint": encode_result.channel_fingerprint,
|
|
},
|
|
)
|
|
|
|
return redirect(url_for("encode_result", file_id=file_id))
|
|
|
|
# ========== IMAGE ENCODE PATH (original) ==========
|
|
if not ref_photo or not carrier:
|
|
return _error_response("Both reference photo and carrier image are required")
|
|
|
|
if not allowed_image(ref_photo.filename) or not allowed_image(carrier.filename):
|
|
return _error_response("Invalid file type. Use PNG, JPG, or BMP")
|
|
|
|
# Get form data - v3.2.0: renamed from day_phrase to passphrase
|
|
message = request.form.get("message", "")
|
|
passphrase = request.form.get("passphrase", "") # v3.2.0: Renamed
|
|
pin = request.form.get("pin", "").strip()
|
|
rsa_password = request.form.get("rsa_password", "")
|
|
payload_type = request.form.get("payload_type", "text")
|
|
|
|
# NEW in v3.0 - Embedding mode
|
|
embed_mode = request.form.get("embed_mode", "lsb")
|
|
if embed_mode not in ("lsb", "dct"):
|
|
embed_mode = "lsb"
|
|
|
|
# NEW in v3.0.1 - DCT output format
|
|
dct_output_format = request.form.get("dct_output_format", "png")
|
|
if dct_output_format not in ("png", "jpeg"):
|
|
dct_output_format = "png"
|
|
|
|
# NEW in v3.0.1 - DCT color mode
|
|
dct_color_mode = request.form.get("dct_color_mode", "color")
|
|
if dct_color_mode not in ("grayscale", "color"):
|
|
dct_color_mode = "color"
|
|
|
|
# NEW in v4.0.0 - Channel key
|
|
channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto"))
|
|
|
|
# Check DCT availability
|
|
if embed_mode == "dct" and not has_dct_support():
|
|
return _error_response(
|
|
"DCT mode requires scipy. Install with: pip install scipy"
|
|
)
|
|
|
|
# Determine payload
|
|
if payload_type == "file" and payload_file and payload_file.filename:
|
|
# File payload
|
|
file_data = payload_file.read()
|
|
|
|
result = validate_file_payload(file_data, payload_file.filename)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
mime_type, _ = mimetypes.guess_type(payload_file.filename)
|
|
payload = FilePayload(
|
|
data=file_data, filename=payload_file.filename, mime_type=mime_type
|
|
)
|
|
else:
|
|
# Text message
|
|
result = validate_message(message)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
payload = message
|
|
|
|
# v3.2.0: Renamed from day_phrase
|
|
if not passphrase:
|
|
return _error_response("Passphrase is required")
|
|
|
|
# v3.2.0: Validate passphrase
|
|
result = validate_passphrase(passphrase)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
# Show warning if passphrase is short
|
|
if result.warning:
|
|
flash(result.warning, "warning")
|
|
|
|
# Read files
|
|
ref_data = ref_photo.read()
|
|
carrier_data = carrier.read()
|
|
|
|
# Handle RSA key - can come from .pem file, QR code image, or webcam-scanned PEM (v4.1.5)
|
|
rsa_key_data = None
|
|
rsa_key_pem = request.form.get("rsa_key_pem", "").strip()
|
|
rsa_key_qr = request.files.get("rsa_key_qr")
|
|
rsa_key_from_qr = False
|
|
|
|
if rsa_key_pem:
|
|
# Webcam-scanned PEM key (v4.1.5+) - may be compressed (zlib or zstd)
|
|
if is_compressed(rsa_key_pem):
|
|
rsa_key_pem = decompress_data(rsa_key_pem)
|
|
rsa_key_data = rsa_key_pem.encode("utf-8")
|
|
rsa_key_from_qr = True
|
|
elif rsa_key_file and rsa_key_file.filename:
|
|
rsa_key_data = rsa_key_file.read()
|
|
elif rsa_key_qr and rsa_key_qr.filename and _has_qrcode_read:
|
|
qr_image_data = rsa_key_qr.read()
|
|
key_pem = extract_key_from_qr(qr_image_data)
|
|
if key_pem:
|
|
rsa_key_data = key_pem.encode("utf-8")
|
|
rsa_key_from_qr = True
|
|
else:
|
|
return _error_response("Could not extract RSA key from QR code image.")
|
|
|
|
# Validate security factors
|
|
result = validate_security_factors(pin, rsa_key_data)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
# Validate PIN if provided
|
|
if pin:
|
|
result = validate_pin(pin)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
# Determine key password
|
|
key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None)
|
|
|
|
# Validate RSA key if provided
|
|
if rsa_key_data:
|
|
result = validate_rsa_key(rsa_key_data, key_password)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
# Validate carrier image
|
|
result = validate_image(carrier_data, "Carrier image")
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
# Pre-check payload capacity BEFORE encode (fail fast)
|
|
from soosef.stegasoo.steganography import will_fit_by_mode
|
|
|
|
payload_size = (
|
|
len(payload.data) if hasattr(payload, "data") else len(payload.encode("utf-8"))
|
|
)
|
|
fit_check = will_fit_by_mode(payload_size, carrier_data, embed_mode=embed_mode)
|
|
if not fit_check.get("fits", True):
|
|
error_msg = (
|
|
f"Payload too large for {embed_mode.upper()} mode. "
|
|
f"Payload: {payload_size:,} bytes, "
|
|
f"Capacity: {fit_check.get('capacity', 0):,} bytes"
|
|
)
|
|
# Suggest alternative mode
|
|
if embed_mode == "dct":
|
|
alt_check = will_fit_by_mode(payload_size, carrier_data, embed_mode="lsb")
|
|
if alt_check.get("fits"):
|
|
error_msg += " - Try LSB mode instead."
|
|
return _error_response(error_msg)
|
|
|
|
# Build encode params for either sync or async
|
|
encode_params = {
|
|
"carrier_data": carrier_data,
|
|
"ref_data": ref_data,
|
|
"passphrase": passphrase,
|
|
"pin": pin if pin else None,
|
|
"rsa_key_data": rsa_key_data,
|
|
"key_password": key_password,
|
|
"embed_mode": embed_mode,
|
|
"dct_output_format": dct_output_format if embed_mode == "dct" else "png",
|
|
"dct_color_mode": dct_color_mode if embed_mode == "dct" else "color",
|
|
"channel_key": channel_key,
|
|
}
|
|
|
|
if payload_type == "file" and payload_file and payload_file.filename:
|
|
encode_params["file_data"] = payload.data
|
|
encode_params["file_name"] = payload.filename
|
|
encode_params["file_mime"] = payload.mime_type
|
|
else:
|
|
encode_params["message"] = payload
|
|
|
|
# ASYNC MODE: Start background job and return JSON
|
|
if is_async:
|
|
job_id = generate_job_id()
|
|
_store_job(job_id, {"status": "pending", "created": time.time()})
|
|
_executor.submit(_run_encode_job, job_id, encode_params)
|
|
return jsonify({"job_id": job_id, "status": "pending"})
|
|
|
|
# SYNC MODE: Run inline (original behavior)
|
|
if payload_type == "file" and payload_file and payload_file.filename:
|
|
encode_result = subprocess_stego.encode(
|
|
carrier_data=carrier_data,
|
|
reference_data=ref_data,
|
|
file_data=payload.data,
|
|
file_name=payload.filename,
|
|
file_mime=payload.mime_type,
|
|
passphrase=passphrase,
|
|
pin=pin if pin else None,
|
|
rsa_key_data=rsa_key_data,
|
|
rsa_password=key_password,
|
|
embed_mode=embed_mode,
|
|
dct_output_format=dct_output_format if embed_mode == "dct" else "png",
|
|
dct_color_mode=dct_color_mode if embed_mode == "dct" else "color",
|
|
channel_key=channel_key,
|
|
)
|
|
else:
|
|
encode_result = subprocess_stego.encode(
|
|
carrier_data=carrier_data,
|
|
reference_data=ref_data,
|
|
message=payload,
|
|
passphrase=passphrase,
|
|
pin=pin if pin else None,
|
|
rsa_key_data=rsa_key_data,
|
|
rsa_password=key_password,
|
|
embed_mode=embed_mode,
|
|
dct_output_format=dct_output_format if embed_mode == "dct" else "png",
|
|
dct_color_mode=dct_color_mode if embed_mode == "dct" else "color",
|
|
channel_key=channel_key,
|
|
)
|
|
|
|
# Check for subprocess errors
|
|
if not encode_result.success:
|
|
error_msg = encode_result.error or "Encoding failed"
|
|
if "capacity" in error_msg.lower():
|
|
raise CapacityError(error_msg)
|
|
raise StegasooError(error_msg)
|
|
|
|
# Determine actual output format for filename and storage
|
|
if embed_mode == "dct" and dct_output_format == "jpeg":
|
|
output_ext = ".jpg"
|
|
output_mime = "image/jpeg"
|
|
else:
|
|
output_ext = ".png"
|
|
output_mime = "image/png"
|
|
|
|
# Use filename from result or generate one
|
|
filename = encode_result.filename
|
|
if not filename:
|
|
filename = generate_filename("stego", output_ext)
|
|
elif (
|
|
embed_mode == "dct"
|
|
and dct_output_format == "jpeg"
|
|
and filename.endswith(".png")
|
|
):
|
|
filename = filename[:-4] + ".jpg"
|
|
|
|
# Store temporarily
|
|
file_id = secrets.token_urlsafe(16)
|
|
cleanup_temp_files()
|
|
temp_storage.save_temp_file(
|
|
file_id,
|
|
encode_result.stego_data,
|
|
{
|
|
"filename": filename,
|
|
"embed_mode": embed_mode,
|
|
"output_format": dct_output_format if embed_mode == "dct" else "png",
|
|
"color_mode": dct_color_mode if embed_mode == "dct" else None,
|
|
"mime_type": output_mime,
|
|
# Channel info (v4.0.0)
|
|
"channel_mode": encode_result.channel_mode,
|
|
"channel_fingerprint": encode_result.channel_fingerprint,
|
|
},
|
|
)
|
|
|
|
return redirect(url_for("encode_result", file_id=file_id))
|
|
|
|
except CapacityError as e:
|
|
return _error_response(str(e))
|
|
except StegasooError as e:
|
|
return _error_response(str(e))
|
|
except Exception as e:
|
|
return _error_response(f"Error: {e}")
|
|
|
|
return render_template("stego/encode.html", has_qrcode_read=_has_qrcode_read)
|
|
|
|
# ============================================================================
|
|
# ENCODE PROGRESS ENDPOINTS (v4.1.2)
|
|
# ============================================================================
|
|
|
|
@app.route("/encode/status/<job_id>")
|
|
@login_required
|
|
def encode_status(job_id):
|
|
"""Get the status of an async encode job."""
|
|
job = _get_job(job_id)
|
|
if not job:
|
|
return jsonify({"error": "Job not found"}), 404
|
|
|
|
response = {"status": job.get("status", "unknown")}
|
|
|
|
if job["status"] == "complete":
|
|
response["file_id"] = job.get("file_id")
|
|
elif job["status"] == "error":
|
|
response["error"] = job.get("error", "Unknown error")
|
|
|
|
return jsonify(response)
|
|
|
|
@app.route("/encode/progress/<job_id>")
|
|
@login_required
|
|
def encode_progress(job_id):
|
|
"""Get the progress of an async encode job."""
|
|
progress = read_progress(job_id)
|
|
if progress:
|
|
return jsonify(progress)
|
|
|
|
# No progress file yet - check job status
|
|
job = _get_job(job_id)
|
|
if not job:
|
|
return jsonify({"error": "Job not found"}), 404
|
|
|
|
if job["status"] == "complete":
|
|
return jsonify({"percent": 100, "phase": "complete"})
|
|
elif job["status"] == "error":
|
|
return jsonify({"percent": 0, "phase": "error", "error": job.get("error")})
|
|
elif job["status"] == "pending":
|
|
return jsonify({"percent": 0, "phase": "starting"})
|
|
|
|
# Running but no progress file yet
|
|
return jsonify({"percent": 0, "phase": "initializing"})
|
|
|
|
@app.route("/encode/result/<file_id>")
|
|
@login_required
|
|
def encode_result(file_id):
|
|
file_info = temp_storage.get_temp_file(file_id)
|
|
if not file_info:
|
|
flash("File expired or not found. Please encode again.", "error")
|
|
return redirect(url_for("encode"))
|
|
|
|
carrier_type = file_info.get("carrier_type", "image")
|
|
|
|
# Generate thumbnail only for images
|
|
thumbnail_data = None
|
|
thumbnail_id = None
|
|
if carrier_type != "audio":
|
|
thumbnail_data = generate_thumbnail(file_info["data"])
|
|
if thumbnail_data:
|
|
thumbnail_id = f"{file_id}_thumb"
|
|
temp_storage.save_thumbnail(thumbnail_id, thumbnail_data)
|
|
|
|
return render_template(
|
|
"encode_result.html",
|
|
file_id=file_id,
|
|
filename=file_info["filename"],
|
|
thumbnail_url=(
|
|
url_for("encode_thumbnail", thumb_id=thumbnail_id) if thumbnail_id else None
|
|
),
|
|
embed_mode=file_info.get("embed_mode", "lsb"),
|
|
output_format=file_info.get("output_format", "png"),
|
|
color_mode=file_info.get("color_mode"),
|
|
carrier_type=carrier_type,
|
|
# Channel info (v4.0.0)
|
|
channel_mode=file_info.get("channel_mode", "public"),
|
|
channel_fingerprint=file_info.get("channel_fingerprint"),
|
|
)
|
|
|
|
@app.route("/encode/thumbnail/<thumb_id>")
|
|
@login_required
|
|
def encode_thumbnail(thumb_id):
|
|
"""Serve thumbnail image."""
|
|
thumb_data = temp_storage.get_thumbnail(thumb_id)
|
|
if not thumb_data:
|
|
return "Thumbnail not found", 404
|
|
|
|
return send_file(io.BytesIO(thumb_data), mimetype="image/jpeg", as_attachment=False)
|
|
|
|
@app.route("/encode/download/<file_id>")
|
|
@login_required
|
|
def encode_download(file_id):
|
|
file_info = temp_storage.get_temp_file(file_id)
|
|
if not file_info:
|
|
flash("File expired or not found.", "error")
|
|
return redirect(url_for("encode"))
|
|
|
|
mime_type = file_info.get("mime_type", "image/png")
|
|
|
|
return send_file(
|
|
io.BytesIO(file_info["data"]),
|
|
mimetype=mime_type,
|
|
as_attachment=True,
|
|
download_name=file_info["filename"],
|
|
)
|
|
|
|
@app.route("/encode/file/<file_id>")
|
|
@login_required
|
|
def encode_file_route(file_id):
|
|
"""Serve file for Web Share API."""
|
|
file_info = temp_storage.get_temp_file(file_id)
|
|
if not file_info:
|
|
return "Not found", 404
|
|
|
|
mime_type = file_info.get("mime_type", "image/png")
|
|
|
|
return send_file(
|
|
io.BytesIO(file_info["data"]),
|
|
mimetype=mime_type,
|
|
as_attachment=False,
|
|
download_name=file_info["filename"],
|
|
)
|
|
|
|
@app.route("/encode/cleanup/<file_id>", methods=["POST"])
|
|
@login_required
|
|
def encode_cleanup(file_id):
|
|
"""Manually cleanup a file after sharing."""
|
|
temp_storage.delete_temp_file(file_id)
|
|
|
|
# Also cleanup thumbnail if exists
|
|
thumb_id = f"{file_id}_thumb"
|
|
temp_storage.delete_thumbnail(thumb_id)
|
|
|
|
return jsonify({"status": "ok"})
|
|
|
|
# ============================================================================
|
|
# DECODE
|
|
# ============================================================================
|
|
|
|
def _run_decode_job(job_id: str, decode_params: dict) -> None:
|
|
"""Background thread function for async decode."""
|
|
progress_file = get_progress_file_path(job_id)
|
|
|
|
try:
|
|
_store_job(job_id, {"status": "running", "created": time.time()})
|
|
|
|
# Run decode with progress file
|
|
decode_result = subprocess_stego.decode(
|
|
stego_data=decode_params["stego_data"],
|
|
reference_data=decode_params["ref_data"],
|
|
passphrase=decode_params["passphrase"],
|
|
pin=decode_params.get("pin"),
|
|
rsa_key_data=decode_params.get("rsa_key_data"),
|
|
rsa_password=decode_params.get("rsa_password"),
|
|
embed_mode=decode_params.get("embed_mode", "auto"),
|
|
channel_key=decode_params.get("channel_key"),
|
|
progress_file=progress_file,
|
|
)
|
|
|
|
if not decode_result.success:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "error",
|
|
"error": decode_result.error or "Decoding failed",
|
|
"error_type": decode_result.error_type,
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
return
|
|
|
|
# Store result based on type
|
|
if decode_result.is_file:
|
|
file_id = secrets.token_urlsafe(16)
|
|
filename = decode_result.filename or "decoded_file"
|
|
temp_storage.save_temp_file(
|
|
file_id,
|
|
decode_result.file_data,
|
|
{
|
|
"filename": filename,
|
|
"mime_type": decode_result.mime_type,
|
|
},
|
|
)
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "complete",
|
|
"file_id": file_id,
|
|
"is_file": True,
|
|
"filename": filename,
|
|
"file_size": len(decode_result.file_data),
|
|
"mime_type": decode_result.mime_type,
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
else:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "complete",
|
|
"is_file": False,
|
|
"message": decode_result.message,
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
|
|
except Exception as e:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "error",
|
|
"error": str(e),
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
finally:
|
|
cleanup_progress_file(job_id)
|
|
|
|
def _run_decode_audio_job(job_id: str, decode_params: dict) -> None:
|
|
"""Background thread function for async audio decode (v4.3.0)."""
|
|
progress_file = get_progress_file_path(job_id)
|
|
|
|
try:
|
|
_store_job(job_id, {"status": "running", "created": time.time()})
|
|
|
|
decode_result = subprocess_stego.decode_audio(
|
|
stego_data=decode_params["stego_data"],
|
|
reference_data=decode_params["ref_data"],
|
|
passphrase=decode_params["passphrase"],
|
|
pin=decode_params.get("pin"),
|
|
rsa_key_data=decode_params.get("rsa_key_data"),
|
|
rsa_password=decode_params.get("rsa_password"),
|
|
embed_mode=decode_params.get("embed_mode", "audio_auto"),
|
|
channel_key=decode_params.get("channel_key"),
|
|
progress_file=progress_file,
|
|
)
|
|
|
|
if not decode_result.success:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "error",
|
|
"error": decode_result.error or "Audio decoding failed",
|
|
"error_type": decode_result.error_type,
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
return
|
|
|
|
if decode_result.is_file:
|
|
file_id = secrets.token_urlsafe(16)
|
|
filename = decode_result.filename or "decoded_file"
|
|
temp_storage.save_temp_file(
|
|
file_id,
|
|
decode_result.file_data,
|
|
{
|
|
"filename": filename,
|
|
"mime_type": decode_result.mime_type,
|
|
},
|
|
)
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "complete",
|
|
"file_id": file_id,
|
|
"is_file": True,
|
|
"filename": filename,
|
|
"file_size": len(decode_result.file_data),
|
|
"mime_type": decode_result.mime_type,
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
else:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "complete",
|
|
"is_file": False,
|
|
"message": decode_result.message,
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
|
|
except Exception as e:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "error",
|
|
"error": str(e),
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
finally:
|
|
cleanup_progress_file(job_id)
|
|
|
|
@app.route("/decode", methods=["GET", "POST"])
|
|
@login_required
|
|
def decode_page():
|
|
if request.method == "POST":
|
|
try:
|
|
# Get files
|
|
ref_photo = request.files.get("reference_photo")
|
|
stego_image = request.files.get("stego_image")
|
|
rsa_key_file = request.files.get("rsa_key")
|
|
|
|
# Determine carrier type (v4.3.0)
|
|
carrier_type = request.form.get("carrier_type", "image")
|
|
|
|
if carrier_type == "audio":
|
|
# ========== AUDIO DECODE PATH (v4.3.0) ==========
|
|
if not HAS_AUDIO_SUPPORT:
|
|
flash("Audio steganography is not available.", "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
|
|
if not ref_photo or not stego_image:
|
|
flash("Both reference photo and stego audio are required", "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
|
|
if not allowed_image(ref_photo.filename):
|
|
flash("Reference must be an image", "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
|
|
if not allowed_audio(stego_image.filename):
|
|
flash("Invalid audio format", "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
|
|
passphrase = request.form.get("passphrase", "")
|
|
pin = request.form.get("pin", "").strip()
|
|
rsa_password = request.form.get("rsa_password", "")
|
|
|
|
embed_mode = request.form.get("embed_mode", "audio_auto")
|
|
if embed_mode not in ("audio_auto", "audio_lsb", "audio_spread"):
|
|
embed_mode = "audio_auto"
|
|
|
|
channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto"))
|
|
|
|
if not passphrase:
|
|
flash("Passphrase is required", "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
|
|
ref_data = ref_photo.read()
|
|
stego_data = stego_image.read()
|
|
|
|
# Handle RSA key (same as image path)
|
|
rsa_key_data = None
|
|
rsa_key_pem = request.form.get("rsa_key_pem", "").strip()
|
|
rsa_key_qr = request.files.get("rsa_key_qr")
|
|
rsa_key_from_qr = False
|
|
|
|
if rsa_key_pem:
|
|
if is_compressed(rsa_key_pem):
|
|
rsa_key_pem = decompress_data(rsa_key_pem)
|
|
rsa_key_data = rsa_key_pem.encode("utf-8")
|
|
rsa_key_from_qr = True
|
|
elif rsa_key_file and rsa_key_file.filename:
|
|
rsa_key_data = rsa_key_file.read()
|
|
elif rsa_key_qr and rsa_key_qr.filename and _has_qrcode_read:
|
|
qr_image_data = rsa_key_qr.read()
|
|
key_pem = extract_key_from_qr(qr_image_data)
|
|
if key_pem:
|
|
rsa_key_data = key_pem.encode("utf-8")
|
|
rsa_key_from_qr = True
|
|
else:
|
|
flash("Could not extract RSA key from QR code image.", "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
|
|
result = validate_security_factors(pin, rsa_key_data)
|
|
if not result.is_valid:
|
|
flash(result.error_message, "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
|
|
if pin:
|
|
result = validate_pin(pin)
|
|
if not result.is_valid:
|
|
flash(result.error_message, "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
|
|
key_password = (
|
|
None if rsa_key_from_qr else (rsa_password if rsa_password else None)
|
|
)
|
|
|
|
if rsa_key_data:
|
|
result = validate_rsa_key(rsa_key_data, key_password)
|
|
if not result.is_valid:
|
|
flash(result.error_message, "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
|
|
is_async = (
|
|
request.form.get("async") == "true"
|
|
or request.headers.get("X-Async") == "true"
|
|
)
|
|
|
|
decode_params = {
|
|
"stego_data": stego_data,
|
|
"ref_data": ref_data,
|
|
"passphrase": passphrase,
|
|
"pin": pin if pin else None,
|
|
"rsa_key_data": rsa_key_data,
|
|
"rsa_password": key_password,
|
|
"embed_mode": embed_mode,
|
|
"channel_key": channel_key,
|
|
}
|
|
|
|
if is_async:
|
|
job_id = generate_job_id()
|
|
_store_job(job_id, {"status": "pending", "created": time.time()})
|
|
_executor.submit(_run_decode_audio_job, job_id, decode_params)
|
|
return jsonify({"job_id": job_id, "status": "pending"})
|
|
|
|
# Sync audio decode
|
|
decode_result = subprocess_stego.decode_audio(
|
|
stego_data=stego_data,
|
|
reference_data=ref_data,
|
|
passphrase=passphrase,
|
|
pin=pin if pin else None,
|
|
rsa_key_data=rsa_key_data,
|
|
rsa_password=key_password,
|
|
embed_mode=embed_mode,
|
|
channel_key=channel_key,
|
|
)
|
|
|
|
if not decode_result.success:
|
|
error_msg = decode_result.error or "Audio decoding failed"
|
|
if (
|
|
"decrypt" in error_msg.lower()
|
|
or decode_result.error_type == "DecryptionError"
|
|
):
|
|
flash(
|
|
"Wrong credentials. Double-check your reference photo, "
|
|
"passphrase, PIN, and channel key.",
|
|
"warning",
|
|
)
|
|
else:
|
|
flash(error_msg, "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
|
|
if decode_result.is_file:
|
|
file_id = secrets.token_urlsafe(16)
|
|
cleanup_temp_files()
|
|
filename = decode_result.filename or "decoded_file"
|
|
temp_storage.save_temp_file(
|
|
file_id,
|
|
decode_result.file_data,
|
|
{
|
|
"filename": filename,
|
|
"mime_type": decode_result.mime_type,
|
|
},
|
|
)
|
|
return render_template(
|
|
"decode.html",
|
|
decoded_file=True,
|
|
file_id=file_id,
|
|
filename=filename,
|
|
file_size=format_size(len(decode_result.file_data)),
|
|
mime_type=decode_result.mime_type,
|
|
has_qrcode_read=_has_qrcode_read,
|
|
)
|
|
else:
|
|
return render_template(
|
|
"decode.html",
|
|
decoded_message=decode_result.message,
|
|
has_qrcode_read=_has_qrcode_read,
|
|
)
|
|
|
|
# ========== IMAGE DECODE PATH (original) ==========
|
|
if not ref_photo or not stego_image:
|
|
flash("Both reference photo and stego image are required", "error")
|
|
return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read)
|
|
|
|
# Get form data - v3.2.0: renamed from day_phrase to passphrase
|
|
passphrase = request.form.get("passphrase", "") # v3.2.0: Renamed
|
|
pin = request.form.get("pin", "").strip()
|
|
rsa_password = request.form.get("rsa_password", "")
|
|
|
|
# NEW in v3.0 - Extraction mode
|
|
embed_mode = request.form.get("embed_mode", "auto")
|
|
if embed_mode not in ("auto", "lsb", "dct"):
|
|
embed_mode = "auto"
|
|
|
|
# NEW in v4.0.0 - Channel key
|
|
channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto"))
|
|
|
|
# Check DCT availability
|
|
if embed_mode == "dct" and not has_dct_support():
|
|
flash("DCT mode requires scipy. Install with: pip install scipy", "error")
|
|
return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read)
|
|
|
|
# v3.2.0: Removed date handling (no stego_date needed)
|
|
|
|
# v3.2.0: Renamed from day_phrase
|
|
if not passphrase:
|
|
flash("Passphrase is required", "error")
|
|
return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read)
|
|
|
|
# Read files
|
|
ref_data = ref_photo.read()
|
|
stego_data = stego_image.read()
|
|
|
|
# Handle RSA key - can come from .pem file, QR code image, or webcam-scanned PEM (v4.1.5)
|
|
rsa_key_data = None
|
|
rsa_key_pem = request.form.get("rsa_key_pem", "").strip()
|
|
rsa_key_qr = request.files.get("rsa_key_qr")
|
|
rsa_key_from_qr = False
|
|
|
|
if rsa_key_pem:
|
|
# Webcam-scanned PEM key (v4.1.5+) - may be compressed (zlib or zstd)
|
|
if is_compressed(rsa_key_pem):
|
|
rsa_key_pem = decompress_data(rsa_key_pem)
|
|
rsa_key_data = rsa_key_pem.encode("utf-8")
|
|
rsa_key_from_qr = True
|
|
elif rsa_key_file and rsa_key_file.filename:
|
|
rsa_key_data = rsa_key_file.read()
|
|
elif rsa_key_qr and rsa_key_qr.filename and _has_qrcode_read:
|
|
qr_image_data = rsa_key_qr.read()
|
|
key_pem = extract_key_from_qr(qr_image_data)
|
|
if key_pem:
|
|
rsa_key_data = key_pem.encode("utf-8")
|
|
rsa_key_from_qr = True
|
|
else:
|
|
flash("Could not extract RSA key from QR code image.", "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
|
|
# Validate security factors
|
|
result = validate_security_factors(pin, rsa_key_data)
|
|
if not result.is_valid:
|
|
flash(result.error_message, "error")
|
|
return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read)
|
|
|
|
# Validate PIN if provided
|
|
if pin:
|
|
result = validate_pin(pin)
|
|
if not result.is_valid:
|
|
flash(result.error_message, "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
|
|
# Determine key password
|
|
key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None)
|
|
|
|
# Validate RSA key if provided
|
|
if rsa_key_data:
|
|
result = validate_rsa_key(rsa_key_data, key_password)
|
|
if not result.is_valid:
|
|
flash(result.error_message, "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
|
|
# Check for async mode (v4.1.5)
|
|
is_async = (
|
|
request.form.get("async") == "true" or request.headers.get("X-Async") == "true"
|
|
)
|
|
|
|
# Build decode params
|
|
decode_params = {
|
|
"stego_data": stego_data,
|
|
"ref_data": ref_data,
|
|
"passphrase": passphrase,
|
|
"pin": pin if pin else None,
|
|
"rsa_key_data": rsa_key_data,
|
|
"rsa_password": key_password,
|
|
"embed_mode": embed_mode,
|
|
"channel_key": channel_key,
|
|
}
|
|
|
|
# ASYNC MODE: Start background job and return JSON
|
|
if is_async:
|
|
job_id = generate_job_id()
|
|
_store_job(job_id, {"status": "pending", "created": time.time()})
|
|
_executor.submit(_run_decode_job, job_id, decode_params)
|
|
return jsonify({"job_id": job_id, "status": "pending"})
|
|
|
|
# SYNC MODE: Run inline (original behavior)
|
|
# v4.0.0: Include channel_key parameter
|
|
# Use subprocess-isolated decode to prevent crashes
|
|
decode_result = subprocess_stego.decode(
|
|
stego_data=stego_data,
|
|
reference_data=ref_data,
|
|
passphrase=passphrase,
|
|
pin=pin if pin else None,
|
|
rsa_key_data=rsa_key_data,
|
|
rsa_password=key_password,
|
|
embed_mode=embed_mode,
|
|
channel_key=channel_key, # v4.0.0
|
|
)
|
|
|
|
# Check for subprocess errors
|
|
if not decode_result.success:
|
|
error_msg = decode_result.error or "Decoding failed"
|
|
# Check for channel key related errors
|
|
if "channel key" in error_msg.lower():
|
|
flash(error_msg, "error")
|
|
return render_template(
|
|
"stego/decode.html", has_qrcode_read=_has_qrcode_read
|
|
)
|
|
if (
|
|
"decrypt" in error_msg.lower()
|
|
or decode_result.error_type == "DecryptionError"
|
|
):
|
|
raise DecryptionError(error_msg)
|
|
raise StegasooError(error_msg)
|
|
|
|
if decode_result.is_file:
|
|
# File content - store temporarily for download
|
|
file_id = secrets.token_urlsafe(16)
|
|
cleanup_temp_files()
|
|
|
|
filename = decode_result.filename or "decoded_file"
|
|
temp_storage.save_temp_file(
|
|
file_id,
|
|
decode_result.file_data,
|
|
{
|
|
"filename": filename,
|
|
"mime_type": decode_result.mime_type,
|
|
},
|
|
)
|
|
|
|
return render_template(
|
|
"decode.html",
|
|
decoded_file=True,
|
|
file_id=file_id,
|
|
filename=filename,
|
|
file_size=format_size(len(decode_result.file_data)),
|
|
mime_type=decode_result.mime_type,
|
|
has_qrcode_read=_has_qrcode_read,
|
|
)
|
|
else:
|
|
# Text content
|
|
return render_template(
|
|
"decode.html",
|
|
decoded_message=decode_result.message,
|
|
has_qrcode_read=_has_qrcode_read,
|
|
)
|
|
|
|
except InvalidMagicBytesError:
|
|
flash(
|
|
"This doesn't appear to be a Stegasoo image. Try a different mode (LSB/DCT).",
|
|
"warning",
|
|
)
|
|
return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read)
|
|
except ReedSolomonError:
|
|
flash(
|
|
"Image too corrupted to decode. It may have been re-saved or compressed.",
|
|
"error",
|
|
)
|
|
return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read)
|
|
except InvalidHeaderError:
|
|
flash(
|
|
"Invalid or corrupted header. The image may have been modified.",
|
|
"error",
|
|
)
|
|
return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read)
|
|
except DecryptionError:
|
|
flash(
|
|
"Wrong credentials. Double-check your reference photo, passphrase, PIN, and channel key.",
|
|
"warning",
|
|
)
|
|
return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read)
|
|
except StegasooError as e:
|
|
flash(str(e), "error")
|
|
return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read)
|
|
except Exception as e:
|
|
flash(f"Error: {e}", "error")
|
|
return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read)
|
|
|
|
return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read)
|
|
|
|
@app.route("/decode/download/<file_id>")
|
|
@login_required
|
|
def decode_download(file_id):
|
|
"""Download decoded file."""
|
|
file_info = temp_storage.get_temp_file(file_id)
|
|
if not file_info:
|
|
flash("File expired or not found.", "error")
|
|
return redirect(url_for("decode_page"))
|
|
|
|
mime_type = file_info.get("mime_type", "application/octet-stream")
|
|
|
|
return send_file(
|
|
io.BytesIO(file_info["data"]),
|
|
mimetype=mime_type,
|
|
as_attachment=True,
|
|
download_name=file_info["filename"],
|
|
)
|
|
|
|
# ============================================================================
|
|
# DECODE PROGRESS ENDPOINTS (v4.1.5)
|
|
# ============================================================================
|
|
|
|
@app.route("/decode/status/<job_id>")
|
|
@login_required
|
|
def decode_status(job_id):
|
|
"""Get the status of an async decode job."""
|
|
job = _get_job(job_id)
|
|
if not job:
|
|
return jsonify({"error": "Job not found"}), 404
|
|
|
|
response = {"status": job.get("status", "unknown")}
|
|
|
|
if job["status"] == "complete":
|
|
response["is_file"] = job.get("is_file", False)
|
|
if job.get("is_file"):
|
|
response["file_id"] = job.get("file_id")
|
|
response["filename"] = job.get("filename")
|
|
response["file_size"] = job.get("file_size")
|
|
response["mime_type"] = job.get("mime_type")
|
|
else:
|
|
response["message"] = job.get("message")
|
|
elif job["status"] == "error":
|
|
response["error"] = job.get("error", "Unknown error")
|
|
response["error_type"] = job.get("error_type")
|
|
|
|
return jsonify(response)
|
|
|
|
@app.route("/decode/progress/<job_id>")
|
|
@login_required
|
|
def decode_progress(job_id):
|
|
"""Get the progress of an async decode job."""
|
|
progress = read_progress(job_id)
|
|
if progress:
|
|
return jsonify(progress)
|
|
|
|
# No progress file yet - check job status
|
|
job = _get_job(job_id)
|
|
if not job:
|
|
return jsonify({"error": "Job not found"}), 404
|
|
|
|
if job["status"] == "complete":
|
|
return jsonify({"percent": 100, "phase": "complete"})
|
|
elif job["status"] == "error":
|
|
return jsonify({"percent": 0, "phase": "error", "error": job.get("error")})
|
|
elif job["status"] == "pending":
|
|
return jsonify({"percent": 0, "phase": "starting"})
|
|
|
|
# Running but no progress file yet
|
|
return jsonify({"percent": 5, "phase": "reading"})
|
|
|
|
@app.route("/decode/result/<job_id>")
|
|
@login_required
|
|
def decode_result(job_id):
|
|
"""Get the result page for an async decode job."""
|
|
job = _get_job(job_id)
|
|
if not job:
|
|
flash("Job not found or expired.", "error")
|
|
return redirect(url_for("decode_page"))
|
|
|
|
if job["status"] != "complete":
|
|
flash("Decode not complete.", "error")
|
|
return redirect(url_for("decode_page"))
|
|
|
|
if job.get("is_file"):
|
|
return render_template(
|
|
"decode.html",
|
|
decoded_file=True,
|
|
file_id=job.get("file_id"),
|
|
filename=job.get("filename"),
|
|
file_size=format_size(job.get("file_size", 0)),
|
|
mime_type=job.get("mime_type"),
|
|
has_qrcode_read=_has_qrcode_read,
|
|
)
|
|
else:
|
|
return render_template(
|
|
"decode.html",
|
|
decoded_message=job.get("message"),
|
|
has_qrcode_read=_has_qrcode_read,
|
|
)
|
|
|
|
@app.route("/about")
|
|
def about():
|
|
from auth import get_current_user
|
|
from soosef.stegasoo import has_argon2
|
|
from soosef.stegasoo.channel import get_channel_status
|
|
|
|
channel_status = get_channel_status()
|
|
current_user = get_current_user()
|
|
is_admin_user = current_user.is_admin if current_user else False
|
|
|
|
return render_template(
|
|
"stego/about.html",
|
|
has_argon2=has_argon2(),
|
|
has_qrcode_read=_has_qrcode_read,
|
|
channel_configured=channel_status["configured"],
|
|
channel_fingerprint=channel_status.get("fingerprint"),
|
|
channel_source=channel_status.get("source"),
|
|
is_admin=is_admin_user,
|
|
)
|
|
|
|
# ============================================================================
|
|
# TOOLS ROUTES (v4.1.0)
|
|
# ============================================================================
|
|
|
|
@app.route("/tools")
|
|
@login_required
|
|
def tools():
|
|
"""Advanced tools page."""
|
|
return render_template("stego/tools.html", has_dct=has_dct_support())
|
|
|
|
@app.route("/api/tools/capacity", methods=["POST"])
|
|
@login_required
|
|
def api_tools_capacity():
|
|
"""Calculate image capacity for steganography."""
|
|
from soosef.stegasoo.dct_steganography import estimate_capacity_comparison
|
|
|
|
carrier = request.files.get("image")
|
|
if not carrier:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
try:
|
|
image_data = carrier.read()
|
|
result = estimate_capacity_comparison(image_data)
|
|
result["success"] = True
|
|
result["filename"] = carrier.filename
|
|
result["megapixels"] = round((result["width"] * result["height"]) / 1_000_000, 2)
|
|
return jsonify(result)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 400
|
|
|
|
@app.route("/api/tools/strip-metadata", methods=["POST"])
|
|
@login_required
|
|
def api_tools_strip_metadata():
|
|
"""Strip EXIF/metadata from image."""
|
|
import io
|
|
|
|
from soosef.stegasoo.utils import strip_image_metadata
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
try:
|
|
image_data = image_file.read()
|
|
clean_data = strip_image_metadata(image_data, output_format="PNG")
|
|
|
|
buffer = io.BytesIO(clean_data)
|
|
filename = image_file.filename.rsplit(".", 1)[0] + "_clean.png"
|
|
|
|
return send_file(
|
|
buffer, mimetype="image/png", as_attachment=True, download_name=filename
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 400
|
|
|
|
@app.route("/api/tools/exif", methods=["POST"])
|
|
@login_required
|
|
def api_tools_exif():
|
|
"""Read EXIF metadata from image."""
|
|
from soosef.stegasoo.utils import read_image_exif
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
try:
|
|
image_data = image_file.read()
|
|
exif = read_image_exif(image_data)
|
|
|
|
# Check if it's a JPEG (editable) or not
|
|
is_jpeg = image_data[:2] == b"\xff\xd8"
|
|
|
|
return jsonify(
|
|
{
|
|
"success": True,
|
|
"filename": image_file.filename,
|
|
"exif": exif,
|
|
"editable": is_jpeg,
|
|
"field_count": len(exif),
|
|
}
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 400
|
|
|
|
@app.route("/api/tools/exif/update", methods=["POST"])
|
|
@login_required
|
|
def api_tools_exif_update():
|
|
"""Update EXIF fields in image."""
|
|
from soosef.stegasoo.utils import write_image_exif
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
# Get updates from form data
|
|
updates_json = request.form.get("updates", "{}")
|
|
try:
|
|
import json
|
|
|
|
updates = json.loads(updates_json)
|
|
except json.JSONDecodeError:
|
|
return jsonify({"success": False, "error": "Invalid updates JSON"}), 400
|
|
|
|
if not updates:
|
|
return jsonify({"success": False, "error": "No updates provided"}), 400
|
|
|
|
try:
|
|
image_data = image_file.read()
|
|
updated_data = write_image_exif(image_data, updates)
|
|
|
|
# Return as downloadable file
|
|
buffer = io.BytesIO(updated_data)
|
|
return send_file(
|
|
buffer,
|
|
mimetype="image/jpeg",
|
|
as_attachment=True,
|
|
download_name=f"exif_{image_file.filename}",
|
|
)
|
|
except ValueError as e:
|
|
return jsonify({"success": False, "error": str(e)}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
@app.route("/api/tools/exif/clear", methods=["POST"])
|
|
@login_required
|
|
def api_tools_exif_clear():
|
|
"""Remove all EXIF metadata from image."""
|
|
from soosef.stegasoo.utils import strip_image_metadata
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
# Get desired output format (default to PNG for lossless)
|
|
output_format = request.form.get("format", "PNG").upper()
|
|
if output_format not in ("PNG", "JPEG", "BMP"):
|
|
output_format = "PNG"
|
|
|
|
try:
|
|
image_data = image_file.read()
|
|
clean_data = strip_image_metadata(image_data, output_format=output_format)
|
|
|
|
# Determine extension and mimetype
|
|
ext_map = {
|
|
"PNG": ("png", "image/png"),
|
|
"JPEG": ("jpg", "image/jpeg"),
|
|
"BMP": ("bmp", "image/bmp"),
|
|
}
|
|
ext, mimetype = ext_map.get(output_format, ("png", "image/png"))
|
|
|
|
# Return as downloadable file
|
|
stem = (
|
|
image_file.filename.rsplit(".", 1)[0]
|
|
if "." in image_file.filename
|
|
else image_file.filename
|
|
)
|
|
buffer = io.BytesIO(clean_data)
|
|
return send_file(
|
|
buffer,
|
|
mimetype=mimetype,
|
|
as_attachment=True,
|
|
download_name=f"{stem}_clean.{ext}",
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
@app.route("/api/tools/rotate", methods=["POST"])
|
|
@login_required
|
|
def api_tools_rotate():
|
|
"""Rotate and/or flip an image, using lossless jpegtran for JPEGs."""
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
|
|
from PIL import Image
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
rotation = int(request.form.get("rotation", 0))
|
|
flip_h = request.form.get("flip_h", "false").lower() == "true"
|
|
flip_v = request.form.get("flip_v", "false").lower() == "true"
|
|
|
|
try:
|
|
image_data = image_file.read()
|
|
img = Image.open(io.BytesIO(image_data))
|
|
original_format = img.format # JPEG, PNG, etc.
|
|
img.close()
|
|
|
|
# For JPEGs, use jpegtran for lossless rotation/flip (preserves DCT stego)
|
|
has_jpegtran = shutil.which("jpegtran") is not None
|
|
use_jpegtran = (
|
|
original_format == "JPEG" and has_jpegtran and (rotation or flip_h or flip_v)
|
|
)
|
|
|
|
if use_jpegtran:
|
|
# Chain jpegtran operations for lossless transformation
|
|
current_data = image_data
|
|
|
|
# Apply rotation first
|
|
if rotation in (90, 180, 270):
|
|
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
|
|
f.write(current_data)
|
|
input_path = f.name
|
|
output_path = tempfile.mktemp(suffix=".jpg")
|
|
try:
|
|
result = subprocess.run(
|
|
[
|
|
"jpegtran",
|
|
"-rotate",
|
|
str(rotation),
|
|
"-copy",
|
|
"all",
|
|
"-outfile",
|
|
output_path,
|
|
input_path,
|
|
],
|
|
capture_output=True,
|
|
timeout=30,
|
|
)
|
|
if result.returncode == 0:
|
|
with open(output_path, "rb") as f:
|
|
current_data = f.read()
|
|
finally:
|
|
for p in [input_path, output_path]:
|
|
try:
|
|
os.unlink(p)
|
|
except OSError:
|
|
pass
|
|
|
|
# Apply horizontal flip
|
|
if flip_h:
|
|
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
|
|
f.write(current_data)
|
|
input_path = f.name
|
|
output_path = tempfile.mktemp(suffix=".jpg")
|
|
try:
|
|
result = subprocess.run(
|
|
[
|
|
"jpegtran",
|
|
"-flip",
|
|
"horizontal",
|
|
"-copy",
|
|
"all",
|
|
"-outfile",
|
|
output_path,
|
|
input_path,
|
|
],
|
|
capture_output=True,
|
|
timeout=30,
|
|
)
|
|
if result.returncode == 0:
|
|
with open(output_path, "rb") as f:
|
|
current_data = f.read()
|
|
finally:
|
|
for p in [input_path, output_path]:
|
|
try:
|
|
os.unlink(p)
|
|
except OSError:
|
|
pass
|
|
|
|
# Apply vertical flip
|
|
if flip_v:
|
|
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
|
|
f.write(current_data)
|
|
input_path = f.name
|
|
output_path = tempfile.mktemp(suffix=".jpg")
|
|
try:
|
|
result = subprocess.run(
|
|
[
|
|
"jpegtran",
|
|
"-flip",
|
|
"vertical",
|
|
"-copy",
|
|
"all",
|
|
"-outfile",
|
|
output_path,
|
|
input_path,
|
|
],
|
|
capture_output=True,
|
|
timeout=30,
|
|
)
|
|
if result.returncode == 0:
|
|
with open(output_path, "rb") as f:
|
|
current_data = f.read()
|
|
finally:
|
|
for p in [input_path, output_path]:
|
|
try:
|
|
os.unlink(p)
|
|
except OSError:
|
|
pass
|
|
|
|
buffer = io.BytesIO(current_data)
|
|
mimetype = "image/jpeg"
|
|
ext = "jpg"
|
|
else:
|
|
# Fallback to PIL for non-JPEGs or when jpegtran unavailable
|
|
img = Image.open(io.BytesIO(image_data))
|
|
|
|
# Apply rotation (PIL rotates counter-clockwise, so negate)
|
|
if rotation:
|
|
img = img.rotate(-rotation, expand=True)
|
|
|
|
# Apply flips
|
|
if flip_h:
|
|
img = img.transpose(Image.FLIP_LEFT_RIGHT)
|
|
if flip_v:
|
|
img = img.transpose(Image.FLIP_TOP_BOTTOM)
|
|
|
|
# Preserve original format
|
|
buffer = io.BytesIO()
|
|
if original_format == "JPEG":
|
|
if img.mode in ("RGBA", "P"):
|
|
img = img.convert("RGB")
|
|
img.save(buffer, format="JPEG", quality=95)
|
|
mimetype = "image/jpeg"
|
|
ext = "jpg"
|
|
else:
|
|
img.save(buffer, format="PNG")
|
|
mimetype = "image/png"
|
|
ext = "png"
|
|
buffer.seek(0)
|
|
|
|
stem = (
|
|
image_file.filename.rsplit(".", 1)[0]
|
|
if "." in image_file.filename
|
|
else image_file.filename
|
|
)
|
|
return send_file(
|
|
buffer,
|
|
mimetype=mimetype,
|
|
as_attachment=True,
|
|
download_name=f"{stem}_transformed.{ext}",
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
@app.route("/api/tools/compress", methods=["POST"])
|
|
@login_required
|
|
def api_tools_compress():
|
|
"""Compress image to JPEG at specified quality."""
|
|
from PIL import Image
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
quality = int(request.form.get("quality", 85))
|
|
quality = max(10, min(100, quality)) # Clamp to valid range
|
|
|
|
try:
|
|
img = Image.open(io.BytesIO(image_file.read()))
|
|
|
|
# Convert to RGB if necessary (JPEG doesn't support alpha)
|
|
if img.mode in ("RGBA", "LA", "P"):
|
|
img = img.convert("RGB")
|
|
|
|
buffer = io.BytesIO()
|
|
img.save(buffer, format="JPEG", quality=quality)
|
|
buffer.seek(0)
|
|
|
|
stem = (
|
|
image_file.filename.rsplit(".", 1)[0]
|
|
if "." in image_file.filename
|
|
else image_file.filename
|
|
)
|
|
return send_file(
|
|
buffer,
|
|
mimetype="image/jpeg",
|
|
as_attachment=True,
|
|
download_name=f"{stem}_q{quality}.jpg",
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
@app.route("/api/tools/convert", methods=["POST"])
|
|
@login_required
|
|
def api_tools_convert():
|
|
"""Convert image to different format."""
|
|
from PIL import Image
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
output_format = request.form.get("format", "PNG").upper()
|
|
quality = int(request.form.get("quality", 90))
|
|
quality = max(10, min(100, quality))
|
|
|
|
# Validate format
|
|
format_map = {
|
|
"PNG": ("png", "image/png"),
|
|
"JPEG": ("jpg", "image/jpeg"),
|
|
"WEBP": ("webp", "image/webp"),
|
|
}
|
|
if output_format not in format_map:
|
|
return jsonify({"success": False, "error": f"Unsupported format: {output_format}"}), 400
|
|
|
|
try:
|
|
img = Image.open(io.BytesIO(image_file.read()))
|
|
|
|
# Convert to RGB for JPEG (no alpha)
|
|
if output_format == "JPEG" and img.mode in ("RGBA", "LA", "P"):
|
|
img = img.convert("RGB")
|
|
|
|
buffer = io.BytesIO()
|
|
save_kwargs = {"format": output_format}
|
|
if output_format in ("JPEG", "WEBP"):
|
|
save_kwargs["quality"] = quality
|
|
img.save(buffer, **save_kwargs)
|
|
buffer.seek(0)
|
|
|
|
ext, mimetype = format_map[output_format]
|
|
stem = (
|
|
image_file.filename.rsplit(".", 1)[0]
|
|
if "." in image_file.filename
|
|
else image_file.filename
|
|
)
|
|
return send_file(
|
|
buffer,
|
|
mimetype=mimetype,
|
|
as_attachment=True,
|
|
download_name=f"{stem}.{ext}",
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
# Add these two test routes anywhere in app.py after the app = Flask(...) line:
|
|
|
|
@app.route("/test-capacity", methods=["POST"])
|
|
def test_capacity():
|
|
"""Minimal capacity test - no stegasoo code, just PIL."""
|
|
carrier = request.files.get("carrier")
|
|
if not carrier:
|
|
return jsonify({"error": "No carrier image provided"}), 400
|
|
|
|
try:
|
|
carrier_data = carrier.read()
|
|
buffer = io.BytesIO(carrier_data)
|
|
img = Image.open(buffer)
|
|
width, height = img.size
|
|
fmt = img.format
|
|
img.close()
|
|
buffer.close()
|
|
|
|
pixels = width * height
|
|
lsb_bytes = (pixels * 3) // 8
|
|
dct_bytes = ((width // 8) * (height // 8) * 16) // 8 - 10
|
|
|
|
return jsonify(
|
|
{
|
|
"success": True,
|
|
"width": width,
|
|
"height": height,
|
|
"format": fmt,
|
|
"lsb_kb": round(lsb_bytes / 1024, 1),
|
|
"dct_kb": round(dct_bytes / 1024, 1),
|
|
}
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/test-capacity-nopil", methods=["POST"])
|
|
def test_capacity_nopil():
|
|
"""Ultra-minimal test - no PIL, no stegasoo."""
|
|
carrier = request.files.get("carrier")
|
|
if not carrier:
|
|
return jsonify({"error": "No carrier image provided"}), 400
|
|
|
|
carrier_data = carrier.read()
|
|
return jsonify(
|
|
{
|
|
"success": True,
|
|
"data_size": len(carrier_data),
|
|
}
|
|
)
|
|
|
|
# ============================================================================
|
|
# AUTHENTICATION ROUTES (v4.0.2)
|
|
# ============================================================================
|