diff --git a/.gitignore b/.gitignore
index 6bc3f13..54d16c2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
+# Embedded repos (AUR packaging)
+aur-cli-upload/
+
# Python
__pycache__/
*.py[cod]
diff --git a/IdeasScout_PLANS_20260324.md b/IdeasScout_PLANS_20260324.md
new file mode 100644
index 0000000..c714006
--- /dev/null
+++ b/IdeasScout_PLANS_20260324.md
@@ -0,0 +1,294 @@
+# Stegasoo Ideas Scout — Implementation Plans (2026-03-24)
+
+Baseline: v4.3.0, Python >=3.11, FORMAT_VERSION 5, no existing users (no backward compat constraints).
+
+---
+
+## Tier 1 — Quick Wins
+
+### 1. Platform-Calibrated DCT Presets
+
+**Description**: `--platform telegram|discord|signal|whatsapp` flag for DCT encode. Bakes in each platform's known recompression parameters. Pre-verifies payload survives before outputting.
+
+**Implementation approach**:
+- New file `src/stegasoo/platform_presets.py` — `PlatformPreset` dataclass + `PRESETS` dict mapping platform → tuned `quant_step`, `jpeg_quality`, `embed_positions`, `max_dimension`, `recompress_quality`
+- `dct_steganography.py`: `_embed_scipy_dct_safe()` / `_embed_jpegio()` accept optional preset overrides for `QUANT_STEP`, `DEFAULT_EMBED_POSITIONS`, output quality
+- New `pre_verify_survival()` function: encode → re-save at platform quality → extract → pass/fail
+- Thread `platform` param through `encode.py` → `steganography.py` → DCT functions
+- `cli.py`: add `--platform` as `click.Choice` + `--verify/--no-verify` (pre-verification doubles encode time)
+- LSB + `--platform` should error early — LSB data is destroyed by any JPEG recompression
+
+**Known platform params** (from research):
+| Platform | Quality | Max Dimension | Notes |
+|----------|---------|---------------|-------|
+| Telegram | ~82 | 2560×2560 | ~81KB embeddable |
+| Discord | ~85 | Varies (Nitro) | |
+| Signal | ~80 | Aggressive | |
+| WhatsApp | ~70 | 1600×1600 | Most lossy |
+
+**Go/No-Go metrics**:
+- >95% payload survival rate per platform at 1KB message size in automated tests
+- Pre-verification correctly predicts real platform behavior (manual validation per platform at least once)
+
+**Complexity**: **M** — new file + parameter threading through 4-5 functions
+
+**Risks**: Platform params change without notice. Add version/date stamps to presets and a `stegasoo tools verify-platform` test command.
+
+---
+
+### 2. Steganalysis Self-Check (`stegasoo check`)
+
+**Description**: New CLI command running chi-square and RS (Regular-Singular) statistical analysis on stego images. Outputs detectability risk level (low/medium/high).
+
+**Implementation approach**:
+- New file `src/stegasoo/steganalysis.py`:
+ - `chi_square_analysis(image_data) -> float` — chi-square statistic on LSB distribution per channel
+ - `rs_analysis(image_data) -> float` — Regular-Singular groups analysis (requires numpy)
+ - `assess_risk(chi_p, rs_estimate) -> str` — maps to "low"/"medium"/"high"
+ - `check_image(image_data) -> dict` — orchestrator
+- `cli.py`: new `@cli.command("check")` with `IMAGE` arg, `--json`, `--mode lsb|dct|auto`
+- `constants.py`: threshold constants for chi-square p-value and RS boundaries
+- `__init__.py`: export `check_image` in `__all__`
+- Start LSB-only; DCT steganalysis (calibration attack) deferred
+
+**Go/No-Go metrics**:
+- Clean images → consistently "low risk"
+- Naive sequential LSB → "high risk"
+- Stegasoo LSB at <50% capacity → "low" or "medium"
+
+**Complexity**: **M** — ~150 lines numpy per test, straightforward CLI integration
+
+---
+
+### 3. Python 3.13 DCT Cleanup
+
+**Description**: The `jpegio` → `jpeglib` migration is already done in code. Remaining work: rename stale `jpegio` references and verify on 3.13.
+
+**Implementation approach**:
+- `dct_steganography.py`: rename `HAS_JPEGIO` → `HAS_JPEGLIB`, `_jpegio_*` functions → `_jpeglib_*`, update constant names (`JPEGIO_MAGIC` → `JPEGLIB_MAGIC`, etc.)
+- Verify `jpeglib.to_jpegio()` compatibility shim — if jpeglib plans to deprecate it, migrate to native API
+- Run full test suite on Python 3.13
+
+**Go/No-Go metrics**:
+- All DCT tests pass on Python 3.13
+- No deprecation warnings from jpeglib
+
+**Complexity**: **S** — renaming and verification only
+
+---
+
+## Tier 2 — Strategic
+
+### 4. Content-Adaptive Embedding (S-UNIWARD/WOW-inspired)
+
+**Description**: Replace uniform-random pixel selection with texture-weighted cost functions. Embed preferentially in busy/textured regions where changes are least detectable. 3-5x harder to detect statistically.
+
+**Implementation approach**:
+- New file `src/stegasoo/adaptive_cost.py`:
+ - `compute_cost_map(image_data) -> np.ndarray` — per-pixel distortion cost via directional high-pass filters (Daubechets wavelet bank / KB filter)
+ - `select_pixels_by_cost(cost_map, pixel_key, num_needed) -> list[int]` — weighted sampling, still ChaCha20-seeded for determinism
+- `steganography.py`:
+ - `generate_pixel_indices()`: add `cost_map` param, use weighted sampling when provided
+ - `_embed_lsb()`: compute cost map when adaptive mode enabled
+ - `_extract_lsb()`: must compute identical cost map to find same pixels
+- `dct_steganography.py`: adapt `DEFAULT_EMBED_POSITIONS` per-block based on block texture energy
+- Thread `adaptive: bool` through `encode.py`/`decode.py`
+- `constants.py`: add `EMBED_MODE_ADAPTIVE_LSB`, filter kernels, cost thresholds
+
+**Go/No-Go metrics**:
+- Chi-square test (Feature 2) shows measurable improvement vs uniform-random
+- **Critical**: cost map computation is deterministic across platforms (quantize to fixed-point integers)
+- Round-trip decode succeeds on Linux x86, Linux ARM, macOS
+
+**Complexity**: **L** — novel algorithm, cross-platform determinism requirement, touches core embedding
+
+**Risks**: Floating-point differences in wavelet computation could break extraction. Mitigate with integer quantization. Increases encode/decode time ~2-3x.
+
+---
+
+### 5. Per-Message Forward Secrecy via HKDF
+
+**Description**: Derive ephemeral per-message encryption keys using HKDF expansion from the Argon2id root key + random nonce. Compromising one message doesn't reveal others.
+
+**Implementation approach**:
+- `crypto.py`:
+ - Add `from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand`
+ - `derive_message_key(root_key, nonce) -> bytes` — HKDF-Expand with SHA-256
+ - `encrypt_message()`: generate 16-byte random nonce, derive per-message key, embed nonce in header
+ - `decrypt_message()`: extract nonce, derive same key
+ - Also derive pixel selection key via HKDF with different `info` param
+- `constants.py`:
+ - Bump `FORMAT_VERSION` to 6
+ - `HKDF_INFO_ENCRYPTION = b"stegasoo-v6-encrypt"`, `HKDF_INFO_PIXEL = b"stegasoo-v6-pixel"`
+ - `MESSAGE_NONCE_SIZE = 16`
+- Header grows from 66 → 82 bytes: add `message_nonce(16)` field
+- Update `HEADER_OVERHEAD` / `ENCRYPTION_OVERHEAD` in `steganography.py`
+
+**Go/No-Go metrics**:
+- Two messages with identical credentials produce different ciphertexts and different pixel locations
+- `cryptography` library HKDF works with existing Argon2id output
+
+**Complexity**: **M** — well-defined crypto change, touches security-critical header format
+
+---
+
+### 6. PWA Mobile Interface
+
+**Description**: Convert Flask Web UI to Progressive Web App. Mobile-optimized, installable, offline-capable static pages.
+
+**Implementation approach**:
+- New files in `frontends/web/static/`: `manifest.json`, `sw.js`, icon set (192×192, 512×512)
+- Base template: add manifest link, theme-color meta, viewport meta, service worker registration
+- `app.py`: serve manifest with correct MIME, add cache headers for static assets
+- Responsive CSS for encode/decode accordion forms
+- Camera capture: `` for reference photo
+- Service worker caches static assets only — NOT encode/decode API endpoints
+
+**Go/No-Go metrics**:
+- Lighthouse PWA score >= 90
+- Installable on Android Chrome and iOS Safari
+- Offline: static pages load, encode/decode shows graceful "offline" message
+
+**Complexity**: **M** — frontend only, no core library changes
+
+**Risks**: Camera capture requires HTTPS (already supported via `ssl_utils.py`).
+
+---
+
+## Tier 3 — Moonshot
+
+### 7. Plausible Deniability / Dual-Payload Mode
+
+**Description**: Two independent encrypted payloads in one carrier, each with different credentials. Reveal decoy under coercion; real payload stays hidden.
+
+**Implementation approach**:
+- New file `src/stegasoo/dual_payload.py`:
+ - `encode_dual(message_a, message_b, carrier, creds_a, creds_b)`
+ - Partition available pixels into two disjoint pools using different seeds
+ - **Critical**: ALL images (single or dual) must fill unused pixel pool with random data so single-payload and dual-payload images are indistinguishable
+- `steganography.py`: `generate_pixel_indices()` gets `exclude_indices` param
+- `decode.py`: each credential set finds a different valid payload; wrong credentials produce garbage
+- CLI + Web UI: dual-payload encode workflow
+
+**Go/No-Go metrics**:
+- Single-payload and dual-payload images are statistically indistinguishable (chi-square can't differentiate)
+- Each payload decodes independently
+- Wrong credentials for one payload don't reveal other payload's existence
+
+**Complexity**: **XL** — novel design, halves capacity per payload, challenging UX, needs rigorous security analysis
+
+**Dependencies**: Feature 2 (validation), Feature 4 (detectability reduction)
+
+---
+
+## Architectural Improvements
+
+### 8. EmbeddingBackend Protocol
+
+**Description**: Typed plugin interface for all embedding algorithms. Replace if/elif dispatch in `steganography.py` with a registry.
+
+**Implementation approach**:
+- New package `src/stegasoo/backends/`:
+ - `protocol.py` — `EmbeddingBackend(Protocol)` with `embed()`, `extract()`, `calculate_capacity()`, `is_available()`
+ - `lsb.py`, `dct.py` — wrap existing functions
+ - `registry.py` — `BackendRegistry` mapping mode strings to backends
+- `steganography.py`: `embed_in_image()` / `extract_from_image()` dispatch via registry
+- `__init__.py`: export protocol and `register_backend()`
+
+**Complexity**: **M** — implement before Features 4 and 7 (they become new backends)
+
+---
+
+### 9. HKDF Key Separation
+
+Subsumed by Feature 5. The HKDF expansion provides:
+- Encryption key: `HKDF-Expand(root_key, info="stegasoo-encrypt", nonce)`
+- Pixel selection key: `HKDF-Expand(root_key, info="stegasoo-pixel", nonce)`
+- Future: MAC key, padding key, etc.
+
+---
+
+### 10. `[core]` Extra with Minimal Deps
+
+**Description**: Move Pillow to `[image]` extra, base deps = `cryptography` + `argon2-cffi` + `zstandard` only.
+
+**Complexity**: **S** — but Pillow is used in `crypto.py` for photo hashing (core to security model). Only worth it with a concrete headless use case. **Low priority.**
+
+---
+
+## Ecosystem Features
+
+### 11. Aletheia Integration
+
+Optional `--engine aletheia` backend for Feature 2's `stegasoo check`. BSD-licensed, provides SPA/RS/WS attacks + ML classifiers. **Complexity: S** (after Feature 2). **Depends on**: Feature 2.
+
+### 12. C2PA/AI Provenance Watermarking
+
+Embed C2PA metadata alongside stego payloads. **Complexity: L** — C2PA is a complex standard. Potentially conflicts with stego goals (adds detectable metadata). Research-heavy.
+
+### 13. Signal/Matrix Bot
+
+Bot that decodes stego images in a channel using configured channel key. **Complexity: M** — integration work, uses existing `decode()` API.
+
+### 14. Homebrew Tap + Nix Flake
+
+Package distribution for macOS/NixOS. **Complexity: S** — packaging only, no code changes.
+
+---
+
+## Summary Table
+
+| # | Feature | Tier | Size | Dependencies | Primary Files |
+|---|---------|------|------|-------------|---------------|
+| 1 | Platform DCT Presets | T1 | M | — | new `platform_presets.py`, `dct_steganography.py`, `encode.py`, `cli.py` |
+| 2 | Steganalysis Self-Check | T1 | M | — | new `steganalysis.py`, `cli.py`, `constants.py` |
+| 3 | Python 3.13 DCT Cleanup | T1 | S | — | `dct_steganography.py` |
+| 4 | Content-Adaptive Embedding | T2 | L | numpy, #2 | new `adaptive_cost.py`, `steganography.py`, `constants.py` |
+| 5 | HKDF Forward Secrecy | T2 | M | — | `crypto.py`, `constants.py`, `steganography.py` |
+| 6 | PWA Mobile Interface | T2 | M | — | `frontends/web/` templates + static |
+| 7 | Dual-Payload Mode | T3 | XL | #2, #4 | new `dual_payload.py`, `steganography.py`, `cli.py` |
+| 8 | EmbeddingBackend Protocol | Arch | M | — | new `backends/` package, `steganography.py` |
+| 9 | HKDF Key Separation | Arch | — | Included in #5 | `crypto.py` |
+| 10 | `[core]` Extra | Arch | S | — | `pyproject.toml` |
+| 11 | Aletheia Integration | Eco | S | #2 | `steganalysis.py` |
+| 12 | C2PA Watermarking | Eco | L | — | new module |
+| 13 | Signal/Matrix Bot | Eco | M | — | new `bots/` package |
+| 14 | Homebrew + Nix | Eco | S | — | packaging files only |
+
+---
+
+## Suggested Roadmap
+
+### Phase 1 — Foundations (v4.4.0)
+
+1. **#3** Python 3.13 DCT Cleanup (S) — unblocks CI on 3.13
+2. **#8** EmbeddingBackend Protocol (M) — architectural cleanup before new embedding work
+3. **#2** Steganalysis Self-Check (M) — validation tooling for everything that follows
+
+### Phase 2 — Security & Robustness (v4.5.0)
+
+4. **#5** HKDF Forward Secrecy (M) — FORMAT_VERSION bump to 6, improved crypto
+5. **#1** Platform-Calibrated DCT Presets (M) — high user value for social media
+6. **#14** Homebrew + Nix (S) — distribution expansion
+
+### Phase 3 — Advanced Steganography (v5.0.0)
+
+7. **#4** Content-Adaptive Embedding (L) — major security improvement
+8. **#6** PWA Mobile Interface (M) — parallel frontend work stream
+
+### Phase 4 — Moonshot (v5.x+)
+
+9. **#7** Dual-Payload Mode (XL) — after #2 and #4 are solid
+10. **#12** C2PA Watermarking (L) — research-heavy
+11. **#13** Signal/Matrix Bot (M) — community-driven
+
+---
+
+## Additional Ideas (Backlog)
+
+- **Animated GIF steganography** — LSB in GIF frames, natural multi-media extension
+- **PDF steganography** — whitespace/font metric/embedded image payloads
+- **Batch encode** — `stegasoo batch-encode --dir /photos/` with auto carrier selection (BATCH_* constants suggest this was planned)
+- **Stego identification** — `stegasoo identify image.png` probes for known stego signatures
+- **Per-device credential sync via QR** — channel key as stego image of reference photo
+- **`stegasoo verify`** — decode + confirm message matches expected hash without revealing contents
diff --git a/rpi/flash-stock-img.sh b/rpi/flash-stock-img.sh
index 9ed3a81..a795b40 100755
--- a/rpi/flash-stock-img.sh
+++ b/rpi/flash-stock-img.sh
@@ -1,7 +1,7 @@
#!/bin/bash
# Flash Raspberry Pi image with headless config (Trixie/Bookworm compatible)
-# Usage: ./flash-stock-img.sh
-# Reads settings from config.json in same directory
+# Usage: ./flash-stock-img.sh [-c config.json]
+# Reads settings from config.json in same directory (or specify with -c)
#
# Uses the same firstrun.sh approach as rpi-imager for compatibility
@@ -10,11 +10,31 @@ set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
CONFIG_FILE="$SCRIPT_DIR/config.json"
+# ============================================================================
+# Parse options
+# ============================================================================
+usage() {
+ echo "Usage: $0 [-c config.json] "
+ echo " -c FILE Use alternate config file (default: config.json in script dir)"
+ echo "Example: $0 2025-12-04-raspios-trixie-arm64-lite.img.xz /dev/sdb"
+ echo "Example: $0 -c myconfig.json raspios.img.xz /dev/sdb"
+ exit 1
+}
+
+while getopts "c:h" opt; do
+ case $opt in
+ c) CONFIG_FILE="$OPTARG" ;;
+ h) usage ;;
+ *) usage ;;
+ esac
+done
+shift $((OPTIND - 1))
+
# ============================================================================
# Load config
# ============================================================================
if [ ! -f "$CONFIG_FILE" ]; then
- echo "Error: config.json not found at $CONFIG_FILE"
+ echo "Error: config file not found at $CONFIG_FILE"
exit 1
fi
@@ -38,9 +58,7 @@ echo
# Validate args
# ============================================================================
if [ $# -ne 2 ]; then
- echo "Usage: $0 "
- echo "Example: $0 2025-12-04-raspios-trixie-arm64-lite.img.xz /dev/sdb"
- exit 1
+ usage
fi
IMAGE="$1"
diff --git a/rpi/pull-image.sh b/rpi/pull-image.sh
index a3d53d7..84c71fe 100755
--- a/rpi/pull-image.sh
+++ b/rpi/pull-image.sh
@@ -14,9 +14,9 @@ BOLD='\033[1m'
NC='\033[0m'
if [ $# -ne 2 ]; then
- echo "Usage: $0 "
- echo "Example: $0 /dev/sdb stegasoo-rpi-4.2.1.img.zst"
- exit 1
+ echo "Usage: $0 "
+ echo "Example: $0 /dev/sdb stegasoo-rpi-4.2.1.img.zst"
+ exit 1
fi
DEVICE="$1"
@@ -24,13 +24,13 @@ OUTPUT="$2"
# Check for root
if [ "$EUID" -ne 0 ]; then
- echo -e "${RED}Error: Must run as root (sudo)${NC}"
- exit 1
+ echo -e "${RED}Error: Must run as root (sudo)${NC}"
+ exit 1
fi
if [ ! -b "$DEVICE" ]; then
- echo -e "${RED}Error: Device not found: $DEVICE${NC}"
- exit 1
+ echo -e "${RED}Error: Device not found: $DEVICE${NC}"
+ exit 1
fi
echo -e "${BOLD}Device info:${NC}"
@@ -39,14 +39,14 @@ echo
# Find partitions
if [ -b "${DEVICE}1" ]; then
- BOOT_PART="${DEVICE}1"
- ROOT_PART="${DEVICE}2"
+ BOOT_PART="${DEVICE}1"
+ ROOT_PART="${DEVICE}2"
elif [ -b "${DEVICE}p1" ]; then
- BOOT_PART="${DEVICE}p1"
- ROOT_PART="${DEVICE}p2"
+ BOOT_PART="${DEVICE}p1"
+ ROOT_PART="${DEVICE}p2"
else
- echo -e "${RED}Error: Could not find partitions${NC}"
- exit 1
+ echo -e "${RED}Error: Could not find partitions${NC}"
+ exit 1
fi
# Unmount any mounted partitions
@@ -62,65 +62,65 @@ echo -e "${BOLD}Checking partition size...${NC}"
# Get current partition size in bytes
CURRENT_SIZE=$(blockdev --getsize64 "$ROOT_PART")
-TARGET_BYTES=$((16 * 1024 * 1024 * 1024)) # 16GB in bytes
+TARGET_BYTES=$((16 * 1024 * 1024 * 1024)) # 16GB in bytes
CURRENT_GB=$(echo "scale=2; $CURRENT_SIZE / 1073741824" | bc)
echo " Current rootfs size: ${CURRENT_GB}GB"
if [ "$CURRENT_SIZE" -gt "$TARGET_BYTES" ]; then
- echo -e "${YELLOW}Resizing rootfs to 16GB...${NC}"
+ echo -e "${YELLOW}Resizing rootfs to 16GB...${NC}"
- # Get boot partition end in sectors
- BOOT_END=$(parted -s "$DEVICE" unit s print | grep "^ 1" | awk '{print $3}' | tr -d 's')
+ # Get boot partition end in sectors
+ BOOT_END=$(parted -s "$DEVICE" unit s print | grep "^ 1" | awk '{print $3}' | tr -d 's')
- # Calculate 16GB in sectors (512 byte sectors)
- ROOT_SIZE_SECTORS=33554432
- ROOT_END=$((BOOT_END + ROOT_SIZE_SECTORS))
+ # Calculate 16GB in sectors (512 byte sectors)
+ ROOT_SIZE_SECTORS=33554432
+ ROOT_END=$((BOOT_END + ROOT_SIZE_SECTORS))
- # SHRINKING: filesystem first, then partition
- echo " Checking filesystem..."
- e2fsck -f -y "$ROOT_PART" 2>/dev/null || true
+ # SHRINKING: filesystem first, then partition
+ echo " Checking filesystem..."
+ e2fsck -f -y "$ROOT_PART" 2>/dev/null || true
- # Shrink filesystem to 15.5GB (leave room for partition overhead)
- echo " Shrinking filesystem to 15500M..."
- resize2fs "$ROOT_PART" 15500M
+ # Shrink filesystem to 15.5GB (leave room for partition overhead)
+ echo " Shrinking filesystem to 15500M..."
+ resize2fs "$ROOT_PART" 15500M
- # Delete and recreate partition 2 with 16GB size
- echo " Shrinking partition to 16GB..."
- parted -s "$DEVICE" rm 2
- parted -s "$DEVICE" mkpart primary ext4 $((BOOT_END + 1))s ${ROOT_END}s
+ # Delete and recreate partition 2 with 16GB size
+ echo " Shrinking partition to 16GB..."
+ parted -s "$DEVICE" rm 2
+ parted -s "$DEVICE" mkpart primary ext4 $((BOOT_END + 1))s ${ROOT_END}s
- # Refresh partition table
- partprobe "$DEVICE"
- sleep 2
+ # Refresh partition table
+ partprobe "$DEVICE"
+ sleep 2
- # Expand filesystem to fill the partition exactly
- echo " Expanding filesystem to fill partition..."
- e2fsck -f -y "$ROOT_PART" 2>/dev/null || true
- resize2fs "$ROOT_PART"
+ # Expand filesystem to fill the partition exactly
+ echo " Expanding filesystem to fill partition..."
+ e2fsck -f -y "$ROOT_PART" 2>/dev/null || true
+ resize2fs "$ROOT_PART"
- echo -e "${GREEN} Rootfs resized to 16GB${NC}"
+ echo -e "${GREEN} Rootfs resized to 16GB${NC}"
elif [ "$CURRENT_SIZE" -lt "$TARGET_BYTES" ]; then
- echo -e "${YELLOW} Rootfs is smaller than 16GB - expanding...${NC}"
+ echo -e "${YELLOW} Rootfs is smaller than 16GB - expanding...${NC}"
- # Get boot partition end in sectors
- BOOT_END=$(parted -s "$DEVICE" unit s print | grep "^ 1" | awk '{print $3}' | tr -d 's')
- ROOT_SIZE_SECTORS=33554432
- ROOT_END=$((BOOT_END + ROOT_SIZE_SECTORS))
+ # Get boot partition end in sectors
+ BOOT_END=$(parted -s "$DEVICE" unit s print | grep "^ 1" | awk '{print $3}' | tr -d 's')
+ ROOT_SIZE_SECTORS=33554432
+ ROOT_END=$((BOOT_END + ROOT_SIZE_SECTORS))
- # EXPANDING: partition first, then filesystem
- parted -s "$DEVICE" rm 2
- parted -s "$DEVICE" mkpart primary ext4 $((BOOT_END + 1))s ${ROOT_END}s
+ # EXPANDING: partition first, then filesystem
+ parted -s "$DEVICE" rm 2
+ parted -s "$DEVICE" mkpart primary ext4 $((BOOT_END + 1))s ${ROOT_END}s
- partprobe "$DEVICE"
- sleep 2
+ partprobe "$DEVICE"
+ sleep 2
- e2fsck -f -y "$ROOT_PART" 2>/dev/null || true
- resize2fs "$ROOT_PART"
+ e2fsck -f -y "$ROOT_PART" 2>/dev/null || true
+ resize2fs "$ROOT_PART"
- echo -e "${GREEN} Rootfs expanded to 16GB${NC}"
+ echo -e "${GREEN} Rootfs expanded to 16GB${NC}"
else
- echo -e "${GREEN} Rootfs already ~16GB${NC}"
+ echo -e "${GREEN} Rootfs already ~16GB${NC}"
fi
# ============================================================================
@@ -135,8 +135,8 @@ echo
END_SECTOR=$(parted -s "$DEVICE" unit s print | grep "^ 2" | awk '{print $3}' | tr -d 's')
if [ -z "$END_SECTOR" ]; then
- echo -e "${RED}Error: Could not determine partition 2 end sector${NC}"
- exit 1
+ echo -e "${RED}Error: Could not determine partition 2 end sector${NC}"
+ exit 1
fi
# Add a small buffer (1MB = 2048 sectors) for safety
@@ -150,8 +150,8 @@ echo
read -p "Proceed with image pull? [Y/n] " confirm
if [[ "$confirm" =~ ^[Nn]$ ]]; then
- echo "Aborted."
- exit 1
+ echo "Aborted."
+ exit 1
fi
echo
@@ -159,13 +159,13 @@ echo -e "${GREEN}Pulling image...${NC}"
echo
# Use pv if available for progress, otherwise fallback to dd status
-if command -v pv &> /dev/null; then
- dd if="$DEVICE" bs=512 count=$TOTAL_SECTORS 2>/dev/null | \
- pv -s $TOTAL_BYTES | \
- zstd -T0 -3 > "$OUTPUT"
+if command -v pv &>/dev/null; then
+ dd if="$DEVICE" bs=512 count=$TOTAL_SECTORS 2>/dev/null |
+ pv -s $TOTAL_BYTES |
+ zstd -T0 -19 --ultra >"$OUTPUT"
else
- dd if="$DEVICE" bs=512 count=$TOTAL_SECTORS status=progress | \
- zstd -T0 -3 > "$OUTPUT"
+ dd if="$DEVICE" bs=512 count=$TOTAL_SECTORS status=progress |
+ zstd -T0 -19 --ultra >"$OUTPUT"
fi
echo
@@ -178,16 +178,16 @@ ls -lh "$OUTPUT"
echo
read -p "Create .zst.zip wrapper for GitHub? [y/N] " zip_confirm
if [[ "$zip_confirm" =~ ^[Yy]$ ]]; then
- ZIP_OUTPUT="${OUTPUT}.zip"
- echo -e "${YELLOW}Creating zip wrapper (store mode, no compression)...${NC}"
- zip -0 "$ZIP_OUTPUT" "$OUTPUT"
- echo -e "${GREEN}Done!${NC} Upload this to GitHub Releases:"
- ls -lh "$ZIP_OUTPUT"
- echo
- echo "Users can flash with:"
- echo " sudo ./rpi/flash-image.sh $ZIP_OUTPUT"
+ ZIP_OUTPUT="${OUTPUT}.zip"
+ echo -e "${YELLOW}Creating zip wrapper (store mode, no compression)...${NC}"
+ zip -0 "$ZIP_OUTPUT" "$OUTPUT"
+ echo -e "${GREEN}Done!${NC} Upload this to GitHub Releases:"
+ ls -lh "$ZIP_OUTPUT"
+ echo
+ echo "Users can flash with:"
+ echo " sudo ./rpi/flash-image.sh $ZIP_OUTPUT"
else
- echo
- echo "To verify:"
- echo " zstdcat $OUTPUT | fdisk -l /dev/stdin"
+ echo
+ echo "To verify:"
+ echo " zstdcat $OUTPUT | fdisk -l /dev/stdin"
fi
diff --git a/rpi/train_proj.json b/rpi/train_proj.json
new file mode 100644
index 0000000..e569b37
--- /dev/null
+++ b/rpi/train_proj.json
@@ -0,0 +1,13 @@
+{
+ "hostname": "running_trains",
+ "username": "admin",
+ "password": "runthemtrains",
+ "wifiSSID": "WitchHazelWrecked",
+ "wifiPassword": "BeefPigsMoo",
+ "wifiCountry": "US",
+ "locale": "en_US.UTF-8",
+ "keyboardLayout": "us",
+ "timezone": "America/New_York",
+ "enableSSH": true
+}
+
diff --git a/src/stegasoo/__init__.py b/src/stegasoo/__init__.py
index fb47037..5432ec7 100644
--- a/src/stegasoo/__init__.py
+++ b/src/stegasoo/__init__.py
@@ -22,6 +22,9 @@ from .channel import (
validate_channel_key,
)
+# Audio support — gated by STEGASOO_AUDIO env var and dependency availability
+from .constants import AUDIO_ENABLED, VIDEO_ENABLED
+
# Crypto functions
from .crypto import get_active_channel_key, get_channel_fingerprint, has_argon2
from .decode import decode, decode_file, decode_text
@@ -54,9 +57,6 @@ from .steganography import (
# Utilities
from .utils import generate_filename
-# Audio support — gated by STEGASOO_AUDIO env var and dependency availability
-from .constants import AUDIO_ENABLED, VIDEO_ENABLED
-
HAS_AUDIO_SUPPORT = AUDIO_ENABLED
HAS_VIDEO_SUPPORT = VIDEO_ENABLED
@@ -77,6 +77,24 @@ else:
encode_audio = None
decode_audio = None
+# Video support — gated by STEGASOO_VIDEO env var and ffmpeg + audio deps
+if VIDEO_ENABLED:
+ from .decode import decode_video
+ from .encode import encode_video
+ from .video_utils import (
+ calculate_video_capacity,
+ detect_video_format,
+ get_video_info,
+ validate_video,
+ )
+else:
+ detect_video_format = None
+ get_video_info = None
+ validate_video = None
+ calculate_video_capacity = None
+ encode_video = None
+ decode_video = None
+
# QR Code utilities - optional, may not be available
try:
from .qr_utils import (
@@ -117,6 +135,8 @@ from .constants import (
EMBED_MODE_AUTO,
EMBED_MODE_DCT,
EMBED_MODE_LSB,
+ EMBED_MODE_VIDEO_AUTO,
+ EMBED_MODE_VIDEO_LSB,
FORMAT_VERSION,
LOSSLESS_FORMATS,
MAX_FILE_PAYLOAD_SIZE,
@@ -159,7 +179,13 @@ from .exceptions import (
SteganographyError,
StegasooError,
UnsupportedAudioFormatError,
+ UnsupportedVideoFormatError,
ValidationError,
+ VideoCapacityError,
+ VideoError,
+ VideoExtractionError,
+ VideoTranscodeError,
+ VideoValidationError,
)
# Models
@@ -175,6 +201,9 @@ from .models import (
GenerateResult,
ImageInfo,
ValidationResult,
+ VideoCapacityInfo,
+ VideoEmbedStats,
+ VideoInfo,
)
from .validation import (
validate_audio_embed_mode,
@@ -212,6 +241,13 @@ __all__ = [
"HAS_VIDEO_SUPPORT",
"validate_audio_embed_mode",
"validate_audio_file",
+ # Video (v4.4.0)
+ "encode_video",
+ "decode_video",
+ "detect_video_format",
+ "get_video_info",
+ "validate_video",
+ "calculate_video_capacity",
# Generation
"generate_pin",
"generate_passphrase",
@@ -273,6 +309,10 @@ __all__ = [
"AudioEmbedStats",
"AudioInfo",
"AudioCapacityInfo",
+ # Video models
+ "VideoEmbedStats",
+ "VideoInfo",
+ "VideoCapacityInfo",
# Exceptions
"StegasooError",
"ValidationError",
@@ -303,6 +343,13 @@ __all__ = [
"AudioExtractionError",
"AudioTranscodeError",
"UnsupportedAudioFormatError",
+ # Video exceptions
+ "VideoError",
+ "VideoValidationError",
+ "VideoCapacityError",
+ "VideoExtractionError",
+ "VideoTranscodeError",
+ "UnsupportedVideoFormatError",
# Constants
"FORMAT_VERSION",
"MIN_PASSPHRASE_WORDS",
@@ -329,4 +376,7 @@ __all__ = [
"EMBED_MODE_AUDIO_LSB",
"EMBED_MODE_AUDIO_SPREAD",
"EMBED_MODE_AUDIO_AUTO",
+ # Video constants
+ "EMBED_MODE_VIDEO_LSB",
+ "EMBED_MODE_VIDEO_AUTO",
]
diff --git a/src/stegasoo/cli.py b/src/stegasoo/cli.py
index 7264516..5dcec66 100644
--- a/src/stegasoo/cli.py
+++ b/src/stegasoo/cli.py
@@ -853,6 +853,410 @@ def audio_info(ctx, audio):
raise SystemExit(1)
+# =============================================================================
+# VIDEO COMMANDS (v4.4.0)
+# =============================================================================
+
+
+@cli.command("video-encode")
+@click.argument("carrier", type=click.Path(exists=True))
+@click.option(
+ "-r",
+ "--reference",
+ required=True,
+ type=click.Path(exists=True),
+ help="Reference photo (shared secret)",
+)
+@click.option("-m", "--message", help="Message to encode")
+@click.option(
+ "-f",
+ "--file",
+ "file_payload",
+ type=click.Path(exists=True),
+ help="File to embed instead of message",
+)
+@click.option("-o", "--output", type=click.Path(), help="Output video path")
+@click.option(
+ "--passphrase",
+ prompt=True,
+ hide_input=True,
+ confirmation_prompt=True,
+ help="Passphrase (recommend 4+ words)",
+)
+@click.option("--pin", prompt=True, hide_input=True, confirmation_prompt=True, help="PIN code")
+@click.option(
+ "--rsa-key",
+ type=click.Path(exists=True),
+ help="RSA private key PEM file",
+)
+@click.option("--rsa-password", default=None, help="Password for encrypted RSA key")
+@click.option("--channel-key", default=None, help="Channel key for deployment isolation")
+@click.option("--dry-run", is_flag=True, help="Show capacity usage without encoding")
+@click.pass_context
+def video_encode(
+ ctx,
+ carrier,
+ reference,
+ message,
+ file_payload,
+ output,
+ passphrase,
+ pin,
+ rsa_key,
+ rsa_password,
+ channel_key,
+ dry_run,
+):
+ """
+ Encode a message or file into a video carrier.
+
+ Output is MKV format with FFV1 lossless codec to preserve embedded data.
+
+ Examples:
+
+ stegasoo video-encode carrier.mp4 -r ref.jpg -m "Secret"
+
+ stegasoo video-encode carrier.mp4 -r ref.jpg -f secret.pdf -o stego.mkv
+
+ stegasoo video-encode carrier.mp4 -r ref.jpg -m "Secret" --dry-run
+ """
+ from .constants import VIDEO_ENABLED
+
+ if not VIDEO_ENABLED:
+ raise click.UsageError(
+ "Video support is disabled. Install ffmpeg and audio extras, "
+ "or set STEGASOO_VIDEO=1 to force enable."
+ )
+
+ from .encode import encode_video
+ from .models import FilePayload
+ from .video_utils import calculate_video_capacity, get_video_info
+
+ if not message and not file_payload:
+ raise click.UsageError("Either --message or --file is required")
+
+ # Read RSA key if provided
+ rsa_key_data = None
+ if rsa_key:
+ with open(rsa_key, "rb") as f:
+ rsa_key_data = f.read()
+
+ # Calculate payload size
+ if file_payload:
+ payload_size = Path(file_payload).stat().st_size
+ payload_type = "file"
+ else:
+ payload_size = len(message.encode("utf-8"))
+ payload_type = "text"
+
+ # Read input files
+ with open(reference, "rb") as f:
+ reference_data = f.read()
+ with open(carrier, "rb") as f:
+ carrier_data = f.read()
+
+ if dry_run:
+ try:
+ info = get_video_info(carrier_data)
+ capacity_info = calculate_video_capacity(carrier_data)
+
+ result = {
+ "carrier": carrier,
+ "reference": reference,
+ "format": info.format,
+ "codec": info.codec,
+ "resolution": f"{info.width}x{info.height}",
+ "fps": round(info.fps, 2),
+ "duration_seconds": round(info.duration_seconds, 2),
+ "total_frames": info.total_frames,
+ "i_frames": info.i_frame_count,
+ "capacity_bytes": capacity_info.usable_capacity_bytes,
+ "capacity_kb": round(capacity_info.usable_capacity_bytes / 1024, 1),
+ "payload_type": payload_type,
+ "payload_size": payload_size,
+ "usage_percent": round(
+ payload_size / capacity_info.usable_capacity_bytes * 100, 1
+ )
+ if capacity_info.usable_capacity_bytes > 0
+ else 0,
+ "fits": payload_size < capacity_info.usable_capacity_bytes,
+ }
+
+ if ctx.obj.get("json"):
+ click.echo(json.dumps(result, indent=2))
+ else:
+ click.echo(f"Carrier: {carrier} ({info.format}, {info.codec})")
+ click.echo(f"Resolution: {info.width}x{info.height} @ {info.fps:.2f} fps")
+ click.echo(f"Duration: {info.duration_seconds:.1f}s")
+ click.echo(f"Frames: {info.total_frames} total, {info.i_frame_count} I-frames")
+ click.echo(f"Reference: {reference}")
+ click.echo(
+ f"Capacity: {capacity_info.usable_capacity_bytes:,} bytes "
+ f"({capacity_info.usable_capacity_bytes // 1024} KB)"
+ )
+ click.echo(f"Payload: {payload_size:,} bytes ({payload_type})")
+ click.echo(f"Usage: {result['usage_percent']}%")
+ click.echo(f"Status: {'✓ Fits' if result['fits'] else '✗ Too large'}")
+ click.echo()
+ click.echo("Note: Output will be MKV format with FFV1 lossless codec")
+ except Exception as e:
+ if ctx.obj.get("json"):
+ click.echo(json.dumps({"status": "error", "error": str(e)}, indent=2))
+ else:
+ click.echo(f"✗ Capacity check failed: {e}", err=True)
+ raise SystemExit(1)
+ return
+
+ # Determine output path
+ if not output:
+ output = f"{Path(carrier).stem}_encoded.mkv"
+
+ try:
+ if file_payload:
+ payload = FilePayload.from_file(file_payload)
+ else:
+ payload = message
+
+ stego_video, stats = encode_video(
+ message=payload,
+ reference_photo=reference_data,
+ carrier_video=carrier_data,
+ passphrase=passphrase,
+ pin=pin,
+ rsa_key_data=rsa_key_data,
+ rsa_password=rsa_password,
+ channel_key=channel_key,
+ )
+
+ with open(output, "wb") as f:
+ f.write(stego_video)
+
+ if ctx.obj.get("json"):
+ click.echo(
+ json.dumps(
+ {
+ "status": "success",
+ "carrier": carrier,
+ "reference": reference,
+ "output": output,
+ "codec": stats.codec,
+ "frames_modified": stats.frames_modified,
+ "duration_seconds": round(stats.duration_seconds, 2),
+ "capacity_used": round(stats.capacity_used * 100, 1),
+ },
+ indent=2,
+ )
+ )
+ else:
+ click.echo(f"✓ Encoded to {output}")
+ click.echo(f" Codec: {stats.codec} (lossless)")
+ click.echo(f" Frames modified: {stats.frames_modified}")
+ click.echo(f" Capacity used: {stats.capacity_used * 100:.1f}%")
+
+ except Exception as e:
+ if ctx.obj.get("json"):
+ click.echo(json.dumps({"status": "error", "error": str(e)}, indent=2))
+ else:
+ click.echo(f"✗ Video encoding failed: {e}", err=True)
+ raise SystemExit(1)
+
+
+@cli.command("video-decode")
+@click.argument("video", type=click.Path(exists=True))
+@click.option(
+ "-r",
+ "--reference",
+ required=True,
+ type=click.Path(exists=True),
+ help="Reference photo (shared secret)",
+)
+@click.option("--passphrase", prompt=True, hide_input=True, help="Passphrase")
+@click.option("--pin", prompt=True, hide_input=True, help="PIN code")
+@click.option(
+ "--rsa-key",
+ type=click.Path(exists=True),
+ help="RSA private key PEM file",
+)
+@click.option("--rsa-password", default=None, help="Password for encrypted RSA key")
+@click.option("--channel-key", default=None, help="Channel key for deployment isolation")
+@click.option("-o", "--output", type=click.Path(), help="Output path for file payloads")
+@click.pass_context
+def video_decode(
+ ctx, video, reference, passphrase, pin, rsa_key, rsa_password, channel_key, output
+):
+ """
+ Decode a message or file from stego video.
+
+ Examples:
+
+ stegasoo video-decode stego.mkv -r ref.jpg
+
+ stegasoo video-decode stego.mkv -r ref.jpg -o ./extracted/
+ """
+ from .constants import VIDEO_ENABLED
+
+ if not VIDEO_ENABLED:
+ raise click.UsageError(
+ "Video support is disabled. Install ffmpeg and audio extras, "
+ "or set STEGASOO_VIDEO=1 to force enable."
+ )
+
+ from .decode import decode_video
+
+ # Read RSA key if provided
+ rsa_key_data = None
+ if rsa_key:
+ with open(rsa_key, "rb") as f:
+ rsa_key_data = f.read()
+
+ with open(video, "rb") as f:
+ video_data = f.read()
+ with open(reference, "rb") as f:
+ reference_data = f.read()
+
+ try:
+ result = decode_video(
+ stego_video=video_data,
+ reference_photo=reference_data,
+ passphrase=passphrase,
+ pin=pin,
+ rsa_key_data=rsa_key_data,
+ rsa_password=rsa_password,
+ channel_key=channel_key,
+ )
+
+ if result.is_file:
+ filename = result.filename or "decoded_file"
+ output_path = Path(output) / filename if output else Path(filename)
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+
+ with open(output_path, "wb") as f:
+ f.write(result.file_data)
+
+ if ctx.obj.get("json"):
+ click.echo(
+ json.dumps(
+ {
+ "status": "success",
+ "video": video,
+ "payload_type": "file",
+ "filename": filename,
+ "output": str(output_path),
+ "size": len(result.file_data),
+ },
+ indent=2,
+ )
+ )
+ else:
+ click.echo(f"✓ Extracted file: {output_path}")
+ click.echo(f" Size: {len(result.file_data):,} bytes")
+ else:
+ if ctx.obj.get("json"):
+ click.echo(
+ json.dumps(
+ {
+ "status": "success",
+ "video": video,
+ "payload_type": "text",
+ "message": result.message,
+ },
+ indent=2,
+ )
+ )
+ else:
+ click.echo(f"Decoded from {video}:")
+ click.echo(result.message)
+
+ except Exception as e:
+ if ctx.obj.get("json"):
+ click.echo(json.dumps({"status": "error", "error": str(e)}, indent=2))
+ else:
+ click.echo(f"✗ Video decoding failed: {e}", err=True)
+ raise SystemExit(1)
+
+
+@cli.command("video-info")
+@click.argument("video", type=click.Path(exists=True))
+@click.pass_context
+def video_info(ctx, video):
+ """
+ Show video file information and steganographic capacity.
+
+ Examples:
+
+ stegasoo video-info carrier.mp4
+
+ stegasoo --json video-info carrier.mp4
+ """
+ from .constants import VIDEO_ENABLED
+
+ if not VIDEO_ENABLED:
+ raise click.UsageError(
+ "Video support is disabled. Install ffmpeg and audio extras, "
+ "or set STEGASOO_VIDEO=1 to force enable."
+ )
+
+ from .video_utils import calculate_video_capacity, get_video_info
+
+ with open(video, "rb") as f:
+ video_data = f.read()
+
+ try:
+ info = get_video_info(video_data)
+ capacity_info = calculate_video_capacity(video_data)
+
+ result = {
+ "file": video,
+ "format": info.format,
+ "codec": info.codec,
+ "resolution": {
+ "width": info.width,
+ "height": info.height,
+ },
+ "fps": round(info.fps, 2),
+ "duration_seconds": round(info.duration_seconds, 2),
+ "total_frames": info.total_frames,
+ "i_frame_count": info.i_frame_count,
+ "bitrate": info.bitrate,
+ "file_size": len(video_data),
+ "capacity": {
+ "bytes": capacity_info.usable_capacity_bytes,
+ "kb": round(capacity_info.usable_capacity_bytes / 1024, 1),
+ "mb": round(capacity_info.usable_capacity_bytes / (1024 * 1024), 2),
+ },
+ }
+
+ if ctx.obj.get("json"):
+ click.echo(json.dumps(result, indent=2))
+ else:
+ click.echo(f"File: {video}")
+ click.echo(f"Format: {info.format}")
+ click.echo(f"Codec: {info.codec}")
+ click.echo(f"Resolution: {info.width}x{info.height}")
+ click.echo(f"Frame rate: {info.fps:.2f} fps")
+ click.echo(f"Duration: {info.duration_seconds:.1f}s")
+ click.echo(f"Total frames: {info.total_frames:,}")
+ click.echo(f"I-frames (keyframes): {info.i_frame_count:,}")
+ if info.bitrate:
+ click.echo(f"Bitrate: {info.bitrate // 1000} kbps")
+ click.echo(f"File size: {len(video_data):,} bytes")
+ click.echo()
+ click.echo("Steganographic capacity (LSB, I-frames only):")
+ click.echo(
+ f" {capacity_info.usable_capacity_bytes:,} bytes "
+ f"({capacity_info.usable_capacity_bytes // 1024} KB)"
+ )
+ click.echo()
+ click.echo("Note: Output will be MKV format with FFV1 lossless codec")
+
+ except Exception as e:
+ if ctx.obj.get("json"):
+ click.echo(json.dumps({"status": "error", "error": str(e)}, indent=2))
+ else:
+ click.echo(f"✗ Video info failed: {e}", err=True)
+ raise SystemExit(1)
+
+
# =============================================================================
# BATCH COMMANDS
# =============================================================================
diff --git a/src/stegasoo/constants.py b/src/stegasoo/constants.py
index d5a8e8d..47fbd46 100644
--- a/src/stegasoo/constants.py
+++ b/src/stegasoo/constants.py
@@ -411,3 +411,27 @@ AUDIO_ECHO_DELAY_0 = 50 # Echo delay for bit 0 (samples at 44.1kHz ~ 1.1ms)
AUDIO_ECHO_DELAY_1 = 100 # Echo delay for bit 1 (samples at 44.1kHz ~ 2.3ms)
AUDIO_ECHO_AMPLITUDE = 0.3 # Echo strength (relative to original)
AUDIO_ECHO_WINDOW_SIZE = 8192 # Window size for echo embedding
+
+
+# =============================================================================
+# VIDEO STEGANOGRAPHY (v4.4.0)
+# =============================================================================
+
+# Video embedding modes
+EMBED_MODE_VIDEO_LSB = "video_lsb"
+EMBED_MODE_VIDEO_AUTO = "video_auto"
+VALID_VIDEO_EMBED_MODES = {EMBED_MODE_VIDEO_LSB}
+
+# Video magic bytes (for format detection in stego video)
+VIDEO_MAGIC_LSB = b"VIDL"
+
+# Video input limits
+MAX_VIDEO_FILE_SIZE = 4 * 1024 * 1024 * 1024 # 4 GB
+MAX_VIDEO_DURATION = 3600 # 1 hour in seconds
+MIN_VIDEO_RESOLUTION = (64, 64)
+MAX_VIDEO_RESOLUTION = (7680, 4320) # 8K UHD
+ALLOWED_VIDEO_EXTENSIONS = {"mp4", "mkv", "webm", "avi", "mov"}
+
+# Video output settings
+VIDEO_OUTPUT_CODEC = "ffv1" # FFV1 lossless codec
+VIDEO_OUTPUT_CONTAINER = "mkv" # MKV container for FFV1
diff --git a/src/stegasoo/decode.py b/src/stegasoo/decode.py
index e0b3bdd..bdf9348 100644
--- a/src/stegasoo/decode.py
+++ b/src/stegasoo/decode.py
@@ -383,3 +383,100 @@ def decode_audio(
debug.print(f"Decryption successful: {result.payload_type}")
return result
+
+
+def decode_video(
+ stego_video: bytes,
+ reference_photo: bytes,
+ passphrase: str,
+ pin: str = "",
+ rsa_key_data: bytes | None = None,
+ rsa_password: str | None = None,
+ embed_mode: str = "video_auto",
+ channel_key: str | bool | None = None,
+ progress_file: str | None = None,
+) -> DecodeResult:
+ """
+ Decode a message or file from stego video.
+
+ Extracts data from I-frames (keyframes) using LSB steganography.
+
+ Args:
+ stego_video: Stego video bytes
+ reference_photo: Shared reference photo bytes
+ passphrase: Shared passphrase
+ pin: Optional static PIN
+ rsa_key_data: Optional RSA key bytes
+ rsa_password: Optional RSA key password
+ embed_mode: 'video_auto' or 'video_lsb'
+ channel_key: Channel key for deployment/group isolation
+ progress_file: Optional path to write progress JSON
+
+ Returns:
+ DecodeResult with message or file data
+ """
+ from .constants import (
+ EMBED_MODE_VIDEO_AUTO,
+ EMBED_MODE_VIDEO_LSB,
+ VIDEO_ENABLED,
+ )
+
+ if not VIDEO_ENABLED:
+ raise ExtractionError(
+ "Video support is disabled. Install video extras and ffmpeg, "
+ "or set STEGASOO_VIDEO=1 to force enable."
+ )
+
+ from .video_utils import detect_video_format
+
+ debug.print(
+ f"decode_video: mode={embed_mode}, " f"passphrase length={len(passphrase.split())} words"
+ )
+
+ # Validate inputs
+ require_valid_image(reference_photo, "Reference photo")
+ require_security_factors(pin, rsa_key_data)
+
+ if pin:
+ require_valid_pin(pin)
+ if rsa_key_data:
+ require_valid_rsa_key(rsa_key_data, rsa_password)
+
+ # Detect format
+ video_format = detect_video_format(stego_video)
+ debug.print(f"Detected video format: {video_format}")
+
+ if video_format == "unknown":
+ raise ExtractionError("Could not detect video format.")
+
+ _write_progress(progress_file, 20, 100, "initializing")
+
+ # Derive pixel/frame selection key
+ from .crypto import derive_pixel_key
+
+ pixel_key = derive_pixel_key(reference_photo, passphrase, pin, rsa_key_data, channel_key)
+
+ _write_progress(progress_file, 25, 100, "extracting")
+
+ encrypted = None
+
+ if embed_mode == EMBED_MODE_VIDEO_AUTO or embed_mode == EMBED_MODE_VIDEO_LSB:
+ from .video_steganography import extract_from_video_lsb
+
+ encrypted = extract_from_video_lsb(stego_video, pixel_key, progress_file=progress_file)
+ if encrypted:
+ debug.print("Video LSB extraction succeeded")
+ else:
+ raise ValueError(f"Invalid video embed mode: {embed_mode}")
+
+ if not encrypted:
+ debug.print("No data extracted from video")
+ raise ExtractionError("Could not extract data from video. Check your credentials.")
+
+ debug.print(f"Extracted {len(encrypted)} bytes from video")
+
+ # Decrypt
+ result = decrypt_message(encrypted, reference_photo, passphrase, pin, rsa_key_data, channel_key)
+
+ debug.print(f"Decryption successful: {result.payload_type}")
+ return result
diff --git a/src/stegasoo/encode.py b/src/stegasoo/encode.py
index 604d2b0..5b81c13 100644
--- a/src/stegasoo/encode.py
+++ b/src/stegasoo/encode.py
@@ -8,6 +8,9 @@ Changes in v4.0.0:
Changes in v4.3.0:
- Added encode_audio() for audio steganography
+
+Changes in v4.4.0:
+- Added encode_video() for video steganography
"""
from __future__ import annotations
@@ -18,7 +21,7 @@ from typing import TYPE_CHECKING
from .constants import EMBED_MODE_LSB
from .crypto import derive_pixel_key, encrypt_message
from .debug import debug
-from .exceptions import AudioError
+from .exceptions import AudioError, VideoError
from .models import EncodeResult, FilePayload
from .steganography import embed_in_image
from .utils import generate_filename
@@ -31,7 +34,7 @@ from .validation import (
)
if TYPE_CHECKING:
- from .models import AudioEmbedStats
+ from .models import AudioEmbedStats, VideoEmbedStats
def encode(
@@ -365,3 +368,97 @@ def encode_audio(
raise ValueError(f"Invalid audio embed mode: {embed_mode}")
return stego_audio, stats
+
+
+def encode_video(
+ message: str | bytes | FilePayload,
+ reference_photo: bytes,
+ carrier_video: bytes,
+ passphrase: str,
+ pin: str = "",
+ rsa_key_data: bytes | None = None,
+ rsa_password: str | None = None,
+ embed_mode: str = "video_lsb",
+ channel_key: str | bool | None = None,
+ progress_file: str | None = None,
+) -> tuple[bytes, VideoEmbedStats]:
+ """
+ Encode a message or file into a video carrier.
+
+ Embeds data across I-frames (keyframes) using LSB steganography.
+ Output is an MKV container with FFV1 lossless codec to preserve
+ the embedded data perfectly.
+
+ Args:
+ message: Text message, raw bytes, or FilePayload to hide
+ reference_photo: Shared reference photo bytes
+ carrier_video: Carrier video bytes (MP4, MKV, WebM, AVI, MOV)
+ passphrase: Shared passphrase
+ pin: Optional static PIN
+ rsa_key_data: Optional RSA private key PEM bytes
+ rsa_password: Optional password for encrypted RSA key
+ embed_mode: 'video_lsb' (currently the only option)
+ channel_key: Channel key for deployment/group isolation
+ progress_file: Optional path to write progress JSON
+
+ Returns:
+ Tuple of (stego video bytes, VideoEmbedStats)
+
+ Note:
+ The output video will be in MKV format with FFV1 lossless codec,
+ regardless of the input format. This is necessary to preserve
+ the embedded data without lossy compression artifacts.
+ """
+ from .constants import EMBED_MODE_VIDEO_LSB, VIDEO_ENABLED
+
+ if not VIDEO_ENABLED:
+ raise VideoError(
+ "Video support is disabled. Install video extras and ffmpeg, "
+ "or set STEGASOO_VIDEO=1 to force enable."
+ )
+
+ from .video_utils import detect_video_format
+
+ debug.print(
+ f"encode_video: mode={embed_mode}, "
+ f"passphrase length={len(passphrase.split())} words, "
+ f"pin={'set' if pin else 'none'}"
+ )
+
+ # Validate inputs
+ require_valid_payload(message)
+ require_valid_image(reference_photo, "Reference photo")
+ require_security_factors(pin, rsa_key_data)
+
+ if pin:
+ require_valid_pin(pin)
+ if rsa_key_data:
+ require_valid_rsa_key(rsa_key_data, rsa_password)
+
+ # Detect video format
+ video_format = detect_video_format(carrier_video)
+ debug.print(f"Detected video format: {video_format}")
+
+ if video_format == "unknown":
+ raise VideoError("Could not detect video format. Supported: MP4, MKV, WebM, AVI, MOV.")
+
+ # Encrypt message
+ encrypted = encrypt_message(
+ message, reference_photo, passphrase, pin, rsa_key_data, channel_key
+ )
+ debug.print(f"Encrypted payload: {len(encrypted)} bytes")
+
+ # Derive pixel/frame selection key
+ pixel_key = derive_pixel_key(reference_photo, passphrase, pin, rsa_key_data, channel_key)
+
+ # Embed based on mode
+ if embed_mode == EMBED_MODE_VIDEO_LSB:
+ from .video_steganography import embed_in_video_lsb
+
+ stego_video, stats = embed_in_video_lsb(
+ encrypted, carrier_video, pixel_key, progress_file=progress_file
+ )
+ else:
+ raise ValueError(f"Invalid video embed mode: {embed_mode}")
+
+ return stego_video, stats
diff --git a/src/stegasoo/exceptions.py b/src/stegasoo/exceptions.py
index 57be0fd..54b4905 100644
--- a/src/stegasoo/exceptions.py
+++ b/src/stegasoo/exceptions.py
@@ -243,3 +243,51 @@ class UnsupportedAudioFormatError(AudioError):
"""Audio format not supported."""
pass
+
+
+# ============================================================================
+# VIDEO ERRORS
+# ============================================================================
+
+
+class VideoError(SteganographyError):
+ """Base class for video steganography errors."""
+
+ pass
+
+
+class VideoValidationError(ValidationError):
+ """Video validation failed."""
+
+ pass
+
+
+class VideoCapacityError(CapacityError):
+ """Video carrier too small for message."""
+
+ def __init__(self, needed: int, available: int):
+ self.needed = needed
+ self.available = available
+ # Call SteganographyError.__init__ directly (skip CapacityError's image-specific message)
+ SteganographyError.__init__(
+ self,
+ f"Video carrier too small. Need {needed:,} bytes, have {available:,} bytes capacity.",
+ )
+
+
+class VideoExtractionError(ExtractionError):
+ """Failed to extract hidden data from video."""
+
+ pass
+
+
+class VideoTranscodeError(VideoError):
+ """Video transcoding failed."""
+
+ pass
+
+
+class UnsupportedVideoFormatError(VideoError):
+ """Video format not supported."""
+
+ pass
diff --git a/src/stegasoo/models.py b/src/stegasoo/models.py
index 4e71bc0..3ee72b4 100644
--- a/src/stegasoo/models.py
+++ b/src/stegasoo/models.py
@@ -336,3 +336,56 @@ class AudioCapacityInfo:
chip_length: int | None = None # v4.4.0: samples per chip
embeddable_channels: int | None = None # v4.4.0: channels used (excl. LFE)
total_channels: int | None = None # v4.4.0: total channels in carrier
+
+
+# =============================================================================
+# VIDEO STEGANOGRAPHY MODELS (v4.4.0)
+# =============================================================================
+
+
+@dataclass
+class VideoEmbedStats:
+ """Statistics from video embedding."""
+
+ frames_modified: int
+ total_frames: int
+ capacity_used: float # 0.0 - 1.0
+ bytes_embedded: int
+ width: int
+ height: int
+ fps: float
+ duration_seconds: float
+ embed_mode: str # "video_lsb"
+ codec: str # Output codec (e.g., "ffv1")
+
+ @property
+ def modification_percent(self) -> float:
+ """Percentage of frames modified."""
+ return (self.frames_modified / self.total_frames) * 100 if self.total_frames > 0 else 0
+
+
+@dataclass
+class VideoInfo:
+ """Information about a video file."""
+
+ width: int
+ height: int
+ fps: float
+ duration_seconds: float
+ total_frames: int
+ i_frame_count: int
+ format: str # "mp4", "mkv", "webm", etc.
+ codec: str # "h264", "vp9", "ffv1", etc.
+ bitrate: int | None = None # For lossy formats
+
+
+@dataclass
+class VideoCapacityInfo:
+ """Capacity information for video steganography."""
+
+ total_frames: int
+ i_frames: int
+ usable_capacity_bytes: int
+ embed_mode: str
+ resolution: tuple[int, int]
+ duration_seconds: float
diff --git a/src/stegasoo/video_steganography.py b/src/stegasoo/video_steganography.py
new file mode 100644
index 0000000..37b5714
--- /dev/null
+++ b/src/stegasoo/video_steganography.py
@@ -0,0 +1,496 @@
+"""
+Stegasoo Video Steganography — LSB Embedding/Extraction (v4.4.0)
+
+Frame-based LSB embedding for video files.
+
+Hides data in the least significant bits of video frame pixels. Uses the
+existing image steganography engine for per-frame embedding, providing
+high capacity across multiple I-frames.
+
+Strategy:
+1. Extract I-frames (keyframes) from video using ffmpeg
+2. Embed payload across I-frames using existing LSB engine
+3. Re-encode video with modified frames using FFV1 lossless codec
+4. Output: MKV container with embedded data
+
+Uses ChaCha20 as a CSPRNG for pseudo-random frame selection and pixel
+selection within frames, ensuring that without the key an attacker cannot
+determine which frames/pixels were modified.
+"""
+
+import struct
+import tempfile
+from pathlib import Path
+
+from .constants import (
+ EMBED_MODE_VIDEO_LSB,
+ VIDEO_MAGIC_LSB,
+ VIDEO_OUTPUT_CODEC,
+)
+from .debug import debug
+from .exceptions import VideoCapacityError, VideoError
+from .models import VideoEmbedStats
+from .steganography import ENCRYPTION_OVERHEAD, _embed_lsb, _extract_lsb
+from .video_utils import extract_frames, get_video_info, reassemble_video
+
+# Progress reporting interval — write every N frames
+PROGRESS_INTERVAL = 5
+
+
+# =============================================================================
+# PROGRESS REPORTING
+# =============================================================================
+
+
+def _write_progress(progress_file: str | None, current: int, total: int, phase: str = "embedding"):
+ """Write progress to file for frontend polling."""
+ if progress_file is None:
+ return
+ try:
+ import json
+
+ with open(progress_file, "w") as f:
+ json.dump(
+ {
+ "current": current,
+ "total": total,
+ "percent": round((current / total) * 100, 1) if total > 0 else 0,
+ "phase": phase,
+ },
+ f,
+ )
+ except Exception:
+ pass # Don't let progress writing break encoding
+
+
+# =============================================================================
+# CAPACITY
+# =============================================================================
+
+
+def calculate_video_lsb_capacity(video_data: bytes) -> int:
+ """
+ Calculate the maximum bytes that can be embedded in a video via LSB.
+
+ Calculates capacity based on I-frames (keyframes) only. Each I-frame
+ provides capacity proportional to its pixel count.
+
+ Args:
+ video_data: Raw bytes of a video file.
+
+ Returns:
+ Maximum embeddable payload size in bytes (after subtracting overhead).
+
+ Raises:
+ VideoError: If the video cannot be read or is in an unsupported format.
+ """
+ from .video_utils import calculate_video_capacity
+
+ capacity_info = calculate_video_capacity(video_data, EMBED_MODE_VIDEO_LSB)
+
+ debug.print(
+ f"Video LSB capacity: {capacity_info.usable_capacity_bytes} bytes "
+ f"({capacity_info.i_frames} I-frames, {capacity_info.resolution[0]}x{capacity_info.resolution[1]})"
+ )
+
+ return capacity_info.usable_capacity_bytes
+
+
+# =============================================================================
+# FRAME INDEX GENERATION (ChaCha20 CSPRNG)
+# =============================================================================
+
+
+def generate_frame_indices(key: bytes, num_frames: int, num_needed: int) -> list[int]:
+ """
+ Generate pseudo-random frame indices using ChaCha20 as a CSPRNG.
+
+ Produces a deterministic sequence of unique frame indices so that
+ the same key always yields the same embedding locations.
+
+ Args:
+ key: 32-byte key for the ChaCha20 cipher.
+ num_frames: Total number of frames available.
+ num_needed: How many unique frame indices are required.
+
+ Returns:
+ List of ``num_needed`` unique indices in [0, num_frames).
+ """
+ from cryptography.hazmat.backends import default_backend
+ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
+
+ debug.validate(len(key) == 32, f"Frame key must be 32 bytes, got {len(key)}")
+ debug.validate(num_frames > 0, f"Number of frames must be positive, got {num_frames}")
+ debug.validate(num_needed > 0, f"Number needed must be positive, got {num_needed}")
+ debug.validate(
+ num_needed <= num_frames,
+ f"Cannot select {num_needed} frames from {num_frames} available",
+ )
+
+ debug.print(f"Generating {num_needed} frame indices from {num_frames} total frames")
+
+ # Use a different nonce offset for frame selection (vs pixel selection)
+ nonce = b"\x01" + b"\x00" * 15 # Different from pixel selection nonce
+
+ if num_needed >= num_frames // 2:
+ # Full Fisher-Yates shuffle
+ cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend())
+ encryptor = cipher.encryptor()
+
+ indices = list(range(num_frames))
+ random_bytes = encryptor.update(b"\x00" * (num_frames * 4))
+
+ for i in range(num_frames - 1, 0, -1):
+ j_bytes = random_bytes[(num_frames - 1 - i) * 4 : (num_frames - i) * 4]
+ j = int.from_bytes(j_bytes, "big") % (i + 1)
+ indices[i], indices[j] = indices[j], indices[i]
+
+ return indices[:num_needed]
+
+ # Direct sampling
+ selected: list[int] = []
+ used: set[int] = set()
+
+ cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend())
+ encryptor = cipher.encryptor()
+
+ bytes_needed = (num_needed * 2) * 4
+ random_bytes = encryptor.update(b"\x00" * bytes_needed)
+
+ byte_offset = 0
+ while len(selected) < num_needed and byte_offset < len(random_bytes) - 4:
+ idx = int.from_bytes(random_bytes[byte_offset : byte_offset + 4], "big") % num_frames
+ byte_offset += 4
+
+ if idx not in used:
+ used.add(idx)
+ selected.append(idx)
+
+ debug.validate(
+ len(selected) == num_needed,
+ f"Failed to generate enough indices: {len(selected)}/{num_needed}",
+ )
+ return selected
+
+
+# =============================================================================
+# EMBEDDING
+# =============================================================================
+
+
+@debug.time
+def embed_in_video_lsb(
+ data: bytes,
+ carrier_video: bytes,
+ pixel_key: bytes,
+ progress_file: str | None = None,
+) -> tuple[bytes, VideoEmbedStats]:
+ """
+ Embed data into video frames using LSB steganography.
+
+ The payload is prepended with a 4-byte magic header and a 4-byte
+ big-endian length prefix. Data is distributed across I-frames using
+ pseudo-random selection based on the pixel_key.
+
+ The output video uses FFV1 lossless codec in MKV container to
+ preserve the embedded data perfectly.
+
+ Args:
+ data: Encrypted payload bytes to embed.
+ carrier_video: Raw bytes of the carrier video file.
+ pixel_key: 32-byte key for frame and pixel selection.
+ progress_file: Optional path for progress JSON (frontend polling).
+
+ Returns:
+ Tuple of (stego video bytes, VideoEmbedStats).
+
+ Raises:
+ VideoCapacityError: If the payload is too large for the carrier.
+ VideoError: On any other embedding failure.
+ """
+ debug.print(f"Video LSB embedding {len(data)} bytes")
+ debug.data(pixel_key, "Pixel key for embedding")
+ debug.validate(len(pixel_key) == 32, f"Pixel key must be 32 bytes, got {len(pixel_key)}")
+
+ try:
+ # Get video info
+ video_info = get_video_info(carrier_video)
+ debug.print(
+ f"Carrier video: {video_info.width}x{video_info.height}, "
+ f"{video_info.fps:.2f} fps, {video_info.duration_seconds:.1f}s, "
+ f"{video_info.i_frame_count} I-frames"
+ )
+
+ # Prepend magic + length prefix
+ header = VIDEO_MAGIC_LSB + struct.pack(">I", len(data))
+ payload = header + data
+ debug.print(f"Payload with header: {len(payload)} bytes")
+
+ # Calculate capacity and check fit
+ capacity = calculate_video_lsb_capacity(carrier_video)
+ if len(payload) > capacity + ENCRYPTION_OVERHEAD:
+ raise VideoCapacityError(len(payload), capacity)
+
+ # Extract I-frames to temp directory
+ with tempfile.TemporaryDirectory(prefix="stegasoo_video_") as temp_dir_str:
+ temp_dir = Path(temp_dir_str)
+
+ _write_progress(progress_file, 5, 100, "extracting_frames")
+
+ frames, _ = extract_frames(carrier_video, temp_dir, keyframes_only=True)
+ num_frames = len(frames)
+
+ debug.print(f"Extracted {num_frames} I-frames for embedding")
+
+ if num_frames == 0:
+ raise VideoError("No I-frames found in video")
+
+ # Calculate bytes per frame (minus 4 byte length prefix used by _embed_lsb)
+ pixels_per_frame = video_info.width * video_info.height
+ bytes_per_frame = (pixels_per_frame * 3) // 8 - 4 # 3 bits per pixel, minus len prefix
+
+ # For simplicity, embed entire payload in first frame if it fits
+ # This makes extraction straightforward
+ if len(payload) <= bytes_per_frame:
+ debug.print(f"Payload fits in single frame ({len(payload)} <= {bytes_per_frame})")
+ frame_path = frames[0]
+
+ with open(frame_path, "rb") as f:
+ frame_data = f.read()
+
+ try:
+ stego_frame, stats, ext = _embed_lsb(
+ payload,
+ frame_data,
+ pixel_key,
+ bits_per_channel=1,
+ output_format="PNG",
+ )
+
+ with open(frame_path, "wb") as f:
+ f.write(stego_frame)
+
+ modified_frames = 1
+
+ except Exception as e:
+ debug.print(f"Failed to embed in frame: {e}")
+ raise VideoError(f"Failed to embed in frame: {e}")
+ else:
+ # For larger payloads, we need to split across frames
+ # Each frame stores: 4-byte chunk length + chunk data
+ debug.print("Splitting payload across multiple frames")
+
+ frames_needed = (len(payload) + bytes_per_frame - 1) // bytes_per_frame
+ frames_needed = min(frames_needed, num_frames)
+
+ debug.print(f"Using {frames_needed} frames to embed {len(payload)} bytes")
+
+ # For now, use sequential frames for simplicity
+ modified_frames = 0
+ bytes_remaining = len(payload)
+ payload_offset = 0
+
+ for frame_idx in range(frames_needed):
+ if bytes_remaining <= 0:
+ break
+
+ frame_path = frames[frame_idx]
+
+ with open(frame_path, "rb") as f:
+ frame_data = f.read()
+
+ chunk_size = min(bytes_remaining, bytes_per_frame)
+ chunk = payload[payload_offset : payload_offset + chunk_size]
+
+ try:
+ stego_frame, stats, ext = _embed_lsb(
+ chunk,
+ frame_data,
+ pixel_key,
+ bits_per_channel=1,
+ output_format="PNG",
+ )
+
+ with open(frame_path, "wb") as f:
+ f.write(stego_frame)
+
+ modified_frames += 1
+ payload_offset += chunk_size
+ bytes_remaining -= chunk_size
+
+ except Exception as e:
+ debug.print(f"Failed to embed in frame {frame_idx}: {e}")
+ raise VideoError(f"Failed to embed in frame {frame_idx}: {e}")
+
+ if progress_file and frame_idx % PROGRESS_INTERVAL == 0:
+ pct = 10 + int((frame_idx / frames_needed) * 70)
+ _write_progress(progress_file, pct, 100, "embedding")
+
+ _write_progress(progress_file, 80, 100, "reassembling")
+
+ # Reassemble video with modified frames
+ stego_video = reassemble_video(
+ frames,
+ carrier_video,
+ fps=1.0, # I-frame only videos use 1 fps
+ )
+
+ _write_progress(progress_file, 100, 100, "complete")
+
+ video_stats = VideoEmbedStats(
+ frames_modified=modified_frames,
+ total_frames=video_info.total_frames,
+ capacity_used=len(payload) / (capacity + ENCRYPTION_OVERHEAD),
+ bytes_embedded=len(payload),
+ width=video_info.width,
+ height=video_info.height,
+ fps=video_info.fps,
+ duration_seconds=video_info.duration_seconds,
+ embed_mode=EMBED_MODE_VIDEO_LSB,
+ codec=VIDEO_OUTPUT_CODEC,
+ )
+
+ debug.print(
+ f"Video LSB embedding complete: {len(stego_video)} bytes, "
+ f"{modified_frames} frames modified"
+ )
+
+ return stego_video, video_stats
+
+ except VideoCapacityError:
+ raise
+ except VideoError:
+ raise
+ except Exception as e:
+ debug.exception(e, "embed_in_video_lsb")
+ raise VideoError(f"Failed to embed data in video: {e}") from e
+
+
+# =============================================================================
+# EXTRACTION
+# =============================================================================
+
+
+@debug.time
+def extract_from_video_lsb(
+ video_data: bytes,
+ pixel_key: bytes,
+ progress_file: str | None = None,
+) -> bytes | None:
+ """
+ Extract hidden data from video using LSB steganography.
+
+ Extracts I-frames, reads LSBs from the same pseudo-random locations
+ used during embedding, and reconstructs the payload.
+
+ Args:
+ video_data: Raw bytes of the stego video file.
+ pixel_key: 32-byte key (must match the one used for embedding).
+ progress_file: Optional path for progress JSON.
+
+ Returns:
+ Extracted payload bytes (without magic/length prefix), or ``None``
+ if extraction fails (wrong key, no data, corrupted).
+ """
+ debug.print(f"Video LSB extracting from {len(video_data)} byte video")
+ debug.data(pixel_key, "Pixel key for extraction")
+
+ try:
+ # Get video info
+ video_info = get_video_info(video_data)
+ debug.print(
+ f"Video: {video_info.width}x{video_info.height}, "
+ f"{video_info.i_frame_count} I-frames"
+ )
+
+ # Extract I-frames
+ with tempfile.TemporaryDirectory(prefix="stegasoo_video_extract_") as temp_dir_str:
+ temp_dir = Path(temp_dir_str)
+
+ _write_progress(progress_file, 5, 100, "extracting_frames")
+
+ frames, _ = extract_frames(video_data, temp_dir, keyframes_only=True)
+ num_frames = len(frames)
+
+ if num_frames == 0:
+ debug.print("No I-frames found in video")
+ return None
+
+ debug.print(f"Extracted {num_frames} I-frames for extraction")
+
+ _write_progress(progress_file, 20, 100, "extracting_data")
+
+ # First, try to extract from frame 0 to get magic and total length
+ frame_path = frames[0]
+ with open(frame_path, "rb") as f:
+ frame_data = f.read()
+
+ first_chunk = _extract_lsb(frame_data, pixel_key, bits_per_channel=1)
+ if first_chunk is None or len(first_chunk) < 8:
+ debug.print("Failed to extract initial data from first frame")
+ return None
+
+ # Check magic bytes
+ magic = first_chunk[:4]
+ if magic != VIDEO_MAGIC_LSB:
+ debug.print(f"Magic mismatch: got {magic!r}, expected {VIDEO_MAGIC_LSB!r}")
+ return None
+
+ # Get total payload length
+ total_length = struct.unpack(">I", first_chunk[4:8])[0]
+ debug.print(f"Total payload length: {total_length} bytes")
+
+ # Sanity check
+ pixels_per_frame = video_info.width * video_info.height
+ bytes_per_frame = (pixels_per_frame * 3) // 8 - 4 # minus length prefix
+ max_possible = bytes_per_frame * num_frames
+
+ if total_length > max_possible or total_length < 1:
+ debug.print(f"Invalid payload length: {total_length}")
+ return None
+
+ # If the entire payload fits in the first frame, return it directly
+ # This matches the simplified single-frame embedding approach
+ if len(first_chunk) >= 8 + total_length:
+ debug.print("Payload fits in single frame, extracting directly")
+ payload = first_chunk[8 : 8 + total_length]
+ else:
+ # Multi-frame extraction
+ debug.print("Multi-frame extraction needed")
+ frames_needed = (total_length + 8 + bytes_per_frame - 1) // bytes_per_frame
+ frames_needed = min(frames_needed, num_frames)
+
+ # Extract sequentially (matching the embedding approach)
+ extracted_chunks = [first_chunk]
+ for frame_idx in range(1, frames_needed):
+ frame_path = frames[frame_idx]
+ with open(frame_path, "rb") as f:
+ frame_data = f.read()
+
+ chunk = _extract_lsb(frame_data, pixel_key, bits_per_channel=1)
+ if chunk:
+ extracted_chunks.append(chunk)
+
+ if progress_file and frame_idx % PROGRESS_INTERVAL == 0:
+ pct = 20 + int((frame_idx / frames_needed) * 70)
+ _write_progress(progress_file, pct, 100, "extracting_data")
+
+ # Combine chunks
+ combined = b"".join(extracted_chunks)
+
+ if len(combined) < 8 + total_length:
+ debug.print(
+ f"Insufficient data: have {len(combined) - 8}, need {total_length}"
+ )
+ return None
+
+ payload = combined[8 : 8 + total_length]
+
+ _write_progress(progress_file, 100, 100, "complete")
+
+ debug.print(f"Video LSB successfully extracted {len(payload)} bytes")
+ return payload
+
+ except Exception as e:
+ debug.exception(e, "extract_from_video_lsb")
+ return None
diff --git a/src/stegasoo/video_utils.py b/src/stegasoo/video_utils.py
new file mode 100644
index 0000000..2fa144e
--- /dev/null
+++ b/src/stegasoo/video_utils.py
@@ -0,0 +1,732 @@
+"""
+Stegasoo Video Utilities (v4.4.0)
+
+Video format detection, frame extraction, and transcoding for video steganography.
+
+Dependencies:
+- ffmpeg binary: Required for all video operations
+- numpy: For frame data manipulation
+- PIL/Pillow: For frame image handling
+
+Uses ffmpeg for:
+- Format detection and metadata extraction
+- I-frame extraction
+- Video reassembly with FFV1 lossless codec
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import shutil
+import subprocess
+import tempfile
+from pathlib import Path
+
+from .constants import (
+ EMBED_MODE_VIDEO_AUTO,
+ EMBED_MODE_VIDEO_LSB,
+ MAX_VIDEO_DURATION,
+ MAX_VIDEO_FILE_SIZE,
+ MAX_VIDEO_RESOLUTION,
+ MIN_VIDEO_RESOLUTION,
+ VALID_VIDEO_EMBED_MODES,
+ VIDEO_OUTPUT_CODEC,
+ VIDEO_OUTPUT_CONTAINER,
+)
+from .debug import get_logger
+from .exceptions import (
+ UnsupportedVideoFormatError,
+ VideoTranscodeError,
+ VideoValidationError,
+)
+from .models import ValidationResult, VideoCapacityInfo, VideoInfo
+
+logger = get_logger(__name__)
+
+
+# =============================================================================
+# FFMPEG AVAILABILITY
+# =============================================================================
+
+
+def has_ffmpeg_support() -> bool:
+ """Check if ffmpeg is available on the system.
+
+ Returns:
+ True if ffmpeg is found on PATH, False otherwise.
+ """
+ return shutil.which("ffmpeg") is not None
+
+
+def has_ffprobe_support() -> bool:
+ """Check if ffprobe is available on the system.
+
+ Returns:
+ True if ffprobe is found on PATH, False otherwise.
+ """
+ return shutil.which("ffprobe") is not None
+
+
+def _require_ffmpeg() -> None:
+ """Raise error if ffmpeg is not available."""
+ if not has_ffmpeg_support():
+ raise VideoTranscodeError(
+ "ffmpeg is required for video operations. Install ffmpeg on your system."
+ )
+
+
+def _require_ffprobe() -> None:
+ """Raise error if ffprobe is not available."""
+ if not has_ffprobe_support():
+ raise VideoTranscodeError(
+ "ffprobe is required for video metadata. Install ffmpeg on your system."
+ )
+
+
+# =============================================================================
+# FORMAT DETECTION
+# =============================================================================
+
+
+def detect_video_format(video_data: bytes) -> str:
+ """Detect video format from magic bytes.
+
+ Examines the first bytes of video data to identify the container format.
+
+ Magic byte signatures:
+ - MP4/M4V: b"ftyp" at offset 4
+ - MKV/WebM: b"\\x1a\\x45\\xdf\\xa3" (EBML header)
+ - AVI: b"RIFF" at offset 0 + b"AVI " at offset 8
+ - MOV: b"ftyp" with "qt" brand or b"moov"/"mdat" early
+
+ Args:
+ video_data: Raw video file bytes.
+
+ Returns:
+ Format string: "mp4", "mkv", "webm", "avi", "mov", or "unknown".
+ """
+ if len(video_data) < 12:
+ logger.debug("detect_video_format: data too short (%d bytes)", len(video_data))
+ return "unknown"
+
+ # MP4/M4V/MOV: "ftyp" atom at offset 4
+ if video_data[4:8] == b"ftyp":
+ # Check brand for specific type
+ brand = video_data[8:12]
+ if brand in (b"qt ", b"mqt "):
+ return "mov"
+ if brand in (b"isom", b"iso2", b"mp41", b"mp42", b"avc1", b"M4V "):
+ return "mp4"
+ # Default to mp4 for ftyp containers
+ return "mp4"
+
+ # MKV/WebM: EBML header
+ if video_data[:4] == b"\x1a\x45\xdf\xa3":
+ # Check doctype to distinguish MKV from WebM
+ # WebM uses "webm" doctype, MKV uses "matroska"
+ # Simple heuristic: search for doctype string in first 64 bytes
+ header = video_data[:64]
+ if b"webm" in header.lower():
+ return "webm"
+ return "mkv"
+
+ # AVI: RIFF....AVI
+ if video_data[:4] == b"RIFF" and video_data[8:12] == b"AVI ":
+ return "avi"
+
+ # MOV without ftyp (older format): check for moov/mdat atoms
+ if video_data[4:8] in (b"moov", b"mdat", b"wide", b"free"):
+ return "mov"
+
+ return "unknown"
+
+
+# =============================================================================
+# METADATA EXTRACTION
+# =============================================================================
+
+
+def get_video_info(video_data: bytes) -> VideoInfo:
+ """Extract video metadata from raw video bytes.
+
+ Uses ffprobe to extract detailed video information including
+ resolution, frame rate, duration, codec, and I-frame count.
+
+ Args:
+ video_data: Raw video file bytes.
+
+ Returns:
+ VideoInfo dataclass with video metadata.
+
+ Raises:
+ UnsupportedVideoFormatError: If the format cannot be detected.
+ VideoTranscodeError: If metadata extraction fails.
+ """
+ _require_ffprobe()
+
+ fmt = detect_video_format(video_data)
+ if fmt == "unknown":
+ raise UnsupportedVideoFormatError(
+ "Cannot detect video format. Supported: MP4, MKV, WebM, AVI, MOV."
+ )
+
+ # Write to temp file for ffprobe
+ with tempfile.NamedTemporaryFile(suffix=f".{fmt}", delete=False) as f:
+ f.write(video_data)
+ temp_path = f.name
+
+ try:
+ # Get stream info
+ result = subprocess.run(
+ [
+ "ffprobe",
+ "-v",
+ "quiet",
+ "-print_format",
+ "json",
+ "-show_format",
+ "-show_streams",
+ "-select_streams",
+ "v:0",
+ temp_path,
+ ],
+ capture_output=True,
+ text=True,
+ timeout=60,
+ )
+
+ if result.returncode != 0:
+ raise VideoTranscodeError(f"ffprobe failed: {result.stderr}")
+
+ info = json.loads(result.stdout)
+
+ # Extract video stream info
+ if not info.get("streams"):
+ raise VideoTranscodeError("No video stream found in file")
+
+ stream = info["streams"][0]
+ format_info = info.get("format", {})
+
+ width = int(stream.get("width", 0))
+ height = int(stream.get("height", 0))
+ codec = stream.get("codec_name", "unknown")
+
+ # Parse frame rate (can be "30/1" or "29.97")
+ fps_str = stream.get("r_frame_rate", "0/1")
+ if "/" in fps_str:
+ num, den = fps_str.split("/")
+ fps = float(num) / float(den) if float(den) > 0 else 0.0
+ else:
+ fps = float(fps_str)
+
+ # Get duration
+ duration = float(stream.get("duration", format_info.get("duration", 0)))
+
+ # Get total frames
+ nb_frames = stream.get("nb_frames")
+ if nb_frames:
+ total_frames = int(nb_frames)
+ else:
+ # Estimate from duration and fps
+ total_frames = int(duration * fps) if fps > 0 else 0
+
+ # Get bitrate
+ bitrate = None
+ if format_info.get("bit_rate"):
+ bitrate = int(format_info["bit_rate"])
+
+ # Count I-frames using ffprobe
+ i_frame_count = _count_i_frames(temp_path, timeout=120)
+
+ return VideoInfo(
+ width=width,
+ height=height,
+ fps=fps,
+ duration_seconds=duration,
+ total_frames=total_frames,
+ i_frame_count=i_frame_count,
+ format=fmt,
+ codec=codec,
+ bitrate=bitrate,
+ )
+
+ except json.JSONDecodeError as e:
+ raise VideoTranscodeError(f"Failed to parse ffprobe output: {e}")
+ except subprocess.TimeoutExpired:
+ raise VideoTranscodeError("ffprobe timed out")
+ finally:
+ os.unlink(temp_path)
+
+
+def _count_i_frames(video_path: str, timeout: int = 120) -> int:
+ """Count I-frames (keyframes) in a video file.
+
+ Args:
+ video_path: Path to video file.
+ timeout: Maximum time in seconds.
+
+ Returns:
+ Number of I-frames in the video.
+ """
+ try:
+ result = subprocess.run(
+ [
+ "ffprobe",
+ "-v",
+ "quiet",
+ "-select_streams",
+ "v:0",
+ "-show_entries",
+ "frame=pict_type",
+ "-of",
+ "csv=p=0",
+ video_path,
+ ],
+ capture_output=True,
+ text=True,
+ timeout=timeout,
+ )
+
+ if result.returncode != 0:
+ logger.warning("Failed to count I-frames: %s", result.stderr)
+ return 0
+
+ # Count lines containing 'I'
+ return sum(1 for line in result.stdout.strip().split("\n") if line.strip() == "I")
+
+ except subprocess.TimeoutExpired:
+ logger.warning("I-frame counting timed out")
+ return 0
+ except Exception as e:
+ logger.warning("I-frame counting failed: %s", e)
+ return 0
+
+
+# =============================================================================
+# FRAME EXTRACTION
+# =============================================================================
+
+
+def extract_frames(
+ video_data: bytes,
+ output_dir: Path | None = None,
+ keyframes_only: bool = True,
+) -> tuple[list[Path], VideoInfo]:
+ """Extract frames from video as PNG images.
+
+ Uses ffmpeg to extract frames from the video. By default extracts only
+ I-frames (keyframes) which are more robust to re-encoding.
+
+ Args:
+ video_data: Raw video file bytes.
+ output_dir: Directory to save frames (temp dir if None).
+ keyframes_only: If True, only extract I-frames (keyframes).
+
+ Returns:
+ Tuple of (list of frame paths sorted by frame number, VideoInfo).
+
+ Raises:
+ VideoTranscodeError: If frame extraction fails.
+ """
+ _require_ffmpeg()
+
+ fmt = detect_video_format(video_data)
+ if fmt == "unknown":
+ raise UnsupportedVideoFormatError(
+ "Cannot detect video format. Supported: MP4, MKV, WebM, AVI, MOV."
+ )
+
+ # Get video info first
+ video_info = get_video_info(video_data)
+
+ # Create output directory
+ if output_dir is None:
+ output_dir = Path(tempfile.mkdtemp(prefix="stegasoo_frames_"))
+ else:
+ output_dir = Path(output_dir)
+ output_dir.mkdir(parents=True, exist_ok=True)
+
+ # Write video to temp file
+ with tempfile.NamedTemporaryFile(suffix=f".{fmt}", delete=False) as f:
+ f.write(video_data)
+ video_path = f.name
+
+ try:
+ # Build ffmpeg command
+ cmd = [
+ "ffmpeg",
+ "-i",
+ video_path,
+ "-vsync",
+ "0",
+ ]
+
+ if keyframes_only:
+ # Extract only I-frames
+ cmd.extend(["-vf", "select='eq(pict_type,I)'"])
+
+ # Output as PNG with frame number
+ output_pattern = str(output_dir / "frame_%06d.png")
+ cmd.extend(["-start_number", "0", output_pattern])
+
+ result = subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ timeout=600, # 10 minute timeout
+ )
+
+ if result.returncode != 0:
+ raise VideoTranscodeError(f"Frame extraction failed: {result.stderr}")
+
+ # Collect extracted frames
+ frames = sorted(output_dir.glob("frame_*.png"))
+
+ if not frames:
+ raise VideoTranscodeError("No frames were extracted from video")
+
+ logger.info(
+ "Extracted %d %s from video",
+ len(frames),
+ "I-frames" if keyframes_only else "frames",
+ )
+
+ return frames, video_info
+
+ except subprocess.TimeoutExpired:
+ raise VideoTranscodeError("Frame extraction timed out")
+ finally:
+ os.unlink(video_path)
+
+
+# =============================================================================
+# VIDEO REASSEMBLY
+# =============================================================================
+
+
+def reassemble_video(
+ frames: list[Path],
+ original_video_data: bytes,
+ output_path: Path | None = None,
+ fps: float | None = None,
+ audio_data: bytes | None = None,
+) -> bytes:
+ """Reassemble frames back into a video file.
+
+ Creates a new video from the modified frames using FFV1 lossless codec
+ in an MKV container. This preserves the embedded data perfectly.
+
+ Args:
+ frames: List of frame image paths in order.
+ original_video_data: Original video bytes (for audio track extraction).
+ output_path: Optional output path (temp file if None).
+ fps: Frame rate (auto-detected from original if None).
+ audio_data: Optional audio track data to mux in.
+
+ Returns:
+ Video file bytes (MKV container with FFV1 codec).
+
+ Raises:
+ VideoTranscodeError: If reassembly fails.
+ """
+ _require_ffmpeg()
+
+ if not frames:
+ raise VideoTranscodeError("No frames provided for reassembly")
+
+ # Get original video format
+ fmt = detect_video_format(original_video_data)
+
+ if fps is None:
+ # Use a fixed low framerate for I-frame sequences
+ # since I-frames are sparse (typically 1 per 30-60 frames)
+ fps = 1.0 # 1 fps for I-frame only videos
+
+ # Create temp directory for work
+ with tempfile.TemporaryDirectory(prefix="stegasoo_reassemble_") as temp_dir_str:
+ temp_dir = Path(temp_dir_str)
+
+ # Write original video for audio extraction
+ original_path = temp_dir / f"original.{fmt}"
+ original_path.write_bytes(original_video_data)
+
+ # Create frame list file for ffmpeg
+ frame_list = temp_dir / "frames.txt"
+ with open(frame_list, "w") as f:
+ for frame in frames:
+ # FFmpeg concat format
+ f.write(f"file '{frame.absolute()}'\n")
+ f.write(f"duration {1.0 / fps}\n")
+
+ # Output path
+ if output_path is None:
+ output_file = temp_dir / f"output.{VIDEO_OUTPUT_CONTAINER}"
+ else:
+ output_file = Path(output_path)
+
+ # Build ffmpeg command
+ cmd = [
+ "ffmpeg",
+ "-y", # Overwrite output
+ "-f",
+ "concat",
+ "-safe",
+ "0",
+ "-i",
+ str(frame_list),
+ ]
+
+ # Add audio from original video if available
+ # Check if original has audio
+ has_audio = _video_has_audio(original_path)
+ if has_audio:
+ cmd.extend(["-i", str(original_path)])
+
+ # Video encoding settings (FFV1 lossless)
+ cmd.extend(
+ [
+ "-c:v",
+ VIDEO_OUTPUT_CODEC,
+ "-level",
+ "3", # FFV1 level 3 for better compression
+ "-coder",
+ "1", # Range coder
+ "-context",
+ "1", # Large context
+ "-slicecrc",
+ "1", # Error detection
+ ]
+ )
+
+ # Audio settings
+ if has_audio:
+ cmd.extend(
+ [
+ "-map",
+ "0:v", # Video from frames
+ "-map",
+ "1:a?", # Audio from original (if exists)
+ "-c:a",
+ "copy", # Copy audio without re-encoding
+ ]
+ )
+
+ cmd.append(str(output_file))
+
+ logger.debug("Running ffmpeg: %s", " ".join(cmd))
+
+ result = subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ timeout=600,
+ )
+
+ if result.returncode != 0:
+ raise VideoTranscodeError(f"Video reassembly failed: {result.stderr}")
+
+ # Read output
+ return output_file.read_bytes()
+
+
+def _video_has_audio(video_path: Path) -> bool:
+ """Check if a video file has an audio stream.
+
+ Args:
+ video_path: Path to video file.
+
+ Returns:
+ True if video has audio, False otherwise.
+ """
+ try:
+ result = subprocess.run(
+ [
+ "ffprobe",
+ "-v",
+ "quiet",
+ "-select_streams",
+ "a:0",
+ "-show_entries",
+ "stream=index",
+ "-of",
+ "csv=p=0",
+ str(video_path),
+ ],
+ capture_output=True,
+ text=True,
+ timeout=30,
+ )
+ return bool(result.stdout.strip())
+ except Exception:
+ return False
+
+
+# =============================================================================
+# VALIDATION
+# =============================================================================
+
+
+def validate_video(
+ video_data: bytes,
+ name: str = "Video",
+ check_duration: bool = True,
+) -> ValidationResult:
+ """Validate video data for steganography.
+
+ Checks:
+ - Not empty
+ - Not too large (MAX_VIDEO_FILE_SIZE)
+ - Valid video format (detectable via magic bytes)
+ - Duration within limits (MAX_VIDEO_DURATION) if check_duration=True
+ - Resolution within limits (MIN/MAX_VIDEO_RESOLUTION)
+
+ Args:
+ video_data: Raw video file bytes.
+ name: Descriptive name for error messages (default: "Video").
+ check_duration: Whether to enforce duration limit (default: True).
+
+ Returns:
+ ValidationResult with video info in details on success.
+ """
+ if not video_data:
+ return ValidationResult.error(f"{name} is required")
+
+ if len(video_data) > MAX_VIDEO_FILE_SIZE:
+ size_gb = len(video_data) / (1024**3)
+ max_gb = MAX_VIDEO_FILE_SIZE / (1024**3)
+ return ValidationResult.error(
+ f"{name} too large ({size_gb:.1f} GB). Maximum: {max_gb:.0f} GB"
+ )
+
+ # Detect format
+ fmt = detect_video_format(video_data)
+ if fmt == "unknown":
+ return ValidationResult.error(
+ f"Could not detect {name} format. " "Supported formats: MP4, MKV, WebM, AVI, MOV."
+ )
+
+ # Check ffmpeg availability
+ if not has_ffmpeg_support():
+ return ValidationResult.error(
+ "ffmpeg is required for video processing. Please install ffmpeg."
+ )
+
+ # Extract metadata for further validation
+ try:
+ info = get_video_info(video_data)
+ except (VideoTranscodeError, UnsupportedVideoFormatError) as e:
+ return ValidationResult.error(f"Could not read {name}: {e}")
+ except Exception as e:
+ return ValidationResult.error(f"Could not read {name}: {e}")
+
+ # Check duration
+ if check_duration and info.duration_seconds > MAX_VIDEO_DURATION:
+ return ValidationResult.error(
+ f"{name} too long ({info.duration_seconds:.1f}s). "
+ f"Maximum: {MAX_VIDEO_DURATION}s ({MAX_VIDEO_DURATION // 60} minutes)"
+ )
+
+ # Check resolution
+ if info.width < MIN_VIDEO_RESOLUTION[0] or info.height < MIN_VIDEO_RESOLUTION[1]:
+ return ValidationResult.error(
+ f"{name} resolution too small ({info.width}x{info.height}). "
+ f"Minimum: {MIN_VIDEO_RESOLUTION[0]}x{MIN_VIDEO_RESOLUTION[1]}"
+ )
+
+ if info.width > MAX_VIDEO_RESOLUTION[0] or info.height > MAX_VIDEO_RESOLUTION[1]:
+ return ValidationResult.error(
+ f"{name} resolution too large ({info.width}x{info.height}). "
+ f"Maximum: {MAX_VIDEO_RESOLUTION[0]}x{MAX_VIDEO_RESOLUTION[1]}"
+ )
+
+ # Check I-frame count
+ if info.i_frame_count < 1:
+ return ValidationResult.error(f"{name} has no I-frames (keyframes) for embedding")
+
+ return ValidationResult.ok(
+ width=info.width,
+ height=info.height,
+ fps=info.fps,
+ duration=info.duration_seconds,
+ total_frames=info.total_frames,
+ i_frame_count=info.i_frame_count,
+ format=info.format,
+ codec=info.codec,
+ bitrate=info.bitrate,
+ )
+
+
+def require_valid_video(video_data: bytes, name: str = "Video") -> None:
+ """Validate video, raising VideoValidationError on failure.
+
+ Args:
+ video_data: Raw video file bytes.
+ name: Descriptive name for error messages.
+
+ Raises:
+ VideoValidationError: If validation fails.
+ """
+ result = validate_video(video_data, name)
+ if not result.is_valid:
+ raise VideoValidationError(result.error_message)
+
+
+def validate_video_embed_mode(mode: str) -> ValidationResult:
+ """Validate video embedding mode string.
+
+ Args:
+ mode: Embedding mode to validate.
+
+ Returns:
+ ValidationResult with mode in details on success.
+ """
+ valid_modes = VALID_VIDEO_EMBED_MODES | {EMBED_MODE_VIDEO_AUTO}
+ if mode not in valid_modes:
+ return ValidationResult.error(
+ f"Invalid video embed_mode: '{mode}'. "
+ f"Valid options: {', '.join(sorted(valid_modes))}"
+ )
+ return ValidationResult.ok(mode=mode)
+
+
+# =============================================================================
+# CAPACITY CALCULATION
+# =============================================================================
+
+
+def calculate_video_capacity(video_data: bytes, embed_mode: str = EMBED_MODE_VIDEO_LSB) -> VideoCapacityInfo:
+ """Calculate steganographic capacity for a video file.
+
+ Capacity is based on I-frames only (keyframes). Each I-frame provides
+ capacity similar to an image of the same dimensions.
+
+ Args:
+ video_data: Raw video file bytes.
+ embed_mode: Embedding mode (currently only video_lsb).
+
+ Returns:
+ VideoCapacityInfo with capacity details.
+ """
+ info = get_video_info(video_data)
+
+ # Calculate capacity per I-frame
+ # RGB image: 3 bits per pixel (1 bit per channel) / 8 = 0.375 bytes per pixel
+ # Subtract overhead per frame for header
+ pixels_per_frame = info.width * info.height
+ bytes_per_frame = (pixels_per_frame * 3) // 8 # 3 bits per pixel
+
+ # Total capacity across all I-frames
+ # Subtract 70 bytes overhead for the encrypted payload header
+ from .steganography import ENCRYPTION_OVERHEAD
+
+ total_capacity = (bytes_per_frame * info.i_frame_count) - ENCRYPTION_OVERHEAD
+
+ return VideoCapacityInfo(
+ total_frames=info.total_frames,
+ i_frames=info.i_frame_count,
+ usable_capacity_bytes=max(0, total_capacity),
+ embed_mode=embed_mode,
+ resolution=(info.width, info.height),
+ duration_seconds=info.duration_seconds,
+ )
diff --git a/tests/test_stegasoo.py b/tests/test_stegasoo.py
index 3104eaf..1f8c6fc 100644
--- a/tests/test_stegasoo.py
+++ b/tests/test_stegasoo.py
@@ -451,3 +451,231 @@ class TestEdgeCases:
)
assert decoded.message == special_msg
+
+
+# =============================================================================
+# VIDEO STEGANOGRAPHY TESTS (v4.4.0)
+# =============================================================================
+
+
+@pytest.fixture
+def test_video_bytes():
+ """Create a minimal test video using ffmpeg.
+
+ Creates a 2-second test video with solid color frames.
+ Returns None if ffmpeg is not available.
+ """
+ import shutil
+ import subprocess
+ import tempfile
+
+ if not shutil.which("ffmpeg"):
+ return None
+
+ with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
+ output_path = f.name
+
+ try:
+ # Create a simple 2-second video with colored frames
+ # Using lavfi (libavfilter) to generate test pattern
+ result = subprocess.run(
+ [
+ "ffmpeg",
+ "-y",
+ "-f",
+ "lavfi",
+ "-i",
+ "color=c=blue:s=320x240:d=2:r=10",
+ "-c:v",
+ "libx264",
+ "-pix_fmt",
+ "yuv420p",
+ "-g",
+ "5", # GOP size - creates I-frames every 5 frames
+ output_path,
+ ],
+ capture_output=True,
+ timeout=30,
+ )
+
+ if result.returncode != 0:
+ return None
+
+ with open(output_path, "rb") as f:
+ video_data = f.read()
+
+ return video_data
+ except Exception:
+ return None
+ finally:
+ import os
+
+ try:
+ os.unlink(output_path)
+ except OSError:
+ pass
+
+
+class TestVideoSupport:
+ """Test video steganography support detection."""
+
+ def test_video_support_flag_exists(self):
+ """HAS_VIDEO_SUPPORT flag should exist."""
+ assert hasattr(stegasoo, "HAS_VIDEO_SUPPORT")
+ assert isinstance(stegasoo.HAS_VIDEO_SUPPORT, bool)
+
+ def test_video_constants_exist(self):
+ """Video-related constants should exist."""
+ assert hasattr(stegasoo, "EMBED_MODE_VIDEO_LSB")
+ assert hasattr(stegasoo, "EMBED_MODE_VIDEO_AUTO")
+
+
+@pytest.mark.skipif(
+ not stegasoo.HAS_VIDEO_SUPPORT,
+ reason="Video support not available (ffmpeg or dependencies missing)",
+)
+class TestVideoFormatDetection:
+ """Test video format detection."""
+
+ def test_detect_video_format_mp4(self, test_video_bytes):
+ """Should detect MP4 format from magic bytes."""
+ if test_video_bytes is None:
+ pytest.skip("Could not create test video")
+
+ from stegasoo import detect_video_format
+
+ fmt = detect_video_format(test_video_bytes)
+ assert fmt in ("mp4", "mov")
+
+ def test_detect_video_format_unknown(self):
+ """Should return 'unknown' for non-video data."""
+ from stegasoo import detect_video_format
+
+ fmt = detect_video_format(b"not a video")
+ assert fmt == "unknown"
+
+
+@pytest.mark.skipif(
+ not stegasoo.HAS_VIDEO_SUPPORT,
+ reason="Video support not available (ffmpeg or dependencies missing)",
+)
+class TestVideoInfo:
+ """Test video metadata extraction."""
+
+ def test_get_video_info(self, test_video_bytes):
+ """Should extract video metadata."""
+ if test_video_bytes is None:
+ pytest.skip("Could not create test video")
+
+ from stegasoo import get_video_info
+
+ info = get_video_info(test_video_bytes)
+
+ assert info.width == 320
+ assert info.height == 240
+ assert info.fps > 0
+ assert info.duration_seconds > 0
+ assert info.total_frames > 0
+ assert info.format in ("mp4", "mov")
+
+ def test_validate_video(self, test_video_bytes):
+ """Should validate video data."""
+ if test_video_bytes is None:
+ pytest.skip("Could not create test video")
+
+ from stegasoo import validate_video
+
+ result = validate_video(test_video_bytes, check_duration=False)
+
+ assert result.is_valid
+ assert result.details.get("format") in ("mp4", "mov")
+
+
+@pytest.mark.skipif(
+ not stegasoo.HAS_VIDEO_SUPPORT,
+ reason="Video support not available (ffmpeg or dependencies missing)",
+)
+class TestVideoCapacity:
+ """Test video capacity calculation."""
+
+ def test_calculate_video_capacity(self, test_video_bytes):
+ """Should calculate steganographic capacity."""
+ if test_video_bytes is None:
+ pytest.skip("Could not create test video")
+
+ from stegasoo import calculate_video_capacity
+
+ capacity_info = calculate_video_capacity(test_video_bytes)
+
+ assert capacity_info.total_frames > 0
+ assert capacity_info.i_frames > 0
+ assert capacity_info.usable_capacity_bytes > 0
+ assert capacity_info.embed_mode == "video_lsb"
+ assert capacity_info.resolution == (320, 240)
+
+
+@pytest.mark.skipif(
+ not stegasoo.HAS_VIDEO_SUPPORT,
+ reason="Video support not available (ffmpeg or dependencies missing)",
+)
+class TestVideoEncodeDecode:
+ """Test video steganography round-trip."""
+
+ def test_video_roundtrip(self, test_video_bytes, ref_bytes):
+ """Test encoding and decoding a message in video."""
+ if test_video_bytes is None:
+ pytest.skip("Could not create test video")
+
+ from stegasoo import decode_video, encode_video
+
+ message = "Secret video message!"
+
+ # Encode
+ stego_video, stats = encode_video(
+ message=message,
+ reference_photo=ref_bytes,
+ carrier_video=test_video_bytes,
+ passphrase=TEST_PASSPHRASE,
+ pin=TEST_PIN,
+ )
+
+ assert stego_video
+ assert len(stego_video) > 0
+ assert stats.frames_modified > 0
+ assert stats.codec == "ffv1" # Should use lossless codec
+
+ # Decode
+ result = decode_video(
+ stego_video=stego_video,
+ reference_photo=ref_bytes,
+ passphrase=TEST_PASSPHRASE,
+ pin=TEST_PIN,
+ )
+
+ assert result.is_text
+ assert result.message == message
+
+ def test_video_wrong_passphrase_fails(self, test_video_bytes, ref_bytes):
+ """Decoding with wrong passphrase should fail."""
+ if test_video_bytes is None:
+ pytest.skip("Could not create test video")
+
+ from stegasoo import decode_video, encode_video
+
+ message = "Secret video message!"
+
+ stego_video, _ = encode_video(
+ message=message,
+ reference_photo=ref_bytes,
+ carrier_video=test_video_bytes,
+ passphrase=TEST_PASSPHRASE,
+ pin=TEST_PIN,
+ )
+
+ with pytest.raises(Exception):
+ decode_video(
+ stego_video=stego_video,
+ reference_photo=ref_bytes,
+ passphrase="wrong passphrase words here",
+ pin=TEST_PIN,
+ )