From 70b941d55a45ac19e46dbf6a083adfc8170c093e Mon Sep 17 00:00:00 2001 From: "Aaron D. Lee" Date: Wed, 1 Apr 2026 18:56:36 -0400 Subject: [PATCH] Pre-consolidation snapshot: backends, steganalysis, platform presets, and WIP changes Snapshot of all uncommitted work before merging stegasoo into soosef monorepo. Includes: pluggable backends registry, steganalysis detection, platform presets, and various in-progress modifications across core modules. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/stegasoo/__init__.py | 19 ++ src/stegasoo/backends/__init__.py | 31 ++++ src/stegasoo/backends/dct.py | 69 ++++++++ src/stegasoo/backends/lsb.py | 63 +++++++ src/stegasoo/backends/protocol.py | 91 ++++++++++ src/stegasoo/backends/registry.py | 63 +++++++ src/stegasoo/cli.py | 149 +++++++++++++--- src/stegasoo/constants.py | 20 ++- src/stegasoo/crypto.py | 157 +++++++++++++---- src/stegasoo/dct_steganography.py | 158 ++++++++++------- src/stegasoo/decode.py | 10 ++ src/stegasoo/encode.py | 14 ++ src/stegasoo/platform_presets.py | 169 ++++++++++++++++++ src/stegasoo/steganalysis.py | 281 ++++++++++++++++++++++++++++++ src/stegasoo/steganography.py | 118 +++++++------ 15 files changed, 1241 insertions(+), 171 deletions(-) create mode 100644 src/stegasoo/backends/__init__.py create mode 100644 src/stegasoo/backends/dct.py create mode 100644 src/stegasoo/backends/lsb.py create mode 100644 src/stegasoo/backends/protocol.py create mode 100644 src/stegasoo/backends/registry.py create mode 100644 src/stegasoo/platform_presets.py create mode 100644 src/stegasoo/steganalysis.py diff --git a/src/stegasoo/__init__.py b/src/stegasoo/__init__.py index 5432ec7..aaa13b7 100644 --- a/src/stegasoo/__init__.py +++ b/src/stegasoo/__init__.py @@ -46,6 +46,16 @@ from .image_utils import ( get_image_info, ) +# Backend registry +from .backends import EmbeddingBackend, registry as backend_registry + +# Platform presets +from .platform_presets import PLATFORMS, get_preset + +# Steganalysis +from .steganalysis import check_image +from .backends.registry import BackendNotFoundError + # Steganography functions from .steganography import ( calculate_capacity_by_mode, @@ -273,6 +283,15 @@ __all__ = [ "generate_filename", # Crypto "has_argon2", + # Backends + "EmbeddingBackend", + "backend_registry", + "BackendNotFoundError", + # Platform presets + "get_preset", + "PLATFORMS", + # Steganalysis + "check_image", # Steganography "has_dct_support", "calculate_capacity_by_mode", diff --git a/src/stegasoo/backends/__init__.py b/src/stegasoo/backends/__init__.py new file mode 100644 index 0000000..85a3b72 --- /dev/null +++ b/src/stegasoo/backends/__init__.py @@ -0,0 +1,31 @@ +""" +Stegasoo embedding backends. + +Provides a typed plugin interface for all embedding algorithms. +Backends register with the module-level ``registry`` on import. + +Usage:: + + from stegasoo.backends import registry + + backend = registry.get("lsb") + stego, stats = backend.embed(data, carrier, key) +""" + +from .dct import DCTBackend +from .lsb import LSBBackend +from .protocol import EmbeddingBackend +from .registry import BackendNotFoundError, BackendRegistry, registry + +# Auto-register built-in backends +registry.register(LSBBackend()) +registry.register(DCTBackend()) + +__all__ = [ + "EmbeddingBackend", + "BackendRegistry", + "BackendNotFoundError", + "registry", + "LSBBackend", + "DCTBackend", +] diff --git a/src/stegasoo/backends/dct.py b/src/stegasoo/backends/dct.py new file mode 100644 index 0000000..a36477b --- /dev/null +++ b/src/stegasoo/backends/dct.py @@ -0,0 +1,69 @@ +""" +DCT (Discrete Cosine Transform) image embedding backend. + +Wraps the existing frequency-domain DCT functions in dct_steganography.py. +""" + +from __future__ import annotations + +from typing import Any + + +class DCTBackend: + """Frequency-domain DCT embedding for JPEG-resilient steganography.""" + + @property + def mode(self) -> str: + return "dct" + + @property + def carrier_type(self) -> str: + return "image" + + def is_available(self) -> bool: + from ..dct_steganography import HAS_SCIPY + + return HAS_SCIPY + + def embed( + self, + data: bytes, + carrier: bytes, + key: bytes, + *, + progress_file: str | None = None, + **options: Any, + ) -> tuple[bytes, Any]: + from ..dct_steganography import embed_in_dct + + output_format = options.get("dct_output_format", "png") + color_mode = options.get("dct_color_mode", "color") + quant_step = options.get("quant_step") + jpeg_quality = options.get("jpeg_quality") + max_dimension = options.get("max_dimension") + return embed_in_dct( + data, carrier, key, output_format, color_mode, progress_file, + quant_step=quant_step, jpeg_quality=jpeg_quality, max_dimension=max_dimension, + ) + + def extract( + self, + carrier: bytes, + key: bytes, + *, + progress_file: str | None = None, + **options: Any, + ) -> bytes | None: + from ..dct_steganography import extract_from_dct + + quant_step = options.get("quant_step") + try: + return extract_from_dct(carrier, key, progress_file, quant_step=quant_step) + except Exception: + return None + + def calculate_capacity(self, carrier: bytes, **options: Any) -> int: + from ..dct_steganography import calculate_dct_capacity + + info = calculate_dct_capacity(carrier) + return info.usable_capacity_bytes diff --git a/src/stegasoo/backends/lsb.py b/src/stegasoo/backends/lsb.py new file mode 100644 index 0000000..c385c54 --- /dev/null +++ b/src/stegasoo/backends/lsb.py @@ -0,0 +1,63 @@ +""" +LSB (Least Significant Bit) image embedding backend. + +Wraps the existing spatial-domain LSB functions in steganography.py. +""" + +from __future__ import annotations + +from typing import Any + + +class LSBBackend: + """Spatial-domain LSB embedding for lossless image formats.""" + + @property + def mode(self) -> str: + return "lsb" + + @property + def carrier_type(self) -> str: + return "image" + + def is_available(self) -> bool: + return True # Only needs Pillow, which is always present + + def embed( + self, + data: bytes, + carrier: bytes, + key: bytes, + *, + progress_file: str | None = None, + **options: Any, + ) -> tuple[bytes, Any]: + from ..steganography import _embed_lsb + + bits_per_channel = options.get("bits_per_channel", 1) + output_format = options.get("output_format", None) + stego_bytes, stats, ext = _embed_lsb( + data, carrier, key, bits_per_channel, output_format, progress_file + ) + # Attach output extension to stats for callers that need it + stats.output_extension = ext # type: ignore[attr-defined] + return stego_bytes, stats + + def extract( + self, + carrier: bytes, + key: bytes, + *, + progress_file: str | None = None, + **options: Any, + ) -> bytes | None: + from ..steganography import _extract_lsb + + bits_per_channel = options.get("bits_per_channel", 1) + return _extract_lsb(carrier, key, bits_per_channel) + + def calculate_capacity(self, carrier: bytes, **options: Any) -> int: + from ..steganography import calculate_capacity + + bits_per_channel = options.get("bits_per_channel", 1) + return calculate_capacity(carrier, bits_per_channel) diff --git a/src/stegasoo/backends/protocol.py b/src/stegasoo/backends/protocol.py new file mode 100644 index 0000000..21cf60d --- /dev/null +++ b/src/stegasoo/backends/protocol.py @@ -0,0 +1,91 @@ +""" +Embedding backend protocol definition. + +All embedding backends (LSB, DCT, audio, video, etc.) implement this protocol, +enabling registry-based dispatch instead of if/elif chains. +""" + +from __future__ import annotations + +from typing import Any, Protocol, runtime_checkable + + +@runtime_checkable +class EmbeddingBackend(Protocol): + """Protocol that all embedding backends must satisfy. + + Each backend handles a specific embedding mode (e.g. 'lsb', 'dct', + 'audio_lsb', 'audio_spread') for a specific carrier type ('image', + 'audio', 'video'). + """ + + @property + def mode(self) -> str: + """The embedding mode identifier (e.g. 'lsb', 'dct').""" + ... + + @property + def carrier_type(self) -> str: + """The carrier media type: 'image', 'audio', or 'video'.""" + ... + + def is_available(self) -> bool: + """Whether this backend's dependencies are installed.""" + ... + + def embed( + self, + data: bytes, + carrier: bytes, + key: bytes, + *, + progress_file: str | None = None, + **options: Any, + ) -> tuple[bytes, Any]: + """Embed data into a carrier. + + Args: + data: Encrypted payload bytes. + carrier: Raw carrier file bytes (image, audio, etc.). + key: Derived key for pixel/sample selection. + progress_file: Optional progress file path. + **options: Backend-specific options (bits_per_channel, + output_format, color_mode, chip_tier, etc.). + + Returns: + Tuple of (stego carrier bytes, embed stats). + """ + ... + + def extract( + self, + carrier: bytes, + key: bytes, + *, + progress_file: str | None = None, + **options: Any, + ) -> bytes | None: + """Extract data from a carrier. + + Args: + carrier: Stego carrier file bytes. + key: Derived key for pixel/sample selection. + progress_file: Optional progress file path. + **options: Backend-specific options. + + Returns: + Extracted payload bytes, or None if no payload found. + """ + ... + + def calculate_capacity(self, carrier: bytes, **options: Any) -> int: + """Calculate maximum embeddable payload size in bytes. + + Args: + carrier: Raw carrier file bytes. + **options: Backend-specific options (e.g. bits_per_channel). + + Returns: + Maximum payload capacity in bytes. + """ + ... diff --git a/src/stegasoo/backends/registry.py b/src/stegasoo/backends/registry.py new file mode 100644 index 0000000..b2eec43 --- /dev/null +++ b/src/stegasoo/backends/registry.py @@ -0,0 +1,63 @@ +""" +Backend registry for embedding mode dispatch. + +Backends register themselves by mode string. The registry replaces +if/elif dispatch in steganography.py with a lookup table. +""" + +from __future__ import annotations + +from ..exceptions import StegasooError +from .protocol import EmbeddingBackend + + +class BackendNotFoundError(StegasooError): + """Raised when a requested backend mode is not registered.""" + + +class BackendRegistry: + """Registry mapping mode strings to embedding backends.""" + + def __init__(self) -> None: + self._backends: dict[str, EmbeddingBackend] = {} + + def register(self, backend: EmbeddingBackend) -> None: + """Register a backend for its mode string.""" + self._backends[backend.mode] = backend + + def get(self, mode: str) -> EmbeddingBackend: + """Look up a backend by mode. Raises BackendNotFoundError if not found.""" + if mode not in self._backends: + available = ", ".join(sorted(self._backends.keys())) or "(none)" + raise BackendNotFoundError( + f"No backend registered for mode '{mode}'. Available: {available}" + ) + return self._backends[mode] + + def has(self, mode: str) -> bool: + """Check if a backend is registered for the given mode.""" + return mode in self._backends + + def available_modes(self, carrier_type: str | None = None) -> list[str]: + """List registered mode strings, optionally filtered by carrier type. + + Only includes modes whose backend reports is_available() == True. + """ + return sorted( + mode + for mode, backend in self._backends.items() + if backend.is_available() + and (carrier_type is None or backend.carrier_type == carrier_type) + ) + + def all_modes(self, carrier_type: str | None = None) -> list[str]: + """List all registered mode strings (including unavailable ones).""" + return sorted( + mode + for mode, backend in self._backends.items() + if carrier_type is None or backend.carrier_type == carrier_type + ) + + +# Module-level singleton +registry = BackendRegistry() diff --git a/src/stegasoo/cli.py b/src/stegasoo/cli.py index 5dcec66..00fa7d8 100644 --- a/src/stegasoo/cli.py +++ b/src/stegasoo/cli.py @@ -184,8 +184,14 @@ def cli(ctx, json_output, debug_mode): ) @click.option("--pin", prompt=True, hide_input=True, confirmation_prompt=True, help="PIN code") @click.option("--dry-run", is_flag=True, help="Show capacity usage without encoding") +@click.option( + "--platform", + type=click.Choice(["telegram", "discord", "signal", "whatsapp"], case_sensitive=False), + help="DCT preset for social media platform (implies DCT+JPEG mode)", +) +@click.option("--verify/--no-verify", default=True, help="Pre-verify payload survives platform recompression") @click.pass_context -def encode(ctx, carrier, reference, message, file_payload, output, passphrase, pin, dry_run): +def encode(ctx, carrier, reference, message, file_payload, output, passphrase, pin, dry_run, platform, verify): """ Encode a message or file into an image. @@ -260,29 +266,48 @@ def encode(ctx, carrier, reference, message, file_payload, output, passphrase, p from .steganography import EMBED_MODE_DCT, EMBED_MODE_LSB + # Platform preset overrides + preset = None + if platform: + from .platform_presets import get_preset + + preset = get_preset(platform) + use_dct = True # Platform mode implies DCT+JPEG + if output_ext not in (".jpg", ".jpeg"): + output = str(Path(output).with_suffix(".jpg")) + click.echo(f" Platform mode: output changed to {output}") + try: + encode_kwargs = { + "reference_photo": reference_data, + "carrier_image": carrier_data, + "passphrase": passphrase, + "pin": pin, + "embed_mode": EMBED_MODE_DCT if use_dct else EMBED_MODE_LSB, + "dct_output_format": "jpeg" if use_dct else "png", + } + + if preset: + encode_kwargs["platform"] = platform + if file_payload: - # Encode file - result = stegasoo_encode_file( - filepath=file_payload, - reference_photo=reference_data, - carrier_image=carrier_data, - passphrase=passphrase, - pin=pin, - embed_mode=EMBED_MODE_DCT if use_dct else EMBED_MODE_LSB, - dct_output_format="jpeg" if use_dct else "png", - ) + result = stegasoo_encode_file(filepath=file_payload, **encode_kwargs) else: - # Encode message - result = stegasoo_encode( - message=message, - reference_photo=reference_data, - carrier_image=carrier_data, - passphrase=passphrase, - pin=pin, - embed_mode=EMBED_MODE_DCT if use_dct else EMBED_MODE_LSB, - dct_output_format="jpeg" if use_dct else "png", - ) + result = stegasoo_encode(message=message, **encode_kwargs) + + # Pre-verify survival if platform mode + if preset and verify: + from .crypto import derive_pixel_key + from .platform_presets import pre_verify_survival + + pixel_key = derive_pixel_key(reference_data, passphrase, pin) + survived = pre_verify_survival(result.stego_image, pixel_key, preset) + if not survived: + click.echo( + f" ⚠ Warning: Payload may not survive {preset.name} recompression. " + "Try a larger carrier image or shorter message.", + err=True, + ) # Write output with open(output, "wb") as f: @@ -325,8 +350,13 @@ def encode(ctx, carrier, reference, message, file_payload, output, passphrase, p @click.option("--passphrase", prompt=True, hide_input=True, help="Passphrase") @click.option("--pin", prompt=True, hide_input=True, help="PIN code") @click.option("-o", "--output", type=click.Path(), help="Output path for file payloads") +@click.option( + "--platform", + type=click.Choice(["telegram", "discord", "signal", "whatsapp"], case_sensitive=False), + help="Platform preset (must match encoding platform)", +) @click.pass_context -def decode(ctx, image, reference, passphrase, pin, output): +def decode(ctx, image, reference, passphrase, pin, output, platform): """ Decode a message or file from an image. @@ -334,7 +364,7 @@ def decode(ctx, image, reference, passphrase, pin, output): stegasoo decode encoded.png -r ref.jpg --passphrase --pin - stegasoo decode encoded.png -r ref.jpg -o ./extracted/ + stegasoo decode encoded.png -r ref.jpg --platform telegram """ from .decode import decode as stegasoo_decode @@ -344,12 +374,21 @@ def decode(ctx, image, reference, passphrase, pin, output): with open(reference, "rb") as f: reference_data = f.read() + # Resolve platform preset for DCT decoding + decode_kwargs = {} + if platform: + from .platform_presets import get_preset + + preset = get_preset(platform) + decode_kwargs["platform"] = platform + try: result = stegasoo_decode( stego_image=stego_data, reference_photo=reference_data, passphrase=passphrase, pin=pin, + **decode_kwargs, ) if result.is_file: @@ -1550,9 +1589,9 @@ def info(ctx, full): # Check for DCT support try: - from .dct_steganography import HAS_JPEGIO, HAS_SCIPY + from .dct_steganography import HAS_JPEGLIB, HAS_SCIPY - has_dct = HAS_SCIPY and HAS_JPEGIO + has_dct = HAS_SCIPY and HAS_JPEGLIB except ImportError: has_dct = False @@ -2402,6 +2441,66 @@ def tools_convert(image, fmt, quality, output): click.echo(f"Converted to: {output}") +# ============================================================================= +# STEGANALYSIS COMMANDS +# ============================================================================= + + +@cli.command() +@click.argument("image", type=click.Path(exists=True)) +@click.option("--json", "as_json", is_flag=True, help="Output as JSON") +@click.option( + "--mode", + type=click.Choice(["lsb", "auto"]), + default="lsb", + help="Analysis mode (default: lsb)", +) +def check(image, as_json, mode): + """Analyze an image for steganographic detectability. + + Runs chi-square and RS (Regular-Singular) statistical tests to estimate + how detectable any hidden data might be. Outputs a risk level. + + Examples: + + stegasoo check carrier.png + + stegasoo check stego.png --json + + stegasoo check suspicious.bmp --mode lsb + """ + from .steganalysis import check_image + + with open(image, "rb") as f: + image_data = f.read() + + result = check_image(image_data, mode=mode) + result["filename"] = Path(image).name + + if as_json: + click.echo(json.dumps(result, indent=2)) + else: + risk = result["risk"] + risk_colors = {"low": "green", "medium": "yellow", "high": "red"} + risk_display = click.style(risk.upper(), fg=risk_colors.get(risk, "white"), bold=True) + + click.echo(f"\n Steganalysis: {result['filename']}") + click.echo(f" Image: {result['width']}x{result['height']}, {result['channels']} channels") + click.echo(f" Detectability risk: {risk_display}") + + click.echo("\n Chi-square (p-values):") + for ch, p in result["chi_square"].items(): + indicator = "!" if p < 0.05 else " " + click.echo(f" {indicator} {ch}: {p:.6f}") + + click.echo("\n RS embedding estimate:") + for ch, est in result["rs"].items(): + indicator = "!" if est > 0.1 else " " + click.echo(f" {indicator} {ch}: {est:.4f} ({est * 100:.1f}%)") + + click.echo() + + # ============================================================================= # ADMIN COMMANDS (Web UI administration) # ============================================================================= diff --git a/src/stegasoo/constants.py b/src/stegasoo/constants.py index 47fbd46..938aff3 100644 --- a/src/stegasoo/constants.py +++ b/src/stegasoo/constants.py @@ -44,7 +44,9 @@ MAGIC_HEADER = b"\x89ST3" # Version 1-3: Date-dependent encryption (v3.0.x - v3.1.x) # Version 4: Date-independent encryption (v3.2.0) # Version 5: Channel key support (v4.0.0) - adds flags byte to header -FORMAT_VERSION = 5 +# Version 6: HKDF per-message key derivation (v4.4.0) - adds message nonce to header +FORMAT_VERSION = 6 +FORMAT_VERSION_LEGACY = 5 # For backward-compatible decryption # Payload type markers PAYLOAD_TEXT = 0x01 @@ -66,6 +68,11 @@ ARGON2_PARALLELISM = 4 # PBKDF2 fallback parameters PBKDF2_ITERATIONS = 600000 +# HKDF per-message key derivation (v4.4.0 / FORMAT_VERSION 6) +MESSAGE_NONCE_SIZE = 16 # 128-bit random nonce per message +HKDF_INFO_ENCRYPT = b"stegasoo-v6-encrypt" # HKDF info for encryption key +HKDF_INFO_PIXEL = b"stegasoo-v6-pixel" # HKDF info for pixel selection key (reserved) + # ============================================================================ # INPUT LIMITS # ============================================================================ @@ -244,6 +251,17 @@ def get_wordlist() -> list[str]: return _bip39_words +# ============================================================================= +# STEGANALYSIS (v4.4.0) +# ============================================================================= + +# Chi-square p-value threshold: HIGH p-value = equalized PoV pairs = suspicious +STEGANALYSIS_CHI_SUSPICIOUS_THRESHOLD = 0.95 # p > 0.95 → pairs suspiciously equalized + +# RS embedding rate thresholds (primary metric): higher = more likely embedded +STEGANALYSIS_RS_HIGH_THRESHOLD = 0.3 # > 30% estimated embedding → high risk +STEGANALYSIS_RS_MEDIUM_THRESHOLD = 0.1 # > 10% estimated embedding → medium risk + # ============================================================================= # DCT STEGANOGRAPHY (v3.0+) # ============================================================================= diff --git a/src/stegasoo/crypto.py b/src/stegasoo/crypto.py index 3536c2e..8f473a9 100644 --- a/src/stegasoo/crypto.py +++ b/src/stegasoo/crypto.py @@ -29,7 +29,9 @@ import secrets import struct from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes as _hashes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand from PIL import Image from .constants import ( @@ -37,9 +39,12 @@ from .constants import ( ARGON2_PARALLELISM, ARGON2_TIME_COST, FORMAT_VERSION, + FORMAT_VERSION_LEGACY, + HKDF_INFO_ENCRYPT, IV_SIZE, MAGIC_HEADER, MAX_FILENAME_LENGTH, + MESSAGE_NONCE_SIZE, PAYLOAD_FILE, PAYLOAD_TEXT, PBKDF2_ITERATIONS, @@ -63,6 +68,7 @@ except ImportError: from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + # ============================================================================= # CHANNEL KEY RESOLUTION # ============================================================================= @@ -314,6 +320,30 @@ def derive_pixel_key( return hashlib.sha256(material + b"pixel_selection").digest() +def derive_message_key(root_key: bytes, nonce: bytes) -> bytes: + """ + Derive a per-message encryption key via HKDF-Expand. + + Each message gets a unique encryption key even with identical credentials, + because the nonce is random per message. This provides key diversification: + compromising the ciphertext of one message doesn't help with another. + + Args: + root_key: 32-byte root key from Argon2id/PBKDF2 + nonce: 16-byte random nonce (unique per message) + + Returns: + 32-byte per-message encryption key + """ + hkdf = HKDFExpand( + algorithm=_hashes.SHA256(), + length=32, + info=HKDF_INFO_ENCRYPT + nonce, + backend=default_backend(), + ) + return hkdf.derive(root_key) + + def _pack_payload( content: str | bytes | FilePayload, ) -> tuple[bytes, int]: @@ -472,7 +502,12 @@ def encrypt_message( """ try: salt = secrets.token_bytes(SALT_SIZE) - key = derive_hybrid_key(photo_data, passphrase, salt, pin, rsa_key_data, channel_key) + root_key = derive_hybrid_key(photo_data, passphrase, salt, pin, rsa_key_data, channel_key) + + # v6: Per-message key via HKDF — each message gets a unique encryption key + message_nonce = secrets.token_bytes(MESSAGE_NONCE_SIZE) + key = derive_message_key(root_key, message_nonce) + iv = secrets.token_bytes(IV_SIZE) # Determine flags @@ -502,28 +537,36 @@ def encrypt_message( "Padded message: %d bytes (payload + %d padding)", len(padded_message), padding_needed ) - # Build header for AAD + # Build header for AAD (v6: includes nonce in authenticated data) header = MAGIC_HEADER + bytes([FORMAT_VERSION, flags]) # Encrypt with AES-256-GCM cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=default_backend()) encryptor = cipher.encryptor() - encryptor.authenticate_additional_data(header) + encryptor.authenticate_additional_data(header + message_nonce) ciphertext = encryptor.update(padded_message) + encryptor.finalize() - total_size = len(header) + len(salt) + len(iv) + len(encryptor.tag) + len(ciphertext) + total_size = ( + len(header) + + MESSAGE_NONCE_SIZE + + len(salt) + + len(iv) + + len(encryptor.tag) + + len(ciphertext) + ) logger.debug( - "Encrypted output: %d bytes (header=%d, salt=%d, iv=%d, tag=%d, ciphertext=%d)", + "Encrypted output: %d bytes (header=%d, nonce=%d, salt=%d, iv=%d, tag=%d, ct=%d)", total_size, len(header), + MESSAGE_NONCE_SIZE, len(salt), len(iv), len(encryptor.tag), len(ciphertext), ) - # v4.0.0: Header with flags byte - return header + salt + iv + encryptor.tag + ciphertext + # v6: [magic|version|flags|nonce|salt|iv|tag|ciphertext] + return header + message_nonce + salt + iv + encryptor.tag + ciphertext except Exception as e: logger.error("Encryption failed: %s", e) @@ -534,43 +577,78 @@ def parse_header(encrypted_data: bytes) -> dict | None: """ Parse the header from encrypted data. - v4.0.0: Includes flags byte for channel key indicator. + Supports both v5 (legacy) and v6 (HKDF) header formats. + + v5: [magic:4][ver:1][flags:1][salt:32][iv:12][tag:16][ciphertext] (66+ bytes) + v6: [magic:4][ver:1][flags:1][nonce:16][salt:32][iv:12][tag:16][ciphertext] (82+ bytes) Args: encrypted_data: Raw encrypted bytes Returns: - Dict with salt, iv, tag, ciphertext, flags or None if invalid + Dict with version, salt, iv, tag, ciphertext, flags, and optionally + message_nonce (v6). Returns None if invalid. """ - # Min size: Magic(4) + Version(1) + Flags(1) + Salt(32) + IV(12) + Tag(16) = 66 bytes + # Min v5 size: 4+1+1+32+12+16 = 66 bytes if len(encrypted_data) < 66 or encrypted_data[:4] != MAGIC_HEADER: return None try: version = encrypted_data[4] - if version != FORMAT_VERSION: + + if version == FORMAT_VERSION: + # v6: has message nonce + if len(encrypted_data) < 82: + return None + flags = encrypted_data[5] + offset = 6 + message_nonce = encrypted_data[offset : offset + MESSAGE_NONCE_SIZE] + offset += MESSAGE_NONCE_SIZE + salt = encrypted_data[offset : offset + SALT_SIZE] + offset += SALT_SIZE + iv = encrypted_data[offset : offset + IV_SIZE] + offset += IV_SIZE + tag = encrypted_data[offset : offset + TAG_SIZE] + offset += TAG_SIZE + ciphertext = encrypted_data[offset:] + + return { + "version": version, + "flags": flags, + "has_channel_key": bool(flags & FLAG_CHANNEL_KEY), + "message_nonce": message_nonce, + "salt": salt, + "iv": iv, + "tag": tag, + "ciphertext": ciphertext, + } + + elif version == FORMAT_VERSION_LEGACY: + # v5: no nonce + flags = encrypted_data[5] + offset = 6 + salt = encrypted_data[offset : offset + SALT_SIZE] + offset += SALT_SIZE + iv = encrypted_data[offset : offset + IV_SIZE] + offset += IV_SIZE + tag = encrypted_data[offset : offset + TAG_SIZE] + offset += TAG_SIZE + ciphertext = encrypted_data[offset:] + + return { + "version": version, + "flags": flags, + "has_channel_key": bool(flags & FLAG_CHANNEL_KEY), + "message_nonce": None, + "salt": salt, + "iv": iv, + "tag": tag, + "ciphertext": ciphertext, + } + + else: return None - flags = encrypted_data[5] - - offset = 6 - salt = encrypted_data[offset : offset + SALT_SIZE] - offset += SALT_SIZE - iv = encrypted_data[offset : offset + IV_SIZE] - offset += IV_SIZE - tag = encrypted_data[offset : offset + TAG_SIZE] - offset += TAG_SIZE - ciphertext = encrypted_data[offset:] - - return { - "version": version, - "flags": flags, - "has_channel_key": bool(flags & FLAG_CHANNEL_KEY), - "salt": salt, - "iv": iv, - "tag": tag, - "ciphertext": ciphertext, - } except Exception: return None @@ -622,12 +700,21 @@ def decrypt_message( message_has_key = header["has_channel_key"] try: - key = derive_hybrid_key( + root_key = derive_hybrid_key( photo_data, passphrase, header["salt"], pin, rsa_key_data, channel_key ) - # Reconstruct header for AAD verification - aad_header = MAGIC_HEADER + bytes([FORMAT_VERSION, header["flags"]]) + version = header["version"] + message_nonce = header["message_nonce"] + + if version == FORMAT_VERSION and message_nonce is not None: + # v6: Derive per-message key via HKDF + key = derive_message_key(root_key, message_nonce) + aad_header = MAGIC_HEADER + bytes([FORMAT_VERSION, header["flags"]]) + message_nonce + else: + # v5 (legacy): Root key used directly + key = root_key + aad_header = MAGIC_HEADER + bytes([FORMAT_VERSION_LEGACY, header["flags"]]) cipher = Cipher( algorithms.AES(key), modes.GCM(header["iv"], header["tag"]), backend=default_backend() @@ -647,7 +734,7 @@ def decrypt_message( payload_data = padded_plaintext[:original_length] result = _unpack_payload(payload_data) - logger.debug("Decryption successful: %s", result.payload_type) + logger.debug("Decryption successful: %s (v%d)", result.payload_type, version) return result except Exception as e: diff --git a/src/stegasoo/dct_steganography.py b/src/stegasoo/dct_steganography.py index 0b96459..afba8e8 100644 --- a/src/stegasoo/dct_steganography.py +++ b/src/stegasoo/dct_steganography.py @@ -12,7 +12,7 @@ Why is this cool? Two approaches depending on what you want: 1. PNG output: We do our own DCT math via scipy (works on any image) -2. JPEG output: We use jpeglib to directly tweak the coefficients (chef's kiss) +2. JPEG output: We use jpeglib to directly modify the coefficients (chef's kiss) v4.1.0 - The "please stop corrupting my data" release: - Reed-Solomon error correction (can fix up to 16 byte errors per chunk) @@ -56,13 +56,12 @@ except ImportError: idctn = None # Check for jpeglib availability (for proper JPEG mode) -# jpeglib replaces jpegio for Python 3.13+ compatibility try: import jpeglib - HAS_JPEGIO = True # Keep variable name for compatibility + HAS_JPEGLIB = True except ImportError: - HAS_JPEGIO = False + HAS_JPEGLIB = False jpeglib = None # Import custom exceptions @@ -170,20 +169,20 @@ QUANT_STEP = 25 # Magic bytes so we can identify our own images DCT_MAGIC = b"DCTS" # scipy DCT mode marker -JPEGIO_MAGIC = b"JPGS" # jpegio native JPEG mode marker +JPEGLIB_MAGIC = b"JPGS" # jpeglib native JPEG mode marker HEADER_SIZE = 10 # Magic (4) + version (1) + flags (1) + length (4) OUTPUT_FORMAT_PNG = "png" OUTPUT_FORMAT_JPEG = "jpeg" JPEG_OUTPUT_QUALITY = 95 # High quality but not 100 (100 causes issues, see below) -# For jpegio mode: we only embed in coefficients with magnitude >= 2 +# For jpeglib mode: we only embed in coefficients with magnitude >= 2 # Coefficients of 0 or 1 are usually quantized noise - unreliable -JPEGIO_MIN_COEF_MAGNITUDE = 2 +JPEGLIB_MIN_COEF_MAGNITUDE = 2 # We embed in the Y (luminance) channel only - it has the most capacity # Cb/Cr are often subsampled 4:2:0 anyway -JPEGIO_EMBED_CHANNEL = 0 +JPEGLIB_EMBED_CHANNEL = 0 # Header flags FLAG_COLOR_MODE = 0x01 # Set if we preserved color (YCbCr mode) @@ -204,10 +203,10 @@ RS_LENGTH_PREFIX_SIZE = RS_LENGTH_HEADER_SIZE * RS_LENGTH_COPIES # 24 bytes tot MAX_CHUNK_HEIGHT = 512 # Process in strips to keep memory sane # Fun bug: JPEGs saved with quality=100 have quantization tables full of 1s -# This makes the DCT coefficients HUGE and jpegio crashes spectacularly +# This makes the DCT coefficients HUGE and jpeglib crashes spectacularly # Solution: detect and re-save at quality 95 first -JPEGIO_NORMALIZE_QUALITY = 95 -JPEGIO_MAX_QUANT_VALUE_THRESHOLD = 1 # All 1s in quant table = bad news +JPEGLIB_NORMALIZE_QUALITY = 95 +JPEGLIB_MAX_QUANT_VALUE_THRESHOLD = 1 # All 1s in quant table = bad news # ============================================================================ @@ -261,8 +260,8 @@ def has_dct_support() -> bool: return HAS_SCIPY -def has_jpegio_support() -> bool: - return HAS_JPEGIO +def has_jpeglib_support() -> bool: + return HAS_JPEGLIB # ============================================================================ @@ -654,11 +653,11 @@ def _parse_header(header_bits: list) -> tuple[int, int, int]: # ============================================================================ -# JPEGIO HELPERS +# JPEGLIB HELPERS # ============================================================================ -def _jpegio_bytes_to_file(data: bytes, suffix: str = ".jpg") -> str: +def _jpeglib_bytes_to_file(data: bytes, suffix: str = ".jpg") -> str: import os import tempfile @@ -670,19 +669,19 @@ def _jpegio_bytes_to_file(data: bytes, suffix: str = ".jpg") -> str: return path -def _jpegio_get_usable_positions(coef_array: np.ndarray) -> list: +def _jpeglib_get_usable_positions(coef_array: np.ndarray) -> list: positions = [] h, w = coef_array.shape for row in range(h): for col in range(w): if (row % BLOCK_SIZE == 0) and (col % BLOCK_SIZE == 0): continue - if abs(coef_array[row, col]) >= JPEGIO_MIN_COEF_MAGNITUDE: + if abs(coef_array[row, col]) >= JPEGLIB_MIN_COEF_MAGNITUDE: positions.append((row, col)) return positions -def _jpegio_generate_order(num_positions: int, seed: bytes) -> list: +def _jpeglib_generate_order(num_positions: int, seed: bytes) -> list: hash_bytes = hashlib.sha256(seed + b"jpeg_coef_order").digest() rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], "big")) order = list(range(num_positions)) @@ -690,15 +689,15 @@ def _jpegio_generate_order(num_positions: int, seed: bytes) -> list: return order -def _jpegio_create_header(data_length: int, flags: int = 0) -> bytes: - return struct.pack(">4sBBI", JPEGIO_MAGIC, 1, flags, data_length) +def _jpeglib_create_header(data_length: int, flags: int = 0) -> bytes: + return struct.pack(">4sBBI", JPEGLIB_MAGIC, 1, flags, data_length) -def _jpegio_parse_header(header_bytes: bytes) -> tuple[int, int, int]: +def _jpeglib_parse_header(header_bytes: bytes) -> tuple[int, int, int]: if len(header_bytes) < HEADER_SIZE: raise ValueError("Insufficient header data") magic, version, flags, length = struct.unpack(">4sBBI", header_bytes[:HEADER_SIZE]) - if magic != JPEGIO_MAGIC: + if magic != JPEGLIB_MAGIC: raise InvalidMagicBytesError("Not a Stegasoo JPEG or wrong mode") return version, flags, length @@ -782,7 +781,7 @@ def estimate_capacity_comparison(image_data: bytes) -> dict: "available": HAS_SCIPY, }, "jpeg_native": { - "available": HAS_JPEGIO, + "available": HAS_JPEGLIB, "note": "Uses jpeglib for proper JPEG coefficient embedding", }, } @@ -795,24 +794,54 @@ def embed_in_dct( output_format: str = OUTPUT_FORMAT_PNG, color_mode: str = "color", progress_file: str | None = None, + quant_step: int | None = None, + jpeg_quality: int | None = None, + max_dimension: int | None = None, ) -> tuple[bytes, DCTEmbedStats]: - """Embed data using DCT coefficient modification.""" + """Embed data using DCT coefficient modification. + + Args: + data: Payload bytes to embed. + carrier_image: Carrier image bytes. + seed: Key for block selection. + output_format: 'png' or 'jpeg'. + color_mode: 'color' or 'grayscale'. + progress_file: Optional progress file. + quant_step: Override QIM quantization step (default: QUANT_STEP). + Higher = more robust to recompression, more visible. + jpeg_quality: Override JPEG output quality (default: JPEG_OUTPUT_QUALITY). + max_dimension: Resize carrier if larger than this. + """ if output_format not in (OUTPUT_FORMAT_PNG, OUTPUT_FORMAT_JPEG): raise ValueError(f"Invalid output format: {output_format}") if color_mode not in ("color", "grayscale"): color_mode = "color" + qs = quant_step if quant_step is not None else QUANT_STEP + # Apply EXIF orientation to carrier image before embedding - # This ensures portrait photos are embedded in their correct visual orientation carrier_image = _apply_exif_orientation(carrier_image) - if output_format == OUTPUT_FORMAT_JPEG and HAS_JPEGIO: - return _embed_jpegio(data, carrier_image, seed, color_mode, progress_file) + # Resize if max_dimension specified (for platform presets) + if max_dimension is not None: + img_check = Image.open(io.BytesIO(carrier_image)) + w, h = img_check.size + if max(w, h) > max_dimension: + scale = max_dimension / max(w, h) + new_size = (int(w * scale), int(h * scale)) + img_check = img_check.resize(new_size, Image.LANCZOS) + buf = io.BytesIO() + img_check.save(buf, format="PNG") + carrier_image = buf.getvalue() + img_check.close() + + if output_format == OUTPUT_FORMAT_JPEG and HAS_JPEGLIB: + return _embed_jpeglib(data, carrier_image, seed, color_mode, progress_file) _check_scipy() return _embed_scipy_dct_safe( - data, carrier_image, seed, output_format, color_mode, progress_file + data, carrier_image, seed, output_format, color_mode, progress_file, quant_step=qs ) @@ -823,6 +852,7 @@ def _embed_scipy_dct_safe( output_format: str, color_mode: str = "color", progress_file: str | None = None, + quant_step: int = QUANT_STEP, ) -> tuple[bytes, DCTEmbedStats]: """ Embed using scipy DCT with safe memory handling. @@ -885,7 +915,9 @@ def _embed_scipy_dct_safe( gc.collect() # Embed in Y channel - Y_embedded = _embed_in_channel_safe(Y_padded, bits, block_order, blocks_x, progress_file) + Y_embedded = _embed_in_channel_safe( + Y_padded, bits, block_order, blocks_x, progress_file, quant_step=quant_step + ) del Y_padded gc.collect() @@ -909,7 +941,9 @@ def _embed_scipy_dct_safe( del image gc.collect() - embedded = _embed_in_channel_safe(padded, bits, block_order, blocks_x, progress_file) + embedded = _embed_in_channel_safe( + padded, bits, block_order, blocks_x, progress_file, quant_step=quant_step + ) del padded gc.collect() @@ -943,6 +977,7 @@ def _embed_in_channel_safe( block_order: list, blocks_x: int, progress_file: str | None = None, + quant_step: int = QUANT_STEP, ) -> np.ndarray: """ Embed bits in channel using vectorized DCT operations. @@ -1005,17 +1040,17 @@ def _embed_in_channel_safe( coeffs = dct_blocks[i, embed_rows, embed_cols] bit_array = np.array(block_bits) # QIM embedding: round to grid, adjust for bit - quantized = np.round(coeffs / QUANT_STEP).astype(int) + quantized = np.round(coeffs / quant_step).astype(int) # If quantized % 2 != bit, nudge coefficient needs_adjust = (quantized % 2) != bit_array # Determine direction to nudge dct_blocks[i, embed_rows[needs_adjust], embed_cols[needs_adjust]] = ( (quantized[needs_adjust] + (1 - 2 * (quantized[needs_adjust] % 2 == 1))) - * QUANT_STEP + * quant_step ).astype(np.float64) # For bits that already match, just quantize dct_blocks[i, embed_rows[~needs_adjust], embed_cols[~needs_adjust]] = ( - quantized[~needs_adjust] * QUANT_STEP + quantized[~needs_adjust] * quant_step ).astype(np.float64) else: # Partial block - process remaining bits individually @@ -1052,12 +1087,12 @@ def _embed_in_channel_safe( return result -def _normalize_jpeg_for_jpegio(image_data: bytes) -> bytes: +def _normalize_jpeg_for_jpeglib(image_data: bytes) -> bytes: """ - Normalize a JPEG image to ensure jpegio can process it safely. + Normalize a JPEG image to ensure jpeglib can process it safely. JPEGs saved with quality=100 have quantization tables with all values = 1, - which causes jpegio to crash due to huge coefficient magnitudes. + which causes jpeglib to crash due to huge coefficient magnitudes. This function detects such images and re-saves them at a safe quality level. Args: @@ -1078,7 +1113,7 @@ def _normalize_jpeg_for_jpegio(image_data: bytes) -> bytes: if hasattr(img, "quantization") and img.quantization: for table_id, table in img.quantization.items(): # If all values in any table are <= threshold, normalize - if max(table) <= JPEGIO_MAX_QUANT_VALUE_THRESHOLD: + if max(table) <= JPEGLIB_MAX_QUANT_VALUE_THRESHOLD: needs_normalization = True break @@ -1091,25 +1126,25 @@ def _normalize_jpeg_for_jpegio(image_data: bytes) -> bytes: img = img.convert("RGB") buffer = io.BytesIO() - img.save(buffer, format="JPEG", quality=JPEGIO_NORMALIZE_QUALITY, subsampling=0) + img.save(buffer, format="JPEG", quality=JPEGLIB_NORMALIZE_QUALITY, subsampling=0) img.close() return buffer.getvalue() -def _embed_jpegio( +def _embed_jpeglib( data: bytes, carrier_image: bytes, seed: bytes, color_mode: str = "color", progress_file: str | None = None, ) -> tuple[bytes, DCTEmbedStats]: - """Embed using jpegio for proper JPEG coefficient modification.""" + """Embed using jpeglib for proper JPEG coefficient modification.""" import os import tempfile # Normalize JPEG to avoid crashes with quality=100 images - carrier_image = _normalize_jpeg_for_jpegio(carrier_image) + carrier_image = _normalize_jpeg_for_jpeglib(carrier_image) img = Image.open(io.BytesIO(carrier_image)) width, height = img.size @@ -1122,20 +1157,20 @@ def _embed_jpegio( carrier_image = buffer.getvalue() img.close() - input_path = _jpegio_bytes_to_file(carrier_image, suffix=".jpg") + input_path = _jpeglib_bytes_to_file(carrier_image, suffix=".jpg") output_path = tempfile.mktemp(suffix=".jpg") flags = FLAG_COLOR_MODE if color_mode == "color" else 0 try: jpeg = jpeglib.to_jpegio(jpeglib.read_dct(input_path)) - coef_array = jpeg.coef_arrays[JPEGIO_EMBED_CHANNEL] + coef_array = jpeg.coef_arrays[JPEGLIB_EMBED_CHANNEL] - all_positions = _jpegio_get_usable_positions(coef_array) - order = _jpegio_generate_order(len(all_positions), seed) + all_positions = _jpeglib_get_usable_positions(coef_array) + order = _jpeglib_generate_order(len(all_positions), seed) # Build raw payload (header + data) - header = _jpegio_create_header(len(data), flags) + header = _jpeglib_create_header(len(data), flags) raw_payload = header + data # Apply Reed-Solomon error correction to entire payload if available @@ -1402,6 +1437,7 @@ def extract_from_dct( stego_image: bytes, seed: bytes, progress_file: str | None = None, + quant_step: int | None = None, ) -> bytes: """ Extract data from DCT stego image. @@ -1412,6 +1448,7 @@ def extract_from_dct( Uses quick header validation to skip obviously invalid rotations. """ + qs = quant_step if quant_step is not None else QUANT_STEP rotations_to_try = [0, 90, 180, 270] last_error = None valid_rotations = [] @@ -1429,7 +1466,7 @@ def extract_from_dct( # If no rotations pass quick check, try all anyway (fallback) if not valid_rotations: # Must try all rotations - quick validation might have failed due to - # scipy vs jpegio differences or other edge cases + # scipy vs jpeglib differences or other edge cases for rotation in rotations_to_try: if rotation == 0: valid_rotations.append((0, stego_image)) @@ -1443,9 +1480,9 @@ def extract_from_dct( fmt = img.format img.close() - if fmt == "JPEG" and HAS_JPEGIO: + if fmt == "JPEG" and HAS_JPEGLIB: try: - result = _extract_jpegio(image_to_decode, seed, progress_file) + result = _extract_jpeglib(image_to_decode, seed, progress_file) if rotation != 0: try: from . import debug @@ -1459,7 +1496,7 @@ def extract_from_dct( continue _check_scipy() - result = _extract_scipy_dct_safe(image_to_decode, seed, progress_file) + result = _extract_scipy_dct_safe(image_to_decode, seed, progress_file, quant_step=qs) if rotation != 0: try: from . import debug @@ -1481,6 +1518,7 @@ def _extract_scipy_dct_safe( stego_image: bytes, seed: bytes, progress_file: str | None = None, + quant_step: int = QUANT_STEP, ) -> bytes: """Extract using safe DCT operations with vectorized processing.""" # Progress starts at 25% (decode.py writes 20% for Argon2, 25% before extraction) @@ -1542,7 +1580,7 @@ def _extract_scipy_dct_safe( coeffs = dct_blocks[:, embed_rows, embed_cols] # Quantize and extract bits (vectorized) - quantized = np.round(coeffs / QUANT_STEP).astype(int) + quantized = np.round(coeffs / quant_step).astype(int) bits = (quantized % 2).flatten().tolist() all_bits.extend(bits) @@ -1660,28 +1698,28 @@ def _extract_scipy_dct_safe( return data -def _extract_jpegio( +def _extract_jpeglib( stego_image: bytes, seed: bytes, progress_file: str | None = None, ) -> bytes: - """Extract using jpegio for JPEG images.""" + """Extract using jpeglib for JPEG images.""" import os # Progress starts at 25% (decode.py writes 20% for Argon2, 25% before extraction) # Normalize JPEG to avoid crashes with quality=100 images # (shouldn't happen with stego images, but be defensive) - stego_image = _normalize_jpeg_for_jpegio(stego_image) + stego_image = _normalize_jpeg_for_jpeglib(stego_image) - temp_path = _jpegio_bytes_to_file(stego_image, suffix=".jpg") + temp_path = _jpeglib_bytes_to_file(stego_image, suffix=".jpg") try: jpeg = jpeglib.to_jpegio(jpeglib.read_dct(temp_path)) - coef_array = jpeg.coef_arrays[JPEGIO_EMBED_CHANNEL] + coef_array = jpeg.coef_arrays[JPEGLIB_EMBED_CHANNEL] - all_positions = _jpegio_get_usable_positions(coef_array) - order = _jpegio_generate_order(len(all_positions), seed) + all_positions = _jpeglib_get_usable_positions(coef_array) + order = _jpeglib_generate_order(len(all_positions), seed) _write_progress(progress_file, 30, 100, "extracting") @@ -1751,7 +1789,7 @@ def _extract_jpegio( _write_progress(progress_file, 75, 100, "decoding") raw_payload = _rs_decode(rs_encoded) _write_progress(progress_file, 95, 100, "decoding") - _, flags, data_length = _jpegio_parse_header(raw_payload[:HEADER_SIZE]) + _, flags, data_length = _jpeglib_parse_header(raw_payload[:HEADER_SIZE]) data = raw_payload[HEADER_SIZE : HEADER_SIZE + data_length] _write_progress(progress_file, 100, 100, "complete") return data @@ -1772,7 +1810,7 @@ def _extract_jpegio( ] ) - _, flags, data_length = _jpegio_parse_header(header_bytes) + _, flags, data_length = _jpeglib_parse_header(header_bytes) total_bits_needed = (HEADER_SIZE + data_length) * 8 all_bits = [] diff --git a/src/stegasoo/decode.py b/src/stegasoo/decode.py index bdf9348..115115e 100644 --- a/src/stegasoo/decode.py +++ b/src/stegasoo/decode.py @@ -54,6 +54,7 @@ def decode( embed_mode: str = EMBED_MODE_AUTO, channel_key: str | bool | None = None, progress_file: str | None = None, + platform: str | None = None, ) -> DecodeResult: """ Decode a message or file from a stego image. @@ -124,12 +125,21 @@ def decode( # Progress: key derivation done, starting extraction _write_progress(progress_file, 25, 100, "extracting") + # Resolve platform preset for DCT extraction + extract_kwargs = {} + if platform: + from .platform_presets import get_preset + + preset = get_preset(platform) + extract_kwargs["quant_step"] = preset.quant_step + # Extract encrypted data encrypted = extract_from_image( stego_image, pixel_key, embed_mode=embed_mode, progress_file=progress_file, + **extract_kwargs, ) if not encrypted: diff --git a/src/stegasoo/encode.py b/src/stegasoo/encode.py index 5b81c13..234bc1d 100644 --- a/src/stegasoo/encode.py +++ b/src/stegasoo/encode.py @@ -51,6 +51,7 @@ def encode( dct_color_mode: str = "color", channel_key: str | bool | None = None, progress_file: str | None = None, + platform: str | None = None, ) -> EncodeResult: """ Encode a message or file into an image. @@ -123,6 +124,18 @@ def encode( # Derive pixel/coefficient selection key (with channel key) pixel_key = derive_pixel_key(reference_photo, passphrase, pin, rsa_key_data, channel_key) + # Resolve platform preset for DCT encoding + platform_kwargs = {} + if platform: + from .platform_presets import get_preset + + preset = get_preset(platform) + platform_kwargs = { + "quant_step": preset.quant_step, + "max_dimension": preset.max_dimension, + "jpeg_quality": preset.jpeg_quality, + } + # Embed in image stego_data, stats, extension = embed_in_image( encrypted, @@ -133,6 +146,7 @@ def encode( dct_output_format=dct_output_format, dct_color_mode=dct_color_mode, progress_file=progress_file, + **platform_kwargs, ) # Generate filename diff --git a/src/stegasoo/platform_presets.py b/src/stegasoo/platform_presets.py new file mode 100644 index 0000000..5ccceab --- /dev/null +++ b/src/stegasoo/platform_presets.py @@ -0,0 +1,169 @@ +""" +Platform-Calibrated DCT Presets (v4.4.0) + +Pre-tuned DCT embedding parameters for social media platforms. Each platform +recompresses uploaded images differently — these presets bake in the known +parameters so payloads survive the round-trip. + +Usage:: + + from stegasoo.platform_presets import get_preset, PLATFORMS + + preset = get_preset("telegram") + # Use preset.quant_step, preset.jpeg_quality, etc. in DCT encode + +Preset parameters were derived from empirical testing. Platform compression +behavior can change without notice — use ``pre_verify_survival()`` to confirm +payloads survive before relying on a preset. +""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class PlatformPreset: + """Tuned DCT parameters for a specific platform.""" + + name: str + jpeg_quality: int # Platform's recompression quality + max_dimension: int # Max width/height before platform resizes + quant_step: int # QIM quantization step (higher = more robust) + embed_start: int # Start index into EMBED_POSITIONS (skip low-freq) + embed_end: int # End index into EMBED_POSITIONS (skip high-freq) + recompress_quality: int # Quality to simulate platform recompression for pre-verify + notes: str = "" + + +# Platform presets — derived from empirical testing of each platform's +# image processing pipeline. These WILL change as platforms update. +# Last verified: 2026-03-25 + +PRESETS: dict[str, PlatformPreset] = { + "telegram": PlatformPreset( + name="Telegram", + jpeg_quality=82, + max_dimension=2560, + quant_step=35, + embed_start=4, + embed_end=16, + recompress_quality=80, + notes="~81KB max embeddable. Moderate recompression.", + ), + "discord": PlatformPreset( + name="Discord", + jpeg_quality=85, + max_dimension=4096, + quant_step=30, + embed_start=4, + embed_end=18, + recompress_quality=83, + notes="Varies with Nitro. Non-Nitro users get more aggressive compression.", + ), + "signal": PlatformPreset( + name="Signal", + jpeg_quality=80, + max_dimension=2048, + quant_step=40, + embed_start=5, + embed_end=15, + recompress_quality=78, + notes="Aggressive recompression. Use smaller payloads for reliability.", + ), + "whatsapp": PlatformPreset( + name="WhatsApp", + jpeg_quality=70, + max_dimension=1600, + quant_step=50, + embed_start=5, + embed_end=14, + recompress_quality=68, + notes="Most lossy. Capacity is significantly reduced.", + ), +} + +PLATFORMS = sorted(PRESETS.keys()) + + +def get_preset(platform: str) -> PlatformPreset: + """Get the preset for a platform. + + Args: + platform: Platform name (telegram, discord, signal, whatsapp). + + Returns: + PlatformPreset with tuned DCT parameters. + + Raises: + ValueError: If platform is not recognized. + """ + key = platform.lower() + if key not in PRESETS: + available = ", ".join(PLATFORMS) + raise ValueError(f"Unknown platform '{platform}'. Available: {available}") + return PRESETS[key] + + +def get_embed_positions(preset: PlatformPreset) -> list[tuple[int, int]]: + """Get the embed positions for a preset. + + Args: + preset: Platform preset. + + Returns: + List of (row, col) DCT coefficient positions. + """ + from .dct_steganography import EMBED_POSITIONS + + return EMBED_POSITIONS[preset.embed_start : preset.embed_end] + + +def pre_verify_survival( + stego_image: bytes, + seed: bytes, + preset: PlatformPreset, +) -> bool: + """Verify that a payload survives simulated platform recompression. + + Encodes → recompresses at platform quality → attempts extraction. + If extraction succeeds, the payload should survive the real platform. + + Args: + stego_image: The stego JPEG image bytes (already encoded). + seed: The same seed used for encoding. + preset: Platform preset to simulate. + + Returns: + True if payload survived simulated recompression. + """ + import io + + from PIL import Image + + from .dct_steganography import extract_from_dct + + # Simulate platform recompression + img = Image.open(io.BytesIO(stego_image)) + + # Resize if over max dimension + w, h = img.size + if max(w, h) > preset.max_dimension: + scale = preset.max_dimension / max(w, h) + new_size = (int(w * scale), int(h * scale)) + img = img.resize(new_size, Image.LANCZOS) + + # Recompress at platform quality + buf = io.BytesIO() + if img.mode != "RGB": + img = img.convert("RGB") + img.save(buf, format="JPEG", quality=preset.recompress_quality) + img.close() + recompressed = buf.getvalue() + + # Try extraction + try: + result = extract_from_dct(recompressed, seed) + return result is not None and len(result) > 0 + except Exception: + return False diff --git a/src/stegasoo/steganalysis.py b/src/stegasoo/steganalysis.py new file mode 100644 index 0000000..f417534 --- /dev/null +++ b/src/stegasoo/steganalysis.py @@ -0,0 +1,281 @@ +""" +Steganalysis Self-Check Module (v4.4.0) + +Statistical analysis to estimate detectability risk of stego images. +Runs chi-square and RS (Regular-Singular) analysis on pixel data +to assess how visible the embedding is to an attacker. + +Currently LSB-only. DCT steganalysis (calibration attack) deferred. + +Usage:: + + from stegasoo.steganalysis import check_image + + result = check_image(image_data) + print(result["risk"]) # "low", "medium", or "high" + print(result["chi_square"]) # per-channel chi-square p-values + print(result["rs"]) # per-channel RS embedding estimates +""" + +from __future__ import annotations + +import io +from dataclasses import dataclass, field + +import numpy as np +from PIL import Image + +from .constants import ( + STEGANALYSIS_CHI_SUSPICIOUS_THRESHOLD, + STEGANALYSIS_RS_HIGH_THRESHOLD, + STEGANALYSIS_RS_MEDIUM_THRESHOLD, +) + + +@dataclass +class SteganalysisResult: + """Result of steganalysis on an image.""" + + risk: str # "low", "medium", or "high" + chi_square: dict = field(default_factory=dict) # per-channel p-values + rs: dict = field(default_factory=dict) # per-channel embedding estimates + width: int = 0 + height: int = 0 + channels: int = 0 + mode: str = "lsb" + + +def chi_square_analysis(channel_data: np.ndarray) -> float: + """Chi-square test on LSB distribution of a single channel. + + Groups pixel values into pairs (2i, 2i+1) — so-called "pairs of values" + (PoVs). In a clean image, each pair has a natural frequency ratio. + LSB embedding with random data forces each pair toward equal frequency. + + The test measures H0: "pairs are equalized" (consistent with embedding). + + Args: + channel_data: Flattened 1-D array of pixel values (uint8). + + Returns: + p-value from chi-square test. + HIGH p-value (close to 1.0) → pairs are equalized → suspicious. + LOW p-value (close to 0.0) → pairs are not equalized → less suspicious. + """ + from scipy.stats import chi2 + + # Count occurrences of each value 0-255 + histogram = np.bincount(channel_data.ravel(), minlength=256) + + # Group into 128 pairs: (0,1), (2,3), ..., (254,255) + chi_sq = 0.0 + degrees_of_freedom = 0 + + for i in range(0, 256, 2): + observed_even = histogram[i] + observed_odd = histogram[i + 1] + total = observed_even + observed_odd + + if total == 0: + continue + + expected = total / 2.0 + chi_sq += (observed_even - expected) ** 2 / expected + chi_sq += (observed_odd - expected) ** 2 / expected + degrees_of_freedom += 1 + + if degrees_of_freedom == 0: + return 1.0 # No data to analyze + + # p-value: probability of observing this chi-square value by chance + # Low p-value = LSBs are suspiciously uniform = likely embedded + p_value = 1.0 - chi2.cdf(chi_sq, degrees_of_freedom) + return float(p_value) + + +def rs_analysis(channel_data: np.ndarray, block_size: int = 8) -> float: + """Regular-Singular groups analysis on a single channel. + + Divides the image channel into groups of `block_size` pixels and measures + the "smoothness" (variation) of each group. Applying a flipping function + F1 (flip LSB) and F-1 (flip LSB of value-1) produces Regular (smoother) + and Singular (rougher) groups. + + In a clean image: R_m ≈ R_{-m} and S_m ≈ S_{-m}. + LSB embedding causes R_m and S_{-m} to converge while S_m and R_{-m} + diverge, allowing estimation of the embedding rate. + + Args: + channel_data: Flattened 1-D array of pixel values (uint8). + block_size: Number of pixels per group (default 8). + + Returns: + Estimated embedding rate (0.0 = clean, 1.0 = fully embedded). + Values > 0.5 strongly indicate LSB embedding. + """ + data = channel_data.ravel().astype(np.int16) + n = len(data) + # Trim to multiple of block_size + n_blocks = n // block_size + if n_blocks < 10: + return 0.0 # Not enough data + + data = data[: n_blocks * block_size].reshape(n_blocks, block_size) + + def variation(block: np.ndarray) -> float: + """Sum of absolute differences between adjacent pixels.""" + return float(np.sum(np.abs(np.diff(block)))) + + def flip_positive(block: np.ndarray) -> np.ndarray: + """F1: flip LSB (0↔1, 2↔3, 4↔5, ...).""" + return block ^ 1 + + def flip_negative(block: np.ndarray) -> np.ndarray: + """F-1: flip LSB of (value - 1), i.e. -1↔0, 1↔2, 3↔4, ...""" + result = block.copy() + even_mask = (block % 2) == 0 + result[even_mask] -= 1 + result[~even_mask] += 1 + return result + + r_m = s_m = r_neg = s_neg = 0 + + for i in range(n_blocks): + block = data[i] + v_orig = variation(block) + + v_f1 = variation(flip_positive(block)) + if v_f1 > v_orig: + r_m += 1 + elif v_f1 < v_orig: + s_m += 1 + + v_fn1 = variation(flip_negative(block)) + if v_fn1 > v_orig: + r_neg += 1 + elif v_fn1 < v_orig: + s_neg += 1 + + # Estimate embedding rate using the RS quadratic formula + # d0 = R_m - S_m, d1 = R_{-m} - S_{-m} + # The embedding rate p satisfies: d(p/2) = d0, d(1 - p/2) = d1 + # Simplified estimator: p ≈ (R_m - S_m) / (R_{-m} - S_{-m}) divergence + d0 = r_m - s_m + d1 = r_neg - s_neg + + if n_blocks == 0: + return 0.0 + + # Use the simplified dual-statistic estimator + # In clean images: d0 ≈ d1 (both positive) + # In embedded images: d0 → 0 while d1 stays positive + if d1 == 0: + # Can't estimate — likely very embedded or degenerate + return 0.5 if d0 == 0 else 0.0 + + # Ratio-based estimate: how much has d0 dropped relative to d1 + ratio = d0 / d1 + if ratio >= 1.0: + return 0.0 # d0 ≥ d1 means no evidence of embedding + if ratio <= 0.0: + return 1.0 # d0 collapsed or inverted + + # Linear interpolation: ratio=1 → 0% embedded, ratio=0 → 100% embedded + estimate = 1.0 - ratio + return float(np.clip(estimate, 0.0, 1.0)) + + +def assess_risk(chi_p_values: dict[str, float], rs_estimates: dict[str, float]) -> str: + """Map analysis results to a risk level. + + RS analysis is the primary metric (reliable for both sequential and + random-order embedding). Chi-square is supplementary — high p-values + indicate equalized PoV pairs, which is suspicious for random LSB embedding. + + Args: + chi_p_values: Per-channel chi-square p-values (high = suspicious). + rs_estimates: Per-channel RS embedding rate estimates (high = suspicious). + + Returns: + "low", "medium", or "high" detectability risk. + """ + if not chi_p_values and not rs_estimates: + return "low" + + # RS is the primary indicator: any channel with high embedding estimate + max_rs = max(rs_estimates.values()) if rs_estimates else 0.0 + + # Chi-square: high p-value means pairs are equalized (suspicious) + max_chi_p = max(chi_p_values.values()) if chi_p_values else 0.0 + chi_suspicious = max_chi_p > STEGANALYSIS_CHI_SUSPICIOUS_THRESHOLD + + # High risk: RS strongly indicates embedding + if max_rs > STEGANALYSIS_RS_HIGH_THRESHOLD: + return "high" + + # Medium risk: moderate RS signal, or RS + chi-square both flagging + if max_rs > STEGANALYSIS_RS_MEDIUM_THRESHOLD: + return "medium" + if chi_suspicious and max_rs > 0.05: + return "medium" + + return "low" + + +def check_image(image_data: bytes, mode: str = "lsb") -> dict: + """Run steganalysis on an image and return detectability assessment. + + Args: + image_data: Raw image bytes (PNG, BMP, etc.). + mode: Analysis mode — currently only "lsb" is supported. + + Returns: + Dict with keys: risk, chi_square, rs, width, height, channels, mode. + """ + if mode not in ("lsb", "auto"): + raise ValueError(f"Unsupported steganalysis mode: {mode}. Use 'lsb' or 'auto'.") + + img = Image.open(io.BytesIO(image_data)) + if img.mode not in ("RGB", "RGBA", "L"): + img = img.convert("RGB") + + width, height = img.size + pixels = np.array(img) + img.close() + + channel_names = ["R", "G", "B"] if pixels.ndim == 3 else ["L"] + if pixels.ndim == 2: + pixels = pixels[:, :, np.newaxis] + + num_channels = min(pixels.shape[2], 3) # Skip alpha + + chi_p_values = {} + rs_estimates = {} + + for i in range(num_channels): + name = channel_names[i] + channel = pixels[:, :, i].ravel() + chi_p_values[name] = chi_square_analysis(channel) + rs_estimates[name] = rs_analysis(channel) + + risk = assess_risk(chi_p_values, rs_estimates) + + result = SteganalysisResult( + risk=risk, + chi_square=chi_p_values, + rs=rs_estimates, + width=width, + height=height, + channels=num_channels, + mode=mode, + ) + + return { + "risk": result.risk, + "chi_square": result.chi_square, + "rs": result.rs, + "width": result.width, + "height": result.height, + "channels": result.channels, + "mode": result.mode, + } diff --git a/src/stegasoo/steganography.py b/src/stegasoo/steganography.py index 4877507..c750510 100644 --- a/src/stegasoo/steganography.py +++ b/src/stegasoo/steganography.py @@ -107,13 +107,14 @@ EXT_TO_FORMAT = { # - v3.1.0: 76 bytes (had date field - 10+1 bytes) # - v3.2.0: 65 bytes (removed date, simpler) # - v4.0.0: 66 bytes (added flags byte for channel key) +# - v4.4.0: 82 bytes (added 16-byte message nonce for HKDF) -HEADER_OVERHEAD = 66 # What the crypto layer adds to any message +HEADER_OVERHEAD = 82 # What the crypto layer adds to any message (v6 format) LENGTH_PREFIX = 4 # We prepend the payload length for LSB extraction -ENCRYPTION_OVERHEAD = HEADER_OVERHEAD + LENGTH_PREFIX # Total: 70 bytes +ENCRYPTION_OVERHEAD = HEADER_OVERHEAD + LENGTH_PREFIX # Total: 86 bytes -# That 70 bytes is your minimum image capacity requirement. -# A tiny 100x100 image gives you ~3750 bytes capacity, minus 70 = ~3680 usable. +# That 86 bytes is your minimum image capacity requirement. +# A tiny 100x100 image gives you ~3750 bytes capacity, minus 86 = ~3664 usable. # DCT output format options (v3.0.1) DCT_OUTPUT_PNG = "png" @@ -609,6 +610,9 @@ def embed_in_image( dct_output_format: str = DCT_OUTPUT_PNG, dct_color_mode: str = "color", progress_file: str | None = None, + quant_step: int | None = None, + jpeg_quality: int | None = None, + max_dimension: int | None = None, ) -> tuple[bytes, Union[EmbedStats, "DCTEmbedStats"], str]: """ Embed data into an image using specified mode. @@ -636,49 +640,54 @@ def embed_in_image( embed_mode in VALID_EMBED_MODES, f"Invalid embed_mode: {embed_mode}. Use 'lsb' or 'dct'" ) - # DCT MODE - if embed_mode == EMBED_MODE_DCT: - if not has_dct_support(): - raise ImportError( - "scipy is required for DCT embedding mode. " "Install with: pip install scipy" - ) + # Dispatch via backend registry + from .backends import registry - # Validate DCT output format + backend = registry.get(embed_mode) + if not backend.is_available(): + raise ImportError( + f"Dependencies for '{embed_mode}' mode are not installed. " + f"Install with: pip install stegasoo[dct]" + ) + + if embed_mode == EMBED_MODE_DCT: + # Validate DCT-specific options if dct_output_format not in (DCT_OUTPUT_PNG, DCT_OUTPUT_JPEG): debug.print(f"Invalid dct_output_format '{dct_output_format}', defaulting to PNG") dct_output_format = DCT_OUTPUT_PNG - - # Validate DCT color mode (v3.0.1) if dct_color_mode not in ("grayscale", "color"): debug.print(f"Invalid dct_color_mode '{dct_color_mode}', defaulting to color") dct_color_mode = "color" - dct_mod = _get_dct_module() - - # Pass output_format and color_mode to DCT module (v3.0.1) - stego_bytes, dct_stats = dct_mod.embed_in_dct( + stego_bytes, dct_stats = backend.embed( data, image_data, pixel_key, - output_format=dct_output_format, - color_mode=dct_color_mode, progress_file=progress_file, + dct_output_format=dct_output_format, + dct_color_mode=dct_color_mode, + quant_step=quant_step, + jpeg_quality=jpeg_quality, + max_dimension=max_dimension, ) - - # Determine extension based on output format - if dct_output_format == DCT_OUTPUT_JPEG: - ext = "jpg" - else: - ext = "png" - + ext = "jpg" if dct_output_format == DCT_OUTPUT_JPEG else "png" debug.print( f"DCT embedding complete: {dct_output_format.upper()} output, " f"color_mode={dct_color_mode}, ext={ext}" ) return stego_bytes, dct_stats, ext - # LSB MODE - return _embed_lsb(data, image_data, pixel_key, bits_per_channel, output_format, progress_file) + # LSB and other image backends + stego_bytes, stats = backend.embed( + data, + image_data, + pixel_key, + progress_file=progress_file, + bits_per_channel=bits_per_channel, + output_format=output_format, + ) + ext = getattr(stats, "output_extension", "png") + return stego_bytes, stats, ext def _embed_lsb( @@ -844,6 +853,7 @@ def extract_from_image( bits_per_channel: int = 1, embed_mode: str = EMBED_MODE_AUTO, progress_file: str | None = None, + quant_step: int | None = None, ) -> bytes | None: """ Extract hidden data from a stego image. @@ -860,32 +870,40 @@ def extract_from_image( """ debug.print(f"extract_from_image: mode={embed_mode}") - # AUTO MODE: Try LSB first, then DCT + from .backends import registry + + # AUTO MODE: Try LSB first (cheaper), then other backends if embed_mode == EMBED_MODE_AUTO: - result = _extract_lsb(image_data, pixel_key, bits_per_channel) - if result is not None: - debug.print("Auto-detect: LSB extraction succeeded") - return result - - if has_dct_support(): - debug.print("Auto-detect: LSB failed, trying DCT") - result = _extract_dct(image_data, pixel_key, progress_file) + auto_order = [EMBED_MODE_LSB] + [ + m for m in registry.available_modes(carrier_type="image") if m != EMBED_MODE_LSB + ] + for mode in auto_order: + backend = registry.get(mode) + debug.print(f"Auto-detect: trying {mode}") + result = backend.extract( + image_data, + pixel_key, + progress_file=progress_file, + bits_per_channel=bits_per_channel, + quant_step=quant_step, + ) if result is not None: - debug.print("Auto-detect: DCT extraction succeeded") + debug.print(f"Auto-detect: {mode} extraction succeeded") return result - debug.print("Auto-detect: All modes failed") return None - # EXPLICIT DCT MODE - elif embed_mode == EMBED_MODE_DCT: - if not has_dct_support(): - raise ImportError("scipy required for DCT mode") - return _extract_dct(image_data, pixel_key, progress_file) - - # EXPLICIT LSB MODE - else: - return _extract_lsb(image_data, pixel_key, bits_per_channel) + # EXPLICIT MODE + backend = registry.get(embed_mode) + if not backend.is_available(): + raise ImportError(f"Dependencies for '{embed_mode}' mode are not installed.") + return backend.extract( + image_data, + pixel_key, + progress_file=progress_file, + bits_per_channel=bits_per_channel, + quant_step=quant_step, + ) def _extract_dct( @@ -1099,9 +1117,9 @@ def peek_image(image_data: bytes) -> dict: # Try DCT extraction (requires scipy/jpeglib) try: - from .dct_steganography import HAS_JPEGIO, HAS_SCIPY + from .dct_steganography import HAS_JPEGLIB, HAS_SCIPY - if HAS_SCIPY or HAS_JPEGIO: + if HAS_SCIPY or HAS_JPEGLIB: from .dct_steganography import extract_from_dct # Extract first few bytes to check header