- Replace jpegio references with jpeglib in comments/docstrings - Update sanitize-for-image.sh to use system Python 3.11+ (no pyenv) - Update rpi/patches/README.md for jpeglib world - Add AUR build artifacts to .gitignore Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2913 lines
100 KiB
Python
2913 lines
100 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Stegasoo Web Frontend (v4.0.0)
|
|
|
|
A production Flask application demonstrating proper web architecture patterns.
|
|
This isn't just a quick demo - it's built to run on a Raspberry Pi 24/7.
|
|
|
|
ARCHITECTURE OVERVIEW
|
|
=====================
|
|
|
|
┌─────────────────────────────────────────────────────────────────────┐
|
|
│ FLASK APPLICATION │
|
|
├─────────────────────────────────────────────────────────────────────┤
|
|
│ │
|
|
│ Routes (/encode, /decode, /api/*) │
|
|
│ │ │
|
|
│ ├── auth.py # Session management, user accounts │
|
|
│ ├── temp_storage.py # File-based temp storage with expiry │
|
|
│ ├── subprocess_stego.py # Isolated encode/decode workers │
|
|
│ └── ssl_utils.py # Self-signed cert generation │
|
|
│ │
|
|
│ Templates (Jinja2) │
|
|
│ └── base.html → encode.html, decode.html, etc. │
|
|
│ │
|
|
│ Static assets (CSS, JS) │
|
|
│ └── Vanilla JS, no framework (keeps it simple) │
|
|
│ │
|
|
└─────────────────────────────────────────────────────────────────────┘
|
|
|
|
KEY PATTERNS
|
|
============
|
|
|
|
1. SUBPROCESS ISOLATION
|
|
Stegasoo's DCT mode uses scipy/jpeglib which can crash on malformed input.
|
|
We run encode/decode in subprocesses so crashes don't take down the server:
|
|
|
|
subprocess_stego = SubprocessStego(timeout=180)
|
|
result = subprocess_stego.encode(carrier, ref, message, ...)
|
|
|
|
If the subprocess crashes, we catch it and return an error gracefully.
|
|
|
|
2. ASYNC JOBS WITH PROGRESS
|
|
Encoding large images can take 30+ seconds. We use ThreadPoolExecutor
|
|
to run jobs in background threads with progress reporting:
|
|
|
|
job_id = generate_job_id()
|
|
_executor.submit(_run_encode_job, job_id, params)
|
|
# Client polls /api/encode/progress/<job_id> for updates
|
|
|
|
3. CONTEXT PROCESSORS
|
|
@app.context_processor injects variables into ALL templates:
|
|
|
|
return {"version": __version__, "has_dct": has_dct_support()}
|
|
|
|
Now every template can use {{ version }} without passing it explicitly.
|
|
|
|
4. BEFORE_REQUEST HOOKS
|
|
@app.before_request runs before every request. We use it for:
|
|
- First-run setup redirect (no users → /setup)
|
|
- Session validation
|
|
- Cleanup of old temp files
|
|
|
|
5. SECURE SECRET KEY
|
|
Flask sessions need a secret key. We persist it to a file so sessions
|
|
survive server restarts (otherwise everyone gets logged out).
|
|
|
|
CHANGES in v4.0.0:
|
|
- Added channel key support for deployment/group isolation
|
|
- New /api/channel/status endpoint
|
|
- Channel key selector on encode/decode pages
|
|
|
|
CHANGES in v3.2.0:
|
|
- Removed date dependency from all operations
|
|
- Simplified user experience for asynchronous communications
|
|
"""
|
|
|
|
import io
|
|
import mimetypes
|
|
import os
|
|
import secrets
|
|
import sys
|
|
import threading
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from pathlib import Path
|
|
|
|
import temp_storage
|
|
from auth import (
|
|
MAX_CHANNEL_KEYS,
|
|
MAX_USERS,
|
|
admin_required,
|
|
can_create_user,
|
|
can_save_channel_key,
|
|
change_password,
|
|
clear_recovery_key,
|
|
create_admin_user,
|
|
create_user,
|
|
delete_channel_key,
|
|
delete_user,
|
|
generate_temp_password,
|
|
get_all_users,
|
|
get_channel_key_by_id,
|
|
get_current_user,
|
|
get_non_admin_count,
|
|
get_recovery_key_hash,
|
|
get_user_by_id,
|
|
get_user_channel_keys,
|
|
get_username,
|
|
has_recovery_key,
|
|
is_admin,
|
|
is_authenticated,
|
|
login_required,
|
|
login_user,
|
|
logout_user,
|
|
reset_user_password,
|
|
save_channel_key,
|
|
set_recovery_key_hash,
|
|
update_channel_key_last_used,
|
|
update_channel_key_name,
|
|
user_exists,
|
|
verify_and_reset_admin_password,
|
|
verify_user_password,
|
|
)
|
|
from auth import (
|
|
init_app as init_auth,
|
|
)
|
|
from flask import (
|
|
Flask,
|
|
flash,
|
|
jsonify,
|
|
redirect,
|
|
render_template,
|
|
request,
|
|
send_file,
|
|
session,
|
|
url_for,
|
|
)
|
|
from PIL import Image
|
|
from ssl_utils import ensure_certs
|
|
|
|
os.environ["NUMPY_MADVISE_HUGEPAGE"] = "0"
|
|
os.environ["OMP_NUM_THREADS"] = "1"
|
|
|
|
# Add parent to path for development
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
|
|
|
|
import stegasoo
|
|
from stegasoo import (
|
|
CapacityError,
|
|
DecryptionError,
|
|
FilePayload,
|
|
InvalidHeaderError,
|
|
InvalidMagicBytesError,
|
|
ReedSolomonError,
|
|
StegasooError,
|
|
export_rsa_key_pem,
|
|
generate_credentials,
|
|
generate_filename,
|
|
get_channel_status,
|
|
has_argon2,
|
|
# Channel key functions (v4.0.0)
|
|
has_dct_support,
|
|
load_rsa_key,
|
|
validate_channel_key,
|
|
validate_file_payload,
|
|
validate_image,
|
|
validate_message,
|
|
validate_passphrase,
|
|
validate_pin,
|
|
validate_rsa_key,
|
|
validate_security_factors,
|
|
)
|
|
from stegasoo.constants import (
|
|
DEFAULT_PASSPHRASE_WORDS,
|
|
MAX_FILE_PAYLOAD_SIZE,
|
|
MAX_FILE_SIZE,
|
|
MAX_MESSAGE_CHARS,
|
|
MAX_PIN_LENGTH,
|
|
MAX_UPLOAD_SIZE,
|
|
MIN_PASSPHRASE_WORDS,
|
|
MIN_PIN_LENGTH,
|
|
RECOMMENDED_PASSPHRASE_WORDS,
|
|
TEMP_FILE_EXPIRY,
|
|
TEMP_FILE_EXPIRY_MINUTES,
|
|
THUMBNAIL_QUALITY,
|
|
THUMBNAIL_SIZE,
|
|
VALID_RSA_SIZES,
|
|
__version__,
|
|
)
|
|
|
|
# QR Code support
|
|
try:
|
|
import qrcode # noqa: F401
|
|
from qrcode.constants import ERROR_CORRECT_L, ERROR_CORRECT_M # noqa: F401
|
|
|
|
HAS_QRCODE = True
|
|
except ImportError:
|
|
HAS_QRCODE = False
|
|
|
|
# QR Code reading
|
|
try:
|
|
from pyzbar.pyzbar import decode as pyzbar_decode # noqa: F401
|
|
|
|
HAS_QRCODE_READ = True
|
|
except ImportError:
|
|
HAS_QRCODE_READ = False
|
|
|
|
|
|
# Import QR utilities
|
|
# ============================================================================
|
|
# SUBPROCESS ISOLATION FOR STEGASOO OPERATIONS
|
|
# ============================================================================
|
|
#
|
|
# This is a critical reliability pattern. Here's the problem:
|
|
#
|
|
# scipy's DCT and jpeglib can crash (segfault) on:
|
|
# - Malformed JPEG files
|
|
# - Very large images that exhaust memory
|
|
# - Certain edge cases in coefficient manipulation
|
|
#
|
|
# If these crash in the main Flask process, your whole server dies.
|
|
# Users get a connection reset, and the service goes down.
|
|
#
|
|
# The solution: Run stegasoo operations in separate Python processes.
|
|
#
|
|
# Main Flask process Worker subprocess
|
|
# ┌─────────────────┐ ┌─────────────────┐
|
|
# │ │ spawn │ │
|
|
# │ /api/encode │──────────────>│ encode() │
|
|
# │ │ │ │
|
|
# │ wait for │<──────────────│ return result │
|
|
# │ result │ or crash │ (or crash) │
|
|
# │ │ │ │
|
|
# │ handle error │ │ (process dies) │
|
|
# └─────────────────┘ └─────────────────┘
|
|
#
|
|
# If the subprocess crashes, we catch the error and return a friendly message.
|
|
# The main server keeps running. Users can try again with different input.
|
|
#
|
|
# The subprocess_stego module handles all the pickling/unpickling of data.
|
|
|
|
from subprocess_stego import (
|
|
SubprocessStego,
|
|
cleanup_progress_file,
|
|
generate_job_id,
|
|
get_progress_file_path,
|
|
read_progress,
|
|
)
|
|
|
|
from stegasoo.qr_utils import (
|
|
can_fit_in_qr,
|
|
decompress_data,
|
|
detect_and_crop_qr,
|
|
extract_key_from_qr,
|
|
generate_qr_code,
|
|
is_compressed,
|
|
)
|
|
|
|
# Initialize subprocess wrapper (worker script must be in same directory)
|
|
subprocess_stego = SubprocessStego(timeout=180) # 3 minute timeout for large images
|
|
|
|
|
|
# ============================================================================
|
|
# FLASK APP CONFIGURATION
|
|
# ============================================================================
|
|
#
|
|
# Flask configuration demonstrates several production patterns:
|
|
#
|
|
# 1. SECRET KEY PERSISTENCE
|
|
# Flask uses secret_key to sign session cookies. If it changes, all users
|
|
# get logged out. We save it to a file so it survives restarts.
|
|
#
|
|
# 2. CONTENT LENGTH LIMITS
|
|
# MAX_CONTENT_LENGTH prevents DoS via huge uploads. Flask will reject
|
|
# requests that exceed this before loading them into memory.
|
|
#
|
|
# 3. ENVIRONMENT-BASED CONFIG
|
|
# Settings come from environment variables, allowing:
|
|
# - Different settings per deployment (dev/staging/prod)
|
|
# - Docker/systemd to inject config without code changes
|
|
# - 12-factor app compliance
|
|
#
|
|
# 4. INSTANCE FOLDER
|
|
# Flask's instance_path is for per-deployment data (databases, keys).
|
|
# It's .gitignored by default - perfect for secrets.
|
|
|
|
app = Flask(__name__)
|
|
|
|
# Persist secret key so sessions survive restarts
|
|
# Without this, every restart = everyone gets logged out
|
|
_instance_path = Path(app.instance_path)
|
|
_instance_path.mkdir(parents=True, exist_ok=True)
|
|
_secret_key_file = _instance_path / ".secret_key"
|
|
if _secret_key_file.exists():
|
|
app.secret_key = _secret_key_file.read_text().strip()
|
|
else:
|
|
# First run: generate a new key and save it
|
|
app.secret_key = secrets.token_hex(32) # 256 bits of randomness
|
|
_secret_key_file.write_text(app.secret_key)
|
|
_secret_key_file.chmod(0o600) # Only owner can read
|
|
|
|
# Reject uploads larger than this (prevents memory exhaustion)
|
|
app.config["MAX_CONTENT_LENGTH"] = MAX_FILE_SIZE
|
|
|
|
# Auth configuration from environment
|
|
# STEGASOO_AUTH_ENABLED=false disables login (for local/dev use)
|
|
app.config["AUTH_ENABLED"] = os.environ.get("STEGASOO_AUTH_ENABLED", "true").lower() == "true"
|
|
app.config["HTTPS_ENABLED"] = os.environ.get("STEGASOO_HTTPS_ENABLED", "false").lower() == "true"
|
|
|
|
# Initialize auth module (sets up session handling, user DB)
|
|
init_auth(app)
|
|
|
|
# ============================================================================
|
|
# ASYNC JOB MANAGEMENT (v4.1.2)
|
|
# ============================================================================
|
|
#
|
|
# Problem: DCT encoding a large image can take 30-60 seconds.
|
|
# Solution: Run it in a background thread, let the client poll for progress.
|
|
#
|
|
# The flow:
|
|
#
|
|
# Client Server
|
|
# ────── ──────
|
|
# POST /api/encode/async ──────> Start background job
|
|
# <────── Return job_id
|
|
#
|
|
# GET /api/encode/progress/123 ─> Check job status
|
|
# <────── {"progress": 45, "phase": "embedding"}
|
|
#
|
|
# GET /api/encode/progress/123 ─> Check again
|
|
# <────── {"status": "complete", "file_id": "abc"}
|
|
#
|
|
# GET /api/download/abc ────────> Download result
|
|
# <────── Encoded image
|
|
#
|
|
# Why ThreadPoolExecutor instead of Celery/Redis?
|
|
# - This runs on a Raspberry Pi with 1GB RAM
|
|
# - We don't need distributed workers
|
|
# - Keep it simple - threads are fine for 2 concurrent jobs
|
|
#
|
|
# The thread pool is limited to 2 workers because:
|
|
# - Each encode loads the full image into memory
|
|
# - Too many concurrent jobs = OOM on the Pi
|
|
|
|
_executor = ThreadPoolExecutor(max_workers=2)
|
|
|
|
# Job storage: job_id -> {status, result, error, file_id, created, ...}
|
|
# We use a dict with a lock because threads access it concurrently
|
|
_jobs = {}
|
|
_jobs_lock = threading.Lock()
|
|
|
|
|
|
def _store_job(job_id: str, data: dict) -> None:
|
|
"""Thread-safe job storage."""
|
|
with _jobs_lock:
|
|
_jobs[job_id] = data
|
|
|
|
|
|
def _get_job(job_id: str) -> dict | None:
|
|
"""Thread-safe job retrieval."""
|
|
with _jobs_lock:
|
|
return _jobs.get(job_id)
|
|
|
|
|
|
def _cleanup_old_jobs(max_age_seconds: int = 3600) -> None:
|
|
"""Remove jobs older than max_age_seconds."""
|
|
now = time.time()
|
|
with _jobs_lock:
|
|
to_remove = [
|
|
jid for jid, data in _jobs.items() if now - data.get("created", 0) > max_age_seconds
|
|
]
|
|
for jid in to_remove:
|
|
cleanup_progress_file(jid)
|
|
del _jobs[jid]
|
|
|
|
|
|
@app.before_request
|
|
def require_setup():
|
|
"""Force redirect to setup if no users exist (first-run)."""
|
|
if not app.config.get("AUTH_ENABLED", True):
|
|
return None
|
|
|
|
# Skip for static files and setup-related routes
|
|
if request.endpoint in ("static", "setup", "setup_recovery", None):
|
|
return None
|
|
|
|
# If no users exist, redirect to setup
|
|
if not user_exists():
|
|
return redirect(url_for("setup"))
|
|
|
|
return None
|
|
|
|
|
|
# DEPRECATED: In-memory storage replaced by file-based temp_storage module
|
|
# Kept for backwards compatibility during transition
|
|
TEMP_FILES: dict[str, dict] = {} # Not used - see temp_storage.py
|
|
THUMBNAIL_FILES: dict[str, bytes] = {} # Not used - see temp_storage.py
|
|
|
|
|
|
# ============================================================================
|
|
# TEMPLATE CONTEXT PROCESSOR
|
|
# ============================================================================
|
|
#
|
|
# Context processors inject variables into EVERY template automatically.
|
|
# Instead of passing the same data to every render_template() call:
|
|
#
|
|
# # Bad: repetitive and error-prone
|
|
# return render_template("page.html", version=__version__, has_dct=...)
|
|
#
|
|
# We define it once here and it's available everywhere:
|
|
#
|
|
# # In any template:
|
|
# <p>Version: {{ version }}</p>
|
|
# {% if has_dct %}DCT mode available{% endif %}
|
|
#
|
|
# This is great for:
|
|
# - Version numbers (show in footer)
|
|
# - Feature flags (has_dct, auth_enabled)
|
|
# - User info (username, is_admin)
|
|
# - Global config (max sizes, limits)
|
|
#
|
|
# The function runs on EVERY request, so keep it fast.
|
|
# Don't do expensive database queries here.
|
|
|
|
|
|
@app.context_processor
|
|
def inject_globals():
|
|
"""Inject global variables into all templates."""
|
|
# Get channel status (v4.0.0)
|
|
channel_status = get_channel_status()
|
|
|
|
# Get saved channel keys for authenticated users (v4.2.0)
|
|
saved_channel_keys = []
|
|
if is_authenticated():
|
|
current_user = get_current_user()
|
|
if current_user:
|
|
saved_channel_keys = get_user_channel_keys(current_user.id)
|
|
|
|
return {
|
|
"version": __version__,
|
|
"max_message_chars": MAX_MESSAGE_CHARS,
|
|
"max_payload_kb": MAX_FILE_PAYLOAD_SIZE // 1024,
|
|
"max_upload_mb": MAX_UPLOAD_SIZE // (1024 * 1024),
|
|
"temp_file_expiry_minutes": TEMP_FILE_EXPIRY_MINUTES,
|
|
"min_pin_length": MIN_PIN_LENGTH,
|
|
"max_pin_length": MAX_PIN_LENGTH,
|
|
# NEW in v3.2.0
|
|
"min_passphrase_words": MIN_PASSPHRASE_WORDS,
|
|
"recommended_passphrase_words": RECOMMENDED_PASSPHRASE_WORDS,
|
|
"default_passphrase_words": DEFAULT_PASSPHRASE_WORDS,
|
|
# NEW in v3.0
|
|
"has_dct": has_dct_support(),
|
|
# NEW in v4.0.0 - Channel key status
|
|
"channel_mode": channel_status["mode"],
|
|
"channel_configured": channel_status["configured"],
|
|
"channel_fingerprint": channel_status.get("fingerprint"),
|
|
"channel_source": channel_status.get("source"),
|
|
# NEW in v4.0.2 - Auth state
|
|
"auth_enabled": app.config.get("AUTH_ENABLED", True),
|
|
"is_authenticated": is_authenticated(),
|
|
"username": get_username() if is_authenticated() else None,
|
|
# NEW in v4.1.0 - Admin state
|
|
"is_admin": is_admin(),
|
|
# NEW in v4.2.0 - Saved channel keys
|
|
"saved_channel_keys": saved_channel_keys,
|
|
}
|
|
|
|
|
|
# ============================================================================
|
|
# CONFIGURATION
|
|
# ============================================================================
|
|
|
|
try:
|
|
print(f"Stegasoo v{__version__} - Web Frontend")
|
|
print(f"Current MAX_FILE_SIZE: {MAX_FILE_SIZE}")
|
|
print(f"Current MAX_FILE_PAYLOAD_SIZE: {MAX_FILE_PAYLOAD_SIZE}")
|
|
print(f"DCT support: {has_dct_support()}")
|
|
print(f"QR code support: write={HAS_QRCODE}, read={HAS_QRCODE_READ}")
|
|
|
|
# Channel key status (v4.0.0)
|
|
channel_status = get_channel_status()
|
|
print(f"Channel key: {channel_status['mode']} mode")
|
|
if channel_status["configured"]:
|
|
print(f" Fingerprint: {channel_status.get('fingerprint')}")
|
|
print(f" Source: {channel_status.get('source')}")
|
|
|
|
DESIRED_PAYLOAD_SIZE = 2 * 1024 * 1024 # 2MB
|
|
|
|
if hasattr(stegasoo, "MAX_FILE_PAYLOAD_SIZE"):
|
|
print(f"Overriding MAX_FILE_PAYLOAD_SIZE to {DESIRED_PAYLOAD_SIZE}")
|
|
stegasoo.MAX_FILE_PAYLOAD_SIZE = DESIRED_PAYLOAD_SIZE
|
|
|
|
except Exception as e:
|
|
print(f"Could not override stegasoo limits: {e}")
|
|
|
|
|
|
# ============================================================================
|
|
# CHANNEL KEY HELPER (v4.0.0)
|
|
# ============================================================================
|
|
|
|
|
|
def resolve_channel_key_form(channel_key_value: str) -> str:
|
|
"""
|
|
Resolve channel key from form input.
|
|
|
|
Wrapper around library's resolve_channel_key for subprocess compatibility.
|
|
Returns string values for subprocess_stego ('auto', 'none', or explicit key).
|
|
"""
|
|
from stegasoo.channel import resolve_channel_key
|
|
|
|
try:
|
|
result = resolve_channel_key(channel_key_value)
|
|
if result is None:
|
|
return "auto"
|
|
elif result == "":
|
|
return "none"
|
|
else:
|
|
return result
|
|
except (ValueError, FileNotFoundError):
|
|
# Invalid format, fall back to auto
|
|
return "auto"
|
|
|
|
|
|
def generate_thumbnail(image_data: bytes, size: tuple = THUMBNAIL_SIZE) -> bytes:
|
|
"""Generate thumbnail from image data."""
|
|
try:
|
|
with Image.open(io.BytesIO(image_data)) as img:
|
|
# Convert to RGB if necessary (handle grayscale too)
|
|
if img.mode in ("RGBA", "LA", "P"):
|
|
# Create white background for transparent images
|
|
background = Image.new("RGB", img.size, (255, 255, 255))
|
|
if img.mode == "P":
|
|
img = img.convert("RGBA")
|
|
background.paste(img, mask=img.split()[-1] if img.mode == "RGBA" else None)
|
|
img = background
|
|
elif img.mode == "L":
|
|
# Convert grayscale to RGB for thumbnail
|
|
img = img.convert("RGB")
|
|
elif img.mode != "RGB":
|
|
img = img.convert("RGB")
|
|
|
|
# Create thumbnail
|
|
img.thumbnail(size, Image.Resampling.LANCZOS)
|
|
|
|
# Save to bytes
|
|
buffer = io.BytesIO()
|
|
img.save(buffer, format="JPEG", quality=THUMBNAIL_QUALITY, optimize=True)
|
|
return buffer.getvalue()
|
|
except Exception as e:
|
|
print(f"Thumbnail generation error: {e}")
|
|
return None
|
|
|
|
|
|
def cleanup_temp_files():
|
|
"""Remove expired temporary files."""
|
|
temp_storage.cleanup_expired(TEMP_FILE_EXPIRY)
|
|
|
|
|
|
def allowed_image(filename: str) -> bool:
|
|
"""Check if file has allowed image extension."""
|
|
if not filename or "." not in filename:
|
|
return False
|
|
ext = filename.rsplit(".", 1)[1].lower()
|
|
return ext in {"png", "jpg", "jpeg", "bmp", "gif"}
|
|
|
|
|
|
def format_size(size_bytes: int) -> str:
|
|
"""Format file size for display."""
|
|
if size_bytes < 1024:
|
|
return f"{size_bytes} B"
|
|
elif size_bytes < 1024 * 1024:
|
|
return f"{size_bytes / 1024:.1f} KB"
|
|
else:
|
|
return f"{size_bytes / (1024 * 1024):.1f} MB"
|
|
|
|
|
|
# ============================================================================
|
|
# ROUTES
|
|
# ============================================================================
|
|
|
|
|
|
@app.route("/")
|
|
def index():
|
|
return render_template("index.html")
|
|
|
|
|
|
# ============================================================================
|
|
# CHANNEL KEY API (v4.0.0)
|
|
# ============================================================================
|
|
|
|
|
|
@app.route("/api/channel/status")
|
|
@login_required
|
|
def api_channel_status():
|
|
"""
|
|
Get current channel key status (v4.0.0).
|
|
|
|
Returns JSON with mode, fingerprint, and source.
|
|
"""
|
|
# Use subprocess for isolation
|
|
result = subprocess_stego.get_channel_status(reveal=False)
|
|
|
|
if result.success:
|
|
return jsonify(
|
|
{
|
|
"success": True,
|
|
"mode": result.mode,
|
|
"configured": result.configured,
|
|
"fingerprint": result.fingerprint,
|
|
"source": result.source,
|
|
}
|
|
)
|
|
else:
|
|
# Fallback to direct call if subprocess fails
|
|
status = get_channel_status()
|
|
return jsonify(
|
|
{
|
|
"success": True,
|
|
"mode": status["mode"],
|
|
"configured": status["configured"],
|
|
"fingerprint": status.get("fingerprint"),
|
|
"source": status.get("source"),
|
|
}
|
|
)
|
|
|
|
|
|
@app.route("/api/channel/validate", methods=["POST"])
|
|
@login_required
|
|
def api_channel_validate():
|
|
"""
|
|
Validate a channel key format (v4.0.0).
|
|
|
|
Returns JSON with validation result.
|
|
"""
|
|
key = request.form.get("key", "") or (request.json.get("key", "") if request.is_json else "")
|
|
|
|
if not key:
|
|
return jsonify({"valid": False, "error": "No key provided"})
|
|
|
|
is_valid = validate_channel_key(key)
|
|
|
|
if is_valid:
|
|
fingerprint = f"{key[:4]}-••••-••••-••••-••••-••••-••••-{key[-4:]}"
|
|
return jsonify(
|
|
{
|
|
"valid": True,
|
|
"fingerprint": fingerprint,
|
|
}
|
|
)
|
|
else:
|
|
return jsonify(
|
|
{
|
|
"valid": False,
|
|
"error": "Invalid format. Expected: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX",
|
|
}
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# GENERATE
|
|
# ============================================================================
|
|
|
|
|
|
@app.route("/generate", methods=["GET", "POST"])
|
|
@login_required
|
|
def generate():
|
|
if request.method == "POST":
|
|
# v3.2.0: Changed from words_per_phrase to words_per_passphrase, default increased to 4
|
|
words_per_passphrase = int(
|
|
request.form.get("words_per_passphrase", DEFAULT_PASSPHRASE_WORDS)
|
|
)
|
|
use_pin = request.form.get("use_pin") == "on"
|
|
use_rsa = request.form.get("use_rsa") == "on"
|
|
|
|
if not use_pin and not use_rsa:
|
|
flash("You must select at least one security factor (PIN or RSA Key)", "error")
|
|
return render_template("generate.html", generated=False, has_qrcode=HAS_QRCODE)
|
|
|
|
pin_length = int(request.form.get("pin_length", 6))
|
|
rsa_bits = int(request.form.get("rsa_bits", 2048))
|
|
|
|
# Clamp values
|
|
words_per_passphrase = max(MIN_PASSPHRASE_WORDS, min(12, words_per_passphrase))
|
|
pin_length = max(MIN_PIN_LENGTH, min(MAX_PIN_LENGTH, pin_length))
|
|
if rsa_bits not in VALID_RSA_SIZES:
|
|
rsa_bits = 2048
|
|
|
|
try:
|
|
# v3.2.0 FIX: Use correct parameter name 'passphrase_words'
|
|
creds = generate_credentials(
|
|
use_pin=use_pin,
|
|
use_rsa=use_rsa,
|
|
pin_length=pin_length,
|
|
rsa_bits=rsa_bits,
|
|
passphrase_words=words_per_passphrase, # FIX: was words_per_passphrase=
|
|
)
|
|
|
|
# Store RSA key temporarily for QR generation
|
|
qr_token = None
|
|
qr_needs_compression = False
|
|
qr_too_large = False
|
|
|
|
if creds.rsa_key_pem and HAS_QRCODE:
|
|
# Check if key fits in QR code
|
|
if can_fit_in_qr(creds.rsa_key_pem, compress=True):
|
|
qr_needs_compression = True
|
|
else:
|
|
qr_too_large = True
|
|
|
|
if not qr_too_large:
|
|
qr_token = secrets.token_urlsafe(16)
|
|
cleanup_temp_files()
|
|
temp_storage.save_temp_file(qr_token, creds.rsa_key_pem.encode(), {
|
|
"filename": "rsa_key.pem",
|
|
"type": "rsa_key",
|
|
"compress": qr_needs_compression,
|
|
})
|
|
|
|
# v3.2.0: Single passphrase instead of daily phrases
|
|
return render_template(
|
|
"generate.html",
|
|
passphrase=creds.passphrase, # v3.2.0: Single passphrase
|
|
pin=creds.pin,
|
|
generated=True,
|
|
words_per_passphrase=words_per_passphrase,
|
|
pin_length=pin_length if use_pin else None,
|
|
use_pin=use_pin,
|
|
use_rsa=use_rsa,
|
|
rsa_bits=rsa_bits,
|
|
rsa_key_pem=creds.rsa_key_pem,
|
|
passphrase_entropy=creds.passphrase_entropy,
|
|
pin_entropy=creds.pin_entropy,
|
|
rsa_entropy=creds.rsa_entropy,
|
|
total_entropy=creds.total_entropy,
|
|
has_qrcode=HAS_QRCODE,
|
|
qr_token=qr_token,
|
|
qr_needs_compression=qr_needs_compression,
|
|
qr_too_large=qr_too_large,
|
|
)
|
|
except Exception as e:
|
|
flash(f"Error generating credentials: {e}", "error")
|
|
return render_template("generate.html", generated=False, has_qrcode=HAS_QRCODE)
|
|
|
|
return render_template("generate.html", generated=False, has_qrcode=HAS_QRCODE)
|
|
|
|
|
|
@app.route("/generate/qr/<token>")
|
|
@login_required
|
|
def generate_qr(token):
|
|
"""Generate QR code for RSA key."""
|
|
if not HAS_QRCODE:
|
|
return "QR code support not available", 501
|
|
|
|
file_info = temp_storage.get_temp_file(token)
|
|
if not file_info:
|
|
return "Token expired or invalid", 404
|
|
|
|
if file_info.get("type") != "rsa_key":
|
|
return "Invalid token type", 400
|
|
|
|
try:
|
|
key_pem = file_info["data"].decode("utf-8")
|
|
compress = file_info.get("compress", False)
|
|
qr_png = generate_qr_code(key_pem, compress=compress)
|
|
|
|
return send_file(io.BytesIO(qr_png), mimetype="image/png", as_attachment=False)
|
|
except Exception as e:
|
|
return f"Error generating QR code: {e}", 500
|
|
|
|
|
|
@app.route("/generate/qr-download/<token>")
|
|
@login_required
|
|
def generate_qr_download(token):
|
|
"""Download QR code as PNG file."""
|
|
if not HAS_QRCODE:
|
|
return "QR code support not available", 501
|
|
|
|
file_info = temp_storage.get_temp_file(token)
|
|
if not file_info:
|
|
return "Token expired or invalid", 404
|
|
|
|
if file_info.get("type") != "rsa_key":
|
|
return "Invalid token type", 400
|
|
|
|
try:
|
|
key_pem = file_info["data"].decode("utf-8")
|
|
compress = file_info.get("compress", False)
|
|
qr_png = generate_qr_code(key_pem, compress=compress)
|
|
|
|
return send_file(
|
|
io.BytesIO(qr_png),
|
|
mimetype="image/png",
|
|
as_attachment=True,
|
|
download_name="stegasoo_rsa_key_qr.png",
|
|
)
|
|
except Exception as e:
|
|
return f"Error generating QR code: {e}", 500
|
|
|
|
|
|
@app.route("/qr/crop", methods=["POST"])
|
|
@login_required
|
|
def qr_crop():
|
|
"""
|
|
Detect and crop QR code from an image.
|
|
|
|
Useful for extracting QR codes from photos taken at an angle,
|
|
with extra background, etc. Returns the cropped QR as PNG.
|
|
"""
|
|
if not HAS_QRCODE_READ:
|
|
return jsonify({"error": "QR code reading not available (install pyzbar)"}), 501
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"error": "No image provided"}), 400
|
|
|
|
try:
|
|
image_data = image_file.read()
|
|
|
|
# Use the new crop function
|
|
cropped = detect_and_crop_qr(image_data)
|
|
|
|
if cropped is None:
|
|
return jsonify({"error": "No QR code detected in image"}), 404
|
|
|
|
# Return as downloadable PNG or inline based on query param
|
|
as_attachment = request.args.get("download", "").lower() in ("1", "true", "yes")
|
|
|
|
return send_file(
|
|
io.BytesIO(cropped),
|
|
mimetype="image/png",
|
|
as_attachment=as_attachment,
|
|
download_name="cropped_qr.png",
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"error": f"Error processing image: {e}"}), 500
|
|
|
|
|
|
@app.route("/generate/download-key", methods=["POST"])
|
|
@login_required
|
|
def download_key():
|
|
"""Download RSA key as password-protected PEM file."""
|
|
key_pem = request.form.get("key_pem", "")
|
|
password = request.form.get("key_password", "")
|
|
|
|
if not key_pem:
|
|
flash("No key to download", "error")
|
|
return redirect(url_for("generate"))
|
|
|
|
if not password or len(password) < 8:
|
|
flash("Password must be at least 8 characters", "error")
|
|
return redirect(url_for("generate"))
|
|
|
|
try:
|
|
private_key = load_rsa_key(key_pem.encode("utf-8"))
|
|
encrypted_pem = export_rsa_key_pem(private_key, password=password)
|
|
|
|
key_id = secrets.token_hex(4)
|
|
filename = f"stegasoo_key_{private_key.key_size}_{key_id}.pem"
|
|
|
|
return send_file(
|
|
io.BytesIO(encrypted_pem),
|
|
mimetype="application/x-pem-file",
|
|
as_attachment=True,
|
|
download_name=filename,
|
|
)
|
|
except Exception as e:
|
|
flash(f"Error creating key file: {e}", "error")
|
|
return redirect(url_for("generate"))
|
|
|
|
|
|
@app.route("/extract-key-from-qr", methods=["POST"])
|
|
@login_required
|
|
def extract_key_from_qr_route():
|
|
"""
|
|
Extract RSA key from uploaded QR code image.
|
|
Returns JSON with the extracted key or error.
|
|
"""
|
|
if not HAS_QRCODE_READ:
|
|
return (
|
|
jsonify(
|
|
{
|
|
"success": False,
|
|
"error": "QR code reading not available. Install pyzbar and libzbar.",
|
|
}
|
|
),
|
|
501,
|
|
)
|
|
|
|
qr_image = request.files.get("qr_image")
|
|
if not qr_image:
|
|
return jsonify({"success": False, "error": "No QR image provided"}), 400
|
|
|
|
try:
|
|
image_data = qr_image.read()
|
|
key_pem = extract_key_from_qr(image_data)
|
|
|
|
if key_pem:
|
|
return jsonify({"success": True, "key_pem": key_pem})
|
|
else:
|
|
return jsonify({"success": False, "error": "No valid RSA key found in QR code"}), 400
|
|
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
# ============================================================================
|
|
# NEW in v3.0 - CAPACITY COMPARISON API
|
|
# ============================================================================
|
|
|
|
|
|
@app.route("/api/compare-capacity", methods=["POST"])
|
|
@login_required
|
|
def api_compare_capacity():
|
|
"""
|
|
Compare LSB and DCT capacity for an uploaded carrier image.
|
|
Returns JSON with capacity info for both modes.
|
|
Uses subprocess isolation to prevent crashes.
|
|
"""
|
|
carrier = request.files.get("carrier")
|
|
if not carrier:
|
|
return jsonify({"error": "No carrier image provided"}), 400
|
|
|
|
try:
|
|
carrier_data = carrier.read()
|
|
|
|
# Use subprocess-isolated compare_modes
|
|
result = subprocess_stego.compare_modes(carrier_data)
|
|
|
|
if not result.success:
|
|
return jsonify({"error": result.error or "Comparison failed"}), 500
|
|
|
|
return jsonify(
|
|
{
|
|
"success": True,
|
|
"width": result.width,
|
|
"height": result.height,
|
|
"lsb": {
|
|
"capacity_bytes": result.lsb["capacity_bytes"],
|
|
"capacity_kb": round(result.lsb["capacity_kb"], 1),
|
|
"output": result.lsb.get("output", "PNG"),
|
|
},
|
|
"dct": {
|
|
"capacity_bytes": result.dct["capacity_bytes"],
|
|
"capacity_kb": round(result.dct["capacity_kb"], 1),
|
|
"output": result.dct.get("output", "JPEG"),
|
|
"available": result.dct.get("available", True),
|
|
"ratio": round(result.dct.get("ratio_vs_lsb", 0), 1),
|
|
},
|
|
}
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/check-fit", methods=["POST"])
|
|
@login_required
|
|
def api_check_fit():
|
|
"""
|
|
Check if a payload will fit in the carrier with selected mode.
|
|
Returns JSON with fit status and details.
|
|
Uses subprocess isolation to prevent crashes.
|
|
"""
|
|
carrier = request.files.get("carrier")
|
|
payload_size = request.form.get("payload_size", type=int)
|
|
embed_mode = request.form.get("embed_mode", "lsb")
|
|
|
|
if not carrier or payload_size is None:
|
|
return jsonify({"error": "Missing carrier or payload_size"}), 400
|
|
|
|
if embed_mode not in ("lsb", "dct"):
|
|
return jsonify({"error": "Invalid embed_mode"}), 400
|
|
|
|
if embed_mode == "dct" and not has_dct_support():
|
|
return jsonify({"error": "DCT mode requires scipy"}), 400
|
|
|
|
try:
|
|
carrier_data = carrier.read()
|
|
|
|
# Use subprocess-isolated capacity check
|
|
result = subprocess_stego.check_capacity(
|
|
carrier_data=carrier_data,
|
|
payload_size=payload_size,
|
|
embed_mode=embed_mode,
|
|
)
|
|
|
|
if not result.success:
|
|
return jsonify({"error": result.error or "Capacity check failed"}), 500
|
|
|
|
return jsonify(
|
|
{
|
|
"success": True,
|
|
"fits": result.fits,
|
|
"payload_size": result.payload_size,
|
|
"capacity": result.capacity,
|
|
"usage_percent": round(result.usage_percent, 1),
|
|
"headroom": result.headroom,
|
|
"mode": result.mode,
|
|
}
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
# ============================================================================
|
|
# ENCODE
|
|
# ============================================================================
|
|
|
|
|
|
def _run_encode_job(job_id: str, encode_params: dict) -> None:
|
|
"""Background thread function for async encode."""
|
|
progress_file = get_progress_file_path(job_id)
|
|
|
|
try:
|
|
_store_job(job_id, {"status": "running", "created": time.time()})
|
|
|
|
# Run encode with progress file
|
|
if encode_params.get("file_data"):
|
|
encode_result = subprocess_stego.encode(
|
|
carrier_data=encode_params["carrier_data"],
|
|
reference_data=encode_params["ref_data"],
|
|
file_data=encode_params["file_data"],
|
|
file_name=encode_params["file_name"],
|
|
file_mime=encode_params["file_mime"],
|
|
passphrase=encode_params["passphrase"],
|
|
pin=encode_params.get("pin"),
|
|
rsa_key_data=encode_params.get("rsa_key_data"),
|
|
rsa_password=encode_params.get("key_password"),
|
|
embed_mode=encode_params["embed_mode"],
|
|
dct_output_format=encode_params.get("dct_output_format", "png"),
|
|
dct_color_mode=encode_params.get("dct_color_mode", "color"),
|
|
channel_key=encode_params.get("channel_key"),
|
|
progress_file=progress_file,
|
|
)
|
|
else:
|
|
encode_result = subprocess_stego.encode(
|
|
carrier_data=encode_params["carrier_data"],
|
|
reference_data=encode_params["ref_data"],
|
|
message=encode_params["message"],
|
|
passphrase=encode_params["passphrase"],
|
|
pin=encode_params.get("pin"),
|
|
rsa_key_data=encode_params.get("rsa_key_data"),
|
|
rsa_password=encode_params.get("key_password"),
|
|
embed_mode=encode_params["embed_mode"],
|
|
dct_output_format=encode_params.get("dct_output_format", "png"),
|
|
dct_color_mode=encode_params.get("dct_color_mode", "color"),
|
|
channel_key=encode_params.get("channel_key"),
|
|
progress_file=progress_file,
|
|
)
|
|
|
|
if not encode_result.success:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "error",
|
|
"error": encode_result.error or "Encoding failed",
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
return
|
|
|
|
# Determine output format
|
|
embed_mode = encode_params["embed_mode"]
|
|
dct_output_format = encode_params.get("dct_output_format", "png")
|
|
dct_color_mode = encode_params.get("dct_color_mode", "color")
|
|
|
|
if embed_mode == "dct" and dct_output_format == "jpeg":
|
|
output_ext = ".jpg"
|
|
output_mime = "image/jpeg"
|
|
else:
|
|
output_ext = ".png"
|
|
output_mime = "image/png"
|
|
|
|
filename = encode_result.filename
|
|
if not filename:
|
|
filename = generate_filename("stego", output_ext)
|
|
elif embed_mode == "dct" and dct_output_format == "jpeg" and filename.endswith(".png"):
|
|
filename = filename[:-4] + ".jpg"
|
|
|
|
# Store result
|
|
file_id = secrets.token_urlsafe(16)
|
|
temp_storage.save_temp_file(file_id, encode_result.stego_data, {
|
|
"filename": filename,
|
|
"embed_mode": embed_mode,
|
|
"output_format": dct_output_format if embed_mode == "dct" else "png",
|
|
"color_mode": dct_color_mode if embed_mode == "dct" else None,
|
|
"mime_type": output_mime,
|
|
"channel_mode": encode_result.channel_mode,
|
|
"channel_fingerprint": encode_result.channel_fingerprint,
|
|
})
|
|
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "complete",
|
|
"file_id": file_id,
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
|
|
except Exception as e:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "error",
|
|
"error": str(e),
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
finally:
|
|
cleanup_progress_file(job_id)
|
|
|
|
|
|
@app.route("/encode", methods=["GET", "POST"])
|
|
@login_required
|
|
def encode_page():
|
|
if request.method == "POST":
|
|
# Check if async mode requested
|
|
is_async = request.form.get("async") == "true" or request.headers.get("X-Async") == "true"
|
|
|
|
def _error_response(msg):
|
|
"""Return error as JSON (async) or HTML flash (sync)."""
|
|
if is_async:
|
|
return jsonify({"error": msg}), 400
|
|
flash(msg, "error")
|
|
return render_template("encode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
|
|
try:
|
|
# Get files
|
|
ref_photo = request.files.get("reference_photo")
|
|
carrier = request.files.get("carrier")
|
|
rsa_key_file = request.files.get("rsa_key")
|
|
payload_file = request.files.get("payload_file")
|
|
|
|
if not ref_photo or not carrier:
|
|
return _error_response("Both reference photo and carrier image are required")
|
|
|
|
if not allowed_image(ref_photo.filename) or not allowed_image(carrier.filename):
|
|
return _error_response("Invalid file type. Use PNG, JPG, or BMP")
|
|
|
|
# Get form data - v3.2.0: renamed from day_phrase to passphrase
|
|
message = request.form.get("message", "")
|
|
passphrase = request.form.get("passphrase", "") # v3.2.0: Renamed
|
|
pin = request.form.get("pin", "").strip()
|
|
rsa_password = request.form.get("rsa_password", "")
|
|
payload_type = request.form.get("payload_type", "text")
|
|
|
|
# NEW in v3.0 - Embedding mode
|
|
embed_mode = request.form.get("embed_mode", "lsb")
|
|
if embed_mode not in ("lsb", "dct"):
|
|
embed_mode = "lsb"
|
|
|
|
# NEW in v3.0.1 - DCT output format
|
|
dct_output_format = request.form.get("dct_output_format", "png")
|
|
if dct_output_format not in ("png", "jpeg"):
|
|
dct_output_format = "png"
|
|
|
|
# NEW in v3.0.1 - DCT color mode
|
|
dct_color_mode = request.form.get("dct_color_mode", "color")
|
|
if dct_color_mode not in ("grayscale", "color"):
|
|
dct_color_mode = "color"
|
|
|
|
# NEW in v4.0.0 - Channel key
|
|
channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto"))
|
|
|
|
# Check DCT availability
|
|
if embed_mode == "dct" and not has_dct_support():
|
|
return _error_response("DCT mode requires scipy. Install with: pip install scipy")
|
|
|
|
# Determine payload
|
|
if payload_type == "file" and payload_file and payload_file.filename:
|
|
# File payload
|
|
file_data = payload_file.read()
|
|
|
|
result = validate_file_payload(file_data, payload_file.filename)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
mime_type, _ = mimetypes.guess_type(payload_file.filename)
|
|
payload = FilePayload(
|
|
data=file_data, filename=payload_file.filename, mime_type=mime_type
|
|
)
|
|
else:
|
|
# Text message
|
|
result = validate_message(message)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
payload = message
|
|
|
|
# v3.2.0: Renamed from day_phrase
|
|
if not passphrase:
|
|
return _error_response("Passphrase is required")
|
|
|
|
# v3.2.0: Validate passphrase
|
|
result = validate_passphrase(passphrase)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
# Show warning if passphrase is short
|
|
if result.warning:
|
|
flash(result.warning, "warning")
|
|
|
|
# Read files
|
|
ref_data = ref_photo.read()
|
|
carrier_data = carrier.read()
|
|
|
|
# Handle RSA key - can come from .pem file, QR code image, or webcam-scanned PEM (v4.1.5)
|
|
rsa_key_data = None
|
|
rsa_key_pem = request.form.get("rsa_key_pem", "").strip()
|
|
rsa_key_qr = request.files.get("rsa_key_qr")
|
|
rsa_key_from_qr = False
|
|
|
|
if rsa_key_pem:
|
|
# Webcam-scanned PEM key (v4.1.5+) - may be compressed (zlib or zstd)
|
|
if is_compressed(rsa_key_pem):
|
|
rsa_key_pem = decompress_data(rsa_key_pem)
|
|
rsa_key_data = rsa_key_pem.encode("utf-8")
|
|
rsa_key_from_qr = True
|
|
elif rsa_key_file and rsa_key_file.filename:
|
|
rsa_key_data = rsa_key_file.read()
|
|
elif rsa_key_qr and rsa_key_qr.filename and HAS_QRCODE_READ:
|
|
qr_image_data = rsa_key_qr.read()
|
|
key_pem = extract_key_from_qr(qr_image_data)
|
|
if key_pem:
|
|
rsa_key_data = key_pem.encode("utf-8")
|
|
rsa_key_from_qr = True
|
|
else:
|
|
return _error_response("Could not extract RSA key from QR code image.")
|
|
|
|
# Validate security factors
|
|
result = validate_security_factors(pin, rsa_key_data)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
# Validate PIN if provided
|
|
if pin:
|
|
result = validate_pin(pin)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
# Determine key password
|
|
key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None)
|
|
|
|
# Validate RSA key if provided
|
|
if rsa_key_data:
|
|
result = validate_rsa_key(rsa_key_data, key_password)
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
# Validate carrier image
|
|
result = validate_image(carrier_data, "Carrier image")
|
|
if not result.is_valid:
|
|
return _error_response(result.error_message)
|
|
|
|
# Pre-check payload capacity BEFORE encode (fail fast)
|
|
from stegasoo.steganography import will_fit_by_mode
|
|
|
|
payload_size = (
|
|
len(payload.data) if hasattr(payload, "data") else len(payload.encode("utf-8"))
|
|
)
|
|
fit_check = will_fit_by_mode(payload_size, carrier_data, embed_mode=embed_mode)
|
|
if not fit_check.get("fits", True):
|
|
error_msg = (
|
|
f"Payload too large for {embed_mode.upper()} mode. "
|
|
f"Payload: {payload_size:,} bytes, "
|
|
f"Capacity: {fit_check.get('capacity', 0):,} bytes"
|
|
)
|
|
# Suggest alternative mode
|
|
if embed_mode == "dct":
|
|
alt_check = will_fit_by_mode(payload_size, carrier_data, embed_mode="lsb")
|
|
if alt_check.get("fits"):
|
|
error_msg += " - Try LSB mode instead."
|
|
return _error_response(error_msg)
|
|
|
|
# Build encode params for either sync or async
|
|
encode_params = {
|
|
"carrier_data": carrier_data,
|
|
"ref_data": ref_data,
|
|
"passphrase": passphrase,
|
|
"pin": pin if pin else None,
|
|
"rsa_key_data": rsa_key_data,
|
|
"key_password": key_password,
|
|
"embed_mode": embed_mode,
|
|
"dct_output_format": dct_output_format if embed_mode == "dct" else "png",
|
|
"dct_color_mode": dct_color_mode if embed_mode == "dct" else "color",
|
|
"channel_key": channel_key,
|
|
}
|
|
|
|
if payload_type == "file" and payload_file and payload_file.filename:
|
|
encode_params["file_data"] = payload.data
|
|
encode_params["file_name"] = payload.filename
|
|
encode_params["file_mime"] = payload.mime_type
|
|
else:
|
|
encode_params["message"] = payload
|
|
|
|
# ASYNC MODE: Start background job and return JSON
|
|
if is_async:
|
|
job_id = generate_job_id()
|
|
_store_job(job_id, {"status": "pending", "created": time.time()})
|
|
_executor.submit(_run_encode_job, job_id, encode_params)
|
|
return jsonify({"job_id": job_id, "status": "pending"})
|
|
|
|
# SYNC MODE: Run inline (original behavior)
|
|
if payload_type == "file" and payload_file and payload_file.filename:
|
|
encode_result = subprocess_stego.encode(
|
|
carrier_data=carrier_data,
|
|
reference_data=ref_data,
|
|
file_data=payload.data,
|
|
file_name=payload.filename,
|
|
file_mime=payload.mime_type,
|
|
passphrase=passphrase,
|
|
pin=pin if pin else None,
|
|
rsa_key_data=rsa_key_data,
|
|
rsa_password=key_password,
|
|
embed_mode=embed_mode,
|
|
dct_output_format=dct_output_format if embed_mode == "dct" else "png",
|
|
dct_color_mode=dct_color_mode if embed_mode == "dct" else "color",
|
|
channel_key=channel_key,
|
|
)
|
|
else:
|
|
encode_result = subprocess_stego.encode(
|
|
carrier_data=carrier_data,
|
|
reference_data=ref_data,
|
|
message=payload,
|
|
passphrase=passphrase,
|
|
pin=pin if pin else None,
|
|
rsa_key_data=rsa_key_data,
|
|
rsa_password=key_password,
|
|
embed_mode=embed_mode,
|
|
dct_output_format=dct_output_format if embed_mode == "dct" else "png",
|
|
dct_color_mode=dct_color_mode if embed_mode == "dct" else "color",
|
|
channel_key=channel_key,
|
|
)
|
|
|
|
# Check for subprocess errors
|
|
if not encode_result.success:
|
|
error_msg = encode_result.error or "Encoding failed"
|
|
if "capacity" in error_msg.lower():
|
|
raise CapacityError(error_msg)
|
|
raise StegasooError(error_msg)
|
|
|
|
# Determine actual output format for filename and storage
|
|
if embed_mode == "dct" and dct_output_format == "jpeg":
|
|
output_ext = ".jpg"
|
|
output_mime = "image/jpeg"
|
|
else:
|
|
output_ext = ".png"
|
|
output_mime = "image/png"
|
|
|
|
# Use filename from result or generate one
|
|
filename = encode_result.filename
|
|
if not filename:
|
|
filename = generate_filename("stego", output_ext)
|
|
elif embed_mode == "dct" and dct_output_format == "jpeg" and filename.endswith(".png"):
|
|
filename = filename[:-4] + ".jpg"
|
|
|
|
# Store temporarily
|
|
file_id = secrets.token_urlsafe(16)
|
|
cleanup_temp_files()
|
|
temp_storage.save_temp_file(file_id, encode_result.stego_data, {
|
|
"filename": filename,
|
|
"embed_mode": embed_mode,
|
|
"output_format": dct_output_format if embed_mode == "dct" else "png",
|
|
"color_mode": dct_color_mode if embed_mode == "dct" else None,
|
|
"mime_type": output_mime,
|
|
# Channel info (v4.0.0)
|
|
"channel_mode": encode_result.channel_mode,
|
|
"channel_fingerprint": encode_result.channel_fingerprint,
|
|
})
|
|
|
|
return redirect(url_for("encode_result", file_id=file_id))
|
|
|
|
except CapacityError as e:
|
|
return _error_response(str(e))
|
|
except StegasooError as e:
|
|
return _error_response(str(e))
|
|
except Exception as e:
|
|
return _error_response(f"Error: {e}")
|
|
|
|
return render_template("encode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
|
|
|
|
# ============================================================================
|
|
# ENCODE PROGRESS ENDPOINTS (v4.1.2)
|
|
# ============================================================================
|
|
|
|
|
|
@app.route("/encode/status/<job_id>")
|
|
@login_required
|
|
def encode_status(job_id):
|
|
"""Get the status of an async encode job."""
|
|
job = _get_job(job_id)
|
|
if not job:
|
|
return jsonify({"error": "Job not found"}), 404
|
|
|
|
response = {"status": job.get("status", "unknown")}
|
|
|
|
if job["status"] == "complete":
|
|
response["file_id"] = job.get("file_id")
|
|
elif job["status"] == "error":
|
|
response["error"] = job.get("error", "Unknown error")
|
|
|
|
return jsonify(response)
|
|
|
|
|
|
@app.route("/encode/progress/<job_id>")
|
|
@login_required
|
|
def encode_progress(job_id):
|
|
"""Get the progress of an async encode job."""
|
|
progress = read_progress(job_id)
|
|
if progress:
|
|
return jsonify(progress)
|
|
|
|
# No progress file yet - check job status
|
|
job = _get_job(job_id)
|
|
if not job:
|
|
return jsonify({"error": "Job not found"}), 404
|
|
|
|
if job["status"] == "complete":
|
|
return jsonify({"percent": 100, "phase": "complete"})
|
|
elif job["status"] == "error":
|
|
return jsonify({"percent": 0, "phase": "error", "error": job.get("error")})
|
|
elif job["status"] == "pending":
|
|
return jsonify({"percent": 0, "phase": "starting"})
|
|
|
|
# Running but no progress file yet
|
|
return jsonify({"percent": 0, "phase": "initializing"})
|
|
|
|
|
|
@app.route("/encode/result/<file_id>")
|
|
@login_required
|
|
def encode_result(file_id):
|
|
file_info = temp_storage.get_temp_file(file_id)
|
|
if not file_info:
|
|
flash("File expired or not found. Please encode again.", "error")
|
|
return redirect(url_for("encode_page"))
|
|
|
|
# Generate thumbnail
|
|
thumbnail_data = generate_thumbnail(file_info["data"])
|
|
thumbnail_id = None
|
|
|
|
if thumbnail_data:
|
|
thumbnail_id = f"{file_id}_thumb"
|
|
temp_storage.save_thumbnail(thumbnail_id, thumbnail_data)
|
|
|
|
return render_template(
|
|
"encode_result.html",
|
|
file_id=file_id,
|
|
filename=file_info["filename"],
|
|
thumbnail_url=url_for("encode_thumbnail", thumb_id=thumbnail_id) if thumbnail_id else None,
|
|
embed_mode=file_info.get("embed_mode", "lsb"),
|
|
output_format=file_info.get("output_format", "png"),
|
|
color_mode=file_info.get("color_mode"),
|
|
# Channel info (v4.0.0)
|
|
channel_mode=file_info.get("channel_mode", "public"),
|
|
channel_fingerprint=file_info.get("channel_fingerprint"),
|
|
)
|
|
|
|
|
|
@app.route("/encode/thumbnail/<thumb_id>")
|
|
@login_required
|
|
def encode_thumbnail(thumb_id):
|
|
"""Serve thumbnail image."""
|
|
thumb_data = temp_storage.get_thumbnail(thumb_id)
|
|
if not thumb_data:
|
|
return "Thumbnail not found", 404
|
|
|
|
return send_file(
|
|
io.BytesIO(thumb_data), mimetype="image/jpeg", as_attachment=False
|
|
)
|
|
|
|
|
|
@app.route("/encode/download/<file_id>")
|
|
@login_required
|
|
def encode_download(file_id):
|
|
file_info = temp_storage.get_temp_file(file_id)
|
|
if not file_info:
|
|
flash("File expired or not found.", "error")
|
|
return redirect(url_for("encode_page"))
|
|
|
|
mime_type = file_info.get("mime_type", "image/png")
|
|
|
|
return send_file(
|
|
io.BytesIO(file_info["data"]),
|
|
mimetype=mime_type,
|
|
as_attachment=True,
|
|
download_name=file_info["filename"],
|
|
)
|
|
|
|
|
|
@app.route("/encode/file/<file_id>")
|
|
@login_required
|
|
def encode_file_route(file_id):
|
|
"""Serve file for Web Share API."""
|
|
file_info = temp_storage.get_temp_file(file_id)
|
|
if not file_info:
|
|
return "Not found", 404
|
|
|
|
mime_type = file_info.get("mime_type", "image/png")
|
|
|
|
return send_file(
|
|
io.BytesIO(file_info["data"]),
|
|
mimetype=mime_type,
|
|
as_attachment=False,
|
|
download_name=file_info["filename"],
|
|
)
|
|
|
|
|
|
@app.route("/encode/cleanup/<file_id>", methods=["POST"])
|
|
@login_required
|
|
def encode_cleanup(file_id):
|
|
"""Manually cleanup a file after sharing."""
|
|
temp_storage.delete_temp_file(file_id)
|
|
|
|
# Also cleanup thumbnail if exists
|
|
thumb_id = f"{file_id}_thumb"
|
|
temp_storage.delete_thumbnail(thumb_id)
|
|
|
|
return jsonify({"status": "ok"})
|
|
|
|
|
|
# ============================================================================
|
|
# DECODE
|
|
# ============================================================================
|
|
|
|
|
|
def _run_decode_job(job_id: str, decode_params: dict) -> None:
|
|
"""Background thread function for async decode."""
|
|
progress_file = get_progress_file_path(job_id)
|
|
|
|
try:
|
|
_store_job(job_id, {"status": "running", "created": time.time()})
|
|
|
|
# Run decode with progress file
|
|
decode_result = subprocess_stego.decode(
|
|
stego_data=decode_params["stego_data"],
|
|
reference_data=decode_params["ref_data"],
|
|
passphrase=decode_params["passphrase"],
|
|
pin=decode_params.get("pin"),
|
|
rsa_key_data=decode_params.get("rsa_key_data"),
|
|
rsa_password=decode_params.get("rsa_password"),
|
|
embed_mode=decode_params.get("embed_mode", "auto"),
|
|
channel_key=decode_params.get("channel_key"),
|
|
progress_file=progress_file,
|
|
)
|
|
|
|
if not decode_result.success:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "error",
|
|
"error": decode_result.error or "Decoding failed",
|
|
"error_type": decode_result.error_type,
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
return
|
|
|
|
# Store result based on type
|
|
if decode_result.is_file:
|
|
file_id = secrets.token_urlsafe(16)
|
|
filename = decode_result.filename or "decoded_file"
|
|
temp_storage.save_temp_file(file_id, decode_result.file_data, {
|
|
"filename": filename,
|
|
"mime_type": decode_result.mime_type,
|
|
})
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "complete",
|
|
"file_id": file_id,
|
|
"is_file": True,
|
|
"filename": filename,
|
|
"file_size": len(decode_result.file_data),
|
|
"mime_type": decode_result.mime_type,
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
else:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "complete",
|
|
"is_file": False,
|
|
"message": decode_result.message,
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
|
|
except Exception as e:
|
|
_store_job(
|
|
job_id,
|
|
{
|
|
"status": "error",
|
|
"error": str(e),
|
|
"created": time.time(),
|
|
},
|
|
)
|
|
finally:
|
|
cleanup_progress_file(job_id)
|
|
|
|
|
|
@app.route("/decode", methods=["GET", "POST"])
|
|
@login_required
|
|
def decode_page():
|
|
if request.method == "POST":
|
|
try:
|
|
# Get files
|
|
ref_photo = request.files.get("reference_photo")
|
|
stego_image = request.files.get("stego_image")
|
|
rsa_key_file = request.files.get("rsa_key")
|
|
|
|
if not ref_photo or not stego_image:
|
|
flash("Both reference photo and stego image are required", "error")
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
|
|
# Get form data - v3.2.0: renamed from day_phrase to passphrase
|
|
passphrase = request.form.get("passphrase", "") # v3.2.0: Renamed
|
|
pin = request.form.get("pin", "").strip()
|
|
rsa_password = request.form.get("rsa_password", "")
|
|
|
|
# NEW in v3.0 - Extraction mode
|
|
embed_mode = request.form.get("embed_mode", "auto")
|
|
if embed_mode not in ("auto", "lsb", "dct"):
|
|
embed_mode = "auto"
|
|
|
|
# NEW in v4.0.0 - Channel key
|
|
channel_key = resolve_channel_key_form(request.form.get("channel_key", "auto"))
|
|
|
|
# Check DCT availability
|
|
if embed_mode == "dct" and not has_dct_support():
|
|
flash("DCT mode requires scipy. Install with: pip install scipy", "error")
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
|
|
# v3.2.0: Removed date handling (no stego_date needed)
|
|
|
|
# v3.2.0: Renamed from day_phrase
|
|
if not passphrase:
|
|
flash("Passphrase is required", "error")
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
|
|
# Read files
|
|
ref_data = ref_photo.read()
|
|
stego_data = stego_image.read()
|
|
|
|
# Handle RSA key - can come from .pem file, QR code image, or webcam-scanned PEM (v4.1.5)
|
|
rsa_key_data = None
|
|
rsa_key_pem = request.form.get("rsa_key_pem", "").strip()
|
|
rsa_key_qr = request.files.get("rsa_key_qr")
|
|
rsa_key_from_qr = False
|
|
|
|
if rsa_key_pem:
|
|
# Webcam-scanned PEM key (v4.1.5+) - may be compressed (zlib or zstd)
|
|
if is_compressed(rsa_key_pem):
|
|
rsa_key_pem = decompress_data(rsa_key_pem)
|
|
rsa_key_data = rsa_key_pem.encode("utf-8")
|
|
rsa_key_from_qr = True
|
|
elif rsa_key_file and rsa_key_file.filename:
|
|
rsa_key_data = rsa_key_file.read()
|
|
elif rsa_key_qr and rsa_key_qr.filename and HAS_QRCODE_READ:
|
|
qr_image_data = rsa_key_qr.read()
|
|
key_pem = extract_key_from_qr(qr_image_data)
|
|
if key_pem:
|
|
rsa_key_data = key_pem.encode("utf-8")
|
|
rsa_key_from_qr = True
|
|
else:
|
|
flash("Could not extract RSA key from QR code image.", "error")
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
|
|
# Validate security factors
|
|
result = validate_security_factors(pin, rsa_key_data)
|
|
if not result.is_valid:
|
|
flash(result.error_message, "error")
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
|
|
# Validate PIN if provided
|
|
if pin:
|
|
result = validate_pin(pin)
|
|
if not result.is_valid:
|
|
flash(result.error_message, "error")
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
|
|
# Determine key password
|
|
key_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None)
|
|
|
|
# Validate RSA key if provided
|
|
if rsa_key_data:
|
|
result = validate_rsa_key(rsa_key_data, key_password)
|
|
if not result.is_valid:
|
|
flash(result.error_message, "error")
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
|
|
# Check for async mode (v4.1.5)
|
|
is_async = request.form.get("async") == "true" or request.headers.get("X-Async") == "true"
|
|
|
|
# Build decode params
|
|
decode_params = {
|
|
"stego_data": stego_data,
|
|
"ref_data": ref_data,
|
|
"passphrase": passphrase,
|
|
"pin": pin if pin else None,
|
|
"rsa_key_data": rsa_key_data,
|
|
"rsa_password": key_password,
|
|
"embed_mode": embed_mode,
|
|
"channel_key": channel_key,
|
|
}
|
|
|
|
# ASYNC MODE: Start background job and return JSON
|
|
if is_async:
|
|
job_id = generate_job_id()
|
|
_store_job(job_id, {"status": "pending", "created": time.time()})
|
|
_executor.submit(_run_decode_job, job_id, decode_params)
|
|
return jsonify({"job_id": job_id, "status": "pending"})
|
|
|
|
# SYNC MODE: Run inline (original behavior)
|
|
# v4.0.0: Include channel_key parameter
|
|
# Use subprocess-isolated decode to prevent crashes
|
|
decode_result = subprocess_stego.decode(
|
|
stego_data=stego_data,
|
|
reference_data=ref_data,
|
|
passphrase=passphrase,
|
|
pin=pin if pin else None,
|
|
rsa_key_data=rsa_key_data,
|
|
rsa_password=key_password,
|
|
embed_mode=embed_mode,
|
|
channel_key=channel_key, # v4.0.0
|
|
)
|
|
|
|
# Check for subprocess errors
|
|
if not decode_result.success:
|
|
error_msg = decode_result.error or "Decoding failed"
|
|
# Check for channel key related errors
|
|
if "channel key" in error_msg.lower():
|
|
flash(error_msg, "error")
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
if "decrypt" in error_msg.lower() or decode_result.error_type == "DecryptionError":
|
|
raise DecryptionError(error_msg)
|
|
raise StegasooError(error_msg)
|
|
|
|
if decode_result.is_file:
|
|
# File content - store temporarily for download
|
|
file_id = secrets.token_urlsafe(16)
|
|
cleanup_temp_files()
|
|
|
|
filename = decode_result.filename or "decoded_file"
|
|
temp_storage.save_temp_file(file_id, decode_result.file_data, {
|
|
"filename": filename,
|
|
"mime_type": decode_result.mime_type,
|
|
})
|
|
|
|
return render_template(
|
|
"decode.html",
|
|
decoded_file=True,
|
|
file_id=file_id,
|
|
filename=filename,
|
|
file_size=format_size(len(decode_result.file_data)),
|
|
mime_type=decode_result.mime_type,
|
|
has_qrcode_read=HAS_QRCODE_READ,
|
|
)
|
|
else:
|
|
# Text content
|
|
return render_template(
|
|
"decode.html",
|
|
decoded_message=decode_result.message,
|
|
has_qrcode_read=HAS_QRCODE_READ,
|
|
)
|
|
|
|
except InvalidMagicBytesError:
|
|
flash(
|
|
"This doesn't appear to be a Stegasoo image. Try a different mode (LSB/DCT).",
|
|
"warning",
|
|
)
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
except ReedSolomonError:
|
|
flash(
|
|
"Image too corrupted to decode. It may have been re-saved or compressed.",
|
|
"error",
|
|
)
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
except InvalidHeaderError:
|
|
flash(
|
|
"Invalid or corrupted header. The image may have been modified.",
|
|
"error",
|
|
)
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
except DecryptionError:
|
|
flash(
|
|
"Wrong credentials. Double-check your reference photo, passphrase, PIN, and channel key.",
|
|
"warning",
|
|
)
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
except StegasooError as e:
|
|
flash(str(e), "error")
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
except Exception as e:
|
|
flash(f"Error: {e}", "error")
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
|
|
return render_template("decode.html", has_qrcode_read=HAS_QRCODE_READ)
|
|
|
|
|
|
@app.route("/decode/download/<file_id>")
|
|
@login_required
|
|
def decode_download(file_id):
|
|
"""Download decoded file."""
|
|
file_info = temp_storage.get_temp_file(file_id)
|
|
if not file_info:
|
|
flash("File expired or not found.", "error")
|
|
return redirect(url_for("decode_page"))
|
|
|
|
mime_type = file_info.get("mime_type", "application/octet-stream")
|
|
|
|
return send_file(
|
|
io.BytesIO(file_info["data"]),
|
|
mimetype=mime_type,
|
|
as_attachment=True,
|
|
download_name=file_info["filename"],
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# DECODE PROGRESS ENDPOINTS (v4.1.5)
|
|
# ============================================================================
|
|
|
|
|
|
@app.route("/decode/status/<job_id>")
|
|
@login_required
|
|
def decode_status(job_id):
|
|
"""Get the status of an async decode job."""
|
|
job = _get_job(job_id)
|
|
if not job:
|
|
return jsonify({"error": "Job not found"}), 404
|
|
|
|
response = {"status": job.get("status", "unknown")}
|
|
|
|
if job["status"] == "complete":
|
|
response["is_file"] = job.get("is_file", False)
|
|
if job.get("is_file"):
|
|
response["file_id"] = job.get("file_id")
|
|
response["filename"] = job.get("filename")
|
|
response["file_size"] = job.get("file_size")
|
|
response["mime_type"] = job.get("mime_type")
|
|
else:
|
|
response["message"] = job.get("message")
|
|
elif job["status"] == "error":
|
|
response["error"] = job.get("error", "Unknown error")
|
|
response["error_type"] = job.get("error_type")
|
|
|
|
return jsonify(response)
|
|
|
|
|
|
@app.route("/decode/progress/<job_id>")
|
|
@login_required
|
|
def decode_progress(job_id):
|
|
"""Get the progress of an async decode job."""
|
|
progress = read_progress(job_id)
|
|
if progress:
|
|
return jsonify(progress)
|
|
|
|
# No progress file yet - check job status
|
|
job = _get_job(job_id)
|
|
if not job:
|
|
return jsonify({"error": "Job not found"}), 404
|
|
|
|
if job["status"] == "complete":
|
|
return jsonify({"percent": 100, "phase": "complete"})
|
|
elif job["status"] == "error":
|
|
return jsonify({"percent": 0, "phase": "error", "error": job.get("error")})
|
|
elif job["status"] == "pending":
|
|
return jsonify({"percent": 0, "phase": "starting"})
|
|
|
|
# Running but no progress file yet
|
|
return jsonify({"percent": 5, "phase": "reading"})
|
|
|
|
|
|
@app.route("/decode/result/<job_id>")
|
|
@login_required
|
|
def decode_result(job_id):
|
|
"""Get the result page for an async decode job."""
|
|
job = _get_job(job_id)
|
|
if not job:
|
|
flash("Job not found or expired.", "error")
|
|
return redirect(url_for("decode_page"))
|
|
|
|
if job["status"] != "complete":
|
|
flash("Decode not complete.", "error")
|
|
return redirect(url_for("decode_page"))
|
|
|
|
if job.get("is_file"):
|
|
return render_template(
|
|
"decode.html",
|
|
decoded_file=True,
|
|
file_id=job.get("file_id"),
|
|
filename=job.get("filename"),
|
|
file_size=format_size(job.get("file_size", 0)),
|
|
mime_type=job.get("mime_type"),
|
|
has_qrcode_read=HAS_QRCODE_READ,
|
|
)
|
|
else:
|
|
return render_template(
|
|
"decode.html",
|
|
decoded_message=job.get("message"),
|
|
has_qrcode_read=HAS_QRCODE_READ,
|
|
)
|
|
|
|
|
|
@app.route("/about")
|
|
def about():
|
|
from stegasoo.channel import get_channel_status
|
|
|
|
channel_status = get_channel_status()
|
|
|
|
# Check if user is admin (for QR sharing)
|
|
current_user = get_current_user()
|
|
is_admin = current_user.is_admin if current_user else False
|
|
|
|
return render_template(
|
|
"about.html",
|
|
has_argon2=has_argon2(),
|
|
has_qrcode_read=HAS_QRCODE_READ,
|
|
# Channel info (bugfix - was not being passed)
|
|
channel_configured=channel_status["configured"],
|
|
channel_fingerprint=channel_status.get("fingerprint"),
|
|
channel_source=channel_status.get("source"),
|
|
# Admin check for QR sharing
|
|
is_admin=is_admin,
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# TOOLS ROUTES (v4.1.0)
|
|
# ============================================================================
|
|
|
|
|
|
@app.route("/tools")
|
|
@login_required
|
|
def tools():
|
|
"""Advanced tools page."""
|
|
return render_template("tools.html", has_dct=has_dct_support())
|
|
|
|
|
|
@app.route("/api/tools/capacity", methods=["POST"])
|
|
@login_required
|
|
def api_tools_capacity():
|
|
"""Calculate image capacity for steganography."""
|
|
from stegasoo.dct_steganography import estimate_capacity_comparison
|
|
|
|
carrier = request.files.get("image")
|
|
if not carrier:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
try:
|
|
image_data = carrier.read()
|
|
result = estimate_capacity_comparison(image_data)
|
|
result["success"] = True
|
|
result["filename"] = carrier.filename
|
|
result["megapixels"] = round((result["width"] * result["height"]) / 1_000_000, 2)
|
|
return jsonify(result)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 400
|
|
|
|
|
|
@app.route("/api/tools/strip-metadata", methods=["POST"])
|
|
@login_required
|
|
def api_tools_strip_metadata():
|
|
"""Strip EXIF/metadata from image."""
|
|
import io
|
|
|
|
from stegasoo.utils import strip_image_metadata
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
try:
|
|
image_data = image_file.read()
|
|
clean_data = strip_image_metadata(image_data, output_format="PNG")
|
|
|
|
buffer = io.BytesIO(clean_data)
|
|
filename = image_file.filename.rsplit(".", 1)[0] + "_clean.png"
|
|
|
|
return send_file(buffer, mimetype="image/png", as_attachment=True, download_name=filename)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 400
|
|
|
|
|
|
@app.route("/api/tools/exif", methods=["POST"])
|
|
@login_required
|
|
def api_tools_exif():
|
|
"""Read EXIF metadata from image."""
|
|
from stegasoo.utils import read_image_exif
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
try:
|
|
image_data = image_file.read()
|
|
exif = read_image_exif(image_data)
|
|
|
|
# Check if it's a JPEG (editable) or not
|
|
is_jpeg = image_data[:2] == b"\xff\xd8"
|
|
|
|
return jsonify(
|
|
{
|
|
"success": True,
|
|
"filename": image_file.filename,
|
|
"exif": exif,
|
|
"editable": is_jpeg,
|
|
"field_count": len(exif),
|
|
}
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 400
|
|
|
|
|
|
@app.route("/api/tools/exif/update", methods=["POST"])
|
|
@login_required
|
|
def api_tools_exif_update():
|
|
"""Update EXIF fields in image."""
|
|
from stegasoo.utils import write_image_exif
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
# Get updates from form data
|
|
updates_json = request.form.get("updates", "{}")
|
|
try:
|
|
import json
|
|
|
|
updates = json.loads(updates_json)
|
|
except json.JSONDecodeError:
|
|
return jsonify({"success": False, "error": "Invalid updates JSON"}), 400
|
|
|
|
if not updates:
|
|
return jsonify({"success": False, "error": "No updates provided"}), 400
|
|
|
|
try:
|
|
image_data = image_file.read()
|
|
updated_data = write_image_exif(image_data, updates)
|
|
|
|
# Return as downloadable file
|
|
buffer = io.BytesIO(updated_data)
|
|
return send_file(
|
|
buffer,
|
|
mimetype="image/jpeg",
|
|
as_attachment=True,
|
|
download_name=f"exif_{image_file.filename}",
|
|
)
|
|
except ValueError as e:
|
|
return jsonify({"success": False, "error": str(e)}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/tools/exif/clear", methods=["POST"])
|
|
@login_required
|
|
def api_tools_exif_clear():
|
|
"""Remove all EXIF metadata from image."""
|
|
from stegasoo.utils import strip_image_metadata
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
# Get desired output format (default to PNG for lossless)
|
|
output_format = request.form.get("format", "PNG").upper()
|
|
if output_format not in ("PNG", "JPEG", "BMP"):
|
|
output_format = "PNG"
|
|
|
|
try:
|
|
image_data = image_file.read()
|
|
clean_data = strip_image_metadata(image_data, output_format=output_format)
|
|
|
|
# Determine extension and mimetype
|
|
ext_map = {
|
|
"PNG": ("png", "image/png"),
|
|
"JPEG": ("jpg", "image/jpeg"),
|
|
"BMP": ("bmp", "image/bmp"),
|
|
}
|
|
ext, mimetype = ext_map.get(output_format, ("png", "image/png"))
|
|
|
|
# Return as downloadable file
|
|
stem = (
|
|
image_file.filename.rsplit(".", 1)[0]
|
|
if "." in image_file.filename
|
|
else image_file.filename
|
|
)
|
|
buffer = io.BytesIO(clean_data)
|
|
return send_file(
|
|
buffer,
|
|
mimetype=mimetype,
|
|
as_attachment=True,
|
|
download_name=f"{stem}_clean.{ext}",
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/tools/rotate", methods=["POST"])
|
|
@login_required
|
|
def api_tools_rotate():
|
|
"""Rotate and/or flip an image."""
|
|
from PIL import Image
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
rotation = int(request.form.get("rotation", 0))
|
|
flip_h = request.form.get("flip_h", "false").lower() == "true"
|
|
flip_v = request.form.get("flip_v", "false").lower() == "true"
|
|
|
|
try:
|
|
img = Image.open(io.BytesIO(image_file.read()))
|
|
|
|
# Apply rotation (PIL rotates counter-clockwise, so negate)
|
|
if rotation:
|
|
img = img.rotate(-rotation, expand=True)
|
|
|
|
# Apply flips
|
|
if flip_h:
|
|
img = img.transpose(Image.FLIP_LEFT_RIGHT)
|
|
if flip_v:
|
|
img = img.transpose(Image.FLIP_TOP_BOTTOM)
|
|
|
|
# Output as PNG (lossless)
|
|
buffer = io.BytesIO()
|
|
img.save(buffer, format="PNG")
|
|
buffer.seek(0)
|
|
|
|
stem = (
|
|
image_file.filename.rsplit(".", 1)[0]
|
|
if "." in image_file.filename
|
|
else image_file.filename
|
|
)
|
|
return send_file(
|
|
buffer,
|
|
mimetype="image/png",
|
|
as_attachment=True,
|
|
download_name=f"{stem}_transformed.png",
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/tools/compress", methods=["POST"])
|
|
@login_required
|
|
def api_tools_compress():
|
|
"""Compress image to JPEG at specified quality."""
|
|
from PIL import Image
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
quality = int(request.form.get("quality", 85))
|
|
quality = max(10, min(100, quality)) # Clamp to valid range
|
|
|
|
try:
|
|
img = Image.open(io.BytesIO(image_file.read()))
|
|
|
|
# Convert to RGB if necessary (JPEG doesn't support alpha)
|
|
if img.mode in ("RGBA", "LA", "P"):
|
|
img = img.convert("RGB")
|
|
|
|
buffer = io.BytesIO()
|
|
img.save(buffer, format="JPEG", quality=quality)
|
|
buffer.seek(0)
|
|
|
|
stem = (
|
|
image_file.filename.rsplit(".", 1)[0]
|
|
if "." in image_file.filename
|
|
else image_file.filename
|
|
)
|
|
return send_file(
|
|
buffer,
|
|
mimetype="image/jpeg",
|
|
as_attachment=True,
|
|
download_name=f"{stem}_q{quality}.jpg",
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/tools/convert", methods=["POST"])
|
|
@login_required
|
|
def api_tools_convert():
|
|
"""Convert image to different format."""
|
|
from PIL import Image
|
|
|
|
image_file = request.files.get("image")
|
|
if not image_file:
|
|
return jsonify({"success": False, "error": "No image provided"}), 400
|
|
|
|
output_format = request.form.get("format", "PNG").upper()
|
|
quality = int(request.form.get("quality", 90))
|
|
quality = max(10, min(100, quality))
|
|
|
|
# Validate format
|
|
format_map = {
|
|
"PNG": ("png", "image/png"),
|
|
"JPEG": ("jpg", "image/jpeg"),
|
|
"WEBP": ("webp", "image/webp"),
|
|
}
|
|
if output_format not in format_map:
|
|
return jsonify({"success": False, "error": f"Unsupported format: {output_format}"}), 400
|
|
|
|
try:
|
|
img = Image.open(io.BytesIO(image_file.read()))
|
|
|
|
# Convert to RGB for JPEG (no alpha)
|
|
if output_format == "JPEG" and img.mode in ("RGBA", "LA", "P"):
|
|
img = img.convert("RGB")
|
|
|
|
buffer = io.BytesIO()
|
|
save_kwargs = {"format": output_format}
|
|
if output_format in ("JPEG", "WEBP"):
|
|
save_kwargs["quality"] = quality
|
|
img.save(buffer, **save_kwargs)
|
|
buffer.seek(0)
|
|
|
|
ext, mimetype = format_map[output_format]
|
|
stem = (
|
|
image_file.filename.rsplit(".", 1)[0]
|
|
if "." in image_file.filename
|
|
else image_file.filename
|
|
)
|
|
return send_file(
|
|
buffer,
|
|
mimetype=mimetype,
|
|
as_attachment=True,
|
|
download_name=f"{stem}.{ext}",
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
# Add these two test routes anywhere in app.py after the app = Flask(...) line:
|
|
|
|
|
|
@app.route("/test-capacity", methods=["POST"])
|
|
def test_capacity():
|
|
"""Minimal capacity test - no stegasoo code, just PIL."""
|
|
carrier = request.files.get("carrier")
|
|
if not carrier:
|
|
return jsonify({"error": "No carrier image provided"}), 400
|
|
|
|
try:
|
|
carrier_data = carrier.read()
|
|
buffer = io.BytesIO(carrier_data)
|
|
img = Image.open(buffer)
|
|
width, height = img.size
|
|
fmt = img.format
|
|
img.close()
|
|
buffer.close()
|
|
|
|
pixels = width * height
|
|
lsb_bytes = (pixels * 3) // 8
|
|
dct_bytes = ((width // 8) * (height // 8) * 16) // 8 - 10
|
|
|
|
return jsonify(
|
|
{
|
|
"success": True,
|
|
"width": width,
|
|
"height": height,
|
|
"format": fmt,
|
|
"lsb_kb": round(lsb_bytes / 1024, 1),
|
|
"dct_kb": round(dct_bytes / 1024, 1),
|
|
}
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/test-capacity-nopil", methods=["POST"])
|
|
def test_capacity_nopil():
|
|
"""Ultra-minimal test - no PIL, no stegasoo."""
|
|
carrier = request.files.get("carrier")
|
|
if not carrier:
|
|
return jsonify({"error": "No carrier image provided"}), 400
|
|
|
|
carrier_data = carrier.read()
|
|
return jsonify(
|
|
{
|
|
"success": True,
|
|
"data_size": len(carrier_data),
|
|
}
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# AUTHENTICATION ROUTES (v4.0.2)
|
|
# ============================================================================
|
|
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
"""Login page."""
|
|
if not app.config.get("AUTH_ENABLED", True):
|
|
return redirect(url_for("index"))
|
|
|
|
if not user_exists():
|
|
return redirect(url_for("setup"))
|
|
|
|
if is_authenticated():
|
|
return redirect(url_for("index"))
|
|
|
|
if request.method == "POST":
|
|
username = request.form.get("username", "")
|
|
password = request.form.get("password", "")
|
|
user = verify_user_password(username, password)
|
|
if user:
|
|
login_user(user)
|
|
session.permanent = True
|
|
flash("Login successful", "success")
|
|
return redirect(url_for("index"))
|
|
else:
|
|
flash("Invalid username or password", "error")
|
|
|
|
return render_template("login.html")
|
|
|
|
|
|
@app.route("/logout")
|
|
def logout():
|
|
"""Logout and clear session."""
|
|
logout_user()
|
|
flash("Logged out successfully", "success")
|
|
return redirect(url_for("index"))
|
|
|
|
|
|
@app.route("/setup", methods=["GET", "POST"])
|
|
def setup():
|
|
"""First-run setup page - create admin account (Step 1)."""
|
|
if not app.config.get("AUTH_ENABLED", True):
|
|
return redirect(url_for("index"))
|
|
|
|
if user_exists():
|
|
return redirect(url_for("login"))
|
|
|
|
if request.method == "POST":
|
|
username = request.form.get("username", "admin")
|
|
password = request.form.get("password", "")
|
|
password_confirm = request.form.get("password_confirm", "")
|
|
|
|
if password != password_confirm:
|
|
flash("Passwords do not match", "error")
|
|
else:
|
|
success, message = create_admin_user(username, password)
|
|
if success:
|
|
# Auto-login the new admin
|
|
user = verify_user_password(username, password)
|
|
if user:
|
|
login_user(user)
|
|
session.permanent = True
|
|
# Redirect to recovery key setup (Step 2)
|
|
return redirect(url_for("setup_recovery"))
|
|
else:
|
|
flash(message, "error")
|
|
|
|
return render_template("setup.html")
|
|
|
|
|
|
@app.route("/setup/recovery", methods=["GET", "POST"])
|
|
@login_required
|
|
def setup_recovery():
|
|
"""Recovery key setup page (Step 2 of initial setup)."""
|
|
import base64
|
|
|
|
from stegasoo.recovery import generate_recovery_key, generate_recovery_qr, hash_recovery_key
|
|
|
|
# Only allow during initial setup (no recovery key yet, first admin)
|
|
if has_recovery_key():
|
|
return redirect(url_for("index"))
|
|
|
|
current_user = get_current_user()
|
|
if current_user.role != "admin":
|
|
return redirect(url_for("index"))
|
|
|
|
if request.method == "POST":
|
|
action = request.form.get("action")
|
|
|
|
if action == "skip":
|
|
# No recovery key - most secure but no way to recover
|
|
flash("Setup complete. No recovery key configured.", "warning")
|
|
return redirect(url_for("index"))
|
|
|
|
elif action == "save":
|
|
# User confirmed they saved the key
|
|
recovery_key = request.form.get("recovery_key")
|
|
if recovery_key:
|
|
key_hash = hash_recovery_key(recovery_key)
|
|
set_recovery_key_hash(key_hash)
|
|
flash("Setup complete. Recovery key saved.", "success")
|
|
return redirect(url_for("index"))
|
|
|
|
# Generate a new key to show
|
|
recovery_key = generate_recovery_key()
|
|
|
|
# Generate QR code as base64
|
|
try:
|
|
qr_bytes = generate_recovery_qr(recovery_key)
|
|
qr_base64 = base64.b64encode(qr_bytes).decode("utf-8")
|
|
except ImportError:
|
|
qr_base64 = None
|
|
|
|
return render_template(
|
|
"setup_recovery.html",
|
|
recovery_key=recovery_key,
|
|
qr_base64=qr_base64,
|
|
)
|
|
|
|
|
|
@app.route("/recover", methods=["GET", "POST"])
|
|
def recover():
|
|
"""Password recovery page - reset password using recovery key."""
|
|
# Don't show if no recovery key configured
|
|
if not get_recovery_key_hash():
|
|
flash("No recovery key configured for this instance", "error")
|
|
return redirect(url_for("login"))
|
|
|
|
if request.method == "POST":
|
|
recovery_key = request.form.get("recovery_key", "").strip()
|
|
new_password = request.form.get("new_password", "")
|
|
new_password_confirm = request.form.get("new_password_confirm", "")
|
|
|
|
if not recovery_key:
|
|
flash("Please enter your recovery key", "error")
|
|
elif new_password != new_password_confirm:
|
|
flash("Passwords do not match", "error")
|
|
elif len(new_password) < 8:
|
|
flash("Password must be at least 8 characters", "error")
|
|
else:
|
|
success, message = verify_and_reset_admin_password(recovery_key, new_password)
|
|
if success:
|
|
flash("Password reset successfully. Please login.", "success")
|
|
return redirect(url_for("login"))
|
|
else:
|
|
flash(message, "error")
|
|
|
|
return render_template("recover.html")
|
|
|
|
|
|
@app.route("/account/recovery/regenerate", methods=["GET", "POST"])
|
|
@login_required
|
|
@admin_required
|
|
def regenerate_recovery():
|
|
"""Generate a new recovery key (replaces existing one)."""
|
|
import base64
|
|
|
|
from stegasoo.recovery import generate_recovery_key, generate_recovery_qr, hash_recovery_key
|
|
|
|
if request.method == "POST":
|
|
action = request.form.get("action")
|
|
|
|
if action == "cancel":
|
|
flash("Recovery key generation cancelled", "warning")
|
|
return redirect(url_for("account"))
|
|
|
|
elif action == "save":
|
|
# User confirmed they saved the key
|
|
recovery_key = request.form.get("recovery_key")
|
|
if recovery_key:
|
|
key_hash = hash_recovery_key(recovery_key)
|
|
set_recovery_key_hash(key_hash)
|
|
flash("New recovery key saved successfully", "success")
|
|
return redirect(url_for("account"))
|
|
|
|
# Generate a new key to show
|
|
recovery_key = generate_recovery_key()
|
|
|
|
# Generate QR code as base64
|
|
try:
|
|
qr_bytes = generate_recovery_qr(recovery_key)
|
|
qr_base64 = base64.b64encode(qr_bytes).decode("utf-8")
|
|
except ImportError:
|
|
qr_base64 = None
|
|
|
|
return render_template(
|
|
"regenerate_recovery.html",
|
|
recovery_key=recovery_key,
|
|
qr_base64=qr_base64,
|
|
has_existing=has_recovery_key(),
|
|
)
|
|
|
|
|
|
@app.route("/account/recovery/disable", methods=["POST"])
|
|
@login_required
|
|
@admin_required
|
|
def disable_recovery():
|
|
"""Disable recovery key (no password reset possible)."""
|
|
if clear_recovery_key():
|
|
flash("Recovery key disabled. Password reset is no longer possible.", "warning")
|
|
else:
|
|
flash("No recovery key was configured", "error")
|
|
return redirect(url_for("account"))
|
|
|
|
|
|
@app.route("/account/recovery/stego-backup", methods=["POST"])
|
|
@login_required
|
|
@admin_required
|
|
def create_stego_backup():
|
|
"""Create stego backup - hide recovery key in an image."""
|
|
from stegasoo.recovery import create_stego_backup as make_backup
|
|
|
|
recovery_key = request.form.get("recovery_key", "")
|
|
if not recovery_key:
|
|
flash("No recovery key provided", "error")
|
|
return redirect(url_for("regenerate_recovery"))
|
|
|
|
if "carrier_image" not in request.files:
|
|
flash("No image uploaded", "error")
|
|
return redirect(url_for("regenerate_recovery"))
|
|
|
|
carrier_file = request.files["carrier_image"]
|
|
if not carrier_file.filename:
|
|
flash("No image selected", "error")
|
|
return redirect(url_for("regenerate_recovery"))
|
|
|
|
try:
|
|
carrier_data = carrier_file.read()
|
|
stego_data = make_backup(recovery_key, carrier_data)
|
|
|
|
# Return as downloadable PNG
|
|
buffer = io.BytesIO(stego_data)
|
|
return send_file(
|
|
buffer,
|
|
mimetype="image/png",
|
|
as_attachment=True,
|
|
download_name="stegasoo-recovery-backup.png",
|
|
)
|
|
except ValueError as e:
|
|
flash(str(e), "error")
|
|
return redirect(url_for("regenerate_recovery"))
|
|
|
|
|
|
@app.route("/recover/stego", methods=["POST"])
|
|
def recover_from_stego():
|
|
"""Extract recovery key from stego backup image."""
|
|
from stegasoo.recovery import extract_stego_backup
|
|
|
|
if "stego_image" not in request.files or "reference_image" not in request.files:
|
|
flash("Both stego image and reference image are required", "error")
|
|
return redirect(url_for("recover"))
|
|
|
|
stego_file = request.files["stego_image"]
|
|
reference_file = request.files["reference_image"]
|
|
|
|
if not stego_file.filename or not reference_file.filename:
|
|
flash("Both images must be selected", "error")
|
|
return redirect(url_for("recover"))
|
|
|
|
try:
|
|
stego_data = stego_file.read()
|
|
reference_data = reference_file.read()
|
|
|
|
extracted_key = extract_stego_backup(stego_data, reference_data)
|
|
|
|
if extracted_key:
|
|
# Return the key to pre-fill the recovery form
|
|
return render_template("recover.html", prefilled_key=extracted_key)
|
|
else:
|
|
flash("Could not extract recovery key. Check images are correct.", "error")
|
|
return redirect(url_for("recover"))
|
|
|
|
except Exception as e:
|
|
flash(f"Extraction failed: {e}", "error")
|
|
return redirect(url_for("recover"))
|
|
|
|
|
|
@app.route("/account", methods=["GET", "POST"])
|
|
@login_required
|
|
def account():
|
|
"""Account management page."""
|
|
current_user = get_current_user()
|
|
|
|
if request.method == "POST":
|
|
current = request.form.get("current_password", "")
|
|
new = request.form.get("new_password", "")
|
|
new_confirm = request.form.get("new_password_confirm", "")
|
|
|
|
if new != new_confirm:
|
|
flash("New passwords do not match", "error")
|
|
else:
|
|
success, message = change_password(current_user.id, current, new)
|
|
flash(message, "success" if success else "error")
|
|
|
|
# Get saved channel keys
|
|
channel_keys = get_user_channel_keys(current_user.id)
|
|
|
|
return render_template(
|
|
"account.html",
|
|
username=current_user.username,
|
|
user=current_user,
|
|
is_admin=current_user.is_admin,
|
|
has_recovery=has_recovery_key(),
|
|
channel_keys=channel_keys,
|
|
max_channel_keys=MAX_CHANNEL_KEYS,
|
|
can_save_key=can_save_channel_key(current_user.id),
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# CHANNEL KEY MANAGEMENT ROUTES (v4.2.0)
|
|
# ============================================================================
|
|
|
|
|
|
@app.route("/account/keys/save", methods=["POST"])
|
|
@login_required
|
|
def account_save_key():
|
|
"""Save a new channel key."""
|
|
current_user = get_current_user()
|
|
name = request.form.get("key_name", "").strip()
|
|
channel_key = request.form.get("channel_key", "").strip()
|
|
|
|
# Normalize key format (remove dashes if present)
|
|
channel_key = channel_key.replace("-", "").lower()
|
|
|
|
success, message, key = save_channel_key(current_user.id, name, channel_key)
|
|
flash(message, "success" if success else "error")
|
|
return redirect(url_for("account"))
|
|
|
|
|
|
@app.route("/account/keys/<int:key_id>/delete", methods=["POST"])
|
|
@login_required
|
|
def account_delete_key(key_id):
|
|
"""Delete a saved channel key."""
|
|
current_user = get_current_user()
|
|
success, message = delete_channel_key(key_id, current_user.id)
|
|
flash(message, "success" if success else "error")
|
|
return redirect(url_for("account"))
|
|
|
|
|
|
@app.route("/account/keys/<int:key_id>/rename", methods=["POST"])
|
|
@login_required
|
|
def account_rename_key(key_id):
|
|
"""Rename a saved channel key."""
|
|
current_user = get_current_user()
|
|
new_name = request.form.get("new_name", "").strip()
|
|
success, message = update_channel_key_name(key_id, current_user.id, new_name)
|
|
flash(message, "success" if success else "error")
|
|
return redirect(url_for("account"))
|
|
|
|
|
|
@app.route("/api/channel/keys")
|
|
@login_required
|
|
def api_channel_keys():
|
|
"""Get saved channel keys for current user (JSON API)."""
|
|
current_user = get_current_user()
|
|
keys = get_user_channel_keys(current_user.id)
|
|
return jsonify(
|
|
{
|
|
"success": True,
|
|
"keys": [
|
|
{
|
|
"id": k.id,
|
|
"name": k.name,
|
|
"fingerprint": f"{k.channel_key[:4]}...{k.channel_key[-4:]}",
|
|
"channel_key": k.channel_key,
|
|
"last_used_at": k.last_used_at,
|
|
}
|
|
for k in keys
|
|
],
|
|
"can_save": can_save_channel_key(current_user.id),
|
|
"max_keys": MAX_CHANNEL_KEYS,
|
|
}
|
|
)
|
|
|
|
|
|
@app.route("/api/channel/keys/<int:key_id>/use", methods=["POST"])
|
|
@login_required
|
|
def api_channel_key_use(key_id):
|
|
"""Mark a channel key as used (updates last_used_at)."""
|
|
current_user = get_current_user()
|
|
key = get_channel_key_by_id(key_id, current_user.id)
|
|
if not key:
|
|
return jsonify({"success": False, "error": "Key not found"}), 404
|
|
|
|
update_channel_key_last_used(key_id, current_user.id)
|
|
return jsonify({"success": True})
|
|
|
|
|
|
# ============================================================================
|
|
# ADMIN ROUTES (v4.1.0)
|
|
# ============================================================================
|
|
|
|
|
|
@app.route("/admin/settings")
|
|
@admin_required
|
|
def admin_settings():
|
|
"""System settings page (admin only)."""
|
|
import platform
|
|
import sys
|
|
|
|
from stegasoo import __version__
|
|
from stegasoo.channel import get_channel_status
|
|
|
|
channel_status = get_channel_status()
|
|
|
|
return render_template(
|
|
"admin/settings.html",
|
|
# Channel info (key hidden until password verified)
|
|
channel_configured=channel_status["configured"],
|
|
channel_fingerprint=channel_status.get("fingerprint"),
|
|
channel_source=channel_status.get("source"),
|
|
# Server config
|
|
hostname=os.environ.get("STEGASOO_HOSTNAME") or socket.gethostname(),
|
|
port=os.environ.get("STEGASOO_PORT", "5000"),
|
|
https_enabled=app.config.get("HTTPS_ENABLED", False),
|
|
auth_enabled=app.config.get("AUTH_ENABLED", True),
|
|
max_payload_kb=MAX_FILE_PAYLOAD_SIZE // 1024,
|
|
max_upload_mb=MAX_FILE_SIZE // (1024 * 1024),
|
|
dct_available=has_dct_support(),
|
|
qr_available=HAS_QRCODE_READ,
|
|
# Environment
|
|
version=__version__,
|
|
python_version=f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
|
|
platform=platform.system(),
|
|
kdf_type="Argon2id" if has_argon2() else "PBKDF2",
|
|
)
|
|
|
|
|
|
@app.route("/admin/settings/unlock", methods=["POST"])
|
|
@admin_required
|
|
def admin_settings_unlock():
|
|
"""Verify password and return channel key (AJAX)."""
|
|
from stegasoo.channel import get_channel_status
|
|
|
|
data = request.get_json() or {}
|
|
password = data.get("password", "")
|
|
|
|
if not password:
|
|
return jsonify({"success": False, "error": "Password required"})
|
|
|
|
# Get current user and verify password
|
|
username = get_username()
|
|
user = verify_user_password(username, password)
|
|
|
|
if not user:
|
|
return jsonify({"success": False, "error": "Incorrect password"})
|
|
|
|
# Password verified - return channel key
|
|
channel_status = get_channel_status()
|
|
channel_key = channel_status.get("key") if channel_status["configured"] else ""
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"channel_key": channel_key
|
|
})
|
|
|
|
|
|
@app.route("/admin/users")
|
|
@admin_required
|
|
def admin_users():
|
|
"""User management page (admin only)."""
|
|
users = get_all_users()
|
|
current_user = get_current_user()
|
|
return render_template(
|
|
"admin/users.html",
|
|
users=users,
|
|
current_user=current_user,
|
|
user_count=get_non_admin_count(),
|
|
max_users=MAX_USERS,
|
|
can_create=can_create_user(),
|
|
)
|
|
|
|
|
|
@app.route("/admin/users/new", methods=["GET", "POST"])
|
|
@admin_required
|
|
def admin_user_new():
|
|
"""Create new user (admin only)."""
|
|
if request.method == "POST":
|
|
username = request.form.get("username", "")
|
|
password = request.form.get("password", "")
|
|
|
|
success, message, user = create_user(username, password)
|
|
|
|
# Check if AJAX request
|
|
if request.headers.get("X-Requested-With") == "XMLHttpRequest":
|
|
if success:
|
|
return jsonify({"success": True, "username": username, "password": password})
|
|
else:
|
|
return jsonify({"success": False, "error": message})
|
|
|
|
# Regular form submission fallback
|
|
if success:
|
|
flash(f"User '{username}' created successfully", "success")
|
|
session["temp_password"] = password
|
|
session["temp_username"] = username
|
|
return redirect(url_for("admin_user_created"))
|
|
else:
|
|
flash(message, "error")
|
|
|
|
# Generate a temp password for the form
|
|
temp_password = generate_temp_password()
|
|
return render_template("admin/user_new.html", temp_password=temp_password)
|
|
|
|
|
|
@app.route("/admin/users/created")
|
|
@admin_required
|
|
def admin_user_created():
|
|
"""Show created user confirmation with password."""
|
|
username = session.pop("temp_username", None)
|
|
password = session.pop("temp_password", None)
|
|
|
|
if not username or not password:
|
|
return redirect(url_for("admin_users"))
|
|
|
|
return render_template(
|
|
"admin/user_created.html",
|
|
username=username,
|
|
password=password,
|
|
)
|
|
|
|
|
|
@app.route("/admin/users/<int:user_id>/delete", methods=["POST"])
|
|
@admin_required
|
|
def admin_user_delete(user_id):
|
|
"""Delete a user (admin only)."""
|
|
current_user = get_current_user()
|
|
success, message = delete_user(user_id, current_user.id)
|
|
flash(message, "success" if success else "error")
|
|
return redirect(url_for("admin_users"))
|
|
|
|
|
|
@app.route("/admin/users/<int:user_id>/reset-password", methods=["POST"])
|
|
@admin_required
|
|
def admin_user_reset_password(user_id):
|
|
"""Reset a user's password (admin only)."""
|
|
user = get_user_by_id(user_id)
|
|
if not user:
|
|
flash("User not found", "error")
|
|
return redirect(url_for("admin_users"))
|
|
|
|
# Generate new password
|
|
new_password = generate_temp_password()
|
|
success, message = reset_user_password(user_id, new_password)
|
|
|
|
if success:
|
|
# Store for display
|
|
session["temp_password"] = new_password
|
|
session["temp_username"] = user.username
|
|
return redirect(url_for("admin_user_password_reset"))
|
|
else:
|
|
flash(message, "error")
|
|
return redirect(url_for("admin_users"))
|
|
|
|
|
|
@app.route("/admin/users/password-reset")
|
|
@admin_required
|
|
def admin_user_password_reset():
|
|
"""Show password reset confirmation."""
|
|
username = session.pop("temp_username", None)
|
|
password = session.pop("temp_password", None)
|
|
|
|
if not username or not password:
|
|
return redirect(url_for("admin_users"))
|
|
|
|
return render_template(
|
|
"admin/password_reset.html",
|
|
username=username,
|
|
password=password,
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# MAIN
|
|
# ============================================================================
|
|
|
|
if __name__ == "__main__":
|
|
base_dir = Path(__file__).parent
|
|
|
|
# Clean up any leftover temp files from previous runs
|
|
temp_storage.init(base_dir / "temp_files")
|
|
cleaned = temp_storage.cleanup_all()
|
|
if cleaned > 0:
|
|
print(f"Cleaned up {cleaned} leftover temp files from previous run")
|
|
|
|
# HTTPS configuration
|
|
ssl_context = None
|
|
if app.config.get("HTTPS_ENABLED", False):
|
|
import socket
|
|
hostname = os.environ.get("STEGASOO_HOSTNAME") or socket.gethostname()
|
|
try:
|
|
cert_path, key_path = ensure_certs(base_dir, hostname)
|
|
if cert_path.exists() and key_path.exists():
|
|
ssl_context = (str(cert_path), str(key_path))
|
|
print(f"HTTPS enabled with self-signed certificate for {hostname}")
|
|
else:
|
|
print("ERROR: SSL certificates not found after generation attempt")
|
|
print(f" Expected: {cert_path}, {key_path}")
|
|
print(" Falling back to HTTP (INSECURE)")
|
|
except Exception as e:
|
|
print(f"ERROR: Failed to generate SSL certificates: {e}")
|
|
print(" Falling back to HTTP (INSECURE)")
|
|
print(" To fix: mkdir -p certs && openssl req -x509 -newkey rsa:2048 \\")
|
|
print(" -keyout certs/server.key -out certs/server.crt -days 365 -nodes \\")
|
|
print(" -subj '/CN=localhost'")
|
|
|
|
# Auth status
|
|
if app.config.get("AUTH_ENABLED", True):
|
|
print("Authentication enabled")
|
|
else:
|
|
print("Authentication disabled")
|
|
|
|
port = int(os.environ.get("STEGASOO_PORT", "5000"))
|
|
app.run(
|
|
host="0.0.0.0",
|
|
port=port,
|
|
debug=False,
|
|
ssl_context=ssl_context,
|
|
)
|