diff --git a/debug_jpegio.py b/debug_jpegio.py deleted file mode 100644 index 1ba9a3d..0000000 --- a/debug_jpegio.py +++ /dev/null @@ -1,215 +0,0 @@ -#!/usr/bin/env python3 -""" -Debug script for DCT/jpegio extraction issues. -Run from the stegasoo directory. -""" - -import sys -import struct -from pathlib import Path - -sys.path.insert(0, str(Path(__file__).parent / 'src')) - -import hashlib -import numpy as np - -# Check for jpegio -try: - import jpegio as jio - print("✓ jpegio available") -except ImportError: - print("✗ jpegio NOT available") - sys.exit(1) - -def get_usable_positions(coef_array, min_magnitude=2): - """Get positions of usable coefficients.""" - positions = [] - h, w = coef_array.shape - for row in range(h): - for col in range(w): - # Skip DC coefficients (top-left of each 8x8 block) - if (row % 8 == 0) and (col % 8 == 0): - continue - if abs(coef_array[row, col]) >= min_magnitude: - positions.append((row, col)) - return positions - -def generate_order(num_positions, seed): - """Generate pseudo-random order for coefficient selection.""" - hash_bytes = hashlib.sha256(seed + b"jpeg_coef_order").digest() - rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], 'big')) - order = list(range(num_positions)) - rng.shuffle(order) - return order - -def extract_bits(coef_array, positions, order, num_bits): - """Extract bits from coefficients.""" - bits = [] - for i, pos_idx in enumerate(order): - if i >= num_bits: - break - row, col = positions[pos_idx] - coef = coef_array[row, col] - bits.append(coef & 1) - return bits - -def bits_to_bytes(bits): - """Convert list of bits to bytes.""" - result = [] - for i in range(0, len(bits), 8): - byte_bits = bits[i:i+8] - if len(byte_bits) == 8: - byte_val = sum(byte_bits[j] << (7-j) for j in range(8)) - result.append(byte_val) - return bytes(result) - -def main(): - if len(sys.argv) < 3: - print("Usage: python debug_jpegio.py ") - print("\nOptional: add passphrase, pin, key path") - print(" python debug_jpegio.py stego.jpg ref.jpg 'passphrase' '123456' key.pem") - sys.exit(1) - - stego_path = sys.argv[1] - ref_path = sys.argv[2] - passphrase = sys.argv[3] if len(sys.argv) > 3 else "test" - pin = sys.argv[4] if len(sys.argv) > 4 else "" - key_path = sys.argv[5] if len(sys.argv) > 5 else None - - print(f"\n{'='*60}") - print("JPEGIO DCT EXTRACTION DEBUG") - print(f"{'='*60}") - print(f"Stego image: {stego_path}") - print(f"Reference: {ref_path}") - print(f"Passphrase: '{passphrase}'") - print(f"PIN: '{pin}'") - print(f"Key: {key_path}") - - # Load stego image with jpegio - print(f"\n[1] Loading stego image with jpegio...") - try: - jpeg = jio.read(stego_path) - print(f" ✓ jpegio.read() succeeded") - print(f" Number of components: {len(jpeg.coef_arrays)}") - for i, arr in enumerate(jpeg.coef_arrays): - print(f" Component {i}: shape={arr.shape}, dtype={arr.dtype}") - except Exception as e: - print(f" ✗ Failed: {e}") - sys.exit(1) - - # Get coefficient array (channel 0) - coef_array = jpeg.coef_arrays[0] - print(f"\n[2] Coefficient array analysis...") - print(f" Shape: {coef_array.shape}") - print(f" Non-zero coefficients: {np.count_nonzero(coef_array)}") - print(f" Min value: {coef_array.min()}") - print(f" Max value: {coef_array.max()}") - - # Get usable positions - print(f"\n[3] Finding usable positions (|coef| >= 2, non-DC)...") - positions = get_usable_positions(coef_array) - print(f" Usable positions: {len(positions)}") - print(f" Capacity: ~{len(positions) // 8} bytes") - - # Generate seed (this needs to match the encode seed!) - print(f"\n[4] Generating seed...") - - # Load reference photo - ref_data = Path(ref_path).read_bytes() - ref_hash = hashlib.sha256(ref_data).digest() - print(f" Reference hash: {ref_hash[:8].hex()}...") - - # Load RSA key if provided - rsa_component = b"" - if key_path: - try: - from stegasoo import load_rsa_key - key_data = Path(key_path).read_bytes() - # Try without password first - try: - rsa_key = load_rsa_key(key_data, password=None) - except: - rsa_key = load_rsa_key(key_data, password="testpass") - - # Get public key bytes for seed - from cryptography.hazmat.primitives import serialization - pub_bytes = rsa_key.public_key().public_bytes( - encoding=serialization.Encoding.DER, - format=serialization.PublicFormat.SubjectPublicKeyInfo - ) - rsa_component = hashlib.sha256(pub_bytes).digest() - print(f" RSA key loaded, hash: {rsa_component[:8].hex()}...") - except Exception as e: - print(f" ✗ Could not load RSA key: {e}") - - # Build seed like stegasoo does - # This is the critical part - must match encoding! - seed_parts = [ - ref_hash, - passphrase.encode('utf-8'), - pin.encode('utf-8') if pin else b"", - rsa_component, - ] - seed = hashlib.sha256(b"".join(seed_parts)).digest() - print(f" Combined seed: {seed[:8].hex()}...") - - # Generate order - print(f"\n[5] Generating coefficient order...") - order = generate_order(len(positions), seed) - print(f" First 10 indices: {order[:10]}") - - # Try to extract header - print(f"\n[6] Extracting header (first 80 bits = 10 bytes)...") - HEADER_SIZE = 10 - header_bits = extract_bits(coef_array, positions, order, HEADER_SIZE * 8) - header_bytes = bits_to_bytes(header_bits) - print(f" Raw header bytes: {header_bytes.hex()}") - print(f" As ASCII (if printable): {repr(header_bytes)}") - - # Check for JPGS magic - JPEGIO_MAGIC = b'JPGS' - if header_bytes[:4] == JPEGIO_MAGIC: - print(f" ✓ Found JPEGIO magic bytes!") - version = header_bytes[4] - flags = header_bytes[5] - data_length = struct.unpack('>I', header_bytes[6:10])[0] - print(f" Version: {version}") - print(f" Flags: {flags}") - print(f" Data length: {data_length} bytes") - - if data_length > 0 and data_length < len(positions) // 8: - print(f"\n[7] Extracting payload ({data_length} bytes)...") - total_bits = (HEADER_SIZE + data_length) * 8 - all_bits = extract_bits(coef_array, positions, order, total_bits) - data_bits = all_bits[HEADER_SIZE * 8:] - payload = bits_to_bytes(data_bits) - print(f" Payload (first 64 bytes): {payload[:64].hex()}") - print(f" This should be encrypted data starting with salt/IV") - else: - print(f" ✗ Invalid data length: {data_length}") - else: - print(f" ✗ No JPEGIO magic found") - print(f" Expected: {JPEGIO_MAGIC.hex()} ('JPGS')") - print(f" Got: {header_bytes[:4].hex()} ('{header_bytes[:4]}')") - - # Try alternate interpretations - print(f"\n[7] Trying alternate header interpretations...") - - # Maybe it's scipy DCT format? - DCT_MAGIC = b'DCTS' - if header_bytes[:4] == DCT_MAGIC: - print(f" Found SCIPY DCT magic - wrong extraction method!") - else: - # Show bit distribution - print(f" First 32 extracted bits: {header_bits[:32]}") - - # Check if bits look random or patterned - ones = sum(header_bits[:80]) - print(f" Bit distribution: {ones}/80 ones ({100*ones/80:.1f}%)") - - print(f"\n{'='*60}") - print("DEBUG COMPLETE") - print(f"{'='*60}\n") - -if __name__ == '__main__': - main() diff --git a/test_compare_capacity_flow.py b/test_compare_capacity_flow.py deleted file mode 100644 index 28bbab3..0000000 --- a/test_compare_capacity_flow.py +++ /dev/null @@ -1,205 +0,0 @@ -#!/usr/bin/env python3 -""" -Test that mimics the exact /api/compare-capacity flow. -Run with: python test_compare_capacity_flow.py ./xx_2.jpg -""" - -import sys -import io -import gc -import json -import time - -print("=" * 60) -print("COMPARE-CAPACITY FLOW TEST") -print("=" * 60) - -if len(sys.argv) < 2: - print("Usage: python test_compare_capacity_flow.py ") - sys.exit(1) - -image_path = sys.argv[1] - -# Read the file -with open(image_path, 'rb') as f: - carrier_data = f.read() -print(f"Loaded {len(carrier_data)} bytes from {image_path}") - -# Import everything like Flask does -print("\n[1] Importing modules...") -from PIL import Image -import numpy as np - -try: - import jpegio as jio - HAS_JPEGIO = True - print(f" jpegio: available") -except ImportError: - HAS_JPEGIO = False - print(f" jpegio: NOT available") - -try: - from scipy.fft import dct, idct - print(f" scipy.fft: available") -except ImportError: - from scipy.fftpack import dct, idct - print(f" scipy.fftpack: available (fallback)") - -print(" Imports complete") - -# Simulate the compare_modes function -print("\n[2] Opening image (1st time - for dimensions)...") -img1 = Image.open(io.BytesIO(carrier_data)) -width, height = img1.size -print(f" Size: {width}x{height}") -img1.close() -print(" Closed img1") -gc.collect() - -print("\n[3] Opening image (2nd time - for LSB capacity)...") -img2 = Image.open(io.BytesIO(carrier_data)) -num_pixels = img2.size[0] * img2.size[1] -lsb_bytes = (num_pixels * 3) // 8 - 69 -print(f" LSB capacity: {lsb_bytes} bytes") -img2.close() -print(" Closed img2") -gc.collect() - -print("\n[4] Opening image (3rd time - for DCT capacity)...") -img3 = Image.open(io.BytesIO(carrier_data)) -w, h = img3.size -blocks_x = w // 8 -blocks_y = h // 8 -total_blocks = blocks_x * blocks_y -dct_bits = total_blocks * 16 -dct_bytes = dct_bits // 8 - 10 -print(f" DCT capacity: {dct_bytes} bytes ({total_blocks} blocks)") -img3.close() -print(" Closed img3") -gc.collect() - -print("\n[5] Building response dict...") -response = { - 'success': True, - 'width': width, - 'height': height, - 'lsb': { - 'capacity_bytes': lsb_bytes, - 'capacity_kb': round(lsb_bytes / 1024, 1), - 'output': 'PNG', - }, - 'dct': { - 'capacity_bytes': dct_bytes, - 'capacity_kb': round(dct_bytes / 1024, 1), - 'output': 'JPEG', - 'available': True, - 'ratio': round(dct_bytes / lsb_bytes * 100, 1), - } -} -print(f" Response built") - -print("\n[6] Serializing to JSON...") -json_str = json.dumps(response) -print(f" JSON length: {len(json_str)} bytes") -print(f" Content: {json_str[:200]}...") - -print("\n[7] Simulating Flask response completion...") -# In Flask, after the response is sent, Python may garbage collect -del carrier_data -del response -del json_str -gc.collect() -print(" GC after response simulation") - -print("\n[8] Additional cleanup (simulating request end)...") -gc.collect() -gc.collect() -print(" Multiple GC cycles complete") - -print("\n[9] Waiting for delayed crash...") -for i in range(3): - time.sleep(1) - print(f" {i+1}s...") - gc.collect() - -print("\n" + "=" * 60) -print("TEST PASSED - No crash detected") -print("=" * 60) - -# Now test with jpegio if available -if HAS_JPEGIO: - print("\n" + "=" * 60) - print("JPEGIO SPECIFIC TEST") - print("=" * 60) - - import tempfile - import os - - # Reload image data - with open(image_path, 'rb') as f: - carrier_data = f.read() - - print("\n[J1] Checking if image is JPEG...") - img = Image.open(io.BytesIO(carrier_data)) - is_jpeg = img.format == 'JPEG' - img.close() - print(f" Is JPEG: {is_jpeg}") - - if is_jpeg: - print("\n[J2] Writing to temp file...") - fd, temp_path = tempfile.mkstemp(suffix='.jpg') - os.write(fd, carrier_data) - os.close(fd) - print(f" Temp file: {temp_path}") - - print("\n[J3] Reading with jpegio...") - try: - jpeg = jio.read(temp_path) - print(f" jpegio.read() OK") - - print("\n[J4] Accessing coefficient arrays...") - coef = jpeg.coef_arrays[0] - print(f" Coef shape: {coef.shape}, dtype: {coef.dtype}") - - print("\n[J5] Counting usable positions...") - positions = [] - h, w = coef.shape - for row in range(h): - for col in range(w): - if (row % 8 == 0) and (col % 8 == 0): - continue - if abs(coef[row, col]) >= 2: - positions.append((row, col)) - print(f" Usable positions: {len(positions)}") - - print("\n[J6] Cleaning up jpegio object...") - del coef - del jpeg - gc.collect() - print(" Deleted jpeg object") - - print("\n[J7] Removing temp file...") - os.unlink(temp_path) - print(" Temp file removed") - - gc.collect() - print("\n[J8] Final GC...") - - except Exception as e: - print(f" ERROR: {e}") - import traceback - traceback.print_exc() - - print("\n[J9] Waiting for delayed crash...") - for i in range(3): - time.sleep(1) - print(f" {i+1}s...") - gc.collect() - - print("\n" + "=" * 60) - print("JPEGIO TEST PASSED - No crash detected") - print("=" * 60) - else: - print(" Skipping jpegio test (not a JPEG)") - -print("\n\nAll tests completed successfully!") diff --git a/test_dct_crash.py b/test_dct_crash.py deleted file mode 100644 index 16a3362..0000000 --- a/test_dct_crash.py +++ /dev/null @@ -1,231 +0,0 @@ -#!/usr/bin/env python3 -""" -Standalone DCT crash diagnostic script. -Run this outside of Flask to isolate the issue. - -Usage: - python test_dct_crash.py /path/to/your/large_image.jpg -""" - -import sys -import gc -import traceback -import io - -print("=" * 60) -print("DCT CRASH DIAGNOSTIC TOOL") -print("=" * 60) - -# Step 1: Check Python and library versions -print("\n[1] ENVIRONMENT INFO") -print(f"Python: {sys.version}") - -try: - import numpy as np - print(f"NumPy: {np.__version__}") -except ImportError as e: - print(f"NumPy: NOT INSTALLED - {e}") - sys.exit(1) - -try: - import scipy - print(f"SciPy: {scipy.__version__}") -except ImportError as e: - print(f"SciPy: NOT INSTALLED - {e}") - sys.exit(1) - -try: - from PIL import Image - import PIL - print(f"Pillow: {PIL.__version__}") -except ImportError as e: - print(f"Pillow: NOT INSTALLED - {e}") - sys.exit(1) - -# Step 2: Check which DCT module we're using -print("\n[2] DCT MODULE CHECK") -try: - from scipy.fft import dct, idct - print("Using: scipy.fft (preferred)") - DCT_MODULE = "scipy.fft" -except ImportError: - try: - from scipy.fftpack import dct, idct - print("Using: scipy.fftpack (legacy)") - DCT_MODULE = "scipy.fftpack" - except ImportError: - print("ERROR: No DCT module available!") - sys.exit(1) - -# Step 3: Test basic DCT on small array -print("\n[3] BASIC DCT TEST (8x8 block)") -try: - test_block = np.random.rand(8, 8).astype(np.float64) - - # 1D DCT on rows - result = dct(test_block[0, :], norm='ortho') - print(f" 1D DCT: OK (output shape: {result.shape})") - - # 1D IDCT - recovered = idct(result, norm='ortho') - error = np.max(np.abs(test_block[0, :] - recovered)) - print(f" 1D IDCT: OK (roundtrip error: {error:.2e})") - - # 2D via separable - temp = np.zeros_like(test_block) - for i in range(8): - temp[:, i] = dct(test_block[:, i], norm='ortho') - result2d = np.zeros_like(temp) - for i in range(8): - result2d[i, :] = dct(temp[i, :], norm='ortho') - print(f" 2D DCT: OK") - - gc.collect() - print(" GC after basic test: OK") - -except Exception as e: - print(f" FAILED: {e}") - traceback.print_exc() - -# Step 4: Test with larger arrays (stress test) -print("\n[4] STRESS TEST (many 8x8 blocks)") -try: - NUM_BLOCKS = 10000 - print(f" Processing {NUM_BLOCKS} blocks...") - - for i in range(NUM_BLOCKS): - block = np.random.rand(8, 8).astype(np.float64) - - # Forward DCT - temp = np.zeros_like(block) - for j in range(8): - temp[:, j] = dct(block[:, j], norm='ortho') - result = np.zeros_like(temp) - for j in range(8): - result[j, :] = dct(temp[j, :], norm='ortho') - - # Inverse DCT - temp2 = np.zeros_like(result) - for j in range(8): - temp2[j, :] = idct(result[j, :], norm='ortho') - recovered = np.zeros_like(temp2) - for j in range(8): - recovered[:, j] = idct(temp2[:, j], norm='ortho') - - if i % 1000 == 0: - gc.collect() - print(f" {i}/{NUM_BLOCKS} blocks processed...") - - gc.collect() - print(f" Stress test PASSED") - -except Exception as e: - print(f" FAILED at block {i}: {e}") - traceback.print_exc() - -# Step 5: Test with actual image if provided -if len(sys.argv) > 1: - image_path = sys.argv[1] - print(f"\n[5] IMAGE TEST: {image_path}") - - try: - with open(image_path, 'rb') as f: - image_data = f.read() - print(f" File size: {len(image_data) / 1024 / 1024:.2f} MB") - - img = Image.open(io.BytesIO(image_data)) - width, height = img.size - print(f" Dimensions: {width}x{height}") - print(f" Format: {img.format}") - print(f" Mode: {img.mode}") - - # Convert to grayscale float array - gray = img.convert('L') - arr = np.array(gray, dtype=np.float64) - img.close() - gray.close() - print(f" Array shape: {arr.shape}") - print(f" Array dtype: {arr.dtype}") - - # Pad to block boundary - BLOCK_SIZE = 8 - h, w = arr.shape - new_h = ((h + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE - new_w = ((w + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE - - if new_h != h or new_w != w: - padded = np.zeros((new_h, new_w), dtype=np.float64) - padded[:h, :w] = arr - arr = padded - print(f" Padded to: {arr.shape}") - - blocks_y = arr.shape[0] // BLOCK_SIZE - blocks_x = arr.shape[1] // BLOCK_SIZE - total_blocks = blocks_y * blocks_x - print(f" Total 8x8 blocks: {total_blocks}") - - # Process ALL blocks - print(f" Processing all blocks with DCT...") - - processed = 0 - for by in range(blocks_y): - for bx in range(blocks_x): - y = by * BLOCK_SIZE - x = bx * BLOCK_SIZE - - block = arr[y:y+BLOCK_SIZE, x:x+BLOCK_SIZE].copy() - - # Forward DCT - temp = np.zeros((8, 8), dtype=np.float64) - for i in range(8): - temp[:, i] = dct(block[:, i], norm='ortho') - dct_block = np.zeros((8, 8), dtype=np.float64) - for i in range(8): - dct_block[i, :] = dct(temp[i, :], norm='ortho') - - # Inverse DCT - temp2 = np.zeros((8, 8), dtype=np.float64) - for i in range(8): - temp2[i, :] = idct(dct_block[i, :], norm='ortho') - recovered = np.zeros((8, 8), dtype=np.float64) - for i in range(8): - recovered[:, i] = idct(temp2[:, i], norm='ortho') - - processed += 1 - - # GC after each row of blocks - if by % 50 == 0: - gc.collect() - print(f" Row {by}/{blocks_y} ({processed}/{total_blocks} blocks)") - - gc.collect() - print(f" Image DCT test PASSED ({processed} blocks)") - - except Exception as e: - print(f" FAILED: {e}") - traceback.print_exc() - -else: - print("\n[5] IMAGE TEST: Skipped (no image path provided)") - print(" Usage: python test_dct_crash.py /path/to/image.jpg") - -# Step 6: Final cleanup test -print("\n[6] FINAL CLEANUP TEST") -try: - gc.collect() - gc.collect() - gc.collect() - print(" Multiple GC cycles: OK") -except Exception as e: - print(f" FAILED: {e}") - -print("\n" + "=" * 60) -print("If this script completes without 'free(): invalid size',") -print("the issue is likely in PIL/jpegio interaction, not scipy DCT.") -print("=" * 60) - -# Keep process alive briefly to catch delayed crashes -import time -print("\nWaiting 2 seconds for delayed crashes...") -time.sleep(2) -print("Done - no crash detected!")