From 4eefc946c4d3d0cf3f9eacdf549b08e5cee665d4 Mon Sep 17 00:00:00 2001 From: "Aaron D. Lee" Date: Wed, 31 Dec 2025 13:11:34 -0500 Subject: [PATCH] Version 3.1.0 now with experimental DCT support. --- frontends/api/main.py | 355 +++++++++++- frontends/cli/main.py | 313 ++++++++++- frontends/web/app.py | 170 +++++- frontends/web/templates/decode.html | 113 +++- frontends/web/templates/encode.html | 307 ++++++++++- frontends/web/templates/encode_result.html | 40 +- src/stegasoo/__init__.py | 316 +++++++++-- src/stegasoo/constants.py | 43 +- src/stegasoo/dct_steganography.py | 554 +++++++++++++++++++ src/stegasoo/steganography.py | 608 ++++++++++++++------- 10 files changed, 2520 insertions(+), 299 deletions(-) create mode 100644 src/stegasoo/dct_steganography.py diff --git a/frontends/api/main.py b/frontends/api/main.py index b45fe02..0441d4c 100644 --- a/frontends/api/main.py +++ b/frontends/api/main.py @@ -1,19 +1,20 @@ #!/usr/bin/env python3 """ -Stegasoo REST API +Stegasoo REST API (v3.0) FastAPI-based REST API for steganography operations. Supports both text messages and file embedding. +NEW in v3.0: LSB and DCT embedding modes. """ import io import sys import base64 from pathlib import Path -from typing import Optional +from typing import Optional, Literal from datetime import date -from fastapi import FastAPI, HTTPException, UploadFile, File, Form +from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Query from fastapi.responses import Response, JSONResponse from pydantic import BaseModel, Field @@ -30,6 +31,14 @@ from stegasoo import ( has_argon2, FilePayload, MAX_FILE_PAYLOAD_SIZE, + # NEW in v3.0 - Embedding modes + EMBED_MODE_LSB, + EMBED_MODE_DCT, + EMBED_MODE_AUTO, + has_dct_support, + compare_modes, + will_fit_by_mode, + calculate_capacity_by_mode, ) from stegasoo.constants import ( MIN_PIN_LENGTH, MAX_PIN_LENGTH, @@ -55,13 +64,30 @@ except ImportError: app = FastAPI( title="Stegasoo API", - description="Secure steganography with hybrid authentication. Supports text messages and file embedding.", + description=""" +Secure steganography with hybrid authentication. Supports text messages and file embedding. + +## Embedding Modes (v3.0) + +- **LSB mode** (default): Spatial LSB embedding, full color output, higher capacity +- **DCT mode**: Frequency domain embedding, grayscale output, ~20% capacity, better stealth + +Use the `/modes` endpoint to check availability and `/compare` to compare capacities. +""", version=__version__, docs_url="/docs", redoc_url="/redoc", ) +# ============================================================================ +# TYPE ALIASES +# ============================================================================ + +EmbedModeType = Literal["lsb", "dct"] +ExtractModeType = Literal["auto", "lsb", "dct"] + + # ============================================================================ # MODELS # ============================================================================ @@ -90,6 +116,10 @@ class EncodeRequest(BaseModel): rsa_key_base64: Optional[str] = None rsa_password: Optional[str] = None date_str: Optional[str] = None + embed_mode: EmbedModeType = Field( + default="lsb", + description="Embedding mode: 'lsb' (default, color) or 'dct' (grayscale, requires scipy)" + ) class EncodeFileRequest(BaseModel): @@ -104,6 +134,10 @@ class EncodeFileRequest(BaseModel): rsa_key_base64: Optional[str] = None rsa_password: Optional[str] = None date_str: Optional[str] = None + embed_mode: EmbedModeType = Field( + default="lsb", + description="Embedding mode: 'lsb' (default, color) or 'dct' (grayscale, requires scipy)" + ) class EncodeResponse(BaseModel): @@ -112,6 +146,7 @@ class EncodeResponse(BaseModel): capacity_used_percent: float date_used: str day_of_week: str + embed_mode: str = Field(description="Embedding mode used: 'lsb' or 'dct'") class DecodeRequest(BaseModel): @@ -121,6 +156,10 @@ class DecodeRequest(BaseModel): pin: str = "" rsa_key_base64: Optional[str] = None rsa_password: Optional[str] = None + embed_mode: ExtractModeType = Field( + default="auto", + description="Extraction mode: 'auto' (default), 'lsb', or 'dct'" + ) class DecodeResponse(BaseModel): @@ -132,20 +171,60 @@ class DecodeResponse(BaseModel): mime_type: Optional[str] = None # For file +class ModeCapacity(BaseModel): + """Capacity info for a single mode.""" + capacity_bytes: int + capacity_kb: float + available: bool + output_format: str + + class ImageInfoResponse(BaseModel): width: int height: int pixels: int - capacity_bytes: int - capacity_kb: int + capacity_bytes: int = Field(description="LSB mode capacity (for backwards compatibility)") + capacity_kb: int = Field(description="LSB mode capacity in KB") + # NEW in v3.0 + modes: Optional[dict[str, ModeCapacity]] = Field( + default=None, + description="Capacity by embedding mode (v3.0+)" + ) + + +class CompareModesRequest(BaseModel): + """Request for comparing embedding modes.""" + carrier_image_base64: str + payload_size: Optional[int] = Field( + default=None, + description="Optional payload size to check if it fits" + ) + + +class CompareModesResponse(BaseModel): + """Response comparing LSB and DCT modes.""" + width: int + height: int + lsb: dict + dct: dict + payload_check: Optional[dict] = None + recommendation: str + + +class ModesResponse(BaseModel): + """Response showing available embedding modes.""" + lsb: dict + dct: dict class StatusResponse(BaseModel): version: str has_argon2: bool has_qrcode_read: bool + has_dct: bool # NEW in v3.0 day_names: list[str] max_payload_kb: int + available_modes: list[str] # NEW in v3.0 class QrExtractResponse(BaseModel): @@ -154,27 +233,165 @@ class QrExtractResponse(BaseModel): error: Optional[str] = None +class WillFitRequest(BaseModel): + """Request to check if payload will fit.""" + carrier_image_base64: str + payload_size: int + embed_mode: EmbedModeType = "lsb" + + +class WillFitResponse(BaseModel): + """Response for will_fit check.""" + fits: bool + payload_size: int + capacity: int + usage_percent: float + headroom: int + mode: str + + class ErrorResponse(BaseModel): error: str detail: Optional[str] = None # ============================================================================ -# ROUTES +# ROUTES - STATUS & INFO # ============================================================================ @app.get("/", response_model=StatusResponse) async def root(): """Get API status and configuration.""" + available_modes = ["lsb"] + if has_dct_support(): + available_modes.append("dct") + return StatusResponse( version=__version__, has_argon2=has_argon2(), has_qrcode_read=HAS_QR_READ, + has_dct=has_dct_support(), day_names=list(DAY_NAMES), - max_payload_kb=MAX_FILE_PAYLOAD_SIZE // 1024 + max_payload_kb=MAX_FILE_PAYLOAD_SIZE // 1024, + available_modes=available_modes ) +@app.get("/modes", response_model=ModesResponse) +async def api_modes(): + """ + Get available embedding modes and their status. + + NEW in v3.0: Shows LSB and DCT mode availability. + """ + return ModesResponse( + lsb={ + "available": True, + "name": "Spatial LSB", + "description": "Embed in pixel LSBs, outputs PNG/BMP", + "output_format": "PNG (color)", + "capacity_ratio": "100%", + }, + dct={ + "available": has_dct_support(), + "name": "DCT Domain", + "description": "Embed in DCT coefficients, outputs grayscale PNG", + "output_format": "PNG (grayscale)", + "capacity_ratio": "~20% of LSB", + "requires": "scipy", + } + ) + + +@app.post("/compare", response_model=CompareModesResponse) +async def api_compare_modes(request: CompareModesRequest): + """ + Compare LSB and DCT embedding modes for a carrier image. + + NEW in v3.0: Returns capacity for both modes and recommendation. + Optionally checks if a specific payload size would fit. + """ + try: + carrier = base64.b64decode(request.carrier_image_base64) + comparison = compare_modes(carrier) + + response = CompareModesResponse( + width=comparison['width'], + height=comparison['height'], + lsb={ + "capacity_bytes": comparison['lsb']['capacity_bytes'], + "capacity_kb": round(comparison['lsb']['capacity_kb'], 1), + "available": True, + "output_format": comparison['lsb']['output'], + }, + dct={ + "capacity_bytes": comparison['dct']['capacity_bytes'], + "capacity_kb": round(comparison['dct']['capacity_kb'], 1), + "available": comparison['dct']['available'], + "output_format": comparison['dct']['output'], + "ratio_vs_lsb_percent": round(comparison['dct']['ratio_vs_lsb'], 1), + }, + recommendation="lsb" if not comparison['dct']['available'] else "dct for stealth, lsb for capacity" + ) + + if request.payload_size: + fits_lsb = request.payload_size <= comparison['lsb']['capacity_bytes'] + fits_dct = request.payload_size <= comparison['dct']['capacity_bytes'] + + response.payload_check = { + "size_bytes": request.payload_size, + "fits_lsb": fits_lsb, + "fits_dct": fits_dct, + } + + # Update recommendation based on payload + if fits_dct and comparison['dct']['available']: + response.recommendation = "dct (payload fits, better stealth)" + elif fits_lsb: + response.recommendation = "lsb (payload too large for dct)" + else: + response.recommendation = "none (payload too large for both modes)" + + return response + + except Exception as e: + raise HTTPException(500, str(e)) + + +@app.post("/will-fit", response_model=WillFitResponse) +async def api_will_fit(request: WillFitRequest): + """ + Check if a payload of given size will fit in the carrier image. + + NEW in v3.0: Supports both LSB and DCT modes. + """ + try: + # Validate mode + if request.embed_mode == "dct" and not has_dct_support(): + raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy") + + carrier = base64.b64decode(request.carrier_image_base64) + result = will_fit_by_mode(request.payload_size, carrier, embed_mode=request.embed_mode) + + return WillFitResponse( + fits=result['fits'], + payload_size=result['payload_size'], + capacity=result['capacity'], + usage_percent=round(result['usage_percent'], 1), + headroom=result['headroom'], + mode=request.embed_mode + ) + + except HTTPException: + raise + except Exception as e: + raise HTTPException(500, str(e)) + + +# ============================================================================ +# ROUTES - QR CODE +# ============================================================================ + @app.post("/extract-key-from-qr", response_model=QrExtractResponse) async def api_extract_key_from_qr( qr_image: UploadFile = File(..., description="QR code image containing RSA key") @@ -206,6 +423,10 @@ async def api_extract_key_from_qr( return QrExtractResponse(success=False, error=str(e)) +# ============================================================================ +# ROUTES - GENERATE +# ============================================================================ + @app.post("/generate", response_model=GenerateResponse) async def api_generate(request: GenerateRequest): """ @@ -243,13 +464,23 @@ async def api_generate(request: GenerateRequest): raise HTTPException(500, str(e)) +# ============================================================================ +# ROUTES - ENCODE (JSON) +# ============================================================================ + @app.post("/encode", response_model=EncodeResponse) async def api_encode(request: EncodeRequest): """ Encode a text message into an image. Images must be base64-encoded. Returns base64-encoded stego image. + + NEW in v3.0: Supports embed_mode parameter ('lsb' or 'dct'). """ + # Validate mode + if request.embed_mode == "dct" and not has_dct_support(): + raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy") + try: ref_photo = base64.b64decode(request.reference_photo_base64) carrier = base64.b64decode(request.carrier_image_base64) @@ -263,7 +494,8 @@ async def api_encode(request: EncodeRequest): pin=request.pin, rsa_key_data=rsa_key, rsa_password=request.rsa_password, - date_str=request.date_str + date_str=request.date_str, + embed_mode=request.embed_mode, # NEW in v3.0 ) stego_b64 = base64.b64encode(result.stego_image).decode('utf-8') @@ -274,7 +506,8 @@ async def api_encode(request: EncodeRequest): filename=result.filename, capacity_used_percent=result.capacity_percent, date_used=result.date_used, - day_of_week=day_of_week + day_of_week=day_of_week, + embed_mode=request.embed_mode, ) except CapacityError as e: @@ -291,7 +524,13 @@ async def api_encode_file(request: EncodeFileRequest): Encode a file into an image (JSON with base64). File data must be base64-encoded. + + NEW in v3.0: Supports embed_mode parameter ('lsb' or 'dct'). """ + # Validate mode + if request.embed_mode == "dct" and not has_dct_support(): + raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy") + try: file_data = base64.b64decode(request.file_data_base64) ref_photo = base64.b64decode(request.reference_photo_base64) @@ -312,7 +551,8 @@ async def api_encode_file(request: EncodeFileRequest): pin=request.pin, rsa_key_data=rsa_key, rsa_password=request.rsa_password, - date_str=request.date_str + date_str=request.date_str, + embed_mode=request.embed_mode, # NEW in v3.0 ) stego_b64 = base64.b64encode(result.stego_image).decode('utf-8') @@ -323,7 +563,8 @@ async def api_encode_file(request: EncodeFileRequest): filename=result.filename, capacity_used_percent=result.capacity_percent, date_used=result.date_used, - day_of_week=day_of_week + day_of_week=day_of_week, + embed_mode=request.embed_mode, ) except CapacityError as e: @@ -334,13 +575,24 @@ async def api_encode_file(request: EncodeFileRequest): raise HTTPException(500, str(e)) +# ============================================================================ +# ROUTES - DECODE (JSON) +# ============================================================================ + @app.post("/decode", response_model=DecodeResponse) async def api_decode(request: DecodeRequest): """ Decode a message or file from a stego image. Returns payload_type to indicate if result is text or file. + + NEW in v3.0: Supports embed_mode parameter ('auto', 'lsb', or 'dct'). + With 'auto' (default), tries LSB first then DCT. """ + # Validate mode + if request.embed_mode == "dct" and not has_dct_support(): + raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy") + try: stego = base64.b64decode(request.stego_image_base64) ref_photo = base64.b64decode(request.reference_photo_base64) @@ -352,7 +604,8 @@ async def api_decode(request: DecodeRequest): day_phrase=request.day_phrase, pin=request.pin, rsa_key_data=rsa_key, - rsa_password=request.rsa_password + rsa_password=request.rsa_password, + embed_mode=request.embed_mode, # NEW in v3.0 ) if result.is_file: @@ -376,6 +629,10 @@ async def api_decode(request: DecodeRequest): raise HTTPException(500, str(e)) +# ============================================================================ +# ROUTES - ENCODE/DECODE (MULTIPART) +# ============================================================================ + @app.post("/encode/multipart") async def api_encode_multipart( day_phrase: str = Form(...), @@ -387,7 +644,8 @@ async def api_encode_multipart( rsa_key: Optional[UploadFile] = File(None), rsa_key_qr: Optional[UploadFile] = File(None), rsa_password: str = Form(""), - date_str: str = Form("") + date_str: str = Form(""), + embed_mode: str = Form("lsb"), # NEW in v3.0 ): """ Encode using multipart form data (file uploads). @@ -395,7 +653,15 @@ async def api_encode_multipart( Provide either 'message' (text) or 'payload_file' (binary file). RSA key can be provided as 'rsa_key' (.pem file) or 'rsa_key_qr' (QR code image). Returns the stego image directly as PNG with metadata headers. + + NEW in v3.0: Supports embed_mode parameter ('lsb' or 'dct'). """ + # Validate mode + if embed_mode not in ("lsb", "dct"): + raise HTTPException(400, "embed_mode must be 'lsb' or 'dct'") + if embed_mode == "dct" and not has_dct_support(): + raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy") + try: ref_data = await reference_photo.read() carrier_data = await carrier.read() @@ -443,7 +709,8 @@ async def api_encode_multipart( pin=pin, rsa_key_data=rsa_key_data, rsa_password=effective_password, - date_str=date_str if date_str else None + date_str=date_str if date_str else None, + embed_mode=embed_mode, # NEW in v3.0 ) day_of_week = get_day_from_date(result.date_used) @@ -455,7 +722,8 @@ async def api_encode_multipart( "Content-Disposition": f"attachment; filename={result.filename}", "X-Stegasoo-Date": result.date_used, "X-Stegasoo-Day": day_of_week, - "X-Stegasoo-Capacity-Percent": f"{result.capacity_percent:.1f}" + "X-Stegasoo-Capacity-Percent": f"{result.capacity_percent:.1f}", + "X-Stegasoo-Embed-Mode": embed_mode, # NEW in v3.0 } ) @@ -463,6 +731,8 @@ async def api_encode_multipart( raise HTTPException(400, str(e)) except StegasooError as e: raise HTTPException(400, str(e)) + except HTTPException: + raise except Exception as e: raise HTTPException(500, str(e)) @@ -475,14 +745,23 @@ async def api_decode_multipart( pin: str = Form(""), rsa_key: Optional[UploadFile] = File(None), rsa_key_qr: Optional[UploadFile] = File(None), - rsa_password: str = Form("") + rsa_password: str = Form(""), + embed_mode: str = Form("auto"), # NEW in v3.0 ): """ Decode using multipart form data (file uploads). RSA key can be provided as 'rsa_key' (.pem file) or 'rsa_key_qr' (QR code image). Returns JSON with payload_type indicating text or file. + + NEW in v3.0: Supports embed_mode parameter ('auto', 'lsb', or 'dct'). """ + # Validate mode + if embed_mode not in ("auto", "lsb", "dct"): + raise HTTPException(400, "embed_mode must be 'auto', 'lsb', or 'dct'") + if embed_mode == "dct" and not has_dct_support(): + raise HTTPException(400, "DCT mode requires scipy. Install with: pip install scipy") + try: ref_data = await reference_photo.read() stego_data = await stego_image.read() @@ -515,7 +794,8 @@ async def api_decode_multipart( day_phrase=day_phrase, pin=pin, rsa_key_data=rsa_key_data, - rsa_password=effective_password + rsa_password=effective_password, + embed_mode=embed_mode, # NEW in v3.0 ) if result.is_file: @@ -535,13 +815,26 @@ async def api_decode_multipart( raise HTTPException(401, "Decryption failed. Check credentials.") except StegasooError as e: raise HTTPException(400, str(e)) + except HTTPException: + raise except Exception as e: raise HTTPException(500, str(e)) +# ============================================================================ +# ROUTES - IMAGE INFO +# ============================================================================ + @app.post("/image/info", response_model=ImageInfoResponse) -async def api_image_info(image: UploadFile = File(...)): - """Get information about an image's capacity.""" +async def api_image_info( + image: UploadFile = File(...), + include_modes: bool = Query(True, description="Include capacity by mode (v3.0+)") +): + """ + Get information about an image's capacity. + + NEW in v3.0: Optionally includes capacity for both LSB and DCT modes. + """ try: image_data = await image.read() @@ -551,7 +844,7 @@ async def api_image_info(image: UploadFile = File(...)): capacity = calculate_capacity(image_data) - return ImageInfoResponse( + response = ImageInfoResponse( width=result.details['width'], height=result.details['height'], pixels=result.details['pixels'], @@ -559,6 +852,26 @@ async def api_image_info(image: UploadFile = File(...)): capacity_kb=capacity // 1024 ) + # NEW in v3.0 - include mode comparison + if include_modes: + comparison = compare_modes(image_data) + response.modes = { + "lsb": ModeCapacity( + capacity_bytes=comparison['lsb']['capacity_bytes'], + capacity_kb=round(comparison['lsb']['capacity_kb'], 1), + available=True, + output_format=comparison['lsb']['output'], + ), + "dct": ModeCapacity( + capacity_bytes=comparison['dct']['capacity_bytes'], + capacity_kb=round(comparison['dct']['capacity_kb'], 1), + available=comparison['dct']['available'], + output_format=comparison['dct']['output'], + ), + } + + return response + except HTTPException: raise except Exception as e: diff --git a/frontends/cli/main.py b/frontends/cli/main.py index 687f01e..7f3f814 100644 --- a/frontends/cli/main.py +++ b/frontends/cli/main.py @@ -8,6 +8,7 @@ Usage: stegasoo decode [OPTIONS] stegasoo verify [OPTIONS] stegasoo info [OPTIONS] + stegasoo compare [OPTIONS] # NEW in v3.0 """ import sys @@ -29,9 +30,16 @@ from stegasoo import ( DAY_NAMES, __version__, StegasooError, DecryptionError, ExtractionError, FilePayload, - # New in 2.2.1 will_fit, strip_image_metadata, + # NEW in v3.0 - Embedding modes + EMBED_MODE_LSB, + EMBED_MODE_DCT, + EMBED_MODE_AUTO, + has_dct_support, + compare_modes, + will_fit_by_mode, + calculate_capacity_by_mode, ) # QR Code utilities @@ -68,6 +76,11 @@ def cli(): • Reference photo (something you have) • Daily passphrase (something you know) • Static PIN or RSA key (additional security) + + \b + NEW in v3.0 - Embedding Modes: + • LSB mode (default): Full color output, higher capacity + • DCT mode: Grayscale output, ~20% capacity, better stealth """ pass @@ -200,8 +213,10 @@ def generate(pin, rsa, pin_length, rsa_bits, words, output, password, as_json): @click.option('--key-password', help='RSA key password (for encrypted .pem files)') @click.option('--output', '-o', type=click.Path(), help='Output file (default: auto-generated)') @click.option('--date', 'date_str', help='Date override (YYYY-MM-DD)') +@click.option('--mode', 'embed_mode', type=click.Choice(['lsb', 'dct']), default='lsb', + help='Embedding mode: lsb (default, color) or dct (grayscale, requires scipy)') @click.option('--quiet', '-q', is_flag=True, help='Suppress output except errors') -def encode_cmd(ref, carrier, message, message_file, embed_file, phrase, pin, key, key_qr, key_password, output, date_str, quiet): +def encode_cmd(ref, carrier, message, message_file, embed_file, phrase, pin, key, key_qr, key_password, output, date_str, embed_mode, quiet): """ Encode a secret message or file into an image. @@ -212,20 +227,37 @@ def encode_cmd(ref, carrier, message, message_file, embed_file, phrase, pin, key For binary files, use -e/--embed-file. RSA key can be provided as a .pem file (--key) or QR code image (--key-qr). + \b + Embedding Modes (v3.0): + --mode lsb Spatial LSB embedding (default) + • Full color output (PNG/BMP) + • Higher capacity (~375 KB/megapixel) + + --mode dct DCT domain embedding (requires scipy) + • Grayscale output only + • Lower capacity (~75 KB/megapixel) + • Better resistance to visual analysis + \b Examples: - # Text message with PIN + # Text message with PIN (LSB mode, default) stegasoo encode -r photo.jpg -c meme.png -p "apple forest thunder" --pin 123456 -m "secret" + # DCT mode for better stealth + stegasoo encode -r photo.jpg -c meme.png -p "words" --pin 123456 -m "secret" --mode dct + # With RSA key file stegasoo encode -r photo.jpg -c meme.png -p "words" -k mykey.pem -m "secret" - # With RSA key from QR code image - stegasoo encode -r photo.jpg -c meme.png -p "words" --key-qr keyqr.png -m "secret" - # Embed a binary file stegasoo encode -r photo.jpg -c meme.png -p "words" --pin 123456 -e secret.pdf """ + # Check DCT mode availability + if embed_mode == 'dct' and not has_dct_support(): + raise click.ClickException( + "DCT mode requires scipy. Install with: pip install scipy" + ) + # Determine what to encode payload = None @@ -277,16 +309,28 @@ def encode_cmd(ref, carrier, message, message_file, embed_file, phrase, pin, key ref_photo = Path(ref).read_bytes() carrier_image = Path(carrier).read_bytes() - # Pre-check capacity - fit_check = will_fit(payload, carrier_image) + # Pre-check capacity with selected mode + fit_check = will_fit_by_mode(payload, carrier_image, embed_mode=embed_mode) if not fit_check['fits']: + # Suggest alternative mode if it would fit + alt_mode = 'lsb' if embed_mode == 'dct' else 'dct' + alt_check = will_fit_by_mode(payload, carrier_image, embed_mode=alt_mode) + + suggestion = "" + if alt_mode == 'lsb' and alt_check['fits']: + suggestion = f"\n Tip: Payload would fit in LSB mode (--mode lsb)" + raise click.ClickException( - f"Payload too large for carrier image.\n" + f"Payload too large for {embed_mode.upper()} mode.\n" f" Payload: {fit_check['payload_size']:,} bytes\n" f" Capacity: {fit_check['capacity']:,} bytes\n" f" Shortfall: {-fit_check['headroom']:,} bytes" + f"{suggestion}" ) + if not quiet: + click.echo(f"Mode: {embed_mode.upper()} ({fit_check['usage_percent']:.1f}% capacity)") + result = encode( message=payload, reference_photo=ref_photo, @@ -296,6 +340,7 @@ def encode_cmd(ref, carrier, message, message_file, embed_file, phrase, pin, key rsa_key_data=rsa_key_data, rsa_password=effective_key_password, date_str=date_str, + embed_mode=embed_mode, # NEW in v3.0 ) # Determine output path @@ -313,6 +358,8 @@ def encode_cmd(ref, carrier, message, message_file, embed_file, phrase, pin, key click.echo(f" Size: {len(result.stego_image):,} bytes") click.echo(f" Capacity used: {result.capacity_percent:.1f}%") click.echo(f" Date: {result.date_used}") + if embed_mode == 'dct': + click.secho(f" Note: Output is grayscale (DCT mode)", dim=True) except StegasooError as e: raise click.ClickException(str(e)) @@ -335,9 +382,11 @@ def encode_cmd(ref, carrier, message, message_file, embed_file, phrase, pin, key @click.option('--key-qr', type=click.Path(exists=True), help='RSA key from QR code image') @click.option('--key-password', help='RSA key password (for encrypted .pem files)') @click.option('--output', '-o', type=click.Path(), help='Save decoded content to file') +@click.option('--mode', 'embed_mode', type=click.Choice(['auto', 'lsb', 'dct']), default='auto', + help='Extraction mode: auto (default), lsb, or dct') @click.option('--quiet', '-q', is_flag=True, help='Output only the content (for text) or suppress messages (for files)') @click.option('--force', is_flag=True, help='Overwrite existing output file') -def decode_cmd(ref, stego, phrase, pin, key, key_qr, key_password, output, quiet, force): +def decode_cmd(ref, stego, phrase, pin, key, key_qr, key_password, output, embed_mode, quiet, force): """ Decode a secret message or file from a stego image. @@ -345,20 +394,32 @@ def decode_cmd(ref, stego, phrase, pin, key, key_qr, key_password, output, quiet Automatically detects whether content is text or a file. RSA key can be provided as a .pem file (--key) or QR code image (--key-qr). + \b + Extraction Modes (v3.0): + --mode auto Auto-detect (default) - tries LSB first, then DCT + --mode lsb Only try LSB extraction + --mode dct Only try DCT extraction (requires scipy) + \b Examples: - # Decode with PIN + # Decode with PIN (auto-detect mode) stegasoo decode -r photo.jpg -s stego.png -p "apple forest thunder" --pin 123456 + # Explicitly specify DCT mode + stegasoo decode -r photo.jpg -s stego.png -p "words" --pin 123456 --mode dct + # Decode with RSA key file stegasoo decode -r photo.jpg -s stego.png -p "words" -k mykey.pem - # Decode with RSA key from QR code image - stegasoo decode -r photo.jpg -s stego.png -p "words" --key-qr keyqr.png - # Save output to file stegasoo decode -r photo.jpg -s stego.png -p "words" --pin 123456 -o output.txt """ + # Check DCT mode availability + if embed_mode == 'dct' and not has_dct_support(): + raise click.ClickException( + "DCT mode requires scipy. Install with: pip install scipy" + ) + # Load key if provided (from .pem file or QR code image) rsa_key_data = None rsa_key_from_qr = False @@ -400,6 +461,7 @@ def decode_cmd(ref, stego, phrase, pin, key, key_qr, key_password, output, quiet pin=pin or "", rsa_key_data=rsa_key_data, rsa_password=effective_key_password, + embed_mode=embed_mode, # NEW in v3.0 ) if result.is_file: @@ -459,8 +521,10 @@ def decode_cmd(ref, stego, phrase, pin, key, key_qr, key_password, output, quiet @click.option('--key', '-k', type=click.Path(exists=True), help='RSA key file (.pem)') @click.option('--key-qr', type=click.Path(exists=True), help='RSA key from QR code image') @click.option('--key-password', help='RSA key password (for encrypted .pem files)') +@click.option('--mode', 'embed_mode', type=click.Choice(['auto', 'lsb', 'dct']), default='auto', + help='Extraction mode: auto (default), lsb, or dct') @click.option('--json', 'as_json', is_flag=True, help='Output as JSON') -def verify(ref, stego, phrase, pin, key, key_qr, key_password, as_json): +def verify(ref, stego, phrase, pin, key, key_qr, key_password, embed_mode, as_json): """ Verify that a stego image can be decoded without extracting the message. @@ -472,7 +536,15 @@ def verify(ref, stego, phrase, pin, key, key_qr, key_password, as_json): stegasoo verify -r photo.jpg -s stego.png -p "apple forest thunder" --pin 123456 stegasoo verify -r photo.jpg -s stego.png -p "words" -k mykey.pem --json + + stegasoo verify -r photo.jpg -s stego.png -p "words" --pin 123456 --mode dct """ + # Check DCT mode availability + if embed_mode == 'dct' and not has_dct_support(): + raise click.ClickException( + "DCT mode requires scipy. Install with: pip install scipy" + ) + # Load key if provided rsa_key_data = None rsa_key_from_qr = False @@ -511,6 +583,7 @@ def verify(ref, stego, phrase, pin, key, key_qr, key_password, as_json): pin=pin or "", rsa_key_data=rsa_key_data, rsa_password=effective_key_password, + embed_mode=embed_mode, # NEW in v3.0 ) # Calculate payload size @@ -576,11 +649,13 @@ def verify(ref, stego, phrase, pin, key, key_qr, key_password, as_json): @cli.command() @click.argument('image', type=click.Path(exists=True)) -def info(image): +@click.option('--json', 'as_json', is_flag=True, help='Output as JSON') +def info(image, as_json): """ Show information about an image. - Displays dimensions, capacity, and attempts to detect date from filename. + Displays dimensions, capacity for both LSB and DCT modes, + and attempts to detect date from filename. """ try: image_data = Path(image).read_bytes() @@ -589,21 +664,58 @@ def info(image): if not result.is_valid: raise click.ClickException(result.error_message) - capacity = calculate_capacity(image_data) + # Get capacity comparison + comparison = compare_modes(image_data) # Try to get date from filename date_str = parse_date_from_filename(image) day_name = get_day_from_date(date_str) if date_str else None + if as_json: + import json + output = { + "file": image, + "width": result.details['width'], + "height": result.details['height'], + "pixels": result.details['pixels'], + "mode": result.details['mode'], + "format": result.details['format'], + "capacity": { + "lsb": { + "bytes": comparison['lsb']['capacity_bytes'], + "kb": round(comparison['lsb']['capacity_kb'], 1), + }, + "dct": { + "bytes": comparison['dct']['capacity_bytes'], + "kb": round(comparison['dct']['capacity_kb'], 1), + "available": comparison['dct']['available'], + "ratio_vs_lsb": round(comparison['dct']['ratio_vs_lsb'], 1), + }, + }, + } + if date_str: + output["embed_date"] = date_str + output["embed_day"] = day_name + click.echo(json.dumps(output, indent=2)) + return + click.echo() click.secho(f"Image: {image}", bold=True) click.echo(f" Dimensions: {result.details['width']} × {result.details['height']}") click.echo(f" Pixels: {result.details['pixels']:,}") click.echo(f" Mode: {result.details['mode']}") click.echo(f" Format: {result.details['format']}") - click.echo(f" Capacity: ~{capacity:,} bytes ({capacity // 1024} KB)") + click.echo() + + click.secho(" Capacity:", bold=True) + click.echo(f" LSB mode: ~{comparison['lsb']['capacity_bytes']:,} bytes ({comparison['lsb']['capacity_kb']:.1f} KB)") + + dct_status = "✓" if comparison['dct']['available'] else "✗ (scipy not installed)" + click.echo(f" DCT mode: ~{comparison['dct']['capacity_bytes']:,} bytes ({comparison['dct']['capacity_kb']:.1f} KB) {dct_status}") + click.echo(f" DCT ratio: {comparison['dct']['ratio_vs_lsb']:.1f}% of LSB") if date_str: + click.echo() click.echo(f" Embed date: {date_str} ({day_name})") click.echo() @@ -612,6 +724,127 @@ def info(image): raise click.ClickException(str(e)) +# ============================================================================ +# COMPARE COMMAND (NEW in v3.0) +# ============================================================================ + +@cli.command() +@click.argument('image', type=click.Path(exists=True)) +@click.option('--payload-size', '-s', type=int, help='Check if specific payload size fits') +@click.option('--json', 'as_json', is_flag=True, help='Output as JSON') +def compare(image, payload_size, as_json): + """ + Compare LSB and DCT embedding modes for an image. + + Shows capacity for each mode and recommends which to use. + Optionally checks if a specific payload size would fit. + + \b + Examples: + stegasoo compare carrier.png + stegasoo compare carrier.png --payload-size 50000 + stegasoo compare carrier.png --json + """ + try: + image_data = Path(image).read_bytes() + + comparison = compare_modes(image_data) + + if as_json: + import json + output = { + "file": image, + "width": comparison['width'], + "height": comparison['height'], + "modes": { + "lsb": { + "capacity_bytes": comparison['lsb']['capacity_bytes'], + "capacity_kb": round(comparison['lsb']['capacity_kb'], 1), + "available": True, + "output_format": comparison['lsb']['output'], + }, + "dct": { + "capacity_bytes": comparison['dct']['capacity_bytes'], + "capacity_kb": round(comparison['dct']['capacity_kb'], 1), + "available": comparison['dct']['available'], + "output_format": comparison['dct']['output'], + "ratio_vs_lsb_percent": round(comparison['dct']['ratio_vs_lsb'], 1), + }, + }, + } + + if payload_size: + output["payload_check"] = { + "size_bytes": payload_size, + "fits_lsb": payload_size <= comparison['lsb']['capacity_bytes'], + "fits_dct": payload_size <= comparison['dct']['capacity_bytes'], + } + + click.echo(json.dumps(output, indent=2)) + return + + click.echo() + click.secho(f"═══ Mode Comparison: {image} ═══", fg='cyan', bold=True) + click.echo(f" Dimensions: {comparison['width']} × {comparison['height']}") + click.echo() + + # LSB mode + click.secho(" ┌─── LSB Mode ───", fg='green') + click.echo(f" │ Capacity: {comparison['lsb']['capacity_bytes']:,} bytes ({comparison['lsb']['capacity_kb']:.1f} KB)") + click.echo(f" │ Output: {comparison['lsb']['output']}") + click.echo(f" │ Status: ✓ Available") + click.echo(" │") + + # DCT mode + click.secho(" ├─── DCT Mode ───", fg='blue') + click.echo(f" │ Capacity: {comparison['dct']['capacity_bytes']:,} bytes ({comparison['dct']['capacity_kb']:.1f} KB)") + click.echo(f" │ Output: {comparison['dct']['output']}") + click.echo(f" │ Ratio: {comparison['dct']['ratio_vs_lsb']:.1f}% of LSB capacity") + if comparison['dct']['available']: + click.echo(f" │ Status: ✓ Available") + else: + click.secho(f" │ Status: ✗ Requires scipy (pip install scipy)", fg='yellow') + click.echo(" │") + + # Payload check + if payload_size: + click.secho(" ├─── Payload Check ───", fg='magenta') + click.echo(f" │ Size: {payload_size:,} bytes") + + fits_lsb = payload_size <= comparison['lsb']['capacity_bytes'] + fits_dct = payload_size <= comparison['dct']['capacity_bytes'] + + lsb_icon = "✓" if fits_lsb else "✗" + dct_icon = "✓" if fits_dct else "✗" + lsb_color = 'green' if fits_lsb else 'red' + dct_color = 'green' if fits_dct else 'red' + + click.echo(f" │ LSB mode: ", nl=False) + click.secho(f"{lsb_icon} {'Fits' if fits_lsb else 'Too large'}", fg=lsb_color) + click.echo(f" │ DCT mode: ", nl=False) + click.secho(f"{dct_icon} {'Fits' if fits_dct else 'Too large'}", fg=dct_color) + click.echo(" │") + + # Recommendation + click.secho(" └─── Recommendation ───", fg='yellow') + if not comparison['dct']['available']: + click.echo(" Use LSB mode (DCT unavailable)") + elif payload_size: + if fits_dct: + click.echo(" DCT mode for better stealth (payload fits both modes)") + elif fits_lsb: + click.echo(" LSB mode (payload too large for DCT)") + else: + click.secho(" ✗ Payload too large for both modes!", fg='red') + else: + click.echo(" LSB for larger payloads, DCT for better stealth") + + click.echo() + + except Exception as e: + raise click.ClickException(str(e)) + + # ============================================================================ # STRIP-METADATA COMMAND # ============================================================================ @@ -656,6 +889,48 @@ def strip_metadata_cmd(image, output, output_format, quiet): raise click.ClickException(str(e)) +# ============================================================================ +# MODES COMMAND (NEW in v3.0) +# ============================================================================ + +@cli.command() +def modes(): + """ + Show available embedding modes and their status. + + Displays which modes are available and their characteristics. + """ + click.echo() + click.secho("═══ Stegasoo Embedding Modes ═══", fg='cyan', bold=True) + click.echo() + + # LSB Mode + click.secho(" LSB Mode (Spatial LSB)", fg='green', bold=True) + click.echo(" Status: ✓ Always available") + click.echo(" Output: PNG/BMP (full color)") + click.echo(" Capacity: ~375 KB per megapixel") + click.echo(" Use case: Larger payloads, color preservation") + click.echo(" CLI flag: --mode lsb (default)") + click.echo() + + # DCT Mode + click.secho(" DCT Mode (Frequency Domain)", fg='blue', bold=True) + if has_dct_support(): + click.echo(" Status: ✓ Available") + else: + click.secho(" Status: ✗ Requires scipy", fg='yellow') + click.echo(" Install: pip install scipy") + click.echo(" Output: PNG (grayscale only)") + click.echo(" Capacity: ~75 KB per megapixel (~20% of LSB)") + click.echo(" Use case: Better stealth, smaller messages") + click.echo(" CLI flag: --mode dct") + click.echo() + + click.secho(" Tip:", dim=True) + click.echo(" Use 'stegasoo compare ' to see capacity for both modes") + click.echo() + + # ============================================================================ # MAIN # ============================================================================ diff --git a/frontends/web/app.py b/frontends/web/app.py index 0b2820c..9628c19 100644 --- a/frontends/web/app.py +++ b/frontends/web/app.py @@ -1,9 +1,11 @@ #!/usr/bin/env python3 """ -Stegasoo Web Frontend +Stegasoo Web Frontend (v3.0.1) Flask-based web UI for steganography operations. Supports both text messages and file embedding. +NEW in v3.0: LSB and DCT embedding modes with advanced options. +NEW in v3.0.1: DCT output format selection (PNG or JPEG). """ import io @@ -35,6 +37,13 @@ from stegasoo import ( StegasooError, DecryptionError, CapacityError, has_argon2, FilePayload, + # NEW in v3.0 - Embedding modes + EMBED_MODE_LSB, + EMBED_MODE_DCT, + EMBED_MODE_AUTO, + has_dct_support, + compare_modes, + will_fit_by_mode, ) from stegasoo.constants import ( __version__, @@ -102,6 +111,8 @@ def inject_globals(): 'temp_file_expiry_minutes': TEMP_FILE_EXPIRY_MINUTES, 'min_pin_length': MIN_PIN_LENGTH, 'max_pin_length': MAX_PIN_LENGTH, + # NEW in v3.0 + 'has_dct': has_dct_support(), } @@ -114,6 +125,7 @@ try: # Check current limits print(f"Current MAX_FILE_SIZE from constants: {MAX_FILE_SIZE}") print(f"Current MAX_FILE_PAYLOAD_SIZE: {MAX_FILE_PAYLOAD_SIZE}") + print(f"DCT support available: {has_dct_support()}") DESIRED_PAYLOAD_SIZE = 2 * 1024 * 1024 # 2MB @@ -131,7 +143,7 @@ def generate_thumbnail(image_data: bytes, size: tuple = THUMBNAIL_SIZE) -> bytes """Generate thumbnail from image data.""" try: with Image.open(io.BytesIO(image_data)) as img: - # Convert to RGB if necessary + # Convert to RGB if necessary (handle grayscale too) if img.mode in ('RGBA', 'LA', 'P'): # Create white background for transparent images background = Image.new('RGB', img.size, (255, 255, 255)) @@ -139,6 +151,9 @@ def generate_thumbnail(image_data: bytes, size: tuple = THUMBNAIL_SIZE) -> bytes img = img.convert('RGBA') background.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None) img = background + elif img.mode == 'L': + # Convert grayscale to RGB for thumbnail + img = img.convert('RGB') elif img.mode != 'RGB': img = img.convert('RGB') @@ -401,6 +416,85 @@ def extract_key_from_qr_route(): }), 500 +# ============================================================================ +# NEW in v3.0 - CAPACITY COMPARISON API +# ============================================================================ + +@app.route('/api/compare-capacity', methods=['POST']) +def api_compare_capacity(): + """ + Compare LSB and DCT capacity for an uploaded carrier image. + Returns JSON with capacity info for both modes. + """ + carrier = request.files.get('carrier') + if not carrier: + return jsonify({'error': 'No carrier image provided'}), 400 + + try: + carrier_data = carrier.read() + comparison = compare_modes(carrier_data) + + return jsonify({ + 'success': True, + 'width': comparison['width'], + 'height': comparison['height'], + 'lsb': { + 'capacity_bytes': comparison['lsb']['capacity_bytes'], + 'capacity_kb': round(comparison['lsb']['capacity_kb'], 1), + 'output': comparison['lsb']['output'], + }, + 'dct': { + 'capacity_bytes': comparison['dct']['capacity_bytes'], + 'capacity_kb': round(comparison['dct']['capacity_kb'], 1), + 'output': comparison['dct']['output'], + 'available': comparison['dct']['available'], + 'ratio': round(comparison['dct']['ratio_vs_lsb'], 1), + } + }) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/check-fit', methods=['POST']) +def api_check_fit(): + """ + Check if a payload will fit in the carrier with selected mode. + Returns JSON with fit status and details. + """ + carrier = request.files.get('carrier') + payload_size = request.form.get('payload_size', type=int) + embed_mode = request.form.get('embed_mode', 'lsb') + + if not carrier or payload_size is None: + return jsonify({'error': 'Missing carrier or payload_size'}), 400 + + if embed_mode not in ('lsb', 'dct'): + return jsonify({'error': 'Invalid embed_mode'}), 400 + + if embed_mode == 'dct' and not has_dct_support(): + return jsonify({'error': 'DCT mode requires scipy'}), 400 + + try: + carrier_data = carrier.read() + result = will_fit_by_mode(payload_size, carrier_data, embed_mode=embed_mode) + + return jsonify({ + 'success': True, + 'fits': result['fits'], + 'payload_size': result['payload_size'], + 'capacity': result['capacity'], + 'usage_percent': round(result['usage_percent'], 1), + 'headroom': result['headroom'], + 'mode': embed_mode, + }) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +# ============================================================================ +# ENCODE +# ============================================================================ + @app.route('/encode', methods=['GET', 'POST']) def encode_page(): day_of_week = get_today_day() @@ -428,6 +522,21 @@ def encode_page(): rsa_password = request.form.get('rsa_password', '') payload_type = request.form.get('payload_type', 'text') + # NEW in v3.0 - Embedding mode + embed_mode = request.form.get('embed_mode', 'lsb') + if embed_mode not in ('lsb', 'dct'): + embed_mode = 'lsb' + + # NEW in v3.0.1 - DCT output format + dct_output_format = request.form.get('dct_output_format', 'png') + if dct_output_format not in ('png', 'jpeg'): + dct_output_format = 'png' + + # Check DCT availability + if embed_mode == 'dct' and not has_dct_support(): + flash('DCT mode requires scipy. Install with: pip install scipy', 'error') + return render_template('encode.html', day_of_week=day_of_week, has_qrcode_read=HAS_QRCODE_READ) + # Determine payload if payload_type == 'file' and payload_file and payload_file.filename: # File payload @@ -515,7 +624,7 @@ def encode_page(): else: date_str = datetime.now().strftime('%Y-%m-%d') - # Encode + # Encode with selected mode and output format encode_result = encode( message=payload, reference_photo=ref_data, @@ -524,16 +633,34 @@ def encode_page(): pin=pin, rsa_key_data=rsa_key_data, rsa_password=key_password, - date_str=date_str + date_str=date_str, + embed_mode=embed_mode, # NEW in v3.0 + dct_output_format=dct_output_format if embed_mode == 'dct' else None, # NEW in v3.0.1 ) + # Determine actual output format for filename and storage + if embed_mode == 'dct' and dct_output_format == 'jpeg': + output_ext = '.jpg' + output_mime = 'image/jpeg' + # Modify filename extension if needed + filename = encode_result.filename + if filename.endswith('.png'): + filename = filename[:-4] + '.jpg' + else: + output_ext = '.png' + output_mime = 'image/png' + filename = encode_result.filename + # Store temporarily file_id = secrets.token_urlsafe(16) cleanup_temp_files() TEMP_FILES[file_id] = { 'data': encode_result.stego_image, - 'filename': encode_result.filename, - 'timestamp': time.time() + 'filename': filename, + 'timestamp': time.time(), + 'embed_mode': embed_mode, + 'output_format': dct_output_format if embed_mode == 'dct' else 'png', + 'mime_type': output_mime, } return redirect(url_for('encode_result', file_id=file_id)) @@ -570,7 +697,9 @@ def encode_result(file_id): return render_template('encode_result.html', file_id=file_id, filename=file_info['filename'], - thumbnail_url=url_for('encode_thumbnail', thumb_id=thumbnail_id) if thumbnail_id else None + thumbnail_url=url_for('encode_thumbnail', thumb_id=thumbnail_id) if thumbnail_id else None, + embed_mode=file_info.get('embed_mode', 'lsb'), + output_format=file_info.get('output_format', 'png'), # NEW in v3.0.1 ) @@ -594,9 +723,11 @@ def encode_download(file_id): return redirect(url_for('encode_page')) file_info = TEMP_FILES[file_id] + mime_type = file_info.get('mime_type', 'image/png') + return send_file( io.BytesIO(file_info['data']), - mimetype='image/png', + mimetype=mime_type, as_attachment=True, download_name=file_info['filename'] ) @@ -609,9 +740,11 @@ def encode_file_route(file_id): return "Not found", 404 file_info = TEMP_FILES[file_id] + mime_type = file_info.get('mime_type', 'image/png') + return send_file( io.BytesIO(file_info['data']), - mimetype='image/png', + mimetype=mime_type, as_attachment=False, download_name=file_info['filename'] ) @@ -629,6 +762,10 @@ def encode_cleanup(file_id): return jsonify({'status': 'ok'}) +# ============================================================================ +# DECODE +# ============================================================================ + @app.route('/decode', methods=['GET', 'POST']) def decode_page(): if request.method == 'POST': @@ -647,6 +784,16 @@ def decode_page(): pin = request.form.get('pin', '').strip() rsa_password = request.form.get('rsa_password', '') + # NEW in v3.0 - Extraction mode + embed_mode = request.form.get('embed_mode', 'auto') + if embed_mode not in ('auto', 'lsb', 'dct'): + embed_mode = 'auto' + + # Check DCT availability + if embed_mode == 'dct' and not has_dct_support(): + flash('DCT mode requires scipy. Install with: pip install scipy', 'error') + return render_template('decode.html', has_qrcode_read=HAS_QRCODE_READ) + # Get encoding date from form (detected from filename in JS) stego_date = request.form.get('stego_date', '').strip() @@ -700,7 +847,7 @@ def decode_page(): flash(result.error_message, 'error') return render_template('decode.html', has_qrcode_read=HAS_QRCODE_READ) - # Decode + # Decode with selected mode decode_result = decode( stego_image=stego_data, reference_photo=ref_data, @@ -708,7 +855,8 @@ def decode_page(): pin=pin, rsa_key_data=rsa_key_data, rsa_password=key_password, - date_str=stego_date if stego_date else None + date_str=stego_date if stego_date else None, + embed_mode=embed_mode, # NEW in v3.0 ) if decode_result.is_file: diff --git a/frontends/web/templates/decode.html b/frontends/web/templates/decode.html index 694912a..09fb5b6 100644 --- a/frontends/web/templates/decode.html +++ b/frontends/web/templates/decode.html @@ -181,6 +181,78 @@ + +
+ + +
+
+ + +
+ + +
+ +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+
+ +
+ + Auto tries LSB first, then DCT. Use specific mode if you know how it was encoded. + {% if not has_dct %} +
DCT requires scipy: pip install scipy + {% endif %} +
+
+ +
+
+
+ @@ -211,10 +283,14 @@ Ensure the stego image hasn't been resized or recompressed -
  • +
  • If using an RSA key, make sure the password is correct
  • +
  • + + v3.0: If auto-detection fails, try specifying LSB or DCT mode in Advanced Options +
  • @@ -228,7 +304,8 @@ // Form submit loading state document.getElementById('decodeForm')?.addEventListener('submit', function() { const btn = document.getElementById('decodeBtn'); - btn.innerHTML = 'Decoding...'; + const selectedMode = document.querySelector('input[name="embed_mode"]:checked')?.value || 'auto'; + btn.innerHTML = `Decoding (${selectedMode.toUpperCase()})...`; btn.disabled = true; }); @@ -323,6 +400,38 @@ document.getElementById('togglePin')?.addEventListener('click', function() { } }); +// Mode card highlighting +const autoModeCard = document.getElementById('autoModeCard'); +const lsbModeCardDec = document.getElementById('lsbModeCardDec'); +const dctModeCardDec = document.getElementById('dctModeCardDec'); +const modeAuto = document.getElementById('modeAuto'); +const modeLsbDec = document.getElementById('modeLsbDec'); +const modeDctDec = document.getElementById('modeDctDec'); + +function updateModeCardHighlightDec() { + autoModeCard?.classList.toggle('border-success', modeAuto?.checked); + autoModeCard?.classList.toggle('border-2', modeAuto?.checked); + lsbModeCardDec?.classList.toggle('border-primary', modeLsbDec?.checked); + lsbModeCardDec?.classList.toggle('border-2', modeLsbDec?.checked); + dctModeCardDec?.classList.toggle('border-info', modeDctDec?.checked); + dctModeCardDec?.classList.toggle('border-2', modeDctDec?.checked); +} + +modeAuto?.addEventListener('change', updateModeCardHighlightDec); +modeLsbDec?.addEventListener('change', updateModeCardHighlightDec); +modeDctDec?.addEventListener('change', updateModeCardHighlightDec); +updateModeCardHighlightDec(); // Initial state + +// Advanced options chevron rotation +document.getElementById('advancedOptionsDec')?.addEventListener('show.bs.collapse', function() { + document.getElementById('advancedChevronDec').classList.add('bi-chevron-up'); + document.getElementById('advancedChevronDec').classList.remove('bi-chevron-down'); +}); +document.getElementById('advancedOptionsDec')?.addEventListener('hide.bs.collapse', function() { + document.getElementById('advancedChevronDec').classList.remove('bi-chevron-up'); + document.getElementById('advancedChevronDec').classList.add('bi-chevron-down'); +}); + // Paste from Clipboard document.addEventListener('paste', function(e) { if (!document.getElementById('decodeForm')) return; diff --git a/frontends/web/templates/encode.html b/frontends/web/templates/encode.html index 1e89d1c..6c01bf0 100644 --- a/frontends/web/templates/encode.html +++ b/frontends/web/templates/encode.html @@ -36,7 +36,7 @@ Carrier Image
    - +
    Drop image or click to browse @@ -49,6 +49,20 @@
    + +
    +
    +
    + + Carrier: - +
    +
    + LSB: - + DCT: - +
    +
    +
    +
    + +
    + + +
    +
    + + +
    + + +
    + +
    +
    + + +
    +
    + + +
    +
    + + +
    +
    +
    + + +
    + + LSB is best for most uses. + DCT provides better stealth but smaller capacity and grayscale output. +
    +
    + + +
    + + +
    +
    +
    + + +
    +
    +
    +
    + + +
    +
    +
    + +
    + + PNG is 100% reliable. JPEG produces smaller, more natural-looking files but uses lossy compression (Q=95). +
    +
    + + +
    +
    +
    +
    +
    LSB Capacity
    +
    -
    +
    +
    +
    DCT Capacity
    +
    -
    +
    +
    +
    + DCT is ~20% of LSB capacity +
    +
    +
    + +
    +
    +
    + @@ -317,7 +466,15 @@ if (rsaKeyQrInput) { // Form submit loading state document.getElementById('encodeForm').addEventListener('submit', function(e) { const btn = document.getElementById('encodeBtn'); - btn.innerHTML = 'Encoding...'; + const selectedMode = document.querySelector('input[name="embed_mode"]:checked').value; + let modeLabel = selectedMode.toUpperCase(); + + if (selectedMode === 'dct') { + const outputFormat = document.querySelector('input[name="dct_output_format"]:checked')?.value || 'png'; + modeLabel += ` → ${outputFormat.toUpperCase()}`; + } + + btn.innerHTML = `Encoding (${modeLabel})...`; btn.disabled = true; }); @@ -338,12 +495,147 @@ messageInput.addEventListener('input', function() { charCount.classList.toggle('text-danger', len > maxChars * 0.95); }); +// ============================================================================ +// v3.0 - Capacity Comparison API +// ============================================================================ + +const carrierInput = document.getElementById('carrierInput'); +const capacityPanel = document.getElementById('capacityPanel'); +const carrierDimensions = document.getElementById('carrierDimensions'); +const lsbCapacityBadge = document.getElementById('lsbCapacityBadge'); +const dctCapacityBadge = document.getElementById('dctCapacityBadge'); +const lsbCapacityDetail = document.getElementById('lsbCapacityDetail'); +const dctCapacityDetail = document.getElementById('dctCapacityDetail'); +const modeCapacityComparison = document.getElementById('modeCapacityComparison'); +const capacityRatio = document.getElementById('capacityRatio'); + +let currentCapacity = null; + +async function fetchCapacityComparison(file) { + const formData = new FormData(); + formData.append('carrier', file); + + try { + const response = await fetch('/api/compare-capacity', { + method: 'POST', + body: formData + }); + + if (response.ok) { + const data = await response.json(); + if (data.success) { + currentCapacity = data; + updateCapacityDisplay(data); + } + } + } catch (err) { + console.error('Capacity comparison failed:', err); + } +} + +function updateCapacityDisplay(data) { + // Update top panel + carrierDimensions.textContent = `${data.width} × ${data.height}`; + lsbCapacityBadge.textContent = `LSB: ${data.lsb.capacity_kb} KB`; + + if (data.dct.available) { + dctCapacityBadge.textContent = `DCT: ${data.dct.capacity_kb} KB`; + dctCapacityBadge.classList.remove('bg-secondary'); + dctCapacityBadge.classList.add('bg-info'); + } else { + dctCapacityBadge.textContent = `DCT: N/A`; + dctCapacityBadge.classList.remove('bg-info'); + dctCapacityBadge.classList.add('bg-secondary'); + } + + capacityPanel.classList.remove('d-none'); + + // Update advanced options panel + lsbCapacityDetail.textContent = `${data.lsb.capacity_kb} KB`; + dctCapacityDetail.textContent = data.dct.available ? `${data.dct.capacity_kb} KB` : 'N/A'; + capacityRatio.textContent = data.dct.available + ? `DCT is ${data.dct.ratio}% of LSB capacity` + : 'DCT mode not available'; + modeCapacityComparison.classList.remove('d-none'); +} + +// Listen for carrier file selection +if (carrierInput) { + carrierInput.addEventListener('change', function() { + if (this.files && this.files[0]) { + fetchCapacityComparison(this.files[0]); + } else { + capacityPanel.classList.add('d-none'); + modeCapacityComparison.classList.add('d-none'); + currentCapacity = null; + } + }); +} + +// ============================================================================ +// Mode card highlighting & DCT output format visibility +// ============================================================================ + +const lsbModeCard = document.getElementById('lsbModeCard'); +const dctModeCard = document.getElementById('dctModeCard'); +const modeLsb = document.getElementById('modeLsb'); +const modeDct = document.getElementById('modeDct'); +const dctOutputFormatGroup = document.getElementById('dctOutputFormatGroup'); +const dctPngCard = document.getElementById('dctPngCard'); +const dctJpegCard = document.getElementById('dctJpegCard'); +const dctFormatPng = document.getElementById('dctFormatPng'); +const dctFormatJpeg = document.getElementById('dctFormatJpeg'); + +function updateModeCardHighlight() { + // Mode cards + lsbModeCard.classList.toggle('border-primary', modeLsb.checked); + lsbModeCard.classList.toggle('border-2', modeLsb.checked); + dctModeCard.classList.toggle('border-info', modeDct.checked); + dctModeCard.classList.toggle('border-2', modeDct.checked); + + // Show/hide DCT output format selector + if (dctOutputFormatGroup) { + dctOutputFormatGroup.classList.toggle('d-none', !modeDct.checked); + } +} + +function updateDctFormatCardHighlight() { + if (dctPngCard && dctJpegCard) { + dctPngCard.classList.toggle('border-success', dctFormatPng.checked); + dctPngCard.classList.toggle('border-2', dctFormatPng.checked); + dctJpegCard.classList.toggle('border-warning', dctFormatJpeg.checked); + dctJpegCard.classList.toggle('border-2', dctFormatJpeg.checked); + } +} + +modeLsb.addEventListener('change', updateModeCardHighlight); +modeDct.addEventListener('change', updateModeCardHighlight); +dctFormatPng?.addEventListener('change', updateDctFormatCardHighlight); +dctFormatJpeg?.addEventListener('change', updateDctFormatCardHighlight); + +updateModeCardHighlight(); // Initial state +updateDctFormatCardHighlight(); // Initial state + +// Advanced options chevron rotation +document.getElementById('advancedOptions').addEventListener('show.bs.collapse', function() { + document.getElementById('advancedChevron').classList.add('bi-chevron-up'); + document.getElementById('advancedChevron').classList.remove('bi-chevron-down'); +}); +document.getElementById('advancedOptions').addEventListener('hide.bs.collapse', function() { + document.getElementById('advancedChevron').classList.remove('bi-chevron-up'); + document.getElementById('advancedChevron').classList.add('bi-chevron-down'); +}); + +// ============================================================================ // Drag & drop with preview for images +// ============================================================================ + document.querySelectorAll('.drop-zone').forEach(zone => { const input = zone.querySelector('input[type="file"]'); const label = zone.querySelector('.drop-zone-label'); const preview = zone.querySelector('.drop-zone-preview'); const isPayloadZone = zone.id === 'payloadDropZone'; + const isCarrierZone = zone.id === 'carrierDropZone'; ['dragenter', 'dragover'].forEach(evt => { zone.addEventListener(evt, e => { @@ -367,6 +659,11 @@ document.querySelectorAll('.drop-zone').forEach(zone => { if (!isPayloadZone) { showPreview(e.dataTransfer.files[0]); } + + // Trigger capacity check for carrier + if (isCarrierZone && e.dataTransfer.files[0]) { + fetchCapacityComparison(e.dataTransfer.files[0]); + } } }); @@ -420,6 +717,7 @@ function checkDuplicateFiles() { document.querySelector('#carrierDropZone .drop-zone-label').innerHTML = '' + 'Drop image or click to browse'; + capacityPanel.classList.add('d-none'); } } } @@ -443,6 +741,11 @@ document.addEventListener('paste', function(e) { targetInput.files = container.files; targetInput.dispatchEvent(new Event('change')); + + // Trigger capacity check if pasted to carrier + if (targetInput === carrierInput) { + fetchCapacityComparison(blob); + } break; } } diff --git a/frontends/web/templates/encode_result.html b/frontends/web/templates/encode_result.html index 6a3036d..4c08c4f 100644 --- a/frontends/web/templates/encode_result.html +++ b/frontends/web/templates/encode_result.html @@ -34,6 +34,34 @@ {{ filename }} + +
    + {% if embed_mode == 'dct' %} + + DCT Mode + + {% if output_format == 'jpeg' %} + + JPEG + +
    Grayscale JPEG, frequency domain embedding (Q=95)
    + {% else %} + + PNG + +
    Grayscale PNG, frequency domain embedding (lossless)
    + {% endif %} + {% else %} + + LSB Mode + + + PNG + +
    Full color PNG, spatial LSB embedding
    + {% endif %} +
    +
    @@ -53,7 +81,14 @@
    • This file expires in 5 minutes
    • Do not resize or recompress the image
    • + {% if embed_mode == 'dct' and output_format == 'jpeg' %} +
    • JPEG format is lossy - avoid re-saving or editing
    • + {% else %}
    • PNG format preserves your hidden data
    • + {% endif %} + {% if embed_mode == 'dct' %} +
    • Recipient needs DCT mode or Auto detection to decode
    • + {% endif %}
    @@ -72,13 +107,14 @@ const shareBtn = document.getElementById('shareBtn'); const fileUrl = "{{ url_for('encode_file_route', file_id=file_id, _external=True) }}"; const fileName = "{{ filename }}"; +const mimeType = "{{ 'image/jpeg' if embed_mode == 'dct' and output_format == 'jpeg' else 'image/png' }}"; if (navigator.share && navigator.canShare) { // Check if we can share files fetch(fileUrl) .then(response => response.blob()) .then(blob => { - const file = new File([blob], fileName, { type: 'image/png' }); + const file = new File([blob], fileName, { type: mimeType }); if (navigator.canShare({ files: [file] })) { shareBtn.style.display = 'block'; shareBtn.addEventListener('click', async () => { @@ -106,4 +142,4 @@ document.getElementById('downloadBtn').addEventListener('click', function() { }, 2000); }); -{% endblock %} \ No newline at end of file +{% endblock %} diff --git a/src/stegasoo/__init__.py b/src/stegasoo/__init__.py index 7be4eb0..fc636bd 100644 --- a/src/stegasoo/__init__.py +++ b/src/stegasoo/__init__.py @@ -1,5 +1,5 @@ """ -Stegasoo - Secure Steganography Library +Stegasoo - Secure Steganography Library (v3.0.1) A Python library for hiding encrypted messages and files in images using hybrid photo + passphrase + PIN authentication. @@ -58,7 +58,7 @@ File Embedding: else: print(decoded.message) -Capacity Pre-check (v2.2.1): +Capacity Pre-check: from stegasoo import will_fit # Check if payload will fit before encoding @@ -68,13 +68,52 @@ Capacity Pre-check (v2.2.1): else: print(f"Need {-result['headroom']} more bytes") +NEW in v3.0 - DCT Embedding Mode: + from stegasoo import encode, has_dct_support, compare_modes + + # Check if DCT mode is available (requires scipy) + if has_dct_support(): + # DCT mode: smaller capacity, grayscale output, frequency domain + result = encode( + message="Secret", + reference_photo=ref_photo, + carrier_image=carrier, + day_phrase="apple forest thunder", + pin="123456", + embed_mode='dct', # NEW parameter + ) + + # Compare mode capacities + info = compare_modes(carrier_image) + print(f"LSB capacity: {info['lsb']['capacity_kb']:.1f} KB") + print(f"DCT capacity: {info['dct']['capacity_kb']:.1f} KB") + +NEW in v3.0.1 - DCT Output Format: + # DCT mode can output PNG (lossless) or JPEG (smaller, natural) + result = encode( + message="Secret", + ..., + embed_mode='dct', + dct_output_format='jpeg', # 'png' (default) or 'jpeg' + ) + Debugging: from stegasoo.debug import debug debug.enable(True) # Enable debug output debug.enable_performance(True) # Enable timing """ -from .constants import __version__, DAY_NAMES, MAX_MESSAGE_SIZE, MAX_FILE_PAYLOAD_SIZE +from .constants import ( + __version__, + DAY_NAMES, + MAX_MESSAGE_SIZE, + MAX_FILE_PAYLOAD_SIZE, + # NEW in v3.0 - Embedding modes + EMBED_MODE_LSB, + EMBED_MODE_DCT, + EMBED_MODE_AUTO, + detect_stego_mode, +) from .models import ( Credentials, EncodeInput, @@ -152,8 +191,13 @@ from .steganography import ( get_image_format, is_lossless_format, LOSSLESS_FORMATS, - # NEW in v2.2.1 will_fit, + # NEW in v3.0 + has_dct_support, + calculate_capacity_by_mode, + will_fit_by_mode, + get_available_modes, + compare_modes, ) from .utils import ( generate_filename, @@ -164,7 +208,6 @@ from .utils import ( secure_delete, SecureDeleter, format_file_size, - # NEW in v2.2.1 strip_image_metadata, ) from .debug import debug # Import debug utilities @@ -183,7 +226,7 @@ from .compression import ( ) # ============================================================================= -# NEW IN v2.2.0 - Batch Processing +# Batch Processing # ============================================================================= from .batch import ( BatchProcessor, @@ -191,10 +234,43 @@ from .batch import ( BatchItem, BatchStatus, batch_capacity_check, - # NEW in v2.2.1 BatchCredentials, ) +# ============================================================================= +# NEW in v3.0 - DCT Steganography (optional, requires scipy) +# ============================================================================= +try: + from .dct_steganography import ( + embed_in_dct, + extract_from_dct, + calculate_dct_capacity, + will_fit_dct, + estimate_capacity_comparison, + DCTEmbedStats, + DCTCapacityInfo, + ) + HAS_DCT = True +except ImportError: + HAS_DCT = False + # Provide stub functions that raise helpful errors + def embed_in_dct(*args, **kwargs): + raise ImportError("DCT mode requires scipy. Install: pip install scipy") + def extract_from_dct(*args, **kwargs): + raise ImportError("DCT mode requires scipy. Install: pip install scipy") + def calculate_dct_capacity(*args, **kwargs): + raise ImportError("DCT mode requires scipy. Install: pip install scipy") + def will_fit_dct(*args, **kwargs): + raise ImportError("DCT mode requires scipy. Install: pip install scipy") + def estimate_capacity_comparison(*args, **kwargs): + raise ImportError("DCT mode requires scipy. Install: pip install scipy") + + # Stub classes + class DCTEmbedStats: + pass + class DCTCapacityInfo: + pass + # QR Code utilities (optional, depends on qrcode and pyzbar) try: from .qr_utils import ( @@ -223,16 +299,22 @@ from pathlib import Path from typing import Optional, Union, Dict, Any +# ============================================================================= +# ENCODE FUNCTION (v3.0.1 - with dct_output_format) +# ============================================================================= + def encode( - message: Union[str, bytes, FilePayload], + message, # Union[str, bytes, FilePayload] reference_photo: bytes, carrier_image: bytes, day_phrase: str, pin: str = "", - rsa_key_data: Optional[bytes] = None, - rsa_password: Optional[str] = None, - date_str: Optional[str] = None, - output_format: Optional[str] = None, + rsa_key_data = None, # Optional[bytes] + rsa_password = None, # Optional[str] + date_str = None, # Optional[str] + output_format = None, # Optional[str] + embed_mode: str = EMBED_MODE_LSB, + dct_output_format: str = "png", # NEW in v3.0.1: 'png' or 'jpeg' ) -> EncodeResult: """ Encode a secret message or file into an image. @@ -249,8 +331,9 @@ def encode( rsa_key_data: RSA private key PEM bytes (optional if using PIN) rsa_password: Password for RSA key if encrypted date_str: Date string YYYY-MM-DD (defaults to today) - output_format: Force output format ('PNG', 'BMP'). If None, preserves - carrier format for lossless types, defaults to PNG for lossy. + output_format: Force output format ('PNG', 'BMP') - LSB mode only + embed_mode: Embedding mode - 'lsb' (default) or 'dct' (v3.0+) + dct_output_format: For DCT mode - 'png' (lossless) or 'jpeg' (smaller) Returns: EncodeResult with stego image and metadata @@ -260,14 +343,37 @@ def encode( SecurityFactorError: If no PIN or RSA key provided CapacityError: If carrier is too small EncryptionError: If encryption fails + ImportError: If DCT mode requested but scipy unavailable - Note: - Output format is always lossless (PNG or BMP) to preserve hidden data. - If carrier is JPEG/GIF, output will be PNG to maintain data integrity. + Example: + # Default LSB mode + >>> result = encode(message="Secret", ...) + + # DCT mode with PNG output (lossless) + >>> result = encode(message="Secret", ..., embed_mode='dct') + + # DCT mode with JPEG output (smaller, natural) + >>> result = encode(message="Secret", ..., embed_mode='dct', dct_output_format='jpeg') """ # Debug logging debug.print(f"encode called: message type={type(message).__name__}, " - f"day_phrase='{day_phrase[:20]}...', pin_length={len(pin)}") + f"day_phrase='{day_phrase[:20]}...', pin_length={len(pin)}, " + f"embed_mode={embed_mode}, dct_output_format={dct_output_format}") + + # Validate embed_mode + if embed_mode not in (EMBED_MODE_LSB, EMBED_MODE_DCT): + raise ValidationError(f"Invalid embed_mode: {embed_mode}. Use 'lsb' or 'dct'") + + if embed_mode == EMBED_MODE_DCT and not has_dct_support(): + raise ImportError( + "DCT embedding mode requires scipy. " + "Install with: pip install scipy" + ) + + # Validate dct_output_format + if dct_output_format not in ('png', 'jpeg'): + debug.print(f"Invalid dct_output_format '{dct_output_format}', defaulting to 'png'") + dct_output_format = 'png' # Validate inputs require_valid_payload(message) @@ -301,27 +407,54 @@ def encode( debug.data(pixel_key, "Pixel key") # Embed in image (returns extension too) + # CRITICAL: Pass dct_output_format to embed_in_image stego_data, stats, extension = embed_in_image( - carrier_image, encrypted, pixel_key, output_format=output_format + encrypted, + carrier_image, + pixel_key, + output_format=output_format, + embed_mode=embed_mode, + dct_output_format=dct_output_format, # NEW in v3.0.1 ) # Generate filename with correct extension filename = generate_filename(date_str, extension=extension) - debug.print(f"Encoding complete: {filename}, " - f"modified {stats.pixels_modified}/{stats.total_pixels} pixels " - f"({stats.modification_percent:.2f}%)") - - return EncodeResult( - stego_image=stego_data, - filename=filename, - pixels_modified=stats.pixels_modified, - total_pixels=stats.total_pixels, - capacity_used=stats.capacity_used, - date_used=date_str - ) + # Handle stats from either LSB or DCT mode + if hasattr(stats, 'pixels_modified'): + # LSB mode stats + debug.print(f"Encoding complete: {filename}, " + f"modified {stats.pixels_modified}/{stats.total_pixels} pixels " + f"({stats.modification_percent:.2f}%)") + + return EncodeResult( + stego_image=stego_data, + filename=filename, + pixels_modified=stats.pixels_modified, + total_pixels=stats.total_pixels, + capacity_used=stats.capacity_used, + date_used=date_str + ) + else: + # DCT mode stats + debug.print(f"Encoding complete (DCT): {filename}, " + f"embedded {stats.bits_embedded // 8} bytes " + f"({stats.usage_percent:.2f}% capacity)") + + return EncodeResult( + stego_image=stego_data, + filename=filename, + pixels_modified=stats.blocks_used * 64, # Approximate + total_pixels=stats.blocks_available * 64, + capacity_used=stats.usage_percent / 100.0, + date_used=date_str + ) +# ============================================================================= +# ENCODE_FILE FUNCTION (v3.0.1 - with dct_output_format) +# ============================================================================= + def encode_file( filepath: Union[str, Path], reference_photo: bytes, @@ -333,6 +466,8 @@ def encode_file( date_str: Optional[str] = None, output_format: Optional[str] = None, filename_override: Optional[str] = None, + embed_mode: str = EMBED_MODE_LSB, + dct_output_format: str = "png", # NEW in v3.0.1 ) -> EncodeResult: """ Encode a file into an image. @@ -348,13 +483,16 @@ def encode_file( rsa_key_data: RSA private key PEM bytes (optional if using PIN) rsa_password: Password for RSA key if encrypted date_str: Date string YYYY-MM-DD (defaults to today) - output_format: Force output format ('PNG', 'BMP') + output_format: Force output format ('PNG', 'BMP') - LSB mode only filename_override: Override the stored filename + embed_mode: 'lsb' (default) or 'dct' (v3.0+) + dct_output_format: For DCT mode - 'png' or 'jpeg' (v3.0.1+) Returns: EncodeResult with stego image and metadata """ - debug.print(f"encode_file called: filepath={filepath}") + debug.print(f"encode_file called: filepath={filepath}, embed_mode={embed_mode}, " + f"dct_output_format={dct_output_format}") payload = FilePayload.from_file(str(filepath), filename_override) return encode( @@ -367,9 +505,15 @@ def encode_file( rsa_password=rsa_password, date_str=date_str, output_format=output_format, + embed_mode=embed_mode, + dct_output_format=dct_output_format, # NEW in v3.0.1 ) +# ============================================================================= +# ENCODE_BYTES FUNCTION (v3.0.1 - with dct_output_format) +# ============================================================================= + def encode_bytes( data: bytes, filename: str, @@ -382,6 +526,8 @@ def encode_bytes( date_str: Optional[str] = None, output_format: Optional[str] = None, mime_type: Optional[str] = None, + embed_mode: str = EMBED_MODE_LSB, + dct_output_format: str = "png", # NEW in v3.0.1 ) -> EncodeResult: """ Encode raw bytes with a filename into an image. @@ -398,13 +544,16 @@ def encode_bytes( rsa_key_data: RSA private key PEM bytes (optional if using PIN) rsa_password: Password for RSA key if encrypted date_str: Date string YYYY-MM-DD (defaults to today) - output_format: Force output format ('PNG', 'BMP') + output_format: Force output format ('PNG', 'BMP') - LSB mode only mime_type: MIME type of the data + embed_mode: 'lsb' (default) or 'dct' (v3.0+) + dct_output_format: For DCT mode - 'png' or 'jpeg' (v3.0.1+) Returns: EncodeResult with stego image and metadata """ - debug.print(f"encode_bytes called: filename={filename}, data_size={len(data)}") + debug.print(f"encode_bytes called: filename={filename}, data_size={len(data)}, " + f"embed_mode={embed_mode}, dct_output_format={dct_output_format}") payload = FilePayload(data=data, filename=filename, mime_type=mime_type) return encode( @@ -417,9 +566,15 @@ def encode_bytes( rsa_password=rsa_password, date_str=date_str, output_format=output_format, + embed_mode=embed_mode, + dct_output_format=dct_output_format, # NEW in v3.0.1 ) +# ============================================================================= +# DECODE FUNCTION +# ============================================================================= + @debug.time def decode( stego_image: bytes, @@ -429,6 +584,7 @@ def decode( rsa_key_data: Optional[bytes] = None, rsa_password: Optional[str] = None, date_str: Optional[str] = None, + embed_mode: str = EMBED_MODE_AUTO, ) -> DecodeResult: """ Decode a secret message or file from a stego image. @@ -443,6 +599,11 @@ def decode( pin: Static PIN (if used during encoding) rsa_key_data: RSA private key PEM bytes (if used during encoding) rsa_password: Password for RSA key if encrypted + date_str: Date override (defaults to today, then checks header) + embed_mode: 'auto' (default), 'lsb', or 'dct' (v3.0+) + - 'auto': Try LSB first, then DCT if available + - 'lsb': Only try LSB extraction + - 'dct': Only try DCT extraction (requires scipy) Returns: DecodeResult with: @@ -457,9 +618,24 @@ def decode( SecurityFactorError: If no PIN or RSA key provided ExtractionError: If data cannot be extracted DecryptionError: If decryption fails + ImportError: If DCT mode explicitly requested but scipy unavailable + + Note: + With embed_mode='auto' (default), tries LSB first then DCT. + For best performance, specify the mode if you know it. """ debug.print(f"decode called: stego_image_size={len(stego_image)}, " - f"day_phrase='{day_phrase[:20]}...'") + f"day_phrase='{day_phrase[:20]}...', embed_mode={embed_mode}") + + # Validate embed_mode + if embed_mode not in (EMBED_MODE_AUTO, EMBED_MODE_LSB, EMBED_MODE_DCT): + raise ValidationError(f"Invalid embed_mode: {embed_mode}. Use 'auto', 'lsb', or 'dct'") + + if embed_mode == EMBED_MODE_DCT and not has_dct_support(): + raise ImportError( + "DCT extraction mode requires scipy. " + "Install with: pip install scipy" + ) # Validate inputs require_security_factors(pin, rsa_key_data) @@ -479,7 +655,12 @@ def decode( debug.data(pixel_key, "Pixel key for extraction") - encrypted = extract_from_image(stego_image, pixel_key) + # Extract with specified mode + encrypted = extract_from_image( + stego_image, + pixel_key, + embed_mode=embed_mode, + ) # If we got data, check if it's from a different date if encrypted: @@ -490,7 +671,11 @@ def decode( pixel_key = derive_pixel_key( reference_photo, day_phrase, header['date'], pin, rsa_key_data ) - encrypted = extract_from_image(stego_image, pixel_key) + encrypted = extract_from_image( + stego_image, + pixel_key, + embed_mode=embed_mode, + ) if not encrypted: debug.print("No data extracted from image") @@ -503,6 +688,10 @@ def decode( return decrypt_message(encrypted, reference_photo, day_phrase, pin, rsa_key_data) +# ============================================================================= +# DECODE_TEXT FUNCTION +# ============================================================================= + def decode_text( stego_image: bytes, reference_photo: bytes, @@ -511,6 +700,7 @@ def decode_text( rsa_key_data: Optional[bytes] = None, rsa_password: Optional[str] = None, date_str: Optional[str] = None, + embed_mode: str = EMBED_MODE_AUTO, ) -> str: """ Decode a text message from a stego image. @@ -525,6 +715,8 @@ def decode_text( pin: Static PIN (if used during encoding) rsa_key_data: RSA private key PEM bytes (if used during encoding) rsa_password: Password for RSA key if encrypted + date_str: Date override + embed_mode: 'auto' (default), 'lsb', or 'dct' (v3.0+) Returns: Decrypted message string @@ -532,8 +724,17 @@ def decode_text( Raises: DecryptionError: If content is a binary file, not text """ - debug.print("decode_text called") - result = decode(stego_image, reference_photo, day_phrase, pin, rsa_key_data, rsa_password) + debug.print(f"decode_text called, embed_mode={embed_mode}") + result = decode( + stego_image, + reference_photo, + day_phrase, + pin, + rsa_key_data, + rsa_password, + date_str, + embed_mode, + ) if result.is_file: # Try to decode file as text @@ -553,6 +754,10 @@ def decode_text( return message +# ============================================================================= +# EXPORTS +# ============================================================================= + __all__ = [ # Version '__version__', @@ -565,6 +770,27 @@ __all__ = [ 'decode_text', 'generate_credentials', + # NEW in v3.0 - Embedding modes + 'EMBED_MODE_LSB', + 'EMBED_MODE_DCT', + 'EMBED_MODE_AUTO', + 'has_dct_support', + 'compare_modes', + 'get_available_modes', + 'calculate_capacity_by_mode', + 'will_fit_by_mode', + 'detect_stego_mode', + 'HAS_DCT', + + # NEW in v3.0 - DCT functions (available if scipy installed) + 'embed_in_dct', + 'extract_from_dct', + 'calculate_dct_capacity', + 'will_fit_dct', + 'estimate_capacity_comparison', + 'DCTEmbedStats', + 'DCTCapacityInfo', + # Constants 'DAY_NAMES', 'LOSSLESS_FORMATS', @@ -646,7 +872,7 @@ __all__ = [ 'get_image_dimensions', 'get_image_format', 'is_lossless_format', - 'will_fit', # NEW in v2.2.1 + 'will_fit', # Utilities 'generate_filename', @@ -657,12 +883,12 @@ __all__ = [ 'secure_delete', 'SecureDeleter', 'format_file_size', - 'strip_image_metadata', # NEW in v2.2.1 + 'strip_image_metadata', # Debugging 'debug', - # Compression (v2.2.0) + # Compression 'compress', 'decompress', 'CompressionAlgorithm', @@ -671,11 +897,11 @@ __all__ = [ 'estimate_compressed_size', 'get_available_algorithms', - # Batch processing (v2.2.0) + # Batch processing 'BatchProcessor', 'BatchResult', 'BatchItem', 'BatchStatus', 'batch_capacity_check', - 'BatchCredentials', # NEW in v2.2.1 + 'BatchCredentials', ] diff --git a/src/stegasoo/constants.py b/src/stegasoo/constants.py index 2fea0e3..0d85566 100644 --- a/src/stegasoo/constants.py +++ b/src/stegasoo/constants.py @@ -12,7 +12,7 @@ from pathlib import Path # VERSION # ============================================================================ -__version__ = "2.2.1" +__version__ = "3.1.0" # ============================================================================ # FILE FORMAT @@ -181,3 +181,44 @@ def get_wordlist() -> list[str]: if _bip39_words is None: _bip39_words = get_bip39_words() return _bip39_words + + +# ============================================================================= +# DCT STEGANOGRAPHY (v3.0) +# ============================================================================= + +# Embedding modes +EMBED_MODE_LSB = 'lsb' # Spatial LSB embedding (default, original mode) +EMBED_MODE_DCT = 'dct' # DCT domain embedding (new in v3.0) +EMBED_MODE_AUTO = 'auto' # Auto-detect on decode + +# DCT-specific constants +DCT_MAGIC_HEADER = b'\x89DCT' # Magic header for DCT mode +DCT_FORMAT_VERSION = 1 +DCT_STEP_SIZE = 8 # QIM quantization step + +# Valid embedding modes +VALID_EMBED_MODES = {EMBED_MODE_LSB, EMBED_MODE_DCT} + + +def detect_stego_mode(encrypted_data: bytes) -> str: + """ + Detect embedding mode from encrypted payload header. + + Args: + encrypted_data: First few bytes of extracted payload + + Returns: + 'lsb' or 'dct' or 'unknown' + """ + if len(encrypted_data) < 4: + return 'unknown' + + header = encrypted_data[:4] + + if header == b'\x89ST3': + return EMBED_MODE_LSB + elif header == b'\x89DCT': + return EMBED_MODE_DCT + else: + return 'unknown' diff --git a/src/stegasoo/dct_steganography.py b/src/stegasoo/dct_steganography.py new file mode 100644 index 0000000..a517a25 --- /dev/null +++ b/src/stegasoo/dct_steganography.py @@ -0,0 +1,554 @@ +""" +DCT Domain Steganography Module (v3.0.1) + +Embeds data in DCT coefficients of grayscale images. +Supports PNG (lossless) or JPEG (natural, smaller) output. + +This provides an alternative to LSB embedding with different trade-offs: +- More resistant to visual inspection +- Survives some image processing +- Lower capacity (~20% of LSB) +- Works in frequency domain + +Requires: scipy (for DCT transforms) +""" + +import io +import struct +import hashlib +from dataclasses import dataclass +from typing import Optional, Literal +from enum import Enum + +import numpy as np +from PIL import Image + +# Check for scipy availability +try: + from scipy.fftpack import dct, idct + HAS_SCIPY = True +except ImportError: + HAS_SCIPY = False + dct = None + idct = None + + +# ============================================================================ +# CONSTANTS +# ============================================================================ + +# DCT block size (standard 8x8 like JPEG) +BLOCK_SIZE = 8 + +# Coefficients to use for embedding (mid-frequency, zig-zag order positions) +# Avoiding DC (0,0) and high-frequency edges +# These positions are relatively stable across JPEG compression +EMBED_POSITIONS = [ + (0, 1), (1, 0), (2, 0), (1, 1), (0, 2), (0, 3), (1, 2), (2, 1), (3, 0), + (4, 0), (3, 1), (2, 2), (1, 3), (0, 4), (0, 5), (1, 4), (2, 3), (3, 2), + (4, 1), (5, 0), (5, 1), (4, 2), (3, 3), (2, 4), (1, 5), (0, 6), (0, 7), + (1, 6), (2, 5), (3, 4), (4, 3), (5, 2), (6, 1), (7, 0), +] + +# Use subset of mid-frequency coefficients for better robustness +# Positions 4-20 in zig-zag order (skip very low and very high frequencies) +DEFAULT_EMBED_POSITIONS = EMBED_POSITIONS[4:20] # 16 coefficients per block + +# Quantization step for embedding (larger = more robust, more visible) +QUANT_STEP = 25 + +# Magic bytes for DCT stego identification +DCT_MAGIC = b'DCTS' + +# Header: magic(4) + version(1) + flags(1) + length(4) = 10 bytes +HEADER_SIZE = 10 + +# Output format options +OUTPUT_FORMAT_PNG = 'png' +OUTPUT_FORMAT_JPEG = 'jpeg' + +# JPEG quality for output (high to preserve coefficients) +JPEG_OUTPUT_QUALITY = 95 + + +# ============================================================================ +# DATA CLASSES +# ============================================================================ + +class DCTOutputFormat(Enum): + """Output format for DCT stego images.""" + PNG = 'png' + JPEG = 'jpeg' + + +@dataclass +class DCTEmbedStats: + """Statistics from DCT embedding operation.""" + blocks_used: int + blocks_available: int + bits_embedded: int + capacity_bits: int + usage_percent: float + image_width: int + image_height: int + output_format: str # 'png' or 'jpeg' + + +@dataclass +class DCTCapacityInfo: + """Capacity information for a carrier image.""" + width: int + height: int + blocks_x: int + blocks_y: int + total_blocks: int + bits_per_block: int + total_capacity_bits: int + total_capacity_bytes: int + usable_capacity_bytes: int # After header overhead + + +# ============================================================================ +# HELPER FUNCTIONS +# ============================================================================ + +def _check_scipy(): + """Raise ImportError if scipy is not available.""" + if not HAS_SCIPY: + raise ImportError( + "DCT steganography requires scipy. " + "Install with: pip install scipy" + ) + + +def _dct2(block: np.ndarray) -> np.ndarray: + """Apply 2D DCT to a block.""" + return dct(dct(block.T, norm='ortho').T, norm='ortho') + + +def _idct2(block: np.ndarray) -> np.ndarray: + """Apply 2D inverse DCT to a block.""" + return idct(idct(block.T, norm='ortho').T, norm='ortho') + + +def _to_grayscale(image_data: bytes) -> np.ndarray: + """Convert image bytes to grayscale numpy array.""" + img = Image.open(io.BytesIO(image_data)) + gray = img.convert('L') + return np.array(gray, dtype=np.float64) + + +def _pad_to_blocks(image: np.ndarray) -> tuple[np.ndarray, tuple[int, int]]: + """Pad image dimensions to be divisible by block size.""" + h, w = image.shape + new_h = ((h + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE + new_w = ((w + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE + + if new_h == h and new_w == w: + return image, (h, w) + + padded = np.zeros((new_h, new_w), dtype=image.dtype) + padded[:h, :w] = image + + # Mirror padding for smoother edges + if new_h > h: + padded[h:, :w] = image[h-(new_h-h):h, :w][::-1, :] + if new_w > w: + padded[:h, w:] = image[:h, w-(new_w-w):w][:, ::-1] + if new_h > h and new_w > w: + padded[h:, w:] = image[h-(new_h-h):h, w-(new_w-w):w][::-1, ::-1] + + return padded, (h, w) + + +def _unpad_image(image: np.ndarray, original_size: tuple[int, int]) -> np.ndarray: + """Remove padding from image.""" + h, w = original_size + return image[:h, :w] + + +def _embed_bit_in_coeff(coeff: float, bit: int, quant_step: int = QUANT_STEP) -> float: + """Embed a single bit into a DCT coefficient using QIM.""" + # Quantization Index Modulation + quantized = round(coeff / quant_step) + if (quantized % 2) != bit: + # Adjust to embed the bit + if quantized % 2 == 0 and bit == 1: + quantized += 1 if coeff >= quantized * quant_step else -1 + elif quantized % 2 == 1 and bit == 0: + quantized += 1 if coeff >= quantized * quant_step else -1 + return quantized * quant_step + + +def _extract_bit_from_coeff(coeff: float, quant_step: int = QUANT_STEP) -> int: + """Extract a single bit from a DCT coefficient.""" + quantized = round(coeff / quant_step) + return quantized % 2 + + +def _generate_block_order(num_blocks: int, seed: bytes) -> list[int]: + """Generate pseudo-random block order from seed.""" + # Create deterministic RNG from seed + hash_bytes = hashlib.sha256(seed).digest() + rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], 'big')) + + order = list(range(num_blocks)) + rng.shuffle(order) + return order + + +def _save_stego_image( + image: np.ndarray, + output_format: str = OUTPUT_FORMAT_PNG +) -> bytes: + """Save stego image in specified format.""" + # Clip to valid range and convert to uint8 + clipped = np.clip(image, 0, 255).astype(np.uint8) + img = Image.fromarray(clipped, mode='L') + + buffer = io.BytesIO() + + if output_format == OUTPUT_FORMAT_JPEG: + # High-quality JPEG with no chroma subsampling + img.save( + buffer, + format='JPEG', + quality=JPEG_OUTPUT_QUALITY, + subsampling=0, # 4:4:4 - no subsampling + optimize=True + ) + else: + # PNG (lossless, default) + img.save(buffer, format='PNG', optimize=True) + + return buffer.getvalue() + + +def _create_header(data_length: int, flags: int = 0) -> bytes: + """Create DCT stego header.""" + # Header format: MAGIC(4) + VERSION(1) + FLAGS(1) + LENGTH(4) + version = 1 + return struct.pack('>4sBBI', DCT_MAGIC, version, flags, data_length) + + +def _parse_header(header_bits: list[int]) -> tuple[int, int, int]: + """Parse header from extracted bits. Returns (version, flags, data_length).""" + if len(header_bits) < HEADER_SIZE * 8: + raise ValueError("Insufficient header data") + + # Convert bits to bytes + header_bytes = bytes([ + sum(header_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8)) + for i in range(HEADER_SIZE) + ]) + + magic, version, flags, length = struct.unpack('>4sBBI', header_bytes) + + if magic != DCT_MAGIC: + raise ValueError("Invalid DCT stego magic bytes - not a DCT stego image") + + return version, flags, length + + +# ============================================================================ +# PUBLIC API +# ============================================================================ + +def has_dct_support() -> bool: + """Check if DCT steganography is available.""" + return HAS_SCIPY + + +def calculate_dct_capacity(image_data: bytes) -> DCTCapacityInfo: + """ + Calculate the DCT embedding capacity of an image. + + Args: + image_data: Image file bytes + + Returns: + DCTCapacityInfo with capacity details + """ + _check_scipy() + + img = Image.open(io.BytesIO(image_data)) + width, height = img.size + + # Calculate blocks + blocks_x = width // BLOCK_SIZE + blocks_y = height // BLOCK_SIZE + total_blocks = blocks_x * blocks_y + + # Bits per block (using selected coefficient positions) + bits_per_block = len(DEFAULT_EMBED_POSITIONS) + + # Total capacity + total_bits = total_blocks * bits_per_block + total_bytes = total_bits // 8 + + # Usable capacity (minus header) + usable_bytes = max(0, total_bytes - HEADER_SIZE) + + return DCTCapacityInfo( + width=width, + height=height, + blocks_x=blocks_x, + blocks_y=blocks_y, + total_blocks=total_blocks, + bits_per_block=bits_per_block, + total_capacity_bits=total_bits, + total_capacity_bytes=total_bytes, + usable_capacity_bytes=usable_bytes + ) + + +def will_fit_dct(data_length: int, image_data: bytes) -> bool: + """ + Check if data will fit in the image using DCT embedding. + + Args: + data_length: Length of data in bytes + image_data: Carrier image bytes + + Returns: + True if data fits, False otherwise + """ + capacity = calculate_dct_capacity(image_data) + return data_length <= capacity.usable_capacity_bytes + + +def estimate_capacity_comparison(image_data: bytes) -> dict: + """ + Compare LSB and DCT capacity for an image. + + Args: + image_data: Image file bytes + + Returns: + Dict with 'lsb' and 'dct' capacity info + """ + img = Image.open(io.BytesIO(image_data)) + width, height = img.size + pixels = width * height + + # LSB capacity (3 bits per pixel for RGB, simplified) + lsb_bytes = (pixels * 3) // 8 + + # DCT capacity + if HAS_SCIPY: + dct_info = calculate_dct_capacity(image_data) + dct_bytes = dct_info.usable_capacity_bytes + else: + # Estimate without scipy + blocks = (width // 8) * (height // 8) + dct_bytes = (blocks * 16) // 8 - HEADER_SIZE + + return { + 'width': width, + 'height': height, + 'lsb': { + 'capacity_bytes': lsb_bytes, + 'capacity_kb': lsb_bytes / 1024, + 'output': 'PNG/BMP (color)', + }, + 'dct': { + 'capacity_bytes': dct_bytes, + 'capacity_kb': dct_bytes / 1024, + 'output': 'PNG or JPEG (grayscale)', + 'ratio_vs_lsb': (dct_bytes / lsb_bytes * 100) if lsb_bytes > 0 else 0, + 'available': HAS_SCIPY, + } + } + + +def embed_in_dct( + data: bytes, + carrier_image: bytes, + seed: bytes, + output_format: str = OUTPUT_FORMAT_PNG, +) -> tuple[bytes, DCTEmbedStats]: + """ + Embed data into image using DCT coefficient modification. + + Args: + data: Data to embed + carrier_image: Carrier image bytes + seed: Seed for pseudo-random block selection + output_format: Output format - 'png' (default, lossless) or 'jpeg' (smaller) + + Returns: + Tuple of (stego_image_bytes, stats) + + Raises: + ImportError: If scipy is not available + ValueError: If data is too large for carrier + """ + _check_scipy() + + # Validate output format + if output_format not in (OUTPUT_FORMAT_PNG, OUTPUT_FORMAT_JPEG): + raise ValueError(f"Invalid output format: {output_format}. Use 'png' or 'jpeg'") + + # Calculate capacity + capacity_info = calculate_dct_capacity(carrier_image) + + if len(data) > capacity_info.usable_capacity_bytes: + raise ValueError( + f"Data too large ({len(data)} bytes) for carrier " + f"(capacity: {capacity_info.usable_capacity_bytes} bytes)" + ) + + # Prepare image + image = _to_grayscale(carrier_image) + padded, original_size = _pad_to_blocks(image) + + # Create header + data + header = _create_header(len(data)) + payload = header + data + + # Convert payload to bits + bits = [] + for byte in payload: + for i in range(7, -1, -1): + bits.append((byte >> i) & 1) + + # Generate block order + num_blocks = capacity_info.total_blocks + block_order = _generate_block_order(num_blocks, seed) + + # Embed bits + bit_idx = 0 + blocks_used = 0 + h, w = padded.shape + + for block_num in block_order: + if bit_idx >= len(bits): + break + + # Calculate block position + by = (block_num // (w // BLOCK_SIZE)) * BLOCK_SIZE + bx = (block_num % (w // BLOCK_SIZE)) * BLOCK_SIZE + + # Extract and transform block + block = padded[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE].copy() + dct_block = _dct2(block) + + # Embed bits in selected coefficients + for pos in DEFAULT_EMBED_POSITIONS: + if bit_idx >= len(bits): + break + dct_block[pos] = _embed_bit_in_coeff(dct_block[pos], bits[bit_idx]) + bit_idx += 1 + + # Inverse transform and store + modified_block = _idct2(dct_block) + padded[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE] = modified_block + blocks_used += 1 + + # Remove padding and save + result = _unpad_image(padded, original_size) + stego_bytes = _save_stego_image(result, output_format) + + stats = DCTEmbedStats( + blocks_used=blocks_used, + blocks_available=capacity_info.total_blocks, + bits_embedded=len(bits), + capacity_bits=capacity_info.total_capacity_bits, + usage_percent=(len(bits) / capacity_info.total_capacity_bits) * 100, + image_width=original_size[1], + image_height=original_size[0], + output_format=output_format, + ) + + return stego_bytes, stats + + +def extract_from_dct( + stego_image: bytes, + seed: bytes, +) -> bytes: + """ + Extract data from DCT stego image. + + Args: + stego_image: Stego image bytes + seed: Same seed used for embedding + + Returns: + Extracted data bytes + + Raises: + ImportError: If scipy is not available + ValueError: If image is not a valid DCT stego image + """ + _check_scipy() + + # Prepare image + image = _to_grayscale(stego_image) + padded, original_size = _pad_to_blocks(image) + + # Calculate capacity + h, w = padded.shape + blocks_x = w // BLOCK_SIZE + blocks_y = h // BLOCK_SIZE + num_blocks = blocks_x * blocks_y + + # Generate same block order + block_order = _generate_block_order(num_blocks, seed) + + # Extract all bits (we'll stop when we have enough based on header) + all_bits = [] + + for block_num in block_order: + by = (block_num // blocks_x) * BLOCK_SIZE + bx = (block_num % blocks_x) * BLOCK_SIZE + + block = padded[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE] + dct_block = _dct2(block) + + for pos in DEFAULT_EMBED_POSITIONS: + bit = _extract_bit_from_coeff(dct_block[pos]) + all_bits.append(bit) + + # Check if we have enough for header + if len(all_bits) >= HEADER_SIZE * 8: + try: + _, _, data_length = _parse_header(all_bits[:HEADER_SIZE * 8]) + total_needed = (HEADER_SIZE + data_length) * 8 + if len(all_bits) >= total_needed: + break + except ValueError: + # Not enough data yet or invalid, continue + pass + + # Parse header + version, flags, data_length = _parse_header(all_bits) + + # Extract data bits + data_bits = all_bits[HEADER_SIZE * 8:(HEADER_SIZE + data_length) * 8] + + # Convert bits to bytes + data = bytes([ + sum(data_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8)) + for i in range(data_length) + ]) + + return data + + +# ============================================================================ +# CONVENIENCE FUNCTIONS +# ============================================================================ + +def get_output_extension(output_format: str) -> str: + """Get file extension for output format.""" + if output_format == OUTPUT_FORMAT_JPEG: + return '.jpg' + return '.png' + + +def get_output_mimetype(output_format: str) -> str: + """Get MIME type for output format.""" + if output_format == OUTPUT_FORMAT_JPEG: + return 'image/jpeg' + return 'image/png' diff --git a/src/stegasoo/steganography.py b/src/stegasoo/steganography.py index bfa7fbd..58a64d0 100644 --- a/src/stegasoo/steganography.py +++ b/src/stegasoo/steganography.py @@ -1,7 +1,16 @@ """ -Stegasoo Steganography Functions +Stegasoo Steganography Functions (v3.0.1) -LSB embedding and extraction with pseudo-random pixel selection. +LSB and DCT embedding modes with pseudo-random pixel/coefficient selection. + +New in v3.0: +- DCT domain embedding mode (requires scipy) +- embed_mode parameter for encode/decode +- Auto-detection of embedding mode +- Comparison utilities + +New in v3.0.1: +- dct_output_format parameter for DCT mode ('png' or 'jpeg') """ import io @@ -15,6 +24,12 @@ from cryptography.hazmat.backends import default_backend from .models import EmbedStats, FilePayload from .exceptions import CapacityError, ExtractionError, EmbeddingError from .debug import debug +from .constants import ( + EMBED_MODE_LSB, + EMBED_MODE_DCT, + EMBED_MODE_AUTO, + VALID_EMBED_MODES, +) # Lossless formats that preserve LSB data @@ -40,6 +55,48 @@ HEADER_OVERHEAD = 104 # Magic + version + date + salt + iv + tag LENGTH_PREFIX = 4 # 4 bytes for payload length ENCRYPTION_OVERHEAD = HEADER_OVERHEAD + LENGTH_PREFIX +# DCT output format options (v3.0.1) +DCT_OUTPUT_PNG = 'png' +DCT_OUTPUT_JPEG = 'jpeg' + + +# ============================================================================= +# DCT MODULE LAZY LOADING +# ============================================================================= + +_dct_module = None + + +def _get_dct_module(): + """Lazy load DCT module to avoid scipy import if not needed.""" + global _dct_module + if _dct_module is None: + from . import dct_steganography + _dct_module = dct_steganography + return _dct_module + + +def has_dct_support() -> bool: + """ + Check if DCT steganography mode is available. + + Returns: + True if scipy is installed and DCT functions work + + Example: + >>> if has_dct_support(): + ... result = encode(..., embed_mode='dct') + """ + try: + dct_mod = _get_dct_module() + return dct_mod.has_dct_support() + except ImportError: + return False + + +# ============================================================================= +# FORMAT UTILITIES +# ============================================================================= def get_output_format(input_format: Optional[str]) -> Tuple[str, str]: """ @@ -51,12 +108,6 @@ def get_output_format(input_format: Optional[str]) -> Tuple[str, str]: Returns: Tuple of (PIL format string, file extension) for output Falls back to PNG for lossy or unknown formats. - - Example: - >>> get_output_format('JPEG') - ('PNG', 'png') - >>> get_output_format('PNG') - ('PNG', 'png') """ debug.validate(input_format is None or isinstance(input_format, str), "Input format must be string or None") @@ -67,11 +118,14 @@ def get_output_format(input_format: Optional[str]) -> Tuple[str, str]: debug.print(f"Using lossless format: {fmt} -> .{ext}") return fmt, ext - # Default to PNG for lossy formats (JPEG, GIF) or unknown debug.print(f"Input format {input_format} is lossy or unknown, defaulting to PNG") return 'PNG', 'png' +# ============================================================================= +# CAPACITY FUNCTIONS +# ============================================================================= + def will_fit( payload: Union[str, bytes, FilePayload, int], carrier_image: bytes, @@ -79,38 +133,16 @@ def will_fit( include_compression_estimate: bool = True, ) -> dict: """ - Check if a payload will fit in a carrier image without performing encryption. - - This is a lightweight pre-check to avoid wasted work on payloads that - are too large. For accurate results with compression, the actual compressed - size may vary. + Check if a payload will fit in a carrier image (LSB mode). Args: payload: Message string, raw bytes, FilePayload, or size in bytes carrier_image: Carrier image bytes bits_per_channel: Bits to use per color channel (1-2) - include_compression_estimate: Estimate compressed size (requires payload data) + include_compression_estimate: Estimate compressed size Returns: - Dict with: - - fits: bool - Whether payload will fit - - payload_size: int - Raw payload size in bytes - - estimated_encrypted_size: int - Estimated size after encryption + overhead - - capacity: int - Available capacity in bytes - - usage_percent: float - Estimated capacity usage (0-100) - - headroom: int - Bytes remaining (negative if won't fit) - - compressed_estimate: int | None - Estimated compressed size (if applicable) - - Example: - >>> result = will_fit("Hello world", carrier_bytes) - >>> result['fits'] - True - >>> result['usage_percent'] - 0.5 - - >>> result = will_fit(50000, carrier_bytes) # Check if 50KB would fit - >>> result['fits'] - False + Dict with fits, capacity, usage info """ # Determine payload size if isinstance(payload, int): @@ -121,42 +153,35 @@ def will_fit( payload_size = len(payload_data) elif isinstance(payload, FilePayload): payload_data = payload.data - # Account for filename/mime metadata filename_overhead = len(payload.filename.encode('utf-8')) if payload.filename else 0 mime_overhead = len(payload.mime_type.encode('utf-8')) if payload.mime_type else 0 - payload_size = len(payload.data) + filename_overhead + mime_overhead + 5 # +5 for length prefixes + type byte + payload_size = len(payload.data) + filename_overhead + mime_overhead + 5 else: payload_data = payload payload_size = len(payload) - # Calculate capacity capacity = calculate_capacity(carrier_image, bits_per_channel) - # Estimate encrypted size (payload + random padding + overhead) - # Padding adds 64-319 bytes, averaging ~190 estimated_padding = 190 estimated_encrypted_size = payload_size + estimated_padding + ENCRYPTION_OVERHEAD - # Compression estimate compressed_estimate = None if include_compression_estimate and payload_data is not None and len(payload_data) >= 64: try: import zlib compressed = zlib.compress(payload_data, level=6) - # Add compression header overhead (9 bytes) compressed_size = len(compressed) + 9 if compressed_size < payload_size: compressed_estimate = compressed_size - # Use compressed size for fit calculation estimated_encrypted_size = compressed_size + estimated_padding + ENCRYPTION_OVERHEAD except Exception: - pass # Ignore compression errors + pass headroom = capacity - estimated_encrypted_size fits = headroom >= 0 usage_percent = (estimated_encrypted_size / capacity * 100) if capacity > 0 else 100.0 - result = { + return { 'fits': fits, 'payload_size': payload_size, 'estimated_encrypted_size': estimated_encrypted_size, @@ -164,14 +189,206 @@ def will_fit( 'usage_percent': min(usage_percent, 100.0), 'headroom': headroom, 'compressed_estimate': compressed_estimate, + 'mode': EMBED_MODE_LSB, } - - debug.print(f"will_fit: payload={payload_size}, encrypted~={estimated_encrypted_size}, " - f"capacity={capacity}, fits={fits}") - - return result +def calculate_capacity(image_data: bytes, bits_per_channel: int = 1) -> int: + """ + Calculate the maximum message capacity of an image (LSB mode). + + Args: + image_data: Image bytes + bits_per_channel: Bits to use per color channel + + Returns: + Maximum bytes that can be embedded (minus overhead) + """ + debug.validate(bits_per_channel in (1, 2), + f"bits_per_channel must be 1 or 2, got {bits_per_channel}") + + img_file = Image.open(io.BytesIO(image_data)) + img = img_file.convert('RGB') if img_file.mode != 'RGB' else img_file + + num_pixels = img.size[0] * img.size[1] + bits_per_pixel = 3 * bits_per_channel + max_bytes = (num_pixels * bits_per_pixel) // 8 + + capacity = max(0, max_bytes - ENCRYPTION_OVERHEAD) + debug.print(f"LSB capacity: {capacity} bytes at {bits_per_channel} bit(s)/channel") + return capacity + + +def calculate_capacity_by_mode( + image_data: bytes, + embed_mode: str = EMBED_MODE_LSB, + bits_per_channel: int = 1, +) -> dict: + """ + Calculate capacity for specified embedding mode. + + Args: + image_data: Carrier image bytes + embed_mode: 'lsb' or 'dct' + bits_per_channel: Bits per channel for LSB mode + + Returns: + Dict with capacity information + """ + if embed_mode == EMBED_MODE_DCT: + if not has_dct_support(): + raise ImportError("scipy required for DCT mode. Install: pip install scipy") + + dct_mod = _get_dct_module() + dct_info = dct_mod.calculate_dct_capacity(image_data) + + return { + 'mode': EMBED_MODE_DCT, + 'capacity_bytes': dct_info.usable_capacity_bytes, + 'capacity_bits': dct_info.total_capacity_bits, + 'width': dct_info.width, + 'height': dct_info.height, + 'total_blocks': dct_info.total_blocks, + } + else: + capacity = calculate_capacity(image_data, bits_per_channel) + img = Image.open(io.BytesIO(image_data)) + width, height = img.size + + return { + 'mode': EMBED_MODE_LSB, + 'capacity_bytes': capacity, + 'capacity_bits': capacity * 8, + 'width': width, + 'height': height, + 'bits_per_channel': bits_per_channel, + } + + +def will_fit_by_mode( + payload: Union[str, bytes, FilePayload, int], + carrier_image: bytes, + embed_mode: str = EMBED_MODE_LSB, + bits_per_channel: int = 1, +) -> dict: + """ + Check if payload fits in specified mode. + + Args: + payload: Message, bytes, FilePayload, or size in bytes + carrier_image: Carrier image bytes + embed_mode: 'lsb' or 'dct' + bits_per_channel: For LSB mode + + Returns: + Dict with fits, capacity, usage info + """ + if embed_mode == EMBED_MODE_DCT: + if not has_dct_support(): + return {'fits': False, 'error': 'scipy not available', 'mode': EMBED_MODE_DCT} + + if isinstance(payload, int): + payload_size = payload + elif isinstance(payload, str): + payload_size = len(payload.encode('utf-8')) + elif hasattr(payload, 'data'): + payload_size = len(payload.data) + else: + payload_size = len(payload) + + estimated_size = payload_size + ENCRYPTION_OVERHEAD + 190 + + dct_mod = _get_dct_module() + fits = dct_mod.will_fit_dct(estimated_size, carrier_image) + capacity_info = dct_mod.calculate_dct_capacity(carrier_image) + capacity = capacity_info.usable_capacity_bytes + + usage_percent = (estimated_size / capacity * 100) if capacity > 0 else 100.0 + + return { + 'fits': fits, + 'payload_size': payload_size, + 'capacity': capacity, + 'usage_percent': min(usage_percent, 100.0), + 'headroom': capacity - estimated_size, + 'mode': EMBED_MODE_DCT, + } + else: + return will_fit(payload, carrier_image, bits_per_channel) + + +def get_available_modes() -> dict: + """ + Get available embedding modes and their status. + + Returns: + Dict mapping mode name to availability info + """ + return { + EMBED_MODE_LSB: { + 'available': True, + 'name': 'Spatial LSB', + 'description': 'Embed in pixel LSBs, outputs PNG/BMP', + 'output_format': 'PNG (color)', + }, + EMBED_MODE_DCT: { + 'available': has_dct_support(), + 'name': 'DCT Domain', + 'description': 'Embed in DCT coefficients, outputs grayscale PNG or JPEG', + 'output_formats': ['PNG (grayscale)', 'JPEG (grayscale)'], + 'requires': 'scipy', + }, + } + + +def compare_modes(image_data: bytes) -> dict: + """ + Compare embedding modes for a carrier image. + + Args: + image_data: Carrier image bytes + + Returns: + Dict with comparison of LSB vs DCT modes + """ + img = Image.open(io.BytesIO(image_data)) + width, height = img.size + + lsb_bytes = calculate_capacity(image_data, 1) + + if has_dct_support(): + dct_mod = _get_dct_module() + dct_info = dct_mod.calculate_dct_capacity(image_data) + dct_bytes = dct_info.usable_capacity_bytes + dct_available = True + else: + safe_blocks = (height // 8) * (width // 8) + dct_bytes = (safe_blocks * 16) // 8 # Estimated + dct_available = False + + return { + 'width': width, + 'height': height, + 'lsb': { + 'capacity_bytes': lsb_bytes, + 'capacity_kb': lsb_bytes / 1024, + 'available': True, + 'output': 'PNG (color)', + }, + 'dct': { + 'capacity_bytes': dct_bytes, + 'capacity_kb': dct_bytes / 1024, + 'available': dct_available, + 'output': 'PNG or JPEG (grayscale)', + 'ratio_vs_lsb': (dct_bytes / lsb_bytes * 100) if lsb_bytes > 0 else 0, + }, + } + + +# ============================================================================= +# PIXEL INDEX GENERATION +# ============================================================================= + @debug.time def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> List[int]: """ @@ -179,17 +396,6 @@ def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> List Uses ChaCha20 as a CSPRNG seeded by the key to deterministically select which pixels will hold hidden data. - - Args: - key: 32-byte key for pixel selection - num_pixels: Total pixels in image - num_needed: Number of pixels needed for embedding - - Returns: - List of pixel indices - - Note: - Optimizes for both small and large num_needed values. """ debug.validate(len(key) == 32, f"Pixel key must be 32 bytes, got {len(key)}") debug.validate(num_pixels > 0, f"Number of pixels must be positive, got {num_pixels}") @@ -200,7 +406,6 @@ def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> List debug.print(f"Generating {num_needed} pixel indices from {num_pixels} total pixels") if num_needed >= num_pixels // 2: - # If we need many pixels, shuffle all indices debug.print(f"Using full shuffle (needed {num_needed}/{num_pixels} pixels)") nonce = b'\x00' * 16 cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend()) @@ -209,7 +414,6 @@ def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> List indices = list(range(num_pixels)) random_bytes = encryptor.update(b'\x00' * (num_pixels * 4)) - # Fisher-Yates shuffle using CSPRNG for i in range(num_pixels - 1, 0, -1): j_bytes = random_bytes[(num_pixels - 1 - i) * 4:(num_pixels - i) * 4] j = int.from_bytes(j_bytes, 'big') % (i + 1) @@ -219,7 +423,6 @@ def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> List debug.print(f"Generated {len(selected)} indices via shuffle") return selected - # Optimized path: generate indices directly (for smaller selections) debug.print(f"Using optimized selection (needed {num_needed}/{num_pixels} pixels)") selected = [] used = set() @@ -228,7 +431,6 @@ def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> List cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend()) encryptor = cipher.encryptor() - # Generate more than needed to handle collisions bytes_needed = (num_needed * 2) * 4 random_bytes = encryptor.update(b'\x00' * bytes_needed) @@ -244,11 +446,10 @@ def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> List else: collisions += 1 - # Generate additional if needed (rare) if len(selected) < num_needed: debug.print(f"Need {num_needed - len(selected)} more indices, generating...") extra_needed = num_needed - len(selected) - for _ in range(extra_needed * 2): # Try twice as many to account for collisions + for _ in range(extra_needed * 2): extra_bytes = encryptor.update(b'\x00' * 4) idx = int.from_bytes(extra_bytes, 'big') % num_pixels if idx not in used: @@ -263,43 +464,91 @@ def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> List return selected +# ============================================================================= +# EMBEDDING FUNCTIONS +# ============================================================================= + @debug.time def embed_in_image( - carrier_data: bytes, - encrypted_data: bytes, + data: bytes, + image_data: bytes, pixel_key: bytes, bits_per_channel: int = 1, - output_format: Optional[str] = None -) -> Tuple[bytes, EmbedStats, str]: + output_format: Optional[str] = None, + embed_mode: str = EMBED_MODE_LSB, + dct_output_format: str = DCT_OUTPUT_PNG, # NEW in v3.0.1 +) -> Tuple[bytes, Union[EmbedStats, 'DCTEmbedStats'], str]: """ - Embed encrypted data in carrier image using LSB steganography. - - Uses pseudo-random pixel selection based on pixel_key to scatter - the data across the image, defeating statistical analysis. - - Note: Output images have all metadata (EXIF, etc.) stripped automatically. + Embed data into an image using specified mode. Args: - carrier_data: Carrier image bytes - encrypted_data: Data to embed - pixel_key: Key for pixel selection - bits_per_channel: Bits to use per color channel (1-2) - output_format: Force specific output format (PNG, BMP). - If None, auto-detect from carrier (lossless) or default to PNG. + data: Data to embed (encrypted payload) + image_data: Carrier image bytes + pixel_key: Key for pixel/coefficient selection + bits_per_channel: Bits per channel (LSB mode only) + output_format: Force output format (LSB mode only) + embed_mode: 'lsb' (default) or 'dct' + dct_output_format: For DCT mode - 'png' (lossless) or 'jpeg' (smaller) Returns: - Tuple of (image bytes, EmbedStats, file extension) + Tuple of (stego image bytes, stats, file extension) Raises: - CapacityError: If carrier is too small + CapacityError: If data won't fit EmbeddingError: If embedding fails - - Example: - >>> stego_bytes, stats, ext = embed_in_image(carrier, encrypted, key) - >>> stats.pixels_modified - 1500 + ImportError: If DCT mode requested but scipy unavailable """ - debug.print(f"Embedding {len(encrypted_data)} bytes into image") + debug.print(f"embed_in_image: mode={embed_mode}, data={len(data)} bytes") + debug.validate(embed_mode in VALID_EMBED_MODES, + f"Invalid embed_mode: {embed_mode}. Use 'lsb' or 'dct'") + + # DCT MODE + if embed_mode == EMBED_MODE_DCT: + if not has_dct_support(): + raise ImportError( + "scipy is required for DCT embedding mode. " + "Install with: pip install scipy" + ) + + # Validate DCT output format + if dct_output_format not in (DCT_OUTPUT_PNG, DCT_OUTPUT_JPEG): + debug.print(f"Invalid dct_output_format '{dct_output_format}', defaulting to PNG") + dct_output_format = DCT_OUTPUT_PNG + + dct_mod = _get_dct_module() + + # Pass output_format to DCT module (v3.0.1) + stego_bytes, dct_stats = dct_mod.embed_in_dct( + data, + image_data, + pixel_key, + output_format=dct_output_format, + ) + + # Determine extension based on output format + if dct_output_format == DCT_OUTPUT_JPEG: + ext = 'jpg' + else: + ext = 'png' + + debug.print(f"DCT embedding complete: {dct_output_format.upper()} output, ext={ext}") + return stego_bytes, dct_stats, ext + + # LSB MODE + return _embed_lsb(data, image_data, pixel_key, bits_per_channel, output_format) + + +def _embed_lsb( + data: bytes, + image_data: bytes, + pixel_key: bytes, + bits_per_channel: int = 1, + output_format: Optional[str] = None, +) -> Tuple[bytes, EmbedStats, str]: + """ + Embed data using LSB steganography (internal implementation). + """ + debug.print(f"LSB embedding {len(data)} bytes into image") debug.data(pixel_key, "Pixel key for embedding") debug.validate(bits_per_channel in (1, 2), f"bits_per_channel must be 1 or 2, got {bits_per_channel}") @@ -307,13 +556,12 @@ def embed_in_image( f"Pixel key must be 32 bytes, got {len(pixel_key)}") try: - img_file = Image.open(io.BytesIO(carrier_data)) + img_file = Image.open(io.BytesIO(image_data)) input_format = img_file.format debug.print(f"Carrier image: {img_file.size[0]}x{img_file.size[1]}, format: {input_format}") - # Convert to RGB - this returns Image.Image, not ImageFile - img: Image.Image = img_file.convert('RGB') if img_file.mode != 'RGB' else img_file.copy() + img = img_file.convert('RGB') if img_file.mode != 'RGB' else img_file.copy() if img_file.mode != 'RGB': debug.print(f"Converting image from {img_file.mode} to RGB") @@ -325,8 +573,7 @@ def embed_in_image( debug.print(f"Image capacity: {max_bytes} bytes at {bits_per_channel} bit(s)/channel") - # Prepend length - data_with_len = struct.pack('>I', len(encrypted_data)) + encrypted_data + data_with_len = struct.pack('>I', len(data)) + data if len(data_with_len) > max_bytes: debug.print(f"Capacity error: need {len(data_with_len)}, have {max_bytes}") @@ -335,16 +582,13 @@ def embed_in_image( debug.print(f"Total data to embed: {len(data_with_len)} bytes " f"({len(data_with_len)/max_bytes*100:.1f}% of capacity)") - # Convert to binary string binary_data = ''.join(format(b, '08b') for b in data_with_len) pixels_needed = (len(binary_data) + bits_per_pixel - 1) // bits_per_pixel debug.print(f"Need {pixels_needed} pixels to embed {len(binary_data)} bits") - # Get pixel indices selected_indices = generate_pixel_indices(pixel_key, num_pixels, pixels_needed) - # Embed data new_pixels = list(pixels) clear_mask = 0xFF ^ ((1 << bits_per_channel) - 1) @@ -381,11 +625,9 @@ def embed_in_image( debug.print(f"Modified {modified_pixels} pixels (out of {len(selected_indices)} selected)") - # Create output image (fresh image = no metadata/EXIF carried over) stego_img = Image.new('RGB', img.size) stego_img.putdata(new_pixels) - # Determine output format if output_format: out_fmt = output_format.upper() out_ext = FORMAT_TO_EXT.get(out_fmt, 'png') @@ -405,42 +647,88 @@ def embed_in_image( bytes_embedded=len(data_with_len) ) - debug.print(f"Embedding complete: {out_fmt} image, {len(output.getvalue())} bytes") + debug.print(f"LSB embedding complete: {out_fmt} image, {len(output.getvalue())} bytes") return output.getvalue(), stats, out_ext except CapacityError: raise except Exception as e: - debug.exception(e, "embed_in_image") + debug.exception(e, "embed_lsb") raise EmbeddingError(f"Failed to embed data: {e}") from e +# ============================================================================= +# EXTRACTION FUNCTIONS +# ============================================================================= + @debug.time def extract_from_image( image_data: bytes, pixel_key: bytes, - bits_per_channel: int = 1 + bits_per_channel: int = 1, + embed_mode: str = EMBED_MODE_AUTO, ) -> Optional[bytes]: """ Extract hidden data from a stego image. Args: image_data: Stego image bytes - pixel_key: Key for pixel selection (must match encoding) - bits_per_channel: Bits per channel (must match encoding) + pixel_key: Key for pixel/coefficient selection (must match encoding) + bits_per_channel: Bits per channel (LSB mode only) + embed_mode: 'auto' (try both), 'lsb', or 'dct' Returns: Extracted data bytes, or None if extraction fails - - Raises: - ExtractionError: If extraction fails critically - - Example: - >>> extracted = extract_from_image(stego_bytes, key) - >>> len(extracted) - 1024 """ - debug.print(f"Extracting from {len(image_data)} byte image") + debug.print(f"extract_from_image: mode={embed_mode}") + + # AUTO MODE: Try LSB first, then DCT + if embed_mode == EMBED_MODE_AUTO: + result = _extract_lsb(image_data, pixel_key, bits_per_channel) + if result is not None: + debug.print("Auto-detect: LSB extraction succeeded") + return result + + if has_dct_support(): + debug.print("Auto-detect: LSB failed, trying DCT") + result = _extract_dct(image_data, pixel_key) + if result is not None: + debug.print("Auto-detect: DCT extraction succeeded") + return result + + debug.print("Auto-detect: All modes failed") + return None + + # EXPLICIT DCT MODE + elif embed_mode == EMBED_MODE_DCT: + if not has_dct_support(): + raise ImportError("scipy required for DCT mode") + return _extract_dct(image_data, pixel_key) + + # EXPLICIT LSB MODE + else: + return _extract_lsb(image_data, pixel_key, bits_per_channel) + + +def _extract_dct(image_data: bytes, pixel_key: bytes) -> Optional[bytes]: + """Extract using DCT mode.""" + try: + dct_mod = _get_dct_module() + return dct_mod.extract_from_dct(image_data, pixel_key) + except Exception as e: + debug.print(f"DCT extraction failed: {e}") + return None + + +def _extract_lsb( + image_data: bytes, + pixel_key: bytes, + bits_per_channel: int = 1 +) -> Optional[bytes]: + """ + Extract using LSB mode (internal implementation). + """ + debug.print(f"LSB extracting from {len(image_data)} byte image") debug.data(pixel_key, "Pixel key for extraction") debug.validate(bits_per_channel in (1, 2), f"bits_per_channel must be 1 or 2, got {bits_per_channel}") @@ -449,8 +737,7 @@ def extract_from_image( img_file = Image.open(io.BytesIO(image_data)) debug.print(f"Image: {img_file.size[0]}x{img_file.size[1]}, format: {img_file.format}") - # Convert to RGB - img: Image.Image = img_file.convert('RGB') if img_file.mode != 'RGB' else img_file.copy() + img = img_file.convert('RGB') if img_file.mode != 'RGB' else img_file.copy() if img_file.mode != 'RGB': debug.print(f"Converting image from {img_file.mode} to RGB") @@ -460,7 +747,6 @@ def extract_from_image( debug.print(f"Image has {num_pixels} pixels, {bits_per_pixel} bits/pixel") - # First, extract enough to get the length (4 bytes = 32 bits) initial_pixels = (32 + bits_per_pixel - 1) // bits_per_pixel + 10 debug.print(f"Extracting initial {initial_pixels} pixels to find length") @@ -473,7 +759,6 @@ def extract_from_image( for bit_pos in range(bits_per_channel - 1, -1, -1): binary_data += str((channel >> bit_pos) & 1) - # Parse length try: length_bits = binary_data[:32] if len(length_bits) < 32: @@ -486,13 +771,11 @@ def extract_from_image( debug.print(f"Failed to parse length: {e}") return None - # Sanity check max_possible = (num_pixels * bits_per_pixel) // 8 - 4 if data_length > max_possible or data_length < 10: debug.print(f"Invalid data length: {data_length} (max possible: {max_possible})") return None - # Extract full data total_bits = (4 + data_length) * 8 pixels_needed = (total_bits + bits_per_pixel - 1) // bits_per_pixel @@ -519,63 +802,21 @@ def extract_from_image( if len(byte_bits) == 8: data_bytes.append(int(byte_bits, 2)) - debug.print(f"Successfully extracted {len(data_bytes)} bytes") + debug.print(f"LSB successfully extracted {len(data_bytes)} bytes") return bytes(data_bytes) except Exception as e: - debug.exception(e, "extract_from_image") - raise ExtractionError(f"Failed to extract data: {e}") from e + debug.exception(e, "extract_lsb") + return None -def calculate_capacity(image_data: bytes, bits_per_channel: int = 1) -> int: - """ - Calculate the maximum message capacity of an image. - - Args: - image_data: Image bytes - bits_per_channel: Bits to use per color channel - - Returns: - Maximum bytes that can be embedded (minus overhead) - - Example: - >>> capacity = calculate_capacity(image_bytes) - >>> capacity - 12000 - """ - debug.validate(bits_per_channel in (1, 2), - f"bits_per_channel must be 1 or 2, got {bits_per_channel}") - - img_file = Image.open(io.BytesIO(image_data)) - img: Image.Image = img_file.convert('RGB') if img_file.mode != 'RGB' else img_file - - num_pixels = img.size[0] * img.size[1] - bits_per_pixel = 3 * bits_per_channel - max_bytes = (num_pixels * bits_per_pixel) // 8 - - # Subtract overhead: 4 bytes length + ~100 bytes header - capacity = max(0, max_bytes - ENCRYPTION_OVERHEAD) - debug.print(f"Image capacity: {capacity} bytes at {bits_per_channel} bit(s)/channel") - return capacity - +# ============================================================================= +# UTILITY FUNCTIONS +# ============================================================================= def get_image_dimensions(image_data: bytes) -> Tuple[int, int]: - """ - Get image dimensions without loading full image. - - Args: - image_data: Image bytes - - Returns: - Tuple of (width, height) - - Example: - >>> width, height = get_image_dimensions(image_bytes) - >>> width, height - (800, 600) - """ + """Get image dimensions without loading full image.""" debug.validate(len(image_data) > 0, "Image data cannot be empty") - img = Image.open(io.BytesIO(image_data)) dimensions = img.size debug.print(f"Image dimensions: {dimensions[0]}x{dimensions[1]}") @@ -583,20 +824,7 @@ def get_image_dimensions(image_data: bytes) -> Tuple[int, int]: def get_image_format(image_data: bytes) -> Optional[str]: - """ - Get image format (PIL format string like 'PNG', 'JPEG'). - - Args: - image_data: Image bytes - - Returns: - Format string or None if invalid - - Example: - >>> format = get_image_format(image_bytes) - >>> format - 'PNG' - """ + """Get image format (PIL format string like 'PNG', 'JPEG').""" try: img = Image.open(io.BytesIO(image_data)) format_str = img.format @@ -608,19 +836,7 @@ def get_image_format(image_data: bytes) -> Optional[str]: def is_lossless_format(image_data: bytes) -> bool: - """ - Check if image is in a lossless format suitable for steganography. - - Args: - image_data: Image bytes - - Returns: - True if format is lossless (PNG, BMP, TIFF) - - Example: - >>> is_lossless_format(image_bytes) - True - """ + """Check if image is in a lossless format suitable for steganography.""" fmt = get_image_format(image_data) is_lossless = fmt is not None and fmt.upper() in LOSSLESS_FORMATS debug.print(f"Image is lossless: {is_lossless} (format: {fmt})")