Files
stegasoo/frontends/api/main.py
Aaron D. Lee 34ede3815f Add API key authentication and TLS support
API Authentication (v4.2.1):
- API key auth via X-API-Key header
- Keys hashed (SHA-256) and stored in ~/.stegasoo/api_keys.json
- Auth disabled when no keys configured
- Protected endpoints: encode, decode, generate, channel/*, compare, etc.
- Public endpoints: /, /docs, /modes, /auth/status, /channel/status

TLS Support:
- Auto-generates self-signed certs on first run
- Certs include localhost, local IPs, hostname.local
- Stored in ~/.stegasoo/certs/

CLI Commands:
- stegasoo api keys list/create/delete
- stegasoo api tls generate/info
- stegasoo api serve (starts with TLS by default)

Updated systemd service to use TLS.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 18:03:51 -05:00

1623 lines
53 KiB
Python

#!/usr/bin/env python3
"""
Stegasoo REST API (v4.2.0)
FastAPI-based REST API for steganography operations.
Supports both text messages and file embedding.
CHANGES in v4.2.0:
- Async encode/decode operations (run in thread pool)
- Server can handle concurrent requests without blocking
CHANGES in v4.0.0:
- Added channel key support for deployment/group isolation
- New /channel endpoints for key management
- channel_key parameter on encode/decode endpoints
- Messages encoded with channel key require same key to decode
CHANGES in v3.2.0:
- Removed date dependency from all operations
- Renamed day_phrase → passphrase
- No date_str parameters needed
- Simplified API for asynchronous communications
NEW in v3.0: LSB and DCT embedding modes.
NEW in v3.0.1: DCT color mode and JPEG output format.
"""
import asyncio
import base64
import sys
from functools import partial
from pathlib import Path
from typing import Literal
from fastapi import Depends, FastAPI, File, Form, HTTPException, Query, UploadFile
from fastapi.responses import JSONResponse, Response
from pydantic import BaseModel, Field
# API Key Authentication
try:
from .auth import (
require_api_key,
get_api_key_status,
add_api_key,
remove_api_key,
list_api_keys,
is_auth_enabled,
)
except ImportError:
# When running directly (not as package)
from auth import (
require_api_key,
get_api_key_status,
add_api_key,
remove_api_key,
list_api_keys,
is_auth_enabled,
)
# Add parent to path for development
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
from stegasoo import (
MAX_FILE_PAYLOAD_SIZE,
CapacityError,
DecryptionError,
FilePayload,
StegasooError,
__version__,
calculate_capacity_by_mode,
clear_channel_key,
compare_modes,
decode,
encode,
generate_channel_key,
generate_credentials,
get_channel_status,
has_argon2,
has_dct_support,
set_channel_key,
validate_channel_key,
validate_image,
will_fit_by_mode,
)
from stegasoo.constants import (
DEFAULT_PASSPHRASE_WORDS,
MAX_PASSPHRASE_WORDS,
MAX_PIN_LENGTH,
MIN_PASSPHRASE_WORDS,
MIN_PIN_LENGTH,
VALID_RSA_SIZES,
)
# QR Code utilities
try:
from stegasoo.qr_utils import (
extract_key_from_qr,
generate_qr_ascii,
generate_qr_code,
has_qr_read,
has_qr_write,
)
HAS_QR_READ = has_qr_read()
HAS_QR_WRITE = has_qr_write()
except ImportError:
HAS_QR_READ = False
HAS_QR_WRITE = False
extract_key_from_qr = None
generate_qr_code = None
generate_qr_ascii = None
# ============================================================================
# FASTAPI APP
# ============================================================================
app = FastAPI(
title="Stegasoo API",
description="""
Secure steganography with hybrid authentication. Supports text messages and file embedding.
## Version 4.0.0 Changes
- **Channel key support** - Deployment/group isolation for messages
- **New /channel endpoints** - Generate, view, and manage channel keys
- **channel_key parameter** - Added to encode/decode endpoints
## Version 3.2.0 Changes
- **No date parameters needed** - Encode and decode anytime without tracking dates
- **Single passphrase** - No daily rotation, just use your passphrase
- **True asynchronous communications** - Perfect for dead drops and delayed delivery
## Embedding Modes (v3.0)
- **LSB mode** (default): Spatial LSB embedding, full color output, higher capacity
- **DCT mode**: Frequency domain embedding, ~20% capacity, better stealth
## DCT Options (v3.0.1)
- **dct_color_mode**: 'grayscale' (default) or 'color' (preserves original colors)
- **dct_output_format**: 'png' (lossless) or 'jpeg' (smaller, more natural)
Use the `/modes` endpoint to check availability and `/compare` to compare capacities.
""",
version=__version__,
docs_url="/docs",
redoc_url="/redoc",
)
# ============================================================================
# TYPE ALIASES
# ============================================================================
EmbedModeType = Literal["lsb", "dct"]
ExtractModeType = Literal["auto", "lsb", "dct"]
DctColorModeType = Literal["grayscale", "color"]
DctOutputFormatType = Literal["png", "jpeg"]
# ============================================================================
# MODELS
# ============================================================================
class GenerateRequest(BaseModel):
use_pin: bool = True
use_rsa: bool = False
pin_length: int = Field(default=6, ge=MIN_PIN_LENGTH, le=MAX_PIN_LENGTH)
rsa_bits: int = Field(default=2048)
words_per_passphrase: int = Field(
default=DEFAULT_PASSPHRASE_WORDS,
ge=MIN_PASSPHRASE_WORDS,
le=MAX_PASSPHRASE_WORDS,
description="Words per passphrase (v3.2.0: default increased to 4)",
)
class GenerateResponse(BaseModel):
passphrase: str = Field(description="Single passphrase (v3.2.0: no daily rotation)")
pin: str | None = None
rsa_key_pem: str | None = None
entropy: dict[str, int]
# Legacy field for compatibility
phrases: dict[str, str] | None = Field(
default=None, description="Deprecated: Use 'passphrase' instead"
)
class EncodeRequest(BaseModel):
message: str
reference_photo_base64: str
carrier_image_base64: str
passphrase: str = Field(description="Passphrase (v3.2.0: renamed from day_phrase)")
pin: str = ""
rsa_key_base64: str | None = None
rsa_password: str | None = None
# Channel key (v4.0.0)
channel_key: str | None = Field(
default=None,
description="Channel key for deployment isolation. null=auto (use server config), ''=public mode, 'XXXX-...'=explicit key",
)
embed_mode: EmbedModeType = Field(
default="lsb",
description="Embedding mode: 'lsb' (default, color) or 'dct' (requires scipy)",
)
dct_output_format: DctOutputFormatType = Field(
default="png",
description="DCT output format: 'png' (lossless) or 'jpeg' (smaller). Only applies to DCT mode.",
)
dct_color_mode: DctColorModeType = Field(
default="grayscale",
description="DCT color mode: 'grayscale' (default) or 'color' (preserves colors). Only applies to DCT mode.",
)
class EncodeFileRequest(BaseModel):
"""Request for embedding a file (base64-encoded)."""
file_data_base64: str
filename: str
mime_type: str | None = None
reference_photo_base64: str
carrier_image_base64: str
passphrase: str = Field(description="Passphrase (v3.2.0: renamed from day_phrase)")
pin: str = ""
rsa_key_base64: str | None = None
rsa_password: str | None = None
# Channel key (v4.0.0)
channel_key: str | None = Field(
default=None,
description="Channel key for deployment isolation. null=auto (use server config), ''=public mode, 'XXXX-...'=explicit key",
)
embed_mode: EmbedModeType = Field(
default="lsb",
description="Embedding mode: 'lsb' (default, color) or 'dct' (requires scipy)",
)
dct_output_format: DctOutputFormatType = Field(
default="png",
description="DCT output format: 'png' (lossless) or 'jpeg' (smaller). Only applies to DCT mode.",
)
dct_color_mode: DctColorModeType = Field(
default="grayscale",
description="DCT color mode: 'grayscale' (default) or 'color' (preserves colors). Only applies to DCT mode.",
)
class EncodeResponse(BaseModel):
stego_image_base64: str
filename: str
capacity_used_percent: float
embed_mode: str = Field(description="Embedding mode used: 'lsb' or 'dct'")
output_format: str = Field(
default="png", description="Output format: 'png' or 'jpeg' (for DCT mode)"
)
color_mode: str = Field(
default="color",
description="Color mode: 'color' (LSB/DCT color) or 'grayscale' (DCT grayscale)",
)
# Channel key info (v4.0.0)
channel_mode: str = Field(default="public", description="Channel mode: 'public' or 'private'")
channel_fingerprint: str | None = Field(
default=None, description="Channel key fingerprint (if private mode)"
)
# Legacy fields (v3.2.0: no longer used in crypto)
date_used: str | None = Field(
default=None, description="Deprecated: Date no longer used in v3.2.0"
)
day_of_week: str | None = Field(
default=None, description="Deprecated: Date no longer used in v3.2.0"
)
class DecodeRequest(BaseModel):
stego_image_base64: str
reference_photo_base64: str
passphrase: str = Field(description="Passphrase (v3.2.0: renamed from day_phrase)")
pin: str = ""
rsa_key_base64: str | None = None
rsa_password: str | None = None
# Channel key (v4.0.0)
channel_key: str | None = Field(
default=None,
description="Channel key for decryption. null=auto (use server config), ''=public mode, 'XXXX-...'=explicit key",
)
embed_mode: ExtractModeType = Field(
default="auto", description="Extraction mode: 'auto' (default), 'lsb', or 'dct'"
)
class DecodeResponse(BaseModel):
"""Response for decode - can be text or file."""
payload_type: str # 'text' or 'file'
message: str | None = None # For text
file_data_base64: str | None = None # For file (base64-encoded)
filename: str | None = None # For file
mime_type: str | None = None # For file
class ModeCapacity(BaseModel):
"""Capacity info for a single mode."""
capacity_bytes: int
capacity_kb: float
available: bool
output_format: str
class ImageInfoResponse(BaseModel):
width: int
height: int
pixels: int
capacity_bytes: int = Field(description="LSB mode capacity (for backwards compatibility)")
capacity_kb: int = Field(description="LSB mode capacity in KB")
modes: dict[str, ModeCapacity] | None = Field(
default=None, description="Capacity by embedding mode (v3.0+)"
)
class CompareModesRequest(BaseModel):
"""Request for comparing embedding modes."""
carrier_image_base64: str
payload_size: int | None = Field(
default=None, description="Optional payload size to check if it fits"
)
class CompareModesResponse(BaseModel):
"""Response comparing LSB and DCT modes."""
width: int
height: int
lsb: dict
dct: dict
payload_check: dict | None = None
recommendation: str
class DctModeInfo(BaseModel):
"""Detailed DCT mode information."""
available: bool
name: str
description: str
output_formats: list[str]
color_modes: list[str]
capacity_ratio: str
requires: str
class ChannelStatusResponse(BaseModel):
"""Response for channel key status (v4.0.0)."""
mode: str = Field(description="'public' or 'private'")
configured: bool = Field(description="Whether a channel key is configured")
fingerprint: str | None = Field(default=None, description="Key fingerprint (partial)")
source: str | None = Field(default=None, description="Where the key comes from")
key: str | None = Field(default=None, description="Full key (only if reveal=true)")
class ChannelGenerateResponse(BaseModel):
"""Response for channel key generation (v4.0.0)."""
key: str = Field(description="Generated channel key")
fingerprint: str = Field(description="Key fingerprint")
saved: bool = Field(default=False, description="Whether key was saved to config")
save_location: str | None = Field(default=None, description="Where key was saved")
class ChannelSetRequest(BaseModel):
"""Request to set channel key (v4.0.0)."""
key: str = Field(description="Channel key to set")
location: str = Field(default="user", description="'user' or 'project'")
class AuthStatusResponse(BaseModel):
"""Response for API key authentication status."""
enabled: bool = Field(description="Whether API key auth is enabled")
total_keys: int = Field(description="Total number of configured API keys")
user_keys: int = Field(description="Keys in user config")
project_keys: int = Field(description="Keys in project config")
env_configured: bool = Field(description="Whether env var key is set")
class AuthKeyInfo(BaseModel):
"""Info about a single API key (not the actual key)."""
name: str
created: str
class ModesResponse(BaseModel):
"""Response showing available embedding modes."""
lsb: dict
dct: DctModeInfo
# Channel key status (v4.0.0)
channel: dict | None = Field(default=None, description="Channel key status (v4.0.0)")
class StatusResponse(BaseModel):
version: str
has_argon2: bool
has_qrcode_read: bool
has_qrcode_write: bool # v4.2.0: QR generation capability
has_dct: bool
max_payload_kb: int
available_modes: list[str]
dct_features: dict | None = Field(default=None, description="DCT mode features (v3.0.1+)")
# Channel key status (v4.0.0)
channel: dict | None = Field(default=None, description="Channel key status (v4.0.0)")
breaking_changes: dict = Field(description="v4.0.0 breaking changes")
class QrExtractResponse(BaseModel):
success: bool
key_pem: str | None = None
error: str | None = None
class QrGenerateRequest(BaseModel):
"""Request to generate QR code from RSA key."""
key_pem: str = Field(..., description="RSA private key in PEM format")
output_format: str = Field(
default="png",
description="Output format: 'png', 'jpg', or 'ascii'",
)
compress: bool = Field(
default=True,
description="Compress key data with zstd (recommended for larger keys)",
)
class QrGenerateResponse(BaseModel):
"""Response containing generated QR code."""
success: bool
format: str | None = None
qr_data: str | None = Field(
default=None,
description="Base64-encoded image data (for png/jpg) or ASCII string",
)
error: str | None = None
class WillFitRequest(BaseModel):
"""Request to check if payload will fit."""
carrier_image_base64: str
payload_size: int
embed_mode: EmbedModeType = "lsb"
class WillFitResponse(BaseModel):
"""Response for will_fit check."""
fits: bool
payload_size: int
capacity: int
usage_percent: float
headroom: int
mode: str
class ErrorResponse(BaseModel):
error: str
detail: str | None = None
# ============================================================================
# HELPER: RESOLVE CHANNEL KEY
# ============================================================================
def _resolve_channel_key(channel_key: str | None) -> str | None:
"""
Resolve channel key from API parameter.
Wrapper around library's resolve_channel_key with HTTP exception handling.
Returns:
Resolved channel key to pass to encode/decode
Raises:
HTTPException: If key format is invalid
"""
from stegasoo.channel import resolve_channel_key
try:
return resolve_channel_key(channel_key)
except (ValueError, FileNotFoundError) as e:
raise HTTPException(400, str(e))
def _get_channel_info(channel_key: str | None) -> tuple[str, str | None]:
"""
Get channel mode and fingerprint for response.
Uses library's get_channel_response_info for consistent formatting.
Returns:
(mode, fingerprint) tuple
"""
from stegasoo.channel import get_channel_response_info
info = get_channel_response_info(channel_key)
return info["mode"], info.get("fingerprint")
# ============================================================================
# HELPER: ASYNC EXECUTION
# ============================================================================
async def run_in_thread(func, *args, **kwargs):
"""
Run a CPU-bound function in a thread pool.
This allows the FastAPI server to handle other requests while
encode/decode operations are running. Essential for Pi deployments
where operations can take several seconds.
Usage:
result = await run_in_thread(encode, message=msg, carrier_image=carrier, ...)
"""
if kwargs:
func = partial(func, **kwargs)
return await asyncio.to_thread(func, *args)
# ============================================================================
# ROUTES - STATUS & INFO
# ============================================================================
@app.get("/", response_model=StatusResponse)
async def root():
"""Get API status and configuration."""
available_modes = ["lsb"]
dct_features = None
if has_dct_support():
available_modes.append("dct")
dct_features = {
"output_formats": ["png", "jpeg"],
"color_modes": ["grayscale", "color"],
"default_output_format": "png",
"default_color_mode": "grayscale",
}
# Channel key status (v4.0.0)
channel_status = get_channel_status()
channel_info = {
"mode": channel_status["mode"],
"configured": channel_status["configured"],
"fingerprint": channel_status.get("fingerprint"),
"source": channel_status.get("source"),
}
return StatusResponse(
version=__version__,
has_argon2=has_argon2(),
has_qrcode_read=HAS_QR_READ,
has_qrcode_write=HAS_QR_WRITE,
has_dct=has_dct_support(),
max_payload_kb=MAX_FILE_PAYLOAD_SIZE // 1024,
available_modes=available_modes,
dct_features=dct_features,
channel=channel_info,
breaking_changes={
"v4_channel_key": "Messages encoded with channel key require same key to decode",
"format_version": 5,
"backward_compatible": False,
"v3_notes": {
"date_removed": "No date_str parameter needed - encode/decode anytime",
"passphrase_renamed": "day_phrase → passphrase (single passphrase, no daily rotation)",
},
},
)
@app.get("/modes", response_model=ModesResponse)
async def api_modes():
"""
Get available embedding modes and their status.
v4.0.0: Also includes channel key status.
"""
# Channel status
channel_status = get_channel_status()
channel_info = {
"mode": channel_status["mode"],
"configured": channel_status["configured"],
"fingerprint": channel_status.get("fingerprint"),
}
return ModesResponse(
lsb={
"available": True,
"name": "Spatial LSB",
"description": "Embed in pixel LSBs, outputs PNG/BMP",
"output_format": "PNG (color)",
"capacity_ratio": "100%",
},
dct=DctModeInfo(
available=has_dct_support(),
name="DCT Domain",
description="Embed in DCT coefficients, frequency domain steganography",
output_formats=["png", "jpeg"],
color_modes=["grayscale", "color"],
capacity_ratio="~20% of LSB",
requires="scipy",
),
channel=channel_info,
)
# ============================================================================
# ROUTES - CHANNEL KEY (v4.0.0)
# ============================================================================
@app.get("/channel/status", response_model=ChannelStatusResponse)
async def api_channel_status(
reveal: bool = Query(False, description="Include full key in response")
):
"""
Get current channel key status.
v4.0.0: New endpoint for channel key management.
Returns mode (public/private), fingerprint, and source.
Use reveal=true to include the full key.
"""
status = get_channel_status()
return ChannelStatusResponse(
mode=status["mode"],
configured=status["configured"],
fingerprint=status.get("fingerprint"),
source=status.get("source"),
key=status.get("key") if reveal and status["configured"] else None,
)
@app.post("/channel/generate", response_model=ChannelGenerateResponse)
async def api_channel_generate(
_: str = Depends(require_api_key),
save: bool = Query(False, description="Save to user config"),
save_project: bool = Query(False, description="Save to project config"),
):
"""
Generate a new channel key.
v4.0.0: New endpoint for channel key management.
Optionally saves to user config (~/.stegasoo/channel.key) or
project config (./config/channel.key).
"""
if save and save_project:
raise HTTPException(400, "Cannot use both save and save_project")
key = generate_channel_key()
fingerprint = f"{key[:4]}-••••-••••-••••-••••-••••-••••-{key[-4:]}"
saved = False
save_location = None
if save:
set_channel_key(key, location="user")
saved = True
save_location = "~/.stegasoo/channel.key"
elif save_project:
set_channel_key(key, location="project")
saved = True
save_location = "./config/channel.key"
return ChannelGenerateResponse(
key=key,
fingerprint=fingerprint,
saved=saved,
save_location=save_location,
)
@app.post("/channel/set")
async def api_channel_set(request: ChannelSetRequest, _: str = Depends(require_api_key)):
"""
Set/save a channel key to config.
v4.0.0: New endpoint for channel key management.
"""
if not validate_channel_key(request.key):
raise HTTPException(
400, "Invalid channel key format. Expected: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX"
)
if request.location not in ("user", "project"):
raise HTTPException(400, "location must be 'user' or 'project'")
set_channel_key(request.key, location=request.location)
status = get_channel_status()
return {
"success": True,
"location": status.get("source"),
"fingerprint": status.get("fingerprint"),
}
@app.delete("/channel")
async def api_channel_clear(
_: str = Depends(require_api_key),
location: str = Query("user", description="'user', 'project', or 'all'")
):
"""
Clear/remove channel key from config.
v4.0.0: New endpoint for channel key management.
Note: Does not affect environment variables.
"""
if location == "all":
clear_channel_key(location="user")
clear_channel_key(location="project")
elif location in ("user", "project"):
clear_channel_key(location=location)
else:
raise HTTPException(400, "location must be 'user', 'project', or 'all'")
status = get_channel_status()
return {
"success": True,
"mode": status["mode"],
"still_configured": status["configured"],
"remaining_source": status.get("source"),
}
# ============================================================================
# ROUTES - AUTHENTICATION (v4.2.1)
# ============================================================================
@app.get("/auth/status", response_model=AuthStatusResponse)
async def api_auth_status():
"""
Get API key authentication status.
v4.2.1: New endpoint for auth status.
Returns whether auth is enabled and key counts.
"""
status = get_api_key_status()
return AuthStatusResponse(
enabled=status["enabled"],
total_keys=status["total_keys"],
user_keys=status["user_keys"],
project_keys=status["project_keys"],
env_configured=status["env_configured"],
)
@app.get("/auth/keys", response_model=list[AuthKeyInfo])
async def api_auth_list_keys(
location: str = Query("user", description="'user' or 'project'"),
_: str = Depends(require_api_key),
):
"""
List configured API keys (names only, not actual keys).
v4.2.1: New endpoint for auth management.
Requires authentication.
"""
if location not in ("user", "project"):
raise HTTPException(400, "location must be 'user' or 'project'")
keys = list_api_keys(location)
return [AuthKeyInfo(name=k["name"], created=k["created"]) for k in keys]
@app.post("/auth/keys")
async def api_auth_create_key(
name: str = Query(..., description="Name for the new API key"),
location: str = Query("user", description="'user' or 'project'"),
_: str = Depends(require_api_key),
):
"""
Create a new API key.
v4.2.1: New endpoint for auth management.
Returns the key ONCE - it cannot be retrieved again!
Requires authentication (or no keys configured yet).
"""
if location not in ("user", "project"):
raise HTTPException(400, "location must be 'user' or 'project'")
try:
key = add_api_key(name, location)
return {
"success": True,
"name": name,
"key": key,
"warning": "Save this key now! It cannot be retrieved again.",
}
except ValueError as e:
raise HTTPException(400, str(e))
@app.delete("/auth/keys")
async def api_auth_delete_key(
name: str = Query(..., description="Name of key to delete"),
location: str = Query("user", description="'user' or 'project'"),
_: str = Depends(require_api_key),
):
"""
Delete an API key by name.
v4.2.1: New endpoint for auth management.
Requires authentication.
"""
if location not in ("user", "project"):
raise HTTPException(400, "location must be 'user' or 'project'")
if remove_api_key(name, location):
return {"success": True, "deleted": name}
else:
raise HTTPException(404, f"Key '{name}' not found in {location} config")
@app.post("/compare", response_model=CompareModesResponse)
async def api_compare_modes(request: CompareModesRequest, _: str = Depends(require_api_key)):
"""
Compare LSB and DCT embedding modes for a carrier image.
Returns capacity for both modes and recommendation.
Optionally checks if a specific payload size would fit.
"""
try:
carrier = base64.b64decode(request.carrier_image_base64)
comparison = compare_modes(carrier)
response = CompareModesResponse(
width=comparison["width"],
height=comparison["height"],
lsb={
"capacity_bytes": comparison["lsb"]["capacity_bytes"],
"capacity_kb": round(comparison["lsb"]["capacity_kb"], 1),
"available": True,
"output_format": comparison["lsb"]["output"],
},
dct={
"capacity_bytes": comparison["dct"]["capacity_bytes"],
"capacity_kb": round(comparison["dct"]["capacity_kb"], 1),
"available": comparison["dct"]["available"],
"output_formats": ["png", "jpeg"],
"color_modes": ["grayscale", "color"],
"ratio_vs_lsb_percent": round(comparison["dct"]["ratio_vs_lsb"], 1),
},
recommendation=(
"lsb" if not comparison["dct"]["available"] else "dct for stealth, lsb for capacity"
),
)
if request.payload_size:
fits_lsb = request.payload_size <= comparison["lsb"]["capacity_bytes"]
fits_dct = request.payload_size <= comparison["dct"]["capacity_bytes"]
response.payload_check = {
"size_bytes": request.payload_size,
"fits_lsb": fits_lsb,
"fits_dct": fits_dct,
}
# Update recommendation based on payload
if fits_dct and comparison["dct"]["available"]:
response.recommendation = "dct (payload fits, better stealth)"
elif fits_lsb:
response.recommendation = "lsb (payload too large for dct)"
else:
response.recommendation = "none (payload too large for both modes)"
return response
except Exception as e:
raise HTTPException(500, str(e))
@app.post("/will-fit", response_model=WillFitResponse)
async def api_will_fit(request: WillFitRequest, _: str = Depends(require_api_key)):
"""
Check if a payload of given size will fit in the carrier image.
Supports both LSB and DCT modes.
"""
try:
# Validate mode
if request.embed_mode == "dct" and not has_dct_support():
raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")
carrier = base64.b64decode(request.carrier_image_base64)
result = will_fit_by_mode(request.payload_size, carrier, embed_mode=request.embed_mode)
return WillFitResponse(
fits=result["fits"],
payload_size=result["payload_size"],
capacity=result["capacity"],
usage_percent=round(result["usage_percent"], 1),
headroom=result["headroom"],
mode=request.embed_mode,
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, str(e))
# ============================================================================
# ROUTES - QR CODE
# ============================================================================
@app.post("/extract-key-from-qr", response_model=QrExtractResponse)
async def api_extract_key_from_qr(
_: str = Depends(require_api_key),
qr_image: UploadFile = File(..., description="QR code image containing RSA key")
):
"""
Extract RSA key from a QR code image.
Supports both compressed (STEGASOO-Z: prefix) and uncompressed keys.
Returns the PEM-encoded key if found.
"""
if not HAS_QR_READ:
raise HTTPException(501, "QR code reading not available. Install pyzbar and libzbar.")
try:
image_data = await qr_image.read()
key_pem = extract_key_from_qr(image_data)
if key_pem:
return QrExtractResponse(success=True, key_pem=key_pem)
else:
return QrExtractResponse(success=False, error="No valid RSA key found in QR code")
except Exception as e:
return QrExtractResponse(success=False, error=str(e))
@app.post("/generate-key-qr", response_model=QrGenerateResponse)
async def api_generate_key_qr(request: QrGenerateRequest, _: str = Depends(require_api_key)):
"""
Generate QR code from an RSA private key.
Supports PNG, JPG, and ASCII output formats.
Uses zstd compression by default for better QR code density.
"""
if not HAS_QR_WRITE:
raise HTTPException(501, "QR code generation not available. Install qrcode library.")
try:
fmt = request.output_format.lower()
if fmt == "ascii":
ascii_qr = generate_qr_ascii(
request.key_pem,
compress=request.compress,
invert=False,
)
return QrGenerateResponse(success=True, format="ascii", qr_data=ascii_qr)
elif fmt in ("png", "jpg", "jpeg"):
import base64
qr_bytes = generate_qr_code(
request.key_pem,
compress=request.compress,
output_format=fmt,
)
qr_b64 = base64.b64encode(qr_bytes).decode("ascii")
return QrGenerateResponse(success=True, format=fmt, qr_data=qr_b64)
else:
return QrGenerateResponse(
success=False,
error=f"Unsupported format: {fmt}. Use 'png', 'jpg', or 'ascii'",
)
except ValueError as e:
return QrGenerateResponse(success=False, error=str(e))
except Exception as e:
return QrGenerateResponse(success=False, error=f"QR generation failed: {e}")
# ============================================================================
# ROUTES - GENERATE
# ============================================================================
@app.post("/generate", response_model=GenerateResponse)
async def api_generate(request: GenerateRequest, _: str = Depends(require_api_key)):
"""
Generate credentials for encoding/decoding.
At least one of use_pin or use_rsa must be True.
v3.2.0: Generates single passphrase (no daily rotation).
Default increased to 4 words for better security.
"""
if not request.use_pin and not request.use_rsa:
raise HTTPException(400, "Must enable at least one of use_pin or use_rsa")
if request.rsa_bits not in VALID_RSA_SIZES:
raise HTTPException(400, f"rsa_bits must be one of {VALID_RSA_SIZES}")
try:
creds = generate_credentials(
use_pin=request.use_pin,
use_rsa=request.use_rsa,
pin_length=request.pin_length,
rsa_bits=request.rsa_bits,
passphrase_words=request.words_per_passphrase,
)
return GenerateResponse(
passphrase=creds.passphrase,
pin=creds.pin,
rsa_key_pem=creds.rsa_key_pem,
entropy={
"passphrase": creds.passphrase_entropy,
"pin": creds.pin_entropy,
"rsa": creds.rsa_entropy,
"total": creds.total_entropy,
},
phrases=None, # Legacy field removed
)
except Exception as e:
raise HTTPException(500, str(e))
# ============================================================================
# HELPER FUNCTION FOR DCT PARAMETERS
# ============================================================================
def _get_dct_params(embed_mode: str, dct_output_format: str, dct_color_mode: str) -> dict:
"""
Get DCT-specific parameters if DCT mode is selected.
Returns kwargs to pass to encode().
"""
if embed_mode != "dct":
return {}
return {
"dct_output_format": dct_output_format,
"dct_color_mode": dct_color_mode,
}
def _get_output_info(embed_mode: str, dct_output_format: str, dct_color_mode: str) -> tuple:
"""
Get output format and color mode strings for response.
Returns (output_format, color_mode, mime_type).
"""
if embed_mode == "dct":
output_format = dct_output_format
color_mode = dct_color_mode
mime_type = "image/jpeg" if dct_output_format == "jpeg" else "image/png"
else:
output_format = "png"
color_mode = "color"
mime_type = "image/png"
return output_format, color_mode, mime_type
# ============================================================================
# ROUTES - ENCODE (JSON)
# ============================================================================
@app.post("/encode", response_model=EncodeResponse)
async def api_encode(request: EncodeRequest, _: str = Depends(require_api_key)):
"""
Encode a text message into an image.
Images must be base64-encoded. Returns base64-encoded stego image.
v4.0.0: Added channel_key parameter for deployment isolation.
v3.2.0: No date_str parameter needed - encode anytime!
"""
# Validate mode
if request.embed_mode == "dct" and not has_dct_support():
raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")
# Resolve channel key
resolved_channel_key = _resolve_channel_key(request.channel_key)
try:
ref_photo = base64.b64decode(request.reference_photo_base64)
carrier = base64.b64decode(request.carrier_image_base64)
rsa_key = base64.b64decode(request.rsa_key_base64) if request.rsa_key_base64 else None
# Get DCT parameters
dct_params = _get_dct_params(
request.embed_mode, request.dct_output_format, request.dct_color_mode
)
# v4.2.0: Run CPU-bound encode in thread pool
result = await run_in_thread(
encode,
message=request.message,
reference_photo=ref_photo,
carrier_image=carrier,
passphrase=request.passphrase,
pin=request.pin,
rsa_key_data=rsa_key,
rsa_password=request.rsa_password,
embed_mode=request.embed_mode,
channel_key=resolved_channel_key,
**dct_params,
)
stego_b64 = base64.b64encode(result.stego_image).decode("utf-8")
output_format, color_mode, _ = _get_output_info(
request.embed_mode, request.dct_output_format, request.dct_color_mode
)
# Get channel info for response
channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)
return EncodeResponse(
stego_image_base64=stego_b64,
filename=result.filename,
capacity_used_percent=result.capacity_percent,
embed_mode=request.embed_mode,
output_format=output_format,
color_mode=color_mode,
channel_mode=channel_mode,
channel_fingerprint=channel_fingerprint,
date_used=None,
day_of_week=None,
)
except CapacityError as e:
raise HTTPException(400, str(e))
except StegasooError as e:
raise HTTPException(400, str(e))
except Exception as e:
raise HTTPException(500, str(e))
@app.post("/encode/file", response_model=EncodeResponse)
async def api_encode_file(request: EncodeFileRequest, _: str = Depends(require_api_key)):
"""
Encode a file into an image (JSON with base64).
File data must be base64-encoded.
v4.0.0: Added channel_key parameter for deployment isolation.
v3.2.0: No date_str parameter needed - encode anytime!
"""
# Validate mode
if request.embed_mode == "dct" and not has_dct_support():
raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")
# Resolve channel key
resolved_channel_key = _resolve_channel_key(request.channel_key)
try:
file_data = base64.b64decode(request.file_data_base64)
ref_photo = base64.b64decode(request.reference_photo_base64)
carrier = base64.b64decode(request.carrier_image_base64)
rsa_key = base64.b64decode(request.rsa_key_base64) if request.rsa_key_base64 else None
payload = FilePayload(
data=file_data, filename=request.filename, mime_type=request.mime_type
)
# Get DCT parameters
dct_params = _get_dct_params(
request.embed_mode, request.dct_output_format, request.dct_color_mode
)
# v4.2.0: Run CPU-bound encode in thread pool
result = await run_in_thread(
encode,
message=payload,
reference_photo=ref_photo,
carrier_image=carrier,
passphrase=request.passphrase,
pin=request.pin,
rsa_key_data=rsa_key,
rsa_password=request.rsa_password,
embed_mode=request.embed_mode,
channel_key=resolved_channel_key,
**dct_params,
)
stego_b64 = base64.b64encode(result.stego_image).decode("utf-8")
output_format, color_mode, _ = _get_output_info(
request.embed_mode, request.dct_output_format, request.dct_color_mode
)
# Get channel info for response
channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)
return EncodeResponse(
stego_image_base64=stego_b64,
filename=result.filename,
capacity_used_percent=result.capacity_percent,
embed_mode=request.embed_mode,
output_format=output_format,
color_mode=color_mode,
channel_mode=channel_mode,
channel_fingerprint=channel_fingerprint,
date_used=None,
day_of_week=None,
)
except CapacityError as e:
raise HTTPException(400, str(e))
except StegasooError as e:
raise HTTPException(400, str(e))
except Exception as e:
raise HTTPException(500, str(e))
# ============================================================================
# ROUTES - DECODE (JSON)
# ============================================================================
@app.post("/decode", response_model=DecodeResponse)
async def api_decode(request: DecodeRequest, _: str = Depends(require_api_key)):
"""
Decode a message or file from a stego image.
Returns payload_type to indicate if result is text or file.
v4.0.0: Added channel_key parameter - must match encoding key.
v3.2.0: No date_str parameter needed - decode anytime!
"""
# Validate mode
if request.embed_mode == "dct" and not has_dct_support():
raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")
# Resolve channel key
resolved_channel_key = _resolve_channel_key(request.channel_key)
try:
stego = base64.b64decode(request.stego_image_base64)
ref_photo = base64.b64decode(request.reference_photo_base64)
rsa_key = base64.b64decode(request.rsa_key_base64) if request.rsa_key_base64 else None
# v4.2.0: Run CPU-bound decode in thread pool
result = await run_in_thread(
decode,
stego_image=stego,
reference_photo=ref_photo,
passphrase=request.passphrase,
pin=request.pin,
rsa_key_data=rsa_key,
rsa_password=request.rsa_password,
embed_mode=request.embed_mode,
channel_key=resolved_channel_key,
)
if result.is_file:
return DecodeResponse(
payload_type="file",
file_data_base64=base64.b64encode(result.file_data).decode("utf-8"),
filename=result.filename,
mime_type=result.mime_type,
)
else:
return DecodeResponse(payload_type="text", message=result.message)
except DecryptionError as e:
# Provide helpful error message for channel key issues
error_msg = str(e)
if "channel key" in error_msg.lower():
raise HTTPException(401, error_msg)
raise HTTPException(401, "Decryption failed. Check credentials.")
except StegasooError as e:
raise HTTPException(400, str(e))
except Exception as e:
raise HTTPException(500, str(e))
# ============================================================================
# ROUTES - ENCODE/DECODE (MULTIPART)
# ============================================================================
@app.post("/encode/multipart")
async def api_encode_multipart(
_: str = Depends(require_api_key),
passphrase: str = Form(..., description="Passphrase (v3.2.0: renamed from day_phrase)"),
reference_photo: UploadFile = File(...),
carrier: UploadFile = File(...),
message: str = Form(""),
payload_file: UploadFile | None = File(None),
pin: str = Form(""),
rsa_key: UploadFile | None = File(None),
rsa_key_qr: UploadFile | None = File(None),
rsa_password: str = Form(""),
# Channel key (v4.0.0)
channel_key: str = Form(
"auto", description="Channel key: 'auto'=server config, 'none'=public, 'XXXX-...'=explicit"
),
embed_mode: str = Form("lsb"),
dct_output_format: str = Form("png"),
dct_color_mode: str = Form("grayscale"),
):
"""
Encode using multipart form data (file uploads).
Provide either 'message' (text) or 'payload_file' (binary file).
RSA key can be provided as 'rsa_key' (.pem file) or 'rsa_key_qr' (QR code image).
Returns the stego image directly with metadata headers.
v4.0.0: Added channel_key parameter for deployment isolation.
Use 'auto' for server config, 'none' for public mode.
v3.2.0: No date_str parameter needed - encode anytime!
"""
# Validate mode
if embed_mode not in ("lsb", "dct"):
raise HTTPException(400, "embed_mode must be 'lsb' or 'dct'")
if embed_mode == "dct" and not has_dct_support():
raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")
# Validate DCT options
if dct_output_format not in ("png", "jpeg"):
raise HTTPException(400, "dct_output_format must be 'png' or 'jpeg'")
if dct_color_mode not in ("grayscale", "color"):
raise HTTPException(400, "dct_color_mode must be 'grayscale' or 'color'")
# Resolve channel key (v4.0.0)
# Form data: "auto" = use server config, "none" = public, otherwise explicit key
if channel_key.lower() == "auto":
resolved_channel_key = None # Auto mode
elif channel_key.lower() == "none":
resolved_channel_key = "" # Public mode
else:
resolved_channel_key = _resolve_channel_key(channel_key)
try:
ref_data = await reference_photo.read()
carrier_data = await carrier.read()
# Handle RSA key from .pem file or QR code image
rsa_key_data = None
rsa_key_from_qr = False
if rsa_key and rsa_key.filename:
rsa_key_data = await rsa_key.read()
elif rsa_key_qr and rsa_key_qr.filename:
if not HAS_QR_READ:
raise HTTPException(
501, "QR code reading not available. Install pyzbar and libzbar."
)
qr_image_data = await rsa_key_qr.read()
key_pem = extract_key_from_qr(qr_image_data)
if not key_pem:
raise HTTPException(400, "Could not extract RSA key from QR code image")
rsa_key_data = key_pem.encode("utf-8")
rsa_key_from_qr = True
# QR code keys are never password-protected
effective_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None)
# Determine payload
if payload_file and payload_file.filename:
file_data = await payload_file.read()
payload = FilePayload(
data=file_data, filename=payload_file.filename, mime_type=payload_file.content_type
)
elif message:
payload = message
else:
raise HTTPException(400, "Must provide either 'message' or 'payload_file'")
# Get DCT parameters
dct_params = _get_dct_params(embed_mode, dct_output_format, dct_color_mode)
# v4.2.0: Run CPU-bound encode in thread pool
result = await run_in_thread(
encode,
message=payload,
reference_photo=ref_data,
carrier_image=carrier_data,
passphrase=passphrase,
pin=pin,
rsa_key_data=rsa_key_data,
rsa_password=effective_password,
embed_mode=embed_mode,
channel_key=resolved_channel_key,
**dct_params,
)
output_format, color_mode, mime_type = _get_output_info(
embed_mode, dct_output_format, dct_color_mode
)
# Get channel info for headers
channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)
headers = {
"Content-Disposition": f"attachment; filename={result.filename}",
"X-Stegasoo-Capacity-Percent": f"{result.capacity_percent:.1f}",
"X-Stegasoo-Embed-Mode": embed_mode,
"X-Stegasoo-Output-Format": output_format,
"X-Stegasoo-Color-Mode": color_mode,
"X-Stegasoo-Channel-Mode": channel_mode,
"X-Stegasoo-Version": __version__,
}
if channel_fingerprint:
headers["X-Stegasoo-Channel-Fingerprint"] = channel_fingerprint
return Response(
content=result.stego_image,
media_type=mime_type,
headers=headers,
)
except CapacityError as e:
raise HTTPException(400, str(e))
except StegasooError as e:
raise HTTPException(400, str(e))
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, str(e))
@app.post("/decode/multipart", response_model=DecodeResponse)
async def api_decode_multipart(
_: str = Depends(require_api_key),
passphrase: str = Form(..., description="Passphrase (v3.2.0: renamed from day_phrase)"),
reference_photo: UploadFile = File(...),
stego_image: UploadFile = File(...),
pin: str = Form(""),
rsa_key: UploadFile | None = File(None),
rsa_key_qr: UploadFile | None = File(None),
rsa_password: str = Form(""),
# Channel key (v4.0.0)
channel_key: str = Form(
"auto", description="Channel key: 'auto'=server config, 'none'=public, 'XXXX-...'=explicit"
),
embed_mode: str = Form("auto"),
):
"""
Decode using multipart form data (file uploads).
RSA key can be provided as 'rsa_key' (.pem file) or 'rsa_key_qr' (QR code image).
Returns JSON with payload_type indicating text or file.
v4.0.0: Added channel_key parameter - must match what was used for encoding.
Use 'auto' for server config, 'none' for public mode.
v3.2.0: No date_str parameter needed - decode anytime!
"""
# Validate mode
if embed_mode not in ("auto", "lsb", "dct"):
raise HTTPException(400, "embed_mode must be 'auto', 'lsb', or 'dct'")
if embed_mode == "dct" and not has_dct_support():
raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")
# Resolve channel key (v4.0.0)
if channel_key.lower() == "auto":
resolved_channel_key = None # Auto mode
elif channel_key.lower() == "none":
resolved_channel_key = "" # Public mode
else:
resolved_channel_key = _resolve_channel_key(channel_key)
try:
ref_data = await reference_photo.read()
stego_data = await stego_image.read()
# Handle RSA key from .pem file or QR code image
rsa_key_data = None
rsa_key_from_qr = False
if rsa_key and rsa_key.filename:
rsa_key_data = await rsa_key.read()
elif rsa_key_qr and rsa_key_qr.filename:
if not HAS_QR_READ:
raise HTTPException(
501, "QR code reading not available. Install pyzbar and libzbar."
)
qr_image_data = await rsa_key_qr.read()
key_pem = extract_key_from_qr(qr_image_data)
if not key_pem:
raise HTTPException(400, "Could not extract RSA key from QR code image")
rsa_key_data = key_pem.encode("utf-8")
rsa_key_from_qr = True
# QR code keys are never password-protected
effective_password = None if rsa_key_from_qr else (rsa_password if rsa_password else None)
# v4.2.0: Run CPU-bound decode in thread pool
result = await run_in_thread(
decode,
stego_image=stego_data,
reference_photo=ref_data,
passphrase=passphrase,
pin=pin,
rsa_key_data=rsa_key_data,
rsa_password=effective_password,
embed_mode=embed_mode,
channel_key=resolved_channel_key,
)
if result.is_file:
return DecodeResponse(
payload_type="file",
file_data_base64=base64.b64encode(result.file_data).decode("utf-8"),
filename=result.filename,
mime_type=result.mime_type,
)
else:
return DecodeResponse(payload_type="text", message=result.message)
except DecryptionError as e:
error_msg = str(e)
if "channel key" in error_msg.lower():
raise HTTPException(401, error_msg)
raise HTTPException(401, "Decryption failed. Check credentials.")
except StegasooError as e:
raise HTTPException(400, str(e))
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, str(e))
# ============================================================================
# ROUTES - IMAGE INFO
# ============================================================================
@app.post("/image/info", response_model=ImageInfoResponse)
async def api_image_info(
_: str = Depends(require_api_key),
image: UploadFile = File(...),
include_modes: bool = Query(True, description="Include capacity by mode (v3.0+)"),
):
"""
Get information about an image's capacity.
Optionally includes capacity for both LSB and DCT modes.
"""
try:
image_data = await image.read()
result = validate_image(image_data, check_size=False)
if not result.is_valid:
raise HTTPException(400, result.error_message)
capacity = calculate_capacity_by_mode(image_data, "lsb")
response = ImageInfoResponse(
width=result.details["width"],
height=result.details["height"],
pixels=result.details["pixels"],
capacity_bytes=capacity,
capacity_kb=capacity // 1024,
)
if include_modes:
comparison = compare_modes(image_data)
response.modes = {
"lsb": ModeCapacity(
capacity_bytes=comparison["lsb"]["capacity_bytes"],
capacity_kb=round(comparison["lsb"]["capacity_kb"], 1),
available=True,
output_format=comparison["lsb"]["output"],
),
"dct": ModeCapacity(
capacity_bytes=comparison["dct"]["capacity_bytes"],
capacity_kb=round(comparison["dct"]["capacity_kb"], 1),
available=comparison["dct"]["available"],
output_format="PNG/JPEG (grayscale or color)",
),
}
return response
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, str(e))
# ============================================================================
# ERROR HANDLERS
# ============================================================================
@app.exception_handler(StegasooError)
async def stegasoo_error_handler(request, exc):
return JSONResponse(status_code=400, content={"error": type(exc).__name__, "detail": str(exc)})
# ============================================================================
# MAIN
# ============================================================================
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)