#!/usr/bin/env python3
"""
Stegasoo REST API

FastAPI-based REST API for steganography operations.
Supports both text messages and file embedding.
"""

# NOTE(review): `io`, `date` and the bare `stegasoo` module are not referenced
# in the portion of the file visible here; confirm before pruning.
import io
import sys
import base64
from pathlib import Path
from typing import Optional
from datetime import date

from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.responses import Response, JSONResponse
from pydantic import BaseModel, Field

# Add parent to path for development
# (must run before the `stegasoo` imports below so the in-repo `src/` tree is
# searched first).
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'src'))

import stegasoo
from stegasoo import (
    encode, decode, generate_credentials,
    validate_image, calculate_capacity,
    get_day_from_date, DAY_NAMES,
    __version__,
    StegasooError, DecryptionError, CapacityError,
    has_argon2,
    FilePayload,
    MAX_FILE_PAYLOAD_SIZE,
)
from stegasoo.constants import (
    MIN_PIN_LENGTH, MAX_PIN_LENGTH,
    MIN_PHRASE_WORDS, MAX_PHRASE_WORDS,
    VALID_RSA_SIZES,
)

# QR Code utilities
# Optional feature: if `stegasoo.qr_utils` cannot be imported, QR support is
# reported as unavailable and `extract_key_from_qr` is left as None. Handlers
# must check HAS_QR_READ before calling it.
try:
    from stegasoo.qr_utils import (
        extract_key_from_qr,
        has_qr_read,
    )
    HAS_QR_READ = has_qr_read()
except ImportError:
    HAS_QR_READ = False
    extract_key_from_qr = None


# ============================================================================
# FASTAPI APP
# ============================================================================

# Interactive documentation is served at /docs (Swagger UI) and /redoc.
app = FastAPI(
    title="Stegasoo API",
    description="Secure steganography with hybrid authentication. Supports text messages and file embedding.",
    version=__version__,
    docs_url="/docs",
    redoc_url="/redoc",
)


# ============================================================================
# MODELS
# ============================================================================

# Body of POST /generate. `pin_length` and `words_per_phrase` are range-checked
# by pydantic; `rsa_bits` carries no constraint here and is checked against
# VALID_RSA_SIZES inside the handler.
class GenerateRequest(BaseModel):
    use_pin: bool = True
    use_rsa: bool = False
    pin_length: int = Field(default=6, ge=MIN_PIN_LENGTH, le=MAX_PIN_LENGTH)
    rsa_bits: int = Field(default=2048)
    words_per_phrase: int = Field(default=3, ge=MIN_PHRASE_WORDS, le=MAX_PHRASE_WORDS)


# Result of POST /generate. `pin` / `rsa_key_pem` default to None.
class GenerateResponse(BaseModel):
    phrases: dict[str, str]
    pin: Optional[str] = None
    rsa_key_pem: Optional[str] = None
    entropy: dict[str, int]


# Body of POST /encode (text payload). All binary inputs are base64 strings.
class EncodeRequest(BaseModel):
    message: str
    reference_photo_base64: str
    carrier_image_base64: str
    day_phrase: str
    pin: str = ""
    rsa_key_base64: Optional[str] = None
    rsa_password: Optional[str] = None
    date_str: Optional[str] = None


class EncodeFileRequest(BaseModel):
    """Request for embedding a file (base64-encoded)."""
    file_data_base64: str
    filename: str
    mime_type: Optional[str] = None
    reference_photo_base64: str
    carrier_image_base64: str
    day_phrase: str
    pin: str = ""
    rsa_key_base64: Optional[str] = None
    rsa_password: Optional[str] = None
    date_str: Optional[str] = None


# Shared result of the JSON encode endpoints; the stego image is returned as a
# base64 string alongside metadata taken from the encode result.
class EncodeResponse(BaseModel):
    stego_image_base64: str
    filename: str
    capacity_used_percent: float
    date_used: str
    day_of_week: str


