fieldwitness/frontends/web/stego_routes.py
Aaron D. Lee 5c74a5f4aa
Some checks failed
CI / lint (push) Failing after 36s
CI / typecheck (push) Failing after 37s
CI / test (push) Failing after 24s
Fix black formatting and target Python 3.12 in CI
Reformat 8 files and add --target-version py312 to avoid
3.13 AST parsing issues with Python 3.12 container.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 18:26:32 -04:00

2115 lines
85 KiB
Python

"""
Stegasoo encode/decode/tools routes.
Ported from stegasoo's frontends/web/app.py. These routes handle:
- Image encode with async progress tracking
- Audio encode (v4.3.0)
- Image/audio decode
- Encode result display
- Encode/decode progress polling
- Tools API (capacity, EXIF, rotate, compress, convert)
All routes use subprocess isolation via SubprocessStego for crash safety.
"""
import io
import mimetypes
import secrets
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from flask import (
flash,
jsonify,
redirect,
render_template,
request,
send_file,
url_for,
)
from PIL import Image
def register_stego_routes(app, **deps):
"""Register all stegasoo encode/decode routes on the Flask app."""
# Unpack dependencies passed from app.py
login_required = deps["login_required"]
subprocess_stego = deps["subprocess_stego"]
temp_storage = deps["temp_storage"]
_HAS_QRCODE_READ = deps.get("has_qrcode_read", False)
from stegasoo import (
HAS_AUDIO_SUPPORT,
CapacityError,
DecryptionError,
FilePayload,
InvalidHeaderError,
InvalidMagicBytesError,
ReedSolomonError,
StegasooError,
generate_filename,
has_dct_support,
validate_file_payload,
validate_image,
validate_message,
validate_passphrase,
validate_pin,
validate_rsa_key,
validate_security_factors,
)
from stegasoo.constants import (
MAX_FILE_PAYLOAD_SIZE,
MAX_MESSAGE_CHARS,
TEMP_FILE_EXPIRY,
THUMBNAIL_QUALITY,
THUMBNAIL_SIZE,
)
from stegasoo.channel import resolve_channel_key
from stegasoo.qr_utils import (
decompress_data,
extract_key_from_qr,
is_compressed,
)
from subprocess_stego import (
cleanup_progress_file,
generate_job_id,
get_progress_file_path,
read_progress,
)
# Async job management
_executor = ThreadPoolExecutor(max_workers=2)
_jobs = {}
_jobs_lock = threading.Lock()
def _store_job(job_id, data):
with _jobs_lock:
_jobs[job_id] = data
def _get_job(job_id):
with _jobs_lock:
return _jobs.get(job_id)
def _cleanup_old_jobs(max_age_seconds=3600):
now = time.time()
with _jobs_lock:
to_remove = [
jid for jid, data in _jobs.items() if now - data.get("created", 0) > max_age_seconds
]
for jid in to_remove:
cleanup_progress_file(jid)
del _jobs[jid]
def resolve_channel_key_form(value):
try:
result = resolve_channel_key(value)
return "auto" if result is None else ("none" if result == "" else result)
except (ValueError, FileNotFoundError):
return "auto"
def generate_thumbnail(image_data, size=THUMBNAIL_SIZE):
try:
with Image.open(io.BytesIO(image_data)) as img:
if img.mode in ("RGBA", "LA", "P"):
bg = Image.new("RGB", img.size, (255, 255, 255))
if img.mode == "P":
img = img.convert("RGBA")
bg.paste(img, mask=img.split()[-1] if img.mode == "RGBA" else None)
img = bg
elif img.mode != "RGB":
img = img.convert("RGB")
img.thumbnail(size, Image.Resampling.LANCZOS)
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=THUMBNAIL_QUALITY, optimize=True)
return buf.getvalue()
except Exception:
return None
def cleanup_temp_files():
temp_storage.cleanup_expired(TEMP_FILE_EXPIRY)
def allowed_image(fn):
return bool(
fn
and "." in fn
and fn.rsplit(".", 1)[1].lower() in {"png", "jpg", "jpeg", "bmp", "gif"}
)
def allowed_audio(fn):
return bool(
fn
and "." in fn
and fn.rsplit(".", 1)[1].lower()
in {"wav", "flac", "mp3", "ogg", "aac", "m4a", "aiff", "aif"}
)
def format_size(n):
if n < 1024:
return f"{n} B"
elif n < 1024 * 1024:
return f"{n/1024:.1f} KB"
else:
return f"{n/(1024*1024):.1f} MB"
# ── Routes below are extracted from stegasoo app.py ──
def _run_encode_job(job_id: str, encode_params: dict) -> None:
"""Background thread function for async encode."""
progress_file = get_progress_file_path(job_id)
try:
_store_job(job_id, {"status": "running", "created": time.time()})
# Run encode with progress file
if encode_params.get("file_data"):
encode_result = subprocess_stego.encode(
carrier_data=encode_params["carrier_data"],
reference_data=encode_params["ref_data"],
file_data=encode_params["file_data"],
file_name=encode_params["file_name"],
file_mime=encode_params["file_mime"],
passphrase=encode_params["passphrase"],
pin=encode_params.get("pin"),
rsa_key_data=encode_params.get("rsa_key_data"),
rsa_password=encode_params.get("key_password"),
embed_mode=encode_params["embed_mode"],
dct_output_format=encode_params.get("dct_output_format", "png"),
dct_color_mode=encode_params.get("dct_color_mode", "color"),
channel_key=encode_params.get("channel_key"),
progress_file=progress_file,
)
else:
encode_result = subprocess_stego.encode(
carrier_data=encode_params["carrier_data"],
reference_data=encode_params["ref_data"],
message=encode_params["message"],
passphrase=encode_params["passphrase"],
pin=encode_params.get("pin"),
rsa_key_data=encode_params.get("rsa_key_data"),
rsa_password=encode_params.get("key_password"),
embed_mode=encode_params["embed_mode"],
dct_output_format=encode_params.get("dct_output_format", "png"),
dct_color_mode=encode_params.get("dct_color_mode", "color"),
channel_key=encode_params.get("channel_key"),
progress_file=progress_file,
)
if not encode_result.success:
_store_job(
job_id,
{
"status": "error",
"error": encode_result.error or "Encoding failed",
"created": time.time(),
},
)
return
# Determine output format
embed_mode = encode_params["embed_mode"]
dct_output_format = encode_params.get("dct_output_format", "png")
dct_color_mode = encode_params.get("dct_color_mode", "color")
if embed_mode == "dct" and dct_output_format == "jpeg":
output_ext = ".jpg"
output_mime = "image/jpeg"
else:
output_ext = ".png"
output_mime = "image/png"
filename = encode_result.filename
if not filename:
filename = generate_filename("stego", output_ext)
elif embed_mode == "dct" and dct_output_format == "jpeg" and filename.endswith(".png"):
filename = filename[:-4] + ".jpg"
# Store result
file_id = secrets.token_urlsafe(16)
temp_storage.save_temp_file(
file_id,
encode_result.stego_data,
{
"filename": filename,
"embed_mode": embed_mode,
"output_format": dct_output_format if embed_mode == "dct" else "png",
"color_mode": dct_color_mode if embed_mode == "dct" else None,
"mime_type": output_mime,
"channel_mode": encode_result.channel_mode,
"channel_fingerprint": encode_result.channel_fingerprint,
},
)
_store_job(
job_id,
{
"status": "complete",
"file_id": file_id,
"created": time.time(),
},
)
except Exception as e:
_store_job(
job_id,
{
"status": "error",
"error": str(e),
"created": time.time(),
},
)
finally:
cleanup_progress_file(job_id)
def _run_encode_audio_job(job_id: str, encode_params: dict) -> None:
"""Background thread function for async audio encode (v4.3.0)."""
progress_file = get_progress_file_path(job_id)
try:
_store_job(job_id, {"status": "running", "created": time.time()})
if encode_params.get("file_data"):
encode_result = subprocess_stego.encode_audio(
carrier_data=encode_params["carrier_data"],
reference_data=encode_params["ref_data"],
file_data=encode_params["file_data"],
file_name=encode_params["file_name"],
file_mime=encode_params["file_mime"],
passphrase=encode_params["passphrase"],
pin=encode_params.get("pin"),
rsa_key_data=encode_params.get("rsa_key_data"),
rsa_password=encode_params.get("key_password"),
embed_mode=encode_params["embed_mode"],
channel_key=encode_params.get("channel_key"),
progress_file=progress_file,
chip_tier=encode_params.get("chip_tier"),
)
else:
encode_result = subprocess_stego.encode_audio(
carrier_data=encode_params["carrier_data"],
reference_data=encode_params["ref_data"],
message=encode_params.get("message"),
passphrase=encode_params["passphrase"],
pin=encode_params.get("pin"),
rsa_key_data=encode_params.get("rsa_key_data"),
rsa_password=encode_params.get("key_password"),
embed_mode=encode_params["embed_mode"],
channel_key=encode_params.get("channel_key"),
progress_file=progress_file,
chip_tier=encode_params.get("chip_tier"),
)
if not encode_result.success:
_store_job(
job_id,
{
"status": "error",
"error": encode_result.error or "Audio encoding failed",
"created": time.time(),
},
)
return
filename = generate_filename("stego_audio", ".wav")
file_id = secrets.token_urlsafe(16)
temp_storage.save_temp_file(
file_id,
encode_result.stego_data,
{
"filename": filename,
"embed_mode": encode_params["embed_mode"],
"carrier_type": "audio",
"mime_type": "audio/wav",
"channel_mode": encode_result.channel_mode,
"channel_fingerprint": encode_result.channel_fingerprint,
},
)
_store_job(
job_id,
{
"status": "complete",
"file_id": file_id,
"created": time.time(),
},
)
except Exception as e:
_store_job(
job_id,
{
"status": "error",
"error": str(e),
"created": time.time(),
},
)
finally:
cleanup_progress_file(job_id)
@app.route("/encode", methods=["GET", "POST"])
@login_required
def encode():
if request.method == "POST":
# Check if async mode requested
is_async = (
request.form.get("async") == "true" or request.headers.get("X-Async") == "true"
)
def _error_response(msg):
"""Return error as JSON (async) or HTML flash (sync)."""
if is_async:
return jsonify({"error": msg}), 400
flash(msg, "error")
return render_template("stego/encode.html", has_qrcode_read=_HAS_QRCODE_READ)
try:
# Get files
ref_photo = request.files.get("reference_photo")
carrier = request.files.get("carrier")
rsa_key_file = request.files.get("rsa_key")
payload_file = request.files.get("payload_file")
# Determine carrier type (v4.3.0)
carrier_type = request.form.get("carrier_type", "image")
if carrier_type == "audio":
# ========== AUDIO ENCODE PATH (v4.3.0) ==========
if not HAS_AUDIO_SUPPORT:
return _error_response(
"Audio steganography is not available. Install audio dependencies."
)
if not ref_photo or not carrier:
return _error_response(
"Both reference photo and audio carrier are required"
)
if not allowed_image(ref_photo.filename):
return _error_response("Reference must be an image (PNG, JPG, BMP)")
if not allowed_audio(carrier.filename):
return _error_response(
"Invalid audio format. Use WAV, FLAC, MP3, OGG, AAC, or M4A"
)
# Get form data
message = request.form.get("message", "")
passphrase = request.form.get("passphrase", "")
pin = request.form.get("pin", "").strip()
rsa_password = request.form.get("rsa_password", "")
payload_type = request.form.get("payload_type", "text")
embed_mode = request.form.get("embed_mode", "audio_lsb")
if embed_mode not in ("audio_lsb", "audio_spread"):
embed_mode = "audio_lsb"
# Chip tier for spread spectrum (None = default)
chip_tier_str = request.form.get("chip_tier")
chip_tier = None
if chip_tier_str and chip_tier_str.isdigit():
chip_tier = int(chip_tier_str)
if chip_tier not in (0, 1, 2):
chip_tier = None
channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto"))
# Determine payload
if payload_type == "file" and payload_file and payload_file.filename:
file_data = payload_file.read()
result = validate_file_payload(file_data, payload_file.filename)
if not result.is_valid:
return _error_response(result.error_message)
mime_type, _ = mimetypes.guess_type(payload_file.filename)
payload = FilePayload(
data=file_data,
filename=payload_file.filename,
mime_type=mime_type,
)
else:
result = validate_message(message)
if not result.is_valid:
return _error_response(result.error_message)
payload = message
if not passphrase:
return _error_response("Passphrase is required")
result = validate_passphrase(passphrase)
if not result.is_valid:
return _error_response(result.error_message)
if result.warning:
flash(result.warning, "warning")
ref_data = ref_photo.read()
carrier_data = carrier.read()
# Handle RSA key (same as image path)
rsa_key_data = None
rsa_key_pem = request.form.get("rsa_key_pem", "").strip()
rsa_key_qr = request.files.get("rsa_key_qr")
rsa_key_from_qr = False
if rsa_key_pem:
if is_compressed(rsa_key_pem):
rsa_key_pem = decompress_data(rsa_key_pem)
rsa_key_data = rsa_key_pem.encode("utf-8")
rsa_key_from_qr = True
elif rsa_key_file and rsa_key_file.filename:
rsa_key_data = rsa_key_file.read()
elif rsa_key_qr and rsa_key_qr.filename and _HAS_QRCODE_READ:
qr_image_data = rsa_key_qr.read()
key_pem = extract_key_from_qr(qr_image_data)
if key_pem:
rsa_key_data = key_pem.encode("utf-8")
rsa_key_from_qr = True
else:
return _error_response("Could not extract RSA key from QR code image.")
result = validate_security_factors(pin, rsa_key_data)
if not result.is_valid:
return _error_response(result.error_message)
if pin:
result = validate_pin(pin)
if not result.is_valid:
return _error_response(result.error_message)
key_password = (
None if rsa_key_from_qr else (rsa_password if rsa_password else None)
)
if rsa_key_data:
result = validate_rsa_key(rsa_key_data, key_password)
if not result.is_valid:
return _error_response(result.error_message)
# Build audio encode params
encode_params = {
"carrier_data": carrier_data,
"ref_data": ref_data,
"passphrase": passphrase,
"pin": pin if pin else None,
"rsa_key_data": rsa_key_data,
"key_password": key_password,
"embed_mode": embed_mode,
"channel_key": channel_key,
"carrier_type": "audio",
"chip_tier": chip_tier,
}
if payload_type == "file" and payload_file and payload_file.filename:
encode_params["file_data"] = payload.data
encode_params["file_name"] = payload.filename
encode_params["file_mime"] = payload.mime_type
else:
encode_params["message"] = payload
if is_async:
job_id = generate_job_id()
_store_job(job_id, {"status": "pending", "created": time.time()})
_executor.submit(_run_encode_audio_job, job_id, encode_params)
return jsonify({"job_id": job_id, "status": "pending"})
# Sync audio encode
if encode_params.get("file_data"):
encode_result = subprocess_stego.encode_audio(
carrier_data=carrier_data,
reference_data=ref_data,
file_data=encode_params["file_data"],
file_name=encode_params["file_name"],
file_mime=encode_params["file_mime"],
passphrase=passphrase,
pin=pin if pin else None,
rsa_key_data=rsa_key_data,
rsa_password=key_password,
embed_mode=embed_mode,
channel_key=channel_key,
chip_tier=chip_tier,
)
else:
encode_result = subprocess_stego.encode_audio(
carrier_data=carrier_data,
reference_data=ref_data,
message=payload,
passphrase=passphrase,
pin=pin if pin else None,
rsa_key_data=rsa_key_data,
rsa_password=key_password,
embed_mode=embed_mode,
channel_key=channel_key,
chip_tier=chip_tier,
)
if not encode_result.success:
error_msg = encode_result.error or "Audio encoding failed"
return _error_response(error_msg)
filename = generate_filename("stego_audio", ".wav")
file_id = secrets.token_urlsafe(16)
cleanup_temp_files()
temp_storage.save_temp_file(
file_id,
encode_result.stego_data,
{
"filename": filename,
"embed_mode": embed_mode,
"carrier_type": "audio",
"mime_type": "audio/wav",
"channel_mode": encode_result.channel_mode,
"channel_fingerprint": encode_result.channel_fingerprint,
},
)
return redirect(url_for("encode_result", file_id=file_id))
# ========== IMAGE ENCODE PATH (original) ==========
if not ref_photo or not carrier:
return _error_response("Both reference photo and carrier image are required")
if not allowed_image(ref_photo.filename) or not allowed_image(carrier.filename):
return _error_response("Invalid file type. Use PNG, JPG, or BMP")
# Get form data - v3.2.0: renamed from day_phrase to passphrase
message = request.form.get("message", "")
passphrase = request.form.get("passphrase", "") # v3.2.0: Renamed
pin = request.form.get("pin", "").strip()
rsa_password = request.form.get("rsa_password", "")
payload_type = request.form.get("payload_type", "text")
# NEW in v3.0 - Embedding mode
embed_mode = request.form.get("embed_mode", "lsb")
if embed_mode not in ("lsb", "dct"):
embed_mode = "lsb"
# NEW in v3.0.1 - DCT output format
dct_output_format = request.form.get("dct_output_format", "png")
if dct_output_format not in ("png", "jpeg"):
dct_output_format = "png"
# NEW in v3.0.1 - DCT color mode
dct_color_mode = request.form.get("dct_color_mode", "color")
if dct_color_mode not in ("grayscale", "color"):
dct_color_mode = "color"
# NEW in v4.0.0 - Channel key
channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto"))
# Check DCT availability
if embed_mode == "dct" and not has_dct_support():
return _error_response(
"DCT mode requires scipy. Install with: pip install scipy"
)
# Determine payload
if payload_type == "file" and payload_file and payload_file.filename:
# File payload
file_data = payload_file.read()
result = validate_file_payload(file_data, payload_file.filename)
if not result.is_valid:
return _error_response(result.error_message)
mime_type, _ = mimetypes.guess_type(payload_file.filename)
payload = FilePayload(
data=file_data, filename=payload_file.filename, mime_type=mime_type
)
else:
# Text message
result = validate_message(message)
if not result.is_valid:
return _error_response(result.error_message)
payload = message
# v3.2.0: Renamed from day_phrase
if not passphrase:
return _error_response("Passphrase is required")
# v3.2.0: Validate passphrase
result = validate_passphrase(passphrase)
if not result.is_valid:
return _error_response(result.error_message)
# Show warning if passphrase is short
if result.warning:
flash(result.warning, "warning")
# Read files
ref_data = ref_photo.read()
carrier_data = carrier.read()
# Handle RSA key - can come from .pem file, QR code image, or webcam-scanned PEM (v4.1.5)
rsa_key_data = None
rsa_key_pem = request.form.get("rsa_key_pem", "").strip()
rsa_key_qr = request.files.get("rsa_key_qr")
rsa_key_from_qr = False
if rsa_key_pem:
# Webcam-scanned PEM key (v4.1.5+) - may be compressed (zlib or zstd)
if is_compressed(rsa_key_pem):
rsa_key_pem = decompress_data(rsa_key_pem)
rsa_key_data = rsa_key_pem.encode("utf-8")
rsa_key_from_qr = True
elif rsa_key_file and rsa_key_file.filename:
rsa_key_data = rsa_key_file.read()
elif rsa_key_qr and rsa_key_qr.filename and _HAS_QRCODE_READ:
qr_image_data = rsa_key_qr.read()
key_pem = extract_key_from_qr(qr_image_data)
if key_pem:
rsa_key_data = key_pem.encode("utf-8")
rsa_key_from_qr = True
else:
return _error_response("Could not extract RSA key from QR code image.")
# Validate security factors
result = validate_security_factors(pin, rsa_key_data)
if not result.is_valid:
return _error_response(result.error_message)
# Validate PIN if provided
if pin:
result = validate_pin(pin)
if not result.is_valid:
return _error_response(result.error_message)
# Determine key password
key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None)
# Validate RSA key if provided
if rsa_key_data:
result = validate_rsa_key(rsa_key_data, key_password)
if not result.is_valid:
return _error_response(result.error_message)
# Validate carrier image
result = validate_image(carrier_data, "Carrier image")
if not result.is_valid:
return _error_response(result.error_message)
# Pre-check payload capacity BEFORE encode (fail fast)
from stegasoo.steganography import will_fit_by_mode
payload_size = (
len(payload.data) if hasattr(payload, "data") else len(payload.encode("utf-8"))
)
fit_check = will_fit_by_mode(payload_size, carrier_data, embed_mode=embed_mode)
if not fit_check.get("fits", True):
error_msg = (
f"Payload too large for {embed_mode.upper()} mode. "
f"Payload: {payload_size:,} bytes, "
f"Capacity: {fit_check.get('capacity', 0):,} bytes"
)
# Suggest alternative mode
if embed_mode == "dct":
alt_check = will_fit_by_mode(payload_size, carrier_data, embed_mode="lsb")
if alt_check.get("fits"):
error_msg += " - Try LSB mode instead."
return _error_response(error_msg)
# Build encode params for either sync or async
encode_params = {
"carrier_data": carrier_data,
"ref_data": ref_data,
"passphrase": passphrase,
"pin": pin if pin else None,
"rsa_key_data": rsa_key_data,
"key_password": key_password,
"embed_mode": embed_mode,
"dct_output_format": dct_output_format if embed_mode == "dct" else "png",
"dct_color_mode": dct_color_mode if embed_mode == "dct" else "color",
"channel_key": channel_key,
}
if payload_type == "file" and payload_file and payload_file.filename:
encode_params["file_data"] = payload.data
encode_params["file_name"] = payload.filename
encode_params["file_mime"] = payload.mime_type
else:
encode_params["message"] = payload
# ASYNC MODE: Start background job and return JSON
if is_async:
job_id = generate_job_id()
_store_job(job_id, {"status": "pending", "created": time.time()})
_executor.submit(_run_encode_job, job_id, encode_params)
return jsonify({"job_id": job_id, "status": "pending"})
# SYNC MODE: Run inline (original behavior)
if payload_type == "file" and payload_file and payload_file.filename:
encode_result = subprocess_stego.encode(
carrier_data=carrier_data,
reference_data=ref_data,
file_data=payload.data,
file_name=payload.filename,
file_mime=payload.mime_type,
passphrase=passphrase,
pin=pin if pin else None,
rsa_key_data=rsa_key_data,
rsa_password=key_password,
embed_mode=embed_mode,
dct_output_format=dct_output_format if embed_mode == "dct" else "png",
dct_color_mode=dct_color_mode if embed_mode == "dct" else "color",
channel_key=channel_key,
)
else:
encode_result = subprocess_stego.encode(
carrier_data=carrier_data,
reference_data=ref_data,
message=payload,
passphrase=passphrase,
pin=pin if pin else None,
rsa_key_data=rsa_key_data,
rsa_password=key_password,
embed_mode=embed_mode,
dct_output_format=dct_output_format if embed_mode == "dct" else "png",
dct_color_mode=dct_color_mode if embed_mode == "dct" else "color",
channel_key=channel_key,
)
# Check for subprocess errors
if not encode_result.success:
error_msg = encode_result.error or "Encoding failed"
if "capacity" in error_msg.lower():
raise CapacityError(error_msg)
raise StegasooError(error_msg)
# Determine actual output format for filename and storage
if embed_mode == "dct" and dct_output_format == "jpeg":
output_ext = ".jpg"
output_mime = "image/jpeg"
else:
output_ext = ".png"
output_mime = "image/png"
# Use filename from result or generate one
filename = encode_result.filename
if not filename:
filename = generate_filename("stego", output_ext)
elif (
embed_mode == "dct"
and dct_output_format == "jpeg"
and filename.endswith(".png")
):
filename = filename[:-4] + ".jpg"
# Store temporarily
file_id = secrets.token_urlsafe(16)
cleanup_temp_files()
temp_storage.save_temp_file(
file_id,
encode_result.stego_data,
{
"filename": filename,
"embed_mode": embed_mode,
"output_format": dct_output_format if embed_mode == "dct" else "png",
"color_mode": dct_color_mode if embed_mode == "dct" else None,
"mime_type": output_mime,
# Channel info (v4.0.0)
"channel_mode": encode_result.channel_mode,
"channel_fingerprint": encode_result.channel_fingerprint,
},
)
return redirect(url_for("encode_result", file_id=file_id))
except CapacityError as e:
return _error_response(str(e))
except StegasooError as e:
return _error_response(str(e))
except Exception as e:
return _error_response(f"Error: {e}")
return render_template("stego/encode.html", has_qrcode_read=_HAS_QRCODE_READ)
# ============================================================================
# ENCODE PROGRESS ENDPOINTS (v4.1.2)
# ============================================================================
@app.route("/encode/status/<job_id>")
@login_required
def encode_status(job_id):
"""Get the status of an async encode job."""
job = _get_job(job_id)
if not job:
return jsonify({"error": "Job not found"}), 404
response = {"status": job.get("status", "unknown")}
if job["status"] == "complete":
response["file_id"] = job.get("file_id")
elif job["status"] == "error":
response["error"] = job.get("error", "Unknown error")
return jsonify(response)
@app.route("/encode/progress/<job_id>")
@login_required
def encode_progress(job_id):
"""Get the progress of an async encode job."""
progress = read_progress(job_id)
if progress:
return jsonify(progress)
# No progress file yet - check job status
job = _get_job(job_id)
if not job:
return jsonify({"error": "Job not found"}), 404
if job["status"] == "complete":
return jsonify({"percent": 100, "phase": "complete"})
elif job["status"] == "error":
return jsonify({"percent": 0, "phase": "error", "error": job.get("error")})
elif job["status"] == "pending":
return jsonify({"percent": 0, "phase": "starting"})
# Running but no progress file yet
return jsonify({"percent": 0, "phase": "initializing"})
@app.route("/encode/result/<file_id>")
@login_required
def encode_result(file_id):
file_info = temp_storage.get_temp_file(file_id)
if not file_info:
flash("File expired or not found. Please encode again.", "error")
return redirect(url_for("encode"))
carrier_type = file_info.get("carrier_type", "image")
# Generate thumbnail only for images
thumbnail_data = None
thumbnail_id = None
if carrier_type != "audio":
thumbnail_data = generate_thumbnail(file_info["data"])
if thumbnail_data:
thumbnail_id = f"{file_id}_thumb"
temp_storage.save_thumbnail(thumbnail_id, thumbnail_data)
return render_template(
"encode_result.html",
file_id=file_id,
filename=file_info["filename"],
thumbnail_url=(
url_for("encode_thumbnail", thumb_id=thumbnail_id) if thumbnail_id else None
),
embed_mode=file_info.get("embed_mode", "lsb"),
output_format=file_info.get("output_format", "png"),
color_mode=file_info.get("color_mode"),
carrier_type=carrier_type,
# Channel info (v4.0.0)
channel_mode=file_info.get("channel_mode", "public"),
channel_fingerprint=file_info.get("channel_fingerprint"),
)
@app.route("/encode/thumbnail/<thumb_id>")
@login_required
def encode_thumbnail(thumb_id):
"""Serve thumbnail image."""
thumb_data = temp_storage.get_thumbnail(thumb_id)
if not thumb_data:
return "Thumbnail not found", 404
return send_file(io.BytesIO(thumb_data), mimetype="image/jpeg", as_attachment=False)
@app.route("/encode/download/<file_id>")
@login_required
def encode_download(file_id):
file_info = temp_storage.get_temp_file(file_id)
if not file_info:
flash("File expired or not found.", "error")
return redirect(url_for("encode"))
mime_type = file_info.get("mime_type", "image/png")
return send_file(
io.BytesIO(file_info["data"]),
mimetype=mime_type,
as_attachment=True,
download_name=file_info["filename"],
)
@app.route("/encode/file/<file_id>")
@login_required
def encode_file_route(file_id):
"""Serve file for Web Share API."""
file_info = temp_storage.get_temp_file(file_id)
if not file_info:
return "Not found", 404
mime_type = file_info.get("mime_type", "image/png")
return send_file(
io.BytesIO(file_info["data"]),
mimetype=mime_type,
as_attachment=False,
download_name=file_info["filename"],
)
@app.route("/encode/cleanup/<file_id>", methods=["POST"])
@login_required
def encode_cleanup(file_id):
"""Manually cleanup a file after sharing."""
temp_storage.delete_temp_file(file_id)
# Also cleanup thumbnail if exists
thumb_id = f"{file_id}_thumb"
temp_storage.delete_thumbnail(thumb_id)
return jsonify({"status": "ok"})
# ============================================================================
# DECODE
# ============================================================================
def _run_decode_job(job_id: str, decode_params: dict) -> None:
"""Background thread function for async decode."""
progress_file = get_progress_file_path(job_id)
try:
_store_job(job_id, {"status": "running", "created": time.time()})
# Run decode with progress file
decode_result = subprocess_stego.decode(
stego_data=decode_params["stego_data"],
reference_data=decode_params["ref_data"],
passphrase=decode_params["passphrase"],
pin=decode_params.get("pin"),
rsa_key_data=decode_params.get("rsa_key_data"),
rsa_password=decode_params.get("rsa_password"),
embed_mode=decode_params.get("embed_mode", "auto"),
channel_key=decode_params.get("channel_key"),
progress_file=progress_file,
)
if not decode_result.success:
_store_job(
job_id,
{
"status": "error",
"error": decode_result.error or "Decoding failed",
"error_type": decode_result.error_type,
"created": time.time(),
},
)
return
# Store result based on type
if decode_result.is_file:
file_id = secrets.token_urlsafe(16)
filename = decode_result.filename or "decoded_file"
temp_storage.save_temp_file(
file_id,
decode_result.file_data,
{
"filename": filename,
"mime_type": decode_result.mime_type,
},
)
_store_job(
job_id,
{
"status": "complete",
"file_id": file_id,
"is_file": True,
"filename": filename,
"file_size": len(decode_result.file_data),
"mime_type": decode_result.mime_type,
"created": time.time(),
},
)
else:
_store_job(
job_id,
{
"status": "complete",
"is_file": False,
"message": decode_result.message,
"created": time.time(),
},
)
except Exception as e:
_store_job(
job_id,
{
"status": "error",
"error": str(e),
"created": time.time(),
},
)
finally:
cleanup_progress_file(job_id)
def _run_decode_audio_job(job_id: str, decode_params: dict) -> None:
"""Background thread function for async audio decode (v4.3.0)."""
progress_file = get_progress_file_path(job_id)
try:
_store_job(job_id, {"status": "running", "created": time.time()})
decode_result = subprocess_stego.decode_audio(
stego_data=decode_params["stego_data"],
reference_data=decode_params["ref_data"],
passphrase=decode_params["passphrase"],
pin=decode_params.get("pin"),
rsa_key_data=decode_params.get("rsa_key_data"),
rsa_password=decode_params.get("rsa_password"),
embed_mode=decode_params.get("embed_mode", "audio_auto"),
channel_key=decode_params.get("channel_key"),
progress_file=progress_file,
)
if not decode_result.success:
_store_job(
job_id,
{
"status": "error",
"error": decode_result.error or "Audio decoding failed",
"error_type": decode_result.error_type,
"created": time.time(),
},
)
return
if decode_result.is_file:
file_id = secrets.token_urlsafe(16)
filename = decode_result.filename or "decoded_file"
temp_storage.save_temp_file(
file_id,
decode_result.file_data,
{
"filename": filename,
"mime_type": decode_result.mime_type,
},
)
_store_job(
job_id,
{
"status": "complete",
"file_id": file_id,
"is_file": True,
"filename": filename,
"file_size": len(decode_result.file_data),
"mime_type": decode_result.mime_type,
"created": time.time(),
},
)
else:
_store_job(
job_id,
{
"status": "complete",
"is_file": False,
"message": decode_result.message,
"created": time.time(),
},
)
except Exception as e:
_store_job(
job_id,
{
"status": "error",
"error": str(e),
"created": time.time(),
},
)
finally:
cleanup_progress_file(job_id)
@app.route("/decode", methods=["GET", "POST"])
@login_required
def decode_page():
if request.method == "POST":
try:
# Get files
ref_photo = request.files.get("reference_photo")
stego_image = request.files.get("stego_image")
rsa_key_file = request.files.get("rsa_key")
# Determine carrier type (v4.3.0)
carrier_type = request.form.get("carrier_type", "image")
if carrier_type == "audio":
# ========== AUDIO DECODE PATH (v4.3.0) ==========
if not HAS_AUDIO_SUPPORT:
flash("Audio steganography is not available.", "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
if not ref_photo or not stego_image:
flash("Both reference photo and stego audio are required", "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
if not allowed_image(ref_photo.filename):
flash("Reference must be an image", "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
if not allowed_audio(stego_image.filename):
flash("Invalid audio format", "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
passphrase = request.form.get("passphrase", "")
pin = request.form.get("pin", "").strip()
rsa_password = request.form.get("rsa_password", "")
embed_mode = request.form.get("embed_mode", "audio_auto")
if embed_mode not in ("audio_auto", "audio_lsb", "audio_spread"):
embed_mode = "audio_auto"
channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto"))
if not passphrase:
flash("Passphrase is required", "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
ref_data = ref_photo.read()
stego_data = stego_image.read()
# Handle RSA key (same as image path)
rsa_key_data = None
rsa_key_pem = request.form.get("rsa_key_pem", "").strip()
rsa_key_qr = request.files.get("rsa_key_qr")
rsa_key_from_qr = False
if rsa_key_pem:
if is_compressed(rsa_key_pem):
rsa_key_pem = decompress_data(rsa_key_pem)
rsa_key_data = rsa_key_pem.encode("utf-8")
rsa_key_from_qr = True
elif rsa_key_file and rsa_key_file.filename:
rsa_key_data = rsa_key_file.read()
elif rsa_key_qr and rsa_key_qr.filename and _HAS_QRCODE_READ:
qr_image_data = rsa_key_qr.read()
key_pem = extract_key_from_qr(qr_image_data)
if key_pem:
rsa_key_data = key_pem.encode("utf-8")
rsa_key_from_qr = True
else:
flash("Could not extract RSA key from QR code image.", "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
result = validate_security_factors(pin, rsa_key_data)
if not result.is_valid:
flash(result.error_message, "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
if pin:
result = validate_pin(pin)
if not result.is_valid:
flash(result.error_message, "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
key_password = (
None if rsa_key_from_qr else (rsa_password if rsa_password else None)
)
if rsa_key_data:
result = validate_rsa_key(rsa_key_data, key_password)
if not result.is_valid:
flash(result.error_message, "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
is_async = (
request.form.get("async") == "true"
or request.headers.get("X-Async") == "true"
)
decode_params = {
"stego_data": stego_data,
"ref_data": ref_data,
"passphrase": passphrase,
"pin": pin if pin else None,
"rsa_key_data": rsa_key_data,
"rsa_password": key_password,
"embed_mode": embed_mode,
"channel_key": channel_key,
}
if is_async:
job_id = generate_job_id()
_store_job(job_id, {"status": "pending", "created": time.time()})
_executor.submit(_run_decode_audio_job, job_id, decode_params)
return jsonify({"job_id": job_id, "status": "pending"})
# Sync audio decode
decode_result = subprocess_stego.decode_audio(
stego_data=stego_data,
reference_data=ref_data,
passphrase=passphrase,
pin=pin if pin else None,
rsa_key_data=rsa_key_data,
rsa_password=key_password,
embed_mode=embed_mode,
channel_key=channel_key,
)
if not decode_result.success:
error_msg = decode_result.error or "Audio decoding failed"
if (
"decrypt" in error_msg.lower()
or decode_result.error_type == "DecryptionError"
):
flash(
"Wrong credentials. Double-check your reference photo, "
"passphrase, PIN, and channel key.",
"warning",
)
else:
flash(error_msg, "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
if decode_result.is_file:
file_id = secrets.token_urlsafe(16)
cleanup_temp_files()
filename = decode_result.filename or "decoded_file"
temp_storage.save_temp_file(
file_id,
decode_result.file_data,
{
"filename": filename,
"mime_type": decode_result.mime_type,
},
)
return render_template(
"decode.html",
decoded_file=True,
file_id=file_id,
filename=filename,
file_size=format_size(len(decode_result.file_data)),
mime_type=decode_result.mime_type,
has_qrcode_read=_HAS_QRCODE_READ,
)
else:
return render_template(
"decode.html",
decoded_message=decode_result.message,
has_qrcode_read=_HAS_QRCODE_READ,
)
# ========== IMAGE DECODE PATH (original) ==========
if not ref_photo or not stego_image:
flash("Both reference photo and stego image are required", "error")
return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ)
# Get form data - v3.2.0: renamed from day_phrase to passphrase
passphrase = request.form.get("passphrase", "") # v3.2.0: Renamed
pin = request.form.get("pin", "").strip()
rsa_password = request.form.get("rsa_password", "")
# NEW in v3.0 - Extraction mode
embed_mode = request.form.get("embed_mode", "auto")
if embed_mode not in ("auto", "lsb", "dct"):
embed_mode = "auto"
# NEW in v4.0.0 - Channel key
channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto"))
# Check DCT availability
if embed_mode == "dct" and not has_dct_support():
flash("DCT mode requires scipy. Install with: pip install scipy", "error")
return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ)
# v3.2.0: Removed date handling (no stego_date needed)
# v3.2.0: Renamed from day_phrase
if not passphrase:
flash("Passphrase is required", "error")
return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ)
# Read files
ref_data = ref_photo.read()
stego_data = stego_image.read()
# Handle RSA key - can come from .pem file, QR code image, or webcam-scanned PEM (v4.1.5)
rsa_key_data = None
rsa_key_pem = request.form.get("rsa_key_pem", "").strip()
rsa_key_qr = request.files.get("rsa_key_qr")
rsa_key_from_qr = False
if rsa_key_pem:
# Webcam-scanned PEM key (v4.1.5+) - may be compressed (zlib or zstd)
if is_compressed(rsa_key_pem):
rsa_key_pem = decompress_data(rsa_key_pem)
rsa_key_data = rsa_key_pem.encode("utf-8")
rsa_key_from_qr = True
elif rsa_key_file and rsa_key_file.filename:
rsa_key_data = rsa_key_file.read()
elif rsa_key_qr and rsa_key_qr.filename and _HAS_QRCODE_READ:
qr_image_data = rsa_key_qr.read()
key_pem = extract_key_from_qr(qr_image_data)
if key_pem:
rsa_key_data = key_pem.encode("utf-8")
rsa_key_from_qr = True
else:
flash("Could not extract RSA key from QR code image.", "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
# Validate security factors
result = validate_security_factors(pin, rsa_key_data)
if not result.is_valid:
flash(result.error_message, "error")
return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ)
# Validate PIN if provided
if pin:
result = validate_pin(pin)
if not result.is_valid:
flash(result.error_message, "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
# Determine key password
key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None)
# Validate RSA key if provided
if rsa_key_data:
result = validate_rsa_key(rsa_key_data, key_password)
if not result.is_valid:
flash(result.error_message, "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
# Check for async mode (v4.1.5)
is_async = (
request.form.get("async") == "true" or request.headers.get("X-Async") == "true"
)
# Build decode params
decode_params = {
"stego_data": stego_data,
"ref_data": ref_data,
"passphrase": passphrase,
"pin": pin if pin else None,
"rsa_key_data": rsa_key_data,
"rsa_password": key_password,
"embed_mode": embed_mode,
"channel_key": channel_key,
}
# ASYNC MODE: Start background job and return JSON
if is_async:
job_id = generate_job_id()
_store_job(job_id, {"status": "pending", "created": time.time()})
_executor.submit(_run_decode_job, job_id, decode_params)
return jsonify({"job_id": job_id, "status": "pending"})
# SYNC MODE: Run inline (original behavior)
# v4.0.0: Include channel_key parameter
# Use subprocess-isolated decode to prevent crashes
decode_result = subprocess_stego.decode(
stego_data=stego_data,
reference_data=ref_data,
passphrase=passphrase,
pin=pin if pin else None,
rsa_key_data=rsa_key_data,
rsa_password=key_password,
embed_mode=embed_mode,
channel_key=channel_key, # v4.0.0
)
# Check for subprocess errors
if not decode_result.success:
error_msg = decode_result.error or "Decoding failed"
# Check for channel key related errors
if "channel key" in error_msg.lower():
flash(error_msg, "error")
return render_template(
"stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ
)
if (
"decrypt" in error_msg.lower()
or decode_result.error_type == "DecryptionError"
):
raise DecryptionError(error_msg)
raise StegasooError(error_msg)
if decode_result.is_file:
# File content - store temporarily for download
file_id = secrets.token_urlsafe(16)
cleanup_temp_files()
filename = decode_result.filename or "decoded_file"
temp_storage.save_temp_file(
file_id,
decode_result.file_data,
{
"filename": filename,
"mime_type": decode_result.mime_type,
},
)
return render_template(
"decode.html",
decoded_file=True,
file_id=file_id,
filename=filename,
file_size=format_size(len(decode_result.file_data)),
mime_type=decode_result.mime_type,
has_qrcode_read=_HAS_QRCODE_READ,
)
else:
# Text content
return render_template(
"decode.html",
decoded_message=decode_result.message,
has_qrcode_read=_HAS_QRCODE_READ,
)
except InvalidMagicBytesError:
flash(
"This doesn't appear to be a Stegasoo image. Try a different mode (LSB/DCT).",
"warning",
)
return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ)
except ReedSolomonError:
flash(
"Image too corrupted to decode. It may have been re-saved or compressed.",
"error",
)
return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ)
except InvalidHeaderError:
flash(
"Invalid or corrupted header. The image may have been modified.",
"error",
)
return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ)
except DecryptionError:
flash(
"Wrong credentials. Double-check your reference photo, passphrase, PIN, and channel key.",
"warning",
)
return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ)
except StegasooError as e:
flash(str(e), "error")
return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ)
except Exception as e:
flash(f"Error: {e}", "error")
return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ)
return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ)
@app.route("/decode/download/<file_id>")
@login_required
def decode_download(file_id):
"""Download decoded file."""
file_info = temp_storage.get_temp_file(file_id)
if not file_info:
flash("File expired or not found.", "error")
return redirect(url_for("decode_page"))
mime_type = file_info.get("mime_type", "application/octet-stream")
return send_file(
io.BytesIO(file_info["data"]),
mimetype=mime_type,
as_attachment=True,
download_name=file_info["filename"],
)
# ============================================================================
# DECODE PROGRESS ENDPOINTS (v4.1.5)
# ============================================================================
@app.route("/decode/status/<job_id>")
@login_required
def decode_status(job_id):
"""Get the status of an async decode job."""
job = _get_job(job_id)
if not job:
return jsonify({"error": "Job not found"}), 404
response = {"status": job.get("status", "unknown")}
if job["status"] == "complete":
response["is_file"] = job.get("is_file", False)
if job.get("is_file"):
response["file_id"] = job.get("file_id")
response["filename"] = job.get("filename")
response["file_size"] = job.get("file_size")
response["mime_type"] = job.get("mime_type")
else:
response["message"] = job.get("message")
elif job["status"] == "error":
response["error"] = job.get("error", "Unknown error")
response["error_type"] = job.get("error_type")
return jsonify(response)
@app.route("/decode/progress/<job_id>")
@login_required
def decode_progress(job_id):
"""Get the progress of an async decode job."""
progress = read_progress(job_id)
if progress:
return jsonify(progress)
# No progress file yet - check job status
job = _get_job(job_id)
if not job:
return jsonify({"error": "Job not found"}), 404
if job["status"] == "complete":
return jsonify({"percent": 100, "phase": "complete"})
elif job["status"] == "error":
return jsonify({"percent": 0, "phase": "error", "error": job.get("error")})
elif job["status"] == "pending":
return jsonify({"percent": 0, "phase": "starting"})
# Running but no progress file yet
return jsonify({"percent": 5, "phase": "reading"})
@app.route("/decode/result/<job_id>")
@login_required
def decode_result(job_id):
"""Get the result page for an async decode job."""
job = _get_job(job_id)
if not job:
flash("Job not found or expired.", "error")
return redirect(url_for("decode_page"))
if job["status"] != "complete":
flash("Decode not complete.", "error")
return redirect(url_for("decode_page"))
if job.get("is_file"):
return render_template(
"decode.html",
decoded_file=True,
file_id=job.get("file_id"),
filename=job.get("filename"),
file_size=format_size(job.get("file_size", 0)),
mime_type=job.get("mime_type"),
has_qrcode_read=_HAS_QRCODE_READ,
)
else:
return render_template(
"decode.html",
decoded_message=job.get("message"),
has_qrcode_read=_HAS_QRCODE_READ,
)
@app.route("/about")
def about():
from stegasoo.channel import get_channel_status
from stegasoo import has_argon2
from auth import get_current_user
channel_status = get_channel_status()
current_user = get_current_user()
is_admin_user = current_user.is_admin if current_user else False
return render_template(
"stego/about.html",
has_argon2=has_argon2(),
has_qrcode_read=_HAS_QRCODE_READ,
channel_configured=channel_status["configured"],
channel_fingerprint=channel_status.get("fingerprint"),
channel_source=channel_status.get("source"),
is_admin=is_admin_user,
)
# ============================================================================
# TOOLS ROUTES (v4.1.0)
# ============================================================================
@app.route("/tools")
@login_required
def tools():
"""Advanced tools page."""
return render_template("stego/tools.html", has_dct=has_dct_support())
@app.route("/api/tools/capacity", methods=["POST"])
@login_required
def api_tools_capacity():
"""Calculate image capacity for steganography."""
from stegasoo.dct_steganography import estimate_capacity_comparison
carrier = request.files.get("image")
if not carrier:
return jsonify({"success": False, "error": "No image provided"}), 400
try:
image_data = carrier.read()
result = estimate_capacity_comparison(image_data)
result["success"] = True
result["filename"] = carrier.filename
result["megapixels"] = round((result["width"] * result["height"]) / 1_000_000, 2)
return jsonify(result)
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 400
@app.route("/api/tools/strip-metadata", methods=["POST"])
@login_required
def api_tools_strip_metadata():
"""Strip EXIF/metadata from image."""
import io
from stegasoo.utils import strip_image_metadata
image_file = request.files.get("image")
if not image_file:
return jsonify({"success": False, "error": "No image provided"}), 400
try:
image_data = image_file.read()
clean_data = strip_image_metadata(image_data, output_format="PNG")
buffer = io.BytesIO(clean_data)
filename = image_file.filename.rsplit(".", 1)[0] + "_clean.png"
return send_file(
buffer, mimetype="image/png", as_attachment=True, download_name=filename
)
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 400
@app.route("/api/tools/exif", methods=["POST"])
@login_required
def api_tools_exif():
"""Read EXIF metadata from image."""
from stegasoo.utils import read_image_exif
image_file = request.files.get("image")
if not image_file:
return jsonify({"success": False, "error": "No image provided"}), 400
try:
image_data = image_file.read()
exif = read_image_exif(image_data)
# Check if it's a JPEG (editable) or not
is_jpeg = image_data[:2] == b"\xff\xd8"
return jsonify(
{
"success": True,
"filename": image_file.filename,
"exif": exif,
"editable": is_jpeg,
"field_count": len(exif),
}
)
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 400
@app.route("/api/tools/exif/update", methods=["POST"])
@login_required
def api_tools_exif_update():
"""Update EXIF fields in image."""
from stegasoo.utils import write_image_exif
image_file = request.files.get("image")
if not image_file:
return jsonify({"success": False, "error": "No image provided"}), 400
# Get updates from form data
updates_json = request.form.get("updates", "{}")
try:
import json
updates = json.loads(updates_json)
except json.JSONDecodeError:
return jsonify({"success": False, "error": "Invalid updates JSON"}), 400
if not updates:
return jsonify({"success": False, "error": "No updates provided"}), 400
try:
image_data = image_file.read()
updated_data = write_image_exif(image_data, updates)
# Return as downloadable file
buffer = io.BytesIO(updated_data)
return send_file(
buffer,
mimetype="image/jpeg",
as_attachment=True,
download_name=f"exif_{image_file.filename}",
)
except ValueError as e:
return jsonify({"success": False, "error": str(e)}), 400
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/tools/exif/clear", methods=["POST"])
@login_required
def api_tools_exif_clear():
"""Remove all EXIF metadata from image."""
from stegasoo.utils import strip_image_metadata
image_file = request.files.get("image")
if not image_file:
return jsonify({"success": False, "error": "No image provided"}), 400
# Get desired output format (default to PNG for lossless)
output_format = request.form.get("format", "PNG").upper()
if output_format not in ("PNG", "JPEG", "BMP"):
output_format = "PNG"
try:
image_data = image_file.read()
clean_data = strip_image_metadata(image_data, output_format=output_format)
# Determine extension and mimetype
ext_map = {
"PNG": ("png", "image/png"),
"JPEG": ("jpg", "image/jpeg"),
"BMP": ("bmp", "image/bmp"),
}
ext, mimetype = ext_map.get(output_format, ("png", "image/png"))
# Return as downloadable file
stem = (
image_file.filename.rsplit(".", 1)[0]
if "." in image_file.filename
else image_file.filename
)
buffer = io.BytesIO(clean_data)
return send_file(
buffer,
mimetype=mimetype,
as_attachment=True,
download_name=f"{stem}_clean.{ext}",
)
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/tools/rotate", methods=["POST"])
@login_required
def api_tools_rotate():
"""Rotate and/or flip an image, using lossless jpegtran for JPEGs."""
import shutil
import subprocess
import tempfile
from PIL import Image
image_file = request.files.get("image")
if not image_file:
return jsonify({"success": False, "error": "No image provided"}), 400
rotation = int(request.form.get("rotation", 0))
flip_h = request.form.get("flip_h", "false").lower() == "true"
flip_v = request.form.get("flip_v", "false").lower() == "true"
try:
image_data = image_file.read()
img = Image.open(io.BytesIO(image_data))
original_format = img.format # JPEG, PNG, etc.
img.close()
# For JPEGs, use jpegtran for lossless rotation/flip (preserves DCT stego)
has_jpegtran = shutil.which("jpegtran") is not None
use_jpegtran = (
original_format == "JPEG" and has_jpegtran and (rotation or flip_h or flip_v)
)
if use_jpegtran:
# Chain jpegtran operations for lossless transformation
current_data = image_data
# Apply rotation first
if rotation in (90, 180, 270):
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
f.write(current_data)
input_path = f.name
output_path = tempfile.mktemp(suffix=".jpg")
try:
result = subprocess.run(
[
"jpegtran",
"-rotate",
str(rotation),
"-copy",
"all",
"-outfile",
output_path,
input_path,
],
capture_output=True,
timeout=30,
)
if result.returncode == 0:
with open(output_path, "rb") as f:
current_data = f.read()
finally:
for p in [input_path, output_path]:
try:
os.unlink(p)
except OSError:
pass
# Apply horizontal flip
if flip_h:
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
f.write(current_data)
input_path = f.name
output_path = tempfile.mktemp(suffix=".jpg")
try:
result = subprocess.run(
[
"jpegtran",
"-flip",
"horizontal",
"-copy",
"all",
"-outfile",
output_path,
input_path,
],
capture_output=True,
timeout=30,
)
if result.returncode == 0:
with open(output_path, "rb") as f:
current_data = f.read()
finally:
for p in [input_path, output_path]:
try:
os.unlink(p)
except OSError:
pass
# Apply vertical flip
if flip_v:
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
f.write(current_data)
input_path = f.name
output_path = tempfile.mktemp(suffix=".jpg")
try:
result = subprocess.run(
[
"jpegtran",
"-flip",
"vertical",
"-copy",
"all",
"-outfile",
output_path,
input_path,
],
capture_output=True,
timeout=30,
)
if result.returncode == 0:
with open(output_path, "rb") as f:
current_data = f.read()
finally:
for p in [input_path, output_path]:
try:
os.unlink(p)
except OSError:
pass
buffer = io.BytesIO(current_data)
mimetype = "image/jpeg"
ext = "jpg"
else:
# Fallback to PIL for non-JPEGs or when jpegtran unavailable
img = Image.open(io.BytesIO(image_data))
# Apply rotation (PIL rotates counter-clockwise, so negate)
if rotation:
img = img.rotate(-rotation, expand=True)
# Apply flips
if flip_h:
img = img.transpose(Image.FLIP_LEFT_RIGHT)
if flip_v:
img = img.transpose(Image.FLIP_TOP_BOTTOM)
# Preserve original format
buffer = io.BytesIO()
if original_format == "JPEG":
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
img.save(buffer, format="JPEG", quality=95)
mimetype = "image/jpeg"
ext = "jpg"
else:
img.save(buffer, format="PNG")
mimetype = "image/png"
ext = "png"
buffer.seek(0)
stem = (
image_file.filename.rsplit(".", 1)[0]
if "." in image_file.filename
else image_file.filename
)
return send_file(
buffer,
mimetype=mimetype,
as_attachment=True,
download_name=f"{stem}_transformed.{ext}",
)
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/tools/compress", methods=["POST"])
@login_required
def api_tools_compress():
"""Compress image to JPEG at specified quality."""
from PIL import Image
image_file = request.files.get("image")
if not image_file:
return jsonify({"success": False, "error": "No image provided"}), 400
quality = int(request.form.get("quality", 85))
quality = max(10, min(100, quality)) # Clamp to valid range
try:
img = Image.open(io.BytesIO(image_file.read()))
# Convert to RGB if necessary (JPEG doesn't support alpha)
if img.mode in ("RGBA", "LA", "P"):
img = img.convert("RGB")
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=quality)
buffer.seek(0)
stem = (
image_file.filename.rsplit(".", 1)[0]
if "." in image_file.filename
else image_file.filename
)
return send_file(
buffer,
mimetype="image/jpeg",
as_attachment=True,
download_name=f"{stem}_q{quality}.jpg",
)
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/tools/convert", methods=["POST"])
@login_required
def api_tools_convert():
"""Convert image to different format."""
from PIL import Image
image_file = request.files.get("image")
if not image_file:
return jsonify({"success": False, "error": "No image provided"}), 400
output_format = request.form.get("format", "PNG").upper()
quality = int(request.form.get("quality", 90))
quality = max(10, min(100, quality))
# Validate format
format_map = {
"PNG": ("png", "image/png"),
"JPEG": ("jpg", "image/jpeg"),
"WEBP": ("webp", "image/webp"),
}
if output_format not in format_map:
return jsonify({"success": False, "error": f"Unsupported format: {output_format}"}), 400
try:
img = Image.open(io.BytesIO(image_file.read()))
# Convert to RGB for JPEG (no alpha)
if output_format == "JPEG" and img.mode in ("RGBA", "LA", "P"):
img = img.convert("RGB")
buffer = io.BytesIO()
save_kwargs = {"format": output_format}
if output_format in ("JPEG", "WEBP"):
save_kwargs["quality"] = quality
img.save(buffer, **save_kwargs)
buffer.seek(0)
ext, mimetype = format_map[output_format]
stem = (
image_file.filename.rsplit(".", 1)[0]
if "." in image_file.filename
else image_file.filename
)
return send_file(
buffer,
mimetype=mimetype,
as_attachment=True,
download_name=f"{stem}.{ext}",
)
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
# Add these two test routes anywhere in app.py after the app = Flask(...) line:
@app.route("/test-capacity", methods=["POST"])
def test_capacity():
"""Minimal capacity test - no stegasoo code, just PIL."""
carrier = request.files.get("carrier")
if not carrier:
return jsonify({"error": "No carrier image provided"}), 400
try:
carrier_data = carrier.read()
buffer = io.BytesIO(carrier_data)
img = Image.open(buffer)
width, height = img.size
fmt = img.format
img.close()
buffer.close()
pixels = width * height
lsb_bytes = (pixels * 3) // 8
dct_bytes = ((width // 8) * (height // 8) * 16) // 8 - 10
return jsonify(
{
"success": True,
"width": width,
"height": height,
"format": fmt,
"lsb_kb": round(lsb_bytes / 1024, 1),
"dct_kb": round(dct_bytes / 1024, 1),
}
)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/test-capacity-nopil", methods=["POST"])
def test_capacity_nopil():
"""Ultra-minimal test - no PIL, no stegasoo."""
carrier = request.files.get("carrier")
if not carrier:
return jsonify({"error": "No carrier image provided"}), 400
carrier_data = carrier.read()
return jsonify(
{
"success": True,
"data_size": len(carrier_data),
}
)
# ============================================================================
# AUTHENTICATION ROUTES (v4.0.2)
# ============================================================================