Files
stegasoo/frontends/api/main.py
adlee-was-taken ef5a9ce9cb Add per-channel hybrid audio spread spectrum and env feature toggles
Spread spectrum v2: independent per-channel embedding with round-robin
bit distribution, preserving spatial stereo/surround mix. Adaptive chip
tiers (256/512/1024) trade capacity for lossy codec robustness. LFE
channel skipped for 5.1+ layouts. v2 header (20B) with backward-
compatible v0 decode fallback.

Environment toggles (STEGASOO_AUDIO, STEGASOO_VIDEO) gate audio/video
features for minimal builds (e.g. Raspberry Pi image-only). Values:
auto (default, detect deps), 1/true (force on), 0/false (force off).

Web UI fixes: accordion defaults to step 1 on load, chevron arrow
styling, required attribute toggling for audio carrier type switch,
"Images & Mode" renamed to "Reference, Carrier, Mode".

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 11:58:40 -05:00

2258 lines
75 KiB
Python

#!/usr/bin/env python3
"""
Stegasoo REST API (v4.3.0)
FastAPI-based REST API for steganography operations.
Supports both text messages and file embedding.
CHANGES in v4.3.0:
- Audio steganography endpoints (/audio/*)
- LSB and spread spectrum (DSSS) audio embedding modes
- Audio info and capacity checking
CHANGES in v4.2.1:
- API key authentication (X-API-Key header)
- TLS support with self-signed certificates
- /auth/* endpoints for key management
CHANGES in v4.2.0:
- Async encode/decode operations (run in thread pool)
- Server can handle concurrent requests without blocking
CHANGES in v4.0.0:
- Added channel key support for deployment/group isolation
- New /channel endpoints for key management
- channel_key parameter on encode/decode endpoints
- Messages encoded with channel key require same key to decode
CHANGES in v3.2.0:
- Removed date dependency from all operations
- Renamed day_phrase → passphrase
- No date_str parameters needed
- Simplified API for asynchronous communications
NEW in v3.0: LSB and DCT embedding modes.
NEW in v3.0.1: DCT color mode and JPEG output format.
"""
import asyncio
import base64
import logging
import os
import sys
from functools import partial
from pathlib import Path
from typing import Literal
# Configure logging for the API frontend.
# STEGASOO_LOG_LEVEL (a standard logging level name such as "INFO") takes
# precedence; otherwise a truthy STEGASOO_DEBUG turns on DEBUG output. When
# neither applies, logging.basicConfig() is not called at all.
_LOG_FORMAT = "[%(asctime)s.%(msecs)03d] [%(levelname)s] [%(name)s] %(message)s"
_LOG_DATEFMT = "%H:%M:%S"

_log_level = os.environ.get("STEGASOO_LOG_LEVEL", "").strip().upper()
# Accept only names that resolve to a numeric level. hasattr() alone would also
# match non-level attributes (e.g. logging.BASIC_FORMAT, a str), and passing
# those to basicConfig() raises ValueError at import time.
_level_value = getattr(logging, _log_level, None) if _log_level else None
if not isinstance(_level_value, int):
    _level_value = None
if _level_value is None:
    # Case-insensitive so "True" / "YES" behave like "true" / "yes".
    _debug_flag = os.environ.get("STEGASOO_DEBUG", "").strip().lower()
    if _debug_flag in ("1", "true", "yes"):
        _level_value = logging.DEBUG
# Compare against None explicitly: NOTSET is 0 and is still a valid request.
if _level_value is not None:
    logging.basicConfig(
        level=_level_value,
        format=_LOG_FORMAT,
        datefmt=_LOG_DATEFMT,
        stream=sys.stderr,
    )
api_logger = logging.getLogger("stegasoo.api")
from fastapi import Depends, FastAPI, File, Form, HTTPException, Query, UploadFile
from fastapi.responses import JSONResponse, Response
from pydantic import BaseModel, Field
# API Key Authentication
# Both import paths must expose the same set of names so the rest of the module
# behaves identically whether it is loaded as a package or run directly.
try:
    from .auth import (
        add_api_key,
        get_api_key_status,
        is_auth_enabled,
        list_api_keys,
        remove_api_key,
        require_api_key,
    )
except ImportError:
    # When running directly (not as package)
    from auth import (
        add_api_key,
        get_api_key_status,
        is_auth_enabled,  # was missing from this fallback; kept in sync with the package import
        list_api_keys,
        remove_api_key,
        require_api_key,
    )
# Add parent to path for development
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
from stegasoo import (
HAS_AUDIO_SUPPORT,
MAX_FILE_PAYLOAD_SIZE,
CapacityError,
DecryptionError,
FilePayload,
StegasooError,
__version__,
calculate_capacity_by_mode,
clear_channel_key,
compare_modes,
decode,
encode,
generate_channel_key,
generate_credentials,
get_channel_status,
has_argon2,
has_dct_support,
set_channel_key,
validate_channel_key,
validate_image,
will_fit_by_mode,
)
# Audio steganography (v4.3.0) - conditionally imported
if HAS_AUDIO_SUPPORT:
from stegasoo import decode_audio, encode_audio, get_audio_info
from stegasoo.audio_steganography import calculate_audio_lsb_capacity
from stegasoo.spread_steganography import calculate_audio_spread_capacity
from stegasoo.constants import (
DEFAULT_PASSPHRASE_WORDS,
MAX_PASSPHRASE_WORDS,
MAX_PIN_LENGTH,
MIN_PASSPHRASE_WORDS,
MIN_PIN_LENGTH,
VALID_RSA_SIZES,
)
# QR Code utilities
# Optional feature: if stegasoo.qr_utils cannot be imported, the capability
# flags are forced to False and the helper names are bound to None so that
# references to them elsewhere in this module still resolve. Route handlers
# check HAS_QR_READ / HAS_QR_WRITE before calling the helpers.
try:
    from stegasoo.qr_utils import (
        extract_key_from_qr,
        generate_qr_ascii,
        generate_qr_code,
        has_qr_read,
        has_qr_write,
    )
    # Evaluated once at import time; the results are reused for every request.
    HAS_QR_READ = has_qr_read()
    HAS_QR_WRITE = has_qr_write()
except ImportError:
    HAS_QR_READ = False
    HAS_QR_WRITE = False
    # Placeholders only; has_qr_read / has_qr_write stay undefined on this path.
    extract_key_from_qr = None
    generate_qr_code = None
    generate_qr_ascii = None
# ============================================================================
# FASTAPI APP
# ============================================================================
# Module-level application object; every route in this file registers itself on
# it through @app.<method> decorators. The description below is Markdown-style
# text handed to FastAPI, and the version comes from the stegasoo package.
app = FastAPI(
    title="Stegasoo API",
    description="""
Secure steganography with hybrid authentication. Supports text messages and file embedding.
## Version 4.0.0 Changes
- **Channel key support** - Deployment/group isolation for messages
- **New /channel endpoints** - Generate, view, and manage channel keys
- **channel_key parameter** - Added to encode/decode endpoints
## Version 3.2.0 Changes
- **No date parameters needed** - Encode and decode anytime without tracking dates
- **Single passphrase** - No daily rotation, just use your passphrase
- **True asynchronous communications** - Perfect for dead drops and delayed delivery
## Embedding Modes (v3.0)
- **LSB mode** (default): Spatial LSB embedding, full color output, higher capacity
- **DCT mode**: Frequency domain embedding, ~20% capacity, better stealth
## DCT Options (v3.0.1)
- **dct_color_mode**: 'grayscale' (default) or 'color' (preserves original colors)
- **dct_output_format**: 'png' (lossless) or 'jpeg' (smaller, more natural)
Use the `/modes` endpoint to check availability and `/compare` to compare capacities.
""",
    version=__version__,
    # Interactive documentation endpoints.
    docs_url="/docs",
    redoc_url="/redoc",
)
# ============================================================================
# TYPE ALIASES
# ============================================================================
# Closed sets of string values used as field types in the request models below,
# so unknown values are rejected during request validation.
# Image embedding modes.
EmbedModeType = Literal["lsb", "dct"]
# Image extraction modes: the embed modes plus "auto".
ExtractModeType = Literal["auto", "lsb", "dct"]
# DCT-only options for colour handling and output container.
DctColorModeType = Literal["grayscale", "color"]
DctOutputFormatType = Literal["png", "jpeg"]
# Audio embedding modes.
AudioEmbedModeType = Literal["audio_lsb", "audio_spread"]
# Audio extraction modes: the embed modes plus "audio_auto".
AudioExtractModeType = Literal["audio_auto", "audio_lsb", "audio_spread"]
# ============================================================================
# MODELS
# ============================================================================
class GenerateRequest(BaseModel):
    # Options for credential generation. The bounds on pin_length and
    # words_per_passphrase come from stegasoo.constants.
    use_pin: bool = True
    use_rsa: bool = False
    pin_length: int = Field(default=6, ge=MIN_PIN_LENGTH, le=MAX_PIN_LENGTH)
    # NOTE(review): no range constraint at the model level; presumably checked
    # against VALID_RSA_SIZES by the handler (not visible here) - confirm.
    rsa_bits: int = Field(default=2048)
    words_per_passphrase: int = Field(
        default=DEFAULT_PASSPHRASE_WORDS,
        ge=MIN_PASSPHRASE_WORDS,
        le=MAX_PASSPHRASE_WORDS,
        description="Words per passphrase (v3.2.0: default increased to 4)",
    )
class GenerateResponse(BaseModel):
    # Generated credentials. pin and rsa_key_pem default to None, so they can
    # be omitted from a response.
    passphrase: str = Field(description="Single passphrase (v3.2.0: no daily rotation)")
    pin: str | None = None
    rsa_key_pem: str | None = None
    # Mapping of string labels to integer values (exact keys are set by the
    # handler, which is not visible here).
    entropy: dict[str, int]
    # Legacy field for compatibility
    phrases: dict[str, str] | None = Field(
        default=None, description="Deprecated: Use 'passphrase' instead"
    )