# Body of POST /decode. Mirrors EncodeRequest minus the payload and date.
class DecodeRequest(BaseModel):
    stego_image_base64: str
    reference_photo_base64: str
    day_phrase: str
    pin: str = ""
    rsa_key_base64: Optional[str] = None
    rsa_password: Optional[str] = None


class DecodeResponse(BaseModel):
    """Response for decode - can be text or file."""
    payload_type: str  # 'text' or 'file'
    message: Optional[str] = None  # For text
    file_data_base64: Optional[str] = None  # For file (base64-encoded)
    filename: Optional[str] = None  # For file
    mime_type: Optional[str] = None  # For file


# Result of POST /image/info: image dimensions plus embedding capacity.
class ImageInfoResponse(BaseModel):
    width: int
    height: int
    pixels: int
    capacity_bytes: int
    capacity_kb: int
# Result of GET /: library version, optional-feature flags and limits.
class StatusResponse(BaseModel):
    version: str
    has_argon2: bool
    has_qrcode_read: bool
    day_names: list[str]
    max_payload_kb: int


# Result of POST /extract-key-from-qr. Exactly one of `key_pem` / `error` is
# populated by the handler, depending on `success`.
class QrExtractResponse(BaseModel):
    success: bool
    key_pem: Optional[str] = None
    error: Optional[str] = None


# Generic error envelope (same keys as the StegasooError handler emits).
class ErrorResponse(BaseModel):
    error: str
    detail: Optional[str] = None


# ============================================================================
# HELPERS
# ============================================================================

def _b64_field(value: str, field_name: str) -> bytes:
    """Decode a base64 request field.

    Args:
        value: Base64 text supplied by the client.
        field_name: Name of the request field, used in the error message.

    Returns:
        The decoded bytes.

    Raises:
        HTTPException: 400 when `value` is not valid base64. Malformed input
            is a client error, not a server fault.
    """
    try:
        return base64.b64decode(value)
    except ValueError as e:  # binascii.Error is a ValueError subclass
        raise HTTPException(400, f"Invalid base64 data in '{field_name}'") from e


def _optional_b64_field(value: Optional[str], field_name: str) -> Optional[bytes]:
    """Decode an optional base64 field; empty or missing values yield None."""
    return _b64_field(value, field_name) if value else None


async def _read_rsa_key(
    rsa_key: Optional[UploadFile],
    rsa_key_qr: Optional[UploadFile],
    rsa_password: str,
) -> tuple[Optional[bytes], Optional[str]]:
    """Resolve the RSA key and its password from multipart form inputs.

    A `.pem` upload takes precedence over a QR-code upload.

    Args:
        rsa_key: Uploaded key file, if any.
        rsa_key_qr: Uploaded QR-code image containing a key, if any.
        rsa_password: Password form field ("" when not supplied).

    Returns:
        `(key_bytes, password)`. `key_bytes` is None when no key was
        uploaded. `password` is None when empty or when the key came from
        a QR code.

    Raises:
        HTTPException: 501 if a QR image was uploaded but QR reading is
            unavailable; 400 if no key could be extracted from the QR image.
    """
    password = rsa_password or None

    if rsa_key and rsa_key.filename:
        return await rsa_key.read(), password

    if rsa_key_qr and rsa_key_qr.filename:
        if not HAS_QR_READ:
            raise HTTPException(
                501,
                "QR code reading not available. Install pyzbar and libzbar."
            )
        key_pem = extract_key_from_qr(await rsa_key_qr.read())
        if not key_pem:
            raise HTTPException(400, "Could not extract RSA key from QR code image")
        # QR code keys are never password-protected
        return key_pem.encode('utf-8'), None

    return None, password


