diff --git a/.gitignore b/.gitignore index 2911eb1..068fd5e 100644 --- a/.gitignore +++ b/.gitignore @@ -35,6 +35,12 @@ old_files/ *.swp *.swo +# Backup files +*_old +*_old.* +*.bak +*.orig + # Testing .pytest_cache/ .coverage diff --git a/frontends/cli/main.py_old b/frontends/cli/main.py_old deleted file mode 100644 index 4f08fa2..0000000 --- a/frontends/cli/main.py_old +++ /dev/null @@ -1,1073 +0,0 @@ -#!/usr/bin/env python3 -""" -Stegasoo CLI - Command-line interface for steganography operations (v3.2.0). - -CHANGES in v3.2.0: -- Removed date dependency from all operations -- Renamed day_phrase → passphrase -- No longer need to specify or remember encoding dates -- Default passphrase length increased to 4 words - -Usage: - stegasoo generate [OPTIONS] - stegasoo encode [OPTIONS] - stegasoo decode [OPTIONS] - stegasoo verify [OPTIONS] - stegasoo info [OPTIONS] - stegasoo compare [OPTIONS] - stegasoo modes [OPTIONS] -""" - -import sys -from pathlib import Path -from typing import Optional - -import click - -# Add parent to path for development -sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'src')) - -import stegasoo -from stegasoo import ( - # Core operations - encode, decode, - - # Credential generation - generate_credentials, - generate_passphrase, - generate_pin, - export_rsa_key_pem, - load_rsa_key, - - # Validation - validate_image, - - # Image utilities - get_image_info, - compare_capacity, - - # Steganography functions - has_dct_support, - compare_modes, - will_fit_by_mode, - - # Utilities - generate_filename, - - # Version - __version__, - - # Exceptions - StegasooError, - DecryptionError, - ExtractionError, - - # Models - FilePayload, -) - -# Import constants - try main module first, then constants submodule -try: - from stegasoo import ( - EMBED_MODE_LSB, - EMBED_MODE_DCT, - EMBED_MODE_AUTO, - ) -except ImportError: - from stegasoo.constants import ( - EMBED_MODE_LSB, - EMBED_MODE_DCT, - EMBED_MODE_AUTO, - ) - -# Import constants that may not be in main __init__ -try: - from stegasoo.constants import ( - DEFAULT_PASSPHRASE_WORDS, - DEFAULT_PIN_LENGTH, - MIN_PIN_LENGTH, - MAX_PIN_LENGTH, - ) -except ImportError: - # Fallback defaults if constants not available - DEFAULT_PASSPHRASE_WORDS = 4 - DEFAULT_PIN_LENGTH = 6 - MIN_PIN_LENGTH = 6 - MAX_PIN_LENGTH = 9 - -# Optional: strip_image_metadata from utils -try: - from stegasoo.utils import strip_image_metadata - HAS_STRIP_METADATA = True -except ImportError: - HAS_STRIP_METADATA = False - -# QR Code utilities -try: - from stegasoo.qr_utils import ( - extract_key_from_qr_file, - generate_qr_code, - has_qr_read, has_qr_write, - can_fit_in_qr, needs_compression, - ) - HAS_QR = True -except ImportError: - HAS_QR = False - has_qr_read = lambda: False - has_qr_write = lambda: False - - -# ============================================================================ -# CLI SETUP -# ============================================================================ - -CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) - - -@click.group(context_settings=CONTEXT_SETTINGS) -@click.version_option(__version__, '-v', '--version') -def cli(): - """ - Stegasoo - Secure steganography with hybrid authentication. - - Hide encrypted messages or files in images using a combination of: - - \b - - Reference photo (something you have) - - Passphrase (something you know) - - Static PIN or RSA key (additional security) - - \b - Version 3.2.0 Changes: - - No more date parameters - encode/decode anytime! - - Simplified passphrase (no daily rotation) - - Default passphrase increased to 4 words - - True asynchronous communications - - \b - Embedding Modes: - - LSB mode (default): Full color output, higher capacity - - DCT mode: Frequency domain, ~20% capacity, better stealth - - \b - DCT Options: - - Color mode: grayscale (default) or color (preserves colors) - - Output format: png (lossless) or jpeg (smaller, natural) - """ - pass - - -# ============================================================================ -# GENERATE COMMAND -# ============================================================================ - -@cli.command() -@click.option('--pin/--no-pin', default=True, help='Generate a PIN (default: yes)') -@click.option('--rsa/--no-rsa', default=False, help='Generate an RSA key') -@click.option('--pin-length', type=click.IntRange(6, 9), default=DEFAULT_PIN_LENGTH, - help=f'PIN length (6-9, default: {DEFAULT_PIN_LENGTH})') -@click.option('--rsa-bits', type=click.Choice(['2048', '3072', '4096']), default='2048', - help='RSA key size') -@click.option('--words', type=click.IntRange(3, 12), default=DEFAULT_PASSPHRASE_WORDS, - help=f'Words per passphrase (default: {DEFAULT_PASSPHRASE_WORDS})') -@click.option('--output', '-o', type=click.Path(), help='Save RSA key to file (requires password)') -@click.option('--password', '-p', help='Password for RSA key file') -@click.option('--json', 'as_json', is_flag=True, help='Output as JSON') -def generate(pin, rsa, pin_length, rsa_bits, words, output, password, as_json): - """ - Generate credentials for encoding/decoding. - - Creates a passphrase and optionally a PIN and/or RSA key. - At least one of --pin or --rsa must be enabled. - - v3.2.0: Single passphrase (no more daily rotation!) - Default increased to 4 words for better security. - - \b - Examples: - stegasoo generate - stegasoo generate --words 5 - stegasoo generate --rsa --rsa-bits 4096 - stegasoo generate --rsa -o mykey.pem -p "secretpassword" - stegasoo generate --no-pin --rsa - """ - if not pin and not rsa: - raise click.UsageError("Must enable at least one of --pin or --rsa") - - if output and not password: - raise click.UsageError("--password is required when saving RSA key to file") - - if password and len(password) < 8: - raise click.UsageError("Password must be at least 8 characters") - - try: - creds = generate_credentials( - use_pin=pin, - use_rsa=rsa, - pin_length=pin_length, - rsa_bits=int(rsa_bits), - passphrase_words=words, # v3.2.0: renamed parameter - rsa_password=password if output else None, - ) - - if as_json: - import json - data = { - 'passphrase': creds.passphrase, - 'pin': creds.pin, - 'rsa_key': creds.rsa_key_pem, - 'entropy': { - 'passphrase': creds.passphrase_entropy, - 'pin': creds.pin_entropy, - 'rsa': creds.rsa_entropy, - 'total': creds.total_entropy, - } - } - click.echo(json.dumps(data, indent=2)) - return - - # Pretty output - click.echo() - click.secho("=" * 60, fg='cyan') - click.secho(" STEGASOO CREDENTIALS (v3.2.0)", fg='cyan', bold=True) - click.secho("=" * 60, fg='cyan') - click.echo() - - click.secho("⚠️ MEMORIZE THESE AND CLOSE THIS WINDOW", fg='yellow', bold=True) - click.secho(" Do not screenshot or save to file!", fg='yellow') - click.echo() - - if creds.pin: - click.secho("─── STATIC PIN ───", fg='green') - click.secho(f" {creds.pin}", fg='bright_yellow', bold=True) - click.echo() - - click.secho("─── PASSPHRASE ───", fg='green') - click.secho(f" {creds.passphrase}", fg='bright_white', bold=True) - click.echo() - - if creds.rsa_key_pem: - click.secho("─── RSA KEY ───", fg='green') - if output: - # Save to file - private_key = load_rsa_key(creds.rsa_key_pem.encode()) - encrypted_pem = export_rsa_key_pem(private_key, password) - Path(output).write_bytes(encrypted_pem) - click.secho(f" Saved to: {output}", fg='bright_white') - click.secho(f" Password: {'*' * len(password)}", dim=True) - else: - click.echo(creds.rsa_key_pem) - click.echo() - - click.secho("─── SECURITY ───", fg='green') - click.echo(f" Passphrase entropy: {creds.passphrase_entropy} bits ({words} words)") - if creds.pin: - click.echo(f" PIN entropy: {creds.pin_entropy} bits") - if creds.rsa_key_pem: - click.echo(f" RSA entropy: {creds.rsa_entropy} bits") - click.echo(f" Combined: {creds.total_entropy} bits") - click.secho(f" + photo entropy: 80-256 bits", dim=True) - click.echo() - - click.secho("✓ v3.2.0: Use this passphrase anytime - no date needed!", fg='cyan') - click.echo() - - except Exception as e: - raise click.ClickException(str(e)) - - -# ============================================================================ -# ENCODE COMMAND -# ============================================================================ - -@cli.command() -@click.option('--ref', '-r', required=True, type=click.Path(exists=True), help='Reference photo') -@click.option('--carrier', '-c', required=True, type=click.Path(exists=True), help='Carrier image') -@click.option('--message', '-m', help='Text message to encode') -@click.option('--message-file', '-f', type=click.Path(exists=True), help='Read text message from file') -@click.option('--embed-file', '-e', type=click.Path(exists=True), help='Embed a file (binary)') -@click.option('--passphrase', '-p', required=True, help='Passphrase') -@click.option('--pin', help='Static PIN') -@click.option('--key', '-k', type=click.Path(exists=True), help='RSA key file (.pem)') -@click.option('--key-qr', type=click.Path(exists=True), help='RSA key from QR code image') -@click.option('--key-password', help='RSA key password (for encrypted .pem files)') -@click.option('--output', '-o', type=click.Path(), help='Output file (default: auto-generated)') -@click.option('--mode', 'embed_mode', type=click.Choice(['lsb', 'dct']), default='lsb', - help='Embedding mode: lsb (default, color) or dct (requires scipy)') -@click.option('--dct-format', 'dct_output_format', type=click.Choice(['png', 'jpeg']), default='png', - help='DCT output format: png (lossless, default) or jpeg (smaller)') -@click.option('--dct-color', 'dct_color_mode', type=click.Choice(['grayscale', 'color']), default='grayscale', - help='DCT color mode: grayscale (default) or color (preserves original colors)') -@click.option('--quiet', '-q', is_flag=True, help='Suppress output except errors') -def encode_cmd(ref, carrier, message, message_file, embed_file, passphrase, pin, key, key_qr, - key_password, output, embed_mode, dct_output_format, dct_color_mode, quiet): - """ - Encode a secret message or file into an image. - - Requires a reference photo, carrier image, and passphrase. - Must provide either --pin or --key/--key-qr (or both). - - v3.2.0: No --date parameter needed! Encode and decode anytime. - - For text messages, use -m or -f or pipe via stdin. - For binary files, use -e/--embed-file. - RSA key can be provided as a .pem file (--key) or QR code image (--key-qr). - - \b - Embedding Modes: - --mode lsb Spatial LSB embedding (default) - - Full color output (PNG/BMP) - - Higher capacity (~375 KB/megapixel) - - --mode dct DCT domain embedding (requires scipy) - - Configurable color/grayscale output - - Lower capacity (~75 KB/megapixel) - - Better resistance to visual analysis - - \b - DCT Options: - --dct-format png Lossless output (default) - --dct-format jpeg Smaller file, more natural appearance - - --dct-color grayscale Convert to grayscale (default, traditional) - --dct-color color Preserve original colors (experimental) - - \b - Examples: - # Text message with PIN (LSB mode, default) - stegasoo encode -r photo.jpg -c meme.png -p "apple forest thunder mountain" --pin 123456 -m "secret" - - # DCT mode - grayscale PNG (traditional) - stegasoo encode -r photo.jpg -c meme.png -p "secure words here now" --pin 123456 -m "secret" --mode dct - - # DCT mode - color JPEG - stegasoo encode -r photo.jpg -c meme.png -p "my strong passphrase" --pin 123456 -m "secret" \\ - --mode dct --dct-color color --dct-format jpeg - """ - # Check DCT mode availability - if embed_mode == 'dct' and not has_dct_support(): - raise click.ClickException( - "DCT mode requires scipy. Install with: pip install scipy" - ) - - # Warn if DCT options used with LSB mode - if embed_mode == 'lsb': - if dct_output_format != 'png' or dct_color_mode != 'grayscale': - if not quiet: - click.secho("Note: --dct-format and --dct-color only apply to DCT mode", fg='yellow', dim=True) - - # Determine what to encode - payload = None - - if embed_file: - # Binary file embedding - payload = FilePayload.from_file(embed_file) - if not quiet: - click.echo(f"Embedding file: {payload.filename} ({len(payload.data):,} bytes)") - elif message: - payload = message - elif message_file: - payload = Path(message_file).read_text() - elif not sys.stdin.isatty(): - payload = sys.stdin.read() - else: - raise click.UsageError("Must provide message via -m, -f, -e, or stdin") - - # Load key if provided (from .pem file or QR code image) - rsa_key_data = None - rsa_key_from_qr = False - - if key and key_qr: - raise click.UsageError("Cannot use both --key and --key-qr. Choose one.") - - if key: - rsa_key_data = Path(key).read_bytes() - elif key_qr: - if not HAS_QR or not has_qr_read(): - raise click.ClickException( - "QR code reading not available. Install: pip install pyzbar\n" - "Also requires system library: sudo apt-get install libzbar0" - ) - key_pem = extract_key_from_qr_file(key_qr) - if not key_pem: - raise click.ClickException(f"Could not extract RSA key from QR code: {key_qr}") - rsa_key_data = key_pem.encode('utf-8') - rsa_key_from_qr = True - if not quiet: - click.echo(f"Loaded RSA key from QR code: {key_qr}") - - # QR code keys are never password-protected - effective_key_password = None if rsa_key_from_qr else key_password - - # Validate security factors - if not pin and not rsa_key_data: - raise click.UsageError("Must provide --pin or --key/--key-qr (or both)") - - try: - ref_photo = Path(ref).read_bytes() - carrier_image = Path(carrier).read_bytes() - - # Pre-check capacity with selected mode - fit_check = will_fit_by_mode(payload, carrier_image, embed_mode=embed_mode) - if not fit_check['fits']: - # Suggest alternative mode if it would fit - alt_mode = 'lsb' if embed_mode == 'dct' else 'dct' - alt_check = will_fit_by_mode(payload, carrier_image, embed_mode=alt_mode) - - suggestion = "" - if alt_mode == 'lsb' and alt_check['fits']: - suggestion = f"\n Tip: Payload would fit in LSB mode (--mode lsb)" - - raise click.ClickException( - f"Payload too large for {embed_mode.upper()} mode.\n" - f" Payload: {fit_check['payload_size']:,} bytes\n" - f" Capacity: {fit_check['capacity']:,} bytes\n" - f" Shortfall: {-fit_check['headroom']:,} bytes" - f"{suggestion}" - ) - - if not quiet: - mode_desc = embed_mode.upper() - if embed_mode == 'dct': - mode_desc += f" ({dct_color_mode}, {dct_output_format.upper()})" - click.echo(f"Mode: {mode_desc} ({fit_check['usage_percent']:.1f}% capacity)") - - # v3.2.0: No date_str parameter - result = encode( - message=payload, - reference_photo=ref_photo, - carrier_image=carrier_image, - passphrase=passphrase, - pin=pin or "", - rsa_key_data=rsa_key_data, - rsa_password=effective_key_password, - embed_mode=embed_mode, - dct_output_format=dct_output_format, - dct_color_mode=dct_color_mode, - ) - - # Determine output path - if output: - out_path = Path(output) - else: - out_path = Path(result.filename) - - # Write output - out_path.write_bytes(result.stego_image) - - if not quiet: - click.secho(f"✓ Encoded successfully!", fg='green') - click.echo(f" Output: {out_path}") - click.echo(f" Size: {len(result.stego_image):,} bytes") - click.echo(f" Capacity used: {result.capacity_percent:.1f}%") - if embed_mode == 'dct': - color_note = "color preserved" if dct_color_mode == 'color' else "grayscale" - format_note = dct_output_format.upper() - click.secho(f" DCT output: {format_note} ({color_note})", dim=True) - - except StegasooError as e: - raise click.ClickException(str(e)) - except click.ClickException: - raise - except Exception as e: - raise click.ClickException(f"Error: {e}") - - -# ============================================================================ -# DECODE COMMAND -# ============================================================================ - -@cli.command() -@click.option('--ref', '-r', required=True, type=click.Path(exists=True), help='Reference photo') -@click.option('--stego', '-s', required=True, type=click.Path(exists=True), help='Stego image') -@click.option('--passphrase', '-p', required=True, help='Passphrase') -@click.option('--pin', help='Static PIN') -@click.option('--key', '-k', type=click.Path(exists=True), help='RSA key file (.pem)') -@click.option('--key-qr', type=click.Path(exists=True), help='RSA key from QR code image') -@click.option('--key-password', help='RSA key password (for encrypted .pem files)') -@click.option('--output', '-o', type=click.Path(), help='Save decoded content to file') -@click.option('--mode', 'embed_mode', type=click.Choice(['auto', 'lsb', 'dct']), default='auto', - help='Extraction mode: auto (default), lsb, or dct') -@click.option('--quiet', '-q', is_flag=True, help='Output only the content (for text) or suppress messages (for files)') -@click.option('--force', is_flag=True, help='Overwrite existing output file') -def decode_cmd(ref, stego, passphrase, pin, key, key_qr, key_password, output, embed_mode, quiet, force): - """ - Decode a secret message or file from a stego image. - - Must use the same credentials that were used for encoding. - Automatically detects whether content is text or a file. - RSA key can be provided as a .pem file (--key) or QR code image (--key-qr). - - v3.2.0: No --date parameter needed! Just use your passphrase. - - Note: Extraction works the same regardless of whether the image was - created with color mode or grayscale mode - both use the same Y channel. - - \b - Extraction Modes: - --mode auto Auto-detect (default) - tries LSB first, then DCT - --mode lsb Only try LSB extraction - --mode dct Only try DCT extraction (requires scipy) - - \b - Examples: - # Decode with PIN (auto-detect mode) - stegasoo decode -r photo.jpg -s stego.png -p "apple forest thunder mountain" --pin 123456 - - # Explicitly specify DCT mode - stegasoo decode -r photo.jpg -s stego.png -p "my passphrase here" --pin 123456 --mode dct - - # Decode with RSA key file - stegasoo decode -r photo.jpg -s stego.png -p "strong words" -k mykey.pem - - # Save output to file - stegasoo decode -r photo.jpg -s stego.png -p "passphrase" --pin 123456 -o output.txt - """ - # Check DCT mode availability - if embed_mode == 'dct' and not has_dct_support(): - raise click.ClickException( - "DCT mode requires scipy. Install with: pip install scipy" - ) - - # Load key if provided (from .pem file or QR code image) - rsa_key_data = None - rsa_key_from_qr = False - - if key and key_qr: - raise click.UsageError("Cannot use both --key and --key-qr. Choose one.") - - if key: - rsa_key_data = Path(key).read_bytes() - elif key_qr: - if not HAS_QR or not has_qr_read(): - raise click.ClickException( - "QR code reading not available. Install: pip install pyzbar\n" - "Also requires system library: sudo apt-get install libzbar0" - ) - key_pem = extract_key_from_qr_file(key_qr) - if not key_pem: - raise click.ClickException(f"Could not extract RSA key from QR code: {key_qr}") - rsa_key_data = key_pem.encode('utf-8') - rsa_key_from_qr = True - if not quiet: - click.echo(f"Loaded RSA key from QR code: {key_qr}") - - # QR code keys are never password-protected - effective_key_password = None if rsa_key_from_qr else key_password - - # Validate security factors - if not pin and not rsa_key_data: - raise click.UsageError("Must provide --pin or --key/--key-qr (or both)") - - try: - ref_photo = Path(ref).read_bytes() - stego_image = Path(stego).read_bytes() - - # v3.2.0: No date_str parameter - result = decode( - stego_image=stego_image, - reference_photo=ref_photo, - passphrase=passphrase, - pin=pin or "", - rsa_key_data=rsa_key_data, - rsa_password=effective_key_password, - embed_mode=embed_mode, - ) - - if result.is_file: - # File content - if output: - out_path = Path(output) - elif result.filename: - out_path = Path(result.filename) - else: - out_path = Path("decoded_file") - - if out_path.exists() and not force: - raise click.ClickException( - f"Output file '{out_path}' exists. Use --force to overwrite." - ) - - out_path.write_bytes(result.file_data) - - if not quiet: - click.secho("✓ Decoded file successfully!", fg='green') - click.echo(f" Saved to: {out_path}") - click.echo(f" Size: {len(result.file_data):,} bytes") - if result.mime_type: - click.echo(f" Type: {result.mime_type}") - else: - # Text content - if output: - Path(output).write_text(result.message) - if not quiet: - click.secho("✓ Decoded successfully!", fg='green') - click.echo(f" Saved to: {output}") - else: - if quiet: - click.echo(result.message) - else: - click.secho("✓ Decoded successfully!", fg='green') - click.echo() - click.echo(result.message) - - except (DecryptionError, ExtractionError) as e: - raise click.ClickException(f"Decryption failed: {e}") - except StegasooError as e: - raise click.ClickException(str(e)) - except Exception as e: - raise click.ClickException(f"Error: {e}") - - -# ============================================================================ -# VERIFY COMMAND -# ============================================================================ - -@cli.command() -@click.option('--ref', '-r', required=True, type=click.Path(exists=True), help='Reference photo') -@click.option('--stego', '-s', required=True, type=click.Path(exists=True), help='Stego image') -@click.option('--passphrase', '-p', required=True, help='Passphrase') -@click.option('--pin', help='Static PIN') -@click.option('--key', '-k', type=click.Path(exists=True), help='RSA key file (.pem)') -@click.option('--key-qr', type=click.Path(exists=True), help='RSA key from QR code image') -@click.option('--key-password', help='RSA key password (for encrypted .pem files)') -@click.option('--mode', 'embed_mode', type=click.Choice(['auto', 'lsb', 'dct']), default='auto', - help='Extraction mode: auto (default), lsb, or dct') -@click.option('--json', 'as_json', is_flag=True, help='Output as JSON') -def verify(ref, stego, passphrase, pin, key, key_qr, key_password, embed_mode, as_json): - """ - Verify that a stego image can be decoded without extracting the message. - - Quick check to validate credentials are correct and data is intact. - Does NOT output the actual message content. - - \b - Examples: - stegasoo verify -r photo.jpg -s stego.png -p "my passphrase" --pin 123456 - - stegasoo verify -r photo.jpg -s stego.png -p "words here" -k mykey.pem --json - - stegasoo verify -r photo.jpg -s stego.png -p "test phrase" --pin 123456 --mode dct - """ - # Check DCT mode availability - if embed_mode == 'dct' and not has_dct_support(): - raise click.ClickException( - "DCT mode requires scipy. Install with: pip install scipy" - ) - - # Load key if provided - rsa_key_data = None - rsa_key_from_qr = False - - if key and key_qr: - raise click.UsageError("Cannot use both --key and --key-qr. Choose one.") - - if key: - rsa_key_data = Path(key).read_bytes() - elif key_qr: - if not HAS_QR or not has_qr_read(): - raise click.ClickException( - "QR code reading not available. Install: pip install pyzbar\n" - "Also requires system library: sudo apt-get install libzbar0" - ) - key_pem = extract_key_from_qr_file(key_qr) - if not key_pem: - raise click.ClickException(f"Could not extract RSA key from QR code: {key_qr}") - rsa_key_data = key_pem.encode('utf-8') - rsa_key_from_qr = True - - effective_key_password = None if rsa_key_from_qr else key_password - - if not pin and not rsa_key_data: - raise click.UsageError("Must provide --pin or --key/--key-qr (or both)") - - try: - ref_photo = Path(ref).read_bytes() - stego_image = Path(stego).read_bytes() - - # Attempt to decode - result = decode( - stego_image=stego_image, - reference_photo=ref_photo, - passphrase=passphrase, - pin=pin or "", - rsa_key_data=rsa_key_data, - rsa_password=effective_key_password, - embed_mode=embed_mode, - ) - - # Calculate payload size - if result.is_file: - payload_size = len(result.file_data) if result.file_data else 0 - payload_type = "file" - payload_desc = result.filename or "unnamed file" - if result.mime_type: - payload_desc += f" ({result.mime_type})" - else: - payload_size = len(result.message.encode('utf-8')) if result.message else 0 - payload_type = "text" - payload_desc = f"{payload_size} bytes" - - if as_json: - import json - output_data = { - "valid": True, - "stego_file": stego, - "payload_type": payload_type, - "payload_size": payload_size, - } - if result.is_file: - output_data["filename"] = result.filename - output_data["mime_type"] = result.mime_type - click.echo(json.dumps(output_data, indent=2)) - else: - click.secho("✓ Valid stego image", fg='green', bold=True) - click.echo(f" Payload: {payload_type} ({payload_desc})") - click.echo(f" Size: {payload_size:,} bytes") - - except (DecryptionError, ExtractionError) as e: - if as_json: - import json - output_data = { - "valid": False, - "stego_file": stego, - "error": str(e), - } - click.echo(json.dumps(output_data, indent=2)) - sys.exit(1) - else: - click.secho("✗ Verification failed", fg='red', bold=True) - click.echo(f" Error: {e}") - sys.exit(1) - except StegasooError as e: - raise click.ClickException(str(e)) - except Exception as e: - raise click.ClickException(f"Error: {e}") - - -# ============================================================================ -# INFO COMMAND -# ============================================================================ - -@cli.command() -@click.argument('image', type=click.Path(exists=True)) -@click.option('--json', 'as_json', is_flag=True, help='Output as JSON') -def info(image, as_json): - """ - Show information about an image. - - Displays dimensions, capacity for both LSB and DCT modes. - """ - try: - image_data = Path(image).read_bytes() - - result = validate_image(image_data, check_size=False) - if not result.is_valid: - raise click.ClickException(result.error_message) - - # Get capacity comparison - comparison = compare_modes(image_data) - - if as_json: - import json - output_data = { - "file": image, - "width": result.details['width'], - "height": result.details['height'], - "pixels": result.details['pixels'], - "mode": result.details['mode'], - "format": result.details['format'], - "capacity": { - "lsb": { - "bytes": comparison['lsb']['capacity_bytes'], - "kb": round(comparison['lsb']['capacity_kb'], 1), - }, - "dct": { - "bytes": comparison['dct']['capacity_bytes'], - "kb": round(comparison['dct']['capacity_kb'], 1), - "available": comparison['dct']['available'], - "ratio_vs_lsb": round(comparison['dct']['ratio_vs_lsb'], 1), - "output_formats": ["png", "jpeg"], - "color_modes": ["grayscale", "color"], - }, - }, - } - click.echo(json.dumps(output_data, indent=2)) - return - - click.echo() - click.secho(f"Image: {image}", bold=True) - click.echo(f" Dimensions: {result.details['width']} × {result.details['height']}") - click.echo(f" Pixels: {result.details['pixels']:,}") - click.echo(f" Mode: {result.details['mode']}") - click.echo(f" Format: {result.details['format']}") - click.echo() - - click.secho(" Capacity:", bold=True) - click.echo(f" LSB mode: ~{comparison['lsb']['capacity_bytes']:,} bytes ({comparison['lsb']['capacity_kb']:.1f} KB)") - - dct_status = "✓" if comparison['dct']['available'] else "✗ (scipy not installed)" - click.echo(f" DCT mode: ~{comparison['dct']['capacity_bytes']:,} bytes ({comparison['dct']['capacity_kb']:.1f} KB) {dct_status}") - click.echo(f" DCT ratio: {comparison['dct']['ratio_vs_lsb']:.1f}% of LSB") - - if comparison['dct']['available']: - click.secho(" DCT options: grayscale/color, png/jpeg", dim=True) - - click.echo() - - except Exception as e: - raise click.ClickException(str(e)) - - -# ============================================================================ -# COMPARE COMMAND -# ============================================================================ - -@cli.command() -@click.argument('image', type=click.Path(exists=True)) -@click.option('--payload-size', '-s', type=int, help='Check if specific payload size fits') -@click.option('--json', 'as_json', is_flag=True, help='Output as JSON') -def compare(image, payload_size, as_json): - """ - Compare LSB and DCT embedding modes for an image. - - Shows capacity for each mode and recommends which to use. - Optionally checks if a specific payload size would fit. - - \b - Examples: - stegasoo compare carrier.png - stegasoo compare carrier.png --payload-size 50000 - stegasoo compare carrier.png --json - """ - try: - image_data = Path(image).read_bytes() - - comparison = compare_modes(image_data) - - if as_json: - import json - output_data = { - "file": image, - "width": comparison['width'], - "height": comparison['height'], - "modes": { - "lsb": { - "capacity_bytes": comparison['lsb']['capacity_bytes'], - "capacity_kb": round(comparison['lsb']['capacity_kb'], 1), - "available": True, - "output_format": comparison['lsb']['output'], - }, - "dct": { - "capacity_bytes": comparison['dct']['capacity_bytes'], - "capacity_kb": round(comparison['dct']['capacity_kb'], 1), - "available": comparison['dct']['available'], - "output_formats": ["png", "jpeg"], - "color_modes": ["grayscale", "color"], - "ratio_vs_lsb_percent": round(comparison['dct']['ratio_vs_lsb'], 1), - }, - }, - } - - if payload_size: - output_data["payload_check"] = { - "size_bytes": payload_size, - "fits_lsb": payload_size <= comparison['lsb']['capacity_bytes'], - "fits_dct": payload_size <= comparison['dct']['capacity_bytes'], - } - - click.echo(json.dumps(output_data, indent=2)) - return - - click.echo() - click.secho(f"=== Mode Comparison: {image} ===", fg='cyan', bold=True) - click.echo(f" Dimensions: {comparison['width']} × {comparison['height']}") - click.echo() - - # LSB mode - click.secho(" ┌─── LSB Mode ───", fg='green') - click.echo(f" │ Capacity: {comparison['lsb']['capacity_bytes']:,} bytes ({comparison['lsb']['capacity_kb']:.1f} KB)") - click.echo(f" │ Output: {comparison['lsb']['output']}") - click.echo(f" │ Status: ✓ Available") - click.echo(" │") - - # DCT mode - click.secho(" ├─── DCT Mode ───", fg='blue') - click.echo(f" │ Capacity: {comparison['dct']['capacity_bytes']:,} bytes ({comparison['dct']['capacity_kb']:.1f} KB)") - click.echo(f" │ Ratio: {comparison['dct']['ratio_vs_lsb']:.1f}% of LSB capacity") - if comparison['dct']['available']: - click.echo(f" │ Status: ✓ Available") - click.echo(f" │ Formats: PNG (lossless), JPEG (smaller)") - click.echo(f" │ Colors: Grayscale (default), Color") - else: - click.secho(f" │ Status: ✗ Requires scipy (pip install scipy)", fg='yellow') - click.echo(" │") - - # Payload check - if payload_size: - click.secho(" ├─── Payload Check ───", fg='magenta') - click.echo(f" │ Size: {payload_size:,} bytes") - - fits_lsb = payload_size <= comparison['lsb']['capacity_bytes'] - fits_dct = payload_size <= comparison['dct']['capacity_bytes'] - - lsb_icon = "✓" if fits_lsb else "✗" - dct_icon = "✓" if fits_dct else "✗" - lsb_color = 'green' if fits_lsb else 'red' - dct_color = 'green' if fits_dct else 'red' - - click.echo(f" │ LSB mode: ", nl=False) - click.secho(f"{lsb_icon} {'Fits' if fits_lsb else 'Too large'}", fg=lsb_color) - click.echo(f" │ DCT mode: ", nl=False) - click.secho(f"{dct_icon} {'Fits' if fits_dct else 'Too large'}", fg=dct_color) - click.echo(" │") - - # Recommendation - click.secho(" └─── Recommendation ───", fg='yellow') - if not comparison['dct']['available']: - click.echo(" Use LSB mode (DCT unavailable)") - elif payload_size: - if fits_dct: - click.echo(" DCT mode for better stealth (payload fits both modes)") - click.echo(" Use --dct-color color to preserve original colors") - elif fits_lsb: - click.echo(" LSB mode (payload too large for DCT)") - else: - click.secho(" ✗ Payload too large for both modes!", fg='red') - else: - click.echo(" LSB for larger payloads, DCT for better stealth") - click.echo(" DCT supports color output with --dct-color color") - - click.echo() - - except Exception as e: - raise click.ClickException(str(e)) - - -# ============================================================================ -# STRIP-METADATA COMMAND -# ============================================================================ - -@cli.command('strip-metadata') -@click.argument('image', type=click.Path(exists=True)) -@click.option('--output', '-o', type=click.Path(), help='Output file (default: overwrites input)') -@click.option('--format', '-f', 'output_format', type=click.Choice(['PNG', 'BMP']), default='PNG', - help='Output format') -@click.option('--quiet', '-q', is_flag=True, help='Suppress output') -def strip_metadata_cmd(image, output, output_format, quiet): - """ - Remove all metadata (EXIF, GPS, etc.) from an image. - - Creates a clean image with only pixel data - no camera info, - location data, timestamps, or other potentially sensitive metadata. - - \b - Examples: - stegasoo strip-metadata photo.jpg -o clean.png - stegasoo strip-metadata photo.jpg # Overwrites as PNG - """ - if not HAS_STRIP_METADATA: - raise click.ClickException("strip_image_metadata not available") - - try: - image_data = Path(image).read_bytes() - original_size = len(image_data) - - clean_data = strip_image_metadata(image_data, output_format) - - if output: - out_path = Path(output) - else: - # Replace extension with output format - out_path = Path(image).with_suffix(f'.{output_format.lower()}') - - out_path.write_bytes(clean_data) - - if not quiet: - click.secho("✓ Metadata stripped", fg='green') - click.echo(f" Input: {image} ({original_size:,} bytes)") - click.echo(f" Output: {out_path} ({len(clean_data):,} bytes)") - - except Exception as e: - raise click.ClickException(str(e)) - - -# ============================================================================ -# MODES COMMAND -# ============================================================================ - -@cli.command() -def modes(): - """ - Show available embedding modes and their status. - - Displays which modes are available and their characteristics. - """ - click.echo() - click.secho("=== Stegasoo Embedding Modes (v3.2.0) ===", fg='cyan', bold=True) - click.echo() - - # LSB Mode - click.secho(" LSB Mode (Spatial LSB)", fg='green', bold=True) - click.echo(" Status: ✓ Always available") - click.echo(" Output: PNG/BMP (full color)") - click.echo(" Capacity: ~375 KB per megapixel") - click.echo(" Use case: Larger payloads, color preservation") - click.echo(" CLI flag: --mode lsb (default)") - click.echo() - - # DCT Mode - click.secho(" DCT Mode (Frequency Domain)", fg='blue', bold=True) - if has_dct_support(): - click.echo(" Status: ✓ Available") - else: - click.secho(" Status: ✗ Requires scipy", fg='yellow') - click.echo(" Install: pip install scipy") - click.echo(" Capacity: ~75 KB per megapixel (~20% of LSB)") - click.echo(" Use case: Better stealth, frequency domain hiding") - click.echo(" CLI flag: --mode dct") - click.echo() - - # DCT Options - click.secho(" DCT Options", fg='magenta', bold=True) - click.echo(" Output format:") - click.echo(" --dct-format png Lossless, larger file (default)") - click.echo(" --dct-format jpeg Lossy, smaller, more natural") - click.echo() - click.echo(" Color mode:") - click.echo(" --dct-color grayscale Traditional DCT (default)") - click.echo(" --dct-color color Preserves original colors") - click.echo() - - # v3.2.0 Note - click.secho(" v3.2.0 Changes:", fg='cyan', bold=True) - click.echo(" ✓ No date parameters needed") - click.echo(" ✓ Single passphrase (no daily rotation)") - click.echo(" ✓ Default passphrase increased to 4 words") - click.echo(" ✓ True asynchronous communications") - click.echo() - - # Examples - click.secho(" Examples:", dim=True) - click.echo(" # Traditional DCT (grayscale PNG)") - click.echo(" stegasoo encode ... --mode dct") - click.echo() - click.echo(" # Color-preserving DCT with JPEG output") - click.echo(" stegasoo encode ... --mode dct --dct-color color --dct-format jpeg") - click.echo() - click.echo(" # Compare modes for an image") - click.echo(" stegasoo compare carrier.png") - click.echo() - - -# ============================================================================ -# MAIN -# ============================================================================ - -def main(): - """Entry point.""" - cli() - - -if __name__ == '__main__': - main() diff --git a/src/stegasoo/dct_steganography.py_old b/src/stegasoo/dct_steganography.py_old deleted file mode 100644 index abfde7a..0000000 --- a/src/stegasoo/dct_steganography.py_old +++ /dev/null @@ -1,974 +0,0 @@ -""" -DCT Domain Steganography Module (v3.2.0) - -Embeds data in DCT coefficients with two approaches: -1. PNG output: Scipy-based DCT transform (grayscale or color) -2. JPEG output: jpegio-based coefficient manipulation (if available) - -The JPEG approach is the "correct" way to do JPEG steganography because -it directly modifies the already-quantized coefficients without re-encoding. - -Changes in v3.0.2: -- jpegio integration for proper JPEG coefficient embedding -- Falls back to warning if jpegio not available for JPEG output -- Maintains backward compatibility with v3.0.1 - -Changes in v3.2.0: -- Fixed color-mode extraction to properly extract from Y channel -- Added _extract_from_y_channel() for accurate color-mode extraction -- Improved extraction robustness for both grayscale and color modes - -Requires: scipy (for PNG mode), optionally jpegio (for JPEG mode) -""" - -import io -import struct -import hashlib -from dataclasses import dataclass -from typing import Optional, Literal, Tuple -from enum import Enum - -import numpy as np -from PIL import Image - -# Check for scipy availability (for PNG/DCT mode) -try: - from scipy.fftpack import dct, idct - HAS_SCIPY = True -except ImportError: - HAS_SCIPY = False - dct = None - idct = None - -# Check for jpegio availability (for proper JPEG mode) -try: - import jpegio as jio - HAS_JPEGIO = True -except ImportError: - HAS_JPEGIO = False - jio = None - - -# ============================================================================ -# CONSTANTS -# ============================================================================ - -# DCT block size (standard 8x8 like JPEG) -BLOCK_SIZE = 8 - -# Coefficients to use for embedding (mid-frequency, zig-zag order positions) -EMBED_POSITIONS = [ - (0, 1), (1, 0), (2, 0), (1, 1), (0, 2), (0, 3), (1, 2), (2, 1), (3, 0), - (4, 0), (3, 1), (2, 2), (1, 3), (0, 4), (0, 5), (1, 4), (2, 3), (3, 2), - (4, 1), (5, 0), (5, 1), (4, 2), (3, 3), (2, 4), (1, 5), (0, 6), (0, 7), - (1, 6), (2, 5), (3, 4), (4, 3), (5, 2), (6, 1), (7, 0), -] - -# Use subset of mid-frequency coefficients for better robustness -DEFAULT_EMBED_POSITIONS = EMBED_POSITIONS[4:20] # 16 coefficients per block - -# Quantization step for QIM embedding (larger = more robust, more visible) -QUANT_STEP = 25 - -# Magic bytes for DCT stego identification -DCT_MAGIC = b'DCTS' - -# Header size: magic(4) + version(1) + flags(1) + length(4) = 10 bytes -HEADER_SIZE = 10 - -# Output format options -OUTPUT_FORMAT_PNG = 'png' -OUTPUT_FORMAT_JPEG = 'jpeg' - -# JPEG output quality (only for fallback mode, not jpegio) -JPEG_OUTPUT_QUALITY = 95 - -# jpegio constants for JPEG coefficient embedding -JPEGIO_MAGIC = b'JPGS' -JPEGIO_MIN_COEF_MAGNITUDE = 2 -JPEGIO_EMBED_CHANNEL = 0 # Y channel - -# Flag bits for header -FLAG_COLOR_MODE = 0x01 # Set if embedded in color mode (Y channel of YCbCr) - - -# ============================================================================ -# DATA CLASSES -# ============================================================================ - -class DCTOutputFormat(Enum): - """Output format for DCT stego images.""" - PNG = 'png' - JPEG = 'jpeg' - - -@dataclass -class DCTEmbedStats: - """Statistics from DCT embedding operation.""" - blocks_used: int - blocks_available: int - bits_embedded: int - capacity_bits: int - usage_percent: float - image_width: int - image_height: int - output_format: str - jpeg_native: bool = False # True if used jpegio for proper JPEG embedding - color_mode: str = 'grayscale' # 'color' or 'grayscale' (v3.0.1+) - - -@dataclass -class DCTCapacityInfo: - """Capacity information for a carrier image.""" - width: int - height: int - blocks_x: int - blocks_y: int - total_blocks: int - bits_per_block: int - total_capacity_bits: int - total_capacity_bytes: int - usable_capacity_bytes: int - - -# ============================================================================ -# AVAILABILITY CHECKS -# ============================================================================ - -def _check_scipy(): - """Raise ImportError if scipy is not available.""" - if not HAS_SCIPY: - raise ImportError( - "DCT steganography requires scipy. " - "Install with: pip install scipy" - ) - - -def has_dct_support() -> bool: - """Check if DCT steganography is available (scipy installed).""" - return HAS_SCIPY - - -def has_jpegio_support() -> bool: - """Check if jpegio is available for proper JPEG coefficient embedding.""" - return HAS_JPEGIO - - -# ============================================================================ -# SCIPY DCT HELPERS (for PNG output) -# ============================================================================ - -def _dct2(block: np.ndarray) -> np.ndarray: - """Apply 2D DCT to a block.""" - return dct(dct(block.T, norm='ortho').T, norm='ortho') - - -def _idct2(block: np.ndarray) -> np.ndarray: - """Apply 2D inverse DCT to a block.""" - return idct(idct(block.T, norm='ortho').T, norm='ortho') - - -def _to_grayscale(image_data: bytes) -> np.ndarray: - """Convert image bytes to grayscale numpy array.""" - img = Image.open(io.BytesIO(image_data)) - gray = img.convert('L') - return np.array(gray, dtype=np.float64) - - -def _extract_y_channel(image_data: bytes) -> np.ndarray: - """ - Extract Y (luminance) channel from image for color-mode extraction. - - This uses the same YCbCr conversion as embedding to ensure - accurate extraction from color-mode stego images. - - Args: - image_data: Image file bytes - - Returns: - Y channel as float64 numpy array - """ - img = Image.open(io.BytesIO(image_data)) - - # Convert to RGB if needed - if img.mode != 'RGB': - img = img.convert('RGB') - - rgb_array = np.array(img, dtype=np.float64) - - # Extract Y channel using ITU-R BT.601 (same as embedding) - R = rgb_array[:, :, 0] - G = rgb_array[:, :, 1] - B = rgb_array[:, :, 2] - - Y = 0.299 * R + 0.587 * G + 0.114 * B - - return Y - - -def _pad_to_blocks(image: np.ndarray) -> Tuple[np.ndarray, Tuple[int, int]]: - """Pad image dimensions to be divisible by block size.""" - h, w = image.shape - new_h = ((h + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE - new_w = ((w + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE - - if new_h == h and new_w == w: - return image, (h, w) - - padded = np.zeros((new_h, new_w), dtype=image.dtype) - padded[:h, :w] = image - - if new_h > h: - padded[h:, :w] = image[h-(new_h-h):h, :w][::-1, :] - if new_w > w: - padded[:h, w:] = image[:h, w-(new_w-w):w][:, ::-1] - if new_h > h and new_w > w: - padded[h:, w:] = image[h-(new_h-h):h, w-(new_w-w):w][::-1, ::-1] - - return padded, (h, w) - - -def _unpad_image(image: np.ndarray, original_size: Tuple[int, int]) -> np.ndarray: - """Remove padding from image.""" - h, w = original_size - return image[:h, :w] - - -def _embed_bit_in_coeff(coef: float, bit: int, quant_step: int = QUANT_STEP) -> float: - """Embed a single bit into a DCT coefficient using QIM.""" - quantized = round(coef / quant_step) - if (quantized % 2) != bit: - if quantized % 2 == 0 and bit == 1: - quantized += 1 if coef >= quantized * quant_step else -1 - elif quantized % 2 == 1 and bit == 0: - quantized += 1 if coef >= quantized * quant_step else -1 - return quantized * quant_step - - -def _extract_bit_from_coeff(coef: float, quant_step: int = QUANT_STEP) -> int: - """Extract a single bit from a DCT coefficient.""" - quantized = round(coef / quant_step) - return quantized % 2 - - -def _generate_block_order(num_blocks: int, seed: bytes) -> list: - """Generate pseudo-random block order from seed.""" - hash_bytes = hashlib.sha256(seed).digest() - rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], 'big')) - order = list(range(num_blocks)) - rng.shuffle(order) - return order - - -def _save_stego_image(image: np.ndarray, output_format: str = OUTPUT_FORMAT_PNG) -> bytes: - """Save stego image in specified format (grayscale).""" - clipped = np.clip(image, 0, 255).astype(np.uint8) - img = Image.fromarray(clipped, mode='L') - - buffer = io.BytesIO() - - if output_format == OUTPUT_FORMAT_JPEG: - img.save(buffer, format='JPEG', quality=JPEG_OUTPUT_QUALITY, - subsampling=0, optimize=True) - else: - img.save(buffer, format='PNG', optimize=True) - - return buffer.getvalue() - - -def _save_color_image(rgb_array: np.ndarray, output_format: str = OUTPUT_FORMAT_PNG) -> bytes: - """Save color RGB image in specified format.""" - clipped = np.clip(rgb_array, 0, 255).astype(np.uint8) - img = Image.fromarray(clipped, mode='RGB') - - buffer = io.BytesIO() - - if output_format == OUTPUT_FORMAT_JPEG: - img.save(buffer, format='JPEG', quality=JPEG_OUTPUT_QUALITY, - subsampling=0, optimize=True) - else: - img.save(buffer, format='PNG', optimize=True) - - return buffer.getvalue() - - -def _rgb_to_ycbcr(rgb: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: - """ - Convert RGB array to YCbCr components. - - Uses ITU-R BT.601 conversion (standard for JPEG). - - Args: - rgb: RGB image array (H, W, 3), float64 - - Returns: - Tuple of (Y, Cb, Cr) arrays - """ - R = rgb[:, :, 0] - G = rgb[:, :, 1] - B = rgb[:, :, 2] - - # ITU-R BT.601 conversion - Y = 0.299 * R + 0.587 * G + 0.114 * B - Cb = 128 - 0.168736 * R - 0.331264 * G + 0.5 * B - Cr = 128 + 0.5 * R - 0.418688 * G - 0.081312 * B - - return Y, Cb, Cr - - -def _ycbcr_to_rgb(Y: np.ndarray, Cb: np.ndarray, Cr: np.ndarray) -> np.ndarray: - """ - Convert YCbCr components back to RGB array. - - Args: - Y: Luminance channel - Cb: Blue-difference chroma - Cr: Red-difference chroma - - Returns: - RGB array (H, W, 3) - """ - R = Y + 1.402 * (Cr - 128) - G = Y - 0.344136 * (Cb - 128) - 0.714136 * (Cr - 128) - B = Y + 1.772 * (Cb - 128) - - rgb = np.stack([R, G, B], axis=-1) - return rgb - - -def _create_header(data_length: int, flags: int = 0) -> bytes: - """Create DCT stego header.""" - version = 1 - return struct.pack('>4sBBI', DCT_MAGIC, version, flags, data_length) - - -def _parse_header(header_bits: list) -> Tuple[int, int, int]: - """Parse header from extracted bits. Returns (version, flags, data_length).""" - if len(header_bits) < HEADER_SIZE * 8: - raise ValueError("Insufficient header data") - - header_bytes = bytes([ - sum(header_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8)) - for i in range(HEADER_SIZE) - ]) - - magic, version, flags, length = struct.unpack('>4sBBI', header_bytes) - - if magic != DCT_MAGIC: - raise ValueError("Invalid DCT stego magic bytes") - - return version, flags, length - - -# ============================================================================ -# JPEGIO HELPERS (for proper JPEG output) -# ============================================================================ - -def _jpegio_bytes_to_file(data: bytes, suffix: str = '.jpg') -> str: - """Write bytes to temp file for jpegio.""" - import tempfile - import os - fd, path = tempfile.mkstemp(suffix=suffix) - try: - os.write(fd, data) - finally: - os.close(fd) - return path - - -def _jpegio_file_to_bytes(path: str) -> bytes: - """Read file to bytes and delete it.""" - import os - try: - with open(path, 'rb') as f: - return f.read() - finally: - try: - os.unlink(path) - except OSError: - pass - - -def _jpegio_get_usable_positions(coef_array: np.ndarray) -> list: - """Get usable coefficient positions for jpegio embedding.""" - positions = [] - h, w = coef_array.shape - - for row in range(h): - for col in range(w): - # Skip DC coefficients - if (row % BLOCK_SIZE == 0) and (col % BLOCK_SIZE == 0): - continue - # Check magnitude - if abs(coef_array[row, col]) >= JPEGIO_MIN_COEF_MAGNITUDE: - positions.append((row, col)) - - return positions - - -def _jpegio_generate_order(num_positions: int, seed: bytes) -> list: - """Generate pseudo-random order for jpegio embedding.""" - hash_bytes = hashlib.sha256(seed + b"jpeg_coef_order").digest() - rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], 'big')) - order = list(range(num_positions)) - rng.shuffle(order) - return order - - -def _jpegio_create_header(data_length: int, flags: int = 0) -> bytes: - """Create header for jpegio embedding.""" - return struct.pack('>4sBBI', JPEGIO_MAGIC, 1, flags, data_length) - - -def _jpegio_parse_header(header_bytes: bytes) -> Tuple[int, int, int]: - """Parse jpegio header.""" - if len(header_bytes) < HEADER_SIZE: - raise ValueError("Insufficient header data") - - magic, version, flags, length = struct.unpack('>4sBBI', header_bytes[:HEADER_SIZE]) - - if magic != JPEGIO_MAGIC: - raise ValueError(f"Invalid JPEG stego magic: {magic}") - - return version, flags, length - - -# ============================================================================ -# PUBLIC API -# ============================================================================ - -def calculate_dct_capacity(image_data: bytes) -> DCTCapacityInfo: - """ - Calculate the DCT embedding capacity of an image. - - Args: - image_data: Image file bytes - - Returns: - DCTCapacityInfo with capacity details - """ - _check_scipy() - - img = Image.open(io.BytesIO(image_data)) - width, height = img.size - - blocks_x = width // BLOCK_SIZE - blocks_y = height // BLOCK_SIZE - total_blocks = blocks_x * blocks_y - - bits_per_block = len(DEFAULT_EMBED_POSITIONS) - total_bits = total_blocks * bits_per_block - total_bytes = total_bits // 8 - usable_bytes = max(0, total_bytes - HEADER_SIZE) - - return DCTCapacityInfo( - width=width, - height=height, - blocks_x=blocks_x, - blocks_y=blocks_y, - total_blocks=total_blocks, - bits_per_block=bits_per_block, - total_capacity_bits=total_bits, - total_capacity_bytes=total_bytes, - usable_capacity_bytes=usable_bytes - ) - - -def will_fit_dct(data_length: int, image_data: bytes) -> bool: - """Check if data will fit in the image using DCT embedding.""" - capacity = calculate_dct_capacity(image_data) - return data_length <= capacity.usable_capacity_bytes - - -def estimate_capacity_comparison(image_data: bytes) -> dict: - """Compare LSB and DCT capacity for an image.""" - img = Image.open(io.BytesIO(image_data)) - width, height = img.size - pixels = width * height - - lsb_bytes = (pixels * 3) // 8 - - if HAS_SCIPY: - dct_info = calculate_dct_capacity(image_data) - dct_bytes = dct_info.usable_capacity_bytes - else: - blocks = (width // 8) * (height // 8) - dct_bytes = (blocks * 16) // 8 - HEADER_SIZE - - return { - 'width': width, - 'height': height, - 'lsb': { - 'capacity_bytes': lsb_bytes, - 'capacity_kb': lsb_bytes / 1024, - 'output': 'PNG/BMP (color)', - }, - 'dct': { - 'capacity_bytes': dct_bytes, - 'capacity_kb': dct_bytes / 1024, - 'output': 'PNG or JPEG (grayscale)', - 'ratio_vs_lsb': (dct_bytes / lsb_bytes * 100) if lsb_bytes > 0 else 0, - 'available': HAS_SCIPY, - }, - 'jpeg_native': { - 'available': HAS_JPEGIO, - 'note': 'Uses jpegio for proper JPEG coefficient embedding', - } - } - - -def embed_in_dct( - data: bytes, - carrier_image: bytes, - seed: bytes, - output_format: str = OUTPUT_FORMAT_PNG, - color_mode: str = 'color', # v3.0.1: 'color' or 'grayscale' -) -> Tuple[bytes, DCTEmbedStats]: - """ - Embed data into image using DCT coefficient modification. - - For PNG output: Uses scipy DCT transform - For JPEG output: Uses jpegio if available for proper coefficient embedding - - Args: - data: Data to embed - carrier_image: Carrier image bytes - seed: Seed for pseudo-random selection - output_format: 'png' (default, lossless) or 'jpeg' - color_mode: 'color' (preserve colors) or 'grayscale' (v3.0.1+) - - Returns: - Tuple of (stego_image_bytes, stats) - """ - # Validate output format - if output_format not in (OUTPUT_FORMAT_PNG, OUTPUT_FORMAT_JPEG): - raise ValueError(f"Invalid output format: {output_format}") - - # Validate color mode - if color_mode not in ('color', 'grayscale'): - color_mode = 'color' # Default to color - - # For JPEG output, try to use jpegio for proper coefficient embedding - # Note: jpegio naturally preserves color (works in YCbCr space) - if output_format == OUTPUT_FORMAT_JPEG: - if HAS_JPEGIO: - return _embed_jpegio(data, carrier_image, seed, color_mode) - else: - # Fall back to scipy + PIL JPEG (WARNING: may not decode properly) - import warnings - warnings.warn( - "jpegio not available. JPEG output may not decode correctly. " - "Install jpegio for proper JPEG steganography support.", - RuntimeWarning - ) - # Continue with scipy method but output as JPEG - - # PNG output or JPEG fallback: use scipy DCT method - _check_scipy() - return _embed_scipy_dct(data, carrier_image, seed, output_format, color_mode) - - -def _embed_scipy_dct( - data: bytes, - carrier_image: bytes, - seed: bytes, - output_format: str, - color_mode: str = 'color', -) -> Tuple[bytes, DCTEmbedStats]: - """Embed using scipy DCT (for PNG output), with color preservation option.""" - capacity_info = calculate_dct_capacity(carrier_image) - - if len(data) > capacity_info.usable_capacity_bytes: - raise ValueError( - f"Data too large ({len(data)} bytes) for carrier " - f"(capacity: {capacity_info.usable_capacity_bytes} bytes)" - ) - - # Load image - img = Image.open(io.BytesIO(carrier_image)) - width, height = img.size - - # Set flags for header - flags = FLAG_COLOR_MODE if color_mode == 'color' else 0 - - if color_mode == 'color' and img.mode in ('RGB', 'RGBA'): - # Color mode: convert to YCbCr, embed in Y only, preserve Cb/Cr - if img.mode == 'RGBA': - img = img.convert('RGB') - - rgb_array = np.array(img, dtype=np.float64) - Y, Cb, Cr = _rgb_to_ycbcr(rgb_array) - - # Pad Y channel - Y_padded, original_size = _pad_to_blocks(Y) - - # Embed in Y channel (with color flag) - Y_embedded = _embed_in_channel(Y_padded, data, seed, capacity_info, flags) - - # Unpad - Y_result = _unpad_image(Y_embedded, original_size) - - # Convert back to RGB - result_rgb = _ycbcr_to_rgb(Y_result, Cb, Cr) - - # Save as color image - stego_bytes = _save_color_image(result_rgb, output_format) - else: - # Grayscale mode: original behavior - image = _to_grayscale(carrier_image) - padded, original_size = _pad_to_blocks(image) - - embedded = _embed_in_channel(padded, data, seed, capacity_info, flags) - - result = _unpad_image(embedded, original_size) - stego_bytes = _save_stego_image(result, output_format) - - # Calculate stats - header = _create_header(len(data), flags) - payload = header + data - bits = len(payload) * 8 - - stats = DCTEmbedStats( - blocks_used=(bits + len(DEFAULT_EMBED_POSITIONS) - 1) // len(DEFAULT_EMBED_POSITIONS), - blocks_available=capacity_info.total_blocks, - bits_embedded=bits, - capacity_bits=capacity_info.total_capacity_bits, - usage_percent=(bits / capacity_info.total_capacity_bits) * 100, - image_width=width, - image_height=height, - output_format=output_format, - jpeg_native=False, - color_mode=color_mode, - ) - - return stego_bytes, stats - - -def _embed_in_channel( - channel: np.ndarray, - data: bytes, - seed: bytes, - capacity_info: DCTCapacityInfo, - flags: int = 0, -) -> np.ndarray: - """Embed data in a single channel using DCT.""" - header = _create_header(len(data), flags) - payload = header + data - - bits = [] - for byte in payload: - for i in range(7, -1, -1): - bits.append((byte >> i) & 1) - - num_blocks = capacity_info.total_blocks - block_order = _generate_block_order(num_blocks, seed) - - h, w = channel.shape - result = channel.copy() - - bit_idx = 0 - for block_num in block_order: - if bit_idx >= len(bits): - break - - by = (block_num // (w // BLOCK_SIZE)) * BLOCK_SIZE - bx = (block_num % (w // BLOCK_SIZE)) * BLOCK_SIZE - - block = result[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE].copy() - dct_block = _dct2(block) - - for pos in DEFAULT_EMBED_POSITIONS: - if bit_idx >= len(bits): - break - dct_block[pos] = _embed_bit_in_coeff(dct_block[pos], bits[bit_idx]) - bit_idx += 1 - - modified_block = _idct2(dct_block) - result[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE] = modified_block - - return result - - -def _embed_jpegio( - data: bytes, - carrier_image: bytes, - seed: bytes, - color_mode: str = 'color', -) -> Tuple[bytes, DCTEmbedStats]: - """ - Embed using jpegio for proper JPEG coefficient modification. - - Note: jpegio naturally preserves color since JPEG stores YCbCr - and we only modify Y channel coefficients. - """ - import tempfile - import os - - # Check if carrier is JPEG - if not, convert it - img = Image.open(io.BytesIO(carrier_image)) - width, height = img.size - - if img.format != 'JPEG': - # Convert to JPEG first - buffer = io.BytesIO() - if img.mode != 'RGB': - img = img.convert('RGB') - img.save(buffer, format='JPEG', quality=95, subsampling=0) - carrier_image = buffer.getvalue() - - # Write carrier to temp file - input_path = _jpegio_bytes_to_file(carrier_image, suffix='.jpg') - output_path = tempfile.mktemp(suffix='.jpg') - - # Set flags - flags = FLAG_COLOR_MODE if color_mode == 'color' else 0 - - try: - # Read JPEG with jpegio - jpeg = jio.read(input_path) - - # Get Y channel coefficients (channel 0) - coef_array = jpeg.coef_arrays[JPEGIO_EMBED_CHANNEL] - - # Find usable positions - all_positions = _jpegio_get_usable_positions(coef_array) - - # Generate pseudo-random order - order = _jpegio_generate_order(len(all_positions), seed) - - # Create payload with flags - header = _jpegio_create_header(len(data), flags) - payload = header + data - - # Convert to bits - bits = [] - for byte in payload: - for i in range(7, -1, -1): - bits.append((byte >> i) & 1) - - if len(bits) > len(all_positions): - raise ValueError( - f"Payload too large: {len(bits)} bits, " - f"only {len(all_positions)} usable coefficients" - ) - - # Embed using LSB - coefs_used = 0 - for bit_idx, pos_idx in enumerate(order): - if bit_idx >= len(bits): - break - - row, col = all_positions[pos_idx] - coef = coef_array[row, col] - - # Embed bit in LSB - if (coef & 1) != bits[bit_idx]: - if coef > 0: - coef_array[row, col] = coef - 1 if (coef & 1) else coef + 1 - else: - coef_array[row, col] = coef + 1 if (coef & 1) else coef - 1 - - coefs_used += 1 - - # Write modified JPEG - jio.write(jpeg, output_path) - - # Read back as bytes - with open(output_path, 'rb') as f: - stego_bytes = f.read() - - stats = DCTEmbedStats( - blocks_used=coefs_used // 63, # Approximate blocks - blocks_available=len(all_positions) // 63, - bits_embedded=len(bits), - capacity_bits=len(all_positions), - usage_percent=(len(bits) / len(all_positions)) * 100 if all_positions else 0, - image_width=width, - image_height=height, - output_format=OUTPUT_FORMAT_JPEG, - jpeg_native=True, - color_mode=color_mode, # JPEG naturally preserves color - ) - - return stego_bytes, stats - - finally: - for path in [input_path, output_path]: - try: - os.unlink(path) - except OSError: - pass - - -def extract_from_dct( - stego_image: bytes, - seed: bytes, -) -> bytes: - """ - Extract data from DCT stego image. - - Automatically detects whether image uses scipy DCT or jpegio embedding, - and handles both grayscale and color modes. - - Args: - stego_image: Stego image bytes - seed: Same seed used for embedding - - Returns: - Extracted data bytes - """ - # Check image format - img = Image.open(io.BytesIO(stego_image)) - - if img.format == 'JPEG' and HAS_JPEGIO: - # Try jpegio extraction first - try: - return _extract_jpegio(stego_image, seed) - except ValueError: - # If jpegio magic not found, fall back to scipy method - pass - - # PNG or fallback: use scipy DCT method - _check_scipy() - return _extract_scipy_dct(stego_image, seed) - - -def _extract_scipy_dct(stego_image: bytes, seed: bytes) -> bytes: - """ - Extract using scipy DCT (for PNG images). - - v3.2.0: Now properly handles both grayscale and color modes by - first trying to detect the mode from header flags, then extracting - from the appropriate channel. - """ - # First, try extracting from grayscale to get header and detect mode - # This works because even color-mode images can be converted to grayscale - # and the Y channel ≈ grayscale for extraction purposes - - # Try Y channel extraction first (works for both color and grayscale) - img = Image.open(io.BytesIO(stego_image)) - - if img.mode in ('RGB', 'RGBA'): - # Extract from Y channel (more accurate for color-mode images) - channel = _extract_y_channel(stego_image) - else: - # Grayscale image - channel = _to_grayscale(stego_image) - - padded, original_size = _pad_to_blocks(channel) - - h, w = padded.shape - blocks_x = w // BLOCK_SIZE - blocks_y = h // BLOCK_SIZE - num_blocks = blocks_x * blocks_y - - block_order = _generate_block_order(num_blocks, seed) - - all_bits = [] - - for block_num in block_order: - by = (block_num // blocks_x) * BLOCK_SIZE - bx = (block_num % blocks_x) * BLOCK_SIZE - - block = padded[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE] - dct_block = _dct2(block) - - for pos in DEFAULT_EMBED_POSITIONS: - bit = _extract_bit_from_coeff(dct_block[pos]) - all_bits.append(bit) - - if len(all_bits) >= HEADER_SIZE * 8: - try: - _, flags, data_length = _parse_header(all_bits[:HEADER_SIZE * 8]) - total_needed = (HEADER_SIZE + data_length) * 8 - if len(all_bits) >= total_needed: - break - except ValueError: - pass - - version, flags, data_length = _parse_header(all_bits) - - # Check if color mode flag is set (for informational purposes) - is_color_mode = bool(flags & FLAG_COLOR_MODE) - - data_bits = all_bits[HEADER_SIZE * 8:(HEADER_SIZE + data_length) * 8] - - data = bytes([ - sum(data_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8)) - for i in range(data_length) - ]) - - return data - - -def _extract_jpegio(stego_image: bytes, seed: bytes) -> bytes: - """Extract using jpegio for JPEG images.""" - import os - - temp_path = _jpegio_bytes_to_file(stego_image, suffix='.jpg') - - try: - jpeg = jio.read(temp_path) - coef_array = jpeg.coef_arrays[JPEGIO_EMBED_CHANNEL] - - all_positions = _jpegio_get_usable_positions(coef_array) - order = _jpegio_generate_order(len(all_positions), seed) - - # Extract header bits - header_bits = [] - for pos_idx in order[:HEADER_SIZE * 8]: - row, col = all_positions[pos_idx] - coef = coef_array[row, col] - header_bits.append(coef & 1) - - header_bytes = bytes([ - sum(header_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8)) - for i in range(HEADER_SIZE) - ]) - - version, flags, data_length = _jpegio_parse_header(header_bytes) - - # Extract all needed bits - total_bits_needed = (HEADER_SIZE + data_length) * 8 - - all_bits = [] - for bit_idx, pos_idx in enumerate(order): - if bit_idx >= total_bits_needed: - break - row, col = all_positions[pos_idx] - coef = coef_array[row, col] - all_bits.append(coef & 1) - - # Extract data - data_bits = all_bits[HEADER_SIZE * 8:] - - data = bytes([ - sum(data_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8)) - for i in range(data_length) - ]) - - return data - - finally: - try: - os.unlink(temp_path) - except OSError: - pass - - -# ============================================================================ -# CONVENIENCE FUNCTIONS -# ============================================================================ - -def get_output_extension(output_format: str) -> str: - """Get file extension for output format.""" - if output_format == OUTPUT_FORMAT_JPEG: - return '.jpg' - return '.png' - - -def get_output_mimetype(output_format: str) -> str: - """Get MIME type for output format.""" - if output_format == OUTPUT_FORMAT_JPEG: - return 'image/jpeg' - return 'image/png'