Pinned the container, some other resiliancy stuff.
This commit is contained in:
@@ -58,6 +58,16 @@ File Embedding:
|
||||
else:
|
||||
print(decoded.message)
|
||||
|
||||
Capacity Pre-check (v2.2.1):
|
||||
from stegasoo import will_fit
|
||||
|
||||
# Check if payload will fit before encoding
|
||||
result = will_fit("My secret message", carrier_image)
|
||||
if result['fits']:
|
||||
print(f"Will use {result['usage_percent']:.1f}% capacity")
|
||||
else:
|
||||
print(f"Need {-result['headroom']} more bytes")
|
||||
|
||||
Debugging:
|
||||
from stegasoo.debug import debug
|
||||
debug.enable(True) # Enable debug output
|
||||
@@ -142,6 +152,8 @@ from .steganography import (
|
||||
get_image_format,
|
||||
is_lossless_format,
|
||||
LOSSLESS_FORMATS,
|
||||
# NEW in v2.2.1
|
||||
will_fit,
|
||||
)
|
||||
from .utils import (
|
||||
generate_filename,
|
||||
@@ -152,6 +164,8 @@ from .utils import (
|
||||
secure_delete,
|
||||
SecureDeleter,
|
||||
format_file_size,
|
||||
# NEW in v2.2.1
|
||||
strip_image_metadata,
|
||||
)
|
||||
from .debug import debug # Import debug utilities
|
||||
|
||||
@@ -177,6 +191,8 @@ from .batch import (
|
||||
BatchItem,
|
||||
BatchStatus,
|
||||
batch_capacity_check,
|
||||
# NEW in v2.2.1
|
||||
BatchCredentials,
|
||||
)
|
||||
|
||||
# QR Code utilities (optional, depends on qrcode and pyzbar)
|
||||
@@ -630,6 +646,7 @@ __all__ = [
|
||||
'get_image_dimensions',
|
||||
'get_image_format',
|
||||
'is_lossless_format',
|
||||
'will_fit', # NEW in v2.2.1
|
||||
|
||||
# Utilities
|
||||
'generate_filename',
|
||||
@@ -640,6 +657,7 @@ __all__ = [
|
||||
'secure_delete',
|
||||
'SecureDeleter',
|
||||
'format_file_size',
|
||||
'strip_image_metadata', # NEW in v2.2.1
|
||||
|
||||
# Debugging
|
||||
'debug',
|
||||
@@ -659,4 +677,5 @@ __all__ = [
|
||||
'BatchItem',
|
||||
'BatchStatus',
|
||||
'batch_capacity_check',
|
||||
'BatchCredentials', # NEW in v2.2.1
|
||||
]
|
||||
|
||||
@@ -61,6 +61,41 @@ class BatchItem:
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class BatchCredentials:
|
||||
"""
|
||||
Credentials for batch encode/decode operations.
|
||||
|
||||
Provides a structured way to pass authentication factors
|
||||
for batch processing instead of using plain dicts.
|
||||
|
||||
Example:
|
||||
creds = BatchCredentials(
|
||||
reference_photo=ref_bytes,
|
||||
day_phrase="apple forest thunder",
|
||||
pin="123456"
|
||||
)
|
||||
result = processor.batch_encode(images, creds, message="secret")
|
||||
"""
|
||||
reference_photo: bytes
|
||||
day_phrase: str
|
||||
pin: str = ""
|
||||
rsa_key_data: Optional[bytes] = None
|
||||
rsa_password: Optional[str] = None
|
||||
date_str: Optional[str] = None # YYYY-MM-DD, defaults to today
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Convert to dictionary for legacy API compatibility."""
|
||||
return {
|
||||
"reference_photo": self.reference_photo,
|
||||
"day_phrase": self.day_phrase,
|
||||
"pin": self.pin,
|
||||
"rsa_key_data": self.rsa_key_data,
|
||||
"rsa_password": self.rsa_password,
|
||||
"date_str": self.date_str,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class BatchResult:
|
||||
"""Summary of a batch operation."""
|
||||
|
||||
@@ -12,7 +12,7 @@ from pathlib import Path
|
||||
# VERSION
|
||||
# ============================================================================
|
||||
|
||||
__version__ = "2.2.0"
|
||||
__version__ = "2.2.1"
|
||||
|
||||
# ============================================================================
|
||||
# FILE FORMAT
|
||||
|
||||
@@ -6,13 +6,13 @@ LSB embedding and extraction with pseudo-random pixel selection.
|
||||
|
||||
import io
|
||||
import struct
|
||||
from typing import Optional, Tuple, List
|
||||
from typing import Optional, Tuple, List, Union
|
||||
|
||||
from PIL import Image
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
from .models import EmbedStats
|
||||
from .models import EmbedStats, FilePayload
|
||||
from .exceptions import CapacityError, ExtractionError, EmbeddingError
|
||||
from .debug import debug
|
||||
|
||||
@@ -35,6 +35,11 @@ EXT_TO_FORMAT = {
|
||||
'tif': 'TIFF',
|
||||
}
|
||||
|
||||
# Overhead constants for capacity estimation
|
||||
HEADER_OVERHEAD = 104 # Magic + version + date + salt + iv + tag
|
||||
LENGTH_PREFIX = 4 # 4 bytes for payload length
|
||||
ENCRYPTION_OVERHEAD = HEADER_OVERHEAD + LENGTH_PREFIX
|
||||
|
||||
|
||||
def get_output_format(input_format: Optional[str]) -> Tuple[str, str]:
|
||||
"""
|
||||
@@ -67,6 +72,106 @@ def get_output_format(input_format: Optional[str]) -> Tuple[str, str]:
|
||||
return 'PNG', 'png'
|
||||
|
||||
|
||||
def will_fit(
|
||||
payload: Union[str, bytes, FilePayload, int],
|
||||
carrier_image: bytes,
|
||||
bits_per_channel: int = 1,
|
||||
include_compression_estimate: bool = True,
|
||||
) -> dict:
|
||||
"""
|
||||
Check if a payload will fit in a carrier image without performing encryption.
|
||||
|
||||
This is a lightweight pre-check to avoid wasted work on payloads that
|
||||
are too large. For accurate results with compression, the actual compressed
|
||||
size may vary.
|
||||
|
||||
Args:
|
||||
payload: Message string, raw bytes, FilePayload, or size in bytes
|
||||
carrier_image: Carrier image bytes
|
||||
bits_per_channel: Bits to use per color channel (1-2)
|
||||
include_compression_estimate: Estimate compressed size (requires payload data)
|
||||
|
||||
Returns:
|
||||
Dict with:
|
||||
- fits: bool - Whether payload will fit
|
||||
- payload_size: int - Raw payload size in bytes
|
||||
- estimated_encrypted_size: int - Estimated size after encryption + overhead
|
||||
- capacity: int - Available capacity in bytes
|
||||
- usage_percent: float - Estimated capacity usage (0-100)
|
||||
- headroom: int - Bytes remaining (negative if won't fit)
|
||||
- compressed_estimate: int | None - Estimated compressed size (if applicable)
|
||||
|
||||
Example:
|
||||
>>> result = will_fit("Hello world", carrier_bytes)
|
||||
>>> result['fits']
|
||||
True
|
||||
>>> result['usage_percent']
|
||||
0.5
|
||||
|
||||
>>> result = will_fit(50000, carrier_bytes) # Check if 50KB would fit
|
||||
>>> result['fits']
|
||||
False
|
||||
"""
|
||||
# Determine payload size
|
||||
if isinstance(payload, int):
|
||||
payload_size = payload
|
||||
payload_data = None
|
||||
elif isinstance(payload, str):
|
||||
payload_data = payload.encode('utf-8')
|
||||
payload_size = len(payload_data)
|
||||
elif isinstance(payload, FilePayload):
|
||||
payload_data = payload.data
|
||||
# Account for filename/mime metadata
|
||||
filename_overhead = len(payload.filename.encode('utf-8')) if payload.filename else 0
|
||||
mime_overhead = len(payload.mime_type.encode('utf-8')) if payload.mime_type else 0
|
||||
payload_size = len(payload.data) + filename_overhead + mime_overhead + 5 # +5 for length prefixes + type byte
|
||||
else:
|
||||
payload_data = payload
|
||||
payload_size = len(payload)
|
||||
|
||||
# Calculate capacity
|
||||
capacity = calculate_capacity(carrier_image, bits_per_channel)
|
||||
|
||||
# Estimate encrypted size (payload + random padding + overhead)
|
||||
# Padding adds 64-319 bytes, averaging ~190
|
||||
estimated_padding = 190
|
||||
estimated_encrypted_size = payload_size + estimated_padding + ENCRYPTION_OVERHEAD
|
||||
|
||||
# Compression estimate
|
||||
compressed_estimate = None
|
||||
if include_compression_estimate and payload_data is not None and len(payload_data) >= 64:
|
||||
try:
|
||||
import zlib
|
||||
compressed = zlib.compress(payload_data, level=6)
|
||||
# Add compression header overhead (9 bytes)
|
||||
compressed_size = len(compressed) + 9
|
||||
if compressed_size < payload_size:
|
||||
compressed_estimate = compressed_size
|
||||
# Use compressed size for fit calculation
|
||||
estimated_encrypted_size = compressed_size + estimated_padding + ENCRYPTION_OVERHEAD
|
||||
except Exception:
|
||||
pass # Ignore compression errors
|
||||
|
||||
headroom = capacity - estimated_encrypted_size
|
||||
fits = headroom >= 0
|
||||
usage_percent = (estimated_encrypted_size / capacity * 100) if capacity > 0 else 100.0
|
||||
|
||||
result = {
|
||||
'fits': fits,
|
||||
'payload_size': payload_size,
|
||||
'estimated_encrypted_size': estimated_encrypted_size,
|
||||
'capacity': capacity,
|
||||
'usage_percent': min(usage_percent, 100.0),
|
||||
'headroom': headroom,
|
||||
'compressed_estimate': compressed_estimate,
|
||||
}
|
||||
|
||||
debug.print(f"will_fit: payload={payload_size}, encrypted~={estimated_encrypted_size}, "
|
||||
f"capacity={capacity}, fits={fits}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@debug.time
|
||||
def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> List[int]:
|
||||
"""
|
||||
@@ -172,6 +277,8 @@ def embed_in_image(
|
||||
Uses pseudo-random pixel selection based on pixel_key to scatter
|
||||
the data across the image, defeating statistical analysis.
|
||||
|
||||
Note: Output images have all metadata (EXIF, etc.) stripped automatically.
|
||||
|
||||
Args:
|
||||
carrier_data: Carrier image bytes
|
||||
encrypted_data: Data to embed
|
||||
@@ -274,7 +381,7 @@ def embed_in_image(
|
||||
|
||||
debug.print(f"Modified {modified_pixels} pixels (out of {len(selected_indices)} selected)")
|
||||
|
||||
# Create output image
|
||||
# Create output image (fresh image = no metadata/EXIF carried over)
|
||||
stego_img = Image.new('RGB', img.size)
|
||||
stego_img.putdata(new_pixels)
|
||||
|
||||
@@ -447,7 +554,7 @@ def calculate_capacity(image_data: bytes, bits_per_channel: int = 1) -> int:
|
||||
max_bytes = (num_pixels * bits_per_pixel) // 8
|
||||
|
||||
# Subtract overhead: 4 bytes length + ~100 bytes header
|
||||
capacity = max(0, max_bytes - 104)
|
||||
capacity = max(0, max_bytes - ENCRYPTION_OVERHEAD)
|
||||
debug.print(f"Image capacity: {capacity} bytes at {bits_per_channel} bit(s)/channel")
|
||||
return capacity
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ Stegasoo Utilities
|
||||
Secure deletion, filename generation, and other helpers.
|
||||
"""
|
||||
|
||||
import io
|
||||
import os
|
||||
import random
|
||||
import secrets
|
||||
@@ -12,10 +13,50 @@ from datetime import date, datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional, Union
|
||||
|
||||
from PIL import Image
|
||||
|
||||
from .constants import DAY_NAMES
|
||||
from .debug import debug
|
||||
|
||||
|
||||
def strip_image_metadata(image_data: bytes, output_format: str = 'PNG') -> bytes:
|
||||
"""
|
||||
Remove all metadata (EXIF, ICC profiles, etc.) from an image.
|
||||
|
||||
Creates a fresh image with only pixel data - no EXIF, GPS coordinates,
|
||||
camera info, timestamps, or other potentially sensitive metadata.
|
||||
|
||||
Args:
|
||||
image_data: Raw image bytes
|
||||
output_format: Output format ('PNG', 'BMP', 'TIFF')
|
||||
|
||||
Returns:
|
||||
Clean image bytes with no metadata
|
||||
|
||||
Example:
|
||||
>>> clean = strip_image_metadata(photo_bytes)
|
||||
>>> # EXIF data is now removed
|
||||
"""
|
||||
debug.print(f"Stripping metadata, output format: {output_format}")
|
||||
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
|
||||
# Convert to RGB if needed (handles RGBA, P, L, etc.)
|
||||
if img.mode not in ('RGB', 'RGBA'):
|
||||
img = img.convert('RGB')
|
||||
|
||||
# Create fresh image - this discards all metadata
|
||||
clean = Image.new(img.mode, img.size)
|
||||
clean.putdata(list(img.getdata()))
|
||||
|
||||
output = io.BytesIO()
|
||||
clean.save(output, output_format.upper())
|
||||
output.seek(0)
|
||||
|
||||
debug.print(f"Metadata stripped: {len(image_data)} -> {len(output.getvalue())} bytes")
|
||||
return output.getvalue()
|
||||
|
||||
|
||||
def generate_filename(
|
||||
date_str: Optional[str] = None,
|
||||
prefix: str = "",
|
||||
|
||||
Reference in New Issue
Block a user