New Version 2 -- prolly doesn't work.

This commit is contained in:
Aaron D. Lee
2025-12-27 22:40:31 -05:00
parent ee937c832f
commit 8581b86104
55 changed files with 5970 additions and 113 deletions

0
src/__init__.py Normal file
View File

11
src/main.py Normal file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env python3
"""Main entry point."""
def main():
"""Main function."""
print("Hello, World!")
if __name__ == "__main__":
main()

357
src/stegasoo/__init__.py Normal file
View File

@@ -0,0 +1,357 @@
"""
Stegasoo - Secure Steganography Library
A Python library for hiding encrypted messages in images using
hybrid photo + passphrase + PIN authentication.
Basic Usage:
from stegasoo import encode, decode, generate_credentials
# Generate credentials
creds = generate_credentials(use_pin=True, use_rsa=False)
print(creds.phrases['Monday'])
print(creds.pin)
# Encode a message
with open('secret.jpg', 'rb') as f:
ref_photo = f.read()
with open('meme.png', 'rb') as f:
carrier = f.read()
result = encode(
message="Meet at midnight",
reference_photo=ref_photo,
carrier_image=carrier,
day_phrase="apple forest thunder",
pin="123456"
)
with open('stego.png', 'wb') as f:
f.write(result.stego_image)
# Decode a message
message = decode(
stego_image=result.stego_image,
reference_photo=ref_photo,
day_phrase="apple forest thunder",
pin="123456"
)
print(message) # "Meet at midnight"
"""
from .constants import __version__, DAY_NAMES
from .models import (
Credentials,
EncodeInput,
EncodeResult,
DecodeInput,
DecodeResult,
EmbedStats,
KeyInfo,
ValidationResult,
)
from .exceptions import (
StegasooError,
ValidationError,
PinValidationError,
MessageValidationError,
ImageValidationError,
KeyValidationError,
SecurityFactorError,
CryptoError,
EncryptionError,
DecryptionError,
KeyDerivationError,
KeyGenerationError,
KeyPasswordError,
SteganographyError,
CapacityError,
ExtractionError,
EmbeddingError,
InvalidHeaderError,
)
from .keygen import (
generate_credentials,
generate_pin,
generate_phrase,
generate_day_phrases,
generate_rsa_key,
export_rsa_key_pem,
load_rsa_key,
get_key_info,
)
from .validation import (
validate_pin,
validate_message,
validate_image,
validate_rsa_key,
validate_security_factors,
validate_phrase,
validate_date_string,
require_valid_pin,
require_valid_message,
require_valid_image,
require_valid_rsa_key,
require_security_factors,
)
from .crypto import (
encrypt_message,
decrypt_message,
derive_hybrid_key,
derive_pixel_key,
hash_photo,
parse_header,
get_date_from_encrypted,
has_argon2,
)
from .steganography import (
embed_in_image,
extract_from_image,
calculate_capacity,
get_image_dimensions,
)
from .utils import (
generate_filename,
parse_date_from_filename,
get_day_from_date,
get_today_date,
get_today_day,
secure_delete,
SecureDeleter,
format_file_size,
)
from datetime import date
from typing import Optional
def encode(
message: str,
reference_photo: bytes,
carrier_image: bytes,
day_phrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None,
rsa_password: Optional[str] = None,
date_str: Optional[str] = None,
) -> EncodeResult:
"""
Encode a secret message into an image.
High-level convenience function that handles validation,
encryption, and embedding in one call.
Args:
message: Secret message to hide
reference_photo: Shared reference photo bytes
carrier_image: Image to hide message in
day_phrase: Today's passphrase
pin: Static PIN (optional if using RSA key)
rsa_key_data: RSA private key PEM bytes (optional if using PIN)
rsa_password: Password for RSA key if encrypted
date_str: Date string YYYY-MM-DD (defaults to today)
Returns:
EncodeResult with stego image and metadata
Raises:
ValidationError: If inputs are invalid
SecurityFactorError: If no PIN or RSA key provided
CapacityError: If carrier is too small
EncryptionError: If encryption fails
"""
# Validate inputs
require_valid_message(message)
require_valid_image(carrier_image, "Carrier image")
require_security_factors(pin, rsa_key_data)
if pin:
require_valid_pin(pin)
if rsa_key_data:
require_valid_rsa_key(rsa_key_data, rsa_password)
# Default date to today
if date_str is None:
date_str = date.today().isoformat()
# Encrypt message
encrypted = encrypt_message(
message, reference_photo, day_phrase, date_str, pin, rsa_key_data
)
# Get pixel key
pixel_key = derive_pixel_key(
reference_photo, day_phrase, date_str, pin, rsa_key_data
)
# Embed in image
stego_data, stats = embed_in_image(carrier_image, encrypted, pixel_key)
# Generate filename
filename = generate_filename(date_str)
return EncodeResult(
stego_image=stego_data,
filename=filename,
pixels_modified=stats.pixels_modified,
total_pixels=stats.total_pixels,
capacity_used=stats.capacity_used,
date_used=date_str
)
def decode(
stego_image: bytes,
reference_photo: bytes,
day_phrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None,
rsa_password: Optional[str] = None,
) -> str:
"""
Decode a secret message from a stego image.
High-level convenience function that handles extraction
and decryption in one call.
Args:
stego_image: Image containing hidden message
reference_photo: Shared reference photo bytes
day_phrase: Passphrase for the day message was encoded
pin: Static PIN (if used during encoding)
rsa_key_data: RSA private key PEM bytes (if used during encoding)
rsa_password: Password for RSA key if encrypted
Returns:
Decrypted message string
Raises:
ValidationError: If inputs are invalid
SecurityFactorError: If no PIN or RSA key provided
ExtractionError: If data cannot be extracted
DecryptionError: If decryption fails
"""
# Validate inputs
require_security_factors(pin, rsa_key_data)
if pin:
require_valid_pin(pin)
if rsa_key_data:
require_valid_rsa_key(rsa_key_data, rsa_password)
# Try to extract with today's date first
date_str = date.today().isoformat()
pixel_key = derive_pixel_key(
reference_photo, day_phrase, date_str, pin, rsa_key_data
)
encrypted = extract_from_image(stego_image, pixel_key)
# If we got data, check if it's from a different date
if encrypted:
header = parse_header(encrypted)
if header and header['date'] != date_str:
# Re-extract with correct date
pixel_key = derive_pixel_key(
reference_photo, day_phrase, header['date'], pin, rsa_key_data
)
encrypted = extract_from_image(stego_image, pixel_key)
if not encrypted:
raise ExtractionError("Could not extract data. Check your inputs.")
# Decrypt
return decrypt_message(encrypted, reference_photo, day_phrase, pin, rsa_key_data)
__all__ = [
# Version
'__version__',
# High-level API
'encode',
'decode',
'generate_credentials',
# Constants
'DAY_NAMES',
# Models
'Credentials',
'EncodeInput',
'EncodeResult',
'DecodeInput',
'DecodeResult',
'EmbedStats',
'KeyInfo',
'ValidationResult',
# Exceptions
'StegasooError',
'ValidationError',
'PinValidationError',
'MessageValidationError',
'ImageValidationError',
'KeyValidationError',
'SecurityFactorError',
'CryptoError',
'EncryptionError',
'DecryptionError',
'KeyDerivationError',
'KeyGenerationError',
'KeyPasswordError',
'SteganographyError',
'CapacityError',
'ExtractionError',
'EmbeddingError',
'InvalidHeaderError',
# Key generation
'generate_pin',
'generate_phrase',
'generate_day_phrases',
'generate_rsa_key',
'export_rsa_key_pem',
'load_rsa_key',
'get_key_info',
# Validation
'validate_pin',
'validate_message',
'validate_image',
'validate_rsa_key',
'validate_security_factors',
'validate_phrase',
'validate_date_string',
'require_valid_pin',
'require_valid_message',
'require_valid_image',
'require_valid_rsa_key',
'require_security_factors',
# Crypto
'encrypt_message',
'decrypt_message',
'derive_hybrid_key',
'derive_pixel_key',
'hash_photo',
'parse_header',
'get_date_from_encrypted',
'has_argon2',
# Steganography
'embed_in_image',
'extract_from_image',
'calculate_capacity',
'get_image_dimensions',
# Utilities
'generate_filename',
'parse_date_from_filename',
'get_day_from_date',
'get_today_date',
'get_today_day',
'secure_delete',
'SecureDeleter',
'format_file_size',
]

