Add per-channel hybrid audio spread spectrum and env feature toggles

Spread spectrum v2: independent per-channel embedding with round-robin
bit distribution, preserving spatial stereo/surround mix. Adaptive chip
tiers (256/512/1024) trade capacity for lossy codec robustness. LFE
channel skipped for 5.1+ layouts. v2 header (20B) with backward-
compatible v0 decode fallback.

Environment toggles (STEGASOO_AUDIO, STEGASOO_VIDEO) gate audio/video
features for minimal builds (e.g. Raspberry Pi image-only). Values:
auto (default, detect deps), 1/true (force on), 0/false (force off).

Web UI fixes: accordion defaults to step 1 on load, chevron arrow
styling, required attribute toggling for audio carrier type switch,
"Images & Mode" renamed to "Reference, Carrier, Mode".

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
adlee-was-taken
2026-02-28 11:58:40 -05:00
parent 0248bec813
commit ef5a9ce9cb
41 changed files with 4281 additions and 732 deletions

View File

@@ -7,7 +7,7 @@ Changes in v4.0.0:
- encode() and decode() now accept channel_key parameter
"""
__version__ = "4.2.1"
__version__ = "4.3.0"
# Core functionality
# Channel key management (v4.0.0)
@@ -24,8 +24,8 @@ from .channel import (
# Crypto functions
from .crypto import get_active_channel_key, get_channel_fingerprint, has_argon2
from .decode import decode, decode_audio, decode_file, decode_text
from .encode import encode, encode_audio
from .decode import decode, decode_file, decode_text
from .encode import encode
# Credential generation
from .generate import (
@@ -54,22 +54,28 @@ from .steganography import (
# Utilities
from .utils import generate_filename
# Audio utilities - optional, may not be available (v4.3.0)
try:
# Audio support — gated by STEGASOO_AUDIO env var and dependency availability
from .constants import AUDIO_ENABLED, VIDEO_ENABLED
HAS_AUDIO_SUPPORT = AUDIO_ENABLED
HAS_VIDEO_SUPPORT = VIDEO_ENABLED
if AUDIO_ENABLED:
from .audio_utils import (
detect_audio_format,
get_audio_info,
has_ffmpeg_support,
validate_audio,
)
HAS_AUDIO_SUPPORT = True
except ImportError:
HAS_AUDIO_SUPPORT = False
from .decode import decode_audio
from .encode import encode_audio
else:
detect_audio_format = None
get_audio_info = None
has_ffmpeg_support = None
validate_audio = None
encode_audio = None
decode_audio = None
# QR Code utilities - optional, may not be available
try:
@@ -203,6 +209,7 @@ __all__ = [
"has_ffmpeg_support",
"validate_audio",
"HAS_AUDIO_SUPPORT",
"HAS_VIDEO_SUPPORT",
"validate_audio_embed_mode",
"validate_audio_file",
# Generation

View File

@@ -283,7 +283,9 @@ def embed_in_audio_lsb(
# 2. Prepend magic + length prefix
header = AUDIO_MAGIC_LSB + struct.pack(">I", len(data))
payload = header + data
debug.print(f"Payload with header: {len(payload)} bytes (magic 4 + len 4 + data {len(data)})")
debug.print(
f"Payload with header: {len(payload)} bytes (magic 4 + len 4 + data {len(data)})"
)
# 3. Check capacity
max_bytes = (num_samples * bits_per_sample) // 8
@@ -463,9 +465,7 @@ def extract_from_audio_lsb(
total_samples_needed = (total_bits + bits_per_sample - 1) // bits_per_sample
if total_samples_needed > num_samples:
debug.print(
f"Need {total_samples_needed} samples but only {num_samples} available"
)
debug.print(f"Need {total_samples_needed} samples but only {num_samples} available")
return None
debug.print(f"Need {total_samples_needed} samples to extract {data_length} bytes")
@@ -483,14 +483,10 @@ def extract_from_audio_lsb(
binary_data += str((val >> bit_pos) & 1)
if progress_file and progress_idx % PROGRESS_INTERVAL == 0:
_write_progress(
progress_file, progress_idx, total_samples_needed, "extracting"
)
_write_progress(progress_file, progress_idx, total_samples_needed, "extracting")
if progress_file:
_write_progress(
progress_file, total_samples_needed, total_samples_needed, "extracting"
)
_write_progress(progress_file, total_samples_needed, total_samples_needed, "extracting")
# Skip the 8-byte header (magic + length) = 64 bits
data_bits = binary_data[64 : 64 + (data_length * 8)]

View File

@@ -13,7 +13,6 @@ Both are optional — functions degrade gracefully when unavailable.
from __future__ import annotations
import io
import logging
import shutil
from .constants import (
@@ -24,10 +23,11 @@ from .constants import (
MIN_AUDIO_SAMPLE_RATE,
VALID_AUDIO_EMBED_MODES,
)
from .debug import get_logger
from .exceptions import AudioTranscodeError, AudioValidationError, UnsupportedAudioFormatError
from .models import AudioInfo, ValidationResult
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
# =============================================================================
@@ -69,10 +69,12 @@ def detect_audio_format(audio_data: bytes) -> str:
Format string: "wav", "flac", "mp3", "ogg", "aac", "m4a", or "unknown".
"""
if len(audio_data) < 12:
logger.debug("detect_audio_format: data too short (%d bytes)", len(audio_data))
return "unknown"
# WAV: RIFF....WAVE
if audio_data[:4] == b"RIFF" and audio_data[8:12] == b"WAVE":
logger.debug("Detected WAV format (%d bytes)", len(audio_data))
return "wav"
# FLAC
@@ -124,6 +126,7 @@ def transcode_to_wav(audio_data: bytes) -> bytes:
UnsupportedAudioFormatError: If the format cannot be detected.
"""
fmt = detect_audio_format(audio_data)
logger.info("transcode_to_wav: input format=%s, size=%d bytes", fmt, len(audio_data))
if fmt == "unknown":
raise UnsupportedAudioFormatError(
@@ -325,7 +328,9 @@ def _get_info_soundfile(audio_data: bytes, fmt: str) -> AudioInfo:
try:
import soundfile as sf
except ImportError:
raise AudioTranscodeError("soundfile package is required. Install with: pip install soundfile")
raise AudioTranscodeError(
"soundfile package is required. Install with: pip install soundfile"
)
try:
buf = io.BytesIO(audio_data)
@@ -460,8 +465,7 @@ def validate_audio(
fmt = detect_audio_format(audio_data)
if fmt == "unknown":
return ValidationResult.error(
f"Could not detect {name} format. "
"Supported formats: WAV, FLAC, MP3, OGG, AAC, M4A."
f"Could not detect {name} format. " "Supported formats: WAV, FLAC, MP3, OGG, AAC, M4A."
)
# Extract metadata for further validation

View File

@@ -69,6 +69,7 @@ def _get_machine_key() -> bytes:
# Fallback to hostname
if not machine_id:
import socket
machine_id = socket.gethostname()
# Hash to get consistent 32 bytes
@@ -87,10 +88,7 @@ def _encrypt_for_storage(plaintext: str) -> str:
plaintext_bytes = plaintext.encode()
# XOR with key (cycling if needed)
encrypted = bytes(
pb ^ key[i % len(key)]
for i, pb in enumerate(plaintext_bytes)
)
encrypted = bytes(pb ^ key[i % len(key)] for i, pb in enumerate(plaintext_bytes))
return ENCRYPTED_PREFIX + base64.b64encode(encrypted).decode()
@@ -108,14 +106,11 @@ def _decrypt_from_storage(stored: str) -> str | None:
return stored
try:
encrypted = base64.b64decode(stored[len(ENCRYPTED_PREFIX):])
encrypted = base64.b64decode(stored[len(ENCRYPTED_PREFIX) :])
key = _get_machine_key()
# XOR to decrypt
decrypted = bytes(
eb ^ key[i % len(key)]
for i, eb in enumerate(encrypted)
)
decrypted = bytes(eb ^ key[i % len(key)] for i, eb in enumerate(encrypted))
return decrypted.decode()
except Exception:
@@ -413,7 +408,11 @@ def get_channel_status() -> dict:
try:
stored = config_path.read_text().strip()
file_key = _decrypt_from_storage(stored)
if file_key and validate_channel_key(file_key) and format_channel_key(file_key) == key:
if (
file_key
and validate_channel_key(file_key)
and format_channel_key(file_key) == key
):
source = str(config_path)
break
except (OSError, PermissionError, ValueError):
@@ -485,7 +484,9 @@ def resolve_channel_key(
>>> resolve_channel_key("ABCD-1234-...") # -> "ABCD-1234-..."
>>> resolve_channel_key(file_path="key.txt") # reads from file
"""
debug.print(f"resolve_channel_key: value={value}, file_path={file_path}, no_channel={no_channel}")
debug.print(
f"resolve_channel_key: value={value}, file_path={file_path}, no_channel={no_channel}"
)
# no_channel flag takes precedence
if no_channel:

