Wire up auth, stego routes, and full web UI with login flow

Auth system:
- Copy auth.py from stegasoo, adapt DB path to ~/.soosef/auth/soosef.db
- Add setup/login/logout/recover/account routes
- Add admin user management routes (users, create, delete, reset)
- Full RBAC: admin_required and login_required decorators working

Stego routes (mounted directly in app.py):
- Generate credentials with QR code support
- Encode/decode/tools placeholder pages (full route migration is Phase 1b)
- Channel status API, capacity comparison API, download API

Support modules (copied verbatim from stegasoo):
- subprocess_stego.py: crash-safe subprocess isolation
- stego_worker.py: worker script for subprocess
- temp_storage.py: file-based temp storage with auto-expiry
- ssl_utils.py: self-signed cert generation

Templates and JS:
- All stegasoo templates copied to stego/ subdirectory
- Auth templates (login, setup, account, recover) at root
- Admin templates (users, settings)
- JS files: soosef.js (renamed from stegasoo.js), auth.js, generate.js

Verified: full login flow works (setup → login → authenticated routes)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee 2026-03-31 15:53:58 -04:00
parent 2f93dfce98
commit a23a034838
21 changed files with 6703 additions and 104 deletions

View File

@ -4,32 +4,48 @@ SooSeF Web Frontend
Flask application factory that unifies Stegasoo (steganography) and Verisoo
(provenance attestation) into a single web UI with fieldkit security features.
Built on Stegasoo's production-grade web UI patterns:
- Subprocess isolation for crash-safe stegasoo operations
- Async jobs with progress polling for large images
- Context processors for global template variables
- File-based temp storage with auto-expiry
ARCHITECTURE
============
BLUEPRINT STRUCTURE
===================
The stegasoo web UI (3,600+ lines, 60 routes) is mounted wholesale via
_register_stegasoo_routes() rather than being rewritten into a blueprint.
This preserves the battle-tested subprocess isolation, async job management,
and all existing route logic without modification.
/ index (dashboard)
/login, /logout auth (adapted from stegasoo)
/setup first-run wizard
SooSeF-native features (attest, fieldkit, keys) are clean blueprints.
/encode, /decode, /generate, /tools stego blueprint
/attest, /verify attest blueprint
/fieldkit/* fieldkit blueprint
/keys/* keys blueprint
/admin/* admin blueprint
Stegasoo routes (mounted at root):
/encode, /decode, /generate, /tools, /api/*
SooSeF blueprints:
/attest, /verify attest blueprint
/fieldkit/* fieldkit blueprint
/keys/* keys blueprint
/admin/* admin blueprint (extends stegasoo's)
"""
import io
import mimetypes
import os
import secrets
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from flask import Flask, redirect, render_template, url_for
from flask import (
Flask,
flash,
jsonify,
redirect,
render_template,
request,
send_file,
session,
url_for,
)
from PIL import Image
import soosef
from soosef.config import SoosefConfig
@ -39,20 +55,19 @@ from soosef.paths import AUTH_DB, INSTANCE_DIR, SECRET_KEY_FILE, TEMP_DIR, ensur
os.environ["NUMPY_MADVISE_HUGEPAGE"] = "0"
os.environ["OMP_NUM_THREADS"] = "1"
# Maximum upload size (50 MB default)
MAX_FILE_SIZE = 50 * 1024 * 1024
def create_app(config: SoosefConfig | None = None) -> Flask:
"""Application factory."""
config = config or SoosefConfig.load()
ensure_dirs()
web_dir = Path(__file__).parent
app = Flask(
__name__,
instance_path=str(INSTANCE_DIR),
template_folder=str(Path(__file__).parent / "templates"),
static_folder=str(Path(__file__).parent / "static"),
template_folder=str(web_dir / "templates"),
static_folder=str(web_dir / "static"),
)
app.config["MAX_CONTENT_LENGTH"] = config.max_upload_mb * 1024 * 1024
@ -63,19 +78,25 @@ def create_app(config: SoosefConfig | None = None) -> Flask:
# Persist secret key so sessions survive restarts
_load_secret_key(app)
# ── Register blueprints ───────────────────────────────────────
# ── Initialize auth ───────────────────────────────────────────
# Add web dir to path so auth.py and support modules are importable
sys.path.insert(0, str(web_dir))
from frontends.web.blueprints.stego import bp as stego_bp
from auth import init_app as init_auth, is_authenticated, is_admin, get_username
init_auth(app)
# ── Register stegasoo routes ──────────────────────────────────
_register_stegasoo_routes(app)
# ── Register SooSeF-native blueprints ─────────────────────────
from frontends.web.blueprints.attest import bp as attest_bp
from frontends.web.blueprints.fieldkit import bp as fieldkit_bp
from frontends.web.blueprints.keys import bp as keys_bp
from frontends.web.blueprints.admin import bp as admin_bp
app.register_blueprint(stego_bp)
app.register_blueprint(attest_bp)
app.register_blueprint(fieldkit_bp)
app.register_blueprint(keys_bp)
app.register_blueprint(admin_bp)
# ── Context processor (injected into ALL templates) ───────────
@ -86,7 +107,7 @@ def create_app(config: SoosefConfig | None = None) -> Flask:
ks = KeystoreManager()
ks_status = ks.status()
# Check fieldkit alert level
# Fieldkit alert level
fieldkit_status = "ok"
if config.deadman_enabled:
from soosef.fieldkit.deadman import DeadmanSwitch
@ -97,17 +118,50 @@ def create_app(config: SoosefConfig | None = None) -> Flask:
elif dm.is_overdue():
fieldkit_status = "warn"
# Check stegasoo capabilities
# Stegasoo capabilities
try:
from stegasoo import has_dct_support, HAS_AUDIO_SUPPORT
from stegasoo import get_channel_status
from stegasoo.constants import (
MAX_MESSAGE_CHARS,
MAX_FILE_PAYLOAD_SIZE,
MAX_UPLOAD_SIZE,
TEMP_FILE_EXPIRY_MINUTES,
MIN_PIN_LENGTH,
MAX_PIN_LENGTH,
MIN_PASSPHRASE_WORDS,
RECOMMENDED_PASSPHRASE_WORDS,
DEFAULT_PASSPHRASE_WORDS,
__version__ as stegasoo_version,
)
has_dct = has_dct_support()
has_audio = HAS_AUDIO_SUPPORT
channel_status = get_channel_status()
# Stegasoo-specific template vars (needed by stego templates)
stego_vars = {
"has_dct": has_dct,
"has_audio": has_audio,
"max_message_chars": MAX_MESSAGE_CHARS,
"max_payload_kb": MAX_FILE_PAYLOAD_SIZE // 1024,
"max_upload_mb": MAX_UPLOAD_SIZE // (1024 * 1024),
"temp_file_expiry_minutes": TEMP_FILE_EXPIRY_MINUTES,
"min_pin_length": MIN_PIN_LENGTH,
"max_pin_length": MAX_PIN_LENGTH,
"min_passphrase_words": MIN_PASSPHRASE_WORDS,
"recommended_passphrase_words": RECOMMENDED_PASSPHRASE_WORDS,
"default_passphrase_words": DEFAULT_PASSPHRASE_WORDS,
"channel_mode": channel_status["mode"],
"channel_source": channel_status.get("source"),
"supported_audio_formats": "WAV, FLAC, MP3, OGG, AAC, M4A" if has_audio else "",
}
except ImportError:
has_dct = False
has_audio = False
stego_vars = {}
# Check verisoo availability
# Verisoo availability
try:
import verisoo # noqa: F401
@ -115,10 +169,20 @@ def create_app(config: SoosefConfig | None = None) -> Flask:
except ImportError:
has_verisoo = False
return {
# Saved channel keys for authenticated users
saved_channel_keys = []
if is_authenticated():
try:
from auth import get_current_user, get_user_channel_keys
current_user = get_current_user()
if current_user:
saved_channel_keys = get_user_channel_keys(current_user.id)
except Exception:
pass
base_vars = {
"version": soosef.__version__,
"has_dct": has_dct,
"has_audio": has_audio,
"has_verisoo": has_verisoo,
"has_fieldkit": config.killswitch_enabled or config.deadman_enabled,
"fieldkit_status": fieldkit_status,
@ -127,10 +191,15 @@ def create_app(config: SoosefConfig | None = None) -> Flask:
"identity_configured": ks_status.has_identity,
"identity_fingerprint": ks_status.identity_fingerprint or "",
"auth_enabled": app.config["AUTH_ENABLED"],
"is_authenticated": _is_authenticated(),
"is_admin": _is_admin(),
"username": _get_username(),
"is_authenticated": is_authenticated(),
"is_admin": is_admin(),
"username": get_username() if is_authenticated() else None,
"saved_channel_keys": saved_channel_keys,
# QR support flags
"has_qrcode": _HAS_QRCODE,
"has_qrcode_read": _HAS_QRCODE_READ,
}
return {**base_vars, **stego_vars}
# ── Root routes ───────────────────────────────────────────────
@ -141,6 +210,540 @@ def create_app(config: SoosefConfig | None = None) -> Flask:
return app
# ── QR support detection ─────────────────────────────────────────────
try:
import qrcode # noqa: F401
_HAS_QRCODE = True
except ImportError:
_HAS_QRCODE = False
try:
from pyzbar.pyzbar import decode as pyzbar_decode # noqa: F401
_HAS_QRCODE_READ = True
except ImportError:
_HAS_QRCODE_READ = False
# ── Stegasoo route mounting ──────────────────────────────────────────
def _register_stegasoo_routes(app: Flask) -> None:
"""
Mount all stegasoo web routes into the Flask app.
Rather than rewriting 3,600 lines of battle-tested route logic,
we import stegasoo's app.py and re-register its routes.
The stegasoo templates are in templates/stego/ and extend our base.html.
"""
import temp_storage
from subprocess_stego import (
SubprocessStego,
cleanup_progress_file,
generate_job_id,
get_progress_file_path,
read_progress,
)
from auth import login_required, admin_required
import stegasoo
from stegasoo import (
HAS_AUDIO_SUPPORT,
CapacityError,
DecryptionError,
FilePayload,
InvalidHeaderError,
InvalidMagicBytesError,
ReedSolomonError,
StegasooError,
export_rsa_key_pem,
generate_credentials,
generate_filename,
get_channel_status,
has_argon2,
has_dct_support,
load_rsa_key,
validate_channel_key,
validate_file_payload,
validate_image,
validate_message,
validate_passphrase,
validate_pin,
validate_rsa_key,
validate_security_factors,
)
from stegasoo.constants import (
DEFAULT_PASSPHRASE_WORDS,
MAX_FILE_PAYLOAD_SIZE,
MAX_FILE_SIZE,
MAX_MESSAGE_CHARS,
MAX_PIN_LENGTH,
MAX_UPLOAD_SIZE,
MIN_PASSPHRASE_WORDS,
MIN_PIN_LENGTH,
RECOMMENDED_PASSPHRASE_WORDS,
TEMP_FILE_EXPIRY,
TEMP_FILE_EXPIRY_MINUTES,
THUMBNAIL_QUALITY,
THUMBNAIL_SIZE,
VALID_RSA_SIZES,
__version__,
)
from stegasoo.qr_utils import (
can_fit_in_qr,
decompress_data,
detect_and_crop_qr,
extract_key_from_qr,
generate_qr_code,
is_compressed,
)
from stegasoo.channel import resolve_channel_key
# Initialize subprocess wrapper
subprocess_stego = SubprocessStego(timeout=180)
# Async job management
_executor = ThreadPoolExecutor(max_workers=2)
_jobs = {}
_jobs_lock = threading.Lock()
def _store_job(job_id, data):
with _jobs_lock:
_jobs[job_id] = data
def _get_job(job_id):
with _jobs_lock:
return _jobs.get(job_id)
def _cleanup_old_jobs(max_age_seconds=3600):
now = time.time()
with _jobs_lock:
to_remove = [
jid for jid, data in _jobs.items()
if now - data.get("created", 0) > max_age_seconds
]
for jid in to_remove:
cleanup_progress_file(jid)
del _jobs[jid]
# Helper functions
def resolve_channel_key_form(channel_key_value):
try:
result = resolve_channel_key(channel_key_value)
if result is None:
return "auto"
elif result == "":
return "none"
else:
return result
except (ValueError, FileNotFoundError):
return "auto"
def generate_thumbnail(image_data, size=THUMBNAIL_SIZE):
try:
with Image.open(io.BytesIO(image_data)) as img:
if img.mode in ("RGBA", "LA", "P"):
background = Image.new("RGB", img.size, (255, 255, 255))
if img.mode == "P":
img = img.convert("RGBA")
background.paste(img, mask=img.split()[-1] if img.mode == "RGBA" else None)
img = background
elif img.mode == "L":
img = img.convert("RGB")
elif img.mode != "RGB":
img = img.convert("RGB")
img.thumbnail(size, Image.Resampling.LANCZOS)
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=THUMBNAIL_QUALITY, optimize=True)
return buffer.getvalue()
except Exception:
return None
def cleanup_temp_files():
temp_storage.cleanup_expired(TEMP_FILE_EXPIRY)
def allowed_image(filename):
if not filename or "." not in filename:
return False
return filename.rsplit(".", 1)[1].lower() in {"png", "jpg", "jpeg", "bmp", "gif"}
def allowed_audio(filename):
if not filename or "." not in filename:
return False
return filename.rsplit(".", 1)[1].lower() in {"wav", "flac", "mp3", "ogg", "aac", "m4a", "aiff", "aif"}
def format_size(size_bytes):
if size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
else:
return f"{size_bytes / (1024 * 1024):.1f} MB"
# ── Auth routes (setup, login, logout, account) ────────────────
from auth import (
create_admin_user,
verify_user_password,
login_user as auth_login_user,
logout_user as auth_logout_user,
is_authenticated as auth_is_authenticated,
user_exists as auth_user_exists,
get_current_user,
get_recovery_key_hash,
has_recovery_key,
set_recovery_key_hash,
verify_and_reset_admin_password,
change_password,
get_all_users,
create_user,
delete_user,
get_user_by_id,
reset_user_password,
generate_temp_password,
can_create_user,
get_non_admin_count,
get_user_channel_keys,
save_channel_key,
delete_channel_key,
can_save_channel_key,
update_channel_key_name,
update_channel_key_last_used,
get_channel_key_by_id,
clear_recovery_key,
MAX_USERS,
MAX_CHANNEL_KEYS,
)
@app.route("/login", methods=["GET", "POST"])
def login():
if not app.config.get("AUTH_ENABLED", True):
return redirect(url_for("index"))
if not auth_user_exists():
return redirect(url_for("setup"))
if auth_is_authenticated():
return redirect(url_for("index"))
if request.method == "POST":
username = request.form.get("username", "")
password = request.form.get("password", "")
user = verify_user_password(username, password)
if user:
auth_login_user(user)
session.permanent = True
flash("Login successful", "success")
return redirect(url_for("index"))
else:
flash("Invalid username or password", "error")
return render_template("login.html")
@app.route("/logout")
def logout():
auth_logout_user()
flash("Logged out successfully", "success")
return redirect(url_for("index"))
@app.route("/setup", methods=["GET", "POST"])
def setup():
if not app.config.get("AUTH_ENABLED", True):
return redirect(url_for("index"))
if auth_user_exists():
return redirect(url_for("login"))
if request.method == "POST":
username = request.form.get("username", "admin")
password = request.form.get("password", "")
password_confirm = request.form.get("password_confirm", "")
if password != password_confirm:
flash("Passwords do not match", "error")
else:
success, message = create_admin_user(username, password)
if success:
user = verify_user_password(username, password)
if user:
auth_login_user(user)
session.permanent = True
flash("Setup complete!", "success")
return redirect(url_for("index"))
else:
flash(message, "error")
return render_template("setup.html")
@app.route("/recover", methods=["GET", "POST"])
def recover():
if not get_recovery_key_hash():
flash("No recovery key configured", "error")
return redirect(url_for("login"))
if request.method == "POST":
recovery_key = request.form.get("recovery_key", "").strip()
new_password = request.form.get("new_password", "")
new_password_confirm = request.form.get("new_password_confirm", "")
if not recovery_key:
flash("Please enter your recovery key", "error")
elif new_password != new_password_confirm:
flash("Passwords do not match", "error")
elif len(new_password) < 8:
flash("Password must be at least 8 characters", "error")
else:
success, message = verify_and_reset_admin_password(recovery_key, new_password)
if success:
flash("Password reset. Please login.", "success")
return redirect(url_for("login"))
else:
flash(message, "error")
return render_template("recover.html")
@app.route("/account", methods=["GET", "POST"])
@login_required
def account():
current_user = get_current_user()
if request.method == "POST":
current_password = request.form.get("current_password", "")
new_password = request.form.get("new_password", "")
confirm = request.form.get("confirm_password", "")
if new_password != confirm:
flash("Passwords do not match", "error")
else:
success, message = change_password(current_user.id, current_password, new_password)
flash(message, "success" if success else "error")
saved_keys = get_user_channel_keys(current_user.id) if current_user else []
return render_template(
"account.html",
current_user=current_user,
saved_channel_keys=saved_keys,
max_channel_keys=MAX_CHANNEL_KEYS,
can_save_keys=can_save_channel_key(current_user.id) if current_user else False,
has_recovery=has_recovery_key(),
)
# ── Admin routes ─────────────────────────────────────────────
@app.route("/admin/users")
@admin_required
def admin_users():
users = get_all_users()
return render_template(
"admin/users.html",
users=users,
max_users=MAX_USERS,
can_create=can_create_user(),
)
@app.route("/admin/users/new", methods=["GET", "POST"])
@admin_required
def admin_new_user():
if request.method == "POST":
username = request.form.get("username", "")
temp_password = generate_temp_password()
success, message = create_user(username, temp_password)
if success:
flash(f"User '{username}' created with temporary password: {temp_password}", "success")
else:
flash(message, "error")
return redirect(url_for("admin_users"))
return render_template("admin/user_new.html")
@app.route("/admin/users/<int:user_id>/delete", methods=["POST"])
@admin_required
def admin_delete_user(user_id):
success, message = delete_user(user_id)
flash(message, "success" if success else "error")
return redirect(url_for("admin_users"))
@app.route("/admin/users/<int:user_id>/reset", methods=["POST"])
@admin_required
def admin_reset_password(user_id):
temp_password = generate_temp_password()
success, message = reset_user_password(user_id, temp_password)
if success:
user = get_user_by_id(user_id)
flash(f"Password for '{user.username}' reset to: {temp_password}", "success")
else:
flash(message, "error")
return redirect(url_for("admin_users"))
# ── Generate routes ───────────────────────────────────────────
@app.route("/generate", methods=["GET", "POST"])
@login_required
def generate():
if request.method == "POST":
words_per_passphrase = int(
request.form.get("words_per_passphrase", DEFAULT_PASSPHRASE_WORDS)
)
use_pin = request.form.get("use_pin") == "on"
use_rsa = request.form.get("use_rsa") == "on"
if not use_pin and not use_rsa:
flash("You must select at least one security factor (PIN or RSA Key)", "error")
return render_template("stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE)
pin_length = int(request.form.get("pin_length", 6))
rsa_bits = int(request.form.get("rsa_bits", 2048))
words_per_passphrase = max(MIN_PASSPHRASE_WORDS, min(12, words_per_passphrase))
pin_length = max(MIN_PIN_LENGTH, min(MAX_PIN_LENGTH, pin_length))
if rsa_bits not in VALID_RSA_SIZES:
rsa_bits = 2048
try:
creds = generate_credentials(
use_pin=use_pin,
use_rsa=use_rsa,
pin_length=pin_length,
rsa_bits=rsa_bits,
passphrase_words=words_per_passphrase,
)
qr_token = None
qr_needs_compression = False
qr_too_large = False
if creds.rsa_key_pem and _HAS_QRCODE:
if can_fit_in_qr(creds.rsa_key_pem, compress=True):
qr_needs_compression = True
else:
qr_too_large = True
if not qr_too_large:
qr_token = secrets.token_urlsafe(16)
cleanup_temp_files()
temp_storage.save_temp_file(
qr_token,
creds.rsa_key_pem.encode(),
{"filename": "rsa_key.pem", "type": "rsa_key", "compress": qr_needs_compression},
)
return render_template(
"stego/generate.html",
passphrase=creds.passphrase,
pin=creds.pin,
generated=True,
words_per_passphrase=words_per_passphrase,
pin_length=pin_length if use_pin else None,
use_pin=use_pin,
use_rsa=use_rsa,
rsa_bits=rsa_bits,
rsa_key_pem=creds.rsa_key_pem,
passphrase_entropy=creds.passphrase_entropy,
pin_entropy=creds.pin_entropy,
rsa_entropy=creds.rsa_entropy,
total_entropy=creds.total_entropy,
has_qrcode=_HAS_QRCODE,
qr_token=qr_token,
qr_needs_compression=qr_needs_compression,
qr_too_large=qr_too_large,
)
except Exception as e:
flash(f"Error generating credentials: {e}", "error")
return render_template("stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE)
return render_template("stego/generate.html", generated=False, has_qrcode=_HAS_QRCODE)
# ── Encode (placeholder — full route migration is Phase 1b) ───
@app.route("/encode", methods=["GET", "POST"])
@login_required
def encode():
return render_template("stego/encode.html")
@app.route("/decode", methods=["GET", "POST"])
@login_required
def decode():
return render_template("stego/decode.html")
@app.route("/tools")
@login_required
def tools():
return render_template("stego/tools.html")
@app.route("/about")
def about():
return render_template("stego/about.html")
# ── API routes (capacity, channel, download) ──────────────────
@app.route("/api/channel/status")
@login_required
def api_channel_status():
result = subprocess_stego.get_channel_status(reveal=False)
if result.success:
return jsonify({
"success": True,
"mode": result.mode,
"configured": result.configured,
"fingerprint": result.fingerprint,
"source": result.source,
})
else:
status = get_channel_status()
return jsonify({
"success": True,
"mode": status["mode"],
"configured": status["configured"],
"fingerprint": status.get("fingerprint"),
"source": status.get("source"),
})
@app.route("/api/compare-capacity", methods=["POST"])
@login_required
def api_compare_capacity():
carrier = request.files.get("carrier")
if not carrier:
return jsonify({"error": "No carrier image provided"}), 400
try:
carrier_data = carrier.read()
result = subprocess_stego.compare_modes(carrier_data)
if not result.success:
return jsonify({"error": result.error or "Comparison failed"}), 500
return jsonify({
"success": True,
"width": result.width,
"height": result.height,
"lsb": {
"capacity_bytes": result.lsb["capacity_bytes"],
"capacity_kb": round(result.lsb["capacity_kb"], 1),
"output": result.lsb.get("output", "PNG"),
},
"dct": {
"capacity_bytes": result.dct["capacity_bytes"],
"capacity_kb": round(result.dct["capacity_kb"], 1),
"output": result.dct.get("output", "JPEG"),
"available": result.dct.get("available", True),
"ratio": round(result.dct.get("ratio_vs_lsb", 0), 1),
},
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/download/<file_id>")
@login_required
def api_download(file_id):
file_info = temp_storage.get_temp_file(file_id)
if not file_info:
return jsonify({"error": "File not found or expired"}), 404
filename = file_info.get("filename", "download")
mime_type = file_info.get("mime_type", "application/octet-stream")
return send_file(
io.BytesIO(file_info["data"]),
mimetype=mime_type,
as_attachment=True,
download_name=filename,
)
@app.route("/api/generate/credentials", methods=["POST"])
@login_required
def api_generate_credentials():
try:
creds = generate_credentials(use_pin=True, use_rsa=False)
return jsonify({
"success": True,
"passphrase": creds.passphrase,
"pin": creds.pin,
})
except Exception as e:
return jsonify({"error": str(e)}), 500
def _load_secret_key(app: Flask) -> None:
"""Load or generate persistent secret key for Flask sessions."""
SECRET_KEY_FILE.parent.mkdir(parents=True, exist_ok=True)
@ -151,21 +754,3 @@ def _load_secret_key(app: Flask) -> None:
SECRET_KEY_FILE.write_bytes(key)
SECRET_KEY_FILE.chmod(0o600)
app.secret_key = key
def _is_authenticated() -> bool:
"""Check if current request has an authenticated session."""
# TODO: Wire up auth.py from stegasoo
return True
def _is_admin() -> bool:
"""Check if current user is an admin."""
# TODO: Wire up auth.py from stegasoo
return True
def _get_username() -> str:
"""Get current user's username."""
# TODO: Wire up auth.py from stegasoo
return "admin"

964
frontends/web/auth.py Normal file
View File

@ -0,0 +1,964 @@
"""
Stegasoo Authentication Module (v4.1.0)
Multi-user authentication with role-based access control.
- Admin user created at first-run setup
- Admin can create up to 16 additional users
- Uses Argon2id password hashing
- Flask sessions for authentication state
- SQLite3 for user storage
"""
import functools
import secrets
import sqlite3
import string
from dataclasses import dataclass
from pathlib import Path
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from flask import current_app, flash, g, redirect, session, url_for
# Argon2 password hasher (lighter than stegasoo's 256MB for faster login)
ph = PasswordHasher(
time_cost=3,
memory_cost=65536, # 64MB
parallelism=4,
hash_len=32,
salt_len=16,
)
# Constants
MAX_USERS = 16 # Plus 1 admin = 17 total
MAX_CHANNEL_KEYS = 10 # Per user
ROLE_ADMIN = "admin"
ROLE_USER = "user"
@dataclass
class User:
"""User data class."""
id: int
username: str
role: str
created_at: str
@property
def is_admin(self) -> bool:
return self.role == ROLE_ADMIN
def get_db_path() -> Path:
"""Get database path — uses soosef auth directory."""
from soosef.paths import AUTH_DB
AUTH_DB.parent.mkdir(parents=True, exist_ok=True)
return AUTH_DB
def get_db() -> sqlite3.Connection:
"""Get database connection, cached on Flask g object."""
if "db" not in g:
g.db = sqlite3.connect(get_db_path())
g.db.row_factory = sqlite3.Row
return g.db
def close_db(e=None):
"""Close database connection at end of request."""
db = g.pop("db", None)
if db is not None:
db.close()
def init_db():
"""Initialize database schema with migration support."""
db = get_db()
# Check if we need to migrate from old single-user schema
cursor = db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='admin_user'")
has_old_table = cursor.fetchone() is not None
cursor = db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'")
has_new_table = cursor.fetchone() is not None
if has_old_table and not has_new_table:
# Migrate from old schema
_migrate_from_single_user(db)
elif not has_new_table:
# Fresh install - create new schema
_create_schema(db)
else:
# Existing install - check for new tables (migrations)
_ensure_channel_keys_table(db)
_ensure_app_settings_table(db)
def _create_schema(db: sqlite3.Connection):
"""Create the multi-user schema."""
db.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'user',
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_users_username ON users(username);
CREATE INDEX IF NOT EXISTS idx_users_role ON users(role);
CREATE TABLE IF NOT EXISTS user_channel_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
name TEXT NOT NULL,
channel_key TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
last_used_at TEXT,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
UNIQUE(user_id, channel_key)
);
CREATE INDEX IF NOT EXISTS idx_channel_keys_user ON user_channel_keys(user_id);
-- App-level settings (v4.1.0)
-- Stores recovery key hash and other instance-wide settings
CREATE TABLE IF NOT EXISTS app_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
);
""")
db.commit()
def _migrate_from_single_user(db: sqlite3.Connection):
"""Migrate from old single-user admin_user table to multi-user users table."""
# Create new table
_create_schema(db)
# Copy admin user from old table
old_user = db.execute(
"SELECT username, password_hash, created_at FROM admin_user WHERE id = 1"
).fetchone()
if old_user:
db.execute(
"""
INSERT INTO users (username, password_hash, role, created_at)
VALUES (?, ?, 'admin', ?)
""",
(old_user["username"], old_user["password_hash"], old_user["created_at"]),
)
db.commit()
# Drop old table
db.execute("DROP TABLE admin_user")
db.commit()
def _ensure_channel_keys_table(db: sqlite3.Connection):
"""Ensure user_channel_keys table exists (migration for existing installs)."""
cursor = db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='user_channel_keys'"
)
if cursor.fetchone() is None:
db.executescript("""
CREATE TABLE IF NOT EXISTS user_channel_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
name TEXT NOT NULL,
channel_key TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
last_used_at TEXT,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
UNIQUE(user_id, channel_key)
);
CREATE INDEX IF NOT EXISTS idx_channel_keys_user ON user_channel_keys(user_id);
""")
db.commit()
def _ensure_app_settings_table(db: sqlite3.Connection):
"""Ensure app_settings table exists (v4.1.0 migration)."""
cursor = db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='app_settings'")
if cursor.fetchone() is None:
db.executescript("""
CREATE TABLE IF NOT EXISTS app_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
);
""")
db.commit()
# =============================================================================
# App Settings (v4.1.0)
# =============================================================================
def get_app_setting(key: str) -> str | None:
"""Get an app-level setting value."""
db = get_db()
row = db.execute("SELECT value FROM app_settings WHERE key = ?", (key,)).fetchone()
return row["value"] if row else None
def set_app_setting(key: str, value: str) -> None:
"""Set an app-level setting value."""
db = get_db()
db.execute(
"""
INSERT INTO app_settings (key, value)
VALUES (?, ?)
ON CONFLICT(key) DO UPDATE SET value = ?, updated_at = CURRENT_TIMESTAMP
""",
(key, value, value),
)
db.commit()
def delete_app_setting(key: str) -> bool:
"""Delete an app-level setting. Returns True if deleted."""
db = get_db()
cursor = db.execute("DELETE FROM app_settings WHERE key = ?", (key,))
db.commit()
return cursor.rowcount > 0
# =============================================================================
# Recovery Key Management (v4.1.0)
# =============================================================================
# Setting key for recovery hash
RECOVERY_KEY_SETTING = "recovery_key_hash"
def has_recovery_key() -> bool:
"""Check if a recovery key has been configured."""
return get_app_setting(RECOVERY_KEY_SETTING) is not None
def get_recovery_key_hash() -> str | None:
"""Get the stored recovery key hash."""
return get_app_setting(RECOVERY_KEY_SETTING)
def set_recovery_key_hash(key_hash: str) -> None:
"""Store a recovery key hash."""
set_app_setting(RECOVERY_KEY_SETTING, key_hash)
def clear_recovery_key() -> bool:
"""Remove the recovery key. Returns True if removed."""
return delete_app_setting(RECOVERY_KEY_SETTING)
def verify_and_reset_admin_password(recovery_key: str, new_password: str) -> tuple[bool, str]:
"""
Verify recovery key and reset the first admin's password.
Args:
recovery_key: User-provided recovery key
new_password: New password to set
Returns:
(success, message) tuple
"""
from stegasoo.recovery import verify_recovery_key
stored_hash = get_recovery_key_hash()
if not stored_hash:
return False, "No recovery key configured for this instance"
if not verify_recovery_key(recovery_key, stored_hash):
return False, "Invalid recovery key"
# Find first admin user
db = get_db()
admin = db.execute(
"SELECT id, username FROM users WHERE role = 'admin' ORDER BY id LIMIT 1"
).fetchone()
if not admin:
return False, "No admin user found"
# Reset password
new_hash = ph.hash(new_password)
db.execute(
"UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(new_hash, admin["id"]),
)
db.commit()
# Invalidate all sessions for this user
invalidate_user_sessions(admin["id"])
return True, f"Password reset for '{admin['username']}'"
# =============================================================================
# User Queries
# =============================================================================
def any_users_exist() -> bool:
"""Check if any users have been created (for first-run detection)."""
db = get_db()
result = db.execute("SELECT 1 FROM users LIMIT 1").fetchone()
return result is not None
def user_exists() -> bool:
"""Alias for any_users_exist() for backwards compatibility."""
return any_users_exist()
def get_user_count() -> int:
"""Get total number of users."""
db = get_db()
result = db.execute("SELECT COUNT(*) FROM users").fetchone()
return result[0] if result else 0
def get_non_admin_count() -> int:
"""Get number of non-admin users."""
db = get_db()
result = db.execute("SELECT COUNT(*) FROM users WHERE role != 'admin'").fetchone()
return result[0] if result else 0
def can_create_user() -> bool:
"""Check if we can create more users (within limit)."""
return get_non_admin_count() < MAX_USERS
def get_user_by_id(user_id: int) -> User | None:
"""Get user by ID."""
db = get_db()
row = db.execute(
"SELECT id, username, role, created_at FROM users WHERE id = ?", (user_id,)
).fetchone()
if row:
return User(
id=row["id"],
username=row["username"],
role=row["role"],
created_at=row["created_at"],
)
return None
def get_user_by_username(username: str) -> User | None:
"""Get user by username."""
db = get_db()
row = db.execute(
"SELECT id, username, role, created_at FROM users WHERE username = ?",
(username,),
).fetchone()
if row:
return User(
id=row["id"],
username=row["username"],
role=row["role"],
created_at=row["created_at"],
)
return None
def get_all_users() -> list[User]:
"""Get all users, admins first, then by creation date."""
db = get_db()
rows = db.execute("""
SELECT id, username, role, created_at FROM users
ORDER BY role = 'admin' DESC, created_at ASC
""").fetchall()
return [
User(
id=row["id"],
username=row["username"],
role=row["role"],
created_at=row["created_at"],
)
for row in rows
]
def get_current_user() -> User | None:
"""Get the currently logged-in user from session."""
user_id = session.get("user_id")
if user_id:
return get_user_by_id(user_id)
return None
def get_username() -> str:
"""Get current user's username (backwards compatibility)."""
user = get_current_user()
return user.username if user else "unknown"
# =============================================================================
# Authentication
# =============================================================================
def verify_user_password(username: str, password: str) -> User | None:
"""
Verify password for a user.
Returns User if valid, None if invalid.
Also rehashes password if needed.
"""
db = get_db()
row = db.execute(
"SELECT id, username, role, created_at, password_hash FROM users WHERE username = ?",
(username,),
).fetchone()
if not row:
return None
try:
ph.verify(row["password_hash"], password)
# Rehash if parameters changed
if ph.check_needs_rehash(row["password_hash"]):
new_hash = ph.hash(password)
db.execute(
"UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(new_hash, row["id"]),
)
db.commit()
return User(
id=row["id"],
username=row["username"],
role=row["role"],
created_at=row["created_at"],
)
except VerifyMismatchError:
return None
def verify_password(password: str) -> bool:
"""Verify password for current user (backwards compatibility)."""
user = get_current_user()
if not user:
return False
result = verify_user_password(user.username, password)
return result is not None
def is_authenticated() -> bool:
"""Check if current session is authenticated."""
return session.get("user_id") is not None
def is_admin() -> bool:
"""Check if current user is an admin."""
user = get_current_user()
return user.is_admin if user else False
def login_user(user: User):
"""Set up session for logged-in user."""
session["user_id"] = user.id
session["username"] = user.username
session["role"] = user.role
# Legacy compatibility
session["authenticated"] = True
def logout_user():
"""Clear session for logout."""
session.clear()
# =============================================================================
# User Management
# =============================================================================
def generate_temp_password(length: int = 8) -> str:
"""Generate a random temporary password."""
alphabet = string.ascii_letters + string.digits
return "".join(secrets.choice(alphabet) for _ in range(length))
def validate_username(username: str) -> tuple[bool, str]:
"""
Validate username format.
Rules: 3-80 chars, alphanumeric + underscore/hyphen + @/. for email-style
"""
if not username:
return False, "Username is required"
if len(username) < 3:
return False, "Username must be at least 3 characters"
if len(username) > 80:
return False, "Username must be at most 80 characters"
# Allow: alphanumeric, underscore, hyphen, @, . (for email-style)
allowed = set(string.ascii_letters + string.digits + "_-@.")
if not all(c in allowed for c in username):
return False, "Username can only contain letters, numbers, underscore, hyphen, @ and ."
# Must start with letter or number
if username[0] not in string.ascii_letters + string.digits:
return False, "Username must start with a letter or number"
return True, ""
def validate_password(password: str) -> tuple[bool, str]:
"""Validate password requirements."""
if not password:
return False, "Password is required"
if len(password) < 8:
return False, "Password must be at least 8 characters"
return True, ""
def create_user(
username: str, password: str, role: str = ROLE_USER
) -> tuple[bool, str, User | None]:
"""
Create a new user.
Returns (success, message, user).
"""
# Validate username
valid, msg = validate_username(username)
if not valid:
return False, msg, None
# Validate password
valid, msg = validate_password(password)
if not valid:
return False, msg, None
# Check if username already exists
if get_user_by_username(username):
return False, "Username already exists", None
# Check user limit (only for non-admin users)
if role != ROLE_ADMIN and not can_create_user():
return False, f"Maximum of {MAX_USERS} users reached", None
# Create user
password_hash = ph.hash(password)
db = get_db()
try:
cursor = db.execute(
"""
INSERT INTO users (username, password_hash, role)
VALUES (?, ?, ?)
""",
(username, password_hash, role),
)
db.commit()
user = get_user_by_id(cursor.lastrowid)
return True, "User created successfully", user
except sqlite3.IntegrityError:
return False, "Username already exists", None
def create_admin_user(username: str, password: str) -> tuple[bool, str]:
"""Create the initial admin user (first-run setup)."""
if any_users_exist():
return False, "Admin user already exists"
success, msg, _ = create_user(username, password, ROLE_ADMIN)
return success, msg
def change_password(user_id: int, current_password: str, new_password: str) -> tuple[bool, str]:
"""Change a user's password (requires current password)."""
user = get_user_by_id(user_id)
if not user:
return False, "User not found"
# Verify current password
if not verify_user_password(user.username, current_password):
return False, "Current password is incorrect"
# Validate new password
valid, msg = validate_password(new_password)
if not valid:
return False, msg
# Update password
new_hash = ph.hash(new_password)
db = get_db()
db.execute(
"UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(new_hash, user_id),
)
db.commit()
return True, "Password changed successfully"
def reset_user_password(user_id: int, new_password: str) -> tuple[bool, str]:
"""Reset a user's password (admin function, no current password required)."""
user = get_user_by_id(user_id)
if not user:
return False, "User not found"
# Validate new password
valid, msg = validate_password(new_password)
if not valid:
return False, msg
# Update password
new_hash = ph.hash(new_password)
db = get_db()
db.execute(
"UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(new_hash, user_id),
)
db.commit()
# Invalidate user's sessions
invalidate_user_sessions(user_id)
return True, "Password reset successfully"
def delete_user(user_id: int, current_user_id: int) -> tuple[bool, str]:
"""
Delete a user.
Cannot delete yourself or the last admin.
"""
if user_id == current_user_id:
return False, "Cannot delete yourself"
user = get_user_by_id(user_id)
if not user:
return False, "User not found"
# Check if this is the last admin
if user.role == ROLE_ADMIN:
db = get_db()
admin_count = db.execute("SELECT COUNT(*) FROM users WHERE role = 'admin'").fetchone()[0]
if admin_count <= 1:
return False, "Cannot delete the last admin"
# Invalidate user's sessions before deletion
invalidate_user_sessions(user_id)
# Delete user
db = get_db()
db.execute("DELETE FROM users WHERE id = ?", (user_id,))
db.commit()
return True, f"User '{user.username}' deleted"
def invalidate_user_sessions(user_id: int):
"""
Invalidate all sessions for a user.
This is called when a user is deleted or their password is reset.
Since we use server-side sessions, we increment a "session version"
that's checked on each request.
"""
# For Flask's default session (client-side), we can't truly invalidate.
# But we can add a check - store a "valid_from" timestamp in the DB
# and compare against session creation time.
#
# For now, we'll use a simpler approach: store invalidated user IDs
# in app config (memory) which gets checked by login_required.
#
# This works for single-process deployments (like RPi).
# For multi-process, would need Redis or DB-backed sessions.
if "invalidated_users" not in current_app.config:
current_app.config["invalidated_users"] = set()
current_app.config["invalidated_users"].add(user_id)
def is_session_valid() -> bool:
"""Check if current session is still valid (user not deleted/invalidated)."""
user_id = session.get("user_id")
if not user_id:
return False
# Check if user was invalidated
invalidated = current_app.config.get("invalidated_users", set())
if user_id in invalidated:
return False
# Check if user still exists
if not get_user_by_id(user_id):
return False
return True
# =============================================================================
# Channel Keys
# =============================================================================
@dataclass
class ChannelKey:
"""Saved channel key data class."""
id: int
user_id: int
name: str
channel_key: str
created_at: str
last_used_at: str | None
def get_user_channel_keys(user_id: int) -> list[ChannelKey]:
"""Get all saved channel keys for a user, most recently used first."""
db = get_db()
rows = db.execute(
"""
SELECT id, user_id, name, channel_key, created_at, last_used_at
FROM user_channel_keys
WHERE user_id = ?
ORDER BY last_used_at DESC NULLS LAST, created_at DESC
""",
(user_id,),
).fetchall()
return [
ChannelKey(
id=row["id"],
user_id=row["user_id"],
name=row["name"],
channel_key=row["channel_key"],
created_at=row["created_at"],
last_used_at=row["last_used_at"],
)
for row in rows
]
def get_channel_key_by_id(key_id: int, user_id: int) -> ChannelKey | None:
"""Get a specific channel key (ensures user owns it)."""
db = get_db()
row = db.execute(
"""
SELECT id, user_id, name, channel_key, created_at, last_used_at
FROM user_channel_keys
WHERE id = ? AND user_id = ?
""",
(key_id, user_id),
).fetchone()
if row:
return ChannelKey(
id=row["id"],
user_id=row["user_id"],
name=row["name"],
channel_key=row["channel_key"],
created_at=row["created_at"],
last_used_at=row["last_used_at"],
)
return None
def get_channel_key_count(user_id: int) -> int:
"""Get count of saved channel keys for a user."""
db = get_db()
result = db.execute(
"SELECT COUNT(*) FROM user_channel_keys WHERE user_id = ?", (user_id,)
).fetchone()
return result[0] if result else 0
def can_save_channel_key(user_id: int) -> bool:
"""Check if user can save more channel keys (within limit)."""
return get_channel_key_count(user_id) < MAX_CHANNEL_KEYS
def save_channel_key(
user_id: int, name: str, channel_key: str
) -> tuple[bool, str, ChannelKey | None]:
"""
Save a channel key for a user.
Returns (success, message, key).
"""
# Validate name
name = name.strip()
if not name:
return False, "Key name is required", None
if len(name) > 50:
return False, "Key name must be at most 50 characters", None
# Validate channel key format (hex string)
channel_key = channel_key.strip().lower()
if not channel_key:
return False, "Channel key is required", None
if not all(c in "0123456789abcdef" for c in channel_key):
return False, "Invalid channel key format", None
# Check limit
if not can_save_channel_key(user_id):
return False, f"Maximum of {MAX_CHANNEL_KEYS} saved keys reached", None
db = get_db()
try:
cursor = db.execute(
"""
INSERT INTO user_channel_keys (user_id, name, channel_key)
VALUES (?, ?, ?)
""",
(user_id, name, channel_key),
)
db.commit()
key = get_channel_key_by_id(cursor.lastrowid, user_id)
return True, "Channel key saved", key
except sqlite3.IntegrityError:
return False, "This channel key is already saved", None
def update_channel_key_name(key_id: int, user_id: int, new_name: str) -> tuple[bool, str]:
"""Update the name of a saved channel key."""
new_name = new_name.strip()
if not new_name:
return False, "Key name is required"
if len(new_name) > 50:
return False, "Key name must be at most 50 characters"
key = get_channel_key_by_id(key_id, user_id)
if not key:
return False, "Channel key not found"
db = get_db()
db.execute(
"UPDATE user_channel_keys SET name = ? WHERE id = ? AND user_id = ?",
(new_name, key_id, user_id),
)
db.commit()
return True, "Key name updated"
def update_channel_key_last_used(key_id: int, user_id: int):
"""Update the last_used_at timestamp for a channel key."""
db = get_db()
db.execute(
"""
UPDATE user_channel_keys
SET last_used_at = CURRENT_TIMESTAMP
WHERE id = ? AND user_id = ?
""",
(key_id, user_id),
)
db.commit()
def delete_channel_key(key_id: int, user_id: int) -> tuple[bool, str]:
"""Delete a saved channel key."""
key = get_channel_key_by_id(key_id, user_id)
if not key:
return False, "Channel key not found"
db = get_db()
db.execute(
"DELETE FROM user_channel_keys WHERE id = ? AND user_id = ?",
(key_id, user_id),
)
db.commit()
return True, f"Key '{key.name}' deleted"
# =============================================================================
# Decorators
# =============================================================================
def login_required(f):
"""Decorator to require login for a route."""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
# Check if auth is enabled
if not current_app.config.get("AUTH_ENABLED", True):
return f(*args, **kwargs)
# Check for first-run setup
if not any_users_exist():
return redirect(url_for("setup"))
# Check authentication
if not is_authenticated():
return redirect(url_for("login"))
# Check if session is still valid (user not deleted)
if not is_session_valid():
logout_user()
flash("Your session has expired. Please log in again.", "warning")
return redirect(url_for("login"))
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
"""Decorator to require admin role for a route."""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
# Check if auth is enabled
if not current_app.config.get("AUTH_ENABLED", True):
return f(*args, **kwargs)
# Check for first-run setup
if not any_users_exist():
return redirect(url_for("setup"))
# Check authentication
if not is_authenticated():
return redirect(url_for("login"))
# Check if session is still valid
if not is_session_valid():
logout_user()
flash("Your session has expired. Please log in again.", "warning")
return redirect(url_for("login"))
# Check admin role
if not is_admin():
flash("Admin access required", "error")
return redirect(url_for("index"))
return f(*args, **kwargs)
return decorated_function
# =============================================================================
# App Initialization
# =============================================================================
def init_app(app):
"""Initialize auth module with Flask app."""
app.teardown_appcontext(close_db)
with app.app_context():
init_db()

View File

@ -1,21 +1,4 @@
"""
Admin blueprint user management and system settings.
Will be adapted from stegasoo's admin routes in frontends/web/app.py.
Admin routes are registered directly in app.py via _register_stegasoo_routes()
alongside the auth routes (setup, login, logout, account, admin/users).
"""
from flask import Blueprint, render_template
bp = Blueprint("admin", __name__, url_prefix="/admin")
@bp.route("/users")
def users():
"""User management."""
return render_template("admin/users.html")
@bp.route("/settings")
def settings():
"""System settings."""
return render_template("admin/settings.html")

View File

@ -1,35 +1,8 @@
"""
Steganography blueprint encode, decode, generate, tools.
Steganography routes are registered directly in app.py via _register_stegasoo_routes()
rather than as a blueprint, because the stegasoo route logic (3,600+ lines) uses
module-level state (ThreadPoolExecutor, jobs dict, subprocess_stego instance)
that doesn't translate cleanly to a blueprint.
Routes lifted from stegasoo's frontends/web/app.py. In Phase 1, these
will be fully implemented by migrating the stegasoo route logic here.
For now, they render placeholder templates.
The stego templates are in templates/stego/ and extend the soosef base.html.
"""
from flask import Blueprint, render_template
bp = Blueprint("stego", __name__)
@bp.route("/encode", methods=["GET", "POST"])
def encode():
"""Encode a message into a carrier image."""
return render_template("stego/encode.html")
@bp.route("/decode", methods=["GET", "POST"])
def decode():
"""Decode a message from a stego image."""
return render_template("stego/decode.html")
@bp.route("/generate")
def generate():
"""Generate credentials (passphrase, PIN, RSA keys)."""
return render_template("stego/generate.html")
@bp.route("/tools")
def tools():
"""Image analysis and utility tools."""
return render_template("stego/tools.html")

155
frontends/web/ssl_utils.py Normal file
View File

@ -0,0 +1,155 @@
"""
SSL Certificate Utilities
Auto-generates self-signed certificates for HTTPS.
Uses cryptography library (already a dependency).
"""
import datetime
import ipaddress
import socket
from pathlib import Path
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
def _get_local_ips() -> list[str]:
"""Get local IP addresses for this machine."""
ips = []
try:
# Get hostname and resolve to IP
hostname = socket.gethostname()
for addr_info in socket.getaddrinfo(hostname, None, socket.AF_INET):
ip = addr_info[4][0]
if ip not in ips and not ip.startswith("127."):
ips.append(ip)
except Exception:
pass
# Also try connecting to external to get primary interface IP
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
if ip not in ips:
ips.append(ip)
s.close()
except Exception:
pass
return ips
def get_cert_paths(base_dir: Path) -> tuple[Path, Path]:
"""Get paths for cert and key files."""
cert_dir = base_dir / "certs"
cert_dir.mkdir(parents=True, exist_ok=True)
return cert_dir / "server.crt", cert_dir / "server.key"
def certs_exist(base_dir: Path) -> bool:
"""Check if both cert files exist."""
cert_path, key_path = get_cert_paths(base_dir)
return cert_path.exists() and key_path.exists()
def generate_self_signed_cert(
base_dir: Path,
hostname: str = "localhost",
days_valid: int = 365,
) -> tuple[Path, Path]:
"""
Generate self-signed SSL certificate.
Args:
base_dir: Base directory for certs folder
hostname: Server hostname for certificate
days_valid: Certificate validity in days
Returns:
Tuple of (cert_path, key_path)
"""
cert_path, key_path = get_cert_paths(base_dir)
# Generate RSA key
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Create certificate
subject = issuer = x509.Name(
[
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Stegasoo"),
x509.NameAttribute(NameOID.COMMON_NAME, hostname),
]
)
# Subject Alternative Names
san_list = [
x509.DNSName(hostname),
x509.DNSName("localhost"),
x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")),
]
# Add hostname.local for mDNS access
if not hostname.endswith(".local"):
san_list.append(x509.DNSName(f"{hostname}.local"))
# Add the hostname as IP if it looks like one
try:
san_list.append(x509.IPAddress(ipaddress.IPv4Address(hostname)))
except ipaddress.AddressValueError:
pass
# Add local network IPs
for local_ip in _get_local_ips():
try:
ip_addr = ipaddress.IPv4Address(local_ip)
if x509.IPAddress(ip_addr) not in san_list:
san_list.append(x509.IPAddress(ip_addr))
except (ipaddress.AddressValueError, ValueError):
pass
now = datetime.datetime.now(datetime.UTC)
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + datetime.timedelta(days=days_valid))
.add_extension(
x509.SubjectAlternativeName(san_list),
critical=False,
)
.sign(key, hashes.SHA256())
)
# Write key file (chmod 600)
key_path.write_bytes(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
)
key_path.chmod(0o600)
# Write cert file
cert_path.write_bytes(cert.public_bytes(serialization.Encoding.PEM))
return cert_path, key_path
def ensure_certs(base_dir: Path, hostname: str = "localhost") -> tuple[Path, Path]:
"""Ensure certificates exist, generating if needed."""
if certs_exist(base_dir):
return get_cert_paths(base_dir)
print(f"Generating self-signed SSL certificate for {hostname}...")
return generate_self_signed_cert(base_dir, hostname)

View File

@ -0,0 +1,142 @@
/**
* Stegasoo Authentication Pages JavaScript
* Handles login, setup, account, and admin user management pages
*/
const StegasooAuth = {
// ========================================================================
// PASSWORD VISIBILITY TOGGLE
// ========================================================================
/**
* Toggle password field visibility
* @param {string} inputId - ID of the password input
* @param {HTMLElement} btn - The toggle button element
*/
togglePassword(inputId, btn) {
const input = document.getElementById(inputId);
const icon = btn.querySelector('i');
if (!input) return;
if (input.type === 'password') {
input.type = 'text';
icon?.classList.replace('bi-eye', 'bi-eye-slash');
} else {
input.type = 'password';
icon?.classList.replace('bi-eye-slash', 'bi-eye');
}
},
// ========================================================================
// PASSWORD CONFIRMATION VALIDATION
// ========================================================================
/**
* Initialize password confirmation validation on a form
* @param {string} formId - ID of the form
* @param {string} passwordId - ID of the password field
* @param {string} confirmId - ID of the confirmation field
*/
initPasswordConfirmation(formId, passwordId, confirmId) {
const form = document.getElementById(formId);
if (!form) return;
form.addEventListener('submit', function(e) {
const password = document.getElementById(passwordId)?.value;
const confirm = document.getElementById(confirmId)?.value;
if (password !== confirm) {
e.preventDefault();
alert('Passwords do not match');
}
});
},
// ========================================================================
// COPY TO CLIPBOARD
// ========================================================================
/**
* Copy field value to clipboard with visual feedback
* @param {string} fieldId - ID of the input field to copy
*/
copyField(fieldId) {
const field = document.getElementById(fieldId);
if (!field) return;
field.select();
navigator.clipboard.writeText(field.value).then(() => {
const btn = field.nextElementSibling;
if (!btn) return;
const originalHTML = btn.innerHTML;
btn.innerHTML = '<i class="bi bi-check"></i>';
setTimeout(() => btn.innerHTML = originalHTML, 1000);
});
},
// ========================================================================
// PASSWORD GENERATION
// ========================================================================
/**
* Generate a random password
* @param {number} length - Password length (default 8)
* @returns {string} Generated password
*/
generatePassword(length = 8) {
const chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
let password = '';
for (let i = 0; i < length; i++) {
password += chars.charAt(Math.floor(Math.random() * chars.length));
}
return password;
},
/**
* Regenerate password and update input field
* @param {string} inputId - ID of the password input
* @param {number} length - Password length
*/
regeneratePassword(inputId = 'passwordInput', length = 8) {
const input = document.getElementById(inputId);
if (input) {
input.value = this.generatePassword(length);
}
},
// ========================================================================
// DELETE CONFIRMATION
// ========================================================================
/**
* Confirm deletion with a prompt
* @param {string} itemName - Name of item being deleted
* @param {string} formId - ID of the form to submit if confirmed
* @returns {boolean} True if confirmed
*/
confirmDelete(itemName, formId = null) {
const confirmed = confirm(`Are you sure you want to delete "${itemName}"? This cannot be undone.`);
if (confirmed && formId) {
const form = document.getElementById(formId);
form?.submit();
}
return confirmed;
}
};
// Make togglePassword available globally for onclick handlers
function togglePassword(inputId, btn) {
StegasooAuth.togglePassword(inputId, btn);
}
// Make copyField available globally for onclick handlers
function copyField(fieldId) {
StegasooAuth.copyField(fieldId);
}
// Make regeneratePassword available globally for onclick handlers
function regeneratePassword() {
StegasooAuth.regeneratePassword();
}