def _encode_response(result) -> EncodeResponse:
    """Build the JSON encode response from an `encode()` result."""
    return EncodeResponse(
        stego_image_base64=base64.b64encode(result.stego_image).decode('utf-8'),
        filename=result.filename,
        capacity_used_percent=result.capacity_percent,
        date_used=result.date_used,
        day_of_week=get_day_from_date(result.date_used)
    )


def _decode_response(result) -> DecodeResponse:
    """Build the decode response (text or file) from a `decode()` result."""
    if result.is_file:
        return DecodeResponse(
            payload_type='file',
            file_data_base64=base64.b64encode(result.file_data).decode('utf-8'),
            filename=result.filename,
            mime_type=result.mime_type
        )
    return DecodeResponse(
        payload_type='text',
        message=result.message
    )


# ============================================================================
# ROUTES
# ============================================================================

@app.get("/", response_model=StatusResponse)
async def root():
    """Get API status and configuration."""
    return StatusResponse(
        version=__version__,
        has_argon2=has_argon2(),
        has_qrcode_read=HAS_QR_READ,
        day_names=list(DAY_NAMES),
        max_payload_kb=MAX_FILE_PAYLOAD_SIZE // 1024
    )


@app.post("/extract-key-from-qr", response_model=QrExtractResponse)
async def api_extract_key_from_qr(
    qr_image: UploadFile = File(..., description="QR code image containing RSA key")
):
    """
    Extract RSA key from a QR code image.

    Supports both compressed (STEGASOO-Z: prefix) and uncompressed keys.
    Returns the PEM-encoded key if found.
    """
    if not HAS_QR_READ:
        raise HTTPException(
            501,
            "QR code reading not available. Install pyzbar and libzbar."
        )

    # Deliberately best-effort: any extraction failure is reported in the
    # response body (success=False) rather than as an HTTP error.
    try:
        image_data = await qr_image.read()
        key_pem = extract_key_from_qr(image_data)
    except Exception as e:
        return QrExtractResponse(success=False, error=str(e))

    if key_pem:
        return QrExtractResponse(success=True, key_pem=key_pem)
    return QrExtractResponse(
        success=False,
        error="No valid RSA key found in QR code"
    )


@app.post("/generate", response_model=GenerateResponse)
async def api_generate(request: GenerateRequest):
    """
    Generate credentials for encoding/decoding.

    At least one of use_pin or use_rsa must be True.
    """
    # Validation errors are raised outside the try so they keep their 400.
    if not request.use_pin and not request.use_rsa:
        raise HTTPException(400, "Must enable at least one of use_pin or use_rsa")

    if request.rsa_bits not in VALID_RSA_SIZES:
        raise HTTPException(400, f"rsa_bits must be one of {VALID_RSA_SIZES}")

    try:
        creds = generate_credentials(
            use_pin=request.use_pin,
            use_rsa=request.use_rsa,
            pin_length=request.pin_length,
            rsa_bits=request.rsa_bits,
            words_per_phrase=request.words_per_phrase
        )

        return GenerateResponse(
            phrases=creds.phrases,
            pin=creds.pin,
            rsa_key_pem=creds.rsa_key_pem,
            entropy={
                "phrase": creds.phrase_entropy,
                "pin": creds.pin_entropy,
                "rsa": creds.rsa_entropy,
                "total": creds.total_entropy
            }
        )
    except Exception as e:
        raise HTTPException(500, str(e)) from e


@app.post("/encode", response_model=EncodeResponse)
async def api_encode(request: EncodeRequest):
    """
    Encode a text message into an image.

    Images must be base64-encoded. Returns base64-encoded stego image.
    """
    try:
        ref_photo = _b64_field(request.reference_photo_base64, "reference_photo_base64")
        carrier = _b64_field(request.carrier_image_base64, "carrier_image_base64")
        rsa_key = _optional_b64_field(request.rsa_key_base64, "rsa_key_base64")

        result = encode(
            message=request.message,
            reference_photo=ref_photo,
            carrier_image=carrier,
            day_phrase=request.day_phrase,
            pin=request.pin,
            rsa_key_data=rsa_key,
            rsa_password=request.rsa_password,
            date_str=request.date_str
        )

        return _encode_response(result)

    except HTTPException:
        # Keep deliberate HTTP errors (e.g. 400 for bad base64) intact instead
        # of letting the catch-all below turn them into 500s.
        raise
    except (CapacityError, StegasooError) as e:
        raise HTTPException(400, str(e)) from e
    except Exception as e:
        raise HTTPException(500, str(e)) from e