66
src/stegasoo/cli.py Normal file
View File

@@ -0,0 +1,66 @@
"""
Stegasoo CLI - Command-line interface for steganography operations.
This is the package entry point. For full CLI, install with: pip install stegasoo[cli]
"""
def main():
"""Main entry point for the CLI."""
try:
import click
except ImportError:
print("CLI requires click. Install with: pip install stegasoo[cli]")
return 1
# Import the CLI from frontends
import sys
from pathlib import Path
# Add frontends to path for development
root = Path(__file__).parent.parent.parent
cli_path = root / 'frontends' / 'cli'
if cli_path.exists():
sys.path.insert(0, str(cli_path))
try:
from main import cli
cli()
except ImportError:
# Minimal fallback CLI
_minimal_cli()
def _minimal_cli():
"""Minimal CLI when full CLI is not available."""
import sys
from . import __version__, generate_credentials, DAY_NAMES
if len(sys.argv) < 2 or sys.argv[1] in ['-h', '--help']:
print(f"Stegasoo v{__version__} - Secure Steganography")
print()
print("Usage: stegasoo <command>")
print()
print("Commands:")
print(" generate Generate credentials")
print(" encode Encode a message (requires full CLI)")
print(" decode Decode a message (requires full CLI)")
print()
print("For full CLI functionality:")
print(" pip install stegasoo[cli]")
return
if sys.argv[1] == 'generate':
creds = generate_credentials(use_pin=True, use_rsa=False)
print("\n=== STEGASOO CREDENTIALS ===\n")
print(f"PIN: {creds.pin}\n")
print("Daily Phrases:")
for day in DAY_NAMES:
print(f" {day:9} | {creds.phrases[day]}")
print(f"\nEntropy: {creds.total_entropy} bits (+ photo)")
else:
print(f"Command '{sys.argv[1]}' requires full CLI.")
print("Install with: pip install stegasoo[cli]")
if __name__ == '__main__':
main()

119
src/stegasoo/constants.py Normal file
View File

@@ -0,0 +1,119 @@
"""
Stegasoo Constants and Configuration
Central location for all magic numbers, limits, and crypto parameters.
"""
import os
from pathlib import Path
# ============================================================================
# VERSION
# ============================================================================
__version__ = "2.0.0"
# ============================================================================
# FILE FORMAT
# ============================================================================
MAGIC_HEADER = b'\x89ST3'
FORMAT_VERSION = 3
# ============================================================================
# CRYPTO PARAMETERS
# ============================================================================
SALT_SIZE = 32
IV_SIZE = 12
TAG_SIZE = 16
# Argon2 parameters (memory-hard KDF)
ARGON2_TIME_COST = 4
ARGON2_MEMORY_COST = 256 * 1024 # 256 MB
ARGON2_PARALLELISM = 4
# PBKDF2 fallback parameters
PBKDF2_ITERATIONS = 600000
# ============================================================================
# INPUT LIMITS
# ============================================================================
MAX_IMAGE_PIXELS = 4_000_000 # ~4 megapixels (2000x2000)
MAX_MESSAGE_SIZE = 50_000 # 50 KB
MAX_FILE_SIZE = 5 * 1024 * 1024 # 5 MB
MIN_PIN_LENGTH = 6
MAX_PIN_LENGTH = 9
DEFAULT_PIN_LENGTH = 6
MIN_PHRASE_WORDS = 3
MAX_PHRASE_WORDS = 12
DEFAULT_PHRASE_WORDS = 3
MIN_RSA_BITS = 2048
VALID_RSA_SIZES = (2048, 3072, 4096)
DEFAULT_RSA_BITS = 2048
MIN_KEY_PASSWORD_LENGTH = 8
# ============================================================================
# FILE TYPES
# ============================================================================
ALLOWED_IMAGE_EXTENSIONS = {'png', 'jpg', 'jpeg', 'bmp', 'gif'}
ALLOWED_KEY_EXTENSIONS = {'pem', 'key'}
# ============================================================================
# DAYS
# ============================================================================
DAY_NAMES = ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday')
# ============================================================================
# DATA FILES
# ============================================================================
def get_data_dir() -> Path:
"""Get the data directory path."""
# Check multiple locations
candidates = [
Path(__file__).parent.parent.parent.parent / 'data', # Development
Path(__file__).parent / 'data', # Installed package
Path('/app/data'), # Docker
Path.cwd() / 'data', # Current directory
]
for path in candidates:
if path.exists():
return path
# Default to first candidate
return candidates[0]
def get_bip39_words() -> list[str]:
"""Load BIP-39 wordlist."""
wordlist_path = get_data_dir() / 'bip39-words.txt'
if not wordlist_path.exists():
raise FileNotFoundError(
f"BIP-39 wordlist not found at {wordlist_path}. "
"Please ensure bip39-words.txt is in the data directory."
)
with open(wordlist_path, 'r') as f:
return [line.strip() for line in f if line.strip()]
# Lazy-loaded wordlist
_bip39_words: list[str] | None = None
def get_wordlist() -> list[str]:
"""Get the BIP-39 wordlist (cached)."""
global _bip39_words
if _bip39_words is None:
_bip39_words = get_bip39_words()
return _bip39_words

358
src/stegasoo/crypto.py Normal file
View File