class EncodeRequest(BaseModel):
    # JSON body for embedding a text message in an image. Binary inputs travel
    # as base64 text in the *_base64 fields.
    message: str
    reference_photo_base64: str
    carrier_image_base64: str
    passphrase: str = Field(description="Passphrase (v3.2.0: renamed from day_phrase)")
    # Empty string is the default when no PIN is supplied.
    pin: str = ""
    rsa_key_base64: str | None = None
    rsa_password: str | None = None
    # Channel key (v4.0.0)
    # Three-state value: None, "" and an explicit key mean different things
    # (see the description text).
    channel_key: str | None = Field(
        default=None,
        description="Channel key for deployment isolation. null=auto (use server config), ''=public mode, 'XXXX-...'=explicit key",
    )
    embed_mode: EmbedModeType = Field(
        default="lsb",
        description="Embedding mode: 'lsb' (default, color) or 'dct' (requires scipy)",
    )
    # The two DCT options below are documented as applying to DCT mode only.
    dct_output_format: DctOutputFormatType = Field(
        default="png",
        description="DCT output format: 'png' (lossless) or 'jpeg' (smaller). Only applies to DCT mode.",
    )
    dct_color_mode: DctColorModeType = Field(
        default="grayscale",
        description="DCT color mode: 'grayscale' (default) or 'color' (preserves colors). Only applies to DCT mode.",
    )
class EncodeFileRequest(BaseModel):
    """Request for embedding a file (base64-encoded)."""
    # Same credential / channel / mode fields as EncodeRequest; the payload is
    # a file (data + filename + optional MIME type) instead of a text message.
    file_data_base64: str
    filename: str
    mime_type: str | None = None
    reference_photo_base64: str
    carrier_image_base64: str
    passphrase: str = Field(description="Passphrase (v3.2.0: renamed from day_phrase)")
    # Empty string is the default when no PIN is supplied.
    pin: str = ""
    rsa_key_base64: str | None = None
    rsa_password: str | None = None
    # Channel key (v4.0.0)
    # Three-state value: None, "" and an explicit key mean different things
    # (see the description text).
    channel_key: str | None = Field(
        default=None,
        description="Channel key for deployment isolation. null=auto (use server config), ''=public mode, 'XXXX-...'=explicit key",
    )
    embed_mode: EmbedModeType = Field(
        default="lsb",
        description="Embedding mode: 'lsb' (default, color) or 'dct' (requires scipy)",
    )
    # The two DCT options below are documented as applying to DCT mode only.
    dct_output_format: DctOutputFormatType = Field(
        default="png",
        description="DCT output format: 'png' (lossless) or 'jpeg' (smaller). Only applies to DCT mode.",
    )
    dct_color_mode: DctColorModeType = Field(
        default="grayscale",
        description="DCT color mode: 'grayscale' (default) or 'color' (preserves colors). Only applies to DCT mode.",
    )
class EncodeResponse(BaseModel):
    # Result of an image encode: the stego image as base64 text plus metadata
    # describing how it was produced.
    stego_image_base64: str
    filename: str
    capacity_used_percent: float
    embed_mode: str = Field(description="Embedding mode used: 'lsb' or 'dct'")
    output_format: str = Field(
        default="png", description="Output format: 'png' or 'jpeg' (for DCT mode)"
    )
    color_mode: str = Field(
        default="color",
        description="Color mode: 'color' (LSB/DCT color) or 'grayscale' (DCT grayscale)",
    )
    # Channel key info (v4.0.0)
    channel_mode: str = Field(default="public", description="Channel mode: 'public' or 'private'")
    channel_fingerprint: str | None = Field(
        default=None, description="Channel key fingerprint (if private mode)"
    )
    # Legacy fields (v3.2.0: no longer used in crypto)
    date_used: str | None = Field(
        default=None, description="Deprecated: Date no longer used in v3.2.0"
    )
    day_of_week: str | None = Field(
        default=None, description="Deprecated: Date no longer used in v3.2.0"
    )
class DecodeRequest(BaseModel):
    # JSON body for extracting a payload from a stego image. Credential fields
    # mirror those of the encode requests.
    stego_image_base64: str
    reference_photo_base64: str
    passphrase: str = Field(description="Passphrase (v3.2.0: renamed from day_phrase)")
    # Empty string is the default when no PIN is supplied.
    pin: str = ""
    rsa_key_base64: str | None = None
    rsa_password: str | None = None
    # Channel key (v4.0.0)
    channel_key: str | None = Field(
        default=None,
        description="Channel key for decryption. null=auto (use server config), ''=public mode, 'XXXX-...'=explicit key",
    )
    # Unlike encoding, extraction also accepts "auto", which is the default.
    embed_mode: ExtractModeType = Field(
        default="auto", description="Extraction mode: 'auto' (default), 'lsb', or 'dct'"
    )
class DecodeResponse(BaseModel):
    """Response for decode - can be text or file."""
    # payload_type selects which of the optional fields are meaningful; the
    # model itself does not enforce that pairing.
    payload_type: str  # 'text' or 'file'
    message: str | None = None  # For text
    file_data_base64: str | None = None  # For file (base64-encoded)
    filename: str | None = None  # For file
    mime_type: str | None = None  # For file
class ModeCapacity(BaseModel):
    """Capacity info for a single mode."""
    # Used as the value type of ImageInfoResponse.modes.
    capacity_bytes: int
    capacity_kb: float
    available: bool
    output_format: str
class ImageInfoResponse(BaseModel):
    # Image dimensions plus capacity figures. The top-level capacity_* fields
    # are documented as LSB figures; per-mode figures go in `modes`.
    width: int
    height: int
    pixels: int
    capacity_bytes: int = Field(description="LSB mode capacity (for backwards compatibility)")
    # NOTE(review): typed int here, whereas ModeCapacity.capacity_kb is float.
    capacity_kb: int = Field(description="LSB mode capacity in KB")
    modes: dict[str, ModeCapacity] | None = Field(
        default=None, description="Capacity by embedding mode (v3.0+)"
    )
class CompareModesRequest(BaseModel):
    """Request for comparing embedding modes."""
    # Body of POST /compare: the carrier as base64 text, plus an optional
    # payload size in bytes for a fit check.
    carrier_image_base64: str
    payload_size: int | None = Field(
        default=None, description="Optional payload size to check if it fits"
    )
class CompareModesResponse(BaseModel):
    """Response comparing LSB and DCT modes."""
    # lsb / dct / payload_check are untyped dicts; their keys are assigned by
    # the /compare handler rather than enforced here.
    width: int
    height: int
    lsb: dict
    dct: dict
    payload_check: dict | None = None
    recommendation: str
class DctModeInfo(BaseModel):
    """Detailed DCT mode information."""
    # Populated by the /modes handler and embedded in ModesResponse.dct.
    available: bool
    name: str
    description: str
    output_formats: list[str]
    color_modes: list[str]
    capacity_ratio: str
    requires: str
class ChannelStatusResponse(BaseModel):
    """Response for channel key status (v4.0.0)."""
    # `key` carries the full secret and is only filled in when the caller asks
    # for it (reveal=true on GET /channel/status).
    mode: str = Field(description="'public' or 'private'")
    configured: bool = Field(description="Whether a channel key is configured")
    fingerprint: str | None = Field(default=None, description="Key fingerprint (partial)")
    source: str | None = Field(default=None, description="Where the key comes from")
    key: str | None = Field(default=None, description="Full key (only if reveal=true)")
class ChannelGenerateResponse(BaseModel):
    """Response for channel key generation (v4.0.0)."""
    # The full generated key is always returned; saved / save_location report
    # whether and where the handler persisted it.
    key: str = Field(description="Generated channel key")
    fingerprint: str = Field(description="Key fingerprint")
    saved: bool = Field(default=False, description="Whether key was saved to config")
    save_location: str | None = Field(default=None, description="Where key was saved")
class ChannelSetRequest(BaseModel):
    """Request to set channel key (v4.0.0)."""
    # location is a plain str; the /channel/set handler rejects anything other
    # than 'user' or 'project'.
    key: str = Field(description="Channel key to set")
    location: str = Field(default="user", description="'user' or 'project'")
class AuthStatusResponse(BaseModel):
    """Response for API key authentication status."""
    # Field names match the keys the /auth/status handler reads from
    # get_api_key_status().
    enabled: bool = Field(description="Whether API key auth is enabled")
    total_keys: int = Field(description="Total number of configured API keys")
    user_keys: int = Field(description="Keys in user config")
    project_keys: int = Field(description="Keys in project config")
    env_configured: bool = Field(description="Whether env var key is set")
class AuthKeyInfo(BaseModel):
    """Info about a single API key (not the actual key)."""
    # `created` is carried as a string; its format is whatever list_api_keys()
    # returns (not visible here).
    name: str
    created: str
class ModesResponse(BaseModel):
    """Response showing available embedding modes."""
    # lsb is a free-form dict while dct uses the typed DctModeInfo model.
    lsb: dict
    dct: DctModeInfo
    # None when audio support is not available in this build.
    audio: dict | None = Field(default=None, description="Audio steganography modes (v4.3.0)")
    # Channel key status (v4.0.0)
    channel: dict | None = Field(default=None, description="Channel key status (v4.0.0)")
class StatusResponse(BaseModel):
    # Payload of GET /: version, capability flags and the mode list.
    version: str
    has_argon2: bool
    has_qrcode_read: bool
    has_qrcode_write: bool  # v4.2.0: QR generation capability
    has_dct: bool
    has_audio: bool = Field(default=False, description="Audio steganography support (v4.3.0)")
    max_payload_kb: int
    available_modes: list[str]
    # None when DCT support is unavailable.
    dct_features: dict | None = Field(default=None, description="DCT mode features (v3.0.1+)")
    # Channel key status (v4.0.0)
    channel: dict | None = Field(default=None, description="Channel key status (v4.0.0)")
    # Required (no default): the handler always supplies it.
    breaking_changes: dict = Field(description="v4.0.0 breaking changes")
class QrExtractResponse(BaseModel):
    # Outcome of QR key extraction: key_pem is set on success, error on
    # failure (the handler reports failures in-band rather than via HTTP status).
    success: bool
    key_pem: str | None = None
    error: str | None = None
class QrGenerateRequest(BaseModel):
    """Request to generate QR code from RSA key."""
    key_pem: str = Field(..., description="RSA private key in PEM format")
    # Plain str: unsupported values are rejected by the handler, not by
    # request validation. The handler also accepts 'jpeg'.
    output_format: str = Field(
        default="png",
        description="Output format: 'png', 'jpg', or 'ascii'",
    )
    compress: bool = Field(
        default=True,
        description="Compress key data with zstd (recommended for larger keys)",
    )
class QrGenerateResponse(BaseModel):
    """Response containing generated QR code."""
    # On failure success is False and error holds the message; format and
    # qr_data stay None.
    success: bool
    format: str | None = None
    qr_data: str | None = Field(
        default=None,
        description="Base64-encoded image data (for png/jpg) or ASCII string",
    )
    error: str | None = None
