Files
stegasoo/frontends/api/main.py
2025-12-28 00:07:04 -05:00

383 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Stegasoo REST API
FastAPI-based REST API for steganography operations.
Designed for integration with other services and automation.
"""
import io
import sys
import base64
from pathlib import Path
from typing import Optional
from datetime import date
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.responses import Response, JSONResponse
from pydantic import BaseModel, Field
# Add parent to path for development
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'src'))
import stegasoo
from stegasoo import (
encode, decode, generate_credentials,
validate_image, calculate_capacity,
get_day_from_date,
DAY_NAMES, __version__,
StegasooError, DecryptionError, CapacityError,
has_argon2,
)
from stegasoo.constants import (
MIN_PIN_LENGTH, MAX_PIN_LENGTH,
MIN_PHRASE_WORDS, MAX_PHRASE_WORDS,
VALID_RSA_SIZES,
)
# ============================================================================
# FASTAPI APP
# ============================================================================
app = FastAPI(
title="Stegasoo API",
description="Secure steganography with hybrid authentication",
version=__version__,
docs_url="/docs",
redoc_url="/redoc",
)
# ============================================================================
# MODELS
# ============================================================================
class GenerateRequest(BaseModel):
use_pin: bool = True
use_rsa: bool = False
pin_length: int = Field(default=6, ge=MIN_PIN_LENGTH, le=MAX_PIN_LENGTH)
rsa_bits: int = Field(default=2048)
words_per_phrase: int = Field(default=3, ge=MIN_PHRASE_WORDS, le=MAX_PHRASE_WORDS)
class GenerateResponse(BaseModel):
phrases: dict[str, str]
pin: Optional[str] = None
rsa_key_pem: Optional[str] = None
entropy: dict[str, int]
class EncodeRequest(BaseModel):
message: str
reference_photo_base64: str
carrier_image_base64: str
day_phrase: str
pin: str = ""
rsa_key_base64: Optional[str] = None
rsa_password: Optional[str] = None
date_str: Optional[str] = None
class EncodeResponse(BaseModel):
stego_image_base64: str
filename: str
capacity_used_percent: float
date_used: str
day_of_week: str
class DecodeRequest(BaseModel):
stego_image_base64: str
reference_photo_base64: str
day_phrase: str
pin: str = ""
rsa_key_base64: Optional[str] = None
rsa_password: Optional[str] = None
class DecodeResponse(BaseModel):
message: str
class ImageInfoResponse(BaseModel):
width: int
height: int
pixels: int
capacity_bytes: int
capacity_kb: int
class StatusResponse(BaseModel):
version: str
has_argon2: bool
day_names: list[str]
class ErrorResponse(BaseModel):
error: str
detail: Optional[str] = None
# ============================================================================
# ROUTES
# ============================================================================
@app.get("/", response_model=StatusResponse)
async def root():
"""Get API status and configuration."""
return StatusResponse(
version=__version__,
has_argon2=has_argon2(),
day_names=list(DAY_NAMES)
)
@app.post("/generate", response_model=GenerateResponse)
async def api_generate(request: GenerateRequest):
"""
Generate credentials for encoding/decoding.
At least one of use_pin or use_rsa must be True.
"""
if not request.use_pin and not request.use_rsa:
raise HTTPException(400, "Must enable at least one of use_pin or use_rsa")
if request.rsa_bits not in VALID_RSA_SIZES:
raise HTTPException(400, f"rsa_bits must be one of {VALID_RSA_SIZES}")
try:
creds = generate_credentials(
use_pin=request.use_pin,
use_rsa=request.use_rsa,
pin_length=request.pin_length,
rsa_bits=request.rsa_bits,
words_per_phrase=request.words_per_phrase
)
return GenerateResponse(
phrases=creds.phrases,
pin=creds.pin,
rsa_key_pem=creds.rsa_key_pem,
entropy={
"phrase": creds.phrase_entropy,
"pin": creds.pin_entropy,
"rsa": creds.rsa_entropy,
"total": creds.total_entropy
}
)
except Exception as e:
raise HTTPException(500, str(e))
@app.post("/encode", response_model=EncodeResponse)
async def api_encode(request: EncodeRequest):
"""
Encode a secret message into an image.
Images must be base64-encoded. Returns base64-encoded stego image.
"""
try:
ref_photo = base64.b64decode(request.reference_photo_base64)
carrier = base64.b64decode(request.carrier_image_base64)
rsa_key = base64.b64decode(request.rsa_key_base64) if request.rsa_key_base64 else None
result = encode(
message=request.message,
reference_photo=ref_photo,
carrier_image=carrier,
day_phrase=request.day_phrase,
pin=request.pin,
rsa_key_data=rsa_key,
rsa_password=request.rsa_password,
date_str=request.date_str
)
stego_b64 = base64.b64encode(result.stego_image).decode('utf-8')
# Get day of week from the date used
day_of_week = get_day_from_date(result.date_used)
return EncodeResponse(
stego_image_base64=stego_b64,
filename=result.filename,
capacity_used_percent=result.capacity_percent,
date_used=result.date_used,
day_of_week=day_of_week
)
except CapacityError as e:
raise HTTPException(400, str(e))
except StegasooError as e:
raise HTTPException(400, str(e))
except Exception as e:
raise HTTPException(500, str(e))
@app.post("/decode", response_model=DecodeResponse)
async def api_decode(request: DecodeRequest):
"""
Decode a secret message from a stego image.
Images must be base64-encoded.
"""
try:
stego = base64.b64decode(request.stego_image_base64)
ref_photo = base64.b64decode(request.reference_photo_base64)
rsa_key = base64.b64decode(request.rsa_key_base64) if request.rsa_key_base64 else None
message = decode(
stego_image=stego,
reference_photo=ref_photo,
day_phrase=request.day_phrase,
pin=request.pin,
rsa_key_data=rsa_key,
rsa_password=request.rsa_password
)
return DecodeResponse(message=message)
except DecryptionError as e:
raise HTTPException(401, "Decryption failed. Check credentials.")
except StegasooError as e:
raise HTTPException(400, str(e))
except Exception as e:
raise HTTPException(500, str(e))
@app.post("/encode/multipart")
async def api_encode_multipart(
message: str = Form(...),
day_phrase: str = Form(...),
reference_photo: UploadFile = File(...),
carrier: UploadFile = File(...),
pin: str = Form(""),
rsa_key: Optional[UploadFile] = File(None),
rsa_password: str = Form(""),
date_str: str = Form("")
):
"""
Encode using multipart form data (file uploads).
Returns the stego image directly as PNG with metadata headers.
"""
try:
ref_data = await reference_photo.read()
carrier_data = await carrier.read()
rsa_key_data = await rsa_key.read() if rsa_key else None
result = encode(
message=message,
reference_photo=ref_data,
carrier_image=carrier_data,
day_phrase=day_phrase,
pin=pin,
rsa_key_data=rsa_key_data,
rsa_password=rsa_password if rsa_password else None,
date_str=date_str if date_str else None
)
# Get day of week from the date used
day_of_week = get_day_from_date(result.date_used)
return Response(
content=result.stego_image,
media_type="image/png",
headers={
"Content-Disposition": f"attachment; filename={result.filename}",
"X-Stegasoo-Date": result.date_used,
"X-Stegasoo-Day": day_of_week,
"X-Stegasoo-Capacity-Percent": f"{result.capacity_percent:.1f}"
}
)
except CapacityError as e:
raise HTTPException(400, str(e))
except StegasooError as e:
raise HTTPException(400, str(e))
except Exception as e:
raise HTTPException(500, str(e))
@app.post("/decode/multipart", response_model=DecodeResponse)
async def api_decode_multipart(
day_phrase: str = Form(...),
reference_photo: UploadFile = File(...),
stego_image: UploadFile = File(...),
pin: str = Form(""),
rsa_key: Optional[UploadFile] = File(None),
rsa_password: str = Form("")
):
"""
Decode using multipart form data (file uploads).
"""
try:
ref_data = await reference_photo.read()
stego_data = await stego_image.read()
rsa_key_data = await rsa_key.read() if rsa_key else None
message = decode(
stego_image=stego_data,
reference_photo=ref_data,
day_phrase=day_phrase,
pin=pin,
rsa_key_data=rsa_key_data,
rsa_password=rsa_password if rsa_password else None
)
return DecodeResponse(message=message)
except DecryptionError:
raise HTTPException(401, "Decryption failed. Check credentials.")
except StegasooError as e:
raise HTTPException(400, str(e))
except Exception as e:
raise HTTPException(500, str(e))
@app.post("/image/info", response_model=ImageInfoResponse)
async def api_image_info(image: UploadFile = File(...)):
"""Get information about an image's capacity."""
try:
image_data = await image.read()
result = validate_image(image_data, check_size=False)
if not result.is_valid:
raise HTTPException(400, result.error_message)
capacity = calculate_capacity(image_data)
return ImageInfoResponse(
width=result.details['width'],
height=result.details['height'],
pixels=result.details['pixels'],
capacity_bytes=capacity,
capacity_kb=capacity // 1024
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, str(e))
# ============================================================================
# ERROR HANDLERS
# ============================================================================
@app.exception_handler(StegasooError)
async def stegasoo_error_handler(request, exc):
return JSONResponse(
status_code=400,
content={"error": type(exc).__name__, "detail": str(exc)}
)
# ============================================================================
# MAIN
# ============================================================================
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)