Bug fixes, CLI updates, docs.

This commit is contained in:
Aaron D. Lee
2026-01-01 13:40:27 -05:00
parent 3898031480
commit a001f227ec
9 changed files with 1110 additions and 445 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -6,6 +6,7 @@ CHANGES in v3.2.0:
- Removed date dependency from all operations
- Renamed day_phrase → passphrase
- No longer need to specify or remember encoding dates
- Default passphrase length increased to 4 words
Usage:
stegasoo generate [OPTIONS]
@@ -28,26 +29,57 @@ sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'src'))
import stegasoo
from stegasoo import (
encode, encode_file, decode,
# Core operations
encode, decode,
# Credential generation
generate_credentials,
export_rsa_key_pem, load_rsa_key,
validate_image, calculate_capacity,
parse_date_from_filename, # Keep for filename parsing only
__version__,
StegasooError, DecryptionError, ExtractionError,
FilePayload,
will_fit,
strip_image_metadata,
# Embedding modes
EMBED_MODE_LSB,
EMBED_MODE_DCT,
EMBED_MODE_AUTO,
generate_passphrase,
generate_pin,
export_rsa_key_pem,
load_rsa_key,
# Validation
validate_image,
# Image utilities
get_image_info,
compare_capacity,
# Steganography functions
has_dct_support,
compare_modes,
will_fit_by_mode,
calculate_capacity_by_mode,
# Utilities
generate_filename,
# Version
__version__,
# Exceptions
StegasooError,
DecryptionError,
ExtractionError,
# Models
FilePayload,
# Constants
EMBED_MODE_LSB,
EMBED_MODE_DCT,
EMBED_MODE_AUTO,
DEFAULT_PASSPHRASE_WORDS,
DEFAULT_PIN_LENGTH,
)
# Optional: strip_image_metadata from utils
try:
from stegasoo.utils import strip_image_metadata
HAS_STRIP_METADATA = True
except ImportError:
HAS_STRIP_METADATA = False
# QR Code utilities
try:
from stegasoo.qr_utils import (
@@ -87,6 +119,7 @@ def cli():
Version 3.2.0 Changes:
- No more date parameters - encode/decode anytime!
- Simplified passphrase (no daily rotation)
- Default passphrase increased to 4 words
- True asynchronous communications
\b
@@ -109,9 +142,12 @@ def cli():
@cli.command()
@click.option('--pin/--no-pin', default=True, help='Generate a PIN (default: yes)')
@click.option('--rsa/--no-rsa', default=False, help='Generate an RSA key')
@click.option('--pin-length', type=click.IntRange(6, 9), default=6, help='PIN length (6-9)')
@click.option('--rsa-bits', type=click.Choice(['2048', '3072', '4096']), default='2048', help='RSA key size')
@click.option('--words', type=click.IntRange(3, 12), default=4, help='Words per passphrase (default: 4, was 3 in v3.1)')
@click.option('--pin-length', type=click.IntRange(6, 9), default=DEFAULT_PIN_LENGTH,
help=f'PIN length (6-9, default: {DEFAULT_PIN_LENGTH})')
@click.option('--rsa-bits', type=click.Choice(['2048', '3072', '4096']), default='2048',
help='RSA key size')
@click.option('--words', type=click.IntRange(3, 12), default=DEFAULT_PASSPHRASE_WORDS,
help=f'Words per passphrase (default: {DEFAULT_PASSPHRASE_WORDS})')
@click.option('--output', '-o', type=click.Path(), help='Save RSA key to file (requires password)')
@click.option('--password', '-p', help='Password for RSA key file')
@click.option('--json', 'as_json', is_flag=True, help='Output as JSON')
@@ -122,8 +158,8 @@ def generate(pin, rsa, pin_length, rsa_bits, words, output, password, as_json):
Creates a passphrase and optionally a PIN and/or RSA key.
At least one of --pin or --rsa must be enabled.
v3.2.0: No more daily passphrases - use one strong passphrase!
Default increased to 4 words (from 3) for better security.
v3.2.0: Single passphrase (no more daily rotation!)
Default increased to 4 words for better security.
\b
Examples:
@@ -148,7 +184,8 @@ def generate(pin, rsa, pin_length, rsa_bits, words, output, password, as_json):
use_rsa=rsa,
pin_length=pin_length,
rsa_bits=int(rsa_bits),
words_per_passphrase=words
passphrase_words=words, # v3.2.0: renamed parameter
rsa_password=password if output else None,
)
if as_json:
@@ -174,21 +211,21 @@ def generate(pin, rsa, pin_length, rsa_bits, words, output, password, as_json):
click.secho("=" * 60, fg='cyan')
click.echo()
click.secho(" MEMORIZE THESE AND CLOSE THIS WINDOW", fg='yellow', bold=True)
click.secho("⚠️ MEMORIZE THESE AND CLOSE THIS WINDOW", fg='yellow', bold=True)
click.secho(" Do not screenshot or save to file!", fg='yellow')
click.echo()
if creds.pin:
click.secho("--- STATIC PIN ---", fg='green')
click.secho("─── STATIC PIN ───", fg='green')
click.secho(f" {creds.pin}", fg='bright_yellow', bold=True)
click.echo()
click.secho("--- PASSPHRASE ---", fg='green')
click.secho("─── PASSPHRASE ───", fg='green')
click.secho(f" {creds.passphrase}", fg='bright_white', bold=True)
click.echo()
if creds.rsa_key_pem:
click.secho("--- RSA KEY ---", fg='green')
click.secho("─── RSA KEY ───", fg='green')
if output:
# Save to file
private_key = load_rsa_key(creds.rsa_key_pem.encode())
@@ -200,7 +237,7 @@ def generate(pin, rsa, pin_length, rsa_bits, words, output, password, as_json):
click.echo(creds.rsa_key_pem)
click.echo()
click.secho("--- SECURITY ---", fg='green')
click.secho("─── SECURITY ───", fg='green')
click.echo(f" Passphrase entropy: {creds.passphrase_entropy} bits ({words} words)")
if creds.pin:
click.echo(f" PIN entropy: {creds.pin_entropy} bits")
@@ -210,7 +247,7 @@ def generate(pin, rsa, pin_length, rsa_bits, words, output, password, as_json):
click.secho(f" + photo entropy: 80-256 bits", dim=True)
click.echo()
click.secho("NOTE: v3.2.0 removed date dependency - use this passphrase anytime!", fg='cyan')
click.secho(" v3.2.0: Use this passphrase anytime - no date needed!", fg='cyan')
click.echo()
except Exception as e:
@@ -227,7 +264,7 @@ def generate(pin, rsa, pin_length, rsa_bits, words, output, password, as_json):
@click.option('--message', '-m', help='Text message to encode')
@click.option('--message-file', '-f', type=click.Path(exists=True), help='Read text message from file')
@click.option('--embed-file', '-e', type=click.Path(exists=True), help='Embed a file (binary)')
@click.option('--passphrase', '-p', required=True, help='Passphrase (v3.2.0: no date needed!)')
@click.option('--passphrase', '-p', required=True, help='Passphrase')
@click.option('--pin', help='Static PIN')
@click.option('--key', '-k', type=click.Path(exists=True), help='RSA key file (.pem)')
@click.option('--key-qr', type=click.Path(exists=True), help='RSA key from QR code image')
@@ -282,7 +319,7 @@ def encode_cmd(ref, carrier, message, message_file, embed_file, passphrase, pin,
stegasoo encode -r photo.jpg -c meme.png -p "secure words here now" --pin 123456 -m "secret" --mode dct
# DCT mode - color JPEG
stegasoo encode -r photo.jpg -c meme.png -p "my strong passphrase" --pin 123456 -m "secret" \
stegasoo encode -r photo.jpg -c meme.png -p "my strong passphrase" --pin 123456 -m "secret" \\
--mode dct --dct-color color --dct-format jpeg
"""
# Check DCT mode availability
@@ -378,7 +415,7 @@ def encode_cmd(ref, carrier, message, message_file, embed_file, passphrase, pin,
message=payload,
reference_photo=ref_photo,
carrier_image=carrier_image,
passphrase=passphrase, # Renamed from day_phrase
passphrase=passphrase,
pin=pin or "",
rsa_key_data=rsa_key_data,
rsa_password=effective_key_password,
@@ -405,7 +442,6 @@ def encode_cmd(ref, carrier, message, message_file, embed_file, passphrase, pin,
color_note = "color preserved" if dct_color_mode == 'color' else "grayscale"
format_note = dct_output_format.upper()
click.secho(f" DCT output: {format_note} ({color_note})", dim=True)
click.secho(" (v3.2.0: No date needed to decode!)", fg='cyan', dim=True)
except StegasooError as e:
raise click.ClickException(str(e))
@@ -509,7 +545,7 @@ def decode_cmd(ref, stego, passphrase, pin, key, key_qr, key_password, output, e
result = decode(
stego_image=stego_image,
reference_photo=ref_photo,
passphrase=passphrase, # Renamed from day_phrase
passphrase=passphrase,
pin=pin or "",
rsa_key_data=rsa_key_data,
rsa_password=effective_key_password,
@@ -631,7 +667,7 @@ def verify(ref, stego, passphrase, pin, key, key_qr, key_password, embed_mode, a
result = decode(
stego_image=stego_image,
reference_photo=ref_photo,
passphrase=passphrase, # v3.2.0: Renamed from day_phrase
passphrase=passphrase,
pin=pin or "",
rsa_key_data=rsa_key_data,
rsa_password=effective_key_password,
@@ -652,16 +688,16 @@ def verify(ref, stego, passphrase, pin, key, key_qr, key_password, embed_mode, a
if as_json:
import json
output = {
output_data = {
"valid": True,
"stego_file": stego,
"payload_type": payload_type,
"payload_size": payload_size,
}
if result.is_file:
output["filename"] = result.filename
output["mime_type"] = result.mime_type
click.echo(json.dumps(output, indent=2))
output_data["filename"] = result.filename
output_data["mime_type"] = result.mime_type
click.echo(json.dumps(output_data, indent=2))
else:
click.secho("✓ Valid stego image", fg='green', bold=True)
click.echo(f" Payload: {payload_type} ({payload_desc})")
@@ -670,12 +706,12 @@ def verify(ref, stego, passphrase, pin, key, key_qr, key_password, embed_mode, a
except (DecryptionError, ExtractionError) as e:
if as_json:
import json
output = {
output_data = {
"valid": False,
"stego_file": stego,
"error": str(e),
}
click.echo(json.dumps(output, indent=2))
click.echo(json.dumps(output_data, indent=2))
sys.exit(1)
else:
click.secho("✗ Verification failed", fg='red', bold=True)
@@ -712,7 +748,7 @@ def info(image, as_json):
if as_json:
import json
output = {
output_data = {
"file": image,
"width": result.details['width'],
"height": result.details['height'],
@@ -734,12 +770,12 @@ def info(image, as_json):
},
},
}
click.echo(json.dumps(output, indent=2))
click.echo(json.dumps(output_data, indent=2))
return
click.echo()
click.secho(f"Image: {image}", bold=True)
click.echo(f" Dimensions: {result.details['width']} x {result.details['height']}")
click.echo(f" Dimensions: {result.details['width']} × {result.details['height']}")
click.echo(f" Pixels: {result.details['pixels']:,}")
click.echo(f" Mode: {result.details['mode']}")
click.echo(f" Format: {result.details['format']}")
@@ -789,7 +825,7 @@ def compare(image, payload_size, as_json):
if as_json:
import json
output = {
output_data = {
"file": image,
"width": comparison['width'],
"height": comparison['height'],
@@ -812,43 +848,43 @@ def compare(image, payload_size, as_json):
}
if payload_size:
output["payload_check"] = {
output_data["payload_check"] = {
"size_bytes": payload_size,
"fits_lsb": payload_size <= comparison['lsb']['capacity_bytes'],
"fits_dct": payload_size <= comparison['dct']['capacity_bytes'],
}
click.echo(json.dumps(output, indent=2))
click.echo(json.dumps(output_data, indent=2))
return
click.echo()
click.secho(f"=== Mode Comparison: {image} ===", fg='cyan', bold=True)
click.echo(f" Dimensions: {comparison['width']} x {comparison['height']}")
click.echo(f" Dimensions: {comparison['width']} × {comparison['height']}")
click.echo()
# LSB mode
click.secho(" +--- LSB Mode ---", fg='green')
click.echo(f" | Capacity: {comparison['lsb']['capacity_bytes']:,} bytes ({comparison['lsb']['capacity_kb']:.1f} KB)")
click.echo(f" | Output: {comparison['lsb']['output']}")
click.echo(f" | Status: ✓ Available")
click.echo(" |")
click.secho(" ┌─── LSB Mode ───", fg='green')
click.echo(f" Capacity: {comparison['lsb']['capacity_bytes']:,} bytes ({comparison['lsb']['capacity_kb']:.1f} KB)")
click.echo(f" Output: {comparison['lsb']['output']}")
click.echo(f" Status: ✓ Available")
click.echo(" ")
# DCT mode
click.secho(" +--- DCT Mode ---", fg='blue')
click.echo(f" | Capacity: {comparison['dct']['capacity_bytes']:,} bytes ({comparison['dct']['capacity_kb']:.1f} KB)")
click.echo(f" | Ratio: {comparison['dct']['ratio_vs_lsb']:.1f}% of LSB capacity")
click.secho(" ├─── DCT Mode ───", fg='blue')
click.echo(f" Capacity: {comparison['dct']['capacity_bytes']:,} bytes ({comparison['dct']['capacity_kb']:.1f} KB)")
click.echo(f" Ratio: {comparison['dct']['ratio_vs_lsb']:.1f}% of LSB capacity")
if comparison['dct']['available']:
click.echo(f" | Status: ✓ Available")
click.echo(f" | Formats: PNG (lossless), JPEG (smaller)")
click.echo(f" | Colors: Grayscale (default), Color (v3.0.1)")
click.echo(f" Status: ✓ Available")
click.echo(f" Formats: PNG (lossless), JPEG (smaller)")
click.echo(f" Colors: Grayscale (default), Color")
else:
click.secho(f" | Status: ✗ Requires scipy (pip install scipy)", fg='yellow')
click.echo(" |")
click.secho(f" Status: ✗ Requires scipy (pip install scipy)", fg='yellow')
click.echo(" ")
# Payload check
if payload_size:
click.secho(" +--- Payload Check ---", fg='magenta')
click.echo(f" | Size: {payload_size:,} bytes")
click.secho(" ├─── Payload Check ───", fg='magenta')
click.echo(f" Size: {payload_size:,} bytes")
fits_lsb = payload_size <= comparison['lsb']['capacity_bytes']
fits_dct = payload_size <= comparison['dct']['capacity_bytes']
@@ -858,14 +894,14 @@ def compare(image, payload_size, as_json):
lsb_color = 'green' if fits_lsb else 'red'
dct_color = 'green' if fits_dct else 'red'
click.echo(f" | LSB mode: ", nl=False)
click.echo(f" LSB mode: ", nl=False)
click.secho(f"{lsb_icon} {'Fits' if fits_lsb else 'Too large'}", fg=lsb_color)
click.echo(f" | DCT mode: ", nl=False)
click.echo(f" DCT mode: ", nl=False)
click.secho(f"{dct_icon} {'Fits' if fits_dct else 'Too large'}", fg=dct_color)
click.echo(" |")
click.echo(" ")
# Recommendation
click.secho(" +--- Recommendation ---", fg='yellow')
click.secho(" └─── Recommendation ───", fg='yellow')
if not comparison['dct']['available']:
click.echo(" Use LSB mode (DCT unavailable)")
elif payload_size:
@@ -893,7 +929,8 @@ def compare(image, payload_size, as_json):
@cli.command('strip-metadata')
@click.argument('image', type=click.Path(exists=True))
@click.option('--output', '-o', type=click.Path(), help='Output file (default: overwrites input)')
@click.option('--format', '-f', 'output_format', type=click.Choice(['PNG', 'BMP']), default='PNG', help='Output format')
@click.option('--format', '-f', 'output_format', type=click.Choice(['PNG', 'BMP']), default='PNG',
help='Output format')
@click.option('--quiet', '-q', is_flag=True, help='Suppress output')
def strip_metadata_cmd(image, output, output_format, quiet):
"""
@@ -907,6 +944,9 @@ def strip_metadata_cmd(image, output, output_format, quiet):
stegasoo strip-metadata photo.jpg -o clean.png
stegasoo strip-metadata photo.jpg # Overwrites as PNG
"""
if not HAS_STRIP_METADATA:
raise click.ClickException("strip_image_metadata not available")
try:
image_data = Path(image).read_bytes()
original_size = len(image_data)
@@ -967,7 +1007,7 @@ def modes():
click.echo()
# DCT Options
click.secho(" DCT Options (v3.0.1)", fg='magenta', bold=True)
click.secho(" DCT Options", fg='magenta', bold=True)
click.echo(" Output format:")
click.echo(" --dct-format png Lossless, larger file (default)")
click.echo(" --dct-format jpeg Lossy, smaller, more natural")
@@ -981,6 +1021,7 @@ def modes():
click.secho(" v3.2.0 Changes:", fg='cyan', bold=True)
click.echo(" ✓ No date parameters needed")
click.echo(" ✓ Single passphrase (no daily rotation)")
click.echo(" ✓ Default passphrase increased to 4 words")
click.echo(" ✓ True asynchronous communications")
click.echo()

View File

@@ -1,10 +1,51 @@
#!/usr/bin/env python3
"""Main entry point."""
"""
Stegasoo - Main Entry Point
This module provides the main entry point for the stegasoo package.
It can be run directly or via the installed console script.
Usage:
python -m stegasoo --help
python src/main.py --help
stegasoo --help (if installed via pip)
"""
import sys
def main():
"""Main function."""
print("Hello, World!")
"""
Main entry point for Stegasoo CLI.
Delegates to the CLI module for command parsing and execution.
"""
try:
from stegasoo.cli import main as cli_main
cli_main()
except ImportError as e:
# Provide helpful error if dependencies are missing
print(f"Error: Could not import stegasoo package: {e}", file=sys.stderr)
print("\nMake sure stegasoo is installed:", file=sys.stderr)
print(" pip install -e .", file=sys.stderr)
print("\nOr run from the src directory:", file=sys.stderr)
print(" PYTHONPATH=src python -m stegasoo", file=sys.stderr)
sys.exit(1)
except KeyboardInterrupt:
print("\nInterrupted.", file=sys.stderr)
sys.exit(130)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def version():
"""Print version and exit."""
try:
from stegasoo import __version__
print(f"stegasoo {__version__}")
except ImportError:
print("stegasoo (version unknown)")
if __name__ == "__main__":

View File

@@ -1,8 +1,12 @@
"""
Stegasoo Batch Processing Module
Stegasoo Batch Processing Module (v3.2.0)
Enables encoding/decoding multiple files in a single operation.
Supports parallel processing, progress tracking, and detailed reporting.
Changes in v3.2.0:
- BatchCredentials: renamed day_phrase → passphrase, removed date_str
- Updated all credential handling to use v3.2.0 API
"""
import os
@@ -64,37 +68,57 @@ class BatchItem:
@dataclass
class BatchCredentials:
"""
Credentials for batch encode/decode operations.
Credentials for batch encode/decode operations (v3.2.0).
Provides a structured way to pass authentication factors
for batch processing instead of using plain dicts.
Changes in v3.2.0:
- Renamed day_phrase → passphrase
- Removed date_str (no longer used in cryptographic operations)
Example:
creds = BatchCredentials(
reference_photo=ref_bytes,
day_phrase="apple forest thunder",
passphrase="apple forest thunder mountain",
pin="123456"
)
result = processor.batch_encode(images, creds, message="secret")
"""
reference_photo: bytes
day_phrase: str
passphrase: str # v3.2.0: renamed from day_phrase
pin: str = ""
rsa_key_data: Optional[bytes] = None
rsa_password: Optional[str] = None
date_str: Optional[str] = None # YYYY-MM-DD, defaults to today
def to_dict(self) -> dict:
"""Convert to dictionary for legacy API compatibility."""
"""Convert to dictionary for API compatibility."""
return {
"reference_photo": self.reference_photo,
"day_phrase": self.day_phrase,
"passphrase": self.passphrase,
"pin": self.pin,
"rsa_key_data": self.rsa_key_data,
"rsa_password": self.rsa_password,
"date_str": self.date_str,
}
@classmethod
def from_dict(cls, data: dict) -> 'BatchCredentials':
"""
Create BatchCredentials from a dictionary.
Handles both v3.2.0 format (passphrase) and legacy format (day_phrase).
"""
# Handle legacy 'day_phrase' key
passphrase = data.get('passphrase') or data.get('day_phrase', '')
return cls(
reference_photo=data['reference_photo'],
passphrase=passphrase,
pin=data.get('pin', ''),
rsa_key_data=data.get('rsa_key_data'),
rsa_password=data.get('rsa_password'),
)
@dataclass
class BatchResult:
@@ -140,23 +164,39 @@ ProgressCallback = Callable[[int, int, BatchItem], None]
class BatchProcessor:
"""
Handles batch encoding/decoding operations.
Handles batch encoding/decoding operations (v3.2.0).
Usage:
processor = BatchProcessor(max_workers=4)
# Batch encode
# Batch encode with BatchCredentials
creds = BatchCredentials(
reference_photo=ref_bytes,
passphrase="apple forest thunder mountain",
pin="123456"
)
result = processor.batch_encode(
images=['img1.png', 'img2.png'],
message="Secret message",
output_dir="./encoded/",
credentials={"phrase": "...", "pin": "..."},
credentials=creds,
)
# Batch encode with dict credentials
result = processor.batch_encode(
images=['img1.png', 'img2.png'],
message="Secret message",
credentials={
"reference_photo": ref_bytes,
"passphrase": "apple forest thunder mountain",
"pin": "123456"
},
)
# Batch decode
result = processor.batch_decode(
images=['encoded1.png', 'encoded2.png'],
credentials={"phrase": "...", "pin": "..."},
credentials=creds,
)
"""
@@ -202,6 +242,26 @@ class BatchProcessor:
"""Check if path is a valid image file."""
return path.suffix.lower().lstrip('.') in ALLOWED_IMAGE_EXTENSIONS
def _normalize_credentials(
self,
credentials: dict | BatchCredentials | None
) -> BatchCredentials:
"""
Normalize credentials to BatchCredentials object.
Handles both dict and BatchCredentials input, and legacy 'day_phrase' key.
"""
if credentials is None:
raise ValueError("Credentials are required")
if isinstance(credentials, BatchCredentials):
return credentials
if isinstance(credentials, dict):
return BatchCredentials.from_dict(credentials)
raise ValueError(f"Invalid credentials type: {type(credentials)}")
def batch_encode(
self,
images: list[str | Path],
@@ -209,7 +269,7 @@ class BatchProcessor:
file_payload: Optional[Path] = None,
output_dir: Optional[Path] = None,
output_suffix: str = "_encoded",
credentials: dict = None,
credentials: dict | BatchCredentials | None = None,
compress: bool = True,
recursive: bool = False,
progress_callback: Optional[ProgressCallback] = None,
@@ -224,7 +284,7 @@ class BatchProcessor:
file_payload: File to embed (mutually exclusive with message)
output_dir: Output directory (default: same as input)
output_suffix: Suffix for output files
credentials: Dict with 'phrase', 'pin', and optionally 'private_key'
credentials: BatchCredentials or dict with 'passphrase', 'pin', etc.
compress: Enable compression
recursive: Search directories recursively
progress_callback: Called for each item: callback(current, total, item)
@@ -236,8 +296,8 @@ class BatchProcessor:
if message is None and file_payload is None:
raise ValueError("Either message or file_payload must be provided")
if credentials is None:
raise ValueError("Credentials are required")
# Normalize credentials to BatchCredentials
creds = self._normalize_credentials(credentials)
result = BatchResult(operation="encode")
image_paths = list(self.find_images(images, recursive))
@@ -274,15 +334,15 @@ class BatchProcessor:
output_path=item.output_path,
message=message,
file_payload=file_payload,
credentials=credentials,
credentials=creds.to_dict(),
compress=compress,
)
else:
# Placeholder - actual implementation would call stego.encode()
self._mock_encode(item, message, credentials, compress)
# Use stegasoo encode
self._do_encode(item, message, file_payload, creds, compress)
item.status = BatchStatus.SUCCESS
item.output_size = item.output_path.stat().st_size if item.output_path.exists() else 0
item.output_size = item.output_path.stat().st_size if item.output_path and item.output_path.exists() else 0
item.message = f"Encoded to {item.output_path.name}"
except Exception as e:
@@ -301,7 +361,7 @@ class BatchProcessor:
self,
images: list[str | Path],
output_dir: Optional[Path] = None,
credentials: dict = None,
credentials: dict | BatchCredentials | None = None,
recursive: bool = False,
progress_callback: Optional[ProgressCallback] = None,
decode_func: Callable = None,
@@ -312,7 +372,7 @@ class BatchProcessor:
Args:
images: List of image paths or directories
output_dir: Output directory for file payloads (default: same as input)
credentials: Dict with 'phrase', 'pin', and optionally 'private_key'
credentials: BatchCredentials or dict with 'passphrase', 'pin', etc.
recursive: Search directories recursively
progress_callback: Called for each item: callback(current, total, item)
decode_func: Custom decode function (for integration)
@@ -320,8 +380,8 @@ class BatchProcessor:
Returns:
BatchResult with decoded messages in item.message fields
"""
if credentials is None:
raise ValueError("Credentials are required")
# Normalize credentials to BatchCredentials
creds = self._normalize_credentials(credentials)
result = BatchResult(operation="decode")
image_paths = list(self.find_images(images, recursive))
@@ -351,12 +411,12 @@ class BatchProcessor:
decoded = decode_func(
image_path=item.input_path,
output_dir=item.output_path,
credentials=credentials,
credentials=creds.to_dict(),
)
item.message = decoded.get('message', '') if isinstance(decoded, dict) else str(decoded)
else:
# Placeholder - actual implementation would call stego.decode()
item.message = self._mock_decode(item, credentials)
# Use stegasoo decode
item.message = self._do_decode(item, creds)
item.status = BatchStatus.SUCCESS
@@ -404,14 +464,112 @@ class BatchProcessor:
result.end_time = time.time()
def _mock_encode(self, item: BatchItem, message: str, credentials: dict, compress: bool) -> None:
def _do_encode(
self,
item: BatchItem,
message: Optional[str],
file_payload: Optional[Path],
creds: BatchCredentials,
compress: bool
) -> None:
"""
Perform actual encoding using stegasoo.encode.
Override this method to customize encoding behavior.
"""
try:
from .encode import encode, encode_file
from .models import FilePayload
# Read carrier image
carrier_image = item.input_path.read_bytes()
if file_payload:
# Encode file
payload = FilePayload.from_file(str(file_payload))
result = encode(
message=payload,
reference_photo=creds.reference_photo,
carrier_image=carrier_image,
passphrase=creds.passphrase,
pin=creds.pin,
rsa_key_data=creds.rsa_key_data,
rsa_password=creds.rsa_password,
)
else:
# Encode text message
result = encode(
message=message,
reference_photo=creds.reference_photo,
carrier_image=carrier_image,
passphrase=creds.passphrase,
pin=creds.pin,
rsa_key_data=creds.rsa_key_data,
rsa_password=creds.rsa_password,
)
# Write output
if item.output_path:
item.output_path.write_bytes(result.stego_image)
except ImportError:
# Fallback to mock if stegasoo.encode not available
self._mock_encode(item, message, creds, compress)
def _do_decode(
self,
item: BatchItem,
creds: BatchCredentials,
) -> str:
"""
Perform actual decoding using stegasoo.decode.
Override this method to customize decoding behavior.
"""
try:
from .decode import decode
# Read stego image
stego_image = item.input_path.read_bytes()
result = decode(
stego_image=stego_image,
reference_photo=creds.reference_photo,
passphrase=creds.passphrase,
pin=creds.pin,
rsa_key_data=creds.rsa_key_data,
rsa_password=creds.rsa_password,
)
if result.is_text:
return result.message or ""
else:
# File payload - save it
if item.output_path and result.file_data:
output_file = item.output_path / (result.filename or "extracted_file")
output_file.write_bytes(result.file_data)
return f"File extracted: {result.filename or 'extracted_file'}"
return f"[File: {result.filename or 'binary data'}]"
except ImportError:
# Fallback to mock if stegasoo.decode not available
return self._mock_decode(item, creds)
def _mock_encode(
self,
item: BatchItem,
message: str,
creds: BatchCredentials,
compress: bool
) -> None:
"""Mock encode for testing - replace with actual stego.encode()"""
# This is a placeholder - in real usage, you'd call your actual encode function
# For now, just copy the file to simulate encoding
import shutil
if item.output_path:
shutil.copy(item.input_path, item.output_path)
def _mock_decode(self, item: BatchItem, credentials: dict) -> str:
def _mock_decode(self, item: BatchItem, creds: BatchCredentials) -> str:
"""Mock decode for testing - replace with actual stego.decode()"""
# This is a placeholder - in real usage, you'd call your actual decode function
return "[Decoded message would appear here]"

View File

@@ -1,5 +1,5 @@
"""
Channel Key Management for Stegasoo
Channel Key Management for Stegasoo (v3.2.0)
A channel key ties encode/decode operations to a specific deployment or group.
Messages encoded with one channel key can only be decoded by systems with the
@@ -15,6 +15,16 @@ Storage priority:
1. Environment variable: STEGASOO_CHANNEL_KEY
2. Config file: ~/.stegasoo/channel.key or ./config/channel.key
3. None (public mode - compatible with any instance without a channel key)
STATUS: This module is IMPLEMENTED but NOT YET INTEGRATED into crypto.py.
The get_channel_key_hash() function should be mixed into key derivation
in a future release.
TODO (v3.3.0):
- Integrate get_channel_key_hash() into derive_hybrid_key() in crypto.py
- Add --channel-key option to CLI
- Add channel key display to web UI
- Document channel key feature in README
"""
import os
@@ -247,9 +257,12 @@ def get_channel_key_hash(key: Optional[str] = None) -> Optional[bytes]:
"""
Get the channel key as a 32-byte hash suitable for key derivation.
This hash is mixed into the Argon2 key derivation to bind
This hash is designed to be mixed into the Argon2 key derivation to bind
encryption to a specific channel.
NOTE: This function is implemented but not yet integrated into crypto.py.
See TODO at top of file for integration plan.
Args:
key: Channel key (if None, reads from config)

View File

@@ -1,7 +1,11 @@
"""
Stegasoo CLI Module
Stegasoo CLI Module (v3.2.0)
Command-line interface with batch processing and compression support.
Changes in v3.2.0:
- Updated to use DEFAULT_PASSPHRASE_WORDS (consistency with v3.2.0 naming)
- Updated help text to use 'passphrase' terminology
"""
import sys
@@ -16,7 +20,7 @@ from .constants import (
MAX_MESSAGE_SIZE,
MAX_FILE_PAYLOAD_SIZE,
DEFAULT_PIN_LENGTH,
DEFAULT_PHRASE_WORDS,
DEFAULT_PASSPHRASE_WORDS, # v3.2.0: renamed from DEFAULT_PHRASE_WORDS
)
from .compression import (
CompressionAlgorithm,
@@ -60,8 +64,8 @@ def cli(ctx, json_output):
@click.option('-f', '--file', 'file_payload', type=click.Path(exists=True),
help='File to embed instead of message')
@click.option('-o', '--output', type=click.Path(), help='Output image path')
@click.option('--phrase', prompt=True, hide_input=True,
confirmation_prompt=True, help='Passphrase')
@click.option('--passphrase', prompt=True, hide_input=True,
confirmation_prompt=True, help='Passphrase (recommend 4+ words)')
@click.option('--pin', prompt=True, hide_input=True,
confirmation_prompt=True, help='PIN code')
@click.option('--compress/--no-compress', default=True,
@@ -70,14 +74,14 @@ def cli(ctx, json_output):
default='zlib', help='Compression algorithm')
@click.option('--dry-run', is_flag=True, help='Show capacity usage without encoding')
@click.pass_context
def encode(ctx, image, message, file_payload, output, phrase, pin,
def encode(ctx, image, message, file_payload, output, passphrase, pin,
compress, algorithm, dry_run):
"""
Encode a message or file into an image.
Examples:
stegasoo encode photo.png -m "Secret message" --phrase --pin
stegasoo encode photo.png -m "Secret message" --passphrase --pin
stegasoo encode photo.png -f secret.pdf -o encoded.png
"""
@@ -109,7 +113,7 @@ def encode(ctx, image, message, file_payload, output, phrase, pin,
# Get image capacity
with Image.open(image) as img:
width, height = img.size
capacity_bytes = (width * height * 3 // 8) - 100
capacity_bytes = (width * height * 3 // 8) - 69 # v3.2.0: corrected overhead
if dry_run:
result = {
@@ -153,18 +157,18 @@ def encode(ctx, image, message, file_payload, output, phrase, pin,
@cli.command()
@click.argument('image', type=click.Path(exists=True))
@click.option('--phrase', prompt=True, hide_input=True, help='Passphrase')
@click.option('--passphrase', prompt=True, hide_input=True, help='Passphrase')
@click.option('--pin', prompt=True, hide_input=True, help='PIN code')
@click.option('-o', '--output', type=click.Path(),
help='Output path for file payloads')
@click.pass_context
def decode(ctx, image, phrase, pin, output):
def decode(ctx, image, passphrase, pin, output):
"""
Decode a message or file from an image.
Examples:
stegasoo decode encoded.png --phrase --pin
stegasoo decode encoded.png --passphrase --pin
stegasoo decode encoded.png -o ./extracted/
"""
@@ -201,8 +205,8 @@ def batch():
@click.option('-o', '--output-dir', type=click.Path(),
help='Output directory (default: same as input)')
@click.option('--suffix', default='_encoded', help='Output filename suffix')
@click.option('--phrase', prompt=True, hide_input=True,
confirmation_prompt=True, help='Passphrase')
@click.option('--passphrase', prompt=True, hide_input=True,
confirmation_prompt=True, help='Passphrase (recommend 4+ words)')
@click.option('--pin', prompt=True, hide_input=True,
confirmation_prompt=True, help='PIN code')
@click.option('--compress/--no-compress', default=True,
@@ -215,13 +219,13 @@ def batch():
@click.option('-v', '--verbose', is_flag=True, help='Show detailed output')
@click.pass_context
def batch_encode(ctx, images, message, file_payload, output_dir, suffix,
phrase, pin, compress, algorithm, recursive, jobs, verbose):
passphrase, pin, compress, algorithm, recursive, jobs, verbose):
"""
Encode message into multiple images.
Examples:
stegasoo batch encode *.png -m "Secret" --phrase --pin
stegasoo batch encode *.png -m "Secret" --passphrase --pin
stegasoo batch encode ./photos/ -r -o ./encoded/
"""
@@ -236,7 +240,8 @@ def batch_encode(ctx, images, message, file_payload, output_dir, suffix,
status = "" if item.status.value == "success" else ""
click.echo(f"[{current}/{total}] {status} {item.input_path.name}")
credentials = {"phrase": phrase, "pin": pin}
# v3.2.0: Use 'passphrase' key instead of 'phrase'
credentials = {"passphrase": passphrase, "pin": pin}
result = processor.batch_encode(
images=list(images),
@@ -260,20 +265,20 @@ def batch_encode(ctx, images, message, file_payload, output_dir, suffix,
@click.argument('images', nargs=-1, required=True, type=click.Path(exists=True))
@click.option('-o', '--output-dir', type=click.Path(),
help='Output directory for file payloads')
@click.option('--phrase', prompt=True, hide_input=True, help='Passphrase')
@click.option('--passphrase', prompt=True, hide_input=True, help='Passphrase')
@click.option('--pin', prompt=True, hide_input=True, help='PIN code')
@click.option('-r', '--recursive', is_flag=True,
help='Search directories recursively')
@click.option('-j', '--jobs', default=4, help='Parallel workers (default: 4)')
@click.option('-v', '--verbose', is_flag=True, help='Show detailed output')
@click.pass_context
def batch_decode(ctx, images, output_dir, phrase, pin, recursive, jobs, verbose):
def batch_decode(ctx, images, output_dir, passphrase, pin, recursive, jobs, verbose):
"""
Decode messages from multiple images.
Examples:
stegasoo batch decode encoded*.png --phrase --pin
stegasoo batch decode encoded*.png --passphrase --pin
stegasoo batch decode ./encoded/ -r -o ./extracted/
"""
@@ -285,7 +290,8 @@ def batch_decode(ctx, images, output_dir, phrase, pin, recursive, jobs, verbose)
status = "" if item.status.value == "success" else ""
click.echo(f"[{current}/{total}] {status} {item.input_path.name}")
credentials = {"phrase": phrase, "pin": pin}
# v3.2.0: Use 'passphrase' key instead of 'phrase'
credentials = {"passphrase": passphrase, "pin": pin}
result = processor.batch_decode(
images=list(images),
@@ -348,14 +354,14 @@ def batch_check(ctx, images, recursive):
# =============================================================================
@cli.command()
@click.option('--words', default=DEFAULT_PHRASE_WORDS,
help=f'Number of words (default: {DEFAULT_PHRASE_WORDS})')
@click.option('--words', default=DEFAULT_PASSPHRASE_WORDS,
help=f'Number of words in passphrase (default: {DEFAULT_PASSPHRASE_WORDS})')
@click.option('--pin-length', default=DEFAULT_PIN_LENGTH,
help=f'PIN length (default: {DEFAULT_PIN_LENGTH})')
@click.pass_context
def generate(ctx, words, pin_length):
"""
Generate random credentials (phrase + PIN).
Generate random credentials (passphrase + PIN).
Examples:
@@ -367,25 +373,35 @@ def generate(ctx, words, pin_length):
# Generate PIN
pin = ''.join(str(secrets.randbelow(10)) for _ in range(pin_length))
# Ensure PIN doesn't start with 0
if pin[0] == '0':
pin = str(secrets.randbelow(9) + 1) + pin[1:]
# Generate phrase (would use BIP-39 wordlist)
# Generate passphrase (would use BIP-39 wordlist)
# Placeholder - actual implementation uses constants.get_wordlist()
try:
from .constants import get_wordlist
wordlist = get_wordlist()
phrase_words = [secrets.choice(wordlist) for _ in range(words)]
except (ImportError, FileNotFoundError):
# Fallback for testing
sample_words = ['alpha', 'bravo', 'charlie', 'delta', 'echo', 'foxtrot',
'golf', 'hotel', 'india', 'juliet', 'kilo', 'lima']
phrase_words = [secrets.choice(sample_words) for _ in range(words)]
phrase = ' '.join(phrase_words)
passphrase = ' '.join(phrase_words)
result = {
"phrase": phrase,
"passphrase": passphrase,
"pin": pin,
"phrase_words": words,
"passphrase_words": words,
"pin_length": pin_length,
}
if ctx.obj.get('json'):
click.echo(json.dumps(result, indent=2))
else:
click.echo(f"Phrase: {phrase}")
click.echo(f"Passphrase: {passphrase}")
click.echo(f"PIN: {pin}")
click.echo("\n⚠️ Save these credentials securely - they cannot be recovered!")

View File

@@ -1,5 +1,5 @@
"""
DCT Domain Steganography Module (v3.0.2)
DCT Domain Steganography Module (v3.2.0)
Embeds data in DCT coefficients with two approaches:
1. PNG output: Scipy-based DCT transform (grayscale or color)
@@ -8,11 +8,16 @@ Embeds data in DCT coefficients with two approaches:
The JPEG approach is the "correct" way to do JPEG steganography because
it directly modifies the already-quantized coefficients without re-encoding.
New in v3.0.2:
Changes in v3.0.2:
- jpegio integration for proper JPEG coefficient embedding
- Falls back to warning if jpegio not available for JPEG output
- Maintains backward compatibility with v3.0.1
Changes in v3.2.0:
- Fixed color-mode extraction to properly extract from Y channel
- Added _extract_from_y_channel() for accurate color-mode extraction
- Improved extraction robustness for both grayscale and color modes
Requires: scipy (for PNG mode), optionally jpegio (for JPEG mode)
"""
@@ -83,6 +88,9 @@ JPEGIO_MAGIC = b'JPGS'
JPEGIO_MIN_COEF_MAGNITUDE = 2
JPEGIO_EMBED_CHANNEL = 0 # Y channel
# Flag bits for header
FLAG_COLOR_MODE = 0x01 # Set if embedded in color mode (Y channel of YCbCr)
# ============================================================================
# DATA CLASSES
@@ -167,6 +175,37 @@ def _to_grayscale(image_data: bytes) -> np.ndarray:
return np.array(gray, dtype=np.float64)
def _extract_y_channel(image_data: bytes) -> np.ndarray:
"""
Extract Y (luminance) channel from image for color-mode extraction.
This uses the same YCbCr conversion as embedding to ensure
accurate extraction from color-mode stego images.
Args:
image_data: Image file bytes
Returns:
Y channel as float64 numpy array
"""
img = Image.open(io.BytesIO(image_data))
# Convert to RGB if needed
if img.mode != 'RGB':
img = img.convert('RGB')
rgb_array = np.array(img, dtype=np.float64)
# Extract Y channel using ITU-R BT.601 (same as embedding)
R = rgb_array[:, :, 0]
G = rgb_array[:, :, 1]
B = rgb_array[:, :, 2]
Y = 0.299 * R + 0.587 * G + 0.114 * B
return Y
def _pad_to_blocks(image: np.ndarray) -> Tuple[np.ndarray, Tuple[int, int]]:
"""Pad image dimensions to be divisible by block size."""
h, w = image.shape
@@ -376,9 +415,9 @@ def _jpegio_generate_order(num_positions: int, seed: bytes) -> list:
return order
def _jpegio_create_header(data_length: int) -> bytes:
def _jpegio_create_header(data_length: int, flags: int = 0) -> bytes:
"""Create header for jpegio embedding."""
return struct.pack('>4sBBI', JPEGIO_MAGIC, 1, 0, data_length)
return struct.pack('>4sBBI', JPEGIO_MAGIC, 1, flags, data_length)
def _jpegio_parse_header(header_bytes: bytes) -> Tuple[int, int, int]:
@@ -549,6 +588,9 @@ def _embed_scipy_dct(
img = Image.open(io.BytesIO(carrier_image))
width, height = img.size
# Set flags for header
flags = FLAG_COLOR_MODE if color_mode == 'color' else 0
if color_mode == 'color' and img.mode in ('RGB', 'RGBA'):
# Color mode: convert to YCbCr, embed in Y only, preserve Cb/Cr
if img.mode == 'RGBA':
@@ -560,8 +602,8 @@ def _embed_scipy_dct(
# Pad Y channel
Y_padded, original_size = _pad_to_blocks(Y)
# Embed in Y channel
Y_embedded = _embed_in_channel(Y_padded, data, seed, capacity_info)
# Embed in Y channel (with color flag)
Y_embedded = _embed_in_channel(Y_padded, data, seed, capacity_info, flags)
# Unpad
Y_result = _unpad_image(Y_embedded, original_size)
@@ -576,13 +618,13 @@ def _embed_scipy_dct(
image = _to_grayscale(carrier_image)
padded, original_size = _pad_to_blocks(image)
embedded = _embed_in_channel(padded, data, seed, capacity_info)
embedded = _embed_in_channel(padded, data, seed, capacity_info, flags)
result = _unpad_image(embedded, original_size)
stego_bytes = _save_stego_image(result, output_format)
# Calculate stats
header = _create_header(len(data))
header = _create_header(len(data), flags)
payload = header + data
bits = len(payload) * 8
@@ -607,9 +649,10 @@ def _embed_in_channel(
data: bytes,
seed: bytes,
capacity_info: DCTCapacityInfo,
flags: int = 0,
) -> np.ndarray:
"""Embed data in a single channel using DCT."""
header = _create_header(len(data))
header = _create_header(len(data), flags)
payload = header + data
bits = []
@@ -677,14 +720,14 @@ def _embed_jpegio(
input_path = _jpegio_bytes_to_file(carrier_image, suffix='.jpg')
output_path = tempfile.mktemp(suffix='.jpg')
# Set flags
flags = FLAG_COLOR_MODE if color_mode == 'color' else 0
try:
# Read JPEG with jpegio
jpeg = jio.read(input_path)
# Get Y channel coefficients (channel 0)
# For grayscale mode, we could convert to grayscale, but jpegio
# works with the original JPEG which already has color info.
# The color_mode primarily affects the output interpretation.
coef_array = jpeg.coef_arrays[JPEGIO_EMBED_CHANNEL]
# Find usable positions
@@ -693,8 +736,8 @@ def _embed_jpegio(
# Generate pseudo-random order
order = _jpegio_generate_order(len(all_positions), seed)
# Create payload
header = _jpegio_create_header(len(data))
# Create payload with flags
header = _jpegio_create_header(len(data), flags)
payload = header + data
# Convert to bits
@@ -764,7 +807,8 @@ def extract_from_dct(
"""
Extract data from DCT stego image.
Automatically detects whether image uses scipy DCT or jpegio embedding.
Automatically detects whether image uses scipy DCT or jpegio embedding,
and handles both grayscale and color modes.
Args:
stego_image: Stego image bytes
@@ -790,9 +834,28 @@ def extract_from_dct(
def _extract_scipy_dct(stego_image: bytes, seed: bytes) -> bytes:
"""Extract using scipy DCT (for PNG images)."""
image = _to_grayscale(stego_image)
padded, original_size = _pad_to_blocks(image)
"""
Extract using scipy DCT (for PNG images).
v3.2.0: Now properly handles both grayscale and color modes by
first trying to detect the mode from header flags, then extracting
from the appropriate channel.
"""
# First, try extracting from grayscale to get header and detect mode
# This works because even color-mode images can be converted to grayscale
# and the Y channel ≈ grayscale for extraction purposes
# Try Y channel extraction first (works for both color and grayscale)
img = Image.open(io.BytesIO(stego_image))
if img.mode in ('RGB', 'RGBA'):
# Extract from Y channel (more accurate for color-mode images)
channel = _extract_y_channel(stego_image)
else:
# Grayscale image
channel = _to_grayscale(stego_image)
padded, original_size = _pad_to_blocks(channel)
h, w = padded.shape
blocks_x = w // BLOCK_SIZE
@@ -816,7 +879,7 @@ def _extract_scipy_dct(stego_image: bytes, seed: bytes) -> bytes:
if len(all_bits) >= HEADER_SIZE * 8:
try:
_, _, data_length = _parse_header(all_bits[:HEADER_SIZE * 8])
_, flags, data_length = _parse_header(all_bits[:HEADER_SIZE * 8])
total_needed = (HEADER_SIZE + data_length) * 8
if len(all_bits) >= total_needed:
break
@@ -825,6 +888,9 @@ def _extract_scipy_dct(stego_image: bytes, seed: bytes) -> bytes:
version, flags, data_length = _parse_header(all_bits)
# Check if color mode flag is set (for informational purposes)
is_color_mode = bool(flags & FLAG_COLOR_MODE)
data_bits = all_bits[HEADER_SIZE * 8:(HEADER_SIZE + data_length) * 8]
data = bytes([

View File

@@ -1,7 +1,12 @@
"""
Stegasoo Key Generation
Stegasoo Key Generation (v3.2.0)
Generate PINs, passphrases, and RSA keys.
Changes in v3.2.0:
- generate_credentials() now returns Credentials with single passphrase
- Removed generate_day_phrases() from main API (kept for legacy compatibility)
- Updated to use PASSPHRASE constants
"""
import secrets
@@ -16,7 +21,7 @@ from cryptography.hazmat.backends import default_backend
from .constants import (
DAY_NAMES,
MIN_PIN_LENGTH, MAX_PIN_LENGTH, DEFAULT_PIN_LENGTH,
MIN_PHRASE_WORDS, MAX_PHRASE_WORDS, DEFAULT_PHRASE_WORDS,
MIN_PASSPHRASE_WORDS, MAX_PASSPHRASE_WORDS, DEFAULT_PASSPHRASE_WORDS,
MIN_RSA_BITS, VALID_RSA_SIZES, DEFAULT_RSA_BITS,
get_wordlist,
)
@@ -57,7 +62,7 @@ def generate_pin(length: int = DEFAULT_PIN_LENGTH) -> str:
return pin
def generate_phrase(words_per_phrase: int = DEFAULT_PHRASE_WORDS) -> str:
def generate_phrase(words_per_phrase: int = DEFAULT_PASSPHRASE_WORDS) -> str:
"""
Generate a random passphrase from BIP-39 wordlist.
@@ -68,13 +73,13 @@ def generate_phrase(words_per_phrase: int = DEFAULT_PHRASE_WORDS) -> str:
Space-separated phrase
Example:
>>> generate_phrase(3)
"apple forest thunder"
>>> generate_phrase(4)
"apple forest thunder mountain"
"""
debug.validate(MIN_PHRASE_WORDS <= words_per_phrase <= MAX_PHRASE_WORDS,
f"Words per phrase must be between {MIN_PHRASE_WORDS} and {MAX_PHRASE_WORDS}")
debug.validate(MIN_PASSPHRASE_WORDS <= words_per_phrase <= MAX_PASSPHRASE_WORDS,
f"Words per phrase must be between {MIN_PASSPHRASE_WORDS} and {MAX_PASSPHRASE_WORDS}")
words_per_phrase = max(MIN_PHRASE_WORDS, min(MAX_PHRASE_WORDS, words_per_phrase))
words_per_phrase = max(MIN_PASSPHRASE_WORDS, min(MAX_PASSPHRASE_WORDS, words_per_phrase))
wordlist = get_wordlist()
words = [secrets.choice(wordlist) for _ in range(words_per_phrase)]
@@ -83,10 +88,17 @@ def generate_phrase(words_per_phrase: int = DEFAULT_PHRASE_WORDS) -> str:
return phrase
def generate_day_phrases(words_per_phrase: int = DEFAULT_PHRASE_WORDS) -> Dict[str, str]:
# Alias for backward compatibility and public API consistency
generate_passphrase = generate_phrase
def generate_day_phrases(words_per_phrase: int = DEFAULT_PASSPHRASE_WORDS) -> Dict[str, str]:
"""
Generate phrases for all days of the week.
DEPRECATED in v3.2.0: Use generate_phrase() for single passphrase.
Kept for legacy compatibility and organizational use cases.
Args:
words_per_phrase: Number of words per phrase (3-12)
@@ -97,6 +109,14 @@ def generate_day_phrases(words_per_phrase: int = DEFAULT_PHRASE_WORDS) -> Dict[s
>>> generate_day_phrases(3)
{'Monday': 'apple forest thunder', 'Tuesday': 'banana river lightning', ...}
"""
import warnings
warnings.warn(
"generate_day_phrases() is deprecated in v3.2.0. "
"Use generate_phrase() for single passphrase.",
DeprecationWarning,
stacklevel=2
)
phrases = {day: generate_phrase(words_per_phrase) for day in DAY_NAMES}
debug.print(f"Generated phrases for {len(phrases)} days")
return phrases
@@ -272,13 +292,89 @@ def generate_credentials(
use_rsa: bool = False,
pin_length: int = DEFAULT_PIN_LENGTH,
rsa_bits: int = DEFAULT_RSA_BITS,
words_per_phrase: int = DEFAULT_PHRASE_WORDS
passphrase_words: int = DEFAULT_PASSPHRASE_WORDS,
rsa_password: Optional[str] = None,
) -> Credentials:
"""
Generate a complete set of credentials.
v3.2.0: Now generates a single passphrase instead of daily phrases.
At least one of use_pin or use_rsa must be True.
Args:
use_pin: Whether to generate a PIN
use_rsa: Whether to generate an RSA key
pin_length: PIN length if generating (default 6)
rsa_bits: RSA key size if generating (default 2048)
passphrase_words: Words in passphrase (default 4)
rsa_password: Optional password for RSA key encryption
Returns:
Credentials object with passphrase, PIN, and/or RSA key
Raises:
ValueError: If neither PIN nor RSA is selected
Example:
>>> creds = generate_credentials(use_pin=True, use_rsa=False)
>>> creds.passphrase
"apple forest thunder mountain"
>>> creds.pin
"812345"
"""
debug.validate(use_pin or use_rsa,
"Must select at least one security factor (PIN or RSA key)")
if not use_pin and not use_rsa:
raise ValueError("Must select at least one security factor (PIN or RSA key)")
debug.print(f"Generating credentials: PIN={use_pin}, RSA={use_rsa}, "
f"passphrase_words={passphrase_words}")
# Generate single passphrase (v3.2.0 - no daily rotation)
passphrase = generate_phrase(passphrase_words)
# Generate PIN if requested
pin = generate_pin(pin_length) if use_pin else None
# Generate RSA key if requested
rsa_key_pem = None
if use_rsa:
rsa_key_obj = generate_rsa_key(rsa_bits)
rsa_key_pem = export_rsa_key_pem(rsa_key_obj, rsa_password).decode('utf-8')
# Create Credentials object (v3.2.0 format with single passphrase)
creds = Credentials(
passphrase=passphrase,
pin=pin,
rsa_key_pem=rsa_key_pem,
rsa_bits=rsa_bits if use_rsa else None,
words_per_passphrase=passphrase_words,
)
debug.print(f"Credentials generated: {creds.total_entropy} bits total entropy")
return creds
# =============================================================================
# LEGACY COMPATIBILITY
# =============================================================================
def generate_credentials_legacy(
use_pin: bool = True,
use_rsa: bool = False,
pin_length: int = DEFAULT_PIN_LENGTH,
rsa_bits: int = DEFAULT_RSA_BITS,
words_per_phrase: int = DEFAULT_PASSPHRASE_WORDS
) -> dict:
"""
Generate credentials in legacy format (v3.1.0 style with daily phrases).
DEPRECATED: Use generate_credentials() for v3.2.0 format.
This function exists only for migration tools that need to work with
old-format credentials.
Args:
use_pin: Whether to generate a PIN
use_rsa: Whether to generate an RSA key
@@ -287,44 +383,33 @@ def generate_credentials(
words_per_phrase: Words per daily phrase
Returns:
Credentials object
Raises:
ValueError: If neither PIN nor RSA is selected
Example:
>>> creds = generate_credentials(use_pin=True, use_rsa=False)
>>> creds.pin
"812345"
>>> creds.phrases['Monday']
"apple forest thunder"
Dict with 'phrases' (dict), 'pin', 'rsa_key_pem', etc.
"""
debug.validate(use_pin or use_rsa,
"Must select at least one security factor (PIN or RSA key)")
import warnings
warnings.warn(
"generate_credentials_legacy() returns v3.1.0 format. "
"Use generate_credentials() for v3.2.0 format.",
DeprecationWarning,
stacklevel=2
)
if not use_pin and not use_rsa:
raise ValueError("Must select at least one security factor (PIN or RSA key)")
debug.print(f"Generating credentials: PIN={use_pin}, RSA={use_rsa}, "
f"words={words_per_phrase}")
phrases = generate_day_phrases(words_per_phrase)
# Generate daily phrases (old format)
phrases = {day: generate_phrase(words_per_phrase) for day in DAY_NAMES}
pin = generate_pin(pin_length) if use_pin else None
rsa_key_pem = None
rsa_key_obj = None
if use_rsa:
rsa_key_obj = generate_rsa_key(rsa_bits)
rsa_key_pem = export_rsa_key_pem(rsa_key_obj).decode('utf-8')
creds = Credentials(
phrases=phrases,
pin=pin,
rsa_key_pem=rsa_key_pem,
rsa_bits=rsa_bits if use_rsa else None,
words_per_phrase=words_per_phrase
)
debug.print(f"Credentials generated: {creds.total_entropy} bits total entropy")
return creds
return {
'phrases': phrases,
'pin': pin,
'rsa_key_pem': rsa_key_pem,
'rsa_bits': rsa_bits if use_rsa else None,
'words_per_phrase': words_per_phrase,
}

View File

@@ -1,17 +1,21 @@
"""
Stegasoo Steganography Functions (v3.0.1)
Stegasoo Steganography Functions (v3.2.0)
LSB and DCT embedding modes with pseudo-random pixel/coefficient selection.
New in v3.0:
Changes in v3.0:
- DCT domain embedding mode (requires scipy)
- embed_mode parameter for encode/decode
- Auto-detection of embedding mode
- Comparison utilities
New in v3.0.1:
Changes in v3.0.1:
- dct_output_format parameter for DCT mode ('png' or 'jpeg')
- dct_color_mode parameter for DCT mode ('grayscale' or 'color')
Changes in v3.2.0:
- Fixed HEADER_OVERHEAD constant (65 bytes, not 104 - date field removed)
- Updated ENCRYPTION_OVERHEAD calculation
"""
import io
@@ -51,10 +55,24 @@ EXT_TO_FORMAT = {
'tif': 'TIFF',
}
# Overhead constants for capacity estimation
HEADER_OVERHEAD = 104 # Magic + version + date + salt + iv + tag
LENGTH_PREFIX = 4 # 4 bytes for payload length
ENCRYPTION_OVERHEAD = HEADER_OVERHEAD + LENGTH_PREFIX
# =============================================================================
# OVERHEAD CONSTANTS (v3.2.0 - Updated for date-independent format)
# =============================================================================
# v3.2.0 Header format (no date field):
# Magic: 4 bytes (\x89ST3)
# Version: 1 byte (4 for v3.2.0)
# Salt: 32 bytes
# IV: 12 bytes
# Tag: 16 bytes
# -----------------
# Total: 65 bytes
#
# Previous v3.1.0 had date field (10 bytes + 1 byte length) = 76 bytes header
# The old value of 104 was incorrect even for v3.1.0
HEADER_OVERHEAD = 65 # v3.2.0: Magic + version + salt + iv + tag
LENGTH_PREFIX = 4 # 4 bytes for payload length in LSB embedding
ENCRYPTION_OVERHEAD = HEADER_OVERHEAD + LENGTH_PREFIX # 69 bytes total
# DCT output format options (v3.0.1)
DCT_OUTPUT_PNG = 'png'
@@ -167,6 +185,9 @@ def will_fit(
capacity = calculate_capacity(carrier_image, bits_per_channel)
# Estimate encrypted size with padding
# Padding adds 64-319 bytes, rounded up to 256-byte boundary
# Average case: ~190 bytes padding
estimated_padding = 190
estimated_encrypted_size = payload_size + estimated_padding + ENCRYPTION_OVERHEAD
@@ -175,7 +196,7 @@ def will_fit(
try:
import zlib
compressed = zlib.compress(payload_data, level=6)
compressed_size = len(compressed) + 9
compressed_size = len(compressed) + 9 # Compression header
if compressed_size < payload_size:
compressed_estimate = compressed_size
estimated_encrypted_size = compressed_size + estimated_padding + ENCRYPTION_OVERHEAD
@@ -301,7 +322,7 @@ def will_fit_by_mode(
else:
payload_size = len(payload)
estimated_size = payload_size + ENCRYPTION_OVERHEAD + 190
estimated_size = payload_size + ENCRYPTION_OVERHEAD + 190 # padding estimate
dct_mod = _get_dct_module()
fits = dct_mod.will_fit_dct(estimated_size, carrier_image)
@@ -481,8 +502,8 @@ def embed_in_image(
bits_per_channel: int = 1,
output_format: Optional[str] = None,
embed_mode: str = EMBED_MODE_LSB,
dct_output_format: str = DCT_OUTPUT_PNG, # NEW in v3.0.1
dct_color_mode: str = 'grayscale', # NEW in v3.0.1: 'grayscale' or 'color'
dct_output_format: str = DCT_OUTPUT_PNG,
dct_color_mode: str = 'grayscale',
) -> Tuple[bytes, Union[EmbedStats, 'DCTEmbedStats'], str]:
"""
Embed data into an image using specified mode.
@@ -535,7 +556,7 @@ def embed_in_image(
image_data,
pixel_key,
output_format=dct_output_format,
color_mode=dct_color_mode, # NEW in v3.0.1
color_mode=dct_color_mode,
)
# Determine extension based on output format