class WillFitRequest(BaseModel):
    """Request to check if payload will fit."""
    carrier_image_base64: str
    # NOTE(review): no lower bound here, whereas AudioCapacityRequest uses ge=1.
    payload_size: int
    embed_mode: EmbedModeType = "lsb"
class WillFitResponse(BaseModel):
    """Response for will_fit check."""
    # Mirrors the keys the /will-fit handler reads from will_fit_by_mode();
    # `mode` echoes the requested embed_mode.
    fits: bool
    payload_size: int
    capacity: int
    usage_percent: float
    headroom: int
    mode: str
class ErrorResponse(BaseModel):
    # Generic error envelope: a short error string plus optional detail.
    error: str
    detail: str | None = None
# --- Audio models (v4.3.0) ---
class AudioEncodeRequest(BaseModel):
    """Request to encode a text message into audio."""
    # Audio counterpart of EncodeRequest: the carrier is audio, the reference
    # is still a photo.
    message: str
    reference_photo_base64: str
    carrier_audio_base64: str
    passphrase: str = Field(description="Passphrase for key derivation")
    # Empty string is the default when no PIN is supplied.
    pin: str = ""
    rsa_key_base64: str | None = None
    rsa_password: str | None = None
    channel_key: str | None = Field(
        default=None,
        description="Channel key for deployment isolation. null=auto, ''=public, 'XXXX-...'=explicit",
    )
    embed_mode: AudioEmbedModeType = Field(
        default="audio_lsb",
        description="Embedding mode: 'audio_lsb' (default) or 'audio_spread' (DSSS)",
    )
    # NOTE(review): documented values are 0/1/2 but the model applies no range
    # constraint - confirm the handler validates it.
    chip_tier: int | None = Field(
        default=None,
        description="Spread spectrum chip tier: 0=lossless(256), 1=high_lossy(512), 2=low_lossy(1024). Only for audio_spread.",
    )
class AudioEncodeFileRequest(BaseModel):
    """Request to encode a file into audio."""
    # Same as AudioEncodeRequest, except the payload is a file (data +
    # filename + optional MIME type) instead of a text message.
    file_data_base64: str
    filename: str
    mime_type: str | None = None
    reference_photo_base64: str
    carrier_audio_base64: str
    passphrase: str = Field(description="Passphrase for key derivation")
    # Empty string is the default when no PIN is supplied.
    pin: str = ""
    rsa_key_base64: str | None = None
    rsa_password: str | None = None
    channel_key: str | None = Field(
        default=None,
        description="Channel key for deployment isolation. null=auto, ''=public, 'XXXX-...'=explicit",
    )
    embed_mode: AudioEmbedModeType = Field(
        default="audio_lsb",
        description="Embedding mode: 'audio_lsb' (default) or 'audio_spread' (DSSS)",
    )
    # NOTE(review): documented values are 0/1/2 but the model applies no range
    # constraint - confirm the handler validates it.
    chip_tier: int | None = Field(
        default=None,
        description="Spread spectrum chip tier: 0=lossless(256), 1=high_lossy(512), 2=low_lossy(1024). Only for audio_spread.",
    )
class AudioEncodeResponse(BaseModel):
    """Response from audio encode operations."""
    # stats is a free-form dict; its keys are chosen by the encoder.
    stego_audio_base64: str
    embed_mode: str = Field(description="Embedding mode used: 'audio_lsb' or 'audio_spread'")
    stats: dict = Field(description="Embedding statistics (samples_modified, capacity_used, etc.)")
    channel_mode: str = Field(default="public", description="Channel mode: 'public' or 'private'")
    channel_fingerprint: str | None = Field(
        default=None, description="Channel key fingerprint (if private mode)"
    )
class AudioDecodeRequest(BaseModel):
    """Request to decode a message or file from stego audio."""
    stego_audio_base64: str
    reference_photo_base64: str
    passphrase: str = Field(description="Passphrase for key derivation")
    # Empty string is the default when no PIN is supplied.
    pin: str = ""
    rsa_key_base64: str | None = None
    rsa_password: str | None = None
    channel_key: str | None = Field(
        default=None,
        description="Channel key for decryption. null=auto, ''=public, 'XXXX-...'=explicit",
    )
    # Extraction additionally accepts "audio_auto", which is the default.
    embed_mode: AudioExtractModeType = Field(
        default="audio_auto",
        description="Extraction mode: 'audio_auto' (default), 'audio_lsb', or 'audio_spread'",
    )
class AudioInfoResponse(BaseModel):
    """Response with audio file metadata and capacity info."""
    sample_rate: int
    channels: int
    duration_seconds: float
    num_samples: int
    format: str
    # Optional: both default to None.
    bit_depth: int | None = None
    bitrate: int | None = None
    # Capacities are reported separately for each audio embedding mode.
    capacity_lsb: int = Field(description="LSB mode capacity in bytes")
    capacity_spread: int = Field(description="Spread spectrum mode capacity in bytes")
class AudioCapacityRequest(BaseModel):
    """Request to check if a payload fits in audio carrier."""
    carrier_audio_base64: str
    # ge=1 rejects zero and negative sizes during request validation.
    payload_size: int = Field(ge=1, description="Payload size in bytes")
    embed_mode: AudioEmbedModeType = Field(
        default="audio_lsb", description="Embedding mode to check capacity for"
    )
class AudioCapacityResponse(BaseModel):
    """Response for audio capacity check."""
    # Audio counterpart of WillFitResponse (without a headroom field).
    fits: bool
    payload_size: int
    capacity_bytes: int
    usage_percent: float
    embed_mode: str
# ============================================================================
# HELPER: RESOLVE CHANNEL KEY
# ============================================================================
def _resolve_channel_key(channel_key: str | None) -> str | None:
    """
    Resolve the channel key supplied in an API request.

    Thin wrapper around the library's ``resolve_channel_key`` that converts
    its failures into an HTTP 400 response.

    Args:
        channel_key: Raw value from the request (may be None or "").

    Returns:
        Whatever ``resolve_channel_key`` returns, to be passed on to
        encode/decode.

    Raises:
        HTTPException: 400 when the library raises ValueError or
            FileNotFoundError; the original exception is chained as the cause.
    """
    # Imported lazily, as in the rest of this module's channel helpers.
    from stegasoo.channel import resolve_channel_key

    try:
        return resolve_channel_key(channel_key)
    except (ValueError, FileNotFoundError) as e:
        # Chain the cause so the library traceback is kept in server logs.
        raise HTTPException(400, str(e)) from e
def _get_channel_info(channel_key: str | None) -> tuple[str, str | None]:
    """
    Build the channel fields that go into an encode response.

    Delegates to the library's ``get_channel_response_info`` and unpacks the
    two values this API reports.

    Returns:
        ``(mode, fingerprint)``; fingerprint is None when the library result
        has no "fingerprint" entry.
    """
    from stegasoo.channel import get_channel_response_info

    response_info = get_channel_response_info(channel_key)
    # "mode" is required; "fingerprint" is optional in the library result.
    channel_mode = response_info["mode"]
    channel_fingerprint = response_info.get("fingerprint")
    return channel_mode, channel_fingerprint
# ============================================================================
# HELPER: ASYNC EXECUTION
# ============================================================================
async def run_in_thread(func, *args, **kwargs):
    """
    Run a blocking function in a worker thread and await its result.

    Lets the event loop keep serving other requests while an encode/decode
    call is in progress.

    Args:
        func: Callable to execute off the event loop.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns. Exceptions raised by ``func`` propagate to
        the awaiting caller.

    Usage:
        result = await run_in_thread(encode, message=msg, carrier_image=carrier, ...)
    """
    # asyncio.to_thread forwards both positional and keyword arguments itself,
    # so no functools.partial wrapper is needed.
    return await asyncio.to_thread(func, *args, **kwargs)
# ============================================================================
# ROUTES - STATUS & INFO
# ============================================================================
@app.get("/", response_model=StatusResponse)
async def root():
    """Get API status and configuration."""
    # LSB is always listed; DCT is added only when has_dct_support() says so.
    modes = ["lsb"]
    dct_details = None
    if has_dct_support():
        modes.append("dct")
        dct_details = dict(
            output_formats=["png", "jpeg"],
            color_modes=["grayscale", "color"],
            default_output_format="png",
            default_color_mode="grayscale",
        )

    # Channel key status (v4.0.0); the full key is deliberately not included.
    status = get_channel_status()
    channel_summary = dict(
        mode=status["mode"],
        configured=status["configured"],
        fingerprint=status.get("fingerprint"),
        source=status.get("source"),
    )

    # Audio modes (v4.3.0)
    if HAS_AUDIO_SUPPORT:
        modes.extend(["audio_lsb", "audio_spread"])

    v3_notes = {
        "date_removed": "No date_str parameter needed - encode/decode anytime",
        "passphrase_renamed": "day_phrase → passphrase (single passphrase, no daily rotation)",
    }
    breaking = {
        "v4_channel_key": "Messages encoded with channel key require same key to decode",
        "format_version": 5,
        "backward_compatible": False,
        "v3_notes": v3_notes,
    }

    return StatusResponse(
        version=__version__,
        has_argon2=has_argon2(),
        has_qrcode_read=HAS_QR_READ,
        has_qrcode_write=HAS_QR_WRITE,
        has_dct=has_dct_support(),
        has_audio=HAS_AUDIO_SUPPORT,
        max_payload_kb=MAX_FILE_PAYLOAD_SIZE // 1024,
        available_modes=modes,
        dct_features=dct_details,
        channel=channel_summary,
        breaking_changes=breaking,
    )
@app.get("/modes", response_model=ModesResponse)
async def api_modes():
    """
    Get available embedding modes and their status.
    v4.0.0: Also includes channel key status.
    """
    # Channel summary (this endpoint does not report the key source).
    status = get_channel_status()
    channel_summary = dict(
        mode=status["mode"],
        configured=status["configured"],
        fingerprint=status.get("fingerprint"),
    )

    # Audio modes (v4.3.0): stays None when audio support is unavailable.
    audio_summary = None
    if HAS_AUDIO_SUPPORT:
        lsb_audio = {
            "name": "Audio LSB",
            "description": "Embed in audio sample LSBs, high capacity",
            "output_format": "WAV",
        }
        spread_audio = {
            "name": "Spread Spectrum (DSSS)",
            "description": "Direct-sequence spread spectrum with Reed-Solomon ECC, better stealth",
            "output_format": "WAV",
        }
        audio_summary = {
            "available": True,
            "modes": {"audio_lsb": lsb_audio, "audio_spread": spread_audio},
            "supported_formats": ["WAV", "FLAC", "MP3", "OGG", "AAC", "M4A"],
            "output_format": "WAV",
            "requires": "soundfile",
        }

    lsb_summary = {
        "available": True,
        "name": "Spatial LSB",
        "description": "Embed in pixel LSBs, outputs PNG/BMP",
        "output_format": "PNG (color)",
        "capacity_ratio": "100%",
    }
    dct_summary = DctModeInfo(
        available=has_dct_support(),
        name="DCT Domain",
        description="Embed in DCT coefficients, frequency domain steganography",
        output_formats=["png", "jpeg"],
        color_modes=["grayscale", "color"],
        capacity_ratio="~20% of LSB",
        requires="scipy",
    )
    return ModesResponse(
        lsb=lsb_summary,
        dct=dct_summary,
        audio=audio_summary,
        channel=channel_summary,
    )
