From 0248bec813cf41bd3cd5a1b584a150d19a57f1e1 Mon Sep 17 00:00:00 2001
From: adlee-was-taken
Date: Fri, 27 Feb 2026 20:26:07 -0500
Subject: [PATCH] Add audio steganography with LSB and spread spectrum modes

Implement two audio embedding modes following the same multi-factor
authentication pipeline as image steganography (passphrase + PIN +
optional RSA key + optional channel key):

- audio_lsb: High-capacity LSB embedding in PCM samples for lossless
  formats (WAV/FLAC). Uses ChaCha20-keyed sample index selection.
- audio_spread: Direct-sequence spread spectrum (DSSS) with
  ChaCha20-keyed bipolar chip codes, Reed-Solomon error correction, and
  3-copy majority-voted length headers. Designed to survive lossy
  compression.

New files:
- audio_steganography.py: LSB embed/extract on PCM samples
- spread_steganography.py: Spread spectrum embed/extract
- audio_utils.py: Format detection, transcoding, validation helpers
- tests/test_audio.py: 22 tests covering both modes end-to-end

Updated encode.py, decode.py, cli.py (audio-encode/audio-decode
commands), constants.py, models.py, exceptions.py, validation.py,
__init__.py, and pyproject.toml ([audio] extra).

Co-Authored-By: Claude Opus 4.6 --- pyproject.toml | 11 +- src/stegasoo/__init__.py | 60 ++- src/stegasoo/audio_steganography.py | 514 +++++++++++++++++++ src/stegasoo/audio_utils.py | 536 +++++++++++++++++++ src/stegasoo/cli.py | 213 ++++++++ src/stegasoo/constants.py | 33 ++ src/stegasoo/decode.py | 114 +++++ src/stegasoo/encode.py | 94 ++++ src/stegasoo/exceptions.py | 48 ++ src/stegasoo/models.py | 48 ++ src/stegasoo/spread_steganography.py | 735 +++++++++++++++++++++++++++ src/stegasoo/validation.py | 34 ++ tests/test_audio.py | 448 ++++++++++++++++ 13 files changed, 2885 insertions(+), 3 deletions(-) create mode 100644 src/stegasoo/audio_steganography.py create mode 100644 src/stegasoo/audio_utils.py create mode 100644 src/stegasoo/spread_steganography.py create mode 100644 tests/test_audio.py diff --git a/pyproject.toml b/pyproject.toml index 7f4d05c..52f46da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,13 @@ dct = [ "jpeglib>=1.0.0", "reedsolo>=1.7.0", ] +audio = [ + "pydub>=0.25.0", + "numpy>=2.0.0", + "scipy>=1.10.0", + "soundfile>=0.12.0", + "reedsolo>=1.7.0", +] cli = [ "click>=8.0.0", "qrcode>=7.30", @@ -86,7 +93,7 @@ api = [ "reedsolo>=1.7.0", ] all = [ - "stegasoo[cli,web,api,dct,compression]", + "stegasoo[cli,web,api,dct,audio,compression]", ] dev = [ "stegasoo[all]", @@ -141,6 +148,8 @@ ignore = ["E501"] [tool.ruff.lint.per-file-ignores] # YCbCr colorspace variables (R, G, B, Y, Cb, Cr) are standard names "src/stegasoo/dct_steganography.py" = ["N803", "N806"] +# MDCT transform variables (N, X) are standard mathematical names +"src/stegasoo/spread_steganography.py" = ["N803", "N806"] # Package __init__.py has imports after try/except and aliases - intentional structure "src/stegasoo/__init__.py" = ["E402"] diff --git a/src/stegasoo/__init__.py b/src/stegasoo/__init__.py index 829f755..bcb1a6c 100644 --- a/src/stegasoo/__init__.py +++ b/src/stegasoo/__init__.py @@ -24,8 +24,8 @@ from .channel import ( # Crypto functions from .crypto 
import get_active_channel_key, get_channel_fingerprint, has_argon2 -from .decode import decode, decode_file, decode_text -from .encode import encode +from .decode import decode, decode_audio, decode_file, decode_text +from .encode import encode, encode_audio # Credential generation from .generate import ( @@ -54,6 +54,23 @@ from .steganography import ( # Utilities from .utils import generate_filename +# Audio utilities - optional, may not be available (v4.3.0) +try: + from .audio_utils import ( + detect_audio_format, + get_audio_info, + has_ffmpeg_support, + validate_audio, + ) + + HAS_AUDIO_SUPPORT = True +except ImportError: + HAS_AUDIO_SUPPORT = False + detect_audio_format = None + get_audio_info = None + has_ffmpeg_support = None + validate_audio = None + # QR Code utilities - optional, may not be available try: from .qr_utils import ( @@ -88,6 +105,9 @@ validate_carrier = validate_image # Constants from .constants import ( DEFAULT_PASSPHRASE_WORDS, + EMBED_MODE_AUDIO_AUTO, + EMBED_MODE_AUDIO_LSB, + EMBED_MODE_AUDIO_SPREAD, EMBED_MODE_AUTO, EMBED_MODE_DCT, EMBED_MODE_LSB, @@ -106,6 +126,11 @@ from .constants import ( # Exceptions from .exceptions import ( + AudioCapacityError, + AudioError, + AudioExtractionError, + AudioTranscodeError, + AudioValidationError, CapacityError, CryptoError, DecryptionError, @@ -127,11 +152,15 @@ from .exceptions import ( SecurityFactorError, SteganographyError, StegasooError, + UnsupportedAudioFormatError, ValidationError, ) # Models from .models import ( + AudioCapacityInfo, + AudioEmbedStats, + AudioInfo, CapacityComparison, Credentials, DecodeResult, @@ -142,6 +171,8 @@ from .models import ( ValidationResult, ) from .validation import ( + validate_audio_embed_mode, + validate_audio_file, validate_dct_color_mode, validate_dct_output_format, validate_embed_mode, @@ -164,6 +195,16 @@ __all__ = [ "decode", "decode_file", "decode_text", + # Audio (v4.3.0) + "encode_audio", + "decode_audio", + "detect_audio_format", + 
"get_audio_info", + "has_ffmpeg_support", + "validate_audio", + "HAS_AUDIO_SUPPORT", + "validate_audio_embed_mode", + "validate_audio_file", # Generation "generate_pin", "generate_passphrase", @@ -221,6 +262,10 @@ __all__ = [ "FilePayload", "Credentials", "ValidationResult", + # Audio models + "AudioEmbedStats", + "AudioInfo", + "AudioCapacityInfo", # Exceptions "StegasooError", "ValidationError", @@ -244,6 +289,13 @@ __all__ = [ "ReedSolomonError", "NoDataFoundError", "ModeMismatchError", + # Audio exceptions + "AudioError", + "AudioValidationError", + "AudioCapacityError", + "AudioExtractionError", + "AudioTranscodeError", + "UnsupportedAudioFormatError", # Constants "FORMAT_VERSION", "MIN_PASSPHRASE_WORDS", @@ -266,4 +318,8 @@ __all__ = [ "EMBED_MODE_LSB", "EMBED_MODE_DCT", "EMBED_MODE_AUTO", + # Audio constants + "EMBED_MODE_AUDIO_LSB", + "EMBED_MODE_AUDIO_SPREAD", + "EMBED_MODE_AUDIO_AUTO", ] diff --git a/src/stegasoo/audio_steganography.py b/src/stegasoo/audio_steganography.py new file mode 100644 index 0000000..76edb60 --- /dev/null +++ b/src/stegasoo/audio_steganography.py @@ -0,0 +1,514 @@ +""" +Stegasoo Audio Steganography — LSB Embedding/Extraction (v4.3.0) + +LSB (Least Significant Bit) embedding for PCM audio samples. + +Hides data in the least significant bit(s) of audio samples, analogous to +how steganography.py hides data in pixel LSBs. The carrier audio must be +lossless (WAV or FLAC) — lossy codecs (MP3, OGG, AAC) destroy LSBs. + +Uses ChaCha20 as a CSPRNG for pseudo-random sample index selection, +ensuring that without the key an attacker cannot determine which samples +were modified. 
+ +Supports: +- 16-bit PCM (int16 samples) +- 24-bit PCM (int32 samples from soundfile) +- Float audio (converted to int16 before embedding) +- 1 or 2 bits per sample embedding depth +- Mono and multi-channel audio (flattened for embedding) +""" + +import io +import struct + +import numpy as np +import soundfile as sf +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms + +from .constants import ( + AUDIO_MAGIC_LSB, + EMBED_MODE_AUDIO_LSB, +) +from .debug import debug +from .exceptions import AudioCapacityError, AudioError +from .models import AudioEmbedStats +from .steganography import ENCRYPTION_OVERHEAD + +# Progress reporting interval — write every N samples +PROGRESS_INTERVAL = 5000 + + +# ============================================================================= +# PROGRESS REPORTING +# ============================================================================= + + +def _write_progress(progress_file: str | None, current: int, total: int, phase: str = "embedding"): + """Write progress to file for frontend polling.""" + if progress_file is None: + return + try: + import json + + with open(progress_file, "w") as f: + json.dump( + { + "current": current, + "total": total, + "percent": round((current / total) * 100, 1) if total > 0 else 0, + "phase": phase, + }, + f, + ) + except Exception: + pass # Don't let progress writing break encoding + + +# ============================================================================= +# CAPACITY +# ============================================================================= + + +def calculate_audio_lsb_capacity( + audio_data: bytes, + bits_per_sample: int = 1, +) -> int: + """ + Calculate the maximum bytes that can be embedded in a WAV/FLAC file via LSB. 
+ + Reads the carrier audio with soundfile, counts the total number of individual + sample values (num_frames * channels), and computes how many payload bytes + can be hidden at the given bit depth, minus the fixed encryption overhead. + + Args: + audio_data: Raw bytes of a WAV or FLAC file. + bits_per_sample: Number of LSBs to use per sample (1 or 2). + + Returns: + Maximum embeddable payload size in bytes (after subtracting overhead). + + Raises: + AudioError: If the audio cannot be read or is in an unsupported format. + """ + debug.validate( + bits_per_sample in (1, 2), f"bits_per_sample must be 1 or 2, got {bits_per_sample}" + ) + + try: + info = sf.info(io.BytesIO(audio_data)) + except Exception as e: + raise AudioError(f"Failed to read audio file: {e}") from e + + num_samples = info.frames * info.channels + total_bits = num_samples * bits_per_sample + max_bytes = total_bits // 8 + + capacity = max(0, max_bytes - ENCRYPTION_OVERHEAD) + debug.print( + f"Audio LSB capacity: {capacity} bytes " + f"({num_samples} samples, {bits_per_sample} bit(s)/sample, " + f"{info.samplerate} Hz, {info.channels} ch)" + ) + return capacity + + +# ============================================================================= +# SAMPLE INDEX GENERATION (ChaCha20 CSPRNG) +# ============================================================================= +# +# Identical strategy to generate_pixel_indices in steganography.py: +# - >= 50% capacity utilisation: full Fisher-Yates shuffle, take first N +# - < 50%: direct random sampling with collision handling +# +# The key MUST be 32 bytes (same derivation path as the pixel key). + + +@debug.time +def generate_sample_indices(key: bytes, num_samples: int, num_needed: int) -> list[int]: + """ + Generate pseudo-random sample indices using ChaCha20 as a CSPRNG. + + Produces a deterministic sequence of unique sample indices so that + the same key always yields the same embedding locations. + + Args: + key: 32-byte key for the ChaCha20 cipher. 
+ num_samples: Total number of samples in the carrier audio. + num_needed: How many unique sample indices are required. + + Returns: + List of ``num_needed`` unique indices in [0, num_samples). + + Raises: + AssertionError (via debug.validate): On invalid arguments. + """ + debug.validate(len(key) == 32, f"Sample key must be 32 bytes, got {len(key)}") + debug.validate(num_samples > 0, f"Number of samples must be positive, got {num_samples}") + debug.validate(num_needed > 0, f"Number needed must be positive, got {num_needed}") + debug.validate( + num_needed <= num_samples, + f"Cannot select {num_needed} samples from {num_samples} available", + ) + + debug.print(f"Generating {num_needed} sample indices from {num_samples} total samples") + + # Strategy 1: Full Fisher-Yates shuffle when we need many indices + if num_needed >= num_samples // 2: + debug.print(f"Using full shuffle (needed {num_needed}/{num_samples} samples)") + nonce = b"\x00" * 16 + cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend()) + encryptor = cipher.encryptor() + + indices = list(range(num_samples)) + random_bytes = encryptor.update(b"\x00" * (num_samples * 4)) + + for i in range(num_samples - 1, 0, -1): + j_bytes = random_bytes[(num_samples - 1 - i) * 4 : (num_samples - i) * 4] + j = int.from_bytes(j_bytes, "big") % (i + 1) + indices[i], indices[j] = indices[j], indices[i] + + selected = indices[:num_needed] + debug.print(f"Generated {len(selected)} indices via shuffle") + return selected + + # Strategy 2: Direct sampling for lower utilisation + debug.print(f"Using optimized selection (needed {num_needed}/{num_samples} samples)") + selected: list[int] = [] + used: set[int] = set() + + nonce = b"\x00" * 16 + cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend()) + encryptor = cipher.encryptor() + + # Pre-generate 2x bytes to handle expected collisions + bytes_needed = (num_needed * 2) * 4 + random_bytes = encryptor.update(b"\x00" * 
bytes_needed) + + byte_offset = 0 + collisions = 0 + while len(selected) < num_needed and byte_offset < len(random_bytes) - 4: + idx = int.from_bytes(random_bytes[byte_offset : byte_offset + 4], "big") % num_samples + byte_offset += 4 + + if idx not in used: + used.add(idx) + selected.append(idx) + else: + collisions += 1 + + # Edge case: ran out of pre-generated bytes (very high collision rate) + if len(selected) < num_needed: + debug.print(f"Need {num_needed - len(selected)} more indices, generating...") + extra_needed = num_needed - len(selected) + for _ in range(extra_needed * 2): + extra_bytes = encryptor.update(b"\x00" * 4) + idx = int.from_bytes(extra_bytes, "big") % num_samples + if idx not in used: + used.add(idx) + selected.append(idx) + if len(selected) == num_needed: + break + + debug.print(f"Generated {len(selected)} indices with {collisions} collisions") + debug.validate( + len(selected) == num_needed, + f"Failed to generate enough indices: {len(selected)}/{num_needed}", + ) + return selected + + +# ============================================================================= +# EMBEDDING +# ============================================================================= + + +@debug.time +def embed_in_audio_lsb( + data: bytes, + carrier_audio: bytes, + sample_key: bytes, + bits_per_sample: int = 1, + progress_file: str | None = None, +) -> tuple[bytes, AudioEmbedStats]: + """ + Embed data into PCM audio samples using LSB steganography. + + The payload is prepended with a 4-byte magic header (``AUDIO_MAGIC_LSB``) + and a 4-byte big-endian length prefix, then converted to a binary string. + Pseudo-random sample indices are generated from ``sample_key`` and the + corresponding sample LSBs are overwritten. + + The modified audio is written back as a 16-bit PCM WAV file. + + Args: + data: Encrypted payload bytes to embed. + carrier_audio: Raw bytes of the carrier WAV/FLAC file. + sample_key: 32-byte key for sample index generation. 
+ bits_per_sample: LSBs to use per sample (1 or 2). + progress_file: Optional path for progress JSON (frontend polling). + + Returns: + Tuple of (stego WAV bytes, AudioEmbedStats). + + Raises: + AudioCapacityError: If the payload is too large for the carrier. + AudioError: On any other embedding failure. + """ + debug.print(f"Audio LSB embedding {len(data)} bytes") + debug.data(sample_key, "Sample key for embedding") + debug.validate( + bits_per_sample in (1, 2), f"bits_per_sample must be 1 or 2, got {bits_per_sample}" + ) + debug.validate(len(sample_key) == 32, f"Sample key must be 32 bytes, got {len(sample_key)}") + + try: + # 1. Read carrier audio + samples, samplerate = sf.read(io.BytesIO(carrier_audio), dtype="int16", always_2d=True) + # samples shape: (num_frames, channels) + original_shape = samples.shape + channels = original_shape[1] + duration = original_shape[0] / samplerate + + debug.print( + f"Carrier audio: {samplerate} Hz, {channels} ch, " + f"{original_shape[0]} frames, {duration:.2f}s" + ) + + # Flatten to 1D for embedding + flat_samples = samples.flatten().copy() + num_samples = len(flat_samples) + + # 2. Prepend magic + length prefix + header = AUDIO_MAGIC_LSB + struct.pack(">I", len(data)) + payload = header + data + debug.print(f"Payload with header: {len(payload)} bytes (magic 4 + len 4 + data {len(data)})") + + # 3. Check capacity + max_bytes = (num_samples * bits_per_sample) // 8 + if len(payload) > max_bytes: + debug.print(f"Capacity error: need {len(payload)}, have {max_bytes}") + raise AudioCapacityError(len(payload), max_bytes) + + debug.print( + f"Capacity usage: {len(payload)}/{max_bytes} bytes " + f"({len(payload) / max_bytes * 100:.1f}%)" + ) + + # 4. Convert payload to binary string + binary_data = "".join(format(b, "08b") for b in payload) + samples_needed = (len(binary_data) + bits_per_sample - 1) // bits_per_sample + + debug.print(f"Need {samples_needed} samples to embed {len(binary_data)} bits") + + # 5. 
Generate pseudo-random sample indices + selected_indices = generate_sample_indices(sample_key, num_samples, samples_needed) + + # 6. Modify LSBs of selected samples + lsb_mask = (1 << bits_per_sample) - 1 + bit_idx = 0 + modified_count = 0 + total_to_process = len(selected_indices) + + # Initial progress + if progress_file: + _write_progress(progress_file, 5, 100, "embedding") + + for progress_idx, sample_idx in enumerate(selected_indices): + if bit_idx >= len(binary_data): + break + + bits = binary_data[bit_idx : bit_idx + bits_per_sample].ljust(bits_per_sample, "0") + bit_val = int(bits, 2) + + sample_val = flat_samples[sample_idx] + # Work in unsigned 16-bit space to avoid overflow + unsigned_val = int(sample_val) & 0xFFFF + new_unsigned = (unsigned_val & ~lsb_mask) | bit_val + # Convert back to signed int16 + new_val = np.int16(new_unsigned if new_unsigned < 32768 else new_unsigned - 65536) + + if sample_val != new_val: + flat_samples[sample_idx] = new_val + modified_count += 1 + + bit_idx += bits_per_sample + + # Report progress periodically + if progress_file and progress_idx % PROGRESS_INTERVAL == 0: + _write_progress(progress_file, progress_idx, total_to_process, "embedding") + + # Final progress before save + if progress_file: + _write_progress(progress_file, total_to_process, total_to_process, "saving") + + debug.print(f"Modified {modified_count} samples (out of {samples_needed} selected)") + + # 7. 
Reshape and write back as WAV + stego_samples = flat_samples.reshape(original_shape) + + output_buf = io.BytesIO() + sf.write(output_buf, stego_samples, samplerate, format="WAV", subtype="PCM_16") + output_buf.seek(0) + stego_bytes = output_buf.getvalue() + + stats = AudioEmbedStats( + samples_modified=modified_count, + total_samples=num_samples, + capacity_used=len(payload) / max_bytes, + bytes_embedded=len(payload), + sample_rate=samplerate, + channels=channels, + duration_seconds=duration, + embed_mode=EMBED_MODE_AUDIO_LSB, + ) + + debug.print(f"Audio LSB embedding complete: {len(stego_bytes)} byte WAV") + return stego_bytes, stats + + except AudioCapacityError: + raise + except Exception as e: + debug.exception(e, "embed_in_audio_lsb") + raise AudioError(f"Failed to embed data in audio: {e}") from e + + +# ============================================================================= +# EXTRACTION +# ============================================================================= + + +@debug.time +def extract_from_audio_lsb( + audio_data: bytes, + sample_key: bytes, + bits_per_sample: int = 1, + progress_file: str | None = None, +) -> bytes | None: + """ + Extract hidden data from audio using LSB steganography. + + Reads the stego audio, generates the same pseudo-random sample indices + from ``sample_key``, extracts the LSBs, and reconstructs the payload. + Verifies the ``AUDIO_MAGIC_LSB`` header before returning. + + Args: + audio_data: Raw bytes of the stego WAV file. + sample_key: 32-byte key (must match the one used for embedding). + bits_per_sample: LSBs per sample (must match embedding). + progress_file: Optional path for progress JSON. + + Returns: + Extracted payload bytes (without magic/length prefix), or ``None`` + if extraction fails (wrong key, no data, corrupted). 
+ """ + debug.print(f"Audio LSB extracting from {len(audio_data)} byte audio") + debug.data(sample_key, "Sample key for extraction") + debug.validate( + bits_per_sample in (1, 2), f"bits_per_sample must be 1 or 2, got {bits_per_sample}" + ) + + try: + # 1. Read audio + samples, samplerate = sf.read(io.BytesIO(audio_data), dtype="int16", always_2d=True) + flat_samples = samples.flatten() + num_samples = len(flat_samples) + + debug.print(f"Audio: {samplerate} Hz, {samples.shape[1]} ch, {num_samples} total samples") + + # 2. Extract initial samples to find magic bytes + length (8 bytes = 64 bits) + header_bits_needed = 64 # 4 bytes magic + 4 bytes length + header_samples_needed = (header_bits_needed + bits_per_sample - 1) // bits_per_sample + 10 + + if header_samples_needed > num_samples: + debug.print("Audio too small to contain header") + return None + + initial_indices = generate_sample_indices(sample_key, num_samples, header_samples_needed) + + binary_data = "" + for sample_idx in initial_indices: + val = int(flat_samples[sample_idx]) & 0xFFFF + for bit_pos in range(bits_per_sample - 1, -1, -1): + binary_data += str((val >> bit_pos) & 1) + + # 3. Verify magic bytes + if len(binary_data) < 64: + debug.print(f"Not enough bits for header: {len(binary_data)}/64") + return None + + magic_bits = binary_data[:32] + magic_bytes = int(magic_bits, 2).to_bytes(4, "big") + + if magic_bytes != AUDIO_MAGIC_LSB: + debug.print(f"Magic mismatch: got {magic_bytes!r}, expected {AUDIO_MAGIC_LSB!r}") + return None + + debug.print("Magic bytes verified: AUDL") + + # 4. 
Parse length + length_bits = binary_data[32:64] + data_length = struct.unpack(">I", int(length_bits, 2).to_bytes(4, "big"))[0] + debug.print(f"Extracted length: {data_length} bytes") + + # Sanity check length + max_possible = (num_samples * bits_per_sample) // 8 - 8 # minus header + if data_length > max_possible or data_length < 1: + debug.print(f"Invalid data length: {data_length} (max possible: {max_possible})") + return None + + # 5. Extract full payload + total_bits = (8 + data_length) * 8 # header (8 bytes) + payload + total_samples_needed = (total_bits + bits_per_sample - 1) // bits_per_sample + + if total_samples_needed > num_samples: + debug.print( + f"Need {total_samples_needed} samples but only {num_samples} available" + ) + return None + + debug.print(f"Need {total_samples_needed} samples to extract {data_length} bytes") + + selected_indices = generate_sample_indices(sample_key, num_samples, total_samples_needed) + + # Initial progress + if progress_file: + _write_progress(progress_file, 5, 100, "extracting") + + binary_data = "" + for progress_idx, sample_idx in enumerate(selected_indices): + val = int(flat_samples[sample_idx]) & 0xFFFF + for bit_pos in range(bits_per_sample - 1, -1, -1): + binary_data += str((val >> bit_pos) & 1) + + if progress_file and progress_idx % PROGRESS_INTERVAL == 0: + _write_progress( + progress_file, progress_idx, total_samples_needed, "extracting" + ) + + if progress_file: + _write_progress( + progress_file, total_samples_needed, total_samples_needed, "extracting" + ) + + # Skip the 8-byte header (magic + length) = 64 bits + data_bits = binary_data[64 : 64 + (data_length * 8)] + + if len(data_bits) < data_length * 8: + debug.print(f"Insufficient bits: {len(data_bits)} < {data_length * 8}") + return None + + # Convert bits back to bytes + data_bytes = bytearray() + for i in range(0, len(data_bits), 8): + byte_bits = data_bits[i : i + 8] + if len(byte_bits) == 8: + data_bytes.append(int(byte_bits, 2)) + + debug.print(f"Audio 
LSB successfully extracted {len(data_bytes)} bytes") + return bytes(data_bytes) + + except Exception as e: + debug.exception(e, "extract_from_audio_lsb") + return None diff --git a/src/stegasoo/audio_utils.py b/src/stegasoo/audio_utils.py new file mode 100644 index 0000000..598a7a0 --- /dev/null +++ b/src/stegasoo/audio_utils.py @@ -0,0 +1,536 @@ +""" +Stegasoo Audio Utilities (v4.3.0) + +Audio format detection, transcoding, and metadata extraction for audio steganography. + +Dependencies: +- soundfile (sf): Fast WAV/FLAC reading without ffmpeg +- pydub: MP3/OGG/AAC transcoding (wraps ffmpeg) + +Both are optional — functions degrade gracefully when unavailable. +""" + +from __future__ import annotations + +import io +import logging +import shutil + +from .constants import ( + EMBED_MODE_AUDIO_AUTO, + MAX_AUDIO_DURATION, + MAX_AUDIO_FILE_SIZE, + MAX_AUDIO_SAMPLE_RATE, + MIN_AUDIO_SAMPLE_RATE, + VALID_AUDIO_EMBED_MODES, +) +from .exceptions import AudioTranscodeError, AudioValidationError, UnsupportedAudioFormatError +from .models import AudioInfo, ValidationResult + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# FFMPEG AVAILABILITY +# ============================================================================= + + +def has_ffmpeg_support() -> bool: + """Check if ffmpeg is available on the system. + + Returns: + True if ffmpeg is found on PATH, False otherwise. + """ + return shutil.which("ffmpeg") is not None + + +# ============================================================================= +# FORMAT DETECTION +# ============================================================================= + + +def detect_audio_format(audio_data: bytes) -> str: + """Detect audio format from magic bytes. + + Examines the first bytes of audio data to identify the container format. 
+ + Magic byte signatures: + - WAV: b"RIFF" at offset 0 + b"WAVE" at offset 8 + - FLAC: b"fLaC" at offset 0 + - MP3: b"\\xff\\xfb", b"\\xff\\xf3", b"\\xff\\xf2" (sync bytes) or b"ID3" (ID3 tag) + - OGG (Vorbis/Opus): b"OggS" at offset 0 + - AAC: b"\\xff\\xf1" or b"\\xff\\xf9" (ADTS header) + - M4A/MP4: b"ftyp" at offset 4 + + Args: + audio_data: Raw audio file bytes. + + Returns: + Format string: "wav", "flac", "mp3", "ogg", "aac", "m4a", or "unknown". + """ + if len(audio_data) < 12: + return "unknown" + + # WAV: RIFF....WAVE + if audio_data[:4] == b"RIFF" and audio_data[8:12] == b"WAVE": + return "wav" + + # FLAC + if audio_data[:4] == b"fLaC": + return "flac" + + # OGG (Vorbis or Opus) + if audio_data[:4] == b"OggS": + return "ogg" + + # MP3 with ID3 tag + if audio_data[:3] == b"ID3": + return "mp3" + + # MP3 sync bytes (MPEG audio frame header) + if len(audio_data) >= 2 and audio_data[:2] in (b"\xff\xfb", b"\xff\xf3", b"\xff\xf2"): + return "mp3" + + # M4A/MP4 container: "ftyp" at offset 4 + if audio_data[4:8] == b"ftyp": + return "m4a" + + # AAC ADTS header + if len(audio_data) >= 2 and audio_data[:2] in (b"\xff\xf1", b"\xff\xf9"): + return "aac" + + return "unknown" + + +# ============================================================================= +# TRANSCODING +# ============================================================================= + + +def transcode_to_wav(audio_data: bytes) -> bytes: + """Transcode any supported audio format to WAV PCM format. + + Uses soundfile directly for WAV/FLAC (no ffmpeg needed). + Uses pydub (wraps ffmpeg) for lossy formats (MP3, OGG, AAC, M4A). + + Args: + audio_data: Raw audio file bytes in any supported format. + + Returns: + WAV PCM file bytes (16-bit, original sample rate). + + Raises: + AudioTranscodeError: If transcoding fails. + UnsupportedAudioFormatError: If the format cannot be detected. 
+ """ + fmt = detect_audio_format(audio_data) + + if fmt == "unknown": + raise UnsupportedAudioFormatError( + "Cannot detect audio format. Supported: WAV, FLAC, MP3, OGG, AAC, M4A." + ) + + # WAV files: validate with soundfile but return as-is if already PCM + if fmt == "wav": + try: + import soundfile as sf + + buf = io.BytesIO(audio_data) + info = sf.info(buf) + if info.subtype in ("PCM_16", "PCM_24", "PCM_32", "FLOAT", "DOUBLE"): + # Re-encode to ensure consistent PCM_16 output + buf.seek(0) + data, samplerate = sf.read(buf, dtype="int16") + out = io.BytesIO() + sf.write(out, data, samplerate, format="WAV", subtype="PCM_16") + return out.getvalue() + except ImportError: + raise AudioTranscodeError("soundfile package is required for WAV processing") + except Exception as e: + raise AudioTranscodeError(f"Failed to process WAV: {e}") + + # FLAC: use soundfile (fast, no ffmpeg) + if fmt == "flac": + try: + import soundfile as sf + + buf = io.BytesIO(audio_data) + data, samplerate = sf.read(buf, dtype="int16") + out = io.BytesIO() + sf.write(out, data, samplerate, format="WAV", subtype="PCM_16") + return out.getvalue() + except ImportError: + raise AudioTranscodeError("soundfile package is required for FLAC processing") + except Exception as e: + raise AudioTranscodeError(f"Failed to transcode FLAC to WAV: {e}") + + # Lossy formats (MP3, OGG, AAC, M4A): use pydub + ffmpeg + return _transcode_with_pydub(audio_data, fmt, "wav") + + +def transcode_to_mp3(audio_data: bytes, bitrate: str = "256k") -> bytes: + """Transcode audio to MP3 format. + + Uses pydub (wraps ffmpeg) for transcoding. + + Args: + audio_data: Raw audio file bytes in any supported format. + bitrate: Target MP3 bitrate (e.g., "128k", "192k", "256k", "320k"). + + Returns: + MP3 file bytes. + + Raises: + AudioTranscodeError: If transcoding fails or pydub/ffmpeg unavailable. 
+ """ + fmt = detect_audio_format(audio_data) + + if fmt == "unknown": + raise UnsupportedAudioFormatError( + "Cannot detect audio format. Supported: WAV, FLAC, MP3, OGG, AAC, M4A." + ) + + try: + from pydub import AudioSegment + except ImportError: + raise AudioTranscodeError( + "pydub package is required for MP3 transcoding. Install with: pip install pydub" + ) + + if not has_ffmpeg_support(): + raise AudioTranscodeError( + "ffmpeg is required for MP3 transcoding. Install ffmpeg on your system." + ) + + try: + # Map our format names to pydub format names + pydub_fmt = _pydub_format(fmt) + buf = io.BytesIO(audio_data) + audio = AudioSegment.from_file(buf, format=pydub_fmt) + + out = io.BytesIO() + audio.export(out, format="mp3", bitrate=bitrate) + return out.getvalue() + except Exception as e: + raise AudioTranscodeError(f"Failed to transcode to MP3: {e}") + + +def _transcode_with_pydub(audio_data: bytes, src_fmt: str, dst_fmt: str) -> bytes: + """Transcode audio using pydub (requires ffmpeg). + + Args: + audio_data: Raw audio bytes. + src_fmt: Source format string (our naming). + dst_fmt: Destination format string ("wav" or "mp3"). + + Returns: + Transcoded audio bytes. + + Raises: + AudioTranscodeError: If transcoding fails. + """ + try: + from pydub import AudioSegment + except ImportError: + raise AudioTranscodeError( + "pydub package is required for audio transcoding. Install with: pip install pydub" + ) + + if not has_ffmpeg_support(): + raise AudioTranscodeError( + "ffmpeg is required for audio transcoding. Install ffmpeg on your system." 
+ ) + + try: + pydub_fmt = _pydub_format(src_fmt) + buf = io.BytesIO(audio_data) + audio = AudioSegment.from_file(buf, format=pydub_fmt) + + out = io.BytesIO() + if dst_fmt == "wav": + audio.export(out, format="wav") + else: + audio.export(out, format=dst_fmt) + return out.getvalue() + except Exception as e: + raise AudioTranscodeError(f"Failed to transcode {src_fmt} to {dst_fmt}: {e}") + + +def _pydub_format(fmt: str) -> str: + """Map our format names to pydub/ffmpeg format names. + + Args: + fmt: Our internal format name. + + Returns: + pydub-compatible format string. + """ + mapping = { + "wav": "wav", + "flac": "flac", + "mp3": "mp3", + "ogg": "ogg", + "aac": "aac", + "m4a": "m4a", + } + return mapping.get(fmt, fmt) + + +# ============================================================================= +# METADATA EXTRACTION +# ============================================================================= + + +def get_audio_info(audio_data: bytes) -> AudioInfo: + """Extract audio metadata from raw audio bytes. + + Uses soundfile for WAV/FLAC (fast, no ffmpeg dependency). + Falls back to pydub for other formats (requires ffmpeg). + + Args: + audio_data: Raw audio file bytes. + + Returns: + AudioInfo dataclass with sample rate, channels, duration, etc. + + Raises: + UnsupportedAudioFormatError: If the format cannot be detected. + AudioTranscodeError: If metadata extraction fails. + """ + fmt = detect_audio_format(audio_data) + + if fmt == "unknown": + raise UnsupportedAudioFormatError( + "Cannot detect audio format. Supported: WAV, FLAC, MP3, OGG, AAC, M4A." + ) + + # WAV and FLAC: use soundfile (fast) + if fmt in ("wav", "flac"): + return _get_info_soundfile(audio_data, fmt) + + # Lossy formats: use pydub + return _get_info_pydub(audio_data, fmt) + + +def _get_info_soundfile(audio_data: bytes, fmt: str) -> AudioInfo: + """Extract audio info using soundfile (WAV/FLAC). + + Args: + audio_data: Raw audio bytes. + fmt: Format string ("wav" or "flac"). 
+ + Returns: + AudioInfo with metadata. + """ + try: + import soundfile as sf + except ImportError: + raise AudioTranscodeError("soundfile package is required. Install with: pip install soundfile") + + try: + buf = io.BytesIO(audio_data) + info = sf.info(buf) + + # Determine bit depth from subtype + bit_depth = _bit_depth_from_subtype(info.subtype) + + return AudioInfo( + sample_rate=info.samplerate, + channels=info.channels, + duration_seconds=info.duration, + num_samples=info.frames, + format=fmt, + bitrate=None, + bit_depth=bit_depth, + ) + except Exception as e: + raise AudioTranscodeError(f"Failed to read {fmt.upper()} metadata: {e}") + + +def _bit_depth_from_subtype(subtype: str) -> int | None: + """Determine bit depth from soundfile subtype string. + + Args: + subtype: Soundfile subtype (e.g., "PCM_16", "PCM_24", "FLOAT"). + + Returns: + Bit depth as integer, or None if unknown. + """ + subtype_map = { + "PCM_S8": 8, + "PCM_U8": 8, + "PCM_16": 16, + "PCM_24": 24, + "PCM_32": 32, + "FLOAT": 32, + "DOUBLE": 64, + } + return subtype_map.get(subtype) + + +def _get_info_pydub(audio_data: bytes, fmt: str) -> AudioInfo: + """Extract audio info using pydub (lossy formats). + + Args: + audio_data: Raw audio bytes. + fmt: Format string ("mp3", "ogg", "aac", "m4a"). + + Returns: + AudioInfo with metadata. + """ + try: + from pydub import AudioSegment + except ImportError: + raise AudioTranscodeError( + "pydub package is required for audio metadata. Install with: pip install pydub" + ) + + if not has_ffmpeg_support(): + raise AudioTranscodeError( + "ffmpeg is required for audio metadata extraction. Install ffmpeg on your system." 
def validate_audio(
    audio_data: bytes,
    name: str = "Audio",
    check_duration: bool = True,
) -> ValidationResult:
    """Check that raw audio bytes are acceptable as steganography input.

    The checks run in this order and the first failure is returned:
        1. data is non-empty
        2. size does not exceed MAX_AUDIO_FILE_SIZE
        3. the format is detectable
        4. metadata can be read
        5. duration does not exceed MAX_AUDIO_DURATION (only if check_duration)
        6. sample rate lies in MIN_AUDIO_SAMPLE_RATE..MAX_AUDIO_SAMPLE_RATE

    Args:
        audio_data: Raw audio file bytes.
        name: Label used in error messages (default: "Audio").
        check_duration: Whether to enforce the duration limit (default: True).

    Returns:
        A failed ValidationResult carrying the error message, or a successful
        one whose details hold sample_rate, channels, duration, num_samples,
        format, bitrate and bit_depth.
    """
    if not audio_data:
        return ValidationResult.error(f"{name} is required")

    byte_count = len(audio_data)
    if byte_count > MAX_AUDIO_FILE_SIZE:
        bytes_per_mb = 1024 * 1024
        size_mb = byte_count / bytes_per_mb
        max_mb = MAX_AUDIO_FILE_SIZE / bytes_per_mb
        return ValidationResult.error(
            f"{name} too large ({size_mb:.1f} MB). Maximum: {max_mb:.0f} MB"
        )

    if detect_audio_format(audio_data) == "unknown":
        return ValidationResult.error(
            f"Could not detect {name} format. "
            "Supported formats: WAV, FLAC, MP3, OGG, AAC, M4A."
        )

    # Any failure while reading metadata is reported the same way, whether it
    # is one of our own audio errors or something unexpected from a backend.
    try:
        info = get_audio_info(audio_data)
    except Exception as e:
        return ValidationResult.error(f"Could not read {name}: {e}")

    too_long = check_duration and info.duration_seconds > MAX_AUDIO_DURATION
    if too_long:
        return ValidationResult.error(
            f"{name} too long ({info.duration_seconds:.1f}s). "
            f"Maximum: {MAX_AUDIO_DURATION}s ({MAX_AUDIO_DURATION // 60} minutes)"
        )

    if info.sample_rate < MIN_AUDIO_SAMPLE_RATE:
        return ValidationResult.error(
            f"{name} sample rate too low ({info.sample_rate} Hz). "
            f"Minimum: {MIN_AUDIO_SAMPLE_RATE} Hz"
        )

    if info.sample_rate > MAX_AUDIO_SAMPLE_RATE:
        return ValidationResult.error(
            f"{name} sample rate too high ({info.sample_rate} Hz). "
            f"Maximum: {MAX_AUDIO_SAMPLE_RATE} Hz"
        )

    details = {
        "sample_rate": info.sample_rate,
        "channels": info.channels,
        "duration": info.duration_seconds,
        "num_samples": info.num_samples,
        "format": info.format,
        "bitrate": info.bitrate,
        "bit_depth": info.bit_depth,
    }
    return ValidationResult.ok(**details)
+ + Args: + mode: Embedding mode to validate (e.g., "audio_lsb", "audio_mdct", "audio_auto"). + + Returns: + ValidationResult with mode in details on success. + """ + valid_modes = VALID_AUDIO_EMBED_MODES | {EMBED_MODE_AUDIO_AUTO} + if mode not in valid_modes: + return ValidationResult.error( + f"Invalid audio embed_mode: '{mode}'. " + f"Valid options: {', '.join(sorted(valid_modes))}" + ) + return ValidationResult.ok(mode=mode) diff --git a/src/stegasoo/cli.py b/src/stegasoo/cli.py index 8587654..e1e1b30 100644 --- a/src/stegasoo/cli.py +++ b/src/stegasoo/cli.py @@ -404,6 +404,219 @@ def decode(ctx, image, reference, passphrase, pin, output): raise SystemExit(1) +# ============================================================================= +# AUDIO COMMANDS (v4.3.0) +# ============================================================================= + + +@cli.command("audio-encode") +@click.argument("carrier", type=click.Path(exists=True)) +@click.option( + "-r", + "--reference", + required=True, + type=click.Path(exists=True), + help="Reference photo (shared secret)", +) +@click.option("-m", "--message", help="Message to encode") +@click.option( + "-f", + "--file", + "file_payload", + type=click.Path(exists=True), + help="File to embed instead of message", +) +@click.option("-o", "--output", type=click.Path(), help="Output audio path") +@click.option( + "--mode", + "embed_mode", + default="audio_lsb", + type=click.Choice(["audio_lsb", "audio_spread"]), + help="Embedding mode", +) +@click.option( + "--passphrase", + prompt=True, + hide_input=True, + confirmation_prompt=True, + help="Passphrase (recommend 4+ words)", +) +@click.option("--pin", prompt=True, hide_input=True, confirmation_prompt=True, help="PIN code") +@click.pass_context +def audio_encode(ctx, carrier, reference, message, file_payload, output, embed_mode, passphrase, pin): + """ + Encode a message or file into an audio carrier. 
@cli.command("audio-decode")
@click.argument("audio", type=click.Path(exists=True))
@click.option(
    "-r",
    "--reference",
    required=True,
    type=click.Path(exists=True),
    help="Reference photo (shared secret)",
)
@click.option(
    "--mode",
    "embed_mode",
    default="audio_auto",
    type=click.Choice(["audio_auto", "audio_lsb", "audio_spread"]),
    help="Embedding mode (auto-detect by default)",
)
@click.option("--passphrase", prompt=True, hide_input=True, help="Passphrase")
@click.option("--pin", prompt=True, hide_input=True, help="PIN code")
@click.option("-o", "--output", type=click.Path(), help="Output path for file payloads")
@click.pass_context
def audio_decode(ctx, audio, reference, embed_mode, passphrase, pin, output):
    """
    Decode a message or file from stego audio.

    Examples:

        stegasoo audio-decode stego.wav -r ref.jpg

        stegasoo audio-decode stego.wav -r ref.jpg --mode audio_lsb -o ./extracted/
    """
    # NOTE: the docstring above is the click help text, so implementation notes
    # live in comments.  Text payloads are echoed; file payloads are written
    # into --output (treated as a directory) or the current directory.  Any
    # failure is reported (as JSON when the global json flag is set) and the
    # process exits with status 1.
    from .decode import decode_audio

    with open(audio, "rb") as f:
        audio_data = f.read()
    with open(reference, "rb") as f:
        reference_data = f.read()

    try:
        result = decode_audio(
            stego_audio=audio_data,
            reference_photo=reference_data,
            passphrase=passphrase,
            pin=pin,
            embed_mode=embed_mode,
        )

        if result.is_file:
            # The filename travels inside the decoded payload and is therefore
            # chosen by whoever produced the stego audio.  Keep only its final
            # path component so "../x" or "/abs/x" cannot escape the output
            # directory; backslashes are normalised so Windows-style paths are
            # stripped on POSIX hosts too.
            raw_name = (result.filename or "").replace("\\", "/")
            filename = Path(raw_name).name
            if filename in ("", ".", ".."):
                filename = "decoded_file"

            output_path = Path(output) / filename if output else Path(filename)
            output_path.parent.mkdir(parents=True, exist_ok=True)

            with open(output_path, "wb") as f:
                f.write(result.file_data)

            if ctx.obj.get("json"):
                click.echo(
                    json.dumps(
                        {
                            "status": "success",
                            "audio": audio,
                            "payload_type": "file",
                            "filename": filename,
                            "output": str(output_path),
                            "size": len(result.file_data),
                        },
                        indent=2,
                    )
                )
            else:
                click.echo(f"✓ Extracted file: {output_path}")
                click.echo(f"  Size: {len(result.file_data):,} bytes")
        else:
            if ctx.obj.get("json"):
                click.echo(
                    json.dumps(
                        {
                            "status": "success",
                            "audio": audio,
                            "payload_type": "text",
                            "message": result.message,
                        },
                        indent=2,
                    )
                )
            else:
                click.echo(f"Decoded from {audio}:")
                click.echo(result.message)

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        if ctx.obj.get("json"):
            click.echo(json.dumps({"status": "error", "error": str(e)}, indent=2))
        else:
            click.echo(f"✗ Audio decoding failed: {e}", err=True)
        raise SystemExit(1)
def decode_audio(
    stego_audio: bytes,
    reference_photo: bytes,
    passphrase: str,
    pin: str = "",
    rsa_key_data: bytes | None = None,
    rsa_password: str | None = None,
    embed_mode: str = "audio_auto",
    channel_key: str | bool | None = None,
    progress_file: str | None = None,
) -> DecodeResult:
    """
    Decode a message or file from stego audio.

    Args:
        stego_audio: Stego audio bytes
        reference_photo: Shared reference photo bytes
        passphrase: Shared passphrase
        pin: Optional static PIN
        rsa_key_data: Optional RSA key bytes
        rsa_password: Optional RSA key password
        embed_mode: 'audio_auto', 'audio_lsb', or 'audio_spread'
        channel_key: Channel key for deployment/group isolation
        progress_file: Optional path to write progress JSON

    Returns:
        DecodeResult with message or file data

    Raises:
        ValueError: If embed_mode is not one of the three supported values.
        ExtractionError: If no data could be extracted from the audio.
    """
    from .audio_utils import detect_audio_format, transcode_to_wav
    from .constants import (
        EMBED_MODE_AUDIO_AUTO,
        EMBED_MODE_AUDIO_LSB,
        EMBED_MODE_AUDIO_SPREAD,
    )

    debug.print(
        f"decode_audio: mode={embed_mode}, "
        f"passphrase length={len(passphrase.split())} words"
    )

    # Validate inputs
    require_valid_image(reference_photo, "Reference photo")
    require_security_factors(pin, rsa_key_data)

    if pin:
        require_valid_pin(pin)
    if rsa_key_data:
        require_valid_rsa_key(rsa_key_data, rsa_password)

    # Reject an unknown mode before transcoding or deriving keys, so a typo in
    # the mode costs nothing and always surfaces as ValueError.
    if embed_mode not in (
        EMBED_MODE_AUDIO_AUTO,
        EMBED_MODE_AUDIO_LSB,
        EMBED_MODE_AUDIO_SPREAD,
    ):
        raise ValueError(f"Invalid audio embed mode: {embed_mode}")

    # Detect format and transcode to WAV for processing
    audio_format = detect_audio_format(stego_audio)
    debug.print(f"Detected audio format: {audio_format}")

    wav_audio = stego_audio
    if audio_format != "wav":
        debug.print(f"Transcoding {audio_format} to WAV for extraction")
        wav_audio = transcode_to_wav(stego_audio)

    _write_progress(progress_file, 20, 100, "initializing")

    # Derive sample selection key
    from .crypto import derive_pixel_key

    pixel_key = derive_pixel_key(reference_photo, passphrase, pin, rsa_key_data, channel_key)

    _write_progress(progress_file, 25, 100, "extracting")

    encrypted = None

    if embed_mode == EMBED_MODE_AUDIO_AUTO:
        # Try modes in order: spread spectrum -> LSB
        try:
            from .spread_steganography import extract_from_audio_spread

            encrypted = extract_from_audio_spread(wav_audio, pixel_key)
            if encrypted:
                debug.print("Auto-detect: spread spectrum extraction succeeded")
        except Exception as e:
            # Best effort: a missing optional dependency or a failed spread
            # extraction must not stop the LSB attempt, but record why it
            # failed so a wrong-mode guess is distinguishable from a real bug.
            debug.print(f"Auto-detect: spread spectrum extraction failed: {e}")

        if not encrypted:
            from .audio_steganography import extract_from_audio_lsb

            encrypted = extract_from_audio_lsb(wav_audio, pixel_key)
            if encrypted:
                debug.print("Auto-detect: LSB extraction succeeded")

    elif embed_mode == EMBED_MODE_AUDIO_LSB:
        from .audio_steganography import extract_from_audio_lsb

        encrypted = extract_from_audio_lsb(wav_audio, pixel_key, progress_file=progress_file)

    else:
        # Only EMBED_MODE_AUDIO_SPREAD can reach here after the check above.
        from .spread_steganography import extract_from_audio_spread

        encrypted = extract_from_audio_spread(
            wav_audio, pixel_key, progress_file=progress_file
        )

    if not encrypted:
        debug.print("No data extracted from audio")
        raise ExtractionError("Could not extract data from audio. Check your credentials.")

    debug.print(f"Extracted {len(encrypted)} bytes from audio")

    # Decrypt
    result = decrypt_message(encrypted, reference_photo, passphrase, pin, rsa_key_data, channel_key)

    debug.print(f"Decryption successful: {result.payload_type}")
    return result
def encode_audio(
    message: str | bytes | FilePayload,
    reference_photo: bytes,
    carrier_audio: bytes,
    passphrase: str,
    pin: str = "",
    rsa_key_data: bytes | None = None,
    rsa_password: str | None = None,
    embed_mode: str = "audio_lsb",
    channel_key: str | bool | None = None,
    progress_file: str | None = None,
) -> tuple[bytes, AudioEmbedStats]:
    """
    Encode a message or file into an audio carrier.

    Args:
        message: Text message, raw bytes, or FilePayload to hide
        reference_photo: Shared reference photo bytes
        carrier_audio: Carrier audio bytes (WAV, FLAC, MP3, etc.)
        passphrase: Shared passphrase
        pin: Optional static PIN
        rsa_key_data: Optional RSA private key PEM bytes
        rsa_password: Optional password for encrypted RSA key
        embed_mode: 'audio_lsb' or 'audio_spread'
        channel_key: Channel key for deployment/group isolation
        progress_file: Optional path to write progress JSON

    Returns:
        Tuple of (stego audio bytes, AudioEmbedStats)

    Raises:
        ValueError: If embed_mode is neither 'audio_lsb' nor 'audio_spread'.
    """
    from .audio_utils import detect_audio_format, transcode_to_wav
    from .constants import EMBED_MODE_AUDIO_LSB, EMBED_MODE_AUDIO_SPREAD

    debug.print(
        f"encode_audio: mode={embed_mode}, "
        f"passphrase length={len(passphrase.split())} words, "
        f"pin={'set' if pin else 'none'}"
    )

    # Validate inputs
    require_valid_payload(message)
    require_valid_image(reference_photo, "Reference photo")
    require_security_factors(pin, rsa_key_data)

    if pin:
        require_valid_pin(pin)
    if rsa_key_data:
        require_valid_rsa_key(rsa_key_data, rsa_password)

    # Reject an unknown mode before transcoding, encrypting or deriving keys,
    # so a mistyped mode fails immediately instead of after all that work.
    if embed_mode not in (EMBED_MODE_AUDIO_LSB, EMBED_MODE_AUDIO_SPREAD):
        raise ValueError(f"Invalid audio embed mode: {embed_mode}")

    # Detect audio format and transcode to WAV if needed
    audio_format = detect_audio_format(carrier_audio)
    debug.print(f"Detected audio format: {audio_format}")

    if audio_format not in ("wav", "flac"):
        debug.print(f"Transcoding {audio_format} to WAV for embedding")
        carrier_audio = transcode_to_wav(carrier_audio)

    # Encrypt message
    encrypted = encrypt_message(
        message, reference_photo, passphrase, pin, rsa_key_data, channel_key
    )
    debug.print(f"Encrypted payload: {len(encrypted)} bytes")

    # Derive sample selection key
    pixel_key = derive_pixel_key(reference_photo, passphrase, pin, rsa_key_data, channel_key)

    # Embed based on mode
    if embed_mode == EMBED_MODE_AUDIO_LSB:
        from .audio_steganography import embed_in_audio_lsb

        stego_audio, stats = embed_in_audio_lsb(
            encrypted, carrier_audio, pixel_key, progress_file=progress_file
        )
    else:
        # Only EMBED_MODE_AUDIO_SPREAD can reach here after the check above.
        from .spread_steganography import embed_in_audio_spread

        stego_audio, stats = embed_in_audio_spread(
            encrypted, carrier_audio, pixel_key, progress_file=progress_file
        )

    return stego_audio, stats
@dataclass
class AudioEmbedStats:
    """Figures describing one completed audio embedding operation."""

    # Field order is part of the positional constructor interface.
    samples_modified: int
    total_samples: int
    capacity_used: float  # fraction in the range 0.0 - 1.0
    bytes_embedded: int
    sample_rate: int
    channels: int
    duration_seconds: float
    embed_mode: str  # "audio_lsb" or "audio_spread"

    @property
    def modification_percent(self) -> float:
        """Share of samples that were modified, as a percentage (0 if there are none)."""
        if self.total_samples > 0:
            return (self.samples_modified / self.total_samples) * 100
        return 0
@dataclass
class AudioCapacityInfo:
    """Capacity information for audio steganography.

    Describes how much payload a carrier can hold for one embedding mode.
    Field order is part of the positional constructor interface.
    """

    # Sample count of the carrier; the spread-spectrum calculator passes
    # frames * channels here.
    total_samples: int
    # Payload bytes that fit once the embedding mode's own overhead is removed.
    usable_capacity_bytes: int
    # Embedding mode these figures apply to (e.g. "audio_spread").
    embed_mode: str
    # Carrier sample rate in Hz.
    sample_rate: int
    # Carrier length in seconds.
    duration_seconds: float
+ +Requires: soundfile, numpy, cryptography, reedsolo (optional but recommended) +""" + +from __future__ import annotations + +import io +import struct + +import numpy as np +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms + +from .constants import ( + AUDIO_MAGIC_SPREAD, + AUDIO_SS_AMPLITUDE, + AUDIO_SS_CHIP_LENGTH, + AUDIO_SS_RS_NSYM, + EMBED_MODE_AUDIO_SPREAD, +) +from .debug import debug +from .exceptions import AudioCapacityError, AudioError +from .models import AudioCapacityInfo, AudioEmbedStats + +# Lazy import for soundfile +try: + import soundfile as sf + + HAS_SOUNDFILE = True +except ImportError: + HAS_SOUNDFILE = False + sf = None # type: ignore[assignment] + +# Lazy import for reedsolo +try: + from reedsolo import ReedSolomonError, RSCodec + + HAS_REEDSOLO = True +except ImportError: + HAS_REEDSOLO = False + RSCodec = None # type: ignore[assignment,misc] + ReedSolomonError = None # type: ignore[assignment,misc] + + +# Header layout: 4B magic + 3 x 4B length = 16 bytes = 128 bits +_HEADER_SIZE = 16 +_MAGIC_SIZE = 4 +_LENGTH_COPIES = 3 + +# Progress reporting interval (every N bits) +_PROGRESS_INTERVAL = 500 + + +# ============================================================================= +# PROGRESS REPORTING +# ============================================================================= + + +def _write_progress( + progress_file: str | None, current: int, total: int, phase: str = "embedding" +) -> None: + """Write progress to file for frontend polling.""" + if progress_file is None: + return + try: + import json + + with open(progress_file, "w") as f: + json.dump( + { + "current": current, + "total": total, + "percent": round((current / total) * 100, 1) if total > 0 else 0, + "phase": phase, + }, + f, + ) + except Exception: + pass # Don't let progress writing break encoding + + +# ============================================================================= +# 
def _generate_chip_sequence(seed: bytes, chip_index: int, length: int) -> np.ndarray:
    """
    Generate a pseudo-random chip sequence for spread spectrum embedding.

    Uses ChaCha20 as a CSPRNG keyed by ``seed``, with ``chip_index`` encoded
    into the nonce so that each bit position gets its own deterministic
    spreading code.  One keystream byte is consumed per chip sample.

    Args:
        seed: Key material for ChaCha20. A seed shorter than 32 bytes is
            replaced by its SHA-256 digest; a longer one is truncated to its
            first 32 bytes.
        chip_index: Index of the bit being embedded (used as nonce material).
        length: Number of samples in the chip (AUDIO_SS_CHIP_LENGTH).

    Returns:
        Float64 numpy array of ``length`` elements, each exactly -1.0 or +1.0.
        The values are not rescaled or normalised.
    """
    # Ensure seed is exactly 32 bytes
    if len(seed) < 32:
        import hashlib

        seed = hashlib.sha256(seed).digest()
    elif len(seed) > 32:
        seed = seed[:32]

    # Build a 16-byte nonce from chip_index (ChaCha20 uses 16B nonce in cryptography lib)
    nonce = chip_index.to_bytes(16, byteorder="big")

    cipher = Cipher(algorithms.ChaCha20(seed, nonce), mode=None, backend=default_backend())
    encryptor = cipher.encryptor()
    # Encrypting zeros yields the raw keystream: one byte per chip sample.
    random_bytes = encryptor.update(b"\x00" * length)

    # Map bytes to bipolar ±1 spreading code (DSSS standard):
    # bytes below 128 become -1.0, all others +1.0.
    raw = np.frombuffer(random_bytes, dtype=np.uint8)
    chip = np.where(raw < 128, np.float64(-1.0), np.float64(1.0))

    return chip
+ """ + total_bits = len(bits) + for i, bit in enumerate(bits): + start = offset + i * AUDIO_SS_CHIP_LENGTH + end = start + AUDIO_SS_CHIP_LENGTH + + if end > len(samples): + debug.print(f"Warning: ran out of samples at bit {i}/{total_bits}") + break + + chip = _generate_chip_sequence(seed, i, AUDIO_SS_CHIP_LENGTH) + + if bit == 1: + samples[start:end] += amplitude * chip + else: + samples[start:end] -= amplitude * chip + + if progress_file and i % _PROGRESS_INTERVAL == 0: + _write_progress(progress_file, i, total_bits, "embedding") + + return samples + + +def _extract_spread_spectrum( + samples: np.ndarray, + num_bits: int, + seed: bytes, + offset: int = 0, + progress_file: str | None = None, +) -> list[int]: + """ + Extract bits from audio using spread spectrum correlation. + + For each bit index i, correlate the sample window with the chip + sequence. Positive correlation -> 1, negative -> 0. + + Args: + samples: 1-D float64 audio samples. + num_bits: Number of bits to extract. + seed: 32-byte key (must match embedding key). + offset: Sample offset where spread data begins. + progress_file: Optional path for progress JSON. + + Returns: + List of extracted 0/1 ints. 
+ """ + bits: list[int] = [] + for i in range(num_bits): + start = offset + i * AUDIO_SS_CHIP_LENGTH + end = start + AUDIO_SS_CHIP_LENGTH + + if end > len(samples): + debug.print(f"Warning: ran out of samples at bit {i}/{num_bits}") + break + + chip = _generate_chip_sequence(seed, i, AUDIO_SS_CHIP_LENGTH) + correlation = np.dot(samples[start:end], chip) + bits.append(1 if correlation > 0 else 0) + + if progress_file and i % _PROGRESS_INTERVAL == 0: + _write_progress(progress_file, i, num_bits, "extracting") + + return bits + + +# ============================================================================= +# BIT CONVERSION UTILITIES +# ============================================================================= + + +def _bytes_to_bits(data: bytes) -> list[int]: + """Convert a byte string to a list of 0/1 ints (MSB first per byte).""" + bits: list[int] = [] + for byte in data: + for shift in range(7, -1, -1): + bits.append((byte >> shift) & 1) + return bits + + +def _bits_to_bytes(bits: list[int]) -> bytes: + """Convert a list of 0/1 ints back to bytes (MSB first per byte).""" + result = bytearray() + for i in range(0, len(bits) - 7, 8): + byte_val = 0 + for j in range(8): + byte_val = (byte_val << 1) | bits[i + j] + result.append(byte_val) + return bytes(result) + + +# ============================================================================= +# MAJORITY VOTING +# ============================================================================= + + +def _majority_vote_length(length_bytes: bytes) -> int | None: + """ + Extract the payload length from three 4-byte copies via majority voting. + + Each copy is a big-endian uint32. The value that appears at least twice + wins. Returns None if all three disagree. 
+ """ + if len(length_bytes) < 12: + return None + + copies = [ + struct.unpack(">I", length_bytes[0:4])[0], + struct.unpack(">I", length_bytes[4:8])[0], + struct.unpack(">I", length_bytes[8:12])[0], + ] + + debug.print(f"Length copies for majority vote: {copies}") + + if copies[0] == copies[1] or copies[0] == copies[2]: + return copies[0] + if copies[1] == copies[2]: + return copies[1] + + debug.print("Majority vote failed: all three length copies disagree") + return None + + +# ============================================================================= +# HEADER CONSTRUCTION +# ============================================================================= + + +def _build_header(data_length: int) -> bytes: + """ + Build the spread spectrum header. + + Layout: AUDIO_MAGIC_SPREAD (4B) + length (4B) x 3 copies = 16 bytes. + """ + length_packed = struct.pack(">I", data_length) + return AUDIO_MAGIC_SPREAD + length_packed * _LENGTH_COPIES + + +def _parse_header(header_bytes: bytes) -> tuple[bool, int | None]: + """ + Parse and validate the spread spectrum header. + + Returns: + (magic_valid, payload_length) -- length is None if voting fails. + """ + if len(header_bytes) < _HEADER_SIZE: + return False, None + + magic = header_bytes[:_MAGIC_SIZE] + if magic != AUDIO_MAGIC_SPREAD: + debug.print(f"Magic mismatch: got {magic!r}, expected {AUDIO_MAGIC_SPREAD!r}") + return False, None + + length = _majority_vote_length(header_bytes[_MAGIC_SIZE:_HEADER_SIZE]) + return True, length + + +# ============================================================================= +# PUBLIC API +# ============================================================================= + + +def calculate_audio_spread_capacity(audio_data: bytes) -> AudioCapacityInfo: + """ + Calculate embedding capacity for spread spectrum audio steganography. 
+ + Loads the carrier audio, determines how many spread spectrum bits can + fit, accounts for Reed-Solomon overhead and the fixed header, and + returns the usable payload capacity in bytes. + + Args: + audio_data: Raw bytes of a WAV file. + + Returns: + AudioCapacityInfo with capacity details. + + Raises: + AudioError: If the audio cannot be read. + """ + if not HAS_SOUNDFILE: + raise AudioError("soundfile is required for audio spread spectrum steganography") + + try: + info = sf.info(io.BytesIO(audio_data)) + except Exception as e: + raise AudioError(f"Failed to read audio file: {e}") from e + + total_samples = info.frames * info.channels + total_bits = total_samples // AUDIO_SS_CHIP_LENGTH + total_bytes = total_bits // 8 + + # Subtract header overhead (16 bytes) + after_header = max(0, total_bytes - _HEADER_SIZE) + + # Account for Reed-Solomon overhead: RS adds RS_NSYM parity bytes per 255-byte block + # Usable fraction is (255 - RS_NSYM) / 255 + if HAS_REEDSOLO and AUDIO_SS_RS_NSYM > 0: + usable_bytes = int(after_header * (255 - AUDIO_SS_RS_NSYM) / 255) + else: + usable_bytes = after_header + + duration = info.frames / info.samplerate + + debug.print( + f"Spread spectrum capacity: {usable_bytes} bytes " + f"({total_samples} samples, {total_bits} bits, " + f"{info.samplerate} Hz, {info.channels} ch, {duration:.2f}s)" + ) + + return AudioCapacityInfo( + total_samples=total_samples, + usable_capacity_bytes=usable_bytes, + embed_mode=EMBED_MODE_AUDIO_SPREAD, + sample_rate=info.samplerate, + duration_seconds=duration, + ) + + +def embed_in_audio_spread( + data: bytes, + carrier_audio: bytes, + seed: bytes, + progress_file: str | None = None, +) -> tuple[bytes, AudioEmbedStats]: + """ + Embed data into audio using spread spectrum steganography. 
+ + The payload is RS-encoded, prepended with a magic+length header + (with three copies of the length for majority voting), converted to + bits, and embedded by adding keyed pseudo-random chip sequences + to the carrier audio samples. + + Stereo audio is mixed to mono for embedding then the modification + is applied equally to all channels of the original. + + Args: + data: Raw payload bytes to embed (already encrypted by caller). + carrier_audio: Raw bytes of the carrier WAV file. + seed: Key material for chip sequence generation (any length, + hashed to 32 bytes internally if needed). + progress_file: Optional path for frontend progress polling. + + Returns: + Tuple of (stego WAV bytes, AudioEmbedStats). + + Raises: + AudioCapacityError: If the payload is too large for the carrier. + AudioError: On any other embedding failure. + """ + if not HAS_SOUNDFILE: + raise AudioError("soundfile is required for audio spread spectrum steganography") + + debug.print(f"Spread spectrum embedding {len(data)} bytes") + + try: + # 1. Read carrier audio as float64 + buf = io.BytesIO(carrier_audio) + samples, sample_rate = sf.read(buf, dtype="float64", always_2d=True) + original_shape = samples.shape + channels = original_shape[1] + num_frames = original_shape[0] + duration = num_frames / sample_rate + + # Read subtype from input to preserve on output + buf.seek(0) + carrier_info = sf.info(buf) + output_subtype = carrier_info.subtype if carrier_info.subtype else "PCM_16" + + debug.print( + f"Carrier: {sample_rate} Hz, {channels} ch, " + f"{num_frames} frames, {duration:.2f}s, subtype={output_subtype}" + ) + + # 2. Mix to mono for embedding (average across channels) + if channels > 1: + mono_samples = np.mean(samples, axis=1) + else: + mono_samples = samples[:, 0].copy() + + total_samples = len(mono_samples) + + # 3. RS-encode the payload + rs_data = _rs_encode(data) + debug.print(f"RS-encoded payload: {len(data)} -> {len(rs_data)} bytes") + + # 4. 
Build header: magic (4B) + length x3 (12B) = 16B + header = _build_header(len(data)) + + # 5. Combine header + RS-encoded data and convert to bits + full_payload = header + rs_data + bits = _bytes_to_bits(full_payload) + + total_bits = len(bits) + samples_needed = total_bits * AUDIO_SS_CHIP_LENGTH + + debug.print( + f"Total payload: {len(full_payload)} bytes = {total_bits} bits, " + f"needs {samples_needed} samples (have {total_samples})" + ) + + # 6. Check capacity + if samples_needed > total_samples: + max_bytes = (total_samples // AUDIO_SS_CHIP_LENGTH) // 8 + raise AudioCapacityError(len(full_payload), max_bytes) + + capacity_used = samples_needed / total_samples + + # 7. Initial progress + _write_progress(progress_file, 0, total_bits, "embedding") + + # 8. Embed via spread spectrum into mono + mono_modified = _embed_spread_spectrum( + mono_samples, + bits, + seed, + AUDIO_SS_AMPLITUDE, + offset=0, + progress_file=progress_file, + ) + + # 9. Apply modification back to all channels + # delta = modified_mono - original_mono, add delta to each channel + delta = mono_modified - (np.mean(samples, axis=1) if channels > 1 else samples[:, 0]) + for ch in range(channels): + samples[:, ch] += delta + + # Clip to [-1.0, 1.0] to prevent clipping artefacts + np.clip(samples, -1.0, 1.0, out=samples) + + _write_progress(progress_file, total_bits, total_bits, "saving") + + # 10. 
Write back as WAV preserving original subtype + output_buf = io.BytesIO() + sf.write(output_buf, samples, sample_rate, format="WAV", subtype=output_subtype) + output_buf.seek(0) + stego_bytes = output_buf.getvalue() + + samples_modified = samples_needed # every chip-length region was touched + stats = AudioEmbedStats( + samples_modified=samples_modified, + total_samples=total_samples * channels, + capacity_used=capacity_used, + bytes_embedded=len(full_payload), + sample_rate=sample_rate, + channels=channels, + duration_seconds=duration, + embed_mode=EMBED_MODE_AUDIO_SPREAD, + ) + + debug.print( + f"Spread spectrum embedding complete: {len(stego_bytes)} byte WAV, " + f"capacity used {capacity_used * 100:.1f}%" + ) + return stego_bytes, stats + + except AudioCapacityError: + raise + except Exception as e: + debug.exception(e, "embed_in_audio_spread") + raise AudioError(f"Failed to embed data in audio via spread spectrum: {e}") from e + + +def extract_from_audio_spread( + audio_data: bytes, + seed: bytes, + progress_file: str | None = None, +) -> bytes | None: + """ + Extract hidden data from audio using spread spectrum correlation. + + Loads the stego audio, extracts the header bits to recover the magic + marker and payload length (via majority voting on three copies), then + extracts the full RS-protected payload and decodes it. + + Args: + audio_data: Raw bytes of the stego WAV file. + seed: Key material (must match the seed used for embedding). + progress_file: Optional path for frontend progress polling. + + Returns: + Extracted payload bytes, or None if extraction fails (wrong key, + no data found, corrupted beyond recovery). + """ + if not HAS_SOUNDFILE: + debug.print("soundfile not available for spread spectrum extraction") + return None + + debug.print(f"Spread spectrum extracting from {len(audio_data)} byte audio") + + try: + # 1. 
Read stego audio as float64 + samples, sample_rate = sf.read(io.BytesIO(audio_data), dtype="float64", always_2d=True) + channels = samples.shape[1] + + # Mix to mono (same as embedding) + if channels > 1: + mono_samples = np.mean(samples, axis=1) + else: + mono_samples = samples[:, 0].copy() + + total_samples = len(mono_samples) + + debug.print(f"Stego audio: {sample_rate} Hz, {channels} ch, {total_samples} samples") + + # 2. Extract header bits: 16 bytes = 128 bits + header_bits_needed = _HEADER_SIZE * 8 + header_samples_needed = header_bits_needed * AUDIO_SS_CHIP_LENGTH + + if header_samples_needed > total_samples: + debug.print("Audio too short to contain spread spectrum header") + return None + + _write_progress(progress_file, 0, header_bits_needed, "extracting header") + + header_bits = _extract_spread_spectrum( + mono_samples, + header_bits_needed, + seed, + offset=0, + progress_file=None, # don't spam progress for header + ) + + if len(header_bits) < header_bits_needed: + debug.print( + f"Could not extract enough header bits: {len(header_bits)}/{header_bits_needed}" + ) + return None + + header_bytes = _bits_to_bytes(header_bits) + + # 3. Parse and validate header + magic_valid, data_length = _parse_header(header_bytes) + + if not magic_valid: + debug.print("Spread spectrum magic not found -- wrong key or no embedded data") + return None + + if data_length is None: + debug.print("Could not determine payload length (majority vote failed)") + return None + + debug.print(f"Header valid: magic=AUDS, payload_length={data_length}") + + # Sanity check the length + max_payload = (total_samples // AUDIO_SS_CHIP_LENGTH) // 8 - _HEADER_SIZE + if data_length < 1 or data_length > max_payload: + debug.print(f"Invalid payload length {data_length} (max possible: {max_payload})") + return None + + # 4. 
Calculate total bits for RS-encoded data + # RS adds AUDIO_SS_RS_NSYM parity bytes per (255 - RS_NSYM) data bytes + if HAS_REEDSOLO and AUDIO_SS_RS_NSYM > 0: + # RSCodec encodes in blocks: each block has 255 bytes (data + parity) + # For input of N bytes, output is N + ceil(N / (255 - RS_NSYM)) * RS_NSYM + data_block_size = 255 - AUDIO_SS_RS_NSYM + num_blocks = (data_length + data_block_size - 1) // data_block_size + rs_encoded_size = data_length + num_blocks * AUDIO_SS_RS_NSYM + else: + rs_encoded_size = data_length + + total_payload_bytes = _HEADER_SIZE + rs_encoded_size + total_bits_needed = total_payload_bytes * 8 + total_samples_needed = total_bits_needed * AUDIO_SS_CHIP_LENGTH + + if total_samples_needed > total_samples: + debug.print( + f"Need {total_samples_needed} samples for full extraction " + f"but only have {total_samples}" + ) + return None + + debug.print( + f"Extracting {total_bits_needed} bits " + f"({_HEADER_SIZE}B header + {rs_encoded_size}B RS payload)" + ) + + # 5. Extract all bits (including header again -- simpler and no perf issue) + _write_progress(progress_file, 0, total_bits_needed, "extracting") + + all_bits = _extract_spread_spectrum( + mono_samples, + total_bits_needed, + seed, + offset=0, + progress_file=progress_file, + ) + + if len(all_bits) < total_bits_needed: + debug.print(f"Short extraction: {len(all_bits)}/{total_bits_needed} bits") + return None + + _write_progress(progress_file, total_bits_needed, total_bits_needed, "decoding") + + # 6. Convert bits to bytes, skip header, get RS payload + all_bytes = _bits_to_bytes(all_bits) + rs_payload = all_bytes[_HEADER_SIZE : _HEADER_SIZE + rs_encoded_size] + + if len(rs_payload) < rs_encoded_size: + debug.print(f"RS payload too short: {len(rs_payload)}/{rs_encoded_size} bytes") + return None + + # 7. RS-decode + decoded = _rs_decode(rs_payload) + if decoded is None: + debug.print("Reed-Solomon decoding failed -- data too corrupted") + return None + + # 8. 
Verify decoded length matches header + if len(decoded) < data_length: + debug.print(f"Decoded data shorter than expected: {len(decoded)}/{data_length}") + return None + + payload = decoded[:data_length] + + debug.print(f"Spread spectrum extraction successful: {len(payload)} bytes") + return payload + + except Exception as e: + debug.exception(e, "extract_from_audio_spread") + return None diff --git a/src/stegasoo/validation.py b/src/stegasoo/validation.py index e90dc3b..7efc709 100644 --- a/src/stegasoo/validation.py +++ b/src/stegasoo/validation.py @@ -14,8 +14,10 @@ import io from PIL import Image from .constants import ( + ALLOWED_AUDIO_EXTENSIONS, ALLOWED_IMAGE_EXTENSIONS, ALLOWED_KEY_EXTENSIONS, + EMBED_MODE_AUDIO_AUTO, EMBED_MODE_AUTO, EMBED_MODE_DCT, EMBED_MODE_LSB, @@ -29,8 +31,10 @@ from .constants import ( MIN_PIN_LENGTH, MIN_RSA_BITS, RECOMMENDED_PASSPHRASE_WORDS, + VALID_AUDIO_EMBED_MODES, ) from .exceptions import ( + AudioValidationError, ImageValidationError, KeyValidationError, MessageValidationError, @@ -475,3 +479,33 @@ def require_security_factors(pin: str, rsa_key_data: bytes | None) -> None: result = validate_security_factors(pin, rsa_key_data) if not result.is_valid: raise SecurityFactorError(result.error_message) + + +# ============================================================================= +# AUDIO VALIDATORS (v4.3.0) +# ============================================================================= + + +def validate_audio_file(filename: str) -> ValidationResult: + """Validate audio file extension.""" + return validate_file_extension(filename, ALLOWED_AUDIO_EXTENSIONS, "Audio file") + + +def validate_audio_embed_mode(mode: str) -> ValidationResult: + """Validate audio embedding mode.""" + valid_modes = VALID_AUDIO_EMBED_MODES | {EMBED_MODE_AUDIO_AUTO} + if mode not in valid_modes: + return ValidationResult.error( + f"Invalid audio embed_mode: '{mode}'. 
" + f"Valid options: {', '.join(sorted(valid_modes))}" + ) + return ValidationResult.ok(mode=mode) + + +def require_valid_audio(audio_data: bytes, name: str = "Audio") -> None: + """Validate audio, raising AudioValidationError on failure.""" + from .audio_utils import validate_audio + + result = validate_audio(audio_data, name) + if not result.is_valid: + raise AudioValidationError(result.error_message) diff --git a/tests/test_audio.py b/tests/test_audio.py new file mode 100644 index 0000000..3b40550 --- /dev/null +++ b/tests/test_audio.py @@ -0,0 +1,448 @@ +""" +Tests for Stegasoo audio steganography. + +Tests cover: +- Audio LSB roundtrip (encode + decode) +- Audio spread spectrum roundtrip (encode + decode) +- Wrong credentials fail to decode +- Capacity calculations +- Format detection +- Audio validation +""" + +import io + +import numpy as np +import pytest +import soundfile as sf + +from stegasoo.constants import ( + EMBED_MODE_AUDIO_LSB, + EMBED_MODE_AUDIO_SPREAD, +) +from stegasoo.models import AudioCapacityInfo, AudioEmbedStats, AudioInfo + +# ============================================================================= +# FIXTURES +# ============================================================================= + + +@pytest.fixture +def carrier_wav() -> bytes: + """Generate a small test WAV file (1 second, 44100 Hz, mono, 16-bit).""" + sample_rate = 44100 + duration = 1.0 + num_samples = int(sample_rate * duration) + # Generate a simple sine wave + t = np.linspace(0, duration, num_samples, endpoint=False) + samples = (np.sin(2 * np.pi * 440 * t) * 16000).astype(np.int16) + + buf = io.BytesIO() + sf.write(buf, samples, sample_rate, format="WAV", subtype="PCM_16") + buf.seek(0) + return buf.read() + + +@pytest.fixture +def carrier_wav_stereo() -> bytes: + """Generate a stereo test WAV file.""" + sample_rate = 44100 + duration = 1.0 + num_samples = int(sample_rate * duration) + t = np.linspace(0, duration, num_samples, endpoint=False) + left = (np.sin(2 * np.pi 
* 440 * 
t) * 16000).astype(np.int16) + right = (np.sin(2 * np.pi * 880 * t) * 16000).astype(np.int16) + samples = np.column_stack([left, right]) + + buf = io.BytesIO() + sf.write(buf, samples, sample_rate, format="WAV", subtype="PCM_16") + buf.seek(0) + return buf.read() + + +@pytest.fixture +def carrier_wav_long() -> bytes: + """Generate a longer WAV (15 seconds) for spread spectrum tests.""" + sample_rate = 44100 + duration = 15.0 + num_samples = int(sample_rate * duration) + t = np.linspace(0, duration, num_samples, endpoint=False) + # Mix of frequencies for better spread spectrum embedding + samples = ( + (np.sin(2 * np.pi * 440 * t) + np.sin(2 * np.pi * 880 * t) + np.sin(2 * np.pi * 1320 * t)) + * 5000 + ).astype(np.int16) + + buf = io.BytesIO() + sf.write(buf, samples, sample_rate, format="WAV", subtype="PCM_16") + buf.seek(0) + return buf.read() + + +@pytest.fixture +def carrier_wav_spread_integration() -> bytes: + """Generate a very long WAV (150 seconds) for spread spectrum integration tests. + + Spread spectrum needs 1024 samples per bit. With encryption + RS overhead (~690 bytes), + we need at least 690*8*1024 = 5.7M samples ~ 130 seconds at 44.1kHz. 
+ """ + sample_rate = 44100 + duration = 150.0 + num_samples = int(sample_rate * duration) + t = np.linspace(0, duration, num_samples, endpoint=False) + samples = ( + (np.sin(2 * np.pi * 440 * t) + np.sin(2 * np.pi * 880 * t) + np.sin(2 * np.pi * 1320 * t)) + * 5000 + ).astype(np.int16) + + buf = io.BytesIO() + sf.write(buf, samples, sample_rate, format="WAV", subtype="PCM_16") + buf.seek(0) + return buf.read() + + +@pytest.fixture +def reference_photo() -> bytes: + """Generate a small reference photo (PNG).""" + from PIL import Image + + img = Image.new("RGB", (100, 100), color=(128, 64, 32)) + buf = io.BytesIO() + img.save(buf, "PNG") + buf.seek(0) + return buf.read() + + +# ============================================================================= +# AUDIO LSB TESTS +# ============================================================================= + + +class TestAudioLSB: + """Tests for audio LSB steganography.""" + + def test_calculate_capacity(self, carrier_wav): + from stegasoo.audio_steganography import calculate_audio_lsb_capacity + + capacity = calculate_audio_lsb_capacity(carrier_wav) + assert capacity > 0 + # 1 second at 44100 Hz mono should give ~5KB capacity at 1 bit/sample + assert capacity > 4000 + + def test_embed_extract_roundtrip(self, carrier_wav): + """Test basic LSB embed/extract roundtrip.""" + from stegasoo.audio_steganography import embed_in_audio_lsb, extract_from_audio_lsb + + payload = b"Hello, audio steganography!" 
+ # NOTE: the raw payload is embedded as-is; no magic header is prepended in this test + key = b"\x42" * 32 + + stego_audio, stats = embed_in_audio_lsb(payload, carrier_wav, key) + + assert isinstance(stats, AudioEmbedStats) + assert stats.embed_mode == EMBED_MODE_AUDIO_LSB + assert stats.bytes_embedded > 0 + assert stats.samples_modified > 0 + assert 0 < stats.capacity_used <= 1.0 + + # Extract + extracted = extract_from_audio_lsb(stego_audio, key) + assert extracted is not None + assert extracted == payload + + def test_embed_extract_stereo(self, carrier_wav_stereo): + """Test LSB roundtrip with stereo audio.""" + from stegasoo.audio_steganography import embed_in_audio_lsb, extract_from_audio_lsb + + payload = b"Stereo test message" + key = b"\xAB" * 32 + + stego_audio, stats = embed_in_audio_lsb(payload, carrier_wav_stereo, key) + assert stats.channels == 2 + + extracted = extract_from_audio_lsb(stego_audio, key) + assert extracted == payload + + def test_wrong_key_fails(self, carrier_wav): + """Test that wrong key produces no valid extraction.""" + from stegasoo.audio_steganography import embed_in_audio_lsb, extract_from_audio_lsb + + payload = b"Secret message" + correct_key = b"\x42" * 32 + wrong_key = b"\xFF" * 32 + + stego_audio, _ = embed_in_audio_lsb(payload, carrier_wav, correct_key) + + extracted = extract_from_audio_lsb(stego_audio, wrong_key) + # Should return None or garbage (not the original message) + assert extracted is None or extracted != payload + + def test_two_bits_per_sample(self, carrier_wav): + """Test embedding with 2 bits per sample.""" + from stegasoo.audio_steganography import embed_in_audio_lsb, extract_from_audio_lsb + + payload = b"Two bits per sample test" + key = b"\x55" * 32 + + stego_audio, stats = embed_in_audio_lsb(payload, carrier_wav, key, bits_per_sample=2) + + extracted = extract_from_audio_lsb(stego_audio, key, bits_per_sample=2) + assert extracted == payload + + def test_generate_sample_indices(self): + """Test 
deterministic sample index 
generation.""" + from stegasoo.audio_steganography import generate_sample_indices + + key = b"\x42" * 32 + indices1 = generate_sample_indices(key, 10000, 100) + indices2 = generate_sample_indices(key, 10000, 100) + + # Same key should produce same indices + assert indices1 == indices2 + + # All indices should be valid + assert all(0 <= i < 10000 for i in indices1) + + # No duplicates + assert len(set(indices1)) == len(indices1) + + +# ============================================================================= +# AUDIO SPREAD SPECTRUM TESTS +# ============================================================================= + + +class TestAudioSpread: + """Tests for audio spread spectrum steganography.""" + + def test_calculate_capacity(self, carrier_wav_long): + from stegasoo.spread_steganography import calculate_audio_spread_capacity + + capacity = calculate_audio_spread_capacity(carrier_wav_long) + assert isinstance(capacity, AudioCapacityInfo) + assert capacity.usable_capacity_bytes > 0 + assert capacity.embed_mode == EMBED_MODE_AUDIO_SPREAD + + def test_spread_roundtrip(self, carrier_wav_long): + """Test spread spectrum embed/extract roundtrip.""" + from stegasoo.spread_steganography import ( + embed_in_audio_spread, + extract_from_audio_spread, + ) + + payload = b"Spread test" + seed = b"\x42" * 32 + + stego_audio, stats = embed_in_audio_spread(payload, carrier_wav_long, seed) + + assert isinstance(stats, AudioEmbedStats) + assert stats.embed_mode == EMBED_MODE_AUDIO_SPREAD + + extracted = extract_from_audio_spread(stego_audio, seed) + assert extracted is not None + assert extracted == payload + + def test_wrong_seed_fails(self, carrier_wav_long): + """Test that wrong seed produces no valid extraction.""" + from stegasoo.spread_steganography import ( + embed_in_audio_spread, + extract_from_audio_spread, + ) + + payload = b"Secret spread" + correct_seed = b"\x42" * 32 + wrong_seed = b"\xFF" * 32 + + stego_audio, _ = embed_in_audio_spread(payload, 
carrier_wav_long, correct_seed) + + extracted = extract_from_audio_spread(stego_audio, wrong_seed) + assert extracted is None or extracted != payload + + +# ============================================================================= +# FORMAT DETECTION TESTS +# ============================================================================= + + +class TestFormatDetection: + """Tests for audio format detection.""" + + def test_detect_wav(self, carrier_wav): + from stegasoo.audio_utils import detect_audio_format + + assert detect_audio_format(carrier_wav) == "wav" + + def test_detect_unknown(self): + from stegasoo.audio_utils import detect_audio_format + + assert detect_audio_format(b"not audio data") == "unknown" + + def test_detect_empty(self): + from stegasoo.audio_utils import detect_audio_format + + assert detect_audio_format(b"") == "unknown" + + +# ============================================================================= +# AUDIO INFO TESTS +# ============================================================================= + + +class TestAudioInfo: + """Tests for audio info extraction.""" + + def test_get_wav_info(self, carrier_wav): + from stegasoo.audio_utils import get_audio_info + + info = get_audio_info(carrier_wav) + assert isinstance(info, AudioInfo) + assert info.sample_rate == 44100 + assert info.channels == 1 + assert info.format == "wav" + assert abs(info.duration_seconds - 1.0) < 0.1 + + def test_get_stereo_info(self, carrier_wav_stereo): + from stegasoo.audio_utils import get_audio_info + + info = get_audio_info(carrier_wav_stereo) + assert info.channels == 2 + + +# ============================================================================= +# VALIDATION TESTS +# ============================================================================= + + +class TestAudioValidation: + """Tests for audio validation.""" + + def test_validate_valid_audio(self, carrier_wav): + from stegasoo.audio_utils import validate_audio + + result = 
validate_audio(carrier_wav) + assert result.is_valid + + def test_validate_empty_audio(self): + from stegasoo.audio_utils import validate_audio + + result = validate_audio(b"") + assert not result.is_valid + + def test_validate_invalid_audio(self): + from stegasoo.audio_utils import validate_audio + + result = validate_audio(b"not audio data at all") + assert not result.is_valid + + def test_validate_audio_embed_mode(self): + from stegasoo.validation import validate_audio_embed_mode + + assert validate_audio_embed_mode("audio_lsb").is_valid + assert validate_audio_embed_mode("audio_spread").is_valid + assert validate_audio_embed_mode("audio_auto").is_valid + assert not validate_audio_embed_mode("invalid").is_valid + + +# ============================================================================= +# INTEGRATION TESTS +# ============================================================================= + + +class TestIntegration: + """End-to-end integration tests using encode_audio/decode_audio.""" + + def test_lsb_encode_decode(self, carrier_wav, reference_photo): + from stegasoo.decode import decode_audio + from stegasoo.encode import encode_audio + + stego_audio, stats = encode_audio( + message="Hello from audio steganography!", + reference_photo=reference_photo, + carrier_audio=carrier_wav, + passphrase="test words here now", + pin="123456", + embed_mode="audio_lsb", + ) + + assert len(stego_audio) > 0 + + result = decode_audio( + stego_audio=stego_audio, + reference_photo=reference_photo, + passphrase="test words here now", + pin="123456", + embed_mode="audio_lsb", + ) + + assert result.is_text + assert result.message == "Hello from audio steganography!" 
+ + def test_lsb_wrong_credentials(self, carrier_wav, reference_photo): + from stegasoo.decode import decode_audio + from stegasoo.encode import encode_audio + + stego_audio, _ = encode_audio( + message="Secret", + reference_photo=reference_photo, + carrier_audio=carrier_wav, + passphrase="correct horse battery staple", + pin="123456", + embed_mode="audio_lsb", + ) + + with pytest.raises(Exception): + decode_audio( + stego_audio=stego_audio, + reference_photo=reference_photo, + passphrase="wrong passphrase words here", + pin="654321", + embed_mode="audio_lsb", + ) + + def test_spread_encode_decode(self, carrier_wav_spread_integration, reference_photo): + """Test full spread spectrum encode/decode pipeline.""" + from stegasoo.decode import decode_audio + from stegasoo.encode import encode_audio + + stego_audio, stats = encode_audio( + message="Spread integration test", + reference_photo=reference_photo, + carrier_audio=carrier_wav_spread_integration, + passphrase="test words here now", + pin="123456", + embed_mode="audio_spread", + ) + + result = decode_audio( + stego_audio=stego_audio, + reference_photo=reference_photo, + passphrase="test words here now", + pin="123456", + embed_mode="audio_spread", + ) + + assert result.message == "Spread integration test" + + def test_auto_detect_lsb(self, carrier_wav, reference_photo): + """Test auto-detection finds LSB encoded audio.""" + from stegasoo.decode import decode_audio + from stegasoo.encode import encode_audio + + stego_audio, _ = encode_audio( + message="Auto-detect test", + reference_photo=reference_photo, + carrier_audio=carrier_wav, + passphrase="test words here now", + pin="123456", + embed_mode="audio_lsb", + ) + + result = decode_audio( + stego_audio=stego_audio, + reference_photo=reference_photo, + passphrase="test words here now", + pin="123456", + embed_mode="audio_auto", + ) + + assert result.message == "Auto-detect test"