@app.post("/encode/file", response_model=EncodeResponse)
async def api_encode_file(request: EncodeFileRequest):
    """
    Encode a file into an image (JSON with base64).

    File data must be base64-encoded.
    """
    try:
        file_data = _b64_field(request.file_data_base64, "file_data_base64")
        ref_photo = _b64_field(request.reference_photo_base64, "reference_photo_base64")
        carrier = _b64_field(request.carrier_image_base64, "carrier_image_base64")
        rsa_key = _optional_b64_field(request.rsa_key_base64, "rsa_key_base64")

        payload = FilePayload(
            data=file_data,
            filename=request.filename,
            mime_type=request.mime_type
        )

        result = encode(
            message=payload,
            reference_photo=ref_photo,
            carrier_image=carrier,
            day_phrase=request.day_phrase,
            pin=request.pin,
            rsa_key_data=rsa_key,
            rsa_password=request.rsa_password,
            date_str=request.date_str
        )

        return _encode_response(result)

    except HTTPException:
        raise
    except (CapacityError, StegasooError) as e:
        raise HTTPException(400, str(e)) from e
    except Exception as e:
        raise HTTPException(500, str(e)) from e


@app.post("/decode", response_model=DecodeResponse)
async def api_decode(request: DecodeRequest):
    """
    Decode a message or file from a stego image.

    Returns payload_type to indicate if result is text or file.
    """
    try:
        stego = _b64_field(request.stego_image_base64, "stego_image_base64")
        ref_photo = _b64_field(request.reference_photo_base64, "reference_photo_base64")
        rsa_key = _optional_b64_field(request.rsa_key_base64, "rsa_key_base64")

        result = decode(
            stego_image=stego,
            reference_photo=ref_photo,
            day_phrase=request.day_phrase,
            pin=request.pin,
            rsa_key_data=rsa_key,
            rsa_password=request.rsa_password
        )

        return _decode_response(result)

    except HTTPException:
        raise
    except DecryptionError as e:
        # Fixed message: do not echo decryption details back to the caller.
        raise HTTPException(401, "Decryption failed. Check credentials.") from e
    except StegasooError as e:
        raise HTTPException(400, str(e)) from e
    except Exception as e:
        raise HTTPException(500, str(e)) from e