# ============================================================================
# ROUTES - CHANNEL KEY (v4.0.0)
# ============================================================================
@app.get("/channel/status", response_model=ChannelStatusResponse)
async def api_channel_status(
    reveal: bool = Query(False, description="Include full key in response"),
    _: str = Depends(require_api_key),
):
    """
    Get current channel key status.
    v4.0.0: New endpoint for channel key management.
    Returns mode (public/private), fingerprint, and source.
    Use reveal=true to include the full key.
    Requires authentication.
    """
    # This route can hand back the full channel key (reveal=true), so it is
    # gated by the same require_api_key dependency as the other /channel
    # routes. Previously the secret was readable without any credentials.
    status = get_channel_status()
    # Only expose the key when explicitly requested and one is configured.
    full_key = status.get("key") if reveal and status["configured"] else None
    return ChannelStatusResponse(
        mode=status["mode"],
        configured=status["configured"],
        fingerprint=status.get("fingerprint"),
        source=status.get("source"),
        key=full_key,
    )
@app.post("/channel/generate", response_model=ChannelGenerateResponse)
async def api_channel_generate(
    _: str = Depends(require_api_key),
    save: bool = Query(False, description="Save to user config"),
    save_project: bool = Query(False, description="Save to project config"),
):
    """
    Generate a new channel key.
    v4.0.0: New endpoint for channel key management.
    Optionally saves to user config (~/.stegasoo/channel.key) or
    project config (./config/channel.key).
    """
    # The two save targets are mutually exclusive.
    if save and save_project:
        raise HTTPException(400, "Cannot use both save and save_project")

    key = generate_channel_key()
    # Masked form: only the first and last four characters are shown.
    fingerprint = f"{key[:4]}-••••-••••-••••-••••-••••-••••-{key[-4:]}"

    # (location argument, path reported to the caller), or None for "don't save".
    if save:
        destination = ("user", "~/.stegasoo/channel.key")
    elif save_project:
        destination = ("project", "./config/channel.key")
    else:
        destination = None

    reported_path = None
    if destination is not None:
        target, reported_path = destination
        set_channel_key(key, location=target)

    return ChannelGenerateResponse(
        key=key,
        fingerprint=fingerprint,
        saved=destination is not None,
        save_location=reported_path,
    )
@app.post("/channel/set")
async def api_channel_set(request: ChannelSetRequest, _: str = Depends(require_api_key)):
    """
    Set/save a channel key to config.
    v4.0.0: New endpoint for channel key management.
    """
    # Validate the key first, then the target location, before touching config.
    key_is_valid = validate_channel_key(request.key)
    if not key_is_valid:
        raise HTTPException(
            400, "Invalid channel key format. Expected: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX"
        )
    allowed_locations = ("user", "project")
    if request.location not in allowed_locations:
        raise HTTPException(400, "location must be 'user' or 'project'")

    set_channel_key(request.key, location=request.location)

    # Report what the library now considers the active key.
    current = get_channel_status()
    return dict(
        success=True,
        location=current.get("source"),
        fingerprint=current.get("fingerprint"),
    )
@app.delete("/channel")
async def api_channel_clear(
    _: str = Depends(require_api_key),
    location: str = Query("user", description="'user', 'project', or 'all'"),
):
    """
    Clear/remove channel key from config.
    v4.0.0: New endpoint for channel key management.
    Note: Does not affect environment variables.
    """
    # Map the requested location onto the config locations to clear, in order.
    targets_by_location = {
        "all": ("user", "project"),
        "user": ("user",),
        "project": ("project",),
    }
    targets = targets_by_location.get(location)
    if targets is None:
        raise HTTPException(400, "location must be 'user', 'project', or 'all'")
    for target in targets:
        clear_channel_key(location=target)

    # A key may still be active afterwards (e.g. from another source).
    remaining = get_channel_status()
    return dict(
        success=True,
        mode=remaining["mode"],
        still_configured=remaining["configured"],
        remaining_source=remaining.get("source"),
    )
# ============================================================================
# ROUTES - AUTHENTICATION (v4.2.1)
# ============================================================================
@app.get("/auth/status", response_model=AuthStatusResponse)
async def api_auth_status():
    """
    Get API key authentication status.
    v4.2.1: New endpoint for auth status.
    Returns whether auth is enabled and key counts.
    """
    status = get_api_key_status()
    # Copy exactly the keys the response model declares; all are required.
    reported_fields = ("enabled", "total_keys", "user_keys", "project_keys", "env_configured")
    return AuthStatusResponse(**{field: status[field] for field in reported_fields})
@app.get("/auth/keys", response_model=list[AuthKeyInfo])
async def api_auth_list_keys(
    location: str = Query("user", description="'user' or 'project'"),
    _: str = Depends(require_api_key),
):
    """
    List configured API keys (names only, not actual keys).
    v4.2.1: New endpoint for auth management.
    Requires authentication.
    """
    if location not in ("user", "project"):
        raise HTTPException(400, "location must be 'user' or 'project'")

    # Only name and creation stamp are copied into the response.
    summaries = []
    for entry in list_api_keys(location):
        summaries.append(AuthKeyInfo(name=entry["name"], created=entry["created"]))
    return summaries
@app.post("/auth/keys")
async def api_auth_create_key(
    name: str = Query(..., description="Name for the new API key"),
    location: str = Query("user", description="'user' or 'project'"),
    _: str = Depends(require_api_key),
):
    """
    Create a new API key.
    v4.2.1: New endpoint for auth management.
    Returns the key ONCE - it cannot be retrieved again!
    Requires authentication (or no keys configured yet).
    """
    if location not in ("user", "project"):
        raise HTTPException(400, "location must be 'user' or 'project'")
    # Keep the try body to the one call that can raise ValueError, and chain
    # the cause so the original traceback is preserved for server logs.
    try:
        key = add_api_key(name, location)
    except ValueError as e:
        raise HTTPException(400, str(e)) from e
    return {
        "success": True,
        "name": name,
        "key": key,
        "warning": "Save this key now! It cannot be retrieved again.",
    }
@app.delete("/auth/keys")
async def api_auth_delete_key(
    name: str = Query(..., description="Name of key to delete"),
    location: str = Query("user", description="'user' or 'project'"),
    _: str = Depends(require_api_key),
):
    """
    Delete an API key by name.
    v4.2.1: New endpoint for auth management.
    Requires authentication.
    """
    valid_locations = ("user", "project")
    if location not in valid_locations:
        raise HTTPException(400, "location must be 'user' or 'project'")

    # remove_api_key reports via its truthiness whether anything was removed.
    was_removed = remove_api_key(name, location)
    if not was_removed:
        raise HTTPException(404, f"Key '{name}' not found in {location} config")
    return {"success": True, "deleted": name}
@app.post("/compare", response_model=CompareModesResponse)
async def api_compare_modes(request: CompareModesRequest, _: str = Depends(require_api_key)):
    """
    Compare LSB and DCT embedding modes for a carrier image.
    Returns capacity for both modes and recommendation.
    Optionally checks if a specific payload size would fit.
    """
    # Malformed base64 is a client mistake, so report 400 rather than letting
    # it fall into the generic 500 handler below. binascii.Error (raised by
    # b64decode) is a subclass of ValueError.
    try:
        carrier = base64.b64decode(request.carrier_image_base64)
    except ValueError as e:
        raise HTTPException(400, f"Invalid base64 in carrier_image_base64: {e}") from e

    try:
        comparison = compare_modes(carrier)
        lsb = comparison["lsb"]
        dct = comparison["dct"]
        dct_available = dct["available"]

        response = CompareModesResponse(
            width=comparison["width"],
            height=comparison["height"],
            lsb={
                "capacity_bytes": lsb["capacity_bytes"],
                "capacity_kb": round(lsb["capacity_kb"], 1),
                "available": True,
                "output_format": lsb["output"],
            },
            dct={
                "capacity_bytes": dct["capacity_bytes"],
                "capacity_kb": round(dct["capacity_kb"], 1),
                "available": dct_available,
                "output_formats": ["png", "jpeg"],
                "color_modes": ["grayscale", "color"],
                "ratio_vs_lsb_percent": round(dct["ratio_vs_lsb"], 1),
            },
            recommendation=(
                "lsb" if not dct_available else "dct for stealth, lsb for capacity"
            ),
        )

        # Compare against None explicitly so an explicit payload_size of 0 still
        # gets a payload_check instead of being treated as "not provided".
        if request.payload_size is not None:
            fits_lsb = request.payload_size <= lsb["capacity_bytes"]
            fits_dct = request.payload_size <= dct["capacity_bytes"]
            response.payload_check = {
                "size_bytes": request.payload_size,
                "fits_lsb": fits_lsb,
                "fits_dct": fits_dct,
            }
            # Update recommendation based on payload
            if fits_dct and dct_available:
                response.recommendation = "dct (payload fits, better stealth)"
            elif fits_lsb:
                response.recommendation = "lsb (payload too large for dct)"
            else:
                response.recommendation = "none (payload too large for both modes)"
        return response
    except Exception as e:
        # Anything else (library failure, unexpected result shape) stays a 500;
        # the cause is chained for server-side diagnostics.
        raise HTTPException(500, str(e)) from e
