""" Tests for Stego audio steganography. Tests cover: - Audio LSB roundtrip (encode + decode) - Audio spread spectrum roundtrip (v0 legacy + v2 per-channel) - Wrong credentials fail to decode - Capacity calculations (per-tier) - Format detection - Audio validation - Per-channel stereo/multichannel embedding (v4.4.0) - Chip tier roundtrips (v4.4.0) - LFE channel skipping (v4.4.0) - Backward compat: v0 decode from v2 code - Header v2 build/parse roundtrip - Round-robin bit distribution """ import io from pathlib import Path import numpy as np import pytest import soundfile as sf from fieldwitness.stego.constants import AUDIO_ENABLED, EMBED_MODE_AUDIO_LSB, EMBED_MODE_AUDIO_SPREAD from fieldwitness.stego.models import AudioCapacityInfo, AudioEmbedStats, AudioInfo pytestmark = pytest.mark.skipif(not AUDIO_ENABLED, reason="Audio support disabled (FIELDWITNESS_AUDIO)") # Path to real test data files _TEST_DATA = Path(__file__).parent.parent / "test_data" _REFERENCE_PNG = _TEST_DATA / "reference.png" _SPEECH_WAV = _TEST_DATA / "stupid_elitist_speech.wav" # ============================================================================= # FIXTURES # ============================================================================= @pytest.fixture def carrier_wav() -> bytes: """Generate a small test WAV file (1 second, 44100 Hz, mono, 16-bit).""" sample_rate = 44100 duration = 1.0 num_samples = int(sample_rate * duration) t = np.linspace(0, duration, num_samples, endpoint=False) samples = (np.sin(2 * np.pi * 440 * t) * 16000).astype(np.int16) buf = io.BytesIO() sf.write(buf, samples, sample_rate, format="WAV", subtype="PCM_16") buf.seek(0) return buf.read() @pytest.fixture def carrier_wav_stereo() -> bytes: """Generate a stereo test WAV file (5 seconds for spread spectrum capacity).""" sample_rate = 44100 duration = 5.0 num_samples = int(sample_rate * duration) t = np.linspace(0, duration, num_samples, endpoint=False) left = (np.sin(2 * np.pi * 440 * t) * 16000).astype(np.int16) right = (np.sin(2 * np.pi * 880 * t) * 16000).astype(np.int16) samples = np.column_stack([left, right]) buf = io.BytesIO() sf.write(buf, samples, sample_rate, format="WAV", subtype="PCM_16") buf.seek(0) return buf.read() @pytest.fixture def carrier_wav_long() -> bytes: """Generate a longer WAV (15 seconds) for spread spectrum tests.""" sample_rate = 44100 duration = 15.0 num_samples = int(sample_rate * duration) t = np.linspace(0, duration, num_samples, endpoint=False) samples = ( (np.sin(2 * np.pi * 440 * t) + np.sin(2 * np.pi * 880 * t) + np.sin(2 * np.pi * 1320 * t)) * 5000 ).astype(np.int16) buf = io.BytesIO() sf.write(buf, samples, sample_rate, format="WAV", subtype="PCM_16") buf.seek(0) return buf.read() @pytest.fixture def carrier_wav_stereo_long() -> bytes: """Generate a stereo WAV (15 seconds) for per-channel spread tests.""" sample_rate = 48000 duration = 15.0 num_samples = int(sample_rate * duration) t = np.linspace(0, duration, num_samples, endpoint=False) left = (np.sin(2 * np.pi * 440 * t) * 10000).astype(np.float64) / 32768.0 right = (np.sin(2 * np.pi * 660 * t) * 10000).astype(np.float64) / 32768.0 samples = np.column_stack([left, right]) buf = io.BytesIO() sf.write(buf, samples, sample_rate, format="WAV", subtype="PCM_16") buf.seek(0) return buf.read() @pytest.fixture def carrier_wav_5_1() -> bytes: """Generate a 6-channel (5.1) WAV for LFE skip tests.""" sample_rate = 48000 duration = 15.0 num_samples = int(sample_rate * duration) t = np.linspace(0, duration, num_samples, endpoint=False) # 6 channels with different frequencies freqs = [440, 554, 660, 80, 880, 1100] # ch3 = LFE (low freq) channels = [] for freq in freqs: ch = (np.sin(2 * np.pi * freq * t) * 8000).astype(np.float64) / 32768.0 channels.append(ch) samples = np.column_stack(channels) buf = io.BytesIO() sf.write(buf, samples, sample_rate, format="WAV", subtype="PCM_16") buf.seek(0) return buf.read() @pytest.fixture def carrier_wav_spread_integration() -> bytes: """Generate a very long WAV (150 seconds) for spread spectrum integration tests.""" sample_rate = 44100 duration = 150.0 num_samples = int(sample_rate * duration) t = np.linspace(0, duration, num_samples, endpoint=False) samples = ( (np.sin(2 * np.pi * 440 * t) + np.sin(2 * np.pi * 880 * t) + np.sin(2 * np.pi * 1320 * t)) * 5000 ).astype(np.int16) buf = io.BytesIO() sf.write(buf, samples, sample_rate, format="WAV", subtype="PCM_16") buf.seek(0) return buf.read() @pytest.fixture def reference_photo() -> bytes: """Load real reference photo from test_data, or generate a small one.""" if _REFERENCE_PNG.exists(): return _REFERENCE_PNG.read_bytes() from PIL import Image img = Image.new("RGB", (100, 100), color=(128, 64, 32)) buf = io.BytesIO() img.save(buf, "PNG") buf.seek(0) return buf.read() @pytest.fixture def speech_wav() -> bytes: """Load real speech WAV from test_data (48kHz mono, ~68s).""" if not _SPEECH_WAV.exists(): pytest.skip("test_data/stupid_elitist_speech.wav not found") return _SPEECH_WAV.read_bytes() # ============================================================================= # AUDIO LSB TESTS # ============================================================================= class TestAudioLSB: """Tests for audio LSB steganography.""" def test_calculate_capacity(self, carrier_wav): from fieldwitness.stego.audio_steganography import calculate_audio_lsb_capacity capacity = calculate_audio_lsb_capacity(carrier_wav) assert capacity > 0 # 1 second at 44100 Hz mono should give ~5KB capacity at 1 bit/sample assert capacity > 4000 def test_embed_extract_roundtrip(self, carrier_wav): """Test basic LSB embed/extract roundtrip.""" from fieldwitness.stego.audio_steganography import embed_in_audio_lsb, extract_from_audio_lsb payload = b"Hello, audio steganography!" key = b"\x42" * 32 stego_audio, stats = embed_in_audio_lsb(payload, carrier_wav, key) assert isinstance(stats, AudioEmbedStats) assert stats.embed_mode == EMBED_MODE_AUDIO_LSB assert stats.bytes_embedded > 0 assert stats.samples_modified > 0 assert 0 < stats.capacity_used <= 1.0 extracted = extract_from_audio_lsb(stego_audio, key) assert extracted is not None assert extracted == payload def test_embed_extract_stereo(self, carrier_wav_stereo): """Test LSB roundtrip with stereo audio.""" from fieldwitness.stego.audio_steganography import embed_in_audio_lsb, extract_from_audio_lsb payload = b"Stereo test message" key = b"\xAB" * 32 stego_audio, stats = embed_in_audio_lsb(payload, carrier_wav_stereo, key) assert stats.channels == 2 extracted = extract_from_audio_lsb(stego_audio, key) assert extracted == payload def test_wrong_key_fails(self, carrier_wav): """Test that wrong key produces no valid extraction.""" from fieldwitness.stego.audio_steganography import embed_in_audio_lsb, extract_from_audio_lsb payload = b"Secret message" correct_key = b"\x42" * 32 wrong_key = b"\xFF" * 32 stego_audio, _ = embed_in_audio_lsb(payload, carrier_wav, correct_key) extracted = extract_from_audio_lsb(stego_audio, wrong_key) assert extracted is None or extracted != payload def test_two_bits_per_sample(self, carrier_wav): """Test embedding with 2 bits per sample.""" from fieldwitness.stego.audio_steganography import embed_in_audio_lsb, extract_from_audio_lsb payload = b"Two bits per sample test" key = b"\x55" * 32 stego_audio, stats = embed_in_audio_lsb(payload, carrier_wav, key, bits_per_sample=2) extracted = extract_from_audio_lsb(stego_audio, key, bits_per_sample=2) assert extracted == payload def test_generate_sample_indices(self): """Test deterministic sample index generation.""" from fieldwitness.stego.audio_steganography import generate_sample_indices key = b"\x42" * 32 indices1 = generate_sample_indices(key, 10000, 100) indices2 = generate_sample_indices(key, 10000, 100) assert indices1 == indices2 assert all(0 <= i < 10000 for i in indices1) assert len(set(indices1)) == len(indices1) # ============================================================================= # AUDIO SPREAD SPECTRUM TESTS (v2 per-channel) # ============================================================================= class TestAudioSpread: """Tests for audio spread spectrum steganography (v2 per-channel).""" def test_calculate_capacity_default_tier(self, carrier_wav_long): from fieldwitness.stego.spread_steganography import calculate_audio_spread_capacity capacity = calculate_audio_spread_capacity(carrier_wav_long) assert isinstance(capacity, AudioCapacityInfo) assert capacity.usable_capacity_bytes > 0 assert capacity.embed_mode == EMBED_MODE_AUDIO_SPREAD assert capacity.chip_tier == 2 # default assert capacity.chip_length == 1024 def test_calculate_capacity_per_tier(self, carrier_wav_long): """Capacity should increase as chip length decreases.""" from fieldwitness.stego.spread_steganography import calculate_audio_spread_capacity cap_lossless = calculate_audio_spread_capacity(carrier_wav_long, chip_tier=0) cap_high = calculate_audio_spread_capacity(carrier_wav_long, chip_tier=1) cap_low = calculate_audio_spread_capacity(carrier_wav_long, chip_tier=2) assert cap_lossless.chip_length == 256 assert cap_high.chip_length == 512 assert cap_low.chip_length == 1024 # Smaller chip = more capacity assert cap_lossless.usable_capacity_bytes > cap_high.usable_capacity_bytes assert cap_high.usable_capacity_bytes > cap_low.usable_capacity_bytes def test_spread_roundtrip_default_tier(self, carrier_wav_long): """Test spread spectrum embed/extract roundtrip (default tier 2).""" from fieldwitness.stego.spread_steganography import ( embed_in_audio_spread, extract_from_audio_spread, ) payload = b"Spread test v2" seed = b"\x42" * 32 stego_audio, stats = embed_in_audio_spread(payload, carrier_wav_long, seed) assert isinstance(stats, AudioEmbedStats) assert stats.embed_mode == EMBED_MODE_AUDIO_SPREAD assert stats.chip_tier == 2 assert stats.chip_length == 1024 extracted = extract_from_audio_spread(stego_audio, seed) assert extracted is not None assert extracted == payload def test_spread_roundtrip_tier_0(self, carrier_wav_long): """Test spread spectrum at tier 0 (chip=256, lossless).""" from fieldwitness.stego.spread_steganography import ( embed_in_audio_spread, extract_from_audio_spread, ) payload = b"Lossless tier test with more data to embed for coverage" seed = b"\x42" * 32 stego_audio, stats = embed_in_audio_spread(payload, carrier_wav_long, seed, chip_tier=0) assert stats.chip_tier == 0 assert stats.chip_length == 256 extracted = extract_from_audio_spread(stego_audio, seed) assert extracted is not None assert extracted == payload def test_spread_roundtrip_tier_1(self, carrier_wav_long): """Test spread spectrum at tier 1 (chip=512, high lossy).""" from fieldwitness.stego.spread_steganography import ( embed_in_audio_spread, extract_from_audio_spread, ) payload = b"High lossy tier test" seed = b"\x42" * 32 stego_audio, stats = embed_in_audio_spread(payload, carrier_wav_long, seed, chip_tier=1) assert stats.chip_tier == 1 assert stats.chip_length == 512 extracted = extract_from_audio_spread(stego_audio, seed) assert extracted is not None assert extracted == payload def test_wrong_seed_fails(self, carrier_wav_long): """Test that wrong seed produces no valid extraction.""" from fieldwitness.stego.spread_steganography import ( embed_in_audio_spread, extract_from_audio_spread, ) payload = b"Secret spread" correct_seed = b"\x42" * 32 wrong_seed = b"\xFF" * 32 stego_audio, _ = embed_in_audio_spread(payload, carrier_wav_long, correct_seed) extracted = extract_from_audio_spread(stego_audio, wrong_seed) assert extracted is None or extracted != payload def test_per_channel_stereo_roundtrip(self, carrier_wav_stereo_long): """Test that stereo per-channel embedding/extraction works.""" from fieldwitness.stego.spread_steganography import ( embed_in_audio_spread, extract_from_audio_spread, ) payload = b"Stereo per-channel test" seed = b"\xAB" * 32 stego_audio, stats = embed_in_audio_spread( payload, carrier_wav_stereo_long, seed, chip_tier=0 ) assert stats.channels == 2 assert stats.embeddable_channels == 2 extracted = extract_from_audio_spread(stego_audio, seed) assert extracted is not None assert extracted == payload def test_per_channel_preserves_spatial_mix(self, carrier_wav_stereo_long): """Verify that per-channel embedding doesn't destroy the spatial mix. The difference between left and right channels should be preserved (not zeroed out as the old mono-broadcast approach would do). """ from fieldwitness.stego.spread_steganography import embed_in_audio_spread payload = b"Spatial preservation test" seed = b"\xCD" * 32 # Read original orig_samples, _ = sf.read(io.BytesIO(carrier_wav_stereo_long), dtype="float64", always_2d=True) orig_diff = orig_samples[:, 0] - orig_samples[:, 1] # Embed stego_bytes, _ = embed_in_audio_spread( payload, carrier_wav_stereo_long, seed, chip_tier=0 ) # Read stego stego_samples, _ = sf.read(io.BytesIO(stego_bytes), dtype="float64", always_2d=True) stego_diff = stego_samples[:, 0] - stego_samples[:, 1] # The channel difference should not be identical (embedding adds different # noise per channel), but should be very close (embedding is subtle) # With the old mono-broadcast approach, stego_diff would equal orig_diff # exactly in unmodified regions but differ where data was embedded. # With per-channel, both channels get independent modifications. correlation = np.corrcoef(orig_diff, stego_diff)[0, 1] assert correlation > 0.95, f"Spatial mix correlation too low: {correlation}" def test_capacity_scales_with_channels(self, carrier_wav_long, carrier_wav_stereo_long): """Stereo should have roughly double the capacity of mono.""" from fieldwitness.stego.spread_steganography import calculate_audio_spread_capacity mono_cap = calculate_audio_spread_capacity(carrier_wav_long, chip_tier=0) stereo_cap = calculate_audio_spread_capacity(carrier_wav_stereo_long, chip_tier=0) # Stereo should be ~1.5-2.2x mono (not exact because header is ch0 only # and the files have slightly different durations/sample rates) ratio = stereo_cap.usable_capacity_bytes / mono_cap.usable_capacity_bytes assert ratio > 1.3, f"Stereo/mono capacity ratio too low: {ratio}" def test_lfe_skip_5_1(self, carrier_wav_5_1): """LFE channel (index 3) should be unmodified in 6-channel audio.""" from fieldwitness.stego.spread_steganography import embed_in_audio_spread payload = b"LFE skip test" seed = b"\xEE" * 32 # Read original LFE channel orig_samples, _ = sf.read(io.BytesIO(carrier_wav_5_1), dtype="float64", always_2d=True) orig_lfe = orig_samples[:, 3].copy() stego_bytes, stats = embed_in_audio_spread( payload, carrier_wav_5_1, seed, chip_tier=0 ) assert stats.embeddable_channels == 5 # 6 channels - 1 LFE = 5 stego_samples, _ = sf.read(io.BytesIO(stego_bytes), dtype="float64", always_2d=True) stego_lfe = stego_samples[:, 3] # LFE channel should be completely unmodified np.testing.assert_array_equal(orig_lfe, stego_lfe) def test_lfe_skip_roundtrip(self, carrier_wav_5_1): """5.1 audio embed/extract roundtrip with LFE skipping.""" from fieldwitness.stego.spread_steganography import ( embed_in_audio_spread, extract_from_audio_spread, ) payload = b"5.1 surround test" seed = b"\xEE" * 32 stego_bytes, stats = embed_in_audio_spread( payload, carrier_wav_5_1, seed, chip_tier=0 ) assert stats.channels == 6 assert stats.embeddable_channels == 5 extracted = extract_from_audio_spread(stego_bytes, seed) assert extracted is not None assert extracted == payload # ============================================================================= # HEADER V2 TESTS # ============================================================================= class TestHeaderV2: """Tests for v2 header construction and parsing.""" def test_header_v2_build_parse_roundtrip(self): from fieldwitness.stego.spread_steganography import _build_header_v2, _parse_header data_length = 12345 chip_tier = 1 num_ch = 2 lfe_skipped = False header = _build_header_v2(data_length, chip_tier, num_ch, lfe_skipped) assert len(header) == 20 magic_valid, version, length, tier, nch, lfe = _parse_header(header) assert magic_valid assert version == 2 assert length == data_length assert tier == chip_tier assert nch == num_ch assert lfe is False def test_header_v2_with_lfe_flag(self): from fieldwitness.stego.spread_steganography import _build_header_v2, _parse_header header = _build_header_v2(999, 0, 5, lfe_skipped=True) magic_valid, version, length, tier, nch, lfe = _parse_header(header) assert magic_valid assert version == 2 assert length == 999 assert tier == 0 assert nch == 5 assert lfe is True def test_header_v0_build_parse(self): from fieldwitness.stego.spread_steganography import _build_header_v0, _parse_header header = _build_header_v0(4567) assert len(header) == 16 magic_valid, version, length, tier, nch, lfe = _parse_header(header) assert magic_valid assert version == 0 assert length == 4567 assert tier is None assert nch is None def test_header_bad_magic(self): from fieldwitness.stego.spread_steganography import _parse_header bad_header = b"XXXX" + b"\x00" * 16 magic_valid, version, length, tier, nch, lfe = _parse_header(bad_header) assert not magic_valid # ============================================================================= # ROUND-ROBIN BIT DISTRIBUTION TESTS # ============================================================================= class TestRoundRobin: """Tests for round-robin bit distribution.""" def test_distribute_and_collect_identity(self): from fieldwitness.stego.spread_steganography import ( _collect_bits_round_robin, _distribute_bits_round_robin, ) bits = [1, 0, 1, 1, 0, 0, 1, 0, 1, 1] for num_ch in [1, 2, 3, 4, 5]: per_ch = _distribute_bits_round_robin(bits, num_ch) assert len(per_ch) == num_ch reassembled = _collect_bits_round_robin(per_ch) assert reassembled == bits, f"Failed for {num_ch} channels" def test_distribute_round_robin_ordering(self): from fieldwitness.stego.spread_steganography import _distribute_bits_round_robin bits = [0, 1, 2, 3, 4, 5] # using ints for clarity per_ch = _distribute_bits_round_robin(bits, 3) # ch0: bits 0, 3 ch1: bits 1, 4 ch2: bits 2, 5 assert per_ch[0] == [0, 3] assert per_ch[1] == [1, 4] assert per_ch[2] == [2, 5] def test_distribute_uneven(self): from fieldwitness.stego.spread_steganography import ( _collect_bits_round_robin, _distribute_bits_round_robin, ) bits = [0, 1, 2, 3, 4] # 5 bits across 3 channels per_ch = _distribute_bits_round_robin(bits, 3) assert per_ch[0] == [0, 3] assert per_ch[1] == [1, 4] assert per_ch[2] == [2] reassembled = _collect_bits_round_robin(per_ch) assert reassembled == bits # ============================================================================= # CHANNEL MANAGEMENT TESTS # ============================================================================= class TestChannelManagement: """Tests for embeddable channel selection.""" def test_mono(self): from fieldwitness.stego.spread_steganography import _embeddable_channels assert _embeddable_channels(1) == [0] def test_stereo(self): from fieldwitness.stego.spread_steganography import _embeddable_channels assert _embeddable_channels(2) == [0, 1] def test_5_1_skips_lfe(self): from fieldwitness.stego.spread_steganography import _embeddable_channels channels = _embeddable_channels(6) assert channels == [0, 1, 2, 4, 5] assert 3 not in channels # LFE skipped def test_7_1_skips_lfe(self): from fieldwitness.stego.spread_steganography import _embeddable_channels channels = _embeddable_channels(8) assert 3 not in channels assert len(channels) == 7 def test_quad_no_skip(self): from fieldwitness.stego.spread_steganography import _embeddable_channels # 4 channels < 6, so no LFE skip assert _embeddable_channels(4) == [0, 1, 2, 3] # ============================================================================= # FORMAT DETECTION TESTS # ============================================================================= class TestFormatDetection: """Tests for audio format detection.""" def test_detect_wav(self, carrier_wav): from fieldwitness.stego.audio_utils import detect_audio_format assert detect_audio_format(carrier_wav) == "wav" def test_detect_unknown(self): from fieldwitness.stego.audio_utils import detect_audio_format assert detect_audio_format(b"not audio data") == "unknown" def test_detect_empty(self): from fieldwitness.stego.audio_utils import detect_audio_format assert detect_audio_format(b"") == "unknown" # ============================================================================= # AUDIO INFO TESTS # ============================================================================= class TestAudioInfo: """Tests for audio info extraction.""" def test_get_wav_info(self, carrier_wav): from fieldwitness.stego.audio_utils import get_audio_info info = get_audio_info(carrier_wav) assert isinstance(info, AudioInfo) assert info.sample_rate == 44100 assert info.channels == 1 assert info.format == "wav" assert abs(info.duration_seconds - 1.0) < 0.1 def test_get_stereo_info(self, carrier_wav_stereo): from fieldwitness.stego.audio_utils import get_audio_info info = get_audio_info(carrier_wav_stereo) assert info.channels == 2 # ============================================================================= # VALIDATION TESTS # ============================================================================= class TestAudioValidation: """Tests for audio validation.""" def test_validate_valid_audio(self, carrier_wav): from fieldwitness.stego.audio_utils import validate_audio result = validate_audio(carrier_wav) assert result.is_valid def test_validate_empty_audio(self): from fieldwitness.stego.audio_utils import validate_audio result = validate_audio(b"") assert not result.is_valid def test_validate_invalid_audio(self): from fieldwitness.stego.audio_utils import validate_audio result = validate_audio(b"not audio data at all") assert not result.is_valid def test_validate_audio_embed_mode(self): from fieldwitness.stego.validation import validate_audio_embed_mode assert validate_audio_embed_mode("audio_lsb").is_valid assert validate_audio_embed_mode("audio_spread").is_valid assert validate_audio_embed_mode("audio_auto").is_valid assert not validate_audio_embed_mode("invalid").is_valid # ============================================================================= # INTEGRATION TESTS # ============================================================================= class TestIntegration: """End-to-end integration tests using encode_audio/decode_audio.""" def test_lsb_encode_decode(self, carrier_wav, reference_photo): from fieldwitness.stego.decode import decode_audio from fieldwitness.stego.encode import encode_audio stego_audio, stats = encode_audio( message="Hello from audio steganography!", reference_photo=reference_photo, carrier_audio=carrier_wav, passphrase="test words here now", pin="123456", embed_mode="audio_lsb", ) assert len(stego_audio) > 0 result = decode_audio( stego_audio=stego_audio, reference_photo=reference_photo, passphrase="test words here now", pin="123456", embed_mode="audio_lsb", ) assert result.is_text assert result.message == "Hello from audio steganography!" def test_lsb_wrong_credentials(self, carrier_wav, reference_photo): from fieldwitness.stego.decode import decode_audio from fieldwitness.stego.encode import encode_audio stego_audio, _ = encode_audio( message="Secret", reference_photo=reference_photo, carrier_audio=carrier_wav, passphrase="correct horse battery staple", pin="123456", embed_mode="audio_lsb", ) with pytest.raises(Exception): decode_audio( stego_audio=stego_audio, reference_photo=reference_photo, passphrase="wrong passphrase words here", pin="654321", embed_mode="audio_lsb", ) def test_spread_encode_decode(self, carrier_wav_spread_integration, reference_photo): """Test full spread spectrum encode/decode pipeline.""" from fieldwitness.stego.decode import decode_audio from fieldwitness.stego.encode import encode_audio stego_audio, stats = encode_audio( message="Spread integration test", reference_photo=reference_photo, carrier_audio=carrier_wav_spread_integration, passphrase="test words here now", pin="123456", embed_mode="audio_spread", ) result = decode_audio( stego_audio=stego_audio, reference_photo=reference_photo, passphrase="test words here now", pin="123456", embed_mode="audio_spread", ) assert result.message == "Spread integration test" def test_spread_encode_decode_with_chip_tier( self, carrier_wav_spread_integration, reference_photo ): """Test spread spectrum with explicit chip tier.""" from fieldwitness.stego.decode import decode_audio from fieldwitness.stego.encode import encode_audio stego_audio, stats = encode_audio( message="Tier 0 integration", reference_photo=reference_photo, carrier_audio=carrier_wav_spread_integration, passphrase="test words here now", pin="123456", embed_mode="audio_spread", chip_tier=0, ) assert stats.chip_tier == 0 assert stats.chip_length == 256 result = decode_audio( stego_audio=stego_audio, reference_photo=reference_photo, passphrase="test words here now", pin="123456", embed_mode="audio_spread", ) assert result.message == "Tier 0 integration" def test_auto_detect_lsb(self, carrier_wav, reference_photo): """Test auto-detection finds LSB encoded audio.""" from fieldwitness.stego.decode import decode_audio from fieldwitness.stego.encode import encode_audio stego_audio, _ = encode_audio( message="Auto-detect test", reference_photo=reference_photo, carrier_audio=carrier_wav, passphrase="test words here now", pin="123456", embed_mode="audio_lsb", ) result = decode_audio( stego_audio=stego_audio, reference_photo=reference_photo, passphrase="test words here now", pin="123456", embed_mode="audio_auto", ) assert result.message == "Auto-detect test" def test_spread_with_real_speech(self, speech_wav, reference_photo): """Test spread spectrum with real speech audio from test_data.""" from fieldwitness.stego.decode import decode_audio from fieldwitness.stego.encode import encode_audio message = "Hidden in a speech about elitism" stego_audio, stats = encode_audio( message=message, reference_photo=reference_photo, carrier_audio=speech_wav, passphrase="test words here now", pin="123456", embed_mode="audio_spread", chip_tier=0, # lossless tier for max capacity ) assert stats.chip_tier == 0 result = decode_audio( stego_audio=stego_audio, reference_photo=reference_photo, passphrase="test words here now", pin="123456", embed_mode="audio_spread", ) assert result.message == message