Added debug, additional comments, encoded file thumbnail view.

This commit is contained in:
Aaron D. Lee
2025-12-28 20:18:11 -05:00
parent 541e6424ea
commit 7dd2e2daf7
32 changed files with 5618 additions and 153 deletions

View File

@@ -57,6 +57,11 @@ File Embedding:
f.write(decoded.file_data)
else:
print(decoded.message)
Debugging:
from stegasoo.debug import debug
debug.enable(True) # Enable debug output
debug.enable_performance(True) # Enable timing
"""
from .constants import __version__, DAY_NAMES, MAX_MESSAGE_SIZE, MAX_FILE_PAYLOAD_SIZE
@@ -148,6 +153,7 @@ from .utils import (
SecureDeleter,
format_file_size,
)
from .debug import debug # Import debug utilities
# QR Code utilities (optional, depends on qrcode and pyzbar)
try:
@@ -174,7 +180,7 @@ except ImportError:
from datetime import date
from pathlib import Path
from typing import Optional, Union
from typing import Optional, Union, Dict, Any
def encode(
@@ -219,6 +225,10 @@ def encode(
Output format is always lossless (PNG or BMP) to preserve hidden data.
If carrier is JPEG/GIF, output will be PNG to maintain data integrity.
"""
# Debug logging
debug.print(f"encode called: message type={type(message).__name__}, "
f"day_phrase='{day_phrase[:20]}...', pin_length={len(pin)}")
# Validate inputs
require_valid_payload(message)
require_valid_image(carrier_image, "Carrier image")
@@ -233,16 +243,23 @@ def encode(
if date_str is None:
date_str = date.today().isoformat()
debug.print(f"Encoding for date: {date_str}")
# Encrypt message/file
encrypted = encrypt_message(
message, reference_photo, day_phrase, date_str, pin, rsa_key_data
)
# Debug: show encrypted data size
debug.print(f"Encrypted payload: {len(encrypted)} bytes")
# Get pixel key
pixel_key = derive_pixel_key(
reference_photo, day_phrase, date_str, pin, rsa_key_data
)
debug.data(pixel_key, "Pixel key")
# Embed in image (returns extension too)
stego_data, stats, extension = embed_in_image(
carrier_image, encrypted, pixel_key, output_format=output_format
@@ -251,6 +268,10 @@ def encode(
# Generate filename with correct extension
filename = generate_filename(date_str, extension=extension)
debug.print(f"Encoding complete: {filename}, "
f"modified {stats.pixels_modified}/{stats.total_pixels} pixels "
f"({stats.modification_percent:.2f}%)")
return EncodeResult(
stego_image=stego_data,
filename=filename,
@@ -293,6 +314,7 @@ def encode_file(
Returns:
EncodeResult with stego image and metadata
"""
debug.print(f"encode_file called: filepath={filepath}")
payload = FilePayload.from_file(str(filepath), filename_override)
return encode(
@@ -342,6 +364,7 @@ def encode_bytes(
Returns:
EncodeResult with stego image and metadata
"""
debug.print(f"encode_bytes called: filename={filename}, data_size={len(data)}")
payload = FilePayload(data=data, filename=filename, mime_type=mime_type)
return encode(
@@ -357,6 +380,7 @@ def encode_bytes(
)
@debug.time
def decode(
stego_image: bytes,
reference_photo: bytes,
@@ -393,6 +417,9 @@ def decode(
ExtractionError: If data cannot be extracted
DecryptionError: If decryption fails
"""
debug.print(f"decode called: stego_image_size={len(stego_image)}, "
f"day_phrase='{day_phrase[:20]}...'")
# Validate inputs
require_security_factors(pin, rsa_key_data)
@@ -407,12 +434,15 @@ def decode(
reference_photo, day_phrase, date_str, pin, rsa_key_data
)
debug.data(pixel_key, "Pixel key for extraction")
encrypted = extract_from_image(stego_image, pixel_key)
# If we got data, check if it's from a different date
if encrypted:
header = parse_header(encrypted)
if header and header['date'] != date_str:
debug.print(f"Found different date in header: {header['date']} (expected {date_str})")
# Re-extract with correct date
pixel_key = derive_pixel_key(
reference_photo, day_phrase, header['date'], pin, rsa_key_data
@@ -420,8 +450,12 @@ def decode(
encrypted = extract_from_image(stego_image, pixel_key)
if not encrypted:
debug.print("No data extracted from image")
raise ExtractionError("Could not extract data. Check your inputs.")
debug.print(f"Extracted {len(encrypted)} bytes from image")
debug.data(encrypted[:64], "First 64 bytes of extracted data")
# Decrypt and return full result
return decrypt_message(encrypted, reference_photo, day_phrase, pin, rsa_key_data)
@@ -454,6 +488,7 @@ def decode_text(
Raises:
DecryptionError: If content is a binary file, not text
"""
debug.print("decode_text called")
result = decode(stego_image, reference_photo, day_phrase, pin, rsa_key_data, rsa_password)
if result.is_file:
@@ -462,12 +497,14 @@ def decode_text(
try:
return result.file_data.decode('utf-8')
except UnicodeDecodeError:
debug.print(f"File is binary: {result.filename or 'unnamed'}")
raise DecryptionError(
f"Content is a binary file ({result.filename or 'unnamed'}), not text. "
"Use decode() instead and check result.is_file."
)
return ""
debug.print(f"Decoded text: {result.message[:100] if result.message else 'empty'}...")
return result.message or ""
@@ -574,4 +611,7 @@ __all__ = [
'secure_delete',
'SecureDeleter',
'format_file_size',
]
# Debugging
'debug',
]

View File

@@ -11,7 +11,7 @@ from pathlib import Path
# VERSION
# ============================================================================
__version__ = "2.1.1"
__version__ = "2.1.3"
# ============================================================================
# FILE FORMAT

180
src/stegasoo/debug.py Normal file
View File

@@ -0,0 +1,180 @@
"""
Stegasoo Debugging Utilities
Debugging, logging, and performance monitoring tools.
Can be disabled for production use.
"""
import time
import traceback
from datetime import datetime
from functools import wraps
from typing import Callable, Any, Optional, Dict
import sys
# Global debug configuration
DEBUG_ENABLED = False # Set to True to enable debug output
LOG_PERFORMANCE = True # Log function timing
VALIDATION_ASSERTIONS = True # Enable runtime validation assertions
def enable_debug(enable: bool = True) -> None:
"""Enable or disable debug mode globally."""
global DEBUG_ENABLED
DEBUG_ENABLED = enable
def enable_performance_logging(enable: bool = True) -> None:
"""Enable or disable performance timing."""
global LOG_PERFORMANCE
LOG_PERFORMANCE = enable
def enable_assertions(enable: bool = True) -> None:
"""Enable or disable validation assertions."""
global VALIDATION_ASSERTIONS
VALIDATION_ASSERTIONS = enable
def debug_print(message: str, level: str = "INFO") -> None:
"""Print debug message with timestamp if debugging is enabled."""
if DEBUG_ENABLED:
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(f"[{timestamp}] [{level}] {message}", file=sys.stderr)
def debug_data(data: bytes, label: str = "Data", max_bytes: int = 32) -> str:
"""Format bytes for debugging."""
if not DEBUG_ENABLED:
return ""
if not data:
return f"{label}: Empty"
if len(data) <= max_bytes:
return f"{label} ({len(data)} bytes): {data.hex()}"
else:
return f"{label} ({len(data)} bytes): {data[:max_bytes//2].hex()}...{data[-max_bytes//2:].hex()}"
def debug_exception(e: Exception, context: str = "") -> None:
"""Log exception with context for debugging."""
if DEBUG_ENABLED:
debug_print(f"Exception in {context}: {type(e).__name__}: {e}", "ERROR")
if DEBUG_ENABLED:
traceback.print_exc()
def time_function(func: Callable) -> Callable:
"""Decorator to time function execution for performance debugging."""
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
if not (DEBUG_ENABLED and LOG_PERFORMANCE):
return func(*args, **kwargs)
start = time.perf_counter()
try:
result = func(*args, **kwargs)
return result
finally:
end = time.perf_counter()
debug_print(f"{func.__name__} took {end - start:.6f}s", "PERF")
return wrapper
def validate_assertion(condition: bool, message: str) -> None:
"""Runtime validation that can be disabled in production."""
if VALIDATION_ASSERTIONS and not condition:
raise AssertionError(f"Validation failed: {message}")
def memory_usage() -> Dict[str, float]:
"""Get current memory usage (if psutil is available)."""
try:
import psutil
import os
process = psutil.Process(os.getpid())
mem_info = process.memory_info()
return {
'rss_mb': mem_info.rss / 1024 / 1024, # Resident Set Size
'vms_mb': mem_info.vms / 1024 / 1024, # Virtual Memory Size
'percent': process.memory_percent(),
}
except ImportError:
return {'error': 'psutil not installed'}
def hexdump(data: bytes, offset: int = 0, length: int = 64) -> str:
"""Create hexdump string for debugging binary data."""
if not data:
return "Empty"
result = []
data_to_dump = data[:length]
for i in range(0, len(data_to_dump), 16):
chunk = data_to_dump[i:i+16]
hex_str = ' '.join(f'{b:02x}' for b in chunk)
hex_str = hex_str.ljust(47) # Pad to consistent width
ascii_str = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk)
result.append(f"{offset + i:08x}: {hex_str} {ascii_str}")
if len(data) > length:
result.append(f"... ({len(data) - length} more bytes)")
return '\n'.join(result)
# Create singleton instance for easy import
class Debug:
"""Debugging utility class."""
def __init__(self):
self.enabled = DEBUG_ENABLED
def print(self, message: str, level: str = "INFO") -> None:
"""Print debug message."""
debug_print(message, level)
def data(self, data: bytes, label: str = "Data", max_bytes: int = 32) -> str:
"""Format bytes for debugging."""
return debug_data(data, label, max_bytes)
def exception(self, e: Exception, context: str = "") -> None:
"""Log exception with context."""
debug_exception(e, context)
def time(self, func: Callable) -> Callable:
"""Decorator to time function execution."""
return time_function(func)
def validate(self, condition: bool, message: str) -> None:
"""Runtime validation assertion."""
validate_assertion(condition, message)
def memory(self) -> Dict[str, float]:
"""Get current memory usage."""
return memory_usage()
def hexdump(self, data: bytes, offset: int = 0, length: int = 64) -> str:
"""Create hexdump string."""
return hexdump(data, offset, length)
def enable(self, enable: bool = True) -> None:
"""Enable or disable debug mode."""
enable_debug(enable)
self.enabled = enable
def enable_performance(self, enable: bool = True) -> None:
"""Enable or disable performance logging."""
enable_performance_logging(enable)
def enable_assertions(self, enable: bool = True) -> None:
"""Enable or disable validation assertions."""
enable_assertions(enable)
# Create singleton instance
debug = Debug()

View File

@@ -5,7 +5,7 @@ Generate PINs, passphrases, and RSA keys.
"""
import secrets
from typing import Optional
from typing import Optional, Dict
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
@@ -21,6 +21,7 @@ from .constants import (
)
from .models import Credentials, KeyInfo
from .exceptions import KeyGenerationError, KeyPasswordError
from .debug import debug
def generate_pin(length: int = DEFAULT_PIN_LENGTH) -> str:
@@ -34,7 +35,14 @@ def generate_pin(length: int = DEFAULT_PIN_LENGTH) -> str:
Returns:
PIN string
Example:
>>> generate_pin(6)
"812345"
"""
debug.validate(length >= MIN_PIN_LENGTH and length <= MAX_PIN_LENGTH,
f"PIN length must be between {MIN_PIN_LENGTH} and {MAX_PIN_LENGTH}")
length = max(MIN_PIN_LENGTH, min(MAX_PIN_LENGTH, length))
# First digit: 1-9 (no leading zero)
@@ -43,7 +51,9 @@ def generate_pin(length: int = DEFAULT_PIN_LENGTH) -> str:
# Remaining digits: 0-9
rest = ''.join(str(secrets.randbelow(10)) for _ in range(length - 1))
return first_digit + rest
pin = first_digit + rest
debug.print(f"Generated PIN: {pin}")
return pin
def generate_phrase(words_per_phrase: int = DEFAULT_PHRASE_WORDS) -> str:
@@ -55,15 +65,24 @@ def generate_phrase(words_per_phrase: int = DEFAULT_PHRASE_WORDS) -> str:
Returns:
Space-separated phrase
Example:
>>> generate_phrase(3)
"apple forest thunder"
"""
debug.validate(words_per_phrase >= MIN_PHRASE_WORDS and words_per_phrase <= MAX_PHRASE_WORDS,
f"Words per phrase must be between {MIN_PHRASE_WORDS} and {MAX_PHRASE_WORDS}")
words_per_phrase = max(MIN_PHRASE_WORDS, min(MAX_PHRASE_WORDS, words_per_phrase))
wordlist = get_wordlist()
words = [secrets.choice(wordlist) for _ in range(words_per_phrase)]
return ' '.join(words)
phrase = ' '.join(words)
debug.print(f"Generated phrase: {phrase}")
return phrase
def generate_day_phrases(words_per_phrase: int = DEFAULT_PHRASE_WORDS) -> dict[str, str]:
def generate_day_phrases(words_per_phrase: int = DEFAULT_PHRASE_WORDS) -> Dict[str, str]:
"""
Generate phrases for all days of the week.
@@ -72,8 +91,14 @@ def generate_day_phrases(words_per_phrase: int = DEFAULT_PHRASE_WORDS) -> dict[s
Returns:
Dict mapping day names to phrases
Example:
>>> generate_day_phrases(3)
{'Monday': 'apple forest thunder', 'Tuesday': 'banana river lightning', ...}
"""
return {day: generate_phrase(words_per_phrase) for day in DAY_NAMES}
phrases = {day: generate_phrase(words_per_phrase) for day in DAY_NAMES}
debug.print(f"Generated phrases for {len(phrases)} days")
return phrases
def generate_rsa_key(bits: int = DEFAULT_RSA_BITS) -> rsa.RSAPrivateKey:
@@ -88,17 +113,29 @@ def generate_rsa_key(bits: int = DEFAULT_RSA_BITS) -> rsa.RSAPrivateKey:
Raises:
KeyGenerationError: If generation fails
Example:
>>> key = generate_rsa_key(2048)
>>> key.key_size
2048
"""
debug.validate(bits in VALID_RSA_SIZES,
f"RSA key size must be one of {VALID_RSA_SIZES}")
if bits not in VALID_RSA_SIZES:
bits = DEFAULT_RSA_BITS
debug.print(f"Generating {bits}-bit RSA key...")
try:
return rsa.generate_private_key(
key = rsa.generate_private_key(
public_exponent=65537,
key_size=bits,
backend=default_backend()
)
debug.print(f"RSA key generated: {bits} bits")
return key
except Exception as e:
debug.exception(e, "RSA key generation")
raise KeyGenerationError(f"Failed to generate RSA key: {e}") from e
@@ -115,11 +152,21 @@ def export_rsa_key_pem(
Returns:
PEM-encoded key bytes
Example:
>>> key = generate_rsa_key()
>>> pem = export_rsa_key_pem(key)
>>> pem[:50]
b'-----BEGIN PRIVATE KEY-----\\nMIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYw'
"""
debug.validate(private_key is not None, "Private key cannot be None")
if password:
encryption = serialization.BestAvailableEncryption(password.encode())
debug.print("Exporting RSA key with encryption")
else:
encryption = serialization.NoEncryption()
debug.print("Exporting RSA key without encryption")
return private_key.private_bytes(
encoding=serialization.Encoding.PEM,
@@ -145,17 +192,31 @@ def load_rsa_key(
Raises:
KeyPasswordError: If password is wrong or missing
KeyGenerationError: If key is invalid
Example:
>>> key = load_rsa_key(pem_data, "my_password")
"""
debug.validate(key_data is not None and len(key_data) > 0,
"Key data cannot be empty")
try:
pwd_bytes = password.encode() if password else None
return load_pem_private_key(key_data, password=pwd_bytes, backend=default_backend())
debug.print(f"Loading RSA key (encrypted: {bool(password)})")
key = load_pem_private_key(key_data, password=pwd_bytes, backend=default_backend())
debug.print(f"RSA key loaded: {key.key_size} bits")
return key
except TypeError:
debug.print("RSA key is password-protected but no password provided")
raise KeyPasswordError("RSA key is password-protected. Please provide the password.")
except ValueError as e:
if "password" in str(e).lower() or "encrypted" in str(e).lower():
error_msg = str(e).lower()
if "password" in error_msg or "encrypted" in error_msg:
debug.print("Incorrect password for RSA key")
raise KeyPasswordError("Incorrect password for RSA key.")
debug.exception(e, "RSA key loading")
raise KeyGenerationError(f"Invalid RSA key: {e}") from e
except Exception as e:
debug.exception(e, "RSA key loading")
raise KeyGenerationError(f"Could not load RSA key: {e}") from e
@@ -169,17 +230,28 @@ def get_key_info(key_data: bytes, password: Optional[str] = None) -> KeyInfo:
Returns:
KeyInfo with key size and encryption status
Example:
>>> info = get_key_info(pem_data)
>>> info.key_size
2048
>>> info.is_encrypted
False
"""
debug.print("Getting RSA key info")
# Check if encrypted
is_encrypted = b'ENCRYPTED' in key_data
private_key = load_rsa_key(key_data, password)
return KeyInfo(
info = KeyInfo(
key_size=private_key.key_size,
is_encrypted=is_encrypted,
pem_data=key_data
)
debug.print(f"Key info: {info.key_size} bits, encrypted: {info.is_encrypted}")
return info
def generate_credentials(
@@ -206,23 +278,40 @@ def generate_credentials(
Raises:
ValueError: If neither PIN nor RSA is selected
Example:
>>> creds = generate_credentials(use_pin=True, use_rsa=False)
>>> creds.pin
"812345"
>>> creds.phrases['Monday']
"apple forest thunder"
"""
debug.validate(use_pin or use_rsa,
"Must select at least one security factor (PIN or RSA key)")
if not use_pin and not use_rsa:
raise ValueError("Must select at least one security factor (PIN or RSA key)")
debug.print(f"Generating credentials: PIN={use_pin}, RSA={use_rsa}, "
f"words={words_per_phrase}")
phrases = generate_day_phrases(words_per_phrase)
pin = generate_pin(pin_length) if use_pin else None
rsa_key_pem = None
rsa_key_obj = None
if use_rsa:
private_key = generate_rsa_key(rsa_bits)
rsa_key_pem = export_rsa_key_pem(private_key).decode('utf-8')
rsa_key_obj = generate_rsa_key(rsa_bits)
rsa_key_pem = export_rsa_key_pem(rsa_key_obj).decode('utf-8')
return Credentials(
creds = Credentials(
phrases=phrases,
pin=pin,
rsa_key_pem=rsa_key_pem,
rsa_bits=rsa_bits if use_rsa else None,
words_per_phrase=words_per_phrase
)
debug.print(f"Credentials generated: {creds.total_entropy} bits total entropy")
return creds

View File

@@ -6,7 +6,7 @@ LSB embedding and extraction with pseudo-random pixel selection.
import io
import struct
from typing import Optional
from typing import Optional, Tuple, List
from PIL import Image
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
@@ -14,6 +14,7 @@ from cryptography.hazmat.backends import default_backend
from .models import EmbedStats
from .exceptions import CapacityError, ExtractionError, EmbeddingError
from .debug import debug
# Lossless formats that preserve LSB data
@@ -35,7 +36,7 @@ EXT_TO_FORMAT = {
}
def get_output_format(input_format: Optional[str]) -> tuple[str, str]:
def get_output_format(input_format: Optional[str]) -> Tuple[str, str]:
"""
Determine the output format based on input format.
@@ -45,15 +46,29 @@ def get_output_format(input_format: Optional[str]) -> tuple[str, str]:
Returns:
Tuple of (PIL format string, file extension) for output
Falls back to PNG for lossy or unknown formats.
Example:
>>> get_output_format('JPEG')
('PNG', 'png')
>>> get_output_format('PNG')
('PNG', 'png')
"""
debug.validate(input_format is None or isinstance(input_format, str),
"Input format must be string or None")
if input_format and input_format.upper() in LOSSLESS_FORMATS:
fmt = input_format.upper()
return fmt, FORMAT_TO_EXT.get(fmt, 'png')
ext = FORMAT_TO_EXT.get(fmt, 'png')
debug.print(f"Using lossless format: {fmt} -> .{ext}")
return fmt, ext
# Default to PNG for lossy formats (JPEG, GIF) or unknown
debug.print(f"Input format {input_format} is lossy or unknown, defaulting to PNG")
return 'PNG', 'png'
def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> list[int]:
@debug.time
def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> List[int]:
"""
Generate pseudo-random pixel indices for embedding.
@@ -67,9 +82,21 @@ def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> list
Returns:
List of pixel indices
Note:
Optimizes for both small and large num_needed values.
"""
debug.validate(len(key) == 32, f"Pixel key must be 32 bytes, got {len(key)}")
debug.validate(num_pixels > 0, f"Number of pixels must be positive, got {num_pixels}")
debug.validate(num_needed > 0, f"Number needed must be positive, got {num_needed}")
debug.validate(num_needed <= num_pixels,
f"Cannot select {num_needed} pixels from {num_pixels} available")
debug.print(f"Generating {num_needed} pixel indices from {num_pixels} total pixels")
if num_needed >= num_pixels // 2:
# If we need many pixels, shuffle all indices
debug.print(f"Using full shuffle (needed {num_needed}/{num_pixels} pixels)")
nonce = b'\x00' * 16
cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend())
encryptor = cipher.encryptor()
@@ -77,14 +104,18 @@ def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> list
indices = list(range(num_pixels))
random_bytes = encryptor.update(b'\x00' * (num_pixels * 4))
# Fisher-Yates shuffle using CSPRNG
for i in range(num_pixels - 1, 0, -1):
j_bytes = random_bytes[(num_pixels - 1 - i) * 4:(num_pixels - i) * 4]
j = int.from_bytes(j_bytes, 'big') % (i + 1)
indices[i], indices[j] = indices[j], indices[i]
return indices[:num_needed]
selected = indices[:num_needed]
debug.print(f"Generated {len(selected)} indices via shuffle")
return selected
# Optimized path: generate indices directly
# Optimized path: generate indices directly (for smaller selections)
debug.print(f"Using optimized selection (needed {num_needed}/{num_pixels} pixels)")
selected = []
used = set()
@@ -97,6 +128,7 @@ def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> list
random_bytes = encryptor.update(b'\x00' * bytes_needed)
byte_offset = 0
collisions = 0
while len(selected) < num_needed and byte_offset < len(random_bytes) - 4:
idx = int.from_bytes(random_bytes[byte_offset:byte_offset + 4], 'big') % num_pixels
byte_offset += 4
@@ -104,25 +136,36 @@ def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> list
if idx not in used:
used.add(idx)
selected.append(idx)
else:
collisions += 1
# Generate additional if needed (rare)
while len(selected) < num_needed:
extra_bytes = encryptor.update(b'\x00' * 4)
idx = int.from_bytes(extra_bytes, 'big') % num_pixels
if idx not in used:
used.add(idx)
selected.append(idx)
if len(selected) < num_needed:
debug.print(f"Need {num_needed - len(selected)} more indices, generating...")
extra_needed = num_needed - len(selected)
for _ in range(extra_needed * 2): # Try twice as many to account for collisions
extra_bytes = encryptor.update(b'\x00' * 4)
idx = int.from_bytes(extra_bytes, 'big') % num_pixels
if idx not in used:
used.add(idx)
selected.append(idx)
if len(selected) == num_needed:
break
debug.print(f"Generated {len(selected)} indices with {collisions} collisions")
debug.validate(len(selected) == num_needed,
f"Failed to generate enough indices: {len(selected)}/{num_needed}")
return selected
@debug.time
def embed_in_image(
carrier_data: bytes,
encrypted_data: bytes,
pixel_key: bytes,
bits_per_channel: int = 1,
output_format: Optional[str] = None
) -> tuple[bytes, EmbedStats, str]:
) -> Tuple[bytes, EmbedStats, str]:
"""
Embed encrypted data in carrier image using LSB steganography.
@@ -143,12 +186,27 @@ def embed_in_image(
Raises:
CapacityError: If carrier is too small
EmbeddingError: If embedding fails
Example:
>>> stego_bytes, stats, ext = embed_in_image(carrier, encrypted, key)
>>> stats.pixels_modified
1500
"""
debug.print(f"Embedding {len(encrypted_data)} bytes into image")
debug.data(pixel_key, "Pixel key for embedding")
debug.validate(bits_per_channel in (1, 2),
f"bits_per_channel must be 1 or 2, got {bits_per_channel}")
debug.validate(len(pixel_key) == 32,
f"Pixel key must be 32 bytes, got {len(pixel_key)}")
try:
img = Image.open(io.BytesIO(carrier_data))
input_format = img.format
debug.print(f"Carrier image: {img.size[0]}x{img.size[1]}, format: {input_format}")
if img.mode != 'RGB':
debug.print(f"Converting image from {img.mode} to RGB")
img = img.convert('RGB')
pixels = list(img.getdata())
@@ -157,16 +215,24 @@ def embed_in_image(
bits_per_pixel = 3 * bits_per_channel
max_bytes = (num_pixels * bits_per_pixel) // 8
debug.print(f"Image capacity: {max_bytes} bytes at {bits_per_channel} bit(s)/channel")
# Prepend length
data_with_len = struct.pack('>I', len(encrypted_data)) + encrypted_data
if len(data_with_len) > max_bytes:
debug.print(f"Capacity error: need {len(data_with_len)}, have {max_bytes}")
raise CapacityError(len(data_with_len), max_bytes)
debug.print(f"Total data to embed: {len(data_with_len)} bytes "
f"({len(data_with_len)/max_bytes*100:.1f}% of capacity)")
# Convert to binary string
binary_data = ''.join(format(b, '08b') for b in data_with_len)
pixels_needed = (len(binary_data) + bits_per_pixel - 1) // bits_per_pixel
debug.print(f"Need {pixels_needed} pixels to embed {len(binary_data)} bits")
# Get pixel indices
selected_indices = generate_pixel_indices(pixel_key, num_pixels, pixels_needed)
@@ -175,11 +241,14 @@ def embed_in_image(
clear_mask = 0xFF ^ ((1 << bits_per_channel) - 1)
bit_idx = 0
modified_pixels = 0
for pixel_idx in selected_indices:
if bit_idx >= len(binary_data):
break
r, g, b = new_pixels[pixel_idx]
modified = False
for channel_idx, channel_val in enumerate([r, g, b]):
if bit_idx >= len(binary_data):
@@ -187,16 +256,22 @@ def embed_in_image(
bits = binary_data[bit_idx:bit_idx + bits_per_channel].ljust(bits_per_channel, '0')
new_val = (channel_val & clear_mask) | int(bits, 2)
if channel_idx == 0:
r = new_val
elif channel_idx == 1:
g = new_val
else:
b = new_val
if channel_val != new_val:
modified = True
if channel_idx == 0:
r = new_val
elif channel_idx == 1:
g = new_val
else:
b = new_val
bit_idx += bits_per_channel
new_pixels[pixel_idx] = (r, g, b)
if modified:
new_pixels[pixel_idx] = (r, g, b)
modified_pixels += 1
debug.print(f"Modified {modified_pixels} pixels (out of {len(selected_indices)} selected)")
# Create output image
stego_img = Image.new('RGB', img.size)
@@ -206,28 +281,33 @@ def embed_in_image(
if output_format:
out_fmt = output_format.upper()
out_ext = FORMAT_TO_EXT.get(out_fmt, 'png')
debug.print(f"Using forced output format: {out_fmt}")
else:
out_fmt, out_ext = get_output_format(input_format)
debug.print(f"Auto-selected output format: {out_fmt}")
output = io.BytesIO()
stego_img.save(output, out_fmt)
output.seek(0)
stats = EmbedStats(
pixels_modified=len(selected_indices),
pixels_modified=modified_pixels,
total_pixels=num_pixels,
capacity_used=len(data_with_len) / max_bytes,
bytes_embedded=len(data_with_len)
)
debug.print(f"Embedding complete: {out_fmt} image, {len(output.getvalue())} bytes")
return output.getvalue(), stats, out_ext
except CapacityError:
raise
except Exception as e:
debug.exception(e, "embed_in_image")
raise EmbeddingError(f"Failed to embed data: {e}") from e
@debug.time
def extract_from_image(
image_data: bytes,
pixel_key: bytes,
@@ -246,18 +326,35 @@ def extract_from_image(
Raises:
ExtractionError: If extraction fails critically
Example:
>>> extracted = extract_from_image(stego_bytes, key)
>>> len(extracted)
1024
"""
debug.print(f"Extracting from {len(image_data)} byte image")
debug.data(pixel_key, "Pixel key for extraction")
debug.validate(bits_per_channel in (1, 2),
f"bits_per_channel must be 1 or 2, got {bits_per_channel}")
try:
img = Image.open(io.BytesIO(image_data))
debug.print(f"Image: {img.size[0]}x{img.size[1]}, format: {img.format}")
if img.mode != 'RGB':
debug.print(f"Converting image from {img.mode} to RGB")
img = img.convert('RGB')
pixels = list(img.getdata())
num_pixels = len(pixels)
bits_per_pixel = 3 * bits_per_channel
debug.print(f"Image has {num_pixels} pixels, {bits_per_pixel} bits/pixel")
# First, extract enough to get the length (4 bytes = 32 bits)
initial_pixels = (32 + bits_per_pixel - 1) // bits_per_pixel + 10
debug.print(f"Extracting initial {initial_pixels} pixels to find length")
initial_indices = generate_pixel_indices(pixel_key, num_pixels, initial_pixels)
binary_data = ''
@@ -270,19 +367,28 @@ def extract_from_image(
# Parse length
try:
length_bits = binary_data[:32]
if len(length_bits) < 32:
debug.print(f"Not enough bits for length: {len(length_bits)}/32")
return None
data_length = struct.unpack('>I', int(length_bits, 2).to_bytes(4, 'big'))[0]
except Exception:
debug.print(f"Extracted length: {data_length} bytes")
except Exception as e:
debug.print(f"Failed to parse length: {e}")
return None
# Sanity check
max_possible = (num_pixels * bits_per_pixel) // 8 - 4
if data_length > max_possible or data_length < 10:
debug.print(f"Invalid data length: {data_length} (max possible: {max_possible})")
return None
# Extract full data
total_bits = (4 + data_length) * 8
pixels_needed = (total_bits + bits_per_pixel - 1) // bits_per_pixel
debug.print(f"Need {pixels_needed} pixels to extract {data_length} bytes")
selected_indices = generate_pixel_indices(pixel_key, num_pixels, pixels_needed)
binary_data = ''
@@ -294,15 +400,21 @@ def extract_from_image(
data_bits = binary_data[32:32 + (data_length * 8)]
if len(data_bits) < data_length * 8:
debug.print(f"Insufficient bits: {len(data_bits)} < {data_length * 8}")
return None
data_bytes = bytearray()
for i in range(0, len(data_bits), 8):
byte_bits = data_bits[i:i + 8]
if len(byte_bits) == 8:
data_bytes.append(int(byte_bits, 2))
debug.print(f"Successfully extracted {len(data_bytes)} bytes")
return bytes(data_bytes)
except Exception as e:
debug.exception(e, "extract_from_image")
raise ExtractionError(f"Failed to extract data: {e}") from e
@@ -316,7 +428,15 @@ def calculate_capacity(image_data: bytes, bits_per_channel: int = 1) -> int:
Returns:
Maximum bytes that can be embedded (minus overhead)
Example:
>>> capacity = calculate_capacity(image_bytes)
>>> capacity
12000
"""
debug.validate(bits_per_channel in (1, 2),
f"bits_per_channel must be 1 or 2, got {bits_per_channel}")
img = Image.open(io.BytesIO(image_data))
if img.mode != 'RGB':
img = img.convert('RGB')
@@ -326,25 +446,74 @@ def calculate_capacity(image_data: bytes, bits_per_channel: int = 1) -> int:
max_bytes = (num_pixels * bits_per_pixel) // 8
# Subtract overhead: 4 bytes length + ~100 bytes header
return max(0, max_bytes - 104)
capacity = max(0, max_bytes - 104)
debug.print(f"Image capacity: {capacity} bytes at {bits_per_channel} bit(s)/channel")
return capacity
def get_image_dimensions(image_data: bytes) -> tuple[int, int]:
"""Get image dimensions without loading full image."""
def get_image_dimensions(image_data: bytes) -> Tuple[int, int]:
"""
Get image dimensions without loading full image.
Args:
image_data: Image bytes
Returns:
Tuple of (width, height)
Example:
>>> width, height = get_image_dimensions(image_bytes)
>>> width, height
(800, 600)
"""
debug.validate(len(image_data) > 0, "Image data cannot be empty")
img = Image.open(io.BytesIO(image_data))
return img.size
dimensions = img.size
debug.print(f"Image dimensions: {dimensions[0]}x{dimensions[1]}")
return dimensions
def get_image_format(image_data: bytes) -> Optional[str]:
"""Get image format (PIL format string like 'PNG', 'JPEG')."""
"""
Get image format (PIL format string like 'PNG', 'JPEG').
Args:
image_data: Image bytes
Returns:
Format string or None if invalid
Example:
>>> format = get_image_format(image_bytes)
>>> format
'PNG'
"""
try:
img = Image.open(io.BytesIO(image_data))
return img.format
except Exception:
format_str = img.format
debug.print(f"Image format: {format_str}")
return format_str
except Exception as e:
debug.print(f"Failed to get image format: {e}")
return None
def is_lossless_format(image_data: bytes) -> bool:
"""Check if image is in a lossless format suitable for steganography."""
"""
Check if image is in a lossless format suitable for steganography.
Args:
image_data: Image bytes
Returns:
True if format is lossless (PNG, BMP, TIFF)
Example:
>>> is_lossless_format(image_bytes)
True
"""
fmt = get_image_format(image_data)
return fmt is not None and fmt.upper() in LOSSLESS_FORMATS
is_lossless = fmt is not None and fmt.upper() in LOSSLESS_FORMATS
debug.print(f"Image is lossless: {is_lossless} (format: {fmt})")
return is_lossless

View File

@@ -10,9 +10,10 @@ import secrets
import shutil
from datetime import date, datetime
from pathlib import Path
from typing import Optional
from typing import Optional, Union
from .constants import DAY_NAMES
from .debug import debug
def generate_filename(
@@ -32,7 +33,14 @@ def generate_filename(
Returns:
Filename string
Example:
>>> generate_filename("2023-12-25", "secret_", "png")
"secret_a1b2c3d4_20231225.png"
"""
debug.validate(extension and '.' not in extension,
f"Extension must not contain dot, got '{extension}'")
if date_str is None:
date_str = date.today().isoformat()
@@ -42,7 +50,9 @@ def generate_filename(
# Ensure extension doesn't have a leading dot
extension = extension.lstrip('.')
return f"{prefix}{random_hex}_{date_compact}.{extension}"
filename = f"{prefix}{random_hex}_{date_compact}.{extension}"
debug.print(f"Generated filename: {filename}")
return filename
def parse_date_from_filename(filename: str) -> Optional[str]:
@@ -56,6 +66,10 @@ def parse_date_from_filename(filename: str) -> Optional[str]:
Returns:
Date string (YYYY-MM-DD) or None
Example:
>>> parse_date_from_filename("secret_a1b2c3d4_20231225.png")
"2023-12-25"
"""
import re
@@ -63,14 +77,19 @@ def parse_date_from_filename(filename: str) -> Optional[str]:
match = re.search(r'_(\d{4})(\d{2})(\d{2})(?:\.|$)', filename)
if match:
year, month, day = match.groups()
return f"{year}-{month}-{day}"
date_str = f"{year}-{month}-{day}"
debug.print(f"Parsed date (compact): {date_str}")
return date_str
# Try YYYY-MM-DD format
match = re.search(r'_(\d{4})-(\d{2})-(\d{2})(?:\.|$)', filename)
if match:
year, month, day = match.groups()
return f"{year}-{month}-{day}"
date_str = f"{year}-{month}-{day}"
debug.print(f"Parsed date (dashed): {date_str}")
return date_str
debug.print(f"No date found in filename: {filename}")
return None
@@ -83,23 +102,55 @@ def get_day_from_date(date_str: str) -> str:
Returns:
Day name (e.g., "Monday")
Example:
>>> get_day_from_date("2023-12-25")
"Monday"
"""
debug.validate(len(date_str) == 10 and date_str[4] == '-' and date_str[7] == '-',
f"Invalid date format: {date_str}, expected YYYY-MM-DD")
try:
year, month, day = map(int, date_str.split('-'))
d = date(year, month, day)
return DAY_NAMES[d.weekday()]
except Exception:
day_name = DAY_NAMES[d.weekday()]
debug.print(f"Date {date_str} is {day_name}")
return day_name
except Exception as e:
debug.exception(e, f"get_day_from_date for {date_str}")
return ""
def get_today_date() -> str:
"""Get today's date as YYYY-MM-DD."""
return date.today().isoformat()
"""
Get today's date as YYYY-MM-DD.
Returns:
Today's date string
Example:
>>> get_today_date()
"2023-12-25"
"""
today = date.today().isoformat()
debug.print(f"Today's date: {today}")
return today
def get_today_day() -> str:
"""Get today's day name."""
return DAY_NAMES[date.today().weekday()]
"""
Get today's day name.
Returns:
Today's day name
Example:
>>> get_today_day()
"Monday"
"""
today_day = DAY_NAMES[date.today().weekday()]
debug.print(f"Today is {today_day}")
return today_day
class SecureDeleter:
@@ -107,9 +158,13 @@ class SecureDeleter:
Securely delete files by overwriting with random data.
Implements multi-pass overwriting before deletion.
Example:
>>> deleter = SecureDeleter("secret.txt", passes=3)
>>> deleter.execute()
"""
def __init__(self, path: str | Path, passes: int = 7):
def __init__(self, path: Union[str, Path], passes: int = 7):
"""
Initialize secure deleter.
@@ -117,66 +172,99 @@ class SecureDeleter:
path: Path to file or directory
passes: Number of overwrite passes
"""
debug.validate(passes > 0, f"Passes must be positive, got {passes}")
self.path = Path(path)
self.passes = passes
debug.print(f"SecureDeleter initialized for {self.path} with {passes} passes")
def _overwrite_file(self, file_path: Path) -> None:
"""Overwrite file with random data multiple times."""
if not file_path.exists() or not file_path.is_file():
debug.print(f"File does not exist or is not a file: {file_path}")
return
length = file_path.stat().st_size
debug.print(f"Overwriting file {file_path} ({length} bytes)")
if length == 0:
debug.print("File is empty, nothing to overwrite")
return
patterns = [b'\x00', b'\xFF', bytes([random.randint(0, 255)])]
for _ in range(self.passes):
for pass_num in range(self.passes):
debug.print(f"Overwrite pass {pass_num + 1}/{self.passes}")
with open(file_path, 'r+b') as f:
for pattern in patterns:
for pattern_idx, pattern in enumerate(patterns):
f.seek(0)
for _ in range(length):
f.write(pattern)
# Write pattern in chunks for large files
chunk_size = 1024 * 1024 # 1MB chunks
for offset in range(0, length, chunk_size):
chunk = min(chunk_size, length - offset)
f.write(pattern * (chunk // len(pattern)))
f.write(pattern[:chunk % len(pattern)])
# Final pass with random data
f.seek(0)
f.write(os.urandom(length))
debug.print(f"Completed {self.passes} overwrite passes")
def delete_file(self) -> None:
"""Securely delete a single file."""
if self.path.is_file():
debug.print(f"Securely deleting file: {self.path}")
self._overwrite_file(self.path)
self.path.unlink()
debug.print(f"File deleted: {self.path}")
else:
debug.print(f"Not a file: {self.path}")
def delete_directory(self) -> None:
"""Securely delete a directory and all contents."""
if not self.path.is_dir():
debug.print(f"Not a directory: {self.path}")
return
debug.print(f"Securely deleting directory: {self.path}")
# First, securely overwrite all files
file_count = 0
for file_path in self.path.rglob('*'):
if file_path.is_file():
self._overwrite_file(file_path)
file_count += 1
debug.print(f"Overwrote {file_count} files")
# Then remove the directory tree
shutil.rmtree(self.path)
debug.print(f"Directory deleted: {self.path}")
def execute(self) -> None:
"""Securely delete the path (file or directory)."""
debug.print(f"Executing secure deletion: {self.path}")
if self.path.is_file():
self.delete_file()
elif self.path.is_dir():
self.delete_directory()
else:
debug.print(f"Path does not exist: {self.path}")
def secure_delete(path: str | Path, passes: int = 7) -> None:
def secure_delete(path: Union[str, Path], passes: int = 7) -> None:
"""
Convenience function for secure deletion.
Args:
path: Path to file or directory
passes: Number of overwrite passes
Example:
>>> secure_delete("secret.txt", passes=3)
"""
debug.print(f"secure_delete called: {path}, passes={passes}")
SecureDeleter(path, passes).execute()
@@ -189,7 +277,13 @@ def format_file_size(size_bytes: int) -> str:
Returns:
Human-readable string (e.g., "1.5 MB")
Example:
>>> format_file_size(1500000)
"1.5 MB"
"""
debug.validate(size_bytes >= 0, f"File size cannot be negative: {size_bytes}")
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024:
if unit == 'B':
@@ -200,10 +294,38 @@ def format_file_size(size_bytes: int) -> str:
def format_number(n: int) -> str:
"""Format number with commas."""
"""
Format number with commas.
Args:
n: Integer to format
Returns:
Formatted string
Example:
>>> format_number(1234567)
"1,234,567"
"""
debug.validate(isinstance(n, int), f"Input must be integer, got {type(n)}")
return f"{n:,}"
def clamp(value: int, min_val: int, max_val: int) -> int:
"""Clamp value to range."""
"""
Clamp value to range.
Args:
value: Value to clamp
min_val: Minimum allowed value
max_val: Maximum allowed value
Returns:
Clamped value
Example:
>>> clamp(15, 0, 10)
10
"""
debug.validate(min_val <= max_val, f"min_val ({min_val}) must be <= max_val ({max_val})")
return max(min_val, min(max_val, value))