@app.post("/will-fit", response_model=WillFitResponse)
async def api_will_fit(request: WillFitRequest, _: str = Depends(require_api_key)):
    """
    Check if a payload of given size will fit in the carrier image.
    Supports both LSB and DCT modes.
    """
    # Validate mode
    if request.embed_mode == "dct" and not has_dct_support():
        raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")

    # Malformed base64 is a client mistake: answer 400 instead of the generic
    # 500 below. binascii.Error (raised by b64decode) subclasses ValueError.
    try:
        carrier = base64.b64decode(request.carrier_image_base64)
    except ValueError as e:
        raise HTTPException(400, f"Invalid base64 in carrier_image_base64: {e}") from e

    try:
        result = will_fit_by_mode(request.payload_size, carrier, embed_mode=request.embed_mode)
        return WillFitResponse(
            fits=result["fits"],
            payload_size=result["payload_size"],
            capacity=result["capacity"],
            usage_percent=round(result["usage_percent"], 1),
            headroom=result["headroom"],
            mode=request.embed_mode,
        )
    except Exception as e:
        # Library failures remain a 500; chain the cause for diagnostics.
        raise HTTPException(500, str(e)) from e
# ============================================================================
# ROUTES - QR CODE
# ============================================================================
@app.post("/extract-key-from-qr", response_model=QrExtractResponse)
async def api_extract_key_from_qr(
    _: str = Depends(require_api_key),
    qr_image: UploadFile = File(..., description="QR code image containing RSA key"),
):
    """
    Read an uploaded QR code image and return the RSA key it contains.

    Both compressed (STEGASOO-Z: prefix) and uncompressed keys are supported.
    On success the PEM-encoded key is returned; when no key is found, or
    extraction raises, the response carries success=False and an error text.

    Raises:
        HTTPException: 501 when QR reading support is not available.
    """
    if not HAS_QR_READ:
        raise HTTPException(501, "QR code reading not available. Install pyzbar and libzbar.")

    try:
        uploaded_bytes = await qr_image.read()
        extracted_pem = extract_key_from_qr(uploaded_bytes)
        if not extracted_pem:
            return QrExtractResponse(success=False, error="No valid RSA key found in QR code")
        return QrExtractResponse(success=True, key_pem=extracted_pem)
    except Exception as e:
        # Failures are reported in the response body, not as an HTTP error.
        return QrExtractResponse(success=False, error=str(e))
@app.post("/generate-key-qr", response_model=QrGenerateResponse)
async def api_generate_key_qr(request: QrGenerateRequest, _: str = Depends(require_api_key)):
    """
    Render an RSA private key as a QR code.

    Output formats: PNG, JPG (returned base64-encoded) and ASCII (returned
    as text). Uses zstd compression by default for better QR code density.
    Unsupported formats and generation failures are reported through
    success=False in the response body.

    Raises:
        HTTPException: 501 when QR generation support is not available.
    """
    if not HAS_QR_WRITE:
        raise HTTPException(501, "QR code generation not available. Install qrcode library.")

    try:
        fmt = request.output_format.lower()

        # Text rendering needs no binary encoding step.
        if fmt == "ascii":
            rendered = generate_qr_ascii(
                request.key_pem,
                compress=request.compress,
                invert=False,
            )
            return QrGenerateResponse(success=True, format="ascii", qr_data=rendered)

        if fmt not in ("png", "jpg", "jpeg"):
            return QrGenerateResponse(
                success=False,
                error=f"Unsupported format: {fmt}. Use 'png', 'jpg', or 'ascii'",
            )

        import base64

        image_bytes = generate_qr_code(
            request.key_pem,
            compress=request.compress,
            output_format=fmt,
        )
        encoded_image = base64.b64encode(image_bytes).decode("ascii")
        return QrGenerateResponse(success=True, format=fmt, qr_data=encoded_image)
    except ValueError as e:
        return QrGenerateResponse(success=False, error=str(e))
    except Exception as e:
        return QrGenerateResponse(success=False, error=f"QR generation failed: {e}")
# ============================================================================
# ROUTES - GENERATE
# ============================================================================
@app.post("/generate", response_model=GenerateResponse)
async def api_generate(request: GenerateRequest, _: str = Depends(require_api_key)):
    """
    Create a fresh set of credentials for encoding/decoding.

    The request must enable use_pin, use_rsa, or both.

    v3.2.0: A single passphrase is generated (no daily rotation).
    Default increased to 4 words for better security.

    Raises:
        HTTPException: 400 when neither factor is enabled or rsa_bits is not
            in VALID_RSA_SIZES; 500 when credential generation fails.
    """
    if not (request.use_pin or request.use_rsa):
        raise HTTPException(400, "Must enable at least one of use_pin or use_rsa")

    if request.rsa_bits not in VALID_RSA_SIZES:
        raise HTTPException(400, f"rsa_bits must be one of {VALID_RSA_SIZES}")

    try:
        generated = generate_credentials(
            use_pin=request.use_pin,
            use_rsa=request.use_rsa,
            pin_length=request.pin_length,
            rsa_bits=request.rsa_bits,
            passphrase_words=request.words_per_passphrase,
        )
        entropy_report = {
            "passphrase": generated.passphrase_entropy,
            "pin": generated.pin_entropy,
            "rsa": generated.rsa_entropy,
            "total": generated.total_entropy,
        }
        return GenerateResponse(
            passphrase=generated.passphrase,
            pin=generated.pin,
            rsa_key_pem=generated.rsa_key_pem,
            entropy=entropy_report,
            phrases=None,  # Legacy field removed
        )
    except Exception as e:
        raise HTTPException(500, str(e))
# ============================================================================
# HELPER FUNCTION FOR DCT PARAMETERS
# ============================================================================
def _get_dct_params(embed_mode: str, dct_output_format: str, dct_color_mode: str) -> dict:
"""
Get DCT-specific parameters if DCT mode is selected.
Returns kwargs to pass to encode().
"""
if embed_mode != "dct":
return {}
return {
"dct_output_format": dct_output_format,
"dct_color_mode": dct_color_mode,
}
def _get_output_info(embed_mode: str, dct_output_format: str, dct_color_mode: str) -> tuple:
"""
Get output format and color mode strings for response.
Returns (output_format, color_mode, mime_type).
"""
if embed_mode == "dct":
output_format = dct_output_format
color_mode = dct_color_mode
mime_type = "image/jpeg" if dct_output_format == "jpeg" else "image/png"
else:
output_format = "png"
color_mode = "color"
mime_type = "image/png"
return output_format, color_mode, mime_type
# ============================================================================
# ROUTES - ENCODE (JSON)
# ============================================================================
@app.post("/encode", response_model=EncodeResponse)
async def api_encode(request: EncodeRequest, _: str = Depends(require_api_key)):
    """
    Hide a text message inside a carrier image (JSON interface).

    The reference photo, carrier image and optional RSA key arrive
    base64-encoded; the stego image is returned base64-encoded too.

    v4.0.0: Added channel_key parameter for deployment isolation.
    v3.2.0: No date_str parameter needed - encode anytime!

    Raises:
        HTTPException: 400 when DCT mode is unavailable or the library raises
            CapacityError/StegasooError; 500 for any other failure.
    """
    # Validate mode
    dct_requested = request.embed_mode == "dct"
    if dct_requested and not has_dct_support():
        raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")

    # Resolve channel key
    resolved_channel_key = _resolve_channel_key(request.channel_key)

    try:
        reference_bytes = base64.b64decode(request.reference_photo_base64)
        carrier_bytes = base64.b64decode(request.carrier_image_base64)
        rsa_key_bytes = None
        if request.rsa_key_base64:
            rsa_key_bytes = base64.b64decode(request.rsa_key_base64)

        # DCT-specific options are only forwarded in DCT mode
        extra_options = _get_dct_params(
            request.embed_mode, request.dct_output_format, request.dct_color_mode
        )

        # v4.2.0: Run CPU-bound encode in thread pool
        result = await run_in_thread(
            encode,
            message=request.message,
            reference_photo=reference_bytes,
            carrier_image=carrier_bytes,
            passphrase=request.passphrase,
            pin=request.pin,
            rsa_key_data=rsa_key_bytes,
            rsa_password=request.rsa_password,
            embed_mode=request.embed_mode,
            channel_key=resolved_channel_key,
            **extra_options,
        )

        encoded_image = base64.b64encode(result.stego_image).decode("utf-8")

        # The MIME type (third element) is not part of the JSON response.
        output_format, color_mode = _get_output_info(
            request.embed_mode, request.dct_output_format, request.dct_color_mode
        )[:2]

        # Get channel info for response
        channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)

        return EncodeResponse(
            stego_image_base64=encoded_image,
            filename=result.filename,
            capacity_used_percent=result.capacity_percent,
            embed_mode=request.embed_mode,
            output_format=output_format,
            color_mode=color_mode,
            channel_mode=channel_mode,
            channel_fingerprint=channel_fingerprint,
            date_used=None,
            day_of_week=None,
        )
    except (CapacityError, StegasooError) as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(500, str(e))
@app.post("/encode/file", response_model=EncodeResponse)
async def api_encode_file(request: EncodeFileRequest, _: str = Depends(require_api_key)):
    """
    Hide a file inside a carrier image (JSON with base64).

    The file data, reference photo, carrier image and optional RSA key are
    all supplied base64-encoded.

    v4.0.0: Added channel_key parameter for deployment isolation.
    v3.2.0: No date_str parameter needed - encode anytime!

    Raises:
        HTTPException: 400 when DCT mode is unavailable or the library raises
            CapacityError/StegasooError; 500 for any other failure.
    """
    # Validate mode
    dct_requested = request.embed_mode == "dct"
    if dct_requested and not has_dct_support():
        raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")

    # Resolve channel key
    resolved_channel_key = _resolve_channel_key(request.channel_key)

    try:
        file_bytes = base64.b64decode(request.file_data_base64)
        reference_bytes = base64.b64decode(request.reference_photo_base64)
        carrier_bytes = base64.b64decode(request.carrier_image_base64)
        rsa_key_bytes = None
        if request.rsa_key_base64:
            rsa_key_bytes = base64.b64decode(request.rsa_key_base64)

        file_payload = FilePayload(
            data=file_bytes, filename=request.filename, mime_type=request.mime_type
        )

        # DCT-specific options are only forwarded in DCT mode
        extra_options = _get_dct_params(
            request.embed_mode, request.dct_output_format, request.dct_color_mode
        )

        # v4.2.0: Run CPU-bound encode in thread pool
        result = await run_in_thread(
            encode,
            message=file_payload,
            reference_photo=reference_bytes,
            carrier_image=carrier_bytes,
            passphrase=request.passphrase,
            pin=request.pin,
            rsa_key_data=rsa_key_bytes,
            rsa_password=request.rsa_password,
            embed_mode=request.embed_mode,
            channel_key=resolved_channel_key,
            **extra_options,
        )

        encoded_image = base64.b64encode(result.stego_image).decode("utf-8")

        # The MIME type (third element) is not part of the JSON response.
        output_format, color_mode = _get_output_info(
            request.embed_mode, request.dct_output_format, request.dct_color_mode
        )[:2]

        # Get channel info for response
        channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)

        return EncodeResponse(
            stego_image_base64=encoded_image,
            filename=result.filename,
            capacity_used_percent=result.capacity_percent,
            embed_mode=request.embed_mode,
            output_format=output_format,
            color_mode=color_mode,
            channel_mode=channel_mode,
            channel_fingerprint=channel_fingerprint,
            date_used=None,
            day_of_week=None,
        )
    except (CapacityError, StegasooError) as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(500, str(e))
