#!/usr/bin/env python3 """ Stegasoo REST API (v4.3.0) FastAPI-based REST API for steganography operations. Supports both text messages and file embedding. CHANGES in v4.3.0: - Audio steganography endpoints (/audio/*) - LSB and spread spectrum (DSSS) audio embedding modes - Audio info and capacity checking CHANGES in v4.2.1: - API key authentication (X-API-Key header) - TLS support with self-signed certificates - /auth/* endpoints for key management CHANGES in v4.2.0: - Async encode/decode operations (run in thread pool) - Server can handle concurrent requests without blocking CHANGES in v4.0.0: - Added channel key support for deployment/group isolation - New /channel endpoints for key management - channel_key parameter on encode/decode endpoints - Messages encoded with channel key require same key to decode CHANGES in v3.2.0: - Removed date dependency from all operations - Renamed day_phrase → passphrase - No date_str parameters needed - Simplified API for asynchronous communications NEW in v3.0: LSB and DCT embedding modes. NEW in v3.0.1: DCT color mode and JPEG output format. 
""" import asyncio import base64 import logging import os import sys from functools import partial from pathlib import Path from typing import Literal # Configure logging for API frontend _log_level = os.environ.get("STEGASOO_LOG_LEVEL", "").strip().upper() if _log_level and hasattr(logging, _log_level): logging.basicConfig( level=getattr(logging, _log_level), format="[%(asctime)s.%(msecs)03d] [%(levelname)s] [%(name)s] %(message)s", datefmt="%H:%M:%S", stream=sys.stderr, ) elif os.environ.get("STEGASOO_DEBUG", "").strip() in ("1", "true", "yes"): logging.basicConfig( level=logging.DEBUG, format="[%(asctime)s.%(msecs)03d] [%(levelname)s] [%(name)s] %(message)s", datefmt="%H:%M:%S", stream=sys.stderr, ) api_logger = logging.getLogger("stegasoo.api") from fastapi import Depends, FastAPI, File, Form, HTTPException, Query, UploadFile from fastapi.responses import JSONResponse, Response from pydantic import BaseModel, Field # API Key Authentication try: from .auth import ( add_api_key, get_api_key_status, is_auth_enabled, list_api_keys, remove_api_key, require_api_key, ) except ImportError: # When running directly (not as package) from auth import ( add_api_key, get_api_key_status, list_api_keys, remove_api_key, require_api_key, ) # Add parent to path for development sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src")) from stegasoo import ( HAS_AUDIO_SUPPORT, MAX_FILE_PAYLOAD_SIZE, CapacityError, DecryptionError, FilePayload, StegasooError, __version__, calculate_capacity_by_mode, clear_channel_key, compare_modes, decode, encode, generate_channel_key, generate_credentials, get_channel_status, has_argon2, has_dct_support, set_channel_key, validate_channel_key, validate_image, will_fit_by_mode, ) # Audio steganography (v4.3.0) - conditionally imported if HAS_AUDIO_SUPPORT: from stegasoo import decode_audio, encode_audio, get_audio_info from stegasoo.audio_steganography import calculate_audio_lsb_capacity from stegasoo.spread_steganography import 
calculate_audio_spread_capacity from stegasoo.constants import ( DEFAULT_PASSPHRASE_WORDS, MAX_PASSPHRASE_WORDS, MAX_PIN_LENGTH, MIN_PASSPHRASE_WORDS, MIN_PIN_LENGTH, VALID_RSA_SIZES, ) # QR Code utilities try: from stegasoo.qr_utils import ( extract_key_from_qr, generate_qr_ascii, generate_qr_code, has_qr_read, has_qr_write, ) HAS_QR_READ = has_qr_read() HAS_QR_WRITE = has_qr_write() except ImportError: HAS_QR_READ = False HAS_QR_WRITE = False extract_key_from_qr = None generate_qr_code = None generate_qr_ascii = None # ============================================================================ # FASTAPI APP # ============================================================================ app = FastAPI( title="Stegasoo API", description=""" Secure steganography with hybrid authentication. Supports text messages and file embedding. ## Version 4.0.0 Changes - **Channel key support** - Deployment/group isolation for messages - **New /channel endpoints** - Generate, view, and manage channel keys - **channel_key parameter** - Added to encode/decode endpoints ## Version 3.2.0 Changes - **No date parameters needed** - Encode and decode anytime without tracking dates - **Single passphrase** - No daily rotation, just use your passphrase - **True asynchronous communications** - Perfect for dead drops and delayed delivery ## Embedding Modes (v3.0) - **LSB mode** (default): Spatial LSB embedding, full color output, higher capacity - **DCT mode**: Frequency domain embedding, ~20% capacity, better stealth ## DCT Options (v3.0.1) - **dct_color_mode**: 'grayscale' (default) or 'color' (preserves original colors) - **dct_output_format**: 'png' (lossless) or 'jpeg' (smaller, more natural) Use the `/modes` endpoint to check availability and `/compare` to compare capacities. 
""", version=__version__, docs_url="/docs", redoc_url="/redoc", ) # ============================================================================ # TYPE ALIASES # ============================================================================ EmbedModeType = Literal["lsb", "dct"] ExtractModeType = Literal["auto", "lsb", "dct"] DctColorModeType = Literal["grayscale", "color"] DctOutputFormatType = Literal["png", "jpeg"] AudioEmbedModeType = Literal["audio_lsb", "audio_spread"] AudioExtractModeType = Literal["audio_auto", "audio_lsb", "audio_spread"] # ============================================================================ # MODELS # ============================================================================ class GenerateRequest(BaseModel): use_pin: bool = True use_rsa: bool = False pin_length: int = Field(default=6, ge=MIN_PIN_LENGTH, le=MAX_PIN_LENGTH) rsa_bits: int = Field(default=2048) words_per_passphrase: int = Field( default=DEFAULT_PASSPHRASE_WORDS, ge=MIN_PASSPHRASE_WORDS, le=MAX_PASSPHRASE_WORDS, description="Words per passphrase (v3.2.0: default increased to 4)", ) class GenerateResponse(BaseModel): passphrase: str = Field(description="Single passphrase (v3.2.0: no daily rotation)") pin: str | None = None rsa_key_pem: str | None = None entropy: dict[str, int] # Legacy field for compatibility phrases: dict[str, str] | None = Field( default=None, description="Deprecated: Use 'passphrase' instead" ) class EncodeRequest(BaseModel): message: str reference_photo_base64: str carrier_image_base64: str passphrase: str = Field(description="Passphrase (v3.2.0: renamed from day_phrase)") pin: str = "" rsa_key_base64: str | None = None rsa_password: str | None = None # Channel key (v4.0.0) channel_key: str | None = Field( default=None, description="Channel key for deployment isolation. 
null=auto (use server config), ''=public mode, 'XXXX-...'=explicit key", ) embed_mode: EmbedModeType = Field( default="lsb", description="Embedding mode: 'lsb' (default, color) or 'dct' (requires scipy)", ) dct_output_format: DctOutputFormatType = Field( default="png", description="DCT output format: 'png' (lossless) or 'jpeg' (smaller). Only applies to DCT mode.", ) dct_color_mode: DctColorModeType = Field( default="grayscale", description="DCT color mode: 'grayscale' (default) or 'color' (preserves colors). Only applies to DCT mode.", ) class EncodeFileRequest(BaseModel): """Request for embedding a file (base64-encoded).""" file_data_base64: str filename: str mime_type: str | None = None reference_photo_base64: str carrier_image_base64: str passphrase: str = Field(description="Passphrase (v3.2.0: renamed from day_phrase)") pin: str = "" rsa_key_base64: str | None = None rsa_password: str | None = None # Channel key (v4.0.0) channel_key: str | None = Field( default=None, description="Channel key for deployment isolation. null=auto (use server config), ''=public mode, 'XXXX-...'=explicit key", ) embed_mode: EmbedModeType = Field( default="lsb", description="Embedding mode: 'lsb' (default, color) or 'dct' (requires scipy)", ) dct_output_format: DctOutputFormatType = Field( default="png", description="DCT output format: 'png' (lossless) or 'jpeg' (smaller). Only applies to DCT mode.", ) dct_color_mode: DctColorModeType = Field( default="grayscale", description="DCT color mode: 'grayscale' (default) or 'color' (preserves colors). 
# Response model describing an encoded stego image and how it was produced.
# NOTE(review): presumably returned by the image encode routes, which are not
# visible in this part of the file - confirm against the route definitions.
class EncodeResponse(BaseModel):
    # Base64 text of the produced image, plus a filename for it.
    stego_image_base64: str
    filename: str
    capacity_used_percent: float
    embed_mode: str = Field(description="Embedding mode used: 'lsb' or 'dct'")
    # output_format and color_mode default to the values listed for LSB output
    # ("png" / "color"); both are plain str, so the allowed values named in the
    # descriptions are not enforced by the model itself.
    output_format: str = Field(
        default="png", description="Output format: 'png' or 'jpeg' (for DCT mode)"
    )
    color_mode: str = Field(
        default="color",
        description="Color mode: 'color' (LSB/DCT color) or 'grayscale' (DCT grayscale)",
    )
    # Channel key info (v4.0.0)
    channel_mode: str = Field(default="public", description="Channel mode: 'public' or 'private'")
    channel_fingerprint: str | None = Field(
        default=None, description="Channel key fingerprint (if private mode)"
    )
    # Legacy fields (v3.2.0: no longer used in crypto).
    # Both default to None and are documented as deprecated.
    date_used: str | None = Field(
        default=None, description="Deprecated: Date no longer used in v3.2.0"
    )
    day_of_week: str | None = Field(
        default=None, description="Deprecated: Date no longer used in v3.2.0"
    )
class DecodeResponse(BaseModel):
    """Response for decode - can be text or file."""

    # payload_type is a plain str: the two values named below are a convention,
    # not something this model validates.
    payload_type: str  # 'text' or 'file'
    # Every remaining field is optional and defaults to None, so a response
    # only needs to carry the fields relevant to its payload type.
    message: str | None = None  # For text
    file_data_base64: str | None = None  # For file (base64-encoded)
    filename: str | None = None  # For file
    mime_type: str | None = None  # For file
class AuthStatusResponse(BaseModel):
    """Response for API key authentication status."""

    # The /auth/status route fills each field from the same-named entry of the
    # dict returned by get_api_key_status(). Only a flag, counts and an
    # env-var indicator are exposed - no key values.
    enabled: bool = Field(description="Whether API key auth is enabled")
    total_keys: int = Field(description="Total number of configured API keys")
    user_keys: int = Field(description="Keys in user config")
    project_keys: int = Field(description="Keys in project config")
    env_configured: bool = Field(description="Whether env var key is set")
class WillFitResponse(BaseModel):
    """Response for will_fit check."""

    # The /will-fit route fills these from the dict returned by
    # will_fit_by_mode() (keys: fits, payload_size, capacity, usage_percent,
    # headroom).
    fits: bool
    payload_size: int  # presumably bytes - TODO confirm against will_fit_by_mode
    capacity: int
    usage_percent: float  # rounded to one decimal place by the route
    headroom: int
    mode: str  # the route echoes the request's embed_mode here
class AudioEncodeResponse(BaseModel):
    """Response from audio encode operations."""

    # Base64 text of the produced stego audio.
    stego_audio_base64: str
    # embed_mode is a plain str here (unlike the request models, which use
    # AudioEmbedModeType), so the two documented values are not enforced.
    embed_mode: str = Field(description="Embedding mode used: 'audio_lsb' or 'audio_spread'")
    # Untyped dict; the description lists example keys only.
    stats: dict = Field(description="Embedding statistics (samples_modified, capacity_used, etc.)")
    # Channel info mirrors the fields on the image EncodeResponse model.
    channel_mode: str = Field(default="public", description="Channel mode: 'public' or 'private'")
    channel_fingerprint: str | None = Field(
        default=None, description="Channel key fingerprint (if private mode)"
    )
def _resolve_channel_key(channel_key: str | None) -> str | None:
    """
    Resolve the channel key supplied in an API request.

    Thin wrapper around the library's ``resolve_channel_key`` that turns the
    library's errors into an HTTP 400 response.

    Args:
        channel_key: Raw value of the request's ``channel_key`` field
            (may be None).

    Returns:
        Whatever ``stegasoo.channel.resolve_channel_key`` returns for the
        given value; this is what gets passed on to encode/decode.

    Raises:
        HTTPException: 400 when the library raises ``ValueError`` or
            ``FileNotFoundError``. The original exception is attached as
            ``__cause__`` so tracebacks and logs keep the real reason.
    """
    # Imported at call time rather than at module import.
    from stegasoo.channel import resolve_channel_key

    try:
        return resolve_channel_key(channel_key)
    except (ValueError, FileNotFoundError) as e:
        # Explicit chaining: marks the library error as the direct cause
        # instead of an unrelated error raised during handling.
        raise HTTPException(400, str(e)) from e
async def run_in_thread(func, *args, **kwargs):
    """
    Execute a blocking callable on a worker thread and await its result.

    Keeps the event loop free to serve other requests while a slow
    encode/decode call is in progress.

    Args:
        func: Callable to execute off the event loop.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        The value returned by ``func(*args, **kwargs)``.

    Example:
        result = await run_in_thread(encode, message=msg, carrier_image=carrier)
    """
    # asyncio.to_thread binds both positional and keyword arguments itself,
    # so no separate functools.partial step is needed.
    return await asyncio.to_thread(func, *args, **kwargs)
@app.get("/modes", response_model=ModesResponse)
async def api_modes():
    """
    Get available embedding modes and their status.

    v4.0.0: Also includes channel key status.
    """
    # Channel section: only mode/configured/fingerprint are exposed here.
    status = get_channel_status()
    channel_section = {
        "mode": status["mode"],
        "configured": status["configured"],
        "fingerprint": status.get("fingerprint"),
    }

    # Audio section (v4.3.0) is present only when audio support was detected;
    # otherwise the field is None.
    audio_section = (
        {
            "available": True,
            "modes": {
                "audio_lsb": {
                    "name": "Audio LSB",
                    "description": "Embed in audio sample LSBs, high capacity",
                    "output_format": "WAV",
                },
                "audio_spread": {
                    "name": "Spread Spectrum (DSSS)",
                    "description": "Direct-sequence spread spectrum with Reed-Solomon ECC, better stealth",
                    "output_format": "WAV",
                },
            },
            "supported_formats": ["WAV", "FLAC", "MP3", "OGG", "AAC", "M4A"],
            "output_format": "WAV",
            "requires": "soundfile",
        }
        if HAS_AUDIO_SUPPORT
        else None
    )

    # LSB is always reported as available; DCT availability is probed at
    # request time.
    lsb_section = {
        "available": True,
        "name": "Spatial LSB",
        "description": "Embed in pixel LSBs, outputs PNG/BMP",
        "output_format": "PNG (color)",
        "capacity_ratio": "100%",
    }
    dct_section = DctModeInfo(
        available=has_dct_support(),
        name="DCT Domain",
        description="Embed in DCT coefficients, frequency domain steganography",
        output_formats=["png", "jpeg"],
        color_modes=["grayscale", "color"],
        capacity_ratio="~20% of LSB",
        requires="scipy",
    )

    return ModesResponse(
        lsb=lsb_section,
        dct=dct_section,
        audio=audio_section,
        channel=channel_section,
    )
@app.post("/channel/generate", response_model=ChannelGenerateResponse)
async def api_channel_generate(
    _: str = Depends(require_api_key),
    save: bool = Query(False, description="Save to user config"),
    save_project: bool = Query(False, description="Save to project config"),
):
    """
    Generate a new channel key.

    v4.0.0: New endpoint for channel key management.

    Optionally saves to user config (~/.stegasoo/channel.key)
    or project config (./config/channel.key).
    """
    # The two save flags are mutually exclusive; reject before generating.
    if save and save_project:
        raise HTTPException(400, "Cannot use both save and save_project")

    new_key = generate_channel_key()
    # Masked form: only the first and last four characters stay visible.
    masked = f"{new_key[:4]}-••••-••••-••••-••••-••••-••••-{new_key[-4:]}"

    # Work out where (if anywhere) the key should be persisted.
    if save:
        target, where = "user", "~/.stegasoo/channel.key"
    elif save_project:
        target, where = "project", "./config/channel.key"
    else:
        target, where = None, None

    persisted = target is not None
    if persisted:
        set_channel_key(new_key, location=target)

    return ChannelGenerateResponse(
        key=new_key,
        fingerprint=masked,
        saved=persisted,
        save_location=where,
    )
@app.delete("/channel")
async def api_channel_clear(
    _: str = Depends(require_api_key),
    location: str = Query("user", description="'user', 'project', or 'all'"),
):
    """
    Clear/remove channel key from config.

    v4.0.0: New endpoint for channel key management.

    Note: Does not affect environment variables.
    """
    # Reject unknown locations before touching any config.
    if location not in ("user", "project", "all"):
        raise HTTPException(400, "location must be 'user', 'project', or 'all'")

    # "all" expands to both config locations, user first then project.
    targets = ("user", "project") if location == "all" else (location,)
    for target in targets:
        clear_channel_key(location=target)

    # Re-read the status so the caller can see whether a key is still
    # configured from some other source.
    remaining = get_channel_status()
    return {
        "success": True,
        "mode": remaining["mode"],
        "still_configured": remaining["configured"],
        "remaining_source": remaining.get("source"),
    }
@app.post("/compare", response_model=CompareModesResponse)
async def api_compare_modes(request: CompareModesRequest, _: str = Depends(require_api_key)):
    """
    Compare LSB and DCT embedding modes for a carrier image.

    Returns capacity for both modes and recommendation.
    Optionally checks if a specific payload size would fit.
    """
    # Error contract:
    #   400 - carrier_image_base64 is not decodable base64 (client error)
    #   500 - any other failure while comparing modes
    #
    # Decode separately so malformed base64 is reported as a client error
    # instead of being folded into the generic 500 handler below.
    try:
        carrier = base64.b64decode(request.carrier_image_base64)
    except ValueError as e:  # binascii.Error is a ValueError subclass
        raise HTTPException(400, f"Invalid base64 in carrier_image_base64: {e}") from e

    try:
        comparison = compare_modes(carrier)
        lsb_info = comparison["lsb"]
        dct_info = comparison["dct"]

        response = CompareModesResponse(
            width=comparison["width"],
            height=comparison["height"],
            lsb={
                "capacity_bytes": lsb_info["capacity_bytes"],
                "capacity_kb": round(lsb_info["capacity_kb"], 1),
                "available": True,
                "output_format": lsb_info["output"],
            },
            dct={
                "capacity_bytes": dct_info["capacity_bytes"],
                "capacity_kb": round(dct_info["capacity_kb"], 1),
                "available": dct_info["available"],
                "output_formats": ["png", "jpeg"],
                "color_modes": ["grayscale", "color"],
                "ratio_vs_lsb_percent": round(dct_info["ratio_vs_lsb"], 1),
            },
            recommendation=(
                "lsb" if not dct_info["available"] else "dct for stealth, lsb for capacity"
            ),
        )

        # `is not None` (not truthiness) so an explicit payload_size of 0 is
        # still checked rather than treated as "not provided".
        if request.payload_size is not None:
            fits_lsb = request.payload_size <= lsb_info["capacity_bytes"]
            fits_dct = request.payload_size <= dct_info["capacity_bytes"]

            response.payload_check = {
                "size_bytes": request.payload_size,
                "fits_lsb": fits_lsb,
                "fits_dct": fits_dct,
            }

            # Update recommendation based on payload.
            # NOTE(review): when DCT is unavailable but the payload fits LSB,
            # the message still says "too large for dct" - wording kept for
            # compatibility; consider a dedicated message.
            if fits_dct and dct_info["available"]:
                response.recommendation = "dct (payload fits, better stealth)"
            elif fits_lsb:
                response.recommendation = "lsb (payload too large for dct)"
            else:
                response.recommendation = "none (payload too large for both modes)"

        return response

    except HTTPException:
        # Same guard as the /will-fit route: never rewrap an HTTP error as 500.
        raise
    except Exception as e:
        raise HTTPException(500, str(e)) from e
@app.post("/extract-key-from-qr", response_model=QrExtractResponse)
async def api_extract_key_from_qr(
    _: str = Depends(require_api_key),
    qr_image: UploadFile = File(..., description="QR code image containing RSA key"),
):
    """
    Extract RSA key from a QR code image.

    Supports both compressed (STEGASOO-Z: prefix) and uncompressed keys.
    Returns the PEM-encoded key if found.
    """
    # Without QR-read support the extractor may be None, so bail out first.
    if not HAS_QR_READ:
        raise HTTPException(501, "QR code reading not available. Install pyzbar and libzbar.")

    try:
        pem = extract_key_from_qr(await qr_image.read())
        if not pem:
            return QrExtractResponse(success=False, error="No valid RSA key found in QR code")
        return QrExtractResponse(success=True, key_pem=pem)
    except Exception as e:
        # Failures are reported in the response body (success=False) rather
        # than as an HTTP error status.
        return QrExtractResponse(success=False, error=str(e))
# ============================================================================
# ROUTES - GENERATE
# ============================================================================


@app.post("/generate", response_model=GenerateResponse)
async def api_generate(request: GenerateRequest, _: str = Depends(require_api_key)):
    """
    Generate a fresh set of credentials for encoding/decoding.

    The request must enable at least one of ``use_pin`` / ``use_rsa`` and
    ``rsa_bits`` must be one of ``VALID_RSA_SIZES``.

    Args:
        request: Credential options (PIN/RSA toggles, PIN length, RSA key
            size, words per passphrase).
        _: Result of the API-key dependency; unused.

    Returns:
        GenerateResponse with the passphrase, optional PIN and RSA key, and
        an entropy breakdown. The legacy ``phrases`` field is always None.

    Raises:
        HTTPException: 400 for invalid options, 500 if generation fails.
    """
    neither_factor_enabled = not (request.use_pin or request.use_rsa)
    if neither_factor_enabled:
        raise HTTPException(400, "Must enable at least one of use_pin or use_rsa")

    if request.rsa_bits not in VALID_RSA_SIZES:
        raise HTTPException(400, f"rsa_bits must be one of {VALID_RSA_SIZES}")

    try:
        generated = generate_credentials(
            use_pin=request.use_pin,
            use_rsa=request.use_rsa,
            pin_length=request.pin_length,
            rsa_bits=request.rsa_bits,
            passphrase_words=request.words_per_passphrase,
        )

        entropy_breakdown = {
            "passphrase": generated.passphrase_entropy,
            "pin": generated.pin_entropy,
            "rsa": generated.rsa_entropy,
            "total": generated.total_entropy,
        }

        return GenerateResponse(
            passphrase=generated.passphrase,
            pin=generated.pin,
            rsa_key_pem=generated.rsa_key_pem,
            entropy=entropy_breakdown,
            phrases=None,  # Legacy field removed
        )
    except Exception as e:
        raise HTTPException(500, str(e))
""" if not request.use_pin and not request.use_rsa: raise HTTPException(400, "Must enable at least one of use_pin or use_rsa") if request.rsa_bits not in VALID_RSA_SIZES: raise HTTPException(400, f"rsa_bits must be one of {VALID_RSA_SIZES}") try: creds = generate_credentials( use_pin=request.use_pin, use_rsa=request.use_rsa, pin_length=request.pin_length, rsa_bits=request.rsa_bits, passphrase_words=request.words_per_passphrase, ) return GenerateResponse( passphrase=creds.passphrase, pin=creds.pin, rsa_key_pem=creds.rsa_key_pem, entropy={ "passphrase": creds.passphrase_entropy, "pin": creds.pin_entropy, "rsa": creds.rsa_entropy, "total": creds.total_entropy, }, phrases=None, # Legacy field removed ) except Exception as e: raise HTTPException(500, str(e)) # ============================================================================ # HELPER FUNCTION FOR DCT PARAMETERS # ============================================================================ def _get_dct_params(embed_mode: str, dct_output_format: str, dct_color_mode: str) -> dict: """ Get DCT-specific parameters if DCT mode is selected. Returns kwargs to pass to encode(). """ if embed_mode != "dct": return {} return { "dct_output_format": dct_output_format, "dct_color_mode": dct_color_mode, } def _get_output_info(embed_mode: str, dct_output_format: str, dct_color_mode: str) -> tuple: """ Get output format and color mode strings for response. Returns (output_format, color_mode, mime_type). 
""" if embed_mode == "dct": output_format = dct_output_format color_mode = dct_color_mode mime_type = "image/jpeg" if dct_output_format == "jpeg" else "image/png" else: output_format = "png" color_mode = "color" mime_type = "image/png" return output_format, color_mode, mime_type # ============================================================================ # ROUTES - ENCODE (JSON) # ============================================================================ @app.post("/encode", response_model=EncodeResponse) async def api_encode(request: EncodeRequest, _: str = Depends(require_api_key)): """ Encode a text message into an image. Images must be base64-encoded. Returns base64-encoded stego image. v4.0.0: Added channel_key parameter for deployment isolation. v3.2.0: No date_str parameter needed - encode anytime! """ # Validate mode if request.embed_mode == "dct" and not has_dct_support(): raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy") # Resolve channel key resolved_channel_key = _resolve_channel_key(request.channel_key) try: ref_photo = base64.b64decode(request.reference_photo_base64) carrier = base64.b64decode(request.carrier_image_base64) rsa_key = base64.b64decode(request.rsa_key_base64) if request.rsa_key_base64 else None # Get DCT parameters dct_params = _get_dct_params( request.embed_mode, request.dct_output_format, request.dct_color_mode ) # v4.2.0: Run CPU-bound encode in thread pool result = await run_in_thread( encode, message=request.message, reference_photo=ref_photo, carrier_image=carrier, passphrase=request.passphrase, pin=request.pin, rsa_key_data=rsa_key, rsa_password=request.rsa_password, embed_mode=request.embed_mode, channel_key=resolved_channel_key, **dct_params, ) stego_b64 = base64.b64encode(result.stego_image).decode("utf-8") output_format, color_mode, _ = _get_output_info( request.embed_mode, request.dct_output_format, request.dct_color_mode ) # Get channel info for response channel_mode, 
@app.post("/encode/file", response_model=EncodeResponse)
async def api_encode_file(request: EncodeFileRequest, _: str = Depends(require_api_key)):
    """
    Encode a file into an image (JSON with base64).

    The file data, reference photo, carrier image and optional RSA key must
    all be base64-encoded.

    Args:
        request: File data/name/mime type, base64 images, credentials, embed
            mode, DCT options and channel key.
        _: Result of the API-key dependency; unused.

    Returns:
        EncodeResponse with the stego image, filename, capacity usage, output
        format/color mode and channel info. ``date_used`` and ``day_of_week``
        are always None.

    Raises:
        HTTPException: 400 when DCT is requested without DCT support, when a
            base64 field cannot be decoded, or on CapacityError /
            StegasooError; 500 for any other failure.
    """
    # Validate mode
    if request.embed_mode == "dct" and not has_dct_support():
        raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")

    # Resolve channel key
    resolved_channel_key = _resolve_channel_key(request.channel_key)

    # Decode client-supplied base64 outside the main try block, so a malformed
    # value is reported as a 400 instead of being folded into the generic 500.
    try:
        file_data = base64.b64decode(request.file_data_base64)
        ref_photo = base64.b64decode(request.reference_photo_base64)
        carrier = base64.b64decode(request.carrier_image_base64)
        rsa_key = base64.b64decode(request.rsa_key_base64) if request.rsa_key_base64 else None
    except ValueError as exc:
        # binascii.Error (e.g. bad padding) is a ValueError subclass.
        raise HTTPException(400, f"Invalid base64 data: {exc}") from exc

    try:
        payload = FilePayload(
            data=file_data, filename=request.filename, mime_type=request.mime_type
        )

        # Get DCT parameters
        dct_params = _get_dct_params(
            request.embed_mode, request.dct_output_format, request.dct_color_mode
        )

        # v4.2.0: Run CPU-bound encode in thread pool
        result = await run_in_thread(
            encode,
            message=payload,
            reference_photo=ref_photo,
            carrier_image=carrier,
            passphrase=request.passphrase,
            pin=request.pin,
            rsa_key_data=rsa_key,
            rsa_password=request.rsa_password,
            embed_mode=request.embed_mode,
            channel_key=resolved_channel_key,
            **dct_params,
        )

        stego_b64 = base64.b64encode(result.stego_image).decode("utf-8")

        # The mime type is not part of the JSON response. It is bound to a
        # dedicated name so the `_` dependency parameter is not overwritten.
        output_format, color_mode, _mime_type = _get_output_info(
            request.embed_mode, request.dct_output_format, request.dct_color_mode
        )

        # Get channel info for response
        channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)

        return EncodeResponse(
            stego_image_base64=stego_b64,
            filename=result.filename,
            capacity_used_percent=result.capacity_percent,
            embed_mode=request.embed_mode,
            output_format=output_format,
            color_mode=color_mode,
            channel_mode=channel_mode,
            channel_fingerprint=channel_fingerprint,
            date_used=None,
            day_of_week=None,
        )
    except CapacityError as e:
        raise HTTPException(400, str(e))
    except StegasooError as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(500, str(e))
# ============================================================================
# ROUTES - DECODE (JSON)
# ============================================================================


@app.post("/decode", response_model=DecodeResponse)
async def api_decode(request: DecodeRequest, _: str = Depends(require_api_key)):
    """
    Decode a message or file from a stego image.

    The response's ``payload_type`` indicates whether the result is text or
    a file; file contents are returned base64-encoded.

    Args:
        request: Base64 stego image and reference photo, credentials, embed
            mode and channel key.
        _: Result of the API-key dependency; unused.

    Returns:
        DecodeResponse with either ``message`` (text) or
        ``file_data_base64`` / ``filename`` / ``mime_type`` (file).

    Raises:
        HTTPException: 400 when DCT is requested without DCT support, when a
            base64 field cannot be decoded, or on StegasooError; 401 on
            DecryptionError; 500 for any other failure.
    """
    # Validate mode
    if request.embed_mode == "dct" and not has_dct_support():
        raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")

    # Resolve channel key
    resolved_channel_key = _resolve_channel_key(request.channel_key)

    # Decode client-supplied base64 outside the main try block, so a malformed
    # value is reported as a 400 instead of being folded into the generic 500.
    try:
        stego = base64.b64decode(request.stego_image_base64)
        ref_photo = base64.b64decode(request.reference_photo_base64)
        rsa_key = base64.b64decode(request.rsa_key_base64) if request.rsa_key_base64 else None
    except ValueError as exc:
        # binascii.Error (e.g. bad padding) is a ValueError subclass.
        raise HTTPException(400, f"Invalid base64 data: {exc}") from exc

    try:
        # v4.2.0: Run CPU-bound decode in thread pool
        result = await run_in_thread(
            decode,
            stego_image=stego,
            reference_photo=ref_photo,
            passphrase=request.passphrase,
            pin=request.pin,
            rsa_key_data=rsa_key,
            rsa_password=request.rsa_password,
            embed_mode=request.embed_mode,
            channel_key=resolved_channel_key,
        )

        if result.is_file:
            return DecodeResponse(
                payload_type="file",
                file_data_base64=base64.b64encode(result.file_data).decode("utf-8"),
                filename=result.filename,
                mime_type=result.mime_type,
            )
        return DecodeResponse(payload_type="text", message=result.message)
    except DecryptionError as e:
        # Channel-key problems keep their specific message; every other
        # decryption failure gets a generic one.
        error_msg = str(e)
        if "channel key" in error_msg.lower():
            raise HTTPException(401, error_msg)
        raise HTTPException(401, "Decryption failed. Check credentials.")
    except StegasooError as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(500, str(e))
# ============================================================================
# ROUTES - ENCODE/DECODE (MULTIPART)
# ============================================================================


@app.post("/encode/multipart")
async def api_encode_multipart(
    _: str = Depends(require_api_key),
    passphrase: str = Form(..., description="Passphrase (v3.2.0: renamed from day_phrase)"),
    reference_photo: UploadFile = File(...),
    carrier: UploadFile = File(...),
    message: str = Form(""),
    payload_file: UploadFile | None = File(None),
    pin: str = Form(""),
    rsa_key: UploadFile | None = File(None),
    rsa_key_qr: UploadFile | None = File(None),
    rsa_password: str = Form(""),
    # Channel key (v4.0.0)
    channel_key: str = Form(
        "auto", description="Channel key: 'auto'=server config, 'none'=public, 'XXXX-...'=explicit"
    ),
    embed_mode: str = Form("lsb"),
    dct_output_format: str = Form("png"),
    dct_color_mode: str = Form("grayscale"),
):
    """
    Encode a payload into an image from multipart form uploads.

    The payload is ``payload_file`` when one is uploaded, otherwise the text
    in ``message``. The RSA key may come from ``rsa_key`` (a .pem upload) or
    from ``rsa_key_qr`` (a QR code image); the .pem upload wins when both
    are present. A key read from a QR code is always used without a
    password.

    Returns:
        The stego image as the response body, with Stegasoo metadata in the
        ``X-Stegasoo-*`` headers and a Content-Disposition attachment
        filename.

    Raises:
        HTTPException: 400 for invalid options, a missing payload, an
            unreadable QR key, or CapacityError / StegasooError; 501 when a
            QR key is supplied but QR reading is unavailable; 500 otherwise.
    """
    # --- option validation -------------------------------------------------
    if embed_mode not in {"lsb", "dct"}:
        raise HTTPException(400, "embed_mode must be 'lsb' or 'dct'")
    if embed_mode == "dct" and not has_dct_support():
        raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")
    if dct_output_format not in {"png", "jpeg"}:
        raise HTTPException(400, "dct_output_format must be 'png' or 'jpeg'")
    if dct_color_mode not in {"grayscale", "color"}:
        raise HTTPException(400, "dct_color_mode must be 'grayscale' or 'color'")

    # --- channel key (v4.0.0) ----------------------------------------------
    # Form values: "auto" -> None, "none" -> "" (public); anything else is
    # handed to _resolve_channel_key as an explicit key.
    channel_choice = channel_key.lower()
    if channel_choice == "auto":
        resolved_channel_key = None
    elif channel_choice == "none":
        resolved_channel_key = ""
    else:
        resolved_channel_key = _resolve_channel_key(channel_key)

    try:
        ref_data = await reference_photo.read()
        carrier_data = await carrier.read()

        # RSA key: a .pem upload takes precedence over a QR code image.
        rsa_key_data = None
        key_came_from_qr = False
        if rsa_key and rsa_key.filename:
            rsa_key_data = await rsa_key.read()
        elif rsa_key_qr and rsa_key_qr.filename:
            if not HAS_QR_READ:
                raise HTTPException(
                    501, "QR code reading not available. Install pyzbar and libzbar."
                )
            qr_bytes = await rsa_key_qr.read()
            extracted_pem = extract_key_from_qr(qr_bytes)
            if not extracted_pem:
                raise HTTPException(400, "Could not extract RSA key from QR code image")
            rsa_key_data = extracted_pem.encode("utf-8")
            key_came_from_qr = True

        # QR code keys are never password-protected
        if key_came_from_qr:
            effective_password = None
        else:
            effective_password = rsa_password or None

        # Payload: an uploaded file wins over the text message.
        if payload_file and payload_file.filename:
            uploaded_bytes = await payload_file.read()
            payload = FilePayload(
                data=uploaded_bytes,
                filename=payload_file.filename,
                mime_type=payload_file.content_type,
            )
        elif not message:
            raise HTTPException(400, "Must provide either 'message' or 'payload_file'")
        else:
            payload = message

        dct_params = _get_dct_params(embed_mode, dct_output_format, dct_color_mode)

        # v4.2.0: Run CPU-bound encode in thread pool
        result = await run_in_thread(
            encode,
            message=payload,
            reference_photo=ref_data,
            carrier_image=carrier_data,
            passphrase=passphrase,
            pin=pin,
            rsa_key_data=rsa_key_data,
            rsa_password=effective_password,
            embed_mode=embed_mode,
            channel_key=resolved_channel_key,
            **dct_params,
        )

        output_format, color_mode, mime_type = _get_output_info(
            embed_mode, dct_output_format, dct_color_mode
        )
        channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)

        # Metadata travels in headers because the body is the raw image.
        headers = {
            "Content-Disposition": f"attachment; filename={result.filename}",
            "X-Stegasoo-Capacity-Percent": f"{result.capacity_percent:.1f}",
            "X-Stegasoo-Embed-Mode": embed_mode,
            "X-Stegasoo-Output-Format": output_format,
            "X-Stegasoo-Color-Mode": color_mode,
            "X-Stegasoo-Channel-Mode": channel_mode,
            "X-Stegasoo-Version": __version__,
        }
        if channel_fingerprint:
            headers["X-Stegasoo-Channel-Fingerprint"] = channel_fingerprint

        return Response(content=result.stego_image, media_type=mime_type, headers=headers)
    except (CapacityError, StegasooError) as e:
        raise HTTPException(400, str(e))
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, str(e))
@app.post("/decode/multipart", response_model=DecodeResponse)
async def api_decode_multipart(
    _: str = Depends(require_api_key),
    passphrase: str = Form(..., description="Passphrase (v3.2.0: renamed from day_phrase)"),
    reference_photo: UploadFile = File(...),
    stego_image: UploadFile = File(...),
    pin: str = Form(""),
    rsa_key: UploadFile | None = File(None),
    rsa_key_qr: UploadFile | None = File(None),
    rsa_password: str = Form(""),
    # Channel key (v4.0.0)
    channel_key: str = Form(
        "auto", description="Channel key: 'auto'=server config, 'none'=public, 'XXXX-...'=explicit"
    ),
    embed_mode: str = Form("auto"),
):
    """
    Decode a payload from a stego image supplied as multipart form uploads.

    The RSA key may come from ``rsa_key`` (a .pem upload) or from
    ``rsa_key_qr`` (a QR code image); the .pem upload wins when both are
    present. A key read from a QR code is always used without a password.

    Returns:
        DecodeResponse whose ``payload_type`` is "file" (with base64 file
        data, filename and mime type) or "text" (with the message).

    Raises:
        HTTPException: 400 for an invalid embed mode, an unreadable QR key or
            StegasooError; 401 on DecryptionError; 501 when a QR key is
            supplied but QR reading is unavailable; 500 otherwise.
    """
    if embed_mode not in {"auto", "lsb", "dct"}:
        raise HTTPException(400, "embed_mode must be 'auto', 'lsb', or 'dct'")
    if embed_mode == "dct" and not has_dct_support():
        raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy")

    # Channel key (v4.0.0): "auto" -> None, "none" -> "" (public); anything
    # else is handed to _resolve_channel_key as an explicit key.
    channel_choice = channel_key.lower()
    if channel_choice == "auto":
        resolved_channel_key = None
    elif channel_choice == "none":
        resolved_channel_key = ""
    else:
        resolved_channel_key = _resolve_channel_key(channel_key)

    try:
        ref_data = await reference_photo.read()
        stego_data = await stego_image.read()

        # RSA key: a .pem upload takes precedence over a QR code image.
        rsa_key_data = None
        key_came_from_qr = False
        if rsa_key and rsa_key.filename:
            rsa_key_data = await rsa_key.read()
        elif rsa_key_qr and rsa_key_qr.filename:
            if not HAS_QR_READ:
                raise HTTPException(
                    501, "QR code reading not available. Install pyzbar and libzbar."
                )
            qr_bytes = await rsa_key_qr.read()
            extracted_pem = extract_key_from_qr(qr_bytes)
            if not extracted_pem:
                raise HTTPException(400, "Could not extract RSA key from QR code image")
            rsa_key_data = extracted_pem.encode("utf-8")
            key_came_from_qr = True

        # QR code keys are never password-protected
        if key_came_from_qr:
            effective_password = None
        else:
            effective_password = rsa_password or None

        # v4.2.0: Run CPU-bound decode in thread pool
        result = await run_in_thread(
            decode,
            stego_image=stego_data,
            reference_photo=ref_data,
            passphrase=passphrase,
            pin=pin,
            rsa_key_data=rsa_key_data,
            rsa_password=effective_password,
            embed_mode=embed_mode,
            channel_key=resolved_channel_key,
        )

        if not result.is_file:
            return DecodeResponse(payload_type="text", message=result.message)

        encoded_file = base64.b64encode(result.file_data).decode("utf-8")
        return DecodeResponse(
            payload_type="file",
            file_data_base64=encoded_file,
            filename=result.filename,
            mime_type=result.mime_type,
        )
    except DecryptionError as e:
        # Only channel-key failures expose the underlying message.
        error_msg = str(e)
        if "channel key" in error_msg.lower():
            raise HTTPException(401, error_msg)
        raise HTTPException(401, "Decryption failed. Check credentials.")
    except StegasooError as e:
        raise HTTPException(400, str(e))
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, str(e))
# ============================================================================
# ROUTES - IMAGE INFO
# ============================================================================


@app.post("/image/info", response_model=ImageInfoResponse)
async def api_image_info(
    _: str = Depends(require_api_key),
    image: UploadFile = File(...),
    include_modes: bool = Query(True, description="Include capacity by mode (v3.0+)"),
):
    """
    Report an uploaded image's dimensions and embedding capacity.

    The headline capacity figures are for LSB mode. When ``include_modes``
    is true, the response also carries a per-mode breakdown for LSB and DCT
    taken from ``compare_modes``.

    Raises:
        HTTPException: 400 when ``validate_image`` rejects the upload, 500
            for any other failure.
    """
    try:
        image_data = await image.read()

        validation = validate_image(image_data, check_size=False)
        if not validation.is_valid:
            raise HTTPException(400, validation.error_message)

        lsb_bytes = calculate_capacity_by_mode(image_data, "lsb")
        details = validation.details

        response = ImageInfoResponse(
            width=details["width"],
            height=details["height"],
            pixels=details["pixels"],
            capacity_bytes=lsb_bytes,
            capacity_kb=lsb_bytes // 1024,
        )

        if not include_modes:
            return response

        comparison = compare_modes(image_data)
        lsb_stats = comparison["lsb"]
        dct_stats = comparison["dct"]

        response.modes = {
            "lsb": ModeCapacity(
                capacity_bytes=lsb_stats["capacity_bytes"],
                capacity_kb=round(lsb_stats["capacity_kb"], 1),
                available=True,
                output_format=lsb_stats["output"],
            ),
            "dct": ModeCapacity(
                capacity_bytes=dct_stats["capacity_bytes"],
                capacity_kb=round(dct_stats["capacity_kb"], 1),
                available=dct_stats["available"],
                output_format="PNG/JPEG (grayscale or color)",
            ),
        }
        return response
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, str(e))
""" try: image_data = await image.read() result = validate_image(image_data, check_size=False) if not result.is_valid: raise HTTPException(400, result.error_message) capacity = calculate_capacity_by_mode(image_data, "lsb") response = ImageInfoResponse( width=result.details["width"], height=result.details["height"], pixels=result.details["pixels"], capacity_bytes=capacity, capacity_kb=capacity // 1024, ) if include_modes: comparison = compare_modes(image_data) response.modes = { "lsb": ModeCapacity( capacity_bytes=comparison["lsb"]["capacity_bytes"], capacity_kb=round(comparison["lsb"]["capacity_kb"], 1), available=True, output_format=comparison["lsb"]["output"], ), "dct": ModeCapacity( capacity_bytes=comparison["dct"]["capacity_bytes"], capacity_kb=round(comparison["dct"]["capacity_kb"], 1), available=comparison["dct"]["available"], output_format="PNG/JPEG (grayscale or color)", ), } return response except HTTPException: raise except Exception as e: raise HTTPException(500, str(e)) # ============================================================================ # ROUTES - AUDIO STEGANOGRAPHY (v4.3.0) # ============================================================================ def _require_audio(): """Check that audio support is available, raise 501 if not.""" if not HAS_AUDIO_SUPPORT: raise HTTPException( 501, "Audio steganography not available. Install with: pip install stegasoo[audio]" ) @app.post("/audio/encode", response_model=AudioEncodeResponse) async def api_audio_encode(request: AudioEncodeRequest, _: str = Depends(require_api_key)): """ Encode a text message into audio. Audio must be base64-encoded. Returns base64-encoded stego WAV. v4.3.0: New endpoint for audio steganography. 
""" _require_audio() resolved_channel_key = _resolve_channel_key(request.channel_key) try: ref_photo = base64.b64decode(request.reference_photo_base64) carrier = base64.b64decode(request.carrier_audio_base64) rsa_key = base64.b64decode(request.rsa_key_base64) if request.rsa_key_base64 else None stego_audio, stats = await run_in_thread( encode_audio, message=request.message, reference_photo=ref_photo, carrier_audio=carrier, passphrase=request.passphrase, pin=request.pin, rsa_key_data=rsa_key, rsa_password=request.rsa_password, embed_mode=request.embed_mode, channel_key=resolved_channel_key, chip_tier=request.chip_tier, ) stego_b64 = base64.b64encode(stego_audio).decode("utf-8") channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key) return AudioEncodeResponse( stego_audio_base64=stego_b64, embed_mode=stats.embed_mode, stats={ "samples_modified": stats.samples_modified, "total_samples": stats.total_samples, "capacity_used": round(stats.capacity_used * 100, 1), "bytes_embedded": stats.bytes_embedded, "sample_rate": stats.sample_rate, "channels": stats.channels, "duration_seconds": round(stats.duration_seconds, 2), }, channel_mode=channel_mode, channel_fingerprint=channel_fingerprint, ) except CapacityError as e: raise HTTPException(400, str(e)) except StegasooError as e: raise HTTPException(400, str(e)) except Exception as e: raise HTTPException(500, str(e)) @app.post("/audio/encode/file", response_model=AudioEncodeResponse) async def api_audio_encode_file(request: AudioEncodeFileRequest, _: str = Depends(require_api_key)): """ Encode a file into audio (JSON with base64). v4.3.0: New endpoint for audio steganography. 
""" _require_audio() resolved_channel_key = _resolve_channel_key(request.channel_key) try: file_data = base64.b64decode(request.file_data_base64) ref_photo = base64.b64decode(request.reference_photo_base64) carrier = base64.b64decode(request.carrier_audio_base64) rsa_key = base64.b64decode(request.rsa_key_base64) if request.rsa_key_base64 else None payload = FilePayload( data=file_data, filename=request.filename, mime_type=request.mime_type ) stego_audio, stats = await run_in_thread( encode_audio, message=payload, reference_photo=ref_photo, carrier_audio=carrier, passphrase=request.passphrase, pin=request.pin, rsa_key_data=rsa_key, rsa_password=request.rsa_password, embed_mode=request.embed_mode, channel_key=resolved_channel_key, chip_tier=request.chip_tier, ) stego_b64 = base64.b64encode(stego_audio).decode("utf-8") channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key) return AudioEncodeResponse( stego_audio_base64=stego_b64, embed_mode=stats.embed_mode, stats={ "samples_modified": stats.samples_modified, "total_samples": stats.total_samples, "capacity_used": round(stats.capacity_used * 100, 1), "bytes_embedded": stats.bytes_embedded, "sample_rate": stats.sample_rate, "channels": stats.channels, "duration_seconds": round(stats.duration_seconds, 2), }, channel_mode=channel_mode, channel_fingerprint=channel_fingerprint, ) except CapacityError as e: raise HTTPException(400, str(e)) except StegasooError as e: raise HTTPException(400, str(e)) except Exception as e: raise HTTPException(500, str(e)) @app.post("/audio/encode/multipart") async def api_audio_encode_multipart( _: str = Depends(require_api_key), passphrase: str = Form(..., description="Passphrase for key derivation"), reference_photo: UploadFile = File(...), carrier: UploadFile = File(...), message: str = Form(""), payload_file: UploadFile | None = File(None), pin: str = Form(""), rsa_key: UploadFile | None = File(None), rsa_password: str = Form(""), channel_key: str = Form( "auto", 
description="Channel key: 'auto'=server config, 'none'=public, 'XXXX-...'=explicit" ), embed_mode: str = Form("audio_lsb"), chip_tier: int | None = Form( None, description="Spread spectrum chip tier: 0=lossless, 1=high_lossy, 2=low_lossy. Only for audio_spread.", ), ): """ Encode audio using multipart form data (file uploads). Provide either 'message' (text) or 'payload_file' (binary file). Returns the stego WAV directly with metadata headers. v4.3.0: New endpoint for audio steganography. """ _require_audio() if embed_mode not in ("audio_lsb", "audio_spread"): raise HTTPException(400, "embed_mode must be 'audio_lsb' or 'audio_spread'") # Resolve channel key if channel_key.lower() == "auto": resolved_channel_key = None elif channel_key.lower() == "none": resolved_channel_key = "" else: resolved_channel_key = _resolve_channel_key(channel_key) try: ref_data = await reference_photo.read() carrier_data = await carrier.read() rsa_key_data = None if rsa_key and rsa_key.filename: rsa_key_data = await rsa_key.read() effective_password = rsa_password if rsa_password else None # Determine payload if payload_file and payload_file.filename: file_data = await payload_file.read() payload = FilePayload( data=file_data, filename=payload_file.filename, mime_type=payload_file.content_type ) elif message: payload = message else: raise HTTPException(400, "Must provide either 'message' or 'payload_file'") stego_audio, stats = await run_in_thread( encode_audio, message=payload, reference_photo=ref_data, carrier_audio=carrier_data, passphrase=passphrase, pin=pin, rsa_key_data=rsa_key_data, rsa_password=effective_password, embed_mode=embed_mode, channel_key=resolved_channel_key, chip_tier=chip_tier, ) channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key) headers = { "Content-Disposition": "attachment; filename=stego_audio.wav", "X-Stegasoo-Embed-Mode": stats.embed_mode, "X-Stegasoo-Capacity-Percent": f"{stats.capacity_used * 100:.1f}", "X-Stegasoo-Samples-Modified": 
@app.post("/audio/decode", response_model=DecodeResponse)
async def api_audio_decode(request: AudioDecodeRequest, _: str = Depends(require_api_key)):
    """
    Decode a message or file from stego audio.

    The response's ``payload_type`` indicates whether the result is text or
    a file; file contents are returned base64-encoded.

    Args:
        request: Base64 stego audio and reference photo, credentials, embed
            mode and channel key.
        _: Result of the API-key dependency; unused.

    Returns:
        DecodeResponse with either ``message`` (text) or
        ``file_data_base64`` / ``filename`` / ``mime_type`` (file).

    Raises:
        HTTPException: 501 without audio support; 400 when a base64 field
            cannot be decoded or on StegasooError; 401 on DecryptionError;
            500 for any other failure.
    """
    _require_audio()

    resolved_channel_key = _resolve_channel_key(request.channel_key)

    # Decode client-supplied base64 outside the main try block, so a malformed
    # value is reported as a 400 instead of being folded into the generic 500.
    try:
        stego = base64.b64decode(request.stego_audio_base64)
        ref_photo = base64.b64decode(request.reference_photo_base64)
        rsa_key = base64.b64decode(request.rsa_key_base64) if request.rsa_key_base64 else None
    except ValueError as exc:
        # binascii.Error (e.g. bad padding) is a ValueError subclass.
        raise HTTPException(400, f"Invalid base64 data: {exc}") from exc

    try:
        result = await run_in_thread(
            decode_audio,
            stego_audio=stego,
            reference_photo=ref_photo,
            passphrase=request.passphrase,
            pin=request.pin,
            rsa_key_data=rsa_key,
            rsa_password=request.rsa_password,
            embed_mode=request.embed_mode,
            channel_key=resolved_channel_key,
        )

        if result.is_file:
            return DecodeResponse(
                payload_type="file",
                file_data_base64=base64.b64encode(result.file_data).decode("utf-8"),
                filename=result.filename,
                mime_type=result.mime_type,
            )
        return DecodeResponse(payload_type="text", message=result.message)
    except DecryptionError as e:
        # Channel-key problems keep their specific message; every other
        # decryption failure gets a generic one.
        error_msg = str(e)
        if "channel key" in error_msg.lower():
            raise HTTPException(401, error_msg)
        raise HTTPException(401, "Decryption failed. Check credentials.")
    except StegasooError as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(500, str(e))
@app.post("/audio/decode/multipart", response_model=DecodeResponse)
async def api_audio_decode_multipart(
    _: str = Depends(require_api_key),
    passphrase: str = Form(..., description="Passphrase for key derivation"),
    reference_photo: UploadFile = File(...),
    stego_audio: UploadFile = File(...),
    pin: str = Form(""),
    rsa_key: UploadFile | None = File(None),
    rsa_password: str = Form(""),
    channel_key: str = Form(
        "auto", description="Channel key: 'auto'=server config, 'none'=public, 'XXXX-...'=explicit"
    ),
    embed_mode: str = Form("audio_auto"),
):
    """
    Decode a payload from stego audio supplied as multipart form uploads.

    Returns:
        DecodeResponse whose ``payload_type`` is "file" (with base64 file
        data, filename and mime type) or "text" (with the message).

    Raises:
        HTTPException: 501 without audio support; 400 for an invalid embed
            mode or StegasooError; 401 on DecryptionError; 500 otherwise.
    """
    _require_audio()

    if embed_mode not in {"audio_auto", "audio_lsb", "audio_spread"}:
        raise HTTPException(400, "embed_mode must be 'audio_auto', 'audio_lsb', or 'audio_spread'")

    # Channel key: "auto" -> None, "none" -> "" (public); anything else is
    # handed to _resolve_channel_key as an explicit key.
    channel_choice = channel_key.lower()
    if channel_choice == "auto":
        resolved_channel_key = None
    elif channel_choice == "none":
        resolved_channel_key = ""
    else:
        resolved_channel_key = _resolve_channel_key(channel_key)

    try:
        ref_data = await reference_photo.read()
        stego_data = await stego_audio.read()

        rsa_key_data = None
        if rsa_key and rsa_key.filename:
            rsa_key_data = await rsa_key.read()

        # An empty password field means "no password".
        effective_password = rsa_password or None

        result = await run_in_thread(
            decode_audio,
            stego_audio=stego_data,
            reference_photo=ref_data,
            passphrase=passphrase,
            pin=pin,
            rsa_key_data=rsa_key_data,
            rsa_password=effective_password,
            embed_mode=embed_mode,
            channel_key=resolved_channel_key,
        )

        if not result.is_file:
            return DecodeResponse(payload_type="text", message=result.message)

        encoded_file = base64.b64encode(result.file_data).decode("utf-8")
        return DecodeResponse(
            payload_type="file",
            file_data_base64=encoded_file,
            filename=result.filename,
            mime_type=result.mime_type,
        )
    except DecryptionError as e:
        # Only channel-key failures expose the underlying message.
        error_msg = str(e)
        if "channel key" in error_msg.lower():
            raise HTTPException(401, error_msg)
        raise HTTPException(401, "Decryption failed. Check credentials.")
    except StegasooError as e:
        raise HTTPException(400, str(e))
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, str(e))
@app.post("/audio/info", response_model=AudioInfoResponse)
async def api_audio_info(
    _: str = Depends(require_api_key),
    audio: UploadFile = File(...),
):
    """
    Get audio file metadata and embedding capacity.

    Capacity is reported for both the LSB and spread-spectrum modes. If the
    spread-spectrum calculation fails, its capacity is reported as 0 and the
    failure is logged, so the rest of the metadata is still returned.

    Args:
        _: Result of the API-key dependency; unused.
        audio: Uploaded audio file to inspect.

    Returns:
        AudioInfoResponse with the audio metadata and both capacities.

    Raises:
        HTTPException: 501 without audio support; 400 on StegasooError; 500
            for any other failure.
    """
    _require_audio()

    try:
        audio_data = await audio.read()

        info = await run_in_thread(get_audio_info, audio_data)

        # Calculate capacities for both modes
        lsb_capacity = await run_in_thread(calculate_audio_lsb_capacity, audio_data)
        try:
            spread_info = await run_in_thread(calculate_audio_spread_capacity, audio_data)
            spread_capacity = spread_info.usable_capacity_bytes
        except Exception as exc:
            # Deliberate best-effort: keep the 0 fallback, but log the
            # failure instead of swallowing it silently.
            api_logger.debug("Spread capacity calculation failed: %s", exc, exc_info=True)
            spread_capacity = 0

        return AudioInfoResponse(
            sample_rate=info.sample_rate,
            channels=info.channels,
            duration_seconds=round(info.duration_seconds, 2),
            num_samples=info.num_samples,
            format=info.format,
            bit_depth=info.bit_depth,
            bitrate=info.bitrate,
            capacity_lsb=lsb_capacity,
            capacity_spread=spread_capacity,
        )
    except StegasooError as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(500, str(e))


@app.post("/audio/capacity", response_model=AudioCapacityResponse)
async def api_audio_capacity(request: AudioCapacityRequest, _: str = Depends(require_api_key)):
    """
    Check if a payload of a given size will fit in an audio carrier.

    ``embed_mode == "audio_lsb"`` uses the LSB capacity; every other mode
    value uses the spread-spectrum usable capacity.

    Args:
        request: Base64 carrier audio, payload size and embed mode.
        _: Result of the API-key dependency; unused.

    Returns:
        AudioCapacityResponse with the fit flag, capacity and usage percent
        (reported as 100.0 when the capacity is zero or negative).

    Raises:
        HTTPException: 501 without audio support; 400 when the carrier is not
            valid base64 or on StegasooError; 500 for any other failure.
    """
    _require_audio()

    # Decode client-supplied base64 outside the main try block, so a malformed
    # value is reported as a 400 instead of being folded into the generic 500.
    try:
        carrier = base64.b64decode(request.carrier_audio_base64)
    except ValueError as exc:
        # binascii.Error (e.g. bad padding) is a ValueError subclass.
        raise HTTPException(400, f"Invalid base64 in carrier_audio_base64: {exc}") from exc

    try:
        if request.embed_mode == "audio_lsb":
            capacity = await run_in_thread(calculate_audio_lsb_capacity, carrier)
        else:
            spread_info = await run_in_thread(calculate_audio_spread_capacity, carrier)
            capacity = spread_info.usable_capacity_bytes

        fits = request.payload_size <= capacity
        # Guard the division: a carrier with no capacity counts as fully used.
        usage = (request.payload_size / capacity * 100) if capacity > 0 else 100.0

        return AudioCapacityResponse(
            fits=fits,
            payload_size=request.payload_size,
            capacity_bytes=capacity,
            usage_percent=round(usage, 1),
            embed_mode=request.embed_mode,
        )
    except StegasooError as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(500, str(e))
""" _require_audio() try: carrier = base64.b64decode(request.carrier_audio_base64) if request.embed_mode == "audio_lsb": capacity = await run_in_thread(calculate_audio_lsb_capacity, carrier) else: spread_info = await run_in_thread(calculate_audio_spread_capacity, carrier) capacity = spread_info.usable_capacity_bytes fits = request.payload_size <= capacity usage = (request.payload_size / capacity * 100) if capacity > 0 else 100.0 return AudioCapacityResponse( fits=fits, payload_size=request.payload_size, capacity_bytes=capacity, usage_percent=round(usage, 1), embed_mode=request.embed_mode, ) except StegasooError as e: raise HTTPException(400, str(e)) except Exception as e: raise HTTPException(500, str(e)) # ============================================================================ # ERROR HANDLERS # ============================================================================ @app.exception_handler(StegasooError) async def stegasoo_error_handler(request, exc): return JSONResponse(status_code=400, content={"error": type(exc).__name__, "detail": str(exc)}) # ============================================================================ # MAIN # ============================================================================ if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)