View File

@@ -108,8 +108,9 @@ CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
@click.group(context_settings=CONTEXT_SETTINGS)
@click.version_option(__version__, "-v", "--version")
@click.option("--json", "json_output", is_flag=True, help="Output results as JSON")
@click.option("--debug", "debug_mode", is_flag=True, help="Enable debug logging to stderr")
@click.pass_context
def cli(ctx, json_output):
def cli(ctx, json_output, debug_mode):
"""
Stegasoo - Steganography with hybrid authentication.
@@ -120,6 +121,11 @@ def cli(ctx, json_output):
ctx.ensure_object(dict)
ctx.obj["json"] = json_output
if debug_mode:
from .debug import debug
debug.enable(True)
# =============================================================================
# ENCODE COMMANDS
@@ -179,9 +185,7 @@ def cli(ctx, json_output):
@click.option("--pin", prompt=True, hide_input=True, confirmation_prompt=True, help="PIN code")
@click.option("--dry-run", is_flag=True, help="Show capacity usage without encoding")
@click.pass_context
def encode(
ctx, carrier, reference, message, file_payload, output, passphrase, pin, dry_run
):
def encode(ctx, carrier, reference, message, file_payload, output, passphrase, pin, dry_run):
"""
Encode a message or file into an image.
@@ -245,14 +249,14 @@ def encode(
# Default to JPEG for JPEG carriers (preserves DCT mode benefits)
carrier_ext = Path(carrier).suffix.lower()
if not output:
if carrier_ext in ('.jpg', '.jpeg'):
if carrier_ext in (".jpg", ".jpeg"):
output = f"{Path(carrier).stem}_encoded.jpg"
else:
output = f"{Path(carrier).stem}_encoded.png"
# Detect output format from extension
output_ext = Path(output).suffix.lower()
use_dct = output_ext in ('.jpg', '.jpeg')
use_dct = output_ext in (".jpg", ".jpeg")
from .steganography import EMBED_MODE_DCT, EMBED_MODE_LSB
@@ -442,8 +446,38 @@ def decode(ctx, image, reference, passphrase, pin, output):
help="Passphrase (recommend 4+ words)",
)
@click.option("--pin", prompt=True, hide_input=True, confirmation_prompt=True, help="PIN code")
@click.option(
"--rsa-key",
type=click.Path(exists=True),
help="RSA private key PEM file",
)
@click.option("--rsa-password", default=None, help="Password for encrypted RSA key")
@click.option("--channel-key", default=None, help="Channel key for deployment isolation")
@click.option(
"--chip-tier",
"chip_tier",
default=None,
type=click.Choice(["lossless", "high", "low"]),
help="Spread spectrum chip tier (lossless=256, high=512, low=1024). Only for audio_spread.",
)
@click.option("--dry-run", is_flag=True, help="Show capacity usage without encoding")
@click.pass_context
def audio_encode(ctx, carrier, reference, message, file_payload, output, embed_mode, passphrase, pin):
def audio_encode(
ctx,
carrier,
reference,
message,
file_payload,
output,
embed_mode,
passphrase,
pin,
rsa_key,
rsa_password,
channel_key,
chip_tier,
dry_run,
):
"""
Encode a message or file into an audio carrier.
@@ -452,26 +486,100 @@ def audio_encode(ctx, carrier, reference, message, file_payload, output, embed_m
stegasoo audio-encode carrier.wav -r ref.jpg -m "Secret" --mode audio_lsb
stegasoo audio-encode carrier.wav -r ref.jpg -f secret.pdf --mode audio_spread
stegasoo audio-encode carrier.wav -r ref.jpg -m "Secret" --dry-run
"""
from .constants import AUDIO_ENABLED
if not AUDIO_ENABLED:
raise click.UsageError(
"Audio support is disabled. Install audio extras (pip install stegasoo[audio]) "
"or set STEGASOO_AUDIO=1 to force enable."
)
from .audio_steganography import calculate_audio_lsb_capacity
from .encode import encode_audio
from .models import FilePayload
from .spread_steganography import calculate_audio_spread_capacity
if not message and not file_payload:
raise click.UsageError("Either --message or --file is required")
# Read RSA key if provided
rsa_key_data = None
if rsa_key:
with open(rsa_key, "rb") as f:
rsa_key_data = f.read()
# Calculate payload size
if file_payload:
payload_size = Path(file_payload).stat().st_size
payload_type = "file"
else:
payload_size = len(message.encode("utf-8"))
payload_type = "text"
# Read input files
with open(reference, "rb") as f:
reference_data = f.read()
with open(carrier, "rb") as f:
carrier_data = f.read()
if dry_run:
try:
from .audio_utils import get_audio_info
info = get_audio_info(carrier_data)
lsb_capacity = calculate_audio_lsb_capacity(carrier_data)
spread_capacity = calculate_audio_spread_capacity(carrier_data)
if embed_mode == "audio_lsb":
capacity = lsb_capacity
else:
capacity = spread_capacity.usable_capacity_bytes
result = {
"carrier": carrier,
"reference": reference,
"format": info.format,
"sample_rate": info.sample_rate,
"channels": info.channels,
"duration_seconds": round(info.duration_seconds, 2),
"embed_mode": embed_mode,
"capacity_bytes": capacity,
"lsb_capacity_bytes": lsb_capacity,
"spread_capacity_bytes": spread_capacity.usable_capacity_bytes,
"payload_type": payload_type,
"payload_size": payload_size,
"usage_percent": round(payload_size / capacity * 100, 1) if capacity > 0 else 0,
"fits": payload_size < capacity,
}
if ctx.obj.get("json"):
click.echo(json.dumps(result, indent=2))
else:
click.echo(
f"Carrier: {carrier} ({info.format}, {info.sample_rate}Hz, {info.channels}ch)"
)
click.echo(f"Duration: {info.duration_seconds:.1f}s")
click.echo(f"Reference: {reference}")
click.echo(f"Mode: {embed_mode}")
click.echo(f"LSB capacity: {lsb_capacity:,} bytes ({lsb_capacity // 1024} KB)")
click.echo(f"Spread capacity: {spread_capacity.usable_capacity_bytes:,} bytes")
click.echo(f"Payload: {payload_size:,} bytes ({payload_type})")
click.echo(f"Usage: {result['usage_percent']}%")
click.echo(f"Status: {'✓ Fits' if result['fits'] else '✗ Too large'}")
except Exception as e:
if ctx.obj.get("json"):
click.echo(json.dumps({"status": "error", "error": str(e)}, indent=2))
else:
click.echo(f"✗ Capacity check failed: {e}", err=True)
raise SystemExit(1)
return
# Determine output path
if not output:
carrier_path = Path(carrier)
if embed_mode == "audio_lsb":
output = f"{carrier_path.stem}_encoded.wav"
else:
output = f"{carrier_path.stem}_encoded.wav"
output = f"{Path(carrier).stem}_encoded.wav"
try:
if file_payload:
@@ -479,13 +587,24 @@ def audio_encode(ctx, carrier, reference, message, file_payload, output, embed_m
else:
payload = message
# Resolve chip tier name to integer
resolved_chip_tier = None
if chip_tier is not None:
from .constants import AUDIO_SS_CHIP_TIER_NAMES
resolved_chip_tier = AUDIO_SS_CHIP_TIER_NAMES.get(chip_tier)
stego_audio, stats = encode_audio(
message=payload,
reference_photo=reference_data,
carrier_audio=carrier_data,
passphrase=passphrase,
pin=pin,
rsa_key_data=rsa_key_data,
rsa_password=rsa_password,
embed_mode=embed_mode,
channel_key=channel_key,
chip_tier=resolved_chip_tier,
)
with open(output, "wb") as f:
@@ -539,9 +658,18 @@ def audio_encode(ctx, carrier, reference, message, file_payload, output, embed_m
)
@click.option("--passphrase", prompt=True, hide_input=True, help="Passphrase")
@click.option("--pin", prompt=True, hide_input=True, help="PIN code")
@click.option(
"--rsa-key",
type=click.Path(exists=True),
help="RSA private key PEM file",
)
@click.option("--rsa-password", default=None, help="Password for encrypted RSA key")
@click.option("--channel-key", default=None, help="Channel key for deployment isolation")
@click.option("-o", "--output", type=click.Path(), help="Output path for file payloads")
@click.pass_context
def audio_decode(ctx, audio, reference, embed_mode, passphrase, pin, output):
def audio_decode(
ctx, audio, reference, embed_mode, passphrase, pin, rsa_key, rsa_password, channel_key, output
):
"""
Decode a message or file from stego audio.
@@ -551,8 +679,22 @@ def audio_decode(ctx, audio, reference, embed_mode, passphrase, pin, output):
stegasoo audio-decode stego.wav -r ref.jpg --mode audio_lsb -o ./extracted/
"""
from .constants import AUDIO_ENABLED
if not AUDIO_ENABLED:
raise click.UsageError(
"Audio support is disabled. Install audio extras (pip install stegasoo[audio]) "
"or set STEGASOO_AUDIO=1 to force enable."
)
from .decode import decode_audio
# Read RSA key if provided
rsa_key_data = None
if rsa_key:
with open(rsa_key, "rb") as f:
rsa_key_data = f.read()
with open(audio, "rb") as f:
audio_data = f.read()
with open(reference, "rb") as f:
@@ -564,7 +706,10 @@ def audio_decode(ctx, audio, reference, embed_mode, passphrase, pin, output):
reference_photo=reference_data,
passphrase=passphrase,
pin=pin,
rsa_key_data=rsa_key_data,
rsa_password=rsa_password,
embed_mode=embed_mode,
channel_key=channel_key,
)
if result.is_file:
@@ -617,6 +762,97 @@ def audio_decode(ctx, audio, reference, embed_mode, passphrase, pin, output):
raise SystemExit(1)
@cli.command("audio-info")
@click.argument("audio", type=click.Path(exists=True))
@click.pass_context
def audio_info(ctx, audio):
"""
Show audio file information and steganographic capacity.
Examples:
stegasoo audio-info carrier.wav
stegasoo --json audio-info carrier.wav
"""
from .constants import AUDIO_ENABLED
if not AUDIO_ENABLED:
raise click.UsageError(
"Audio support is disabled. Install audio extras (pip install stegasoo[audio]) "
"or set STEGASOO_AUDIO=1 to force enable."
)
from .audio_steganography import calculate_audio_lsb_capacity
from .audio_utils import get_audio_info
from .spread_steganography import calculate_audio_spread_capacity
with open(audio, "rb") as f:
audio_data = f.read()
try:
info = get_audio_info(audio_data)
lsb_capacity = calculate_audio_lsb_capacity(audio_data)
# Calculate spread capacity at each chip tier
spread_tiers = {}
for tier_name, tier_val in [("lossless", 0), ("high", 1), ("low", 2)]:
cap = calculate_audio_spread_capacity(audio_data, chip_tier=tier_val)
spread_tiers[tier_name] = {
"bytes": cap.usable_capacity_bytes,
"kb": round(cap.usable_capacity_bytes / 1024, 1),
"chip_length": cap.chip_length,
"embeddable_channels": cap.embeddable_channels,
}
result = {
"file": audio,
"format": info.format,
"sample_rate": info.sample_rate,
"channels": info.channels,
"duration_seconds": round(info.duration_seconds, 2),
"num_samples": info.num_samples,
"bit_depth": info.bit_depth,
"file_size": len(audio_data),
"capacity": {
"audio_lsb": {
"bytes": lsb_capacity,
"kb": round(lsb_capacity / 1024, 1),
},
"audio_spread": spread_tiers,
},
}
if ctx.obj.get("json"):
click.echo(json.dumps(result, indent=2))
else:
click.echo(f"File: {audio}")
click.echo(f"Format: {info.format}")
click.echo(f"Sample rate: {info.sample_rate} Hz")
click.echo(f"Channels: {info.channels}")
click.echo(f"Duration: {info.duration_seconds:.1f}s")
click.echo(f"Samples: {info.num_samples:,}")
if info.bit_depth:
click.echo(f"Bit depth: {info.bit_depth}-bit")
click.echo(f"File size: {len(audio_data):,} bytes")
click.echo()
click.echo("Steganographic capacity:")
click.echo(f" LSB: {lsb_capacity:,} bytes ({lsb_capacity // 1024} KB)")
for tier_name in ("lossless", "high", "low"):
t = spread_tiers[tier_name]
click.echo(
f" Spread ({tier_name:>8}, chip={t['chip_length']}): "
f"{t['bytes']:,} bytes ({t['kb']} KB)"
)
except Exception as e:
if ctx.obj.get("json"):
click.echo(json.dumps({"status": "error", "error": str(e)}, indent=2))
else:
click.echo(f"✗ Audio info failed: {e}", err=True)
raise SystemExit(1)
# =============================================================================
# BATCH COMMANDS
# =============================================================================
@@ -828,9 +1064,7 @@ def batch_check(ctx, images, recursive):
@click.option(
"--pin-length", default=DEFAULT_PIN_LENGTH, help=f"PIN length (default: {DEFAULT_PIN_LENGTH})"
)
@click.option(
"--channel-key", is_flag=True, help="Also generate a 256-bit channel key"
)
@click.option("--channel-key", is_flag=True, help="Also generate a 256-bit channel key")
@click.pass_context
def generate(ctx, words, pin_length, channel_key):
"""
@@ -889,6 +1123,7 @@ def generate(ctx, words, pin_length, channel_key):
# Generate channel key if requested
if channel_key:
from .channel import generate_channel_key
result["channel_key"] = generate_channel_key()
if ctx.obj.get("json"):
@@ -912,6 +1147,7 @@ def info(ctx, full):
# Check for DCT support
try:
from .dct_steganography import HAS_JPEGIO, HAS_SCIPY
has_dct = HAS_SCIPY and HAS_JPEGIO
except ImportError:
has_dct = False
@@ -954,6 +1190,7 @@ def info(ctx, full):
channel_source = None
try:
from .channel import get_channel_fingerprint, get_channel_key, get_channel_status
key = get_channel_key()
if key:
channel_fingerprint = get_channel_fingerprint(key)
@@ -986,7 +1223,7 @@ def info(ctx, full):
try:
# Disk free
st = os.statvfs("/")
disk_free = (st.f_bavail * st.f_frsize) / (1024 ** 3) # GB
disk_free = (st.f_bavail * st.f_frsize) / (1024**3) # GB
except OSError:
pass
@@ -1005,20 +1242,28 @@ def info(ctx, full):
"service": service_status,
"url": service_url,
"dct_support": has_dct,
"channel": {
"fingerprint": channel_fingerprint,
"source": channel_source,
} if channel_fingerprint else None,
"channel": (
{
"fingerprint": channel_fingerprint,
"source": channel_source,
}
if channel_fingerprint
else None
),
"limits": {
"max_message_bytes": MAX_MESSAGE_SIZE,
"max_file_payload_bytes": MAX_FILE_PAYLOAD_SIZE,
},
"system": {
"cpu_mhz": cpu_freq,
"temp_c": cpu_temp,
"disk_free_gb": round(disk_free, 1) if disk_free else None,
"uptime": uptime,
} if full else None,
"system": (
{
"cpu_mhz": cpu_freq,
"temp_c": cpu_temp,
"disk_free_gb": round(disk_free, 1) if disk_free else None,
"uptime": uptime,
}
if full
else None
),
}
if ctx.obj.get("json"):
@@ -1055,7 +1300,9 @@ def info(ctx, full):
if cpu_freq:
click.echo(f" CPU: {cpu_freq} MHz")
if cpu_temp:
temp_color = "\033[32m" if cpu_temp < 60 else "\033[33m" if cpu_temp < 75 else "\033[31m"
temp_color = (
"\033[32m" if cpu_temp < 60 else "\033[33m" if cpu_temp < 75 else "\033[31m"
)
click.echo(f" Temp: {temp_color}{cpu_temp:.1f}°C\033[0m")
if uptime:
click.echo(f" Uptime: {uptime}")
@@ -1384,7 +1631,7 @@ def tools_capacity(image, as_json):
click.echo(f" Megapixels: {result['megapixels']} MP")
click.echo(f" {'' * 40}")
click.echo(f" LSB Capacity: {result['lsb']['capacity_kb']:.1f} KB")
if result['dct']['available']:
if result["dct"]["available"]:
click.echo(f" DCT Capacity: {result['dct']['capacity_kb']:.1f} KB")
else:
click.echo(" DCT Capacity: N/A (scipy required)")
@@ -1394,7 +1641,9 @@ def tools_capacity(image, as_json):
@tools.command("strip")
@click.argument("image", type=click.Path(exists=True))
@click.option("-o", "--output", type=click.Path(), help="Output file (default: <name>_clean.png)")
@click.option("--format", "fmt", type=click.Choice(["png", "bmp"]), default="png", help="Output format")
@click.option(
"--format", "fmt", type=click.Choice(["png", "bmp"]), default="png", help="Output format"
)
def tools_strip(image, output, fmt):
"""Strip EXIF/metadata from an image.
@@ -1529,7 +1778,9 @@ def tools_exif(image, clear, set_fields, output, as_json):
@tools.command("compress")
@click.argument("image", type=click.Path(exists=True))
@click.option("-q", "--quality", type=int, default=75, help="JPEG quality (1-100, default: 75)")
@click.option("-o", "--output", type=click.Path(), help="Output file (default: <name>_q<quality>.jpg)")
@click.option(
"-o", "--output", type=click.Path(), help="Output file (default: <name>_q<quality>.jpg)"
)
def tools_compress(image, quality, output):
"""Compress a JPEG image.
@@ -1541,9 +1792,10 @@ def tools_compress(image, quality, output):
stegasoo tools compress photo.jpg -q 60
stegasoo tools compress photo.jpg -q 80 -o smaller.jpg
"""
from PIL import Image
import io
from PIL import Image
if not 1 <= quality <= 100:
raise click.UsageError("Quality must be between 1 and 100")
@@ -1578,7 +1830,9 @@ def tools_compress(image, quality, output):
@tools.command("rotate")
@click.argument("image", type=click.Path(exists=True))
@click.option("-r", "--rotation", type=click.Choice(["90", "180", "270"]), help="Rotation degrees clockwise")
@click.option(
"-r", "--rotation", type=click.Choice(["90", "180", "270"]), help="Rotation degrees clockwise"
)
@click.option("--flip-h", is_flag=True, help="Flip horizontally")
@click.option("--flip-v", is_flag=True, help="Flip vertically")
@click.option("-o", "--output", type=click.Path(), help="Output file")
@@ -1593,10 +1847,11 @@ def tools_rotate(image, rotation, flip_h, flip_v, output):
stegasoo tools rotate photo.jpg -r 90
stegasoo tools rotate photo.jpg -r 180 --flip-h -o rotated.jpg
"""
from PIL import Image
import io
import shutil
from PIL import Image
with open(image, "rb") as f:
image_data = f.read()
@@ -1622,9 +1877,9 @@ def tools_rotate(image, rotation, flip_h, flip_v, output):
# Apply flips using jpegtran
if flip_h or flip_v:
import os
import subprocess
import tempfile
import os
for flip_type in (["horizontal"] if flip_h else []) + (["vertical"] if flip_v else []):
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
@@ -1633,9 +1888,19 @@ def tools_rotate(image, rotation, flip_h, flip_v, output):
output_path = tempfile.mktemp(suffix=".jpg")
try:
subprocess.run(
["jpegtran", "-flip", flip_type, "-copy", "all",
"-outfile", output_path, input_path],
capture_output=True, timeout=30, check=True
[
"jpegtran",
"-flip",
flip_type,
"-copy",
"all",
"-outfile",
output_path,
input_path,
],
capture_output=True,
timeout=30,
check=True,
)
with open(output_path, "rb") as f:
result_data = f.read()
@@ -1680,8 +1945,17 @@ def tools_rotate(image, rotation, flip_h, flip_v, output):
@tools.command("convert")
@click.argument("image", type=click.Path(exists=True))
@click.option("-f", "--format", "fmt", type=click.Choice(["png", "jpg", "bmp", "webp"]), required=True, help="Output format")
@click.option("-q", "--quality", type=int, default=95, help="Quality for lossy formats (default: 95)")
@click.option(
"-f",
"--format",
"fmt",
type=click.Choice(["png", "jpg", "bmp", "webp"]),
required=True,
help="Output format",
)
@click.option(
"-q", "--quality", type=int, default=95, help="Quality for lossy formats (default: 95)"
)
@click.option("-o", "--output", type=click.Path(), help="Output file")
def tools_convert(image, fmt, quality, output):
"""Convert image to a different format.
@@ -1691,9 +1965,10 @@ def tools_convert(image, fmt, quality, output):
stegasoo tools convert photo.png -f jpg
stegasoo tools convert photo.jpg -f png -o lossless.png
"""
from PIL import Image
import io
from PIL import Image
with open(image, "rb") as f:
image_data = f.read()
@@ -1737,12 +2012,14 @@ def admin(ctx):
@admin.command("recover")
@click.option(
"--db", "db_path",
"--db",
"db_path",
type=click.Path(exists=True),
help="Path to stegasoo.db (default: frontends/web/instance/stegasoo.db)"
help="Path to stegasoo.db (default: frontends/web/instance/stegasoo.db)",
)
@click.option(
"--password", prompt=True, hide_input=True, confirmation_prompt=True, help="New admin password"
)
@click.option("--password", prompt=True, hide_input=True, confirmation_prompt=True,
help="New admin password")
def admin_recover(db_path, password):
"""Reset admin password using recovery key.
@@ -1772,9 +2049,7 @@ def admin_recover(db_path, password):
break
if not db_path or not Path(db_path).exists():
raise click.UsageError(
"Database not found. Use --db to specify path to stegasoo.db"
)
raise click.UsageError("Database not found. Use --db to specify path to stegasoo.db")
click.echo(f"Database: {db_path}")
@@ -1783,16 +2058,13 @@ def admin_recover(db_path, password):
db.row_factory = sqlite3.Row
# Get recovery key hash from app_settings
cursor = db.execute(
"SELECT value FROM app_settings WHERE key = 'recovery_key_hash'"
)
cursor = db.execute("SELECT value FROM app_settings WHERE key = 'recovery_key_hash'")
row = cursor.fetchone()
if not row:
db.close()
raise click.ClickException(
"No recovery key configured for this instance. "
"Password reset is not possible."
"No recovery key configured for this instance. " "Password reset is not possible."
)
stored_hash = row["value"]
@@ -1869,6 +2141,7 @@ def admin_generate_key(show_qr):
if show_qr:
try:
import qrcode
qr = qrcode.QRCode(box_size=1, border=1)
qr.add_data(key)
qr.make()
@@ -1920,8 +2193,12 @@ def api_keys():
@api_keys.command("list")
@click.option("--location", type=click.Choice(["user", "project", "all"]), default="all",
help="Config location to list keys from")
@click.option(
"--location",
type=click.Choice(["user", "project", "all"]),
default="all",
help="Config location to list keys from",
)
def api_keys_list(location):
"""List configured API keys.
@@ -1935,7 +2212,7 @@ def api_keys_list(location):
_setup_frontends_path()
try:
from api.auth import list_api_keys, get_api_key_status
from api.auth import get_api_key_status, list_api_keys
except ImportError:
raise click.ClickException("API frontend not available")
@@ -1959,8 +2236,12 @@ def api_keys_list(location):
@api_keys.command("create")
@click.argument("name")
@click.option("--location", type=click.Choice(["user", "project"]), default="user",
help="Where to store the key")
@click.option(
"--location",
type=click.Choice(["user", "project"]),
default="user",
help="Where to store the key",
)
def api_keys_create(name, location):
"""Create a new API key.
@@ -1993,8 +2274,9 @@ def api_keys_create(name, location):
@api_keys.command("delete")
@click.argument("name")
@click.option("--location", type=click.Choice(["user", "project"]), default="user",
help="Config location")
@click.option(
"--location", type=click.Choice(["user", "project"]), default="user", help="Config location"
)
def api_keys_delete(name, location):
"""Delete an API key by name.
@@ -2025,7 +2307,9 @@ def api_tls():
@api_tls.command("generate")
@click.option("--hostname", default="localhost", help="Server hostname for certificate")
@click.option("--days", default=365, help="Certificate validity in days")
@click.option("--output", "-o", type=click.Path(), help="Output directory (default: ~/.stegasoo/certs)")
@click.option(
"--output", "-o", type=click.Path(), help="Output directory (default: ~/.stegasoo/certs)"
)
def api_tls_generate(hostname, days, output):
"""Generate self-signed TLS certificate.
@@ -2065,7 +2349,12 @@ def api_tls_generate(hostname, days, output):
@api_tls.command("info")
@click.option("--cert", "-c", type=click.Path(exists=True), help="Certificate file (default: ~/.stegasoo/certs/server.crt)")
@click.option(
"--cert",
"-c",
type=click.Path(exists=True),
help="Certificate file (default: ~/.stegasoo/certs/server.crt)",
)
def api_tls_info(cert):
"""Show information about a TLS certificate.
@@ -2075,12 +2364,13 @@ def api_tls_info(cert):
stegasoo api tls info --cert /path/to/server.crt
"""
from cryptography import x509
from cryptography.hazmat.primitives import serialization
if not cert:
cert = Path.home() / ".stegasoo" / "certs" / "server.crt"
if not cert.exists():
raise click.ClickException(f"No certificate found at {cert}. Generate one with: stegasoo api tls generate")
raise click.ClickException(
f"No certificate found at {cert}. Generate one with: stegasoo api tls generate"
)
cert_data = Path(cert).read_bytes()
certificate = x509.load_pem_x509_certificate(cert_data)
@@ -2095,7 +2385,8 @@ def api_tls_info(cert):
# Check expiry
import datetime
now = datetime.datetime.now(datetime.timezone.utc)
now = datetime.datetime.now(datetime.UTC)
if certificate.not_valid_after_utc < now:
click.echo("\nStatus: EXPIRED")
elif certificate.not_valid_after_utc < now + datetime.timedelta(days=30):
@@ -2144,8 +2435,11 @@ def api_serve(host, port, ssl, cert, key, do_reload):
else:
try:
from web.ssl_utils import ensure_certs
base_dir = Path.home() / ".stegasoo"
cert_path, key_path = ensure_certs(base_dir, host if host != "0.0.0.0" else "localhost")
cert_path, key_path = ensure_certs(
base_dir, host if host != "0.0.0.0" else "localhost"
)
except ImportError:
raise click.ClickException("ssl_utils not available")

