diff --git a/TODO-4.2.1.md b/TODO-4.2.1.md new file mode 100644 index 0000000..d2fb4b5 --- /dev/null +++ b/TODO-4.2.1.md @@ -0,0 +1,29 @@ +# Stegasoo 4.2.1 Plan + +## Bugs +- [ ] Fix EXIF viewer panel not loading metadata in Web UI +- [x] DCT mode: portrait photos export rotated 90° (EXIF orientation not handled) + - Added `_apply_exif_orientation()` to apply EXIF rotation before embedding +- [x] DCT mode: add rotation fallback (try as-is, rotate 90°, retry on failure) + - Added rotation fallback in `extract_from_dct()` with quick header validation +- [x] Rotate tool: use jpegtran for lossless JPEG rotation (preserves DCT stego!) + - Web UI rotate tool now uses jpegtran for JPEGs + - DCT decode rotation fallback now uses jpegtran for JPEGs + - Dynamic UI shows "DCT Safe" for JPEGs, warning for other formats + +## Tools Audit +- [ ] Web UI tools - full shakedown and fixes +- [ ] CLI tools - full shakedown and fixes + +## AUR Packages +- [ ] `stegasoo-cli` - standalone CLI package (no web dependencies) +- [ ] `stegasoo-api` - REST API package (needs auth overhaul first) + +## API Auth Work (blocking stegasoo-api) +- [ ] Implement OAuth2 authentication +- [ ] TLS 1.3 support with self-signed certificates +- [ ] Figure out cert trust/distribution for clients + +## API Documentation +- [ ] Postman collection +- [ ] Environment variable templates diff --git a/frontends/web/app.py b/frontends/web/app.py index 28cd708..f7bdc06 100644 --- a/frontends/web/app.py +++ b/frontends/web/app.py @@ -2100,8 +2100,11 @@ def api_tools_exif_clear(): @app.route("/api/tools/rotate", methods=["POST"]) @login_required def api_tools_rotate(): - """Rotate and/or flip an image.""" + """Rotate and/or flip an image, using lossless jpegtran for JPEGs.""" from PIL import Image + import shutil + import subprocess + import tempfile image_file = request.files.get("image") if not image_file: @@ -2112,22 +2115,115 @@ def api_tools_rotate(): flip_v = request.form.get("flip_v", "false").lower() == "true" try: - img = Image.open(io.BytesIO(image_file.read())) + image_data = image_file.read() + img = Image.open(io.BytesIO(image_data)) + original_format = img.format # JPEG, PNG, etc. + img.close() - # Apply rotation (PIL rotates counter-clockwise, so negate) - if rotation: - img = img.rotate(-rotation, expand=True) + # For JPEGs, use jpegtran for lossless rotation/flip (preserves DCT stego) + has_jpegtran = shutil.which("jpegtran") is not None + use_jpegtran = original_format == "JPEG" and has_jpegtran and (rotation or flip_h or flip_v) - # Apply flips - if flip_h: - img = img.transpose(Image.FLIP_LEFT_RIGHT) - if flip_v: - img = img.transpose(Image.FLIP_TOP_BOTTOM) + if use_jpegtran: + # Chain jpegtran operations for lossless transformation + current_data = image_data - # Output as PNG (lossless) - buffer = io.BytesIO() - img.save(buffer, format="PNG") - buffer.seek(0) + # Apply rotation first + if rotation in (90, 180, 270): + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: + f.write(current_data) + input_path = f.name + output_path = tempfile.mktemp(suffix=".jpg") + try: + result = subprocess.run( + ["jpegtran", "-rotate", str(rotation), "-copy", "all", "-trim", + "-outfile", output_path, input_path], + capture_output=True, timeout=30 + ) + if result.returncode == 0: + with open(output_path, "rb") as f: + current_data = f.read() + finally: + for p in [input_path, output_path]: + try: + os.unlink(p) + except OSError: + pass + + # Apply horizontal flip + if flip_h: + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: + f.write(current_data) + input_path = f.name + output_path = tempfile.mktemp(suffix=".jpg") + try: + result = subprocess.run( + ["jpegtran", "-flip", "horizontal", "-copy", "all", "-trim", + "-outfile", output_path, input_path], + capture_output=True, timeout=30 + ) + if result.returncode == 0: + with open(output_path, "rb") as f: + current_data = f.read() + finally: + for p in [input_path, output_path]: + try: + os.unlink(p) + except OSError: + pass + + # Apply vertical flip + if flip_v: + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: + f.write(current_data) + input_path = f.name + output_path = tempfile.mktemp(suffix=".jpg") + try: + result = subprocess.run( + ["jpegtran", "-flip", "vertical", "-copy", "all", "-trim", + "-outfile", output_path, input_path], + capture_output=True, timeout=30 + ) + if result.returncode == 0: + with open(output_path, "rb") as f: + current_data = f.read() + finally: + for p in [input_path, output_path]: + try: + os.unlink(p) + except OSError: + pass + + buffer = io.BytesIO(current_data) + mimetype = "image/jpeg" + ext = "jpg" + else: + # Fallback to PIL for non-JPEGs or when jpegtran unavailable + img = Image.open(io.BytesIO(image_data)) + + # Apply rotation (PIL rotates counter-clockwise, so negate) + if rotation: + img = img.rotate(-rotation, expand=True) + + # Apply flips + if flip_h: + img = img.transpose(Image.FLIP_LEFT_RIGHT) + if flip_v: + img = img.transpose(Image.FLIP_TOP_BOTTOM) + + # Preserve original format + buffer = io.BytesIO() + if original_format == "JPEG": + if img.mode in ("RGBA", "P"): + img = img.convert("RGB") + img.save(buffer, format="JPEG", quality=95) + mimetype = "image/jpeg" + ext = "jpg" + else: + img.save(buffer, format="PNG") + mimetype = "image/png" + ext = "png" + buffer.seek(0) stem = ( image_file.filename.rsplit(".", 1)[0] @@ -2136,9 +2232,9 @@ def api_tools_rotate(): ) return send_file( buffer, - mimetype="image/png", + mimetype=mimetype, as_attachment=True, - download_name=f"{stem}_transformed.png", + download_name=f"{stem}_transformed.{ext}", ) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 diff --git a/frontends/web/templates/tools.html b/frontends/web/templates/tools.html index 6ddb6dd..f78f7c2 100644 --- a/frontends/web/templates/tools.html +++ b/frontends/web/templates/tools.html @@ -22,17 +22,17 @@
- -
+ +
@@ -634,6 +642,18 @@ setupDropZone('exifZone', 'exifFile', async (file) => { try { const res = await fetch('/api/tools/exif', { method: 'POST', body: formData }); + + // Check for auth redirect or non-JSON response + const contentType = res.headers.get('content-type') || ''; + if (!contentType.includes('application/json')) { + console.error('EXIF API returned non-JSON:', res.status, contentType); + document.getElementById('exifNoData').classList.remove('d-none'); + document.getElementById('exifNoData').innerHTML = 'Session expired - please refresh'; + document.getElementById('exifEmpty').classList.add('d-none'); + document.getElementById('exifData').classList.remove('d-none'); + return; + } + const data = await res.json(); if (data.success) { @@ -643,6 +663,7 @@ setupDropZone('exifZone', 'exifFile', async (file) => { if (entries.length === 0) { tbody.innerHTML = ''; document.getElementById('exifNoData').classList.remove('d-none'); + document.getElementById('exifNoData').innerHTML = 'No metadata found'; } else { document.getElementById('exifNoData').classList.add('d-none'); tbody.innerHTML = entries.map(([key, value]) => { @@ -655,9 +676,20 @@ setupDropZone('exifZone', 'exifFile', async (file) => { document.getElementById('exifEmpty').classList.add('d-none'); document.getElementById('exifData').classList.remove('d-none'); document.getElementById('exifActions').classList.remove('d-none'); + } else { + // API returned success: false + console.error('EXIF API error:', data.error); + document.getElementById('exifNoData').classList.remove('d-none'); + document.getElementById('exifNoData').innerHTML = `${data.error || 'Error reading metadata'}`; + document.getElementById('exifEmpty').classList.add('d-none'); + document.getElementById('exifData').classList.remove('d-none'); } } catch (err) { - console.error(err); + console.error('EXIF fetch error:', err); + document.getElementById('exifNoData').classList.remove('d-none'); + document.getElementById('exifNoData').innerHTML = 'Error loading metadata'; + document.getElementById('exifEmpty').classList.add('d-none'); + document.getElementById('exifData').classList.remove('d-none'); } }); @@ -796,6 +828,11 @@ setupDropZone('rotateZone', 'rotateFile', async (file) => { document.getElementById('rotateData').classList.remove('d-none'); document.getElementById('rotateActions').classList.remove('d-none'); + // Show appropriate DCT warning based on file type + const isJpeg = file.type === 'image/jpeg' || file.name.toLowerCase().match(/\.jpe?g$/); + document.getElementById('rotateJpegSafe').style.display = isJpeg ? 'block' : 'none'; + document.getElementById('rotateNonJpegWarn').style.display = isJpeg ? 'none' : 'block'; + // Load image to get dimensions, then show preview const thumb = document.getElementById('rotateThumb'); const objectUrl = URL.createObjectURL(file); @@ -889,6 +926,8 @@ function clearRotate() { document.getElementById('rotateData').classList.add('d-none'); document.getElementById('rotateActions').classList.add('d-none'); document.getElementById('rotateFileInfo').classList.add('d-none'); + document.getElementById('rotateJpegSafe').style.display = 'none'; + document.getElementById('rotateNonJpegWarn').style.display = 'none'; const thumb = document.getElementById('rotateThumb'); thumb.style.transform = ''; thumb.style.width = ''; @@ -920,8 +959,7 @@ document.getElementById('rotateDownload')?.addEventListener('click', async funct const url = URL.createObjectURL(blob); const a = document.createElement('a'); a.href = url; - const baseName = rotateCurrentFile?.name?.replace(/\.[^.]+$/, '') || 'rotated'; - a.download = `${baseName}_transformed.png`; + a.download = res.headers.get('Content-Disposition')?.split('filename=')[1]?.replace(/"/g, '') || 'rotated.jpg'; document.body.appendChild(a); a.click(); document.body.removeChild(a); diff --git a/src/stegasoo/dct_steganography.py b/src/stegasoo/dct_steganography.py index 0c21ca1..99aabe2 100644 --- a/src/stegasoo/dct_steganography.py +++ b/src/stegasoo/dct_steganography.py @@ -35,7 +35,7 @@ from dataclasses import dataclass from enum import Enum import numpy as np -from PIL import Image +from PIL import Image, ImageOps # Check for scipy availability (for PNG/DCT mode) # Prefer scipy.fft (newer, more stable) over scipy.fftpack @@ -406,6 +406,45 @@ def _safe_idct2(block: np.ndarray) -> np.ndarray: # ============================================================================ +def _apply_exif_orientation(image_data: bytes) -> bytes: + """ + Apply EXIF orientation to image and return corrected bytes. + + Portrait photos from cameras often have EXIF orientation metadata that + tells viewers to rotate the image for display. However, the raw pixel + data is stored in landscape orientation. This function applies that + rotation to the pixel data so the output matches what users expect. + + Without this, a portrait photo encoded with DCT would come out rotated + 90 degrees because we'd embed in the raw (landscape) orientation. + """ + img = Image.open(io.BytesIO(image_data)) + original_format = img.format or "JPEG" + + # Apply EXIF orientation (rotates/flips pixels to match EXIF tag) + # This also removes the EXIF orientation tag since it's now baked in + corrected = ImageOps.exif_transpose(img) + + # If no change was needed, return original data unchanged + if corrected is img: + img.close() + return image_data + + # Save corrected image back to bytes + output = io.BytesIO() + if original_format == "JPEG": + if corrected.mode in ("RGBA", "P"): + corrected = corrected.convert("RGB") + corrected.save(output, format="JPEG", quality=95) + else: + corrected.save(output, format="PNG") + + img.close() + corrected.close() + output.seek(0) + return output.getvalue() + + def _to_grayscale(image_data: bytes) -> np.ndarray: img = Image.open(io.BytesIO(image_data)) gray = img.convert("L") @@ -763,6 +802,10 @@ def embed_in_dct( if color_mode not in ("color", "grayscale"): color_mode = "color" + # Apply EXIF orientation to carrier image before embedding + # This ensures portrait photos are embedded in their correct visual orientation + carrier_image = _apply_exif_orientation(carrier_image) + if output_format == OUTPUT_FORMAT_JPEG and HAS_JPEGIO: return _embed_jpegio(data, carrier_image, seed, color_mode, progress_file) @@ -1173,24 +1216,259 @@ def _embed_jpegio( pass +def _jpegtran_available() -> bool: + """Check if jpegtran is available on the system.""" + import shutil + return shutil.which("jpegtran") is not None + + +def _jpegtran_rotate(image_data: bytes, rotation: int) -> bytes: + """ + Losslessly rotate a JPEG using jpegtran. + + This preserves DCT coefficients by rearranging blocks rather than + re-encoding. Essential for rotating stego images without destroying + the hidden data. + + Args: + image_data: JPEG image bytes + rotation: Degrees clockwise (90, 180, or 270) + + Returns: + Rotated JPEG bytes with DCT coefficients preserved + """ + import subprocess + import tempfile + import os + + if rotation not in (90, 180, 270): + raise ValueError(f"Invalid rotation: {rotation}") + + # Write input to temp file + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: + f.write(image_data) + input_path = f.name + + output_path = tempfile.mktemp(suffix=".jpg") + + try: + # jpegtran -rotate 90|180|270 -copy all -perfect + # -copy all: preserve all metadata + # -perfect: fail if there are non-transformable edge blocks (rare) + result = subprocess.run( + ["jpegtran", "-rotate", str(rotation), "-copy", "all", "-perfect", + "-outfile", output_path, input_path], + capture_output=True, + timeout=30 + ) + + # If -perfect fails (edge blocks), retry without it + if result.returncode != 0: + result = subprocess.run( + ["jpegtran", "-rotate", str(rotation), "-copy", "all", "-trim", + "-outfile", output_path, input_path], + capture_output=True, + timeout=30 + ) + + if result.returncode != 0: + raise RuntimeError(f"jpegtran failed: {result.stderr.decode()}") + + with open(output_path, "rb") as f: + return f.read() + finally: + for path in [input_path, output_path]: + try: + os.unlink(path) + except OSError: + pass + + +def _rotate_image_bytes(image_data: bytes, rotation: int, lossless: bool = True) -> bytes: + """ + Rotate image by 90, 180, or 270 degrees and return as bytes. + + For JPEGs with lossless=True (default), uses jpegtran to preserve DCT + coefficients. This is essential for rotating stego images. + + For PNGs or when jpegtran is unavailable, uses PIL (which re-encodes + but PNGs are lossless anyway). + """ + img = Image.open(io.BytesIO(image_data)) + original_format = img.format or "PNG" + img.close() + + # Use jpegtran for lossless JPEG rotation + if lossless and original_format == "JPEG" and _jpegtran_available(): + return _jpegtran_rotate(image_data, rotation) + + # Fallback to PIL for PNGs or when jpegtran unavailable + img = Image.open(io.BytesIO(image_data)) + + # PIL rotation is counter-clockwise, we want clockwise + # 90 CW = 270 CCW, 180 = 180, 270 CW = 90 CCW + pil_rotation = {90: 270, 180: 180, 270: 90}[rotation] + rotated = img.rotate(pil_rotation, expand=True) + + output = io.BytesIO() + # Save in original format if possible, fallback to PNG + save_format = original_format if original_format in ("JPEG", "PNG") else "PNG" + if save_format == "JPEG": + rotated.save(output, format="JPEG", quality=95) + else: + rotated.save(output, format="PNG") + output.seek(0) + return output.getvalue() + + +def _quick_validate_dct_header(image_data: bytes, seed: bytes) -> bool: + """ + Quick validation that only extracts enough DCT data to check magic bytes. + Returns True if header looks valid, False otherwise. + + This is much faster than full extraction - only processes first ~8 blocks. + """ + try: + # Convert to grayscale for quick check + gray = _to_grayscale(image_data) + height, width = gray.shape + padded, _ = _pad_to_blocks(gray) + padded_h, padded_w = padded.shape + blocks_x = padded_w // BLOCK_SIZE + num_blocks = (padded_h // BLOCK_SIZE) * blocks_x + + # Generate block order + block_order = _generate_block_order(num_blocks, seed) + + # Only extract first 8 blocks (enough for RS length prefix + header) + # 8 blocks * 16 bits/block = 128 bits = 16 bytes (covers RS prefix) + blocks_needed = min(8, len(block_order)) + + all_bits = [] + for block_num in block_order[:blocks_needed]: + by = (block_num // blocks_x) * BLOCK_SIZE + bx = (block_num % blocks_x) * BLOCK_SIZE + block = padded[by : by + BLOCK_SIZE, bx : bx + BLOCK_SIZE].astype(np.float32) + + dct_block = dctn(block, norm="ortho") + + for row, col in EMBED_POSITIONS: + coef = dct_block[row, col] + bit = _extract_bit_from_coeff(coef) + all_bits.append(bit) + + # Check RS format first (3 copies of 8-byte length header) + if len(all_bits) >= RS_LENGTH_PREFIX_SIZE * 8: + length_prefix_bits = all_bits[: RS_LENGTH_PREFIX_SIZE * 8] + length_prefix_bytes = bytes( + [ + sum(length_prefix_bits[i * 8 : (i + 1) * 8][j] << (7 - j) for j in range(8)) + for i in range(RS_LENGTH_PREFIX_SIZE) + ] + ) + + # Check if 2+ copies match (indicates valid RS format) + copies = [] + for i in range(RS_LENGTH_COPIES): + start = i * RS_LENGTH_HEADER_SIZE + end = start + RS_LENGTH_HEADER_SIZE + copies.append(length_prefix_bytes[start:end]) + + from collections import Counter + counter = Counter(copies) + _, count = counter.most_common(1)[0] + + if count >= 2: + return True # Looks like valid RS format + + # Check legacy format (magic bytes in first 10 bytes) + if len(all_bits) >= HEADER_SIZE * 8: + try: + _parse_header(all_bits[: HEADER_SIZE * 8]) + return True # Magic bytes matched + except (ValueError, InvalidMagicBytesError): + pass + + return False + except Exception: + return False + + def extract_from_dct( stego_image: bytes, seed: bytes, progress_file: str | None = None, ) -> bytes: - """Extract data from DCT stego image.""" - img = Image.open(io.BytesIO(stego_image)) - fmt = img.format - img.close() + """ + Extract data from DCT stego image. - if fmt == "JPEG" and HAS_JPEGIO: + If extraction fails with InvalidMagicBytesError, automatically tries + 90°, 180°, and 270° rotations to handle images that were rotated after + encoding (e.g., by external tools or EXIF orientation changes). + + Uses quick header validation to skip obviously invalid rotations. + """ + rotations_to_try = [0, 90, 180, 270] + last_error = None + valid_rotations = [] + + # Phase 1: Quick validation to find candidate rotations + for rotation in rotations_to_try: + if rotation == 0: + image_to_check = stego_image + else: + image_to_check = _rotate_image_bytes(stego_image, rotation) + + if _quick_validate_dct_header(image_to_check, seed): + valid_rotations.append((rotation, image_to_check)) + + # If no rotations pass quick check, try all anyway (fallback) + if not valid_rotations: + # Must try all rotations - quick validation might have failed due to + # scipy vs jpegio differences or other edge cases + for rotation in rotations_to_try: + if rotation == 0: + valid_rotations.append((0, stego_image)) + else: + valid_rotations.append((rotation, _rotate_image_bytes(stego_image, rotation))) + + # Phase 2: Full extraction on valid candidates + for rotation, image_to_decode in valid_rotations: try: - return _extract_jpegio(stego_image, seed, progress_file) - except ValueError: - pass + img = Image.open(io.BytesIO(image_to_decode)) + fmt = img.format + img.close() - _check_scipy() - return _extract_scipy_dct_safe(stego_image, seed, progress_file) + if fmt == "JPEG" and HAS_JPEGIO: + try: + result = _extract_jpegio(image_to_decode, seed, progress_file) + if rotation != 0: + try: + from . import debug + debug.print(f"DCT decode succeeded after {rotation}° rotation") + except Exception: + pass # Don't let debug logging break extraction + return result + except (ValueError, InvalidMagicBytesError) as e: + last_error = e if isinstance(e, InvalidMagicBytesError) else last_error + continue + + _check_scipy() + result = _extract_scipy_dct_safe(image_to_decode, seed, progress_file) + if rotation != 0: + try: + from . import debug + debug.print(f"DCT decode succeeded after {rotation}° rotation") + except Exception: + pass # Don't let debug logging break extraction + return result + + except InvalidMagicBytesError as e: + last_error = e + continue + + # All rotations failed + raise last_error or InvalidMagicBytesError("Not a Stegasoo image (tried all rotations)") def _extract_scipy_dct_safe(