Files
stegasoo/frontends/web/stego_worker.py
Aaron D. Lee aac8037c04 Fix DCT steganography for non-8-aligned images and set color mode default
- Fix block calculation mismatch in DCT extract (use original dimensions)
- Change default dct_color_mode from "grayscale" to "color"
- Update DCT test to use noise image instead of solid color
- Remove debug logging from encode/decode paths

The block calculation fix ensures extract uses the same block positions
as embed for images whose dimensions aren't divisible by 8. This was
causing decode failures on the Pi web UI with 1195x671 images.

Color mode is now the default since it preserves the original image
colors. The test fixture now uses a random noise image because solid
color images cause coefficient drift during YCbCr/RGB conversion that
can corrupt embedded data.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-04 21:36:59 -05:00

273 lines
8.1 KiB
Python

#!/usr/bin/env python3
"""
Stegasoo Subprocess Worker (v4.0.0)
This script runs in a subprocess and handles encode/decode operations.
If it crashes due to jpegio/scipy issues, the parent Flask process survives.
CHANGES in v4.0.0:
- Added channel_key support for encode/decode operations
- New channel_status operation
Communication is via JSON over stdin/stdout:
- Input: JSON object with operation parameters
- Output: JSON object with results or error
Usage:
echo '{"operation": "encode", ...}' | python stego_worker.py
"""
import base64
import json
import sys
import traceback
from pathlib import Path
# Ensure stegasoo is importable: prepend the "src" directory located three
# directory levels above this file to the module search path.
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
# Also prepend this script's own directory. Because it is inserted at index 0
# after the entry above, it ends up first on the search path.
sys.path.insert(0, str(Path(__file__).parent))
def _resolve_channel_key(channel_key_param):
    """
    Translate the request's channel_key field into the value passed to stegasoo.

    Args:
        channel_key_param: None or 'auto' (auto mode), 'none' (public mode),
            or any other value, which is treated as an explicit key.

    Returns:
        None for auto mode, "" for public mode, otherwise the parameter
        itself, unchanged.
    """
    # Auto mode - use server config. A missing value and the literal "auto"
    # are equivalent.
    if channel_key_param is None:
        return None
    if channel_key_param == "auto":
        return None

    # Public mode is represented by an empty key.
    if channel_key_param == "none":
        return ""

    # Everything else is handed through as an explicit key.
    return channel_key_param
def _get_channel_info(resolved_key):
    """
    Describe the channel a resolved key refers to, for inclusion in a response.

    Args:
        resolved_key: "" (public), None (auto - consult the server
            configuration), or an explicit key string.

    Returns:
        (mode, fingerprint) tuple. mode is "public" or "private";
        fingerprint is always None when mode is "public".
    """
    from stegasoo import get_channel_status, has_channel_key

    if resolved_key is None:
        # Auto mode - check server config.
        if not has_channel_key():
            return "public", None
        return "private", get_channel_status().get("fingerprint")

    if resolved_key == "":
        return "public", None

    # Explicit key: only its first and last four characters are shown,
    # the middle is replaced by a fixed mask.
    head, tail = resolved_key[:4], resolved_key[-4:]
    return "private", f"{head}-••••-••••-••••-••••-••••-••••-{tail}"