View File

@@ -9,6 +9,10 @@ import struct
import zlib
from enum import IntEnum
from .debug import get_logger
logger = get_logger(__name__)
# Optional LZ4 support (faster, slightly worse ratio)
try:
import lz4.frame

View File

@@ -262,8 +262,7 @@ DCT_STEP_SIZE = 8 # QIM quantization step
# SHA256("\x89ST3\x89DCT") - hardcoded so it never changes even if headers are added
# Used to XOR recovery keys in QR codes so they scan as gibberish
RECOVERY_OBFUSCATION_KEY = bytes.fromhex(
"d6c70bce27780db942562550e9fe1459"
"9dfdb8421f5acc79696b05db4e7afbd2"
"d6c70bce27780db942562550e9fe1459" "9dfdb8421f5acc79696b05db4e7afbd2"
) # 32 bytes
# Valid embedding modes
@@ -297,6 +296,69 @@ def detect_stego_mode(encrypted_data: bytes) -> str:
return "unknown"
# =============================================================================
# FEATURE TOGGLES (v4.3.1)
# =============================================================================
# Environment variables to enable/disable optional feature families.
# Values: "auto" (default — detect dependencies), "1"/"true" (force on),
# "0"/"false" (force off even if deps are installed).
# Pi builds or minimal installs can set STEGASOO_AUDIO=0 to stay image-only.
import os as _os
def _parse_feature_toggle(env_var: str, default: str = "auto") -> str | bool:
"""Parse a feature toggle env var. Returns 'auto', True, or False."""
val = _os.environ.get(env_var, default).strip().lower()
if val in ("1", "true", "yes", "on"):
return True
if val in ("0", "false", "no", "off"):
return False
return "auto"
def _check_audio_deps() -> bool:
"""Check if audio dependencies (soundfile, numpy) are importable."""
try:
import numpy # noqa: F401
import soundfile # noqa: F401
return True
except ImportError:
return False
def _check_video_deps() -> bool:
"""Check if video dependencies (ffmpeg binary + audio deps) are available."""
import shutil
if not _check_audio_deps():
return False
return shutil.which("ffmpeg") is not None
def _resolve_feature(toggle: str | bool, dep_check: callable) -> bool:
"""Resolve a feature toggle to a final bool."""
if toggle is True:
if not dep_check():
raise ImportError(
f"Feature force-enabled but required dependencies are missing. "
f"Install the relevant extras (e.g. pip install stegasoo[audio])."
)
return True
if toggle is False:
return False
# auto
return dep_check()
_audio_toggle = _parse_feature_toggle("STEGASOO_AUDIO")
_video_toggle = _parse_feature_toggle("STEGASOO_VIDEO")
AUDIO_ENABLED: bool = _resolve_feature(_audio_toggle, _check_audio_deps)
VIDEO_ENABLED: bool = _resolve_feature(_video_toggle, _check_video_deps)
# =============================================================================
# AUDIO STEGANOGRAPHY (v4.3.0)
# =============================================================================
@@ -319,12 +381,33 @@ MAX_AUDIO_SAMPLE_RATE = 192000 # Studio quality
ALLOWED_AUDIO_EXTENSIONS = {"wav", "flac", "mp3", "ogg", "opus", "aac", "m4a", "wma"}
# Spread spectrum parameters
AUDIO_SS_CHIP_LENGTH = 1024 # Samples per chip (spreading factor)
AUDIO_SS_AMPLITUDE = 0.05 # Per-sample embedding strength (~-26dB, masked by audio)
AUDIO_SS_RS_NSYM = 32 # Reed-Solomon parity symbols
AUDIO_SS_CHIP_LENGTH = 1024 # Samples per chip (spreading factor) — legacy/default
AUDIO_SS_AMPLITUDE = 0.05 # Per-sample embedding strength (~-26dB, masked by audio)
AUDIO_SS_RS_NSYM = 32 # Reed-Solomon parity symbols
# Spread spectrum v2: per-channel hybrid embedding (v4.4.0)
AUDIO_SS_HEADER_VERSION = 2 # v2 header format identifier
# Chip tier system — trade capacity for robustness
AUDIO_SS_CHIP_TIER_LOSSLESS = 0 # 256 chips — lossless carriers (FLAC/WAV/ALAC)
AUDIO_SS_CHIP_TIER_HIGH_LOSSY = 1 # 512 chips — high-rate lossy (AAC 256k+)
AUDIO_SS_CHIP_TIER_LOW_LOSSY = 2 # 1024 chips — low-rate lossy (AAC 128k, Opus)
AUDIO_SS_DEFAULT_CHIP_TIER = 2 # Most robust, backward compatible
AUDIO_SS_CHIP_LENGTHS = {0: 256, 1: 512, 2: 1024}
# Chip tier name mapping (for CLI/UI)
AUDIO_SS_CHIP_TIER_NAMES = {
"lossless": AUDIO_SS_CHIP_TIER_LOSSLESS,
"high": AUDIO_SS_CHIP_TIER_HIGH_LOSSY,
"low": AUDIO_SS_CHIP_TIER_LOW_LOSSY,
}
# LFE channel skipping — LFE is bandlimited to ~120Hz, terrible carrier
AUDIO_LFE_CHANNEL_INDEX = 3 # Standard WAV/WAVEFORMATEXTENSIBLE ordering
AUDIO_LFE_MIN_CHANNELS = 6 # Only skip LFE for 5.1+ layouts
# Echo hiding parameters
AUDIO_ECHO_DELAY_0 = 50 # Echo delay for bit 0 (samples at 44.1kHz ~ 1.1ms)
AUDIO_ECHO_DELAY_1 = 100 # Echo delay for bit 1 (samples at 44.1kHz ~ 2.3ms)
AUDIO_ECHO_AMPLITUDE = 0.3 # Echo strength (relative to original)
AUDIO_ECHO_WINDOW_SIZE = 8192 # Window size for echo embedding
AUDIO_ECHO_DELAY_0 = 50 # Echo delay for bit 0 (samples at 44.1kHz ~ 1.1ms)
AUDIO_ECHO_DELAY_1 = 100 # Echo delay for bit 1 (samples at 44.1kHz ~ 2.3ms)
AUDIO_ECHO_AMPLITUDE = 0.3 # Echo strength (relative to original)
AUDIO_ECHO_WINDOW_SIZE = 8192 # Window size for echo embedding