@@ -0,0 +1,358 @@
"""
Stegasoo Cryptographic Functions
Key derivation, encryption, and decryption using AES-256-GCM.
"""
import io
import hashlib
import secrets
import struct
from typing import Optional
from PIL import Image
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from .constants import (
MAGIC_HEADER, FORMAT_VERSION,
SALT_SIZE, IV_SIZE, TAG_SIZE,
ARGON2_TIME_COST, ARGON2_MEMORY_COST, ARGON2_PARALLELISM,
PBKDF2_ITERATIONS,
)
from .exceptions import (
EncryptionError, DecryptionError, KeyDerivationError, InvalidHeaderError
)
# Check for Argon2 availability
try:
from argon2.low_level import hash_secret_raw, Type
HAS_ARGON2 = True
except ImportError:
HAS_ARGON2 = False
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
def hash_photo(image_data: bytes) -> bytes:
"""
Compute deterministic hash of photo pixel content.
This normalizes the image to RGB and hashes the raw pixel data,
making it resistant to metadata changes.
Args:
image_data: Raw image file bytes
Returns:
32-byte SHA-256 hash
"""
img = Image.open(io.BytesIO(image_data))
img = img.convert('RGB')
pixels = img.tobytes()
# Double-hash with prefix for additional mixing
h = hashlib.sha256(pixels).digest()
h = hashlib.sha256(h + pixels[:1024]).digest()
return h
def derive_hybrid_key(
photo_data: bytes,
day_phrase: str,
date_str: str,
salt: bytes,
pin: str = "",
rsa_key_data: Optional[bytes] = None
) -> bytes:
"""
Derive encryption key from multiple factors.
Combines:
- Photo hash (something you have)
- Day phrase (something you know, rotates daily)
- PIN (something you know, static)
- RSA key (something you have)
- Date (automatic rotation)
- Salt (random per message)
Uses Argon2id if available, falls back to PBKDF2.
Args:
photo_data: Reference photo bytes
day_phrase: The day's phrase
date_str: Date string (YYYY-MM-DD)
salt: Random salt for this message
pin: Optional static PIN
rsa_key_data: Optional RSA key bytes
Returns:
32-byte derived key
Raises:
KeyDerivationError: If key derivation fails
"""
try:
photo_hash = hash_photo(photo_data)
key_material = (
photo_hash +
day_phrase.lower().encode() +
pin.encode() +
date_str.encode() +
salt
)
# Add RSA key hash if provided
if rsa_key_data:
key_material += hashlib.sha256(rsa_key_data).digest()
if HAS_ARGON2:
key = hash_secret_raw(
secret=key_material,
salt=salt[:32],
time_cost=ARGON2_TIME_COST,
memory_cost=ARGON2_MEMORY_COST,
parallelism=ARGON2_PARALLELISM,
hash_len=32,
type=Type.ID
)
else:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA512(),
length=32,
salt=salt,
iterations=PBKDF2_ITERATIONS,
backend=default_backend()
)
key = kdf.derive(key_material)
return key
except Exception as e:
raise KeyDerivationError(f"Failed to derive key: {e}") from e
def derive_pixel_key(
photo_data: bytes,
day_phrase: str,
date_str: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None
) -> bytes:
"""
Derive key for pseudo-random pixel selection.
This key determines which pixels are used for embedding,
making the message location unpredictable without the correct inputs.
Args:
photo_data: Reference photo bytes
day_phrase: The day's phrase
date_str: Date string (YYYY-MM-DD)
pin: Optional static PIN
rsa_key_data: Optional RSA key bytes
Returns:
32-byte key for pixel selection
"""
photo_hash = hash_photo(photo_data)
material = (
photo_hash +
day_phrase.lower().encode() +
pin.encode() +
date_str.encode()
)
if rsa_key_data:
material += hashlib.sha256(rsa_key_data).digest()
return hashlib.sha256(material + b"pixel_selection").digest()
def encrypt_message(
message: str | bytes,
photo_data: bytes,
day_phrase: str,
date_str: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None
) -> bytes:
"""
Encrypt message using AES-256-GCM with hybrid key derivation.
Message format:
- Magic header (4 bytes)
- Version (1 byte)
- Date length (1 byte)
- Date string (variable)
- Salt (32 bytes)
- IV (12 bytes)
- Auth tag (16 bytes)
- Ciphertext (variable, padded)
Args:
message: Message to encrypt
photo_data: Reference photo bytes
day_phrase: The day's phrase
date_str: Date string (YYYY-MM-DD)
pin: Optional static PIN
rsa_key_data: Optional RSA key bytes
Returns:
Encrypted message bytes
Raises:
EncryptionError: If encryption fails
"""
try:
salt = secrets.token_bytes(SALT_SIZE)
key = derive_hybrid_key(photo_data, day_phrase, date_str, salt, pin, rsa_key_data)
iv = secrets.token_bytes(IV_SIZE)
if isinstance(message, str):
message = message.encode('utf-8')
# Random padding to hide message length
padding_len = secrets.randbelow(256) + 64
padded_len = ((len(message) + padding_len + 255) // 256) * 256
padding_needed = padded_len - len(message)
padding = secrets.token_bytes(padding_needed - 4) + struct.pack('>I', len(message))
padded_message = message + padding
# Encrypt with AES-256-GCM
cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=default_backend())
encryptor = cipher.encryptor()
encryptor.authenticate_additional_data(MAGIC_HEADER + bytes([FORMAT_VERSION]))
ciphertext = encryptor.update(padded_message) + encryptor.finalize()
date_bytes = date_str.encode()
return (
MAGIC_HEADER +
bytes([FORMAT_VERSION]) +
bytes([len(date_bytes)]) +
date_bytes +
salt +
iv +
encryptor.tag +
ciphertext
)
except Exception as e:
raise EncryptionError(f"Encryption failed: {e}") from e
def parse_header(encrypted_data: bytes) -> Optional[dict]:
"""
Parse the header from encrypted data.
Args:
encrypted_data: Raw encrypted bytes
Returns:
Dict with date, salt, iv, tag, ciphertext or None if invalid
"""
if len(encrypted_data) < 10 or encrypted_data[:4] != MAGIC_HEADER:
return None
try:
version = encrypted_data[4]
if version != FORMAT_VERSION:
return None
date_len = encrypted_data[5]
date_str = encrypted_data[6:6 + date_len].decode()
offset = 6 + date_len
salt = encrypted_data[offset:offset + SALT_SIZE]
offset += SALT_SIZE
iv = encrypted_data[offset:offset + IV_SIZE]
offset += IV_SIZE
tag = encrypted_data[offset:offset + TAG_SIZE]
offset += TAG_SIZE
ciphertext = encrypted_data[offset:]
return {
'date': date_str,
'salt': salt,
'iv': iv,
'tag': tag,
'ciphertext': ciphertext
}
except Exception:
return None
def decrypt_message(
encrypted_data: bytes,
photo_data: bytes,
day_phrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None
) -> str:
"""
Decrypt message using the embedded date from the header.
Args:
encrypted_data: Encrypted message bytes
photo_data: Reference photo bytes
day_phrase: The day's phrase (must match encoding day)
pin: Optional static PIN
rsa_key_data: Optional RSA key bytes
Returns:
Decrypted message string
Raises:
InvalidHeaderError: If data doesn't have valid Stegasoo header
DecryptionError: If decryption fails (wrong credentials)
"""
header = parse_header(encrypted_data)
if not header:
raise InvalidHeaderError("Invalid or missing Stegasoo header")
try:
key = derive_hybrid_key(
photo_data, day_phrase, header['date'], header['salt'], pin, rsa_key_data
)
cipher = Cipher(
algorithms.AES(key),
modes.GCM(header['iv'], header['tag']),
backend=default_backend()
)
decryptor = cipher.decryptor()
decryptor.authenticate_additional_data(MAGIC_HEADER + bytes([FORMAT_VERSION]))
padded_plaintext = decryptor.update(header['ciphertext']) + decryptor.finalize()
original_length = struct.unpack('>I', padded_plaintext[-4:])[0]
return padded_plaintext[:original_length].decode('utf-8')
except Exception as e:
raise DecryptionError(
"Decryption failed. Check your phrase, PIN, RSA key, and reference photo."
) from e
def get_date_from_encrypted(encrypted_data: bytes) -> Optional[str]:
"""
Extract the date string from encrypted data without decrypting.
Useful for determining which day's phrase to use.
Args:
encrypted_data: Encrypted message bytes
Returns:
Date string (YYYY-MM-DD) or None if invalid
"""
header = parse_header(encrypted_data)
return header['date'] if header else None
def has_argon2() -> bool:
"""Check if Argon2 is available."""
return HAS_ARGON2