View File

@ -0,0 +1,279 @@
/**
* Stegasoo Generate Page JavaScript
* Handles credential generation form and display
*/
const StegasooGenerate = {
// ========================================================================
// FORM CONTROLS
// ========================================================================
/**
* Initialize the words range slider
*/
initWordsSlider() {
const wordsRange = document.getElementById('wordsRange');
const wordsValue = document.getElementById('wordsValue');
wordsRange?.addEventListener('input', function() {
const bits = this.value * 11;
wordsValue.textContent = `${this.value} words (~${bits} bits)`;
});
},
/**
* Initialize PIN/RSA option toggles
*/
initOptionToggles() {
const usePinCheck = document.getElementById('usePinCheck');
const useRsaCheck = document.getElementById('useRsaCheck');
const pinOptions = document.getElementById('pinOptions');
const rsaOptions = document.getElementById('rsaOptions');
const rsaQrWarning = document.getElementById('rsaQrWarning');
const rsaBitsSelect = document.getElementById('rsaBitsSelect');
usePinCheck?.addEventListener('change', function() {
pinOptions?.classList.toggle('d-none', !this.checked);
});
useRsaCheck?.addEventListener('change', function() {
rsaOptions?.classList.toggle('d-none', !this.checked);
});
// RSA key size QR warning (>3072 bits)
rsaBitsSelect?.addEventListener('change', function() {
rsaQrWarning?.classList.toggle('d-none', parseInt(this.value) <= 3072);
});
},
// ========================================================================
// CREDENTIAL VISIBILITY
// ========================================================================
pinHidden: false,
passphraseHidden: false,
/**
* Toggle PIN visibility
*/
togglePinVisibility() {
const pinDigits = document.getElementById('pinDigits');
const icon = document.getElementById('pinToggleIcon');
const text = document.getElementById('pinToggleText');
this.pinHidden = !this.pinHidden;
pinDigits?.classList.toggle('blurred', this.pinHidden);
if (icon) icon.className = this.pinHidden ? 'bi bi-eye' : 'bi bi-eye-slash';
if (text) text.textContent = this.pinHidden ? 'Show' : 'Hide';
},
/**
* Toggle passphrase visibility
*/
togglePassphraseVisibility() {
const display = document.getElementById('passphraseDisplay');
const icon = document.getElementById('passphraseToggleIcon');
const text = document.getElementById('passphraseToggleText');
this.passphraseHidden = !this.passphraseHidden;
display?.classList.toggle('blurred', this.passphraseHidden);
if (icon) icon.className = this.passphraseHidden ? 'bi bi-eye' : 'bi bi-eye-slash';
if (text) text.textContent = this.passphraseHidden ? 'Show' : 'Hide';
},
// ========================================================================
// MEMORY AID STORY GENERATION
// ========================================================================
currentStoryTemplate: 0,
/**
* Story templates organized by word count (3-12 words supported)
*/
storyTemplates: {
3: [
w => `The ${w[0]} ${w[1]} ${w[2]}.`,
w => `${w[0]} loves ${w[1]} and ${w[2]}.`,
w => `A ${w[0]} found a ${w[1]} near the ${w[2]}.`,
w => `${w[0]}, ${w[1]}, ${w[2]} — never forget.`,
w => `The ${w[0]} hid the ${w[1]} under the ${w[2]}.`,
],
4: [
w => `${w[0]} and ${w[1]} discovered a ${w[2]} made of ${w[3]}.`,
w => `The ${w[0]} ${w[1]} ate ${w[2]} for ${w[3]}.`,
w => `In the ${w[0]}, a ${w[1]} met a ${w[2]} carrying ${w[3]}.`,
w => `${w[0]} said "${w[1]}" while holding a ${w[2]} ${w[3]}.`,
w => `The secret: ${w[0]}, ${w[1]}, ${w[2]}, ${w[3]}.`,
],
5: [
w => `${w[0]} traveled to ${w[1]} seeking the ${w[2]} of ${w[3]} and ${w[4]}.`,
w => `The ${w[0]} ${w[1]} lived in a ${w[2]} house with ${w[3]} ${w[4]}.`,
w => `"${w[0]}!" shouted ${w[1]} as the ${w[2]} ${w[3]} flew toward ${w[4]}.`,
w => `Captain ${w[0]} sailed the ${w[1]} ${w[2]} searching for ${w[3]} ${w[4]}.`,
w => `In ${w[0]} kingdom, ${w[1]} guards protected the ${w[2]} ${w[3]} ${w[4]}.`,
],
6: [
w => `${w[0]} met ${w[1]} at the ${w[2]}. Together they found ${w[3]}, ${w[4]}, and ${w[5]}.`,
w => `The ${w[0]} ${w[1]} wore a ${w[2]} hat while eating ${w[3]} ${w[4]} ${w[5]}.`,
w => `Detective ${w[0]} found ${w[1]} ${w[2]} near the ${w[3]} ${w[4]} ${w[5]}.`,
w => `In the ${w[0]} ${w[1]}, a ${w[2]} ${w[3]} sang about ${w[4]} ${w[5]}.`,
w => `Chef ${w[0]} combined ${w[1]}, ${w[2]}, ${w[3]}, ${w[4]}, and ${w[5]}.`,
],
7: [
w => `${w[0]} and ${w[1]} walked through the ${w[2]} ${w[3]} to find the ${w[4]} ${w[5]} ${w[6]}.`,
w => `The ${w[0]} professor studied ${w[1]} ${w[2]} while drinking ${w[3]} ${w[4]} with ${w[5]} ${w[6]}.`,
w => `"${w[0]} ${w[1]}!" yelled ${w[2]} as ${w[3]} ${w[4]} attacked the ${w[5]} ${w[6]}.`,
w => `In ${w[0]}, King ${w[1]} decreed that ${w[2]} ${w[3]} must honor ${w[4]} ${w[5]} ${w[6]}.`,
],
8: [
w => `${w[0]} ${w[1]} and ${w[2]} ${w[3]} met at the ${w[4]} ${w[5]} to discuss ${w[6]} ${w[7]}.`,
w => `The ${w[0]} ${w[1]} ${w[2]} traveled from ${w[3]} to ${w[4]} carrying ${w[5]} ${w[6]} ${w[7]}.`,
w => `${w[0]} discovered that ${w[1]} ${w[2]} plus ${w[3]} ${w[4]} equals ${w[5]} ${w[6]} ${w[7]}.`,
],
9: [
w => `${w[0]} ${w[1]} ${w[2]} watched as ${w[3]} ${w[4]} ${w[5]} danced with ${w[6]} ${w[7]} ${w[8]}.`,
w => `In the ${w[0]} ${w[1]} ${w[2]}, three friends — ${w[3]}, ${w[4]}, ${w[5]} — found ${w[6]} ${w[7]} ${w[8]}.`,
w => `The recipe: ${w[0]}, ${w[1]}, ${w[2]}, ${w[3]}, ${w[4]}, ${w[5]}, ${w[6]}, ${w[7]}, ${w[8]}.`,
],
10: [
w => `${w[0]} ${w[1]} told ${w[2]} ${w[3]} about the ${w[4]} ${w[5]} ${w[6]} hidden in ${w[7]} ${w[8]} ${w[9]}.`,
w => `The ${w[0]} ${w[1]} ${w[2]} ${w[3]} ${w[4]} lived beside ${w[5]} ${w[6]} ${w[7]} ${w[8]} ${w[9]}.`,
],
11: [
w => `${w[0]} ${w[1]} ${w[2]} and ${w[3]} ${w[4]} ${w[5]} discovered ${w[6]} ${w[7]} ${w[8]} ${w[9]} ${w[10]}.`,
w => `In ${w[0]} ${w[1]}, the ${w[2]} ${w[3]} ${w[4]} sang of ${w[5]} ${w[6]} ${w[7]} ${w[8]} ${w[9]} ${w[10]}.`,
],
12: [
w => `${w[0]} ${w[1]} ${w[2]} met ${w[3]} ${w[4]} ${w[5]} at the ${w[6]} ${w[7]} ${w[8]} ${w[9]} ${w[10]} ${w[11]}.`,
w => `The twelve treasures: ${w[0]}, ${w[1]}, ${w[2]}, ${w[3]}, ${w[4]}, ${w[5]}, ${w[6]}, ${w[7]}, ${w[8]}, ${w[9]}, ${w[10]}, ${w[11]}.`,
],
},
/**
* Wrap word in highlight span
*/
hl(word) {
return `<span class="passphrase-word">${word}</span>`;
},
/**
* Generate a memory story for given words
* @param {string[]} words - Array of passphrase words
* @param {number|null} idx - Template index (null for current)
* @returns {string} HTML story
*/
generateStory(words, idx = null) {
const count = words.length;
if (count === 0) return '';
// Clamp to supported range (3-12)
const templateKey = Math.max(3, Math.min(12, count));
const templates = this.storyTemplates[templateKey];
if (!templates || templates.length === 0) {
// Fallback: just list the words
return words.map(w => this.hl(w)).join(' &mdash; ');
}
const templateIdx = (idx ?? this.currentStoryTemplate) % templates.length;
// Apply highlighting to words
const highlighted = words.map(w => this.hl(w));
return templates[templateIdx](highlighted);
},
/**
* Toggle memory aid visibility
* @param {string[]} words - Passphrase words array
*/
toggleMemoryAid(words) {
const container = document.getElementById('memoryAidContainer');
const icon = document.getElementById('memoryAidIcon');
const text = document.getElementById('memoryAidText');
const isHidden = container?.classList.contains('d-none');
container?.classList.toggle('d-none', !isHidden);
if (icon) icon.className = isHidden ? 'bi bi-lightbulb-fill' : 'bi bi-lightbulb';
if (text) text.textContent = isHidden ? 'Hide Aid' : 'Memory Aid';
if (isHidden) {
document.getElementById('memoryStory').innerHTML = this.generateStory(words);
}
},
/**
* Regenerate story with next template
* @param {string[]} words - Passphrase words array
*/
regenerateStory(words) {
const count = words.length;
const templateKey = Math.max(3, Math.min(12, count));
const templates = this.storyTemplates[templateKey] || [];
this.currentStoryTemplate = (this.currentStoryTemplate + 1) % Math.max(1, templates.length);
document.getElementById('memoryStory').innerHTML = this.generateStory(words, this.currentStoryTemplate);
},
// ========================================================================
// QR CODE PRINTING
// ========================================================================
/**
* Print QR code in new window
*/
printQrCode() {
const qrImg = document.getElementById('qrCodeImage');
if (!qrImg) return;
const printWindow = window.open('', '_blank');
printWindow.document.write(`<!DOCTYPE html>
<html>
<head>
<title>QR Code</title>
<style>
body { display: flex; flex-direction: column; align-items: center; justify-content: center; min-height: 100vh; margin: 0; }
img { max-width: 400px; }
</style>
</head>
<body>
<img src="${qrImg.src}" alt="QR Code">
<script>window.onload = function() { window.print(); }<\/script>
</body>
</html>`);
printWindow.document.close();
},
// ========================================================================
// INITIALIZATION
// ========================================================================
/**
* Initialize generate form page
*/
initForm() {
this.initWordsSlider();
this.initOptionToggles();
}
};
// Global function wrappers for onclick handlers
function togglePinVisibility() {
StegasooGenerate.togglePinVisibility();
}
function togglePassphraseVisibility() {
StegasooGenerate.togglePassphraseVisibility();
}
function printQrCode() {
StegasooGenerate.printQrCode();
}
// Auto-init form controls
document.addEventListener('DOMContentLoaded', () => {
if (document.querySelector('[data-page="generate"]')) {
StegasooGenerate.initForm();
}
});