# ============================================================================
# ROUTES - DECODE (JSON)
# ============================================================================
@app.post("/decode", response_model=DecodeResponse)
async def api_decode(request: DecodeRequest, _: str = Depends(require_api_key)):
    """
    Recover a message or file from a stego image (JSON interface).

    The response's payload_type tells the caller whether the result is
    "text" (see message) or "file" (see file_data_base64, filename, mime_type).

    v4.0.0: Added channel_key parameter - must match encoding key.
    v3.2.0: No date_str parameter needed - decode anytime!

    Raises:
        HTTPException: 400 when DCT mode is unavailable or the library raises
            StegasooError; 401 on DecryptionError; 500 for any other failure.
    """
    # Validate mode
    dct_requested = request.embed_mode == "dct"
    if dct_requested and not has_dct_support():
        raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")

    # Resolve channel key
    resolved_channel_key = _resolve_channel_key(request.channel_key)

    try:
        stego_bytes = base64.b64decode(request.stego_image_base64)
        reference_bytes = base64.b64decode(request.reference_photo_base64)
        rsa_key_bytes = None
        if request.rsa_key_base64:
            rsa_key_bytes = base64.b64decode(request.rsa_key_base64)

        # v4.2.0: Run CPU-bound decode in thread pool
        result = await run_in_thread(
            decode,
            stego_image=stego_bytes,
            reference_photo=reference_bytes,
            passphrase=request.passphrase,
            pin=request.pin,
            rsa_key_data=rsa_key_bytes,
            rsa_password=request.rsa_password,
            embed_mode=request.embed_mode,
            channel_key=resolved_channel_key,
        )

        if not result.is_file:
            return DecodeResponse(payload_type="text", message=result.message)

        return DecodeResponse(
            payload_type="file",
            file_data_base64=base64.b64encode(result.file_data).decode("utf-8"),
            filename=result.filename,
            mime_type=result.mime_type,
        )
    except DecryptionError as e:
        # Only channel-key messages are passed through verbatim; every other
        # decryption failure gets the same generic text.
        reason = str(e)
        mentions_channel_key = "channel key" in reason.lower()
        detail = reason if mentions_channel_key else "Decryption failed. Check credentials."
        raise HTTPException(401, detail)
    except StegasooError as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(500, str(e))
# ============================================================================
# ROUTES - ENCODE/DECODE (MULTIPART)
# ============================================================================
@app.post("/encode/multipart")
async def api_encode_multipart(
    _: str = Depends(require_api_key),
    passphrase: str = Form(..., description="Passphrase (v3.2.0: renamed from day_phrase)"),
    reference_photo: UploadFile = File(...),
    carrier: UploadFile = File(...),
    message: str = Form(""),
    payload_file: UploadFile | None = File(None),
    pin: str = Form(""),
    rsa_key: UploadFile | None = File(None),
    rsa_key_qr: UploadFile | None = File(None),
    rsa_password: str = Form(""),
    # Channel key (v4.0.0)
    channel_key: str = Form(
        "auto", description="Channel key: 'auto'=server config, 'none'=public, 'XXXX-...'=explicit"
    ),
    embed_mode: str = Form("lsb"),
    dct_output_format: str = Form("png"),
    dct_color_mode: str = Form("grayscale"),
):
    """
    Encode via multipart form data (file uploads).

    The payload is either 'message' (text) or 'payload_file' (binary file);
    an uploaded payload_file takes precedence over message.
    The RSA key may come from 'rsa_key' (.pem file) or 'rsa_key_qr' (QR code
    image); rsa_key wins when both are supplied.

    The stego image is returned directly as the response body, with
    metadata in X-Stegasoo-* headers.

    v4.0.0: Added channel_key parameter for deployment isolation.
            Use 'auto' for server config, 'none' for public mode.
    v3.2.0: No date_str parameter needed - encode anytime!

    Raises:
        HTTPException: 400 for invalid options, a missing payload, an
            unreadable QR key, or CapacityError/StegasooError; 501 when a QR
            key is supplied but QR reading is unavailable; 500 otherwise.
    """
    # Validate mode
    if embed_mode != "lsb" and embed_mode != "dct":
        raise HTTPException(400, "embed_mode must be 'lsb' or 'dct'")
    if embed_mode == "dct" and not has_dct_support():
        raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")

    # Validate DCT options
    if dct_output_format != "png" and dct_output_format != "jpeg":
        raise HTTPException(400, "dct_output_format must be 'png' or 'jpeg'")
    if dct_color_mode != "grayscale" and dct_color_mode != "color":
        raise HTTPException(400, "dct_color_mode must be 'grayscale' or 'color'")

    # Resolve channel key (v4.0.0)
    # Form data: "auto" = use server config, "none" = public, otherwise explicit key
    channel_choice = channel_key.lower()
    if channel_choice == "none":
        resolved_channel_key = ""  # Public mode
    elif channel_choice == "auto":
        resolved_channel_key = None  # Auto mode
    else:
        resolved_channel_key = _resolve_channel_key(channel_key)

    try:
        ref_bytes = await reference_photo.read()
        carrier_bytes = await carrier.read()

        # RSA key from .pem upload or QR code image. The password only
        # applies to .pem uploads: QR code keys are never password-protected.
        rsa_key_bytes = None
        effective_password = rsa_password or None
        if rsa_key and rsa_key.filename:
            rsa_key_bytes = await rsa_key.read()
        elif rsa_key_qr and rsa_key_qr.filename:
            if not HAS_QR_READ:
                raise HTTPException(
                    501, "QR code reading not available. Install pyzbar and libzbar."
                )
            qr_bytes = await rsa_key_qr.read()
            pem_text = extract_key_from_qr(qr_bytes)
            if not pem_text:
                raise HTTPException(400, "Could not extract RSA key from QR code image")
            rsa_key_bytes = pem_text.encode("utf-8")
            effective_password = None

        # Determine payload
        if payload_file and payload_file.filename:
            payload = FilePayload(
                data=await payload_file.read(),
                filename=payload_file.filename,
                mime_type=payload_file.content_type,
            )
        elif not message:
            raise HTTPException(400, "Must provide either 'message' or 'payload_file'")
        else:
            payload = message

        # DCT-specific options are only forwarded in DCT mode
        extra_options = _get_dct_params(embed_mode, dct_output_format, dct_color_mode)

        # v4.2.0: Run CPU-bound encode in thread pool
        result = await run_in_thread(
            encode,
            message=payload,
            reference_photo=ref_bytes,
            carrier_image=carrier_bytes,
            passphrase=passphrase,
            pin=pin,
            rsa_key_data=rsa_key_bytes,
            rsa_password=effective_password,
            embed_mode=embed_mode,
            channel_key=resolved_channel_key,
            **extra_options,
        )

        output_format, color_mode, mime_type = _get_output_info(
            embed_mode, dct_output_format, dct_color_mode
        )

        # Get channel info for headers
        channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)

        capacity_text = format(result.capacity_percent, ".1f")
        headers = {
            "Content-Disposition": f"attachment; filename={result.filename}",
            "X-Stegasoo-Capacity-Percent": capacity_text,
            "X-Stegasoo-Embed-Mode": embed_mode,
            "X-Stegasoo-Output-Format": output_format,
            "X-Stegasoo-Color-Mode": color_mode,
            "X-Stegasoo-Channel-Mode": channel_mode,
            "X-Stegasoo-Version": __version__,
        }
        # The fingerprint header is only sent when a fingerprint exists.
        if channel_fingerprint:
            headers["X-Stegasoo-Channel-Fingerprint"] = channel_fingerprint

        return Response(
            content=result.stego_image,
            media_type=mime_type,
            headers=headers,
        )
    except (CapacityError, StegasooError) as e:
        raise HTTPException(400, str(e))
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, str(e))
@app.post("/decode/multipart", response_model=DecodeResponse)
async def api_decode_multipart(
    _: str = Depends(require_api_key),
    passphrase: str = Form(..., description="Passphrase (v3.2.0: renamed from day_phrase)"),
    reference_photo: UploadFile = File(...),
    stego_image: UploadFile = File(...),
    pin: str = Form(""),
    rsa_key: UploadFile | None = File(None),
    rsa_key_qr: UploadFile | None = File(None),
    rsa_password: str = Form(""),
    # Channel key (v4.0.0)
    channel_key: str = Form(
        "auto", description="Channel key: 'auto'=server config, 'none'=public, 'XXXX-...'=explicit"
    ),
    embed_mode: str = Form("auto"),
):
    """
    Decode via multipart form data (file uploads).

    The RSA key may come from 'rsa_key' (.pem file) or 'rsa_key_qr' (QR code
    image); rsa_key wins when both are supplied.

    Returns JSON whose payload_type is "text" or "file".

    v4.0.0: Added channel_key parameter - must match what was used for encoding.
            Use 'auto' for server config, 'none' for public mode.
    v3.2.0: No date_str parameter needed - decode anytime!

    Raises:
        HTTPException: 400 for an invalid embed_mode, unavailable DCT support,
            an unreadable QR key, or StegasooError; 401 on DecryptionError;
            501 when a QR key is supplied but QR reading is unavailable;
            500 otherwise.
    """
    # Validate mode
    if embed_mode not in {"auto", "lsb", "dct"}:
        raise HTTPException(400, "embed_mode must be 'auto', 'lsb', or 'dct'")
    if embed_mode == "dct" and not has_dct_support():
        raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")

    # Resolve channel key (v4.0.0)
    channel_choice = channel_key.lower()
    if channel_choice == "none":
        resolved_channel_key = ""  # Public mode
    elif channel_choice == "auto":
        resolved_channel_key = None  # Auto mode
    else:
        resolved_channel_key = _resolve_channel_key(channel_key)

    try:
        ref_bytes = await reference_photo.read()
        stego_bytes = await stego_image.read()

        # RSA key from .pem upload or QR code image. The password only
        # applies to .pem uploads: QR code keys are never password-protected.
        rsa_key_bytes = None
        effective_password = rsa_password or None
        if rsa_key and rsa_key.filename:
            rsa_key_bytes = await rsa_key.read()
        elif rsa_key_qr and rsa_key_qr.filename:
            if not HAS_QR_READ:
                raise HTTPException(
                    501, "QR code reading not available. Install pyzbar and libzbar."
                )
            qr_bytes = await rsa_key_qr.read()
            pem_text = extract_key_from_qr(qr_bytes)
            if not pem_text:
                raise HTTPException(400, "Could not extract RSA key from QR code image")
            rsa_key_bytes = pem_text.encode("utf-8")
            effective_password = None

        # v4.2.0: Run CPU-bound decode in thread pool
        result = await run_in_thread(
            decode,
            stego_image=stego_bytes,
            reference_photo=ref_bytes,
            passphrase=passphrase,
            pin=pin,
            rsa_key_data=rsa_key_bytes,
            rsa_password=effective_password,
            embed_mode=embed_mode,
            channel_key=resolved_channel_key,
        )

        if not result.is_file:
            return DecodeResponse(payload_type="text", message=result.message)

        return DecodeResponse(
            payload_type="file",
            file_data_base64=base64.b64encode(result.file_data).decode("utf-8"),
            filename=result.filename,
            mime_type=result.mime_type,
        )
    except DecryptionError as e:
        # Only channel-key messages are passed through verbatim; every other
        # decryption failure gets the same generic text.
        reason = str(e)
        mentions_channel_key = "channel key" in reason.lower()
        detail = reason if mentions_channel_key else "Decryption failed. Check credentials."
        raise HTTPException(401, detail)
    except StegasooError as e:
        raise HTTPException(400, str(e))
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, str(e))
# ============================================================================
# ROUTES - IMAGE INFO
# ============================================================================
@app.post("/image/info", response_model=ImageInfoResponse)
async def api_image_info(
    _: str = Depends(require_api_key),
    image: UploadFile = File(...),
    include_modes: bool = Query(True, description="Include capacity by mode (v3.0+)"),
):
    """
    Get information about an image's capacity.

    The top-level capacity fields describe LSB mode. When include_modes is
    true, the response also carries per-mode capacity for LSB and DCT.

    Raises:
        HTTPException: 400 if validate_image() rejects the upload or a
            StegasooError is raised while computing capacity; 500 for any
            other failure.
    """
    try:
        image_data = await image.read()

        result = validate_image(image_data, check_size=False)
        if not result.is_valid:
            raise HTTPException(400, result.error_message)

        capacity = calculate_capacity_by_mode(image_data, "lsb")

        response = ImageInfoResponse(
            width=result.details["width"],
            height=result.details["height"],
            pixels=result.details["pixels"],
            capacity_bytes=capacity,
            capacity_kb=capacity // 1024,
        )

        if include_modes:
            comparison = compare_modes(image_data)
            response.modes = {
                "lsb": ModeCapacity(
                    capacity_bytes=comparison["lsb"]["capacity_bytes"],
                    capacity_kb=round(comparison["lsb"]["capacity_kb"], 1),
                    available=True,
                    output_format=comparison["lsb"]["output"],
                ),
                "dct": ModeCapacity(
                    capacity_bytes=comparison["dct"]["capacity_bytes"],
                    capacity_kb=round(comparison["dct"]["capacity_kb"], 1),
                    available=comparison["dct"]["available"],
                    output_format="PNG/JPEG (grayscale or color)",
                ),
            }

        return response
    except HTTPException:
        # Keep the 400 raised for an invalid image.
        raise
    except StegasooError as e:
        # Library-level errors are reported as 400, matching the
        # StegasooError exception handler registered on the app.
        raise HTTPException(400, str(e)) from e
    except Exception as e:
        raise HTTPException(500, str(e)) from e
