Add jpegtran lossless rotation and EXIF orientation handling
DCT steganography improvements: - Add _apply_exif_orientation() to fix portrait photos encoding rotated - Add _jpegtran_rotate() for lossless JPEG rotation preserving DCT data - Add rotation fallback in extract_from_dct() - tries 0°, 90°, 180°, 270° - Quick header validation to skip invalid rotations efficiently - Fix: wrap debug.print in try/except to prevent extraction failures Web UI rotate tool: - Use jpegtran for JPEGs (lossless, preserves DCT steganography) - Fall back to PIL for non-JPEGs - Dynamic UI shows "DCT Safe" for JPEGs, warning for other formats This enables the workflow: encode → compress → rotate → decode Rotated stego JPEGs can now be decoded by trying all orientations. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -35,7 +35,7 @@ from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
from PIL import Image, ImageOps
|
||||
|
||||
# Check for scipy availability (for PNG/DCT mode)
|
||||
# Prefer scipy.fft (newer, more stable) over scipy.fftpack
|
||||
@@ -406,6 +406,45 @@ def _safe_idct2(block: np.ndarray) -> np.ndarray:
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def _apply_exif_orientation(image_data: bytes) -> bytes:
|
||||
"""
|
||||
Apply EXIF orientation to image and return corrected bytes.
|
||||
|
||||
Portrait photos from cameras often have EXIF orientation metadata that
|
||||
tells viewers to rotate the image for display. However, the raw pixel
|
||||
data is stored in landscape orientation. This function applies that
|
||||
rotation to the pixel data so the output matches what users expect.
|
||||
|
||||
Without this, a portrait photo encoded with DCT would come out rotated
|
||||
90 degrees because we'd embed in the raw (landscape) orientation.
|
||||
"""
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
original_format = img.format or "JPEG"
|
||||
|
||||
# Apply EXIF orientation (rotates/flips pixels to match EXIF tag)
|
||||
# This also removes the EXIF orientation tag since it's now baked in
|
||||
corrected = ImageOps.exif_transpose(img)
|
||||
|
||||
# If no change was needed, return original data unchanged
|
||||
if corrected is img:
|
||||
img.close()
|
||||
return image_data
|
||||
|
||||
# Save corrected image back to bytes
|
||||
output = io.BytesIO()
|
||||
if original_format == "JPEG":
|
||||
if corrected.mode in ("RGBA", "P"):
|
||||
corrected = corrected.convert("RGB")
|
||||
corrected.save(output, format="JPEG", quality=95)
|
||||
else:
|
||||
corrected.save(output, format="PNG")
|
||||
|
||||
img.close()
|
||||
corrected.close()
|
||||
output.seek(0)
|
||||
return output.getvalue()
|
||||
|
||||
|
||||
def _to_grayscale(image_data: bytes) -> np.ndarray:
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
gray = img.convert("L")
|
||||
@@ -763,6 +802,10 @@ def embed_in_dct(
|
||||
if color_mode not in ("color", "grayscale"):
|
||||
color_mode = "color"
|
||||
|
||||
# Apply EXIF orientation to carrier image before embedding
|
||||
# This ensures portrait photos are embedded in their correct visual orientation
|
||||
carrier_image = _apply_exif_orientation(carrier_image)
|
||||
|
||||
if output_format == OUTPUT_FORMAT_JPEG and HAS_JPEGIO:
|
||||
return _embed_jpegio(data, carrier_image, seed, color_mode, progress_file)
|
||||
|
||||
@@ -1173,24 +1216,259 @@ def _embed_jpegio(
|
||||
pass
|
||||
|
||||
|
||||
def _jpegtran_available() -> bool:
|
||||
"""Check if jpegtran is available on the system."""
|
||||
import shutil
|
||||
return shutil.which("jpegtran") is not None
|
||||
|
||||
|
||||
def _jpegtran_rotate(image_data: bytes, rotation: int) -> bytes:
|
||||
"""
|
||||
Losslessly rotate a JPEG using jpegtran.
|
||||
|
||||
This preserves DCT coefficients by rearranging blocks rather than
|
||||
re-encoding. Essential for rotating stego images without destroying
|
||||
the hidden data.
|
||||
|
||||
Args:
|
||||
image_data: JPEG image bytes
|
||||
rotation: Degrees clockwise (90, 180, or 270)
|
||||
|
||||
Returns:
|
||||
Rotated JPEG bytes with DCT coefficients preserved
|
||||
"""
|
||||
import subprocess
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
if rotation not in (90, 180, 270):
|
||||
raise ValueError(f"Invalid rotation: {rotation}")
|
||||
|
||||
# Write input to temp file
|
||||
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
|
||||
f.write(image_data)
|
||||
input_path = f.name
|
||||
|
||||
output_path = tempfile.mktemp(suffix=".jpg")
|
||||
|
||||
try:
|
||||
# jpegtran -rotate 90|180|270 -copy all -perfect
|
||||
# -copy all: preserve all metadata
|
||||
# -perfect: fail if there are non-transformable edge blocks (rare)
|
||||
result = subprocess.run(
|
||||
["jpegtran", "-rotate", str(rotation), "-copy", "all", "-perfect",
|
||||
"-outfile", output_path, input_path],
|
||||
capture_output=True,
|
||||
timeout=30
|
||||
)
|
||||
|
||||
# If -perfect fails (edge blocks), retry without it
|
||||
if result.returncode != 0:
|
||||
result = subprocess.run(
|
||||
["jpegtran", "-rotate", str(rotation), "-copy", "all", "-trim",
|
||||
"-outfile", output_path, input_path],
|
||||
capture_output=True,
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"jpegtran failed: {result.stderr.decode()}")
|
||||
|
||||
with open(output_path, "rb") as f:
|
||||
return f.read()
|
||||
finally:
|
||||
for path in [input_path, output_path]:
|
||||
try:
|
||||
os.unlink(path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def _rotate_image_bytes(image_data: bytes, rotation: int, lossless: bool = True) -> bytes:
|
||||
"""
|
||||
Rotate image by 90, 180, or 270 degrees and return as bytes.
|
||||
|
||||
For JPEGs with lossless=True (default), uses jpegtran to preserve DCT
|
||||
coefficients. This is essential for rotating stego images.
|
||||
|
||||
For PNGs or when jpegtran is unavailable, uses PIL (which re-encodes
|
||||
but PNGs are lossless anyway).
|
||||
"""
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
original_format = img.format or "PNG"
|
||||
img.close()
|
||||
|
||||
# Use jpegtran for lossless JPEG rotation
|
||||
if lossless and original_format == "JPEG" and _jpegtran_available():
|
||||
return _jpegtran_rotate(image_data, rotation)
|
||||
|
||||
# Fallback to PIL for PNGs or when jpegtran unavailable
|
||||
img = Image.open(io.BytesIO(image_data))
|
||||
|
||||
# PIL rotation is counter-clockwise, we want clockwise
|
||||
# 90 CW = 270 CCW, 180 = 180, 270 CW = 90 CCW
|
||||
pil_rotation = {90: 270, 180: 180, 270: 90}[rotation]
|
||||
rotated = img.rotate(pil_rotation, expand=True)
|
||||
|
||||
output = io.BytesIO()
|
||||
# Save in original format if possible, fallback to PNG
|
||||
save_format = original_format if original_format in ("JPEG", "PNG") else "PNG"
|
||||
if save_format == "JPEG":
|
||||
rotated.save(output, format="JPEG", quality=95)
|
||||
else:
|
||||
rotated.save(output, format="PNG")
|
||||
output.seek(0)
|
||||
return output.getvalue()
|
||||
|
||||
|
||||
def _quick_validate_dct_header(image_data: bytes, seed: bytes) -> bool:
|
||||
"""
|
||||
Quick validation that only extracts enough DCT data to check magic bytes.
|
||||
Returns True if header looks valid, False otherwise.
|
||||
|
||||
This is much faster than full extraction - only processes first ~8 blocks.
|
||||
"""
|
||||
try:
|
||||
# Convert to grayscale for quick check
|
||||
gray = _to_grayscale(image_data)
|
||||
height, width = gray.shape
|
||||
padded, _ = _pad_to_blocks(gray)
|
||||
padded_h, padded_w = padded.shape
|
||||
blocks_x = padded_w // BLOCK_SIZE
|
||||
num_blocks = (padded_h // BLOCK_SIZE) * blocks_x
|
||||
|
||||
# Generate block order
|
||||
block_order = _generate_block_order(num_blocks, seed)
|
||||
|
||||
# Only extract first 8 blocks (enough for RS length prefix + header)
|
||||
# 8 blocks * 16 bits/block = 128 bits = 16 bytes (covers RS prefix)
|
||||
blocks_needed = min(8, len(block_order))
|
||||
|
||||
all_bits = []
|
||||
for block_num in block_order[:blocks_needed]:
|
||||
by = (block_num // blocks_x) * BLOCK_SIZE
|
||||
bx = (block_num % blocks_x) * BLOCK_SIZE
|
||||
block = padded[by : by + BLOCK_SIZE, bx : bx + BLOCK_SIZE].astype(np.float32)
|
||||
|
||||
dct_block = dctn(block, norm="ortho")
|
||||
|
||||
for row, col in EMBED_POSITIONS:
|
||||
coef = dct_block[row, col]
|
||||
bit = _extract_bit_from_coeff(coef)
|
||||
all_bits.append(bit)
|
||||
|
||||
# Check RS format first (3 copies of 8-byte length header)
|
||||
if len(all_bits) >= RS_LENGTH_PREFIX_SIZE * 8:
|
||||
length_prefix_bits = all_bits[: RS_LENGTH_PREFIX_SIZE * 8]
|
||||
length_prefix_bytes = bytes(
|
||||
[
|
||||
sum(length_prefix_bits[i * 8 : (i + 1) * 8][j] << (7 - j) for j in range(8))
|
||||
for i in range(RS_LENGTH_PREFIX_SIZE)
|
||||
]
|
||||
)
|
||||
|
||||
# Check if 2+ copies match (indicates valid RS format)
|
||||
copies = []
|
||||
for i in range(RS_LENGTH_COPIES):
|
||||
start = i * RS_LENGTH_HEADER_SIZE
|
||||
end = start + RS_LENGTH_HEADER_SIZE
|
||||
copies.append(length_prefix_bytes[start:end])
|
||||
|
||||
from collections import Counter
|
||||
counter = Counter(copies)
|
||||
_, count = counter.most_common(1)[0]
|
||||
|
||||
if count >= 2:
|
||||
return True # Looks like valid RS format
|
||||
|
||||
# Check legacy format (magic bytes in first 10 bytes)
|
||||
if len(all_bits) >= HEADER_SIZE * 8:
|
||||
try:
|
||||
_parse_header(all_bits[: HEADER_SIZE * 8])
|
||||
return True # Magic bytes matched
|
||||
except (ValueError, InvalidMagicBytesError):
|
||||
pass
|
||||
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def extract_from_dct(
|
||||
stego_image: bytes,
|
||||
seed: bytes,
|
||||
progress_file: str | None = None,
|
||||
) -> bytes:
|
||||
"""Extract data from DCT stego image."""
|
||||
img = Image.open(io.BytesIO(stego_image))
|
||||
fmt = img.format
|
||||
img.close()
|
||||
"""
|
||||
Extract data from DCT stego image.
|
||||
|
||||
if fmt == "JPEG" and HAS_JPEGIO:
|
||||
If extraction fails with InvalidMagicBytesError, automatically tries
|
||||
90°, 180°, and 270° rotations to handle images that were rotated after
|
||||
encoding (e.g., by external tools or EXIF orientation changes).
|
||||
|
||||
Uses quick header validation to skip obviously invalid rotations.
|
||||
"""
|
||||
rotations_to_try = [0, 90, 180, 270]
|
||||
last_error = None
|
||||
valid_rotations = []
|
||||
|
||||
# Phase 1: Quick validation to find candidate rotations
|
||||
for rotation in rotations_to_try:
|
||||
if rotation == 0:
|
||||
image_to_check = stego_image
|
||||
else:
|
||||
image_to_check = _rotate_image_bytes(stego_image, rotation)
|
||||
|
||||
if _quick_validate_dct_header(image_to_check, seed):
|
||||
valid_rotations.append((rotation, image_to_check))
|
||||
|
||||
# If no rotations pass quick check, try all anyway (fallback)
|
||||
if not valid_rotations:
|
||||
# Must try all rotations - quick validation might have failed due to
|
||||
# scipy vs jpegio differences or other edge cases
|
||||
for rotation in rotations_to_try:
|
||||
if rotation == 0:
|
||||
valid_rotations.append((0, stego_image))
|
||||
else:
|
||||
valid_rotations.append((rotation, _rotate_image_bytes(stego_image, rotation)))
|
||||
|
||||
# Phase 2: Full extraction on valid candidates
|
||||
for rotation, image_to_decode in valid_rotations:
|
||||
try:
|
||||
return _extract_jpegio(stego_image, seed, progress_file)
|
||||
except ValueError:
|
||||
pass
|
||||
img = Image.open(io.BytesIO(image_to_decode))
|
||||
fmt = img.format
|
||||
img.close()
|
||||
|
||||
_check_scipy()
|
||||
return _extract_scipy_dct_safe(stego_image, seed, progress_file)
|
||||
if fmt == "JPEG" and HAS_JPEGIO:
|
||||
try:
|
||||
result = _extract_jpegio(image_to_decode, seed, progress_file)
|
||||
if rotation != 0:
|
||||
try:
|
||||
from . import debug
|
||||
debug.print(f"DCT decode succeeded after {rotation}° rotation")
|
||||
except Exception:
|
||||
pass # Don't let debug logging break extraction
|
||||
return result
|
||||
except (ValueError, InvalidMagicBytesError) as e:
|
||||
last_error = e if isinstance(e, InvalidMagicBytesError) else last_error
|
||||
continue
|
||||
|
||||
_check_scipy()
|
||||
result = _extract_scipy_dct_safe(image_to_decode, seed, progress_file)
|
||||
if rotation != 0:
|
||||
try:
|
||||
from . import debug
|
||||
debug.print(f"DCT decode succeeded after {rotation}° rotation")
|
||||
except Exception:
|
||||
pass # Don't let debug logging break extraction
|
||||
return result
|
||||
|
||||
except InvalidMagicBytesError as e:
|
||||
last_error = e
|
||||
continue
|
||||
|
||||
# All rotations failed
|
||||
raise last_error or InvalidMagicBytesError("Not a Stegasoo image (tried all rotations)")
|
||||
|
||||
|
||||
def _extract_scipy_dct_safe(
|
||||
|
||||
Reference in New Issue
Block a user