def encode_operation(params: dict) -> dict:
    """
    Handle the "encode" operation.

    Args:
        params: Request dict. "carrier_b64" and "reference_b64" are required
            (base64 text); "rsa_key_b64", "file_b64", "file_name",
            "file_mime", "message", "passphrase", "pin", "rsa_password",
            "embed_mode", "dct_output_format", "dct_color_mode" and
            "channel_key" are optional.

    Returns:
        Response dict with "success", the base64-encoded stego image,
        "filename", "stats" (or None) and the channel mode/fingerprint.

    Raises:
        KeyError: If a required field is missing. Errors from base64
            decoding or from stegasoo propagate unchanged.
    """
    from stegasoo import FilePayload, encode

    # Required inputs arrive base64-encoded.
    carrier_bytes = base64.b64decode(params["carrier_b64"])
    reference_bytes = base64.b64decode(params["reference_b64"])

    # The RSA key is optional; a missing or empty field means "no key".
    rsa_b64 = params.get("rsa_key_b64")
    rsa_bytes = base64.b64decode(rsa_b64) if rsa_b64 else None

    # A non-empty file field takes precedence over a plain-text message.
    file_b64 = params.get("file_b64")
    if not file_b64:
        payload = params.get("message", "")
    else:
        payload = FilePayload(
            data=base64.b64decode(file_b64),
            filename=params.get("file_name", "file"),
            mime_type=params.get("file_mime", "application/octet-stream"),
        )

    # Resolve channel key (v4.0.0)
    channel_key = _resolve_channel_key(params.get("channel_key", "auto"))

    encoded = encode(
        message=payload,
        reference_photo=reference_bytes,
        carrier_image=carrier_bytes,
        passphrase=params.get("passphrase", ""),
        pin=params.get("pin"),
        rsa_key_data=rsa_bytes,
        rsa_password=params.get("rsa_password"),
        embed_mode=params.get("embed_mode", "lsb"),
        dct_output_format=params.get("dct_output_format", "png"),
        dct_color_mode=params.get("dct_color_mode", "color"),
        channel_key=channel_key,  # v4.0.0
    )

    # Copy the known counters out of the stats object when there is one;
    # any counter the object lacks is reported as 0.
    stats = None
    if hasattr(encoded, "stats") and encoded.stats:
        stats = {
            field: getattr(encoded.stats, field, 0)
            for field in ("pixels_modified", "capacity_used", "bytes_embedded")
        }

    # Channel info for the response (v4.0.0)
    mode, fingerprint = _get_channel_info(channel_key)

    response = {
        "success": True,
        "stego_b64": base64.b64encode(encoded.stego_image).decode("ascii"),
        "filename": getattr(encoded, "filename", None),
        "stats": stats,
        "channel_mode": mode,
        "channel_fingerprint": fingerprint,
    }
    return response
def decode_operation(params: dict) -> dict:
    """
    Handle the "decode" operation.

    Args:
        params: Request dict. "stego_b64" and "reference_b64" are required
            (base64 text); "rsa_key_b64", "passphrase", "pin",
            "rsa_password", "embed_mode" and "channel_key" are optional.

    Returns:
        Response dict with "success" and "is_file". For a file payload it
        also carries "file_b64", "filename" and "mime_type"; otherwise it
        carries "message".

    Raises:
        KeyError: If a required field is missing. Errors from base64
            decoding or from stegasoo propagate unchanged.
    """
    from stegasoo import decode

    # Required inputs arrive base64-encoded.
    stego_bytes = base64.b64decode(params["stego_b64"])
    reference_bytes = base64.b64decode(params["reference_b64"])

    # The RSA key is optional; a missing or empty field means "no key".
    rsa_b64 = params.get("rsa_key_b64")
    rsa_bytes = base64.b64decode(rsa_b64) if rsa_b64 else None

    # Resolve channel key (v4.0.0)
    channel_key = _resolve_channel_key(params.get("channel_key", "auto"))

    decoded = decode(
        stego_image=stego_bytes,
        reference_photo=reference_bytes,
        passphrase=params.get("passphrase", ""),
        pin=params.get("pin"),
        rsa_key_data=rsa_bytes,
        rsa_password=params.get("rsa_password"),
        embed_mode=params.get("embed_mode", "auto"),
        channel_key=channel_key,  # v4.0.0
    )

    # Text payloads are returned as-is.
    if not decoded.is_file:
        return {
            "success": True,
            "is_file": False,
            "message": decoded.message,
        }

    # File payloads are base64-encoded so they survive the JSON transport.
    return {
        "success": True,
        "is_file": True,
        "file_b64": base64.b64encode(decoded.file_data).decode("ascii"),
        "filename": decoded.filename,
        "mime_type": decoded.mime_type,
    }
def compare_operation(params: dict) -> dict:
    """
    Handle the "compare" operation by running stegasoo's compare_modes.

    Args:
        params: Request dict; "carrier_b64" (base64 text) is required.

    Returns:
        Response dict with "success" and the compare_modes result under
        "comparison".

    Raises:
        KeyError: If "carrier_b64" is missing.
    """
    from stegasoo import compare_modes

    carrier_bytes = base64.b64decode(params["carrier_b64"])
    return {"success": True, "comparison": compare_modes(carrier_bytes)}