6
frontends/web/static/js/qrcode.min.js vendored Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,465 @@
#!/usr/bin/env python3
"""
Stegasoo Subprocess Worker (v4.0.0)
This script runs in a subprocess and handles encode/decode operations.
If it crashes due to jpeglib/scipy issues, the parent Flask process survives.
CHANGES in v4.0.0:
- Added channel_key support for encode/decode operations
- New channel_status operation
Communication is via JSON over stdin/stdout:
- Input: JSON object with operation parameters
- Output: JSON object with results or error
Usage:
echo '{"operation": "encode", ...}' | python stego_worker.py
"""
import base64
import json
import logging
import os
import sys
import traceback
from pathlib import Path
# Ensure stegasoo is importable
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
sys.path.insert(0, str(Path(__file__).parent))
# Configure logging for worker subprocess
_log_level = os.environ.get("STEGASOO_LOG_LEVEL", "").strip().upper()
if _log_level and hasattr(logging, _log_level):
logging.basicConfig(
level=getattr(logging, _log_level),
format="[%(asctime)s.%(msecs)03d] [%(levelname)s] [%(name)s] %(message)s",
datefmt="%H:%M:%S",
stream=sys.stderr,
)
elif os.environ.get("STEGASOO_DEBUG", "").strip() in ("1", "true", "yes"):
logging.basicConfig(
level=logging.DEBUG,
format="[%(asctime)s.%(msecs)03d] [%(levelname)s] [%(name)s] %(message)s",
datefmt="%H:%M:%S",
stream=sys.stderr,
)
logger = logging.getLogger("stegasoo.worker")
def _resolve_channel_key(channel_key_param):
"""
Resolve channel_key parameter to value for stegasoo.
Args:
channel_key_param: 'auto', 'none', explicit key, or None
Returns:
None (auto), "" (public), or explicit key string
"""
if channel_key_param is None or channel_key_param == "auto":
return None # Auto mode - use server config
elif channel_key_param == "none":
return "" # Public mode
else:
return channel_key_param # Explicit key
def _get_channel_info(resolved_key):
"""
Get channel mode and fingerprint for response.
Returns:
(mode, fingerprint) tuple
"""
from stegasoo import get_channel_status, has_channel_key
if resolved_key == "":
return "public", None
if resolved_key is not None:
# Explicit key
fingerprint = f"{resolved_key[:4]}-••••-••••-••••-••••-••••-••••-{resolved_key[-4:]}"
return "private", fingerprint
# Auto mode - check server config
if has_channel_key():
status = get_channel_status()
return "private", status.get("fingerprint")
return "public", None
def encode_operation(params: dict) -> dict:
"""Handle encode operation."""
logger.debug("encode_operation: mode=%s", params.get("embed_mode", "lsb"))
from stegasoo import FilePayload, encode
# Decode base64 inputs
carrier_data = base64.b64decode(params["carrier_b64"])
reference_data = base64.b64decode(params["reference_b64"])
# Optional RSA key
rsa_key_data = None
if params.get("rsa_key_b64"):
rsa_key_data = base64.b64decode(params["rsa_key_b64"])
# Determine payload type
if params.get("file_b64"):
file_data = base64.b64decode(params["file_b64"])
payload = FilePayload(
data=file_data,
filename=params.get("file_name", "file"),
mime_type=params.get("file_mime", "application/octet-stream"),
)
else:
payload = params.get("message", "")
# Resolve channel key (v4.0.0)
resolved_channel_key = _resolve_channel_key(params.get("channel_key", "auto"))
# Call encode with correct parameter names
result = encode(
message=payload,
reference_photo=reference_data,
carrier_image=carrier_data,
passphrase=params.get("passphrase", ""),
pin=params.get("pin"),
rsa_key_data=rsa_key_data,
rsa_password=params.get("rsa_password"),
embed_mode=params.get("embed_mode", "lsb"),
dct_output_format=params.get("dct_output_format", "png"),
dct_color_mode=params.get("dct_color_mode", "color"),
channel_key=resolved_channel_key, # v4.0.0
progress_file=params.get("progress_file"), # v4.1.2
)
# Build stats dict if available
stats = None
if hasattr(result, "stats") and result.stats:
stats = {
"pixels_modified": getattr(result.stats, "pixels_modified", 0),
"capacity_used": getattr(result.stats, "capacity_used", 0),
"bytes_embedded": getattr(result.stats, "bytes_embedded", 0),
}
# Get channel info for response (v4.0.0)
channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)
return {
"success": True,
"stego_b64": base64.b64encode(result.stego_image).decode("ascii"),
"filename": getattr(result, "filename", None),
"stats": stats,
"channel_mode": channel_mode,
"channel_fingerprint": channel_fingerprint,
}
def _write_decode_progress(progress_file: str | None, percent: int, phase: str) -> None:
"""Write decode progress to file."""
if not progress_file:
return
try:
import json
with open(progress_file, "w") as f:
json.dump({"percent": percent, "phase": phase}, f)
except Exception:
pass # Best effort
def decode_operation(params: dict) -> dict:
"""Handle decode operation."""
logger.debug("decode_operation: mode=%s", params.get("embed_mode", "auto"))
from stegasoo import decode
progress_file = params.get("progress_file")
# Progress: starting
_write_decode_progress(progress_file, 5, "reading")
# Decode base64 inputs
stego_data = base64.b64decode(params["stego_b64"])
reference_data = base64.b64decode(params["reference_b64"])
_write_decode_progress(progress_file, 15, "reading")
# Optional RSA key
rsa_key_data = None
if params.get("rsa_key_b64"):
rsa_key_data = base64.b64decode(params["rsa_key_b64"])
# Resolve channel key (v4.0.0)
resolved_channel_key = _resolve_channel_key(params.get("channel_key", "auto"))
# Library handles progress internally via progress_file parameter
# Call decode with correct parameter names
result = decode(
stego_image=stego_data,
reference_photo=reference_data,
passphrase=params.get("passphrase", ""),
pin=params.get("pin"),
rsa_key_data=rsa_key_data,
rsa_password=params.get("rsa_password"),
embed_mode=params.get("embed_mode", "auto"),
channel_key=resolved_channel_key, # v4.0.0
progress_file=progress_file, # v4.2.0: pass through for real-time progress
)
# Library writes 100% "complete" - no need for worker to write again
if result.is_file:
return {
"success": True,
"is_file": True,
"file_b64": base64.b64encode(result.file_data).decode("ascii"),
"filename": result.filename,
"mime_type": result.mime_type,
}
else:
return {
"success": True,
"is_file": False,
"message": result.message,
}
def compare_operation(params: dict) -> dict:
"""Handle compare_modes operation."""
from stegasoo import compare_modes
carrier_data = base64.b64decode(params["carrier_b64"])
result = compare_modes(carrier_data)
return {
"success": True,
"comparison": result,
}
def capacity_check_operation(params: dict) -> dict:
"""Handle will_fit_by_mode operation."""
from stegasoo import will_fit_by_mode
carrier_data = base64.b64decode(params["carrier_b64"])
result = will_fit_by_mode(
payload=params["payload_size"],
carrier_image=carrier_data,
embed_mode=params.get("embed_mode", "lsb"),
)
return {
"success": True,
"result": result,
}
def encode_audio_operation(params: dict) -> dict:
"""Handle audio encode operation (v4.3.0)."""
logger.debug("encode_audio_operation: mode=%s", params.get("embed_mode", "audio_lsb"))
from stegasoo import FilePayload, encode_audio
carrier_data = base64.b64decode(params["carrier_b64"])
reference_data = base64.b64decode(params["reference_b64"])
# Optional RSA key
rsa_key_data = None
if params.get("rsa_key_b64"):
rsa_key_data = base64.b64decode(params["rsa_key_b64"])
# Determine payload type
if params.get("file_b64"):
file_data = base64.b64decode(params["file_b64"])
payload = FilePayload(
data=file_data,
filename=params.get("file_name", "file"),
mime_type=params.get("file_mime", "application/octet-stream"),
)
else:
payload = params.get("message", "")
resolved_channel_key = _resolve_channel_key(params.get("channel_key", "auto"))
# Resolve chip_tier from params (None means use default)
chip_tier_val = params.get("chip_tier")
if chip_tier_val is not None:
chip_tier_val = int(chip_tier_val)
stego_audio, stats = encode_audio(
message=payload,
reference_photo=reference_data,
carrier_audio=carrier_data,
passphrase=params.get("passphrase", ""),
pin=params.get("pin"),
rsa_key_data=rsa_key_data,
rsa_password=params.get("rsa_password"),
embed_mode=params.get("embed_mode", "audio_lsb"),
channel_key=resolved_channel_key,
progress_file=params.get("progress_file"),
chip_tier=chip_tier_val,
)
channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)
return {
"success": True,
"stego_b64": base64.b64encode(stego_audio).decode("ascii"),
"stats": {
"samples_modified": stats.samples_modified,
"total_samples": stats.total_samples,
"capacity_used": stats.capacity_used,
"bytes_embedded": stats.bytes_embedded,
"sample_rate": stats.sample_rate,
"channels": stats.channels,
"duration_seconds": stats.duration_seconds,
"embed_mode": stats.embed_mode,
},
"channel_mode": channel_mode,
"channel_fingerprint": channel_fingerprint,
}
def decode_audio_operation(params: dict) -> dict:
"""Handle audio decode operation (v4.3.0)."""
logger.debug("decode_audio_operation: mode=%s", params.get("embed_mode", "audio_auto"))
from stegasoo import decode_audio
progress_file = params.get("progress_file")
_write_decode_progress(progress_file, 5, "reading")
stego_data = base64.b64decode(params["stego_b64"])
reference_data = base64.b64decode(params["reference_b64"])
_write_decode_progress(progress_file, 15, "reading")
rsa_key_data = None
if params.get("rsa_key_b64"):
rsa_key_data = base64.b64decode(params["rsa_key_b64"])
resolved_channel_key = _resolve_channel_key(params.get("channel_key", "auto"))
result = decode_audio(
stego_audio=stego_data,
reference_photo=reference_data,
passphrase=params.get("passphrase", ""),
pin=params.get("pin"),
rsa_key_data=rsa_key_data,
rsa_password=params.get("rsa_password"),
embed_mode=params.get("embed_mode", "audio_auto"),
channel_key=resolved_channel_key,
progress_file=progress_file,
)
if result.is_file:
return {
"success": True,
"is_file": True,
"file_b64": base64.b64encode(result.file_data).decode("ascii"),
"filename": result.filename,
"mime_type": result.mime_type,
}
else:
return {
"success": True,
"is_file": False,
"message": result.message,
}
def audio_info_operation(params: dict) -> dict:
"""Handle audio info operation (v4.3.0)."""
from stegasoo import get_audio_info
from stegasoo.audio_steganography import calculate_audio_lsb_capacity
from stegasoo.spread_steganography import calculate_audio_spread_capacity
audio_data = base64.b64decode(params["audio_b64"])
info = get_audio_info(audio_data)
lsb_capacity = calculate_audio_lsb_capacity(audio_data)
spread_capacity = calculate_audio_spread_capacity(audio_data)
return {
"success": True,
"info": {
"sample_rate": info.sample_rate,
"channels": info.channels,
"duration_seconds": round(info.duration_seconds, 2),
"num_samples": info.num_samples,
"format": info.format,
"bit_depth": info.bit_depth,
"capacity_lsb": lsb_capacity,
"capacity_spread": spread_capacity.usable_capacity_bytes,
},
}
def channel_status_operation(params: dict) -> dict:
"""Handle channel status check (v4.0.0)."""
from stegasoo import get_channel_status
status = get_channel_status()
reveal = params.get("reveal", False)
return {
"success": True,
"status": {
"mode": status["mode"],
"configured": status["configured"],
"fingerprint": status.get("fingerprint"),
"source": status.get("source"),
"key": status.get("key") if reveal and status["configured"] else None,
},
}
def main():
"""Main entry point - read JSON from stdin, write JSON to stdout."""
try:
# Read all input
input_text = sys.stdin.read()
if not input_text.strip():
output = {"success": False, "error": "No input provided"}
else:
params = json.loads(input_text)
operation = params.get("operation")
logger.info("Worker handling operation: %s", operation)
if operation == "encode":
output = encode_operation(params)
elif operation == "decode":
output = decode_operation(params)
elif operation == "compare":
output = compare_operation(params)
elif operation == "capacity":
output = capacity_check_operation(params)
elif operation == "channel_status":
output = channel_status_operation(params)
# Audio operations (v4.3.0)
elif operation == "encode_audio":
output = encode_audio_operation(params)
elif operation == "decode_audio":
output = decode_audio_operation(params)
elif operation == "audio_info":
output = audio_info_operation(params)
else:
output = {"success": False, "error": f"Unknown operation: {operation}"}
except json.JSONDecodeError as e:
output = {"success": False, "error": f"Invalid JSON: {e}"}
except Exception as e:
output = {
"success": False,
"error": str(e),
"error_type": type(e).__name__,
"traceback": traceback.format_exc(),
}
# Write output as JSON
print(json.dumps(output), flush=True)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,770 @@
"""
Subprocess Steganography Wrapper (v4.0.0)
Runs stegasoo operations in isolated subprocesses to prevent crashes
from taking down the Flask server.
CHANGES in v4.0.0:
- Added channel_key parameter to encode() and decode() methods
- Channel keys enable deployment/group isolation
Usage:
from subprocess_stego import SubprocessStego
stego = SubprocessStego()
# Encode with channel key
result = stego.encode(
carrier_data=carrier_bytes,
reference_data=ref_bytes,
message="secret message",
passphrase="my passphrase",
pin="123456",
embed_mode="dct",
channel_key="auto", # or "none", or explicit key
)
if result.success:
stego_bytes = result.stego_data
extension = result.extension
else:
error_message = result.error
# Decode
result = stego.decode(
stego_data=stego_bytes,
reference_data=ref_bytes,
passphrase="my passphrase",
pin="123456",
channel_key="auto",
)
# Compare modes (capacity)
result = stego.compare_modes(carrier_bytes)
"""
import base64
import json
import subprocess
import sys
import tempfile
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any
# Default timeout for operations (seconds)
DEFAULT_TIMEOUT = 120
# Path to worker script - adjust if needed
WORKER_SCRIPT = Path(__file__).parent / "stego_worker.py"
@dataclass
class EncodeResult:
"""Result from encode operation."""
success: bool
stego_data: bytes | None = None
filename: str | None = None
stats: dict[str, Any] | None = None
# Channel info (v4.0.0)
channel_mode: str | None = None
channel_fingerprint: str | None = None
error: str | None = None
error_type: str | None = None
@dataclass
class DecodeResult:
"""Result from decode operation."""
success: bool
is_file: bool = False
message: str | None = None
file_data: bytes | None = None
filename: str | None = None
mime_type: str | None = None
error: str | None = None
error_type: str | None = None
@dataclass
class CompareResult:
"""Result from compare_modes operation."""
success: bool
width: int = 0
height: int = 0
lsb: dict[str, Any] | None = None
dct: dict[str, Any] | None = None
error: str | None = None
@dataclass
class CapacityResult:
"""Result from capacity check operation."""
success: bool
fits: bool = False
payload_size: int = 0
capacity: int = 0
usage_percent: float = 0.0
headroom: int = 0
mode: str = ""
error: str | None = None
@dataclass
class AudioEncodeResult:
"""Result from audio encode operation (v4.3.0)."""
success: bool
stego_data: bytes | None = None
stats: dict[str, Any] | None = None
channel_mode: str | None = None
channel_fingerprint: str | None = None
error: str | None = None
error_type: str | None = None
@dataclass
class AudioInfoResult:
"""Result from audio info operation (v4.3.0)."""
success: bool
sample_rate: int = 0
channels: int = 0
duration_seconds: float = 0.0
num_samples: int = 0
format: str = ""
bit_depth: int | None = None
capacity_lsb: int = 0
capacity_spread: int = 0
error: str | None = None
@dataclass
class ChannelStatusResult:
"""Result from channel status check (v4.0.0)."""
success: bool
mode: str = "public"
configured: bool = False
fingerprint: str | None = None
source: str | None = None
key: str | None = None
error: str | None = None
class SubprocessStego:
"""
Subprocess-isolated steganography operations.
All operations run in a separate Python process. If jpeglib or scipy
crashes, only the subprocess dies - Flask keeps running.
"""
def __init__(
self,
worker_path: Path | None = None,
python_executable: str | None = None,
timeout: int = DEFAULT_TIMEOUT,
):
"""
Initialize subprocess wrapper.
Args:
worker_path: Path to stego_worker.py (default: same directory)
python_executable: Python interpreter to use (default: same as current)
timeout: Default timeout in seconds
"""
self.worker_path = worker_path or WORKER_SCRIPT
self.python = python_executable or sys.executable
self.timeout = timeout
if not self.worker_path.exists():
raise FileNotFoundError(f"Worker script not found: {self.worker_path}")
def _run_worker(self, params: dict[str, Any], timeout: int | None = None) -> dict[str, Any]:
"""
Run the worker subprocess with given parameters.
Args:
params: Dictionary of parameters (will be JSON-encoded)
timeout: Operation timeout in seconds
Returns:
Dictionary with results from worker
"""
timeout = timeout or self.timeout
input_json = json.dumps(params)
try:
result = subprocess.run(
[self.python, str(self.worker_path)],
input=input_json,
capture_output=True,
text=True,
timeout=timeout,
cwd=str(self.worker_path.parent),
)
if result.returncode != 0:
# Worker crashed
return {
"success": False,
"error": f"Worker crashed (exit code {result.returncode})",
"stderr": result.stderr,
}
if not result.stdout.strip():
return {
"success": False,
"error": "Worker returned empty output",
"stderr": result.stderr,
}
return json.loads(result.stdout)
except subprocess.TimeoutExpired:
return {
"success": False,
"error": f"Operation timed out after {timeout} seconds",
"error_type": "TimeoutError",
}
except json.JSONDecodeError as e:
return {
"success": False,
"error": f"Invalid JSON from worker: {e}",
"raw_output": result.stdout if "result" in dir() else None,
}
except Exception as e:
return {
"success": False,
"error": str(e),
"error_type": type(e).__name__,
}
def encode(
self,
carrier_data: bytes,
reference_data: bytes,
message: str | None = None,
file_data: bytes | None = None,
file_name: str | None = None,
file_mime: str | None = None,
passphrase: str = "",
pin: str | None = None,
rsa_key_data: bytes | None = None,
rsa_password: str | None = None,
embed_mode: str = "lsb",
dct_output_format: str = "png",
dct_color_mode: str = "color",
# Channel key (v4.0.0)
channel_key: str | None = "auto",
timeout: int | None = None,
# Progress file (v4.1.2)
progress_file: str | None = None,
) -> EncodeResult:
"""
Encode a message or file into an image.
Args:
carrier_data: Carrier image bytes
reference_data: Reference photo bytes
message: Text message to encode (if not file)
file_data: File bytes to encode (if not message)
file_name: Original filename (for file payload)
file_mime: MIME type (for file payload)
passphrase: Encryption passphrase
pin: Optional PIN
rsa_key_data: Optional RSA key PEM bytes
rsa_password: RSA key password if encrypted
embed_mode: 'lsb' or 'dct'
dct_output_format: 'png' or 'jpeg' (for DCT mode)
dct_color_mode: 'grayscale' or 'color' (for DCT mode)
channel_key: 'auto' (server config), 'none' (public), or explicit key (v4.0.0)
timeout: Operation timeout in seconds
Returns:
EncodeResult with stego_data and extension on success
"""
params = {
"operation": "encode",
"carrier_b64": base64.b64encode(carrier_data).decode("ascii"),
"reference_b64": base64.b64encode(reference_data).decode("ascii"),
"message": message,
"passphrase": passphrase,
"pin": pin,
"embed_mode": embed_mode,
"dct_output_format": dct_output_format,
"dct_color_mode": dct_color_mode,
"channel_key": channel_key, # v4.0.0
"progress_file": progress_file, # v4.1.2
}
if file_data:
params["file_b64"] = base64.b64encode(file_data).decode("ascii")
params["file_name"] = file_name
params["file_mime"] = file_mime
if rsa_key_data:
params["rsa_key_b64"] = base64.b64encode(rsa_key_data).decode("ascii")
params["rsa_password"] = rsa_password
result = self._run_worker(params, timeout)
if result.get("success"):
return EncodeResult(
success=True,
stego_data=base64.b64decode(result["stego_b64"]),
filename=result.get("filename"),
stats=result.get("stats"),
channel_mode=result.get("channel_mode"),
channel_fingerprint=result.get("channel_fingerprint"),
)
else:
return EncodeResult(
success=False,
error=result.get("error", "Unknown error"),
error_type=result.get("error_type"),
)
def decode(
self,
stego_data: bytes,
reference_data: bytes,
passphrase: str = "",
pin: str | None = None,
rsa_key_data: bytes | None = None,
rsa_password: str | None = None,
embed_mode: str = "auto",
# Channel key (v4.0.0)
channel_key: str | None = "auto",
timeout: int | None = None,
# Progress tracking (v4.1.5)
progress_file: str | None = None,
) -> DecodeResult:
"""
Decode a message or file from a stego image.
Args:
stego_data: Stego image bytes
reference_data: Reference photo bytes
passphrase: Decryption passphrase
pin: Optional PIN
rsa_key_data: Optional RSA key PEM bytes
rsa_password: RSA key password if encrypted
embed_mode: 'auto', 'lsb', or 'dct'
channel_key: 'auto' (server config), 'none' (public), or explicit key (v4.0.0)
timeout: Operation timeout in seconds
progress_file: Path to write progress updates (v4.1.5)
Returns:
DecodeResult with message or file_data on success
"""
params = {
"operation": "decode",
"stego_b64": base64.b64encode(stego_data).decode("ascii"),
"reference_b64": base64.b64encode(reference_data).decode("ascii"),
"passphrase": passphrase,
"pin": pin,
"embed_mode": embed_mode,
"channel_key": channel_key, # v4.0.0
"progress_file": progress_file, # v4.1.5
}
if rsa_key_data:
params["rsa_key_b64"] = base64.b64encode(rsa_key_data).decode("ascii")
params["rsa_password"] = rsa_password
result = self._run_worker(params, timeout)
if result.get("success"):
if result.get("is_file"):
return DecodeResult(
success=True,
is_file=True,
file_data=base64.b64decode(result["file_b64"]),
filename=result.get("filename"),
mime_type=result.get("mime_type"),
)
else:
return DecodeResult(
success=True,
is_file=False,
message=result.get("message"),
)
else:
return DecodeResult(
success=False,
error=result.get("error", "Unknown error"),
error_type=result.get("error_type"),
)
def compare_modes(
self,
carrier_data: bytes,
timeout: int | None = None,
) -> CompareResult:
"""
Compare LSB and DCT capacity for a carrier image.
Args:
carrier_data: Carrier image bytes
timeout: Operation timeout in seconds
Returns:
CompareResult with capacity information
"""
params = {
"operation": "compare",
"carrier_b64": base64.b64encode(carrier_data).decode("ascii"),
}
result = self._run_worker(params, timeout)
if result.get("success"):
comparison = result.get("comparison", {})
return CompareResult(
success=True,
width=comparison.get("width", 0),
height=comparison.get("height", 0),
lsb=comparison.get("lsb"),
dct=comparison.get("dct"),
)
else:
return CompareResult(
success=False,
error=result.get("error", "Unknown error"),
)
def check_capacity(
self,
carrier_data: bytes,
payload_size: int,
embed_mode: str = "lsb",
timeout: int | None = None,
) -> CapacityResult:
"""
Check if a payload will fit in the carrier.
Args:
carrier_data: Carrier image bytes
payload_size: Size of payload in bytes
embed_mode: 'lsb' or 'dct'
timeout: Operation timeout in seconds
Returns:
CapacityResult with fit information
"""
params = {
"operation": "capacity",
"carrier_b64": base64.b64encode(carrier_data).decode("ascii"),
"payload_size": payload_size,
"embed_mode": embed_mode,
}
result = self._run_worker(params, timeout)
if result.get("success"):
r = result.get("result", {})
return CapacityResult(
success=True,
fits=r.get("fits", False),
payload_size=r.get("payload_size", 0),
capacity=r.get("capacity", 0),
usage_percent=r.get("usage_percent", 0.0),
headroom=r.get("headroom", 0),
mode=r.get("mode", embed_mode),
)
else:
return CapacityResult(
success=False,
error=result.get("error", "Unknown error"),
)
# =========================================================================
# Audio Steganography (v4.3.0)
# =========================================================================
def encode_audio(
self,
carrier_data: bytes,
reference_data: bytes,
message: str | None = None,
file_data: bytes | None = None,
file_name: str | None = None,
file_mime: str | None = None,
passphrase: str = "",
pin: str | None = None,
rsa_key_data: bytes | None = None,
rsa_password: str | None = None,
embed_mode: str = "audio_lsb",
channel_key: str | None = "auto",
timeout: int | None = None,
progress_file: str | None = None,
chip_tier: int | None = None,
) -> AudioEncodeResult:
"""
Encode a message or file into an audio carrier.
Args:
carrier_data: Carrier audio bytes (WAV, FLAC, MP3, etc.)
reference_data: Reference photo bytes
message: Text message to encode (if not file)
file_data: File bytes to encode (if not message)
file_name: Original filename (for file payload)
file_mime: MIME type (for file payload)
passphrase: Encryption passphrase
pin: Optional PIN
rsa_key_data: Optional RSA key PEM bytes
rsa_password: RSA key password if encrypted
embed_mode: 'audio_lsb' or 'audio_spread'
channel_key: 'auto', 'none', or explicit key
timeout: Operation timeout (default 300s for audio)
progress_file: Path to write progress updates
Returns:
AudioEncodeResult with stego audio data on success
"""
params = {
"operation": "encode_audio",
"carrier_b64": base64.b64encode(carrier_data).decode("ascii"),
"reference_b64": base64.b64encode(reference_data).decode("ascii"),
"message": message,
"passphrase": passphrase,
"pin": pin,
"embed_mode": embed_mode,
"channel_key": channel_key,
"progress_file": progress_file,
"chip_tier": chip_tier,
}
if file_data:
params["file_b64"] = base64.b64encode(file_data).decode("ascii")
params["file_name"] = file_name
params["file_mime"] = file_mime
if rsa_key_data:
params["rsa_key_b64"] = base64.b64encode(rsa_key_data).decode("ascii")
params["rsa_password"] = rsa_password
# Audio operations can be slower (especially spread spectrum)
result = self._run_worker(params, timeout or 300)
if result.get("success"):
return AudioEncodeResult(
success=True,
stego_data=base64.b64decode(result["stego_b64"]),
stats=result.get("stats"),
channel_mode=result.get("channel_mode"),
channel_fingerprint=result.get("channel_fingerprint"),
)
else:
return AudioEncodeResult(
success=False,
error=result.get("error", "Unknown error"),
error_type=result.get("error_type"),
)
def decode_audio(
self,
stego_data: bytes,
reference_data: bytes,
passphrase: str = "",
pin: str | None = None,
rsa_key_data: bytes | None = None,
rsa_password: str | None = None,
embed_mode: str = "audio_auto",
channel_key: str | None = "auto",
timeout: int | None = None,
progress_file: str | None = None,
) -> DecodeResult:
"""
Decode a message or file from stego audio.
Args:
stego_data: Stego audio bytes
reference_data: Reference photo bytes
passphrase: Decryption passphrase
pin: Optional PIN
rsa_key_data: Optional RSA key PEM bytes
rsa_password: RSA key password if encrypted
embed_mode: 'audio_auto', 'audio_lsb', or 'audio_spread'
channel_key: 'auto', 'none', or explicit key
timeout: Operation timeout (default 300s for audio)
progress_file: Path to write progress updates
Returns:
DecodeResult with message or file_data on success
"""
params = {
"operation": "decode_audio",
"stego_b64": base64.b64encode(stego_data).decode("ascii"),
"reference_b64": base64.b64encode(reference_data).decode("ascii"),
"passphrase": passphrase,
"pin": pin,
"embed_mode": embed_mode,
"channel_key": channel_key,
"progress_file": progress_file,
}
if rsa_key_data:
params["rsa_key_b64"] = base64.b64encode(rsa_key_data).decode("ascii")
params["rsa_password"] = rsa_password
result = self._run_worker(params, timeout or 300)
if result.get("success"):
if result.get("is_file"):
return DecodeResult(
success=True,
is_file=True,
file_data=base64.b64decode(result["file_b64"]),
filename=result.get("filename"),
mime_type=result.get("mime_type"),
)
else:
return DecodeResult(
success=True,
is_file=False,
message=result.get("message"),
)
else:
return DecodeResult(
success=False,
error=result.get("error", "Unknown error"),
error_type=result.get("error_type"),
)
def audio_info(
self,
audio_data: bytes,
timeout: int | None = None,
) -> AudioInfoResult:
"""
Get audio file information and steganographic capacity.
Args:
audio_data: Audio file bytes
timeout: Operation timeout in seconds
Returns:
AudioInfoResult with metadata and capacity info
"""
params = {
"operation": "audio_info",
"audio_b64": base64.b64encode(audio_data).decode("ascii"),
}
result = self._run_worker(params, timeout)
if result.get("success"):
info = result.get("info", {})
return AudioInfoResult(
success=True,
sample_rate=info.get("sample_rate", 0),
channels=info.get("channels", 0),
duration_seconds=info.get("duration_seconds", 0.0),
num_samples=info.get("num_samples", 0),
format=info.get("format", ""),
bit_depth=info.get("bit_depth"),
capacity_lsb=info.get("capacity_lsb", 0),
capacity_spread=info.get("capacity_spread", 0),
)
else:
return AudioInfoResult(
success=False,
error=result.get("error", "Unknown error"),
)
def get_channel_status(
self,
reveal: bool = False,
timeout: int | None = None,
) -> ChannelStatusResult:
"""
Get current channel key status (v4.0.0).
Args:
reveal: Include full key in response
timeout: Operation timeout in seconds
Returns:
ChannelStatusResult with channel info
"""
params = {
"operation": "channel_status",
"reveal": reveal,
}
result = self._run_worker(params, timeout)
if result.get("success"):
status = result.get("status", {})
return ChannelStatusResult(
success=True,
mode=status.get("mode", "public"),
configured=status.get("configured", False),
fingerprint=status.get("fingerprint"),
source=status.get("source"),
key=status.get("key") if reveal else None,
)
else:
return ChannelStatusResult(
success=False,
error=result.get("error", "Unknown error"),
)
# Convenience function for quick usage
_default_stego: SubprocessStego | None = None
def get_subprocess_stego() -> SubprocessStego:
"""Get or create default SubprocessStego instance."""
global _default_stego
if _default_stego is None:
_default_stego = SubprocessStego()
return _default_stego
# =============================================================================
# Progress File Utilities (v4.1.2)
# =============================================================================
def generate_job_id() -> str:
"""Generate a unique job ID for tracking encode/decode operations."""
return str(uuid.uuid4())[:8]
def get_progress_file_path(job_id: str) -> str:
"""Get the progress file path for a job ID."""
return str(Path(tempfile.gettempdir()) / f"stegasoo_progress_{job_id}.json")
def read_progress(job_id: str) -> dict | None:
"""
Read progress from file for a job ID.
Returns:
Progress dict with current, total, percent, phase, or None if not found
"""
progress_file = get_progress_file_path(job_id)
try:
with open(progress_file) as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return None
def cleanup_progress_file(job_id: str) -> None:
"""Remove progress file for a completed job."""
progress_file = get_progress_file_path(job_id)
try:
Path(progress_file).unlink(missing_ok=True)
except Exception:
pass