# ============================================================================
# ROUTES - AUDIO STEGANOGRAPHY (v4.3.0)
# ============================================================================
def _require_audio():
    """Guard for audio endpoints: raise HTTP 501 unless HAS_AUDIO_SUPPORT is truthy."""
    if HAS_AUDIO_SUPPORT:
        return
    raise HTTPException(
        501, "Audio steganography not available. Install with: pip install stegasoo[audio]"
    )
@app.post("/audio/encode", response_model=AudioEncodeResponse)
async def api_audio_encode(request: AudioEncodeRequest, _: str = Depends(require_api_key)):
    """
    Hide a text message inside an audio carrier (JSON interface).

    The reference photo, carrier audio and optional RSA key arrive
    base64-encoded. Returns base64-encoded stego WAV plus embedding stats.

    v4.3.0: New endpoint for audio steganography.

    Raises:
        HTTPException: 501 when audio support is unavailable; 400 on
            CapacityError/StegasooError; 500 for any other failure.
    """
    _require_audio()

    resolved_channel_key = _resolve_channel_key(request.channel_key)

    try:
        reference_bytes = base64.b64decode(request.reference_photo_base64)
        carrier_bytes = base64.b64decode(request.carrier_audio_base64)
        rsa_key_bytes = None
        if request.rsa_key_base64:
            rsa_key_bytes = base64.b64decode(request.rsa_key_base64)

        # Embedding runs in the thread pool
        stego_audio, stats = await run_in_thread(
            encode_audio,
            message=request.message,
            reference_photo=reference_bytes,
            carrier_audio=carrier_bytes,
            passphrase=request.passphrase,
            pin=request.pin,
            rsa_key_data=rsa_key_bytes,
            rsa_password=request.rsa_password,
            embed_mode=request.embed_mode,
            channel_key=resolved_channel_key,
            chip_tier=request.chip_tier,
        )

        encoded_audio = base64.b64encode(stego_audio).decode("utf-8")
        channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)

        # capacity_used is scaled by 100 before rounding for the response.
        stats_report = {
            "samples_modified": stats.samples_modified,
            "total_samples": stats.total_samples,
            "capacity_used": round(stats.capacity_used * 100, 1),
            "bytes_embedded": stats.bytes_embedded,
            "sample_rate": stats.sample_rate,
            "channels": stats.channels,
            "duration_seconds": round(stats.duration_seconds, 2),
        }

        return AudioEncodeResponse(
            stego_audio_base64=encoded_audio,
            embed_mode=stats.embed_mode,
            stats=stats_report,
            channel_mode=channel_mode,
            channel_fingerprint=channel_fingerprint,
        )
    except (CapacityError, StegasooError) as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(500, str(e))
@app.post("/audio/encode/file", response_model=AudioEncodeResponse)
async def api_audio_encode_file(request: AudioEncodeFileRequest, _: str = Depends(require_api_key)):
    """
    Hide a file inside an audio carrier (JSON with base64).

    The file data, reference photo, carrier audio and optional RSA key are
    all supplied base64-encoded.

    v4.3.0: New endpoint for audio steganography.

    Raises:
        HTTPException: 501 when audio support is unavailable; 400 on
            CapacityError/StegasooError; 500 for any other failure.
    """
    _require_audio()

    resolved_channel_key = _resolve_channel_key(request.channel_key)

    try:
        file_bytes = base64.b64decode(request.file_data_base64)
        reference_bytes = base64.b64decode(request.reference_photo_base64)
        carrier_bytes = base64.b64decode(request.carrier_audio_base64)
        rsa_key_bytes = None
        if request.rsa_key_base64:
            rsa_key_bytes = base64.b64decode(request.rsa_key_base64)

        file_payload = FilePayload(
            data=file_bytes, filename=request.filename, mime_type=request.mime_type
        )

        # Embedding runs in the thread pool
        stego_audio, stats = await run_in_thread(
            encode_audio,
            message=file_payload,
            reference_photo=reference_bytes,
            carrier_audio=carrier_bytes,
            passphrase=request.passphrase,
            pin=request.pin,
            rsa_key_data=rsa_key_bytes,
            rsa_password=request.rsa_password,
            embed_mode=request.embed_mode,
            channel_key=resolved_channel_key,
            chip_tier=request.chip_tier,
        )

        encoded_audio = base64.b64encode(stego_audio).decode("utf-8")
        channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)

        # capacity_used is scaled by 100 before rounding for the response.
        stats_report = {
            "samples_modified": stats.samples_modified,
            "total_samples": stats.total_samples,
            "capacity_used": round(stats.capacity_used * 100, 1),
            "bytes_embedded": stats.bytes_embedded,
            "sample_rate": stats.sample_rate,
            "channels": stats.channels,
            "duration_seconds": round(stats.duration_seconds, 2),
        }

        return AudioEncodeResponse(
            stego_audio_base64=encoded_audio,
            embed_mode=stats.embed_mode,
            stats=stats_report,
            channel_mode=channel_mode,
            channel_fingerprint=channel_fingerprint,
        )
    except (CapacityError, StegasooError) as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(500, str(e))