def capacity_check_operation(params: dict) -> dict:
    """
    Handle the "capacity" operation by running stegasoo's will_fit_by_mode.

    Args:
        params: Request dict; "carrier_b64" (base64 text) and
            "payload_size" are required, "embed_mode" defaults to "lsb".

    Returns:
        Response dict with "success" and the will_fit_by_mode result under
        "result".

    Raises:
        KeyError: If a required field is missing.
    """
    from stegasoo import will_fit_by_mode

    # Decode the carrier first so a missing carrier is reported before a
    # missing payload size.
    carrier_bytes = base64.b64decode(params["carrier_b64"])
    payload_size = params["payload_size"]
    mode = params.get("embed_mode", "lsb")

    fit = will_fit_by_mode(
        payload=payload_size,
        carrier_image=carrier_bytes,
        embed_mode=mode,
    )
    return {"success": True, "result": fit}
def channel_status_operation(params: dict) -> dict:
    """
    Handle the "channel_status" operation (v4.0.0).

    Args:
        params: Request dict; the optional "reveal" flag (default False)
            asks for the key itself to be included.

    Returns:
        Response dict with "success" and a "status" dict holding "mode",
        "configured", "fingerprint", "source" and "key". "key" is None
        unless "reveal" is truthy and the status reports a configured key.
    """
    from stegasoo import get_channel_status

    info = get_channel_status()
    wants_key = params.get("reveal", False)

    summary = {
        "mode": info["mode"],
        "configured": info["configured"],
        "fingerprint": info.get("fingerprint"),
        "source": info.get("source"),
    }

    # The key is only handed out on explicit request, and only when one is
    # actually configured.
    if wants_key and info["configured"]:
        summary["key"] = info.get("key")
    else:
        summary["key"] = None

    return {"success": True, "status": summary}
def _handle_request(input_text: str) -> dict:
    """Parse one JSON request and dispatch it to the matching operation handler.

    Args:
        input_text: Raw text read from stdin.

    Returns:
        The handler's response dict, or a ``{"success": False, ...}`` dict
        for empty input, non-object JSON, or an unknown operation.

    Raises:
        json.JSONDecodeError: If the input is not valid JSON.
        Exception: Whatever the selected handler raises.
    """
    if not input_text.strip():
        return {"success": False, "error": "No input provided"}

    params = json.loads(input_text)

    # json.loads can also yield a list, string, number, bool or None; only
    # an object can carry an "operation" field, so reject the rest up front
    # instead of failing later with an AttributeError on params.get.
    if not isinstance(params, dict):
        return {
            "success": False,
            "error": f"Input must be a JSON object, got {type(params).__name__}",
        }

    operation = params.get("operation")
    if operation == "encode":
        return encode_operation(params)
    if operation == "decode":
        return decode_operation(params)
    if operation == "compare":
        return compare_operation(params)
    if operation == "capacity":
        return capacity_check_operation(params)
    if operation == "channel_status":
        return channel_status_operation(params)
    return {"success": False, "error": f"Unknown operation: {operation}"}


def _serialize_output(output) -> str:
    """Serialize a response to JSON, falling back to an error response.

    A handler result may contain values json cannot encode. Without this
    fallback the worker would die with a traceback and emit no JSON at all.
    """
    try:
        return json.dumps(output)
    except (TypeError, ValueError) as e:
        # TypeError: unsupported value type; ValueError: circular reference.
        return json.dumps(
            {
                "success": False,
                "error": f"Could not serialize result: {e}",
                "error_type": type(e).__name__,
            }
        )


def main():
    """Main entry point - read JSON from stdin, write JSON to stdout.

    Exactly one JSON object is printed to stdout. Failures raised while
    reading, parsing or handling the request are reported as
    ``{"success": False, "error": ...}`` rather than propagated; for
    unexpected exceptions the response also includes "error_type" and
    "traceback".
    """
    try:
        # Read all input
        output = _handle_request(sys.stdin.read())
    except json.JSONDecodeError as e:
        output = {"success": False, "error": f"Invalid JSON: {e}"}
    except Exception as e:
        # Top-level boundary: turn any failure into a structured response.
        output = {
            "success": False,
            "error": str(e),
            "error_type": type(e).__name__,
            "traceback": traceback.format_exc(),
        }

    # Write output as JSON
    print(_serialize_output(output), flush=True)


if __name__ == "__main__":
    main()