Lint cleanup: ruff fixes across entire codebase

- Strip trailing whitespace from all Python files
- Fix import sorting (I001) across all modules
- Convert Optional[X] to X | None syntax (UP045)
- Remove unused imports (F401)
- Convert lambda assignments to def functions (E731)
- Add TYPE_CHECKING import for forward references
- Update pyproject.toml ruff config:
  - Move select/ignore to [tool.ruff.lint] section
  - Add per-file ignores for DCT colorspace naming (N803/N806)
  - Add per-file ignores for __init__.py import structure (E402)
  - Exclude defunct test_routes.py
- Remove frontends/web/test_routes.py (defunct debug snippet)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee
2026-01-02 17:17:38 -05:00
parent d94ee7be90
commit 6b21190f97
36 changed files with 2275 additions and 2383 deletions

View File

@@ -21,53 +21,47 @@ NEW in v3.0: LSB and DCT embedding modes.
NEW in v3.0.1: DCT color mode and JPEG output format.
"""
import io
import sys
import base64
import sys
from pathlib import Path
from typing import Optional, Literal
from datetime import date
from typing import Literal
from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Query
from fastapi.responses import Response, JSONResponse
from fastapi import FastAPI, File, Form, HTTPException, Query, UploadFile
from fastapi.responses import JSONResponse, Response
from pydantic import BaseModel, Field
# Add parent to path for development
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'src'))
import stegasoo
from stegasoo import (
encode, decode, generate_credentials,
validate_image,
__version__,
StegasooError, DecryptionError, CapacityError,
has_argon2,
FilePayload,
MAX_FILE_PAYLOAD_SIZE,
# Embedding modes
EMBED_MODE_LSB,
EMBED_MODE_DCT,
EMBED_MODE_AUTO,
has_dct_support,
compare_modes,
will_fit_by_mode,
CapacityError,
DecryptionError,
FilePayload,
StegasooError,
__version__,
calculate_capacity_by_mode,
# Channel key functions (v4.0.0)
generate_channel_key,
get_channel_key,
set_channel_key,
clear_channel_key,
has_channel_key,
compare_modes,
decode,
encode,
generate_channel_key,
generate_credentials,
get_channel_status,
has_argon2,
has_channel_key,
has_dct_support,
set_channel_key,
validate_channel_key,
format_channel_key,
get_active_channel_key,
get_channel_fingerprint,
validate_image,
will_fit_by_mode,
)
from stegasoo.constants import (
MIN_PIN_LENGTH, MAX_PIN_LENGTH,
MIN_PASSPHRASE_WORDS, MAX_PASSPHRASE_WORDS,
DEFAULT_PASSPHRASE_WORDS,
MAX_PASSPHRASE_WORDS,
MAX_PIN_LENGTH,
MIN_PASSPHRASE_WORDS,
MIN_PIN_LENGTH,
VALID_RSA_SIZES,
)
@@ -151,11 +145,11 @@ class GenerateRequest(BaseModel):
class GenerateResponse(BaseModel):
passphrase: str = Field(description="Single passphrase (v3.2.0: no daily rotation)")
pin: Optional[str] = None
rsa_key_pem: Optional[str] = None
pin: str | None = None
rsa_key_pem: str | None = None
entropy: dict[str, int]
# Legacy field for compatibility
phrases: Optional[dict[str, str]] = Field(
phrases: dict[str, str] | None = Field(
default=None,
description="Deprecated: Use 'passphrase' instead"
)
@@ -167,10 +161,10 @@ class EncodeRequest(BaseModel):
carrier_image_base64: str
passphrase: str = Field(description="Passphrase (v3.2.0: renamed from day_phrase)")
pin: str = ""
rsa_key_base64: Optional[str] = None
rsa_password: Optional[str] = None
rsa_key_base64: str | None = None
rsa_password: str | None = None
# Channel key (v4.0.0)
channel_key: Optional[str] = Field(
channel_key: str | None = Field(
default=None,
description="Channel key for deployment isolation. null=auto (use server config), ''=public mode, 'XXXX-...'=explicit key"
)
@@ -192,15 +186,15 @@ class EncodeFileRequest(BaseModel):
"""Request for embedding a file (base64-encoded)."""
file_data_base64: str
filename: str
mime_type: Optional[str] = None
mime_type: str | None = None
reference_photo_base64: str
carrier_image_base64: str
passphrase: str = Field(description="Passphrase (v3.2.0: renamed from day_phrase)")
pin: str = ""
rsa_key_base64: Optional[str] = None
rsa_password: Optional[str] = None
rsa_key_base64: str | None = None
rsa_password: str | None = None
# Channel key (v4.0.0)
channel_key: Optional[str] = Field(
channel_key: str | None = Field(
default=None,
description="Channel key for deployment isolation. null=auto (use server config), ''=public mode, 'XXXX-...'=explicit key"
)
@@ -236,16 +230,16 @@ class EncodeResponse(BaseModel):
default="public",
description="Channel mode: 'public' or 'private'"
)
channel_fingerprint: Optional[str] = Field(
channel_fingerprint: str | None = Field(
default=None,
description="Channel key fingerprint (if private mode)"
)
# Legacy fields (v3.2.0: no longer used in crypto)
date_used: Optional[str] = Field(
date_used: str | None = Field(
default=None,
description="Deprecated: Date no longer used in v3.2.0"
)
day_of_week: Optional[str] = Field(
day_of_week: str | None = Field(
default=None,
description="Deprecated: Date no longer used in v3.2.0"
)
@@ -256,10 +250,10 @@ class DecodeRequest(BaseModel):
reference_photo_base64: str
passphrase: str = Field(description="Passphrase (v3.2.0: renamed from day_phrase)")
pin: str = ""
rsa_key_base64: Optional[str] = None
rsa_password: Optional[str] = None
rsa_key_base64: str | None = None
rsa_password: str | None = None
# Channel key (v4.0.0)
channel_key: Optional[str] = Field(
channel_key: str | None = Field(
default=None,
description="Channel key for decryption. null=auto (use server config), ''=public mode, 'XXXX-...'=explicit key"
)
@@ -272,10 +266,10 @@ class DecodeRequest(BaseModel):
class DecodeResponse(BaseModel):
"""Response for decode - can be text or file."""
payload_type: str # 'text' or 'file'
message: Optional[str] = None # For text
file_data_base64: Optional[str] = None # For file (base64-encoded)
filename: Optional[str] = None # For file
mime_type: Optional[str] = None # For file
message: str | None = None # For text
file_data_base64: str | None = None # For file (base64-encoded)
filename: str | None = None # For file
mime_type: str | None = None # For file
class ModeCapacity(BaseModel):
@@ -292,7 +286,7 @@ class ImageInfoResponse(BaseModel):
pixels: int
capacity_bytes: int = Field(description="LSB mode capacity (for backwards compatibility)")
capacity_kb: int = Field(description="LSB mode capacity in KB")
modes: Optional[dict[str, ModeCapacity]] = Field(
modes: dict[str, ModeCapacity] | None = Field(
default=None,
description="Capacity by embedding mode (v3.0+)"
)
@@ -301,7 +295,7 @@ class ImageInfoResponse(BaseModel):
class CompareModesRequest(BaseModel):
"""Request for comparing embedding modes."""
carrier_image_base64: str
payload_size: Optional[int] = Field(
payload_size: int | None = Field(
default=None,
description="Optional payload size to check if it fits"
)
@@ -313,7 +307,7 @@ class CompareModesResponse(BaseModel):
height: int
lsb: dict
dct: dict
payload_check: Optional[dict] = None
payload_check: dict | None = None
recommendation: str
@@ -332,9 +326,9 @@ class ChannelStatusResponse(BaseModel):
"""Response for channel key status (v4.0.0)."""
mode: str = Field(description="'public' or 'private'")
configured: bool = Field(description="Whether a channel key is configured")
fingerprint: Optional[str] = Field(default=None, description="Key fingerprint (partial)")
source: Optional[str] = Field(default=None, description="Where the key comes from")
key: Optional[str] = Field(default=None, description="Full key (only if reveal=true)")
fingerprint: str | None = Field(default=None, description="Key fingerprint (partial)")
source: str | None = Field(default=None, description="Where the key comes from")
key: str | None = Field(default=None, description="Full key (only if reveal=true)")
class ChannelGenerateResponse(BaseModel):
@@ -342,7 +336,7 @@ class ChannelGenerateResponse(BaseModel):
key: str = Field(description="Generated channel key")
fingerprint: str = Field(description="Key fingerprint")
saved: bool = Field(default=False, description="Whether key was saved to config")
save_location: Optional[str] = Field(default=None, description="Where key was saved")
save_location: str | None = Field(default=None, description="Where key was saved")
class ChannelSetRequest(BaseModel):
@@ -356,7 +350,7 @@ class ModesResponse(BaseModel):
lsb: dict
dct: DctModeInfo
# Channel key status (v4.0.0)
channel: Optional[dict] = Field(
channel: dict | None = Field(
default=None,
description="Channel key status (v4.0.0)"
)
@@ -369,12 +363,12 @@ class StatusResponse(BaseModel):
has_dct: bool
max_payload_kb: int
available_modes: list[str]
dct_features: Optional[dict] = Field(
dct_features: dict | None = Field(
default=None,
description="DCT mode features (v3.0.1+)"
)
# Channel key status (v4.0.0)
channel: Optional[dict] = Field(
channel: dict | None = Field(
default=None,
description="Channel key status (v4.0.0)"
)
@@ -385,8 +379,8 @@ class StatusResponse(BaseModel):
class QrExtractResponse(BaseModel):
success: bool
key_pem: Optional[str] = None
error: Optional[str] = None
key_pem: str | None = None
error: str | None = None
class WillFitRequest(BaseModel):
@@ -408,14 +402,14 @@ class WillFitResponse(BaseModel):
class ErrorResponse(BaseModel):
error: str
detail: Optional[str] = None
detail: str | None = None
# ============================================================================
# HELPER: RESOLVE CHANNEL KEY
# ============================================================================
def _resolve_channel_key(channel_key: Optional[str]) -> Optional[str]:
def _resolve_channel_key(channel_key: str | None) -> str | None:
"""
Resolve channel key from API parameter.
@@ -443,13 +437,13 @@ def _resolve_channel_key(channel_key: Optional[str]) -> Optional[str]:
if not validate_channel_key(channel_key):
raise HTTPException(
400,
f"Invalid channel key format. Expected: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX"
"Invalid channel key format. Expected: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX"
)
return channel_key
def _get_channel_info(channel_key: Optional[str]) -> tuple[str, Optional[str]]:
def _get_channel_info(channel_key: str | None) -> tuple[str, str | None]:
"""
Get channel mode and fingerprint for response.
@@ -1112,10 +1106,10 @@ async def api_encode_multipart(
reference_photo: UploadFile = File(...),
carrier: UploadFile = File(...),
message: str = Form(""),
payload_file: Optional[UploadFile] = File(None),
payload_file: UploadFile | None = File(None),
pin: str = Form(""),
rsa_key: Optional[UploadFile] = File(None),
rsa_key_qr: Optional[UploadFile] = File(None),
rsa_key: UploadFile | None = File(None),
rsa_key_qr: UploadFile | None = File(None),
rsa_password: str = Form(""),
# Channel key (v4.0.0)
channel_key: str = Form("auto", description="Channel key: 'auto'=server config, 'none'=public, 'XXXX-...'=explicit"),
@@ -1253,8 +1247,8 @@ async def api_decode_multipart(
reference_photo: UploadFile = File(...),
stego_image: UploadFile = File(...),
pin: str = Form(""),
rsa_key: Optional[UploadFile] = File(None),
rsa_key_qr: Optional[UploadFile] = File(None),
rsa_key: UploadFile | None = File(None),
rsa_key_qr: UploadFile | None = File(None),
rsa_password: str = Form(""),
# Channel key (v4.0.0)
channel_key: str = Form("auto", description="Channel key: 'auto'=server config, 'none'=public, 'XXXX-...'=explicit"),

View File

@@ -26,85 +26,59 @@ Usage:
import sys
from pathlib import Path
from typing import Optional
import click
# Add parent to path for development
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'src'))
import stegasoo
from stegasoo import (
# Core operations
encode, decode,
# Credential generation
generate_credentials,
generate_passphrase,
generate_pin,
export_rsa_key_pem,
load_rsa_key,
# Validation
validate_image,
# Image utilities
get_image_info,
compare_capacity,
# Steganography functions
has_dct_support,
compare_modes,
will_fit_by_mode,
# Utilities
generate_filename,
# Version
__version__,
# Exceptions
StegasooError,
DecryptionError,
ExtractionError,
# Models
FilePayload,
# Exceptions
StegasooError,
# Utilities
__version__,
clear_channel_key,
compare_modes,
decode,
# Core operations
encode,
export_rsa_key_pem,
# Channel key functions (v4.0.0)
generate_channel_key,
get_channel_key,
set_channel_key,
clear_channel_key,
has_channel_key,
# Credential generation
generate_credentials,
get_channel_status,
# Validation
get_image_info,
has_channel_key,
has_dct_support,
load_rsa_key,
set_channel_key,
validate_channel_key,
format_channel_key,
get_active_channel_key,
get_channel_fingerprint,
will_fit_by_mode,
)
# Import constants - try main module first, then constants submodule
try:
from stegasoo import (
EMBED_MODE_LSB,
EMBED_MODE_DCT,
from stegasoo import ( # noqa: F401
EMBED_MODE_AUTO,
EMBED_MODE_DCT,
EMBED_MODE_LSB,
)
except ImportError:
from stegasoo.constants import (
EMBED_MODE_LSB,
EMBED_MODE_DCT,
EMBED_MODE_AUTO,
)
pass
# Import constants that may not be in main __init__
try:
from stegasoo.constants import (
DEFAULT_PASSPHRASE_WORDS,
DEFAULT_PIN_LENGTH,
MIN_PIN_LENGTH,
MAX_PIN_LENGTH,
MIN_PIN_LENGTH,
)
except ImportError:
# Fallback defaults if constants not available
@@ -122,17 +96,23 @@ except ImportError:
# QR Code utilities
try:
from stegasoo.qr_utils import (
from stegasoo.qr_utils import ( # noqa: F401
can_fit_in_qr,
extract_key_from_qr_file,
generate_qr_code,
has_qr_read, has_qr_write,
can_fit_in_qr, needs_compression,
has_qr_read,
has_qr_write,
needs_compression,
)
HAS_QR = True
except ImportError:
HAS_QR = False
has_qr_read = lambda: False
has_qr_write = lambda: False
def has_qr_read() -> bool:
return False
def has_qr_write() -> bool:
return False
# ============================================================================
@@ -179,8 +159,8 @@ def cli():
# CHANNEL KEY HELPERS
# ============================================================================
def resolve_channel_key_option(channel: Optional[str], channel_file: Optional[str],
no_channel: bool) -> Optional[str]:
def resolve_channel_key_option(channel: str | None, channel_file: str | None,
no_channel: bool) -> str | None:
"""
Resolve channel key from CLI options.
@@ -208,8 +188,8 @@ def resolve_channel_key_option(channel: Optional[str], channel_file: Optional[st
# Explicit key provided
if not validate_channel_key(channel):
raise click.ClickException(
f"Invalid channel key format. Expected: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX\n"
f"Generate a new key with: stegasoo channel generate"
"Invalid channel key format. Expected: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX\n"
"Generate a new key with: stegasoo channel generate"
)
return channel
@@ -217,7 +197,7 @@ def resolve_channel_key_option(channel: Optional[str], channel_file: Optional[st
return None
def format_channel_status_line(quiet: bool = False) -> Optional[str]:
def format_channel_status_line(quiet: bool = False) -> str | None:
"""Get a one-line status for channel key configuration."""
if quiet:
return None
@@ -338,14 +318,14 @@ def generate(pin, rsa, pin_length, rsa_bits, words, output, password, as_json):
if creds.rsa_key_pem:
click.echo(f" RSA entropy: {creds.rsa_entropy} bits")
click.echo(f" Combined: {creds.total_entropy} bits")
click.secho(f" + photo entropy: 80-256 bits", dim=True)
click.secho(" + photo entropy: 80-256 bits", dim=True)
click.echo()
# Show channel key status
if has_channel_key():
status = get_channel_status()
click.secho("─── CHANNEL KEY ───", fg='magenta')
click.echo(f" Status: Private mode")
click.echo(" Status: Private mode")
click.echo(f" Fingerprint: {status['fingerprint']}")
click.secho(f" (configured via {status['source']})", dim=True)
click.echo()
@@ -547,16 +527,16 @@ def channel_set(key, key_file, project):
if not validate_channel_key(key):
raise click.ClickException(
f"Invalid channel key format.\n"
f"Expected: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX\n"
f"Generate a new key with: stegasoo channel generate"
"Invalid channel key format.\n"
"Expected: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX\n"
"Generate a new key with: stegasoo channel generate"
)
location = 'project' if project else 'user'
set_channel_key(key, location=location)
status = get_channel_status()
click.secho(f"✓ Channel key saved", fg='green')
click.secho("✓ Channel key saved", fg='green')
click.echo(f" Location: {status['source']}")
click.echo(f" Fingerprint: {status['fingerprint']}")
@@ -759,7 +739,7 @@ def encode_cmd(ref, carrier, message, message_file, embed_file, passphrase, pin,
suggestion = ""
if alt_mode == 'lsb' and alt_check['fits']:
suggestion = f"\n Tip: Payload would fit in LSB mode (--mode lsb)"
suggestion = "\n Tip: Payload would fit in LSB mode (--mode lsb)"
raise click.ClickException(
f"Payload too large for {embed_mode.upper()} mode.\n"
@@ -810,7 +790,7 @@ def encode_cmd(ref, carrier, message, message_file, embed_file, passphrase, pin,
out_path.write_bytes(result.stego_image)
if not quiet:
click.secho(f"✓ Encoded successfully!", fg='green')
click.secho("✓ Encoded successfully!", fg='green')
click.echo(f" Output: {out_path}")
click.echo(f" Size: {len(result.stego_image):,} bytes")
click.echo(f" Capacity used: {result.capacity_percent:.1f}%")
@@ -1240,7 +1220,7 @@ def compare(image, payload, size, as_json):
click.secho(" ┌─── LSB Mode ───", fg='green')
click.echo(f" │ Capacity: {comparison['lsb']['capacity_bytes']:,} bytes ({comparison['lsb']['capacity_kb']:.1f} KB)")
click.echo(f" │ Output: {comparison['lsb']['output']}")
click.echo(f" │ Status: ✓ Available")
click.echo(" │ Status: ✓ Available")
click.echo("")
# DCT mode
@@ -1248,11 +1228,11 @@ def compare(image, payload, size, as_json):
click.echo(f" │ Capacity: {comparison['dct']['capacity_bytes']:,} bytes ({comparison['dct']['capacity_kb']:.1f} KB)")
click.echo(f" │ Ratio: {comparison['dct']['ratio_vs_lsb']:.1f}% of LSB capacity")
if comparison['dct']['available']:
click.echo(f" │ Status: ✓ Available")
click.echo(f" │ Formats: PNG (lossless), JPEG (smaller)")
click.echo(f" │ Colors: Grayscale (default), Color")
click.echo(" │ Status: ✓ Available")
click.echo(" │ Formats: PNG (lossless), JPEG (smaller)")
click.echo(" │ Colors: Grayscale (default), Color")
else:
click.secho(f" │ Status: ✗ Requires scipy (pip install scipy)", fg='yellow')
click.secho(" │ Status: ✗ Requires scipy (pip install scipy)", fg='yellow')
click.echo("")
# Payload check
@@ -1268,9 +1248,9 @@ def compare(image, payload, size, as_json):
lsb_color = 'green' if fits_lsb else 'red'
dct_color = 'green' if fits_dct else 'red'
click.echo(f" │ LSB mode: ", nl=False)
click.echo(" │ LSB mode: ", nl=False)
click.secho(f"{lsb_icon} {'Fits' if fits_lsb else 'Too large'}", fg=lsb_color)
click.echo(f" │ DCT mode: ", nl=False)
click.echo(" │ DCT mode: ", nl=False)
click.secho(f"{dct_icon} {'Fits' if fits_dct else 'Too large'}", fg=dct_color)
click.echo("")

View File

@@ -22,20 +22,16 @@ NEW in v3.0.1: DCT output format selection (PNG or JPEG) and color mode (graysca
"""
import io
import mimetypes
import os
import secrets
import sys
import time
import secrets
import mimetypes
from pathlib import Path
from datetime import datetime
from flask import Flask, flash, jsonify, redirect, render_template, request, send_file, url_for
from PIL import Image
from flask import (
Flask, render_template, request, send_file,
jsonify, flash, redirect, url_for
)
import os
os.environ['NUMPY_MADVISE_HUGEPAGE'] = '0'
os.environ['OMP_NUM_THREADS'] = '1'
@@ -44,75 +40,76 @@ sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'src'))
import stegasoo
from stegasoo import (
generate_credentials,
export_rsa_key_pem, load_rsa_key,
validate_pin, validate_message, validate_image,
validate_rsa_key, validate_security_factors,
validate_file_payload, validate_passphrase,
generate_filename,
StegasooError, DecryptionError, CapacityError,
has_argon2,
CapacityError,
DecryptionError,
FilePayload,
# Embedding modes
EMBED_MODE_LSB,
EMBED_MODE_DCT,
EMBED_MODE_AUTO,
has_dct_support,
# Channel key functions (v4.0.0)
has_channel_key,
StegasooError,
export_rsa_key_pem,
generate_credentials,
generate_filename,
get_channel_status,
has_argon2,
# Channel key functions (v4.0.0)
has_dct_support,
load_rsa_key,
validate_channel_key,
generate_channel_key,
# NOTE: encode, decode, compare_modes, will_fit_by_mode now use subprocess isolation
validate_file_payload,
validate_image,
validate_message,
validate_passphrase,
validate_pin,
validate_rsa_key,
validate_security_factors,
)
from stegasoo.constants import (
__version__,
MAX_MESSAGE_SIZE, MAX_MESSAGE_CHARS,
MIN_PIN_LENGTH, MAX_PIN_LENGTH,
MIN_PASSPHRASE_WORDS, RECOMMENDED_PASSPHRASE_WORDS,
DEFAULT_PASSPHRASE_WORDS,
VALID_RSA_SIZES, MAX_FILE_SIZE,
MAX_FILE_PAYLOAD_SIZE, MAX_UPLOAD_SIZE,
TEMP_FILE_EXPIRY, TEMP_FILE_EXPIRY_MINUTES,
THUMBNAIL_SIZE, THUMBNAIL_QUALITY,
MAX_FILE_PAYLOAD_SIZE,
MAX_FILE_SIZE,
MAX_MESSAGE_CHARS,
MAX_PIN_LENGTH,
MAX_UPLOAD_SIZE,
MIN_PASSPHRASE_WORDS,
MIN_PIN_LENGTH,
RECOMMENDED_PASSPHRASE_WORDS,
TEMP_FILE_EXPIRY,
TEMP_FILE_EXPIRY_MINUTES,
THUMBNAIL_QUALITY,
THUMBNAIL_SIZE,
VALID_RSA_SIZES,
__version__,
)
# QR Code support
try:
import qrcode
from qrcode.constants import ERROR_CORRECT_L, ERROR_CORRECT_M
import qrcode # noqa: F401
from qrcode.constants import ERROR_CORRECT_L, ERROR_CORRECT_M # noqa: F401
HAS_QRCODE = True
except ImportError:
HAS_QRCODE = False
# QR Code reading
try:
from pyzbar.pyzbar import decode as pyzbar_decode
from pyzbar.pyzbar import decode as pyzbar_decode # noqa: F401
HAS_QRCODE_READ = True
except ImportError:
HAS_QRCODE_READ = False
import zlib
import base64
# Import QR utilities
from stegasoo.qr_utils import (
compress_data, decompress_data, auto_decompress,
is_compressed, can_fit_in_qr, needs_compression,
generate_qr_code, read_qr_code, extract_key_from_qr,
detect_and_crop_qr,
has_qr_write, has_qr_read,
QR_MAX_BINARY, COMPRESSION_PREFIX
)
# ============================================================================
# SUBPROCESS ISOLATION FOR STEGASOO OPERATIONS
# ============================================================================
# Runs encode/decode/compare in subprocesses to prevent jpegio/scipy crashes
# from taking down the Flask server.
from subprocess_stego import SubprocessStego
from stegasoo.qr_utils import (
can_fit_in_qr,
detect_and_crop_qr,
extract_key_from_qr,
generate_qr_code,
)
# Initialize subprocess wrapper (worker script must be in same directory)
subprocess_stego = SubprocessStego(timeout=180) # 3 minute timeout for large images

View File

@@ -17,9 +17,9 @@ Usage:
echo '{"operation": "encode", ...}' | python stego_worker.py
"""
import sys
import json
import base64
import json
import sys
import traceback
from pathlib import Path
@@ -53,7 +53,7 @@ def _get_channel_info(resolved_key):
Returns:
(mode, fingerprint) tuple
"""
from stegasoo import has_channel_key, get_channel_status
from stegasoo import get_channel_status, has_channel_key
if resolved_key == "":
return "public", None
@@ -73,7 +73,7 @@ def _get_channel_info(resolved_key):
def encode_operation(params: dict) -> dict:
"""Handle encode operation."""
from stegasoo import encode, FilePayload
from stegasoo import FilePayload, encode
# Decode base64 inputs
carrier_data = base64.b64decode(params['carrier_b64'])

View File

@@ -43,14 +43,13 @@ Usage:
result = stego.compare_modes(carrier_bytes)
"""
import json
import base64
import json
import subprocess
import sys
from pathlib import Path
from dataclasses import dataclass
from typing import Optional, Dict, Any, Union
from pathlib import Path
from typing import Any
# Default timeout for operations (seconds)
DEFAULT_TIMEOUT = 120
@@ -63,14 +62,14 @@ WORKER_SCRIPT = Path(__file__).parent / 'stego_worker.py'
class EncodeResult:
"""Result from encode operation."""
success: bool
stego_data: Optional[bytes] = None
filename: Optional[str] = None
stats: Optional[Dict[str, Any]] = None
stego_data: bytes | None = None
filename: str | None = None
stats: dict[str, Any] | None = None
# Channel info (v4.0.0)
channel_mode: Optional[str] = None
channel_fingerprint: Optional[str] = None
error: Optional[str] = None
error_type: Optional[str] = None
channel_mode: str | None = None
channel_fingerprint: str | None = None
error: str | None = None
error_type: str | None = None
@dataclass
@@ -78,12 +77,12 @@ class DecodeResult:
"""Result from decode operation."""
success: bool
is_file: bool = False
message: Optional[str] = None
file_data: Optional[bytes] = None
filename: Optional[str] = None
mime_type: Optional[str] = None
error: Optional[str] = None
error_type: Optional[str] = None
message: str | None = None
file_data: bytes | None = None
filename: str | None = None
mime_type: str | None = None
error: str | None = None
error_type: str | None = None
@dataclass
@@ -92,9 +91,9 @@ class CompareResult:
success: bool
width: int = 0
height: int = 0
lsb: Optional[Dict[str, Any]] = None
dct: Optional[Dict[str, Any]] = None
error: Optional[str] = None
lsb: dict[str, Any] | None = None
dct: dict[str, Any] | None = None
error: str | None = None
@dataclass
@@ -107,7 +106,7 @@ class CapacityResult:
usage_percent: float = 0.0
headroom: int = 0
mode: str = ""
error: Optional[str] = None
error: str | None = None
@dataclass
@@ -116,10 +115,10 @@ class ChannelStatusResult:
success: bool
mode: str = "public"
configured: bool = False
fingerprint: Optional[str] = None
source: Optional[str] = None
key: Optional[str] = None
error: Optional[str] = None
fingerprint: str | None = None
source: str | None = None
key: str | None = None
error: str | None = None
class SubprocessStego:
@@ -132,8 +131,8 @@ class SubprocessStego:
def __init__(
self,
worker_path: Optional[Path] = None,
python_executable: Optional[str] = None,
worker_path: Path | None = None,
python_executable: str | None = None,
timeout: int = DEFAULT_TIMEOUT,
):
"""
@@ -151,7 +150,7 @@ class SubprocessStego:
if not self.worker_path.exists():
raise FileNotFoundError(f"Worker script not found: {self.worker_path}")
def _run_worker(self, params: Dict[str, Any], timeout: Optional[int] = None) -> Dict[str, Any]:
def _run_worker(self, params: dict[str, Any], timeout: int | None = None) -> dict[str, Any]:
"""
Run the worker subprocess with given parameters.
@@ -215,20 +214,20 @@ class SubprocessStego:
self,
carrier_data: bytes,
reference_data: bytes,
message: Optional[str] = None,
file_data: Optional[bytes] = None,
file_name: Optional[str] = None,
file_mime: Optional[str] = None,
message: str | None = None,
file_data: bytes | None = None,
file_name: str | None = None,
file_mime: str | None = None,
passphrase: str = "",
pin: Optional[str] = None,
rsa_key_data: Optional[bytes] = None,
rsa_password: Optional[str] = None,
pin: str | None = None,
rsa_key_data: bytes | None = None,
rsa_password: str | None = None,
embed_mode: str = "lsb",
dct_output_format: str = "png",
dct_color_mode: str = "color",
# Channel key (v4.0.0)
channel_key: Optional[str] = "auto",
timeout: Optional[int] = None,
channel_key: str | None = "auto",
timeout: int | None = None,
) -> EncodeResult:
"""
Encode a message or file into an image.
@@ -298,13 +297,13 @@ class SubprocessStego:
stego_data: bytes,
reference_data: bytes,
passphrase: str = "",
pin: Optional[str] = None,
rsa_key_data: Optional[bytes] = None,
rsa_password: Optional[str] = None,
pin: str | None = None,
rsa_key_data: bytes | None = None,
rsa_password: str | None = None,
embed_mode: str = "auto",
# Channel key (v4.0.0)
channel_key: Optional[str] = "auto",
timeout: Optional[int] = None,
channel_key: str | None = "auto",
timeout: int | None = None,
) -> DecodeResult:
"""
Decode a message or file from a stego image.
@@ -364,7 +363,7 @@ class SubprocessStego:
def compare_modes(
self,
carrier_data: bytes,
timeout: Optional[int] = None,
timeout: int | None = None,
) -> CompareResult:
"""
Compare LSB and DCT capacity for a carrier image.
@@ -403,7 +402,7 @@ class SubprocessStego:
carrier_data: bytes,
payload_size: int,
embed_mode: str = "lsb",
timeout: Optional[int] = None,
timeout: int | None = None,
) -> CapacityResult:
"""
Check if a payload will fit in the carrier.
@@ -446,7 +445,7 @@ class SubprocessStego:
def get_channel_status(
self,
reveal: bool = False,
timeout: Optional[int] = None,
timeout: int | None = None,
) -> ChannelStatusResult:
"""
Get current channel key status (v4.0.0).
@@ -483,7 +482,7 @@ class SubprocessStego:
# Convenience function for quick usage
_default_stego: Optional[SubprocessStego] = None
_default_stego: SubprocessStego | None = None
def get_subprocess_stego() -> SubprocessStego:

View File

@@ -1,90 +0,0 @@
"""
Minimal test to isolate the memory corruption crash.
Add this route to your app.py temporarily to test if the crash
is in Flask/Pillow or in stegasoo code.
Usage:
1. Add this code to app.py
2. Restart the server
3. Use the /test-capacity endpoint instead of /api/compare-capacity
4. If it crashes: Flask or Pillow issue
5. If it works: Stegasoo code issue
"""
# Add these imports at the top of app.py if not present:
# from PIL import Image
# import io
# Add this route to app.py:
@app.route('/test-capacity', methods=['POST'])
def test_capacity():
"""
Minimal capacity test - no stegasoo code, just PIL.
"""
carrier = request.files.get('carrier')
if not carrier:
return jsonify({'error': 'No carrier image provided'}), 400
try:
# Read the file data
carrier_data = carrier.read()
# Method 1: Just get size from PIL
buffer = io.BytesIO(carrier_data)
img = Image.open(buffer)
width, height = img.size
fmt = img.format
mode = img.mode
img.close()
buffer.close()
# Simple capacity calculation (no scipy, no numpy)
pixels = width * height
lsb_bytes = (pixels * 3) // 8
blocks = (width // 8) * (height // 8)
dct_bytes = (blocks * 16) // 8 - 10
return jsonify({
'success': True,
'width': width,
'height': height,
'format': fmt,
'mode': mode,
'lsb': {
'capacity_bytes': lsb_bytes,
'capacity_kb': round(lsb_bytes / 1024, 1),
},
'dct': {
'capacity_bytes': dct_bytes,
'capacity_kb': round(dct_bytes / 1024, 1),
}
})
except Exception as e:
import traceback
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
# Alternative: completely bypass PIL too
@app.route('/test-capacity-nopil', methods=['POST'])
def test_capacity_nopil():
"""
Ultra-minimal test - no PIL, no stegasoo.
"""
carrier = request.files.get('carrier')
if not carrier:
return jsonify({'error': 'No carrier image provided'}), 400
try:
carrier_data = carrier.read()
# Just return size info, no image processing at all
return jsonify({
'success': True,
'data_size': len(carrier_data),
'first_bytes': carrier_data[:20].hex() if len(carrier_data) >= 20 else carrier_data.hex(),
})
except Exception as e:
import traceback
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500

View File

@@ -114,9 +114,18 @@ target-version = ["py310", "py311", "py312"]
[tool.ruff]
line-length = 100
exclude = ["frontends/web/test_routes.py"] # Debug snippet, not a real module
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP"]
ignore = ["E501"]
[tool.ruff.lint.per-file-ignores]
# YCbCr colorspace variables (R, G, B, Y, Cb, Cr) are standard names
"src/stegasoo/dct_steganography.py" = ["N803", "N806"]
# Package __init__.py has imports after try/except and aliases - intentional structure
"src/stegasoo/__init__.py" = ["E402"]
[tool.mypy]
python_version = "3.10"
warn_return_any = true

View File

@@ -10,56 +10,55 @@ Changes in v4.0.0:
__version__ = "4.0.1"
# Core functionality
from .encode import encode
# Channel key management (v4.0.0)
from .channel import (
clear_channel_key,
format_channel_key,
generate_channel_key,
get_channel_key,
get_channel_status,
has_channel_key,
set_channel_key,
validate_channel_key,
)
# Crypto functions
from .crypto import get_active_channel_key, get_channel_fingerprint, has_argon2
from .decode import decode, decode_file, decode_text
from .encode import encode
# Credential generation
from .generate import (
generate_pin,
generate_passphrase,
generate_rsa_key,
generate_credentials,
export_rsa_key_pem,
generate_credentials,
generate_passphrase,
generate_pin,
generate_rsa_key,
load_rsa_key,
)
# Image utilities
from .image_utils import (
get_image_info,
compare_capacity,
get_image_info,
)
# Steganography functions
from .steganography import (
compare_modes,
has_dct_support,
will_fit_by_mode,
)
# Utilities
from .utils import generate_filename
# Crypto functions
from .crypto import has_argon2, get_active_channel_key, get_channel_fingerprint
# Channel key management (v4.0.0)
from .channel import (
generate_channel_key,
get_channel_key,
set_channel_key,
clear_channel_key,
has_channel_key,
get_channel_status,
validate_channel_key,
format_channel_key,
)
# Steganography functions
from .steganography import (
has_dct_support,
compare_modes,
will_fit_by_mode,
)
# QR Code utilities - optional, may not be available
try:
from .qr_utils import (
generate_qr_code,
extract_key_from_qr,
detect_and_crop_qr,
extract_key_from_qr,
generate_qr_code,
)
HAS_QR_UTILS = True
except ImportError:
@@ -70,12 +69,12 @@ except ImportError:
# Validation
from .validation import (
validate_file_payload,
validate_image,
validate_message,
validate_passphrase,
validate_pin,
validate_rsa_key,
validate_message,
validate_file_payload,
validate_image,
validate_security_factors,
)
@@ -84,62 +83,61 @@ validate_reference_photo = validate_image
validate_carrier = validate_image
# Additional validators
from .validation import (
validate_embed_mode,
validate_dct_output_format,
validate_dct_color_mode,
)
# Models
from .models import (
ImageInfo,
CapacityComparison,
GenerateResult,
EncodeResult,
DecodeResult,
FilePayload,
Credentials,
ValidationResult,
# Constants
from .constants import (
DEFAULT_PASSPHRASE_WORDS,
EMBED_MODE_AUTO,
EMBED_MODE_DCT,
EMBED_MODE_LSB,
FORMAT_VERSION,
LOSSLESS_FORMATS,
MAX_IMAGE_PIXELS,
MAX_MESSAGE_SIZE,
MAX_PASSPHRASE_WORDS,
MAX_PIN_LENGTH,
MIN_IMAGE_PIXELS,
MIN_PASSPHRASE_WORDS,
MIN_PIN_LENGTH,
RECOMMENDED_PASSPHRASE_WORDS,
)
# Exceptions
from .exceptions import (
StegasooError,
ValidationError,
PinValidationError,
MessageValidationError,
ImageValidationError,
KeyValidationError,
SecurityFactorError,
CapacityError,
CryptoError,
EncryptionError,
DecryptionError,
EmbeddingError,
EncryptionError,
ExtractionError,
ImageValidationError,
InvalidHeaderError,
KeyDerivationError,
KeyGenerationError,
KeyPasswordError,
KeyValidationError,
MessageValidationError,
PinValidationError,
SecurityFactorError,
SteganographyError,
CapacityError,
ExtractionError,
EmbeddingError,
InvalidHeaderError,
StegasooError,
ValidationError,
)
# Constants
from .constants import (
FORMAT_VERSION,
MIN_PASSPHRASE_WORDS,
RECOMMENDED_PASSPHRASE_WORDS,
DEFAULT_PASSPHRASE_WORDS,
MAX_PASSPHRASE_WORDS,
MIN_PIN_LENGTH,
MAX_PIN_LENGTH,
MAX_MESSAGE_SIZE,
MIN_IMAGE_PIXELS,
MAX_IMAGE_PIXELS,
LOSSLESS_FORMATS,
EMBED_MODE_LSB,
EMBED_MODE_DCT,
EMBED_MODE_AUTO,
# Models
from .models import (
CapacityComparison,
Credentials,
DecodeResult,
EncodeResult,
FilePayload,
GenerateResult,
ImageInfo,
ValidationResult,
)
from .validation import (
validate_dct_color_mode,
validate_dct_output_format,
validate_embed_mode,
)
# Aliases for backward compatibility

View File

@@ -9,15 +9,14 @@ Changes in v3.2.0:
- Updated all credential handling to use v3.2.0 API
"""
import os
import json
import time
from pathlib import Path
from dataclasses import dataclass, field, asdict
from typing import Optional, Callable, Iterator
from enum import Enum
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
from collections.abc import Callable, Iterator
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from .constants import ALLOWED_IMAGE_EXTENSIONS, LOSSLESS_FORMATS
@@ -35,17 +34,17 @@ class BatchStatus(Enum):
class BatchItem:
"""Represents a single item in a batch operation."""
input_path: Path
output_path: Optional[Path] = None
output_path: Path | None = None
status: BatchStatus = BatchStatus.PENDING
error: Optional[str] = None
start_time: Optional[float] = None
end_time: Optional[float] = None
error: str | None = None
start_time: float | None = None
end_time: float | None = None
input_size: int = 0
output_size: int = 0
message: str = ""
@property
def duration(self) -> Optional[float]:
def duration(self) -> float | None:
"""Processing duration in seconds."""
if self.start_time and self.end_time:
return self.end_time - self.start_time
@@ -88,8 +87,8 @@ class BatchCredentials:
reference_photo: bytes
passphrase: str # v3.2.0: renamed from day_phrase
pin: str = ""
rsa_key_data: Optional[bytes] = None
rsa_password: Optional[str] = None
rsa_key_data: bytes | None = None
rsa_password: str | None = None
def to_dict(self) -> dict:
"""Convert to dictionary for API compatibility."""
@@ -129,11 +128,11 @@ class BatchResult:
failed: int = 0
skipped: int = 0
start_time: float = field(default_factory=time.time)
end_time: Optional[float] = None
end_time: float | None = None
items: list[BatchItem] = field(default_factory=list)
@property
def duration(self) -> Optional[float]:
def duration(self) -> float | None:
"""Total batch duration in seconds."""
if self.end_time:
return self.end_time - self.start_time
@@ -265,14 +264,14 @@ class BatchProcessor:
def batch_encode(
self,
images: list[str | Path],
message: Optional[str] = None,
file_payload: Optional[Path] = None,
output_dir: Optional[Path] = None,
message: str | None = None,
file_payload: Path | None = None,
output_dir: Path | None = None,
output_suffix: str = "_encoded",
credentials: dict | BatchCredentials | None = None,
compress: bool = True,
recursive: bool = False,
progress_callback: Optional[ProgressCallback] = None,
progress_callback: ProgressCallback | None = None,
encode_func: Callable = None,
) -> BatchResult:
"""
@@ -360,10 +359,10 @@ class BatchProcessor:
def batch_decode(
self,
images: list[str | Path],
output_dir: Optional[Path] = None,
output_dir: Path | None = None,
credentials: dict | BatchCredentials | None = None,
recursive: bool = False,
progress_callback: Optional[ProgressCallback] = None,
progress_callback: ProgressCallback | None = None,
decode_func: Callable = None,
) -> BatchResult:
"""
@@ -436,7 +435,7 @@ class BatchProcessor:
self,
result: BatchResult,
process_func: Callable[[BatchItem], BatchItem],
progress_callback: Optional[ProgressCallback] = None,
progress_callback: ProgressCallback | None = None,
) -> None:
"""Execute batch processing with thread pool."""
completed = 0
@@ -467,8 +466,8 @@ class BatchProcessor:
def _do_encode(
self,
item: BatchItem,
message: Optional[str],
file_payload: Optional[Path],
message: str | None,
file_payload: Path | None,
creds: BatchCredentials,
compress: bool
) -> None:
@@ -478,7 +477,7 @@ class BatchProcessor:
Override this method to customize encoding behavior.
"""
try:
from .encode import encode, encode_file
from .encode import encode
from .models import FilePayload
# Read carrier image
@@ -590,6 +589,7 @@ def batch_capacity_check(
List of dicts with path, dimensions, and estimated capacity
"""
from PIL import Image
from .constants import MAX_IMAGE_PIXELS
processor = BatchProcessor()
@@ -628,7 +628,7 @@ def batch_capacity_check(
def _get_image_warnings(img, path: Path) -> list[str]:
"""Generate warnings for an image."""
from .constants import MAX_IMAGE_PIXELS, LOSSLESS_FORMATS
from .constants import LOSSLESS_FORMATS, MAX_IMAGE_PIXELS
warnings = []

View File

@@ -24,12 +24,11 @@ INTEGRATION STATUS (v4.0.0):
- ✅ Helpful error messages for channel key mismatches
"""
import os
import secrets
import hashlib
import os
import re
import secrets
from pathlib import Path
from typing import Optional, List
from .debug import debug
@@ -129,7 +128,7 @@ def validate_channel_key(key: str) -> bool:
return False
def get_channel_key() -> Optional[str]:
def get_channel_key() -> str | None:
"""
Get the current channel key from environment or config.
@@ -165,7 +164,7 @@ def get_channel_key() -> Optional[str]:
if key and validate_channel_key(key):
debug.print(f"Channel key from {config_path}: {get_channel_fingerprint(key)}")
return format_channel_key(key)
except (IOError, PermissionError) as e:
except (OSError, PermissionError) as e:
debug.print(f"Could not read {config_path}: {e}")
continue
@@ -216,7 +215,7 @@ def set_channel_key(key: str, location: str = 'project') -> Path:
return config_path
def clear_channel_key(location: str = 'all') -> List[Path]:
def clear_channel_key(location: str = 'all') -> list[Path]:
"""
Remove channel key configuration.
@@ -244,13 +243,13 @@ def clear_channel_key(location: str = 'all') -> List[Path]:
path.unlink()
deleted.append(path)
debug.print(f"Removed channel key: {path}")
except (IOError, PermissionError) as e:
except (OSError, PermissionError) as e:
debug.print(f"Could not remove {path}: {e}")
return deleted
def get_channel_key_hash(key: Optional[str] = None) -> Optional[bytes]:
def get_channel_key_hash(key: str | None = None) -> bytes | None:
"""
Get the channel key as a 32-byte hash suitable for key derivation.
@@ -279,7 +278,7 @@ def get_channel_key_hash(key: Optional[str] = None) -> Optional[bytes]:
return hashlib.sha256(formatted.encode('utf-8')).digest()
def get_channel_fingerprint(key: Optional[str] = None) -> Optional[str]:
def get_channel_fingerprint(key: str | None = None) -> str | None:
"""
Get a short fingerprint for display purposes.
Shows first and last 4 chars with masked middle.
@@ -341,7 +340,7 @@ def get_channel_status() -> dict:
if file_key and format_channel_key(file_key) == key:
source = str(config_path)
break
except (IOError, PermissionError):
except (OSError, PermissionError):
continue
return {
@@ -409,7 +408,7 @@ if __name__ == '__main__':
if cmd == 'generate':
key = generate_channel_key()
print(f"Generated channel key:")
print("Generated channel key:")
print(f" {key}")
print()
save = input("Save to config? [y/N]: ").strip().lower()

View File

@@ -8,33 +8,29 @@ Changes in v3.2.0:
- Updated help text to use 'passphrase' terminology
"""
import sys
import json
from pathlib import Path
from typing import Optional
import click
from .constants import (
__version__,
MAX_MESSAGE_SIZE,
MAX_FILE_PAYLOAD_SIZE,
DEFAULT_PIN_LENGTH,
DEFAULT_PASSPHRASE_WORDS, # v3.2.0: renamed from DEFAULT_PHRASE_WORDS
)
from .compression import (
CompressionAlgorithm,
get_available_algorithms,
algorithm_name,
HAS_LZ4,
)
from .batch import (
BatchProcessor,
BatchResult,
batch_capacity_check,
print_batch_result,
)
from .compression import (
HAS_LZ4,
CompressionAlgorithm,
algorithm_name,
get_available_algorithms,
)
from .constants import (
DEFAULT_PASSPHRASE_WORDS, # v3.2.0: renamed from DEFAULT_PHRASE_WORDS
DEFAULT_PIN_LENGTH,
MAX_FILE_PAYLOAD_SIZE,
MAX_MESSAGE_SIZE,
__version__,
)
# Click context settings
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@@ -426,12 +422,12 @@ def info(ctx):
click.echo(json.dumps(info_data, indent=2))
else:
click.echo(f"Stegasoo v{__version__}")
click.echo(f"\nCompression algorithms:")
click.echo("\nCompression algorithms:")
for algo in get_available_algorithms():
click.echo(f"{algorithm_name(algo)}")
if not HAS_LZ4:
click.echo(" (install 'lz4' for LZ4 support)")
click.echo(f"\nLimits:")
click.echo("\nLimits:")
click.echo(f" • Max message: {MAX_MESSAGE_SIZE:,} bytes")
click.echo(f" • Max file payload: {MAX_FILE_PAYLOAD_SIZE:,} bytes")

View File

@@ -5,10 +5,9 @@ Provides transparent compression/decompression for payloads before encryption.
Supports multiple algorithms with automatic detection on decompression.
"""
import zlib
import struct
import zlib
from enum import IntEnum
from typing import Optional
# Optional LZ4 support (faster, slightly worse ratio)
try:

View File

@@ -14,7 +14,6 @@ BREAKING CHANGES in v3.2.0:
- Renamed day_phrase → passphrase throughout codebase
"""
import os
from pathlib import Path
# ============================================================================
@@ -199,7 +198,7 @@ def get_bip39_words() -> list[str]:
"Please ensure bip39-words.txt is in the data directory."
)
with open(wordlist_path, 'r') as f:
with open(wordlist_path) as f:
return [line.strip() for line in f if line.strip()]

View File

@@ -15,38 +15,40 @@ BREAKING CHANGES in v3.2.0:
- Renamed day_phrase → passphrase (no daily rotation needed)
"""
import io
import hashlib
import io
import secrets
import struct
import json
from typing import Optional, Union
from PIL import Image
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from PIL import Image
from .constants import (
MAGIC_HEADER, FORMAT_VERSION,
SALT_SIZE, IV_SIZE, TAG_SIZE,
ARGON2_TIME_COST, ARGON2_MEMORY_COST, ARGON2_PARALLELISM,
PBKDF2_ITERATIONS,
PAYLOAD_TEXT, PAYLOAD_FILE,
ARGON2_MEMORY_COST,
ARGON2_PARALLELISM,
ARGON2_TIME_COST,
FORMAT_VERSION,
IV_SIZE,
MAGIC_HEADER,
MAX_FILENAME_LENGTH,
PAYLOAD_FILE,
PAYLOAD_TEXT,
PBKDF2_ITERATIONS,
SALT_SIZE,
TAG_SIZE,
)
from .models import FilePayload, DecodeResult
from .exceptions import (
EncryptionError, DecryptionError, KeyDerivationError, InvalidHeaderError
)
from .exceptions import DecryptionError, EncryptionError, InvalidHeaderError, KeyDerivationError
from .models import DecodeResult, FilePayload
# Check for Argon2 availability
try:
from argon2.low_level import hash_secret_raw, Type
from argon2.low_level import Type, hash_secret_raw
HAS_ARGON2 = True
except ImportError:
HAS_ARGON2 = False
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
# =============================================================================
@@ -57,7 +59,7 @@ except ImportError:
CHANNEL_KEY_AUTO = "auto"
def _resolve_channel_key(channel_key: Optional[Union[str, bool]]) -> Optional[bytes]:
def _resolve_channel_key(channel_key: str | bool | None) -> bytes | None:
"""
Resolve channel key parameter to actual key hash.
@@ -121,8 +123,8 @@ def derive_hybrid_key(
passphrase: str,
salt: bytes,
pin: str = "",
rsa_key_data: Optional[bytes] = None,
channel_key: Optional[Union[str, bool]] = None,
rsa_key_data: bytes | None = None,
channel_key: str | bool | None = None,
) -> bytes:
"""
Derive encryption key from multiple factors.
@@ -206,8 +208,8 @@ def derive_pixel_key(
photo_data: bytes,
passphrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None,
channel_key: Optional[Union[str, bool]] = None,
rsa_key_data: bytes | None = None,
channel_key: str | bool | None = None,
) -> bytes:
"""
Derive key for pseudo-random pixel selection.
@@ -247,7 +249,7 @@ def derive_pixel_key(
def _pack_payload(
content: Union[str, bytes, FilePayload],
content: str | bytes | FilePayload,
) -> tuple[bytes, int]:
"""
Pack payload with type marker and metadata.
@@ -359,12 +361,12 @@ FLAG_CHANNEL_KEY = 0x01 # Set if encoded with a channel key
def encrypt_message(
message: Union[str, bytes, FilePayload],
message: str | bytes | FilePayload,
photo_data: bytes,
passphrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None,
channel_key: Optional[Union[str, bool]] = None,
rsa_key_data: bytes | None = None,
channel_key: str | bool | None = None,
) -> bytes:
"""
Encrypt message or file using AES-256-GCM with hybrid key derivation.
@@ -438,7 +440,7 @@ def encrypt_message(
raise EncryptionError(f"Encryption failed: {e}") from e
def parse_header(encrypted_data: bytes) -> Optional[dict]:
def parse_header(encrypted_data: bytes) -> dict | None:
"""
Parse the header from encrypted data.
@@ -488,8 +490,8 @@ def decrypt_message(
photo_data: bytes,
passphrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None,
channel_key: Optional[Union[str, bool]] = None,
rsa_key_data: bytes | None = None,
channel_key: str | bool | None = None,
) -> DecodeResult:
"""
Decrypt message (v4.0.0 - with channel key support).
@@ -566,8 +568,8 @@ def decrypt_message_text(
photo_data: bytes,
passphrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None,
channel_key: Optional[Union[str, bool]] = None,
rsa_key_data: bytes | None = None,
channel_key: str | bool | None = None,
) -> str:
"""
Decrypt message and return as text string.
@@ -613,7 +615,7 @@ def has_argon2() -> bool:
# CHANNEL KEY UTILITIES (exposed for convenience)
# =============================================================================
def get_active_channel_key() -> Optional[str]:
def get_active_channel_key() -> str | None:
"""
Get the currently configured channel key (if any).
@@ -624,7 +626,7 @@ def get_active_channel_key() -> Optional[str]:
return get_channel_key()
def get_channel_fingerprint(key: Optional[str] = None) -> Optional[str]:
def get_channel_fingerprint(key: str | None = None) -> str | None:
"""
Get a display-safe fingerprint of a channel key.

View File

@@ -14,12 +14,11 @@ v3.2.0-patch2 Changes:
Requires: scipy (for PNG mode), optionally jpegio (for JPEG mode)
"""
import gc
import hashlib
import io
import struct
import hashlib
import gc
from dataclasses import dataclass
from typing import Optional, Tuple
from enum import Enum
import numpy as np
@@ -206,7 +205,7 @@ def _extract_y_channel(image_data: bytes) -> np.ndarray:
return np.array(Y, dtype=np.float64, copy=True, order='C')
def _pad_to_blocks(image: np.ndarray) -> Tuple[np.ndarray, Tuple[int, int]]:
def _pad_to_blocks(image: np.ndarray) -> tuple[np.ndarray, tuple[int, int]]:
h, w = image.shape
new_h = ((h + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE
new_w = ((w + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE
@@ -230,7 +229,7 @@ def _pad_to_blocks(image: np.ndarray) -> Tuple[np.ndarray, Tuple[int, int]]:
return padded, (h, w)
def _unpad_image(image: np.ndarray, original_size: Tuple[int, int]) -> np.ndarray:
def _unpad_image(image: np.ndarray, original_size: tuple[int, int]) -> np.ndarray:
h, w = original_size
return np.array(image[:h, :w], dtype=np.float64, copy=True, order='C')
@@ -282,7 +281,7 @@ def _save_color_image(rgb_array: np.ndarray, output_format: str = OUTPUT_FORMAT_
return buffer.getvalue()
def _rgb_to_ycbcr(rgb: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
def _rgb_to_ycbcr(rgb: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
R = rgb[:, :, 0].astype(np.float64)
G = rgb[:, :, 1].astype(np.float64)
B = rgb[:, :, 2].astype(np.float64)
@@ -310,7 +309,7 @@ def _create_header(data_length: int, flags: int = 0) -> bytes:
return struct.pack('>4sBBI', DCT_MAGIC, 1, flags, data_length)
def _parse_header(header_bits: list) -> Tuple[int, int, int]:
def _parse_header(header_bits: list) -> tuple[int, int, int]:
if len(header_bits) < HEADER_SIZE * 8:
raise ValueError("Insufficient header data")
@@ -332,8 +331,8 @@ def _parse_header(header_bits: list) -> Tuple[int, int, int]:
# ============================================================================
def _jpegio_bytes_to_file(data: bytes, suffix: str = '.jpg') -> str:
import tempfile
import os
import tempfile
fd, path = tempfile.mkstemp(suffix=suffix)
try:
os.write(fd, data)
@@ -366,7 +365,7 @@ def _jpegio_create_header(data_length: int, flags: int = 0) -> bytes:
return struct.pack('>4sBBI', JPEGIO_MAGIC, 1, flags, data_length)
def _jpegio_parse_header(header_bytes: bytes) -> Tuple[int, int, int]:
def _jpegio_parse_header(header_bytes: bytes) -> tuple[int, int, int]:
if len(header_bytes) < HEADER_SIZE:
raise ValueError("Insufficient header data")
magic, version, flags, length = struct.unpack('>4sBBI', header_bytes[:HEADER_SIZE])
@@ -455,7 +454,7 @@ def embed_in_dct(
seed: bytes,
output_format: str = OUTPUT_FORMAT_PNG,
color_mode: str = 'color',
) -> Tuple[bytes, DCTEmbedStats]:
) -> tuple[bytes, DCTEmbedStats]:
"""Embed data using DCT coefficient modification."""
if output_format not in (OUTPUT_FORMAT_PNG, OUTPUT_FORMAT_JPEG):
raise ValueError(f"Invalid output format: {output_format}")
@@ -476,7 +475,7 @@ def _embed_scipy_dct_safe(
seed: bytes,
output_format: str,
color_mode: str = 'color',
) -> Tuple[bytes, DCTEmbedStats]:
) -> tuple[bytes, DCTEmbedStats]:
"""
Embed using scipy DCT with safe memory handling.
@@ -688,10 +687,10 @@ def _embed_jpegio(
carrier_image: bytes,
seed: bytes,
color_mode: str = 'color',
) -> Tuple[bytes, DCTEmbedStats]:
) -> tuple[bytes, DCTEmbedStats]:
"""Embed using jpegio for proper JPEG coefficient modification."""
import tempfile
import os
import tempfile
# Normalize JPEG to avoid crashes with quality=100 images
carrier_image = _normalize_jpeg_for_jpegio(carrier_image)

View File

@@ -5,12 +5,13 @@ Debugging, logging, and performance monitoring tools.
Can be disabled for production use.
"""
import sys
import time
import traceback
from collections.abc import Callable
from datetime import datetime
from functools import wraps
from typing import Callable, Any, Optional, Dict, Union
import sys
from typing import Any
# Global debug configuration
DEBUG_ENABLED = False # Set to True to enable debug output
@@ -89,11 +90,12 @@ def validate_assertion(condition: bool, message: str) -> None:
raise AssertionError(f"Validation failed: {message}")
def memory_usage() -> Dict[str, Union[float, str]]:
def memory_usage() -> dict[str, float | str]:
"""Get current memory usage (if psutil is available)."""
try:
import psutil
import os
import psutil
process = psutil.Process(os.getpid())
mem_info = process.memory_info()
@@ -153,7 +155,7 @@ class Debug:
"""Runtime validation assertion."""
validate_assertion(condition, message)
def memory(self) -> Dict[str, Union[float, str]]:
def memory(self) -> dict[str, float | str]:
"""Get current memory usage."""
return memory_usage()

View File

@@ -8,21 +8,20 @@ Changes in v4.0.0:
- Improved error messages for channel key mismatches
"""
from typing import Optional, Union
from pathlib import Path
from .models import DecodeInput, DecodeResult
from .constants import EMBED_MODE_AUTO
from .crypto import decrypt_message
from .debug import debug
from .exceptions import DecryptionError, ExtractionError
from .models import DecodeResult
from .steganography import extract_from_image
from .validation import (
require_valid_image,
require_security_factors,
require_valid_image,
require_valid_pin,
require_valid_rsa_key,
)
from .constants import EMBED_MODE_AUTO
from .exceptions import ExtractionError, DecryptionError
from .debug import debug
def decode(
@@ -30,10 +29,10 @@ def decode(
reference_photo: bytes,
passphrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None,
rsa_password: Optional[str] = None,
rsa_key_data: bytes | None = None,
rsa_password: str | None = None,
embed_mode: str = EMBED_MODE_AUTO,
channel_key: Optional[Union[str, bool]] = None,
channel_key: str | bool | None = None,
) -> DecodeResult:
"""
Decode a message or file from a stego image.
@@ -122,12 +121,12 @@ def decode_file(
stego_image: bytes,
reference_photo: bytes,
passphrase: str,
output_path: Optional[Path] = None,
output_path: Path | None = None,
pin: str = "",
rsa_key_data: Optional[bytes] = None,
rsa_password: Optional[str] = None,
rsa_key_data: bytes | None = None,
rsa_password: str | None = None,
embed_mode: str = EMBED_MODE_AUTO,
channel_key: Optional[Union[str, bool]] = None,
channel_key: str | bool | None = None,
) -> Path:
"""
Decode a file from a stego image and save it.
@@ -182,10 +181,10 @@ def decode_text(
reference_photo: bytes,
passphrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None,
rsa_password: Optional[str] = None,
rsa_key_data: bytes | None = None,
rsa_password: str | None = None,
embed_mode: str = EMBED_MODE_AUTO,
channel_key: Optional[Union[str, bool]] = None,
channel_key: str | bool | None = None,
) -> str:
"""
Decode a text message from a stego image.

View File

@@ -7,37 +7,36 @@ Changes in v4.0.0:
- Added channel_key parameter for deployment/group isolation
"""
from typing import Optional, Union
from pathlib import Path
from .models import EncodeInput, EncodeResult, FilePayload
from .crypto import encrypt_message, derive_pixel_key
from .constants import EMBED_MODE_LSB
from .crypto import derive_pixel_key, encrypt_message
from .debug import debug
from .models import EncodeResult, FilePayload
from .steganography import embed_in_image
from .utils import generate_filename
from .validation import (
require_valid_payload,
require_valid_image,
require_security_factors,
require_valid_image,
require_valid_payload,
require_valid_pin,
require_valid_rsa_key,
)
from .utils import generate_filename
from .constants import EMBED_MODE_LSB
from .debug import debug
def encode(
message: Union[str, bytes, FilePayload],
message: str | bytes | FilePayload,
reference_photo: bytes,
carrier_image: bytes,
passphrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None,
rsa_password: Optional[str] = None,
output_format: Optional[str] = None,
rsa_key_data: bytes | None = None,
rsa_password: str | None = None,
output_format: str | None = None,
embed_mode: str = EMBED_MODE_LSB,
dct_output_format: str = "png",
dct_color_mode: str = "grayscale",
channel_key: Optional[Union[str, bool]] = None,
channel_key: str | bool | None = None,
) -> EncodeResult:
"""
Encode a message or file into an image.
@@ -148,19 +147,19 @@ def encode(
def encode_file(
filepath: Union[str, Path],
filepath: str | Path,
reference_photo: bytes,
carrier_image: bytes,
passphrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None,
rsa_password: Optional[str] = None,
output_format: Optional[str] = None,
filename_override: Optional[str] = None,
rsa_key_data: bytes | None = None,
rsa_password: str | None = None,
output_format: str | None = None,
filename_override: str | None = None,
embed_mode: str = EMBED_MODE_LSB,
dct_output_format: str = "png",
dct_color_mode: str = "grayscale",
channel_key: Optional[Union[str, bool]] = None,
channel_key: str | bool | None = None,
) -> EncodeResult:
"""
Encode a file into an image.
@@ -210,14 +209,14 @@ def encode_bytes(
carrier_image: bytes,
passphrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None,
rsa_password: Optional[str] = None,
output_format: Optional[str] = None,
mime_type: Optional[str] = None,
rsa_key_data: bytes | None = None,
rsa_password: str | None = None,
output_format: str | None = None,
mime_type: str | None = None,
embed_mode: str = EMBED_MODE_LSB,
dct_output_format: str = "png",
dct_color_mode: str = "grayscale",
channel_key: Optional[Union[str, bool]] = None,
channel_key: str | bool | None = None,
) -> EncodeResult:
"""
Encode raw bytes with metadata into an image.

View File

@@ -4,23 +4,25 @@ Stegasoo Generate Module (v3.2.0)
Public API for generating credentials (PINs, passphrases, RSA keys).
"""
from typing import Optional
from .keygen import (
generate_pin as _generate_pin,
generate_phrase,
generate_rsa_key as _generate_rsa_key,
export_rsa_key_pem,
load_rsa_key,
)
from .models import Credentials
from .constants import (
DEFAULT_PIN_LENGTH,
DEFAULT_PASSPHRASE_WORDS,
DEFAULT_PIN_LENGTH,
DEFAULT_RSA_BITS,
)
from .debug import debug
from .keygen import (
export_rsa_key_pem,
generate_phrase,
load_rsa_key,
)
from .keygen import (
generate_pin as _generate_pin,
)
from .keygen import (
generate_rsa_key as _generate_rsa_key,
)
from .models import Credentials
# Re-export from keygen for convenience
__all__ = [
@@ -78,7 +80,7 @@ def generate_passphrase(words: int = DEFAULT_PASSPHRASE_WORDS) -> str:
def generate_rsa_key(
bits: int = DEFAULT_RSA_BITS,
password: Optional[str] = None
password: str | None = None
) -> str:
"""
Generate an RSA private key in PEM format.
@@ -106,7 +108,7 @@ def generate_credentials(
pin_length: int = DEFAULT_PIN_LENGTH,
rsa_bits: int = DEFAULT_RSA_BITS,
passphrase_words: int = DEFAULT_PASSPHRASE_WORDS,
rsa_password: Optional[str] = None,
rsa_password: str | None = None,
) -> Credentials:
"""
Generate a complete set of credentials.

View File

@@ -4,14 +4,14 @@ Stegasoo Image Utilities (v3.2.0)
Functions for analyzing images and comparing capacity.
"""
from typing import Optional
import io
from PIL import Image
from .models import ImageInfo, CapacityComparison
from .steganography import calculate_capacity, has_dct_support
from .constants import EMBED_MODE_LSB, EMBED_MODE_DCT
from .constants import EMBED_MODE_LSB
from .debug import debug
from .models import CapacityComparison, ImageInfo
from .steganography import calculate_capacity, has_dct_support
def get_image_info(image_data: bytes) -> ImageInfo:
@@ -69,7 +69,7 @@ def get_image_info(image_data: bytes) -> ImageInfo:
def compare_capacity(
carrier_image: bytes,
reference_photo: Optional[bytes] = None,
reference_photo: bytes | None = None,
) -> CapacityComparison:
"""
Compare embedding capacity between LSB and DCT modes.

View File

@@ -10,24 +10,28 @@ Changes in v3.2.0:
"""
import secrets
from typing import Optional, Dict, Union
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from .constants import (
DAY_NAMES,
MIN_PIN_LENGTH, MAX_PIN_LENGTH, DEFAULT_PIN_LENGTH,
MIN_PASSPHRASE_WORDS, MAX_PASSPHRASE_WORDS, DEFAULT_PASSPHRASE_WORDS,
MIN_RSA_BITS, VALID_RSA_SIZES, DEFAULT_RSA_BITS,
DEFAULT_PASSPHRASE_WORDS,
DEFAULT_PIN_LENGTH,
DEFAULT_RSA_BITS,
MAX_PASSPHRASE_WORDS,
MAX_PIN_LENGTH,
MIN_PASSPHRASE_WORDS,
MIN_PIN_LENGTH,
VALID_RSA_SIZES,
get_wordlist,
)
from .models import Credentials, KeyInfo
from .exceptions import KeyGenerationError, KeyPasswordError
from .debug import debug
from .exceptions import KeyGenerationError, KeyPasswordError
from .models import Credentials, KeyInfo
def generate_pin(length: int = DEFAULT_PIN_LENGTH) -> str:
@@ -92,7 +96,7 @@ def generate_phrase(words_per_phrase: int = DEFAULT_PASSPHRASE_WORDS) -> str:
generate_passphrase = generate_phrase
def generate_day_phrases(words_per_phrase: int = DEFAULT_PASSPHRASE_WORDS) -> Dict[str, str]:
def generate_day_phrases(words_per_phrase: int = DEFAULT_PASSPHRASE_WORDS) -> dict[str, str]:
"""
Generate phrases for all days of the week.
@@ -162,7 +166,7 @@ def generate_rsa_key(bits: int = DEFAULT_RSA_BITS) -> rsa.RSAPrivateKey:
def export_rsa_key_pem(
private_key: rsa.RSAPrivateKey,
password: Optional[str] = None
password: str | None = None
) -> bytes:
"""
Export RSA key to PEM format.
@@ -182,10 +186,7 @@ def export_rsa_key_pem(
"""
debug.validate(private_key is not None, "Private key cannot be None")
encryption_algorithm: Union[
serialization.BestAvailableEncryption,
serialization.NoEncryption
]
encryption_algorithm: serialization.BestAvailableEncryption | serialization.NoEncryption
if password:
encryption_algorithm = serialization.BestAvailableEncryption(password.encode())
@@ -203,7 +204,7 @@ def export_rsa_key_pem(
def load_rsa_key(
key_data: bytes,
password: Optional[str] = None
password: str | None = None
) -> rsa.RSAPrivateKey:
"""
Load RSA private key from PEM data.
@@ -253,7 +254,7 @@ def load_rsa_key(
raise KeyGenerationError(f"Could not load RSA key: {e}") from e
def get_key_info(key_data: bytes, password: Optional[str] = None) -> KeyInfo:
def get_key_info(key_data: bytes, password: str | None = None) -> KeyInfo:
"""
Get information about an RSA key.
@@ -293,7 +294,7 @@ def generate_credentials(
pin_length: int = DEFAULT_PIN_LENGTH,
rsa_bits: int = DEFAULT_RSA_BITS,
passphrase_words: int = DEFAULT_PASSPHRASE_WORDS,
rsa_password: Optional[str] = None,
rsa_password: str | None = None,
) -> Credentials:
"""
Generate a complete set of credentials.

View File

@@ -12,8 +12,6 @@ Changes in v3.2.0:
"""
from dataclasses import dataclass, field
from datetime import date
from typing import Optional, Union, List
@dataclass
@@ -24,13 +22,13 @@ class Credentials:
v3.2.0: Simplified to use single passphrase instead of daily rotation.
"""
passphrase: str # Single passphrase (no daily rotation)
pin: Optional[str] = None
rsa_key_pem: Optional[str] = None
rsa_bits: Optional[int] = None
pin: str | None = None
rsa_key_pem: str | None = None
rsa_bits: int | None = None
words_per_passphrase: int = 4 # Increased from 3 in v3.1.0
# Optional: backup passphrases for multi-factor or rotation
backup_passphrases: Optional[list[str]] = None
backup_passphrases: list[str] | None = None
@property
def passphrase_entropy(self) -> int:
@@ -68,17 +66,17 @@ class FilePayload:
"""Represents a file to be embedded."""
data: bytes
filename: str
mime_type: Optional[str] = None
mime_type: str | None = None
@property
def size(self) -> int:
return len(self.data)
@classmethod
def from_file(cls, filepath: str, filename: Optional[str] = None) -> 'FilePayload':
def from_file(cls, filepath: str, filename: str | None = None) -> 'FilePayload':
"""Create FilePayload from a file path."""
from pathlib import Path
import mimetypes
from pathlib import Path
path = Path(filepath)
data = path.read_bytes()
@@ -95,13 +93,13 @@ class EncodeInput:
v3.2.0: Removed date_str (date no longer used in crypto).
"""
message: Union[str, bytes, FilePayload] # Text, raw bytes, or file
message: str | bytes | FilePayload # Text, raw bytes, or file
reference_photo: bytes
carrier_image: bytes
passphrase: str # Renamed from day_phrase
pin: str = ""
rsa_key_data: Optional[bytes] = None
rsa_password: Optional[str] = None
rsa_key_data: bytes | None = None
rsa_password: str | None = None
@dataclass
@@ -116,7 +114,7 @@ class EncodeResult:
pixels_modified: int
total_pixels: int
capacity_used: float # 0.0 - 1.0
date_used: Optional[str] = None # Cosmetic only (for filename organization)
date_used: str | None = None # Cosmetic only (for filename organization)
@property
def capacity_percent(self) -> float:
@@ -135,8 +133,8 @@ class DecodeInput:
reference_photo: bytes
passphrase: str # Renamed from day_phrase
pin: str = ""
rsa_key_data: Optional[bytes] = None
rsa_password: Optional[str] = None
rsa_key_data: bytes | None = None
rsa_password: str | None = None
@dataclass
@@ -147,11 +145,11 @@ class DecodeResult:
v3.2.0: date_encoded is always None (date removed from crypto).
"""
payload_type: str # 'text' or 'file'
message: Optional[str] = None # For text payloads
file_data: Optional[bytes] = None # For file payloads
filename: Optional[str] = None # Original filename for file payloads
mime_type: Optional[str] = None # MIME type hint
date_encoded: Optional[str] = None # Always None in v3.2.0 (kept for compatibility)
message: str | None = None # For text payloads
file_data: bytes | None = None # For file payloads
filename: str | None = None # Original filename for file payloads
mime_type: str | None = None # MIME type hint
date_encoded: str | None = None # Always None in v3.2.0 (kept for compatibility)
@property
def is_file(self) -> bool:
@@ -161,7 +159,7 @@ class DecodeResult:
def is_text(self) -> bool:
return self.payload_type == 'text'
def get_content(self) -> Union[str, bytes]:
def get_content(self) -> str | bytes:
"""Get the decoded content (text or bytes)."""
if self.is_text:
return self.message or ""
@@ -196,10 +194,10 @@ class ValidationResult:
is_valid: bool
error_message: str = ""
details: dict = field(default_factory=dict)
warning: Optional[str] = None # v3.2.0: Added for passphrase length warnings
warning: str | None = None # v3.2.0: Added for passphrase length warnings
@classmethod
def ok(cls, warning: Optional[str] = None, **details) -> 'ValidationResult':
def ok(cls, warning: str | None = None, **details) -> 'ValidationResult':
"""Create a successful validation result."""
result = cls(is_valid=True, details=details)
if warning:
@@ -227,8 +225,8 @@ class ImageInfo:
file_size: int
lsb_capacity_bytes: int
lsb_capacity_kb: float
dct_capacity_bytes: Optional[int] = None
dct_capacity_kb: Optional[float] = None
dct_capacity_bytes: int | None = None
dct_capacity_kb: float | None = None
@dataclass
@@ -241,18 +239,18 @@ class CapacityComparison:
lsb_kb: float
lsb_output_format: str
dct_available: bool
dct_bytes: Optional[int] = None
dct_kb: Optional[float] = None
dct_output_formats: Optional[List[str]] = None
dct_ratio_vs_lsb: Optional[float] = None
dct_bytes: int | None = None
dct_kb: float | None = None
dct_output_formats: list[str] | None = None
dct_ratio_vs_lsb: float | None = None
@dataclass
class GenerateResult:
"""Result of credential generation."""
passphrase: str
pin: Optional[str] = None
rsa_key_pem: Optional[str] = None
pin: str | None = None
rsa_key_pem: str | None = None
passphrase_words: int = 4
passphrase_entropy: int = 0
pin_entropy: int = 0

View File

@@ -10,10 +10,9 @@ IMPROVEMENTS IN THIS VERSION:
- Improved error messages
"""
import base64
import io
import zlib
import base64
from typing import Optional, Tuple
from PIL import Image
@@ -27,20 +26,19 @@ except ImportError:
# QR code reading
try:
from pyzbar.pyzbar import decode as pyzbar_decode
from pyzbar.pyzbar import ZBarSymbol
from pyzbar.pyzbar import decode as pyzbar_decode
HAS_QRCODE_READ = True
except ImportError:
HAS_QRCODE_READ = False
from .constants import (
QR_MAX_BINARY,
QR_CROP_PADDING_PERCENT,
QR_CROP_MIN_PADDING_PX,
QR_CROP_PADDING_PERCENT,
QR_MAX_BINARY,
)
# Constants
COMPRESSION_PREFIX = "STEGASOO-Z:"
@@ -273,7 +271,7 @@ def generate_qr_code(
return buf.getvalue()
def read_qr_code(image_data: bytes) -> Optional[str]:
def read_qr_code(image_data: bytes) -> str | None:
"""
Read QR code from image data.
@@ -313,7 +311,7 @@ def read_qr_code(image_data: bytes) -> Optional[str]:
return None
def read_qr_code_from_file(filepath: str) -> Optional[str]:
def read_qr_code_from_file(filepath: str) -> str | None:
"""
Read QR code from image file.
@@ -327,7 +325,7 @@ def read_qr_code_from_file(filepath: str) -> Optional[str]:
return read_qr_code(f.read())
def extract_key_from_qr(image_data: bytes) -> Optional[str]:
def extract_key_from_qr(image_data: bytes) -> str | None:
"""
Extract RSA key from QR code image, auto-decompressing if needed.
@@ -375,7 +373,7 @@ def extract_key_from_qr(image_data: bytes) -> Optional[str]:
return None
def extract_key_from_qr_file(filepath: str) -> Optional[str]:
def extract_key_from_qr_file(filepath: str) -> str | None:
"""
Extract RSA key from QR code image file.
@@ -393,7 +391,7 @@ def detect_and_crop_qr(
image_data: bytes,
padding_percent: float = QR_CROP_PADDING_PERCENT,
min_padding_px: int = QR_CROP_MIN_PADDING_PX
) -> Optional[bytes]:
) -> bytes | None:
"""
Detect QR code in image and crop to it, handling rotation.
@@ -488,7 +486,7 @@ def detect_and_crop_qr_file(
filepath: str,
padding_percent: float = QR_CROP_PADDING_PERCENT,
min_padding_px: int = QR_CROP_MIN_PADDING_PX
) -> Optional[bytes]:
) -> bytes | None:
"""
Detect QR code in image file and crop to it.

View File

@@ -20,22 +20,24 @@ Changes in v3.2.0:
import io
import struct
from typing import Optional, Tuple, List, Union
from typing import TYPE_CHECKING, Union
from PIL import Image
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from PIL import Image
if TYPE_CHECKING:
from .dct_steganography import DCTEmbedStats
from .models import EmbedStats, FilePayload
from .exceptions import CapacityError, ExtractionError, EmbeddingError
from .debug import debug
from .constants import (
EMBED_MODE_LSB,
EMBED_MODE_DCT,
EMBED_MODE_AUTO,
EMBED_MODE_DCT,
EMBED_MODE_LSB,
VALID_EMBED_MODES,
)
from .debug import debug
from .exceptions import CapacityError, EmbeddingError
from .models import EmbedStats, FilePayload
# Lossless formats that preserve LSB data
LOSSLESS_FORMATS = {'PNG', 'BMP', 'TIFF'}
@@ -122,7 +124,7 @@ def has_dct_support() -> bool:
# FORMAT UTILITIES
# =============================================================================
def get_output_format(input_format: Optional[str]) -> Tuple[str, str]:
def get_output_format(input_format: str | None) -> tuple[str, str]:
"""
Determine the output format based on input format.
@@ -151,7 +153,7 @@ def get_output_format(input_format: Optional[str]) -> Tuple[str, str]:
# =============================================================================
def will_fit(
payload: Union[str, bytes, FilePayload, int],
payload: str | bytes | FilePayload | int,
carrier_image: bytes,
bits_per_channel: int = 1,
include_compression_estimate: bool = True,
@@ -297,7 +299,7 @@ def calculate_capacity_by_mode(
def will_fit_by_mode(
payload: Union[str, bytes, FilePayload, int],
payload: str | bytes | FilePayload | int,
carrier_image: bytes,
embed_mode: str = EMBED_MODE_LSB,
bits_per_channel: int = 1,
@@ -424,7 +426,7 @@ def compare_modes(image_data: bytes) -> dict:
# =============================================================================
@debug.time
def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> List[int]:
def generate_pixel_indices(key: bytes, num_pixels: int, num_needed: int) -> list[int]:
"""
Generate pseudo-random pixel indices for embedding.
@@ -508,11 +510,11 @@ def embed_in_image(
image_data: bytes,
pixel_key: bytes,
bits_per_channel: int = 1,
output_format: Optional[str] = None,
output_format: str | None = None,
embed_mode: str = EMBED_MODE_LSB,
dct_output_format: str = DCT_OUTPUT_PNG,
dct_color_mode: str = 'grayscale',
) -> Tuple[bytes, Union[EmbedStats, 'DCTEmbedStats'], str]:
) -> tuple[bytes, Union[EmbedStats, 'DCTEmbedStats'], str]:
"""
Embed data into an image using specified mode.
@@ -586,8 +588,8 @@ def _embed_lsb(
image_data: bytes,
pixel_key: bytes,
bits_per_channel: int = 1,
output_format: Optional[str] = None,
) -> Tuple[bytes, EmbedStats, str]:
output_format: str | None = None,
) -> tuple[bytes, EmbedStats, str]:
"""
Embed data using LSB steganography (internal implementation).
"""
@@ -722,7 +724,7 @@ def extract_from_image(
pixel_key: bytes,
bits_per_channel: int = 1,
embed_mode: str = EMBED_MODE_AUTO,
) -> Optional[bytes]:
) -> bytes | None:
"""
Extract hidden data from a stego image.
@@ -765,7 +767,7 @@ def extract_from_image(
return _extract_lsb(image_data, pixel_key, bits_per_channel)
def _extract_dct(image_data: bytes, pixel_key: bytes) -> Optional[bytes]:
def _extract_dct(image_data: bytes, pixel_key: bytes) -> bytes | None:
"""Extract using DCT mode."""
try:
dct_mod = _get_dct_module()
@@ -779,7 +781,7 @@ def _extract_lsb(
image_data: bytes,
pixel_key: bytes,
bits_per_channel: int = 1
) -> Optional[bytes]:
) -> bytes | None:
"""
Extract using LSB mode (internal implementation).
"""
@@ -878,7 +880,7 @@ def _extract_lsb(
# UTILITY FUNCTIONS
# =============================================================================
def get_image_dimensions(image_data: bytes) -> Tuple[int, int]:
def get_image_dimensions(image_data: bytes) -> tuple[int, int]:
"""Get image dimensions without loading full image."""
debug.validate(len(image_data) > 0, "Image data cannot be empty")
img = Image.open(io.BytesIO(image_data))
@@ -890,7 +892,7 @@ def get_image_dimensions(image_data: bytes) -> Tuple[int, int]:
img.close()
def get_image_format(image_data: bytes) -> Optional[str]:
def get_image_format(image_data: bytes) -> str | None:
"""Get image format (PIL format string like 'PNG', 'JPEG')."""
try:
img = Image.open(io.BytesIO(image_data))

View File

@@ -9,9 +9,8 @@ import os
import random
import secrets
import shutil
from datetime import date, datetime
from datetime import date
from pathlib import Path
from typing import Optional, Union
from PIL import Image
@@ -58,7 +57,7 @@ def strip_image_metadata(image_data: bytes, output_format: str = 'PNG') -> bytes
def generate_filename(
date_str: Optional[str] = None,
date_str: str | None = None,
prefix: str = "",
extension: str = "png"
) -> str:
@@ -96,7 +95,7 @@ def generate_filename(
return filename
def parse_date_from_filename(filename: str) -> Optional[str]:
def parse_date_from_filename(filename: str) -> str | None:
"""
Extract date from a stego filename.
@@ -205,7 +204,7 @@ class SecureDeleter:
>>> deleter.execute()
"""
def __init__(self, path: Union[str, Path], passes: int = 7):
def __init__(self, path: str | Path, passes: int = 7):
"""
Initialize secure deleter.
@@ -294,7 +293,7 @@ class SecureDeleter:
debug.print(f"Path does not exist: {self.path}")
def secure_delete(path: Union[str, Path], passes: int = 7) -> None:
def secure_delete(path: str | Path, passes: int = 7) -> None:
"""
Convenience function for secure deletion.

View File

@@ -10,25 +10,35 @@ Changes in v3.2.0:
"""
import io
from typing import Optional, Union
from PIL import Image
from .constants import (
MIN_PIN_LENGTH, MAX_PIN_LENGTH,
MAX_MESSAGE_SIZE, MAX_FILE_PAYLOAD_SIZE, MAX_IMAGE_PIXELS, MAX_FILE_SIZE,
MIN_RSA_BITS, MIN_KEY_PASSWORD_LENGTH,
ALLOWED_IMAGE_EXTENSIONS, ALLOWED_KEY_EXTENSIONS,
MIN_PASSPHRASE_WORDS, RECOMMENDED_PASSPHRASE_WORDS,
EMBED_MODE_LSB, EMBED_MODE_DCT, EMBED_MODE_AUTO,
ALLOWED_IMAGE_EXTENSIONS,
ALLOWED_KEY_EXTENSIONS,
EMBED_MODE_AUTO,
EMBED_MODE_DCT,
EMBED_MODE_LSB,
MAX_FILE_PAYLOAD_SIZE,
MAX_FILE_SIZE,
MAX_IMAGE_PIXELS,
MAX_MESSAGE_SIZE,
MAX_PIN_LENGTH,
MIN_KEY_PASSWORD_LENGTH,
MIN_PASSPHRASE_WORDS,
MIN_PIN_LENGTH,
MIN_RSA_BITS,
RECOMMENDED_PASSPHRASE_WORDS,
)
from .models import ValidationResult, FilePayload
from .exceptions import (
ValidationError, PinValidationError, MessageValidationError,
ImageValidationError, KeyValidationError, SecurityFactorError,
FileTooLargeError, UnsupportedFileTypeError,
ImageValidationError,
KeyValidationError,
MessageValidationError,
PinValidationError,
SecurityFactorError,
)
from .keygen import load_rsa_key
from .models import FilePayload, ValidationResult
def validate_pin(pin: str, required: bool = False) -> ValidationResult:
@@ -87,7 +97,7 @@ def validate_message(message: str) -> ValidationResult:
return ValidationResult.ok(length=len(message))
def validate_payload(payload: Union[str, bytes, FilePayload]) -> ValidationResult:
def validate_payload(payload: str | bytes | FilePayload) -> ValidationResult:
"""
Validate a payload (text message, bytes, or file).
@@ -212,7 +222,7 @@ def validate_image(
def validate_rsa_key(
key_data: bytes,
password: Optional[str] = None,
password: str | None = None,
required: bool = False
) -> ValidationResult:
"""
@@ -248,7 +258,7 @@ def validate_rsa_key(
def validate_security_factors(
pin: str,
rsa_key_data: Optional[bytes]
rsa_key_data: bytes | None
) -> ValidationResult:
"""
Validate that at least one security factor is provided.
@@ -456,7 +466,7 @@ def require_valid_message(message: str) -> None:
raise MessageValidationError(result.error_message)
def require_valid_payload(payload: Union[str, bytes, FilePayload]) -> None:
def require_valid_payload(payload: str | bytes | FilePayload) -> None:
"""Validate payload (text, bytes, or file), raising exception on failure."""
result = validate_payload(payload)
if not result.is_valid:
@@ -472,7 +482,7 @@ def require_valid_image(image_data: bytes, name: str = "Image") -> None:
def require_valid_rsa_key(
key_data: bytes,
password: Optional[str] = None,
password: str | None = None,
required: bool = False
) -> None:
"""Validate RSA key, raising exception on failure."""
@@ -481,7 +491,7 @@ def require_valid_rsa_key(
raise KeyValidationError(result.error_message)
def require_security_factors(pin: str, rsa_key_data: Optional[bytes]) -> None:
def require_security_factors(pin: str, rsa_key_data: bytes | None) -> None:
"""Validate security factors, raising exception on failure."""
result = validate_security_factors(pin, rsa_key_data)
if not result.is_valid:

View File

@@ -7,18 +7,19 @@ Updated for v4.0.0:
- BatchCredentials.passphrase is a single string
"""
import pytest
import tempfile
import shutil
import tempfile
from pathlib import Path
from unittest.mock import Mock, patch
from unittest.mock import Mock
import pytest
from stegasoo.batch import (
BatchCredentials,
BatchItem,
BatchProcessor,
BatchResult,
BatchItem,
BatchStatus,
BatchCredentials,
batch_capacity_check,
print_batch_result,
)

View File

@@ -3,18 +3,19 @@ Tests for Stegasoo compression module.
"""
import pytest
from stegasoo.compression import (
compress,
decompress,
CompressionAlgorithm,
CompressionError,
get_compression_ratio,
estimate_compressed_size,
get_available_algorithms,
algorithm_name,
MIN_COMPRESS_SIZE,
COMPRESSION_MAGIC,
HAS_LZ4,
MIN_COMPRESS_SIZE,
CompressionAlgorithm,
CompressionError,
algorithm_name,
compress,
decompress,
estimate_compressed_size,
get_available_algorithms,
get_compression_ratio,
)

View File

@@ -11,29 +11,28 @@ Updated for v4.0.0:
- Python 3.12 recommended (3.13 not supported)
"""
import io
import pytest
from PIL import Image
import io
import stegasoo
from stegasoo import (
generate_pin,
generate_passphrase,
generate_credentials,
validate_pin,
validate_message,
validate_passphrase,
validate_channel_key,
encode,
decode,
decode_text,
encode,
generate_channel_key,
generate_credentials,
generate_passphrase,
generate_pin,
get_channel_fingerprint,
__version__,
validate_channel_key,
validate_message,
validate_passphrase,
validate_pin,
)
from stegasoo.steganography import get_output_format
# =============================================================================
# Fixtures
# =============================================================================