#!/usr/bin/env python3 """ Stego Subprocess Worker (v4.0.0) This script runs in a subprocess and handles encode/decode operations. If it crashes due to jpeglib/scipy issues, the parent Flask process survives. CHANGES in v4.0.0: - Added channel_key support for encode/decode operations - New channel_status operation Communication is via JSON over stdin/stdout: - Input: JSON object with operation parameters - Output: JSON object with results or error Usage: echo '{"operation": "encode", ...}' | python stego_worker.py """ import base64 import json import logging import os import sys import traceback from pathlib import Path # Ensure stego is importable sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src")) sys.path.insert(0, str(Path(__file__).parent)) # Configure logging for worker subprocess _log_level = os.environ.get("FIELDWITNESS_LOG_LEVEL", "").strip().upper() if _log_level and hasattr(logging, _log_level): logging.basicConfig( level=getattr(logging, _log_level), format="[%(asctime)s.%(msecs)03d] [%(levelname)s] [%(name)s] %(message)s", datefmt="%H:%M:%S", stream=sys.stderr, ) elif os.environ.get("FIELDWITNESS_DEBUG", "").strip() in ("1", "true", "yes"): logging.basicConfig( level=logging.DEBUG, format="[%(asctime)s.%(msecs)03d] [%(levelname)s] [%(name)s] %(message)s", datefmt="%H:%M:%S", stream=sys.stderr, ) logger = logging.getLogger("stego.worker") def _resolve_channel_key(channel_key_param): """ Resolve channel_key parameter to value for stego. Args: channel_key_param: 'auto', 'none', explicit key, or None Returns: None (auto), "" (public), or explicit key string """ if channel_key_param is None or channel_key_param == "auto": return None # Auto mode - use server config elif channel_key_param == "none": return "" # Public mode else: return channel_key_param # Explicit key def _get_channel_info(resolved_key): """ Get channel mode and fingerprint for response. Returns: (mode, fingerprint) tuple """ from fieldwitness.stego import get_channel_status, has_channel_key if resolved_key == "": return "public", None if resolved_key is not None: # Explicit key fingerprint = f"{resolved_key[:4]}-••••-••••-••••-••••-••••-••••-{resolved_key[-4:]}" return "private", fingerprint # Auto mode - check server config if has_channel_key(): status = get_channel_status() return "private", status.get("fingerprint") return "public", None def encode_operation(params: dict) -> dict: """Handle encode operation.""" logger.debug("encode_operation: mode=%s", params.get("embed_mode", "lsb")) from fieldwitness.stego import FilePayload, encode # Decode base64 inputs carrier_data = base64.b64decode(params["carrier_b64"]) reference_data = base64.b64decode(params["reference_b64"]) # Optional RSA key rsa_key_data = None if params.get("rsa_key_b64"): rsa_key_data = base64.b64decode(params["rsa_key_b64"]) # Determine payload type if params.get("file_b64"): file_data = base64.b64decode(params["file_b64"]) payload = FilePayload( data=file_data, filename=params.get("file_name", "file"), mime_type=params.get("file_mime", "application/octet-stream"), ) else: payload = params.get("message", "") # Resolve channel key (v4.0.0) resolved_channel_key = _resolve_channel_key(params.get("channel_key", "auto")) # Call encode with correct parameter names result = encode( message=payload, reference_photo=reference_data, carrier_image=carrier_data, passphrase=params.get("passphrase", ""), pin=params.get("pin"), rsa_key_data=rsa_key_data, rsa_password=params.get("rsa_password"), embed_mode=params.get("embed_mode", "lsb"), dct_output_format=params.get("dct_output_format", "png"), dct_color_mode=params.get("dct_color_mode", "color"), channel_key=resolved_channel_key, # v4.0.0 progress_file=params.get("progress_file"), # v4.1.2 ) # Build stats dict if available stats = None if hasattr(result, "stats") and result.stats: stats = { "pixels_modified": getattr(result.stats, "pixels_modified", 0), "capacity_used": getattr(result.stats, "capacity_used", 0), "bytes_embedded": getattr(result.stats, "bytes_embedded", 0), } # Get channel info for response (v4.0.0) channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key) return { "success": True, "stego_b64": base64.b64encode(result.stego_image).decode("ascii"), "filename": getattr(result, "filename", None), "stats": stats, "channel_mode": channel_mode, "channel_fingerprint": channel_fingerprint, } def _write_decode_progress(progress_file: str | None, percent: int, phase: str) -> None: """Write decode progress to file.""" if not progress_file: return try: import json with open(progress_file, "w") as f: json.dump({"percent": percent, "phase": phase}, f) except Exception: pass # Best effort def decode_operation(params: dict) -> dict: """Handle decode operation.""" logger.debug("decode_operation: mode=%s", params.get("embed_mode", "auto")) from fieldwitness.stego import decode progress_file = params.get("progress_file") # Progress: starting _write_decode_progress(progress_file, 5, "reading") # Decode base64 inputs stego_data = base64.b64decode(params["stego_b64"]) reference_data = base64.b64decode(params["reference_b64"]) _write_decode_progress(progress_file, 15, "reading") # Optional RSA key rsa_key_data = None if params.get("rsa_key_b64"): rsa_key_data = base64.b64decode(params["rsa_key_b64"]) # Resolve channel key (v4.0.0) resolved_channel_key = _resolve_channel_key(params.get("channel_key", "auto")) # Library handles progress internally via progress_file parameter # Call decode with correct parameter names result = decode( stego_image=stego_data, reference_photo=reference_data, passphrase=params.get("passphrase", ""), pin=params.get("pin"), rsa_key_data=rsa_key_data, rsa_password=params.get("rsa_password"), embed_mode=params.get("embed_mode", "auto"), channel_key=resolved_channel_key, # v4.0.0 progress_file=progress_file, # v4.2.0: pass through for real-time progress ) # Library writes 100% "complete" - no need for worker to write again if result.is_file: return { "success": True, "is_file": True, "file_b64": base64.b64encode(result.file_data).decode("ascii"), "filename": result.filename, "mime_type": result.mime_type, } else: return { "success": True, "is_file": False, "message": result.message, } def compare_operation(params: dict) -> dict: """Handle compare_modes operation.""" from fieldwitness.stego import compare_modes carrier_data = base64.b64decode(params["carrier_b64"]) result = compare_modes(carrier_data) return { "success": True, "comparison": result, } def capacity_check_operation(params: dict) -> dict: """Handle will_fit_by_mode operation.""" from fieldwitness.stego import will_fit_by_mode carrier_data = base64.b64decode(params["carrier_b64"]) result = will_fit_by_mode( payload=params["payload_size"], carrier_image=carrier_data, embed_mode=params.get("embed_mode", "lsb"), ) return { "success": True, "result": result, } def encode_audio_operation(params: dict) -> dict: """Handle audio encode operation (v4.3.0).""" logger.debug("encode_audio_operation: mode=%s", params.get("embed_mode", "audio_lsb")) from fieldwitness.stego import FilePayload, encode_audio carrier_data = base64.b64decode(params["carrier_b64"]) reference_data = base64.b64decode(params["reference_b64"]) # Optional RSA key rsa_key_data = None if params.get("rsa_key_b64"): rsa_key_data = base64.b64decode(params["rsa_key_b64"]) # Determine payload type if params.get("file_b64"): file_data = base64.b64decode(params["file_b64"]) payload = FilePayload( data=file_data, filename=params.get("file_name", "file"), mime_type=params.get("file_mime", "application/octet-stream"), ) else: payload = params.get("message", "") resolved_channel_key = _resolve_channel_key(params.get("channel_key", "auto")) # Resolve chip_tier from params (None means use default) chip_tier_val = params.get("chip_tier") if chip_tier_val is not None: chip_tier_val = int(chip_tier_val) stego_audio, stats = encode_audio( message=payload, reference_photo=reference_data, carrier_audio=carrier_data, passphrase=params.get("passphrase", ""), pin=params.get("pin"), rsa_key_data=rsa_key_data, rsa_password=params.get("rsa_password"), embed_mode=params.get("embed_mode", "audio_lsb"), channel_key=resolved_channel_key, progress_file=params.get("progress_file"), chip_tier=chip_tier_val, ) channel_mode, channel_fingerprint = _get_channel_info(resolved_channel_key) return { "success": True, "stego_b64": base64.b64encode(stego_audio).decode("ascii"), "stats": { "samples_modified": stats.samples_modified, "total_samples": stats.total_samples, "capacity_used": stats.capacity_used, "bytes_embedded": stats.bytes_embedded, "sample_rate": stats.sample_rate, "channels": stats.channels, "duration_seconds": stats.duration_seconds, "embed_mode": stats.embed_mode, }, "channel_mode": channel_mode, "channel_fingerprint": channel_fingerprint, } def decode_audio_operation(params: dict) -> dict: """Handle audio decode operation (v4.3.0).""" logger.debug("decode_audio_operation: mode=%s", params.get("embed_mode", "audio_auto")) from fieldwitness.stego import decode_audio progress_file = params.get("progress_file") _write_decode_progress(progress_file, 5, "reading") stego_data = base64.b64decode(params["stego_b64"]) reference_data = base64.b64decode(params["reference_b64"]) _write_decode_progress(progress_file, 15, "reading") rsa_key_data = None if params.get("rsa_key_b64"): rsa_key_data = base64.b64decode(params["rsa_key_b64"]) resolved_channel_key = _resolve_channel_key(params.get("channel_key", "auto")) result = decode_audio( stego_audio=stego_data, reference_photo=reference_data, passphrase=params.get("passphrase", ""), pin=params.get("pin"), rsa_key_data=rsa_key_data, rsa_password=params.get("rsa_password"), embed_mode=params.get("embed_mode", "audio_auto"), channel_key=resolved_channel_key, progress_file=progress_file, ) if result.is_file: return { "success": True, "is_file": True, "file_b64": base64.b64encode(result.file_data).decode("ascii"), "filename": result.filename, "mime_type": result.mime_type, } else: return { "success": True, "is_file": False, "message": result.message, } def audio_info_operation(params: dict) -> dict: """Handle audio info operation (v4.3.0).""" from fieldwitness.stego import get_audio_info from fieldwitness.stego.audio_steganography import calculate_audio_lsb_capacity from fieldwitness.stego.spread_steganography import calculate_audio_spread_capacity audio_data = base64.b64decode(params["audio_b64"]) info = get_audio_info(audio_data) lsb_capacity = calculate_audio_lsb_capacity(audio_data) spread_capacity = calculate_audio_spread_capacity(audio_data) return { "success": True, "info": { "sample_rate": info.sample_rate, "channels": info.channels, "duration_seconds": round(info.duration_seconds, 2), "num_samples": info.num_samples, "format": info.format, "bit_depth": info.bit_depth, "capacity_lsb": lsb_capacity, "capacity_spread": spread_capacity.usable_capacity_bytes, }, } def channel_status_operation(params: dict) -> dict: """Handle channel status check (v4.0.0).""" from fieldwitness.stego import get_channel_status status = get_channel_status() reveal = params.get("reveal", False) return { "success": True, "status": { "mode": status["mode"], "configured": status["configured"], "fingerprint": status.get("fingerprint"), "source": status.get("source"), "key": status.get("key") if reveal and status["configured"] else None, }, } def main(): """Main entry point - read JSON from stdin, write JSON to stdout.""" try: # Read all input input_text = sys.stdin.read() if not input_text.strip(): output = {"success": False, "error": "No input provided"} else: params = json.loads(input_text) operation = params.get("operation") logger.info("Worker handling operation: %s", operation) if operation == "encode": output = encode_operation(params) elif operation == "decode": output = decode_operation(params) elif operation == "compare": output = compare_operation(params) elif operation == "capacity": output = capacity_check_operation(params) elif operation == "channel_status": output = channel_status_operation(params) # Audio operations (v4.3.0) elif operation == "encode_audio": output = encode_audio_operation(params) elif operation == "decode_audio": output = decode_audio_operation(params) elif operation == "audio_info": output = audio_info_operation(params) else: output = {"success": False, "error": f"Unknown operation: {operation}"} except json.JSONDecodeError as e: output = {"success": False, "error": f"Invalid JSON: {e}"} except Exception as e: output = { "success": False, "error": str(e), "error_type": type(e).__name__, "traceback": traceback.format_exc(), } # Write output as JSON print(json.dumps(output), flush=True) if __name__ == "__main__": main()