Add API key authentication and TLS support

API Authentication (v4.2.1):
- API key auth via X-API-Key header
- Keys hashed (SHA-256) and stored in ~/.stegasoo/api_keys.json
- Auth disabled when no keys configured
- Protected endpoints: encode, decode, generate, channel/*, compare, etc.
- Public endpoints: /, /docs, /modes, /auth/status, /channel/status

TLS Support:
- Auto-generates self-signed certs on first run
- Certs include localhost, local IPs, hostname.local
- Stored in ~/.stegasoo/certs/

CLI Commands:
- stegasoo api keys list/create/delete
- stegasoo api tls generate/info
- stegasoo api serve (starts with TLS by default)

Updated systemd service to use TLS.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee
2026-01-11 18:03:51 -05:00
parent 3b5ab41ce9
commit 34ede3815f
5 changed files with 699 additions and 15 deletions

256
frontends/api/auth.py Normal file
View File

@@ -0,0 +1,256 @@
"""
API Key Authentication for Stegasoo REST API.
Provides simple API key authentication with hashed key storage.
Keys can be stored in user config (~/.stegasoo/) or project config (./config/).
Usage:
from .auth import require_api_key, get_api_key_status
@app.get("/protected")
async def protected_endpoint(api_key: str = Depends(require_api_key)):
return {"status": "authenticated"}
"""
import hashlib
import json
import os
import secrets
from pathlib import Path
from typing import Optional
from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader
# API key header name
API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False)
# Config locations
USER_CONFIG_DIR = Path.home() / ".stegasoo"
PROJECT_CONFIG_DIR = Path("./config")
# Key file name
API_KEYS_FILE = "api_keys.json"
# Environment variable for API key (alternative to file)
API_KEY_ENV_VAR = "STEGASOO_API_KEY"
def _hash_key(key: str) -> str:
"""Hash an API key for storage."""
return hashlib.sha256(key.encode()).hexdigest()
def _get_keys_file(location: str = "user") -> Path:
"""Get path to API keys file."""
if location == "project":
return PROJECT_CONFIG_DIR / API_KEYS_FILE
return USER_CONFIG_DIR / API_KEYS_FILE
def _load_keys(location: str = "user") -> dict:
"""Load API keys from config file."""
keys_file = _get_keys_file(location)
if keys_file.exists():
try:
with open(keys_file) as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {"keys": [], "enabled": True}
return {"keys": [], "enabled": True}
def _save_keys(data: dict, location: str = "user") -> None:
"""Save API keys to config file."""
keys_file = _get_keys_file(location)
keys_file.parent.mkdir(parents=True, exist_ok=True)
with open(keys_file, "w") as f:
json.dump(data, f, indent=2)
# Secure permissions (owner read/write only)
os.chmod(keys_file, 0o600)
def generate_api_key() -> str:
"""Generate a new API key."""
# Format: stegasoo_XXXX_XXXXXXXXXXXXXXXXXXXXXXXXXXXX
# 32 bytes = 256 bits of entropy
random_part = secrets.token_hex(16)
return f"stegasoo_{random_part[:4]}_{random_part[4:]}"
def add_api_key(name: str, location: str = "user") -> str:
"""
Generate and store a new API key.
Args:
name: Descriptive name for the key (e.g., "laptop", "automation")
location: "user" or "project"
Returns:
The generated API key (only shown once!)
"""
key = generate_api_key()
key_hash = _hash_key(key)
data = _load_keys(location)
# Check for duplicate name
for existing in data["keys"]:
if existing["name"] == name:
raise ValueError(f"Key with name '{name}' already exists")
data["keys"].append({
"name": name,
"hash": key_hash,
"created": __import__("datetime").datetime.now().isoformat(),
})
_save_keys(data, location)
return key
def remove_api_key(name: str, location: str = "user") -> bool:
"""
Remove an API key by name.
Returns:
True if key was found and removed, False otherwise
"""
data = _load_keys(location)
original_count = len(data["keys"])
data["keys"] = [k for k in data["keys"] if k["name"] != name]
if len(data["keys"]) < original_count:
_save_keys(data, location)
return True
return False
def list_api_keys(location: str = "user") -> list[dict]:
"""
List all API keys (names and creation dates, not actual keys).
"""
data = _load_keys(location)
return [{"name": k["name"], "created": k.get("created", "unknown")} for k in data["keys"]]
def set_auth_enabled(enabled: bool, location: str = "user") -> None:
"""Enable or disable API key authentication."""
data = _load_keys(location)
data["enabled"] = enabled
_save_keys(data, location)
def is_auth_enabled() -> bool:
"""Check if API key authentication is enabled."""
# Check project config first, then user config
for location in ["project", "user"]:
data = _load_keys(location)
if "enabled" in data:
return data["enabled"]
# Default: enabled if any keys exist
return bool(get_all_key_hashes())
def get_all_key_hashes() -> set[str]:
"""Get all valid API key hashes from all sources."""
hashes = set()
# Check environment variable first
env_key = os.environ.get(API_KEY_ENV_VAR)
if env_key:
hashes.add(_hash_key(env_key))
# Check project and user configs
for location in ["project", "user"]:
data = _load_keys(location)
for key_entry in data.get("keys", []):
if "hash" in key_entry:
hashes.add(key_entry["hash"])
return hashes
def validate_api_key(key: str) -> bool:
"""Validate an API key against stored hashes."""
if not key:
return False
key_hash = _hash_key(key)
valid_hashes = get_all_key_hashes()
return key_hash in valid_hashes
def get_api_key_status() -> dict:
"""Get current API key authentication status."""
user_keys = list_api_keys("user")
project_keys = list_api_keys("project")
env_configured = bool(os.environ.get(API_KEY_ENV_VAR))
total_keys = len(user_keys) + len(project_keys) + (1 if env_configured else 0)
return {
"enabled": is_auth_enabled(),
"total_keys": total_keys,
"user_keys": len(user_keys),
"project_keys": len(project_keys),
"env_configured": env_configured,
"keys": {
"user": user_keys,
"project": project_keys,
}
}
# FastAPI dependency for API key authentication
async def require_api_key(api_key: Optional[str] = Security(API_KEY_HEADER)) -> str:
"""
FastAPI dependency that requires a valid API key.
Usage:
@app.get("/protected")
async def endpoint(key: str = Depends(require_api_key)):
...
"""
# Check if auth is enabled
if not is_auth_enabled():
return "auth_disabled"
# No keys configured = auth disabled
if not get_all_key_hashes():
return "no_keys_configured"
# Validate the provided key
if not api_key:
raise HTTPException(
status_code=401,
detail="API key required. Provide X-API-Key header.",
headers={"WWW-Authenticate": "ApiKey"},
)
if not validate_api_key(api_key):
raise HTTPException(
status_code=403,
detail="Invalid API key.",
)
return api_key
async def optional_api_key(api_key: Optional[str] = Security(API_KEY_HEADER)) -> Optional[str]:
"""
FastAPI dependency that optionally validates API key.
Returns the key if valid, None if not provided or invalid.
Doesn't raise exceptions - useful for endpoints that work
with or without auth.
"""
if api_key and validate_api_key(api_key):
return api_key
return None

View File

@@ -32,10 +32,31 @@ from functools import partial
from pathlib import Path
from typing import Literal
from fastapi import FastAPI, File, Form, HTTPException, Query, UploadFile
from fastapi import Depends, FastAPI, File, Form, HTTPException, Query, UploadFile
from fastapi.responses import JSONResponse, Response
from pydantic import BaseModel, Field
# API Key Authentication
try:
from .auth import (
require_api_key,
get_api_key_status,
add_api_key,
remove_api_key,
list_api_keys,
is_auth_enabled,
)
except ImportError:
# When running directly (not as package)
from auth import (
require_api_key,
get_api_key_status,
add_api_key,
remove_api_key,
list_api_keys,
is_auth_enabled,
)
# Add parent to path for development
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
@@ -357,6 +378,23 @@ class ChannelSetRequest(BaseModel):
location: str = Field(default="user", description="'user' or 'project'")
class AuthStatusResponse(BaseModel):
"""Response for API key authentication status."""
enabled: bool = Field(description="Whether API key auth is enabled")
total_keys: int = Field(description="Total number of configured API keys")
user_keys: int = Field(description="Keys in user config")
project_keys: int = Field(description="Keys in project config")
env_configured: bool = Field(description="Whether env var key is set")
class AuthKeyInfo(BaseModel):
"""Info about a single API key (not the actual key)."""
name: str
created: str
class ModesResponse(BaseModel):
"""Response showing available embedding modes."""
@@ -614,6 +652,7 @@ async def api_channel_status(
@app.post("/channel/generate", response_model=ChannelGenerateResponse)
async def api_channel_generate(
_: str = Depends(require_api_key),
save: bool = Query(False, description="Save to user config"),
save_project: bool = Query(False, description="Save to project config"),
):
@@ -652,7 +691,7 @@ async def api_channel_generate(
@app.post("/channel/set")
async def api_channel_set(request: ChannelSetRequest):
async def api_channel_set(request: ChannelSetRequest, _: str = Depends(require_api_key)):
"""
Set/save a channel key to config.
@@ -678,6 +717,7 @@ async def api_channel_set(request: ChannelSetRequest):
@app.delete("/channel")
async def api_channel_clear(
_: str = Depends(require_api_key),
location: str = Query("user", description="'user', 'project', or 'all'")
):
"""
@@ -704,8 +744,98 @@ async def api_channel_clear(
}
# ============================================================================
# ROUTES - AUTHENTICATION (v4.2.1)
# ============================================================================
@app.get("/auth/status", response_model=AuthStatusResponse)
async def api_auth_status():
"""
Get API key authentication status.
v4.2.1: New endpoint for auth status.
Returns whether auth is enabled and key counts.
"""
status = get_api_key_status()
return AuthStatusResponse(
enabled=status["enabled"],
total_keys=status["total_keys"],
user_keys=status["user_keys"],
project_keys=status["project_keys"],
env_configured=status["env_configured"],
)
@app.get("/auth/keys", response_model=list[AuthKeyInfo])
async def api_auth_list_keys(
location: str = Query("user", description="'user' or 'project'"),
_: str = Depends(require_api_key),
):
"""
List configured API keys (names only, not actual keys).
v4.2.1: New endpoint for auth management.
Requires authentication.
"""
if location not in ("user", "project"):
raise HTTPException(400, "location must be 'user' or 'project'")
keys = list_api_keys(location)
return [AuthKeyInfo(name=k["name"], created=k["created"]) for k in keys]
@app.post("/auth/keys")
async def api_auth_create_key(
name: str = Query(..., description="Name for the new API key"),
location: str = Query("user", description="'user' or 'project'"),
_: str = Depends(require_api_key),
):
"""
Create a new API key.
v4.2.1: New endpoint for auth management.
Returns the key ONCE - it cannot be retrieved again!
Requires authentication (or no keys configured yet).
"""
if location not in ("user", "project"):
raise HTTPException(400, "location must be 'user' or 'project'")
try:
key = add_api_key(name, location)
return {
"success": True,
"name": name,
"key": key,
"warning": "Save this key now! It cannot be retrieved again.",
}
except ValueError as e:
raise HTTPException(400, str(e))
@app.delete("/auth/keys")
async def api_auth_delete_key(
name: str = Query(..., description="Name of key to delete"),
location: str = Query("user", description="'user' or 'project'"),
_: str = Depends(require_api_key),
):
"""
Delete an API key by name.
v4.2.1: New endpoint for auth management.
Requires authentication.
"""
if location not in ("user", "project"):
raise HTTPException(400, "location must be 'user' or 'project'")
if remove_api_key(name, location):
return {"success": True, "deleted": name}
else:
raise HTTPException(404, f"Key '{name}' not found in {location} config")
@app.post("/compare", response_model=CompareModesResponse)
async def api_compare_modes(request: CompareModesRequest):
async def api_compare_modes(request: CompareModesRequest, _: str = Depends(require_api_key)):
"""
Compare LSB and DCT embedding modes for a carrier image.
@@ -763,7 +893,7 @@ async def api_compare_modes(request: CompareModesRequest):
@app.post("/will-fit", response_model=WillFitResponse)
async def api_will_fit(request: WillFitRequest):
async def api_will_fit(request: WillFitRequest, _: str = Depends(require_api_key)):
"""
Check if a payload of given size will fit in the carrier image.
@@ -799,6 +929,7 @@ async def api_will_fit(request: WillFitRequest):
@app.post("/extract-key-from-qr", response_model=QrExtractResponse)
async def api_extract_key_from_qr(
_: str = Depends(require_api_key),
qr_image: UploadFile = File(..., description="QR code image containing RSA key")
):
"""
@@ -823,7 +954,7 @@ async def api_extract_key_from_qr(
@app.post("/generate-key-qr", response_model=QrGenerateResponse)
async def api_generate_key_qr(request: QrGenerateRequest):
async def api_generate_key_qr(request: QrGenerateRequest, _: str = Depends(require_api_key)):
"""
Generate QR code from an RSA private key.
@@ -873,7 +1004,7 @@ async def api_generate_key_qr(request: QrGenerateRequest):
@app.post("/generate", response_model=GenerateResponse)
async def api_generate(request: GenerateRequest):
async def api_generate(request: GenerateRequest, _: str = Depends(require_api_key)):
"""
Generate credentials for encoding/decoding.
@@ -955,7 +1086,7 @@ def _get_output_info(embed_mode: str, dct_output_format: str, dct_color_mode: st
@app.post("/encode", response_model=EncodeResponse)
async def api_encode(request: EncodeRequest):
async def api_encode(request: EncodeRequest, _: str = Depends(require_api_key)):
"""
Encode a text message into an image.
@@ -1027,7 +1158,7 @@ async def api_encode(request: EncodeRequest):
@app.post("/encode/file", response_model=EncodeResponse)
async def api_encode_file(request: EncodeFileRequest):
async def api_encode_file(request: EncodeFileRequest, _: str = Depends(require_api_key)):
"""
Encode a file into an image (JSON with base64).
@@ -1109,7 +1240,7 @@ async def api_encode_file(request: EncodeFileRequest):
@app.post("/decode", response_model=DecodeResponse)
async def api_decode(request: DecodeRequest):
async def api_decode(request: DecodeRequest, _: str = Depends(require_api_key)):
"""
Decode a message or file from a stego image.
@@ -1172,6 +1303,7 @@ async def api_decode(request: DecodeRequest):
@app.post("/encode/multipart")
async def api_encode_multipart(
_: str = Depends(require_api_key),
passphrase: str = Form(..., description="Passphrase (v3.2.0: renamed from day_phrase)"),
reference_photo: UploadFile = File(...),
carrier: UploadFile = File(...),
@@ -1313,6 +1445,7 @@ async def api_encode_multipart(
@app.post("/decode/multipart", response_model=DecodeResponse)
async def api_decode_multipart(
_: str = Depends(require_api_key),
passphrase: str = Form(..., description="Passphrase (v3.2.0: renamed from day_phrase)"),
reference_photo: UploadFile = File(...),
stego_image: UploadFile = File(...),
@@ -1418,6 +1551,7 @@ async def api_decode_multipart(
@app.post("/image/info", response_model=ImageInfoResponse)
async def api_image_info(
_: str = Depends(require_api_key),
image: UploadFile = File(...),
include_modes: bool = Query(True, description="Include capacity by mode (v3.0+)"),
):