A whoooole lotta 4.0.x fixes.
This commit is contained in:
@@ -18,7 +18,7 @@ from pathlib import Path
|
||||
# VERSION
|
||||
# ============================================================================
|
||||
|
||||
__version__ = "3.2.0"
|
||||
__version__ = "4.0.0"
|
||||
|
||||
# ============================================================================
|
||||
# FILE FORMAT
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
974
src/stegasoo/dct_steganography.py_old
Normal file
974
src/stegasoo/dct_steganography.py_old
Normal file
@@ -0,0 +1,974 @@
|
||||
"""
|
||||
DCT Domain Steganography Module (v3.2.0)
|
||||
|
||||
Embeds data in DCT coefficients with two approaches:
|
||||
1. PNG output: Scipy-based DCT transform (grayscale or color)
|
||||
2. JPEG output: jpegio-based coefficient manipulation (if available)
|
||||
|
||||
The JPEG approach is the "correct" way to do JPEG steganography because
|
||||
it directly modifies the already-quantized coefficients without re-encoding.
|
||||
|
||||
Changes in v3.0.2:
|
||||
- jpegio integration for proper JPEG coefficient embedding
|
||||
- Falls back to warning if jpegio not available for JPEG output
|
||||
- Maintains backward compatibility with v3.0.1
|
||||
|
||||
Changes in v3.2.0:
|
||||
- Fixed color-mode extraction to properly extract from Y channel
|
||||
- Added _extract_from_y_channel() for accurate color-mode extraction
|
||||
- Improved extraction robustness for both grayscale and color modes
|
||||
|
||||
Requires: scipy (for PNG mode), optionally jpegio (for JPEG mode)
|
||||
"""
|
||||
|
||||
import io
|
||||
import struct
|
||||
import hashlib
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Literal, Tuple
|
||||
from enum import Enum
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
# Check for scipy availability (for PNG/DCT mode)
|
||||
try:
|
||||
from scipy.fftpack import dct, idct
|
||||
HAS_SCIPY = True
|
||||
except ImportError:
|
||||
HAS_SCIPY = False
|
||||
dct = None
|
||||
idct = None
|
||||
|
||||
# Check for jpegio availability (for proper JPEG mode)
|
||||
try:
|
||||
import jpegio as jio
|
||||
HAS_JPEGIO = True
|
||||
except ImportError:
|
||||
HAS_JPEGIO = False
|
||||
jio = None
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CONSTANTS
|
||||
# ============================================================================
|
||||
|
||||
# DCT block size (standard 8x8 like JPEG)
|
||||
BLOCK_SIZE = 8
|
||||
|
||||
# Coefficients to use for embedding (mid-frequency, zig-zag order positions)
|
||||
EMBED_POSITIONS = [
|
||||
(0, 1), (1, 0), (2, 0), (1, 1), (0, 2), (0, 3), (1, 2), (2, 1), (3, 0),
|
||||
(4, 0), (3, 1), (2, 2), (1, 3), (0, 4), (0, 5), (1, 4), (2, 3), (3, 2),
|
||||
(4, 1), (5, 0), (5, 1), (4, 2), (3, 3), (2, 4), (1, 5), (0, 6), (0, 7),
|
||||
(1, 6), (2, 5), (3, 4), (4, 3), (5, 2), (6, 1), (7, 0),
|
||||
]
|
||||
|
||||
# Use subset of mid-frequency coefficients for better robustness
|
||||
DEFAULT_EMBED_POSITIONS = EMBED_POSITIONS[4:20] # 16 coefficients per block
|
||||
|
||||
# Quantization step for QIM embedding (larger = more robust, more visible)
|
||||
QUANT_STEP = 25
|
||||
|
||||
# Magic bytes for DCT stego identification
|
||||
DCT_MAGIC = b'DCTS'
|
||||
|
||||
# Header size: magic(4) + version(1) + flags(1) + length(4) = 10 bytes
|
||||
HEADER_SIZE = 10
|
||||
|
||||
# Output format options
|
||||
OUTPUT_FORMAT_PNG = 'png'
|
||||
OUTPUT_FORMAT_JPEG = 'jpeg'
|
||||
|
||||
# JPEG output quality (only for fallback mode, not jpegio)
|
||||
JPEG_OUTPUT_QUALITY = 95
|
||||
|
||||
# jpegio constants for JPEG coefficient embedding
|
||||
JPEGIO_MAGIC = b'JPGS'
|
||||
JPEGIO_MIN_COEF_MAGNITUDE = 2
|
||||
JPEGIO_EMBED_CHANNEL = 0 # Y channel
|
||||
|
||||
# Flag bits for header
|
||||
FLAG_COLOR_MODE = 0x01 # Set if embedded in color mode (Y channel of YCbCr)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# DATA CLASSES
|
||||
# ============================================================================
|
||||
|
||||
class DCTOutputFormat(Enum):
|
||||
"""Output format for DCT stego images."""
|
||||
PNG = 'png'
|
||||
JPEG = 'jpeg'
|
||||
|
||||
|
||||
@dataclass
|
||||
class DCTEmbedStats:
|
||||
"""Statistics from DCT embedding operation."""
|
||||
blocks_used: int
|
||||
blocks_available: int
|
||||
bits_embedded: int
|
||||
capacity_bits: int
|
||||
usage_percent: float
|
||||
image_width: int
|
||||
image_height: int
|
||||
output_format: str
|
||||
jpeg_native: bool = False # True if used jpegio for proper JPEG embedding
|
||||
color_mode: str = 'grayscale' # 'color' or 'grayscale' (v3.0.1+)
|
||||
|
||||
|
||||
@dataclass
|
||||
class DCTCapacityInfo:
|
||||
"""Capacity information for a carrier image."""
|
||||
width: int
|
||||
height: int
|
||||
blocks_x: int
|
||||
blocks_y: int
|
||||
total_blocks: int
|
||||
bits_per_block: int
|
||||
total_capacity_bits: int
|
||||
total_capacity_bytes: int
|
||||
usable_capacity_bytes: int
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# AVAILABILITY CHECKS
|
||||
# ============================================================================
|
||||
|
||||
def _check_scipy():
|
||||
"""Raise ImportError if scipy is not available."""
|
||||
if not HAS_SCIPY:
|
||||
raise ImportError(
|
||||
"DCT steganography requires scipy. "
|
||||
"Install with: pip install scipy"
|
||||
)
|
||||
|
||||
|
||||
def has_dct_support() -> bool:
|
||||
"""Check if DCT steganography is available (scipy installed)."""
|
||||
return HAS_SCIPY
|
||||
|
||||
|
||||
def has_jpegio_support() -> bool:
|
||||
"""Check if jpegio is available for proper JPEG coefficient embedding."""
|
||||
return HAS_JPEGIO
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# SCIPY DCT HELPERS (for PNG output)
|
||||
# ============================================================================
|
||||
|
||||
def _dct2(block: np.ndarray) -> np.ndarray:
|
||||
"""Apply 2D DCT to a block."""
|
||||
return dct(dct(block.T, norm='ortho').T, norm='ortho')
|
||||
|
||||
|
||||
def _idct2(block: np.ndarray) -> np.ndarray:
|
||||
"""Apply 2D inverse DCT to a block."""
|
||||
return idct(idct(block.T, norm='ortho').T, norm='ortho')
|
||||
|
||||
|
||||
def _to_grayscale(image_data: bytes) -> np.ndarray:
|
||||
"""Convert image bytes to grayscale numpy array."""
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
gray = img.convert('L')
|
||||
return np.array(gray, dtype=np.float64)
|
||||
|
||||
|
||||
def _extract_y_channel(image_data: bytes) -> np.ndarray:
|
||||
"""
|
||||
Extract Y (luminance) channel from image for color-mode extraction.
|
||||
|
||||
This uses the same YCbCr conversion as embedding to ensure
|
||||
accurate extraction from color-mode stego images.
|
||||
|
||||
Args:
|
||||
image_data: Image file bytes
|
||||
|
||||
Returns:
|
||||
Y channel as float64 numpy array
|
||||
"""
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
|
||||
# Convert to RGB if needed
|
||||
if img.mode != 'RGB':
|
||||
img = img.convert('RGB')
|
||||
|
||||
rgb_array = np.array(img, dtype=np.float64)
|
||||
|
||||
# Extract Y channel using ITU-R BT.601 (same as embedding)
|
||||
R = rgb_array[:, :, 0]
|
||||
G = rgb_array[:, :, 1]
|
||||
B = rgb_array[:, :, 2]
|
||||
|
||||
Y = 0.299 * R + 0.587 * G + 0.114 * B
|
||||
|
||||
return Y
|
||||
|
||||
|
||||
def _pad_to_blocks(image: np.ndarray) -> Tuple[np.ndarray, Tuple[int, int]]:
|
||||
"""Pad image dimensions to be divisible by block size."""
|
||||
h, w = image.shape
|
||||
new_h = ((h + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE
|
||||
new_w = ((w + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE
|
||||
|
||||
if new_h == h and new_w == w:
|
||||
return image, (h, w)
|
||||
|
||||
padded = np.zeros((new_h, new_w), dtype=image.dtype)
|
||||
padded[:h, :w] = image
|
||||
|
||||
if new_h > h:
|
||||
padded[h:, :w] = image[h-(new_h-h):h, :w][::-1, :]
|
||||
if new_w > w:
|
||||
padded[:h, w:] = image[:h, w-(new_w-w):w][:, ::-1]
|
||||
if new_h > h and new_w > w:
|
||||
padded[h:, w:] = image[h-(new_h-h):h, w-(new_w-w):w][::-1, ::-1]
|
||||
|
||||
return padded, (h, w)
|
||||
|
||||
|
||||
def _unpad_image(image: np.ndarray, original_size: Tuple[int, int]) -> np.ndarray:
|
||||
"""Remove padding from image."""
|
||||
h, w = original_size
|
||||
return image[:h, :w]
|
||||
|
||||
|
||||
def _embed_bit_in_coeff(coef: float, bit: int, quant_step: int = QUANT_STEP) -> float:
|
||||
"""Embed a single bit into a DCT coefficient using QIM."""
|
||||
quantized = round(coef / quant_step)
|
||||
if (quantized % 2) != bit:
|
||||
if quantized % 2 == 0 and bit == 1:
|
||||
quantized += 1 if coef >= quantized * quant_step else -1
|
||||
elif quantized % 2 == 1 and bit == 0:
|
||||
quantized += 1 if coef >= quantized * quant_step else -1
|
||||
return quantized * quant_step
|
||||
|
||||
|
||||
def _extract_bit_from_coeff(coef: float, quant_step: int = QUANT_STEP) -> int:
|
||||
"""Extract a single bit from a DCT coefficient."""
|
||||
quantized = round(coef / quant_step)
|
||||
return quantized % 2
|
||||
|
||||
|
||||
def _generate_block_order(num_blocks: int, seed: bytes) -> list:
|
||||
"""Generate pseudo-random block order from seed."""
|
||||
hash_bytes = hashlib.sha256(seed).digest()
|
||||
rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], 'big'))
|
||||
order = list(range(num_blocks))
|
||||
rng.shuffle(order)
|
||||
return order
|
||||
|
||||
|
||||
def _save_stego_image(image: np.ndarray, output_format: str = OUTPUT_FORMAT_PNG) -> bytes:
|
||||
"""Save stego image in specified format (grayscale)."""
|
||||
clipped = np.clip(image, 0, 255).astype(np.uint8)
|
||||
img = Image.fromarray(clipped, mode='L')
|
||||
|
||||
buffer = io.BytesIO()
|
||||
|
||||
if output_format == OUTPUT_FORMAT_JPEG:
|
||||
img.save(buffer, format='JPEG', quality=JPEG_OUTPUT_QUALITY,
|
||||
subsampling=0, optimize=True)
|
||||
else:
|
||||
img.save(buffer, format='PNG', optimize=True)
|
||||
|
||||
return buffer.getvalue()
|
||||
|
||||
|
||||
def _save_color_image(rgb_array: np.ndarray, output_format: str = OUTPUT_FORMAT_PNG) -> bytes:
|
||||
"""Save color RGB image in specified format."""
|
||||
clipped = np.clip(rgb_array, 0, 255).astype(np.uint8)
|
||||
img = Image.fromarray(clipped, mode='RGB')
|
||||
|
||||
buffer = io.BytesIO()
|
||||
|
||||
if output_format == OUTPUT_FORMAT_JPEG:
|
||||
img.save(buffer, format='JPEG', quality=JPEG_OUTPUT_QUALITY,
|
||||
subsampling=0, optimize=True)
|
||||
else:
|
||||
img.save(buffer, format='PNG', optimize=True)
|
||||
|
||||
return buffer.getvalue()
|
||||
|
||||
|
||||
def _rgb_to_ycbcr(rgb: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
|
||||
"""
|
||||
Convert RGB array to YCbCr components.
|
||||
|
||||
Uses ITU-R BT.601 conversion (standard for JPEG).
|
||||
|
||||
Args:
|
||||
rgb: RGB image array (H, W, 3), float64
|
||||
|
||||
Returns:
|
||||
Tuple of (Y, Cb, Cr) arrays
|
||||
"""
|
||||
R = rgb[:, :, 0]
|
||||
G = rgb[:, :, 1]
|
||||
B = rgb[:, :, 2]
|
||||
|
||||
# ITU-R BT.601 conversion
|
||||
Y = 0.299 * R + 0.587 * G + 0.114 * B
|
||||
Cb = 128 - 0.168736 * R - 0.331264 * G + 0.5 * B
|
||||
Cr = 128 + 0.5 * R - 0.418688 * G - 0.081312 * B
|
||||
|
||||
return Y, Cb, Cr
|
||||
|
||||
|
||||
def _ycbcr_to_rgb(Y: np.ndarray, Cb: np.ndarray, Cr: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Convert YCbCr components back to RGB array.
|
||||
|
||||
Args:
|
||||
Y: Luminance channel
|
||||
Cb: Blue-difference chroma
|
||||
Cr: Red-difference chroma
|
||||
|
||||
Returns:
|
||||
RGB array (H, W, 3)
|
||||
"""
|
||||
R = Y + 1.402 * (Cr - 128)
|
||||
G = Y - 0.344136 * (Cb - 128) - 0.714136 * (Cr - 128)
|
||||
B = Y + 1.772 * (Cb - 128)
|
||||
|
||||
rgb = np.stack([R, G, B], axis=-1)
|
||||
return rgb
|
||||
|
||||
|
||||
def _create_header(data_length: int, flags: int = 0) -> bytes:
|
||||
"""Create DCT stego header."""
|
||||
version = 1
|
||||
return struct.pack('>4sBBI', DCT_MAGIC, version, flags, data_length)
|
||||
|
||||
|
||||
def _parse_header(header_bits: list) -> Tuple[int, int, int]:
|
||||
"""Parse header from extracted bits. Returns (version, flags, data_length)."""
|
||||
if len(header_bits) < HEADER_SIZE * 8:
|
||||
raise ValueError("Insufficient header data")
|
||||
|
||||
header_bytes = bytes([
|
||||
sum(header_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8))
|
||||
for i in range(HEADER_SIZE)
|
||||
])
|
||||
|
||||
magic, version, flags, length = struct.unpack('>4sBBI', header_bytes)
|
||||
|
||||
if magic != DCT_MAGIC:
|
||||
raise ValueError("Invalid DCT stego magic bytes")
|
||||
|
||||
return version, flags, length
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# JPEGIO HELPERS (for proper JPEG output)
|
||||
# ============================================================================
|
||||
|
||||
def _jpegio_bytes_to_file(data: bytes, suffix: str = '.jpg') -> str:
|
||||
"""Write bytes to temp file for jpegio."""
|
||||
import tempfile
|
||||
import os
|
||||
fd, path = tempfile.mkstemp(suffix=suffix)
|
||||
try:
|
||||
os.write(fd, data)
|
||||
finally:
|
||||
os.close(fd)
|
||||
return path
|
||||
|
||||
|
||||
def _jpegio_file_to_bytes(path: str) -> bytes:
|
||||
"""Read file to bytes and delete it."""
|
||||
import os
|
||||
try:
|
||||
with open(path, 'rb') as f:
|
||||
return f.read()
|
||||
finally:
|
||||
try:
|
||||
os.unlink(path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def _jpegio_get_usable_positions(coef_array: np.ndarray) -> list:
|
||||
"""Get usable coefficient positions for jpegio embedding."""
|
||||
positions = []
|
||||
h, w = coef_array.shape
|
||||
|
||||
for row in range(h):
|
||||
for col in range(w):
|
||||
# Skip DC coefficients
|
||||
if (row % BLOCK_SIZE == 0) and (col % BLOCK_SIZE == 0):
|
||||
continue
|
||||
# Check magnitude
|
||||
if abs(coef_array[row, col]) >= JPEGIO_MIN_COEF_MAGNITUDE:
|
||||
positions.append((row, col))
|
||||
|
||||
return positions
|
||||
|
||||
|
||||
def _jpegio_generate_order(num_positions: int, seed: bytes) -> list:
|
||||
"""Generate pseudo-random order for jpegio embedding."""
|
||||
hash_bytes = hashlib.sha256(seed + b"jpeg_coef_order").digest()
|
||||
rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], 'big'))
|
||||
order = list(range(num_positions))
|
||||
rng.shuffle(order)
|
||||
return order
|
||||
|
||||
|
||||
def _jpegio_create_header(data_length: int, flags: int = 0) -> bytes:
|
||||
"""Create header for jpegio embedding."""
|
||||
return struct.pack('>4sBBI', JPEGIO_MAGIC, 1, flags, data_length)
|
||||
|
||||
|
||||
def _jpegio_parse_header(header_bytes: bytes) -> Tuple[int, int, int]:
|
||||
"""Parse jpegio header."""
|
||||
if len(header_bytes) < HEADER_SIZE:
|
||||
raise ValueError("Insufficient header data")
|
||||
|
||||
magic, version, flags, length = struct.unpack('>4sBBI', header_bytes[:HEADER_SIZE])
|
||||
|
||||
if magic != JPEGIO_MAGIC:
|
||||
raise ValueError(f"Invalid JPEG stego magic: {magic}")
|
||||
|
||||
return version, flags, length
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# PUBLIC API
|
||||
# ============================================================================
|
||||
|
||||
def calculate_dct_capacity(image_data: bytes) -> DCTCapacityInfo:
|
||||
"""
|
||||
Calculate the DCT embedding capacity of an image.
|
||||
|
||||
Args:
|
||||
image_data: Image file bytes
|
||||
|
||||
Returns:
|
||||
DCTCapacityInfo with capacity details
|
||||
"""
|
||||
_check_scipy()
|
||||
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
width, height = img.size
|
||||
|
||||
blocks_x = width // BLOCK_SIZE
|
||||
blocks_y = height // BLOCK_SIZE
|
||||
total_blocks = blocks_x * blocks_y
|
||||
|
||||
bits_per_block = len(DEFAULT_EMBED_POSITIONS)
|
||||
total_bits = total_blocks * bits_per_block
|
||||
total_bytes = total_bits // 8
|
||||
usable_bytes = max(0, total_bytes - HEADER_SIZE)
|
||||
|
||||
return DCTCapacityInfo(
|
||||
width=width,
|
||||
height=height,
|
||||
blocks_x=blocks_x,
|
||||
blocks_y=blocks_y,
|
||||
total_blocks=total_blocks,
|
||||
bits_per_block=bits_per_block,
|
||||
total_capacity_bits=total_bits,
|
||||
total_capacity_bytes=total_bytes,
|
||||
usable_capacity_bytes=usable_bytes
|
||||
)
|
||||
|
||||
|
||||
def will_fit_dct(data_length: int, image_data: bytes) -> bool:
|
||||
"""Check if data will fit in the image using DCT embedding."""
|
||||
capacity = calculate_dct_capacity(image_data)
|
||||
return data_length <= capacity.usable_capacity_bytes
|
||||
|
||||
|
||||
def estimate_capacity_comparison(image_data: bytes) -> dict:
|
||||
"""Compare LSB and DCT capacity for an image."""
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
width, height = img.size
|
||||
pixels = width * height
|
||||
|
||||
lsb_bytes = (pixels * 3) // 8
|
||||
|
||||
if HAS_SCIPY:
|
||||
dct_info = calculate_dct_capacity(image_data)
|
||||
dct_bytes = dct_info.usable_capacity_bytes
|
||||
else:
|
||||
blocks = (width // 8) * (height // 8)
|
||||
dct_bytes = (blocks * 16) // 8 - HEADER_SIZE
|
||||
|
||||
return {
|
||||
'width': width,
|
||||
'height': height,
|
||||
'lsb': {
|
||||
'capacity_bytes': lsb_bytes,
|
||||
'capacity_kb': lsb_bytes / 1024,
|
||||
'output': 'PNG/BMP (color)',
|
||||
},
|
||||
'dct': {
|
||||
'capacity_bytes': dct_bytes,
|
||||
'capacity_kb': dct_bytes / 1024,
|
||||
'output': 'PNG or JPEG (grayscale)',
|
||||
'ratio_vs_lsb': (dct_bytes / lsb_bytes * 100) if lsb_bytes > 0 else 0,
|
||||
'available': HAS_SCIPY,
|
||||
},
|
||||
'jpeg_native': {
|
||||
'available': HAS_JPEGIO,
|
||||
'note': 'Uses jpegio for proper JPEG coefficient embedding',
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def embed_in_dct(
|
||||
data: bytes,
|
||||
carrier_image: bytes,
|
||||
seed: bytes,
|
||||
output_format: str = OUTPUT_FORMAT_PNG,
|
||||
color_mode: str = 'color', # v3.0.1: 'color' or 'grayscale'
|
||||
) -> Tuple[bytes, DCTEmbedStats]:
|
||||
"""
|
||||
Embed data into image using DCT coefficient modification.
|
||||
|
||||
For PNG output: Uses scipy DCT transform
|
||||
For JPEG output: Uses jpegio if available for proper coefficient embedding
|
||||
|
||||
Args:
|
||||
data: Data to embed
|
||||
carrier_image: Carrier image bytes
|
||||
seed: Seed for pseudo-random selection
|
||||
output_format: 'png' (default, lossless) or 'jpeg'
|
||||
color_mode: 'color' (preserve colors) or 'grayscale' (v3.0.1+)
|
||||
|
||||
Returns:
|
||||
Tuple of (stego_image_bytes, stats)
|
||||
"""
|
||||
# Validate output format
|
||||
if output_format not in (OUTPUT_FORMAT_PNG, OUTPUT_FORMAT_JPEG):
|
||||
raise ValueError(f"Invalid output format: {output_format}")
|
||||
|
||||
# Validate color mode
|
||||
if color_mode not in ('color', 'grayscale'):
|
||||
color_mode = 'color' # Default to color
|
||||
|
||||
# For JPEG output, try to use jpegio for proper coefficient embedding
|
||||
# Note: jpegio naturally preserves color (works in YCbCr space)
|
||||
if output_format == OUTPUT_FORMAT_JPEG:
|
||||
if HAS_JPEGIO:
|
||||
return _embed_jpegio(data, carrier_image, seed, color_mode)
|
||||
else:
|
||||
# Fall back to scipy + PIL JPEG (WARNING: may not decode properly)
|
||||
import warnings
|
||||
warnings.warn(
|
||||
"jpegio not available. JPEG output may not decode correctly. "
|
||||
"Install jpegio for proper JPEG steganography support.",
|
||||
RuntimeWarning
|
||||
)
|
||||
# Continue with scipy method but output as JPEG
|
||||
|
||||
# PNG output or JPEG fallback: use scipy DCT method
|
||||
_check_scipy()
|
||||
return _embed_scipy_dct(data, carrier_image, seed, output_format, color_mode)
|
||||
|
||||
|
||||
def _embed_scipy_dct(
|
||||
data: bytes,
|
||||
carrier_image: bytes,
|
||||
seed: bytes,
|
||||
output_format: str,
|
||||
color_mode: str = 'color',
|
||||
) -> Tuple[bytes, DCTEmbedStats]:
|
||||
"""Embed using scipy DCT (for PNG output), with color preservation option."""
|
||||
capacity_info = calculate_dct_capacity(carrier_image)
|
||||
|
||||
if len(data) > capacity_info.usable_capacity_bytes:
|
||||
raise ValueError(
|
||||
f"Data too large ({len(data)} bytes) for carrier "
|
||||
f"(capacity: {capacity_info.usable_capacity_bytes} bytes)"
|
||||
)
|
||||
|
||||
# Load image
|
||||
img = Image.open(io.BytesIO(carrier_image))
|
||||
width, height = img.size
|
||||
|
||||
# Set flags for header
|
||||
flags = FLAG_COLOR_MODE if color_mode == 'color' else 0
|
||||
|
||||
if color_mode == 'color' and img.mode in ('RGB', 'RGBA'):
|
||||
# Color mode: convert to YCbCr, embed in Y only, preserve Cb/Cr
|
||||
if img.mode == 'RGBA':
|
||||
img = img.convert('RGB')
|
||||
|
||||
rgb_array = np.array(img, dtype=np.float64)
|
||||
Y, Cb, Cr = _rgb_to_ycbcr(rgb_array)
|
||||
|
||||
# Pad Y channel
|
||||
Y_padded, original_size = _pad_to_blocks(Y)
|
||||
|
||||
# Embed in Y channel (with color flag)
|
||||
Y_embedded = _embed_in_channel(Y_padded, data, seed, capacity_info, flags)
|
||||
|
||||
# Unpad
|
||||
Y_result = _unpad_image(Y_embedded, original_size)
|
||||
|
||||
# Convert back to RGB
|
||||
result_rgb = _ycbcr_to_rgb(Y_result, Cb, Cr)
|
||||
|
||||
# Save as color image
|
||||
stego_bytes = _save_color_image(result_rgb, output_format)
|
||||
else:
|
||||
# Grayscale mode: original behavior
|
||||
image = _to_grayscale(carrier_image)
|
||||
padded, original_size = _pad_to_blocks(image)
|
||||
|
||||
embedded = _embed_in_channel(padded, data, seed, capacity_info, flags)
|
||||
|
||||
result = _unpad_image(embedded, original_size)
|
||||
stego_bytes = _save_stego_image(result, output_format)
|
||||
|
||||
# Calculate stats
|
||||
header = _create_header(len(data), flags)
|
||||
payload = header + data
|
||||
bits = len(payload) * 8
|
||||
|
||||
stats = DCTEmbedStats(
|
||||
blocks_used=(bits + len(DEFAULT_EMBED_POSITIONS) - 1) // len(DEFAULT_EMBED_POSITIONS),
|
||||
blocks_available=capacity_info.total_blocks,
|
||||
bits_embedded=bits,
|
||||
capacity_bits=capacity_info.total_capacity_bits,
|
||||
usage_percent=(bits / capacity_info.total_capacity_bits) * 100,
|
||||
image_width=width,
|
||||
image_height=height,
|
||||
output_format=output_format,
|
||||
jpeg_native=False,
|
||||
color_mode=color_mode,
|
||||
)
|
||||
|
||||
return stego_bytes, stats
|
||||
|
||||
|
||||
def _embed_in_channel(
|
||||
channel: np.ndarray,
|
||||
data: bytes,
|
||||
seed: bytes,
|
||||
capacity_info: DCTCapacityInfo,
|
||||
flags: int = 0,
|
||||
) -> np.ndarray:
|
||||
"""Embed data in a single channel using DCT."""
|
||||
header = _create_header(len(data), flags)
|
||||
payload = header + data
|
||||
|
||||
bits = []
|
||||
for byte in payload:
|
||||
for i in range(7, -1, -1):
|
||||
bits.append((byte >> i) & 1)
|
||||
|
||||
num_blocks = capacity_info.total_blocks
|
||||
block_order = _generate_block_order(num_blocks, seed)
|
||||
|
||||
h, w = channel.shape
|
||||
result = channel.copy()
|
||||
|
||||
bit_idx = 0
|
||||
for block_num in block_order:
|
||||
if bit_idx >= len(bits):
|
||||
break
|
||||
|
||||
by = (block_num // (w // BLOCK_SIZE)) * BLOCK_SIZE
|
||||
bx = (block_num % (w // BLOCK_SIZE)) * BLOCK_SIZE
|
||||
|
||||
block = result[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE].copy()
|
||||
dct_block = _dct2(block)
|
||||
|
||||
for pos in DEFAULT_EMBED_POSITIONS:
|
||||
if bit_idx >= len(bits):
|
||||
break
|
||||
dct_block[pos] = _embed_bit_in_coeff(dct_block[pos], bits[bit_idx])
|
||||
bit_idx += 1
|
||||
|
||||
modified_block = _idct2(dct_block)
|
||||
result[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE] = modified_block
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _embed_jpegio(
|
||||
data: bytes,
|
||||
carrier_image: bytes,
|
||||
seed: bytes,
|
||||
color_mode: str = 'color',
|
||||
) -> Tuple[bytes, DCTEmbedStats]:
|
||||
"""
|
||||
Embed using jpegio for proper JPEG coefficient modification.
|
||||
|
||||
Note: jpegio naturally preserves color since JPEG stores YCbCr
|
||||
and we only modify Y channel coefficients.
|
||||
"""
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
# Check if carrier is JPEG - if not, convert it
|
||||
img = Image.open(io.BytesIO(carrier_image))
|
||||
width, height = img.size
|
||||
|
||||
if img.format != 'JPEG':
|
||||
# Convert to JPEG first
|
||||
buffer = io.BytesIO()
|
||||
if img.mode != 'RGB':
|
||||
img = img.convert('RGB')
|
||||
img.save(buffer, format='JPEG', quality=95, subsampling=0)
|
||||
carrier_image = buffer.getvalue()
|
||||
|
||||
# Write carrier to temp file
|
||||
input_path = _jpegio_bytes_to_file(carrier_image, suffix='.jpg')
|
||||
output_path = tempfile.mktemp(suffix='.jpg')
|
||||
|
||||
# Set flags
|
||||
flags = FLAG_COLOR_MODE if color_mode == 'color' else 0
|
||||
|
||||
try:
|
||||
# Read JPEG with jpegio
|
||||
jpeg = jio.read(input_path)
|
||||
|
||||
# Get Y channel coefficients (channel 0)
|
||||
coef_array = jpeg.coef_arrays[JPEGIO_EMBED_CHANNEL]
|
||||
|
||||
# Find usable positions
|
||||
all_positions = _jpegio_get_usable_positions(coef_array)
|
||||
|
||||
# Generate pseudo-random order
|
||||
order = _jpegio_generate_order(len(all_positions), seed)
|
||||
|
||||
# Create payload with flags
|
||||
header = _jpegio_create_header(len(data), flags)
|
||||
payload = header + data
|
||||
|
||||
# Convert to bits
|
||||
bits = []
|
||||
for byte in payload:
|
||||
for i in range(7, -1, -1):
|
||||
bits.append((byte >> i) & 1)
|
||||
|
||||
if len(bits) > len(all_positions):
|
||||
raise ValueError(
|
||||
f"Payload too large: {len(bits)} bits, "
|
||||
f"only {len(all_positions)} usable coefficients"
|
||||
)
|
||||
|
||||
# Embed using LSB
|
||||
coefs_used = 0
|
||||
for bit_idx, pos_idx in enumerate(order):
|
||||
if bit_idx >= len(bits):
|
||||
break
|
||||
|
||||
row, col = all_positions[pos_idx]
|
||||
coef = coef_array[row, col]
|
||||
|
||||
# Embed bit in LSB
|
||||
if (coef & 1) != bits[bit_idx]:
|
||||
if coef > 0:
|
||||
coef_array[row, col] = coef - 1 if (coef & 1) else coef + 1
|
||||
else:
|
||||
coef_array[row, col] = coef + 1 if (coef & 1) else coef - 1
|
||||
|
||||
coefs_used += 1
|
||||
|
||||
# Write modified JPEG
|
||||
jio.write(jpeg, output_path)
|
||||
|
||||
# Read back as bytes
|
||||
with open(output_path, 'rb') as f:
|
||||
stego_bytes = f.read()
|
||||
|
||||
stats = DCTEmbedStats(
|
||||
blocks_used=coefs_used // 63, # Approximate blocks
|
||||
blocks_available=len(all_positions) // 63,
|
||||
bits_embedded=len(bits),
|
||||
capacity_bits=len(all_positions),
|
||||
usage_percent=(len(bits) / len(all_positions)) * 100 if all_positions else 0,
|
||||
image_width=width,
|
||||
image_height=height,
|
||||
output_format=OUTPUT_FORMAT_JPEG,
|
||||
jpeg_native=True,
|
||||
color_mode=color_mode, # JPEG naturally preserves color
|
||||
)
|
||||
|
||||
return stego_bytes, stats
|
||||
|
||||
finally:
|
||||
for path in [input_path, output_path]:
|
||||
try:
|
||||
os.unlink(path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def extract_from_dct(
|
||||
stego_image: bytes,
|
||||
seed: bytes,
|
||||
) -> bytes:
|
||||
"""
|
||||
Extract data from DCT stego image.
|
||||
|
||||
Automatically detects whether image uses scipy DCT or jpegio embedding,
|
||||
and handles both grayscale and color modes.
|
||||
|
||||
Args:
|
||||
stego_image: Stego image bytes
|
||||
seed: Same seed used for embedding
|
||||
|
||||
Returns:
|
||||
Extracted data bytes
|
||||
"""
|
||||
# Check image format
|
||||
img = Image.open(io.BytesIO(stego_image))
|
||||
|
||||
if img.format == 'JPEG' and HAS_JPEGIO:
|
||||
# Try jpegio extraction first
|
||||
try:
|
||||
return _extract_jpegio(stego_image, seed)
|
||||
except ValueError:
|
||||
# If jpegio magic not found, fall back to scipy method
|
||||
pass
|
||||
|
||||
# PNG or fallback: use scipy DCT method
|
||||
_check_scipy()
|
||||
return _extract_scipy_dct(stego_image, seed)
|
||||
|
||||
|
||||
def _extract_scipy_dct(stego_image: bytes, seed: bytes) -> bytes:
|
||||
"""
|
||||
Extract using scipy DCT (for PNG images).
|
||||
|
||||
v3.2.0: Now properly handles both grayscale and color modes by
|
||||
first trying to detect the mode from header flags, then extracting
|
||||
from the appropriate channel.
|
||||
"""
|
||||
# First, try extracting from grayscale to get header and detect mode
|
||||
# This works because even color-mode images can be converted to grayscale
|
||||
# and the Y channel ≈ grayscale for extraction purposes
|
||||
|
||||
# Try Y channel extraction first (works for both color and grayscale)
|
||||
img = Image.open(io.BytesIO(stego_image))
|
||||
|
||||
if img.mode in ('RGB', 'RGBA'):
|
||||
# Extract from Y channel (more accurate for color-mode images)
|
||||
channel = _extract_y_channel(stego_image)
|
||||
else:
|
||||
# Grayscale image
|
||||
channel = _to_grayscale(stego_image)
|
||||
|
||||
padded, original_size = _pad_to_blocks(channel)
|
||||
|
||||
h, w = padded.shape
|
||||
blocks_x = w // BLOCK_SIZE
|
||||
blocks_y = h // BLOCK_SIZE
|
||||
num_blocks = blocks_x * blocks_y
|
||||
|
||||
block_order = _generate_block_order(num_blocks, seed)
|
||||
|
||||
all_bits = []
|
||||
|
||||
for block_num in block_order:
|
||||
by = (block_num // blocks_x) * BLOCK_SIZE
|
||||
bx = (block_num % blocks_x) * BLOCK_SIZE
|
||||
|
||||
block = padded[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE]
|
||||
dct_block = _dct2(block)
|
||||
|
||||
for pos in DEFAULT_EMBED_POSITIONS:
|
||||
bit = _extract_bit_from_coeff(dct_block[pos])
|
||||
all_bits.append(bit)
|
||||
|
||||
if len(all_bits) >= HEADER_SIZE * 8:
|
||||
try:
|
||||
_, flags, data_length = _parse_header(all_bits[:HEADER_SIZE * 8])
|
||||
total_needed = (HEADER_SIZE + data_length) * 8
|
||||
if len(all_bits) >= total_needed:
|
||||
break
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
version, flags, data_length = _parse_header(all_bits)
|
||||
|
||||
# Check if color mode flag is set (for informational purposes)
|
||||
is_color_mode = bool(flags & FLAG_COLOR_MODE)
|
||||
|
||||
data_bits = all_bits[HEADER_SIZE * 8:(HEADER_SIZE + data_length) * 8]
|
||||
|
||||
data = bytes([
|
||||
sum(data_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8))
|
||||
for i in range(data_length)
|
||||
])
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def _extract_jpegio(stego_image: bytes, seed: bytes) -> bytes:
|
||||
"""Extract using jpegio for JPEG images."""
|
||||
import os
|
||||
|
||||
temp_path = _jpegio_bytes_to_file(stego_image, suffix='.jpg')
|
||||
|
||||
try:
|
||||
jpeg = jio.read(temp_path)
|
||||
coef_array = jpeg.coef_arrays[JPEGIO_EMBED_CHANNEL]
|
||||
|
||||
all_positions = _jpegio_get_usable_positions(coef_array)
|
||||
order = _jpegio_generate_order(len(all_positions), seed)
|
||||
|
||||
# Extract header bits
|
||||
header_bits = []
|
||||
for pos_idx in order[:HEADER_SIZE * 8]:
|
||||
row, col = all_positions[pos_idx]
|
||||
coef = coef_array[row, col]
|
||||
header_bits.append(coef & 1)
|
||||
|
||||
header_bytes = bytes([
|
||||
sum(header_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8))
|
||||
for i in range(HEADER_SIZE)
|
||||
])
|
||||
|
||||
version, flags, data_length = _jpegio_parse_header(header_bytes)
|
||||
|
||||
# Extract all needed bits
|
||||
total_bits_needed = (HEADER_SIZE + data_length) * 8
|
||||
|
||||
all_bits = []
|
||||
for bit_idx, pos_idx in enumerate(order):
|
||||
if bit_idx >= total_bits_needed:
|
||||
break
|
||||
row, col = all_positions[pos_idx]
|
||||
coef = coef_array[row, col]
|
||||
all_bits.append(coef & 1)
|
||||
|
||||
# Extract data
|
||||
data_bits = all_bits[HEADER_SIZE * 8:]
|
||||
|
||||
data = bytes([
|
||||
sum(data_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8))
|
||||
for i in range(data_length)
|
||||
])
|
||||
|
||||
return data
|
||||
|
||||
finally:
|
||||
try:
|
||||
os.unlink(temp_path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CONVENIENCE FUNCTIONS
|
||||
# ============================================================================
|
||||
|
||||
def get_output_extension(output_format: str) -> str:
|
||||
"""Get file extension for output format."""
|
||||
if output_format == OUTPUT_FORMAT_JPEG:
|
||||
return '.jpg'
|
||||
return '.png'
|
||||
|
||||
|
||||
def get_output_mimetype(output_format: str) -> str:
|
||||
"""Get MIME type for output format."""
|
||||
if output_format == OUTPUT_FORMAT_JPEG:
|
||||
return 'image/jpeg'
|
||||
return 'image/png'
|
||||
@@ -234,15 +234,16 @@ def calculate_capacity(image_data: bytes, bits_per_channel: int = 1) -> int:
|
||||
f"bits_per_channel must be 1 or 2, got {bits_per_channel}")
|
||||
|
||||
img_file = Image.open(io.BytesIO(image_data))
|
||||
img = img_file.convert('RGB') if img_file.mode != 'RGB' else img_file
|
||||
|
||||
num_pixels = img.size[0] * img.size[1]
|
||||
bits_per_pixel = 3 * bits_per_channel
|
||||
max_bytes = (num_pixels * bits_per_pixel) // 8
|
||||
|
||||
capacity = max(0, max_bytes - ENCRYPTION_OVERHEAD)
|
||||
debug.print(f"LSB capacity: {capacity} bytes at {bits_per_channel} bit(s)/channel")
|
||||
return capacity
|
||||
try:
|
||||
num_pixels = img_file.size[0] * img_file.size[1]
|
||||
bits_per_pixel = 3 * bits_per_channel
|
||||
max_bytes = (num_pixels * bits_per_pixel) // 8
|
||||
|
||||
capacity = max(0, max_bytes - ENCRYPTION_OVERHEAD)
|
||||
debug.print(f"LSB capacity: {capacity} bytes at {bits_per_channel} bit(s)/channel")
|
||||
return capacity
|
||||
finally:
|
||||
img_file.close()
|
||||
|
||||
|
||||
def calculate_capacity_by_mode(
|
||||
@@ -279,7 +280,10 @@ def calculate_capacity_by_mode(
|
||||
else:
|
||||
capacity = calculate_capacity(image_data, bits_per_channel)
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
width, height = img.size
|
||||
try:
|
||||
width, height = img.size
|
||||
finally:
|
||||
img.close()
|
||||
|
||||
return {
|
||||
'mode': EMBED_MODE_LSB,
|
||||
@@ -378,7 +382,10 @@ def compare_modes(image_data: bytes) -> dict:
|
||||
Dict with comparison of LSB vs DCT modes
|
||||
"""
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
width, height = img.size
|
||||
try:
|
||||
width, height = img.size
|
||||
finally:
|
||||
img.close()
|
||||
|
||||
lsb_bytes = calculate_capacity(image_data, 1)
|
||||
|
||||
@@ -590,6 +597,10 @@ def _embed_lsb(
|
||||
debug.validate(len(pixel_key) == 32,
|
||||
f"Pixel key must be 32 bytes, got {len(pixel_key)}")
|
||||
|
||||
img_file = None
|
||||
img = None
|
||||
stego_img = None
|
||||
|
||||
try:
|
||||
img_file = Image.open(io.BytesIO(image_data))
|
||||
input_format = img_file.format
|
||||
@@ -690,6 +701,14 @@ def _embed_lsb(
|
||||
except Exception as e:
|
||||
debug.exception(e, "embed_lsb")
|
||||
raise EmbeddingError(f"Failed to embed data: {e}") from e
|
||||
finally:
|
||||
# Properly close all PIL Images to prevent memory leaks
|
||||
if stego_img is not None:
|
||||
stego_img.close()
|
||||
if img is not None and img is not img_file:
|
||||
img.close()
|
||||
if img_file is not None:
|
||||
img_file.close()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
@@ -768,6 +787,9 @@ def _extract_lsb(
|
||||
debug.validate(bits_per_channel in (1, 2),
|
||||
f"bits_per_channel must be 1 or 2, got {bits_per_channel}")
|
||||
|
||||
img_file = None
|
||||
img = None
|
||||
|
||||
try:
|
||||
img_file = Image.open(io.BytesIO(image_data))
|
||||
debug.print(f"Image: {img_file.size[0]}x{img_file.size[1]}, format: {img_file.format}")
|
||||
@@ -843,6 +865,12 @@ def _extract_lsb(
|
||||
except Exception as e:
|
||||
debug.exception(e, "extract_lsb")
|
||||
return None
|
||||
finally:
|
||||
# Properly close all PIL Images to prevent memory leaks
|
||||
if img is not None and img is not img_file:
|
||||
img.close()
|
||||
if img_file is not None:
|
||||
img_file.close()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
@@ -853,18 +881,24 @@ def get_image_dimensions(image_data: bytes) -> Tuple[int, int]:
|
||||
"""Get image dimensions without loading full image."""
|
||||
debug.validate(len(image_data) > 0, "Image data cannot be empty")
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
dimensions = img.size
|
||||
debug.print(f"Image dimensions: {dimensions[0]}x{dimensions[1]}")
|
||||
return dimensions
|
||||
try:
|
||||
dimensions = img.size
|
||||
debug.print(f"Image dimensions: {dimensions[0]}x{dimensions[1]}")
|
||||
return dimensions
|
||||
finally:
|
||||
img.close()
|
||||
|
||||
|
||||
def get_image_format(image_data: bytes) -> Optional[str]:
|
||||
"""Get image format (PIL format string like 'PNG', 'JPEG')."""
|
||||
try:
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
format_str = img.format
|
||||
debug.print(f"Image format: {format_str}")
|
||||
return format_str
|
||||
try:
|
||||
format_str = img.format
|
||||
debug.print(f"Image format: {format_str}")
|
||||
return format_str
|
||||
finally:
|
||||
img.close()
|
||||
except Exception as e:
|
||||
debug.print(f"Failed to get image format: {e}")
|
||||
return None
|
||||
|
||||
878
src/stegasoo/steganography.py_old
Normal file
878
src/stegasoo/steganography.py_old
Normal file
@@ -0,0 +1,878 @@
|
||||
"""
|
||||
Stegasoo Steganography Functions (v3.2.0)
|
||||
|
||||
LSB and DCT embedding modes with pseudo-random pixel/coefficient selection.
|
||||
|
||||
Changes in v3.0:
|
||||
- DCT domain embedding mode (requires scipy)
|
||||
- embed_mode parameter for encode/decode
|
||||
- Auto-detection of embedding mode
|
||||
- Comparison utilities
|
||||
|
||||
Changes in v3.0.1:
|
||||
- dct_output_format parameter for DCT mode ('png' or 'jpeg')
|
||||
- dct_color_mode parameter for DCT mode ('grayscale' or 'color')
|
||||
|
||||
Changes in v3.2.0:
|
||||
- Fixed HEADER_OVERHEAD constant (65 bytes, not 104 - date field removed)
|
||||
- Updated ENCRYPTION_OVERHEAD calculation
|
||||
"""
|
||||
|
||||
import io
|
||||
import struct
|
||||
from typing import Optional, Tuple, List, Union
|
||||
|
||||
from PIL import Image
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
from .models import EmbedStats, FilePayload
|
||||
from .exceptions import CapacityError, ExtractionError, EmbeddingError
|
||||
from .debug import debug
|
||||
from .constants import (
|
||||
EMBED_MODE_LSB,
|
||||
EMBED_MODE_DCT,
|
||||
EMBED_MODE_AUTO,
|
||||
VALID_EMBED_MODES,
|
||||
)
|
||||
|
||||
|
||||
# Lossless formats that preserve LSB data
|
||||
LOSSLESS_FORMATS = {'PNG', 'BMP', 'TIFF'}
|
||||
|
||||
# Format to extension mapping
|
||||
FORMAT_TO_EXT = {
|
||||
'PNG': 'png',
|
||||
'BMP': 'bmp',
|
||||
'TIFF': 'tiff',
|
||||
}
|
||||
|
||||
# Extension to PIL format mapping
|
||||
EXT_TO_FORMAT = {
|
||||
'png': 'PNG',
|
||||
'bmp': 'BMP',
|
||||
'tiff': 'TIFF',
|
||||
'tif': 'TIFF',
|
||||
}
|
||||
|
||||
# =============================================================================
|
||||
# OVERHEAD CONSTANTS (v3.2.0 - Updated for date-independent format)
|
||||
# =============================================================================
|
||||
# v3.2.0 Header format (no date field):
|
||||
# Magic: 4 bytes (\x89ST3)
|
||||
# Version: 1 byte (4 for v3.2.0)
|
||||
# Salt: 32 bytes
|
||||
# IV: 12 bytes
|
||||
# Tag: 16 bytes
|
||||
# -----------------
|
||||
# Total: 65 bytes
|
||||
#
|
||||
# Previous v3.1.0 had date field (10 bytes + 1 byte length) = 76 bytes header
|
||||
# The old value of 104 was incorrect even for v3.1.0
|
||||
|
||||
HEADER_OVERHEAD = 65 # v3.2.0: Magic + version + salt + iv + tag
|
||||
LENGTH_PREFIX = 4 # 4 bytes for payload length in LSB embedding
|
||||
ENCRYPTION_OVERHEAD = HEADER_OVERHEAD + LENGTH_PREFIX # 69 bytes total
|
||||
|
||||
# DCT output format options (v3.0.1)
|
||||
DCT_OUTPUT_PNG = 'png'
|
||||
DCT_OUTPUT_JPEG = 'jpeg'
|
||||
|
||||
# DCT color mode options (v3.0.1)
|
||||
DCT_COLOR_GRAYSCALE = 'grayscale'
|
||||
DCT_COLOR_COLOR = 'color'
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# DCT MODULE LAZY LOADING
|
||||
# =============================================================================
|
||||
|
||||
_dct_module = None
|
||||
|
||||
|
||||
def _get_dct_module():
|
||||
"""Lazy load DCT module to avoid scipy import if not needed."""
|
||||
global _dct_module
|
||||
if _dct_module is None:
|
||||
from . import dct_steganography
|
||||
_dct_module = dct_steganography
|
||||
return _dct_module
|
||||
|
||||
|
||||
def has_dct_support() -> bool:
|
||||
"""
|
||||
Check if DCT steganography mode is available.
|
||||
|
||||
Returns:
|
||||
True if scipy is installed and DCT functions work
|
||||
|
||||
Example:
|
||||
>>> if has_dct_support():
|
||||
... result = encode(..., embed_mode='dct')
|
||||
"""
|
||||
try:
|
||||
dct_mod = _get_dct_module()
|
||||
return dct_mod.has_dct_support()
|
||||
except ImportError:
|
||||
return False
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# FORMAT UTILITIES
|
||||
# =============================================================================
|
||||
|
||||
def get_output_format(input_format: Optional[str]) -> Tuple[str, str]:
|
||||
"""
|
||||
Determine the output format based on input format.
|
||||
|
||||
Args:
|
||||
input_format: PIL format string of input image (e.g., 'JPEG', 'PNG')
|
||||
|
||||
Returns:
|
||||
Tuple of (PIL format string, file extension) for output
|
||||
Falls back to PNG for lossy or unknown formats.
|
||||
"""
|
||||
debug.validate(input_format is None or isinstance(input_format, str),
|
||||
"Input format must be string or None")
|
||||
|
||||
if input_format and input_format.upper() in LOSSLESS_FORMATS:
|
||||
fmt = input_format.upper()
|
||||
ext = FORMAT_TO_EXT.get(fmt, 'png')
|
||||
debug.print(f"Using lossless format: {fmt} -> .{ext}")
|
||||
return fmt, ext
|
||||
|
||||
debug.print(f"Input format {input_format} is lossy or unknown, defaulting to PNG")
|
||||
return 'PNG', 'png'
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# CAPACITY FUNCTIONS
|
||||
# =============================================================================
|
||||
|
||||
def will_fit(
|
||||
payload: Union[str, bytes, FilePayload, int],
|
||||
carrier_image: bytes,
|
||||
bits_per_channel: int = 1,
|
||||
include_compression_estimate: bool = True,
|
||||
) -> dict:
|
||||
"""
|
||||
Check if a payload will fit in a carrier image (LSB mode).
|
||||
|
||||
Args:
|
||||
payload: Message string, raw bytes, FilePayload, or size in bytes
|
||||
carrier_image: Carrier image bytes
|
||||
bits_per_channel: Bits to use per color channel (1-2)
|
||||
include_compression_estimate: Estimate compressed size
|
||||
|
||||
Returns:
|
||||
Dict with fits, capacity, usage info
|
||||
"""
|
||||
# Determine payload size
|
||||
if isinstance(payload, int):
|
||||
payload_size = payload
|
||||
payload_data = None
|
||||
elif isinstance(payload, str):
|
||||
payload_data = payload.encode('utf-8')
|
||||
payload_size = len(payload_data)
|
||||
elif isinstance(payload, FilePayload):
|
||||
payload_data = payload.data
|
||||
filename_overhead = len(payload.filename.encode('utf-8')) if payload.filename else 0
|
||||
mime_overhead = len(payload.mime_type.encode('utf-8')) if payload.mime_type else 0
|
||||
payload_size = len(payload.data) + filename_overhead + mime_overhead + 5
|
||||
else:
|
||||
payload_data = payload
|
||||
payload_size = len(payload)
|
||||
|
||||
capacity = calculate_capacity(carrier_image, bits_per_channel)
|
||||
|
||||
# Estimate encrypted size with padding
|
||||
# Padding adds 64-319 bytes, rounded up to 256-byte boundary
|
||||
# Average case: ~190 bytes padding
|
||||
estimated_padding = 190
|
||||
estimated_encrypted_size = payload_size + estimated_padding + ENCRYPTION_OVERHEAD
|
||||
|
||||
compressed_estimate = None
|
||||
if include_compression_estimate and payload_data is not None and len(payload_data) >= 64:
|
||||
try:
|
||||
import zlib
|
||||
compressed = zlib.compress(payload_data, level=6)
|
||||
compressed_size = len(compressed) + 9 # Compression header
|
||||
if compressed_size < payload_size:
|
||||
compressed_estimate = compressed_size
|
||||
estimated_encrypted_size = compressed_size + estimated_padding + ENCRYPTION_OVERHEAD
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
headroom = capacity - estimated_encrypted_size
|
||||
fits = headroom >= 0
|
||||
usage_percent = (estimated_encrypted_size / capacity * 100) if capacity > 0 else 100.0
|
||||
|
||||
return {
|
||||
'fits': fits,
|
||||
'payload_size': payload_size,
|
||||
'estimated_encrypted_size': estimated_encrypted_size,
|
||||
'capacity': capacity,
|
||||
'usage_percent': min(usage_percent, 100.0),
|
||||
'headroom': headroom,
|
||||
'compressed_estimate': compressed_estimate,
|
||||
'mode': EMBED_MODE_LSB,
|
||||
}
|
||||
|
||||
|
||||
def calculate_capacity(image_data: bytes, bits_per_channel: int = 1) -> int:
|
||||
"""
|
||||
Calculate the maximum message capacity of an image (LSB mode).
|
||||
|
||||
Args:
|
||||
image_data: Image bytes
|
||||
bits_per_channel: Bits to use per color channel
|
||||
|
||||
Returns:
|
||||
Maximum bytes that can be embedded (minus overhead)
|
||||
"""
|
||||
debug.validate(bits_per_channel in (1, 2),
|
||||
f"bits_per_channel must be 1 or 2, got {bits_per_channel}")
|
||||
|
||||
img_file = Image.open(io.BytesIO(image_data))
|
||||
img = img_file.convert('RGB') if img_file.mode != 'RGB' else img_file
|
||||
|
||||
num_pixels = img.size[0] * img.size[1]
|
||||
bits_per_pixel = 3 * bits_per_channel
|
||||
max_bytes = (num_pixels * bits_per_pixel) // 8
|
||||
|
||||
capacity = max(0, max_bytes - ENCRYPTION_OVERHEAD)
|
||||
debug.print(f"LSB capacity: {capacity} bytes at {bits_per_channel} bit(s)/channel")
|
||||
return capacity
|
||||
|
||||
|
||||
def calculate_capacity_by_mode(
|
||||
image_data: bytes,
|
||||
embed_mode: str = EMBED_MODE_LSB,
|
||||
bits_per_channel: int = 1,
|
||||
) -> dict:
|
||||
"""
|
||||
Calculate capacity for specified embedding mode.
|
||||
|
||||
Args:
|
||||
image_data: Carrier image bytes
|
||||
embed_mode: 'lsb' or 'dct'
|
||||
bits_per_channel: Bits per channel for LSB mode
|
||||
|
||||
Returns:
|
||||
Dict with capacity information
|
||||
"""
|
||||
if embed_mode == EMBED_MODE_DCT:
|
||||
if not has_dct_support():
|
||||
raise ImportError("scipy required for DCT mode. Install: pip install scipy")
|
||||
|
||||
dct_mod = _get_dct_module()
|
||||
dct_info = dct_mod.calculate_dct_capacity(image_data)
|
||||
|
||||
return {
|
||||
'mode': EMBED_MODE_DCT,
|
||||
'capacity_bytes': dct_info.usable_capacity_bytes,
|
||||
'capacity_bits': dct_info.total_capacity_bits,
|
||||
'width': dct_info.width,
|
||||
'height': dct_info.height,
|
||||
'total_blocks': dct_info.total_blocks,
|
||||
}
|
||||
else:
|
||||
capacity = calculate_capacity(image_data, bits_per_channel)
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
width, height = img.size
|
||||
|
||||
return {
|
||||
'mode': EMBED_MODE_LSB,
|
||||
'capacity_bytes': capacity,
|
||||
'capacity_bits': capacity * 8,
|
||||
'width': width,
|
||||
'height': height,
|
||||
'bits_per_channel': bits_per_channel,
|
||||
}
|
||||
|
||||
|
||||
def will_fit_by_mode(
|
||||
payload: Union[str, bytes, FilePayload, int],
|
||||
carrier_image: bytes,
|
||||
embed_mode: str = EMBED_MODE_LSB,
|
||||
bits_per_channel: int = 1,
|
||||
) -> dict:
|
||||
"""
|
||||
Check if payload fits in specified mode.
|
||||
|
||||
Args:
|
||||
payload: Message, bytes, FilePayload, or size in bytes
|
||||
carrier_image: Carrier image bytes
|
||||
embed_mode: 'lsb' or 'dct'
|
||||
bits_per_channel: For LSB mode
|
||||
|
||||
Returns:
|
||||
Dict with fits, capacity, usage info
|
||||
"""
|
||||
if embed_mode == EMBED_MODE_DCT:
|
||||
if not has_dct_support():
|
||||
return {'fits': False, 'error': 'scipy not available', 'mode': EMBED_MODE_DCT}
|
||||
|
||||
if isinstance(payload, int):
|
||||
payload_size = payload
|
||||
elif isinstance(payload, str):
|
||||
payload_size = len(payload.encode('utf-8'))
|
||||
elif hasattr(payload, 'data'):
|
||||
payload_size = len(payload.data)
|
||||
else:
|
||||
payload_size = len(payload)
|
||||
|
||||
estimated_size = payload_size + ENCRYPTION_OVERHEAD + 190 # padding estimate
|
||||
|
||||
dct_mod = _get_dct_module()
|
||||
fits = dct_mod.will_fit_dct(estimated_size, carrier_image)
|
||||
capacity_info = dct_mod.calculate_dct_capacity(carrier_image)
|
||||
capacity = capacity_info.usable_capacity_bytes
|
||||
|
||||
usage_percent = (estimated_size / capacity * 100) if capacity > 0 else 100.0
|
||||
|
||||
return {
|
||||
'fits': fits,
|
||||
'payload_size': payload_size,
|
||||
'capacity': capacity,
|
||||
'usage_percent': min(usage_percent, 100.0),
|
||||
'headroom': capacity - estimated_size,
|
||||
'mode': EMBED_MODE_DCT,
|
||||
}
|
||||
else:
|
||||
return will_fit(payload, carrier_image, bits_per_channel)
|
||||
|
||||
|
||||
def get_available_modes() -> dict:
|
||||
"""
|
||||
Get available embedding modes and their status.
|
||||
|
||||
Returns:
|
||||
Dict mapping mode name to availability info
|
||||
"""
|
||||
return {
|
||||
EMBED_MODE_LSB: {
|
||||
'available': True,
|
||||
'name': 'Spatial LSB',
|
||||
'description': 'Embed in pixel LSBs, outputs PNG/BMP',
|
||||
'output_format': 'PNG (color)',
|
||||
},
|
||||
EMBED_MODE_DCT: {
|
||||
'available': has_dct_support(),
|
||||
'name': 'DCT Domain',
|
||||
'description': 'Embed in DCT coefficients, outputs grayscale PNG or JPEG',
|
||||
'output_formats': ['PNG (grayscale)', 'JPEG (grayscale)'],
|
||||
'requires': 'scipy',
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def compare_modes(image_data: bytes) -> dict:
|
||||
"""
|
||||
Compare embedding modes for a carrier image.
|
||||
|
||||
Args:
|
||||
image_data: Carrier image bytes
|
||||
|
||||
Returns:
|
||||
Dict with comparison of LSB vs DCT modes
|
||||
"""
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
width, height = img.size
|
||||
|
||||
lsb_bytes = calculate_capacity(image_data, 1)
|
||||
|
||||
if has_dct_support():
|
||||
dct_mod = _get_dct_module()
|
||||
dct_info = dct_mod.calculate_dct_capacity(image_data)
|
||||
dct_bytes = dct_info.usable_capacity_bytes
|
||||
dct_available = True
|
||||
else:
|
||||
safe_blocks = (height // 8) * (width // 8)
|
||||
dct_bytes = (safe_blocks * 16) // 8 # Estimated
|
||||
dct_available = False
|
||||
|
||||
return {
|
||||
'width': width,
|
||||
'height': height,
|
||||
'lsb': {
|
||||
'capacity_bytes': lsb_bytes,
|
||||
'capacity_kb': lsb_bytes / 1024,
|
||||
'available': True,
|
||||
'output': 'PNG (color)',
|
||||
},
|
||||
'dct': {
|
||||
'capacity_bytes': dct_bytes,
|
||||
'capacity_kb': dct_bytes / 1024,
|
||||
'available': dct_available,
|
||||
'output': 'PNG or JPEG (grayscale)',
|
||||
'ratio_vs_lsb': (dct_bytes / lsb_bytes * 100) if lsb_bytes > 0 else 0,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# PIXEL INDEX GENERATION
|
||||
# =============================================================================
|
||||
|
||||
@debug.time
|
||||
def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> List[int]:
|
||||
"""
|
||||
Generate pseudo-random pixel indices for embedding.
|
||||
|
||||
Uses ChaCha20 as a CSPRNG seeded by the key to deterministically
|
||||
select which pixels will hold hidden data.
|
||||
"""
|
||||
debug.validate(len(key) == 32, f"Pixel key must be 32 bytes, got {len(key)}")
|
||||
debug.validate(num_pixels > 0, f"Number of pixels must be positive, got {num_pixels}")
|
||||
debug.validate(num_needed > 0, f"Number needed must be positive, got {num_needed}")
|
||||
debug.validate(num_needed <= num_pixels,
|
||||
f"Cannot select {num_needed} pixels from {num_pixels} available")
|
||||
|
||||
debug.print(f"Generating {num_needed} pixel indices from {num_pixels} total pixels")
|
||||
|
||||
if num_needed >= num_pixels // 2:
|
||||
debug.print(f"Using full shuffle (needed {num_needed}/{num_pixels} pixels)")
|
||||
nonce = b'\x00' * 16
|
||||
cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend())
|
||||
encryptor = cipher.encryptor()
|
||||
|
||||
indices = list(range(num_pixels))
|
||||
random_bytes = encryptor.update(b'\x00' * (num_pixels * 4))
|
||||
|
||||
for i in range(num_pixels - 1, 0, -1):
|
||||
j_bytes = random_bytes[(num_pixels - 1 - i) * 4:(num_pixels - i) * 4]
|
||||
j = int.from_bytes(j_bytes, 'big') % (i + 1)
|
||||
indices[i], indices[j] = indices[j], indices[i]
|
||||
|
||||
selected = indices[:num_needed]
|
||||
debug.print(f"Generated {len(selected)} indices via shuffle")
|
||||
return selected
|
||||
|
||||
debug.print(f"Using optimized selection (needed {num_needed}/{num_pixels} pixels)")
|
||||
selected = []
|
||||
used = set()
|
||||
|
||||
nonce = b'\x00' * 16
|
||||
cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend())
|
||||
encryptor = cipher.encryptor()
|
||||
|
||||
bytes_needed = (num_needed * 2) * 4
|
||||
random_bytes = encryptor.update(b'\x00' * bytes_needed)
|
||||
|
||||
byte_offset = 0
|
||||
collisions = 0
|
||||
while len(selected) < num_needed and byte_offset < len(random_bytes) - 4:
|
||||
idx = int.from_bytes(random_bytes[byte_offset:byte_offset + 4], 'big') % num_pixels
|
||||
byte_offset += 4
|
||||
|
||||
if idx not in used:
|
||||
used.add(idx)
|
||||
selected.append(idx)
|
||||
else:
|
||||
collisions += 1
|
||||
|
||||
if len(selected) < num_needed:
|
||||
debug.print(f"Need {num_needed - len(selected)} more indices, generating...")
|
||||
extra_needed = num_needed - len(selected)
|
||||
for _ in range(extra_needed * 2):
|
||||
extra_bytes = encryptor.update(b'\x00' * 4)
|
||||
idx = int.from_bytes(extra_bytes, 'big') % num_pixels
|
||||
if idx not in used:
|
||||
used.add(idx)
|
||||
selected.append(idx)
|
||||
if len(selected) == num_needed:
|
||||
break
|
||||
|
||||
debug.print(f"Generated {len(selected)} indices with {collisions} collisions")
|
||||
debug.validate(len(selected) == num_needed,
|
||||
f"Failed to generate enough indices: {len(selected)}/{num_needed}")
|
||||
return selected
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# EMBEDDING FUNCTIONS
|
||||
# =============================================================================
|
||||
|
||||
@debug.time
|
||||
def embed_in_image(
|
||||
data: bytes,
|
||||
image_data: bytes,
|
||||
pixel_key: bytes,
|
||||
bits_per_channel: int = 1,
|
||||
output_format: Optional[str] = None,
|
||||
embed_mode: str = EMBED_MODE_LSB,
|
||||
dct_output_format: str = DCT_OUTPUT_PNG,
|
||||
dct_color_mode: str = 'grayscale',
|
||||
) -> Tuple[bytes, Union[EmbedStats, 'DCTEmbedStats'], str]:
|
||||
"""
|
||||
Embed data into an image using specified mode.
|
||||
|
||||
Args:
|
||||
data: Data to embed (encrypted payload)
|
||||
image_data: Carrier image bytes
|
||||
pixel_key: Key for pixel/coefficient selection
|
||||
bits_per_channel: Bits per channel (LSB mode only)
|
||||
output_format: Force output format (LSB mode only)
|
||||
embed_mode: 'lsb' (default) or 'dct'
|
||||
dct_output_format: For DCT mode - 'png' (lossless) or 'jpeg' (smaller)
|
||||
dct_color_mode: For DCT mode - 'grayscale' (default) or 'color' (preserves colors)
|
||||
|
||||
Returns:
|
||||
Tuple of (stego image bytes, stats, file extension)
|
||||
|
||||
Raises:
|
||||
CapacityError: If data won't fit
|
||||
EmbeddingError: If embedding fails
|
||||
ImportError: If DCT mode requested but scipy unavailable
|
||||
"""
|
||||
debug.print(f"embed_in_image: mode={embed_mode}, data={len(data)} bytes")
|
||||
debug.validate(embed_mode in VALID_EMBED_MODES,
|
||||
f"Invalid embed_mode: {embed_mode}. Use 'lsb' or 'dct'")
|
||||
|
||||
# DCT MODE
|
||||
if embed_mode == EMBED_MODE_DCT:
|
||||
if not has_dct_support():
|
||||
raise ImportError(
|
||||
"scipy is required for DCT embedding mode. "
|
||||
"Install with: pip install scipy"
|
||||
)
|
||||
|
||||
# Validate DCT output format
|
||||
if dct_output_format not in (DCT_OUTPUT_PNG, DCT_OUTPUT_JPEG):
|
||||
debug.print(f"Invalid dct_output_format '{dct_output_format}', defaulting to PNG")
|
||||
dct_output_format = DCT_OUTPUT_PNG
|
||||
|
||||
# Validate DCT color mode (v3.0.1)
|
||||
if dct_color_mode not in ('grayscale', 'color'):
|
||||
debug.print(f"Invalid dct_color_mode '{dct_color_mode}', defaulting to grayscale")
|
||||
dct_color_mode = 'grayscale'
|
||||
|
||||
dct_mod = _get_dct_module()
|
||||
|
||||
# Pass output_format and color_mode to DCT module (v3.0.1)
|
||||
stego_bytes, dct_stats = dct_mod.embed_in_dct(
|
||||
data,
|
||||
image_data,
|
||||
pixel_key,
|
||||
output_format=dct_output_format,
|
||||
color_mode=dct_color_mode,
|
||||
)
|
||||
|
||||
# Determine extension based on output format
|
||||
if dct_output_format == DCT_OUTPUT_JPEG:
|
||||
ext = 'jpg'
|
||||
else:
|
||||
ext = 'png'
|
||||
|
||||
debug.print(f"DCT embedding complete: {dct_output_format.upper()} output, "
|
||||
f"color_mode={dct_color_mode}, ext={ext}")
|
||||
return stego_bytes, dct_stats, ext
|
||||
|
||||
# LSB MODE
|
||||
return _embed_lsb(data, image_data, pixel_key, bits_per_channel, output_format)
|
||||
|
||||
|
||||
def _embed_lsb(
|
||||
data: bytes,
|
||||
image_data: bytes,
|
||||
pixel_key: bytes,
|
||||
bits_per_channel: int = 1,
|
||||
output_format: Optional[str] = None,
|
||||
) -> Tuple[bytes, EmbedStats, str]:
|
||||
"""
|
||||
Embed data using LSB steganography (internal implementation).
|
||||
"""
|
||||
debug.print(f"LSB embedding {len(data)} bytes into image")
|
||||
debug.data(pixel_key, "Pixel key for embedding")
|
||||
debug.validate(bits_per_channel in (1, 2),
|
||||
f"bits_per_channel must be 1 or 2, got {bits_per_channel}")
|
||||
debug.validate(len(pixel_key) == 32,
|
||||
f"Pixel key must be 32 bytes, got {len(pixel_key)}")
|
||||
|
||||
try:
|
||||
img_file = Image.open(io.BytesIO(image_data))
|
||||
input_format = img_file.format
|
||||
|
||||
debug.print(f"Carrier image: {img_file.size[0]}x{img_file.size[1]}, format: {input_format}")
|
||||
|
||||
img = img_file.convert('RGB') if img_file.mode != 'RGB' else img_file.copy()
|
||||
if img_file.mode != 'RGB':
|
||||
debug.print(f"Converting image from {img_file.mode} to RGB")
|
||||
|
||||
pixels = list(img.getdata())
|
||||
num_pixels = len(pixels)
|
||||
|
||||
bits_per_pixel = 3 * bits_per_channel
|
||||
max_bytes = (num_pixels * bits_per_pixel) // 8
|
||||
|
||||
debug.print(f"Image capacity: {max_bytes} bytes at {bits_per_channel} bit(s)/channel")
|
||||
|
||||
data_with_len = struct.pack('>I', len(data)) + data
|
||||
|
||||
if len(data_with_len) > max_bytes:
|
||||
debug.print(f"Capacity error: need {len(data_with_len)}, have {max_bytes}")
|
||||
raise CapacityError(len(data_with_len), max_bytes)
|
||||
|
||||
debug.print(f"Total data to embed: {len(data_with_len)} bytes "
|
||||
f"({len(data_with_len)/max_bytes*100:.1f}% of capacity)")
|
||||
|
||||
binary_data = ''.join(format(b, '08b') for b in data_with_len)
|
||||
pixels_needed = (len(binary_data) + bits_per_pixel - 1) // bits_per_pixel
|
||||
|
||||
debug.print(f"Need {pixels_needed} pixels to embed {len(binary_data)} bits")
|
||||
|
||||
selected_indices = generate_pixel_indices(pixel_key, num_pixels, pixels_needed)
|
||||
|
||||
new_pixels = list(pixels)
|
||||
clear_mask = 0xFF ^ ((1 << bits_per_channel) - 1)
|
||||
|
||||
bit_idx = 0
|
||||
modified_pixels = 0
|
||||
|
||||
for pixel_idx in selected_indices:
|
||||
if bit_idx >= len(binary_data):
|
||||
break
|
||||
|
||||
r, g, b = new_pixels[pixel_idx]
|
||||
modified = False
|
||||
|
||||
for channel_idx, channel_val in enumerate([r, g, b]):
|
||||
if bit_idx >= len(binary_data):
|
||||
break
|
||||
bits = binary_data[bit_idx:bit_idx + bits_per_channel].ljust(bits_per_channel, '0')
|
||||
new_val = (channel_val & clear_mask) | int(bits, 2)
|
||||
|
||||
if channel_val != new_val:
|
||||
modified = True
|
||||
if channel_idx == 0:
|
||||
r = new_val
|
||||
elif channel_idx == 1:
|
||||
g = new_val
|
||||
else:
|
||||
b = new_val
|
||||
|
||||
bit_idx += bits_per_channel
|
||||
|
||||
if modified:
|
||||
new_pixels[pixel_idx] = (r, g, b)
|
||||
modified_pixels += 1
|
||||
|
||||
debug.print(f"Modified {modified_pixels} pixels (out of {len(selected_indices)} selected)")
|
||||
|
||||
stego_img = Image.new('RGB', img.size)
|
||||
stego_img.putdata(new_pixels)
|
||||
|
||||
if output_format:
|
||||
out_fmt = output_format.upper()
|
||||
out_ext = FORMAT_TO_EXT.get(out_fmt, 'png')
|
||||
debug.print(f"Using forced output format: {out_fmt}")
|
||||
else:
|
||||
out_fmt, out_ext = get_output_format(input_format)
|
||||
debug.print(f"Auto-selected output format: {out_fmt}")
|
||||
|
||||
output = io.BytesIO()
|
||||
stego_img.save(output, out_fmt)
|
||||
output.seek(0)
|
||||
|
||||
stats = EmbedStats(
|
||||
pixels_modified=modified_pixels,
|
||||
total_pixels=num_pixels,
|
||||
capacity_used=len(data_with_len) / max_bytes,
|
||||
bytes_embedded=len(data_with_len)
|
||||
)
|
||||
|
||||
debug.print(f"LSB embedding complete: {out_fmt} image, {len(output.getvalue())} bytes")
|
||||
return output.getvalue(), stats, out_ext
|
||||
|
||||
except CapacityError:
|
||||
raise
|
||||
except Exception as e:
|
||||
debug.exception(e, "embed_lsb")
|
||||
raise EmbeddingError(f"Failed to embed data: {e}") from e
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# EXTRACTION FUNCTIONS
|
||||
# =============================================================================
|
||||
|
||||
@debug.time
|
||||
def extract_from_image(
|
||||
image_data: bytes,
|
||||
pixel_key: bytes,
|
||||
bits_per_channel: int = 1,
|
||||
embed_mode: str = EMBED_MODE_AUTO,
|
||||
) -> Optional[bytes]:
|
||||
"""
|
||||
Extract hidden data from a stego image.
|
||||
|
||||
Args:
|
||||
image_data: Stego image bytes
|
||||
pixel_key: Key for pixel/coefficient selection (must match encoding)
|
||||
bits_per_channel: Bits per channel (LSB mode only)
|
||||
embed_mode: 'auto' (try both), 'lsb', or 'dct'
|
||||
|
||||
Returns:
|
||||
Extracted data bytes, or None if extraction fails
|
||||
"""
|
||||
debug.print(f"extract_from_image: mode={embed_mode}")
|
||||
|
||||
# AUTO MODE: Try LSB first, then DCT
|
||||
if embed_mode == EMBED_MODE_AUTO:
|
||||
result = _extract_lsb(image_data, pixel_key, bits_per_channel)
|
||||
if result is not None:
|
||||
debug.print("Auto-detect: LSB extraction succeeded")
|
||||
return result
|
||||
|
||||
if has_dct_support():
|
||||
debug.print("Auto-detect: LSB failed, trying DCT")
|
||||
result = _extract_dct(image_data, pixel_key)
|
||||
if result is not None:
|
||||
debug.print("Auto-detect: DCT extraction succeeded")
|
||||
return result
|
||||
|
||||
debug.print("Auto-detect: All modes failed")
|
||||
return None
|
||||
|
||||
# EXPLICIT DCT MODE
|
||||
elif embed_mode == EMBED_MODE_DCT:
|
||||
if not has_dct_support():
|
||||
raise ImportError("scipy required for DCT mode")
|
||||
return _extract_dct(image_data, pixel_key)
|
||||
|
||||
# EXPLICIT LSB MODE
|
||||
else:
|
||||
return _extract_lsb(image_data, pixel_key, bits_per_channel)
|
||||
|
||||
|
||||
def _extract_dct(image_data: bytes, pixel_key: bytes) -> Optional[bytes]:
|
||||
"""Extract using DCT mode."""
|
||||
try:
|
||||
dct_mod = _get_dct_module()
|
||||
return dct_mod.extract_from_dct(image_data, pixel_key)
|
||||
except Exception as e:
|
||||
debug.print(f"DCT extraction failed: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def _extract_lsb(
|
||||
image_data: bytes,
|
||||
pixel_key: bytes,
|
||||
bits_per_channel: int = 1
|
||||
) -> Optional[bytes]:
|
||||
"""
|
||||
Extract using LSB mode (internal implementation).
|
||||
"""
|
||||
debug.print(f"LSB extracting from {len(image_data)} byte image")
|
||||
debug.data(pixel_key, "Pixel key for extraction")
|
||||
debug.validate(bits_per_channel in (1, 2),
|
||||
f"bits_per_channel must be 1 or 2, got {bits_per_channel}")
|
||||
|
||||
try:
|
||||
img_file = Image.open(io.BytesIO(image_data))
|
||||
debug.print(f"Image: {img_file.size[0]}x{img_file.size[1]}, format: {img_file.format}")
|
||||
|
||||
img = img_file.convert('RGB') if img_file.mode != 'RGB' else img_file.copy()
|
||||
if img_file.mode != 'RGB':
|
||||
debug.print(f"Converting image from {img_file.mode} to RGB")
|
||||
|
||||
pixels = list(img.getdata())
|
||||
num_pixels = len(pixels)
|
||||
bits_per_pixel = 3 * bits_per_channel
|
||||
|
||||
debug.print(f"Image has {num_pixels} pixels, {bits_per_pixel} bits/pixel")
|
||||
|
||||
initial_pixels = (32 + bits_per_pixel - 1) // bits_per_pixel + 10
|
||||
debug.print(f"Extracting initial {initial_pixels} pixels to find length")
|
||||
|
||||
initial_indices = generate_pixel_indices(pixel_key, num_pixels, initial_pixels)
|
||||
|
||||
binary_data = ''
|
||||
for pixel_idx in initial_indices:
|
||||
r, g, b = pixels[pixel_idx]
|
||||
for channel in [r, g, b]:
|
||||
for bit_pos in range(bits_per_channel - 1, -1, -1):
|
||||
binary_data += str((channel >> bit_pos) & 1)
|
||||
|
||||
try:
|
||||
length_bits = binary_data[:32]
|
||||
if len(length_bits) < 32:
|
||||
debug.print(f"Not enough bits for length: {len(length_bits)}/32")
|
||||
return None
|
||||
|
||||
data_length = struct.unpack('>I', int(length_bits, 2).to_bytes(4, 'big'))[0]
|
||||
debug.print(f"Extracted length: {data_length} bytes")
|
||||
except Exception as e:
|
||||
debug.print(f"Failed to parse length: {e}")
|
||||
return None
|
||||
|
||||
max_possible = (num_pixels * bits_per_pixel) // 8 - 4
|
||||
if data_length > max_possible or data_length < 10:
|
||||
debug.print(f"Invalid data length: {data_length} (max possible: {max_possible})")
|
||||
return None
|
||||
|
||||
total_bits = (4 + data_length) * 8
|
||||
pixels_needed = (total_bits + bits_per_pixel - 1) // bits_per_pixel
|
||||
|
||||
debug.print(f"Need {pixels_needed} pixels to extract {data_length} bytes")
|
||||
|
||||
selected_indices = generate_pixel_indices(pixel_key, num_pixels, pixels_needed)
|
||||
|
||||
binary_data = ''
|
||||
for pixel_idx in selected_indices:
|
||||
r, g, b = pixels[pixel_idx]
|
||||
for channel in [r, g, b]:
|
||||
for bit_pos in range(bits_per_channel - 1, -1, -1):
|
||||
binary_data += str((channel >> bit_pos) & 1)
|
||||
|
||||
data_bits = binary_data[32:32 + (data_length * 8)]
|
||||
|
||||
if len(data_bits) < data_length * 8:
|
||||
debug.print(f"Insufficient bits: {len(data_bits)} < {data_length * 8}")
|
||||
return None
|
||||
|
||||
data_bytes = bytearray()
|
||||
for i in range(0, len(data_bits), 8):
|
||||
byte_bits = data_bits[i:i + 8]
|
||||
if len(byte_bits) == 8:
|
||||
data_bytes.append(int(byte_bits, 2))
|
||||
|
||||
debug.print(f"LSB successfully extracted {len(data_bytes)} bytes")
|
||||
return bytes(data_bytes)
|
||||
|
||||
except Exception as e:
|
||||
debug.exception(e, "extract_lsb")
|
||||
return None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# UTILITY FUNCTIONS
|
||||
# =============================================================================
|
||||
|
||||
def get_image_dimensions(image_data: bytes) -> Tuple[int, int]:
|
||||
"""Get image dimensions without loading full image."""
|
||||
debug.validate(len(image_data) > 0, "Image data cannot be empty")
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
dimensions = img.size
|
||||
debug.print(f"Image dimensions: {dimensions[0]}x{dimensions[1]}")
|
||||
return dimensions
|
||||
|
||||
|
||||
def get_image_format(image_data: bytes) -> Optional[str]:
|
||||
"""Get image format (PIL format string like 'PNG', 'JPEG')."""
|
||||
try:
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
format_str = img.format
|
||||
debug.print(f"Image format: {format_str}")
|
||||
return format_str
|
||||
except Exception as e:
|
||||
debug.print(f"Failed to get image format: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def is_lossless_format(image_data: bytes) -> bool:
|
||||
"""Check if image is in a lossless format suitable for steganography."""
|
||||
fmt = get_image_format(image_data)
|
||||
is_lossless = fmt is not None and fmt.upper() in LOSSLESS_FORMATS
|
||||
debug.print(f"Image is lossless: {is_lossless} (format: {fmt})")
|
||||
return is_lossless
|
||||
Reference in New Issue
Block a user