More snazzy 4.0 Web UI improvements.

This commit is contained in:
Aaron D. Lee
2026-01-02 15:45:43 -05:00
parent 1bb3589baf
commit 6fa4b447db
26 changed files with 4282 additions and 2282 deletions

View File

@@ -1,8 +1,13 @@
"""
Stegasoo - Secure Steganography with Multi-Factor Authentication (v3.2.0)
Stegasoo - Secure Steganography with Multi-Factor Authentication (v4.0.0)
Changes in v4.0.0:
- Added channel key support for deployment/group isolation
- New functions: get_channel_key, get_channel_fingerprint, generate_channel_key, etc.
- encode() and decode() now accept channel_key parameter
"""
__version__ = "3.2.0"
__version__ = "4.0.0"
# Core functionality
from .encode import encode
@@ -28,7 +33,19 @@ from .image_utils import (
from .utils import generate_filename
# Crypto functions
from .crypto import has_argon2
from .crypto import has_argon2, get_active_channel_key, get_channel_fingerprint
# Channel key management (v4.0.0)
from .channel import (
generate_channel_key,
get_channel_key,
set_channel_key,
clear_channel_key,
has_channel_key,
get_channel_status,
validate_channel_key,
format_channel_key,
)
# Steganography functions
from .steganography import (
@@ -150,6 +167,18 @@ __all__ = [
"export_rsa_key_pem",
"load_rsa_key",
# Channel key management (v4.0.0)
"generate_channel_key",
"get_channel_key",
"set_channel_key",
"clear_channel_key",
"has_channel_key",
"get_channel_status",
"validate_channel_key",
"format_channel_key",
"get_active_channel_key",
"get_channel_fingerprint",
# Image utilities
"get_image_info",
"compare_capacity",
@@ -183,6 +212,7 @@ __all__ = [
"validate_embed_mode",
"validate_dct_output_format",
"validate_dct_color_mode",
"validate_channel_key",
# Models
"ImageInfo",

View File

@@ -1,5 +1,5 @@
"""
Channel Key Management for Stegasoo (v3.2.0)
Channel Key Management for Stegasoo (v4.0.0)
A channel key ties encode/decode operations to a specific deployment or group.
Messages encoded with one channel key can only be decoded by systems with the
@@ -16,15 +16,12 @@ Storage priority:
2. Config file: ~/.stegasoo/channel.key or ./config/channel.key
3. None (public mode - compatible with any instance without a channel key)
STATUS: This module is IMPLEMENTED but NOT YET INTEGRATED into crypto.py.
The get_channel_key_hash() function should be mixed into key derivation
in a future release.
TODO (v3.3.0):
- Integrate get_channel_key_hash() into derive_hybrid_key() in crypto.py
- Add --channel-key option to CLI
- Add channel key display to web UI
- Document channel key feature in README
INTEGRATION STATUS (v4.0.0):
- ✅ get_channel_key_hash() integrated into derive_hybrid_key() in crypto.py
- ✅ get_channel_key_hash() integrated into derive_pixel_key() in crypto.py
- ✅ channel_key parameter added to encode() and decode() functions
- ✅ Header flags indicate whether message was encoded with channel key
- ✅ Helpful error messages for channel key mismatches
"""
import os
@@ -257,12 +254,9 @@ def get_channel_key_hash(key: Optional[str] = None) -> Optional[bytes]:
"""
Get the channel key as a 32-byte hash suitable for key derivation.
This hash is designed to be mixed into the Argon2 key derivation to bind
This hash is mixed into the Argon2 key derivation to bind
encryption to a specific channel.
NOTE: This function is implemented but not yet integrated into crypto.py.
See TODO at top of file for integration plan.
Args:
key: Channel key (if None, reads from config)

View File

@@ -1,14 +1,17 @@
"""
Stegasoo Constants and Configuration (v3.2.0 - Date Independent)
Stegasoo Constants and Configuration (v4.0.0 - Channel Key Support)
Central location for all magic numbers, limits, and crypto parameters.
All version numbers, limits, and configuration values should be defined here.
BREAKING CHANGES in v4.0.0:
- Added channel key support for deployment/group isolation
- FORMAT_VERSION bumped to 5 (adds flags byte to header)
- Header size increased by 1 byte for flags
BREAKING CHANGES in v3.2.0:
- Removed date dependency from cryptographic operations
- Renamed day_phrase → passphrase throughout codebase
- FORMAT_VERSION bumped to 4 to indicate incompatibility
- Increased default passphrase length to compensate for removed date entropy
"""
import os
@@ -28,8 +31,9 @@ MAGIC_HEADER = b'\x89ST3'
# FORMAT VERSION HISTORY:
# Version 1-3: Date-dependent encryption (v3.0.x - v3.1.x)
# Version 4: Date-independent encryption (v3.2.0+) - BREAKING CHANGE
FORMAT_VERSION = 4
# Version 4: Date-independent encryption (v3.2.0)
# Version 5: Channel key support (v4.0.0) - adds flags byte to header
FORMAT_VERSION = 5
# Payload type markers
PAYLOAD_TEXT = 0x01

View File

@@ -1,15 +1,18 @@
"""
Stegasoo Cryptographic Functions (v3.2.0 - Date Independent)
Stegasoo Cryptographic Functions (v4.0.0 - Channel Key Support)
Key derivation, encryption, and decryption using AES-256-GCM.
Supports both text messages and binary file payloads.
BREAKING CHANGES in v4.0.0:
- Added channel key support for deployment/group isolation
- Messages encoded with a channel key require the same key to decode
- Channel key can be configured via environment, config file, or explicit parameter
- FORMAT_VERSION bumped to 5
BREAKING CHANGES in v3.2.0:
- Removed date dependency from key derivation
- Renamed day_phrase → passphrase (no daily rotation needed)
- Messages can now be decoded without knowing encoding date
- Enables true asynchronous communication
- NOT backward compatible with v3.1.0 and earlier
"""
import io
@@ -46,6 +49,51 @@ except ImportError:
from cryptography.hazmat.primitives import hashes
# =============================================================================
# CHANNEL KEY RESOLUTION
# =============================================================================
# Sentinel value for "use auto-detected channel key"
CHANNEL_KEY_AUTO = "auto"
def _resolve_channel_key(channel_key: Optional[Union[str, bool]]) -> Optional[bytes]:
"""
Resolve channel key parameter to actual key hash.
Args:
channel_key: Channel key parameter with these behaviors:
- None or "auto": Use server's configured key (from env/config)
- str (valid key): Use this specific key
- "" or False: Explicitly use NO channel key (public mode)
Returns:
32-byte channel key hash, or None for public mode
"""
# Explicit public mode
if channel_key == "" or channel_key is False:
return None
# Auto-detect from environment/config
if channel_key is None or channel_key == CHANNEL_KEY_AUTO:
from .channel import get_channel_key_hash
return get_channel_key_hash()
# Explicit key provided - validate and hash it
if isinstance(channel_key, str):
from .channel import format_channel_key, validate_channel_key
if not validate_channel_key(channel_key):
raise ValueError(f"Invalid channel key format: {channel_key}")
formatted = format_channel_key(channel_key)
return hashlib.sha256(formatted.encode('utf-8')).digest()
raise ValueError(f"Invalid channel_key type: {type(channel_key)}")
# =============================================================================
# CORE CRYPTO FUNCTIONS
# =============================================================================
def hash_photo(image_data: bytes) -> bytes:
"""
Compute deterministic hash of photo pixel content.
@@ -73,7 +121,8 @@ def derive_hybrid_key(
passphrase: str,
salt: bytes,
pin: str = "",
rsa_key_data: Optional[bytes] = None
rsa_key_data: Optional[bytes] = None,
channel_key: Optional[Union[str, bool]] = None,
) -> bytes:
"""
Derive encryption key from multiple factors.
@@ -83,19 +132,21 @@ def derive_hybrid_key(
- Passphrase (something you know)
- PIN (something you know, static)
- RSA key (something you have)
- Channel key (deployment/group binding)
- Salt (random per message)
Uses Argon2id if available, falls back to PBKDF2.
NOTE: v3.2.0 removed date dependency and daily rotation.
Use a strong static passphrase instead (recommend 4+ words).
Args:
photo_data: Reference photo bytes
passphrase: Shared passphrase (recommend 4+ words)
salt: Random salt for this message
pin: Optional static PIN
rsa_key_data: Optional RSA key bytes
channel_key: Channel key parameter:
- None or "auto": Use configured key
- str: Use this specific key
- "" or False: No channel key (public mode)
Returns:
32-byte derived key
@@ -106,6 +157,10 @@ def derive_hybrid_key(
try:
photo_hash = hash_photo(photo_data)
# Resolve channel key
channel_hash = _resolve_channel_key(channel_key)
# Build key material
key_material = (
photo_hash +
passphrase.lower().encode() +
@@ -117,6 +172,10 @@ def derive_hybrid_key(
if rsa_key_data:
key_material += hashlib.sha256(rsa_key_data).digest()
# Add channel key hash if configured (v4.0.0)
if channel_hash:
key_material += channel_hash
if HAS_ARGON2:
key = hash_secret_raw(
secret=key_material,
@@ -147,7 +206,8 @@ def derive_pixel_key(
photo_data: bytes,
passphrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None
rsa_key_data: Optional[bytes] = None,
channel_key: Optional[Union[str, bool]] = None,
) -> bytes:
"""
Derive key for pseudo-random pixel selection.
@@ -155,19 +215,21 @@ def derive_pixel_key(
This key determines which pixels are used for embedding,
making the message location unpredictable without the correct inputs.
NOTE: v3.2.0 removed date dependency.
Args:
photo_data: Reference photo bytes
passphrase: Shared passphrase
pin: Optional static PIN
rsa_key_data: Optional RSA key bytes
channel_key: Channel key parameter (see derive_hybrid_key)
Returns:
32-byte key for pixel selection
"""
photo_hash = hash_photo(photo_data)
# Resolve channel key
channel_hash = _resolve_channel_key(channel_key)
material = (
photo_hash +
passphrase.lower().encode() +
@@ -177,6 +239,10 @@ def derive_pixel_key(
if rsa_key_data:
material += hashlib.sha256(rsa_key_data).digest()
# Add channel key hash if configured (v4.0.0)
if channel_hash:
material += channel_hash
return hashlib.sha256(material + b"pixel_selection").digest()
@@ -284,19 +350,29 @@ def _unpack_payload(data: bytes) -> DecodeResult:
return DecodeResult(payload_type='file', file_data=data)
# =============================================================================
# HEADER FLAGS (v4.0.0)
# =============================================================================
# Header flag bits
FLAG_CHANNEL_KEY = 0x01 # Set if encoded with a channel key
def encrypt_message(
message: Union[str, bytes, FilePayload],
photo_data: bytes,
passphrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None
rsa_key_data: Optional[bytes] = None,
channel_key: Optional[Union[str, bool]] = None,
) -> bytes:
"""
Encrypt message or file using AES-256-GCM with hybrid key derivation.
Message format (v3.2.0 - no date):
Message format (v4.0.0 - with channel key support):
- Magic header (4 bytes)
- Version (1 byte) = 4
- Version (1 byte) = 5
- Flags (1 byte) - indicates if channel key was used
- Salt (32 bytes)
- IV (12 bytes)
- Auth tag (16 bytes)
@@ -308,6 +384,10 @@ def encrypt_message(
passphrase: Shared passphrase (recommend 4+ words for good entropy)
pin: Optional static PIN
rsa_key_data: Optional RSA key bytes
channel_key: Channel key parameter:
- None or "auto": Use configured key
- str: Use this specific key
- "" or False: No channel key (public mode)
Returns:
Encrypted message bytes
@@ -317,9 +397,15 @@ def encrypt_message(
"""
try:
salt = secrets.token_bytes(SALT_SIZE)
key = derive_hybrid_key(photo_data, passphrase, salt, pin, rsa_key_data)
key = derive_hybrid_key(photo_data, passphrase, salt, pin, rsa_key_data, channel_key)
iv = secrets.token_bytes(IV_SIZE)
# Determine flags
flags = 0
channel_hash = _resolve_channel_key(channel_key)
if channel_hash:
flags |= FLAG_CHANNEL_KEY
# Pack payload with type marker
packed_payload, _ = _pack_payload(message)
@@ -330,16 +416,18 @@ def encrypt_message(
padding = secrets.token_bytes(padding_needed - 4) + struct.pack('>I', len(packed_payload))
padded_message = packed_payload + padding
# Build header for AAD
header = MAGIC_HEADER + bytes([FORMAT_VERSION, flags])
# Encrypt with AES-256-GCM
cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=default_backend())
encryptor = cipher.encryptor()
encryptor.authenticate_additional_data(MAGIC_HEADER + bytes([FORMAT_VERSION]))
encryptor.authenticate_additional_data(header)
ciphertext = encryptor.update(padded_message) + encryptor.finalize()
# v3.2.0: Simplified header without date
# v4.0.0: Header with flags byte
return (
MAGIC_HEADER +
bytes([FORMAT_VERSION]) +
header +
salt +
iv +
encryptor.tag +
@@ -354,16 +442,16 @@ def parse_header(encrypted_data: bytes) -> Optional[dict]:
"""
Parse the header from encrypted data.
v3.2.0: No date field in header.
v4.0.0: Includes flags byte for channel key indicator.
Args:
encrypted_data: Raw encrypted bytes
Returns:
Dict with salt, iv, tag, ciphertext or None if invalid
Dict with salt, iv, tag, ciphertext, flags or None if invalid
"""
# Min size: Magic(4) + Version(1) + Salt(32) + IV(12) + Tag(16) = 65 bytes
if len(encrypted_data) < 65 or encrypted_data[:4] != MAGIC_HEADER:
# Min size: Magic(4) + Version(1) + Flags(1) + Salt(32) + IV(12) + Tag(16) = 66 bytes
if len(encrypted_data) < 66 or encrypted_data[:4] != MAGIC_HEADER:
return None
try:
@@ -371,7 +459,9 @@ def parse_header(encrypted_data: bytes) -> Optional[dict]:
if version != FORMAT_VERSION:
return None
offset = 5
flags = encrypted_data[5]
offset = 6
salt = encrypted_data[offset:offset + SALT_SIZE]
offset += SALT_SIZE
iv = encrypted_data[offset:offset + IV_SIZE]
@@ -381,6 +471,9 @@ def parse_header(encrypted_data: bytes) -> Optional[dict]:
ciphertext = encrypted_data[offset:]
return {
'version': version,
'flags': flags,
'has_channel_key': bool(flags & FLAG_CHANNEL_KEY),
'salt': salt,
'iv': iv,
'tag': tag,
@@ -395,10 +488,11 @@ def decrypt_message(
photo_data: bytes,
passphrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None
rsa_key_data: Optional[bytes] = None,
channel_key: Optional[Union[str, bool]] = None,
) -> DecodeResult:
"""
Decrypt message (v3.2.0 - no date needed).
Decrypt message (v4.0.0 - with channel key support).
Args:
encrypted_data: Encrypted message bytes
@@ -406,6 +500,7 @@ def decrypt_message(
passphrase: Shared passphrase
pin: Optional static PIN
rsa_key_data: Optional RSA key bytes
channel_key: Channel key parameter (see encrypt_message)
Returns:
DecodeResult with decrypted content
@@ -418,18 +513,26 @@ def decrypt_message(
if not header:
raise InvalidHeaderError("Invalid or missing Stegasoo header")
# Check for channel key mismatch and provide helpful error
channel_hash = _resolve_channel_key(channel_key)
has_configured_key = channel_hash is not None
message_has_key = header['has_channel_key']
try:
key = derive_hybrid_key(
photo_data, passphrase, header['salt'], pin, rsa_key_data
photo_data, passphrase, header['salt'], pin, rsa_key_data, channel_key
)
# Reconstruct header for AAD verification
aad_header = MAGIC_HEADER + bytes([FORMAT_VERSION, header['flags']])
cipher = Cipher(
algorithms.AES(key),
modes.GCM(header['iv'], header['tag']),
backend=default_backend()
)
decryptor = cipher.decryptor()
decryptor.authenticate_additional_data(MAGIC_HEADER + bytes([FORMAT_VERSION]))
decryptor.authenticate_additional_data(aad_header)
padded_plaintext = decryptor.update(header['ciphertext']) + decryptor.finalize()
original_length = struct.unpack('>I', padded_plaintext[-4:])[0]
@@ -437,14 +540,25 @@ def decrypt_message(
payload_data = padded_plaintext[:original_length]
result = _unpack_payload(payload_data)
# Note: No date_encoded field in v3.2.0
return result
except Exception as e:
raise DecryptionError(
"Decryption failed. Check your passphrase, PIN, RSA key, and reference photo."
) from e
# Provide more helpful error message for channel key issues
if message_has_key and not has_configured_key:
raise DecryptionError(
"Decryption failed. This message was encoded with a channel key, "
"but no channel key is configured. Provide the correct channel key."
) from e
elif not message_has_key and has_configured_key:
raise DecryptionError(
"Decryption failed. This message was encoded without a channel key, "
"but you have one configured. Try with channel_key='' for public mode."
) from e
else:
raise DecryptionError(
"Decryption failed. Check your passphrase, PIN, RSA key, "
"reference photo, and channel key."
) from e
def decrypt_message_text(
@@ -452,7 +566,8 @@ def decrypt_message_text(
photo_data: bytes,
passphrase: str,
pin: str = "",
rsa_key_data: Optional[bytes] = None
rsa_key_data: Optional[bytes] = None,
channel_key: Optional[Union[str, bool]] = None,
) -> str:
"""
Decrypt message and return as text string.
@@ -465,6 +580,7 @@ def decrypt_message_text(
passphrase: Shared passphrase
pin: Optional static PIN
rsa_key_data: Optional RSA key bytes
channel_key: Channel key parameter
Returns:
Decrypted message string
@@ -472,7 +588,7 @@ def decrypt_message_text(
Raises:
DecryptionError: If decryption fails or content is a file
"""
result = decrypt_message(encrypted_data, photo_data, passphrase, pin, rsa_key_data)
result = decrypt_message(encrypted_data, photo_data, passphrase, pin, rsa_key_data, channel_key)
if result.is_file:
if result.file_data:
@@ -491,3 +607,29 @@ def decrypt_message_text(
def has_argon2() -> bool:
"""Check if Argon2 is available."""
return HAS_ARGON2
# =============================================================================
# CHANNEL KEY UTILITIES (exposed for convenience)
# =============================================================================
def get_active_channel_key() -> Optional[str]:
"""
Get the currently configured channel key (if any).
Returns:
Formatted channel key string, or None if not configured
"""
from .channel import get_channel_key
return get_channel_key()
def get_channel_fingerprint() -> Optional[str]:
"""
Get a display-safe fingerprint of the configured channel key.
Returns:
Masked key like "ABCD-••••-••••-••••-••••-••••-••••-3456" or None
"""
from .channel import get_channel_fingerprint as _get_fingerprint
return _get_fingerprint()

View File

@@ -98,8 +98,8 @@ def memory_usage() -> Dict[str, Union[float, str]]:
mem_info = process.memory_info()
return {
'rss_mb': mem_info.rss / 1024 / 1024, # Resident Set Size
'vms_mb': mem_info.vms / 1024 / 1024, # Virtual Memory Size
'rss_mb': mem_info.rss / 1024 / 1024,
'vms_mb': mem_info.vms / 1024 / 1024,
'percent': process.memory_percent(),
}
except ImportError:
@@ -117,7 +117,7 @@ def hexdump(data: bytes, offset: int = 0, length: int = 64) -> str:
for i in range(0, len(data_to_dump), 16):
chunk = data_to_dump[i:i+16]
hex_str = ' '.join(f'{b:02x}' for b in chunk)
hex_str = hex_str.ljust(47) # Pad to consistent width
hex_str = hex_str.ljust(47)
ascii_str = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk)
result.append(f"{offset + i:08x}: {hex_str} {ascii_str}")
@@ -127,7 +127,6 @@ def hexdump(data: bytes, offset: int = 0, length: int = 64) -> str:
return '\n'.join(result)
# Create singleton instance for easy import
class Debug:
"""Debugging utility class."""

View File

@@ -1,10 +1,14 @@
"""
Stegasoo Decode Module (v3.2.0)
Stegasoo Decode Module (v4.0.0)
High-level decoding functions for extracting messages and files from images.
Changes in v4.0.0:
- Added channel_key parameter for deployment/group isolation
- Improved error messages for channel key mismatches
"""
from typing import Optional
from typing import Optional, Union
from pathlib import Path
from .models import DecodeInput, DecodeResult
@@ -29,6 +33,7 @@ def decode(
rsa_key_data: Optional[bytes] = None,
rsa_password: Optional[str] = None,
embed_mode: str = EMBED_MODE_AUTO,
channel_key: Optional[Union[str, bool]] = None,
) -> DecodeResult:
"""
Decode a message or file from a stego image.
@@ -41,6 +46,10 @@ def decode(
rsa_key_data: Optional RSA key bytes (if used during encoding)
rsa_password: Optional RSA key password
embed_mode: 'auto' (default), 'lsb', or 'dct'
channel_key: Channel key for deployment/group isolation:
- None or "auto": Use server's configured key
- str: Use this specific channel key
- "" or False: No channel key (public mode)
Returns:
DecodeResult with message or file data
@@ -57,9 +66,19 @@ def decode(
... else:
... with open(result.filename, 'wb') as f:
... f.write(result.file_data)
Example with explicit channel key:
>>> result = decode(
... stego_image=stego_bytes,
... reference_photo=ref_bytes,
... passphrase="apple forest thunder mountain",
... pin="123456",
... channel_key="ABCD-1234-EFGH-5678-IJKL-9012-MNOP-3456"
... )
"""
debug.print(f"decode: passphrase length={len(passphrase.split())} words, "
f"mode={embed_mode}")
f"mode={embed_mode}, "
f"channel_key={'explicit' if isinstance(channel_key, str) and channel_key else 'auto' if channel_key is None else 'none'}")
# Validate inputs
require_valid_image(stego_image, "Stego image")
@@ -71,10 +90,10 @@ def decode(
if rsa_key_data:
require_valid_rsa_key(rsa_key_data, rsa_password)
# Derive pixel/coefficient selection key
# Derive pixel/coefficient selection key (with channel key)
from .crypto import derive_pixel_key
pixel_key = derive_pixel_key(
reference_photo, passphrase, pin, rsa_key_data
reference_photo, passphrase, pin, rsa_key_data, channel_key
)
# Extract encrypted data
@@ -90,9 +109,9 @@ def decode(
debug.print(f"Extracted {len(encrypted)} bytes from image")
# Decrypt
# Decrypt (with channel key)
result = decrypt_message(
encrypted, reference_photo, passphrase, pin, rsa_key_data
encrypted, reference_photo, passphrase, pin, rsa_key_data, channel_key
)
debug.print(f"Decryption successful: {result.payload_type}")
@@ -108,6 +127,7 @@ def decode_file(
rsa_key_data: Optional[bytes] = None,
rsa_password: Optional[str] = None,
embed_mode: str = EMBED_MODE_AUTO,
channel_key: Optional[Union[str, bool]] = None,
) -> Path:
"""
Decode a file from a stego image and save it.
@@ -121,6 +141,7 @@ def decode_file(
rsa_key_data: Optional RSA key bytes
rsa_password: Optional RSA key password
embed_mode: 'auto', 'lsb', or 'dct'
channel_key: Channel key parameter (see decode())
Returns:
Path where file was saved
@@ -136,6 +157,7 @@ def decode_file(
rsa_key_data,
rsa_password,
embed_mode,
channel_key,
)
if not result.is_file:
@@ -163,6 +185,7 @@ def decode_text(
rsa_key_data: Optional[bytes] = None,
rsa_password: Optional[str] = None,
embed_mode: str = EMBED_MODE_AUTO,
channel_key: Optional[Union[str, bool]] = None,
) -> str:
"""
Decode a text message from a stego image.
@@ -177,6 +200,7 @@ def decode_text(
rsa_key_data: Optional RSA key bytes
rsa_password: Optional RSA key password
embed_mode: 'auto', 'lsb', or 'dct'
channel_key: Channel key parameter (see decode())
Returns:
Decoded message string
@@ -192,6 +216,7 @@ def decode_text(
rsa_key_data,
rsa_password,
embed_mode,
channel_key,
)
if result.is_file:

View File

@@ -1,7 +1,10 @@
"""
Stegasoo Encode Module (v3.2.0)
Stegasoo Encode Module (v4.0.0)
High-level encoding functions for hiding messages and files in images.
Changes in v4.0.0:
- Added channel_key parameter for deployment/group isolation
"""
from typing import Optional, Union
@@ -34,6 +37,7 @@ def encode(
embed_mode: str = EMBED_MODE_LSB,
dct_output_format: str = "png",
dct_color_mode: str = "grayscale",
channel_key: Optional[Union[str, bool]] = None,
) -> EncodeResult:
"""
Encode a message or file into an image.
@@ -50,6 +54,10 @@ def encode(
embed_mode: 'lsb' (default) or 'dct'
dct_output_format: For DCT mode - 'png' or 'jpeg'
dct_color_mode: For DCT mode - 'grayscale' or 'color'
channel_key: Channel key for deployment/group isolation:
- None or "auto": Use server's configured key
- str: Use this specific channel key
- "" or False: No channel key (public mode)
Returns:
EncodeResult with stego image and metadata
@@ -64,9 +72,20 @@ def encode(
... )
>>> with open('stego.png', 'wb') as f:
... f.write(result.stego_image)
Example with explicit channel key:
>>> result = encode(
... message="Secret message",
... reference_photo=ref_bytes,
... carrier_image=carrier_bytes,
... passphrase="apple forest thunder mountain",
... pin="123456",
... channel_key="ABCD-1234-EFGH-5678-IJKL-9012-MNOP-3456"
... )
"""
debug.print(f"encode: passphrase length={len(passphrase.split())} words, "
f"pin={'set' if pin else 'none'}, mode={embed_mode}")
f"pin={'set' if pin else 'none'}, mode={embed_mode}, "
f"channel_key={'explicit' if isinstance(channel_key, str) and channel_key else 'auto' if channel_key is None else 'none'}")
# Validate inputs
require_valid_payload(message)
@@ -79,16 +98,16 @@ def encode(
if rsa_key_data:
require_valid_rsa_key(rsa_key_data, rsa_password)
# Encrypt message
# Encrypt message (with channel key)
encrypted = encrypt_message(
message, reference_photo, passphrase, pin, rsa_key_data
message, reference_photo, passphrase, pin, rsa_key_data, channel_key
)
debug.print(f"Encrypted payload: {len(encrypted)} bytes")
# Derive pixel/coefficient selection key
# Derive pixel/coefficient selection key (with channel key)
pixel_key = derive_pixel_key(
reference_photo, passphrase, pin, rsa_key_data
reference_photo, passphrase, pin, rsa_key_data, channel_key
)
# Embed in image
@@ -114,7 +133,7 @@ def encode(
pixels_modified=stats.pixels_modified,
total_pixels=stats.total_pixels,
capacity_used=stats.capacity_used,
date_used=None, # No longer used in v3.2.0
date_used=None, # No longer used in v3.2.0+
)
else:
# DCT mode stats
@@ -141,6 +160,7 @@ def encode_file(
embed_mode: str = EMBED_MODE_LSB,
dct_output_format: str = "png",
dct_color_mode: str = "grayscale",
channel_key: Optional[Union[str, bool]] = None,
) -> EncodeResult:
"""
Encode a file into an image.
@@ -160,6 +180,7 @@ def encode_file(
embed_mode: 'lsb' or 'dct'
dct_output_format: 'png' or 'jpeg'
dct_color_mode: 'grayscale' or 'color'
channel_key: Channel key parameter (see encode())
Returns:
EncodeResult
@@ -178,6 +199,7 @@ def encode_file(
embed_mode=embed_mode,
dct_output_format=dct_output_format,
dct_color_mode=dct_color_mode,
channel_key=channel_key,
)
@@ -195,6 +217,7 @@ def encode_bytes(
embed_mode: str = EMBED_MODE_LSB,
dct_output_format: str = "png",
dct_color_mode: str = "grayscale",
channel_key: Optional[Union[str, bool]] = None,
) -> EncodeResult:
"""
Encode raw bytes with metadata into an image.
@@ -213,6 +236,7 @@ def encode_bytes(
embed_mode: 'lsb' or 'dct'
dct_output_format: 'png' or 'jpeg'
dct_color_mode: 'grayscale' or 'color'
channel_key: Channel key parameter (see encode())
Returns:
EncodeResult
@@ -231,4 +255,5 @@ def encode_bytes(
embed_mode=embed_mode,
dct_output_format=dct_output_format,
dct_color_mode=dct_color_mode,
channel_key=channel_key,
)