@app.post("/audio/encode/multipart")
async def api_audio_encode_multipart(
    _: str = Depends(require_api_key),
    passphrase: str = Form(..., description="Passphrase for key derivation"),
    reference_photo: UploadFile = File(...),
    carrier: UploadFile = File(...),
    message: str = Form(""),
    payload_file: UploadFile | None = File(None),
    pin: str = Form(""),
    rsa_key: UploadFile | None = File(None),
    rsa_password: str = Form(""),
    channel_key: str = Form(
        "auto", description="Channel key: 'auto'=server config, 'none'=public, 'XXXX-...'=explicit"
    ),
    embed_mode: str = Form("audio_lsb"),
    chip_tier: int | None = Form(
        None,
        description="Spread spectrum chip tier: 0=lossless, 1=high_lossy, 2=low_lossy. Only for audio_spread.",
    ),
):
    """
    Encode audio via multipart form data (file uploads).

    The payload is either 'message' (text) or 'payload_file' (binary file);
    an uploaded payload_file takes precedence over message.

    The stego WAV is returned directly as the response body, with metadata
    in X-Stegasoo-* headers.

    v4.3.0: New endpoint for audio steganography.

    Raises:
        HTTPException: 501 when audio support is unavailable; 400 for an
            invalid embed_mode, a missing payload, or
            CapacityError/StegasooError; 500 otherwise.
    """
    _require_audio()

    if embed_mode != "audio_lsb" and embed_mode != "audio_spread":
        raise HTTPException(400, "embed_mode must be 'audio_lsb' or 'audio_spread'")

    # Resolve channel key: "auto" -> None, "none" -> "", anything else is resolved
    channel_choice = channel_key.lower()
    if channel_choice == "none":
        resolved_channel_key = ""
    elif channel_choice == "auto":
        resolved_channel_key = None
    else:
        resolved_channel_key = _resolve_channel_key(channel_key)

    try:
        ref_bytes = await reference_photo.read()
        carrier_bytes = await carrier.read()

        rsa_key_bytes = None
        if rsa_key and rsa_key.filename:
            rsa_key_bytes = await rsa_key.read()

        # An empty password form field is passed on as None.
        effective_password = rsa_password or None

        # Determine payload
        if payload_file and payload_file.filename:
            payload = FilePayload(
                data=await payload_file.read(),
                filename=payload_file.filename,
                mime_type=payload_file.content_type,
            )
        elif not message:
            raise HTTPException(400, "Must provide either 'message' or 'payload_file'")
        else:
            payload = message

        # Embedding runs in the thread pool
        stego_audio, stats = await run_in_thread(
            encode_audio,
            message=payload,
            reference_photo=ref_bytes,
            carrier_audio=carrier_bytes,
            passphrase=passphrase,
            pin=pin,
            rsa_key_data=rsa_key_bytes,
            rsa_password=effective_password,
            embed_mode=embed_mode,
            channel_key=resolved_channel_key,
            chip_tier=chip_tier,
        )

        channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)

        capacity_text = format(stats.capacity_used * 100, ".1f")
        duration_text = format(stats.duration_seconds, ".2f")
        headers = {
            "Content-Disposition": "attachment; filename=stego_audio.wav",
            "X-Stegasoo-Embed-Mode": stats.embed_mode,
            "X-Stegasoo-Capacity-Percent": capacity_text,
            "X-Stegasoo-Samples-Modified": str(stats.samples_modified),
            "X-Stegasoo-Duration": duration_text,
            "X-Stegasoo-Channel-Mode": channel_mode,
            "X-Stegasoo-Version": __version__,
        }
        # The fingerprint header is only sent when a fingerprint exists.
        if channel_fingerprint:
            headers["X-Stegasoo-Channel-Fingerprint"] = channel_fingerprint

        return Response(
            content=stego_audio,
            media_type="audio/wav",
            headers=headers,
        )
    except (CapacityError, StegasooError) as e:
        raise HTTPException(400, str(e))
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, str(e))
@app.post("/audio/decode", response_model=DecodeResponse)
async def api_audio_decode(request: AudioDecodeRequest, _: str = Depends(require_api_key)):
    """
    Recover a message or file from stego audio (JSON interface).

    The response's payload_type tells the caller whether the result is
    "text" or "file".

    v4.3.0: New endpoint for audio steganography.

    Raises:
        HTTPException: 501 when audio support is unavailable; 401 on
            DecryptionError; 400 on StegasooError; 500 for any other failure.
    """
    _require_audio()

    resolved_channel_key = _resolve_channel_key(request.channel_key)

    try:
        stego_bytes = base64.b64decode(request.stego_audio_base64)
        reference_bytes = base64.b64decode(request.reference_photo_base64)
        rsa_key_bytes = None
        if request.rsa_key_base64:
            rsa_key_bytes = base64.b64decode(request.rsa_key_base64)

        # Extraction runs in the thread pool
        result = await run_in_thread(
            decode_audio,
            stego_audio=stego_bytes,
            reference_photo=reference_bytes,
            passphrase=request.passphrase,
            pin=request.pin,
            rsa_key_data=rsa_key_bytes,
            rsa_password=request.rsa_password,
            embed_mode=request.embed_mode,
            channel_key=resolved_channel_key,
        )

        if not result.is_file:
            return DecodeResponse(payload_type="text", message=result.message)

        return DecodeResponse(
            payload_type="file",
            file_data_base64=base64.b64encode(result.file_data).decode("utf-8"),
            filename=result.filename,
            mime_type=result.mime_type,
        )
    except DecryptionError as e:
        # Only channel-key messages are passed through verbatim; every other
        # decryption failure gets the same generic text.
        reason = str(e)
        mentions_channel_key = "channel key" in reason.lower()
        detail = reason if mentions_channel_key else "Decryption failed. Check credentials."
        raise HTTPException(401, detail)
    except StegasooError as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(500, str(e))
@app.post("/audio/decode/multipart", response_model=DecodeResponse)
async def api_audio_decode_multipart(
    _: str = Depends(require_api_key),
    passphrase: str = Form(..., description="Passphrase for key derivation"),
    reference_photo: UploadFile = File(...),
    stego_audio: UploadFile = File(...),
    pin: str = Form(""),
    rsa_key: UploadFile | None = File(None),
    rsa_password: str = Form(""),
    channel_key: str = Form(
        "auto", description="Channel key: 'auto'=server config, 'none'=public, 'XXXX-...'=explicit"
    ),
    embed_mode: str = Form("audio_auto"),
):
    """
    Decode audio via multipart form data (file uploads).

    Returns JSON whose payload_type is "text" or "file".

    v4.3.0: New endpoint for audio steganography.

    Raises:
        HTTPException: 501 when audio support is unavailable; 400 for an
            invalid embed_mode or StegasooError; 401 on DecryptionError;
            500 otherwise.
    """
    _require_audio()

    if embed_mode not in {"audio_auto", "audio_lsb", "audio_spread"}:
        raise HTTPException(400, "embed_mode must be 'audio_auto', 'audio_lsb', or 'audio_spread'")

    # Resolve channel key: "auto" -> None, "none" -> "", anything else is resolved
    channel_choice = channel_key.lower()
    if channel_choice == "none":
        resolved_channel_key = ""
    elif channel_choice == "auto":
        resolved_channel_key = None
    else:
        resolved_channel_key = _resolve_channel_key(channel_key)

    try:
        ref_bytes = await reference_photo.read()
        stego_bytes = await stego_audio.read()

        rsa_key_bytes = None
        if rsa_key and rsa_key.filename:
            rsa_key_bytes = await rsa_key.read()

        # An empty password form field is passed on as None.
        effective_password = rsa_password or None

        # Extraction runs in the thread pool
        result = await run_in_thread(
            decode_audio,
            stego_audio=stego_bytes,
            reference_photo=ref_bytes,
            passphrase=passphrase,
            pin=pin,
            rsa_key_data=rsa_key_bytes,
            rsa_password=effective_password,
            embed_mode=embed_mode,
            channel_key=resolved_channel_key,
        )

        if not result.is_file:
            return DecodeResponse(payload_type="text", message=result.message)

        return DecodeResponse(
            payload_type="file",
            file_data_base64=base64.b64encode(result.file_data).decode("utf-8"),
            filename=result.filename,
            mime_type=result.mime_type,
        )
    except DecryptionError as e:
        # Only channel-key messages are passed through verbatim; every other
        # decryption failure gets the same generic text.
        reason = str(e)
        mentions_channel_key = "channel key" in reason.lower()
        detail = reason if mentions_channel_key else "Decryption failed. Check credentials."
        raise HTTPException(401, detail)
    except StegasooError as e:
        raise HTTPException(400, str(e))
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, str(e))
@app.post("/audio/info", response_model=AudioInfoResponse)
async def api_audio_info(
    _: str = Depends(require_api_key),
    audio: UploadFile = File(...),
):
    """
    Report metadata and embedding capacity for an uploaded audio file.

    Capacity is given for both LSB and spread spectrum modes; the spread
    spectrum figure is reported as 0 when computing it fails.

    v4.3.0: New endpoint for audio steganography.

    Raises:
        HTTPException: 501 when audio support is unavailable; 400 on
            StegasooError; 500 for any other failure.
    """
    _require_audio()

    try:
        uploaded_bytes = await audio.read()

        metadata = await run_in_thread(get_audio_info, uploaded_bytes)

        # Calculate capacities for both modes
        lsb_bytes = await run_in_thread(calculate_audio_lsb_capacity, uploaded_bytes)
        try:
            spread_report = await run_in_thread(calculate_audio_spread_capacity, uploaded_bytes)
            spread_bytes = spread_report.usable_capacity_bytes
        except Exception:
            # Best effort: a failed spread calculation must not fail the request.
            spread_bytes = 0

        return AudioInfoResponse(
            sample_rate=metadata.sample_rate,
            channels=metadata.channels,
            duration_seconds=round(metadata.duration_seconds, 2),
            num_samples=metadata.num_samples,
            format=metadata.format,
            bit_depth=metadata.bit_depth,
            bitrate=metadata.bitrate,
            capacity_lsb=lsb_bytes,
            capacity_spread=spread_bytes,
        )
    except StegasooError as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(500, str(e))
@app.post("/audio/capacity", response_model=AudioCapacityResponse)
async def api_audio_capacity(request: AudioCapacityRequest, _: str = Depends(require_api_key)):
    """
    Report whether a payload of the given size fits in an audio carrier.

    LSB capacity is used when embed_mode is "audio_lsb"; every other mode
    value is measured against spread spectrum capacity.

    v4.3.0: New endpoint for audio steganography.

    Raises:
        HTTPException: 501 when audio support is unavailable; 400 on
            StegasooError; 500 for any other failure.
    """
    _require_audio()

    try:
        carrier_bytes = base64.b64decode(request.carrier_audio_base64)

        if request.embed_mode != "audio_lsb":
            spread_report = await run_in_thread(calculate_audio_spread_capacity, carrier_bytes)
            capacity = spread_report.usable_capacity_bytes
        else:
            capacity = await run_in_thread(calculate_audio_lsb_capacity, carrier_bytes)

        fits = request.payload_size <= capacity

        # A carrier with no capacity is reported as fully used (avoids dividing by zero).
        if capacity > 0:
            usage = request.payload_size / capacity * 100
        else:
            usage = 100.0

        return AudioCapacityResponse(
            fits=fits,
            payload_size=request.payload_size,
            capacity_bytes=capacity,
            usage_percent=round(usage, 1),
            embed_mode=request.embed_mode,
        )
    except StegasooError as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(500, str(e))
# ============================================================================
# ERROR HANDLERS
# ============================================================================
@app.exception_handler(StegasooError)
async def stegasoo_error_handler(request, exc):
    """Turn a StegasooError into a 400 JSON response with the error class name and message."""
    body = {"error": type(exc).__name__, "detail": str(exc)}
    return JSONResponse(status_code=400, content=body)
# ============================================================================
# MAIN
# ============================================================================
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 8000.
    # uvicorn is imported here so that importing this module does not need it.
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")