Remove obsolete debug/diagnostic scripts
Deleted one-off debugging scripts that are no longer needed: - debug_jpegio.py - DCT/jpegio extraction debugger - test_compare_capacity_flow.py - API flow crash diagnostic - test_dct_crash.py - DCT crash diagnostic tool 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
215
debug_jpegio.py
215
debug_jpegio.py
@@ -1,215 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
Debug script for DCT/jpegio extraction issues.
|
|
||||||
Run from the stegasoo directory.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import struct
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
sys.path.insert(0, str(Path(__file__).parent / 'src'))
|
|
||||||
|
|
||||||
import hashlib
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
# Check for jpegio
|
|
||||||
try:
|
|
||||||
import jpegio as jio
|
|
||||||
print("✓ jpegio available")
|
|
||||||
except ImportError:
|
|
||||||
print("✗ jpegio NOT available")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
def get_usable_positions(coef_array, min_magnitude=2):
|
|
||||||
"""Get positions of usable coefficients."""
|
|
||||||
positions = []
|
|
||||||
h, w = coef_array.shape
|
|
||||||
for row in range(h):
|
|
||||||
for col in range(w):
|
|
||||||
# Skip DC coefficients (top-left of each 8x8 block)
|
|
||||||
if (row % 8 == 0) and (col % 8 == 0):
|
|
||||||
continue
|
|
||||||
if abs(coef_array[row, col]) >= min_magnitude:
|
|
||||||
positions.append((row, col))
|
|
||||||
return positions
|
|
||||||
|
|
||||||
def generate_order(num_positions, seed):
|
|
||||||
"""Generate pseudo-random order for coefficient selection."""
|
|
||||||
hash_bytes = hashlib.sha256(seed + b"jpeg_coef_order").digest()
|
|
||||||
rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], 'big'))
|
|
||||||
order = list(range(num_positions))
|
|
||||||
rng.shuffle(order)
|
|
||||||
return order
|
|
||||||
|
|
||||||
def extract_bits(coef_array, positions, order, num_bits):
|
|
||||||
"""Extract bits from coefficients."""
|
|
||||||
bits = []
|
|
||||||
for i, pos_idx in enumerate(order):
|
|
||||||
if i >= num_bits:
|
|
||||||
break
|
|
||||||
row, col = positions[pos_idx]
|
|
||||||
coef = coef_array[row, col]
|
|
||||||
bits.append(coef & 1)
|
|
||||||
return bits
|
|
||||||
|
|
||||||
def bits_to_bytes(bits):
|
|
||||||
"""Convert list of bits to bytes."""
|
|
||||||
result = []
|
|
||||||
for i in range(0, len(bits), 8):
|
|
||||||
byte_bits = bits[i:i+8]
|
|
||||||
if len(byte_bits) == 8:
|
|
||||||
byte_val = sum(byte_bits[j] << (7-j) for j in range(8))
|
|
||||||
result.append(byte_val)
|
|
||||||
return bytes(result)
|
|
||||||
|
|
||||||
def main():
|
|
||||||
if len(sys.argv) < 3:
|
|
||||||
print("Usage: python debug_jpegio.py <stego_image.jpg> <reference_photo>")
|
|
||||||
print("\nOptional: add passphrase, pin, key path")
|
|
||||||
print(" python debug_jpegio.py stego.jpg ref.jpg 'passphrase' '123456' key.pem")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
stego_path = sys.argv[1]
|
|
||||||
ref_path = sys.argv[2]
|
|
||||||
passphrase = sys.argv[3] if len(sys.argv) > 3 else "test"
|
|
||||||
pin = sys.argv[4] if len(sys.argv) > 4 else ""
|
|
||||||
key_path = sys.argv[5] if len(sys.argv) > 5 else None
|
|
||||||
|
|
||||||
print(f"\n{'='*60}")
|
|
||||||
print("JPEGIO DCT EXTRACTION DEBUG")
|
|
||||||
print(f"{'='*60}")
|
|
||||||
print(f"Stego image: {stego_path}")
|
|
||||||
print(f"Reference: {ref_path}")
|
|
||||||
print(f"Passphrase: '{passphrase}'")
|
|
||||||
print(f"PIN: '{pin}'")
|
|
||||||
print(f"Key: {key_path}")
|
|
||||||
|
|
||||||
# Load stego image with jpegio
|
|
||||||
print(f"\n[1] Loading stego image with jpegio...")
|
|
||||||
try:
|
|
||||||
jpeg = jio.read(stego_path)
|
|
||||||
print(f" ✓ jpegio.read() succeeded")
|
|
||||||
print(f" Number of components: {len(jpeg.coef_arrays)}")
|
|
||||||
for i, arr in enumerate(jpeg.coef_arrays):
|
|
||||||
print(f" Component {i}: shape={arr.shape}, dtype={arr.dtype}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f" ✗ Failed: {e}")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# Get coefficient array (channel 0)
|
|
||||||
coef_array = jpeg.coef_arrays[0]
|
|
||||||
print(f"\n[2] Coefficient array analysis...")
|
|
||||||
print(f" Shape: {coef_array.shape}")
|
|
||||||
print(f" Non-zero coefficients: {np.count_nonzero(coef_array)}")
|
|
||||||
print(f" Min value: {coef_array.min()}")
|
|
||||||
print(f" Max value: {coef_array.max()}")
|
|
||||||
|
|
||||||
# Get usable positions
|
|
||||||
print(f"\n[3] Finding usable positions (|coef| >= 2, non-DC)...")
|
|
||||||
positions = get_usable_positions(coef_array)
|
|
||||||
print(f" Usable positions: {len(positions)}")
|
|
||||||
print(f" Capacity: ~{len(positions) // 8} bytes")
|
|
||||||
|
|
||||||
# Generate seed (this needs to match the encode seed!)
|
|
||||||
print(f"\n[4] Generating seed...")
|
|
||||||
|
|
||||||
# Load reference photo
|
|
||||||
ref_data = Path(ref_path).read_bytes()
|
|
||||||
ref_hash = hashlib.sha256(ref_data).digest()
|
|
||||||
print(f" Reference hash: {ref_hash[:8].hex()}...")
|
|
||||||
|
|
||||||
# Load RSA key if provided
|
|
||||||
rsa_component = b""
|
|
||||||
if key_path:
|
|
||||||
try:
|
|
||||||
from stegasoo import load_rsa_key
|
|
||||||
key_data = Path(key_path).read_bytes()
|
|
||||||
# Try without password first
|
|
||||||
try:
|
|
||||||
rsa_key = load_rsa_key(key_data, password=None)
|
|
||||||
except:
|
|
||||||
rsa_key = load_rsa_key(key_data, password="testpass")
|
|
||||||
|
|
||||||
# Get public key bytes for seed
|
|
||||||
from cryptography.hazmat.primitives import serialization
|
|
||||||
pub_bytes = rsa_key.public_key().public_bytes(
|
|
||||||
encoding=serialization.Encoding.DER,
|
|
||||||
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
|
||||||
)
|
|
||||||
rsa_component = hashlib.sha256(pub_bytes).digest()
|
|
||||||
print(f" RSA key loaded, hash: {rsa_component[:8].hex()}...")
|
|
||||||
except Exception as e:
|
|
||||||
print(f" ✗ Could not load RSA key: {e}")
|
|
||||||
|
|
||||||
# Build seed like stegasoo does
|
|
||||||
# This is the critical part - must match encoding!
|
|
||||||
seed_parts = [
|
|
||||||
ref_hash,
|
|
||||||
passphrase.encode('utf-8'),
|
|
||||||
pin.encode('utf-8') if pin else b"",
|
|
||||||
rsa_component,
|
|
||||||
]
|
|
||||||
seed = hashlib.sha256(b"".join(seed_parts)).digest()
|
|
||||||
print(f" Combined seed: {seed[:8].hex()}...")
|
|
||||||
|
|
||||||
# Generate order
|
|
||||||
print(f"\n[5] Generating coefficient order...")
|
|
||||||
order = generate_order(len(positions), seed)
|
|
||||||
print(f" First 10 indices: {order[:10]}")
|
|
||||||
|
|
||||||
# Try to extract header
|
|
||||||
print(f"\n[6] Extracting header (first 80 bits = 10 bytes)...")
|
|
||||||
HEADER_SIZE = 10
|
|
||||||
header_bits = extract_bits(coef_array, positions, order, HEADER_SIZE * 8)
|
|
||||||
header_bytes = bits_to_bytes(header_bits)
|
|
||||||
print(f" Raw header bytes: {header_bytes.hex()}")
|
|
||||||
print(f" As ASCII (if printable): {repr(header_bytes)}")
|
|
||||||
|
|
||||||
# Check for JPGS magic
|
|
||||||
JPEGIO_MAGIC = b'JPGS'
|
|
||||||
if header_bytes[:4] == JPEGIO_MAGIC:
|
|
||||||
print(f" ✓ Found JPEGIO magic bytes!")
|
|
||||||
version = header_bytes[4]
|
|
||||||
flags = header_bytes[5]
|
|
||||||
data_length = struct.unpack('>I', header_bytes[6:10])[0]
|
|
||||||
print(f" Version: {version}")
|
|
||||||
print(f" Flags: {flags}")
|
|
||||||
print(f" Data length: {data_length} bytes")
|
|
||||||
|
|
||||||
if data_length > 0 and data_length < len(positions) // 8:
|
|
||||||
print(f"\n[7] Extracting payload ({data_length} bytes)...")
|
|
||||||
total_bits = (HEADER_SIZE + data_length) * 8
|
|
||||||
all_bits = extract_bits(coef_array, positions, order, total_bits)
|
|
||||||
data_bits = all_bits[HEADER_SIZE * 8:]
|
|
||||||
payload = bits_to_bytes(data_bits)
|
|
||||||
print(f" Payload (first 64 bytes): {payload[:64].hex()}")
|
|
||||||
print(f" This should be encrypted data starting with salt/IV")
|
|
||||||
else:
|
|
||||||
print(f" ✗ Invalid data length: {data_length}")
|
|
||||||
else:
|
|
||||||
print(f" ✗ No JPEGIO magic found")
|
|
||||||
print(f" Expected: {JPEGIO_MAGIC.hex()} ('JPGS')")
|
|
||||||
print(f" Got: {header_bytes[:4].hex()} ('{header_bytes[:4]}')")
|
|
||||||
|
|
||||||
# Try alternate interpretations
|
|
||||||
print(f"\n[7] Trying alternate header interpretations...")
|
|
||||||
|
|
||||||
# Maybe it's scipy DCT format?
|
|
||||||
DCT_MAGIC = b'DCTS'
|
|
||||||
if header_bytes[:4] == DCT_MAGIC:
|
|
||||||
print(f" Found SCIPY DCT magic - wrong extraction method!")
|
|
||||||
else:
|
|
||||||
# Show bit distribution
|
|
||||||
print(f" First 32 extracted bits: {header_bits[:32]}")
|
|
||||||
|
|
||||||
# Check if bits look random or patterned
|
|
||||||
ones = sum(header_bits[:80])
|
|
||||||
print(f" Bit distribution: {ones}/80 ones ({100*ones/80:.1f}%)")
|
|
||||||
|
|
||||||
print(f"\n{'='*60}")
|
|
||||||
print("DEBUG COMPLETE")
|
|
||||||
print(f"{'='*60}\n")
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
||||||
@@ -1,205 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
Test that mimics the exact /api/compare-capacity flow.
|
|
||||||
Run with: python test_compare_capacity_flow.py ./xx_2.jpg
|
|
||||||
"""
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import io
|
|
||||||
import gc
|
|
||||||
import json
|
|
||||||
import time
|
|
||||||
|
|
||||||
print("=" * 60)
|
|
||||||
print("COMPARE-CAPACITY FLOW TEST")
|
|
||||||
print("=" * 60)
|
|
||||||
|
|
||||||
if len(sys.argv) < 2:
|
|
||||||
print("Usage: python test_compare_capacity_flow.py <image_path>")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
image_path = sys.argv[1]
|
|
||||||
|
|
||||||
# Read the file
|
|
||||||
with open(image_path, 'rb') as f:
|
|
||||||
carrier_data = f.read()
|
|
||||||
print(f"Loaded {len(carrier_data)} bytes from {image_path}")
|
|
||||||
|
|
||||||
# Import everything like Flask does
|
|
||||||
print("\n[1] Importing modules...")
|
|
||||||
from PIL import Image
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
try:
|
|
||||||
import jpegio as jio
|
|
||||||
HAS_JPEGIO = True
|
|
||||||
print(f" jpegio: available")
|
|
||||||
except ImportError:
|
|
||||||
HAS_JPEGIO = False
|
|
||||||
print(f" jpegio: NOT available")
|
|
||||||
|
|
||||||
try:
|
|
||||||
from scipy.fft import dct, idct
|
|
||||||
print(f" scipy.fft: available")
|
|
||||||
except ImportError:
|
|
||||||
from scipy.fftpack import dct, idct
|
|
||||||
print(f" scipy.fftpack: available (fallback)")
|
|
||||||
|
|
||||||
print(" Imports complete")
|
|
||||||
|
|
||||||
# Simulate the compare_modes function
|
|
||||||
print("\n[2] Opening image (1st time - for dimensions)...")
|
|
||||||
img1 = Image.open(io.BytesIO(carrier_data))
|
|
||||||
width, height = img1.size
|
|
||||||
print(f" Size: {width}x{height}")
|
|
||||||
img1.close()
|
|
||||||
print(" Closed img1")
|
|
||||||
gc.collect()
|
|
||||||
|
|
||||||
print("\n[3] Opening image (2nd time - for LSB capacity)...")
|
|
||||||
img2 = Image.open(io.BytesIO(carrier_data))
|
|
||||||
num_pixels = img2.size[0] * img2.size[1]
|
|
||||||
lsb_bytes = (num_pixels * 3) // 8 - 69
|
|
||||||
print(f" LSB capacity: {lsb_bytes} bytes")
|
|
||||||
img2.close()
|
|
||||||
print(" Closed img2")
|
|
||||||
gc.collect()
|
|
||||||
|
|
||||||
print("\n[4] Opening image (3rd time - for DCT capacity)...")
|
|
||||||
img3 = Image.open(io.BytesIO(carrier_data))
|
|
||||||
w, h = img3.size
|
|
||||||
blocks_x = w // 8
|
|
||||||
blocks_y = h // 8
|
|
||||||
total_blocks = blocks_x * blocks_y
|
|
||||||
dct_bits = total_blocks * 16
|
|
||||||
dct_bytes = dct_bits // 8 - 10
|
|
||||||
print(f" DCT capacity: {dct_bytes} bytes ({total_blocks} blocks)")
|
|
||||||
img3.close()
|
|
||||||
print(" Closed img3")
|
|
||||||
gc.collect()
|
|
||||||
|
|
||||||
print("\n[5] Building response dict...")
|
|
||||||
response = {
|
|
||||||
'success': True,
|
|
||||||
'width': width,
|
|
||||||
'height': height,
|
|
||||||
'lsb': {
|
|
||||||
'capacity_bytes': lsb_bytes,
|
|
||||||
'capacity_kb': round(lsb_bytes / 1024, 1),
|
|
||||||
'output': 'PNG',
|
|
||||||
},
|
|
||||||
'dct': {
|
|
||||||
'capacity_bytes': dct_bytes,
|
|
||||||
'capacity_kb': round(dct_bytes / 1024, 1),
|
|
||||||
'output': 'JPEG',
|
|
||||||
'available': True,
|
|
||||||
'ratio': round(dct_bytes / lsb_bytes * 100, 1),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
print(f" Response built")
|
|
||||||
|
|
||||||
print("\n[6] Serializing to JSON...")
|
|
||||||
json_str = json.dumps(response)
|
|
||||||
print(f" JSON length: {len(json_str)} bytes")
|
|
||||||
print(f" Content: {json_str[:200]}...")
|
|
||||||
|
|
||||||
print("\n[7] Simulating Flask response completion...")
|
|
||||||
# In Flask, after the response is sent, Python may garbage collect
|
|
||||||
del carrier_data
|
|
||||||
del response
|
|
||||||
del json_str
|
|
||||||
gc.collect()
|
|
||||||
print(" GC after response simulation")
|
|
||||||
|
|
||||||
print("\n[8] Additional cleanup (simulating request end)...")
|
|
||||||
gc.collect()
|
|
||||||
gc.collect()
|
|
||||||
print(" Multiple GC cycles complete")
|
|
||||||
|
|
||||||
print("\n[9] Waiting for delayed crash...")
|
|
||||||
for i in range(3):
|
|
||||||
time.sleep(1)
|
|
||||||
print(f" {i+1}s...")
|
|
||||||
gc.collect()
|
|
||||||
|
|
||||||
print("\n" + "=" * 60)
|
|
||||||
print("TEST PASSED - No crash detected")
|
|
||||||
print("=" * 60)
|
|
||||||
|
|
||||||
# Now test with jpegio if available
|
|
||||||
if HAS_JPEGIO:
|
|
||||||
print("\n" + "=" * 60)
|
|
||||||
print("JPEGIO SPECIFIC TEST")
|
|
||||||
print("=" * 60)
|
|
||||||
|
|
||||||
import tempfile
|
|
||||||
import os
|
|
||||||
|
|
||||||
# Reload image data
|
|
||||||
with open(image_path, 'rb') as f:
|
|
||||||
carrier_data = f.read()
|
|
||||||
|
|
||||||
print("\n[J1] Checking if image is JPEG...")
|
|
||||||
img = Image.open(io.BytesIO(carrier_data))
|
|
||||||
is_jpeg = img.format == 'JPEG'
|
|
||||||
img.close()
|
|
||||||
print(f" Is JPEG: {is_jpeg}")
|
|
||||||
|
|
||||||
if is_jpeg:
|
|
||||||
print("\n[J2] Writing to temp file...")
|
|
||||||
fd, temp_path = tempfile.mkstemp(suffix='.jpg')
|
|
||||||
os.write(fd, carrier_data)
|
|
||||||
os.close(fd)
|
|
||||||
print(f" Temp file: {temp_path}")
|
|
||||||
|
|
||||||
print("\n[J3] Reading with jpegio...")
|
|
||||||
try:
|
|
||||||
jpeg = jio.read(temp_path)
|
|
||||||
print(f" jpegio.read() OK")
|
|
||||||
|
|
||||||
print("\n[J4] Accessing coefficient arrays...")
|
|
||||||
coef = jpeg.coef_arrays[0]
|
|
||||||
print(f" Coef shape: {coef.shape}, dtype: {coef.dtype}")
|
|
||||||
|
|
||||||
print("\n[J5] Counting usable positions...")
|
|
||||||
positions = []
|
|
||||||
h, w = coef.shape
|
|
||||||
for row in range(h):
|
|
||||||
for col in range(w):
|
|
||||||
if (row % 8 == 0) and (col % 8 == 0):
|
|
||||||
continue
|
|
||||||
if abs(coef[row, col]) >= 2:
|
|
||||||
positions.append((row, col))
|
|
||||||
print(f" Usable positions: {len(positions)}")
|
|
||||||
|
|
||||||
print("\n[J6] Cleaning up jpegio object...")
|
|
||||||
del coef
|
|
||||||
del jpeg
|
|
||||||
gc.collect()
|
|
||||||
print(" Deleted jpeg object")
|
|
||||||
|
|
||||||
print("\n[J7] Removing temp file...")
|
|
||||||
os.unlink(temp_path)
|
|
||||||
print(" Temp file removed")
|
|
||||||
|
|
||||||
gc.collect()
|
|
||||||
print("\n[J8] Final GC...")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f" ERROR: {e}")
|
|
||||||
import traceback
|
|
||||||
traceback.print_exc()
|
|
||||||
|
|
||||||
print("\n[J9] Waiting for delayed crash...")
|
|
||||||
for i in range(3):
|
|
||||||
time.sleep(1)
|
|
||||||
print(f" {i+1}s...")
|
|
||||||
gc.collect()
|
|
||||||
|
|
||||||
print("\n" + "=" * 60)
|
|
||||||
print("JPEGIO TEST PASSED - No crash detected")
|
|
||||||
print("=" * 60)
|
|
||||||
else:
|
|
||||||
print(" Skipping jpegio test (not a JPEG)")
|
|
||||||
|
|
||||||
print("\n\nAll tests completed successfully!")
|
|
||||||
@@ -1,231 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
Standalone DCT crash diagnostic script.
|
|
||||||
Run this outside of Flask to isolate the issue.
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
python test_dct_crash.py /path/to/your/large_image.jpg
|
|
||||||
"""
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import gc
|
|
||||||
import traceback
|
|
||||||
import io
|
|
||||||
|
|
||||||
print("=" * 60)
|
|
||||||
print("DCT CRASH DIAGNOSTIC TOOL")
|
|
||||||
print("=" * 60)
|
|
||||||
|
|
||||||
# Step 1: Check Python and library versions
|
|
||||||
print("\n[1] ENVIRONMENT INFO")
|
|
||||||
print(f"Python: {sys.version}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
import numpy as np
|
|
||||||
print(f"NumPy: {np.__version__}")
|
|
||||||
except ImportError as e:
|
|
||||||
print(f"NumPy: NOT INSTALLED - {e}")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
try:
|
|
||||||
import scipy
|
|
||||||
print(f"SciPy: {scipy.__version__}")
|
|
||||||
except ImportError as e:
|
|
||||||
print(f"SciPy: NOT INSTALLED - {e}")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
try:
|
|
||||||
from PIL import Image
|
|
||||||
import PIL
|
|
||||||
print(f"Pillow: {PIL.__version__}")
|
|
||||||
except ImportError as e:
|
|
||||||
print(f"Pillow: NOT INSTALLED - {e}")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# Step 2: Check which DCT module we're using
|
|
||||||
print("\n[2] DCT MODULE CHECK")
|
|
||||||
try:
|
|
||||||
from scipy.fft import dct, idct
|
|
||||||
print("Using: scipy.fft (preferred)")
|
|
||||||
DCT_MODULE = "scipy.fft"
|
|
||||||
except ImportError:
|
|
||||||
try:
|
|
||||||
from scipy.fftpack import dct, idct
|
|
||||||
print("Using: scipy.fftpack (legacy)")
|
|
||||||
DCT_MODULE = "scipy.fftpack"
|
|
||||||
except ImportError:
|
|
||||||
print("ERROR: No DCT module available!")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# Step 3: Test basic DCT on small array
|
|
||||||
print("\n[3] BASIC DCT TEST (8x8 block)")
|
|
||||||
try:
|
|
||||||
test_block = np.random.rand(8, 8).astype(np.float64)
|
|
||||||
|
|
||||||
# 1D DCT on rows
|
|
||||||
result = dct(test_block[0, :], norm='ortho')
|
|
||||||
print(f" 1D DCT: OK (output shape: {result.shape})")
|
|
||||||
|
|
||||||
# 1D IDCT
|
|
||||||
recovered = idct(result, norm='ortho')
|
|
||||||
error = np.max(np.abs(test_block[0, :] - recovered))
|
|
||||||
print(f" 1D IDCT: OK (roundtrip error: {error:.2e})")
|
|
||||||
|
|
||||||
# 2D via separable
|
|
||||||
temp = np.zeros_like(test_block)
|
|
||||||
for i in range(8):
|
|
||||||
temp[:, i] = dct(test_block[:, i], norm='ortho')
|
|
||||||
result2d = np.zeros_like(temp)
|
|
||||||
for i in range(8):
|
|
||||||
result2d[i, :] = dct(temp[i, :], norm='ortho')
|
|
||||||
print(f" 2D DCT: OK")
|
|
||||||
|
|
||||||
gc.collect()
|
|
||||||
print(" GC after basic test: OK")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f" FAILED: {e}")
|
|
||||||
traceback.print_exc()
|
|
||||||
|
|
||||||
# Step 4: Test with larger arrays (stress test)
|
|
||||||
print("\n[4] STRESS TEST (many 8x8 blocks)")
|
|
||||||
try:
|
|
||||||
NUM_BLOCKS = 10000
|
|
||||||
print(f" Processing {NUM_BLOCKS} blocks...")
|
|
||||||
|
|
||||||
for i in range(NUM_BLOCKS):
|
|
||||||
block = np.random.rand(8, 8).astype(np.float64)
|
|
||||||
|
|
||||||
# Forward DCT
|
|
||||||
temp = np.zeros_like(block)
|
|
||||||
for j in range(8):
|
|
||||||
temp[:, j] = dct(block[:, j], norm='ortho')
|
|
||||||
result = np.zeros_like(temp)
|
|
||||||
for j in range(8):
|
|
||||||
result[j, :] = dct(temp[j, :], norm='ortho')
|
|
||||||
|
|
||||||
# Inverse DCT
|
|
||||||
temp2 = np.zeros_like(result)
|
|
||||||
for j in range(8):
|
|
||||||
temp2[j, :] = idct(result[j, :], norm='ortho')
|
|
||||||
recovered = np.zeros_like(temp2)
|
|
||||||
for j in range(8):
|
|
||||||
recovered[:, j] = idct(temp2[:, j], norm='ortho')
|
|
||||||
|
|
||||||
if i % 1000 == 0:
|
|
||||||
gc.collect()
|
|
||||||
print(f" {i}/{NUM_BLOCKS} blocks processed...")
|
|
||||||
|
|
||||||
gc.collect()
|
|
||||||
print(f" Stress test PASSED")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f" FAILED at block {i}: {e}")
|
|
||||||
traceback.print_exc()
|
|
||||||
|
|
||||||
# Step 5: Test with actual image if provided
|
|
||||||
if len(sys.argv) > 1:
|
|
||||||
image_path = sys.argv[1]
|
|
||||||
print(f"\n[5] IMAGE TEST: {image_path}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
with open(image_path, 'rb') as f:
|
|
||||||
image_data = f.read()
|
|
||||||
print(f" File size: {len(image_data) / 1024 / 1024:.2f} MB")
|
|
||||||
|
|
||||||
img = Image.open(io.BytesIO(image_data))
|
|
||||||
width, height = img.size
|
|
||||||
print(f" Dimensions: {width}x{height}")
|
|
||||||
print(f" Format: {img.format}")
|
|
||||||
print(f" Mode: {img.mode}")
|
|
||||||
|
|
||||||
# Convert to grayscale float array
|
|
||||||
gray = img.convert('L')
|
|
||||||
arr = np.array(gray, dtype=np.float64)
|
|
||||||
img.close()
|
|
||||||
gray.close()
|
|
||||||
print(f" Array shape: {arr.shape}")
|
|
||||||
print(f" Array dtype: {arr.dtype}")
|
|
||||||
|
|
||||||
# Pad to block boundary
|
|
||||||
BLOCK_SIZE = 8
|
|
||||||
h, w = arr.shape
|
|
||||||
new_h = ((h + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE
|
|
||||||
new_w = ((w + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE
|
|
||||||
|
|
||||||
if new_h != h or new_w != w:
|
|
||||||
padded = np.zeros((new_h, new_w), dtype=np.float64)
|
|
||||||
padded[:h, :w] = arr
|
|
||||||
arr = padded
|
|
||||||
print(f" Padded to: {arr.shape}")
|
|
||||||
|
|
||||||
blocks_y = arr.shape[0] // BLOCK_SIZE
|
|
||||||
blocks_x = arr.shape[1] // BLOCK_SIZE
|
|
||||||
total_blocks = blocks_y * blocks_x
|
|
||||||
print(f" Total 8x8 blocks: {total_blocks}")
|
|
||||||
|
|
||||||
# Process ALL blocks
|
|
||||||
print(f" Processing all blocks with DCT...")
|
|
||||||
|
|
||||||
processed = 0
|
|
||||||
for by in range(blocks_y):
|
|
||||||
for bx in range(blocks_x):
|
|
||||||
y = by * BLOCK_SIZE
|
|
||||||
x = bx * BLOCK_SIZE
|
|
||||||
|
|
||||||
block = arr[y:y+BLOCK_SIZE, x:x+BLOCK_SIZE].copy()
|
|
||||||
|
|
||||||
# Forward DCT
|
|
||||||
temp = np.zeros((8, 8), dtype=np.float64)
|
|
||||||
for i in range(8):
|
|
||||||
temp[:, i] = dct(block[:, i], norm='ortho')
|
|
||||||
dct_block = np.zeros((8, 8), dtype=np.float64)
|
|
||||||
for i in range(8):
|
|
||||||
dct_block[i, :] = dct(temp[i, :], norm='ortho')
|
|
||||||
|
|
||||||
# Inverse DCT
|
|
||||||
temp2 = np.zeros((8, 8), dtype=np.float64)
|
|
||||||
for i in range(8):
|
|
||||||
temp2[i, :] = idct(dct_block[i, :], norm='ortho')
|
|
||||||
recovered = np.zeros((8, 8), dtype=np.float64)
|
|
||||||
for i in range(8):
|
|
||||||
recovered[:, i] = idct(temp2[:, i], norm='ortho')
|
|
||||||
|
|
||||||
processed += 1
|
|
||||||
|
|
||||||
# GC after each row of blocks
|
|
||||||
if by % 50 == 0:
|
|
||||||
gc.collect()
|
|
||||||
print(f" Row {by}/{blocks_y} ({processed}/{total_blocks} blocks)")
|
|
||||||
|
|
||||||
gc.collect()
|
|
||||||
print(f" Image DCT test PASSED ({processed} blocks)")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f" FAILED: {e}")
|
|
||||||
traceback.print_exc()
|
|
||||||
|
|
||||||
else:
|
|
||||||
print("\n[5] IMAGE TEST: Skipped (no image path provided)")
|
|
||||||
print(" Usage: python test_dct_crash.py /path/to/image.jpg")
|
|
||||||
|
|
||||||
# Step 6: Final cleanup test
|
|
||||||
print("\n[6] FINAL CLEANUP TEST")
|
|
||||||
try:
|
|
||||||
gc.collect()
|
|
||||||
gc.collect()
|
|
||||||
gc.collect()
|
|
||||||
print(" Multiple GC cycles: OK")
|
|
||||||
except Exception as e:
|
|
||||||
print(f" FAILED: {e}")
|
|
||||||
|
|
||||||
print("\n" + "=" * 60)
|
|
||||||
print("If this script completes without 'free(): invalid size',")
|
|
||||||
print("the issue is likely in PIL/jpegio interaction, not scipy DCT.")
|
|
||||||
print("=" * 60)
|
|
||||||
|
|
||||||
# Keep process alive briefly to catch delayed crashes
|
|
||||||
import time
|
|
||||||
print("\nWaiting 2 seconds for delayed crashes...")
|
|
||||||
time.sleep(2)
|
|
||||||
print("Done - no crash detected!")
|
|
||||||
Reference in New Issue
Block a user