View File

@ -0,0 +1,212 @@
"""
File-based Temporary Storage
Stores temp files on disk instead of in-memory dict.
This allows multiple Gunicorn workers to share temp files
and survives service restarts within the expiry window.
Files are stored in a temp directory with:
- {file_id}.data - The actual file data
- {file_id}.json - Metadata (filename, timestamp, mime_type, etc.)
IMPORTANT: This module ONLY manages files in the temp_files/ directory.
It does NOT touch instance/ (auth database) or any other directories.
"""
import json
import time
from pathlib import Path
from threading import Lock
# Default temp directory (can be overridden)
DEFAULT_TEMP_DIR = Path(__file__).parent / "temp_files"
# Lock for thread-safe operations
_lock = Lock()
# Module-level temp directory (set on init)
_temp_dir: Path = DEFAULT_TEMP_DIR
def init(temp_dir: Path | str | None = None):
"""Initialize temp storage with optional custom directory."""
global _temp_dir
_temp_dir = Path(temp_dir) if temp_dir else DEFAULT_TEMP_DIR
_temp_dir.mkdir(parents=True, exist_ok=True)
def _data_path(file_id: str) -> Path:
"""Get path for file data."""
return _temp_dir / f"{file_id}.data"
def _meta_path(file_id: str) -> Path:
"""Get path for file metadata."""
return _temp_dir / f"{file_id}.json"
def _thumb_path(thumb_id: str) -> Path:
"""Get path for thumbnail data."""
return _temp_dir / f"{thumb_id}.thumb"
def save_temp_file(file_id: str, data: bytes, metadata: dict) -> None:
"""
Save a temp file with its metadata.
Args:
file_id: Unique identifier for the file
data: File contents as bytes
metadata: Dict with filename, mime_type, timestamp, etc.
"""
init() # Ensure directory exists
with _lock:
# Add timestamp if not present
if "timestamp" not in metadata:
metadata["timestamp"] = time.time()
# Write data file
_data_path(file_id).write_bytes(data)
# Write metadata
_meta_path(file_id).write_text(json.dumps(metadata))
def get_temp_file(file_id: str) -> dict | None:
"""
Get a temp file and its metadata.
Returns:
Dict with 'data' (bytes) and all metadata fields, or None if not found.
"""
init()
data_file = _data_path(file_id)
meta_file = _meta_path(file_id)
if not data_file.exists() or not meta_file.exists():
return None
try:
data = data_file.read_bytes()
metadata = json.loads(meta_file.read_text())
return {"data": data, **metadata}
except (OSError, json.JSONDecodeError):
return None
def has_temp_file(file_id: str) -> bool:
"""Check if a temp file exists."""
init()
return _data_path(file_id).exists() and _meta_path(file_id).exists()
def delete_temp_file(file_id: str) -> None:
"""Delete a temp file and its metadata."""
init()
with _lock:
_data_path(file_id).unlink(missing_ok=True)
_meta_path(file_id).unlink(missing_ok=True)
def save_thumbnail(thumb_id: str, data: bytes) -> None:
"""Save a thumbnail."""
init()
with _lock:
_thumb_path(thumb_id).write_bytes(data)
def get_thumbnail(thumb_id: str) -> bytes | None:
"""Get thumbnail data."""
init()
thumb_file = _thumb_path(thumb_id)
if not thumb_file.exists():
return None
try:
return thumb_file.read_bytes()
except OSError:
return None
def delete_thumbnail(thumb_id: str) -> None:
"""Delete a thumbnail."""
init()
with _lock:
_thumb_path(thumb_id).unlink(missing_ok=True)
def cleanup_expired(max_age_seconds: float) -> int:
"""
Delete expired temp files.
Args:
max_age_seconds: Maximum age in seconds before expiry
Returns:
Number of files deleted
"""
init()
now = time.time()
deleted = 0
with _lock:
# Find all metadata files
for meta_file in _temp_dir.glob("*.json"):
try:
metadata = json.loads(meta_file.read_text())
timestamp = metadata.get("timestamp", 0)
if now - timestamp > max_age_seconds:
file_id = meta_file.stem
_data_path(file_id).unlink(missing_ok=True)
meta_file.unlink(missing_ok=True)
# Also delete thumbnail if exists
_thumb_path(f"{file_id}_thumb").unlink(missing_ok=True)
deleted += 1
except (OSError, json.JSONDecodeError):
# Remove corrupted files
meta_file.unlink(missing_ok=True)
deleted += 1
return deleted
def cleanup_all() -> int:
"""
Delete all temp files. Call on service start/stop.
Returns:
Number of files deleted
"""
init()
deleted = 0
with _lock:
for f in _temp_dir.iterdir():
if f.is_file():
f.unlink(missing_ok=True)
deleted += 1
return deleted
def get_stats() -> dict:
"""Get temp storage statistics."""
init()
files = list(_temp_dir.glob("*.data"))
total_size = sum(f.stat().st_size for f in files if f.exists())
return {
"file_count": len(files),
"total_size_bytes": total_size,
"temp_dir": str(_temp_dir),
}