150
src/stegasoo/exceptions.py Normal file
View File

@@ -0,0 +1,150 @@
"""
Stegasoo Exceptions
Custom exception classes for clear error handling across all frontends.
"""
class StegasooError(Exception):
"""Base exception for all Stegasoo errors."""
pass
# ============================================================================
# VALIDATION ERRORS
# ============================================================================
class ValidationError(StegasooError):
"""Base class for validation errors."""
pass
class PinValidationError(ValidationError):
"""PIN validation failed."""
pass
class MessageValidationError(ValidationError):
"""Message validation failed."""
pass
class ImageValidationError(ValidationError):
"""Image validation failed."""
pass
class KeyValidationError(ValidationError):
"""RSA key validation failed."""
pass
class SecurityFactorError(ValidationError):
"""Security factor requirements not met."""
pass
# ============================================================================
# CRYPTO ERRORS
# ============================================================================
class CryptoError(StegasooError):
"""Base class for cryptographic errors."""
pass
class EncryptionError(CryptoError):
"""Encryption failed."""
pass
class DecryptionError(CryptoError):
"""Decryption failed (wrong key, corrupted data, etc.)."""
pass
class KeyDerivationError(CryptoError):
"""Key derivation failed."""
pass
class KeyGenerationError(CryptoError):
"""Key generation failed."""
pass
class KeyPasswordError(CryptoError):
"""RSA key password is incorrect or missing."""
pass
# ============================================================================
# STEGANOGRAPHY ERRORS
# ============================================================================
class SteganographyError(StegasooError):
"""Base class for steganography errors."""
pass
class CapacityError(SteganographyError):
"""Carrier image too small for message."""
def __init__(self, needed: int, available: int):
self.needed = needed
self.available = available
super().__init__(
f"Carrier image too small. Need {needed:,} bytes, have {available:,} bytes capacity."
)
class ExtractionError(SteganographyError):
"""Failed to extract hidden data from image."""
pass
class EmbeddingError(SteganographyError):
"""Failed to embed data in image."""
pass
class InvalidHeaderError(SteganographyError):
"""Invalid or missing Stegasoo header in extracted data."""
pass
# ============================================================================
# FILE ERRORS
# ============================================================================
class FileError(StegasooError):
"""Base class for file-related errors."""
pass
class FileNotFoundError(FileError):
"""Required file not found."""
pass
class FileTooLargeError(FileError):
"""File exceeds size limit."""
def __init__(self, size: int, limit: int, filename: str = "File"):
self.size = size
self.limit = limit
self.filename = filename
super().__init__(
f"{filename} too large ({size:,} bytes). Maximum allowed: {limit:,} bytes."
)
class UnsupportedFileTypeError(FileError):
"""File type not supported."""
def __init__(self, extension: str, allowed: set[str]):
self.extension = extension
self.allowed = allowed
super().__init__(
f"Unsupported file type: .{extension}. Allowed: {', '.join(sorted(allowed))}"
)

228
src/stegasoo/keygen.py Normal file
View File

