- Fix block calculation mismatch in DCT extract (use original dimensions) - Change default dct_color_mode from "grayscale" to "color" - Update DCT test to use noise image instead of solid color - Remove debug logging from encode/decode paths The block calculation fix ensures extract uses the same block positions as embed for images whose dimensions aren't divisible by 8. This was causing decode failures on the Pi web UI with 1195x671 images. Color mode is now the default since it preserves the original image colors. The test fixture now uses a random noise image because solid color images cause coefficient drift during YCbCr/RGB conversion that can corrupt embedded data. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
273 lines
8.1 KiB
Python
273 lines
8.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Stegasoo Subprocess Worker (v4.0.0)
|
|
|
|
This script runs in a subprocess and handles encode/decode operations.
|
|
If it crashes due to jpegio/scipy issues, the parent Flask process survives.
|
|
|
|
CHANGES in v4.0.0:
|
|
- Added channel_key support for encode/decode operations
|
|
- New channel_status operation
|
|
|
|
Communication is via JSON over stdin/stdout:
|
|
- Input: JSON object with operation parameters
|
|
- Output: JSON object with results or error
|
|
|
|
Usage:
|
|
echo '{"operation": "encode", ...}' | python stego_worker.py
|
|
"""
|
|
|
|
import base64
|
|
import json
|
|
import sys
|
|
import traceback
|
|
from pathlib import Path
|
|
|
|
# Ensure stegasoo is importable
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
|
|
|
|
def _resolve_channel_key(channel_key_param):
|
|
"""
|
|
Resolve channel_key parameter to value for stegasoo.
|
|
|
|
Args:
|
|
channel_key_param: 'auto', 'none', explicit key, or None
|
|
|
|
Returns:
|
|
None (auto), "" (public), or explicit key string
|
|
"""
|
|
if channel_key_param is None or channel_key_param == "auto":
|
|
return None # Auto mode - use server config
|
|
elif channel_key_param == "none":
|
|
return "" # Public mode
|
|
else:
|
|
return channel_key_param # Explicit key
|
|
|
|
|
|
def _get_channel_info(resolved_key):
|
|
"""
|
|
Get channel mode and fingerprint for response.
|
|
|
|
Returns:
|
|
(mode, fingerprint) tuple
|
|
"""
|
|
from stegasoo import get_channel_status, has_channel_key
|
|
|
|
if resolved_key == "":
|
|
return "public", None
|
|
|
|
if resolved_key is not None:
|
|
# Explicit key
|
|
fingerprint = f"{resolved_key[:4]}-••••-••••-••••-••••-••••-••••-{resolved_key[-4:]}"
|
|
return "private", fingerprint
|
|
|
|
# Auto mode - check server config
|
|
if has_channel_key():
|
|
status = get_channel_status()
|
|
return "private", status.get("fingerprint")
|
|
|
|
return "public", None
|
|
|
|
|
|
def encode_operation(params: dict) -> dict:
|
|
"""Handle encode operation."""
|
|
from stegasoo import FilePayload, encode
|
|
|
|
# Decode base64 inputs
|
|
carrier_data = base64.b64decode(params["carrier_b64"])
|
|
reference_data = base64.b64decode(params["reference_b64"])
|
|
|
|
# Optional RSA key
|
|
rsa_key_data = None
|
|
if params.get("rsa_key_b64"):
|
|
rsa_key_data = base64.b64decode(params["rsa_key_b64"])
|
|
|
|
# Determine payload type
|
|
if params.get("file_b64"):
|
|
file_data = base64.b64decode(params["file_b64"])
|
|
payload = FilePayload(
|
|
data=file_data,
|
|
filename=params.get("file_name", "file"),
|
|
mime_type=params.get("file_mime", "application/octet-stream"),
|
|
)
|
|
else:
|
|
payload = params.get("message", "")
|
|
|
|
# Resolve channel key (v4.0.0)
|
|
resolved_channel_key = _resolve_channel_key(params.get("channel_key", "auto"))
|
|
|
|
# Call encode with correct parameter names
|
|
result = encode(
|
|
message=payload,
|
|
reference_photo=reference_data,
|
|
carrier_image=carrier_data,
|
|
passphrase=params.get("passphrase", ""),
|
|
pin=params.get("pin"),
|
|
rsa_key_data=rsa_key_data,
|
|
rsa_password=params.get("rsa_password"),
|
|
embed_mode=params.get("embed_mode", "lsb"),
|
|
dct_output_format=params.get("dct_output_format", "png"),
|
|
dct_color_mode=params.get("dct_color_mode", "color"),
|
|
channel_key=resolved_channel_key, # v4.0.0
|
|
)
|
|
|
|
# Build stats dict if available
|
|
stats = None
|
|
if hasattr(result, "stats") and result.stats:
|
|
stats = {
|
|
"pixels_modified": getattr(result.stats, "pixels_modified", 0),
|
|
"capacity_used": getattr(result.stats, "capacity_used", 0),
|
|
"bytes_embedded": getattr(result.stats, "bytes_embedded", 0),
|
|
}
|
|
|
|
# Get channel info for response (v4.0.0)
|
|
channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key)
|
|
|
|
return {
|
|
"success": True,
|
|
"stego_b64": base64.b64encode(result.stego_image).decode("ascii"),
|
|
"filename": getattr(result, "filename", None),
|
|
"stats": stats,
|
|
"channel_mode": channel_mode,
|
|
"channel_fingerprint": channel_fingerprint,
|
|
}
|
|
|
|
|
|
def decode_operation(params: dict) -> dict:
|
|
"""Handle decode operation."""
|
|
from stegasoo import decode
|
|
|
|
# Decode base64 inputs
|
|
stego_data = base64.b64decode(params["stego_b64"])
|
|
reference_data = base64.b64decode(params["reference_b64"])
|
|
|
|
# Optional RSA key
|
|
rsa_key_data = None
|
|
if params.get("rsa_key_b64"):
|
|
rsa_key_data = base64.b64decode(params["rsa_key_b64"])
|
|
|
|
# Resolve channel key (v4.0.0)
|
|
resolved_channel_key = _resolve_channel_key(params.get("channel_key", "auto"))
|
|
|
|
# Call decode with correct parameter names
|
|
result = decode(
|
|
stego_image=stego_data,
|
|
reference_photo=reference_data,
|
|
passphrase=params.get("passphrase", ""),
|
|
pin=params.get("pin"),
|
|
rsa_key_data=rsa_key_data,
|
|
rsa_password=params.get("rsa_password"),
|
|
embed_mode=params.get("embed_mode", "auto"),
|
|
channel_key=resolved_channel_key, # v4.0.0
|
|
)
|
|
|
|
if result.is_file:
|
|
return {
|
|
"success": True,
|
|
"is_file": True,
|
|
"file_b64": base64.b64encode(result.file_data).decode("ascii"),
|
|
"filename": result.filename,
|
|
"mime_type": result.mime_type,
|
|
}
|
|
else:
|
|
return {
|
|
"success": True,
|
|
"is_file": False,
|
|
"message": result.message,
|
|
}
|
|
|
|
|
|
def compare_operation(params: dict) -> dict:
|
|
"""Handle compare_modes operation."""
|
|
from stegasoo import compare_modes
|
|
|
|
carrier_data = base64.b64decode(params["carrier_b64"])
|
|
result = compare_modes(carrier_data)
|
|
|
|
return {
|
|
"success": True,
|
|
"comparison": result,
|
|
}
|
|
|
|
|
|
def capacity_check_operation(params: dict) -> dict:
|
|
"""Handle will_fit_by_mode operation."""
|
|
from stegasoo import will_fit_by_mode
|
|
|
|
carrier_data = base64.b64decode(params["carrier_b64"])
|
|
|
|
result = will_fit_by_mode(
|
|
payload=params["payload_size"],
|
|
carrier_image=carrier_data,
|
|
embed_mode=params.get("embed_mode", "lsb"),
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"result": result,
|
|
}
|
|
|
|
|
|
def channel_status_operation(params: dict) -> dict:
|
|
"""Handle channel status check (v4.0.0)."""
|
|
from stegasoo import get_channel_status
|
|
|
|
status = get_channel_status()
|
|
reveal = params.get("reveal", False)
|
|
|
|
return {
|
|
"success": True,
|
|
"status": {
|
|
"mode": status["mode"],
|
|
"configured": status["configured"],
|
|
"fingerprint": status.get("fingerprint"),
|
|
"source": status.get("source"),
|
|
"key": status.get("key") if reveal and status["configured"] else None,
|
|
},
|
|
}
|
|
|
|
|
|
def main():
|
|
"""Main entry point - read JSON from stdin, write JSON to stdout."""
|
|
try:
|
|
# Read all input
|
|
input_text = sys.stdin.read()
|
|
|
|
if not input_text.strip():
|
|
output = {"success": False, "error": "No input provided"}
|
|
else:
|
|
params = json.loads(input_text)
|
|
operation = params.get("operation")
|
|
|
|
if operation == "encode":
|
|
output = encode_operation(params)
|
|
elif operation == "decode":
|
|
output = decode_operation(params)
|
|
elif operation == "compare":
|
|
output = compare_operation(params)
|
|
elif operation == "capacity":
|
|
output = capacity_check_operation(params)
|
|
elif operation == "channel_status":
|
|
output = channel_status_operation(params)
|
|
else:
|
|
output = {"success": False, "error": f"Unknown operation: {operation}"}
|
|
|
|
except json.JSONDecodeError as e:
|
|
output = {"success": False, "error": f"Invalid JSON: {e}"}
|
|
except Exception as e:
|
|
output = {
|
|
"success": False,
|
|
"error": str(e),
|
|
"error_type": type(e).__name__,
|
|
"traceback": traceback.format_exc(),
|
|
}
|
|
|
|
# Write output as JSON
|
|
print(json.dumps(output), flush=True)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|