fieldwitness/frontends/web/stego_worker.py
Aaron D. Lee 490f9d4a1d Rebrand SooSeF to FieldWitness
Complete project rebrand for better positioning in the press freedom
and digital security space. FieldWitness communicates both field
deployment and evidence testimony — appropriate for the target audience
of journalists, NGOs, and human rights organizations.

Rename mapping:
- soosef → fieldwitness (package, CLI, all imports)
- soosef.stegasoo → fieldwitness.stego
- soosef.verisoo → fieldwitness.attest
- ~/.soosef/ → ~/.fwmetadata/ (innocuous data dir name)
- SOOSEF_DATA_DIR → FIELDWITNESS_DATA_DIR
- SoosefConfig → FieldWitnessConfig
- SoosefError → FieldWitnessError

Also includes:
- License switch from MIT to GPL-3.0
- C2PA bridge module (Phase 0-2 MVP): cert.py, export.py, vendor_assertions.py
- README repositioned to lead with provenance/federation, stego backgrounded
- Threat model skeleton at docs/security/threat-model.md
- Planning docs: docs/planning/c2pa-integration.md, docs/planning/gtm-feasibility.md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 15:05:13 -04:00

466 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Stego Subprocess Worker (v4.0.0)
This script runs in a subprocess and handles encode/decode operations.
If it crashes due to jpeglib/scipy issues, the parent Flask process survives.
CHANGES in v4.0.0:
- Added channel_key support for encode/decode operations
- New channel_status operation
Communication is via JSON over stdin/stdout:
- Input: JSON object with operation parameters
- Output: JSON object with results or error
Usage:
echo '{"operation": "encode", ...}' | python stego_worker.py
"""
import base64
import json
import logging
import os
import sys
import traceback
from pathlib import Path
# Ensure stego is importable
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
sys.path.insert(0, str(Path(__file__).parent))
# Configure logging for worker subprocess
_log_level = os.environ.get("FIELDWITNESS_LOG_LEVEL", "").strip().upper()
if _log_level and hasattr(logging, _log_level):
logging.basicConfig(
level=getattr(logging, _log_level),
format="[%(asctime)s.%(msecs)03d] [%(levelname)s] [%(name)s] %(message)s",
datefmt="%H:%M:%S",
stream=sys.stderr,
)
elif os.environ.get("FIELDWITNESS_DEBUG", "").strip() in ("1", "true", "yes"):
logging.basicConfig(
level=logging.DEBUG,
format="[%(asctime)s.%(msecs)03d] [%(levelname)s] [%(name)s] %(message)s",
datefmt="%H:%M:%S",
stream=sys.stderr,
)
logger = logging.getLogger("stego.worker")
def _resolve_channel_key(channel_key_param):
"""
Resolve channel_key parameter to value for stego.
Args:
channel_key_param: 'auto', 'none', explicit key, or None
Returns:
None (auto), "" (public), or explicit key string
"""
if channel_key_param is None or channel_key_param == "auto":
return None # Auto mode - use server config
elif channel_key_param == "none":
return "" # Public mode
else:
return channel_key_param # Explicit key
def _get_channel_info(resolved_key):
"""
Get channel mode and fingerprint for response.
Returns:
(mode, fingerprint) tuple
"""
from fieldwitness.stego import get_channel_status, has_channel_key
if resolved_key == "":
return "public", None
if resolved_key is not None:
# Explicit key
fingerprint = f"{resolved_key[:4]}-••••-••••-••••-••••-••••-••••-{resolved_key[-4:]}"
return "private", fingerprint
# Auto mode - check server config
if has_channel_key():
status = get_channel_status()
return "private", status.get("fingerprint")
return "public", None
def encode_operation(params: dict) -> dict:
    """
    Embed a message or file into a carrier image.

    Requires base64 ``carrier_b64`` and ``reference_b64``. ``rsa_key_b64`` is
    optional. When ``file_b64`` is present a FilePayload is embedded, otherwise
    the text ``message`` (default ""). The remaining options are forwarded to
    ``fieldwitness.stego.encode``.

    Returns:
        Response dict with the base64 stego image, output filename, optional
        embedding stats, and the channel mode/fingerprint used.
    """
    embed_mode = params.get("embed_mode", "lsb")
    logger.debug("encode_operation: mode=%s", embed_mode)
    from fieldwitness.stego import FilePayload, encode

    carrier_bytes = base64.b64decode(params["carrier_b64"])
    reference_bytes = base64.b64decode(params["reference_b64"])

    rsa_key_b64 = params.get("rsa_key_b64")
    rsa_key_bytes = base64.b64decode(rsa_key_b64) if rsa_key_b64 else None

    file_b64 = params.get("file_b64")
    if not file_b64:
        payload = params.get("message", "")
    else:
        payload = FilePayload(
            data=base64.b64decode(file_b64),
            filename=params.get("file_name", "file"),
            mime_type=params.get("file_mime", "application/octet-stream"),
        )

    # "auto" / "none" / explicit key -> value understood by stego (v4.0.0)
    channel_key = _resolve_channel_key(params.get("channel_key", "auto"))

    result = encode(
        message=payload,
        reference_photo=reference_bytes,
        carrier_image=carrier_bytes,
        passphrase=params.get("passphrase", ""),
        pin=params.get("pin"),
        rsa_key_data=rsa_key_bytes,
        rsa_password=params.get("rsa_password"),
        embed_mode=embed_mode,
        dct_output_format=params.get("dct_output_format", "png"),
        dct_color_mode=params.get("dct_color_mode", "color"),
        channel_key=channel_key,
        progress_file=params.get("progress_file"),
    )

    # Stats are optional on the result; missing fields default to 0.
    raw_stats = getattr(result, "stats", None)
    stats = (
        {
            "pixels_modified": getattr(raw_stats, "pixels_modified", 0),
            "capacity_used": getattr(raw_stats, "capacity_used", 0),
            "bytes_embedded": getattr(raw_stats, "bytes_embedded", 0),
        }
        if raw_stats
        else None
    )

    channel_mode, channel_fingerprint = _get_channel_info(channel_key)
    return {
        "success": True,
        "stego_b64": base64.b64encode(result.stego_image).decode("ascii"),
        "filename": getattr(result, "filename", None),
        "stats": stats,
        "channel_mode": channel_mode,
        "channel_fingerprint": channel_fingerprint,
    }
def _write_decode_progress(progress_file: str | None, percent: int, phase: str) -> None:
"""Write decode progress to file."""
if not progress_file:
return
try:
import json
with open(progress_file, "w") as f:
json.dump({"percent": percent, "phase": phase}, f)
except Exception:
pass # Best effort
def decode_operation(params: dict) -> dict:
    """
    Extract a hidden message or file from a stego image.

    Requires base64 ``stego_b64`` and ``reference_b64``; ``rsa_key_b64`` is
    optional. The worker records early "reading" progress (5%, then 15%);
    after that the library reports progress itself through ``progress_file``.

    Returns:
        {"success": True, "is_file": True, file_b64/filename/mime_type} for a
        file payload, otherwise {"success": True, "is_file": False, "message"}.
    """
    embed_mode = params.get("embed_mode", "auto")
    logger.debug("decode_operation: mode=%s", embed_mode)
    from fieldwitness.stego import decode

    progress_path = params.get("progress_file")
    _write_decode_progress(progress_path, 5, "reading")

    stego_bytes = base64.b64decode(params["stego_b64"])
    reference_bytes = base64.b64decode(params["reference_b64"])
    _write_decode_progress(progress_path, 15, "reading")

    rsa_key_b64 = params.get("rsa_key_b64")
    rsa_key_bytes = base64.b64decode(rsa_key_b64) if rsa_key_b64 else None

    # "auto" / "none" / explicit key -> value understood by stego (v4.0.0)
    channel_key = _resolve_channel_key(params.get("channel_key", "auto"))

    # progress_file is passed through for real-time progress; the library
    # writes the final 100% "complete" entry, so the worker does not.
    result = decode(
        stego_image=stego_bytes,
        reference_photo=reference_bytes,
        passphrase=params.get("passphrase", ""),
        pin=params.get("pin"),
        rsa_key_data=rsa_key_bytes,
        rsa_password=params.get("rsa_password"),
        embed_mode=embed_mode,
        channel_key=channel_key,
        progress_file=progress_path,
    )

    if not result.is_file:
        return {"success": True, "is_file": False, "message": result.message}

    return {
        "success": True,
        "is_file": True,
        "file_b64": base64.b64encode(result.file_data).decode("ascii"),
        "filename": result.filename,
        "mime_type": result.mime_type,
    }
def compare_operation(params: dict) -> dict:
    """
    Run ``compare_modes`` on the base64 ``carrier_b64`` image.

    Returns:
        {"success": True, "comparison": <compare_modes result>}.
    """
    from fieldwitness.stego import compare_modes

    comparison = compare_modes(base64.b64decode(params["carrier_b64"]))
    return {"success": True, "comparison": comparison}
def capacity_check_operation(params: dict) -> dict:
    """
    Check whether a payload of ``payload_size`` fits the carrier image.

    Delegates to ``will_fit_by_mode`` with the decoded ``carrier_b64`` image
    and ``embed_mode`` (default "lsb").

    Returns:
        {"success": True, "result": <will_fit_by_mode result>}.
    """
    from fieldwitness.stego import will_fit_by_mode

    carrier_bytes = base64.b64decode(params["carrier_b64"])
    fit = will_fit_by_mode(
        payload=params["payload_size"],
        carrier_image=carrier_bytes,
        embed_mode=params.get("embed_mode", "lsb"),
    )
    return {"success": True, "result": fit}
def encode_audio_operation(params: dict) -> dict:
    """
    Embed a message or file into a carrier audio clip (v4.3.0).

    Takes the same inputs as the image encode path, plus an optional
    ``chip_tier`` that is coerced to int (None leaves the library default).
    ``encode_audio`` returns the stego audio bytes and a stats object.

    Returns:
        Response dict with the base64 stego audio, embedding stats, and the
        channel mode/fingerprint used.
    """
    embed_mode = params.get("embed_mode", "audio_lsb")
    logger.debug("encode_audio_operation: mode=%s", embed_mode)
    from fieldwitness.stego import FilePayload, encode_audio

    carrier_bytes = base64.b64decode(params["carrier_b64"])
    reference_bytes = base64.b64decode(params["reference_b64"])

    rsa_key_b64 = params.get("rsa_key_b64")
    rsa_key_bytes = base64.b64decode(rsa_key_b64) if rsa_key_b64 else None

    file_b64 = params.get("file_b64")
    if not file_b64:
        payload = params.get("message", "")
    else:
        payload = FilePayload(
            data=base64.b64decode(file_b64),
            filename=params.get("file_name", "file"),
            mime_type=params.get("file_mime", "application/octet-stream"),
        )

    channel_key = _resolve_channel_key(params.get("channel_key", "auto"))

    raw_tier = params.get("chip_tier")
    chip_tier = None if raw_tier is None else int(raw_tier)

    stego_audio, audio_stats = encode_audio(
        message=payload,
        reference_photo=reference_bytes,
        carrier_audio=carrier_bytes,
        passphrase=params.get("passphrase", ""),
        pin=params.get("pin"),
        rsa_key_data=rsa_key_bytes,
        rsa_password=params.get("rsa_password"),
        embed_mode=embed_mode,
        channel_key=channel_key,
        progress_file=params.get("progress_file"),
        chip_tier=chip_tier,
    )

    channel_mode, channel_fingerprint = _get_channel_info(channel_key)

    # Stats attributes copied verbatim into the response, in this order.
    stat_fields = (
        "samples_modified",
        "total_samples",
        "capacity_used",
        "bytes_embedded",
        "sample_rate",
        "channels",
        "duration_seconds",
        "embed_mode",
    )
    return {
        "success": True,
        "stego_b64": base64.b64encode(stego_audio).decode("ascii"),
        "stats": {field: getattr(audio_stats, field) for field in stat_fields},
        "channel_mode": channel_mode,
        "channel_fingerprint": channel_fingerprint,
    }
def decode_audio_operation(params: dict) -> dict:
    """
    Extract a hidden message or file from stego audio (v4.3.0).

    Requires base64 ``stego_b64`` and ``reference_b64``; ``rsa_key_b64`` is
    optional. The worker records early "reading" progress (5%, then 15%)
    and passes ``progress_file`` on to ``decode_audio``.

    Returns:
        {"success": True, "is_file": True, file_b64/filename/mime_type} for a
        file payload, otherwise {"success": True, "is_file": False, "message"}.
    """
    embed_mode = params.get("embed_mode", "audio_auto")
    logger.debug("decode_audio_operation: mode=%s", embed_mode)
    from fieldwitness.stego import decode_audio

    progress_path = params.get("progress_file")
    _write_decode_progress(progress_path, 5, "reading")

    stego_bytes = base64.b64decode(params["stego_b64"])
    reference_bytes = base64.b64decode(params["reference_b64"])
    _write_decode_progress(progress_path, 15, "reading")

    rsa_key_b64 = params.get("rsa_key_b64")
    rsa_key_bytes = base64.b64decode(rsa_key_b64) if rsa_key_b64 else None

    channel_key = _resolve_channel_key(params.get("channel_key", "auto"))

    result = decode_audio(
        stego_audio=stego_bytes,
        reference_photo=reference_bytes,
        passphrase=params.get("passphrase", ""),
        pin=params.get("pin"),
        rsa_key_data=rsa_key_bytes,
        rsa_password=params.get("rsa_password"),
        embed_mode=embed_mode,
        channel_key=channel_key,
        progress_file=progress_path,
    )

    if not result.is_file:
        return {"success": True, "is_file": False, "message": result.message}

    return {
        "success": True,
        "is_file": True,
        "file_b64": base64.b64encode(result.file_data).decode("ascii"),
        "filename": result.filename,
        "mime_type": result.mime_type,
    }
def audio_info_operation(params: dict) -> dict:
    """
    Report metadata and embedding capacities for an audio clip (v4.3.0).

    Decodes ``audio_b64``, reads its metadata with ``get_audio_info`` and
    computes the LSB and spread-spectrum capacities.

    Returns:
        {"success": True, "info": {...}}; duration is rounded to 2 decimals
        and the spread capacity is its ``usable_capacity_bytes``.
    """
    from fieldwitness.stego import get_audio_info
    from fieldwitness.stego.audio_steganography import calculate_audio_lsb_capacity
    from fieldwitness.stego.spread_steganography import calculate_audio_spread_capacity

    audio_bytes = base64.b64decode(params["audio_b64"])
    meta = get_audio_info(audio_bytes)
    capacity_lsb = calculate_audio_lsb_capacity(audio_bytes)
    capacity_spread = calculate_audio_spread_capacity(audio_bytes)

    summary = {
        "sample_rate": meta.sample_rate,
        "channels": meta.channels,
        "duration_seconds": round(meta.duration_seconds, 2),
        "num_samples": meta.num_samples,
        "format": meta.format,
        "bit_depth": meta.bit_depth,
        "capacity_lsb": capacity_lsb,
        "capacity_spread": capacity_spread.usable_capacity_bytes,
    }
    return {"success": True, "info": summary}
def channel_status_operation(params: dict) -> dict:
"""Handle channel status check (v4.0.0)."""
from fieldwitness.stego import get_channel_status
status = get_channel_status()
reveal = params.get("reveal", False)
return {
"success": True,
"status": {
"mode": status["mode"],
"configured": status["configured"],
"fingerprint": status.get("fingerprint"),
"source": status.get("source"),
"key": status.get("key") if reveal and status["configured"] else None,
},
}
def main():
    """
    Worker entry point: read one JSON request from stdin, dispatch it, and
    print one JSON response to stdout.

    The request must be a JSON object whose "operation" key selects a handler.
    Every failure, including an unserializable handler result, is reported as
    {"success": False, "error": ...} so the parent always gets parseable JSON.
    """
    # Dispatch table: operation name -> handler(params) -> response dict.
    operations = {
        "encode": encode_operation,
        "decode": decode_operation,
        "compare": compare_operation,
        "capacity": capacity_check_operation,
        "channel_status": channel_status_operation,
        # Audio operations (v4.3.0)
        "encode_audio": encode_audio_operation,
        "decode_audio": decode_audio_operation,
        "audio_info": audio_info_operation,
    }

    try:
        input_text = sys.stdin.read()
        if not input_text.strip():
            output = {"success": False, "error": "No input provided"}
        else:
            params = json.loads(input_text)
            if not isinstance(params, dict):
                # e.g. a JSON array or scalar; params.get() would fail opaquely
                output = {"success": False, "error": "Invalid input: expected a JSON object"}
            else:
                operation = params.get("operation")
                logger.info("Worker handling operation: %s", operation)
                # Only strings can name an operation; this also keeps an
                # unhashable value (e.g. a list) from raising in the lookup.
                handler = operations.get(operation) if isinstance(operation, str) else None
                if handler is None:
                    output = {"success": False, "error": f"Unknown operation: {operation}"}
                else:
                    output = handler(params)
    except json.JSONDecodeError as e:
        output = {"success": False, "error": f"Invalid JSON: {e}"}
    except Exception as e:
        output = {
            "success": False,
            "error": str(e),
            "error_type": type(e).__name__,
            "traceback": traceback.format_exc(),
        }

    # Serialization sits outside the try above, so guard it on its own: a
    # handler returning a non-JSON-serializable value must still produce a
    # well-formed error response rather than crashing with no stdout output.
    try:
        serialized = json.dumps(output)
    except (TypeError, ValueError) as e:
        serialized = json.dumps(
            {
                "success": False,
                "error": f"Failed to serialize worker output: {e}",
                "error_type": type(e).__name__,
            }
        )
    print(serialized, flush=True)


if __name__ == "__main__":
    main()