@@ -0,0 +1,228 @@
"""
Stegasoo Key Generation
Generate PINs, passphrases, and RSA keys.
"""
import secrets
from typing import Optional
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from .constants import (
DAY_NAMES,
MIN_PIN_LENGTH, MAX_PIN_LENGTH, DEFAULT_PIN_LENGTH,
MIN_PHRASE_WORDS, MAX_PHRASE_WORDS, DEFAULT_PHRASE_WORDS,
MIN_RSA_BITS, VALID_RSA_SIZES, DEFAULT_RSA_BITS,
get_wordlist,
)
from .models import Credentials, KeyInfo
from .exceptions import KeyGenerationError, KeyPasswordError
def generate_pin(length: int = DEFAULT_PIN_LENGTH) -> str:
"""
Generate a random PIN.
PINs never start with zero for usability.
Args:
length: PIN length (6-9 digits)
Returns:
PIN string
"""
length = max(MIN_PIN_LENGTH, min(MAX_PIN_LENGTH, length))
# First digit: 1-9 (no leading zero)
first_digit = str(secrets.randbelow(9) + 1)
# Remaining digits: 0-9
rest = ''.join(str(secrets.randbelow(10)) for _ in range(length - 1))
return first_digit + rest
def generate_phrase(words_per_phrase: int = DEFAULT_PHRASE_WORDS) -> str:
"""
Generate a random passphrase from BIP-39 wordlist.
Args:
words_per_phrase: Number of words (3-12)
Returns:
Space-separated phrase
"""
words_per_phrase = max(MIN_PHRASE_WORDS, min(MAX_PHRASE_WORDS, words_per_phrase))
wordlist = get_wordlist()
words = [secrets.choice(wordlist) for _ in range(words_per_phrase)]
return ' '.join(words)
def generate_day_phrases(words_per_phrase: int = DEFAULT_PHRASE_WORDS) -> dict[str, str]:
"""
Generate phrases for all days of the week.
Args:
words_per_phrase: Number of words per phrase (3-12)
Returns:
Dict mapping day names to phrases
"""
return {day: generate_phrase(words_per_phrase) for day in DAY_NAMES}
def generate_rsa_key(bits: int = DEFAULT_RSA_BITS) -> rsa.RSAPrivateKey:
"""
Generate an RSA private key.
Args:
bits: Key size (2048, 3072, or 4096)
Returns:
RSA private key object
Raises:
KeyGenerationError: If generation fails
"""
if bits not in VALID_RSA_SIZES:
bits = DEFAULT_RSA_BITS
try:
return rsa.generate_private_key(
public_exponent=65537,
key_size=bits,
backend=default_backend()
)
except Exception as e:
raise KeyGenerationError(f"Failed to generate RSA key: {e}") from e
def export_rsa_key_pem(
private_key: rsa.RSAPrivateKey,
password: Optional[str] = None
) -> bytes:
"""
Export RSA key to PEM format.
Args:
private_key: RSA private key object
password: Optional password for encryption
Returns:
PEM-encoded key bytes
"""
if password:
encryption = serialization.BestAvailableEncryption(password.encode())
else:
encryption = serialization.NoEncryption()
return private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=encryption
)
def load_rsa_key(
key_data: bytes,
password: Optional[str] = None
) -> rsa.RSAPrivateKey:
"""
Load RSA private key from PEM data.
Args:
key_data: PEM-encoded key bytes
password: Password if key is encrypted
Returns:
RSA private key object
Raises:
KeyPasswordError: If password is wrong or missing
KeyGenerationError: If key is invalid
"""
try:
pwd_bytes = password.encode() if password else None
return load_pem_private_key(key_data, password=pwd_bytes, backend=default_backend())
except TypeError:
raise KeyPasswordError("RSA key is password-protected. Please provide the password.")
except ValueError as e:
if "password" in str(e).lower() or "encrypted" in str(e).lower():
raise KeyPasswordError("Incorrect password for RSA key.")
raise KeyGenerationError(f"Invalid RSA key: {e}") from e
except Exception as e:
raise KeyGenerationError(f"Could not load RSA key: {e}") from e
def get_key_info(key_data: bytes, password: Optional[str] = None) -> KeyInfo:
"""
Get information about an RSA key.
Args:
key_data: PEM-encoded key bytes
password: Password if key is encrypted
Returns:
KeyInfo with key size and encryption status
"""
# Check if encrypted
is_encrypted = b'ENCRYPTED' in key_data
private_key = load_rsa_key(key_data, password)
return KeyInfo(
key_size=private_key.key_size,
is_encrypted=is_encrypted,
pem_data=key_data
)
def generate_credentials(
use_pin: bool = True,
use_rsa: bool = False,
pin_length: int = DEFAULT_PIN_LENGTH,
rsa_bits: int = DEFAULT_RSA_BITS,
words_per_phrase: int = DEFAULT_PHRASE_WORDS
) -> Credentials:
"""
Generate a complete set of credentials.
At least one of use_pin or use_rsa must be True.
Args:
use_pin: Whether to generate a PIN
use_rsa: Whether to generate an RSA key
pin_length: PIN length if generating
rsa_bits: RSA key size if generating
words_per_phrase: Words per daily phrase
Returns:
Credentials object
Raises:
ValueError: If neither PIN nor RSA is selected
"""
if not use_pin and not use_rsa:
raise ValueError("Must select at least one security factor (PIN or RSA key)")
phrases = generate_day_phrases(words_per_phrase)
pin = generate_pin(pin_length) if use_pin else None
rsa_key_pem = None
if use_rsa:
private_key = generate_rsa_key(rsa_bits)
rsa_key_pem = export_rsa_key_pem(private_key).decode('utf-8')
return Credentials(
phrases=phrases,
pin=pin,
rsa_key_pem=rsa_key_pem,
rsa_bits=rsa_bits if use_rsa else None,
words_per_phrase=words_per_phrase
)

134
src/stegasoo/models.py Normal file
View File

