Add progress bars, fix DCT decode, sparkly MOTD

Progress bar support (v4.1.2):
- Web frontend: Real-time progress during encode with phase display
- CLI: --progress flag with rich library for encode command
- Backend: progress_file parameter for async progress reporting

DCT decode bug fix:
- Fixed InvalidMagicBytesError not being caught in early-exit check
- RS-protected format (v4.1.0+) has length prefix first, not magic bytes
- Exception handler now catches both ValueError and InvalidMagicBytesError

MOTD update:
- Added sparkly header to setup.sh MOTD (matches other rpi scripts)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee
2026-01-05 20:25:33 -05:00
parent 040c44fec6
commit 2d3ed8a79a
11 changed files with 782 additions and 89 deletions

View File

@@ -55,7 +55,32 @@ except ImportError:
jio = None
# Import custom exceptions
from .exceptions import InvalidMagicBytesError, ReedSolomonError as StegasooRSError
from .exceptions import InvalidMagicBytesError
from .exceptions import ReedSolomonError as StegasooRSError
# Progress reporting interval (write every N blocks)
PROGRESS_INTERVAL = 50
def _write_progress(progress_file: str | None, current: int, total: int, phase: str = "embedding"):
"""Write progress to file for frontend polling."""
if progress_file is None:
return
try:
import json
with open(progress_file, "w") as f:
json.dump(
{
"current": current,
"total": total,
"percent": round((current / total) * 100, 1) if total > 0 else 0,
"phase": phase,
},
f,
)
except Exception:
pass # Don't let progress writing break encoding
# ============================================================================
@@ -189,7 +214,7 @@ def has_jpegio_support() -> bool:
# Check for reedsolo availability
try:
from reedsolo import RSCodec, ReedSolomonError
from reedsolo import ReedSolomonError, RSCodec
HAS_REEDSOLO = True
except ImportError:
@@ -559,6 +584,7 @@ def embed_in_dct(
seed: bytes,
output_format: str = OUTPUT_FORMAT_PNG,
color_mode: str = "color",
progress_file: str | None = None,
) -> tuple[bytes, DCTEmbedStats]:
"""Embed data using DCT coefficient modification."""
if output_format not in (OUTPUT_FORMAT_PNG, OUTPUT_FORMAT_JPEG):
@@ -568,10 +594,12 @@ def embed_in_dct(
color_mode = "color"
if output_format == OUTPUT_FORMAT_JPEG and HAS_JPEGIO:
return _embed_jpegio(data, carrier_image, seed, color_mode)
return _embed_jpegio(data, carrier_image, seed, color_mode, progress_file)
_check_scipy()
return _embed_scipy_dct_safe(data, carrier_image, seed, output_format, color_mode)
return _embed_scipy_dct_safe(
data, carrier_image, seed, output_format, color_mode, progress_file
)
def _embed_scipy_dct_safe(
@@ -580,6 +608,7 @@ def _embed_scipy_dct_safe(
seed: bytes,
output_format: str,
color_mode: str = "color",
progress_file: str | None = None,
) -> tuple[bytes, DCTEmbedStats]:
"""
Embed using scipy DCT with safe memory handling.
@@ -642,7 +671,7 @@ def _embed_scipy_dct_safe(
gc.collect()
# Embed in Y channel
Y_embedded = _embed_in_channel_safe(Y_padded, bits, block_order, blocks_x)
Y_embedded = _embed_in_channel_safe(Y_padded, bits, block_order, blocks_x, progress_file)
del Y_padded
gc.collect()
@@ -666,7 +695,7 @@ def _embed_scipy_dct_safe(
del image
gc.collect()
embedded = _embed_in_channel_safe(padded, bits, block_order, blocks_x)
embedded = _embed_in_channel_safe(padded, bits, block_order, blocks_x, progress_file)
del padded
gc.collect()
@@ -699,6 +728,7 @@ def _embed_in_channel_safe(
bits: list,
block_order: list,
blocks_x: int,
progress_file: str | None = None,
) -> np.ndarray:
"""
Embed bits in channel using safe DCT operations.
@@ -711,8 +741,9 @@ def _embed_in_channel_safe(
result = np.array(channel, dtype=np.float64, copy=True, order="C")
bit_idx = 0
total_blocks = len(block_order)
for block_num in block_order:
for block_idx, block_num in enumerate(block_order):
if bit_idx >= len(bits):
break
@@ -748,6 +779,14 @@ def _embed_in_channel_safe(
# Clean up this iteration
del block, dct_block, modified_block
# Report progress periodically
if progress_file and block_idx % PROGRESS_INTERVAL == 0:
_write_progress(progress_file, block_idx, total_blocks, "embedding")
# Final progress update
if progress_file:
_write_progress(progress_file, total_blocks, total_blocks, "finalizing")
# Force garbage collection
gc.collect()
@@ -804,6 +843,7 @@ def _embed_jpegio(
carrier_image: bytes,
seed: bytes,
color_mode: str = "color",
progress_file: str | None = None,
) -> tuple[bytes, DCTEmbedStats]:
"""Embed using jpegio for proper JPEG coefficient modification."""
import os
@@ -861,6 +901,9 @@ def _embed_jpegio(
)
coefs_used = 0
total_bits = len(bits)
progress_interval = max(total_bits // 20, 100) # Report ~20 times or every 100 bits
for bit_idx, pos_idx in enumerate(order):
if bit_idx >= len(bits):
break
@@ -876,6 +919,14 @@ def _embed_jpegio(
coefs_used += 1
# Report progress periodically
if progress_file and bit_idx % progress_interval == 0:
_write_progress(progress_file, bit_idx, total_bits, "embedding")
# Final progress before save
if progress_file:
_write_progress(progress_file, total_bits, total_bits, "saving")
jio.write(jpeg, output_path)
with open(output_path, "rb") as f:
@@ -971,8 +1022,8 @@ def _extract_scipy_dct_safe(stego_image: bytes, seed: bytes) -> bytes:
total_needed = (HEADER_SIZE + data_length) * 8
if len(all_bits) >= total_needed:
break
except ValueError:
pass
except (ValueError, InvalidMagicBytesError):
pass # RS-protected format has length prefix first, not magic bytes
del padded
gc.collect()
@@ -997,6 +1048,7 @@ def _extract_scipy_dct_safe(stego_image: bytes, seed: bytes) -> bytes:
# Count occurrences of each unique copy
from collections import Counter
counter = Counter(copies)
best_header, count = counter.most_common(1)[0]
@@ -1009,9 +1061,13 @@ def _extract_scipy_dct_safe(stego_image: bytes, seed: bytes) -> bytes:
# Sanity check: both lengths should be reasonable
max_reasonable = (len(all_bits) // 8) - RS_LENGTH_PREFIX_SIZE
if (raw_payload_length > 0 and raw_payload_length <= max_reasonable and
rs_encoded_length > 0 and rs_encoded_length <= max_reasonable and
rs_encoded_length >= raw_payload_length):
if (
raw_payload_length > 0
and raw_payload_length <= max_reasonable
and rs_encoded_length > 0
and rs_encoded_length <= max_reasonable
and rs_encoded_length >= raw_payload_length
):
# This looks like RS-protected format
total_bits_needed = (RS_LENGTH_PREFIX_SIZE + rs_encoded_length) * 8
@@ -1088,6 +1144,7 @@ def _extract_jpegio(stego_image: bytes, seed: bytes) -> bytes:
# Extract 3 copies and use majority voting
from collections import Counter
copies = []
for i in range(RS_LENGTH_COPIES):
start = i * RS_LENGTH_HEADER_SIZE
@@ -1104,9 +1161,13 @@ def _extract_jpegio(stego_image: bytes, seed: bytes) -> bytes:
# Sanity check
max_reasonable = (len(all_positions) // 8) - RS_LENGTH_PREFIX_SIZE
if (raw_payload_length > 0 and raw_payload_length <= max_reasonable and
rs_encoded_length > 0 and rs_encoded_length <= max_reasonable and
rs_encoded_length >= raw_payload_length):
if (
raw_payload_length > 0
and raw_payload_length <= max_reasonable
and rs_encoded_length > 0
and rs_encoded_length <= max_reasonable
and rs_encoded_length >= raw_payload_length
):
total_bits_needed = (RS_LENGTH_PREFIX_SIZE + rs_encoded_length) * 8
if len(all_positions) >= total_bits_needed:

View File

@@ -37,6 +37,7 @@ def encode(
dct_output_format: str = "png",
dct_color_mode: str = "color",
channel_key: str | bool | None = None,
progress_file: str | None = None,
) -> EncodeResult:
"""
Encode a message or file into an image.
@@ -118,6 +119,7 @@ def encode(
embed_mode=embed_mode,
dct_output_format=dct_output_format,
dct_color_mode=dct_color_mode,
progress_file=progress_file,
)
# Generate filename

View File

@@ -39,6 +39,31 @@ from .debug import debug
from .exceptions import CapacityError, EmbeddingError
from .models import EmbedStats, FilePayload
# Progress reporting interval
PROGRESS_INTERVAL = 1000 # Write every N pixels for LSB
def _write_progress(progress_file: str | None, current: int, total: int, phase: str = "embedding"):
"""Write progress to file for frontend polling."""
if progress_file is None:
return
try:
import json
with open(progress_file, "w") as f:
json.dump(
{
"current": current,
"total": total,
"percent": round((current / total) * 100, 1) if total > 0 else 0,
"phase": phase,
},
f,
)
except Exception:
pass # Don't let progress writing break encoding
# Lossless formats that preserve LSB data
LOSSLESS_FORMATS = {"PNG", "BMP", "TIFF"}
@@ -526,6 +551,7 @@ def embed_in_image(
embed_mode: str = EMBED_MODE_LSB,
dct_output_format: str = DCT_OUTPUT_PNG,
dct_color_mode: str = "color",
progress_file: str | None = None,
) -> tuple[bytes, Union[EmbedStats, "DCTEmbedStats"], str]:
"""
Embed data into an image using specified mode.
@@ -579,6 +605,7 @@ def embed_in_image(
pixel_key,
output_format=dct_output_format,
color_mode=dct_color_mode,
progress_file=progress_file,
)
# Determine extension based on output format
@@ -594,7 +621,7 @@ def embed_in_image(
return stego_bytes, dct_stats, ext
# LSB MODE
return _embed_lsb(data, image_data, pixel_key, bits_per_channel, output_format)
return _embed_lsb(data, image_data, pixel_key, bits_per_channel, output_format, progress_file)
def _embed_lsb(
@@ -603,6 +630,7 @@ def _embed_lsb(
pixel_key: bytes,
bits_per_channel: int = 1,
output_format: str | None = None,
progress_file: str | None = None,
) -> tuple[bytes, EmbedStats, str]:
"""
Embed data using LSB steganography (internal implementation).
@@ -659,8 +687,9 @@ def _embed_lsb(
bit_idx = 0
modified_pixels = 0
total_pixels_to_process = len(selected_indices)
for pixel_idx in selected_indices:
for progress_idx, pixel_idx in enumerate(selected_indices):
if bit_idx >= len(binary_data):
break
@@ -690,6 +719,16 @@ def _embed_lsb(
new_pixels[pixel_idx] = (r, g, b)
modified_pixels += 1
# Report progress periodically
if progress_file and progress_idx % PROGRESS_INTERVAL == 0:
_write_progress(progress_file, progress_idx, total_pixels_to_process, "embedding")
# Final progress before save
if progress_file:
_write_progress(
progress_file, total_pixels_to_process, total_pixels_to_process, "saving"
)
debug.print(f"Modified {modified_pixels} pixels (out of {len(selected_indices)} selected)")
stego_img = Image.new("RGB", img.size)