Version 3.0.2: full experimental DCT support, jpegio for better JPEG manipulation, etc.

This commit is contained in:
Aaron D. Lee
2025-12-31 15:43:29 -05:00
parent 4eefc946c4
commit 34376b2dfe
19 changed files with 2954 additions and 2200 deletions

144
src/stegasoo/Dockerfile Normal file
View File

@@ -0,0 +1,144 @@
# Stegasoo Docker Image
# Multi-stage build for smaller image size.
# The base image is pinned by digest for reproducibility.
# To update: docker manifest inspect python:3.11-slim -v | jq -r '.[0].Descriptor.digest'
FROM python:3.11-slim@sha256:5501a4fe605abe24de87c2f3d6cf9fd760354416a0cad0296cf284fddcdca9e2 AS base

# PYTHONDONTWRITEBYTECODE / PYTHONUNBUFFERED: no .pyc files, unbuffered stdout/stderr.
# PIP_ROOT_USER_ACTION: suppress pip "running as root" warnings during build.
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_ROOT_USER_ACTION=ignore

# Install system dependencies (kept alphabetical for easier diffs).
# NOTE: libjpeg-dev is required for jpegio compilation.
# NOTE(review): every later stage is built FROM base, so the compiler toolchain
# installed here is also present in the runtime images - consider moving the
# build-only packages into the builder stage once runtime library needs are confirmed.
RUN apt-get update && apt-get install -y --no-install-recommends \
        gcc \
        libc-dev \
        libffi-dev \
        libjpeg-dev \
        libzbar0 \
    && rm -rf /var/lib/apt/lists/*
# ============================================================================
# Builder stage - install Python packages
# ============================================================================
FROM base AS builder
WORKDIR /build

# jpegio requires Cython and numpy to compile. Install these build dependencies
# BEFORE copying the project sources so this layer stays cached when only the
# application code changes.
RUN pip install --no-cache-dir cython numpy

# Copy package files (including README.md which pyproject.toml references)
COPY pyproject.toml README.md ./
COPY src/ src/
COPY data/ data/

# Install the package itself together with the web extras
RUN pip install --no-cache-dir ".[web]"
# ============================================================================
# Production stage - Web UI
# ============================================================================
FROM base AS web
WORKDIR /app

# Create the non-root user, the upload directory and the frontends parent
# directory up front. Only the (still empty) directories need a chown here;
# the application files below are copied with --chown, which avoids a second
# layer that would duplicate every copied file just to change its owner.
RUN useradd -m -u 1000 stego \
    && mkdir -p /app/frontends /tmp/stego_uploads \
    && chown stego:stego /app /app/frontends /tmp/stego_uploads

# Copy installed packages from builder
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin

# Copy application files, owned by the runtime user
COPY --chown=stego:stego src/ src/
COPY --chown=stego:stego data/ data/
COPY --chown=stego:stego frontends/web/ frontends/web/

USER stego

# Set Python path
ENV PYTHONPATH=/app/src

# Expose port (documentation only; publish with -p at run time)
EXPOSE 5000

# Health check: any failure to fetch the index page marks the container unhealthy
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:5000/')" || exit 1

# Run with gunicorn
WORKDIR /app/frontends/web
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "2", "--threads", "4", "--timeout", "60", "app:app"]
# ============================================================================
# API stage - REST API
# ============================================================================
FROM base AS api
WORKDIR /app

# Build dependencies for jpegio. Installed before the sources are copied so
# this layer stays cached when only the application code changes.
RUN pip install --no-cache-dir cython numpy

# Install API extras (includes DCT dependencies)
COPY pyproject.toml README.md ./
COPY src/ src/
COPY data/ data/
RUN pip install --no-cache-dir ".[api]"

# Copy API files
COPY frontends/api/ frontends/api/

# Create non-root user. The recursive chown is kept because the pip install
# above runs as root inside /app.
RUN useradd -m -u 1000 stego && chown -R stego:stego /app
USER stego

# Set Python path
ENV PYTHONPATH=/app/src

# Expose port (documentation only; publish with -p at run time)
EXPOSE 8000

# Health check: any failure to fetch the root URL marks the container unhealthy
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/')" || exit 1

# Run with uvicorn
WORKDIR /app/frontends/api
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# ============================================================================
# CLI stage - Command line tool
# ============================================================================
FROM base AS cli
WORKDIR /app

# Build dependencies for jpegio (needed by the dct extras). Installed before
# the sources are copied so this layer stays cached when only code changes.
RUN pip install --no-cache-dir cython numpy

# Install CLI extras
COPY pyproject.toml README.md ./
COPY src/ src/
COPY data/ data/
RUN pip install --no-cache-dir ".[cli,dct]"

# Copy CLI files
COPY frontends/cli/ frontends/cli/

# Create non-root user. The recursive chown is kept because the pip install
# above runs as root inside /app.
RUN useradd -m -u 1000 stego && chown -R stego:stego /app
USER stego

# Set Python path
ENV PYTHONPATH=/app/src

# Default to help; arguments given to `docker run` replace CMD
WORKDIR /app/frontends/cli
ENTRYPOINT ["python", "main.py"]
CMD ["--help"]

View File

@@ -315,6 +315,7 @@ def encode(
output_format = None, # Optional[str]
embed_mode: str = EMBED_MODE_LSB,
dct_output_format: str = "png", # NEW in v3.0.1: 'png' or 'jpeg'
dct_color_mode: str = "grayscale", # NEW in v3.0.1: 'grayscale' or 'color'
) -> EncodeResult:
"""
Encode a secret message or file into an image.
@@ -334,6 +335,7 @@ def encode(
output_format: Force output format ('PNG', 'BMP') - LSB mode only
embed_mode: Embedding mode - 'lsb' (default) or 'dct' (v3.0+)
dct_output_format: For DCT mode - 'png' (lossless) or 'jpeg' (smaller)
dct_color_mode: For DCT mode - 'grayscale' (default) or 'color' (preserves colors)
Returns:
EncodeResult with stego image and metadata
@@ -349,16 +351,18 @@ def encode(
# Default LSB mode
>>> result = encode(message="Secret", ...)
# DCT mode with PNG output (lossless)
# DCT mode with grayscale PNG output (default)
>>> result = encode(message="Secret", ..., embed_mode='dct')
# DCT mode with JPEG output (smaller, natural)
>>> result = encode(message="Secret", ..., embed_mode='dct', dct_output_format='jpeg')
# DCT mode with color JPEG output
>>> result = encode(message="Secret", ..., embed_mode='dct',
... dct_output_format='jpeg', dct_color_mode='color')
"""
# Debug logging
debug.print(f"encode called: message type={type(message).__name__}, "
f"day_phrase='{day_phrase[:20]}...', pin_length={len(pin)}, "
f"embed_mode={embed_mode}, dct_output_format={dct_output_format}")
f"embed_mode={embed_mode}, dct_output_format={dct_output_format}, "
f"dct_color_mode={dct_color_mode}")
# Validate embed_mode
if embed_mode not in (EMBED_MODE_LSB, EMBED_MODE_DCT):
@@ -375,6 +379,11 @@ def encode(
debug.print(f"Invalid dct_output_format '{dct_output_format}', defaulting to 'png'")
dct_output_format = 'png'
# Validate dct_color_mode (v3.0.1)
if dct_color_mode not in ('grayscale', 'color'):
debug.print(f"Invalid dct_color_mode '{dct_color_mode}', defaulting to 'grayscale'")
dct_color_mode = 'grayscale'
# Validate inputs
require_valid_payload(message)
require_valid_image(carrier_image, "Carrier image")
@@ -407,7 +416,7 @@ def encode(
debug.data(pixel_key, "Pixel key")
# Embed in image (returns extension too)
# CRITICAL: Pass dct_output_format to embed_in_image
# CRITICAL: Pass dct_output_format and dct_color_mode to embed_in_image
stego_data, stats, extension = embed_in_image(
encrypted,
carrier_image,
@@ -415,6 +424,7 @@ def encode(
output_format=output_format,
embed_mode=embed_mode,
dct_output_format=dct_output_format, # NEW in v3.0.1
dct_color_mode=dct_color_mode, # NEW in v3.0.1
)
# Generate filename with correct extension
@@ -468,6 +478,7 @@ def encode_file(
filename_override: Optional[str] = None,
embed_mode: str = EMBED_MODE_LSB,
dct_output_format: str = "png", # NEW in v3.0.1
dct_color_mode: str = "grayscale", # NEW in v3.0.1
) -> EncodeResult:
"""
Encode a file into an image.
@@ -487,12 +498,13 @@ def encode_file(
filename_override: Override the stored filename
embed_mode: 'lsb' (default) or 'dct' (v3.0+)
dct_output_format: For DCT mode - 'png' or 'jpeg' (v3.0.1+)
dct_color_mode: For DCT mode - 'grayscale' or 'color' (v3.0.1+)
Returns:
EncodeResult with stego image and metadata
"""
debug.print(f"encode_file called: filepath={filepath}, embed_mode={embed_mode}, "
f"dct_output_format={dct_output_format}")
f"dct_output_format={dct_output_format}, dct_color_mode={dct_color_mode}")
payload = FilePayload.from_file(str(filepath), filename_override)
return encode(
@@ -507,6 +519,7 @@ def encode_file(
output_format=output_format,
embed_mode=embed_mode,
dct_output_format=dct_output_format, # NEW in v3.0.1
dct_color_mode=dct_color_mode, # NEW in v3.0.1
)
@@ -528,6 +541,7 @@ def encode_bytes(
mime_type: Optional[str] = None,
embed_mode: str = EMBED_MODE_LSB,
dct_output_format: str = "png", # NEW in v3.0.1
dct_color_mode: str = "grayscale", # NEW in v3.0.1
) -> EncodeResult:
"""
Encode raw bytes with a filename into an image.
@@ -548,12 +562,14 @@ def encode_bytes(
mime_type: MIME type of the data
embed_mode: 'lsb' (default) or 'dct' (v3.0+)
dct_output_format: For DCT mode - 'png' or 'jpeg' (v3.0.1+)
dct_color_mode: For DCT mode - 'grayscale' or 'color' (v3.0.1+)
Returns:
EncodeResult with stego image and metadata
"""
debug.print(f"encode_bytes called: filename={filename}, data_size={len(data)}, "
f"embed_mode={embed_mode}, dct_output_format={dct_output_format}")
f"embed_mode={embed_mode}, dct_output_format={dct_output_format}, "
f"dct_color_mode={dct_color_mode}")
payload = FilePayload(data=data, filename=filename, mime_type=mime_type)
return encode(
@@ -568,6 +584,7 @@ def encode_bytes(
output_format=output_format,
embed_mode=embed_mode,
dct_output_format=dct_output_format, # NEW in v3.0.1
dct_color_mode=dct_color_mode, # NEW in v3.0.1
)

View File

@@ -1,29 +1,32 @@
"""
DCT Domain Steganography Module (v3.0.1)
DCT Domain Steganography Module (v3.0.2)
Embeds data in DCT coefficients of grayscale images.
Supports PNG (lossless) or JPEG (natural, smaller) output.
Embeds data in DCT coefficients with two approaches:
1. PNG output: Scipy-based DCT transform (grayscale or color)
2. JPEG output: jpegio-based coefficient manipulation (if available)
This provides an alternative to LSB embedding with different trade-offs:
- More resistant to visual inspection
- Survives some image processing
- Lower capacity (~20% of LSB)
- Works in frequency domain
The JPEG approach is the "correct" way to do JPEG steganography because
it directly modifies the already-quantized coefficients without re-encoding.
Requires: scipy (for DCT transforms)
New in v3.0.2:
- jpegio integration for proper JPEG coefficient embedding
- Falls back to warning if jpegio not available for JPEG output
- Maintains backward compatibility with v3.0.1
Requires: scipy (for PNG mode), optionally jpegio (for JPEG mode)
"""
import io
import struct
import hashlib
from dataclasses import dataclass
from typing import Optional, Literal
from typing import Optional, Literal, Tuple
from enum import Enum
import numpy as np
from PIL import Image
# Check for scipy availability
# Check for scipy availability (for PNG/DCT mode)
try:
from scipy.fftpack import dct, idct
HAS_SCIPY = True
@@ -32,6 +35,14 @@ except ImportError:
dct = None
idct = None
# Check for jpegio availability (for proper JPEG mode)
try:
import jpegio as jio
HAS_JPEGIO = True
except ImportError:
HAS_JPEGIO = False
jio = None
# ============================================================================
# CONSTANTS
@@ -41,8 +52,6 @@ except ImportError:
BLOCK_SIZE = 8
# Coefficients to use for embedding (mid-frequency, zig-zag order positions)
# Avoiding DC (0,0) and high-frequency edges
# These positions are relatively stable across JPEG compression
EMBED_POSITIONS = [
(0, 1), (1, 0), (2, 0), (1, 1), (0, 2), (0, 3), (1, 2), (2, 1), (3, 0),
(4, 0), (3, 1), (2, 2), (1, 3), (0, 4), (0, 5), (1, 4), (2, 3), (3, 2),
@@ -51,25 +60,29 @@ EMBED_POSITIONS = [
]
# Use subset of mid-frequency coefficients for better robustness
# Positions 4-20 in zig-zag order (skip very low and very high frequencies)
DEFAULT_EMBED_POSITIONS = EMBED_POSITIONS[4:20] # 16 coefficients per block
# Quantization step for embedding (larger = more robust, more visible)
# Quantization step for QIM embedding (larger = more robust, more visible)
QUANT_STEP = 25
# Magic bytes for DCT stego identification
DCT_MAGIC = b'DCTS'
# Header: magic(4) + version(1) + flags(1) + length(4) = 10 bytes
# Header size: magic(4) + version(1) + flags(1) + length(4) = 10 bytes
HEADER_SIZE = 10
# Output format options
OUTPUT_FORMAT_PNG = 'png'
OUTPUT_FORMAT_JPEG = 'jpeg'
# JPEG quality for output (high to preserve coefficients)
# JPEG output quality (only for fallback mode, not jpegio)
JPEG_OUTPUT_QUALITY = 95
# jpegio constants for JPEG coefficient embedding
JPEGIO_MAGIC = b'JPGS'
JPEGIO_MIN_COEF_MAGNITUDE = 2
JPEGIO_EMBED_CHANNEL = 0 # Y channel
# ============================================================================
# DATA CLASSES
@@ -91,7 +104,9 @@ class DCTEmbedStats:
usage_percent: float
image_width: int
image_height: int
output_format: str # 'png' or 'jpeg'
output_format: str
jpeg_native: bool = False # True if used jpegio for proper JPEG embedding
color_mode: str = 'grayscale' # 'color' or 'grayscale' (v3.0.1+)
@dataclass
@@ -105,11 +120,11 @@ class DCTCapacityInfo:
bits_per_block: int
total_capacity_bits: int
total_capacity_bytes: int
usable_capacity_bytes: int # After header overhead
usable_capacity_bytes: int
# ============================================================================
# HELPER FUNCTIONS
# AVAILABILITY CHECKS
# ============================================================================
def _check_scipy():
@@ -121,6 +136,20 @@ def _check_scipy():
)
def has_dct_support() -> bool:
    """Report whether DCT steganography can be used, i.e. the module-level HAS_SCIPY flag."""
    return HAS_SCIPY
def has_jpegio_support() -> bool:
    """Report whether jpegio was importable (HAS_JPEGIO), enabling native JPEG coefficient embedding."""
    return HAS_JPEGIO
# ============================================================================
# SCIPY DCT HELPERS (for PNG output)
# ============================================================================
def _dct2(block: np.ndarray) -> np.ndarray:
"""Apply 2D DCT to a block."""
return dct(dct(block.T, norm='ortho').T, norm='ortho')
@@ -138,7 +167,7 @@ def _to_grayscale(image_data: bytes) -> np.ndarray:
return np.array(gray, dtype=np.float64)
def _pad_to_blocks(image: np.ndarray) -> tuple[np.ndarray, tuple[int, int]]:
def _pad_to_blocks(image: np.ndarray) -> Tuple[np.ndarray, Tuple[int, int]]:
"""Pad image dimensions to be divisible by block size."""
h, w = image.shape
new_h = ((h + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE
@@ -150,7 +179,6 @@ def _pad_to_blocks(image: np.ndarray) -> tuple[np.ndarray, tuple[int, int]]:
padded = np.zeros((new_h, new_w), dtype=image.dtype)
padded[:h, :w] = image
# Mirror padding for smoother edges
if new_h > h:
padded[h:, :w] = image[h-(new_h-h):h, :w][::-1, :]
if new_w > w:
@@ -161,82 +189,125 @@ def _pad_to_blocks(image: np.ndarray) -> tuple[np.ndarray, tuple[int, int]]:
return padded, (h, w)
def _unpad_image(image: np.ndarray, original_size: tuple[int, int]) -> np.ndarray:
def _unpad_image(image: np.ndarray, original_size: Tuple[int, int]) -> np.ndarray:
"""Remove padding from image."""
h, w = original_size
return image[:h, :w]
def _embed_bit_in_coeff(coeff: float, bit: int, quant_step: int = QUANT_STEP) -> float:
def _embed_bit_in_coeff(coef: float, bit: int, quant_step: int = QUANT_STEP) -> float:
"""Embed a single bit into a DCT coefficient using QIM."""
# Quantization Index Modulation
quantized = round(coeff / quant_step)
quantized = round(coef / quant_step)
if (quantized % 2) != bit:
# Adjust to embed the bit
if quantized % 2 == 0 and bit == 1:
quantized += 1 if coeff >= quantized * quant_step else -1
quantized += 1 if coef >= quantized * quant_step else -1
elif quantized % 2 == 1 and bit == 0:
quantized += 1 if coeff >= quantized * quant_step else -1
quantized += 1 if coef >= quantized * quant_step else -1
return quantized * quant_step
def _extract_bit_from_coeff(coeff: float, quant_step: int = QUANT_STEP) -> int:
def _extract_bit_from_coeff(coef: float, quant_step: int = QUANT_STEP) -> int:
"""Extract a single bit from a DCT coefficient."""
quantized = round(coeff / quant_step)
quantized = round(coef / quant_step)
return quantized % 2
def _generate_block_order(num_blocks: int, seed: bytes) -> list[int]:
def _generate_block_order(num_blocks: int, seed: bytes) -> list:
"""Generate pseudo-random block order from seed."""
# Create deterministic RNG from seed
hash_bytes = hashlib.sha256(seed).digest()
rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], 'big'))
order = list(range(num_blocks))
rng.shuffle(order)
return order
def _save_stego_image(
image: np.ndarray,
output_format: str = OUTPUT_FORMAT_PNG
) -> bytes:
"""Save stego image in specified format."""
# Clip to valid range and convert to uint8
def _save_stego_image(image: np.ndarray, output_format: str = OUTPUT_FORMAT_PNG) -> bytes:
"""Save stego image in specified format (grayscale)."""
clipped = np.clip(image, 0, 255).astype(np.uint8)
img = Image.fromarray(clipped, mode='L')
buffer = io.BytesIO()
if output_format == OUTPUT_FORMAT_JPEG:
# High-quality JPEG with no chroma subsampling
img.save(
buffer,
format='JPEG',
quality=JPEG_OUTPUT_QUALITY,
subsampling=0, # 4:4:4 - no subsampling
optimize=True
)
img.save(buffer, format='JPEG', quality=JPEG_OUTPUT_QUALITY,
subsampling=0, optimize=True)
else:
# PNG (lossless, default)
img.save(buffer, format='PNG', optimize=True)
return buffer.getvalue()
def _save_color_image(rgb_array: np.ndarray, output_format: str = OUTPUT_FORMAT_PNG) -> bytes:
    """
    Serialize an RGB array to encoded image bytes.

    Values are clipped to 0..255 and cast to uint8 before the image is built.
    JPEG output uses JPEG_OUTPUT_QUALITY with subsampling disabled; any other
    output_format value is written as PNG.
    """
    pixels = np.clip(rgb_array, 0, 255).astype(np.uint8)
    picture = Image.fromarray(pixels, mode='RGB')

    if output_format == OUTPUT_FORMAT_JPEG:
        save_options = {
            'format': 'JPEG',
            'quality': JPEG_OUTPUT_QUALITY,
            'subsampling': 0,
            'optimize': True,
        }
    else:
        save_options = {'format': 'PNG', 'optimize': True}

    sink = io.BytesIO()
    picture.save(sink, **save_options)
    return sink.getvalue()
def _rgb_to_ycbcr(rgb: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Convert RGB array to YCbCr components.
Uses ITU-R BT.601 conversion (standard for JPEG).
Args:
rgb: RGB image array (H, W, 3), float64
Returns:
Tuple of (Y, Cb, Cr) arrays
"""
R = rgb[:, :, 0]
G = rgb[:, :, 1]
B = rgb[:, :, 2]
# ITU-R BT.601 conversion
Y = 0.299 * R + 0.587 * G + 0.114 * B
Cb = 128 - 0.168736 * R - 0.331264 * G + 0.5 * B
Cr = 128 + 0.5 * R - 0.418688 * G - 0.081312 * B
return Y, Cb, Cr
def _ycbcr_to_rgb(Y: np.ndarray, Cb: np.ndarray, Cr: np.ndarray) -> np.ndarray:
"""
Convert YCbCr components back to RGB array.
Args:
Y: Luminance channel
Cb: Blue-difference chroma
Cr: Red-difference chroma
Returns:
RGB array (H, W, 3)
"""
R = Y + 1.402 * (Cr - 128)
G = Y - 0.344136 * (Cb - 128) - 0.714136 * (Cr - 128)
B = Y + 1.772 * (Cb - 128)
rgb = np.stack([R, G, B], axis=-1)
return rgb
def _create_header(data_length: int, flags: int = 0) -> bytes:
    """
    Build the DCT stego header.

    Big-endian layout: MAGIC (4 bytes) + VERSION (1) + FLAGS (1) + LENGTH (4).
    """
    header_version = 1
    return struct.pack('>4sBBI', DCT_MAGIC, header_version, flags, data_length)
def _parse_header(header_bits: list[int]) -> tuple[int, int, int]:
def _parse_header(header_bits: list) -> Tuple[int, int, int]:
"""Parse header from extracted bits. Returns (version, flags, data_length)."""
if len(header_bits) < HEADER_SIZE * 8:
raise ValueError("Insufficient header data")
# Convert bits to bytes
header_bytes = bytes([
sum(header_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8))
for i in range(HEADER_SIZE)
@@ -245,7 +316,80 @@ def _parse_header(header_bits: list[int]) -> tuple[int, int, int]:
magic, version, flags, length = struct.unpack('>4sBBI', header_bytes)
if magic != DCT_MAGIC:
raise ValueError("Invalid DCT stego magic bytes - not a DCT stego image")
raise ValueError("Invalid DCT stego magic bytes")
return version, flags, length
# ============================================================================
# JPEGIO HELPERS (for proper JPEG output)
# ============================================================================
def _jpegio_bytes_to_file(data: bytes, suffix: str = '.jpg') -> str:
"""Write bytes to temp file for jpegio."""
import tempfile
import os
fd, path = tempfile.mkstemp(suffix=suffix)
try:
os.write(fd, data)
finally:
os.close(fd)
return path
def _jpegio_file_to_bytes(path: str) -> bytes:
"""Read file to bytes and delete it."""
import os
try:
with open(path, 'rb') as f:
return f.read()
finally:
try:
os.unlink(path)
except OSError:
pass
def _jpegio_get_usable_positions(coef_array: np.ndarray) -> list:
    """
    List the coefficient positions that may carry a payload bit.

    A position is usable when it is not a DC coefficient (row and column both
    multiples of BLOCK_SIZE) and its magnitude is at least
    JPEGIO_MIN_COEF_MAGNITUDE.

    Args:
        coef_array: 2D array of quantized coefficients

    Returns:
        List of (row, col) tuples in row-major order

    Raises:
        ValueError: If coef_array is not two-dimensional
    """
    if coef_array.ndim != 2:
        raise ValueError(
            f"Expected a 2D coefficient array, got {coef_array.ndim} dimension(s)"
        )

    # Vectorized magnitude test instead of a Python loop over every coefficient.
    usable = np.abs(coef_array) >= JPEGIO_MIN_COEF_MAGNITUDE
    # DC coefficients sit at the top-left corner of each block; never use them.
    usable[::BLOCK_SIZE, ::BLOCK_SIZE] = False

    # argwhere yields indices in row-major order, matching a row/col scan.
    return [tuple(position) for position in np.argwhere(usable).tolist()]
def _jpegio_generate_order(num_positions: int, seed: bytes) -> list:
"""Generate pseudo-random order for jpegio embedding."""
hash_bytes = hashlib.sha256(seed + b"jpeg_coef_order").digest()
rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], 'big'))
order = list(range(num_positions))
rng.shuffle(order)
return order
def _jpegio_create_header(data_length: int) -> bytes:
    """
    Build the header that precedes a jpegio-embedded payload.

    Big-endian layout: JPEGIO_MAGIC (4 bytes) + version 1 (1) + flags 0 (1) +
    payload length (4).
    """
    header_version, header_flags = 1, 0
    return struct.pack('>4sBBI', JPEGIO_MAGIC, header_version, header_flags, data_length)
def _jpegio_parse_header(header_bytes: bytes) -> Tuple[int, int, int]:
    """
    Decode a jpegio header into (version, flags, data_length).

    Only the first HEADER_SIZE bytes are examined.

    Raises:
        ValueError: If fewer than HEADER_SIZE bytes are supplied, or the magic
            field does not equal JPEGIO_MAGIC.
    """
    if len(header_bytes) < HEADER_SIZE:
        raise ValueError("Insufficient header data")

    fields = struct.unpack('>4sBBI', header_bytes[:HEADER_SIZE])
    magic, version, flags, length = fields

    if magic == JPEGIO_MAGIC:
        return version, flags, length
    raise ValueError(f"Invalid JPEG stego magic: {magic}")
@@ -254,11 +398,6 @@ def _parse_header(header_bits: list[int]) -> tuple[int, int, int]:
# PUBLIC API
# ============================================================================
def has_dct_support() -> bool:
"""Check if DCT steganography is available."""
return HAS_SCIPY
def calculate_dct_capacity(image_data: bytes) -> DCTCapacityInfo:
"""
Calculate the DCT embedding capacity of an image.
@@ -274,19 +413,13 @@ def calculate_dct_capacity(image_data: bytes) -> DCTCapacityInfo:
img = Image.open(io.BytesIO(image_data))
width, height = img.size
# Calculate blocks
blocks_x = width // BLOCK_SIZE
blocks_y = height // BLOCK_SIZE
total_blocks = blocks_x * blocks_y
# Bits per block (using selected coefficient positions)
bits_per_block = len(DEFAULT_EMBED_POSITIONS)
# Total capacity
total_bits = total_blocks * bits_per_block
total_bytes = total_bits // 8
# Usable capacity (minus header)
usable_bytes = max(0, total_bytes - HEADER_SIZE)
return DCTCapacityInfo(
@@ -303,43 +436,23 @@ def calculate_dct_capacity(image_data: bytes) -> DCTCapacityInfo:
def will_fit_dct(data_length: int, image_data: bytes) -> bool:
"""
Check if data will fit in the image using DCT embedding.
Args:
data_length: Length of data in bytes
image_data: Carrier image bytes
Returns:
True if data fits, False otherwise
"""
"""Check if data will fit in the image using DCT embedding."""
capacity = calculate_dct_capacity(image_data)
return data_length <= capacity.usable_capacity_bytes
def estimate_capacity_comparison(image_data: bytes) -> dict:
"""
Compare LSB and DCT capacity for an image.
Args:
image_data: Image file bytes
Returns:
Dict with 'lsb' and 'dct' capacity info
"""
"""Compare LSB and DCT capacity for an image."""
img = Image.open(io.BytesIO(image_data))
width, height = img.size
pixels = width * height
# LSB capacity (3 bits per pixel for RGB, simplified)
lsb_bytes = (pixels * 3) // 8
# DCT capacity
if HAS_SCIPY:
dct_info = calculate_dct_capacity(image_data)
dct_bytes = dct_info.usable_capacity_bytes
else:
# Estimate without scipy
blocks = (width // 8) * (height // 8)
dct_bytes = (blocks * 16) // 8 - HEADER_SIZE
@@ -357,6 +470,10 @@ def estimate_capacity_comparison(image_data: bytes) -> dict:
'output': 'PNG or JPEG (grayscale)',
'ratio_vs_lsb': (dct_bytes / lsb_bytes * 100) if lsb_bytes > 0 else 0,
'available': HAS_SCIPY,
},
'jpeg_native': {
'available': HAS_JPEGIO,
'note': 'Uses jpegio for proper JPEG coefficient embedding',
}
}
@@ -366,30 +483,60 @@ def embed_in_dct(
carrier_image: bytes,
seed: bytes,
output_format: str = OUTPUT_FORMAT_PNG,
) -> tuple[bytes, DCTEmbedStats]:
color_mode: str = 'color', # v3.0.1: 'color' or 'grayscale'
) -> Tuple[bytes, DCTEmbedStats]:
"""
Embed data into image using DCT coefficient modification.
For PNG output: Uses scipy DCT transform
For JPEG output: Uses jpegio if available for proper coefficient embedding
Args:
data: Data to embed
carrier_image: Carrier image bytes
seed: Seed for pseudo-random block selection
output_format: Output format - 'png' (default, lossless) or 'jpeg' (smaller)
seed: Seed for pseudo-random selection
output_format: 'png' (default, lossless) or 'jpeg'
color_mode: 'color' (preserve colors) or 'grayscale' (v3.0.1+)
Returns:
Tuple of (stego_image_bytes, stats)
Raises:
ImportError: If scipy is not available
ValueError: If data is too large for carrier
"""
_check_scipy()
# Validate output format
if output_format not in (OUTPUT_FORMAT_PNG, OUTPUT_FORMAT_JPEG):
raise ValueError(f"Invalid output format: {output_format}. Use 'png' or 'jpeg'")
raise ValueError(f"Invalid output format: {output_format}")
# Calculate capacity
# Validate color mode
if color_mode not in ('color', 'grayscale'):
color_mode = 'color' # Default to color
# For JPEG output, try to use jpegio for proper coefficient embedding
# Note: jpegio naturally preserves color (works in YCbCr space)
if output_format == OUTPUT_FORMAT_JPEG:
if HAS_JPEGIO:
return _embed_jpegio(data, carrier_image, seed, color_mode)
else:
# Fall back to scipy + PIL JPEG (WARNING: may not decode properly)
import warnings
warnings.warn(
"jpegio not available. JPEG output may not decode correctly. "
"Install jpegio for proper JPEG steganography support.",
RuntimeWarning
)
# Continue with scipy method but output as JPEG
# PNG output or JPEG fallback: use scipy DCT method
_check_scipy()
return _embed_scipy_dct(data, carrier_image, seed, output_format, color_mode)
def _embed_scipy_dct(
data: bytes,
carrier_image: bytes,
seed: bytes,
output_format: str,
color_mode: str = 'color',
) -> Tuple[bytes, DCTEmbedStats]:
"""Embed using scipy DCT (for PNG output), with color preservation option."""
capacity_info = calculate_dct_capacity(carrier_image)
if len(data) > capacity_info.usable_capacity_bytes:
@@ -398,69 +545,216 @@ def embed_in_dct(
f"(capacity: {capacity_info.usable_capacity_bytes} bytes)"
)
# Prepare image
image = _to_grayscale(carrier_image)
padded, original_size = _pad_to_blocks(image)
# Load image
img = Image.open(io.BytesIO(carrier_image))
width, height = img.size
# Create header + data
if color_mode == 'color' and img.mode in ('RGB', 'RGBA'):
# Color mode: convert to YCbCr, embed in Y only, preserve Cb/Cr
if img.mode == 'RGBA':
img = img.convert('RGB')
rgb_array = np.array(img, dtype=np.float64)
Y, Cb, Cr = _rgb_to_ycbcr(rgb_array)
# Pad Y channel
Y_padded, original_size = _pad_to_blocks(Y)
# Embed in Y channel
Y_embedded = _embed_in_channel(Y_padded, data, seed, capacity_info)
# Unpad
Y_result = _unpad_image(Y_embedded, original_size)
# Convert back to RGB
result_rgb = _ycbcr_to_rgb(Y_result, Cb, Cr)
# Save as color image
stego_bytes = _save_color_image(result_rgb, output_format)
else:
# Grayscale mode: original behavior
image = _to_grayscale(carrier_image)
padded, original_size = _pad_to_blocks(image)
embedded = _embed_in_channel(padded, data, seed, capacity_info)
result = _unpad_image(embedded, original_size)
stego_bytes = _save_stego_image(result, output_format)
# Calculate stats
header = _create_header(len(data))
payload = header + data
bits = len(payload) * 8
stats = DCTEmbedStats(
blocks_used=(bits + len(DEFAULT_EMBED_POSITIONS) - 1) // len(DEFAULT_EMBED_POSITIONS),
blocks_available=capacity_info.total_blocks,
bits_embedded=bits,
capacity_bits=capacity_info.total_capacity_bits,
usage_percent=(bits / capacity_info.total_capacity_bits) * 100,
image_width=width,
image_height=height,
output_format=output_format,
jpeg_native=False,
color_mode=color_mode,
)
return stego_bytes, stats
def _embed_in_channel(
channel: np.ndarray,
data: bytes,
seed: bytes,
capacity_info: DCTCapacityInfo,
) -> np.ndarray:
"""Embed data in a single channel using DCT."""
header = _create_header(len(data))
payload = header + data
# Convert payload to bits
bits = []
for byte in payload:
for i in range(7, -1, -1):
bits.append((byte >> i) & 1)
# Generate block order
num_blocks = capacity_info.total_blocks
block_order = _generate_block_order(num_blocks, seed)
# Embed bits
bit_idx = 0
blocks_used = 0
h, w = padded.shape
h, w = channel.shape
result = channel.copy()
bit_idx = 0
for block_num in block_order:
if bit_idx >= len(bits):
break
# Calculate block position
by = (block_num // (w // BLOCK_SIZE)) * BLOCK_SIZE
bx = (block_num % (w // BLOCK_SIZE)) * BLOCK_SIZE
# Extract and transform block
block = padded[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE].copy()
block = result[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE].copy()
dct_block = _dct2(block)
# Embed bits in selected coefficients
for pos in DEFAULT_EMBED_POSITIONS:
if bit_idx >= len(bits):
break
dct_block[pos] = _embed_bit_in_coeff(dct_block[pos], bits[bit_idx])
bit_idx += 1
# Inverse transform and store
modified_block = _idct2(dct_block)
padded[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE] = modified_block
blocks_used += 1
result[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE] = modified_block
# Remove padding and save
result = _unpad_image(padded, original_size)
stego_bytes = _save_stego_image(result, output_format)
return result
def _embed_jpegio(
    data: bytes,
    carrier_image: bytes,
    seed: bytes,
    color_mode: str = 'color',
) -> Tuple[bytes, DCTEmbedStats]:
    """
    Embed using jpegio for proper JPEG coefficient modification.

    The payload (header + data) is written bit by bit into the least
    significant bit of usable coefficients of channel JPEGIO_EMBED_CHANNEL,
    visited in a seed-determined pseudo-random order.

    Note: jpegio naturally preserves color since JPEG stores YCbCr
    and we only modify Y channel coefficients.

    Args:
        data: Data to embed
        carrier_image: Carrier image bytes; non-JPEG carriers are first
            re-encoded as JPEG (RGB, quality 95, no chroma subsampling)
        seed: Seed for the pseudo-random coefficient order
        color_mode: Recorded in the returned stats only; it does not change
            how coefficients are embedded

    Returns:
        Tuple of (stego_image_bytes, stats)

    Raises:
        ValueError: If the payload needs more bits than there are usable
            coefficients
    """
    import tempfile
    import os

    # Check if carrier is JPEG - if not, convert it
    img = Image.open(io.BytesIO(carrier_image))
    width, height = img.size

    if img.format != 'JPEG':
        buffer = io.BytesIO()
        if img.mode != 'RGB':
            img = img.convert('RGB')
        img.save(buffer, format='JPEG', quality=95, subsampling=0)
        carrier_image = buffer.getvalue()

    # jpegio works on file paths, so both input and output go through temp files.
    temp_paths = []
    try:
        input_path = _jpegio_bytes_to_file(carrier_image, suffix='.jpg')
        temp_paths.append(input_path)

        # mkstemp creates the file atomically; the deprecated mktemp only
        # returns a name and leaves a window for another process to claim it.
        output_fd, output_path = tempfile.mkstemp(suffix='.jpg')
        os.close(output_fd)
        temp_paths.append(output_path)

        jpeg = jio.read(input_path)
        coef_array = jpeg.coef_arrays[JPEGIO_EMBED_CHANNEL]

        all_positions = _jpegio_get_usable_positions(coef_array)

        # Payload as a flat list of bits, most significant bit first
        payload = _jpegio_create_header(len(data)) + data
        bits = [
            (byte >> shift) & 1
            for byte in payload
            for shift in range(7, -1, -1)
        ]

        # Fail before doing the (potentially large) shuffle
        if len(bits) > len(all_positions):
            raise ValueError(
                f"Payload too large: {len(bits)} bits, "
                f"only {len(all_positions)} usable coefficients"
            )

        order = _jpegio_generate_order(len(all_positions), seed)

        # One bit per coefficient. Usable coefficients are never zero, so the
        # sign test below is well defined. Odd values step toward zero and
        # even values away from it, which keeps every magnitude at or above
        # JPEGIO_MIN_COEF_MAGNITUDE (2).
        for bit, pos_idx in zip(bits, order):
            row, col = all_positions[pos_idx]
            coef = coef_array[row, col]
            if (coef & 1) != bit:
                if coef > 0:
                    coef_array[row, col] = coef - 1 if (coef & 1) else coef + 1
                else:
                    coef_array[row, col] = coef + 1 if (coef & 1) else coef - 1
        coefs_used = len(bits)

        jio.write(jpeg, output_path)

        with open(output_path, 'rb') as f:
            stego_bytes = f.read()

        stats = DCTEmbedStats(
            blocks_used=coefs_used // 63,  # Approximate blocks
            blocks_available=len(all_positions) // 63,
            bits_embedded=len(bits),
            capacity_bits=len(all_positions),
            usage_percent=(len(bits) / len(all_positions)) * 100 if all_positions else 0,
            image_width=width,
            image_height=height,
            output_format=OUTPUT_FORMAT_JPEG,
            jpeg_native=True,
            color_mode=color_mode,
        )

        return stego_bytes, stats

    finally:
        # Best-effort removal of whichever temp files were actually created
        for path in temp_paths:
            try:
                os.unlink(path)
            except OSError:
                pass
def extract_from_dct(
@@ -470,33 +764,43 @@ def extract_from_dct(
"""
Extract data from DCT stego image.
Automatically detects whether image uses scipy DCT or jpegio embedding.
Args:
stego_image: Stego image bytes
seed: Same seed used for embedding
Returns:
Extracted data bytes
Raises:
ImportError: If scipy is not available
ValueError: If image is not a valid DCT stego image
"""
_check_scipy()
# Check image format
img = Image.open(io.BytesIO(stego_image))
# Prepare image
if img.format == 'JPEG' and HAS_JPEGIO:
# Try jpegio extraction first
try:
return _extract_jpegio(stego_image, seed)
except ValueError:
# If jpegio magic not found, fall back to scipy method
pass
# PNG or fallback: use scipy DCT method
_check_scipy()
return _extract_scipy_dct(stego_image, seed)
def _extract_scipy_dct(stego_image: bytes, seed: bytes) -> bytes:
"""Extract using scipy DCT (for PNG images)."""
image = _to_grayscale(stego_image)
padded, original_size = _pad_to_blocks(image)
# Calculate capacity
h, w = padded.shape
blocks_x = w // BLOCK_SIZE
blocks_y = h // BLOCK_SIZE
num_blocks = blocks_x * blocks_y
# Generate same block order
block_order = _generate_block_order(num_blocks, seed)
# Extract all bits (we'll stop when we have enough based on header)
all_bits = []
for block_num in block_order:
@@ -510,7 +814,6 @@ def extract_from_dct(
bit = _extract_bit_from_coeff(dct_block[pos])
all_bits.append(bit)
# Check if we have enough for header
if len(all_bits) >= HEADER_SIZE * 8:
try:
_, _, data_length = _parse_header(all_bits[:HEADER_SIZE * 8])
@@ -518,16 +821,12 @@ def extract_from_dct(
if len(all_bits) >= total_needed:
break
except ValueError:
# Not enough data yet or invalid, continue
pass
# Parse header
version, flags, data_length = _parse_header(all_bits)
# Extract data bits
data_bits = all_bits[HEADER_SIZE * 8:(HEADER_SIZE + data_length) * 8]
# Convert bits to bytes
data = bytes([
sum(data_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8))
for i in range(data_length)
@@ -536,6 +835,61 @@ def extract_from_dct(
return data
def _extract_jpegio(stego_image: bytes, seed: bytes) -> bytes:
    """
    Extract an embedded payload from a JPEG using jpegio.

    Reads the coefficient array of channel ``JPEGIO_EMBED_CHANNEL``, visits the
    usable coefficient positions in the seed-derived order and collects the
    least-significant bit of each coefficient: first ``HEADER_SIZE`` bytes of
    header, then ``data_length`` bytes of payload (most-significant bit first).

    Args:
        stego_image: Stego JPEG bytes
        seed: Same seed used for embedding

    Returns:
        Extracted data bytes

    Raises:
        ValueError: If the image has too few usable coefficients to hold the
            header, or the header declares more data than the image can hold.
            Errors raised by ``_jpegio_parse_header`` propagate unchanged
            (presumably ValueError on a bad magic value; confirm against
            that helper).
    """
    import os

    header_bit_count = HEADER_SIZE * 8

    # jpegio reads from a path, so the bytes go through a temp file that is
    # removed in the ``finally`` clause below.
    temp_path = _jpegio_bytes_to_file(stego_image, suffix='.jpg')
    try:
        jpeg = jio.read(temp_path)
        coef_array = jpeg.coef_arrays[JPEGIO_EMBED_CHANNEL]
        all_positions = _jpegio_get_usable_positions(coef_array)
        order = _jpegio_generate_order(len(all_positions), seed)
        capacity_bits = len(order)

        # Raise ValueError (not IndexError) on undersized images so that
        # callers catching ValueError can fall back to another extractor.
        if capacity_bits < header_bit_count:
            raise ValueError(
                f"Image too small for jpegio header: {capacity_bits} usable "
                f"coefficients, need {header_bit_count}"
            )

        # Extract header bits
        header_bits = []
        for pos_idx in order[:header_bit_count]:
            row, col = all_positions[pos_idx]
            header_bits.append(int(coef_array[row, col] & 1))

        header_buf = bytearray(HEADER_SIZE)
        for byte_idx in range(HEADER_SIZE):
            value = 0
            for bit in header_bits[byte_idx * 8:(byte_idx + 1) * 8]:
                value = (value << 1) | bit  # MSB-first packing
            header_buf[byte_idx] = value

        _version, _flags, data_length = _jpegio_parse_header(bytes(header_buf))

        # A corrupt or foreign header can declare an impossible length;
        # reject it before indexing past the available coefficients.
        total_bits_needed = (HEADER_SIZE + data_length) * 8
        if data_length < 0 or total_bits_needed > capacity_bits:
            raise ValueError(
                f"Invalid jpegio payload length: needs {total_bits_needed} bits, "
                f"only {capacity_bits} usable coefficients"
            )

        # Extract data bits (the header bits were already consumed above)
        data_bits = []
        for pos_idx in order[header_bit_count:total_bits_needed]:
            row, col = all_positions[pos_idx]
            data_bits.append(int(coef_array[row, col] & 1))

        data_buf = bytearray(data_length)
        for byte_idx in range(data_length):
            value = 0
            for bit in data_bits[byte_idx * 8:(byte_idx + 1) * 8]:
                value = (value << 1) | bit  # MSB-first packing
            data_buf[byte_idx] = value

        return bytes(data_buf)
    finally:
        # Best-effort cleanup; a missing temp file is not an error.
        try:
            os.unlink(temp_path)
        except OSError:
            pass
# ============================================================================
# CONVENIENCE FUNCTIONS
# ============================================================================

View File

@@ -11,6 +11,7 @@ New in v3.0:
New in v3.0.1:
- dct_output_format parameter for DCT mode ('png' or 'jpeg')
- dct_color_mode parameter for DCT mode ('grayscale' or 'color')
"""
import io
@@ -59,6 +60,10 @@ ENCRYPTION_OVERHEAD = HEADER_OVERHEAD + LENGTH_PREFIX
DCT_OUTPUT_PNG = 'png'
DCT_OUTPUT_JPEG = 'jpeg'
# DCT color mode options (v3.0.1)
DCT_COLOR_GRAYSCALE = 'grayscale'
DCT_COLOR_COLOR = 'color'
# =============================================================================
# DCT MODULE LAZY LOADING
@@ -477,6 +482,7 @@ def embed_in_image(
output_format: Optional[str] = None,
embed_mode: str = EMBED_MODE_LSB,
dct_output_format: str = DCT_OUTPUT_PNG, # NEW in v3.0.1
dct_color_mode: str = 'grayscale', # NEW in v3.0.1: 'grayscale' or 'color'
) -> Tuple[bytes, Union[EmbedStats, 'DCTEmbedStats'], str]:
"""
Embed data into an image using specified mode.
@@ -489,6 +495,7 @@ def embed_in_image(
output_format: Force output format (LSB mode only)
embed_mode: 'lsb' (default) or 'dct'
dct_output_format: For DCT mode - 'png' (lossless) or 'jpeg' (smaller)
dct_color_mode: For DCT mode - 'grayscale' (default) or 'color' (preserves colors)
Returns:
Tuple of (stego image bytes, stats, file extension)
@@ -515,14 +522,20 @@ def embed_in_image(
debug.print(f"Invalid dct_output_format '{dct_output_format}', defaulting to PNG")
dct_output_format = DCT_OUTPUT_PNG
# Validate DCT color mode (v3.0.1)
if dct_color_mode not in ('grayscale', 'color'):
debug.print(f"Invalid dct_color_mode '{dct_color_mode}', defaulting to grayscale")
dct_color_mode = 'grayscale'
dct_mod = _get_dct_module()
# Pass output_format to DCT module (v3.0.1)
# Pass output_format and color_mode to DCT module (v3.0.1)
stego_bytes, dct_stats = dct_mod.embed_in_dct(
data,
image_data,
pixel_key,
output_format=dct_output_format,
color_mode=dct_color_mode, # NEW in v3.0.1
)
# Determine extension based on output format
@@ -531,7 +544,8 @@ def embed_in_image(
else:
ext = 'png'
debug.print(f"DCT embedding complete: {dct_output_format.upper()} output, ext={ext}")
debug.print(f"DCT embedding complete: {dct_output_format.upper()} output, "
f"color_mode={dct_color_mode}, ext={ext}")
return stego_bytes, dct_stats, ext
# LSB MODE