View File

@@ -46,9 +46,12 @@ from .constants import (
SALT_SIZE,
TAG_SIZE,
)
from .debug import get_logger
from .exceptions import DecryptionError, EncryptionError, InvalidHeaderError, KeyDerivationError
from .models import DecodeResult, FilePayload
logger = get_logger(__name__)
# Check for Argon2 availability
try:
from argon2.low_level import Type, hash_secret_raw
@@ -201,6 +204,18 @@ def derive_hybrid_key(
"""
try:
photo_hash = hash_photo(photo_data)
logger.debug(
"derive_hybrid_key: photo_hash=%s, pin=%s, rsa=%s, channel=%s, salt=%d bytes",
photo_hash[:4].hex(),
"set" if pin else "none",
"set" if rsa_key_data else "none",
(
"explicit"
if isinstance(channel_key, str) and channel_key
else "auto" if channel_key is None else "none"
),
len(salt),
)
# Resolve channel key (server-specific binding)
channel_hash = _resolve_channel_key(channel_key)
@@ -217,19 +232,30 @@ def derive_hybrid_key(
if channel_hash:
key_material += channel_hash
logger.debug("Key material: %d bytes", len(key_material))
# Run it all through the KDF
if HAS_ARGON2:
logger.debug(
"KDF: Argon2id (memory=%dKB, time=%d, parallel=%d)",
ARGON2_MEMORY_COST,
ARGON2_TIME_COST,
ARGON2_PARALLELISM,
)
# Argon2id: the good stuff
key = hash_secret_raw(
secret=key_material,
salt=salt[:32],
time_cost=ARGON2_TIME_COST, # 4 iterations
time_cost=ARGON2_TIME_COST, # 4 iterations
memory_cost=ARGON2_MEMORY_COST, # 256 MB RAM
parallelism=ARGON2_PARALLELISM, # 4 threads
hash_len=32,
type=Type.ID, # Hybrid mode: resists side-channel AND GPU attacks
)
else:
logger.warning(
"KDF: PBKDF2 fallback (%d iterations) - argon2 not available", PBKDF2_ITERATIONS
)
# PBKDF2 fallback for systems without argon2-cffi
# 600K iterations is slow but not memory-hard
kdf = PBKDF2HMAC(
@@ -241,6 +267,7 @@ def derive_hybrid_key(
)
key = kdf.derive(key_material)
logger.debug("KDF complete, derived %d-byte key", len(key))
return key
except Exception as e:
@@ -457,6 +484,13 @@ def encrypt_message(
# Pack payload with type marker
packed_payload, _ = _pack_payload(message)
logger.debug(
"encrypt_message: packed_payload=%d bytes, flags=0x%02x, format_version=%d",
len(packed_payload),
flags,
FORMAT_VERSION,
)
# Random padding to hide message length
padding_len = secrets.randbelow(256) + 64
padded_len = ((len(packed_payload) + padding_len + 255) // 256) * 256
@@ -464,6 +498,10 @@ def encrypt_message(
padding = secrets.token_bytes(padding_needed - 4) + struct.pack(">I", len(packed_payload))
padded_message = packed_payload + padding
logger.debug(
"Padded message: %d bytes (payload + %d padding)", len(padded_message), padding_needed
)
# Build header for AAD
header = MAGIC_HEADER + bytes([FORMAT_VERSION, flags])
@@ -473,10 +511,22 @@ def encrypt_message(
encryptor.authenticate_additional_data(header)
ciphertext = encryptor.update(padded_message) + encryptor.finalize()
total_size = len(header) + len(salt) + len(iv) + len(encryptor.tag) + len(ciphertext)
logger.debug(
"Encrypted output: %d bytes (header=%d, salt=%d, iv=%d, tag=%d, ciphertext=%d)",
total_size,
len(header),
len(salt),
len(iv),
len(encryptor.tag),
len(ciphertext),
)
# v4.0.0: Header with flags byte
return header + salt + iv + encryptor.tag + ciphertext
except Exception as e:
logger.error("Encryption failed: %s", e)
raise EncryptionError(f"Encryption failed: {e}") from e
@@ -551,10 +601,21 @@ def decrypt_message(
InvalidHeaderError: If data doesn't have valid Stegasoo header
DecryptionError: If decryption fails (wrong credentials)
"""
logger.debug("decrypt_message: %d bytes of encrypted data", len(encrypted_data))
header = parse_header(encrypted_data)
if not header:
logger.error("Invalid or missing Stegasoo header in %d bytes", len(encrypted_data))
raise InvalidHeaderError("Invalid or missing Stegasoo header")
logger.debug(
"Header: version=%d, flags=0x%02x, has_channel_key=%s, ciphertext=%d bytes",
header["version"],
header["flags"],
header["has_channel_key"],
len(header["ciphertext"]),
)
# Check for channel key mismatch and provide helpful error
channel_hash = _resolve_channel_key(channel_key)
has_configured_key = channel_hash is not None
@@ -577,9 +638,16 @@ def decrypt_message(
padded_plaintext = decryptor.update(header["ciphertext"]) + decryptor.finalize()
original_length = struct.unpack(">I", padded_plaintext[-4:])[0]
logger.debug(
"Decrypted %d bytes, original payload length: %d",
len(padded_plaintext),
original_length,
)
payload_data = padded_plaintext[:original_length]
result = _unpack_payload(payload_data)
logger.debug("Decryption successful: %s", result.payload_type)
return result
except Exception as e:

View File

@@ -40,12 +40,12 @@ from PIL import Image, ImageOps
# Check for scipy availability (for PNG/DCT mode)
# Prefer scipy.fft (newer, more stable) over scipy.fftpack
try:
from scipy.fft import dct, idct, dctn, idctn
from scipy.fft import dct, dctn, idct, idctn
HAS_SCIPY = True
except ImportError:
try:
from scipy.fftpack import dct, idct, dctn, idctn
from scipy.fftpack import dct, dctn, idct, idctn
HAS_SCIPY = True
except ImportError:
@@ -120,9 +120,9 @@ BLOCK_SIZE = 8
# Position (0,0) is the DC coefficient - the average brightness of the block.
# We NEVER touch DC because changing it causes visible brightness shifts.
EMBED_POSITIONS = [
(0, 1), # 1st AC coefficient
(1, 0), # 2nd AC coefficient
(2, 0), # ... and so on in zig-zag order
(0, 1), # 1st AC coefficient
(1, 0), # 2nd AC coefficient
(2, 0), # ... and so on in zig-zag order
(1, 1),
(0, 2),
(0, 3),
@@ -169,9 +169,9 @@ DEFAULT_EMBED_POSITIONS = EMBED_POSITIONS[4:20]
QUANT_STEP = 25
# Magic bytes so we can identify our own images
DCT_MAGIC = b"DCTS" # scipy DCT mode marker
JPEGIO_MAGIC = b"JPGS" # jpegio native JPEG mode marker
HEADER_SIZE = 10 # Magic (4) + version (1) + flags (1) + length (4)
DCT_MAGIC = b"DCTS" # scipy DCT mode marker
JPEGIO_MAGIC = b"JPGS" # jpegio native JPEG mode marker
HEADER_SIZE = 10 # Magic (4) + version (1) + flags (1) + length (4)
OUTPUT_FORMAT_PNG = "png"
OUTPUT_FORMAT_JPEG = "jpeg"
@@ -186,8 +186,8 @@ JPEGIO_MIN_COEF_MAGNITUDE = 2
JPEGIO_EMBED_CHANNEL = 0
# Header flags
FLAG_COLOR_MODE = 0x01 # Set if we preserved color (YCbCr mode)
FLAG_RS_PROTECTED = 0x02 # Set if Reed-Solomon protected (v4.1.0+)
FLAG_COLOR_MODE = 0x01 # Set if we preserved color (YCbCr mode)
FLAG_RS_PROTECTED = 0x02 # Set if Reed-Solomon protected (v4.1.0+)
# Reed-Solomon settings - the "please don't lose my data" system
# 32 parity symbols per chunk means we can correct up to 16 byte errors
@@ -196,8 +196,8 @@ RS_NSYM = 32
# We store the payload length 3 times and take majority vote
# Because if the length is wrong, everything is wrong
RS_LENGTH_HEADER_SIZE = 8 # 4 bytes raw length + 4 bytes RS-encoded length
RS_LENGTH_COPIES = 3 # Store 3 copies, need 2 to agree
RS_LENGTH_HEADER_SIZE = 8 # 4 bytes raw length + 4 bytes RS-encoded length
RS_LENGTH_COPIES = 3 # Store 3 copies, need 2 to agree
RS_LENGTH_PREFIX_SIZE = RS_LENGTH_HEADER_SIZE * RS_LENGTH_COPIES # 24 bytes total
# Chunking for large images - scipy's FFT gets memory-corrupty on huge arrays
@@ -287,6 +287,7 @@ def has_jpegio_support() -> bool:
try:
from reedsolo import ReedSolomonError, RSCodec
HAS_REEDSOLO = True
except ImportError:
HAS_REEDSOLO = False
@@ -1009,7 +1010,8 @@ def _embed_in_channel_safe(
needs_adjust = (quantized % 2) != bit_array
# Determine direction to nudge
dct_blocks[i, embed_rows[needs_adjust], embed_cols[needs_adjust]] = (
(quantized[needs_adjust] + (1 - 2 * (quantized[needs_adjust] % 2 == 1))) * QUANT_STEP
(quantized[needs_adjust] + (1 - 2 * (quantized[needs_adjust] % 2 == 1)))
* QUANT_STEP
).astype(np.float64)
# For bits that already match, just quantize
dct_blocks[i, embed_rows[~needs_adjust], embed_cols[~needs_adjust]] = (
@@ -1219,6 +1221,7 @@ def _embed_jpegio(
def _jpegtran_available() -> bool:
"""Check if jpegtran is available on the system."""
import shutil
return shutil.which("jpegtran") is not None
@@ -1237,9 +1240,9 @@ def _jpegtran_rotate(image_data: bytes, rotation: int) -> bytes:
Returns:
Rotated JPEG bytes with DCT coefficients preserved
"""
import os
import subprocess
import tempfile
import os
if rotation not in (90, 180, 270):
raise ValueError(f"Invalid rotation: {rotation}")
@@ -1257,10 +1260,18 @@ def _jpegtran_rotate(image_data: bytes, rotation: int) -> bytes:
# NOTE: Don't use -trim as it drops edge blocks and destroys stego data
# NOTE: Don't use -perfect as it fails on images with non-MCU-aligned edges
result = subprocess.run(
["jpegtran", "-rotate", str(rotation), "-copy", "all",
"-outfile", output_path, input_path],
[
"jpegtran",
"-rotate",
str(rotation),
"-copy",
"all",
"-outfile",
output_path,
input_path,
],
capture_output=True,
timeout=30
timeout=30,
)
if result.returncode != 0:
@@ -1367,6 +1378,7 @@ def _quick_validate_dct_header(image_data: bytes, seed: bytes) -> bool:
copies.append(length_prefix_bytes[start:end])
from collections import Counter
counter = Counter(copies)
_, count = counter.most_common(1)[0]
@@ -1437,6 +1449,7 @@ def extract_from_dct(
if rotation != 0:
try:
from . import debug
debug.print(f"DCT decode succeeded after {rotation}° rotation")
except Exception:
pass # Don't let debug logging break extraction
@@ -1450,6 +1463,7 @@ def extract_from_dct(
if rotation != 0:
try:
from . import debug
debug.print(f"DCT decode succeeded after {rotation}° rotation")
except Exception:
pass # Don't let debug logging break extraction

View File

@@ -2,27 +2,96 @@
Stegasoo Debugging Utilities
Debugging, logging, and performance monitoring tools.
Can be disabled for production use.
Configuration:
STEGASOO_LOG_LEVEL env var controls log level:
- Not set or empty: logging disabled (production default)
- DEBUG: verbose debug output (encode/decode flow, crypto params, etc.)
- INFO: operational messages (format detection, mode selection)
- WARNING: potential issues (fallback KDF, format transcoding)
- ERROR: operation failures
STEGASOO_DEBUG=1 is a shorthand for STEGASOO_LOG_LEVEL=DEBUG
CLI: stegasoo --debug encode ... (sets DEBUG level for that invocation)
All output goes to Python's logging module under the 'stegasoo' logger hierarchy.
The legacy debug.print() API is preserved for backward compatibility.
"""
import logging
import os
import sys
import time
import traceback
from collections.abc import Callable
from datetime import datetime
from functools import wraps
from typing import Any
# Map string level names to logging levels
_LEVEL_MAP = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}
# Root logger for the stegasoo package
logger = logging.getLogger("stegasoo")
# Global debug configuration
DEBUG_ENABLED = False # Set to True to enable debug output
LOG_PERFORMANCE = True # Log function timing
VALIDATION_ASSERTIONS = True # Enable runtime validation assertions
def _configure_from_env() -> bool:
"""Configure logging from environment variables. Returns True if debug enabled."""
# STEGASOO_DEBUG=1 is shorthand for DEBUG level
if os.environ.get("STEGASOO_DEBUG", "").strip() in ("1", "true", "yes"):
_setup_logging(logging.DEBUG)
return True
level_str = os.environ.get("STEGASOO_LOG_LEVEL", "").strip().upper()
if level_str and level_str in _LEVEL_MAP:
_setup_logging(_LEVEL_MAP[level_str])
return level_str == "DEBUG"
return False
def _setup_logging(level: int) -> None:
"""Configure the stegasoo logger with a stderr handler."""
logger.setLevel(level)
# Only add handler if none exist (avoid duplicates on re-init)
if not logger.handlers:
handler = logging.StreamHandler(sys.stderr)
handler.setLevel(level)
formatter = logging.Formatter(
"[%(asctime)s.%(msecs)03d] [%(levelname)s] [%(name)s] %(message)s",
datefmt="%H:%M:%S",
)
handler.setFormatter(formatter)
logger.addHandler(handler)
else:
# Update existing handler level
for handler in logger.handlers:
handler.setLevel(level)
# Auto-configure on import
DEBUG_ENABLED = _configure_from_env()
def enable_debug(enable: bool = True) -> None:
"""Enable or disable debug mode globally."""
global DEBUG_ENABLED
DEBUG_ENABLED = enable
if enable:
_setup_logging(logging.DEBUG)
else:
logger.setLevel(logging.WARNING)
def enable_performance_logging(enable: bool = True) -> None:
@@ -38,15 +107,14 @@ def enable_assertions(enable: bool = True) -> None:
def debug_print(message: str, level: str = "INFO") -> None:
"""Print debug message with timestamp if debugging is enabled."""
if DEBUG_ENABLED:
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(f"[{timestamp}] [{level}] {message}", file=sys.stderr)
"""Log a message at the given level via the stegasoo logger."""
log_level = _LEVEL_MAP.get(level.upper(), logging.DEBUG)
logger.log(log_level, message)
def debug_data(data: bytes, label: str = "Data", max_bytes: int = 32) -> str:
"""Format bytes for debugging."""
if not DEBUG_ENABLED:
if not logger.isEnabledFor(logging.DEBUG):
return ""
if not data:
@@ -55,15 +123,17 @@ def debug_data(data: bytes, label: str = "Data", max_bytes: int = 32) -> str:
if len(data) <= max_bytes:
return f"{label} ({len(data)} bytes): {data.hex()}"
else:
return f"{label} ({len(data)} bytes): {data[:max_bytes//2].hex()}...{data[-max_bytes//2:].hex()}"
return (
f"{label} ({len(data)} bytes): "
f"{data[:max_bytes // 2].hex()}...{data[-max_bytes // 2:].hex()}"
)
def debug_exception(e: Exception, context: str = "") -> None:
"""Log exception with context for debugging."""
if DEBUG_ENABLED:
debug_print(f"Exception in {context}: {type(e).__name__}: {e}", "ERROR")
if DEBUG_ENABLED:
traceback.print_exc()
logger.error("Exception in %s: %s: %s", context, type(e).__name__, e)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(traceback.format_exc())
def time_function(func: Callable) -> Callable:
@@ -71,7 +141,7 @@ def time_function(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
if not (DEBUG_ENABLED and LOG_PERFORMANCE):
if not (logger.isEnabledFor(logging.DEBUG) and LOG_PERFORMANCE):
return func(*args, **kwargs)
start = time.perf_counter()
@@ -80,7 +150,7 @@ def time_function(func: Callable) -> Callable:
return result
finally:
end = time.perf_counter()
debug_print(f"{func.__name__} took {end - start:.6f}s", "PERF")
logger.debug("%s took %.6fs", func.__name__, end - start)
return wrapper
@@ -94,8 +164,6 @@ def validate_assertion(condition: bool, message: str) -> None:
def memory_usage() -> dict[str, float | str]:
"""Get current memory usage (if psutil is available)."""
try:
import os
import psutil
process = psutil.Process(os.getpid())
@@ -131,8 +199,19 @@ def hexdump(data: bytes, offset: int = 0, length: int = 64) -> str:
return "\n".join(result)
def get_logger(name: str) -> logging.Logger:
"""Get a child logger under the stegasoo namespace.
Usage in modules:
from .debug import get_logger
logger = get_logger(__name__)
logger.debug("message")
"""
return logging.getLogger(name)
class Debug:
"""Debugging utility class."""
"""Debugging utility class (backward-compatible API)."""
def __init__(self):
self.enabled = DEBUG_ENABLED

View File

@@ -31,12 +31,15 @@ def _write_progress(progress_file: str | None, current: int, total: int, phase:
return
try:
with open(progress_file, "w") as f:
json.dump({
"current": current,
"total": total,
"percent": (current / total * 100) if total > 0 else 0,
"phase": phase,
}, f)
json.dump(
{
"current": current,
"total": total,
"percent": (current / total * 100) if total > 0 else 0,
"phase": phase,
},
f,
)
except OSError:
pass
@@ -291,16 +294,23 @@ def decode_audio(
Returns:
DecodeResult with message or file data
"""
from .audio_utils import detect_audio_format, transcode_to_wav
from .constants import (
AUDIO_ENABLED,
EMBED_MODE_AUDIO_AUTO,
EMBED_MODE_AUDIO_LSB,
EMBED_MODE_AUDIO_SPREAD,
)
if not AUDIO_ENABLED:
raise ExtractionError(
"Audio support is disabled. Install audio extras (pip install stegasoo[audio]) "
"or set STEGASOO_AUDIO=1 to force enable."
)
from .audio_utils import detect_audio_format, transcode_to_wav
debug.print(
f"decode_audio: mode={embed_mode}, "
f"passphrase length={len(passphrase.split())} words"
f"decode_audio: mode={embed_mode}, " f"passphrase length={len(passphrase.split())} words"
)
# Validate inputs
@@ -358,9 +368,7 @@ def decode_audio(
elif embed_mode == EMBED_MODE_AUDIO_SPREAD:
from .spread_steganography import extract_from_audio_spread
encrypted = extract_from_audio_spread(
wav_audio, pixel_key, progress_file=progress_file
)
encrypted = extract_from_audio_spread(wav_audio, pixel_key, progress_file=progress_file)
else:
raise ValueError(f"Invalid audio embed mode: {embed_mode}")

View File

@@ -18,6 +18,7 @@ from typing import TYPE_CHECKING
from .constants import EMBED_MODE_LSB
from .crypto import derive_pixel_key, encrypt_message
from .debug import debug
from .exceptions import AudioError
from .models import EncodeResult, FilePayload
from .steganography import embed_in_image
from .utils import generate_filename
@@ -280,6 +281,7 @@ def encode_audio(
embed_mode: str = "audio_lsb",
channel_key: str | bool | None = None,
progress_file: str | None = None,
chip_tier: int | None = None,
) -> tuple[bytes, AudioEmbedStats]:
"""
Encode a message or file into an audio carrier.
@@ -295,12 +297,21 @@ def encode_audio(
embed_mode: 'audio_lsb' or 'audio_spread'
channel_key: Channel key for deployment/group isolation
progress_file: Optional path to write progress JSON
chip_tier: Spread spectrum chip tier (0=lossless, 1=high_lossy, 2=low_lossy).
Only used for audio_spread mode. Default None → uses constant default.
Returns:
Tuple of (stego audio bytes, AudioEmbedStats)
"""
from .constants import AUDIO_ENABLED, EMBED_MODE_AUDIO_LSB, EMBED_MODE_AUDIO_SPREAD
if not AUDIO_ENABLED:
raise AudioError(
"Audio support is disabled. Install audio extras (pip install stegasoo[audio]) "
"or set STEGASOO_AUDIO=1 to force enable."
)
from .audio_utils import detect_audio_format, transcode_to_wav
from .constants import EMBED_MODE_AUDIO_LSB, EMBED_MODE_AUDIO_SPREAD
debug.print(
f"encode_audio: mode={embed_mode}, "
@@ -343,10 +354,12 @@ def encode_audio(
encrypted, carrier_audio, pixel_key, progress_file=progress_file
)
elif embed_mode == EMBED_MODE_AUDIO_SPREAD:
from .constants import AUDIO_SS_DEFAULT_CHIP_TIER
from .spread_steganography import embed_in_audio_spread
tier = chip_tier if chip_tier is not None else AUDIO_SS_DEFAULT_CHIP_TIER
stego_audio, stats = embed_in_audio_spread(
encrypted, carrier_audio, pixel_key, progress_file=progress_file
encrypted, carrier_audio, pixel_key, chip_tier=tier, progress_file=progress_file
)
else:
raise ValueError(f"Invalid audio embed mode: {embed_mode}")

View File

@@ -300,6 +300,9 @@ class AudioEmbedStats:
channels: int
duration_seconds: float
embed_mode: str # "audio_lsb" or "audio_spread"
chip_tier: int | None = None # v4.4.0: spread spectrum chip tier (0/1/2)
chip_length: int | None = None # v4.4.0: samples per chip
embeddable_channels: int | None = None # v4.4.0: channels used (excl. LFE)
@property
def modification_percent(self) -> float:
@@ -329,3 +332,7 @@ class AudioCapacityInfo:
embed_mode: str
sample_rate: int
duration_seconds: float
chip_tier: int | None = None # v4.4.0: spread spectrum chip tier
chip_length: int | None = None # v4.4.0: samples per chip
embeddable_channels: int | None = None # v4.4.0: channels used (excl. LFE)
total_channels: int | None = None # v4.4.0: total channels in carrier

View File

@@ -105,14 +105,14 @@ def decompress_data(data: str) -> str:
"Data compressed with zstd but zstandard package not installed. "
"Run: pip install zstandard"
)
encoded = data[len(COMPRESSION_PREFIX_ZSTD):]
encoded = data[len(COMPRESSION_PREFIX_ZSTD) :]
compressed = base64.b64decode(encoded)
dctx = zstd.ZstdDecompressor()
return dctx.decompress(compressed).decode("utf-8")
elif data.startswith(COMPRESSION_PREFIX_ZLIB):
# Legacy zlib compression
encoded = data[len(COMPRESSION_PREFIX_ZLIB):]
encoded = data[len(COMPRESSION_PREFIX_ZLIB) :]
compressed = base64.b64decode(encoded)
return zlib.decompress(compressed).decode("utf-8")

View File

@@ -98,7 +98,7 @@ _RECOVERY_STEGO_PASSPHRASE = "stegasoo-recovery-v1"
_RECOVERY_STEGO_PIN = "314159" # Pi digits - fixed, not secret
# Size limits for carrier image
STEGO_BACKUP_MIN_SIZE = 50 * 1024 # 50 KB
STEGO_BACKUP_MIN_SIZE = 50 * 1024 # 50 KB
STEGO_BACKUP_MAX_SIZE = 2 * 1024 * 1024 # 2 MB
@@ -182,6 +182,7 @@ def extract_stego_backup(
debug.print(f"Stego backup extraction failed: {e}")
return None
# Recovery key format: same as channel key (32 chars, 8 groups of 4)
RECOVERY_KEY_LENGTH = 32
RECOVERY_KEY_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
@@ -205,16 +206,10 @@ def generate_recovery_key() -> str:
7
"""
# Generate 32 random alphanumeric characters
raw_key = "".join(
secrets.choice(RECOVERY_KEY_ALPHABET)
for _ in range(RECOVERY_KEY_LENGTH)
)
raw_key = "".join(secrets.choice(RECOVERY_KEY_ALPHABET) for _ in range(RECOVERY_KEY_LENGTH))
# Format with dashes every 4 characters
formatted = "-".join(
raw_key[i:i + 4]
for i in range(0, RECOVERY_KEY_LENGTH, 4)
)
formatted = "-".join(raw_key[i : i + 4] for i in range(0, RECOVERY_KEY_LENGTH, 4))
debug.print(f"Generated recovery key: {formatted[:4]}-••••-...-{formatted[-4:]}")
return formatted
@@ -245,15 +240,12 @@ def normalize_recovery_key(key: str) -> str:
# Validate length
if len(clean) != RECOVERY_KEY_LENGTH:
raise ValueError(
f"Recovery key must be {RECOVERY_KEY_LENGTH} characters "
f"(got {len(clean)})"
f"Recovery key must be {RECOVERY_KEY_LENGTH} characters " f"(got {len(clean)})"
)
# Validate characters
if not all(c in RECOVERY_KEY_ALPHABET for c in clean):
raise ValueError(
"Recovery key must contain only letters A-Z and digits 0-9"
)
raise ValueError("Recovery key must contain only letters A-Z and digits 0-9")
return clean
@@ -273,7 +265,7 @@ def format_recovery_key(key: str) -> str:
"ABCD-1234-EFGH-5678-IJKL-9012-MNOP-3456"
"""
clean = normalize_recovery_key(key)
return "-".join(clean[i:i + 4] for i in range(0, RECOVERY_KEY_LENGTH, 4))
return "-".join(clean[i : i + 4] for i in range(0, RECOVERY_KEY_LENGTH, 4))
def hash_recovery_key(key: str) -> str:

File diff suppressed because it is too large Load Diff

View File

@@ -109,7 +109,7 @@ EXT_TO_FORMAT = {
# - v4.0.0: 66 bytes (added flags byte for channel key)
HEADER_OVERHEAD = 66 # What the crypto layer adds to any message
LENGTH_PREFIX = 4 # We prepend the payload length for LSB extraction
LENGTH_PREFIX = 4 # We prepend the payload length for LSB extraction
ENCRYPTION_OVERHEAD = HEADER_OVERHEAD + LENGTH_PREFIX # Total: 70 bytes
# That 70 bytes is your minimum image capacity requirement.

View File

@@ -54,8 +54,7 @@ def read_image_exif(image_data: bytes) -> dict:
gps[gps_tag] = float(gps_value)
elif isinstance(gps_value, tuple):
gps[gps_tag] = [
float(v) if hasattr(v, "numerator") else v
for v in gps_value
float(v) if hasattr(v, "numerator") else v for v in gps_value
]
else:
gps[gps_tag] = gps_value
@@ -69,7 +68,9 @@ def read_image_exif(image_data: bytes) -> dict:
# Try to decode as ASCII/UTF-8 text
decoded = value.decode("utf-8", errors="strict").strip("\x00")
# Only keep if it looks like printable text
if decoded.isprintable() or all(c.isspace() or c.isprintable() for c in decoded):
if decoded.isprintable() or all(
c.isspace() or c.isprintable() for c in decoded
):
result[tag] = decoded
else:
result[tag] = f"<{len(value)} bytes binary>"

View File

@@ -13,6 +13,10 @@ import io
from PIL import Image
from .debug import get_logger
logger = get_logger(__name__)
from .constants import (
ALLOWED_AUDIO_EXTENSIONS,
ALLOWED_IMAGE_EXTENSIONS,