Clean up repo structure
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
4
.gitignore
vendored
4
.gitignore
vendored
@@ -71,8 +71,8 @@ scripts/
|
||||
frontends/web/instance/
|
||||
frontends/web/certs/
|
||||
|
||||
# Integration/smoke tests (local tools)
|
||||
tests/smoke-test.sh
|
||||
# Tests (private)
|
||||
tests/
|
||||
|
||||
# RPi image build artifacts
|
||||
*.img
|
||||
|
||||
@@ -1,434 +0,0 @@
|
||||
"""
|
||||
Tests for Stegasoo batch processing module (v4.0.0).
|
||||
|
||||
Updated for v4.0.0:
|
||||
- Uses 'passphrase' instead of 'phrase' in credentials dict
|
||||
- No date_str parameter
|
||||
- BatchCredentials.passphrase is a single string
|
||||
"""
|
||||
|
||||
import shutil
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
|
||||
from stegasoo.batch import (
|
||||
BatchCredentials,
|
||||
BatchItem,
|
||||
BatchProcessor,
|
||||
BatchResult,
|
||||
BatchStatus,
|
||||
batch_capacity_check,
|
||||
print_batch_result,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def temp_dir():
|
||||
"""Create a temporary directory for tests."""
|
||||
path = Path(tempfile.mkdtemp())
|
||||
yield path
|
||||
shutil.rmtree(path)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_images(temp_dir):
|
||||
"""Create sample PNG images for testing."""
|
||||
from PIL import Image
|
||||
|
||||
images = []
|
||||
for i in range(3):
|
||||
img_path = temp_dir / f"test_image_{i}.png"
|
||||
img = Image.new("RGB", (100, 100), color=(i * 50, i * 50, i * 50))
|
||||
img.save(img_path, "PNG")
|
||||
images.append(img_path)
|
||||
|
||||
return images
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_reference_photo():
|
||||
"""Create a sample reference photo as bytes."""
|
||||
from io import BytesIO
|
||||
|
||||
from PIL import Image
|
||||
|
||||
img = Image.new("RGB", (100, 100), color=(128, 128, 128))
|
||||
buf = BytesIO()
|
||||
img.save(buf, "PNG")
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_credentials(sample_reference_photo):
|
||||
"""Create sample v3.2.0 credentials dict."""
|
||||
return {
|
||||
"reference_photo": sample_reference_photo,
|
||||
"passphrase": "test phrase four words", # v3.2.0: single passphrase
|
||||
"pin": "123456",
|
||||
}
|
||||
|
||||
|
||||
class TestBatchItem:
|
||||
"""Tests for BatchItem dataclass."""
|
||||
|
||||
def test_duration_calculation(self):
|
||||
"""Duration should be calculated from start/end times."""
|
||||
item = BatchItem(input_path=Path("test.png"))
|
||||
item.start_time = 100.0
|
||||
item.end_time = 105.5
|
||||
assert item.duration == 5.5
|
||||
|
||||
def test_duration_none_without_times(self):
|
||||
"""Duration should be None if times not set."""
|
||||
item = BatchItem(input_path=Path("test.png"))
|
||||
assert item.duration is None
|
||||
|
||||
def test_to_dict(self):
|
||||
"""to_dict should serialize all fields."""
|
||||
item = BatchItem(
|
||||
input_path=Path("input.png"),
|
||||
output_path=Path("output.png"),
|
||||
status=BatchStatus.SUCCESS,
|
||||
message="Done",
|
||||
)
|
||||
result = item.to_dict()
|
||||
assert result["input_path"] == "input.png"
|
||||
assert result["output_path"] == "output.png"
|
||||
assert result["status"] == "success"
|
||||
|
||||
|
||||
class TestBatchResult:
|
||||
"""Tests for BatchResult dataclass."""
|
||||
|
||||
def test_to_json(self):
|
||||
"""Should serialize to valid JSON."""
|
||||
import json
|
||||
|
||||
result = BatchResult(operation="encode", total=5, succeeded=4, failed=1)
|
||||
json_str = result.to_json()
|
||||
parsed = json.loads(json_str)
|
||||
assert parsed["operation"] == "encode"
|
||||
assert parsed["summary"]["total"] == 5
|
||||
|
||||
def test_duration_with_end_time(self):
|
||||
"""Duration should work when end_time is set."""
|
||||
result = BatchResult(operation="test")
|
||||
result.start_time = 100.0
|
||||
result.end_time = 110.0
|
||||
assert result.duration == 10.0
|
||||
|
||||
|
||||
class TestBatchCredentials:
|
||||
"""Tests for BatchCredentials dataclass (v3.2.0)."""
|
||||
|
||||
def test_from_dict_new_format(self, sample_reference_photo):
|
||||
"""Should parse v3.2.0 format with 'passphrase' key."""
|
||||
data = {
|
||||
"reference_photo": sample_reference_photo,
|
||||
"passphrase": "test phrase four words",
|
||||
"pin": "123456",
|
||||
}
|
||||
creds = BatchCredentials.from_dict(data)
|
||||
assert creds.passphrase == "test phrase four words"
|
||||
assert creds.pin == "123456"
|
||||
|
||||
def test_from_dict_legacy_format(self, sample_reference_photo):
|
||||
"""Should parse legacy format with 'day_phrase' key for migration."""
|
||||
data = {
|
||||
"reference_photo": sample_reference_photo,
|
||||
"day_phrase": "legacy phrase here", # Old key name
|
||||
"pin": "123456",
|
||||
}
|
||||
creds = BatchCredentials.from_dict(data)
|
||||
# Should accept old key and map to passphrase
|
||||
assert creds.passphrase == "legacy phrase here"
|
||||
assert creds.pin == "123456"
|
||||
|
||||
def test_to_dict(self, sample_reference_photo):
|
||||
"""Should serialize to v3.2.0 format."""
|
||||
creds = BatchCredentials(
|
||||
reference_photo=sample_reference_photo,
|
||||
passphrase="test phrase four words",
|
||||
pin="123456",
|
||||
)
|
||||
result = creds.to_dict()
|
||||
assert result["passphrase"] == "test phrase four words"
|
||||
assert result["pin"] == "123456"
|
||||
assert "day_phrase" not in result # Old key should not be present
|
||||
|
||||
def test_passphrase_is_string(self, sample_reference_photo):
|
||||
"""Passphrase should be a string, not a dict."""
|
||||
creds = BatchCredentials(
|
||||
reference_photo=sample_reference_photo,
|
||||
passphrase="test phrase four words",
|
||||
pin="123456",
|
||||
)
|
||||
assert isinstance(creds.passphrase, str)
|
||||
|
||||
|
||||
class TestBatchProcessor:
|
||||
"""Tests for BatchProcessor class."""
|
||||
|
||||
def test_init_default_workers(self):
|
||||
"""Should default to 4 workers."""
|
||||
processor = BatchProcessor()
|
||||
assert processor.max_workers == 4
|
||||
|
||||
def test_init_custom_workers(self):
|
||||
"""Should accept custom worker count."""
|
||||
processor = BatchProcessor(max_workers=8)
|
||||
assert processor.max_workers == 8
|
||||
|
||||
def test_is_valid_image_png(self, temp_dir):
|
||||
"""Should recognize PNG as valid."""
|
||||
processor = BatchProcessor()
|
||||
png_path = temp_dir / "test.png"
|
||||
png_path.touch()
|
||||
assert processor._is_valid_image(png_path)
|
||||
|
||||
def test_is_valid_image_txt(self, temp_dir):
|
||||
"""Should reject non-image files."""
|
||||
processor = BatchProcessor()
|
||||
txt_path = temp_dir / "test.txt"
|
||||
txt_path.touch()
|
||||
assert not processor._is_valid_image(txt_path)
|
||||
|
||||
def test_find_images_file(self, sample_images):
|
||||
"""Should find single image file."""
|
||||
processor = BatchProcessor()
|
||||
results = list(processor.find_images([sample_images[0]]))
|
||||
assert len(results) == 1
|
||||
assert results[0] == sample_images[0]
|
||||
|
||||
def test_find_images_directory(self, sample_images, temp_dir):
|
||||
"""Should find images in directory."""
|
||||
processor = BatchProcessor()
|
||||
results = list(processor.find_images([temp_dir]))
|
||||
assert len(results) == 3
|
||||
|
||||
def test_find_images_recursive(self, temp_dir):
|
||||
"""Should find images recursively."""
|
||||
from PIL import Image
|
||||
|
||||
# Create nested directory
|
||||
nested = temp_dir / "nested"
|
||||
nested.mkdir()
|
||||
img_path = nested / "nested.png"
|
||||
img = Image.new("RGB", (50, 50))
|
||||
img.save(img_path)
|
||||
|
||||
processor = BatchProcessor()
|
||||
results = list(processor.find_images([temp_dir], recursive=True))
|
||||
assert any(p.name == "nested.png" for p in results)
|
||||
|
||||
def test_batch_encode_requires_message_or_file(self, sample_images, sample_credentials):
|
||||
"""Should raise if neither message nor file provided."""
|
||||
processor = BatchProcessor()
|
||||
with pytest.raises(ValueError, match="message or file_payload"):
|
||||
processor.batch_encode(
|
||||
images=sample_images,
|
||||
credentials=sample_credentials,
|
||||
)
|
||||
|
||||
def test_batch_encode_requires_credentials(self, sample_images):
|
||||
"""Should raise if credentials not provided."""
|
||||
processor = BatchProcessor()
|
||||
with pytest.raises(ValueError, match="Credentials"):
|
||||
processor.batch_encode(
|
||||
images=sample_images,
|
||||
message="test",
|
||||
)
|
||||
|
||||
def test_batch_encode_accepts_passphrase_credentials(
|
||||
self, sample_images, temp_dir, sample_credentials
|
||||
):
|
||||
"""Should accept v3.2.0 format credentials with passphrase."""
|
||||
processor = BatchProcessor()
|
||||
result = processor.batch_encode(
|
||||
images=sample_images,
|
||||
message="Test message",
|
||||
output_dir=temp_dir / "output",
|
||||
credentials=sample_credentials, # Uses 'passphrase' key
|
||||
)
|
||||
|
||||
assert isinstance(result, BatchResult)
|
||||
assert result.operation == "encode"
|
||||
assert result.total == 3
|
||||
|
||||
def test_batch_encode_creates_result(self, sample_images, temp_dir, sample_credentials):
|
||||
"""Should return BatchResult with correct structure."""
|
||||
processor = BatchProcessor()
|
||||
result = processor.batch_encode(
|
||||
images=sample_images,
|
||||
message="Test message",
|
||||
output_dir=temp_dir / "output",
|
||||
credentials=sample_credentials,
|
||||
)
|
||||
|
||||
assert isinstance(result, BatchResult)
|
||||
assert result.operation == "encode"
|
||||
assert result.total == 3
|
||||
assert len(result.items) == 3
|
||||
|
||||
def test_batch_decode_requires_credentials(self, sample_images):
|
||||
"""Should raise if credentials not provided."""
|
||||
processor = BatchProcessor()
|
||||
with pytest.raises(ValueError, match="Credentials"):
|
||||
processor.batch_decode(images=sample_images)
|
||||
|
||||
def test_batch_decode_accepts_passphrase_credentials(self, sample_images, sample_credentials):
|
||||
"""Should accept v3.2.0 format credentials with passphrase."""
|
||||
processor = BatchProcessor()
|
||||
result = processor.batch_decode(
|
||||
images=sample_images,
|
||||
credentials=sample_credentials, # Uses 'passphrase' key
|
||||
)
|
||||
|
||||
assert isinstance(result, BatchResult)
|
||||
assert result.operation == "decode"
|
||||
assert result.total == 3
|
||||
|
||||
def test_batch_decode_creates_result(self, sample_images, sample_credentials):
|
||||
"""Should return BatchResult with correct structure."""
|
||||
processor = BatchProcessor()
|
||||
result = processor.batch_decode(
|
||||
images=sample_images,
|
||||
credentials=sample_credentials,
|
||||
)
|
||||
|
||||
assert isinstance(result, BatchResult)
|
||||
assert result.operation == "decode"
|
||||
assert result.total == 3
|
||||
|
||||
def test_progress_callback_called(self, sample_images, sample_credentials):
|
||||
"""Progress callback should be called for each item."""
|
||||
processor = BatchProcessor()
|
||||
callback = Mock()
|
||||
|
||||
processor.batch_encode(
|
||||
images=sample_images,
|
||||
message="Test",
|
||||
credentials=sample_credentials,
|
||||
progress_callback=callback,
|
||||
)
|
||||
|
||||
assert callback.call_count == 3
|
||||
|
||||
def test_custom_encode_func(self, sample_images, temp_dir, sample_credentials):
|
||||
"""Should use custom encode function if provided."""
|
||||
processor = BatchProcessor()
|
||||
encode_mock = Mock()
|
||||
|
||||
processor.batch_encode(
|
||||
images=sample_images,
|
||||
message="Test",
|
||||
output_dir=temp_dir / "output",
|
||||
credentials=sample_credentials,
|
||||
encode_func=encode_mock,
|
||||
)
|
||||
|
||||
assert encode_mock.call_count == 3
|
||||
|
||||
|
||||
class TestBatchCapacityCheck:
|
||||
"""Tests for batch_capacity_check function."""
|
||||
|
||||
def test_returns_list(self, sample_images):
|
||||
"""Should return list of results."""
|
||||
results = batch_capacity_check(sample_images)
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 3
|
||||
|
||||
def test_includes_capacity(self, sample_images):
|
||||
"""Results should include capacity info."""
|
||||
results = batch_capacity_check(sample_images)
|
||||
for item in results:
|
||||
assert "capacity_bytes" in item
|
||||
assert "dimensions" in item
|
||||
assert "valid" in item
|
||||
|
||||
def test_handles_invalid_files(self, temp_dir):
|
||||
"""Should handle non-image files gracefully."""
|
||||
bad_file = temp_dir / "not_an_image.png"
|
||||
bad_file.write_bytes(b"not a png")
|
||||
|
||||
results = batch_capacity_check([bad_file])
|
||||
assert len(results) == 1
|
||||
assert "error" in results[0]
|
||||
|
||||
|
||||
class TestPrintBatchResult:
|
||||
"""Tests for print_batch_result function."""
|
||||
|
||||
def test_prints_summary(self, capsys, sample_images):
|
||||
"""Should print summary without errors."""
|
||||
result = BatchResult(
|
||||
operation="encode",
|
||||
total=3,
|
||||
succeeded=2,
|
||||
failed=1,
|
||||
)
|
||||
result.end_time = result.start_time + 5.0
|
||||
|
||||
print_batch_result(result)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "ENCODE" in captured.out
|
||||
assert "3" in captured.out # total
|
||||
assert "2" in captured.out # succeeded
|
||||
|
||||
def test_verbose_shows_items(self, capsys):
|
||||
"""Verbose mode should show individual items."""
|
||||
result = BatchResult(operation="decode", total=1, succeeded=1)
|
||||
result.items = [
|
||||
BatchItem(
|
||||
input_path=Path("test.png"),
|
||||
status=BatchStatus.SUCCESS,
|
||||
message="Decoded successfully",
|
||||
)
|
||||
]
|
||||
result.end_time = result.start_time + 1.0
|
||||
|
||||
print_batch_result(result, verbose=True)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "test.png" in captured.out
|
||||
|
||||
|
||||
class TestCredentialsMigration:
|
||||
"""Tests for v3.1.x to v3.2.0 credentials migration."""
|
||||
|
||||
def test_old_phrase_key_accepted(self, sample_reference_photo):
|
||||
"""Old 'phrase' key should be accepted for migration."""
|
||||
old_format = {
|
||||
"reference_photo": sample_reference_photo,
|
||||
"phrase": "old style phrase",
|
||||
"pin": "123456",
|
||||
}
|
||||
# Should not raise
|
||||
creds = BatchCredentials.from_dict(old_format)
|
||||
assert creds.passphrase == "old style phrase"
|
||||
|
||||
def test_old_day_phrase_key_accepted(self, sample_reference_photo):
|
||||
"""Old 'day_phrase' key should be accepted for migration."""
|
||||
old_format = {
|
||||
"reference_photo": sample_reference_photo,
|
||||
"day_phrase": "old day phrase",
|
||||
"pin": "123456",
|
||||
}
|
||||
creds = BatchCredentials.from_dict(old_format)
|
||||
assert creds.passphrase == "old day phrase"
|
||||
|
||||
def test_new_passphrase_key_preferred(self, sample_reference_photo):
|
||||
"""New 'passphrase' key should take precedence if both present."""
|
||||
mixed_format = {
|
||||
"reference_photo": sample_reference_photo,
|
||||
"passphrase": "new style passphrase",
|
||||
"day_phrase": "old day phrase",
|
||||
"pin": "123456",
|
||||
}
|
||||
creds = BatchCredentials.from_dict(mixed_format)
|
||||
assert creds.passphrase == "new style passphrase"
|
||||
@@ -1,181 +0,0 @@
|
||||
"""
|
||||
Tests for Stegasoo compression module.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from stegasoo.compression import (
|
||||
COMPRESSION_MAGIC,
|
||||
HAS_LZ4,
|
||||
MIN_COMPRESS_SIZE,
|
||||
CompressionAlgorithm,
|
||||
CompressionError,
|
||||
algorithm_name,
|
||||
compress,
|
||||
decompress,
|
||||
estimate_compressed_size,
|
||||
get_available_algorithms,
|
||||
get_compression_ratio,
|
||||
)
|
||||
|
||||
|
||||
class TestCompress:
|
||||
"""Tests for compress function."""
|
||||
|
||||
def test_compress_small_data_not_compressed(self):
|
||||
"""Small data should not be compressed (overhead not worth it)."""
|
||||
small_data = b"hello"
|
||||
result = compress(small_data)
|
||||
# Should have magic header but NONE algorithm
|
||||
assert result.startswith(COMPRESSION_MAGIC)
|
||||
assert result[4] == CompressionAlgorithm.NONE
|
||||
|
||||
def test_compress_zlib_reduces_size(self):
|
||||
"""Zlib should reduce size for compressible data."""
|
||||
# Highly compressible data
|
||||
data = b"A" * 1000
|
||||
result = compress(data, CompressionAlgorithm.ZLIB)
|
||||
assert len(result) < len(data)
|
||||
assert result.startswith(COMPRESSION_MAGIC)
|
||||
assert result[4] == CompressionAlgorithm.ZLIB
|
||||
|
||||
def test_compress_incompressible_data(self):
|
||||
"""Incompressible data should be stored uncompressed."""
|
||||
import os
|
||||
|
||||
# Random data doesn't compress well
|
||||
data = os.urandom(500)
|
||||
result = compress(data, CompressionAlgorithm.ZLIB)
|
||||
# Should fall back to NONE if compression didn't help
|
||||
assert result.startswith(COMPRESSION_MAGIC)
|
||||
|
||||
def test_compress_none_algorithm(self):
|
||||
"""NONE algorithm should just wrap data."""
|
||||
data = b"Test data" * 100
|
||||
result = compress(data, CompressionAlgorithm.NONE)
|
||||
assert result.startswith(COMPRESSION_MAGIC)
|
||||
assert result[4] == CompressionAlgorithm.NONE
|
||||
# Data should be after 9-byte header
|
||||
assert result[9:] == data
|
||||
|
||||
@pytest.mark.skipif(not HAS_LZ4, reason="LZ4 not installed")
|
||||
def test_compress_lz4(self):
|
||||
"""LZ4 compression should work if available."""
|
||||
data = b"B" * 1000
|
||||
result = compress(data, CompressionAlgorithm.LZ4)
|
||||
assert len(result) < len(data)
|
||||
assert result.startswith(COMPRESSION_MAGIC)
|
||||
assert result[4] == CompressionAlgorithm.LZ4
|
||||
|
||||
|
||||
class TestDecompress:
|
||||
"""Tests for decompress function."""
|
||||
|
||||
def test_decompress_zlib(self):
|
||||
"""Decompression should restore original data."""
|
||||
original = b"Hello, World! " * 100
|
||||
compressed = compress(original, CompressionAlgorithm.ZLIB)
|
||||
result = decompress(compressed)
|
||||
assert result == original
|
||||
|
||||
def test_decompress_none(self):
|
||||
"""Uncompressed wrapped data should decompress correctly."""
|
||||
original = b"Small data"
|
||||
wrapped = compress(original, CompressionAlgorithm.NONE)
|
||||
result = decompress(wrapped)
|
||||
assert result == original
|
||||
|
||||
def test_decompress_no_magic(self):
|
||||
"""Data without magic header should be returned as-is."""
|
||||
data = b"Not compressed at all"
|
||||
result = decompress(data)
|
||||
assert result == data
|
||||
|
||||
def test_decompress_truncated_header(self):
|
||||
"""Truncated header should raise CompressionError."""
|
||||
bad_data = COMPRESSION_MAGIC + b"\x01" # Too short
|
||||
with pytest.raises(CompressionError, match="Truncated"):
|
||||
decompress(bad_data)
|
||||
|
||||
@pytest.mark.skipif(not HAS_LZ4, reason="LZ4 not installed")
|
||||
def test_decompress_lz4(self):
|
||||
"""LZ4 decompression should work."""
|
||||
original = b"LZ4 test data " * 100
|
||||
compressed = compress(original, CompressionAlgorithm.LZ4)
|
||||
result = decompress(compressed)
|
||||
assert result == original
|
||||
|
||||
def test_roundtrip_large_data(self):
|
||||
"""Large data should survive compress/decompress roundtrip."""
|
||||
import os
|
||||
|
||||
original = os.urandom(50000)
|
||||
compressed = compress(original)
|
||||
result = decompress(compressed)
|
||||
assert result == original
|
||||
|
||||
|
||||
class TestUtilities:
|
||||
"""Tests for utility functions."""
|
||||
|
||||
def test_compression_ratio_compressed(self):
|
||||
"""Ratio should be < 1 for well-compressed data."""
|
||||
original = b"X" * 1000
|
||||
compressed = compress(original)
|
||||
ratio = get_compression_ratio(original, compressed)
|
||||
assert ratio < 1.0
|
||||
|
||||
def test_compression_ratio_empty(self):
|
||||
"""Empty data should return ratio of 1.0."""
|
||||
ratio = get_compression_ratio(b"", b"")
|
||||
assert ratio == 1.0
|
||||
|
||||
def test_estimate_compressed_size_small(self):
|
||||
"""Small data estimation should be accurate."""
|
||||
data = b"Test " * 100
|
||||
estimate = estimate_compressed_size(data)
|
||||
actual = len(compress(data))
|
||||
# Should be within 20% for small data
|
||||
assert abs(estimate - actual) / actual < 0.2
|
||||
|
||||
def test_available_algorithms(self):
|
||||
"""Should always include NONE and ZLIB."""
|
||||
algos = get_available_algorithms()
|
||||
assert CompressionAlgorithm.NONE in algos
|
||||
assert CompressionAlgorithm.ZLIB in algos
|
||||
|
||||
def test_algorithm_name(self):
|
||||
"""Algorithm names should be human-readable."""
|
||||
assert "Zlib" in algorithm_name(CompressionAlgorithm.ZLIB)
|
||||
assert "None" in algorithm_name(CompressionAlgorithm.NONE)
|
||||
assert "LZ4" in algorithm_name(CompressionAlgorithm.LZ4)
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
"""Edge case tests."""
|
||||
|
||||
def test_empty_data(self):
|
||||
"""Empty data should be handled gracefully."""
|
||||
result = compress(b"")
|
||||
assert decompress(result) == b""
|
||||
|
||||
def test_exact_min_size(self):
|
||||
"""Data at exactly MIN_COMPRESS_SIZE should be compressed."""
|
||||
data = b"x" * MIN_COMPRESS_SIZE
|
||||
result = compress(data, CompressionAlgorithm.ZLIB)
|
||||
assert result.startswith(COMPRESSION_MAGIC)
|
||||
assert decompress(result) == data
|
||||
|
||||
def test_binary_data(self):
|
||||
"""Binary data with null bytes should work."""
|
||||
data = b"\x00\x01\x02\x03" * 500
|
||||
compressed = compress(data)
|
||||
assert decompress(compressed) == data
|
||||
|
||||
def test_unicode_after_encoding(self):
|
||||
"""UTF-8 encoded Unicode should compress correctly."""
|
||||
text = "Hello, 世界! 🎉 " * 100
|
||||
data = text.encode("utf-8")
|
||||
compressed = compress(data)
|
||||
result = decompress(compressed)
|
||||
assert result.decode("utf-8") == text
|
||||
@@ -1,785 +0,0 @@
|
||||
"""
|
||||
Stegasoo Tests (v4.0.0)
|
||||
|
||||
Tests for key generation, validation, encoding/decoding, output formats,
|
||||
and channel key functionality.
|
||||
|
||||
Updated for v4.0.0:
|
||||
- Same API as v3.2.0 (passphrase, no date_str)
|
||||
- Channel key support for deployment/group isolation
|
||||
- HEADER_OVERHEAD increased to 66 bytes (flags byte added)
|
||||
- Python 3.12 recommended (3.13 not supported)
|
||||
"""
|
||||
|
||||
import io
|
||||
|
||||
import pytest
|
||||
from PIL import Image
|
||||
|
||||
import stegasoo
|
||||
from stegasoo import (
|
||||
decode,
|
||||
decode_text,
|
||||
encode,
|
||||
generate_channel_key,
|
||||
generate_credentials,
|
||||
generate_passphrase,
|
||||
generate_pin,
|
||||
get_channel_fingerprint,
|
||||
validate_channel_key,
|
||||
validate_message,
|
||||
validate_passphrase,
|
||||
validate_pin,
|
||||
)
|
||||
from stegasoo.steganography import get_output_format
|
||||
|
||||
# =============================================================================
|
||||
# Fixtures
|
||||
# =============================================================================
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def png_image():
|
||||
"""Create a test PNG image."""
|
||||
img = Image.new("RGB", (100, 100), color="red")
|
||||
buf = io.BytesIO()
|
||||
img.save(buf, format="PNG")
|
||||
buf.seek(0)
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def large_png_image():
|
||||
"""Create a larger test PNG image for DCT mode.
|
||||
|
||||
Uses noise instead of solid color to ensure DCT color mode works.
|
||||
Solid colors cause coefficient drift during RGB conversion that
|
||||
can exceed the quantization step and corrupt embedded data.
|
||||
"""
|
||||
import numpy as np
|
||||
# Create random noise image (ensures varied Y channel values)
|
||||
np.random.seed(42) # Reproducible
|
||||
data = np.random.randint(0, 256, (400, 400, 3), dtype=np.uint8)
|
||||
img = Image.fromarray(data, 'RGB')
|
||||
buf = io.BytesIO()
|
||||
img.save(buf, format="PNG")
|
||||
buf.seek(0)
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bmp_image():
|
||||
"""Create a test BMP image."""
|
||||
img = Image.new("RGB", (100, 100), color="blue")
|
||||
buf = io.BytesIO()
|
||||
img.save(buf, format="BMP")
|
||||
buf.seek(0)
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def jpeg_image():
|
||||
"""Create a test JPEG image."""
|
||||
img = Image.new("RGB", (100, 100), color="green")
|
||||
buf = io.BytesIO()
|
||||
img.save(buf, format="JPEG")
|
||||
buf.seek(0)
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def gif_image():
|
||||
"""Create a test GIF image."""
|
||||
img = Image.new("RGB", (100, 100), color="yellow")
|
||||
buf = io.BytesIO()
|
||||
img.save(buf, format="GIF")
|
||||
buf.seek(0)
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Key Generation Tests (v3.2.0 Updated)
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestKeygen:
|
||||
"""Tests for key generation functions."""
|
||||
|
||||
def test_generate_pin_default(self):
|
||||
"""Default PIN should be 6 digits, no leading zero."""
|
||||
pin = generate_pin()
|
||||
assert len(pin) == 6
|
||||
assert pin.isdigit()
|
||||
assert pin[0] != "0"
|
||||
|
||||
def test_generate_pin_lengths(self):
|
||||
"""PIN generation should work for all valid lengths."""
|
||||
for length in [6, 7, 8, 9]:
|
||||
pin = generate_pin(length)
|
||||
assert len(pin) == length
|
||||
assert pin.isdigit()
|
||||
|
||||
def test_generate_passphrase_default(self):
|
||||
"""Default passphrase should have 4 words (v3.2.0 change)."""
|
||||
phrase = generate_passphrase()
|
||||
words = phrase.split()
|
||||
assert len(words) == 4 # Changed from 3 in v3.1.x
|
||||
|
||||
def test_generate_passphrase_custom_length(self):
|
||||
"""Passphrase generation should work for custom lengths."""
|
||||
for length in [3, 4, 5, 6, 8, 12]:
|
||||
phrase = generate_passphrase(length)
|
||||
words = phrase.split()
|
||||
assert len(words) == length
|
||||
|
||||
def test_generate_credentials_pin_only(self):
|
||||
"""PIN-only credentials should have single passphrase."""
|
||||
creds = generate_credentials(use_pin=True, use_rsa=False)
|
||||
assert creds.pin is not None
|
||||
assert creds.rsa_key_pem is None
|
||||
# v3.2.0: Single passphrase instead of 7 daily phrases
|
||||
assert creds.passphrase is not None
|
||||
assert isinstance(creds.passphrase, str)
|
||||
assert " " in creds.passphrase # Should have multiple words
|
||||
|
||||
def test_generate_credentials_rsa_only(self):
|
||||
"""RSA-only credentials should have single passphrase."""
|
||||
creds = generate_credentials(use_pin=False, use_rsa=True)
|
||||
assert creds.pin is None
|
||||
assert creds.rsa_key_pem is not None
|
||||
assert creds.passphrase is not None
|
||||
|
||||
def test_generate_credentials_both(self):
|
||||
"""Both PIN and RSA should work together."""
|
||||
creds = generate_credentials(use_pin=True, use_rsa=True)
|
||||
assert creds.pin is not None
|
||||
assert creds.rsa_key_pem is not None
|
||||
assert creds.passphrase is not None
|
||||
|
||||
def test_generate_credentials_neither_fails(self):
|
||||
"""Generating with neither PIN nor RSA should fail."""
|
||||
with pytest.raises((ValueError, AssertionError)):
|
||||
generate_credentials(use_pin=False, use_rsa=False)
|
||||
|
||||
def test_generate_credentials_custom_words(self):
|
||||
"""Custom passphrase_words parameter should work."""
|
||||
creds = generate_credentials(use_pin=True, passphrase_words=6)
|
||||
words = creds.passphrase.split()
|
||||
assert len(words) == 6
|
||||
|
||||
def test_generate_credentials_default_words(self):
|
||||
"""Default should be 4 words (v3.2.0)."""
|
||||
creds = generate_credentials(use_pin=True)
|
||||
words = creds.passphrase.split()
|
||||
assert len(words) == 4
|
||||
|
||||
def test_passphrase_entropy_calculation(self):
|
||||
"""Passphrase entropy should be calculated correctly."""
|
||||
creds = generate_credentials(use_pin=True, passphrase_words=4)
|
||||
# 4 words × 11 bits/word = 44 bits
|
||||
assert creds.passphrase_entropy == 44
|
||||
|
||||
def test_total_entropy_calculation(self):
|
||||
"""Total entropy should sum all components."""
|
||||
creds = generate_credentials(use_pin=True, use_rsa=False, passphrase_words=4)
|
||||
# 44 bits (passphrase) + ~20 bits (PIN)
|
||||
assert creds.total_entropy > 0
|
||||
assert creds.total_entropy >= creds.passphrase_entropy
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Validation Tests (v3.2.0 Updated)
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestValidation:
|
||||
"""Tests for validation functions."""
|
||||
|
||||
def test_validate_pin_valid(self):
|
||||
"""Valid PIN should pass validation."""
|
||||
result = validate_pin("123456")
|
||||
assert result.is_valid
|
||||
|
||||
def test_validate_pin_empty_ok(self):
|
||||
"""Empty PIN should be valid (RSA key might be used instead)."""
|
||||
result = validate_pin("")
|
||||
assert result.is_valid
|
||||
|
||||
def test_validate_pin_too_short(self):
|
||||
"""PIN shorter than 6 digits should fail."""
|
||||
result = validate_pin("12345")
|
||||
assert not result.is_valid
|
||||
|
||||
def test_validate_pin_too_long(self):
|
||||
"""PIN longer than 9 digits should fail."""
|
||||
result = validate_pin("1234567890")
|
||||
assert not result.is_valid
|
||||
|
||||
def test_validate_pin_leading_zero(self):
|
||||
"""PIN with leading zero should fail."""
|
||||
result = validate_pin("012345")
|
||||
assert not result.is_valid
|
||||
|
||||
def test_validate_pin_non_digits(self):
|
||||
"""PIN with non-digit characters should fail."""
|
||||
result = validate_pin("12345a")
|
||||
assert not result.is_valid
|
||||
|
||||
def test_validate_message_valid(self):
|
||||
"""Valid message should pass validation."""
|
||||
result = validate_message("Hello, World!")
|
||||
assert result.is_valid
|
||||
|
||||
def test_validate_message_empty(self):
|
||||
"""Empty message should fail validation."""
|
||||
result = validate_message("")
|
||||
assert not result.is_valid
|
||||
|
||||
def test_validate_passphrase_valid(self):
|
||||
"""Valid passphrase should pass validation."""
|
||||
result = validate_passphrase("word1 word2 word3 word4")
|
||||
assert result.is_valid
|
||||
|
||||
def test_validate_passphrase_empty(self):
|
||||
"""Empty passphrase should fail validation."""
|
||||
result = validate_passphrase("")
|
||||
assert not result.is_valid
|
||||
|
||||
def test_validate_passphrase_short_warning(self):
|
||||
"""Short passphrase should have warning but still be valid."""
|
||||
result = validate_passphrase("word1 word2 word3") # Only 3 words
|
||||
assert result.is_valid
|
||||
assert result.warning is not None # Should warn about short passphrase
|
||||
|
||||
def test_validate_passphrase_recommended_no_warning(self):
|
||||
"""Recommended length passphrase should have no warning."""
|
||||
result = validate_passphrase("word1 word2 word3 word4") # 4 words
|
||||
assert result.is_valid
|
||||
# May or may not have warning depending on implementation
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Output Format Tests
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestOutputFormat:
|
||||
"""Tests for output format handling."""
|
||||
|
||||
def test_png_stays_png(self):
|
||||
"""PNG input should produce PNG output."""
|
||||
fmt, ext = get_output_format("PNG")
|
||||
assert fmt == "PNG"
|
||||
assert ext == "png"
|
||||
|
||||
def test_bmp_stays_bmp(self):
|
||||
"""BMP input should produce BMP output."""
|
||||
fmt, ext = get_output_format("BMP")
|
||||
assert fmt == "BMP"
|
||||
assert ext == "bmp"
|
||||
|
||||
def test_jpeg_becomes_png(self):
|
||||
"""JPEG input should produce PNG output (lossless)."""
|
||||
fmt, ext = get_output_format("JPEG")
|
||||
assert fmt == "PNG"
|
||||
assert ext == "png"
|
||||
|
||||
def test_gif_becomes_png(self):
|
||||
"""GIF input should produce PNG output."""
|
||||
fmt, ext = get_output_format("GIF")
|
||||
assert fmt == "PNG"
|
||||
assert ext == "png"
|
||||
|
||||
def test_none_becomes_png(self):
|
||||
"""None format should default to PNG."""
|
||||
fmt, ext = get_output_format(None)
|
||||
assert fmt == "PNG"
|
||||
assert ext == "png"
|
||||
|
||||
def test_unknown_becomes_png(self):
|
||||
"""Unknown format should default to PNG."""
|
||||
fmt, ext = get_output_format("UNKNOWN")
|
||||
assert fmt == "PNG"
|
||||
assert ext == "png"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Header Overhead Test (v4.0.0)
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestConstants:
|
||||
"""Tests for constants and configuration."""
|
||||
|
||||
def test_header_overhead_value(self):
|
||||
"""Header overhead should be 66 bytes (v4.0.0: added flags byte)."""
|
||||
from stegasoo.steganography import HEADER_OVERHEAD
|
||||
|
||||
assert HEADER_OVERHEAD == 66
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Encode/Decode Tests (v4.0.0 Updated)
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestEncodeDecode:
|
||||
"""Tests for encoding and decoding functions."""
|
||||
|
||||
def test_encode_decode_roundtrip(self, png_image):
|
||||
"""Full encode/decode cycle should work."""
|
||||
message = "Secret message!"
|
||||
passphrase = "apple forest thunder mountain" # 4 words
|
||||
pin = "123456"
|
||||
|
||||
# v3.2.0: Use passphrase parameter, no date_str
|
||||
result = encode(
|
||||
message=message,
|
||||
reference_photo=png_image,
|
||||
carrier_image=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
)
|
||||
|
||||
assert result.stego_image is not None
|
||||
assert len(result.stego_image) > 0
|
||||
assert result.filename.endswith(".png")
|
||||
|
||||
# v3.2.0: Use passphrase parameter, no date_str
|
||||
decoded = decode(
|
||||
stego_image=result.stego_image,
|
||||
reference_photo=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
)
|
||||
|
||||
assert decoded.message == message
|
||||
|
||||
def test_decode_text_roundtrip(self, png_image):
|
||||
"""decode_text convenience function should work."""
|
||||
message = "Secret message!"
|
||||
passphrase = "apple forest thunder mountain"
|
||||
pin = "123456"
|
||||
|
||||
result = encode(
|
||||
message=message,
|
||||
reference_photo=png_image,
|
||||
carrier_image=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
)
|
||||
|
||||
# decode_text returns string directly
|
||||
decoded_text = decode_text(
|
||||
stego_image=result.stego_image,
|
||||
reference_photo=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
)
|
||||
|
||||
assert decoded_text == message
|
||||
|
||||
def test_png_carrier_produces_png(self, png_image):
|
||||
"""PNG carrier should produce PNG output."""
|
||||
result = encode(
|
||||
message="Test",
|
||||
reference_photo=png_image,
|
||||
carrier_image=png_image,
|
||||
passphrase="test phrase here now",
|
||||
pin="123456",
|
||||
)
|
||||
assert result.filename.endswith(".png")
|
||||
|
||||
def test_bmp_carrier_produces_bmp(self, bmp_image, png_image):
|
||||
"""BMP carrier should produce BMP output."""
|
||||
result = encode(
|
||||
message="Test",
|
||||
reference_photo=png_image,
|
||||
carrier_image=bmp_image,
|
||||
passphrase="test phrase here now",
|
||||
pin="123456",
|
||||
)
|
||||
assert result.filename.endswith(".bmp")
|
||||
|
||||
def test_jpeg_carrier_produces_png(self, jpeg_image, png_image):
|
||||
"""JPEG carrier should produce PNG output (lossless)."""
|
||||
result = encode(
|
||||
message="Test",
|
||||
reference_photo=png_image,
|
||||
carrier_image=jpeg_image,
|
||||
passphrase="test phrase here now",
|
||||
pin="123456",
|
||||
)
|
||||
assert result.filename.endswith(".png")
|
||||
|
||||
def test_bmp_roundtrip(self, bmp_image, png_image):
|
||||
"""Full encode/decode cycle with BMP should work."""
|
||||
message = "BMP test message!"
|
||||
passphrase = "test phrase words here"
|
||||
pin = "123456"
|
||||
|
||||
result = encode(
|
||||
message=message,
|
||||
reference_photo=png_image,
|
||||
carrier_image=bmp_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
)
|
||||
assert result.filename.endswith(".bmp")
|
||||
|
||||
decoded = decode(
|
||||
stego_image=result.stego_image,
|
||||
reference_photo=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
)
|
||||
|
||||
assert decoded.message == message
|
||||
|
||||
def test_wrong_pin_fails(self, png_image):
|
||||
"""Wrong PIN should fail to decode."""
|
||||
result = encode(
|
||||
message="Secret",
|
||||
reference_photo=png_image,
|
||||
carrier_image=png_image,
|
||||
passphrase="test phrase here now",
|
||||
pin="123456",
|
||||
)
|
||||
|
||||
with pytest.raises((stegasoo.DecryptionError, stegasoo.ExtractionError)):
|
||||
decode(
|
||||
stego_image=result.stego_image,
|
||||
reference_photo=png_image,
|
||||
passphrase="test phrase here now",
|
||||
pin="654321", # Wrong PIN
|
||||
)
|
||||
|
||||
def test_wrong_passphrase_fails(self, png_image):
|
||||
"""Wrong passphrase should fail to decode."""
|
||||
result = encode(
|
||||
message="Secret",
|
||||
reference_photo=png_image,
|
||||
carrier_image=png_image,
|
||||
passphrase="correct phrase here now",
|
||||
pin="123456",
|
||||
)
|
||||
|
||||
with pytest.raises((stegasoo.DecryptionError, stegasoo.ExtractionError)):
|
||||
decode(
|
||||
stego_image=result.stego_image,
|
||||
reference_photo=png_image,
|
||||
passphrase="wrong phrase here now", # Wrong passphrase
|
||||
pin="123456",
|
||||
)
|
||||
|
||||
def test_unicode_message(self, png_image):
|
||||
"""Unicode messages should encode/decode correctly."""
|
||||
message = "Hello, 世界! 🎉 Émojis and ümlauts"
|
||||
passphrase = "unicode test phrase here"
|
||||
pin = "123456"
|
||||
|
||||
result = encode(
|
||||
message=message,
|
||||
reference_photo=png_image,
|
||||
carrier_image=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
)
|
||||
|
||||
decoded = decode(
|
||||
stego_image=result.stego_image,
|
||||
reference_photo=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
)
|
||||
|
||||
assert decoded.message == message
|
||||
|
||||
def test_filename_format(self, png_image):
|
||||
"""Output filename should have random hex and date suffix."""
|
||||
result = encode(
|
||||
message="Test",
|
||||
reference_photo=png_image,
|
||||
carrier_image=png_image,
|
||||
passphrase="test phrase here now",
|
||||
pin="123456",
|
||||
)
|
||||
# Filename format: {random_hex}_{YYYYMMDD}.{ext}
|
||||
# e.g., "a1b2c3d4_20251227.png"
|
||||
import re
|
||||
|
||||
assert re.search(r"^[a-f0-9]{8}_\d{8}\.png$", result.filename)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# DCT Mode Tests (v3.2.0)
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestDCTMode:
|
||||
"""Tests for DCT steganography mode."""
|
||||
|
||||
@pytest.fixture
|
||||
def skip_if_no_dct(self):
|
||||
"""Skip test if DCT support not available."""
|
||||
if not stegasoo.has_dct_support():
|
||||
pytest.skip("DCT support not available (scipy not installed)")
|
||||
|
||||
def test_dct_encode_decode_roundtrip(self, large_png_image, skip_if_no_dct):
|
||||
"""DCT mode encode/decode should work."""
|
||||
message = "DCT test"
|
||||
passphrase = "dct test phrase here"
|
||||
pin = "123456"
|
||||
|
||||
result = encode(
|
||||
message=message,
|
||||
reference_photo=large_png_image,
|
||||
carrier_image=large_png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
embed_mode="dct",
|
||||
)
|
||||
|
||||
assert result.stego_image is not None
|
||||
|
||||
decoded = decode(
|
||||
stego_image=result.stego_image,
|
||||
reference_photo=large_png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
)
|
||||
|
||||
assert decoded.message == message
|
||||
|
||||
def test_dct_auto_detection(self, large_png_image, skip_if_no_dct):
|
||||
"""Auto mode should detect DCT encoding."""
|
||||
message = "Auto detect DCT"
|
||||
passphrase = "auto detect test here"
|
||||
pin = "123456"
|
||||
|
||||
result = encode(
|
||||
message=message,
|
||||
reference_photo=large_png_image,
|
||||
carrier_image=large_png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
embed_mode="dct",
|
||||
)
|
||||
|
||||
# Decode with auto mode (default)
|
||||
decoded = decode(
|
||||
stego_image=result.stego_image,
|
||||
reference_photo=large_png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
embed_mode="auto",
|
||||
)
|
||||
|
||||
assert decoded.message == message
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Version Tests
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestVersion:
|
||||
"""Tests for version information."""
|
||||
|
||||
def test_version_exists(self):
|
||||
"""Version string should exist and be valid."""
|
||||
assert hasattr(stegasoo, "__version__")
|
||||
parts = stegasoo.__version__.split(".")
|
||||
assert len(parts) >= 2
|
||||
assert all(p.isdigit() for p in parts[:2])
|
||||
|
||||
def test_version_is_4_0_0(self):
|
||||
"""Version should be 4.0.0 or higher."""
|
||||
parts = stegasoo.__version__.split(".")
|
||||
major = int(parts[0])
|
||||
assert major >= 4
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Backward Compatibility Tests
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestBackwardCompatibility:
|
||||
"""Tests for backward compatibility handling."""
|
||||
|
||||
def test_old_day_phrase_parameter_raises(self, png_image):
|
||||
"""Using old day_phrase parameter should raise TypeError."""
|
||||
with pytest.raises(TypeError):
|
||||
encode(
|
||||
message="Test",
|
||||
reference_photo=png_image,
|
||||
carrier_image=png_image,
|
||||
day_phrase="old style phrase", # Old parameter name
|
||||
pin="123456",
|
||||
)
|
||||
|
||||
def test_old_date_str_parameter_raises(self, png_image):
|
||||
"""Using old date_str parameter should raise TypeError."""
|
||||
with pytest.raises(TypeError):
|
||||
encode(
|
||||
message="Test",
|
||||
reference_photo=png_image,
|
||||
carrier_image=png_image,
|
||||
passphrase="test phrase here now",
|
||||
pin="123456",
|
||||
date_str="2025-01-01", # Removed parameter
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Channel Key Tests (v4.0.0)
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestChannelKey:
|
||||
"""Tests for channel key functionality (v4.0.0)."""
|
||||
|
||||
def test_generate_channel_key_format(self):
|
||||
"""Generated channel key should have correct format."""
|
||||
key = generate_channel_key()
|
||||
# Format: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX (8 groups of 4)
|
||||
assert len(key) == 39
|
||||
parts = key.split("-")
|
||||
assert len(parts) == 8
|
||||
for part in parts:
|
||||
assert len(part) == 4
|
||||
assert part.isalnum()
|
||||
|
||||
def test_validate_channel_key_valid(self):
|
||||
"""Valid channel key should pass validation."""
|
||||
key = generate_channel_key()
|
||||
assert validate_channel_key(key)
|
||||
|
||||
def test_validate_channel_key_invalid(self):
|
||||
"""Invalid channel key should fail validation."""
|
||||
assert not validate_channel_key("")
|
||||
assert not validate_channel_key("invalid")
|
||||
assert not validate_channel_key("ABCD-1234") # Too short
|
||||
|
||||
def test_channel_fingerprint_format(self):
|
||||
"""Channel fingerprint should mask middle sections."""
|
||||
key = "ABCD-1234-EFGH-5678-IJKL-9012-MNOP-3456"
|
||||
fingerprint = get_channel_fingerprint(key)
|
||||
assert fingerprint is not None
|
||||
# First and last groups visible, middle masked
|
||||
assert fingerprint.startswith("ABCD-")
|
||||
assert fingerprint.endswith("-3456")
|
||||
assert "••••" in fingerprint
|
||||
|
||||
def test_encode_decode_with_channel_key(self, png_image):
|
||||
"""Encode/decode should work with explicit channel key."""
|
||||
message = "Secret with channel key!"
|
||||
passphrase = "apple forest thunder mountain"
|
||||
pin = "123456"
|
||||
channel_key = generate_channel_key()
|
||||
|
||||
# Encode with channel key
|
||||
result = encode(
|
||||
message=message,
|
||||
reference_photo=png_image,
|
||||
carrier_image=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
channel_key=channel_key,
|
||||
)
|
||||
|
||||
assert result.stego_image is not None
|
||||
|
||||
# Decode with same channel key
|
||||
decoded = decode(
|
||||
stego_image=result.stego_image,
|
||||
reference_photo=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
channel_key=channel_key,
|
||||
)
|
||||
|
||||
assert decoded.message == message
|
||||
|
||||
def test_decode_wrong_channel_key_fails(self, png_image):
|
||||
"""Decoding with wrong channel key should fail."""
|
||||
message = "Secret message"
|
||||
passphrase = "apple forest thunder mountain"
|
||||
pin = "123456"
|
||||
channel_key1 = generate_channel_key()
|
||||
channel_key2 = generate_channel_key()
|
||||
|
||||
# Encode with one channel key
|
||||
result = encode(
|
||||
message=message,
|
||||
reference_photo=png_image,
|
||||
carrier_image=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
channel_key=channel_key1,
|
||||
)
|
||||
|
||||
# Decode with different channel key should fail
|
||||
with pytest.raises((stegasoo.DecryptionError, stegasoo.ExtractionError)):
|
||||
decode(
|
||||
stego_image=result.stego_image,
|
||||
reference_photo=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
channel_key=channel_key2,
|
||||
)
|
||||
|
||||
def test_encode_decode_public_mode(self, png_image):
|
||||
"""Encode/decode should work without channel key (public mode)."""
|
||||
message = "Public message!"
|
||||
passphrase = "apple forest thunder mountain"
|
||||
pin = "123456"
|
||||
|
||||
# Encode without channel key (explicit public mode)
|
||||
result = encode(
|
||||
message=message,
|
||||
reference_photo=png_image,
|
||||
carrier_image=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
channel_key="", # Explicit public mode
|
||||
)
|
||||
|
||||
# Decode without channel key
|
||||
decoded = decode(
|
||||
stego_image=result.stego_image,
|
||||
reference_photo=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
channel_key="", # Explicit public mode
|
||||
)
|
||||
|
||||
assert decoded.message == message
|
||||
|
||||
def test_channel_key_mismatch_public_vs_private(self, png_image):
|
||||
"""Decoding public message with channel key should fail."""
|
||||
message = "Public message"
|
||||
passphrase = "apple forest thunder mountain"
|
||||
pin = "123456"
|
||||
|
||||
# Encode without channel key (public)
|
||||
result = encode(
|
||||
message=message,
|
||||
reference_photo=png_image,
|
||||
carrier_image=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
channel_key="", # Public mode
|
||||
)
|
||||
|
||||
# Decode with channel key should fail
|
||||
channel_key = generate_channel_key()
|
||||
with pytest.raises((stegasoo.DecryptionError, stegasoo.ExtractionError)):
|
||||
decode(
|
||||
stego_image=result.stego_image,
|
||||
reference_photo=png_image,
|
||||
passphrase=passphrase,
|
||||
pin=pin,
|
||||
channel_key=channel_key,
|
||||
)
|
||||
Reference in New Issue
Block a user