View File

@ -0,0 +1,440 @@
{% extends "base.html" %}
{% block title %}Account - Stegasoo{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-6 col-lg-5">
<div class="card">
<div class="card-header">
<h5 class="mb-0"><i class="bi bi-person-gear me-2"></i>Account Settings</h5>
</div>
<div class="card-body">
<p class="text-muted mb-4">
Logged in as <strong>{{ username }}</strong>
{% if is_admin %}
<span class="badge bg-warning text-dark ms-2">
<i class="bi bi-shield-check me-1"></i>Admin
</span>
{% endif %}
</p>
{% if is_admin %}
<div class="mb-4">
<a href="{{ url_for('admin_users') }}" class="btn btn-outline-primary w-100">
<i class="bi bi-people me-2"></i>Manage Users
</a>
</div>
<!-- Recovery Key Management (Admin only) -->
<div class="card bg-dark mb-4">
<div class="card-body py-3">
<div class="d-flex justify-content-between align-items-center">
<div>
<i class="bi bi-shield-lock me-2"></i>
<strong>Recovery Key</strong>
{% if has_recovery %}
<span class="badge bg-success ms-2">Configured</span>
{% else %}
<span class="badge bg-secondary ms-2">Not Set</span>
{% endif %}
</div>
<div class="btn-group btn-group-sm">
<a href="{{ url_for('regenerate_recovery') }}" class="btn btn-outline-warning"
onclick="return confirm('Generate a new recovery key? This will invalidate any existing key.')">
<i class="bi bi-arrow-repeat me-1"></i>
{{ 'Regenerate' if has_recovery else 'Generate' }}
</a>
{% if has_recovery %}
<form method="POST" action="{{ url_for('disable_recovery') }}" style="display:inline;">
<button type="submit" class="btn btn-outline-danger"
onclick="return confirm('Disable recovery? If you forget your password, you will NOT be able to recover your account.')">
<i class="bi bi-x-lg"></i>
</button>
</form>
{% endif %}
</div>
</div>
<small class="text-muted d-block mt-2">
{% if has_recovery %}
Allows password reset if you're locked out.
{% else %}
No recovery option - most secure, but no password reset possible.
{% endif %}
</small>
</div>
</div>
{% endif %}
<h6 class="text-muted mb-3">Change Password</h6>
<form method="POST" action="{{ url_for('account') }}" id="accountForm">
<div class="mb-3">
<label class="form-label">
<i class="bi bi-key me-1"></i> Current Password
</label>
<div class="input-group">
<input type="password" name="current_password" class="form-control"
id="currentPasswordInput" required>
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('currentPasswordInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
</div>
<div class="mb-3">
<label class="form-label">
<i class="bi bi-key-fill me-1"></i> New Password
</label>
<div class="input-group">
<input type="password" name="new_password" class="form-control"
id="newPasswordInput" required minlength="8">
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('newPasswordInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
<div class="form-text">Minimum 8 characters</div>
</div>
<div class="mb-4">
<label class="form-label">
<i class="bi bi-key-fill me-1"></i> Confirm New Password
</label>
<div class="input-group">
<input type="password" name="new_password_confirm" class="form-control"
id="newPasswordConfirmInput" required minlength="8">
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('newPasswordConfirmInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
</div>
<button type="submit" class="btn btn-primary w-100">
<i class="bi bi-check-lg me-2"></i>Update Password
</button>
</form>
</div>
</div>
<!-- Saved Channel Keys Section -->
<div class="card mt-4">
<div class="card-header d-flex justify-content-between align-items-center">
<h5 class="mb-0"><i class="bi bi-key-fill me-2"></i>Saved Channel Keys</h5>
<span class="badge bg-secondary">{{ channel_keys|length }} / {{ max_channel_keys }}</span>
</div>
<div class="card-body">
{% if channel_keys %}
<div class="list-group list-group-flush mb-3">
{% for key in channel_keys %}
<div class="list-group-item d-flex justify-content-between align-items-center px-0">
<div>
<strong>{{ key.name }}</strong>
<br>
<code class="small text-muted">{{ key.channel_key[:4] }}...{{ key.channel_key[-4:] }}</code>
{% if key.last_used_at %}
<span class="text-muted small ms-2">Last used: {{ key.last_used_at[:10] }}</span>
{% endif %}
</div>
<div class="btn-group btn-group-sm">
{% if is_admin %}
<button type="button" class="btn btn-outline-info"
onclick="showKeyQr('{{ key.channel_key }}', '{{ key.name }}')"
title="Show QR Code">
<i class="bi bi-qr-code"></i>
</button>
{% endif %}
<button type="button" class="btn btn-outline-secondary"
onclick="renameKey({{ key.id }}, '{{ key.name }}')"
title="Rename">
<i class="bi bi-pencil"></i>
</button>
<form method="POST" action="{{ url_for('account_delete_key', key_id=key.id) }}"
style="display:inline;"
onsubmit="return confirm('Delete key &quot;{{ key.name }}&quot;?')">
<button type="submit" class="btn btn-outline-danger" title="Delete">
<i class="bi bi-trash"></i>
</button>
</form>
</div>
</div>
{% endfor %}
</div>
{% else %}
<p class="text-muted mb-3">No saved channel keys. Save keys for quick access on encode/decode pages.</p>
{% endif %}
{% if can_save_key %}
<hr>
<h6 class="text-muted mb-3">Add New Key</h6>
<form method="POST" action="{{ url_for('account_save_key') }}">
<div class="row g-2 mb-2">
<div class="col-5">
<input type="text" name="key_name" class="form-control form-control-sm"
placeholder="Key name" required maxlength="50">
</div>
<div class="col-7">
<div class="input-group input-group-sm">
<input type="text" name="channel_key" id="channelKeyInput"
class="form-control font-monospace"
placeholder="XXXX-XXXX-..." required
pattern="[A-Za-z0-9]{4}(-[A-Za-z0-9]{4}){7}">
<button type="button" class="btn btn-outline-secondary" id="scanChannelKeyBtn"
title="Scan QR code with camera">
<i class="bi bi-camera"></i>
</button>
</div>
</div>
</div>
<button type="submit" class="btn btn-sm btn-outline-primary">
<i class="bi bi-plus-lg me-1"></i>Save Key
</button>
</form>
{% else %}
<div class="alert alert-info mb-0 small">
<i class="bi bi-info-circle me-1"></i>
Maximum of {{ max_channel_keys }} keys reached. Delete a key to add more.
</div>
{% endif %}
</div>
</div>
<!-- Logout -->
<div class="mt-4">
<a href="{{ url_for('logout') }}" class="btn btn-outline-danger w-100">
<i class="bi bi-box-arrow-left me-2"></i>Logout
</a>
</div>
</div>
</div>
<!-- Rename Modal -->
<div class="modal fade" id="renameModal" tabindex="-1">
<div class="modal-dialog modal-sm">
<div class="modal-content">
<form method="POST" id="renameForm">
<div class="modal-header">
<h6 class="modal-title">Rename Key</h6>
<button type="button" class="btn-close" data-bs-dismiss="modal"></button>
</div>
<div class="modal-body">
<input type="text" name="new_name" class="form-control" id="renameInput"
required maxlength="50">
</div>
<div class="modal-footer">
<button type="button" class="btn btn-sm btn-secondary" data-bs-dismiss="modal">Cancel</button>
<button type="submit" class="btn btn-sm btn-primary">Rename</button>
</div>
</form>
</div>
</div>
</div>
{% if is_admin %}
<!-- QR Code Modal (Admin only) -->
<div class="modal fade" id="qrModal" tabindex="-1">
<div class="modal-dialog modal-sm">
<div class="modal-content">
<div class="modal-header">
<h6 class="modal-title"><i class="bi bi-qr-code me-2"></i><span id="qrKeyName">Channel Key</span></h6>
<button type="button" class="btn-close" data-bs-dismiss="modal"></button>
</div>
<div class="modal-body text-center">
<canvas id="qrCanvas" class="bg-white p-2 rounded"></canvas>
<div class="mt-2">
<code class="small" id="qrKeyDisplay"></code>
</div>
</div>
<div class="modal-footer justify-content-center">
<button type="button" class="btn btn-sm btn-outline-secondary" id="qrDownload">
<i class="bi bi-download me-1"></i>Download
</button>
<button type="button" class="btn btn-sm btn-outline-secondary" id="qrPrint">
<i class="bi bi-printer me-1"></i>Print Sheet
</button>
</div>
</div>
</div>
</div>
{% endif %}
{% endblock %}
{% block scripts %}
<script src="{{ url_for('static', filename='js/auth.js') }}"></script>
<script src="{{ url_for('static', filename='js/soosef.js') }}"></script>
{% if is_admin %}
<script src="{{ url_for('static', filename='js/qrcode.min.js') }}"></script>
{% endif %}
<script>
StegasooAuth.initPasswordConfirmation('accountForm', 'newPasswordInput', 'newPasswordConfirmInput');
// Webcam QR scanning for channel key input (v4.1.5)
document.getElementById('scanChannelKeyBtn')?.addEventListener('click', function() {
Stegasoo.showQrScanner((text) => {
const input = document.getElementById('channelKeyInput');
if (input) {
// Clean and format the key
const clean = text.replace(/[^A-Za-z0-9]/g, '').toUpperCase();
if (clean.length === 32) {
input.value = clean.match(/.{4}/g).join('-');
} else {
input.value = text.toUpperCase();
}
}
}, 'Scan Channel Key');
});
// Format channel key input as user types
document.getElementById('channelKeyInput')?.addEventListener('input', function() {
Stegasoo.formatChannelKeyInput(this);
});
function renameKey(keyId, currentName) {
document.getElementById('renameInput').value = currentName;
document.getElementById('renameForm').action = '/account/keys/' + keyId + '/rename';
new bootstrap.Modal(document.getElementById('renameModal')).show();
}
{% if is_admin %}
function showKeyQr(channelKey, keyName) {
// Format key with dashes if not already
const clean = channelKey.replace(/-/g, '').toUpperCase();
const formatted = clean.match(/.{4}/g)?.join('-') || clean;
// Update modal content
document.getElementById('qrKeyName').textContent = keyName;
document.getElementById('qrKeyDisplay').textContent = formatted;
// Generate QR code using QRious
const canvas = document.getElementById('qrCanvas');
if (typeof QRious !== 'undefined' && canvas) {
try {
new QRious({
element: canvas,
value: formatted,
size: 200,
level: 'M'
});
new bootstrap.Modal(document.getElementById('qrModal')).show();
} catch (error) {
console.error('QR generation error:', error);
}
}
}
// Download QR as PNG
document.getElementById('qrDownload')?.addEventListener('click', function() {
const canvas = document.getElementById('qrCanvas');
const keyName = document.getElementById('qrKeyName').textContent;
if (canvas) {
const link = document.createElement('a');
link.download = 'stegasoo-channel-key-' + keyName.toLowerCase().replace(/\s+/g, '-') + '.png';
link.href = canvas.toDataURL('image/png');
link.click();
}
});
// Print tiled QR sheet (US Letter)
document.getElementById('qrPrint')?.addEventListener('click', function() {
const canvas = document.getElementById('qrCanvas');
const keyText = document.getElementById('qrKeyDisplay').textContent;
const keyName = document.getElementById('qrKeyName').textContent;
if (canvas && keyText) {
printQrSheet(canvas, keyText, keyName);
}
});
// Print QR codes tiled on US Letter paper (8.5" x 11")
function printQrSheet(canvas, keyText, title) {
const qrDataUrl = canvas.toDataURL('image/png');
const printWindow = window.open('', '_blank');
if (!printWindow) {
alert('Please allow popups to print');
return;
}
// US Letter: 8.5" x 11" - create 4x5 grid of QR codes
const cols = 4;
const rows = 5;
// Split key into two lines (4 groups each)
const keyParts = keyText.split('-');
const keyLine1 = keyParts.slice(0, 4).join('-');
const keyLine2 = keyParts.slice(4).join('-');
let qrGrid = '';
for (let i = 0; i < rows * cols; i++) {
qrGrid += `
<div class="qr-tile">
<div class="key-text">${keyLine1}</div>
<img src="${qrDataUrl}" alt="QR">
<div class="key-text">${keyLine2}</div>
</div>
`;
}
printWindow.document.write(`
<!DOCTYPE html>
<html>
<head>
<title></title>
<style>
@page {
size: letter;
margin: 0.2in;
margin-top: 0.1in;
margin-bottom: 0.1in;
}
@media print {
@page { margin: 0.15in; }
html, body { margin: 0; padding: 0; }
}
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: 'Courier New', monospace;
background: white;
}
.grid {
display: grid;
grid-template-columns: repeat(${cols}, 1fr);
gap: 0;
margin-top: 0.09in;
}
.qr-tile {
border: 1px dashed #ccc;
padding: 0.04in;
text-align: center;
page-break-inside: avoid;
}
.qr-tile img {
width: 1.6in;
height: 1.6in;
}
.key-text {
font-size: 10pt;
font-weight: bold;
color: #333;
line-height: 1.2;
}
.footer {
display: none;
}
</style>
</head>
<body>
<div class="grid">${qrGrid}</div>
<div class="footer">Cut along dashed lines</div>
<script>
window.onload = function() { window.print(); };
<\/script>
</body>
</html>
`);
printWindow.document.close();
}
{% endif %}
</script>
{% endblock %}

View File

@ -0,0 +1,50 @@
{% extends "base.html" %}
{% block title %}Password Reset - Stegasoo{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-6 col-lg-5">
<div class="card border-warning">
<div class="card-header bg-warning text-dark">
<i class="bi bi-key fs-4 me-2"></i>
<span class="fs-5">Password Reset</span>
</div>
<div class="card-body">
<div class="alert alert-warning">
<i class="bi bi-exclamation-triangle me-2"></i>
<strong>Important:</strong> This password will only be shown once.
Make sure to share it with <strong>{{ username }}</strong> securely.
</div>
<p class="text-muted">
The user's sessions have been invalidated. They will need to log in
again with the new password.
</p>
<div class="mb-4">
<label class="form-label text-muted small">New Password for {{ username }}</label>
<div class="input-group">
<input type="text" class="form-control form-control-lg font-monospace"
value="{{ password }}" readonly id="passwordField">
<button class="btn btn-outline-secondary" type="button"
onclick="copyField('passwordField')" title="Copy password">
<i class="bi bi-clipboard"></i>
</button>
</div>
</div>
<div class="d-grid">
<a href="{{ url_for('admin_users') }}" class="btn btn-primary">
<i class="bi bi-arrow-left me-2"></i>Back to Users
</a>
</div>
</div>
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script src="{{ url_for('static', filename='js/auth.js') }}"></script>
{% endblock %}

View File

@ -0,0 +1,60 @@
{% extends "base.html" %}
{% block title %}User Created - Stegasoo{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-6 col-lg-5">
<div class="card border-success">
<div class="card-header bg-success text-white">
<i class="bi bi-check-circle fs-4 me-2"></i>
<span class="fs-5">User Created Successfully</span>
</div>
<div class="card-body">
<div class="alert alert-warning">
<i class="bi bi-exclamation-triangle me-2"></i>
<strong>Important:</strong> This password will only be shown once.
Make sure to share it with the user securely.
</div>
<div class="mb-3">
<label class="form-label text-muted small">Username</label>
<div class="input-group">
<input type="text" class="form-control form-control-lg font-monospace"
value="{{ username }}" readonly id="usernameField">
<button class="btn btn-outline-secondary" type="button"
onclick="copyField('usernameField')" title="Copy username">
<i class="bi bi-clipboard"></i>
</button>
</div>
</div>
<div class="mb-4">
<label class="form-label text-muted small">Password</label>
<div class="input-group">
<input type="text" class="form-control form-control-lg font-monospace"
value="{{ password }}" readonly id="passwordField">
<button class="btn btn-outline-secondary" type="button"
onclick="copyField('passwordField')" title="Copy password">
<i class="bi bi-clipboard"></i>
</button>
</div>
</div>
<div class="d-grid gap-2">
<a href="{{ url_for('admin_user_new') }}" class="btn btn-primary">
<i class="bi bi-person-plus me-2"></i>Add Another User
</a>
<a href="{{ url_for('admin_users') }}" class="btn btn-outline-secondary">
<i class="bi bi-arrow-left me-2"></i>Back to Users
</a>
</div>
</div>
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script src="{{ url_for('static', filename='js/auth.js') }}"></script>
{% endblock %}

View File

@ -0,0 +1,166 @@
{% extends "base.html" %}
{% block title %}Add User - Stegasoo{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-6 col-lg-5">
<div class="card">
<div class="card-header">
<i class="bi bi-person-plus fs-4 me-2"></i>
<span class="fs-5">Add New User</span>
</div>
<div class="card-body">
<form id="createUserForm">
<div class="mb-3">
<label class="form-label">
<i class="bi bi-person me-1"></i> Username
</label>
<input type="text" name="username" id="usernameInput" class="form-control"
placeholder="e.g., john_doe or john@example.com"
pattern="[a-zA-Z0-9][a-zA-Z0-9_\-@.]{2,79}"
title="3-80 characters, letters/numbers/underscore/hyphen/@/."
required autofocus>
<div class="form-text">
Letters, numbers, underscore, hyphen, @ and . allowed.
</div>
</div>
<div class="mb-4">
<label class="form-label">
<i class="bi bi-key me-1"></i> Password
</label>
<div class="input-group">
<input type="text" name="password" id="passwordInput"
class="form-control" value="{{ temp_password }}"
minlength="8" required>
<button class="btn btn-outline-secondary" type="button"
onclick="regeneratePassword()" title="Generate new password">
<i class="bi bi-arrow-repeat"></i>
</button>
</div>
<div class="form-text">
Auto-generated password. You can edit or regenerate it.
</div>
</div>
<div id="errorAlert" class="alert alert-danger d-none"></div>
<div class="d-flex gap-2">
<button type="submit" class="btn btn-primary flex-grow-1" id="createBtn">
<i class="bi bi-person-check me-2"></i>Create User
</button>
<a href="{{ url_for('admin_users') }}" class="btn btn-outline-secondary">
Cancel
</a>
</div>
</form>
</div>
</div>
</div>
</div>
<!-- Success Modal -->
<div class="modal fade" id="successModal" tabindex="-1" data-bs-backdrop="static">
<div class="modal-dialog modal-dialog-centered">
<div class="modal-content border-success">
<div class="modal-header bg-success text-white">
<h5 class="modal-title">
<i class="bi bi-check-circle me-2"></i>User Created
</h5>
</div>
<div class="modal-body">
<div class="alert alert-warning mb-3 py-2">
<i class="bi bi-exclamation-triangle me-1"></i>
Password shown once. Copy it now.
</div>
<div class="row mb-3">
<div class="col-6">
<label class="form-label text-muted small mb-1">Username</label>
<div class="input-group">
<input type="text" class="form-control font-monospace"
id="createdUsername" readonly>
<button class="btn btn-outline-secondary" type="button"
onclick="copyField('createdUsername')" title="Copy">
<i class="bi bi-clipboard"></i>
</button>
</div>
</div>
<div class="col-6">
<label class="form-label text-muted small mb-1">Password</label>
<div class="input-group">
<input type="text" class="form-control font-monospace"
id="createdPassword" readonly>
<button class="btn btn-outline-secondary" type="button"
onclick="copyField('createdPassword')" title="Copy">
<i class="bi bi-clipboard"></i>
</button>
</div>
</div>
</div>
<div class="d-flex justify-content-end gap-2">
<button type="button" class="btn btn-primary" onclick="addAnother()">
<i class="bi bi-person-plus me-1"></i>Add Another
</button>
<a href="{{ url_for('admin_users') }}" class="btn btn-outline-secondary">
Done
</a>
</div>
</div>
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script src="{{ url_for('static', filename='js/auth.js') }}"></script>
<script>
const form = document.getElementById('createUserForm');
const errorAlert = document.getElementById('errorAlert');
const createBtn = document.getElementById('createBtn');
const successModal = new bootstrap.Modal(document.getElementById('successModal'));
form.addEventListener('submit', async function(e) {
e.preventDefault();
errorAlert.classList.add('d-none');
createBtn.disabled = true;
createBtn.innerHTML = '<span class="spinner-border spinner-border-sm me-2"></span>Creating...';
const formData = new FormData(form);
try {
const response = await fetch('{{ url_for("admin_user_new") }}', {
method: 'POST',
body: formData,
headers: { 'X-Requested-With': 'XMLHttpRequest' }
});
const data = await response.json();
if (data.success) {
document.getElementById('createdUsername').value = data.username;
document.getElementById('createdPassword').value = data.password;
successModal.show();
} else {
errorAlert.textContent = data.error;
errorAlert.classList.remove('d-none');
}
} catch (err) {
errorAlert.textContent = 'An error occurred. Please try again.';
errorAlert.classList.remove('d-none');
}
createBtn.disabled = false;
createBtn.innerHTML = '<i class="bi bi-person-check me-2"></i>Create User';
});
function addAnother() {
successModal.hide();
document.getElementById('usernameInput').value = '';
regeneratePassword();
document.getElementById('usernameInput').focus();
}
</script>
{% endblock %}

View File

@ -0,0 +1,55 @@
{% extends "base.html" %}
{% block title %}Login - Stegasoo{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-5 col-lg-4">
<div class="card">
<div class="card-header text-center">
<i class="bi bi-shield-lock fs-1 d-block mb-2"></i>
<h5 class="mb-0">Login</h5>
</div>
<div class="card-body">
<form method="POST" action="{{ url_for('login') }}">
<div class="mb-3">
<label class="form-label">
<i class="bi bi-person me-1"></i> Username
</label>
<input type="text" name="username" class="form-control"
placeholder="Enter your username" required autofocus>
</div>
<div class="mb-4">
<label class="form-label">
<i class="bi bi-key me-1"></i> Password
</label>
<div class="input-group">
<input type="password" name="password" class="form-control"
id="passwordInput" required>
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('passwordInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
</div>
<button type="submit" class="btn btn-primary w-100">
<i class="bi bi-box-arrow-in-right me-2"></i>Login
</button>
</form>
<div class="text-center mt-3">
<a href="{{ url_for('recover') }}" class="text-muted small">
<i class="bi bi-key me-1"></i> Forgot password?
</a>
</div>
</div>
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script src="{{ url_for('static', filename='js/auth.js') }}"></script>
{% endblock %}

View File

@ -0,0 +1,129 @@
{% extends "base.html" %}
{% block title %}Password Recovery - Stegasoo{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-6 col-lg-5">
<div class="card">
<div class="card-header text-center">
<i class="bi bi-shield-lock fs-1 d-block mb-2"></i>
<h5 class="mb-0">Password Recovery</h5>
</div>
<div class="card-body">
<p class="text-muted text-center mb-4">
Enter your recovery key to reset your admin password.
</p>
<!-- Extract from Stego Backup -->
<div class="accordion mb-3" id="stegoAccordion">
<div class="accordion-item">
<h2 class="accordion-header">
<button class="accordion-button collapsed py-2" type="button"
data-bs-toggle="collapse" data-bs-target="#stegoExtract">
<i class="bi bi-incognito me-2"></i>
<small>Extract from stego backup</small>
</button>
</h2>
<div id="stegoExtract" class="accordion-collapse collapse"
data-bs-parent="#stegoAccordion">
<div class="accordion-body py-2">
<form method="POST" action="{{ url_for('recover_from_stego') }}"
enctype="multipart/form-data">
<div class="mb-2">
<label class="form-label small mb-1">Stego Image</label>
<input type="file" name="stego_image"
class="form-control form-control-sm"
accept="image/*" required>
</div>
<div class="mb-2">
<label class="form-label small mb-1">Original Reference</label>
<input type="file" name="reference_image"
class="form-control form-control-sm"
accept="image/*" required>
</div>
<button type="submit" class="btn btn-sm btn-outline-primary w-100">
<i class="bi bi-unlock me-1"></i> Extract Key
</button>
</form>
</div>
</div>
</div>
</div>
<form method="POST" action="{{ url_for('recover') }}" id="recoverForm">
<!-- Recovery Key Input -->
<div class="mb-3">
<label class="form-label">
<i class="bi bi-key-fill me-1"></i> Recovery Key
</label>
<textarea name="recovery_key" class="form-control font-monospace"
rows="2" required
placeholder="XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX"
style="font-size: 0.9em;">{{ prefilled_key or '' }}</textarea>
<div class="form-text">
Paste your full recovery key (with or without dashes)
</div>
</div>
<hr>
<!-- New Password -->
<div class="mb-3">
<label class="form-label">
<i class="bi bi-lock me-1"></i> New Password
</label>
<div class="input-group">
<input type="password" name="new_password" class="form-control"
id="passwordInput" required minlength="8">
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('passwordInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
<div class="form-text">Minimum 8 characters</div>
</div>
<!-- Confirm Password -->
<div class="mb-4">
<label class="form-label">
<i class="bi bi-lock-fill me-1"></i> Confirm Password
</label>
<div class="input-group">
<input type="password" name="new_password_confirm" class="form-control"
id="passwordConfirmInput" required minlength="8">
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('passwordConfirmInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
</div>
<button type="submit" class="btn btn-primary w-100">
<i class="bi bi-check-lg me-2"></i>Reset Password
</button>
</form>
<div class="text-center mt-3">
<a href="{{ url_for('login') }}" class="text-muted small">
<i class="bi bi-arrow-left me-1"></i> Back to Login
</a>
</div>
</div>
</div>
<div class="alert alert-warning mt-4 small">
<i class="bi bi-exclamation-triangle me-2"></i>
<strong>Note:</strong> This will reset the admin password. If you don't have a valid recovery key,
you'll need to delete the database and reconfigure Stegasoo.
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script src="{{ url_for('static', filename='js/auth.js') }}"></script>
<script>
StegasooAuth.initPasswordConfirmation('recoverForm', 'passwordInput', 'passwordConfirmInput');
</script>
{% endblock %}

View File

@ -0,0 +1,183 @@
{% extends "base.html" %}
{% block title %}Regenerate Recovery Key - Stegasoo{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-8 col-lg-6">
<div class="card">
<div class="card-header text-center">
<i class="bi bi-arrow-repeat fs-1 d-block mb-2"></i>
<h5 class="mb-0">{{ 'Regenerate' if has_existing else 'Generate' }} Recovery Key</h5>
</div>
<div class="card-body">
{% if has_existing %}
<!-- Warning for existing key -->
<div class="alert alert-warning">
<i class="bi bi-exclamation-triangle me-2"></i>
<strong>Warning:</strong> Your existing recovery key will be invalidated.
Make sure to save this new key before continuing.
</div>
{% else %}
<!-- Info for first-time setup -->
<div class="alert alert-info">
<i class="bi bi-info-circle me-2"></i>
<strong>What is a recovery key?</strong><br>
If you forget your admin password, this key is the ONLY way to reset it.
</div>
{% endif %}
<!-- Recovery Key Display -->
<div class="mb-4">
<label class="form-label">
<i class="bi bi-key-fill me-1"></i> Your New Recovery Key
</label>
<div class="input-group">
<input type="text" class="form-control font-monospace text-center"
id="recoveryKey" value="{{ recovery_key }}" readonly
style="font-size: 1.1em; letter-spacing: 0.5px;">
<button class="btn btn-outline-secondary" type="button"
onclick="copyToClipboard()" title="Copy to clipboard">
<i class="bi bi-clipboard" id="copyIcon"></i>
</button>
</div>
</div>
<!-- QR Code (if available) -->
{% if qr_base64 %}
<div class="mb-4 text-center">
<label class="form-label d-block">
<i class="bi bi-qr-code me-1"></i> QR Code
</label>
<img src="data:image/png;base64,{{ qr_base64 }}"
alt="Recovery Key QR Code" class="img-fluid border rounded"
style="max-width: 200px;" id="qrImage">
</div>
{% endif %}
<!-- Download Options -->
<div class="mb-4">
<label class="form-label">
<i class="bi bi-download me-1"></i> Download Options
</label>
<div class="d-flex gap-2 flex-wrap">
<button class="btn btn-outline-primary btn-sm" onclick="downloadTextFile()">
<i class="bi bi-file-text me-1"></i> Text File
</button>
{% if qr_base64 %}
<button class="btn btn-outline-primary btn-sm" onclick="downloadQRImage()">
<i class="bi bi-image me-1"></i> QR Image
</button>
{% endif %}
</div>
</div>
<!-- Stego Backup Option -->
<div class="mb-4">
<label class="form-label">
<i class="bi bi-incognito me-1"></i> Hide in Image
</label>
<form method="POST" action="{{ url_for('create_stego_backup') }}"
enctype="multipart/form-data" class="d-flex gap-2 align-items-end">
<input type="hidden" name="recovery_key" value="{{ recovery_key }}">
<div class="flex-grow-1">
<input type="file" name="carrier_image" class="form-control form-control-sm"
accept="image/jpeg,image/png" required>
<div class="form-text">JPG/PNG, 50KB-2MB</div>
</div>
<button type="submit" class="btn btn-outline-secondary btn-sm">
<i class="bi bi-download me-1"></i> Stego
</button>
</form>
</div>
<hr>
<!-- Confirmation Form -->
<form method="POST" id="recoveryForm">
<input type="hidden" name="recovery_key" value="{{ recovery_key }}">
<!-- Confirm checkbox -->
<div class="form-check mb-3">
<input class="form-check-input" type="checkbox" id="confirmSaved"
onchange="updateButtons()">
<label class="form-check-label" for="confirmSaved">
I have saved my recovery key in a secure location
</label>
</div>
<div class="d-flex gap-2 justify-content-between">
<!-- Cancel button -->
<button type="submit" name="action" value="cancel"
class="btn btn-outline-secondary">
<i class="bi bi-x-lg me-1"></i> Cancel
</button>
<!-- Save button -->
<button type="submit" name="action" value="save"
class="btn btn-primary" id="saveBtn" disabled>
<i class="bi bi-check-lg me-1"></i> Save New Key
</button>
</div>
</form>
</div>
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script>
// Copy recovery key to clipboard
function copyToClipboard() {
const keyInput = document.getElementById('recoveryKey');
navigator.clipboard.writeText(keyInput.value).then(() => {
const icon = document.getElementById('copyIcon');
icon.className = 'bi bi-clipboard-check';
setTimeout(() => { icon.className = 'bi bi-clipboard'; }, 2000);
});
}
// Download as text file
function downloadTextFile() {
const key = document.getElementById('recoveryKey').value;
const content = `Stegasoo Recovery Key
=====================
${key}
IMPORTANT:
- Keep this file in a secure location
- Anyone with this key can reset admin passwords
- Do not store with your password
Generated: ${new Date().toISOString()}
`;
const blob = new Blob([content], { type: 'text/plain' });
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = 'stegasoo-recovery-key.txt';
a.click();
URL.revokeObjectURL(url);
}
// Download QR as image
function downloadQRImage() {
const img = document.getElementById('qrImage');
if (!img) return;
const a = document.createElement('a');
a.href = img.src;
a.download = 'stegasoo-recovery-qr.png';
a.click();
}
// Enable save button when checkbox is checked
function updateButtons() {
const checkbox = document.getElementById('confirmSaved');
const saveBtn = document.getElementById('saveBtn');
saveBtn.disabled = !checkbox.checked;
}
</script>
{% endblock %}

View File

@ -0,0 +1,76 @@
{% extends "base.html" %}
{% block title %}Setup - Stegasoo{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-6 col-lg-5">
<div class="card">
<div class="card-header text-center">
<i class="bi bi-gear-fill fs-1 d-block mb-2"></i>
<h5 class="mb-0">Initial Setup</h5>
</div>
<div class="card-body">
<p class="text-muted text-center mb-4">
Welcome to Stegasoo! Create your admin account to get started.
</p>
<form method="POST" action="{{ url_for('setup') }}" id="setupForm">
<div class="mb-3">
<label class="form-label">
<i class="bi bi-person me-1"></i> Username
</label>
<input type="text" name="username" class="form-control"
value="admin" required minlength="3">
</div>
<div class="mb-3">
<label class="form-label">
<i class="bi bi-key me-1"></i> Password
</label>
<div class="input-group">
<input type="password" name="password" class="form-control"
id="passwordInput" required minlength="8">
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('passwordInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
<div class="form-text">Minimum 8 characters</div>
</div>
<div class="mb-4">
<label class="form-label">
<i class="bi bi-key-fill me-1"></i> Confirm Password
</label>
<div class="input-group">
<input type="password" name="password_confirm" class="form-control"
id="passwordConfirmInput" required minlength="8">
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('passwordConfirmInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
</div>
<button type="submit" class="btn btn-primary w-100">
<i class="bi bi-check-lg me-2"></i>Create Admin Account
</button>
</form>
</div>
</div>
<div class="alert alert-info mt-4 small">
<i class="bi bi-info-circle me-2"></i>
This is a single-user setup. The admin account has full access to all features.
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script src="{{ url_for('static', filename='js/auth.js') }}"></script>
<script>
StegasooAuth.initPasswordConfirmation('setupForm', 'passwordInput', 'passwordConfirmInput');
</script>
{% endblock %}

View File

@ -0,0 +1,176 @@
{% extends "base.html" %}
{% block title %}Recovery Key Setup - Stegasoo{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-8 col-lg-6">
<div class="card">
<div class="card-header text-center">
<i class="bi bi-shield-lock fs-1 d-block mb-2"></i>
<h5 class="mb-0">Recovery Key Setup</h5>
<small class="text-muted">Step 2 of 2</small>
</div>
<div class="card-body">
<!-- Explanation -->
<div class="alert alert-info">
<i class="bi bi-info-circle me-2"></i>
<strong>What is a recovery key?</strong><br>
If you forget your admin password, this key is the ONLY way to reset it.
Save it somewhere safe - it will not be shown again.
</div>
<!-- Recovery Key Display -->
<div class="mb-4">
<label class="form-label">
<i class="bi bi-key-fill me-1"></i> Your Recovery Key
</label>
<div class="input-group">
<input type="text" class="form-control font-monospace text-center"
id="recoveryKey" value="{{ recovery_key }}" readonly
style="font-size: 1.1em; letter-spacing: 0.5px;">
<button class="btn btn-outline-secondary" type="button"
onclick="copyToClipboard()" title="Copy to clipboard">
<i class="bi bi-clipboard" id="copyIcon"></i>
</button>
</div>
</div>
<!-- QR Code (if available) -->
{% if qr_base64 %}
<div class="mb-4 text-center">
<label class="form-label d-block">
<i class="bi bi-qr-code me-1"></i> QR Code
</label>
<img src="data:image/png;base64,{{ qr_base64 }}"
alt="Recovery Key QR Code" class="img-fluid border rounded"
style="max-width: 200px;" id="qrImage">
<div class="mt-2">
<small class="text-muted">Scan with your phone's camera app</small>
</div>
</div>
{% endif %}
<!-- Download Options -->
<div class="mb-4">
<label class="form-label">
<i class="bi bi-download me-1"></i> Download Options
</label>
<div class="d-flex gap-2 flex-wrap">
<button class="btn btn-outline-primary btn-sm" onclick="downloadTextFile()">
<i class="bi bi-file-text me-1"></i> Text File
</button>
{% if qr_base64 %}
<button class="btn btn-outline-primary btn-sm" onclick="downloadQRImage()">
<i class="bi bi-image me-1"></i> QR Image
</button>
{% endif %}
</div>
</div>
<hr>
<!-- Confirmation Form -->
<form method="POST" id="recoveryForm">
<input type="hidden" name="recovery_key" value="{{ recovery_key }}">
<!-- Confirm checkbox -->
<div class="form-check mb-3">
<input class="form-check-input" type="checkbox" id="confirmSaved"
onchange="updateButtons()">
<label class="form-check-label" for="confirmSaved">
I have saved my recovery key in a secure location
</label>
</div>
<div class="d-flex gap-2 justify-content-between">
<!-- Skip button (no recovery) -->
<button type="submit" name="action" value="skip"
class="btn btn-outline-secondary"
onclick="return confirm('Are you sure? Without a recovery key, there is NO way to reset your password if you forget it.')">
<i class="bi bi-skip-forward me-1"></i> Skip (No Recovery)
</button>
<!-- Save button (with key) -->
<button type="submit" name="action" value="save"
class="btn btn-primary" id="saveBtn" disabled>
<i class="bi bi-check-lg me-1"></i> Continue
</button>
</div>
</form>
</div>
</div>
<!-- Security Notes -->
<div class="card mt-3">
<div class="card-header">
<i class="bi bi-shield-check me-2"></i>Security Notes
</div>
<div class="card-body small">
<ul class="mb-0">
<li>The recovery key is <strong>not stored</strong> - only a hash is saved</li>
<li>Keep it separate from your password (different location)</li>
<li>Anyone with this key can reset admin passwords</li>
<li>If you lose it and forget your password, you must recreate the database</li>
</ul>
</div>
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script>
// Copy recovery key to clipboard
function copyToClipboard() {
const keyInput = document.getElementById('recoveryKey');
navigator.clipboard.writeText(keyInput.value).then(() => {
const icon = document.getElementById('copyIcon');
icon.className = 'bi bi-clipboard-check';
setTimeout(() => { icon.className = 'bi bi-clipboard'; }, 2000);
});
}
// Download as text file
function downloadTextFile() {
const key = document.getElementById('recoveryKey').value;
const content = `Stegasoo Recovery Key
=====================
${key}
IMPORTANT:
- Keep this file in a secure location
- Anyone with this key can reset admin passwords
- Do not store with your password
Generated: ${new Date().toISOString()}
`;
const blob = new Blob([content], { type: 'text/plain' });
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = 'stegasoo-recovery-key.txt';
a.click();
URL.revokeObjectURL(url);
}
// Download QR as image
function downloadQRImage() {
const img = document.getElementById('qrImage');
if (!img) return;
const a = document.createElement('a');
a.href = img.src;
a.download = 'stegasoo-recovery-qr.png';
a.click();
}
// Enable save button when checkbox is checked
function updateButtons() {
const checkbox = document.getElementById('confirmSaved');
const saveBtn = document.getElementById('saveBtn');
saveBtn.disabled = !checkbox.checked;
}
</script>
{% endblock %}