@@ -0,0 +1,134 @@
"""
Stegasoo Data Models
Dataclasses for structured data exchange between modules and frontends.
"""
from dataclasses import dataclass, field
from datetime import date
from typing import Optional
@dataclass
class Credentials:
"""Generated credentials for encoding/decoding."""
phrases: dict[str, str] # Day -> phrase mapping
pin: Optional[str] = None
rsa_key_pem: Optional[str] = None
rsa_bits: Optional[int] = None
words_per_phrase: int = 3
@property
def phrase_entropy(self) -> int:
"""Entropy in bits from phrases (~11 bits per BIP-39 word)."""
return self.words_per_phrase * 11
@property
def pin_entropy(self) -> int:
"""Entropy in bits from PIN (~3.32 bits per digit)."""
if self.pin:
return int(len(self.pin) * 3.32)
return 0
@property
def rsa_entropy(self) -> int:
"""Effective entropy from RSA key."""
if self.rsa_key_pem and self.rsa_bits:
return min(self.rsa_bits // 16, 128)
return 0
@property
def total_entropy(self) -> int:
"""Total entropy in bits (excluding reference photo)."""
return self.phrase_entropy + self.pin_entropy + self.rsa_entropy
@dataclass
class EncodeInput:
"""Input parameters for encoding a message."""
message: str
reference_photo: bytes
carrier_image: bytes
day_phrase: str
pin: str = ""
rsa_key_data: Optional[bytes] = None
rsa_password: Optional[str] = None
date_str: Optional[str] = None # YYYY-MM-DD, defaults to today
def __post_init__(self):
if self.date_str is None:
self.date_str = date.today().isoformat()
@dataclass
class EncodeResult:
"""Result of encoding operation."""
stego_image: bytes
filename: str
pixels_modified: int
total_pixels: int
capacity_used: float # 0.0 - 1.0
date_used: str
@property
def capacity_percent(self) -> float:
"""Capacity used as percentage."""
return self.capacity_used * 100
@dataclass
class DecodeInput:
"""Input parameters for decoding a message."""
stego_image: bytes
reference_photo: bytes
day_phrase: str
pin: str = ""
rsa_key_data: Optional[bytes] = None
rsa_password: Optional[str] = None
@dataclass
class DecodeResult:
"""Result of decoding operation."""
message: str
date_encoded: str
@dataclass
class EmbedStats:
"""Statistics from image embedding."""
pixels_modified: int
total_pixels: int
capacity_used: float
bytes_embedded: int
@property
def modification_percent(self) -> float:
"""Percentage of pixels modified."""
return (self.pixels_modified / self.total_pixels) * 100 if self.total_pixels > 0 else 0
@dataclass
class KeyInfo:
"""Information about an RSA key."""
key_size: int
is_encrypted: bool
pem_data: bytes
@dataclass
class ValidationResult:
"""Result of input validation."""
is_valid: bool
error_message: str = ""
details: dict = field(default_factory=dict)
@classmethod
def ok(cls, **details) -> 'ValidationResult':
"""Create a successful validation result."""
return cls(is_valid=True, details=details)
@classmethod
def error(cls, message: str, **details) -> 'ValidationResult':
"""Create a failed validation result."""
return cls(is_valid=False, error_message=message, details=details)

View File

@@ -0,0 +1,286 @@
"""
Stegasoo Steganography Functions
LSB embedding and extraction with pseudo-random pixel selection.
"""
import io
import struct
from typing import Optional
from PIL import Image
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.backends import default_backend
from .models import EmbedStats
from .exceptions import CapacityError, ExtractionError, EmbeddingError
def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> list[int]:
"""
Generate pseudo-random pixel indices for embedding.
Uses ChaCha20 as a CSPRNG seeded by the key to deterministically
select which pixels will hold hidden data.
Args:
key: 32-byte key for pixel selection
num_pixels: Total pixels in image
num_needed: Number of pixels needed for embedding
Returns:
List of pixel indices
"""
if num_needed >= num_pixels // 2:
# If we need many pixels, shuffle all indices
nonce = b'\x00' * 16
cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend())
encryptor = cipher.encryptor()
indices = list(range(num_pixels))
random_bytes = encryptor.update(b'\x00' * (num_pixels * 4))
for i in range(num_pixels - 1, 0, -1):
j_bytes = random_bytes[(num_pixels - 1 - i) * 4:(num_pixels - i) * 4]
j = int.from_bytes(j_bytes, 'big') % (i + 1)
indices[i], indices[j] = indices[j], indices[i]
return indices[:num_needed]
# Optimized path: generate indices directly
selected = []
used = set()
nonce = b'\x00' * 16
cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend())
encryptor = cipher.encryptor()
# Generate more than needed to handle collisions
bytes_needed = (num_needed * 2) * 4
random_bytes = encryptor.update(b'\x00' * bytes_needed)
byte_offset = 0
while len(selected) < num_needed and byte_offset < len(random_bytes) - 4:
idx = int.from_bytes(random_bytes[byte_offset:byte_offset + 4], 'big') % num_pixels
byte_offset += 4
if idx not in used:
used.add(idx)
selected.append(idx)
# Generate additional if needed (rare)
while len(selected) < num_needed:
extra_bytes = encryptor.update(b'\x00' * 4)
idx = int.from_bytes(extra_bytes, 'big') % num_pixels
if idx not in used:
used.add(idx)
selected.append(idx)
return selected
def embed_in_image(
carrier_data: bytes,
encrypted_data: bytes,
pixel_key: bytes,
bits_per_channel: int = 1
) -> tuple[bytes, EmbedStats]:
"""
Embed encrypted data in carrier image using LSB steganography.
Uses pseudo-random pixel selection based on pixel_key to scatter
the data across the image, defeating statistical analysis.
Args:
carrier_data: Carrier image bytes
encrypted_data: Data to embed
pixel_key: Key for pixel selection
bits_per_channel: Bits to use per color channel (1-2)
Returns:
Tuple of (PNG image bytes, EmbedStats)
Raises:
CapacityError: If carrier is too small
EmbeddingError: If embedding fails
"""
try:
img = Image.open(io.BytesIO(carrier_data))
if img.mode != 'RGB':
img = img.convert('RGB')
pixels = list(img.getdata())
num_pixels = len(pixels)
bits_per_pixel = 3 * bits_per_channel
max_bytes = (num_pixels * bits_per_pixel) // 8
# Prepend length
data_with_len = struct.pack('>I', len(encrypted_data)) + encrypted_data
if len(data_with_len) > max_bytes:
raise CapacityError(len(data_with_len), max_bytes)
# Convert to binary string
binary_data = ''.join(format(b, '08b') for b in data_with_len)
pixels_needed = (len(binary_data) + bits_per_pixel - 1) // bits_per_pixel
# Get pixel indices
selected_indices = generate_pixel_indices(pixel_key, num_pixels, pixels_needed)
# Embed data
new_pixels = list(pixels)
clear_mask = 0xFF ^ ((1 << bits_per_channel) - 1)
bit_idx = 0
for pixel_idx in selected_indices:
if bit_idx >= len(binary_data):
break
r, g, b = new_pixels[pixel_idx]
for channel_idx, channel_val in enumerate([r, g, b]):
if bit_idx >= len(binary_data):
break
bits = binary_data[bit_idx:bit_idx + bits_per_channel].ljust(bits_per_channel, '0')
new_val = (channel_val & clear_mask) | int(bits, 2)
if channel_idx == 0:
r = new_val
elif channel_idx == 1:
g = new_val
else:
b = new_val
bit_idx += bits_per_channel
new_pixels[pixel_idx] = (r, g, b)
# Create output image
stego_img = Image.new('RGB', img.size)
stego_img.putdata(new_pixels)
output = io.BytesIO()
stego_img.save(output, 'PNG')
output.seek(0)
stats = EmbedStats(
pixels_modified=len(selected_indices),
total_pixels=num_pixels,
capacity_used=len(data_with_len) / max_bytes,
bytes_embedded=len(data_with_len)
)
return output.getvalue(), stats
except CapacityError:
raise
except Exception as e:
raise EmbeddingError(f"Failed to embed data: {e}") from e
def extract_from_image(
image_data: bytes,
pixel_key: bytes,
bits_per_channel: int = 1
) -> Optional[bytes]:
"""
Extract hidden data from a stego image.
Args:
image_data: Stego image bytes
pixel_key: Key for pixel selection (must match encoding)
bits_per_channel: Bits per channel (must match encoding)
Returns:
Extracted data bytes, or None if extraction fails
Raises:
ExtractionError: If extraction fails critically
"""
try:
img = Image.open(io.BytesIO(image_data))
if img.mode != 'RGB':
img = img.convert('RGB')
pixels = list(img.getdata())
num_pixels = len(pixels)
bits_per_pixel = 3 * bits_per_channel
# First, extract enough to get the length (4 bytes = 32 bits)
initial_pixels = (32 + bits_per_pixel - 1) // bits_per_pixel + 10
initial_indices = generate_pixel_indices(pixel_key, num_pixels, initial_pixels)
binary_data = ''
for pixel_idx in initial_indices:
r, g, b = pixels[pixel_idx]
for channel in [r, g, b]:
for bit_pos in range(bits_per_channel - 1, -1, -1):
binary_data += str((channel >> bit_pos) & 1)
# Parse length
try:
length_bits = binary_data[:32]
data_length = struct.unpack('>I', int(length_bits, 2).to_bytes(4, 'big'))[0]
except Exception:
return None
# Sanity check
max_possible = (num_pixels * bits_per_pixel) // 8 - 4
if data_length > max_possible or data_length < 10:
return None
# Extract full data
total_bits = (4 + data_length) * 8
pixels_needed = (total_bits + bits_per_pixel - 1) // bits_per_pixel
selected_indices = generate_pixel_indices(pixel_key, num_pixels, pixels_needed)
binary_data = ''
for pixel_idx in selected_indices:
r, g, b = pixels[pixel_idx]
for channel in [r, g, b]:
for bit_pos in range(bits_per_channel - 1, -1, -1):
binary_data += str((channel >> bit_pos) & 1)
data_bits = binary_data[32:32 + (data_length * 8)]
data_bytes = bytearray()
for i in range(0, len(data_bits), 8):
byte_bits = data_bits[i:i + 8]
if len(byte_bits) == 8:
data_bytes.append(int(byte_bits, 2))
return bytes(data_bytes)
except Exception as e:
raise ExtractionError(f"Failed to extract data: {e}") from e
def calculate_capacity(image_data: bytes, bits_per_channel: int = 1) -> int:
"""
Calculate the maximum message capacity of an image.
Args:
image_data: Image bytes
bits_per_channel: Bits to use per color channel
Returns:
Maximum bytes that can be embedded (minus overhead)
"""
img = Image.open(io.BytesIO(image_data))
if img.mode != 'RGB':
img = img.convert('RGB')
num_pixels = img.size[0] * img.size[1]
bits_per_pixel = 3 * bits_per_channel
max_bytes = (num_pixels * bits_per_pixel) // 8
# Subtract overhead: 4 bytes length + ~100 bytes header
return max(0, max_bytes - 104)
def get_image_dimensions(image_data: bytes) -> tuple[int, int]:
"""Get image dimensions without loading full image."""
img = Image.open(io.BytesIO(image_data))
return img.size

