Add audio steganography with LSB and spread spectrum modes

Implement two audio embedding modes following the same multi-factor
authentication pipeline as image steganography (passphrase + PIN +
optional RSA key + optional channel key):

- audio_lsb: High-capacity LSB embedding in PCM samples for lossless
  formats (WAV/FLAC). Uses ChaCha20-keyed sample index selection.
- audio_spread: Direct-sequence spread spectrum (DSSS) with ChaCha20-
  keyed bipolar chip codes, Reed-Solomon error correction, and 3-copy
  majority-voted length headers. Designed to survive lossy compression.

New files:
- audio_steganography.py: LSB embed/extract on PCM samples
- spread_steganography.py: Spread spectrum embed/extract
- audio_utils.py: Format detection, transcoding, validation helpers
- tests/test_audio.py: 22 tests covering both modes end-to-end

Updated encode.py, decode.py, cli.py (audio-encode/audio-decode
commands), constants.py, models.py, exceptions.py, validation.py,
__init__.py, and pyproject.toml ([audio] extra).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
adlee-was-taken
2026-02-27 20:26:07 -05:00
parent 7aeb26e003
commit 0248bec813
13 changed files with 2885 additions and 3 deletions

View File

@@ -52,6 +52,13 @@ dct = [
"jpeglib>=1.0.0",
"reedsolo>=1.7.0",
]
audio = [
"pydub>=0.25.0",
"numpy>=2.0.0",
"scipy>=1.10.0",
"soundfile>=0.12.0",
"reedsolo>=1.7.0",
]
cli = [
"click>=8.0.0",
"qrcode>=7.30",
@@ -86,7 +93,7 @@ api = [
"reedsolo>=1.7.0",
]
all = [
"stegasoo[cli,web,api,dct,compression]",
"stegasoo[cli,web,api,dct,audio,compression]",
]
dev = [
"stegasoo[all]",
@@ -141,6 +148,8 @@ ignore = ["E501"]
[tool.ruff.lint.per-file-ignores]
# YCbCr colorspace variables (R, G, B, Y, Cb, Cr) are standard names
"src/stegasoo/dct_steganography.py" = ["N803", "N806"]
# MDCT transform variables (N, X) are standard mathematical names
"src/stegasoo/spread_steganography.py" = ["N803", "N806"]
# Package __init__.py has imports after try/except and aliases - intentional structure
"src/stegasoo/__init__.py" = ["E402"]

View File

@@ -24,8 +24,8 @@ from .channel import (
# Crypto functions
from .crypto import get_active_channel_key, get_channel_fingerprint, has_argon2
from .decode import decode, decode_file, decode_text
from .encode import encode
from .decode import decode, decode_audio, decode_file, decode_text
from .encode import encode, encode_audio
# Credential generation
from .generate import (
@@ -54,6 +54,23 @@ from .steganography import (
# Utilities
from .utils import generate_filename
# Audio utilities - optional, may not be available (v4.3.0)
try:
from .audio_utils import (
detect_audio_format,
get_audio_info,
has_ffmpeg_support,
validate_audio,
)
HAS_AUDIO_SUPPORT = True
except ImportError:
HAS_AUDIO_SUPPORT = False
detect_audio_format = None
get_audio_info = None
has_ffmpeg_support = None
validate_audio = None
# QR Code utilities - optional, may not be available
try:
from .qr_utils import (
@@ -88,6 +105,9 @@ validate_carrier = validate_image
# Constants
from .constants import (
DEFAULT_PASSPHRASE_WORDS,
EMBED_MODE_AUDIO_AUTO,
EMBED_MODE_AUDIO_LSB,
EMBED_MODE_AUDIO_SPREAD,
EMBED_MODE_AUTO,
EMBED_MODE_DCT,
EMBED_MODE_LSB,
@@ -106,6 +126,11 @@ from .constants import (
# Exceptions
from .exceptions import (
AudioCapacityError,
AudioError,
AudioExtractionError,
AudioTranscodeError,
AudioValidationError,
CapacityError,
CryptoError,
DecryptionError,
@@ -127,11 +152,15 @@ from .exceptions import (
SecurityFactorError,
SteganographyError,
StegasooError,
UnsupportedAudioFormatError,
ValidationError,
)
# Models
from .models import (
AudioCapacityInfo,
AudioEmbedStats,
AudioInfo,
CapacityComparison,
Credentials,
DecodeResult,
@@ -142,6 +171,8 @@ from .models import (
ValidationResult,
)
from .validation import (
validate_audio_embed_mode,
validate_audio_file,
validate_dct_color_mode,
validate_dct_output_format,
validate_embed_mode,
@@ -164,6 +195,16 @@ __all__ = [
"decode",
"decode_file",
"decode_text",
# Audio (v4.3.0)
"encode_audio",
"decode_audio",
"detect_audio_format",
"get_audio_info",
"has_ffmpeg_support",
"validate_audio",
"HAS_AUDIO_SUPPORT",
"validate_audio_embed_mode",
"validate_audio_file",
# Generation
"generate_pin",
"generate_passphrase",
@@ -221,6 +262,10 @@ __all__ = [
"FilePayload",
"Credentials",
"ValidationResult",
# Audio models
"AudioEmbedStats",
"AudioInfo",
"AudioCapacityInfo",
# Exceptions
"StegasooError",
"ValidationError",
@@ -244,6 +289,13 @@ __all__ = [
"ReedSolomonError",
"NoDataFoundError",
"ModeMismatchError",
# Audio exceptions
"AudioError",
"AudioValidationError",
"AudioCapacityError",
"AudioExtractionError",
"AudioTranscodeError",
"UnsupportedAudioFormatError",
# Constants
"FORMAT_VERSION",
"MIN_PASSPHRASE_WORDS",
@@ -266,4 +318,8 @@ __all__ = [
"EMBED_MODE_LSB",
"EMBED_MODE_DCT",
"EMBED_MODE_AUTO",
# Audio constants
"EMBED_MODE_AUDIO_LSB",
"EMBED_MODE_AUDIO_SPREAD",
"EMBED_MODE_AUDIO_AUTO",
]

View File

@@ -0,0 +1,514 @@
"""
Stegasoo Audio Steganography — LSB Embedding/Extraction (v4.3.0)
LSB (Least Significant Bit) embedding for PCM audio samples.
Hides data in the least significant bit(s) of audio samples, analogous to
how steganography.py hides data in pixel LSBs. The carrier audio must be
lossless (WAV or FLAC) — lossy codecs (MP3, OGG, AAC) destroy LSBs.
Uses ChaCha20 as a CSPRNG for pseudo-random sample index selection,
ensuring that without the key an attacker cannot determine which samples
were modified.
Supports:
- 16-bit PCM (int16 samples)
- 24-bit PCM (read as int16 via soundfile before embedding; output is 16-bit PCM)
- Float audio (converted to int16 before embedding)
- 1 or 2 bits per sample embedding depth
- Mono and multi-channel audio (flattened for embedding)
"""
import io
import struct
import numpy as np
import soundfile as sf
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from .constants import (
AUDIO_MAGIC_LSB,
EMBED_MODE_AUDIO_LSB,
)
from .debug import debug
from .exceptions import AudioCapacityError, AudioError
from .models import AudioEmbedStats
from .steganography import ENCRYPTION_OVERHEAD
# Progress reporting interval — write every N samples
PROGRESS_INTERVAL = 5000
# =============================================================================
# PROGRESS REPORTING
# =============================================================================
def _write_progress(progress_file: str | None, current: int, total: int, phase: str = "embedding"):
    """
    Dump a progress snapshot as JSON so a frontend can poll it.

    This is strictly best-effort: any failure while writing is swallowed so
    that progress reporting can never abort an embed/extract run.

    Args:
        progress_file: Destination path, or ``None`` to disable reporting.
        current: Units of work completed so far.
        total: Total units of work; ``percent`` is reported as 0 when this
            is not positive.
        phase: Free-form label for the current stage.
    """
    if progress_file is not None:
        try:
            import json

            with open(progress_file, "w") as handle:
                snapshot = {
                    "current": current,
                    "total": total,
                    "percent": round((current / total) * 100, 1) if total > 0 else 0,
                    "phase": phase,
                }
                json.dump(snapshot, handle)
        except Exception:
            pass  # Don't let progress writing break encoding
# =============================================================================
# CAPACITY
# =============================================================================
def calculate_audio_lsb_capacity(
    audio_data: bytes,
    bits_per_sample: int = 1,
) -> int:
    """
    Report how many payload bytes an audio carrier can hold via LSB embedding.

    Only the file metadata is inspected (``soundfile.info``). The number of
    individual sample values is ``frames * channels``; each contributes
    ``bits_per_sample`` bits. ``ENCRYPTION_OVERHEAD`` is subtracted from the
    resulting byte count and the result is clamped at zero.

    Args:
        audio_data: Raw bytes of the carrier audio file.
        bits_per_sample: Number of LSBs used per sample (1 or 2).

    Returns:
        Usable payload capacity in bytes (never negative).

    Raises:
        AudioError: If soundfile cannot read the audio metadata.
    """
    debug.validate(
        bits_per_sample in (1, 2), f"bits_per_sample must be 1 or 2, got {bits_per_sample}"
    )

    try:
        meta = sf.info(io.BytesIO(audio_data))
    except Exception as exc:
        raise AudioError(f"Failed to read audio file: {exc}") from exc

    num_samples = meta.frames * meta.channels
    raw_bytes = (num_samples * bits_per_sample) // 8

    capacity = raw_bytes - ENCRYPTION_OVERHEAD
    if capacity < 0:
        capacity = 0

    debug.print(
        f"Audio LSB capacity: {capacity} bytes "
        f"({num_samples} samples, {bits_per_sample} bit(s)/sample, "
        f"{meta.samplerate} Hz, {meta.channels} ch)"
    )
    return capacity
# =============================================================================
# SAMPLE INDEX GENERATION (ChaCha20 CSPRNG)
# =============================================================================
#
# Identical strategy to generate_pixel_indices in steganography.py:
# - >= 50% capacity utilisation: full Fisher-Yates shuffle, take first N
# - < 50%: direct random sampling with collision handling
#
# The key MUST be 32 bytes (same derivation path as the pixel key).
@debug.time
def generate_sample_indices(key: bytes, num_samples: int, num_needed: int) -> list[int]:
    """
    Generate pseudo-random sample indices using ChaCha20 as a CSPRNG.

    Produces a deterministic sequence of unique sample indices so that
    the same key always yields the same embedding locations.

    Two strategies are used, chosen from ``num_needed``:

    - ``num_needed >= num_samples // 2``: a full Fisher-Yates shuffle of
      ``range(num_samples)``; the first ``num_needed`` entries are returned.
    - otherwise: keystream words are reduced modulo ``num_samples`` and
      duplicates are skipped until enough unique indices are collected.

    NOTE: the two strategies produce unrelated sequences. Because the choice
    depends on ``num_needed``, a call with a small ``num_needed`` is not
    guaranteed to return a prefix of a call with a larger ``num_needed``.
    Embedder and extractor must therefore end up on the same strategy.

    Args:
        key: 32-byte key for the ChaCha20 cipher.
        num_samples: Total number of samples in the carrier audio.
        num_needed: How many unique sample indices are required.

    Returns:
        List of ``num_needed`` unique indices in [0, num_samples).

    Raises:
        AssertionError (via debug.validate): On invalid arguments.
    """
    debug.validate(len(key) == 32, f"Sample key must be 32 bytes, got {len(key)}")
    debug.validate(num_samples > 0, f"Number of samples must be positive, got {num_samples}")
    debug.validate(num_needed > 0, f"Number needed must be positive, got {num_needed}")
    debug.validate(
        num_needed <= num_samples,
        f"Cannot select {num_needed} samples from {num_samples} available",
    )
    debug.print(f"Generating {num_needed} sample indices from {num_samples} total samples")
    # Strategy 1: Full Fisher-Yates shuffle when we need many indices
    if num_needed >= num_samples // 2:
        debug.print(f"Using full shuffle (needed {num_needed}/{num_samples} samples)")
        # Fixed all-zero nonce: the output must depend on the key alone.
        nonce = b"\x00" * 16
        cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend())
        encryptor = cipher.encryptor()
        indices = list(range(num_samples))
        # Encrypting zeros yields the raw keystream; 4 bytes are reserved per swap.
        random_bytes = encryptor.update(b"\x00" * (num_samples * 4))
        # Walk i from the last position down to 1, swapping with a keyed j in [0, i].
        for i in range(num_samples - 1, 0, -1):
            # The k-th swap (k = num_samples - 1 - i) consumes the k-th 4-byte word.
            j_bytes = random_bytes[(num_samples - 1 - i) * 4 : (num_samples - i) * 4]
            j = int.from_bytes(j_bytes, "big") % (i + 1)
            indices[i], indices[j] = indices[j], indices[i]
        selected = indices[:num_needed]
        debug.print(f"Generated {len(selected)} indices via shuffle")
        return selected
    # Strategy 2: Direct sampling for lower utilisation
    debug.print(f"Using optimized selection (needed {num_needed}/{num_samples} samples)")
    selected: list[int] = []
    used: set[int] = set()
    nonce = b"\x00" * 16
    cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend())
    encryptor = cipher.encryptor()
    # Pre-generate 2x bytes to handle expected collisions
    bytes_needed = (num_needed * 2) * 4
    random_bytes = encryptor.update(b"\x00" * bytes_needed)
    byte_offset = 0
    collisions = 0
    # NOTE(review): the strict `<` means the final pre-generated 4-byte word is
    # never consumed here. Changing the bound would alter the index sequence in
    # the fallback path below, so it must stay as-is for compatibility.
    while len(selected) < num_needed and byte_offset < len(random_bytes) - 4:
        idx = int.from_bytes(random_bytes[byte_offset : byte_offset + 4], "big") % num_samples
        byte_offset += 4
        if idx not in used:
            used.add(idx)
            selected.append(idx)
        else:
            collisions += 1
    # Edge case: ran out of pre-generated bytes (very high collision rate)
    if len(selected) < num_needed:
        debug.print(f"Need {num_needed - len(selected)} more indices, generating...")
        extra_needed = num_needed - len(selected)
        # Bounded retry: at most 2x the shortfall in extra keystream words, drawn
        # from the same encryptor (i.e. continuing after the pre-generated bytes).
        for _ in range(extra_needed * 2):
            extra_bytes = encryptor.update(b"\x00" * 4)
            idx = int.from_bytes(extra_bytes, "big") % num_samples
            if idx not in used:
                used.add(idx)
                selected.append(idx)
                if len(selected) == num_needed:
                    break
    debug.print(f"Generated {len(selected)} indices with {collisions} collisions")
    # The bounded fallback can in principle still come up short; surface that loudly.
    debug.validate(
        len(selected) == num_needed,
        f"Failed to generate enough indices: {len(selected)}/{num_needed}",
    )
    return selected
# =============================================================================
# EMBEDDING
# =============================================================================
@debug.time
def embed_in_audio_lsb(
    data: bytes,
    carrier_audio: bytes,
    sample_key: bytes,
    bits_per_sample: int = 1,
    progress_file: str | None = None,
) -> tuple[bytes, AudioEmbedStats]:
    """
    Hide ``data`` in the least significant bits of a carrier's PCM samples.

    The carrier is decoded to int16 samples and flattened. The payload is
    framed as ``AUDIO_MAGIC_LSB`` + 4-byte big-endian length + ``data`` and
    written, ``bits_per_sample`` bits at a time, into the samples chosen by
    ``generate_sample_indices(sample_key, ...)``. The result is always
    serialised as a 16-bit PCM WAV file.

    Args:
        data: Encrypted payload bytes to embed.
        carrier_audio: Raw bytes of the carrier WAV/FLAC file.
        sample_key: 32-byte key for sample index generation.
        bits_per_sample: LSBs to use per sample (1 or 2).
        progress_file: Optional path for progress JSON (frontend polling).

    Returns:
        Tuple of (stego WAV bytes, AudioEmbedStats).

    Raises:
        AudioCapacityError: If the framed payload does not fit the carrier.
        AudioError: On any other failure while embedding.
    """
    debug.print(f"Audio LSB embedding {len(data)} bytes")
    debug.data(sample_key, "Sample key for embedding")
    debug.validate(
        bits_per_sample in (1, 2), f"bits_per_sample must be 1 or 2, got {bits_per_sample}"
    )
    debug.validate(len(sample_key) == 32, f"Sample key must be 32 bytes, got {len(sample_key)}")

    try:
        # Decode the carrier; always_2d gives shape (num_frames, channels).
        pcm, samplerate = sf.read(io.BytesIO(carrier_audio), dtype="int16", always_2d=True)
        frame_shape = pcm.shape
        num_frames, channels = frame_shape[0], frame_shape[1]
        duration = num_frames / samplerate
        debug.print(
            f"Carrier audio: {samplerate} Hz, {channels} ch, "
            f"{num_frames} frames, {duration:.2f}s"
        )

        # Work on a private 1-D copy so channels are interleaved into one stream.
        flat = pcm.flatten().copy()
        num_samples = len(flat)

        # Frame the payload: magic + big-endian length + data.
        payload = AUDIO_MAGIC_LSB + struct.pack(">I", len(data)) + data
        payload_len = len(payload)
        debug.print(f"Payload with header: {payload_len} bytes (magic 4 + len 4 + data {len(data)})")

        # Refuse payloads that cannot fit at the requested bit depth.
        max_bytes = (num_samples * bits_per_sample) // 8
        if payload_len > max_bytes:
            debug.print(f"Capacity error: need {payload_len}, have {max_bytes}")
            raise AudioCapacityError(payload_len, max_bytes)
        usage = payload_len / max_bytes
        debug.print(
            f"Capacity usage: {payload_len}/{max_bytes} bytes "
            f"({usage * 100:.1f}%)"
        )

        # Expand the payload to a '0'/'1' string, MSB first within each byte.
        bit_string = "".join(f"{byte:08b}" for byte in payload)
        total_bits = len(bit_string)
        samples_needed = -(-total_bits // bits_per_sample)  # ceiling division
        debug.print(f"Need {samples_needed} samples to embed {total_bits} bits")

        chosen = generate_sample_indices(sample_key, num_samples, samples_needed)
        total_to_process = len(chosen)

        # Mask that clears the low bits while staying inside unsigned 16-bit space.
        keep_mask = 0xFFFF ^ ((1 << bits_per_sample) - 1)
        modified_count = 0

        if progress_file:
            _write_progress(progress_file, 5, 100, "embedding")

        bit_offsets = range(0, total_bits, bits_per_sample)
        for step, (sample_idx, bit_idx) in enumerate(zip(chosen, bit_offsets)):
            # A short final chunk is right-padded with zeros.
            chunk = bit_string[bit_idx : bit_idx + bits_per_sample].ljust(bits_per_sample, "0")
            before = flat[sample_idx]
            merged = ((int(before) & 0xFFFF) & keep_mask) | int(chunk, 2)
            # Fold the unsigned 16-bit result back into the signed int16 range.
            after = np.int16(merged - 0x10000 if merged >= 0x8000 else merged)
            if before != after:
                flat[sample_idx] = after
                modified_count += 1
            if progress_file and step % PROGRESS_INTERVAL == 0:
                _write_progress(progress_file, step, total_to_process, "embedding")

        if progress_file:
            _write_progress(progress_file, total_to_process, total_to_process, "saving")
        debug.print(f"Modified {modified_count} samples (out of {samples_needed} selected)")

        # Restore the (frames, channels) layout and serialise as 16-bit WAV.
        sink = io.BytesIO()
        sf.write(sink, flat.reshape(frame_shape), samplerate, format="WAV", subtype="PCM_16")
        sink.seek(0)
        stego_bytes = sink.getvalue()

        stats = AudioEmbedStats(
            samples_modified=modified_count,
            total_samples=num_samples,
            capacity_used=usage,
            bytes_embedded=payload_len,
            sample_rate=samplerate,
            channels=channels,
            duration_seconds=duration,
            embed_mode=EMBED_MODE_AUDIO_LSB,
        )
        debug.print(f"Audio LSB embedding complete: {len(stego_bytes)} byte WAV")
        return stego_bytes, stats
    except AudioCapacityError:
        raise
    except Exception as e:
        debug.exception(e, "embed_in_audio_lsb")
        raise AudioError(f"Failed to embed data in audio: {e}") from e
# =============================================================================
# EXTRACTION
# =============================================================================
def _read_lsb_bits(flat_samples, indices, bits_per_sample, progress_file=None, progress_total=0):
    """Return the low ``bits_per_sample`` bits of each indexed sample as a '0'/'1' string (MSB first)."""
    lsb_mask = (1 << bits_per_sample) - 1
    width = f"0{bits_per_sample}b"
    chunks = []
    for position, sample_idx in enumerate(indices):
        chunks.append(format(int(flat_samples[sample_idx]) & lsb_mask, width))
        if progress_file and position % PROGRESS_INTERVAL == 0:
            _write_progress(progress_file, position, progress_total, "extracting")
    # Join once instead of growing a string bit by bit.
    return "".join(chunks)


@debug.time
def extract_from_audio_lsb(
    audio_data: bytes,
    sample_key: bytes,
    bits_per_sample: int = 1,
    progress_file: str | None = None,
) -> bytes | None:
    """
    Extract hidden data from audio using LSB steganography.

    The audio is decoded to int16 samples and flattened. The 8-byte header
    (``AUDIO_MAGIC_LSB`` + 4-byte big-endian length) is read first, then the
    indices for the whole payload are generated and the data bits collected.

    ``generate_sample_indices`` switches from direct sampling to a full
    shuffle once ``num_needed >= num_samples // 2``, and the two strategies
    select unrelated samples. The embedder picks the strategy from the full
    payload size, while the header probe only asks for a handful of samples.
    If the magic is not found via direct sampling, the header is therefore
    re-read using the full-shuffle order before giving up. This retry costs a
    full shuffle over all samples when no data is present or the key is wrong.

    Args:
        audio_data: Raw bytes of the stego WAV file.
        sample_key: 32-byte key (must match the one used for embedding).
        bits_per_sample: LSBs per sample (must match embedding).
        progress_file: Optional path for progress JSON.

    Returns:
        Extracted payload bytes (without magic/length prefix), or ``None``
        if extraction fails (wrong key, no data, corrupted).
    """
    debug.print(f"Audio LSB extracting from {len(audio_data)} byte audio")
    debug.data(sample_key, "Sample key for extraction")
    debug.validate(
        bits_per_sample in (1, 2), f"bits_per_sample must be 1 or 2, got {bits_per_sample}"
    )
    try:
        # 1. Read audio
        samples, samplerate = sf.read(io.BytesIO(audio_data), dtype="int16", always_2d=True)
        flat_samples = samples.flatten()
        num_samples = len(flat_samples)
        debug.print(f"Audio: {samplerate} Hz, {samples.shape[1]} ch, {num_samples} total samples")

        # 2. Read the header (4 bytes magic + 4 bytes length = 64 bits)
        header_bits_needed = 64
        header_samples_needed = (header_bits_needed + bits_per_sample - 1) // bits_per_sample + 10
        if header_samples_needed > num_samples:
            debug.print("Audio too small to contain header")
            return None

        header_indices = generate_sample_indices(sample_key, num_samples, header_samples_needed)
        binary_data = _read_lsb_bits(flat_samples, header_indices, bits_per_sample)
        if len(binary_data) < 64:
            debug.print(f"Not enough bits for header: {len(binary_data)}/64")
            return None
        magic_bytes = int(binary_data[:32], 2).to_bytes(4, "big")

        # The probe above used direct sampling whenever it asked for fewer than
        # half the samples. Large payloads are embedded in shuffle order, so
        # retry the header with that order before declaring a mismatch.
        shuffled_indices = None
        if magic_bytes != AUDIO_MAGIC_LSB and header_samples_needed < num_samples // 2:
            debug.print("Magic not found via direct sampling; retrying with shuffle order")
            # Requesting every sample forces the full-shuffle strategy.
            shuffled_indices = generate_sample_indices(sample_key, num_samples, num_samples)
            binary_data = _read_lsb_bits(
                flat_samples, shuffled_indices[:header_samples_needed], bits_per_sample
            )
            magic_bytes = int(binary_data[:32], 2).to_bytes(4, "big")

        # 3. Verify magic bytes
        if magic_bytes != AUDIO_MAGIC_LSB:
            debug.print(f"Magic mismatch: got {magic_bytes!r}, expected {AUDIO_MAGIC_LSB!r}")
            return None
        debug.print("Magic bytes verified: AUDL")

        # 4. Parse length
        data_length = struct.unpack(">I", int(binary_data[32:64], 2).to_bytes(4, "big"))[0]
        debug.print(f"Extracted length: {data_length} bytes")

        # Sanity check length
        max_possible = (num_samples * bits_per_sample) // 8 - 8  # minus header
        if data_length > max_possible or data_length < 1:
            debug.print(f"Invalid data length: {data_length} (max possible: {max_possible})")
            return None

        # 5. Extract full payload
        total_bits = (8 + data_length) * 8  # header (8 bytes) + payload
        total_samples_needed = (total_bits + bits_per_sample - 1) // bits_per_sample
        if total_samples_needed > num_samples:
            debug.print(
                f"Need {total_samples_needed} samples but only {num_samples} available"
            )
            return None
        debug.print(f"Need {total_samples_needed} samples to extract {data_length} bytes")

        if shuffled_indices is not None and total_samples_needed >= num_samples // 2:
            # Same shuffle the generator would redo; reuse it instead.
            selected_indices = shuffled_indices[:total_samples_needed]
        else:
            selected_indices = generate_sample_indices(
                sample_key, num_samples, total_samples_needed
            )

        # Initial progress
        if progress_file:
            _write_progress(progress_file, 5, 100, "extracting")

        binary_data = _read_lsb_bits(
            flat_samples, selected_indices, bits_per_sample, progress_file, total_samples_needed
        )

        if progress_file:
            _write_progress(
                progress_file, total_samples_needed, total_samples_needed, "extracting"
            )

        # Skip the 8-byte header (magic + length) = 64 bits
        data_bits = binary_data[64 : 64 + (data_length * 8)]
        if len(data_bits) < data_length * 8:
            debug.print(f"Insufficient bits: {len(data_bits)} < {data_length * 8}")
            return None

        # Convert bits back to bytes (length was validated to be >= 1 above)
        data_bytes = int(data_bits, 2).to_bytes(data_length, "big")
        debug.print(f"Audio LSB successfully extracted {len(data_bytes)} bytes")
        return data_bytes
    except Exception as e:
        # Documented contract: any failure is reported as "no data".
        debug.exception(e, "extract_from_audio_lsb")
        return None

536
src/stegasoo/audio_utils.py Normal file
View File

@@ -0,0 +1,536 @@
"""
Stegasoo Audio Utilities (v4.3.0)
Audio format detection, transcoding, and metadata extraction for audio steganography.
Dependencies:
- soundfile (sf): Fast WAV/FLAC reading without ffmpeg
- pydub: MP3/OGG/AAC transcoding (wraps ffmpeg)
Both are optional — functions degrade gracefully when unavailable.
"""
from __future__ import annotations
import io
import logging
import shutil
from .constants import (
EMBED_MODE_AUDIO_AUTO,
MAX_AUDIO_DURATION,
MAX_AUDIO_FILE_SIZE,
MAX_AUDIO_SAMPLE_RATE,
MIN_AUDIO_SAMPLE_RATE,
VALID_AUDIO_EMBED_MODES,
)
from .exceptions import AudioTranscodeError, AudioValidationError, UnsupportedAudioFormatError
from .models import AudioInfo, ValidationResult
logger = logging.getLogger(__name__)
# =============================================================================
# FFMPEG AVAILABILITY
# =============================================================================
def has_ffmpeg_support() -> bool:
    """Report whether an ``ffmpeg`` executable can be located on PATH.

    Returns:
        True when ``shutil.which`` resolves ``ffmpeg``, False otherwise.
    """
    ffmpeg_path = shutil.which("ffmpeg")
    return ffmpeg_path is not None
# =============================================================================
# FORMAT DETECTION
# =============================================================================
def detect_audio_format(audio_data: bytes) -> str:
    """Detect audio format from magic bytes.

    Examines the first bytes of audio data to identify the container format.
    Inputs shorter than 12 bytes are always reported as "unknown".

    Magic byte signatures (checked in this order):
    - WAV: b"RIFF" at offset 0 + b"WAVE" at offset 8
    - FLAC: b"fLaC" at offset 0
    - OGG (Vorbis/Opus): b"OggS" at offset 0
    - MP3: b"ID3" (ID3 tag), or a Layer III frame sync:
      b"\\xff\\xfb" / b"\\xff\\xfa" (MPEG-1, without/with CRC),
      b"\\xff\\xf3" / b"\\xff\\xf2" (MPEG-2, without/with CRC)
    - M4A/MP4: b"ftyp" at offset 4
    - AAC: b"\\xff\\xf1" or b"\\xff\\xf9" (ADTS header)

    Args:
        audio_data: Raw audio file bytes.

    Returns:
        Format string: "wav", "flac", "mp3", "ogg", "aac", "m4a", or "unknown".
    """
    # Everything below indexes up to byte 12, so shorter inputs cannot match.
    if len(audio_data) < 12:
        return "unknown"

    tag4 = audio_data[:4]
    sync2 = audio_data[:2]

    # WAV: RIFF....WAVE
    if tag4 == b"RIFF" and audio_data[8:12] == b"WAVE":
        return "wav"
    if tag4 == b"fLaC":
        return "flac"
    # OGG (Vorbis or Opus)
    if tag4 == b"OggS":
        return "ogg"
    # MP3 with ID3 tag
    if audio_data[:3] == b"ID3":
        return "mp3"
    # MP3 frame sync. 0xFFFA is MPEG-1 Layer III with CRC protection; it was
    # previously missing even though the MPEG-2 CRC variant (0xFFF2) was accepted.
    if sync2 in (b"\xff\xfb", b"\xff\xfa", b"\xff\xf3", b"\xff\xf2"):
        return "mp3"
    # M4A/MP4 container: "ftyp" at offset 4
    if audio_data[4:8] == b"ftyp":
        return "m4a"
    # AAC ADTS header
    if sync2 in (b"\xff\xf1", b"\xff\xf9"):
        return "aac"
    return "unknown"
# =============================================================================
# TRANSCODING
# =============================================================================
def transcode_to_wav(audio_data: bytes) -> bytes:
    """Transcode any supported audio format to WAV.

    WAV (PCM_16/24/32, FLOAT, DOUBLE subtypes) and FLAC are decoded with
    soundfile and re-encoded as 16-bit PCM WAV, so no ffmpeg is needed.
    WAV files with any other subtype, and all lossy formats (MP3, OGG, AAC,
    M4A), are handed to pydub, which requires ffmpeg.

    Args:
        audio_data: Raw audio file bytes in any supported format.

    Returns:
        WAV file bytes. For input handled by soundfile this is always
        16-bit PCM at the original sample rate; for input handled by pydub
        it is whatever pydub's WAV export produces.

    Raises:
        AudioTranscodeError: If transcoding fails or a required backend
            (soundfile, pydub, ffmpeg) is unavailable.
        UnsupportedAudioFormatError: If the format cannot be detected.
    """
    fmt = detect_audio_format(audio_data)
    if fmt == "unknown":
        raise UnsupportedAudioFormatError(
            "Cannot detect audio format. Supported: WAV, FLAC, MP3, OGG, AAC, M4A."
        )

    # WAV: re-encode the common PCM/float subtypes to a consistent PCM_16 file.
    # Other subtypes fall through to the pydub/ffmpeg path at the bottom.
    if fmt == "wav":
        try:
            import soundfile as sf

            buf = io.BytesIO(audio_data)
            info = sf.info(buf)
            if info.subtype in ("PCM_16", "PCM_24", "PCM_32", "FLOAT", "DOUBLE"):
                buf.seek(0)
                data, samplerate = sf.read(buf, dtype="int16")
                out = io.BytesIO()
                sf.write(out, data, samplerate, format="WAV", subtype="PCM_16")
                return out.getvalue()
        except ImportError as e:
            raise AudioTranscodeError("soundfile package is required for WAV processing") from e
        except Exception as e:
            # Chain the cause so the underlying soundfile error is not lost.
            raise AudioTranscodeError(f"Failed to process WAV: {e}") from e

    # FLAC: use soundfile (fast, no ffmpeg)
    if fmt == "flac":
        try:
            import soundfile as sf

            buf = io.BytesIO(audio_data)
            data, samplerate = sf.read(buf, dtype="int16")
            out = io.BytesIO()
            sf.write(out, data, samplerate, format="WAV", subtype="PCM_16")
            return out.getvalue()
        except ImportError as e:
            raise AudioTranscodeError("soundfile package is required for FLAC processing") from e
        except Exception as e:
            raise AudioTranscodeError(f"Failed to transcode FLAC to WAV: {e}") from e

    # Lossy formats (MP3, OGG, AAC, M4A) and unusual WAV subtypes: pydub + ffmpeg
    return _transcode_with_pydub(audio_data, fmt, "wav")
def transcode_to_mp3(audio_data: bytes, bitrate: str = "256k") -> bytes:
    """Transcode audio to MP3 format.

    Uses pydub (wraps ffmpeg) for transcoding.

    Args:
        audio_data: Raw audio file bytes in any supported format.
        bitrate: Target MP3 bitrate (e.g., "128k", "192k", "256k", "320k").

    Returns:
        MP3 file bytes.

    Raises:
        UnsupportedAudioFormatError: If the input format cannot be detected.
        AudioTranscodeError: If transcoding fails or pydub/ffmpeg unavailable.
    """
    fmt = detect_audio_format(audio_data)
    if fmt == "unknown":
        raise UnsupportedAudioFormatError(
            "Cannot detect audio format. Supported: WAV, FLAC, MP3, OGG, AAC, M4A."
        )

    try:
        from pydub import AudioSegment
    except ImportError as e:
        raise AudioTranscodeError(
            "pydub package is required for MP3 transcoding. Install with: pip install pydub"
        ) from e

    if not has_ffmpeg_support():
        raise AudioTranscodeError(
            "ffmpeg is required for MP3 transcoding. Install ffmpeg on your system."
        )

    try:
        # Map our format names to pydub format names
        pydub_fmt = _pydub_format(fmt)
        audio = AudioSegment.from_file(io.BytesIO(audio_data), format=pydub_fmt)
        out = io.BytesIO()
        audio.export(out, format="mp3", bitrate=bitrate)
        return out.getvalue()
    except Exception as e:
        # Chain the cause so the original pydub/ffmpeg failure stays visible.
        raise AudioTranscodeError(f"Failed to transcode to MP3: {e}") from e
def _transcode_with_pydub(audio_data: bytes, src_fmt: str, dst_fmt: str) -> bytes:
    """Transcode audio using pydub (requires ffmpeg).

    Args:
        audio_data: Raw audio bytes.
        src_fmt: Source format string (our naming).
        dst_fmt: Destination format string ("wav" or "mp3"), passed to
            pydub's export unchanged.

    Returns:
        Transcoded audio bytes.

    Raises:
        AudioTranscodeError: If pydub or ffmpeg is unavailable, or if
            decoding/encoding fails.
    """
    try:
        from pydub import AudioSegment
    except ImportError as e:
        raise AudioTranscodeError(
            "pydub package is required for audio transcoding. Install with: pip install pydub"
        ) from e

    if not has_ffmpeg_support():
        raise AudioTranscodeError(
            "ffmpeg is required for audio transcoding. Install ffmpeg on your system."
        )

    try:
        pydub_fmt = _pydub_format(src_fmt)
        audio = AudioSegment.from_file(io.BytesIO(audio_data), format=pydub_fmt)
        out = io.BytesIO()
        # The former wav/non-wav branches issued the identical call.
        audio.export(out, format=dst_fmt)
        return out.getvalue()
    except Exception as e:
        # Chain the cause so the original pydub/ffmpeg failure stays visible.
        raise AudioTranscodeError(f"Failed to transcode {src_fmt} to {dst_fmt}: {e}") from e
def _pydub_format(fmt: str) -> str:
    """Translate an internal format name into the name handed to pydub/ffmpeg.

    Every known name currently maps to itself; unknown names are passed
    through unchanged.

    Args:
        fmt: Our internal format name.

    Returns:
        pydub-compatible format string.
    """
    known_names = ("wav", "flac", "mp3", "ogg", "aac", "m4a")
    translation = dict(zip(known_names, known_names))
    if fmt in translation:
        return translation[fmt]
    return fmt
# =============================================================================
# METADATA EXTRACTION
# =============================================================================
def get_audio_info(audio_data: bytes) -> AudioInfo:
    """Read audio metadata from raw file bytes.

    The format is detected from magic bytes. WAV and FLAC are inspected with
    soundfile (no ffmpeg needed); every other detected format goes through
    pydub, which requires ffmpeg.

    Args:
        audio_data: Raw audio file bytes.

    Returns:
        AudioInfo describing the audio.

    Raises:
        UnsupportedAudioFormatError: If the format cannot be detected.
        AudioTranscodeError: If metadata extraction fails.
    """
    fmt = detect_audio_format(audio_data)
    if fmt == "unknown":
        raise UnsupportedAudioFormatError(
            "Cannot detect audio format. Supported: WAV, FLAC, MP3, OGG, AAC, M4A."
        )

    # Lossless containers take the fast soundfile path; the rest need pydub.
    is_lossless = fmt in ("wav", "flac")
    reader = _get_info_soundfile if is_lossless else _get_info_pydub
    return reader(audio_data, fmt)
def _get_info_soundfile(audio_data: bytes, fmt: str) -> AudioInfo:
    """Extract audio info using soundfile (WAV/FLAC).

    Args:
        audio_data: Raw audio bytes.
        fmt: Format string ("wav" or "flac"); stored in the result and used
            in error messages.

    Returns:
        AudioInfo with metadata. ``num_samples`` is soundfile's frame count
        and ``bitrate`` is always ``None`` on this path.

    Raises:
        AudioTranscodeError: If soundfile is not installed or cannot read
            the data.
    """
    try:
        import soundfile as sf
    except ImportError as e:
        raise AudioTranscodeError(
            "soundfile package is required. Install with: pip install soundfile"
        ) from e

    try:
        info = sf.info(io.BytesIO(audio_data))
        return AudioInfo(
            sample_rate=info.samplerate,
            channels=info.channels,
            duration_seconds=info.duration,
            num_samples=info.frames,
            format=fmt,
            bitrate=None,
            # Bit depth is derived from the subtype string (None if unrecognised).
            bit_depth=_bit_depth_from_subtype(info.subtype),
        )
    except Exception as e:
        # Chain the cause so the underlying soundfile error is not lost.
        raise AudioTranscodeError(f"Failed to read {fmt.upper()} metadata: {e}") from e
def _bit_depth_from_subtype(subtype: str) -> int | None:
    """Look up the sample bit depth for a soundfile subtype name.

    Args:
        subtype: Soundfile subtype (e.g., "PCM_16", "PCM_24", "FLOAT").

    Returns:
        Bit depth as integer, or None when the subtype is not recognised.
    """
    names_by_depth = {
        8: ("PCM_S8", "PCM_U8"),
        16: ("PCM_16",),
        24: ("PCM_24",),
        32: ("PCM_32", "FLOAT"),
        64: ("DOUBLE",),
    }
    depth_by_name = {
        name: depth for depth, names in names_by_depth.items() for name in names
    }
    return depth_by_name.get(subtype)
def _get_info_pydub(audio_data: bytes, fmt: str) -> AudioInfo:
    """Extract audio info using pydub (lossy formats).

    Args:
        audio_data: Raw audio bytes.
        fmt: Format string ("mp3", "ogg", "aac", "m4a").

    Returns:
        AudioInfo with metadata. ``bitrate`` is an estimate computed from
        the encoded size and the decoded duration (``None`` when the
        duration is not positive).

    Raises:
        AudioTranscodeError: If pydub or ffmpeg is unavailable, or if the
            data cannot be decoded.
    """
    try:
        from pydub import AudioSegment
    except ImportError as e:
        raise AudioTranscodeError(
            "pydub package is required for audio metadata. Install with: pip install pydub"
        ) from e

    if not has_ffmpeg_support():
        raise AudioTranscodeError(
            "ffmpeg is required for audio metadata extraction. Install ffmpeg on your system."
        )

    try:
        pydub_fmt = _pydub_format(fmt)
        audio = AudioSegment.from_file(io.BytesIO(audio_data), format=pydub_fmt)
        duration = audio.duration_seconds

        # Estimate bitrate from file size and duration
        bitrate = None
        if duration > 0:
            bitrate = int((len(audio_data) * 8) / duration)

        return AudioInfo(
            sample_rate=audio.frame_rate,
            channels=audio.channels,
            duration_seconds=duration,
            num_samples=int(audio.frame_count()),
            format=fmt,
            bitrate=bitrate,
            # sample_width is in bytes; report bits, or None when it is falsy.
            bit_depth=audio.sample_width * 8 if audio.sample_width else None,
        )
    except Exception as e:
        # Chain the cause so the original pydub/ffmpeg failure stays visible.
        raise AudioTranscodeError(f"Failed to read {fmt.upper()} metadata: {e}") from e
# =============================================================================
# VALIDATION
# =============================================================================
def validate_audio(
    audio_data: bytes,
    name: str = "Audio",
    check_duration: bool = True,
) -> ValidationResult:
    """Validate audio data for steganography.

    Checks:
        - Not empty
        - Not too large (MAX_AUDIO_FILE_SIZE)
        - Valid audio format (detectable via magic bytes)
        - Duration within limits (MAX_AUDIO_DURATION) if check_duration=True
        - Sample rate within limits (MIN_AUDIO_SAMPLE_RATE to MAX_AUDIO_SAMPLE_RATE)

    Args:
        audio_data: Raw audio file bytes.
        name: Descriptive name for error messages (default: "Audio").
        check_duration: Whether to enforce duration limit (default: True).

    Returns:
        ValidationResult with audio info in details (sample_rate, channels,
        duration, num_samples, format, bitrate, bit_depth) on success, or an
        error result describing the first failed check.
    """
    if not audio_data:
        return ValidationResult.error(f"{name} is required")

    if len(audio_data) > MAX_AUDIO_FILE_SIZE:
        size_mb = len(audio_data) / (1024 * 1024)
        max_mb = MAX_AUDIO_FILE_SIZE / (1024 * 1024)
        return ValidationResult.error(
            f"{name} too large ({size_mb:.1f} MB). Maximum: {max_mb:.0f} MB"
        )

    # Detect format
    fmt = detect_audio_format(audio_data)
    if fmt == "unknown":
        return ValidationResult.error(
            f"Could not detect {name} format. "
            "Supported formats: WAV, FLAC, MP3, OGG, AAC, M4A."
        )

    # Extract metadata for further validation. This function reports problems
    # through its return value, so every failure of the metadata reader
    # (known audio errors and anything unexpected alike) is turned into the
    # same error result.
    try:
        info = get_audio_info(audio_data)
    except Exception as e:
        return ValidationResult.error(f"Could not read {name}: {e}")

    # Check duration
    if check_duration and info.duration_seconds > MAX_AUDIO_DURATION:
        return ValidationResult.error(
            f"{name} too long ({info.duration_seconds:.1f}s). "
            f"Maximum: {MAX_AUDIO_DURATION}s ({MAX_AUDIO_DURATION // 60} minutes)"
        )

    # Check sample rate
    if info.sample_rate < MIN_AUDIO_SAMPLE_RATE:
        return ValidationResult.error(
            f"{name} sample rate too low ({info.sample_rate} Hz). "
            f"Minimum: {MIN_AUDIO_SAMPLE_RATE} Hz"
        )
    if info.sample_rate > MAX_AUDIO_SAMPLE_RATE:
        return ValidationResult.error(
            f"{name} sample rate too high ({info.sample_rate} Hz). "
            f"Maximum: {MAX_AUDIO_SAMPLE_RATE} Hz"
        )

    return ValidationResult.ok(
        sample_rate=info.sample_rate,
        channels=info.channels,
        duration=info.duration_seconds,
        num_samples=info.num_samples,
        format=info.format,
        bitrate=info.bitrate,
        bit_depth=info.bit_depth,
    )
def require_valid_audio(audio_data: bytes, name: str = "Audio") -> None:
    """Run validate_audio and turn a failed result into an exception.

    Args:
        audio_data: Raw audio file bytes.
        name: Descriptive name used in error messages.

    Raises:
        AudioValidationError: If validation fails; carries the validation
            result's error message.
    """
    outcome = validate_audio(audio_data, name)
    if outcome.is_valid:
        return
    raise AudioValidationError(outcome.error_message)
def validate_audio_embed_mode(mode: str) -> ValidationResult:
    """Check that a string names a known audio embedding mode.

    Accepts every mode in VALID_AUDIO_EMBED_MODES plus the auto-detect
    mode EMBED_MODE_AUDIO_AUTO.

    Args:
        mode: Embedding mode to validate
            (e.g., "audio_lsb", "audio_spread", "audio_auto").

    Returns:
        ValidationResult with mode in details on success, or an error
        result listing the accepted modes in sorted order.
    """
    accepted = VALID_AUDIO_EMBED_MODES | {EMBED_MODE_AUDIO_AUTO}
    if mode in accepted:
        return ValidationResult.ok(mode=mode)

    options = ", ".join(sorted(accepted))
    return ValidationResult.error(
        f"Invalid audio embed_mode: '{mode}'. " f"Valid options: {options}"
    )

View File

@@ -404,6 +404,219 @@ def decode(ctx, image, reference, passphrase, pin, output):
raise SystemExit(1)
# =============================================================================
# AUDIO COMMANDS (v4.3.0)
# =============================================================================
@cli.command("audio-encode")
@click.argument("carrier", type=click.Path(exists=True))
@click.option(
    "-r",
    "--reference",
    required=True,
    type=click.Path(exists=True),
    help="Reference photo (shared secret)",
)
@click.option("-m", "--message", help="Message to encode")
@click.option(
    "-f",
    "--file",
    "file_payload",
    type=click.Path(exists=True),
    help="File to embed instead of message",
)
@click.option("-o", "--output", type=click.Path(), help="Output audio path")
@click.option(
    "--mode",
    "embed_mode",
    default="audio_lsb",
    type=click.Choice(["audio_lsb", "audio_spread"]),
    help="Embedding mode",
)
@click.option(
    "--passphrase",
    prompt=True,
    hide_input=True,
    confirmation_prompt=True,
    help="Passphrase (recommend 4+ words)",
)
@click.option("--pin", prompt=True, hide_input=True, confirmation_prompt=True, help="PIN code")
@click.pass_context
def audio_encode(ctx, carrier, reference, message, file_payload, output, embed_mode, passphrase, pin):
    """
    Encode a message or file into an audio carrier.

    Examples:

        stegasoo audio-encode carrier.wav -r ref.jpg -m "Secret" --mode audio_lsb

        stegasoo audio-encode carrier.wav -r ref.jpg -f secret.pdf --mode audio_spread
    """
    # Imported here rather than at module level, as in the rest of this command group.
    from .encode import encode_audio
    from .models import FilePayload

    if not message and not file_payload:
        raise click.UsageError("Either --message or --file is required")

    # Read input files
    with open(reference, "rb") as f:
        reference_data = f.read()
    with open(carrier, "rb") as f:
        carrier_data = f.read()

    # Determine output path. Every embedding mode uses the same default
    # name, "<carrier stem>_encoded.wav" (relative to the working directory).
    if not output:
        output = f"{Path(carrier).stem}_encoded.wav"

    try:
        # When both --file and --message are given, the file takes precedence.
        if file_payload:
            payload = FilePayload.from_file(file_payload)
        else:
            payload = message

        stego_audio, stats = encode_audio(
            message=payload,
            reference_photo=reference_data,
            carrier_audio=carrier_data,
            passphrase=passphrase,
            pin=pin,
            embed_mode=embed_mode,
        )

        with open(output, "wb") as f:
            f.write(stego_audio)

        if ctx.obj.get("json"):
            click.echo(
                json.dumps(
                    {
                        "status": "success",
                        "carrier": carrier,
                        "reference": reference,
                        "output": output,
                        "mode": stats.embed_mode,
                        "samples_modified": stats.samples_modified,
                        "duration_seconds": round(stats.duration_seconds, 2),
                        "capacity_used": round(stats.capacity_used * 100, 1),
                    },
                    indent=2,
                )
            )
        else:
            click.echo(f"✓ Encoded to {output}")
            click.echo(f"  Mode: {stats.embed_mode}")
            click.echo(f"  Duration: {stats.duration_seconds:.1f}s")
            click.echo(f"  Capacity used: {stats.capacity_used * 100:.1f}%")
    except Exception as e:
        # Top-level CLI boundary: report the failure in the requested
        # format and exit non-zero, keeping the cause chained for debugging.
        if ctx.obj.get("json"):
            click.echo(json.dumps({"status": "error", "error": str(e)}, indent=2))
        else:
            click.echo(f"✗ Audio encoding failed: {e}", err=True)
        raise SystemExit(1) from e
@cli.command("audio-decode")
@click.argument("audio", type=click.Path(exists=True))
@click.option(
    "-r",
    "--reference",
    required=True,
    type=click.Path(exists=True),
    help="Reference photo (shared secret)",
)
@click.option(
    "--mode",
    "embed_mode",
    default="audio_auto",
    type=click.Choice(["audio_auto", "audio_lsb", "audio_spread"]),
    help="Embedding mode (auto-detect by default)",
)
@click.option("--passphrase", prompt=True, hide_input=True, help="Passphrase")
@click.option("--pin", prompt=True, hide_input=True, help="PIN code")
@click.option("-o", "--output", type=click.Path(), help="Output path for file payloads")
@click.pass_context
def audio_decode(ctx, audio, reference, embed_mode, passphrase, pin, output):
    """
    Decode a message or file from stego audio.

    Examples:

        stegasoo audio-decode stego.wav -r ref.jpg

        stegasoo audio-decode stego.wav -r ref.jpg --mode audio_lsb -o ./extracted/
    """
    from .decode import decode_audio

    with open(audio, "rb") as f:
        audio_data = f.read()
    with open(reference, "rb") as f:
        reference_data = f.read()

    try:
        result = decode_audio(
            stego_audio=audio_data,
            reference_photo=reference_data,
            passphrase=passphrase,
            pin=pin,
            embed_mode=embed_mode,
        )

        if result.is_file:
            # The filename travels inside the decoded payload, i.e. it is
            # chosen by whoever created the stego audio. Keep only its final
            # path component so "../x" or an absolute path cannot write
            # outside the chosen output directory.
            filename = Path(result.filename or "").name
            if filename in ("", ".", ".."):
                filename = "decoded_file"

            # --output is treated as a directory; without it the file is
            # written to the current working directory.
            output_path = Path(output) / filename if output else Path(filename)
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, "wb") as f:
                f.write(result.file_data)

            if ctx.obj.get("json"):
                click.echo(
                    json.dumps(
                        {
                            "status": "success",
                            "audio": audio,
                            "payload_type": "file",
                            "filename": filename,
                            "output": str(output_path),
                            "size": len(result.file_data),
                        },
                        indent=2,
                    )
                )
            else:
                click.echo(f"✓ Extracted file: {output_path}")
                click.echo(f"  Size: {len(result.file_data):,} bytes")
        else:
            if ctx.obj.get("json"):
                click.echo(
                    json.dumps(
                        {
                            "status": "success",
                            "audio": audio,
                            "payload_type": "text",
                            "message": result.message,
                        },
                        indent=2,
                    )
                )
            else:
                click.echo(f"Decoded from {audio}:")
                click.echo(result.message)
    except Exception as e:
        # Top-level CLI boundary: report the failure in the requested
        # format and exit non-zero, keeping the cause chained for debugging.
        if ctx.obj.get("json"):
            click.echo(json.dumps({"status": "error", "error": str(e)}, indent=2))
        else:
            click.echo(f"✗ Audio decoding failed: {e}", err=True)
        raise SystemExit(1) from e
# =============================================================================
# BATCH COMMANDS
# =============================================================================

View File

@@ -295,3 +295,36 @@ def detect_stego_mode(encrypted_data: bytes) -> str:
return EMBED_MODE_DCT
else:
return "unknown"
# =============================================================================
# AUDIO STEGANOGRAPHY (v4.3.0)
# =============================================================================
# Audio embedding modes
EMBED_MODE_AUDIO_LSB = "audio_lsb"
EMBED_MODE_AUDIO_SPREAD = "audio_spread"
# Auto-detect pseudo-mode: not a concrete embedding mode, so it is kept out of
# VALID_AUDIO_EMBED_MODES and added explicitly where auto-detection is allowed.
EMBED_MODE_AUDIO_AUTO = "audio_auto"
VALID_AUDIO_EMBED_MODES = {EMBED_MODE_AUDIO_LSB, EMBED_MODE_AUDIO_SPREAD}
# Audio magic bytes (4-byte markers identifying the embedding mode in stego audio)
AUDIO_MAGIC_LSB = b"AUDL"
AUDIO_MAGIC_SPREAD = b"AUDS"
# Audio input limits
MAX_AUDIO_DURATION = 600  # Seconds (10 minutes)
MAX_AUDIO_FILE_SIZE = 100 * 1024 * 1024  # Bytes (100 MB)
MIN_AUDIO_SAMPLE_RATE = 8000  # Hz (G.729 level)
MAX_AUDIO_SAMPLE_RATE = 192000  # Hz (studio quality)
ALLOWED_AUDIO_EXTENSIONS = {"wav", "flac", "mp3", "ogg", "opus", "aac", "m4a", "wma"}
# Spread spectrum parameters
AUDIO_SS_CHIP_LENGTH = 1024  # Audio samples spanned by one payload bit (spreading factor)
AUDIO_SS_AMPLITUDE = 0.05  # Per-sample embedding strength (~-26dB, masked by audio)
AUDIO_SS_RS_NSYM = 32  # Reed-Solomon parity symbols per 255-byte block
# Echo hiding parameters
# NOTE(review): no echo-hiding mode appears in VALID_AUDIO_EMBED_MODES above;
# confirm these constants are still needed.
AUDIO_ECHO_DELAY_0 = 50  # Echo delay for bit 0 (samples at 44.1kHz ~ 1.1ms)
AUDIO_ECHO_DELAY_1 = 100  # Echo delay for bit 1 (samples at 44.1kHz ~ 2.3ms)
AUDIO_ECHO_AMPLITUDE = 0.3  # Echo strength (relative to original)
AUDIO_ECHO_WINDOW_SIZE = 8192  # Window size for echo embedding (samples)

View File

@@ -261,3 +261,117 @@ def decode_text(
return ""
return result.message or ""
def decode_audio(
    stego_audio: bytes,
    reference_photo: bytes,
    passphrase: str,
    pin: str = "",
    rsa_key_data: bytes | None = None,
    rsa_password: str | None = None,
    embed_mode: str = "audio_auto",
    channel_key: str | bool | None = None,
    progress_file: str | None = None,
) -> DecodeResult:
    """
    Decode a message or file from stego audio.

    The audio is transcoded to WAV when it is not already WAV, the sample
    selection key is derived from the credentials, the encrypted payload is
    extracted with the requested mode, and the payload is decrypted.

    Args:
        stego_audio: Stego audio bytes
        reference_photo: Shared reference photo bytes
        passphrase: Shared passphrase
        pin: Optional static PIN
        rsa_key_data: Optional RSA key bytes
        rsa_password: Optional RSA key password
        embed_mode: 'audio_auto', 'audio_lsb', or 'audio_spread'. With
            'audio_auto', spread spectrum is tried first and LSB is used
            as the fallback.
        channel_key: Channel key for deployment/group isolation
        progress_file: Optional path to write progress JSON

    Returns:
        DecodeResult with message or file data

    Raises:
        ValueError: If embed_mode is not one of the supported values.
        ExtractionError: If no payload could be extracted from the audio.
    """
    from .audio_utils import detect_audio_format, transcode_to_wav
    from .constants import (
        EMBED_MODE_AUDIO_AUTO,
        EMBED_MODE_AUDIO_LSB,
        EMBED_MODE_AUDIO_SPREAD,
    )

    debug.print(
        f"decode_audio: mode={embed_mode}, "
        f"passphrase length={len(passphrase.split())} words"
    )

    # Validate inputs
    require_valid_image(reference_photo, "Reference photo")
    require_security_factors(pin, rsa_key_data)
    if pin:
        require_valid_pin(pin)
    if rsa_key_data:
        require_valid_rsa_key(rsa_key_data, rsa_password)

    # Detect format and transcode to WAV for processing
    audio_format = detect_audio_format(stego_audio)
    debug.print(f"Detected audio format: {audio_format}")
    wav_audio = stego_audio
    if audio_format != "wav":
        debug.print(f"Transcoding {audio_format} to WAV for extraction")
        wav_audio = transcode_to_wav(stego_audio)

    _write_progress(progress_file, 20, 100, "initializing")

    # Derive sample selection key
    from .crypto import derive_pixel_key

    pixel_key = derive_pixel_key(reference_photo, passphrase, pin, rsa_key_data, channel_key)

    _write_progress(progress_file, 25, 100, "extracting")

    encrypted = None
    if embed_mode == EMBED_MODE_AUDIO_AUTO:
        # Try modes in order: spread spectrum -> LSB
        try:
            from .spread_steganography import extract_from_audio_spread

            encrypted = extract_from_audio_spread(wav_audio, pixel_key)
            if encrypted:
                debug.print("Auto-detect: spread spectrum extraction succeeded")
        except Exception as e:
            # Best-effort probe: a missing optional dependency or a failed
            # extraction must not abort auto-detection, but record why the
            # spread spectrum attempt was skipped before falling back to LSB.
            debug.print(f"Auto-detect: spread spectrum extraction failed: {e}")

        if not encrypted:
            from .audio_steganography import extract_from_audio_lsb

            encrypted = extract_from_audio_lsb(wav_audio, pixel_key)
            if encrypted:
                debug.print("Auto-detect: LSB extraction succeeded")
    elif embed_mode == EMBED_MODE_AUDIO_LSB:
        from .audio_steganography import extract_from_audio_lsb

        encrypted = extract_from_audio_lsb(wav_audio, pixel_key, progress_file=progress_file)
    elif embed_mode == EMBED_MODE_AUDIO_SPREAD:
        from .spread_steganography import extract_from_audio_spread

        encrypted = extract_from_audio_spread(
            wav_audio, pixel_key, progress_file=progress_file
        )
    else:
        raise ValueError(f"Invalid audio embed mode: {embed_mode}")

    if not encrypted:
        debug.print("No data extracted from audio")
        raise ExtractionError("Could not extract data from audio. Check your credentials.")

    debug.print(f"Extracted {len(encrypted)} bytes from audio")

    # Decrypt
    result = decrypt_message(encrypted, reference_photo, passphrase, pin, rsa_key_data, channel_key)
    debug.print(f"Decryption successful: {result.payload_type}")
    return result

View File

@@ -5,9 +5,15 @@ High-level encoding functions for hiding messages and files in images.
Changes in v4.0.0:
- Added channel_key parameter for deployment/group isolation
Changes in v4.3.0:
- Added encode_audio() for audio steganography
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from .constants import EMBED_MODE_LSB
from .crypto import derive_pixel_key, encrypt_message
@@ -23,6 +29,9 @@ from .validation import (
require_valid_rsa_key,
)
if TYPE_CHECKING:
from .models import AudioEmbedStats
def encode(
message: str | bytes | FilePayload,
@@ -258,3 +267,88 @@ def encode_bytes(
dct_color_mode=dct_color_mode,
channel_key=channel_key,
)
def encode_audio(
    message: str | bytes | FilePayload,
    reference_photo: bytes,
    carrier_audio: bytes,
    passphrase: str,
    pin: str = "",
    rsa_key_data: bytes | None = None,
    rsa_password: str | None = None,
    embed_mode: str = "audio_lsb",
    channel_key: str | bool | None = None,
    progress_file: str | None = None,
) -> tuple[bytes, AudioEmbedStats]:
    """
    Encode a message or file into an audio carrier.

    Args:
        message: Text message, raw bytes, or FilePayload to hide
        reference_photo: Shared reference photo bytes
        carrier_audio: Carrier audio bytes (WAV, FLAC, MP3, etc.). Anything
            other than WAV or FLAC is transcoded to WAV before embedding.
        passphrase: Shared passphrase
        pin: Optional static PIN
        rsa_key_data: Optional RSA private key PEM bytes
        rsa_password: Optional password for encrypted RSA key
        embed_mode: 'audio_lsb' or 'audio_spread'
        channel_key: Channel key for deployment/group isolation
        progress_file: Optional path to write progress JSON

    Returns:
        Tuple of (stego audio bytes, AudioEmbedStats)

    Raises:
        ValueError: If embed_mode is not 'audio_lsb' or 'audio_spread'.
    """
    from .audio_utils import detect_audio_format, transcode_to_wav
    from .constants import EMBED_MODE_AUDIO_LSB, EMBED_MODE_AUDIO_SPREAD

    debug.print(
        f"encode_audio: mode={embed_mode}, "
        f"passphrase length={len(passphrase.split())} words, "
        f"pin={'set' if pin else 'none'}"
    )

    # Validate inputs
    require_valid_payload(message)
    require_valid_image(reference_photo, "Reference photo")
    require_security_factors(pin, rsa_key_data)
    if pin:
        require_valid_pin(pin)
    if rsa_key_data:
        require_valid_rsa_key(rsa_key_data, rsa_password)

    # Reject an unsupported mode before the expensive steps below
    # (transcoding, encryption, key derivation) instead of after them.
    if embed_mode not in (EMBED_MODE_AUDIO_LSB, EMBED_MODE_AUDIO_SPREAD):
        raise ValueError(f"Invalid audio embed mode: {embed_mode}")

    # Detect audio format and transcode to WAV if needed
    audio_format = detect_audio_format(carrier_audio)
    debug.print(f"Detected audio format: {audio_format}")
    if audio_format not in ("wav", "flac"):
        debug.print(f"Transcoding {audio_format} to WAV for embedding")
        carrier_audio = transcode_to_wav(carrier_audio)

    # Encrypt message
    encrypted = encrypt_message(
        message, reference_photo, passphrase, pin, rsa_key_data, channel_key
    )
    debug.print(f"Encrypted payload: {len(encrypted)} bytes")

    # Derive sample selection key
    pixel_key = derive_pixel_key(reference_photo, passphrase, pin, rsa_key_data, channel_key)

    # Embed based on mode (already validated above, so the else branch is
    # the spread spectrum mode).
    if embed_mode == EMBED_MODE_AUDIO_LSB:
        from .audio_steganography import embed_in_audio_lsb

        stego_audio, stats = embed_in_audio_lsb(
            encrypted, carrier_audio, pixel_key, progress_file=progress_file
        )
    else:
        from .spread_steganography import embed_in_audio_spread

        stego_audio, stats = embed_in_audio_spread(
            encrypted, carrier_audio, pixel_key, progress_file=progress_file
        )

    return stego_audio, stats

View File

@@ -195,3 +195,51 @@ class UnsupportedFileTypeError(FileError):
super().__init__(
f"Unsupported file type: .{extension}. Allowed: {', '.join(sorted(allowed))}"
)
# ============================================================================
# AUDIO ERRORS
# ============================================================================
class AudioError(SteganographyError):
    """Root of the audio steganography exception hierarchy."""
class AudioValidationError(ValidationError):
    """Raised when audio input does not pass validation."""
class AudioCapacityError(CapacityError):
    """Raised when the audio carrier cannot hold the payload.

    Attributes:
        needed: Number of bytes the payload requires.
        available: Number of bytes the carrier can hold.
    """

    def __init__(self, needed: int, available: int):
        self.needed = needed
        self.available = available
        description = (
            f"Audio carrier too small. Need {needed:,} bytes, have {available:,} bytes capacity."
        )
        # Deliberately initialise via SteganographyError rather than
        # CapacityError so the audio-specific message is used instead of
        # CapacityError's image-specific one.
        SteganographyError.__init__(self, description)
class AudioExtractionError(ExtractionError):
    """Raised when hidden data cannot be extracted from audio."""
class AudioTranscodeError(AudioError):
    """Raised when audio cannot be transcoded or decoded."""
class UnsupportedAudioFormatError(AudioError):
    """Raised for an audio format that is not supported."""

View File

@@ -281,3 +281,51 @@ class GenerateResult:
lines.append(f" RSA Key: {len(self.rsa_key_pem)} bytes PEM")
lines.append(f" Total Entropy: {self.total_entropy} bits")
return "\n".join(lines)
# =============================================================================
# AUDIO STEGANOGRAPHY MODELS (v4.3.0)
# =============================================================================
@dataclass
class AudioEmbedStats:
    """Statistics from audio embedding."""

    samples_modified: int  # Number of carrier samples changed by embedding
    total_samples: int  # Number of samples considered for embedding
    capacity_used: float  # 0.0 - 1.0
    bytes_embedded: int
    sample_rate: int
    channels: int
    duration_seconds: float
    embed_mode: str  # "audio_lsb" or "audio_spread"

    @property
    def modification_percent(self) -> float:
        """Percentage of samples modified (0.0 when there are no samples)."""
        if self.total_samples <= 0:
            # Guard against division by zero; return a float to honour the
            # annotated return type.
            return 0.0
        return (self.samples_modified / self.total_samples) * 100
@dataclass
class AudioInfo:
    """Information about an audio file.

    Filled in by the soundfile- and pydub-based metadata readers.
    """

    sample_rate: int  # Hz
    channels: int
    duration_seconds: float
    num_samples: int  # Frame count as reported by the reader backend
    format: str  # "wav", "flac", "mp3", etc.
    # Estimated from file size and duration by the pydub reader; the
    # soundfile reader leaves it as None.
    bitrate: int | None = None  # For lossy formats
    # Derived from the soundfile subtype, or from pydub's decoded sample
    # width; None when it cannot be determined.
    bit_depth: int | None = None  # For lossless formats
@dataclass
class AudioCapacityInfo:
    """Capacity information for audio steganography.

    Returned by the capacity calculators (for example
    calculate_audio_spread_capacity).
    """

    total_samples: int  # Sample count of the carrier
    usable_capacity_bytes: int  # Payload bytes that fit after header/ECC overhead
    embed_mode: str  # Embedding mode the capacity was computed for
    sample_rate: int  # Hz
    duration_seconds: float

View File

@@ -0,0 +1,735 @@
"""
Spread Spectrum Audio Steganography Module (v4.3.0)
Hides data in audio by adding keyed pseudo-random noise (spread spectrum)
below the threshold of audibility. Designed to survive lossy compression
(MP3, AAC, Opus) better than LSB embedding, which requires lossless carriers.
How it works:
Each payload bit is "spread" over AUDIO_SS_CHIP_LENGTH audio samples using
a unique ChaCha20-derived chip sequence. A '1' bit adds the chip pattern;
a '0' bit subtracts it. On extraction, correlating the stego audio against
the same chip sequence recovers each bit -- even after moderate lossy
compression, because the correlation survives quantisation noise.
Data layout in the carrier:
[4B magic AUDS] [4B length x3 copies] [RS-encoded payload]
All converted to bits and embedded sequentially via spread spectrum.
Three copies of the length field enable majority voting for recovery.
Error correction:
The raw payload is protected with Reed-Solomon coding (AUDIO_SS_RS_NSYM
parity symbols per 255-byte block) so that bit errors introduced by
compression or DAC/ADC round-trips can be corrected transparently.
Requires: soundfile, numpy, cryptography, reedsolo (optional but recommended)
"""
from __future__ import annotations
import io
import struct
import numpy as np
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from .constants import (
AUDIO_MAGIC_SPREAD,
AUDIO_SS_AMPLITUDE,
AUDIO_SS_CHIP_LENGTH,
AUDIO_SS_RS_NSYM,
EMBED_MODE_AUDIO_SPREAD,
)
from .debug import debug
from .exceptions import AudioCapacityError, AudioError
from .models import AudioCapacityInfo, AudioEmbedStats
# Optional dependency: soundfile. When it is missing the module still imports;
# the public functions check HAS_SOUNDFILE and raise AudioError instead.
try:
    import soundfile as sf
    HAS_SOUNDFILE = True
except ImportError:
    HAS_SOUNDFILE = False
    sf = None  # type: ignore[assignment]
# Optional dependency: reedsolo. When it is missing, _rs_encode/_rs_decode
# pass data through unchanged (no error correction).
try:
    from reedsolo import ReedSolomonError, RSCodec
    HAS_REEDSOLO = True
except ImportError:
    HAS_REEDSOLO = False
    RSCodec = None  # type: ignore[assignment,misc]
    ReedSolomonError = None  # type: ignore[assignment,misc]
# Header layout: 4B magic + 3 x 4B length = 16 bytes = 128 bits
_HEADER_SIZE = 16
_MAGIC_SIZE = 4
_LENGTH_COPIES = 3
# Progress reporting interval (every N bits)
_PROGRESS_INTERVAL = 500
# =============================================================================
# PROGRESS REPORTING
# =============================================================================
def _write_progress(
    progress_file: str | None, current: int, total: int, phase: str = "embedding"
) -> None:
    """Dump a progress snapshot as JSON so a frontend can poll it.

    Does nothing when ``progress_file`` is None. The file is overwritten
    with the keys ``current``, ``total``, ``percent`` (rounded to one
    decimal place, 0 when ``total`` is not positive) and ``phase``.

    Args:
        progress_file: Path of the JSON file to write, or None to skip.
        current: Units of work completed so far.
        total: Total units of work.
        phase: Label of the current processing phase.
    """
    if progress_file is None:
        return

    try:
        import json

        with open(progress_file, "w") as handle:
            percent = round((current / total) * 100, 1) if total > 0 else 0
            snapshot = {
                "current": current,
                "total": total,
                "percent": percent,
                "phase": phase,
            }
            json.dump(snapshot, handle)
    except Exception:
        # Progress reporting is best-effort; it must never break encoding.
        pass
# =============================================================================
# REED-SOLOMON
# =============================================================================
def _rs_encode(data: bytes) -> bytes:
    """
    Protect data with Reed-Solomon parity.

    AUDIO_SS_RS_NSYM parity symbols are appended per 255-byte block, which
    lets the decoder repair up to RS_NSYM/2 byte errors in each block.
    When reedsolo is not installed the data is returned unchanged.
    """
    if HAS_REEDSOLO:
        codec = RSCodec(AUDIO_SS_RS_NSYM)
        return bytes(codec.encode(data))
    return data
def _rs_decode(data: bytes) -> bytes | None:
    """
    Strip and apply Reed-Solomon parity.

    Returns the corrected payload bytes, or None when the data has more
    errors than the code can repair. When reedsolo is not installed the
    data is returned unchanged.
    """
    if not HAS_REEDSOLO:
        return data

    codec = RSCodec(AUDIO_SS_RS_NSYM)
    try:
        message, _with_ecc, corrected_positions = codec.decode(data)
    except ReedSolomonError as exc:
        debug.print(f"RS decode failed (too many errors): {exc}")
        return None

    if corrected_positions:
        debug.print(f"RS corrected {len(corrected_positions)} byte errors")
    return bytes(message)
# =============================================================================
# CHIP SEQUENCE GENERATION (ChaCha20 CSPRNG)
# =============================================================================
def _generate_chip_sequence(seed: bytes, chip_index: int, length: int) -> np.ndarray:
    """
    Derive the bipolar spreading code for one payload bit.

    A ChaCha20 keystream keyed by ``seed`` is generated with ``chip_index``
    encoded into the nonce, so every bit position gets its own
    deterministic code. Each keystream byte yields one chip.

    Args:
        seed: Key material. Hashed with SHA-256 when shorter than 32 bytes,
            truncated to its first 32 bytes when longer.
        chip_index: Index of the bit being embedded (used as nonce material).
        length: Number of chips to produce (AUDIO_SS_CHIP_LENGTH).

    Returns:
        Float64 numpy array of ``length`` elements, each either -1.0 or +1.0.
    """
    # ChaCha20 needs exactly 32 key bytes.
    key = seed
    if len(key) < 32:
        import hashlib

        key = hashlib.sha256(key).digest()
    elif len(key) > 32:
        key = key[:32]

    # cryptography's ChaCha20 takes a 16-byte nonce argument; fill it with
    # the big-endian chip index.
    nonce = chip_index.to_bytes(16, byteorder="big")
    keystream = (
        Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend())
        .encryptor()
        .update(b"\x00" * length)
    )

    # Keystream bytes 0-127 become -1, bytes 128-255 become +1 (DSSS bipolar code).
    octets = np.frombuffer(keystream, dtype=np.uint8)
    return np.where(octets >= 128, np.float64(1.0), np.float64(-1.0))
# =============================================================================
# SPREAD SPECTRUM CORE
# =============================================================================
def _embed_spread_spectrum(
    samples: np.ndarray,
    bits: list[int],
    seed: bytes,
    amplitude: float,
    offset: int = 0,
    progress_file: str | None = None,
) -> np.ndarray:
    """
    Write bits into audio samples with direct-sequence spread spectrum.

    Bit ``i`` occupies the AUDIO_SS_CHIP_LENGTH samples starting at
    ``offset + i * AUDIO_SS_CHIP_LENGTH``. Its chip sequence, scaled by
    ``amplitude``, is added for a 1 bit and subtracted for any other value.
    Embedding stops early (with a debug warning) once a window would run
    past the end of ``samples``.

    Args:
        samples: 1-D float64 audio samples (modified in-place and returned).
        bits: List of 0/1 ints to embed.
        seed: Key for chip generation.
        amplitude: Embedding strength (AUDIO_SS_AMPLITUDE).
        offset: Sample offset at which spread embedding begins.
        progress_file: Optional path for progress JSON.

    Returns:
        The same ``samples`` array, after modification.
    """
    bit_count = len(bits)
    sample_count = len(samples)

    for position, bit in enumerate(bits):
        window_start = offset + position * AUDIO_SS_CHIP_LENGTH
        window_end = window_start + AUDIO_SS_CHIP_LENGTH
        if window_end > sample_count:
            debug.print(f"Warning: ran out of samples at bit {position}/{bit_count}")
            break

        code = _generate_chip_sequence(seed, position, AUDIO_SS_CHIP_LENGTH)
        # The sign of the added code carries the bit value.
        signed_amplitude = amplitude if bit == 1 else -amplitude
        samples[window_start:window_end] += signed_amplitude * code

        if progress_file and position % _PROGRESS_INTERVAL == 0:
            _write_progress(progress_file, position, bit_count, "embedding")

    return samples
def _extract_spread_spectrum(
    samples: np.ndarray,
    num_bits: int,
    seed: bytes,
    offset: int = 0,
    progress_file: str | None = None,
) -> list[int]:
    """
    Recover bits from audio by correlating against the chip sequences.

    For bit ``i`` the window of AUDIO_SS_CHIP_LENGTH samples starting at
    ``offset + i * AUDIO_SS_CHIP_LENGTH`` is dotted with that bit's chip
    sequence: a strictly positive result decodes as 1, anything else as 0.
    Extraction stops early (with a debug warning) once a window would run
    past the end of ``samples``, so fewer than ``num_bits`` bits may be
    returned.

    Args:
        samples: 1-D float64 audio samples.
        num_bits: Number of bits to extract.
        seed: Key for chip generation (must match the embedding key).
        offset: Sample offset where spread data begins.
        progress_file: Optional path for progress JSON.

    Returns:
        List of extracted 0/1 ints.
    """
    recovered: list[int] = []
    sample_count = len(samples)

    for position in range(num_bits):
        window_start = offset + position * AUDIO_SS_CHIP_LENGTH
        window_end = window_start + AUDIO_SS_CHIP_LENGTH
        if window_end > sample_count:
            debug.print(f"Warning: ran out of samples at bit {position}/{num_bits}")
            break

        code = _generate_chip_sequence(seed, position, AUDIO_SS_CHIP_LENGTH)
        score = np.dot(samples[window_start:window_end], code)
        recovered.append(int(score > 0))

        if progress_file and position % _PROGRESS_INTERVAL == 0:
            _write_progress(progress_file, position, num_bits, "extracting")

    return recovered
# =============================================================================
# BIT CONVERSION UTILITIES
# =============================================================================
def _bytes_to_bits(data: bytes) -> list[int]:
    """Expand bytes into a flat list of 0/1 ints, most significant bit first."""
    return [(octet >> shift) & 1 for octet in data for shift in range(7, -1, -1)]
def _bits_to_bytes(bits: list[int]) -> bytes:
    """Pack 0/1 ints into bytes, most significant bit first.

    Only complete groups of eight bits are packed; up to seven trailing
    bits are ignored.
    """
    packed = bytearray()
    whole_bits = (len(bits) // 8) * 8
    for base in range(0, whole_bits, 8):
        value = 0
        for bit in bits[base : base + 8]:
            value = (value << 1) | bit
        packed.append(value)
    return bytes(packed)
# =============================================================================
# MAJORITY VOTING
# =============================================================================
def _majority_vote_length(length_bytes: bytes) -> int | None:
    """
    Recover the payload length from three redundant 4-byte copies.

    The first 12 bytes are read as three big-endian uint32 values. A value
    shared by at least two copies is returned; None is returned when the
    input is shorter than 12 bytes or when all three copies differ.
    """
    if len(length_bytes) < 12:
        return None

    first, second, third = struct.unpack(">III", length_bytes[:12])
    debug.print(f"Length copies for majority vote: {[first, second, third]}")

    if first in (second, third):
        return first
    if second == third:
        return second

    debug.print("Majority vote failed: all three length copies disagree")
    return None
# =============================================================================
# HEADER CONSTRUCTION
# =============================================================================
def _build_header(data_length: int) -> bytes:
    """
    Assemble the spread spectrum header.

    Layout: AUDIO_MAGIC_SPREAD (4B) followed by _LENGTH_COPIES copies of
    the length as a big-endian uint32 (4B each) = 16 bytes.
    """
    encoded_length = struct.pack(">I", data_length)
    return AUDIO_MAGIC_SPREAD + b"".join([encoded_length] * _LENGTH_COPIES)
def _parse_header(header_bytes: bytes) -> tuple[bool, int | None]:
    """
    Check the magic marker and read the payload length from a header.

    Returns:
        (magic_valid, payload_length). ``magic_valid`` is False when the
        header is shorter than _HEADER_SIZE or the magic does not match;
        the length is then None. With a valid magic, the length is None
        only if the majority vote over the length copies fails.
    """
    if len(header_bytes) < _HEADER_SIZE:
        return False, None

    magic = header_bytes[:_MAGIC_SIZE]
    length_field = header_bytes[_MAGIC_SIZE:_HEADER_SIZE]
    if magic == AUDIO_MAGIC_SPREAD:
        return True, _majority_vote_length(length_field)

    debug.print(f"Magic mismatch: got {magic!r}, expected {AUDIO_MAGIC_SPREAD!r}")
    return False, None
# =============================================================================
# PUBLIC API
# =============================================================================
def calculate_audio_spread_capacity(audio_data: bytes) -> AudioCapacityInfo:
    """
    Calculate embedding capacity for spread spectrum audio steganography.

    Reads the carrier's metadata, determines how many spread spectrum bits
    fit, subtracts the fixed header and the Reed-Solomon parity overhead,
    and returns the usable payload capacity in bytes.

    Args:
        audio_data: Raw bytes of a WAV file.

    Returns:
        AudioCapacityInfo with capacity details. ``total_samples`` is
        frames x channels; ``usable_capacity_bytes`` is based on the frame
        count only (see the comment in the body).

    Raises:
        AudioError: If soundfile is unavailable or the audio cannot be read.
    """
    if not HAS_SOUNDFILE:
        raise AudioError("soundfile is required for audio spread spectrum steganography")

    try:
        info = sf.info(io.BytesIO(audio_data))
    except Exception as e:
        raise AudioError(f"Failed to read audio file: {e}") from e

    total_samples = info.frames * info.channels

    # embed_in_audio_spread embeds into the mono mix-down of the carrier,
    # i.e. one embeddable sample per frame regardless of the channel count.
    # Counting frames x channels would over-report capacity for multi-channel
    # audio.
    total_bits = info.frames // AUDIO_SS_CHIP_LENGTH
    total_bytes = total_bits // 8

    # Subtract header overhead (16 bytes)
    after_header = max(0, total_bytes - _HEADER_SIZE)

    # Account for Reed-Solomon overhead: every block of up to
    # (255 - RS_NSYM) payload bytes gains RS_NSYM parity bytes, including a
    # partial final block. A purely proportional estimate over-reports
    # whenever the budget is not a multiple of 255, so count whole blocks
    # and treat the remainder as one partial block.
    if HAS_REEDSOLO and AUDIO_SS_RS_NSYM > 0:
        full_blocks, remainder = divmod(after_header, 255)
        usable_bytes = full_blocks * (255 - AUDIO_SS_RS_NSYM) + max(
            0, remainder - AUDIO_SS_RS_NSYM
        )
    else:
        usable_bytes = after_header

    duration = info.frames / info.samplerate

    debug.print(
        f"Spread spectrum capacity: {usable_bytes} bytes "
        f"({total_samples} samples, {total_bits} bits, "
        f"{info.samplerate} Hz, {info.channels} ch, {duration:.2f}s)"
    )

    return AudioCapacityInfo(
        total_samples=total_samples,
        usable_capacity_bytes=usable_bytes,
        embed_mode=EMBED_MODE_AUDIO_SPREAD,
        sample_rate=info.samplerate,
        duration_seconds=duration,
    )
def embed_in_audio_spread(
    data: bytes,
    carrier_audio: bytes,
    seed: bytes,
    progress_file: str | None = None,
) -> tuple[bytes, AudioEmbedStats]:
    """
    Embed data into audio using spread spectrum steganography.

    The payload is RS-encoded, prepended with a magic+length header
    (with three copies of the length for majority voting), converted to
    bits, and embedded by adding keyed pseudo-random chip sequences
    to the carrier audio samples.

    Multi-channel audio is mixed to mono for embedding; the resulting
    modification (delta) is then added equally to every channel of the
    original.

    Args:
        data: Raw payload bytes to embed (already encrypted by caller).
            Must be non-empty.
        carrier_audio: Raw bytes of the carrier audio file.
        seed: Key material for chip sequence generation; the same seed
            must be supplied for extraction.
        progress_file: Optional path for frontend progress polling.

    Returns:
        Tuple of (stego WAV bytes, AudioEmbedStats).

    Raises:
        AudioCapacityError: If the payload is too large for the carrier.
        AudioError: If soundfile is missing, the payload is empty, or any
            other embedding failure occurs.
    """
    if not HAS_SOUNDFILE:
        raise AudioError("soundfile is required for audio spread spectrum steganography")

    # extract_from_audio_spread rejects a header length below 1, so an empty
    # payload would yield a stego file that can never be decoded.
    if not data:
        raise AudioError("Cannot embed an empty payload via spread spectrum")

    debug.print(f"Spread spectrum embedding {len(data)} bytes")

    try:
        # 1. Read carrier audio as float64, always shaped (frames, channels)
        buf = io.BytesIO(carrier_audio)
        samples, sample_rate = sf.read(buf, dtype="float64", always_2d=True)
        num_frames, channels = samples.shape
        duration = num_frames / sample_rate

        # Preserve the carrier's subtype on output when WAV can represent it;
        # otherwise fall back to 16-bit PCM instead of failing at write time.
        buf.seek(0)
        carrier_info = sf.info(buf)
        output_subtype = carrier_info.subtype or "PCM_16"
        if not sf.check_format("WAV", output_subtype):
            debug.print(f"Subtype {output_subtype} not valid for WAV, using PCM_16")
            output_subtype = "PCM_16"

        debug.print(
            f"Carrier: {sample_rate} Hz, {channels} ch, "
            f"{num_frames} frames, {duration:.2f}s, subtype={output_subtype}"
        )

        # 2. Mix to mono for embedding (average across channels)
        if channels > 1:
            mono_samples = np.mean(samples, axis=1)
        else:
            mono_samples = samples[:, 0].copy()
        total_samples = len(mono_samples)

        # 3. RS-encode the payload
        rs_data = _rs_encode(data)
        debug.print(f"RS-encoded payload: {len(data)} -> {len(rs_data)} bytes")

        # 4. Build header: magic + three copies of the (pre-RS) payload length
        header = _build_header(len(data))

        # 5. Combine header + RS-encoded data and convert to bits
        full_payload = header + rs_data
        bits = _bytes_to_bits(full_payload)
        total_bits = len(bits)
        samples_needed = total_bits * AUDIO_SS_CHIP_LENGTH

        debug.print(
            f"Total payload: {len(full_payload)} bytes = {total_bits} bits, "
            f"needs {samples_needed} samples (have {total_samples})"
        )

        # 6. Check capacity against the mono sample count
        if samples_needed > total_samples:
            max_bytes = (total_samples // AUDIO_SS_CHIP_LENGTH) // 8
            raise AudioCapacityError(len(full_payload), max_bytes)

        capacity_used = samples_needed / total_samples

        # 7. Initial progress
        _write_progress(progress_file, 0, total_bits, "embedding")

        # 8. Embed via spread spectrum into mono
        mono_modified = _embed_spread_spectrum(
            mono_samples,
            bits,
            seed,
            AUDIO_SS_AMPLITUDE,
            offset=0,
            progress_file=progress_file,
        )

        # 9. Apply the modification back to all channels.
        # The reference mono signal is recomputed from the untouched carrier
        # rather than reusing mono_samples, so the delta stays correct even
        # if the embedding helper modified its input array in place.
        reference_mono = np.mean(samples, axis=1) if channels > 1 else samples[:, 0]
        delta = mono_modified - reference_mono
        samples += delta[:, np.newaxis]

        # Keep samples inside the valid float range [-1.0, 1.0]
        np.clip(samples, -1.0, 1.0, out=samples)

        _write_progress(progress_file, total_bits, total_bits, "saving")

        # 10. Write back as WAV
        output_buf = io.BytesIO()
        sf.write(output_buf, samples, sample_rate, format="WAV", subtype=output_subtype)
        stego_bytes = output_buf.getvalue()

        samples_modified = samples_needed  # every chip-length region was touched

        stats = AudioEmbedStats(
            samples_modified=samples_modified,
            total_samples=total_samples * channels,
            capacity_used=capacity_used,
            bytes_embedded=len(full_payload),
            sample_rate=sample_rate,
            channels=channels,
            duration_seconds=duration,
            embed_mode=EMBED_MODE_AUDIO_SPREAD,
        )

        debug.print(
            f"Spread spectrum embedding complete: {len(stego_bytes)} byte WAV, "
            f"capacity used {capacity_used * 100:.1f}%"
        )

        return stego_bytes, stats

    except AudioCapacityError:
        # Capacity errors carry structured information; pass them through as-is
        raise
    except Exception as e:
        debug.exception(e, "embed_in_audio_spread")
        raise AudioError(f"Failed to embed data in audio via spread spectrum: {e}") from e
def extract_from_audio_spread(
    audio_data: bytes,
    seed: bytes,
    progress_file: str | None = None,
) -> bytes | None:
    """
    Recover a hidden payload from spread spectrum stego audio.

    The audio is decoded and mixed to mono, the fixed-size header is
    despread first to check the magic marker and obtain the payload length,
    and then the whole header + RS-protected payload region is despread
    and Reed-Solomon decoded.

    Args:
        audio_data: Raw bytes of the stego WAV file.
        seed: Key material (must match the seed used for embedding).
        progress_file: Optional path for frontend progress polling.

    Returns:
        The extracted payload bytes, or None when extraction fails for any
        reason (soundfile missing, wrong key, no data found, data corrupted
        beyond recovery, or an unexpected exception).
    """
    if not HAS_SOUNDFILE:
        debug.print("soundfile not available for spread spectrum extraction")
        return None

    debug.print(f"Spread spectrum extracting from {len(audio_data)} byte audio")

    try:
        # --- Load and mix down exactly as the embedder does ---
        pcm, rate = sf.read(io.BytesIO(audio_data), dtype="float64", always_2d=True)
        n_channels = pcm.shape[1]
        mono = np.mean(pcm, axis=1) if n_channels > 1 else pcm[:, 0].copy()
        n_samples = len(mono)
        debug.print(f"Stego audio: {rate} Hz, {n_channels} ch, {n_samples} samples")

        # --- Header pass ---
        hdr_bit_count = _HEADER_SIZE * 8
        if hdr_bit_count * AUDIO_SS_CHIP_LENGTH > n_samples:
            debug.print("Audio too short to contain spread spectrum header")
            return None

        _write_progress(progress_file, 0, hdr_bit_count, "extracting header")

        # Progress reporting is suppressed for the short header pass
        hdr_bits = _extract_spread_spectrum(
            mono,
            hdr_bit_count,
            seed,
            offset=0,
            progress_file=None,
        )
        if len(hdr_bits) < hdr_bit_count:
            debug.print(
                f"Could not extract enough header bits: {len(hdr_bits)}/{hdr_bit_count}"
            )
            return None

        magic_valid, data_length = _parse_header(_bits_to_bytes(hdr_bits))
        if not magic_valid:
            debug.print("Spread spectrum magic not found -- wrong key or no embedded data")
            return None
        if data_length is None:
            debug.print("Could not determine payload length (majority vote failed)")
            return None

        debug.print(f"Header valid: magic=AUDS, payload_length={data_length}")

        # Reject lengths that could not possibly fit in this carrier
        max_payload = (n_samples // AUDIO_SS_CHIP_LENGTH) // 8 - _HEADER_SIZE
        if not 1 <= data_length <= max_payload:
            debug.print(f"Invalid payload length {data_length} (max possible: {max_payload})")
            return None

        # --- Size of the RS-encoded stream ---
        # For N input bytes the encoder emits
        # N + ceil(N / (255 - RS_NSYM)) * RS_NSYM bytes.
        rs_encoded_size = data_length
        if HAS_REEDSOLO and AUDIO_SS_RS_NSYM > 0:
            data_block_size = 255 - AUDIO_SS_RS_NSYM
            block_count = (data_length + data_block_size - 1) // data_block_size
            rs_encoded_size += block_count * AUDIO_SS_RS_NSYM

        bit_total = (_HEADER_SIZE + rs_encoded_size) * 8
        sample_total = bit_total * AUDIO_SS_CHIP_LENGTH
        if sample_total > n_samples:
            debug.print(
                f"Need {sample_total} samples for full extraction "
                f"but only have {n_samples}"
            )
            return None

        debug.print(
            f"Extracting {bit_total} bits "
            f"({_HEADER_SIZE}B header + {rs_encoded_size}B RS payload)"
        )

        # --- Full pass (header is despread again; it is cheap) ---
        _write_progress(progress_file, 0, bit_total, "extracting")
        stream_bits = _extract_spread_spectrum(
            mono,
            bit_total,
            seed,
            offset=0,
            progress_file=progress_file,
        )
        if len(stream_bits) < bit_total:
            debug.print(f"Short extraction: {len(stream_bits)}/{bit_total} bits")
            return None

        _write_progress(progress_file, bit_total, bit_total, "decoding")

        # --- Strip the header and RS-decode what follows ---
        stream_bytes = _bits_to_bytes(stream_bits)
        rs_end = _HEADER_SIZE + rs_encoded_size
        rs_payload = stream_bytes[_HEADER_SIZE:rs_end]
        if len(rs_payload) < rs_encoded_size:
            debug.print(f"RS payload too short: {len(rs_payload)}/{rs_encoded_size} bytes")
            return None

        decoded = _rs_decode(rs_payload)
        if decoded is None:
            debug.print("Reed-Solomon decoding failed -- data too corrupted")
            return None

        # The decoded stream must cover at least the length promised by the header
        if len(decoded) < data_length:
            debug.print(f"Decoded data shorter than expected: {len(decoded)}/{data_length}")
            return None

        payload = decoded[:data_length]
        debug.print(f"Spread spectrum extraction successful: {len(payload)} bytes")
        return payload

    except Exception as e:
        # Extraction is best-effort: log the failure and report "nothing found"
        debug.exception(e, "extract_from_audio_spread")
        return None

View File

@@ -14,8 +14,10 @@ import io
from PIL import Image
from .constants import (
ALLOWED_AUDIO_EXTENSIONS,
ALLOWED_IMAGE_EXTENSIONS,
ALLOWED_KEY_EXTENSIONS,
EMBED_MODE_AUDIO_AUTO,
EMBED_MODE_AUTO,
EMBED_MODE_DCT,
EMBED_MODE_LSB,
@@ -29,8 +31,10 @@ from .constants import (
MIN_PIN_LENGTH,
MIN_RSA_BITS,
RECOMMENDED_PASSPHRASE_WORDS,
VALID_AUDIO_EMBED_MODES,
)
from .exceptions import (
AudioValidationError,
ImageValidationError,
KeyValidationError,
MessageValidationError,
@@ -475,3 +479,33 @@ def require_security_factors(pin: str, rsa_key_data: bytes | None) -> None:
result = validate_security_factors(pin, rsa_key_data)
if not result.is_valid:
raise SecurityFactorError(result.error_message)
# =============================================================================
# AUDIO VALIDATORS (v4.3.0)
# =============================================================================
def validate_audio_file(filename: str) -> ValidationResult:
    """Check *filename* against the allowed audio extensions."""
    outcome = validate_file_extension(filename, ALLOWED_AUDIO_EXTENSIONS, "Audio file")
    return outcome
def validate_audio_embed_mode(mode: str) -> ValidationResult:
    """Check that *mode* is a concrete audio embed mode or the auto mode."""
    accepted = VALID_AUDIO_EMBED_MODES | {EMBED_MODE_AUDIO_AUTO}
    if mode in accepted:
        return ValidationResult.ok(mode=mode)

    options = ", ".join(sorted(accepted))
    return ValidationResult.error(
        f"Invalid audio embed_mode: '{mode}'. "
        f"Valid options: {options}"
    )
def require_valid_audio(audio_data: bytes, name: str = "Audio") -> None:
    """Run ``validate_audio`` and raise AudioValidationError if it reports failure."""
    # Imported here rather than at module level, as in the original code
    from .audio_utils import validate_audio

    outcome = validate_audio(audio_data, name)
    if outcome.is_valid:
        return
    raise AudioValidationError(outcome.error_message)

448
tests/test_audio.py Normal file
View File

@@ -0,0 +1,448 @@
"""
Tests for Stegasoo audio steganography.
Tests cover:
- Audio LSB roundtrip (encode + decode)
- Audio spread spectrum roundtrip (encode + decode)
- Wrong credentials fail to decode
- Capacity calculations
- Format detection
- Audio validation
"""
import io
import numpy as np
import pytest
import soundfile as sf
from stegasoo.constants import (
EMBED_MODE_AUDIO_LSB,
EMBED_MODE_AUDIO_SPREAD,
)
from stegasoo.models import AudioCapacityInfo, AudioEmbedStats, AudioInfo
# =============================================================================
# FIXTURES
# =============================================================================
@pytest.fixture
def carrier_wav() -> bytes:
    """Return a 1-second, 44100 Hz, mono, 16-bit WAV holding a 440 Hz sine tone."""
    rate = 44100
    seconds = 1.0
    frame_count = int(rate * seconds)

    timeline = np.linspace(0, seconds, frame_count, endpoint=False)
    tone = (np.sin(2 * np.pi * 440 * timeline) * 16000).astype(np.int16)

    wav = io.BytesIO()
    sf.write(wav, tone, rate, format="WAV", subtype="PCM_16")
    return wav.getvalue()
@pytest.fixture
def carrier_wav_stereo() -> bytes:
    """Return a 1-second stereo WAV: 440 Hz on the left channel, 880 Hz on the right."""
    rate = 44100
    seconds = 1.0
    frame_count = int(rate * seconds)
    timeline = np.linspace(0, seconds, frame_count, endpoint=False)

    left_ch = (np.sin(2 * np.pi * 440 * timeline) * 16000).astype(np.int16)
    right_ch = (np.sin(2 * np.pi * 880 * timeline) * 16000).astype(np.int16)
    frames = np.column_stack([left_ch, right_ch])

    wav = io.BytesIO()
    sf.write(wav, frames, rate, format="WAV", subtype="PCM_16")
    return wav.getvalue()
@pytest.fixture
def carrier_wav_long() -> bytes:
    """Return a 15-second mono WAV used as carrier by the spread spectrum tests."""
    rate = 44100
    seconds = 15.0
    frame_count = int(rate * seconds)
    timeline = np.linspace(0, seconds, frame_count, endpoint=False)

    # Sum of three tones (440/880/1320 Hz) instead of a single pure sine
    mix = (
        np.sin(2 * np.pi * 440 * timeline)
        + np.sin(2 * np.pi * 880 * timeline)
        + np.sin(2 * np.pi * 1320 * timeline)
    )
    pcm = (mix * 5000).astype(np.int16)

    wav = io.BytesIO()
    sf.write(wav, pcm, rate, format="WAV", subtype="PCM_16")
    return wav.getvalue()
@pytest.fixture
def carrier_wav_spread_integration() -> bytes:
    """Return a 150-second mono WAV for the spread spectrum integration test.

    Spread spectrum needs 1024 samples per bit. With encryption + RS overhead
    the embedded stream is roughly 690 bytes, i.e. 690 * 8 * 1024 ~ 5.7M
    samples, or about 130 seconds at 44.1 kHz; 150 seconds leaves headroom.
    """
    rate = 44100
    seconds = 150.0
    frame_count = int(rate * seconds)
    timeline = np.linspace(0, seconds, frame_count, endpoint=False)

    mix = (
        np.sin(2 * np.pi * 440 * timeline)
        + np.sin(2 * np.pi * 880 * timeline)
        + np.sin(2 * np.pi * 1320 * timeline)
    )
    pcm = (mix * 5000).astype(np.int16)

    wav = io.BytesIO()
    sf.write(wav, pcm, rate, format="WAV", subtype="PCM_16")
    return wav.getvalue()
@pytest.fixture
def reference_photo() -> bytes:
    """Return a 100x100 solid-colour RGB image encoded as PNG bytes."""
    from PIL import Image

    png = io.BytesIO()
    Image.new("RGB", (100, 100), color=(128, 64, 32)).save(png, "PNG")
    return png.getvalue()
# =============================================================================
# AUDIO LSB TESTS
# =============================================================================
class TestAudioLSB:
    """Unit tests for the LSB audio embed/extract primitives."""

    def test_calculate_capacity(self, carrier_wav):
        """A 1-second mono carrier must offer more than 4000 bytes of capacity."""
        from stegasoo.audio_steganography import calculate_audio_lsb_capacity

        available = calculate_audio_lsb_capacity(carrier_wav)

        assert available > 0
        # 44100 samples at 1 bit/sample is roughly 5 KB
        assert available > 4000

    def test_embed_extract_roundtrip(self, carrier_wav):
        """Embedding then extracting with the same key returns the payload."""
        from stegasoo.audio_steganography import embed_in_audio_lsb, extract_from_audio_lsb

        secret = b"Hello, audio steganography!"
        key = b"\x42" * 32

        stego, stats = embed_in_audio_lsb(secret, carrier_wav, key)

        # Stats must describe an LSB embedding that actually changed samples
        assert isinstance(stats, AudioEmbedStats)
        assert stats.embed_mode == EMBED_MODE_AUDIO_LSB
        assert stats.bytes_embedded > 0
        assert stats.samples_modified > 0
        assert 0 < stats.capacity_used <= 1.0

        recovered = extract_from_audio_lsb(stego, key)
        assert recovered is not None
        assert recovered == secret

    def test_embed_extract_stereo(self, carrier_wav_stereo):
        """The roundtrip also works on a two-channel carrier."""
        from stegasoo.audio_steganography import embed_in_audio_lsb, extract_from_audio_lsb

        secret = b"Stereo test message"
        key = b"\xAB" * 32

        stego, stats = embed_in_audio_lsb(secret, carrier_wav_stereo, key)
        assert stats.channels == 2

        assert extract_from_audio_lsb(stego, key) == secret

    def test_wrong_key_fails(self, carrier_wav):
        """Extracting with a different key must not reveal the payload."""
        from stegasoo.audio_steganography import embed_in_audio_lsb, extract_from_audio_lsb

        secret = b"Secret message"
        good_key = b"\x42" * 32
        bad_key = b"\xFF" * 32

        stego, _ = embed_in_audio_lsb(secret, carrier_wav, good_key)
        recovered = extract_from_audio_lsb(stego, bad_key)

        # Either nothing is found (None) or the bytes differ from the secret
        assert recovered is None or recovered != secret

    def test_two_bits_per_sample(self, carrier_wav):
        """The roundtrip works when two bits are stored per sample."""
        from stegasoo.audio_steganography import embed_in_audio_lsb, extract_from_audio_lsb

        secret = b"Two bits per sample test"
        key = b"\x55" * 32

        stego, _ = embed_in_audio_lsb(secret, carrier_wav, key, bits_per_sample=2)
        recovered = extract_from_audio_lsb(stego, key, bits_per_sample=2)

        assert recovered == secret

    def test_generate_sample_indices(self):
        """Index generation is deterministic, in range, and free of duplicates."""
        from stegasoo.audio_steganography import generate_sample_indices

        key = b"\x42" * 32
        first = generate_sample_indices(key, 10000, 100)
        second = generate_sample_indices(key, 10000, 100)

        # Same key, same arguments -> same sequence
        assert first == second
        # Every index addresses a valid sample
        assert all(0 <= idx < 10000 for idx in first)
        # No sample is selected twice
        assert len(set(first)) == len(first)
# =============================================================================
# AUDIO SPREAD SPECTRUM TESTS
# =============================================================================
class TestAudioSpread:
    """Unit tests for the spread spectrum audio embed/extract primitives."""

    def test_calculate_capacity(self, carrier_wav_long):
        """Capacity info for a 15-second carrier is positive and tagged as spread mode."""
        from stegasoo.spread_steganography import calculate_audio_spread_capacity

        info = calculate_audio_spread_capacity(carrier_wav_long)

        assert isinstance(info, AudioCapacityInfo)
        assert info.usable_capacity_bytes > 0
        assert info.embed_mode == EMBED_MODE_AUDIO_SPREAD

    def test_spread_roundtrip(self, carrier_wav_long):
        """Embedding then extracting with the same seed returns the payload."""
        from stegasoo.spread_steganography import (
            embed_in_audio_spread,
            extract_from_audio_spread,
        )

        secret = b"Spread test"
        seed = b"\x42" * 32

        stego, stats = embed_in_audio_spread(secret, carrier_wav_long, seed)
        assert isinstance(stats, AudioEmbedStats)
        assert stats.embed_mode == EMBED_MODE_AUDIO_SPREAD

        recovered = extract_from_audio_spread(stego, seed)
        assert recovered is not None
        assert recovered == secret

    def test_wrong_seed_fails(self, carrier_wav_long):
        """Extracting with a different seed must not reveal the payload."""
        from stegasoo.spread_steganography import (
            embed_in_audio_spread,
            extract_from_audio_spread,
        )

        secret = b"Secret spread"
        good_seed = b"\x42" * 32
        bad_seed = b"\xFF" * 32

        stego, _ = embed_in_audio_spread(secret, carrier_wav_long, good_seed)
        recovered = extract_from_audio_spread(stego, bad_seed)

        assert recovered is None or recovered != secret
# =============================================================================
# FORMAT DETECTION TESTS
# =============================================================================
class TestFormatDetection:
    """Tests for ``detect_audio_format``."""

    def test_detect_wav(self, carrier_wav):
        """A generated WAV file is reported as "wav"."""
        from stegasoo.audio_utils import detect_audio_format

        detected = detect_audio_format(carrier_wav)
        assert detected == "wav"

    def test_detect_unknown(self):
        """Arbitrary non-audio bytes are reported as "unknown"."""
        from stegasoo.audio_utils import detect_audio_format

        detected = detect_audio_format(b"not audio data")
        assert detected == "unknown"

    def test_detect_empty(self):
        """Empty input is reported as "unknown"."""
        from stegasoo.audio_utils import detect_audio_format

        detected = detect_audio_format(b"")
        assert detected == "unknown"
# =============================================================================
# AUDIO INFO TESTS
# =============================================================================
class TestAudioInfo:
    """Tests for ``get_audio_info``."""

    def test_get_wav_info(self, carrier_wav):
        """Metadata of the mono fixture matches how it was generated."""
        from stegasoo.audio_utils import get_audio_info

        meta = get_audio_info(carrier_wav)

        assert isinstance(meta, AudioInfo)
        assert meta.sample_rate == 44100
        assert meta.channels == 1
        assert meta.format == "wav"
        # Fixture is one second long; allow a small tolerance
        assert abs(meta.duration_seconds - 1.0) < 0.1

    def test_get_stereo_info(self, carrier_wav_stereo):
        """The stereo fixture is reported as two channels."""
        from stegasoo.audio_utils import get_audio_info

        meta = get_audio_info(carrier_wav_stereo)
        assert meta.channels == 2
# =============================================================================
# VALIDATION TESTS
# =============================================================================
class TestAudioValidation:
    """Tests for ``validate_audio`` and ``validate_audio_embed_mode``."""

    def test_validate_valid_audio(self, carrier_wav):
        """A well-formed WAV passes validation."""
        from stegasoo.audio_utils import validate_audio

        outcome = validate_audio(carrier_wav)
        assert outcome.is_valid

    def test_validate_empty_audio(self):
        """Empty input fails validation."""
        from stegasoo.audio_utils import validate_audio

        outcome = validate_audio(b"")
        assert not outcome.is_valid

    def test_validate_invalid_audio(self):
        """Bytes that are not audio fail validation."""
        from stegasoo.audio_utils import validate_audio

        outcome = validate_audio(b"not audio data at all")
        assert not outcome.is_valid

    def test_validate_audio_embed_mode(self):
        """The three audio modes are accepted; anything else is rejected."""
        from stegasoo.validation import validate_audio_embed_mode

        assert validate_audio_embed_mode("audio_lsb").is_valid
        assert validate_audio_embed_mode("audio_spread").is_valid
        assert validate_audio_embed_mode("audio_auto").is_valid
        assert not validate_audio_embed_mode("invalid").is_valid
# =============================================================================
# INTEGRATION TESTS
# =============================================================================
class TestIntegration:
    """End-to-end tests driving the public encode_audio/decode_audio API."""

    def test_lsb_encode_decode(self, carrier_wav, reference_photo):
        """A text message survives an LSB encode/decode cycle."""
        from stegasoo.decode import decode_audio
        from stegasoo.encode import encode_audio

        # Credentials shared by the encode and decode calls
        factors = {
            "reference_photo": reference_photo,
            "passphrase": "test words here now",
            "pin": "123456",
            "embed_mode": "audio_lsb",
        }

        stego_audio, _ = encode_audio(
            message="Hello from audio steganography!",
            carrier_audio=carrier_wav,
            **factors,
        )
        assert len(stego_audio) > 0

        result = decode_audio(stego_audio=stego_audio, **factors)
        assert result.is_text
        assert result.message == "Hello from audio steganography!"

    def test_lsb_wrong_credentials(self, carrier_wav, reference_photo):
        """Decoding with a different passphrase and PIN raises."""
        from stegasoo.decode import decode_audio
        from stegasoo.encode import encode_audio

        stego_audio, _ = encode_audio(
            message="Secret",
            reference_photo=reference_photo,
            carrier_audio=carrier_wav,
            passphrase="correct horse battery staple",
            pin="123456",
            embed_mode="audio_lsb",
        )

        wrong_factors = {
            "reference_photo": reference_photo,
            "passphrase": "wrong passphrase words here",
            "pin": "654321",
            "embed_mode": "audio_lsb",
        }
        with pytest.raises(Exception):
            decode_audio(stego_audio=stego_audio, **wrong_factors)

    def test_spread_encode_decode(self, carrier_wav_spread_integration, reference_photo):
        """A text message survives a spread spectrum encode/decode cycle."""
        from stegasoo.decode import decode_audio
        from stegasoo.encode import encode_audio

        factors = {
            "reference_photo": reference_photo,
            "passphrase": "test words here now",
            "pin": "123456",
            "embed_mode": "audio_spread",
        }

        stego_audio, _ = encode_audio(
            message="Spread integration test",
            carrier_audio=carrier_wav_spread_integration,
            **factors,
        )

        result = decode_audio(stego_audio=stego_audio, **factors)
        assert result.message == "Spread integration test"

    def test_auto_detect_lsb(self, carrier_wav, reference_photo):
        """Decoding in auto mode recovers a message that was LSB-encoded."""
        from stegasoo.decode import decode_audio
        from stegasoo.encode import encode_audio

        # Same credentials on both sides; only the embed mode differs
        shared = {
            "reference_photo": reference_photo,
            "passphrase": "test words here now",
            "pin": "123456",
        }

        stego_audio, _ = encode_audio(
            message="Auto-detect test",
            carrier_audio=carrier_wav,
            embed_mode="audio_lsb",
            **shared,
        )

        result = decode_audio(
            stego_audio=stego_audio,
            embed_mode="audio_auto",
            **shared,
        )
        assert result.message == "Auto-detect test"