@app.post("/encode/multipart")
async def api_encode_multipart(
    day_phrase: str = Form(...),
    reference_photo: UploadFile = File(...),
    carrier: UploadFile = File(...),
    message: str = Form(""),
    payload_file: Optional[UploadFile] = File(None),
    pin: str = Form(""),
    rsa_key: Optional[UploadFile] = File(None),
    rsa_key_qr: Optional[UploadFile] = File(None),
    rsa_password: str = Form(""),
    date_str: str = Form("")
):
    """
    Encode using multipart form data (file uploads).

    Provide either 'message' (text) or 'payload_file' (binary file).
    RSA key can be provided as 'rsa_key' (.pem file) or 'rsa_key_qr' (QR code image).
    Returns the stego image directly as PNG with metadata headers.
    """
    try:
        ref_data = await reference_photo.read()
        carrier_data = await carrier.read()

        # Handle RSA key from .pem file or QR code image
        rsa_key_data, effective_password = await _read_rsa_key(
            rsa_key, rsa_key_qr, rsa_password
        )

        # Determine payload: an uploaded file wins over a text message.
        if payload_file and payload_file.filename:
            file_data = await payload_file.read()
            payload = FilePayload(
                data=file_data,
                filename=payload_file.filename,
                mime_type=payload_file.content_type
            )
        elif message:
            payload = message
        else:
            raise HTTPException(400, "Must provide either 'message' or 'payload_file'")

        result = encode(
            message=payload,
            reference_photo=ref_data,
            carrier_image=carrier_data,
            day_phrase=day_phrase,
            pin=pin,
            rsa_key_data=rsa_key_data,
            rsa_password=effective_password,
            date_str=date_str if date_str else None
        )

        day_of_week = get_day_from_date(result.date_used)

        return Response(
            content=result.stego_image,
            media_type="image/png",
            headers={
                "Content-Disposition": f"attachment; filename={result.filename}",
                "X-Stegasoo-Date": result.date_used,
                "X-Stegasoo-Day": day_of_week,
                "X-Stegasoo-Capacity-Percent": f"{result.capacity_percent:.1f}"
            }
        )

    except HTTPException:
        # The 400/501 errors raised above must reach the client unchanged;
        # without this clause the catch-all below reports them as 500.
        raise
    except (CapacityError, StegasooError) as e:
        raise HTTPException(400, str(e)) from e
    except Exception as e:
        raise HTTPException(500, str(e)) from e


@app.post("/decode/multipart", response_model=DecodeResponse)
async def api_decode_multipart(
    day_phrase: str = Form(...),
    reference_photo: UploadFile = File(...),
    stego_image: UploadFile = File(...),
    pin: str = Form(""),
    rsa_key: Optional[UploadFile] = File(None),
    rsa_key_qr: Optional[UploadFile] = File(None),
    rsa_password: str = Form("")
):
    """
    Decode using multipart form data (file uploads).

    RSA key can be provided as 'rsa_key' (.pem file) or 'rsa_key_qr' (QR code image).
    Returns JSON with payload_type indicating text or file.
    """
    try:
        ref_data = await reference_photo.read()
        stego_data = await stego_image.read()

        # Handle RSA key from .pem file or QR code image
        rsa_key_data, effective_password = await _read_rsa_key(
            rsa_key, rsa_key_qr, rsa_password
        )

        result = decode(
            stego_image=stego_data,
            reference_photo=ref_data,
            day_phrase=day_phrase,
            pin=pin,
            rsa_key_data=rsa_key_data,
            rsa_password=effective_password
        )

        return _decode_response(result)

    except HTTPException:
        # Preserve the 400/501 raised while resolving the RSA key.
        raise
    except DecryptionError as e:
        raise HTTPException(401, "Decryption failed. Check credentials.") from e
    except StegasooError as e:
        raise HTTPException(400, str(e)) from e
    except Exception as e:
        raise HTTPException(500, str(e)) from e


@app.post("/image/info", response_model=ImageInfoResponse)
async def api_image_info(image: UploadFile = File(...)):
    """Get information about an image's capacity."""
    try:
        image_data = await image.read()

        result = validate_image(image_data, check_size=False)
        if not result.is_valid:
            raise HTTPException(400, result.error_message)

        capacity = calculate_capacity(image_data)

        return ImageInfoResponse(
            width=result.details['width'],
            height=result.details['height'],
            pixels=result.details['pixels'],
            capacity_bytes=capacity,
            capacity_kb=capacity // 1024
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, str(e)) from e


# ============================================================================
# ERROR HANDLERS
# ============================================================================

@app.exception_handler(StegasooError)
async def stegasoo_error_handler(request, exc):
    """Render a StegasooError that escaped a route as a 400 JSON body."""
    return JSONResponse(
        status_code=400,
        content={"error": type(exc).__name__, "detail": str(exc)}
    )


# ============================================================================
# MAIN
# ============================================================================

if __name__ == '__main__':
    # Local import: uvicorn is only needed when running this file directly.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)