201
src/stegasoo/utils.py Normal file
View File

@@ -0,0 +1,201 @@
"""
Stegasoo Utilities
Secure deletion, filename generation, and other helpers.
"""
import os
import random
import secrets
import shutil
from datetime import date, datetime
from pathlib import Path
from typing import Optional
from .constants import DAY_NAMES
def generate_filename(date_str: Optional[str] = None, prefix: str = "") -> str:
"""
Generate a filename for stego images.
Format: {prefix}{random}_{YYYYMMDD}.png
Args:
date_str: Date string (YYYY-MM-DD), defaults to today
prefix: Optional prefix
Returns:
Filename string
"""
if date_str is None:
date_str = date.today().isoformat()
date_compact = date_str.replace('-', '')
random_hex = secrets.token_hex(4)
return f"{prefix}{random_hex}_{date_compact}.png"
def parse_date_from_filename(filename: str) -> Optional[str]:
"""
Extract date from a stego filename.
Looks for patterns like _20251227 or _2025-12-27
Args:
filename: Filename to parse
Returns:
Date string (YYYY-MM-DD) or None
"""
import re
# Try YYYYMMDD format
match = re.search(r'_(\d{4})(\d{2})(\d{2})(?:\.|$)', filename)
if match:
year, month, day = match.groups()
return f"{year}-{month}-{day}"
# Try YYYY-MM-DD format
match = re.search(r'_(\d{4})-(\d{2})-(\d{2})(?:\.|$)', filename)
if match:
year, month, day = match.groups()
return f"{year}-{month}-{day}"
return None
def get_day_from_date(date_str: str) -> str:
"""
Get day of week name from date string.
Args:
date_str: Date string (YYYY-MM-DD)
Returns:
Day name (e.g., "Monday")
"""
try:
year, month, day = map(int, date_str.split('-'))
d = date(year, month, day)
return DAY_NAMES[d.weekday()]
except Exception:
return ""
def get_today_date() -> str:
"""Get today's date as YYYY-MM-DD."""
return date.today().isoformat()
def get_today_day() -> str:
"""Get today's day name."""
return DAY_NAMES[date.today().weekday()]
class SecureDeleter:
"""
Securely delete files by overwriting with random data.
Implements multi-pass overwriting before deletion.
"""
def __init__(self, path: str | Path, passes: int = 7):
"""
Initialize secure deleter.
Args:
path: Path to file or directory
passes: Number of overwrite passes
"""
self.path = Path(path)
self.passes = passes
def _overwrite_file(self, file_path: Path) -> None:
"""Overwrite file with random data multiple times."""
if not file_path.exists() or not file_path.is_file():
return
length = file_path.stat().st_size
if length == 0:
return
patterns = [b'\x00', b'\xFF', bytes([random.randint(0, 255)])]
for _ in range(self.passes):
with open(file_path, 'r+b') as f:
for pattern in patterns:
f.seek(0)
for _ in range(length):
f.write(pattern)
# Final pass with random data
f.seek(0)
f.write(os.urandom(length))
def delete_file(self) -> None:
"""Securely delete a single file."""
if self.path.is_file():
self._overwrite_file(self.path)
self.path.unlink()
def delete_directory(self) -> None:
"""Securely delete a directory and all contents."""
if not self.path.is_dir():
return
# First, securely overwrite all files
for file_path in self.path.rglob('*'):
if file_path.is_file():
self._overwrite_file(file_path)
# Then remove the directory tree
shutil.rmtree(self.path)
def execute(self) -> None:
"""Securely delete the path (file or directory)."""
if self.path.is_file():
self.delete_file()
elif self.path.is_dir():
self.delete_directory()
def secure_delete(path: str | Path, passes: int = 7) -> None:
"""
Convenience function for secure deletion.
Args:
path: Path to file or directory
passes: Number of overwrite passes
"""
SecureDeleter(path, passes).execute()
def format_file_size(size_bytes: int) -> str:
"""
Format file size for display.
Args:
size_bytes: Size in bytes
Returns:
Human-readable string (e.g., "1.5 MB")
"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024:
if unit == 'B':
return f"{size_bytes} {unit}"
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024
return f"{size_bytes:.1f} TB"
def format_number(n: int) -> str:
"""Format number with commas."""
return f"{n:,}"
def clamp(value: int, min_val: int, max_val: int) -> int:
"""Clamp value to range."""
return max(min_val, min(max_val, value))

344
src/stegasoo/validation.py Normal file
View File

@@ -0,0 +1,344 @@
"""
Stegasoo Input Validation
Validators for all user inputs with clear error messages.
"""
import io
from typing import Optional
from PIL import Image
from .constants import (
MIN_PIN_LENGTH, MAX_PIN_LENGTH,
MAX_MESSAGE_SIZE, MAX_IMAGE_PIXELS, MAX_FILE_SIZE,
MIN_RSA_BITS, MIN_KEY_PASSWORD_LENGTH,
ALLOWED_IMAGE_EXTENSIONS, ALLOWED_KEY_EXTENSIONS,
)
from .models import ValidationResult
from .exceptions import (
ValidationError, PinValidationError, MessageValidationError,
ImageValidationError, KeyValidationError, SecurityFactorError,
FileTooLargeError, UnsupportedFileTypeError,
)
from .keygen import load_rsa_key
def validate_pin(pin: str, required: bool = False) -> ValidationResult:
"""
Validate PIN format.
Rules:
- 6-9 digits only
- Cannot start with zero
- Empty is OK if not required
Args:
pin: PIN string to validate
required: Whether PIN is required
Returns:
ValidationResult
"""
if not pin:
if required:
return ValidationResult.error("PIN is required")
return ValidationResult.ok()
if not pin.isdigit():
return ValidationResult.error("PIN must contain only digits")
if len(pin) < MIN_PIN_LENGTH or len(pin) > MAX_PIN_LENGTH:
return ValidationResult.error(
f"PIN must be {MIN_PIN_LENGTH}-{MAX_PIN_LENGTH} digits"
)
if pin[0] == '0':
return ValidationResult.error("PIN cannot start with zero")
return ValidationResult.ok(length=len(pin))
def validate_message(message: str) -> ValidationResult:
"""
Validate message content and size.
Args:
message: Message text
Returns:
ValidationResult
"""
if not message:
return ValidationResult.error("Message is required")
if len(message) > MAX_MESSAGE_SIZE:
return ValidationResult.error(
f"Message too long ({len(message):,} chars). Maximum: {MAX_MESSAGE_SIZE:,} characters"
)
return ValidationResult.ok(length=len(message))
def validate_image(
image_data: bytes,
name: str = "Image",
check_size: bool = True
) -> ValidationResult:
"""
Validate image data and dimensions.
Args:
image_data: Raw image bytes
name: Name for error messages
check_size: Whether to check pixel dimensions
Returns:
ValidationResult with width, height, pixels
"""
if not image_data:
return ValidationResult.error(f"{name} is required")
if len(image_data) > MAX_FILE_SIZE:
return ValidationResult.error(
f"{name} too large ({len(image_data):,} bytes). Maximum: {MAX_FILE_SIZE:,} bytes"
)
try:
img = Image.open(io.BytesIO(image_data))
width, height = img.size
num_pixels = width * height
if check_size and num_pixels > MAX_IMAGE_PIXELS:
max_dim = int(MAX_IMAGE_PIXELS ** 0.5)
return ValidationResult.error(
f"{name} too large ({width}×{height} = {num_pixels:,} pixels). "
f"Maximum: ~{MAX_IMAGE_PIXELS:,} pixels ({max_dim}×{max_dim})"
)
return ValidationResult.ok(
width=width,
height=height,
pixels=num_pixels,
mode=img.mode,
format=img.format
)
except Exception as e:
return ValidationResult.error(f"Could not read {name}: {e}")
def validate_rsa_key(
key_data: bytes,
password: Optional[str] = None,
required: bool = False
) -> ValidationResult:
"""
Validate RSA private key.
Args:
key_data: PEM-encoded key bytes
password: Password if key is encrypted
required: Whether key is required
Returns:
ValidationResult with key_size
"""
if not key_data:
if required:
return ValidationResult.error("RSA key is required")
return ValidationResult.ok()
try:
private_key = load_rsa_key(key_data, password)
key_size = private_key.key_size
if key_size < MIN_RSA_BITS:
return ValidationResult.error(
f"RSA key must be at least {MIN_RSA_BITS} bits (got {key_size})"
)
return ValidationResult.ok(key_size=key_size)
except Exception as e:
return ValidationResult.error(str(e))
def validate_security_factors(
pin: str,
rsa_key_data: Optional[bytes]
) -> ValidationResult:
"""
Validate that at least one security factor is provided.
Args:
pin: PIN string (may be empty)
rsa_key_data: RSA key bytes (may be None/empty)
Returns:
ValidationResult
"""
has_pin = bool(pin and pin.strip())
has_key = bool(rsa_key_data and len(rsa_key_data) > 0)
if not has_pin and not has_key:
return ValidationResult.error(
"You must provide at least a PIN or RSA Key"
)
return ValidationResult.ok(has_pin=has_pin, has_key=has_key)
def validate_file_extension(
filename: str,
allowed: set[str],
file_type: str = "File"
) -> ValidationResult:
"""
Validate file extension.
Args:
filename: Filename to check
allowed: Set of allowed extensions (lowercase, no dot)
file_type: Name for error messages
Returns:
ValidationResult with extension
"""
if not filename or '.' not in filename:
return ValidationResult.error(f"{file_type} must have a file extension")
ext = filename.rsplit('.', 1)[1].lower()
if ext not in allowed:
return ValidationResult.error(
f"Unsupported {file_type.lower()} type: .{ext}. "
f"Allowed: {', '.join(sorted('.' + e for e in allowed))}"
)
return ValidationResult.ok(extension=ext)
def validate_image_file(filename: str) -> ValidationResult:
"""Validate image file extension."""
return validate_file_extension(filename, ALLOWED_IMAGE_EXTENSIONS, "Image")
def validate_key_file(filename: str) -> ValidationResult:
"""Validate key file extension."""
return validate_file_extension(filename, ALLOWED_KEY_EXTENSIONS, "Key file")
def validate_key_password(password: str) -> ValidationResult:
"""
Validate password for key encryption.
Args:
password: Password string
Returns:
ValidationResult
"""
if not password:
return ValidationResult.error("Password is required")
if len(password) < MIN_KEY_PASSWORD_LENGTH:
return ValidationResult.error(
f"Password must be at least {MIN_KEY_PASSWORD_LENGTH} characters"
)
return ValidationResult.ok(length=len(password))
def validate_phrase(phrase: str) -> ValidationResult:
"""
Validate day phrase.
Args:
phrase: Phrase string
Returns:
ValidationResult with word_count
"""
if not phrase or not phrase.strip():
return ValidationResult.error("Day phrase is required")
words = phrase.strip().split()
return ValidationResult.ok(word_count=len(words))
def validate_date_string(date_str: str) -> ValidationResult:
"""
Validate date string format (YYYY-MM-DD).
Args:
date_str: Date string
Returns:
ValidationResult
"""
if not date_str:
return ValidationResult.error("Date is required")
if len(date_str) != 10:
return ValidationResult.error("Date must be in YYYY-MM-DD format")
if date_str[4] != '-' or date_str[7] != '-':
return ValidationResult.error("Date must be in YYYY-MM-DD format")
try:
year = int(date_str[0:4])
month = int(date_str[5:7])
day = int(date_str[8:10])
if not (1 <= month <= 12 and 1 <= day <= 31 and 2000 <= year <= 2100):
return ValidationResult.error("Invalid date values")
return ValidationResult.ok(year=year, month=month, day=day)
except ValueError:
return ValidationResult.error("Date must contain valid numbers")
# ============================================================================
# EXCEPTION-RAISING VALIDATORS (for CLI/API use)
# ============================================================================
def require_valid_pin(pin: str, required: bool = False) -> None:
"""Validate PIN, raising exception on failure."""
result = validate_pin(pin, required)
if not result.is_valid:
raise PinValidationError(result.error_message)
def require_valid_message(message: str) -> None:
"""Validate message, raising exception on failure."""
result = validate_message(message)
if not result.is_valid:
raise MessageValidationError(result.error_message)
def require_valid_image(image_data: bytes, name: str = "Image") -> None:
"""Validate image, raising exception on failure."""
result = validate_image(image_data, name)
if not result.is_valid:
raise ImageValidationError(result.error_message)
def require_valid_rsa_key(
key_data: bytes,
password: Optional[str] = None,
required: bool = False
) -> None:
"""Validate RSA key, raising exception on failure."""
result = validate_rsa_key(key_data, password, required)
if not result.is_valid:
raise KeyValidationError(result.error_message)
def require_security_factors(pin: str, rsa_key_data: Optional[bytes]) -> None:
"""Validate security factors, raising exception on failure."""
result = validate_security_factors(pin, rsa_key_data)
if not result.is_valid:
raise SecurityFactorError(result.error_message)