Pre-consolidation snapshot: backends, steganalysis, platform presets, and WIP changes
Snapshot of all uncommitted work before merging stegasoo into soosef monorepo. Includes: pluggable backends registry, steganalysis detection, platform presets, and various in-progress modifications across core modules. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -46,6 +46,16 @@ from .image_utils import (
|
||||
get_image_info,
|
||||
)
|
||||
|
||||
# Backend registry
|
||||
from .backends import EmbeddingBackend, registry as backend_registry
|
||||
|
||||
# Platform presets
|
||||
from .platform_presets import PLATFORMS, get_preset
|
||||
|
||||
# Steganalysis
|
||||
from .steganalysis import check_image
|
||||
from .backends.registry import BackendNotFoundError
|
||||
|
||||
# Steganography functions
|
||||
from .steganography import (
|
||||
calculate_capacity_by_mode,
|
||||
@@ -273,6 +283,15 @@ __all__ = [
|
||||
"generate_filename",
|
||||
# Crypto
|
||||
"has_argon2",
|
||||
# Backends
|
||||
"EmbeddingBackend",
|
||||
"backend_registry",
|
||||
"BackendNotFoundError",
|
||||
# Platform presets
|
||||
"get_preset",
|
||||
"PLATFORMS",
|
||||
# Steganalysis
|
||||
"check_image",
|
||||
# Steganography
|
||||
"has_dct_support",
|
||||
"calculate_capacity_by_mode",
|
||||
|
||||
31
src/stegasoo/backends/__init__.py
Normal file
31
src/stegasoo/backends/__init__.py
Normal file
@@ -0,0 +1,31 @@
|
||||
"""
|
||||
Stegasoo embedding backends.
|
||||
|
||||
Provides a typed plugin interface for all embedding algorithms.
|
||||
Backends register with the module-level ``registry`` on import.
|
||||
|
||||
Usage::
|
||||
|
||||
from stegasoo.backends import registry
|
||||
|
||||
backend = registry.get("lsb")
|
||||
stego, stats = backend.embed(data, carrier, key)
|
||||
"""
|
||||
|
||||
from .dct import DCTBackend
|
||||
from .lsb import LSBBackend
|
||||
from .protocol import EmbeddingBackend
|
||||
from .registry import BackendNotFoundError, BackendRegistry, registry
|
||||
|
||||
# Auto-register built-in backends
|
||||
registry.register(LSBBackend())
|
||||
registry.register(DCTBackend())
|
||||
|
||||
__all__ = [
|
||||
"EmbeddingBackend",
|
||||
"BackendRegistry",
|
||||
"BackendNotFoundError",
|
||||
"registry",
|
||||
"LSBBackend",
|
||||
"DCTBackend",
|
||||
]
|
||||
69
src/stegasoo/backends/dct.py
Normal file
69
src/stegasoo/backends/dct.py
Normal file
@@ -0,0 +1,69 @@
|
||||
"""
|
||||
DCT (Discrete Cosine Transform) image embedding backend.
|
||||
|
||||
Wraps the existing frequency-domain DCT functions in dct_steganography.py.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
class DCTBackend:
|
||||
"""Frequency-domain DCT embedding for JPEG-resilient steganography."""
|
||||
|
||||
@property
|
||||
def mode(self) -> str:
|
||||
return "dct"
|
||||
|
||||
@property
|
||||
def carrier_type(self) -> str:
|
||||
return "image"
|
||||
|
||||
def is_available(self) -> bool:
|
||||
from ..dct_steganography import HAS_SCIPY
|
||||
|
||||
return HAS_SCIPY
|
||||
|
||||
def embed(
|
||||
self,
|
||||
data: bytes,
|
||||
carrier: bytes,
|
||||
key: bytes,
|
||||
*,
|
||||
progress_file: str | None = None,
|
||||
**options: Any,
|
||||
) -> tuple[bytes, Any]:
|
||||
from ..dct_steganography import embed_in_dct
|
||||
|
||||
output_format = options.get("dct_output_format", "png")
|
||||
color_mode = options.get("dct_color_mode", "color")
|
||||
quant_step = options.get("quant_step")
|
||||
jpeg_quality = options.get("jpeg_quality")
|
||||
max_dimension = options.get("max_dimension")
|
||||
return embed_in_dct(
|
||||
data, carrier, key, output_format, color_mode, progress_file,
|
||||
quant_step=quant_step, jpeg_quality=jpeg_quality, max_dimension=max_dimension,
|
||||
)
|
||||
|
||||
def extract(
|
||||
self,
|
||||
carrier: bytes,
|
||||
key: bytes,
|
||||
*,
|
||||
progress_file: str | None = None,
|
||||
**options: Any,
|
||||
) -> bytes | None:
|
||||
from ..dct_steganography import extract_from_dct
|
||||
|
||||
quant_step = options.get("quant_step")
|
||||
try:
|
||||
return extract_from_dct(carrier, key, progress_file, quant_step=quant_step)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def calculate_capacity(self, carrier: bytes, **options: Any) -> int:
|
||||
from ..dct_steganography import calculate_dct_capacity
|
||||
|
||||
info = calculate_dct_capacity(carrier)
|
||||
return info.usable_capacity_bytes
|
||||
63
src/stegasoo/backends/lsb.py
Normal file
63
src/stegasoo/backends/lsb.py
Normal file
@@ -0,0 +1,63 @@
|
||||
"""
|
||||
LSB (Least Significant Bit) image embedding backend.
|
||||
|
||||
Wraps the existing spatial-domain LSB functions in steganography.py.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
class LSBBackend:
|
||||
"""Spatial-domain LSB embedding for lossless image formats."""
|
||||
|
||||
@property
|
||||
def mode(self) -> str:
|
||||
return "lsb"
|
||||
|
||||
@property
|
||||
def carrier_type(self) -> str:
|
||||
return "image"
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True # Only needs Pillow, which is always present
|
||||
|
||||
def embed(
|
||||
self,
|
||||
data: bytes,
|
||||
carrier: bytes,
|
||||
key: bytes,
|
||||
*,
|
||||
progress_file: str | None = None,
|
||||
**options: Any,
|
||||
) -> tuple[bytes, Any]:
|
||||
from ..steganography import _embed_lsb
|
||||
|
||||
bits_per_channel = options.get("bits_per_channel", 1)
|
||||
output_format = options.get("output_format", None)
|
||||
stego_bytes, stats, ext = _embed_lsb(
|
||||
data, carrier, key, bits_per_channel, output_format, progress_file
|
||||
)
|
||||
# Attach output extension to stats for callers that need it
|
||||
stats.output_extension = ext # type: ignore[attr-defined]
|
||||
return stego_bytes, stats
|
||||
|
||||
def extract(
|
||||
self,
|
||||
carrier: bytes,
|
||||
key: bytes,
|
||||
*,
|
||||
progress_file: str | None = None,
|
||||
**options: Any,
|
||||
) -> bytes | None:
|
||||
from ..steganography import _extract_lsb
|
||||
|
||||
bits_per_channel = options.get("bits_per_channel", 1)
|
||||
return _extract_lsb(carrier, key, bits_per_channel)
|
||||
|
||||
def calculate_capacity(self, carrier: bytes, **options: Any) -> int:
|
||||
from ..steganography import calculate_capacity
|
||||
|
||||
bits_per_channel = options.get("bits_per_channel", 1)
|
||||
return calculate_capacity(carrier, bits_per_channel)
|
||||
91
src/stegasoo/backends/protocol.py
Normal file
91
src/stegasoo/backends/protocol.py
Normal file
@@ -0,0 +1,91 @@
|
||||
"""
|
||||
Embedding backend protocol definition.
|
||||
|
||||
All embedding backends (LSB, DCT, audio, video, etc.) implement this protocol,
|
||||
enabling registry-based dispatch instead of if/elif chains.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Protocol, runtime_checkable
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class EmbeddingBackend(Protocol):
|
||||
"""Protocol that all embedding backends must satisfy.
|
||||
|
||||
Each backend handles a specific embedding mode (e.g. 'lsb', 'dct',
|
||||
'audio_lsb', 'audio_spread') for a specific carrier type ('image',
|
||||
'audio', 'video').
|
||||
"""
|
||||
|
||||
@property
|
||||
def mode(self) -> str:
|
||||
"""The embedding mode identifier (e.g. 'lsb', 'dct')."""
|
||||
...
|
||||
|
||||
@property
|
||||
def carrier_type(self) -> str:
|
||||
"""The carrier media type: 'image', 'audio', or 'video'."""
|
||||
...
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""Whether this backend's dependencies are installed."""
|
||||
...
|
||||
|
||||
def embed(
|
||||
self,
|
||||
data: bytes,
|
||||
carrier: bytes,
|
||||
key: bytes,
|
||||
*,
|
||||
progress_file: str | None = None,
|
||||
**options: Any,
|
||||
) -> tuple[bytes, Any]:
|
||||
"""Embed data into a carrier.
|
||||
|
||||
Args:
|
||||
data: Encrypted payload bytes.
|
||||
carrier: Raw carrier file bytes (image, audio, etc.).
|
||||
key: Derived key for pixel/sample selection.
|
||||
progress_file: Optional progress file path.
|
||||
**options: Backend-specific options (bits_per_channel,
|
||||
output_format, color_mode, chip_tier, etc.).
|
||||
|
||||
Returns:
|
||||
Tuple of (stego carrier bytes, embed stats).
|
||||
"""
|
||||
...
|
||||
|
||||
def extract(
|
||||
self,
|
||||
carrier: bytes,
|
||||
key: bytes,
|
||||
*,
|
||||
progress_file: str | None = None,
|
||||
**options: Any,
|
||||
) -> bytes | None:
|
||||
"""Extract data from a carrier.
|
||||
|
||||
Args:
|
||||
carrier: Stego carrier file bytes.
|
||||
key: Derived key for pixel/sample selection.
|
||||
progress_file: Optional progress file path.
|
||||
**options: Backend-specific options.
|
||||
|
||||
Returns:
|
||||
Extracted payload bytes, or None if no payload found.
|
||||
"""
|
||||
...
|
||||
|
||||
def calculate_capacity(self, carrier: bytes, **options: Any) -> int:
|
||||
"""Calculate maximum embeddable payload size in bytes.
|
||||
|
||||
Args:
|
||||
carrier: Raw carrier file bytes.
|
||||
**options: Backend-specific options (e.g. bits_per_channel).
|
||||
|
||||
Returns:
|
||||
Maximum payload capacity in bytes.
|
||||
"""
|
||||
...
|
||||
63
src/stegasoo/backends/registry.py
Normal file
63
src/stegasoo/backends/registry.py
Normal file
@@ -0,0 +1,63 @@
|
||||
"""
|
||||
Backend registry for embedding mode dispatch.
|
||||
|
||||
Backends register themselves by mode string. The registry replaces
|
||||
if/elif dispatch in steganography.py with a lookup table.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from ..exceptions import StegasooError
|
||||
from .protocol import EmbeddingBackend
|
||||
|
||||
|
||||
class BackendNotFoundError(StegasooError):
|
||||
"""Raised when a requested backend mode is not registered."""
|
||||
|
||||
|
||||
class BackendRegistry:
|
||||
"""Registry mapping mode strings to embedding backends."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._backends: dict[str, EmbeddingBackend] = {}
|
||||
|
||||
def register(self, backend: EmbeddingBackend) -> None:
|
||||
"""Register a backend for its mode string."""
|
||||
self._backends[backend.mode] = backend
|
||||
|
||||
def get(self, mode: str) -> EmbeddingBackend:
|
||||
"""Look up a backend by mode. Raises BackendNotFoundError if not found."""
|
||||
if mode not in self._backends:
|
||||
available = ", ".join(sorted(self._backends.keys())) or "(none)"
|
||||
raise BackendNotFoundError(
|
||||
f"No backend registered for mode '{mode}'. Available: {available}"
|
||||
)
|
||||
return self._backends[mode]
|
||||
|
||||
def has(self, mode: str) -> bool:
|
||||
"""Check if a backend is registered for the given mode."""
|
||||
return mode in self._backends
|
||||
|
||||
def available_modes(self, carrier_type: str | None = None) -> list[str]:
|
||||
"""List registered mode strings, optionally filtered by carrier type.
|
||||
|
||||
Only includes modes whose backend reports is_available() == True.
|
||||
"""
|
||||
return sorted(
|
||||
mode
|
||||
for mode, backend in self._backends.items()
|
||||
if backend.is_available()
|
||||
and (carrier_type is None or backend.carrier_type == carrier_type)
|
||||
)
|
||||
|
||||
def all_modes(self, carrier_type: str | None = None) -> list[str]:
|
||||
"""List all registered mode strings (including unavailable ones)."""
|
||||
return sorted(
|
||||
mode
|
||||
for mode, backend in self._backends.items()
|
||||
if carrier_type is None or backend.carrier_type == carrier_type
|
||||
)
|
||||
|
||||
|
||||
# Module-level singleton
|
||||
registry = BackendRegistry()
|
||||
@@ -184,8 +184,14 @@ def cli(ctx, json_output, debug_mode):
|
||||
)
|
||||
@click.option("--pin", prompt=True, hide_input=True, confirmation_prompt=True, help="PIN code")
|
||||
@click.option("--dry-run", is_flag=True, help="Show capacity usage without encoding")
|
||||
@click.option(
|
||||
"--platform",
|
||||
type=click.Choice(["telegram", "discord", "signal", "whatsapp"], case_sensitive=False),
|
||||
help="DCT preset for social media platform (implies DCT+JPEG mode)",
|
||||
)
|
||||
@click.option("--verify/--no-verify", default=True, help="Pre-verify payload survives platform recompression")
|
||||
@click.pass_context
|
||||
def encode(ctx, carrier, reference, message, file_payload, output, passphrase, pin, dry_run):
|
||||
def encode(ctx, carrier, reference, message, file_payload, output, passphrase, pin, dry_run, platform, verify):
|
||||
"""
|
||||
Encode a message or file into an image.
|
||||
|
||||
@@ -260,28 +266,47 @@ def encode(ctx, carrier, reference, message, file_payload, output, passphrase, p
|
||||
|
||||
from .steganography import EMBED_MODE_DCT, EMBED_MODE_LSB
|
||||
|
||||
# Platform preset overrides
|
||||
preset = None
|
||||
if platform:
|
||||
from .platform_presets import get_preset
|
||||
|
||||
preset = get_preset(platform)
|
||||
use_dct = True # Platform mode implies DCT+JPEG
|
||||
if output_ext not in (".jpg", ".jpeg"):
|
||||
output = str(Path(output).with_suffix(".jpg"))
|
||||
click.echo(f" Platform mode: output changed to {output}")
|
||||
|
||||
try:
|
||||
encode_kwargs = {
|
||||
"reference_photo": reference_data,
|
||||
"carrier_image": carrier_data,
|
||||
"passphrase": passphrase,
|
||||
"pin": pin,
|
||||
"embed_mode": EMBED_MODE_DCT if use_dct else EMBED_MODE_LSB,
|
||||
"dct_output_format": "jpeg" if use_dct else "png",
|
||||
}
|
||||
|
||||
if preset:
|
||||
encode_kwargs["platform"] = platform
|
||||
|
||||
if file_payload:
|
||||
# Encode file
|
||||
result = stegasoo_encode_file(
|
||||
filepath=file_payload,
|
||||
reference_photo=reference_data,
|
||||
carrier_image=carrier_data,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
embed_mode=EMBED_MODE_DCT if use_dct else EMBED_MODE_LSB,
|
||||
dct_output_format="jpeg" if use_dct else "png",
|
||||
)
|
||||
result = stegasoo_encode_file(filepath=file_payload, **encode_kwargs)
|
||||
else:
|
||||
# Encode message
|
||||
result = stegasoo_encode(
|
||||
message=message,
|
||||
reference_photo=reference_data,
|
||||
carrier_image=carrier_data,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
embed_mode=EMBED_MODE_DCT if use_dct else EMBED_MODE_LSB,
|
||||
dct_output_format="jpeg" if use_dct else "png",
|
||||
result = stegasoo_encode(message=message, **encode_kwargs)
|
||||
|
||||
# Pre-verify survival if platform mode
|
||||
if preset and verify:
|
||||
from .crypto import derive_pixel_key
|
||||
from .platform_presets import pre_verify_survival
|
||||
|
||||
pixel_key = derive_pixel_key(reference_data, passphrase, pin)
|
||||
survived = pre_verify_survival(result.stego_image, pixel_key, preset)
|
||||
if not survived:
|
||||
click.echo(
|
||||
f" ⚠ Warning: Payload may not survive {preset.name} recompression. "
|
||||
"Try a larger carrier image or shorter message.",
|
||||
err=True,
|
||||
)
|
||||
|
||||
# Write output
|
||||
@@ -325,8 +350,13 @@ def encode(ctx, carrier, reference, message, file_payload, output, passphrase, p
|
||||
@click.option("--passphrase", prompt=True, hide_input=True, help="Passphrase")
|
||||
@click.option("--pin", prompt=True, hide_input=True, help="PIN code")
|
||||
@click.option("-o", "--output", type=click.Path(), help="Output path for file payloads")
|
||||
@click.option(
|
||||
"--platform",
|
||||
type=click.Choice(["telegram", "discord", "signal", "whatsapp"], case_sensitive=False),
|
||||
help="Platform preset (must match encoding platform)",
|
||||
)
|
||||
@click.pass_context
|
||||
def decode(ctx, image, reference, passphrase, pin, output):
|
||||
def decode(ctx, image, reference, passphrase, pin, output, platform):
|
||||
"""
|
||||
Decode a message or file from an image.
|
||||
|
||||
@@ -334,7 +364,7 @@ def decode(ctx, image, reference, passphrase, pin, output):
|
||||
|
||||
stegasoo decode encoded.png -r ref.jpg --passphrase --pin
|
||||
|
||||
stegasoo decode encoded.png -r ref.jpg -o ./extracted/
|
||||
stegasoo decode encoded.png -r ref.jpg --platform telegram
|
||||
"""
|
||||
from .decode import decode as stegasoo_decode
|
||||
|
||||
@@ -344,12 +374,21 @@ def decode(ctx, image, reference, passphrase, pin, output):
|
||||
with open(reference, "rb") as f:
|
||||
reference_data = f.read()
|
||||
|
||||
# Resolve platform preset for DCT decoding
|
||||
decode_kwargs = {}
|
||||
if platform:
|
||||
from .platform_presets import get_preset
|
||||
|
||||
preset = get_preset(platform)
|
||||
decode_kwargs["platform"] = platform
|
||||
|
||||
try:
|
||||
result = stegasoo_decode(
|
||||
stego_image=stego_data,
|
||||
reference_photo=reference_data,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
**decode_kwargs,
|
||||
)
|
||||
|
||||
if result.is_file:
|
||||
@@ -1550,9 +1589,9 @@ def info(ctx, full):
|
||||
|
||||
# Check for DCT support
|
||||
try:
|
||||
from .dct_steganography import HAS_JPEGIO, HAS_SCIPY
|
||||
from .dct_steganography import HAS_JPEGLIB, HAS_SCIPY
|
||||
|
||||
has_dct = HAS_SCIPY and HAS_JPEGIO
|
||||
has_dct = HAS_SCIPY and HAS_JPEGLIB
|
||||
except ImportError:
|
||||
has_dct = False
|
||||
|
||||
@@ -2402,6 +2441,66 @@ def tools_convert(image, fmt, quality, output):
|
||||
click.echo(f"Converted to: {output}")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# STEGANALYSIS COMMANDS
|
||||
# =============================================================================
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("image", type=click.Path(exists=True))
|
||||
@click.option("--json", "as_json", is_flag=True, help="Output as JSON")
|
||||
@click.option(
|
||||
"--mode",
|
||||
type=click.Choice(["lsb", "auto"]),
|
||||
default="lsb",
|
||||
help="Analysis mode (default: lsb)",
|
||||
)
|
||||
def check(image, as_json, mode):
|
||||
"""Analyze an image for steganographic detectability.
|
||||
|
||||
Runs chi-square and RS (Regular-Singular) statistical tests to estimate
|
||||
how detectable any hidden data might be. Outputs a risk level.
|
||||
|
||||
Examples:
|
||||
|
||||
stegasoo check carrier.png
|
||||
|
||||
stegasoo check stego.png --json
|
||||
|
||||
stegasoo check suspicious.bmp --mode lsb
|
||||
"""
|
||||
from .steganalysis import check_image
|
||||
|
||||
with open(image, "rb") as f:
|
||||
image_data = f.read()
|
||||
|
||||
result = check_image(image_data, mode=mode)
|
||||
result["filename"] = Path(image).name
|
||||
|
||||
if as_json:
|
||||
click.echo(json.dumps(result, indent=2))
|
||||
else:
|
||||
risk = result["risk"]
|
||||
risk_colors = {"low": "green", "medium": "yellow", "high": "red"}
|
||||
risk_display = click.style(risk.upper(), fg=risk_colors.get(risk, "white"), bold=True)
|
||||
|
||||
click.echo(f"\n Steganalysis: {result['filename']}")
|
||||
click.echo(f" Image: {result['width']}x{result['height']}, {result['channels']} channels")
|
||||
click.echo(f" Detectability risk: {risk_display}")
|
||||
|
||||
click.echo("\n Chi-square (p-values):")
|
||||
for ch, p in result["chi_square"].items():
|
||||
indicator = "!" if p < 0.05 else " "
|
||||
click.echo(f" {indicator} {ch}: {p:.6f}")
|
||||
|
||||
click.echo("\n RS embedding estimate:")
|
||||
for ch, est in result["rs"].items():
|
||||
indicator = "!" if est > 0.1 else " "
|
||||
click.echo(f" {indicator} {ch}: {est:.4f} ({est * 100:.1f}%)")
|
||||
|
||||
click.echo()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# ADMIN COMMANDS (Web UI administration)
|
||||
# =============================================================================
|
||||
|
||||
@@ -44,7 +44,9 @@ MAGIC_HEADER = b"\x89ST3"
|
||||
# Version 1-3: Date-dependent encryption (v3.0.x - v3.1.x)
|
||||
# Version 4: Date-independent encryption (v3.2.0)
|
||||
# Version 5: Channel key support (v4.0.0) - adds flags byte to header
|
||||
FORMAT_VERSION = 5
|
||||
# Version 6: HKDF per-message key derivation (v4.4.0) - adds message nonce to header
|
||||
FORMAT_VERSION = 6
|
||||
FORMAT_VERSION_LEGACY = 5 # For backward-compatible decryption
|
||||
|
||||
# Payload type markers
|
||||
PAYLOAD_TEXT = 0x01
|
||||
@@ -66,6 +68,11 @@ ARGON2_PARALLELISM = 4
|
||||
# PBKDF2 fallback parameters
|
||||
PBKDF2_ITERATIONS = 600000
|
||||
|
||||
# HKDF per-message key derivation (v4.4.0 / FORMAT_VERSION 6)
|
||||
MESSAGE_NONCE_SIZE = 16 # 128-bit random nonce per message
|
||||
HKDF_INFO_ENCRYPT = b"stegasoo-v6-encrypt" # HKDF info for encryption key
|
||||
HKDF_INFO_PIXEL = b"stegasoo-v6-pixel" # HKDF info for pixel selection key (reserved)
|
||||
|
||||
# ============================================================================
|
||||
# INPUT LIMITS
|
||||
# ============================================================================
|
||||
@@ -244,6 +251,17 @@ def get_wordlist() -> list[str]:
|
||||
return _bip39_words
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# STEGANALYSIS (v4.4.0)
|
||||
# =============================================================================
|
||||
|
||||
# Chi-square p-value threshold: HIGH p-value = equalized PoV pairs = suspicious
|
||||
STEGANALYSIS_CHI_SUSPICIOUS_THRESHOLD = 0.95 # p > 0.95 → pairs suspiciously equalized
|
||||
|
||||
# RS embedding rate thresholds (primary metric): higher = more likely embedded
|
||||
STEGANALYSIS_RS_HIGH_THRESHOLD = 0.3 # > 30% estimated embedding → high risk
|
||||
STEGANALYSIS_RS_MEDIUM_THRESHOLD = 0.1 # > 10% estimated embedding → medium risk
|
||||
|
||||
# =============================================================================
|
||||
# DCT STEGANOGRAPHY (v3.0+)
|
||||
# =============================================================================
|
||||
|
||||
@@ -29,7 +29,9 @@ import secrets
|
||||
import struct
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes as _hashes
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand
|
||||
from PIL import Image
|
||||
|
||||
from .constants import (
|
||||
@@ -37,9 +39,12 @@ from .constants import (
|
||||
ARGON2_PARALLELISM,
|
||||
ARGON2_TIME_COST,
|
||||
FORMAT_VERSION,
|
||||
FORMAT_VERSION_LEGACY,
|
||||
HKDF_INFO_ENCRYPT,
|
||||
IV_SIZE,
|
||||
MAGIC_HEADER,
|
||||
MAX_FILENAME_LENGTH,
|
||||
MESSAGE_NONCE_SIZE,
|
||||
PAYLOAD_FILE,
|
||||
PAYLOAD_TEXT,
|
||||
PBKDF2_ITERATIONS,
|
||||
@@ -63,6 +68,7 @@ except ImportError:
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# CHANNEL KEY RESOLUTION
|
||||
# =============================================================================
|
||||
@@ -314,6 +320,30 @@ def derive_pixel_key(
|
||||
return hashlib.sha256(material + b"pixel_selection").digest()
|
||||
|
||||
|
||||
def derive_message_key(root_key: bytes, nonce: bytes) -> bytes:
|
||||
"""
|
||||
Derive a per-message encryption key via HKDF-Expand.
|
||||
|
||||
Each message gets a unique encryption key even with identical credentials,
|
||||
because the nonce is random per message. This provides key diversification:
|
||||
compromising the ciphertext of one message doesn't help with another.
|
||||
|
||||
Args:
|
||||
root_key: 32-byte root key from Argon2id/PBKDF2
|
||||
nonce: 16-byte random nonce (unique per message)
|
||||
|
||||
Returns:
|
||||
32-byte per-message encryption key
|
||||
"""
|
||||
hkdf = HKDFExpand(
|
||||
algorithm=_hashes.SHA256(),
|
||||
length=32,
|
||||
info=HKDF_INFO_ENCRYPT + nonce,
|
||||
backend=default_backend(),
|
||||
)
|
||||
return hkdf.derive(root_key)
|
||||
|
||||
|
||||
def _pack_payload(
|
||||
content: str | bytes | FilePayload,
|
||||
) -> tuple[bytes, int]:
|
||||
@@ -472,7 +502,12 @@ def encrypt_message(
|
||||
"""
|
||||
try:
|
||||
salt = secrets.token_bytes(SALT_SIZE)
|
||||
key = derive_hybrid_key(photo_data, passphrase, salt, pin, rsa_key_data, channel_key)
|
||||
root_key = derive_hybrid_key(photo_data, passphrase, salt, pin, rsa_key_data, channel_key)
|
||||
|
||||
# v6: Per-message key via HKDF — each message gets a unique encryption key
|
||||
message_nonce = secrets.token_bytes(MESSAGE_NONCE_SIZE)
|
||||
key = derive_message_key(root_key, message_nonce)
|
||||
|
||||
iv = secrets.token_bytes(IV_SIZE)
|
||||
|
||||
# Determine flags
|
||||
@@ -502,28 +537,36 @@ def encrypt_message(
|
||||
"Padded message: %d bytes (payload + %d padding)", len(padded_message), padding_needed
|
||||
)
|
||||
|
||||
# Build header for AAD
|
||||
# Build header for AAD (v6: includes nonce in authenticated data)
|
||||
header = MAGIC_HEADER + bytes([FORMAT_VERSION, flags])
|
||||
|
||||
# Encrypt with AES-256-GCM
|
||||
cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=default_backend())
|
||||
encryptor = cipher.encryptor()
|
||||
encryptor.authenticate_additional_data(header)
|
||||
encryptor.authenticate_additional_data(header + message_nonce)
|
||||
ciphertext = encryptor.update(padded_message) + encryptor.finalize()
|
||||
|
||||
total_size = len(header) + len(salt) + len(iv) + len(encryptor.tag) + len(ciphertext)
|
||||
total_size = (
|
||||
len(header)
|
||||
+ MESSAGE_NONCE_SIZE
|
||||
+ len(salt)
|
||||
+ len(iv)
|
||||
+ len(encryptor.tag)
|
||||
+ len(ciphertext)
|
||||
)
|
||||
logger.debug(
|
||||
"Encrypted output: %d bytes (header=%d, salt=%d, iv=%d, tag=%d, ciphertext=%d)",
|
||||
"Encrypted output: %d bytes (header=%d, nonce=%d, salt=%d, iv=%d, tag=%d, ct=%d)",
|
||||
total_size,
|
||||
len(header),
|
||||
MESSAGE_NONCE_SIZE,
|
||||
len(salt),
|
||||
len(iv),
|
||||
len(encryptor.tag),
|
||||
len(ciphertext),
|
||||
)
|
||||
|
||||
# v4.0.0: Header with flags byte
|
||||
return header + salt + iv + encryptor.tag + ciphertext
|
||||
# v6: [magic|version|flags|nonce|salt|iv|tag|ciphertext]
|
||||
return header + message_nonce + salt + iv + encryptor.tag + ciphertext
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Encryption failed: %s", e)
|
||||
@@ -534,25 +577,55 @@ def parse_header(encrypted_data: bytes) -> dict | None:
|
||||
"""
|
||||
Parse the header from encrypted data.
|
||||
|
||||
v4.0.0: Includes flags byte for channel key indicator.
|
||||
Supports both v5 (legacy) and v6 (HKDF) header formats.
|
||||
|
||||
v5: [magic:4][ver:1][flags:1][salt:32][iv:12][tag:16][ciphertext] (66+ bytes)
|
||||
v6: [magic:4][ver:1][flags:1][nonce:16][salt:32][iv:12][tag:16][ciphertext] (82+ bytes)
|
||||
|
||||
Args:
|
||||
encrypted_data: Raw encrypted bytes
|
||||
|
||||
Returns:
|
||||
Dict with salt, iv, tag, ciphertext, flags or None if invalid
|
||||
Dict with version, salt, iv, tag, ciphertext, flags, and optionally
|
||||
message_nonce (v6). Returns None if invalid.
|
||||
"""
|
||||
# Min size: Magic(4) + Version(1) + Flags(1) + Salt(32) + IV(12) + Tag(16) = 66 bytes
|
||||
# Min v5 size: 4+1+1+32+12+16 = 66 bytes
|
||||
if len(encrypted_data) < 66 or encrypted_data[:4] != MAGIC_HEADER:
|
||||
return None
|
||||
|
||||
try:
|
||||
version = encrypted_data[4]
|
||||
if version != FORMAT_VERSION:
|
||||
|
||||
if version == FORMAT_VERSION:
|
||||
# v6: has message nonce
|
||||
if len(encrypted_data) < 82:
|
||||
return None
|
||||
|
||||
flags = encrypted_data[5]
|
||||
offset = 6
|
||||
message_nonce = encrypted_data[offset : offset + MESSAGE_NONCE_SIZE]
|
||||
offset += MESSAGE_NONCE_SIZE
|
||||
salt = encrypted_data[offset : offset + SALT_SIZE]
|
||||
offset += SALT_SIZE
|
||||
iv = encrypted_data[offset : offset + IV_SIZE]
|
||||
offset += IV_SIZE
|
||||
tag = encrypted_data[offset : offset + TAG_SIZE]
|
||||
offset += TAG_SIZE
|
||||
ciphertext = encrypted_data[offset:]
|
||||
|
||||
return {
|
||||
"version": version,
|
||||
"flags": flags,
|
||||
"has_channel_key": bool(flags & FLAG_CHANNEL_KEY),
|
||||
"message_nonce": message_nonce,
|
||||
"salt": salt,
|
||||
"iv": iv,
|
||||
"tag": tag,
|
||||
"ciphertext": ciphertext,
|
||||
}
|
||||
|
||||
elif version == FORMAT_VERSION_LEGACY:
|
||||
# v5: no nonce
|
||||
flags = encrypted_data[5]
|
||||
offset = 6
|
||||
salt = encrypted_data[offset : offset + SALT_SIZE]
|
||||
offset += SALT_SIZE
|
||||
@@ -566,11 +639,16 @@ def parse_header(encrypted_data: bytes) -> dict | None:
|
||||
"version": version,
|
||||
"flags": flags,
|
||||
"has_channel_key": bool(flags & FLAG_CHANNEL_KEY),
|
||||
"message_nonce": None,
|
||||
"salt": salt,
|
||||
"iv": iv,
|
||||
"tag": tag,
|
||||
"ciphertext": ciphertext,
|
||||
}
|
||||
|
||||
else:
|
||||
return None
|
||||
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
@@ -622,12 +700,21 @@ def decrypt_message(
|
||||
message_has_key = header["has_channel_key"]
|
||||
|
||||
try:
|
||||
key = derive_hybrid_key(
|
||||
root_key = derive_hybrid_key(
|
||||
photo_data, passphrase, header["salt"], pin, rsa_key_data, channel_key
|
||||
)
|
||||
|
||||
# Reconstruct header for AAD verification
|
||||
aad_header = MAGIC_HEADER + bytes([FORMAT_VERSION, header["flags"]])
|
||||
version = header["version"]
|
||||
message_nonce = header["message_nonce"]
|
||||
|
||||
if version == FORMAT_VERSION and message_nonce is not None:
|
||||
# v6: Derive per-message key via HKDF
|
||||
key = derive_message_key(root_key, message_nonce)
|
||||
aad_header = MAGIC_HEADER + bytes([FORMAT_VERSION, header["flags"]]) + message_nonce
|
||||
else:
|
||||
# v5 (legacy): Root key used directly
|
||||
key = root_key
|
||||
aad_header = MAGIC_HEADER + bytes([FORMAT_VERSION_LEGACY, header["flags"]])
|
||||
|
||||
cipher = Cipher(
|
||||
algorithms.AES(key), modes.GCM(header["iv"], header["tag"]), backend=default_backend()
|
||||
@@ -647,7 +734,7 @@ def decrypt_message(
|
||||
payload_data = padded_plaintext[:original_length]
|
||||
result = _unpack_payload(payload_data)
|
||||
|
||||
logger.debug("Decryption successful: %s", result.payload_type)
|
||||
logger.debug("Decryption successful: %s (v%d)", result.payload_type, version)
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -12,7 +12,7 @@ Why is this cool?
|
||||
|
||||
Two approaches depending on what you want:
|
||||
1. PNG output: We do our own DCT math via scipy (works on any image)
|
||||
2. JPEG output: We use jpeglib to directly tweak the coefficients (chef's kiss)
|
||||
2. JPEG output: We use jpeglib to directly modify the coefficients (chef's kiss)
|
||||
|
||||
v4.1.0 - The "please stop corrupting my data" release:
|
||||
- Reed-Solomon error correction (can fix up to 16 byte errors per chunk)
|
||||
@@ -56,13 +56,12 @@ except ImportError:
|
||||
idctn = None
|
||||
|
||||
# Check for jpeglib availability (for proper JPEG mode)
|
||||
# jpeglib replaces jpegio for Python 3.13+ compatibility
|
||||
try:
|
||||
import jpeglib
|
||||
|
||||
HAS_JPEGIO = True # Keep variable name for compatibility
|
||||
HAS_JPEGLIB = True
|
||||
except ImportError:
|
||||
HAS_JPEGIO = False
|
||||
HAS_JPEGLIB = False
|
||||
jpeglib = None
|
||||
|
||||
# Import custom exceptions
|
||||
@@ -170,20 +169,20 @@ QUANT_STEP = 25
|
||||
|
||||
# Magic bytes so we can identify our own images
|
||||
DCT_MAGIC = b"DCTS" # scipy DCT mode marker
|
||||
JPEGIO_MAGIC = b"JPGS" # jpegio native JPEG mode marker
|
||||
JPEGLIB_MAGIC = b"JPGS" # jpeglib native JPEG mode marker
|
||||
HEADER_SIZE = 10 # Magic (4) + version (1) + flags (1) + length (4)
|
||||
|
||||
OUTPUT_FORMAT_PNG = "png"
|
||||
OUTPUT_FORMAT_JPEG = "jpeg"
|
||||
JPEG_OUTPUT_QUALITY = 95 # High quality but not 100 (100 causes issues, see below)
|
||||
|
||||
# For jpegio mode: we only embed in coefficients with magnitude >= 2
|
||||
# For jpeglib mode: we only embed in coefficients with magnitude >= 2
|
||||
# Coefficients of 0 or 1 are usually quantized noise - unreliable
|
||||
JPEGIO_MIN_COEF_MAGNITUDE = 2
|
||||
JPEGLIB_MIN_COEF_MAGNITUDE = 2
|
||||
|
||||
# We embed in the Y (luminance) channel only - it has the most capacity
|
||||
# Cb/Cr are often subsampled 4:2:0 anyway
|
||||
JPEGIO_EMBED_CHANNEL = 0
|
||||
JPEGLIB_EMBED_CHANNEL = 0
|
||||
|
||||
# Header flags
|
||||
FLAG_COLOR_MODE = 0x01 # Set if we preserved color (YCbCr mode)
|
||||
@@ -204,10 +203,10 @@ RS_LENGTH_PREFIX_SIZE = RS_LENGTH_HEADER_SIZE * RS_LENGTH_COPIES # 24 bytes tot
|
||||
MAX_CHUNK_HEIGHT = 512 # Process in strips to keep memory sane
|
||||
|
||||
# Fun bug: JPEGs saved with quality=100 have quantization tables full of 1s
|
||||
# This makes the DCT coefficients HUGE and jpegio crashes spectacularly
|
||||
# This makes the DCT coefficients HUGE and jpeglib crashes spectacularly
|
||||
# Solution: detect and re-save at quality 95 first
|
||||
JPEGIO_NORMALIZE_QUALITY = 95
|
||||
JPEGIO_MAX_QUANT_VALUE_THRESHOLD = 1 # All 1s in quant table = bad news
|
||||
JPEGLIB_NORMALIZE_QUALITY = 95
|
||||
JPEGLIB_MAX_QUANT_VALUE_THRESHOLD = 1 # All 1s in quant table = bad news
|
||||
|
||||
|
||||
# ============================================================================
|
||||
@@ -261,8 +260,8 @@ def has_dct_support() -> bool:
|
||||
return HAS_SCIPY
|
||||
|
||||
|
||||
def has_jpegio_support() -> bool:
|
||||
return HAS_JPEGIO
|
||||
def has_jpeglib_support() -> bool:
|
||||
return HAS_JPEGLIB
|
||||
|
||||
|
||||
# ============================================================================
|
||||
@@ -654,11 +653,11 @@ def _parse_header(header_bits: list) -> tuple[int, int, int]:
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# JPEGIO HELPERS
|
||||
# JPEGLIB HELPERS
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def _jpegio_bytes_to_file(data: bytes, suffix: str = ".jpg") -> str:
|
||||
def _jpeglib_bytes_to_file(data: bytes, suffix: str = ".jpg") -> str:
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
@@ -670,19 +669,19 @@ def _jpegio_bytes_to_file(data: bytes, suffix: str = ".jpg") -> str:
|
||||
return path
|
||||
|
||||
|
||||
def _jpegio_get_usable_positions(coef_array: np.ndarray) -> list:
|
||||
def _jpeglib_get_usable_positions(coef_array: np.ndarray) -> list:
|
||||
positions = []
|
||||
h, w = coef_array.shape
|
||||
for row in range(h):
|
||||
for col in range(w):
|
||||
if (row % BLOCK_SIZE == 0) and (col % BLOCK_SIZE == 0):
|
||||
continue
|
||||
if abs(coef_array[row, col]) >= JPEGIO_MIN_COEF_MAGNITUDE:
|
||||
if abs(coef_array[row, col]) >= JPEGLIB_MIN_COEF_MAGNITUDE:
|
||||
positions.append((row, col))
|
||||
return positions
|
||||
|
||||
|
||||
def _jpegio_generate_order(num_positions: int, seed: bytes) -> list:
|
||||
def _jpeglib_generate_order(num_positions: int, seed: bytes) -> list:
|
||||
hash_bytes = hashlib.sha256(seed + b"jpeg_coef_order").digest()
|
||||
rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], "big"))
|
||||
order = list(range(num_positions))
|
||||
@@ -690,15 +689,15 @@ def _jpegio_generate_order(num_positions: int, seed: bytes) -> list:
|
||||
return order
|
||||
|
||||
|
||||
def _jpegio_create_header(data_length: int, flags: int = 0) -> bytes:
|
||||
return struct.pack(">4sBBI", JPEGIO_MAGIC, 1, flags, data_length)
|
||||
def _jpeglib_create_header(data_length: int, flags: int = 0) -> bytes:
|
||||
return struct.pack(">4sBBI", JPEGLIB_MAGIC, 1, flags, data_length)
|
||||
|
||||
|
||||
def _jpegio_parse_header(header_bytes: bytes) -> tuple[int, int, int]:
|
||||
def _jpeglib_parse_header(header_bytes: bytes) -> tuple[int, int, int]:
|
||||
if len(header_bytes) < HEADER_SIZE:
|
||||
raise ValueError("Insufficient header data")
|
||||
magic, version, flags, length = struct.unpack(">4sBBI", header_bytes[:HEADER_SIZE])
|
||||
if magic != JPEGIO_MAGIC:
|
||||
if magic != JPEGLIB_MAGIC:
|
||||
raise InvalidMagicBytesError("Not a Stegasoo JPEG or wrong mode")
|
||||
return version, flags, length
|
||||
|
||||
@@ -782,7 +781,7 @@ def estimate_capacity_comparison(image_data: bytes) -> dict:
|
||||
"available": HAS_SCIPY,
|
||||
},
|
||||
"jpeg_native": {
|
||||
"available": HAS_JPEGIO,
|
||||
"available": HAS_JPEGLIB,
|
||||
"note": "Uses jpeglib for proper JPEG coefficient embedding",
|
||||
},
|
||||
}
|
||||
@@ -795,24 +794,54 @@ def embed_in_dct(
|
||||
output_format: str = OUTPUT_FORMAT_PNG,
|
||||
color_mode: str = "color",
|
||||
progress_file: str | None = None,
|
||||
quant_step: int | None = None,
|
||||
jpeg_quality: int | None = None,
|
||||
max_dimension: int | None = None,
|
||||
) -> tuple[bytes, DCTEmbedStats]:
|
||||
"""Embed data using DCT coefficient modification."""
|
||||
"""Embed data using DCT coefficient modification.
|
||||
|
||||
Args:
|
||||
data: Payload bytes to embed.
|
||||
carrier_image: Carrier image bytes.
|
||||
seed: Key for block selection.
|
||||
output_format: 'png' or 'jpeg'.
|
||||
color_mode: 'color' or 'grayscale'.
|
||||
progress_file: Optional progress file.
|
||||
quant_step: Override QIM quantization step (default: QUANT_STEP).
|
||||
Higher = more robust to recompression, more visible.
|
||||
jpeg_quality: Override JPEG output quality (default: JPEG_OUTPUT_QUALITY).
|
||||
max_dimension: Resize carrier if larger than this.
|
||||
"""
|
||||
if output_format not in (OUTPUT_FORMAT_PNG, OUTPUT_FORMAT_JPEG):
|
||||
raise ValueError(f"Invalid output format: {output_format}")
|
||||
|
||||
if color_mode not in ("color", "grayscale"):
|
||||
color_mode = "color"
|
||||
|
||||
qs = quant_step if quant_step is not None else QUANT_STEP
|
||||
|
||||
# Apply EXIF orientation to carrier image before embedding
|
||||
# This ensures portrait photos are embedded in their correct visual orientation
|
||||
carrier_image = _apply_exif_orientation(carrier_image)
|
||||
|
||||
if output_format == OUTPUT_FORMAT_JPEG and HAS_JPEGIO:
|
||||
return _embed_jpegio(data, carrier_image, seed, color_mode, progress_file)
|
||||
# Resize if max_dimension specified (for platform presets)
|
||||
if max_dimension is not None:
|
||||
img_check = Image.open(io.BytesIO(carrier_image))
|
||||
w, h = img_check.size
|
||||
if max(w, h) > max_dimension:
|
||||
scale = max_dimension / max(w, h)
|
||||
new_size = (int(w * scale), int(h * scale))
|
||||
img_check = img_check.resize(new_size, Image.LANCZOS)
|
||||
buf = io.BytesIO()
|
||||
img_check.save(buf, format="PNG")
|
||||
carrier_image = buf.getvalue()
|
||||
img_check.close()
|
||||
|
||||
if output_format == OUTPUT_FORMAT_JPEG and HAS_JPEGLIB:
|
||||
return _embed_jpeglib(data, carrier_image, seed, color_mode, progress_file)
|
||||
|
||||
_check_scipy()
|
||||
return _embed_scipy_dct_safe(
|
||||
data, carrier_image, seed, output_format, color_mode, progress_file
|
||||
data, carrier_image, seed, output_format, color_mode, progress_file, quant_step=qs
|
||||
)
|
||||
|
||||
|
||||
@@ -823,6 +852,7 @@ def _embed_scipy_dct_safe(
|
||||
output_format: str,
|
||||
color_mode: str = "color",
|
||||
progress_file: str | None = None,
|
||||
quant_step: int = QUANT_STEP,
|
||||
) -> tuple[bytes, DCTEmbedStats]:
|
||||
"""
|
||||
Embed using scipy DCT with safe memory handling.
|
||||
@@ -885,7 +915,9 @@ def _embed_scipy_dct_safe(
|
||||
gc.collect()
|
||||
|
||||
# Embed in Y channel
|
||||
Y_embedded = _embed_in_channel_safe(Y_padded, bits, block_order, blocks_x, progress_file)
|
||||
Y_embedded = _embed_in_channel_safe(
|
||||
Y_padded, bits, block_order, blocks_x, progress_file, quant_step=quant_step
|
||||
)
|
||||
del Y_padded
|
||||
gc.collect()
|
||||
|
||||
@@ -909,7 +941,9 @@ def _embed_scipy_dct_safe(
|
||||
del image
|
||||
gc.collect()
|
||||
|
||||
embedded = _embed_in_channel_safe(padded, bits, block_order, blocks_x, progress_file)
|
||||
embedded = _embed_in_channel_safe(
|
||||
padded, bits, block_order, blocks_x, progress_file, quant_step=quant_step
|
||||
)
|
||||
del padded
|
||||
gc.collect()
|
||||
|
||||
@@ -943,6 +977,7 @@ def _embed_in_channel_safe(
|
||||
block_order: list,
|
||||
blocks_x: int,
|
||||
progress_file: str | None = None,
|
||||
quant_step: int = QUANT_STEP,
|
||||
) -> np.ndarray:
|
||||
"""
|
||||
Embed bits in channel using vectorized DCT operations.
|
||||
@@ -1005,17 +1040,17 @@ def _embed_in_channel_safe(
|
||||
coeffs = dct_blocks[i, embed_rows, embed_cols]
|
||||
bit_array = np.array(block_bits)
|
||||
# QIM embedding: round to grid, adjust for bit
|
||||
quantized = np.round(coeffs / QUANT_STEP).astype(int)
|
||||
quantized = np.round(coeffs / quant_step).astype(int)
|
||||
# If quantized % 2 != bit, nudge coefficient
|
||||
needs_adjust = (quantized % 2) != bit_array
|
||||
# Determine direction to nudge
|
||||
dct_blocks[i, embed_rows[needs_adjust], embed_cols[needs_adjust]] = (
|
||||
(quantized[needs_adjust] + (1 - 2 * (quantized[needs_adjust] % 2 == 1)))
|
||||
* QUANT_STEP
|
||||
* quant_step
|
||||
).astype(np.float64)
|
||||
# For bits that already match, just quantize
|
||||
dct_blocks[i, embed_rows[~needs_adjust], embed_cols[~needs_adjust]] = (
|
||||
quantized[~needs_adjust] * QUANT_STEP
|
||||
quantized[~needs_adjust] * quant_step
|
||||
).astype(np.float64)
|
||||
else:
|
||||
# Partial block - process remaining bits individually
|
||||
@@ -1052,12 +1087,12 @@ def _embed_in_channel_safe(
|
||||
return result
|
||||
|
||||
|
||||
def _normalize_jpeg_for_jpegio(image_data: bytes) -> bytes:
|
||||
def _normalize_jpeg_for_jpeglib(image_data: bytes) -> bytes:
|
||||
"""
|
||||
Normalize a JPEG image to ensure jpegio can process it safely.
|
||||
Normalize a JPEG image to ensure jpeglib can process it safely.
|
||||
|
||||
JPEGs saved with quality=100 have quantization tables with all values = 1,
|
||||
which causes jpegio to crash due to huge coefficient magnitudes.
|
||||
which causes jpeglib to crash due to huge coefficient magnitudes.
|
||||
This function detects such images and re-saves them at a safe quality level.
|
||||
|
||||
Args:
|
||||
@@ -1078,7 +1113,7 @@ def _normalize_jpeg_for_jpegio(image_data: bytes) -> bytes:
|
||||
if hasattr(img, "quantization") and img.quantization:
|
||||
for table_id, table in img.quantization.items():
|
||||
# If all values in any table are <= threshold, normalize
|
||||
if max(table) <= JPEGIO_MAX_QUANT_VALUE_THRESHOLD:
|
||||
if max(table) <= JPEGLIB_MAX_QUANT_VALUE_THRESHOLD:
|
||||
needs_normalization = True
|
||||
break
|
||||
|
||||
@@ -1091,25 +1126,25 @@ def _normalize_jpeg_for_jpegio(image_data: bytes) -> bytes:
|
||||
img = img.convert("RGB")
|
||||
|
||||
buffer = io.BytesIO()
|
||||
img.save(buffer, format="JPEG", quality=JPEGIO_NORMALIZE_QUALITY, subsampling=0)
|
||||
img.save(buffer, format="JPEG", quality=JPEGLIB_NORMALIZE_QUALITY, subsampling=0)
|
||||
img.close()
|
||||
|
||||
return buffer.getvalue()
|
||||
|
||||
|
||||
def _embed_jpegio(
|
||||
def _embed_jpeglib(
|
||||
data: bytes,
|
||||
carrier_image: bytes,
|
||||
seed: bytes,
|
||||
color_mode: str = "color",
|
||||
progress_file: str | None = None,
|
||||
) -> tuple[bytes, DCTEmbedStats]:
|
||||
"""Embed using jpegio for proper JPEG coefficient modification."""
|
||||
"""Embed using jpeglib for proper JPEG coefficient modification."""
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
# Normalize JPEG to avoid crashes with quality=100 images
|
||||
carrier_image = _normalize_jpeg_for_jpegio(carrier_image)
|
||||
carrier_image = _normalize_jpeg_for_jpeglib(carrier_image)
|
||||
|
||||
img = Image.open(io.BytesIO(carrier_image))
|
||||
width, height = img.size
|
||||
@@ -1122,20 +1157,20 @@ def _embed_jpegio(
|
||||
carrier_image = buffer.getvalue()
|
||||
img.close()
|
||||
|
||||
input_path = _jpegio_bytes_to_file(carrier_image, suffix=".jpg")
|
||||
input_path = _jpeglib_bytes_to_file(carrier_image, suffix=".jpg")
|
||||
output_path = tempfile.mktemp(suffix=".jpg")
|
||||
|
||||
flags = FLAG_COLOR_MODE if color_mode == "color" else 0
|
||||
|
||||
try:
|
||||
jpeg = jpeglib.to_jpegio(jpeglib.read_dct(input_path))
|
||||
coef_array = jpeg.coef_arrays[JPEGIO_EMBED_CHANNEL]
|
||||
coef_array = jpeg.coef_arrays[JPEGLIB_EMBED_CHANNEL]
|
||||
|
||||
all_positions = _jpegio_get_usable_positions(coef_array)
|
||||
order = _jpegio_generate_order(len(all_positions), seed)
|
||||
all_positions = _jpeglib_get_usable_positions(coef_array)
|
||||
order = _jpeglib_generate_order(len(all_positions), seed)
|
||||
|
||||
# Build raw payload (header + data)
|
||||
header = _jpegio_create_header(len(data), flags)
|
||||
header = _jpeglib_create_header(len(data), flags)
|
||||
raw_payload = header + data
|
||||
|
||||
# Apply Reed-Solomon error correction to entire payload if available
|
||||
@@ -1402,6 +1437,7 @@ def extract_from_dct(
|
||||
stego_image: bytes,
|
||||
seed: bytes,
|
||||
progress_file: str | None = None,
|
||||
quant_step: int | None = None,
|
||||
) -> bytes:
|
||||
"""
|
||||
Extract data from DCT stego image.
|
||||
@@ -1412,6 +1448,7 @@ def extract_from_dct(
|
||||
|
||||
Uses quick header validation to skip obviously invalid rotations.
|
||||
"""
|
||||
qs = quant_step if quant_step is not None else QUANT_STEP
|
||||
rotations_to_try = [0, 90, 180, 270]
|
||||
last_error = None
|
||||
valid_rotations = []
|
||||
@@ -1429,7 +1466,7 @@ def extract_from_dct(
|
||||
# If no rotations pass quick check, try all anyway (fallback)
|
||||
if not valid_rotations:
|
||||
# Must try all rotations - quick validation might have failed due to
|
||||
# scipy vs jpegio differences or other edge cases
|
||||
# scipy vs jpeglib differences or other edge cases
|
||||
for rotation in rotations_to_try:
|
||||
if rotation == 0:
|
||||
valid_rotations.append((0, stego_image))
|
||||
@@ -1443,9 +1480,9 @@ def extract_from_dct(
|
||||
fmt = img.format
|
||||
img.close()
|
||||
|
||||
if fmt == "JPEG" and HAS_JPEGIO:
|
||||
if fmt == "JPEG" and HAS_JPEGLIB:
|
||||
try:
|
||||
result = _extract_jpegio(image_to_decode, seed, progress_file)
|
||||
result = _extract_jpeglib(image_to_decode, seed, progress_file)
|
||||
if rotation != 0:
|
||||
try:
|
||||
from . import debug
|
||||
@@ -1459,7 +1496,7 @@ def extract_from_dct(
|
||||
continue
|
||||
|
||||
_check_scipy()
|
||||
result = _extract_scipy_dct_safe(image_to_decode, seed, progress_file)
|
||||
result = _extract_scipy_dct_safe(image_to_decode, seed, progress_file, quant_step=qs)
|
||||
if rotation != 0:
|
||||
try:
|
||||
from . import debug
|
||||
@@ -1481,6 +1518,7 @@ def _extract_scipy_dct_safe(
|
||||
stego_image: bytes,
|
||||
seed: bytes,
|
||||
progress_file: str | None = None,
|
||||
quant_step: int = QUANT_STEP,
|
||||
) -> bytes:
|
||||
"""Extract using safe DCT operations with vectorized processing."""
|
||||
# Progress starts at 25% (decode.py writes 20% for Argon2, 25% before extraction)
|
||||
@@ -1542,7 +1580,7 @@ def _extract_scipy_dct_safe(
|
||||
coeffs = dct_blocks[:, embed_rows, embed_cols]
|
||||
|
||||
# Quantize and extract bits (vectorized)
|
||||
quantized = np.round(coeffs / QUANT_STEP).astype(int)
|
||||
quantized = np.round(coeffs / quant_step).astype(int)
|
||||
bits = (quantized % 2).flatten().tolist()
|
||||
all_bits.extend(bits)
|
||||
|
||||
@@ -1660,28 +1698,28 @@ def _extract_scipy_dct_safe(
|
||||
return data
|
||||
|
||||
|
||||
def _extract_jpegio(
|
||||
def _extract_jpeglib(
|
||||
stego_image: bytes,
|
||||
seed: bytes,
|
||||
progress_file: str | None = None,
|
||||
) -> bytes:
|
||||
"""Extract using jpegio for JPEG images."""
|
||||
"""Extract using jpeglib for JPEG images."""
|
||||
import os
|
||||
|
||||
# Progress starts at 25% (decode.py writes 20% for Argon2, 25% before extraction)
|
||||
|
||||
# Normalize JPEG to avoid crashes with quality=100 images
|
||||
# (shouldn't happen with stego images, but be defensive)
|
||||
stego_image = _normalize_jpeg_for_jpegio(stego_image)
|
||||
stego_image = _normalize_jpeg_for_jpeglib(stego_image)
|
||||
|
||||
temp_path = _jpegio_bytes_to_file(stego_image, suffix=".jpg")
|
||||
temp_path = _jpeglib_bytes_to_file(stego_image, suffix=".jpg")
|
||||
|
||||
try:
|
||||
jpeg = jpeglib.to_jpegio(jpeglib.read_dct(temp_path))
|
||||
coef_array = jpeg.coef_arrays[JPEGIO_EMBED_CHANNEL]
|
||||
coef_array = jpeg.coef_arrays[JPEGLIB_EMBED_CHANNEL]
|
||||
|
||||
all_positions = _jpegio_get_usable_positions(coef_array)
|
||||
order = _jpegio_generate_order(len(all_positions), seed)
|
||||
all_positions = _jpeglib_get_usable_positions(coef_array)
|
||||
order = _jpeglib_generate_order(len(all_positions), seed)
|
||||
|
||||
_write_progress(progress_file, 30, 100, "extracting")
|
||||
|
||||
@@ -1751,7 +1789,7 @@ def _extract_jpegio(
|
||||
_write_progress(progress_file, 75, 100, "decoding")
|
||||
raw_payload = _rs_decode(rs_encoded)
|
||||
_write_progress(progress_file, 95, 100, "decoding")
|
||||
_, flags, data_length = _jpegio_parse_header(raw_payload[:HEADER_SIZE])
|
||||
_, flags, data_length = _jpeglib_parse_header(raw_payload[:HEADER_SIZE])
|
||||
data = raw_payload[HEADER_SIZE : HEADER_SIZE + data_length]
|
||||
_write_progress(progress_file, 100, 100, "complete")
|
||||
return data
|
||||
@@ -1772,7 +1810,7 @@ def _extract_jpegio(
|
||||
]
|
||||
)
|
||||
|
||||
_, flags, data_length = _jpegio_parse_header(header_bytes)
|
||||
_, flags, data_length = _jpeglib_parse_header(header_bytes)
|
||||
total_bits_needed = (HEADER_SIZE + data_length) * 8
|
||||
|
||||
all_bits = []
|
||||
|
||||
@@ -54,6 +54,7 @@ def decode(
|
||||
embed_mode: str = EMBED_MODE_AUTO,
|
||||
channel_key: str | bool | None = None,
|
||||
progress_file: str | None = None,
|
||||
platform: str | None = None,
|
||||
) -> DecodeResult:
|
||||
"""
|
||||
Decode a message or file from a stego image.
|
||||
@@ -124,12 +125,21 @@ def decode(
|
||||
# Progress: key derivation done, starting extraction
|
||||
_write_progress(progress_file, 25, 100, "extracting")
|
||||
|
||||
# Resolve platform preset for DCT extraction
|
||||
extract_kwargs = {}
|
||||
if platform:
|
||||
from .platform_presets import get_preset
|
||||
|
||||
preset = get_preset(platform)
|
||||
extract_kwargs["quant_step"] = preset.quant_step
|
||||
|
||||
# Extract encrypted data
|
||||
encrypted = extract_from_image(
|
||||
stego_image,
|
||||
pixel_key,
|
||||
embed_mode=embed_mode,
|
||||
progress_file=progress_file,
|
||||
**extract_kwargs,
|
||||
)
|
||||
|
||||
if not encrypted:
|
||||
|
||||
@@ -51,6 +51,7 @@ def encode(
|
||||
dct_color_mode: str = "color",
|
||||
channel_key: str | bool | None = None,
|
||||
progress_file: str | None = None,
|
||||
platform: str | None = None,
|
||||
) -> EncodeResult:
|
||||
"""
|
||||
Encode a message or file into an image.
|
||||
@@ -123,6 +124,18 @@ def encode(
|
||||
# Derive pixel/coefficient selection key (with channel key)
|
||||
pixel_key = derive_pixel_key(reference_photo, passphrase, pin, rsa_key_data, channel_key)
|
||||
|
||||
# Resolve platform preset for DCT encoding
|
||||
platform_kwargs = {}
|
||||
if platform:
|
||||
from .platform_presets import get_preset
|
||||
|
||||
preset = get_preset(platform)
|
||||
platform_kwargs = {
|
||||
"quant_step": preset.quant_step,
|
||||
"max_dimension": preset.max_dimension,
|
||||
"jpeg_quality": preset.jpeg_quality,
|
||||
}
|
||||
|
||||
# Embed in image
|
||||
stego_data, stats, extension = embed_in_image(
|
||||
encrypted,
|
||||
@@ -133,6 +146,7 @@ def encode(
|
||||
dct_output_format=dct_output_format,
|
||||
dct_color_mode=dct_color_mode,
|
||||
progress_file=progress_file,
|
||||
**platform_kwargs,
|
||||
)
|
||||
|
||||
# Generate filename
|
||||
|
||||
169
src/stegasoo/platform_presets.py
Normal file
169
src/stegasoo/platform_presets.py
Normal file
@@ -0,0 +1,169 @@
|
||||
"""
|
||||
Platform-Calibrated DCT Presets (v4.4.0)
|
||||
|
||||
Pre-tuned DCT embedding parameters for social media platforms. Each platform
|
||||
recompresses uploaded images differently — these presets bake in the known
|
||||
parameters so payloads survive the round-trip.
|
||||
|
||||
Usage::
|
||||
|
||||
from stegasoo.platform_presets import get_preset, PLATFORMS
|
||||
|
||||
preset = get_preset("telegram")
|
||||
# Use preset.quant_step, preset.jpeg_quality, etc. in DCT encode
|
||||
|
||||
Preset parameters were derived from empirical testing. Platform compression
|
||||
behavior can change without notice — use ``pre_verify_survival()`` to confirm
|
||||
payloads survive before relying on a preset.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PlatformPreset:
|
||||
"""Tuned DCT parameters for a specific platform."""
|
||||
|
||||
name: str
|
||||
jpeg_quality: int # Platform's recompression quality
|
||||
max_dimension: int # Max width/height before platform resizes
|
||||
quant_step: int # QIM quantization step (higher = more robust)
|
||||
embed_start: int # Start index into EMBED_POSITIONS (skip low-freq)
|
||||
embed_end: int # End index into EMBED_POSITIONS (skip high-freq)
|
||||
recompress_quality: int # Quality to simulate platform recompression for pre-verify
|
||||
notes: str = ""
|
||||
|
||||
|
||||
# Platform presets — derived from empirical testing of each platform's
|
||||
# image processing pipeline. These WILL change as platforms update.
|
||||
# Last verified: 2026-03-25
|
||||
|
||||
PRESETS: dict[str, PlatformPreset] = {
|
||||
"telegram": PlatformPreset(
|
||||
name="Telegram",
|
||||
jpeg_quality=82,
|
||||
max_dimension=2560,
|
||||
quant_step=35,
|
||||
embed_start=4,
|
||||
embed_end=16,
|
||||
recompress_quality=80,
|
||||
notes="~81KB max embeddable. Moderate recompression.",
|
||||
),
|
||||
"discord": PlatformPreset(
|
||||
name="Discord",
|
||||
jpeg_quality=85,
|
||||
max_dimension=4096,
|
||||
quant_step=30,
|
||||
embed_start=4,
|
||||
embed_end=18,
|
||||
recompress_quality=83,
|
||||
notes="Varies with Nitro. Non-Nitro users get more aggressive compression.",
|
||||
),
|
||||
"signal": PlatformPreset(
|
||||
name="Signal",
|
||||
jpeg_quality=80,
|
||||
max_dimension=2048,
|
||||
quant_step=40,
|
||||
embed_start=5,
|
||||
embed_end=15,
|
||||
recompress_quality=78,
|
||||
notes="Aggressive recompression. Use smaller payloads for reliability.",
|
||||
),
|
||||
"whatsapp": PlatformPreset(
|
||||
name="WhatsApp",
|
||||
jpeg_quality=70,
|
||||
max_dimension=1600,
|
||||
quant_step=50,
|
||||
embed_start=5,
|
||||
embed_end=14,
|
||||
recompress_quality=68,
|
||||
notes="Most lossy. Capacity is significantly reduced.",
|
||||
),
|
||||
}
|
||||
|
||||
PLATFORMS = sorted(PRESETS.keys())
|
||||
|
||||
|
||||
def get_preset(platform: str) -> PlatformPreset:
|
||||
"""Get the preset for a platform.
|
||||
|
||||
Args:
|
||||
platform: Platform name (telegram, discord, signal, whatsapp).
|
||||
|
||||
Returns:
|
||||
PlatformPreset with tuned DCT parameters.
|
||||
|
||||
Raises:
|
||||
ValueError: If platform is not recognized.
|
||||
"""
|
||||
key = platform.lower()
|
||||
if key not in PRESETS:
|
||||
available = ", ".join(PLATFORMS)
|
||||
raise ValueError(f"Unknown platform '{platform}'. Available: {available}")
|
||||
return PRESETS[key]
|
||||
|
||||
|
||||
def get_embed_positions(preset: PlatformPreset) -> list[tuple[int, int]]:
|
||||
"""Get the embed positions for a preset.
|
||||
|
||||
Args:
|
||||
preset: Platform preset.
|
||||
|
||||
Returns:
|
||||
List of (row, col) DCT coefficient positions.
|
||||
"""
|
||||
from .dct_steganography import EMBED_POSITIONS
|
||||
|
||||
return EMBED_POSITIONS[preset.embed_start : preset.embed_end]
|
||||
|
||||
|
||||
def pre_verify_survival(
|
||||
stego_image: bytes,
|
||||
seed: bytes,
|
||||
preset: PlatformPreset,
|
||||
) -> bool:
|
||||
"""Verify that a payload survives simulated platform recompression.
|
||||
|
||||
Encodes → recompresses at platform quality → attempts extraction.
|
||||
If extraction succeeds, the payload should survive the real platform.
|
||||
|
||||
Args:
|
||||
stego_image: The stego JPEG image bytes (already encoded).
|
||||
seed: The same seed used for encoding.
|
||||
preset: Platform preset to simulate.
|
||||
|
||||
Returns:
|
||||
True if payload survived simulated recompression.
|
||||
"""
|
||||
import io
|
||||
|
||||
from PIL import Image
|
||||
|
||||
from .dct_steganography import extract_from_dct
|
||||
|
||||
# Simulate platform recompression
|
||||
img = Image.open(io.BytesIO(stego_image))
|
||||
|
||||
# Resize if over max dimension
|
||||
w, h = img.size
|
||||
if max(w, h) > preset.max_dimension:
|
||||
scale = preset.max_dimension / max(w, h)
|
||||
new_size = (int(w * scale), int(h * scale))
|
||||
img = img.resize(new_size, Image.LANCZOS)
|
||||
|
||||
# Recompress at platform quality
|
||||
buf = io.BytesIO()
|
||||
if img.mode != "RGB":
|
||||
img = img.convert("RGB")
|
||||
img.save(buf, format="JPEG", quality=preset.recompress_quality)
|
||||
img.close()
|
||||
recompressed = buf.getvalue()
|
||||
|
||||
# Try extraction
|
||||
try:
|
||||
result = extract_from_dct(recompressed, seed)
|
||||
return result is not None and len(result) > 0
|
||||
except Exception:
|
||||
return False
|
||||
281
src/stegasoo/steganalysis.py
Normal file
281
src/stegasoo/steganalysis.py
Normal file
@@ -0,0 +1,281 @@
|
||||
"""
|
||||
Steganalysis Self-Check Module (v4.4.0)
|
||||
|
||||
Statistical analysis to estimate detectability risk of stego images.
|
||||
Runs chi-square and RS (Regular-Singular) analysis on pixel data
|
||||
to assess how visible the embedding is to an attacker.
|
||||
|
||||
Currently LSB-only. DCT steganalysis (calibration attack) deferred.
|
||||
|
||||
Usage::
|
||||
|
||||
from stegasoo.steganalysis import check_image
|
||||
|
||||
result = check_image(image_data)
|
||||
print(result["risk"]) # "low", "medium", or "high"
|
||||
print(result["chi_square"]) # per-channel chi-square p-values
|
||||
print(result["rs"]) # per-channel RS embedding estimates
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
from .constants import (
|
||||
STEGANALYSIS_CHI_SUSPICIOUS_THRESHOLD,
|
||||
STEGANALYSIS_RS_HIGH_THRESHOLD,
|
||||
STEGANALYSIS_RS_MEDIUM_THRESHOLD,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SteganalysisResult:
|
||||
"""Result of steganalysis on an image."""
|
||||
|
||||
risk: str # "low", "medium", or "high"
|
||||
chi_square: dict = field(default_factory=dict) # per-channel p-values
|
||||
rs: dict = field(default_factory=dict) # per-channel embedding estimates
|
||||
width: int = 0
|
||||
height: int = 0
|
||||
channels: int = 0
|
||||
mode: str = "lsb"
|
||||
|
||||
|
||||
def chi_square_analysis(channel_data: np.ndarray) -> float:
|
||||
"""Chi-square test on LSB distribution of a single channel.
|
||||
|
||||
Groups pixel values into pairs (2i, 2i+1) — so-called "pairs of values"
|
||||
(PoVs). In a clean image, each pair has a natural frequency ratio.
|
||||
LSB embedding with random data forces each pair toward equal frequency.
|
||||
|
||||
The test measures H0: "pairs are equalized" (consistent with embedding).
|
||||
|
||||
Args:
|
||||
channel_data: Flattened 1-D array of pixel values (uint8).
|
||||
|
||||
Returns:
|
||||
p-value from chi-square test.
|
||||
HIGH p-value (close to 1.0) → pairs are equalized → suspicious.
|
||||
LOW p-value (close to 0.0) → pairs are not equalized → less suspicious.
|
||||
"""
|
||||
from scipy.stats import chi2
|
||||
|
||||
# Count occurrences of each value 0-255
|
||||
histogram = np.bincount(channel_data.ravel(), minlength=256)
|
||||
|
||||
# Group into 128 pairs: (0,1), (2,3), ..., (254,255)
|
||||
chi_sq = 0.0
|
||||
degrees_of_freedom = 0
|
||||
|
||||
for i in range(0, 256, 2):
|
||||
observed_even = histogram[i]
|
||||
observed_odd = histogram[i + 1]
|
||||
total = observed_even + observed_odd
|
||||
|
||||
if total == 0:
|
||||
continue
|
||||
|
||||
expected = total / 2.0
|
||||
chi_sq += (observed_even - expected) ** 2 / expected
|
||||
chi_sq += (observed_odd - expected) ** 2 / expected
|
||||
degrees_of_freedom += 1
|
||||
|
||||
if degrees_of_freedom == 0:
|
||||
return 1.0 # No data to analyze
|
||||
|
||||
# p-value: probability of observing this chi-square value by chance
|
||||
# Low p-value = LSBs are suspiciously uniform = likely embedded
|
||||
p_value = 1.0 - chi2.cdf(chi_sq, degrees_of_freedom)
|
||||
return float(p_value)
|
||||
|
||||
|
||||
def rs_analysis(channel_data: np.ndarray, block_size: int = 8) -> float:
|
||||
"""Regular-Singular groups analysis on a single channel.
|
||||
|
||||
Divides the image channel into groups of `block_size` pixels and measures
|
||||
the "smoothness" (variation) of each group. Applying a flipping function
|
||||
F1 (flip LSB) and F-1 (flip LSB of value-1) produces Regular (smoother)
|
||||
and Singular (rougher) groups.
|
||||
|
||||
In a clean image: R_m ≈ R_{-m} and S_m ≈ S_{-m}.
|
||||
LSB embedding causes R_m and S_{-m} to converge while S_m and R_{-m}
|
||||
diverge, allowing estimation of the embedding rate.
|
||||
|
||||
Args:
|
||||
channel_data: Flattened 1-D array of pixel values (uint8).
|
||||
block_size: Number of pixels per group (default 8).
|
||||
|
||||
Returns:
|
||||
Estimated embedding rate (0.0 = clean, 1.0 = fully embedded).
|
||||
Values > 0.5 strongly indicate LSB embedding.
|
||||
"""
|
||||
data = channel_data.ravel().astype(np.int16)
|
||||
n = len(data)
|
||||
# Trim to multiple of block_size
|
||||
n_blocks = n // block_size
|
||||
if n_blocks < 10:
|
||||
return 0.0 # Not enough data
|
||||
|
||||
data = data[: n_blocks * block_size].reshape(n_blocks, block_size)
|
||||
|
||||
def variation(block: np.ndarray) -> float:
|
||||
"""Sum of absolute differences between adjacent pixels."""
|
||||
return float(np.sum(np.abs(np.diff(block))))
|
||||
|
||||
def flip_positive(block: np.ndarray) -> np.ndarray:
|
||||
"""F1: flip LSB (0↔1, 2↔3, 4↔5, ...)."""
|
||||
return block ^ 1
|
||||
|
||||
def flip_negative(block: np.ndarray) -> np.ndarray:
|
||||
"""F-1: flip LSB of (value - 1), i.e. -1↔0, 1↔2, 3↔4, ..."""
|
||||
result = block.copy()
|
||||
even_mask = (block % 2) == 0
|
||||
result[even_mask] -= 1
|
||||
result[~even_mask] += 1
|
||||
return result
|
||||
|
||||
r_m = s_m = r_neg = s_neg = 0
|
||||
|
||||
for i in range(n_blocks):
|
||||
block = data[i]
|
||||
v_orig = variation(block)
|
||||
|
||||
v_f1 = variation(flip_positive(block))
|
||||
if v_f1 > v_orig:
|
||||
r_m += 1
|
||||
elif v_f1 < v_orig:
|
||||
s_m += 1
|
||||
|
||||
v_fn1 = variation(flip_negative(block))
|
||||
if v_fn1 > v_orig:
|
||||
r_neg += 1
|
||||
elif v_fn1 < v_orig:
|
||||
s_neg += 1
|
||||
|
||||
# Estimate embedding rate using the RS quadratic formula
|
||||
# d0 = R_m - S_m, d1 = R_{-m} - S_{-m}
|
||||
# The embedding rate p satisfies: d(p/2) = d0, d(1 - p/2) = d1
|
||||
# Simplified estimator: p ≈ (R_m - S_m) / (R_{-m} - S_{-m}) divergence
|
||||
d0 = r_m - s_m
|
||||
d1 = r_neg - s_neg
|
||||
|
||||
if n_blocks == 0:
|
||||
return 0.0
|
||||
|
||||
# Use the simplified dual-statistic estimator
|
||||
# In clean images: d0 ≈ d1 (both positive)
|
||||
# In embedded images: d0 → 0 while d1 stays positive
|
||||
if d1 == 0:
|
||||
# Can't estimate — likely very embedded or degenerate
|
||||
return 0.5 if d0 == 0 else 0.0
|
||||
|
||||
# Ratio-based estimate: how much has d0 dropped relative to d1
|
||||
ratio = d0 / d1
|
||||
if ratio >= 1.0:
|
||||
return 0.0 # d0 ≥ d1 means no evidence of embedding
|
||||
if ratio <= 0.0:
|
||||
return 1.0 # d0 collapsed or inverted
|
||||
|
||||
# Linear interpolation: ratio=1 → 0% embedded, ratio=0 → 100% embedded
|
||||
estimate = 1.0 - ratio
|
||||
return float(np.clip(estimate, 0.0, 1.0))
|
||||
|
||||
|
||||
def assess_risk(chi_p_values: dict[str, float], rs_estimates: dict[str, float]) -> str:
|
||||
"""Map analysis results to a risk level.
|
||||
|
||||
RS analysis is the primary metric (reliable for both sequential and
|
||||
random-order embedding). Chi-square is supplementary — high p-values
|
||||
indicate equalized PoV pairs, which is suspicious for random LSB embedding.
|
||||
|
||||
Args:
|
||||
chi_p_values: Per-channel chi-square p-values (high = suspicious).
|
||||
rs_estimates: Per-channel RS embedding rate estimates (high = suspicious).
|
||||
|
||||
Returns:
|
||||
"low", "medium", or "high" detectability risk.
|
||||
"""
|
||||
if not chi_p_values and not rs_estimates:
|
||||
return "low"
|
||||
|
||||
# RS is the primary indicator: any channel with high embedding estimate
|
||||
max_rs = max(rs_estimates.values()) if rs_estimates else 0.0
|
||||
|
||||
# Chi-square: high p-value means pairs are equalized (suspicious)
|
||||
max_chi_p = max(chi_p_values.values()) if chi_p_values else 0.0
|
||||
chi_suspicious = max_chi_p > STEGANALYSIS_CHI_SUSPICIOUS_THRESHOLD
|
||||
|
||||
# High risk: RS strongly indicates embedding
|
||||
if max_rs > STEGANALYSIS_RS_HIGH_THRESHOLD:
|
||||
return "high"
|
||||
|
||||
# Medium risk: moderate RS signal, or RS + chi-square both flagging
|
||||
if max_rs > STEGANALYSIS_RS_MEDIUM_THRESHOLD:
|
||||
return "medium"
|
||||
if chi_suspicious and max_rs > 0.05:
|
||||
return "medium"
|
||||
|
||||
return "low"
|
||||
|
||||
|
||||
def check_image(image_data: bytes, mode: str = "lsb") -> dict:
|
||||
"""Run steganalysis on an image and return detectability assessment.
|
||||
|
||||
Args:
|
||||
image_data: Raw image bytes (PNG, BMP, etc.).
|
||||
mode: Analysis mode — currently only "lsb" is supported.
|
||||
|
||||
Returns:
|
||||
Dict with keys: risk, chi_square, rs, width, height, channels, mode.
|
||||
"""
|
||||
if mode not in ("lsb", "auto"):
|
||||
raise ValueError(f"Unsupported steganalysis mode: {mode}. Use 'lsb' or 'auto'.")
|
||||
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
if img.mode not in ("RGB", "RGBA", "L"):
|
||||
img = img.convert("RGB")
|
||||
|
||||
width, height = img.size
|
||||
pixels = np.array(img)
|
||||
img.close()
|
||||
|
||||
channel_names = ["R", "G", "B"] if pixels.ndim == 3 else ["L"]
|
||||
if pixels.ndim == 2:
|
||||
pixels = pixels[:, :, np.newaxis]
|
||||
|
||||
num_channels = min(pixels.shape[2], 3) # Skip alpha
|
||||
|
||||
chi_p_values = {}
|
||||
rs_estimates = {}
|
||||
|
||||
for i in range(num_channels):
|
||||
name = channel_names[i]
|
||||
channel = pixels[:, :, i].ravel()
|
||||
chi_p_values[name] = chi_square_analysis(channel)
|
||||
rs_estimates[name] = rs_analysis(channel)
|
||||
|
||||
risk = assess_risk(chi_p_values, rs_estimates)
|
||||
|
||||
result = SteganalysisResult(
|
||||
risk=risk,
|
||||
chi_square=chi_p_values,
|
||||
rs=rs_estimates,
|
||||
width=width,
|
||||
height=height,
|
||||
channels=num_channels,
|
||||
mode=mode,
|
||||
)
|
||||
|
||||
return {
|
||||
"risk": result.risk,
|
||||
"chi_square": result.chi_square,
|
||||
"rs": result.rs,
|
||||
"width": result.width,
|
||||
"height": result.height,
|
||||
"channels": result.channels,
|
||||
"mode": result.mode,
|
||||
}
|
||||
@@ -107,13 +107,14 @@ EXT_TO_FORMAT = {
|
||||
# - v3.1.0: 76 bytes (had date field - 10+1 bytes)
|
||||
# - v3.2.0: 65 bytes (removed date, simpler)
|
||||
# - v4.0.0: 66 bytes (added flags byte for channel key)
|
||||
# - v4.4.0: 82 bytes (added 16-byte message nonce for HKDF)
|
||||
|
||||
HEADER_OVERHEAD = 66 # What the crypto layer adds to any message
|
||||
HEADER_OVERHEAD = 82 # What the crypto layer adds to any message (v6 format)
|
||||
LENGTH_PREFIX = 4 # We prepend the payload length for LSB extraction
|
||||
ENCRYPTION_OVERHEAD = HEADER_OVERHEAD + LENGTH_PREFIX # Total: 70 bytes
|
||||
ENCRYPTION_OVERHEAD = HEADER_OVERHEAD + LENGTH_PREFIX # Total: 86 bytes
|
||||
|
||||
# That 70 bytes is your minimum image capacity requirement.
|
||||
# A tiny 100x100 image gives you ~3750 bytes capacity, minus 70 = ~3680 usable.
|
||||
# That 86 bytes is your minimum image capacity requirement.
|
||||
# A tiny 100x100 image gives you ~3750 bytes capacity, minus 86 = ~3664 usable.
|
||||
|
||||
# DCT output format options (v3.0.1)
|
||||
DCT_OUTPUT_PNG = "png"
|
||||
@@ -609,6 +610,9 @@ def embed_in_image(
|
||||
dct_output_format: str = DCT_OUTPUT_PNG,
|
||||
dct_color_mode: str = "color",
|
||||
progress_file: str | None = None,
|
||||
quant_step: int | None = None,
|
||||
jpeg_quality: int | None = None,
|
||||
max_dimension: int | None = None,
|
||||
) -> tuple[bytes, Union[EmbedStats, "DCTEmbedStats"], str]:
|
||||
"""
|
||||
Embed data into an image using specified mode.
|
||||
@@ -636,49 +640,54 @@ def embed_in_image(
|
||||
embed_mode in VALID_EMBED_MODES, f"Invalid embed_mode: {embed_mode}. Use 'lsb' or 'dct'"
|
||||
)
|
||||
|
||||
# DCT MODE
|
||||
if embed_mode == EMBED_MODE_DCT:
|
||||
if not has_dct_support():
|
||||
# Dispatch via backend registry
|
||||
from .backends import registry
|
||||
|
||||
backend = registry.get(embed_mode)
|
||||
if not backend.is_available():
|
||||
raise ImportError(
|
||||
"scipy is required for DCT embedding mode. " "Install with: pip install scipy"
|
||||
f"Dependencies for '{embed_mode}' mode are not installed. "
|
||||
f"Install with: pip install stegasoo[dct]"
|
||||
)
|
||||
|
||||
# Validate DCT output format
|
||||
if embed_mode == EMBED_MODE_DCT:
|
||||
# Validate DCT-specific options
|
||||
if dct_output_format not in (DCT_OUTPUT_PNG, DCT_OUTPUT_JPEG):
|
||||
debug.print(f"Invalid dct_output_format '{dct_output_format}', defaulting to PNG")
|
||||
dct_output_format = DCT_OUTPUT_PNG
|
||||
|
||||
# Validate DCT color mode (v3.0.1)
|
||||
if dct_color_mode not in ("grayscale", "color"):
|
||||
debug.print(f"Invalid dct_color_mode '{dct_color_mode}', defaulting to color")
|
||||
dct_color_mode = "color"
|
||||
|
||||
dct_mod = _get_dct_module()
|
||||
|
||||
# Pass output_format and color_mode to DCT module (v3.0.1)
|
||||
stego_bytes, dct_stats = dct_mod.embed_in_dct(
|
||||
stego_bytes, dct_stats = backend.embed(
|
||||
data,
|
||||
image_data,
|
||||
pixel_key,
|
||||
output_format=dct_output_format,
|
||||
color_mode=dct_color_mode,
|
||||
progress_file=progress_file,
|
||||
dct_output_format=dct_output_format,
|
||||
dct_color_mode=dct_color_mode,
|
||||
quant_step=quant_step,
|
||||
jpeg_quality=jpeg_quality,
|
||||
max_dimension=max_dimension,
|
||||
)
|
||||
|
||||
# Determine extension based on output format
|
||||
if dct_output_format == DCT_OUTPUT_JPEG:
|
||||
ext = "jpg"
|
||||
else:
|
||||
ext = "png"
|
||||
|
||||
ext = "jpg" if dct_output_format == DCT_OUTPUT_JPEG else "png"
|
||||
debug.print(
|
||||
f"DCT embedding complete: {dct_output_format.upper()} output, "
|
||||
f"color_mode={dct_color_mode}, ext={ext}"
|
||||
)
|
||||
return stego_bytes, dct_stats, ext
|
||||
|
||||
# LSB MODE
|
||||
return _embed_lsb(data, image_data, pixel_key, bits_per_channel, output_format, progress_file)
|
||||
# LSB and other image backends
|
||||
stego_bytes, stats = backend.embed(
|
||||
data,
|
||||
image_data,
|
||||
pixel_key,
|
||||
progress_file=progress_file,
|
||||
bits_per_channel=bits_per_channel,
|
||||
output_format=output_format,
|
||||
)
|
||||
ext = getattr(stats, "output_extension", "png")
|
||||
return stego_bytes, stats, ext
|
||||
|
||||
|
||||
def _embed_lsb(
|
||||
@@ -844,6 +853,7 @@ def extract_from_image(
|
||||
bits_per_channel: int = 1,
|
||||
embed_mode: str = EMBED_MODE_AUTO,
|
||||
progress_file: str | None = None,
|
||||
quant_step: int | None = None,
|
||||
) -> bytes | None:
|
||||
"""
|
||||
Extract hidden data from a stego image.
|
||||
@@ -860,32 +870,40 @@ def extract_from_image(
|
||||
"""
|
||||
debug.print(f"extract_from_image: mode={embed_mode}")
|
||||
|
||||
# AUTO MODE: Try LSB first, then DCT
|
||||
from .backends import registry
|
||||
|
||||
# AUTO MODE: Try LSB first (cheaper), then other backends
|
||||
if embed_mode == EMBED_MODE_AUTO:
|
||||
result = _extract_lsb(image_data, pixel_key, bits_per_channel)
|
||||
auto_order = [EMBED_MODE_LSB] + [
|
||||
m for m in registry.available_modes(carrier_type="image") if m != EMBED_MODE_LSB
|
||||
]
|
||||
for mode in auto_order:
|
||||
backend = registry.get(mode)
|
||||
debug.print(f"Auto-detect: trying {mode}")
|
||||
result = backend.extract(
|
||||
image_data,
|
||||
pixel_key,
|
||||
progress_file=progress_file,
|
||||
bits_per_channel=bits_per_channel,
|
||||
quant_step=quant_step,
|
||||
)
|
||||
if result is not None:
|
||||
debug.print("Auto-detect: LSB extraction succeeded")
|
||||
debug.print(f"Auto-detect: {mode} extraction succeeded")
|
||||
return result
|
||||
|
||||
if has_dct_support():
|
||||
debug.print("Auto-detect: LSB failed, trying DCT")
|
||||
result = _extract_dct(image_data, pixel_key, progress_file)
|
||||
if result is not None:
|
||||
debug.print("Auto-detect: DCT extraction succeeded")
|
||||
return result
|
||||
|
||||
debug.print("Auto-detect: All modes failed")
|
||||
return None
|
||||
|
||||
# EXPLICIT DCT MODE
|
||||
elif embed_mode == EMBED_MODE_DCT:
|
||||
if not has_dct_support():
|
||||
raise ImportError("scipy required for DCT mode")
|
||||
return _extract_dct(image_data, pixel_key, progress_file)
|
||||
|
||||
# EXPLICIT LSB MODE
|
||||
else:
|
||||
return _extract_lsb(image_data, pixel_key, bits_per_channel)
|
||||
# EXPLICIT MODE
|
||||
backend = registry.get(embed_mode)
|
||||
if not backend.is_available():
|
||||
raise ImportError(f"Dependencies for '{embed_mode}' mode are not installed.")
|
||||
return backend.extract(
|
||||
image_data,
|
||||
pixel_key,
|
||||
progress_file=progress_file,
|
||||
bits_per_channel=bits_per_channel,
|
||||
quant_step=quant_step,
|
||||
)
|
||||
|
||||
|
||||
def _extract_dct(
|
||||
@@ -1099,9 +1117,9 @@ def peek_image(image_data: bytes) -> dict:
|
||||
|
||||
# Try DCT extraction (requires scipy/jpeglib)
|
||||
try:
|
||||
from .dct_steganography import HAS_JPEGIO, HAS_SCIPY
|
||||
from .dct_steganography import HAS_JPEGLIB, HAS_SCIPY
|
||||
|
||||
if HAS_SCIPY or HAS_JPEGIO:
|
||||
if HAS_SCIPY or HAS_JPEGLIB:
|
||||
from .dct_steganography import extract_from_dct
|
||||
|
||||
# Extract first few bytes to check header
|
||||
|
||||
Reference in New Issue
Block a user