From 14fce4d3ede1efe7ad3bb01d29a210c8a8de3804 Mon Sep 17 00:00:00 2001 From: "Aaron D. Lee" Date: Tue, 24 Mar 2026 16:00:30 -0400 Subject: [PATCH] More video work, planning, etc. -- Need to mark things EXPERIMENTAL. --- .gitignore | 3 + IdeasScout_PLANS_20260324.md | 294 +++++++++++ rpi/flash-stock-img.sh | 30 +- rpi/pull-image.sh | 146 +++--- rpi/train_proj.json | 13 + src/stegasoo/__init__.py | 56 ++- src/stegasoo/cli.py | 404 +++++++++++++++ src/stegasoo/constants.py | 24 + src/stegasoo/decode.py | 97 ++++ src/stegasoo/encode.py | 101 +++- src/stegasoo/exceptions.py | 48 ++ src/stegasoo/models.py | 53 ++ src/stegasoo/video_steganography.py | 496 +++++++++++++++++++ src/stegasoo/video_utils.py | 732 ++++++++++++++++++++++++++++ tests/test_stegasoo.py | 228 +++++++++ 15 files changed, 2641 insertions(+), 84 deletions(-) create mode 100644 IdeasScout_PLANS_20260324.md create mode 100644 rpi/train_proj.json create mode 100644 src/stegasoo/video_steganography.py create mode 100644 src/stegasoo/video_utils.py diff --git a/.gitignore b/.gitignore index 6bc3f13..54d16c2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# Embedded repos (AUR packaging) +aur-cli-upload/ + # Python __pycache__/ *.py[cod] diff --git a/IdeasScout_PLANS_20260324.md b/IdeasScout_PLANS_20260324.md new file mode 100644 index 0000000..c714006 --- /dev/null +++ b/IdeasScout_PLANS_20260324.md @@ -0,0 +1,294 @@ +# Stegasoo Ideas Scout — Implementation Plans (2026-03-24) + +Baseline: v4.3.0, Python >=3.11, FORMAT_VERSION 5, no existing users (no backward compat constraints). + +--- + +## Tier 1 — Quick Wins + +### 1. Platform-Calibrated DCT Presets + +**Description**: `--platform telegram|discord|signal|whatsapp` flag for DCT encode. Bakes in each platform's known recompression parameters. Pre-verifies payload survives before outputting. 
+ +**Implementation approach**: +- New file `src/stegasoo/platform_presets.py` — `PlatformPreset` dataclass + `PRESETS` dict mapping platform → tuned `quant_step`, `jpeg_quality`, `embed_positions`, `max_dimension`, `recompress_quality` +- `dct_steganography.py`: `_embed_scipy_dct_safe()` / `_embed_jpegio()` accept optional preset overrides for `QUANT_STEP`, `DEFAULT_EMBED_POSITIONS`, output quality +- New `pre_verify_survival()` function: encode → re-save at platform quality → extract → pass/fail +- Thread `platform` param through `encode.py` → `steganography.py` → DCT functions +- `cli.py`: add `--platform` as `click.Choice` + `--verify/--no-verify` (pre-verification doubles encode time) +- LSB + `--platform` should error early — LSB data is destroyed by any JPEG recompression + +**Known platform params** (from research): +| Platform | Quality | Max Dimension | Notes | +|----------|---------|---------------|-------| +| Telegram | ~82 | 2560×2560 | ~81KB embeddable | +| Discord | ~85 | Varies (Nitro) | | +| Signal | ~80 | Aggressive | | +| WhatsApp | ~70 | 1600×1600 | Most lossy | + +**Go/No-Go metrics**: +- >95% payload survival rate per platform at 1KB message size in automated tests +- Pre-verification correctly predicts real platform behavior (manual validation per platform at least once) + +**Complexity**: **M** — new file + parameter threading through 4-5 functions + +**Risks**: Platform params change without notice. Add version/date stamps to presets and a `stegasoo tools verify-platform` test command. + +--- + +### 2. Steganalysis Self-Check (`stegasoo check`) + +**Description**: New CLI command running chi-square and RS (Regular-Singular) statistical analysis on stego images. Outputs detectability risk level (low/medium/high). 
+ +**Implementation approach**: +- New file `src/stegasoo/steganalysis.py`: + - `chi_square_analysis(image_data) -> float` — chi-square statistic on LSB distribution per channel + - `rs_analysis(image_data) -> float` — Regular-Singular groups analysis (requires numpy) + - `assess_risk(chi_p, rs_estimate) -> str` — maps to "low"/"medium"/"high" + - `check_image(image_data) -> dict` — orchestrator +- `cli.py`: new `@cli.command("check")` with `IMAGE` arg, `--json`, `--mode lsb|dct|auto` +- `constants.py`: threshold constants for chi-square p-value and RS boundaries +- `__init__.py`: export `check_image` in `__all__` +- Start LSB-only; DCT steganalysis (calibration attack) deferred + +**Go/No-Go metrics**: +- Clean images → consistently "low risk" +- Naive sequential LSB → "high risk" +- Stegasoo LSB at <50% capacity → "low" or "medium" + +**Complexity**: **M** — ~150 lines numpy per test, straightforward CLI integration + +--- + +### 3. Python 3.13 DCT Cleanup + +**Description**: The `jpegio` → `jpeglib` migration is already done in code. Remaining work: rename stale `jpegio` references and verify on 3.13. + +**Implementation approach**: +- `dct_steganography.py`: rename `HAS_JPEGIO` → `HAS_JPEGLIB`, `_jpegio_*` functions → `_jpeglib_*`, update constant names (`JPEGIO_MAGIC` → `JPEGLIB_MAGIC`, etc.) +- Verify `jpeglib.to_jpegio()` compatibility shim — if jpeglib plans to deprecate it, migrate to native API +- Run full test suite on Python 3.13 + +**Go/No-Go metrics**: +- All DCT tests pass on Python 3.13 +- No deprecation warnings from jpeglib + +**Complexity**: **S** — renaming and verification only + +--- + +## Tier 2 — Strategic + +### 4. Content-Adaptive Embedding (S-UNIWARD/WOW-inspired) + +**Description**: Replace uniform-random pixel selection with texture-weighted cost functions. Embed preferentially in busy/textured regions where changes are least detectable. 3-5x harder to detect statistically. 
+ +**Implementation approach**: +- New file `src/stegasoo/adaptive_cost.py`: + - `compute_cost_map(image_data) -> np.ndarray` — per-pixel distortion cost via directional high-pass filters (Daubechies wavelet bank / KB filter) + - `select_pixels_by_cost(cost_map, pixel_key, num_needed) -> list[int]` — weighted sampling, still ChaCha20-seeded for determinism +- `steganography.py`: + - `generate_pixel_indices()`: add `cost_map` param, use weighted sampling when provided + - `_embed_lsb()`: compute cost map when adaptive mode enabled + - `_extract_lsb()`: must compute identical cost map to find same pixels (so the map must be derived from data that embedding does not alter, e.g. with the LSBs masked out) +- `dct_steganography.py`: adapt `DEFAULT_EMBED_POSITIONS` per-block based on block texture energy +- Thread `adaptive: bool` through `encode.py`/`decode.py` +- `constants.py`: add `EMBED_MODE_ADAPTIVE_LSB`, filter kernels, cost thresholds + +**Go/No-Go metrics**: +- Chi-square test (Feature 2) shows measurable improvement vs uniform-random +- **Critical**: cost map computation is deterministic across platforms (quantize to fixed-point integers) +- Round-trip decode succeeds on Linux x86, Linux ARM, macOS + +**Complexity**: **L** — novel algorithm, cross-platform determinism requirement, touches core embedding + +**Risks**: Floating-point differences in wavelet computation could break extraction. Mitigate with integer quantization. Increases encode/decode time ~2-3x. + +--- + +### 5. Per-Message Forward Secrecy via HKDF + +**Description**: Derive ephemeral per-message encryption keys using HKDF expansion from the Argon2id root key + random nonce. Compromising one message doesn't reveal others. 
+ +**Implementation approach**: +- `crypto.py`: + - Add `from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand` + - `derive_message_key(root_key, nonce) -> bytes` — HKDF-Expand with SHA-256 + - `encrypt_message()`: generate 16-byte random nonce, derive per-message key, embed nonce in header + - `decrypt_message()`: extract nonce, derive same key + - Also derive pixel selection key via HKDF with different `info` param +- `constants.py`: + - Bump `FORMAT_VERSION` to 6 + - `HKDF_INFO_ENCRYPTION = b"stegasoo-v6-encrypt"`, `HKDF_INFO_PIXEL = b"stegasoo-v6-pixel"` + - `MESSAGE_NONCE_SIZE = 16` +- Header grows from 66 → 82 bytes: add `message_nonce(16)` field +- Update `HEADER_OVERHEAD` / `ENCRYPTION_OVERHEAD` in `steganography.py` + +**Go/No-Go metrics**: +- Two messages with identical credentials produce different ciphertexts and different pixel locations +- `cryptography` library HKDF works with existing Argon2id output + +**Complexity**: **M** — well-defined crypto change, touches security-critical header format + +--- + +### 6. PWA Mobile Interface + +**Description**: Convert Flask Web UI to Progressive Web App. Mobile-optimized, installable, offline-capable static pages. 
+ +**Implementation approach**: +- New files in `frontends/web/static/`: `manifest.json`, `sw.js`, icon set (192×192, 512×512) +- Base template: add manifest link, theme-color meta, viewport meta, service worker registration +- `app.py`: serve manifest with correct MIME, add cache headers for static assets +- Responsive CSS for encode/decode accordion forms +- Camera capture: `<input type="file" accept="image/*" capture="environment">` for reference photo +- Service worker caches static assets only — NOT encode/decode API endpoints + +**Go/No-Go metrics**: +- Lighthouse PWA score >= 90 +- Installable on Android Chrome and iOS Safari +- Offline: static pages load, encode/decode shows graceful "offline" message + +**Complexity**: **M** — frontend only, no core library changes + +**Risks**: Camera capture requires HTTPS (already supported via `ssl_utils.py`). + +--- + +## Tier 3 — Moonshot + +### 7. Plausible Deniability / Dual-Payload Mode + +**Description**: Two independent encrypted payloads in one carrier, each with different credentials. Reveal decoy under coercion; real payload stays hidden. 
+ +**Implementation approach**: +- New file `src/stegasoo/dual_payload.py`: + - `encode_dual(message_a, message_b, carrier, creds_a, creds_b)` + - Partition available pixels into two disjoint pools using different seeds + - **Critical**: ALL images (single or dual) must fill unused pixel pool with random data so single-payload and dual-payload images are indistinguishable +- `steganography.py`: `generate_pixel_indices()` gets `exclude_indices` param +- `decode.py`: each credential set finds a different valid payload; wrong credentials produce garbage +- CLI + Web UI: dual-payload encode workflow + +**Go/No-Go metrics**: +- Single-payload and dual-payload images are statistically indistinguishable (chi-square can't differentiate) +- Each payload decodes independently +- Wrong credentials for one payload don't reveal other payload's existence + +**Complexity**: **XL** — novel design, halves capacity per payload, challenging UX, needs rigorous security analysis + +**Dependencies**: Feature 2 (validation), Feature 4 (detectability reduction) + +--- + +## Architectural Improvements + +### 8. EmbeddingBackend Protocol + +**Description**: Typed plugin interface for all embedding algorithms. Replace if/elif dispatch in `steganography.py` with a registry. + +**Implementation approach**: +- New package `src/stegasoo/backends/`: + - `protocol.py` — `EmbeddingBackend(Protocol)` with `embed()`, `extract()`, `calculate_capacity()`, `is_available()` + - `lsb.py`, `dct.py` — wrap existing functions + - `registry.py` — `BackendRegistry` mapping mode strings to backends +- `steganography.py`: `embed_in_image()` / `extract_from_image()` dispatch via registry +- `__init__.py`: export protocol and `register_backend()` + +**Complexity**: **M** — implement before Features 4 and 7 (they become new backends) + +--- + +### 9. HKDF Key Separation + +Subsumed by Feature 5. 
The HKDF expansion provides: +- Encryption key: `HKDF-Expand(root_key, info="stegasoo-v6-encrypt", nonce)` +- Pixel selection key: `HKDF-Expand(root_key, info="stegasoo-v6-pixel", nonce)` +- Future: MAC key, padding key, etc. + +--- + +### 10. `[core]` Extra with Minimal Deps + +**Description**: Move Pillow to `[image]` extra, base deps = `cryptography` + `argon2-cffi` + `zstandard` only. + +**Complexity**: **S** — but Pillow is used in `crypto.py` for photo hashing (core to security model). Only worth it with a concrete headless use case. **Low priority.** + +--- + +## Ecosystem Features + +### 11. Aletheia Integration + +Optional `--engine aletheia` backend for Feature 2's `stegasoo check`. BSD-licensed, provides SPA/RS/WS attacks + ML classifiers. **Complexity: S** (after Feature 2). **Depends on**: Feature 2. + +### 12. C2PA/AI Provenance Watermarking + +Embed C2PA metadata alongside stego payloads. **Complexity: L** — C2PA is a complex standard. Potentially conflicts with stego goals (adds detectable metadata). Research-heavy. + +### 13. Signal/Matrix Bot + +Bot that decodes stego images in a channel using configured channel key. **Complexity: M** — integration work, uses existing `decode()` API. + +### 14. Homebrew Tap + Nix Flake + +Package distribution for macOS/NixOS. **Complexity: S** — packaging only, no code changes. 
+ +--- + +## Summary Table + +| # | Feature | Tier | Size | Dependencies | Primary Files | +|---|---------|------|------|-------------|---------------| +| 1 | Platform DCT Presets | T1 | M | — | new `platform_presets.py`, `dct_steganography.py`, `encode.py`, `cli.py` | +| 2 | Steganalysis Self-Check | T1 | M | — | new `steganalysis.py`, `cli.py`, `constants.py` | +| 3 | Python 3.13 DCT Cleanup | T1 | S | — | `dct_steganography.py` | +| 4 | Content-Adaptive Embedding | T2 | L | numpy, #2 | new `adaptive_cost.py`, `steganography.py`, `constants.py` | +| 5 | HKDF Forward Secrecy | T2 | M | — | `crypto.py`, `constants.py`, `steganography.py` | +| 6 | PWA Mobile Interface | T2 | M | — | `frontends/web/` templates + static | +| 7 | Dual-Payload Mode | T3 | XL | #2, #4 | new `dual_payload.py`, `steganography.py`, `cli.py` | +| 8 | EmbeddingBackend Protocol | Arch | M | — | new `backends/` package, `steganography.py` | +| 9 | HKDF Key Separation | Arch | — | Included in #5 | `crypto.py` | +| 10 | `[core]` Extra | Arch | S | — | `pyproject.toml` | +| 11 | Aletheia Integration | Eco | S | #2 | `steganalysis.py` | +| 12 | C2PA Watermarking | Eco | L | — | new module | +| 13 | Signal/Matrix Bot | Eco | M | — | new `bots/` package | +| 14 | Homebrew + Nix | Eco | S | — | packaging files only | + +--- + +## Suggested Roadmap + +### Phase 1 — Foundations (v4.4.0) + +1. **#3** Python 3.13 DCT Cleanup (S) — unblocks CI on 3.13 +2. **#8** EmbeddingBackend Protocol (M) — architectural cleanup before new embedding work +3. **#2** Steganalysis Self-Check (M) — validation tooling for everything that follows + +### Phase 2 — Security & Robustness (v4.5.0) + +4. **#5** HKDF Forward Secrecy (M) — FORMAT_VERSION bump to 6, improved crypto +5. **#1** Platform-Calibrated DCT Presets (M) — high user value for social media +6. **#14** Homebrew + Nix (S) — distribution expansion + +### Phase 3 — Advanced Steganography (v5.0.0) + +7. 
**#4** Content-Adaptive Embedding (L) — major security improvement +8. **#6** PWA Mobile Interface (M) — parallel frontend work stream + +### Phase 4 — Moonshot (v5.x+) + +9. **#7** Dual-Payload Mode (XL) — after #2 and #4 are solid +10. **#12** C2PA Watermarking (L) — research-heavy +11. **#13** Signal/Matrix Bot (M) — community-driven + +--- + +## Additional Ideas (Backlog) + +- **Animated GIF steganography** — LSB in GIF frames, natural multi-media extension +- **PDF steganography** — whitespace/font metric/embedded image payloads +- **Batch encode** — `stegasoo batch-encode --dir /photos/` with auto carrier selection (BATCH_* constants suggest this was planned) +- **Stego identification** — `stegasoo identify image.png` probes for known stego signatures +- **Per-device credential sync via QR** — channel key as stego image of reference photo +- **`stegasoo verify`** — decode + confirm message matches expected hash without revealing contents diff --git a/rpi/flash-stock-img.sh b/rpi/flash-stock-img.sh index 9ed3a81..a795b40 100755 --- a/rpi/flash-stock-img.sh +++ b/rpi/flash-stock-img.sh @@ -1,7 +1,7 @@ #!/bin/bash # Flash Raspberry Pi image with headless config (Trixie/Bookworm compatible) -# Usage: ./flash-stock-img.sh -# Reads settings from config.json in same directory +# Usage: ./flash-stock-img.sh [-c config.json] +# Reads settings from config.json in same directory (or specify with -c) # # Uses the same firstrun.sh approach as rpi-imager for compatibility @@ -10,11 +10,31 @@ set -e SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" CONFIG_FILE="$SCRIPT_DIR/config.json" +# ============================================================================ +# Parse options +# ============================================================================ +usage() { + echo "Usage: $0 [-c config.json] " + echo " -c FILE Use alternate config file (default: config.json in script dir)" + echo "Example: $0 2025-12-04-raspios-trixie-arm64-lite.img.xz /dev/sdb" + echo 
"Example: $0 -c myconfig.json raspios.img.xz /dev/sdb" + exit 1 +} + +while getopts "c:h" opt; do + case $opt in + c) CONFIG_FILE="$OPTARG" ;; + h) usage ;; + *) usage ;; + esac +done +shift $((OPTIND - 1)) + # ============================================================================ # Load config # ============================================================================ if [ ! -f "$CONFIG_FILE" ]; then - echo "Error: config.json not found at $CONFIG_FILE" + echo "Error: config file not found at $CONFIG_FILE" exit 1 fi @@ -38,9 +58,7 @@ echo # Validate args # ============================================================================ if [ $# -ne 2 ]; then - echo "Usage: $0 " - echo "Example: $0 2025-12-04-raspios-trixie-arm64-lite.img.xz /dev/sdb" - exit 1 + usage fi IMAGE="$1" diff --git a/rpi/pull-image.sh b/rpi/pull-image.sh index a3d53d7..84c71fe 100755 --- a/rpi/pull-image.sh +++ b/rpi/pull-image.sh @@ -14,9 +14,9 @@ BOLD='\033[1m' NC='\033[0m' if [ $# -ne 2 ]; then - echo "Usage: $0 " - echo "Example: $0 /dev/sdb stegasoo-rpi-4.2.1.img.zst" - exit 1 + echo "Usage: $0 " + echo "Example: $0 /dev/sdb stegasoo-rpi-4.2.1.img.zst" + exit 1 fi DEVICE="$1" @@ -24,13 +24,13 @@ OUTPUT="$2" # Check for root if [ "$EUID" -ne 0 ]; then - echo -e "${RED}Error: Must run as root (sudo)${NC}" - exit 1 + echo -e "${RED}Error: Must run as root (sudo)${NC}" + exit 1 fi if [ ! 
-b "$DEVICE" ]; then - echo -e "${RED}Error: Device not found: $DEVICE${NC}" - exit 1 + echo -e "${RED}Error: Device not found: $DEVICE${NC}" + exit 1 fi echo -e "${BOLD}Device info:${NC}" @@ -39,14 +39,14 @@ echo # Find partitions if [ -b "${DEVICE}1" ]; then - BOOT_PART="${DEVICE}1" - ROOT_PART="${DEVICE}2" + BOOT_PART="${DEVICE}1" + ROOT_PART="${DEVICE}2" elif [ -b "${DEVICE}p1" ]; then - BOOT_PART="${DEVICE}p1" - ROOT_PART="${DEVICE}p2" + BOOT_PART="${DEVICE}p1" + ROOT_PART="${DEVICE}p2" else - echo -e "${RED}Error: Could not find partitions${NC}" - exit 1 + echo -e "${RED}Error: Could not find partitions${NC}" + exit 1 fi # Unmount any mounted partitions @@ -62,65 +62,65 @@ echo -e "${BOLD}Checking partition size...${NC}" # Get current partition size in bytes CURRENT_SIZE=$(blockdev --getsize64 "$ROOT_PART") -TARGET_BYTES=$((16 * 1024 * 1024 * 1024)) # 16GB in bytes +TARGET_BYTES=$((16 * 1024 * 1024 * 1024)) # 16GB in bytes CURRENT_GB=$(echo "scale=2; $CURRENT_SIZE / 1073741824" | bc) echo " Current rootfs size: ${CURRENT_GB}GB" if [ "$CURRENT_SIZE" -gt "$TARGET_BYTES" ]; then - echo -e "${YELLOW}Resizing rootfs to 16GB...${NC}" + echo -e "${YELLOW}Resizing rootfs to 16GB...${NC}" - # Get boot partition end in sectors - BOOT_END=$(parted -s "$DEVICE" unit s print | grep "^ 1" | awk '{print $3}' | tr -d 's') + # Get boot partition end in sectors + BOOT_END=$(parted -s "$DEVICE" unit s print | grep "^ 1" | awk '{print $3}' | tr -d 's') - # Calculate 16GB in sectors (512 byte sectors) - ROOT_SIZE_SECTORS=33554432 - ROOT_END=$((BOOT_END + ROOT_SIZE_SECTORS)) + # Calculate 16GB in sectors (512 byte sectors) + ROOT_SIZE_SECTORS=33554432 + ROOT_END=$((BOOT_END + ROOT_SIZE_SECTORS)) - # SHRINKING: filesystem first, then partition - echo " Checking filesystem..." - e2fsck -f -y "$ROOT_PART" 2>/dev/null || true + # SHRINKING: filesystem first, then partition + echo " Checking filesystem..." 
+ e2fsck -f -y "$ROOT_PART" 2>/dev/null || true - # Shrink filesystem to 15.5GB (leave room for partition overhead) - echo " Shrinking filesystem to 15500M..." - resize2fs "$ROOT_PART" 15500M + # Shrink filesystem to 15.5GB (leave room for partition overhead) + echo " Shrinking filesystem to 15500M..." + resize2fs "$ROOT_PART" 15500M - # Delete and recreate partition 2 with 16GB size - echo " Shrinking partition to 16GB..." - parted -s "$DEVICE" rm 2 - parted -s "$DEVICE" mkpart primary ext4 $((BOOT_END + 1))s ${ROOT_END}s + # Delete and recreate partition 2 with 16GB size + echo " Shrinking partition to 16GB..." + parted -s "$DEVICE" rm 2 + parted -s "$DEVICE" mkpart primary ext4 $((BOOT_END + 1))s ${ROOT_END}s - # Refresh partition table - partprobe "$DEVICE" - sleep 2 + # Refresh partition table + partprobe "$DEVICE" + sleep 2 - # Expand filesystem to fill the partition exactly - echo " Expanding filesystem to fill partition..." - e2fsck -f -y "$ROOT_PART" 2>/dev/null || true - resize2fs "$ROOT_PART" + # Expand filesystem to fill the partition exactly + echo " Expanding filesystem to fill partition..." 
+ e2fsck -f -y "$ROOT_PART" 2>/dev/null || true + resize2fs "$ROOT_PART" - echo -e "${GREEN} Rootfs resized to 16GB${NC}" + echo -e "${GREEN} Rootfs resized to 16GB${NC}" elif [ "$CURRENT_SIZE" -lt "$TARGET_BYTES" ]; then - echo -e "${YELLOW} Rootfs is smaller than 16GB - expanding...${NC}" + echo -e "${YELLOW} Rootfs is smaller than 16GB - expanding...${NC}" - # Get boot partition end in sectors - BOOT_END=$(parted -s "$DEVICE" unit s print | grep "^ 1" | awk '{print $3}' | tr -d 's') - ROOT_SIZE_SECTORS=33554432 - ROOT_END=$((BOOT_END + ROOT_SIZE_SECTORS)) + # Get boot partition end in sectors + BOOT_END=$(parted -s "$DEVICE" unit s print | grep "^ 1" | awk '{print $3}' | tr -d 's') + ROOT_SIZE_SECTORS=33554432 + ROOT_END=$((BOOT_END + ROOT_SIZE_SECTORS)) - # EXPANDING: partition first, then filesystem - parted -s "$DEVICE" rm 2 - parted -s "$DEVICE" mkpart primary ext4 $((BOOT_END + 1))s ${ROOT_END}s + # EXPANDING: partition first, then filesystem + parted -s "$DEVICE" rm 2 + parted -s "$DEVICE" mkpart primary ext4 $((BOOT_END + 1))s ${ROOT_END}s - partprobe "$DEVICE" - sleep 2 + partprobe "$DEVICE" + sleep 2 - e2fsck -f -y "$ROOT_PART" 2>/dev/null || true - resize2fs "$ROOT_PART" + e2fsck -f -y "$ROOT_PART" 2>/dev/null || true + resize2fs "$ROOT_PART" - echo -e "${GREEN} Rootfs expanded to 16GB${NC}" + echo -e "${GREEN} Rootfs expanded to 16GB${NC}" else - echo -e "${GREEN} Rootfs already ~16GB${NC}" + echo -e "${GREEN} Rootfs already ~16GB${NC}" fi # ============================================================================ @@ -135,8 +135,8 @@ echo END_SECTOR=$(parted -s "$DEVICE" unit s print | grep "^ 2" | awk '{print $3}' | tr -d 's') if [ -z "$END_SECTOR" ]; then - echo -e "${RED}Error: Could not determine partition 2 end sector${NC}" - exit 1 + echo -e "${RED}Error: Could not determine partition 2 end sector${NC}" + exit 1 fi # Add a small buffer (1MB = 2048 sectors) for safety @@ -150,8 +150,8 @@ echo read -p "Proceed with image pull? 
[Y/n] " confirm if [[ "$confirm" =~ ^[Nn]$ ]]; then - echo "Aborted." - exit 1 + echo "Aborted." + exit 1 fi echo @@ -159,13 +159,13 @@ echo -e "${GREEN}Pulling image...${NC}" echo # Use pv if available for progress, otherwise fallback to dd status -if command -v pv &> /dev/null; then - dd if="$DEVICE" bs=512 count=$TOTAL_SECTORS 2>/dev/null | \ - pv -s $TOTAL_BYTES | \ - zstd -T0 -3 > "$OUTPUT" +if command -v pv &>/dev/null; then + dd if="$DEVICE" bs=512 count=$TOTAL_SECTORS 2>/dev/null | + pv -s $TOTAL_BYTES | + zstd -T0 -19 --ultra >"$OUTPUT" else - dd if="$DEVICE" bs=512 count=$TOTAL_SECTORS status=progress | \ - zstd -T0 -3 > "$OUTPUT" + dd if="$DEVICE" bs=512 count=$TOTAL_SECTORS status=progress | + zstd -T0 -19 --ultra >"$OUTPUT" fi echo @@ -178,16 +178,16 @@ ls -lh "$OUTPUT" echo read -p "Create .zst.zip wrapper for GitHub? [y/N] " zip_confirm if [[ "$zip_confirm" =~ ^[Yy]$ ]]; then - ZIP_OUTPUT="${OUTPUT}.zip" - echo -e "${YELLOW}Creating zip wrapper (store mode, no compression)...${NC}" - zip -0 "$ZIP_OUTPUT" "$OUTPUT" - echo -e "${GREEN}Done!${NC} Upload this to GitHub Releases:" - ls -lh "$ZIP_OUTPUT" - echo - echo "Users can flash with:" - echo " sudo ./rpi/flash-image.sh $ZIP_OUTPUT" + ZIP_OUTPUT="${OUTPUT}.zip" + echo -e "${YELLOW}Creating zip wrapper (store mode, no compression)...${NC}" + zip -0 "$ZIP_OUTPUT" "$OUTPUT" + echo -e "${GREEN}Done!${NC} Upload this to GitHub Releases:" + ls -lh "$ZIP_OUTPUT" + echo + echo "Users can flash with:" + echo " sudo ./rpi/flash-image.sh $ZIP_OUTPUT" else - echo - echo "To verify:" - echo " zstdcat $OUTPUT | fdisk -l /dev/stdin" + echo + echo "To verify:" + echo " zstdcat $OUTPUT | fdisk -l /dev/stdin" fi diff --git a/rpi/train_proj.json b/rpi/train_proj.json new file mode 100644 index 0000000..e569b37 --- /dev/null +++ b/rpi/train_proj.json @@ -0,0 +1,13 @@ +{ + "hostname": "running_trains", + "username": "admin", + "password": "runthemtrains", + "wifiSSID": "WitchHazelWrecked", + "wifiPassword": 
"BeefPigsMoo", + "wifiCountry": "US", + "locale": "en_US.UTF-8", + "keyboardLayout": "us", + "timezone": "America/New_York", + "enableSSH": true +} + diff --git a/src/stegasoo/__init__.py b/src/stegasoo/__init__.py index fb47037..5432ec7 100644 --- a/src/stegasoo/__init__.py +++ b/src/stegasoo/__init__.py @@ -22,6 +22,9 @@ from .channel import ( validate_channel_key, ) +# Audio support — gated by STEGASOO_AUDIO env var and dependency availability +from .constants import AUDIO_ENABLED, VIDEO_ENABLED + # Crypto functions from .crypto import get_active_channel_key, get_channel_fingerprint, has_argon2 from .decode import decode, decode_file, decode_text @@ -54,9 +57,6 @@ from .steganography import ( # Utilities from .utils import generate_filename -# Audio support — gated by STEGASOO_AUDIO env var and dependency availability -from .constants import AUDIO_ENABLED, VIDEO_ENABLED - HAS_AUDIO_SUPPORT = AUDIO_ENABLED HAS_VIDEO_SUPPORT = VIDEO_ENABLED @@ -77,6 +77,24 @@ else: encode_audio = None decode_audio = None +# Video support — gated by STEGASOO_VIDEO env var and ffmpeg + audio deps +if VIDEO_ENABLED: + from .decode import decode_video + from .encode import encode_video + from .video_utils import ( + calculate_video_capacity, + detect_video_format, + get_video_info, + validate_video, + ) +else: + detect_video_format = None + get_video_info = None + validate_video = None + calculate_video_capacity = None + encode_video = None + decode_video = None + # QR Code utilities - optional, may not be available try: from .qr_utils import ( @@ -117,6 +135,8 @@ from .constants import ( EMBED_MODE_AUTO, EMBED_MODE_DCT, EMBED_MODE_LSB, + EMBED_MODE_VIDEO_AUTO, + EMBED_MODE_VIDEO_LSB, FORMAT_VERSION, LOSSLESS_FORMATS, MAX_FILE_PAYLOAD_SIZE, @@ -159,7 +179,13 @@ from .exceptions import ( SteganographyError, StegasooError, UnsupportedAudioFormatError, + UnsupportedVideoFormatError, ValidationError, + VideoCapacityError, + VideoError, + VideoExtractionError, + VideoTranscodeError, + 
VideoValidationError, ) # Models @@ -175,6 +201,9 @@ from .models import ( GenerateResult, ImageInfo, ValidationResult, + VideoCapacityInfo, + VideoEmbedStats, + VideoInfo, ) from .validation import ( validate_audio_embed_mode, @@ -212,6 +241,13 @@ __all__ = [ "HAS_VIDEO_SUPPORT", "validate_audio_embed_mode", "validate_audio_file", + # Video (v4.4.0) + "encode_video", + "decode_video", + "detect_video_format", + "get_video_info", + "validate_video", + "calculate_video_capacity", # Generation "generate_pin", "generate_passphrase", @@ -273,6 +309,10 @@ __all__ = [ "AudioEmbedStats", "AudioInfo", "AudioCapacityInfo", + # Video models + "VideoEmbedStats", + "VideoInfo", + "VideoCapacityInfo", # Exceptions "StegasooError", "ValidationError", @@ -303,6 +343,13 @@ __all__ = [ "AudioExtractionError", "AudioTranscodeError", "UnsupportedAudioFormatError", + # Video exceptions + "VideoError", + "VideoValidationError", + "VideoCapacityError", + "VideoExtractionError", + "VideoTranscodeError", + "UnsupportedVideoFormatError", # Constants "FORMAT_VERSION", "MIN_PASSPHRASE_WORDS", @@ -329,4 +376,7 @@ __all__ = [ "EMBED_MODE_AUDIO_LSB", "EMBED_MODE_AUDIO_SPREAD", "EMBED_MODE_AUDIO_AUTO", + # Video constants + "EMBED_MODE_VIDEO_LSB", + "EMBED_MODE_VIDEO_AUTO", ] diff --git a/src/stegasoo/cli.py b/src/stegasoo/cli.py index 7264516..5dcec66 100644 --- a/src/stegasoo/cli.py +++ b/src/stegasoo/cli.py @@ -853,6 +853,410 @@ def audio_info(ctx, audio): raise SystemExit(1) +# ============================================================================= +# VIDEO COMMANDS (v4.4.0) +# ============================================================================= + + +@cli.command("video-encode") +@click.argument("carrier", type=click.Path(exists=True)) +@click.option( + "-r", + "--reference", + required=True, + type=click.Path(exists=True), + help="Reference photo (shared secret)", +) +@click.option("-m", "--message", help="Message to encode") +@click.option( + "-f", + "--file", + 
"file_payload", + type=click.Path(exists=True), + help="File to embed instead of message", +) +@click.option("-o", "--output", type=click.Path(), help="Output video path") +@click.option( + "--passphrase", + prompt=True, + hide_input=True, + confirmation_prompt=True, + help="Passphrase (recommend 4+ words)", +) +@click.option("--pin", prompt=True, hide_input=True, confirmation_prompt=True, help="PIN code") +@click.option( + "--rsa-key", + type=click.Path(exists=True), + help="RSA private key PEM file", +) +@click.option("--rsa-password", default=None, help="Password for encrypted RSA key") +@click.option("--channel-key", default=None, help="Channel key for deployment isolation") +@click.option("--dry-run", is_flag=True, help="Show capacity usage without encoding") +@click.pass_context +def video_encode( + ctx, + carrier, + reference, + message, + file_payload, + output, + passphrase, + pin, + rsa_key, + rsa_password, + channel_key, + dry_run, +): + """ + Encode a message or file into a video carrier. + + Output is MKV format with FFV1 lossless codec to preserve embedded data. + + Examples: + + stegasoo video-encode carrier.mp4 -r ref.jpg -m "Secret" + + stegasoo video-encode carrier.mp4 -r ref.jpg -f secret.pdf -o stego.mkv + + stegasoo video-encode carrier.mp4 -r ref.jpg -m "Secret" --dry-run + """ + from .constants import VIDEO_ENABLED + + if not VIDEO_ENABLED: + raise click.UsageError( + "Video support is disabled. Install ffmpeg and audio extras, " + "or set STEGASOO_VIDEO=1 to force enable." 
+ ) + + from .encode import encode_video + from .models import FilePayload + from .video_utils import calculate_video_capacity, get_video_info + + if not message and not file_payload: + raise click.UsageError("Either --message or --file is required") + + # Read RSA key if provided + rsa_key_data = None + if rsa_key: + with open(rsa_key, "rb") as f: + rsa_key_data = f.read() + + # Calculate payload size + if file_payload: + payload_size = Path(file_payload).stat().st_size + payload_type = "file" + else: + payload_size = len(message.encode("utf-8")) + payload_type = "text" + + # Read input files + with open(reference, "rb") as f: + reference_data = f.read() + with open(carrier, "rb") as f: + carrier_data = f.read() + + if dry_run: + try: + info = get_video_info(carrier_data) + capacity_info = calculate_video_capacity(carrier_data) + + result = { + "carrier": carrier, + "reference": reference, + "format": info.format, + "codec": info.codec, + "resolution": f"{info.width}x{info.height}", + "fps": round(info.fps, 2), + "duration_seconds": round(info.duration_seconds, 2), + "total_frames": info.total_frames, + "i_frames": info.i_frame_count, + "capacity_bytes": capacity_info.usable_capacity_bytes, + "capacity_kb": round(capacity_info.usable_capacity_bytes / 1024, 1), + "payload_type": payload_type, + "payload_size": payload_size, + "usage_percent": round( + payload_size / capacity_info.usable_capacity_bytes * 100, 1 + ) + if capacity_info.usable_capacity_bytes > 0 + else 0, + "fits": payload_size < capacity_info.usable_capacity_bytes, + } + + if ctx.obj.get("json"): + click.echo(json.dumps(result, indent=2)) + else: + click.echo(f"Carrier: {carrier} ({info.format}, {info.codec})") + click.echo(f"Resolution: {info.width}x{info.height} @ {info.fps:.2f} fps") + click.echo(f"Duration: {info.duration_seconds:.1f}s") + click.echo(f"Frames: {info.total_frames} total, {info.i_frame_count} I-frames") + click.echo(f"Reference: {reference}") + click.echo( + f"Capacity: 
{capacity_info.usable_capacity_bytes:,} bytes " + f"({capacity_info.usable_capacity_bytes // 1024} KB)" + ) + click.echo(f"Payload: {payload_size:,} bytes ({payload_type})") + click.echo(f"Usage: {result['usage_percent']}%") + click.echo(f"Status: {'✓ Fits' if result['fits'] else '✗ Too large'}") + click.echo() + click.echo("Note: Output will be MKV format with FFV1 lossless codec") + except Exception as e: + if ctx.obj.get("json"): + click.echo(json.dumps({"status": "error", "error": str(e)}, indent=2)) + else: + click.echo(f"✗ Capacity check failed: {e}", err=True) + raise SystemExit(1) + return + + # Determine output path + if not output: + output = f"{Path(carrier).stem}_encoded.mkv" + + try: + if file_payload: + payload = FilePayload.from_file(file_payload) + else: + payload = message + + stego_video, stats = encode_video( + message=payload, + reference_photo=reference_data, + carrier_video=carrier_data, + passphrase=passphrase, + pin=pin, + rsa_key_data=rsa_key_data, + rsa_password=rsa_password, + channel_key=channel_key, + ) + + with open(output, "wb") as f: + f.write(stego_video) + + if ctx.obj.get("json"): + click.echo( + json.dumps( + { + "status": "success", + "carrier": carrier, + "reference": reference, + "output": output, + "codec": stats.codec, + "frames_modified": stats.frames_modified, + "duration_seconds": round(stats.duration_seconds, 2), + "capacity_used": round(stats.capacity_used * 100, 1), + }, + indent=2, + ) + ) + else: + click.echo(f"✓ Encoded to {output}") + click.echo(f" Codec: {stats.codec} (lossless)") + click.echo(f" Frames modified: {stats.frames_modified}") + click.echo(f" Capacity used: {stats.capacity_used * 100:.1f}%") + + except Exception as e: + if ctx.obj.get("json"): + click.echo(json.dumps({"status": "error", "error": str(e)}, indent=2)) + else: + click.echo(f"✗ Video encoding failed: {e}", err=True) + raise SystemExit(1) + + +@cli.command("video-decode") +@click.argument("video", type=click.Path(exists=True)) 
+@click.option( + "-r", + "--reference", + required=True, + type=click.Path(exists=True), + help="Reference photo (shared secret)", +) +@click.option("--passphrase", prompt=True, hide_input=True, help="Passphrase") +@click.option("--pin", prompt=True, hide_input=True, help="PIN code") +@click.option( + "--rsa-key", + type=click.Path(exists=True), + help="RSA private key PEM file", +) +@click.option("--rsa-password", default=None, help="Password for encrypted RSA key") +@click.option("--channel-key", default=None, help="Channel key for deployment isolation") +@click.option("-o", "--output", type=click.Path(), help="Output path for file payloads") +@click.pass_context +def video_decode( + ctx, video, reference, passphrase, pin, rsa_key, rsa_password, channel_key, output +): + """ + Decode a message or file from stego video. + + Examples: + + stegasoo video-decode stego.mkv -r ref.jpg + + stegasoo video-decode stego.mkv -r ref.jpg -o ./extracted/ + """ + from .constants import VIDEO_ENABLED + + if not VIDEO_ENABLED: + raise click.UsageError( + "Video support is disabled. Install ffmpeg and audio extras, " + "or set STEGASOO_VIDEO=1 to force enable." 
+ ) + + from .decode import decode_video + + # Read RSA key if provided + rsa_key_data = None + if rsa_key: + with open(rsa_key, "rb") as f: + rsa_key_data = f.read() + + with open(video, "rb") as f: + video_data = f.read() + with open(reference, "rb") as f: + reference_data = f.read() + + try: + result = decode_video( + stego_video=video_data, + reference_photo=reference_data, + passphrase=passphrase, + pin=pin, + rsa_key_data=rsa_key_data, + rsa_password=rsa_password, + channel_key=channel_key, + ) + + if result.is_file: + filename = result.filename or "decoded_file" + output_path = Path(output) / filename if output else Path(filename) + output_path.parent.mkdir(parents=True, exist_ok=True) + + with open(output_path, "wb") as f: + f.write(result.file_data) + + if ctx.obj.get("json"): + click.echo( + json.dumps( + { + "status": "success", + "video": video, + "payload_type": "file", + "filename": filename, + "output": str(output_path), + "size": len(result.file_data), + }, + indent=2, + ) + ) + else: + click.echo(f"✓ Extracted file: {output_path}") + click.echo(f" Size: {len(result.file_data):,} bytes") + else: + if ctx.obj.get("json"): + click.echo( + json.dumps( + { + "status": "success", + "video": video, + "payload_type": "text", + "message": result.message, + }, + indent=2, + ) + ) + else: + click.echo(f"Decoded from {video}:") + click.echo(result.message) + + except Exception as e: + if ctx.obj.get("json"): + click.echo(json.dumps({"status": "error", "error": str(e)}, indent=2)) + else: + click.echo(f"✗ Video decoding failed: {e}", err=True) + raise SystemExit(1) + + +@cli.command("video-info") +@click.argument("video", type=click.Path(exists=True)) +@click.pass_context +def video_info(ctx, video): + """ + Show video file information and steganographic capacity. 
+ + Examples: + + stegasoo video-info carrier.mp4 + + stegasoo --json video-info carrier.mp4 + """ + from .constants import VIDEO_ENABLED + + if not VIDEO_ENABLED: + raise click.UsageError( + "Video support is disabled. Install ffmpeg and audio extras, " + "or set STEGASOO_VIDEO=1 to force enable." + ) + + from .video_utils import calculate_video_capacity, get_video_info + + with open(video, "rb") as f: + video_data = f.read() + + try: + info = get_video_info(video_data) + capacity_info = calculate_video_capacity(video_data) + + result = { + "file": video, + "format": info.format, + "codec": info.codec, + "resolution": { + "width": info.width, + "height": info.height, + }, + "fps": round(info.fps, 2), + "duration_seconds": round(info.duration_seconds, 2), + "total_frames": info.total_frames, + "i_frame_count": info.i_frame_count, + "bitrate": info.bitrate, + "file_size": len(video_data), + "capacity": { + "bytes": capacity_info.usable_capacity_bytes, + "kb": round(capacity_info.usable_capacity_bytes / 1024, 1), + "mb": round(capacity_info.usable_capacity_bytes / (1024 * 1024), 2), + }, + } + + if ctx.obj.get("json"): + click.echo(json.dumps(result, indent=2)) + else: + click.echo(f"File: {video}") + click.echo(f"Format: {info.format}") + click.echo(f"Codec: {info.codec}") + click.echo(f"Resolution: {info.width}x{info.height}") + click.echo(f"Frame rate: {info.fps:.2f} fps") + click.echo(f"Duration: {info.duration_seconds:.1f}s") + click.echo(f"Total frames: {info.total_frames:,}") + click.echo(f"I-frames (keyframes): {info.i_frame_count:,}") + if info.bitrate: + click.echo(f"Bitrate: {info.bitrate // 1000} kbps") + click.echo(f"File size: {len(video_data):,} bytes") + click.echo() + click.echo("Steganographic capacity (LSB, I-frames only):") + click.echo( + f" {capacity_info.usable_capacity_bytes:,} bytes " + f"({capacity_info.usable_capacity_bytes // 1024} KB)" + ) + click.echo() + click.echo("Note: Output will be MKV format with FFV1 lossless codec") + + except 
Exception as e: + if ctx.obj.get("json"): + click.echo(json.dumps({"status": "error", "error": str(e)}, indent=2)) + else: + click.echo(f"✗ Video info failed: {e}", err=True) + raise SystemExit(1) + + # ============================================================================= # BATCH COMMANDS # ============================================================================= diff --git a/src/stegasoo/constants.py b/src/stegasoo/constants.py index d5a8e8d..47fbd46 100644 --- a/src/stegasoo/constants.py +++ b/src/stegasoo/constants.py @@ -411,3 +411,27 @@ AUDIO_ECHO_DELAY_0 = 50 # Echo delay for bit 0 (samples at 44.1kHz ~ 1.1ms) AUDIO_ECHO_DELAY_1 = 100 # Echo delay for bit 1 (samples at 44.1kHz ~ 2.3ms) AUDIO_ECHO_AMPLITUDE = 0.3 # Echo strength (relative to original) AUDIO_ECHO_WINDOW_SIZE = 8192 # Window size for echo embedding + + +# ============================================================================= +# VIDEO STEGANOGRAPHY (v4.4.0) +# ============================================================================= + +# Video embedding modes +EMBED_MODE_VIDEO_LSB = "video_lsb" +EMBED_MODE_VIDEO_AUTO = "video_auto" +VALID_VIDEO_EMBED_MODES = {EMBED_MODE_VIDEO_LSB} + +# Video magic bytes (for format detection in stego video) +VIDEO_MAGIC_LSB = b"VIDL" + +# Video input limits +MAX_VIDEO_FILE_SIZE = 4 * 1024 * 1024 * 1024 # 4 GB +MAX_VIDEO_DURATION = 3600 # 1 hour in seconds +MIN_VIDEO_RESOLUTION = (64, 64) +MAX_VIDEO_RESOLUTION = (7680, 4320) # 8K UHD +ALLOWED_VIDEO_EXTENSIONS = {"mp4", "mkv", "webm", "avi", "mov"} + +# Video output settings +VIDEO_OUTPUT_CODEC = "ffv1" # FFV1 lossless codec +VIDEO_OUTPUT_CONTAINER = "mkv" # MKV container for FFV1 diff --git a/src/stegasoo/decode.py b/src/stegasoo/decode.py index e0b3bdd..bdf9348 100644 --- a/src/stegasoo/decode.py +++ b/src/stegasoo/decode.py @@ -383,3 +383,100 @@ def decode_audio( debug.print(f"Decryption successful: {result.payload_type}") return result + + +def decode_video( + stego_video: bytes, + 
reference_photo: bytes, + passphrase: str, + pin: str = "", + rsa_key_data: bytes | None = None, + rsa_password: str | None = None, + embed_mode: str = "video_auto", + channel_key: str | bool | None = None, + progress_file: str | None = None, +) -> DecodeResult: + """ + Decode a message or file from stego video. + + Extracts data from I-frames (keyframes) using LSB steganography. + + Args: + stego_video: Stego video bytes + reference_photo: Shared reference photo bytes + passphrase: Shared passphrase + pin: Optional static PIN + rsa_key_data: Optional RSA key bytes + rsa_password: Optional RSA key password + embed_mode: 'video_auto' or 'video_lsb' + channel_key: Channel key for deployment/group isolation + progress_file: Optional path to write progress JSON + + Returns: + DecodeResult with message or file data + """ + from .constants import ( + EMBED_MODE_VIDEO_AUTO, + EMBED_MODE_VIDEO_LSB, + VIDEO_ENABLED, + ) + + if not VIDEO_ENABLED: + raise ExtractionError( + "Video support is disabled. Install video extras and ffmpeg, " + "or set STEGASOO_VIDEO=1 to force enable." 
+ ) + + from .video_utils import detect_video_format + + debug.print( + f"decode_video: mode={embed_mode}, " f"passphrase length={len(passphrase.split())} words" + ) + + # Validate inputs + require_valid_image(reference_photo, "Reference photo") + require_security_factors(pin, rsa_key_data) + + if pin: + require_valid_pin(pin) + if rsa_key_data: + require_valid_rsa_key(rsa_key_data, rsa_password) + + # Detect format + video_format = detect_video_format(stego_video) + debug.print(f"Detected video format: {video_format}") + + if video_format == "unknown": + raise ExtractionError("Could not detect video format.") + + _write_progress(progress_file, 20, 100, "initializing") + + # Derive pixel/frame selection key + from .crypto import derive_pixel_key + + pixel_key = derive_pixel_key(reference_photo, passphrase, pin, rsa_key_data, channel_key) + + _write_progress(progress_file, 25, 100, "extracting") + + encrypted = None + + if embed_mode == EMBED_MODE_VIDEO_AUTO or embed_mode == EMBED_MODE_VIDEO_LSB: + from .video_steganography import extract_from_video_lsb + + encrypted = extract_from_video_lsb(stego_video, pixel_key, progress_file=progress_file) + if encrypted: + debug.print("Video LSB extraction succeeded") + else: + raise ValueError(f"Invalid video embed mode: {embed_mode}") + + if not encrypted: + debug.print("No data extracted from video") + raise ExtractionError("Could not extract data from video. 
Check your credentials.") + + debug.print(f"Extracted {len(encrypted)} bytes from video") + + # Decrypt + result = decrypt_message(encrypted, reference_photo, passphrase, pin, rsa_key_data, channel_key) + + debug.print(f"Decryption successful: {result.payload_type}") + return result diff --git a/src/stegasoo/encode.py b/src/stegasoo/encode.py index 604d2b0..5b81c13 100644 --- a/src/stegasoo/encode.py +++ b/src/stegasoo/encode.py @@ -8,6 +8,9 @@ Changes in v4.0.0: Changes in v4.3.0: - Added encode_audio() for audio steganography + +Changes in v4.4.0: +- Added encode_video() for video steganography """ from __future__ import annotations @@ -18,7 +21,7 @@ from typing import TYPE_CHECKING from .constants import EMBED_MODE_LSB from .crypto import derive_pixel_key, encrypt_message from .debug import debug -from .exceptions import AudioError +from .exceptions import AudioError, VideoError from .models import EncodeResult, FilePayload from .steganography import embed_in_image from .utils import generate_filename @@ -31,7 +34,7 @@ from .validation import ( ) if TYPE_CHECKING: - from .models import AudioEmbedStats + from .models import AudioEmbedStats, VideoEmbedStats def encode( @@ -365,3 +368,97 @@ def encode_audio( raise ValueError(f"Invalid audio embed mode: {embed_mode}") return stego_audio, stats + + +def encode_video( + message: str | bytes | FilePayload, + reference_photo: bytes, + carrier_video: bytes, + passphrase: str, + pin: str = "", + rsa_key_data: bytes | None = None, + rsa_password: str | None = None, + embed_mode: str = "video_lsb", + channel_key: str | bool | None = None, + progress_file: str | None = None, +) -> tuple[bytes, VideoEmbedStats]: + """ + Encode a message or file into a video carrier. + + Embeds data across I-frames (keyframes) using LSB steganography. + Output is an MKV container with FFV1 lossless codec to preserve + the embedded data perfectly. 
+ + Args: + message: Text message, raw bytes, or FilePayload to hide + reference_photo: Shared reference photo bytes + carrier_video: Carrier video bytes (MP4, MKV, WebM, AVI, MOV) + passphrase: Shared passphrase + pin: Optional static PIN + rsa_key_data: Optional RSA private key PEM bytes + rsa_password: Optional password for encrypted RSA key + embed_mode: 'video_lsb' (currently the only option) + channel_key: Channel key for deployment/group isolation + progress_file: Optional path to write progress JSON + + Returns: + Tuple of (stego video bytes, VideoEmbedStats) + + Note: + The output video will be in MKV format with FFV1 lossless codec, + regardless of the input format. This is necessary to preserve + the embedded data without lossy compression artifacts. + """ + from .constants import EMBED_MODE_VIDEO_LSB, VIDEO_ENABLED + + if not VIDEO_ENABLED: + raise VideoError( + "Video support is disabled. Install video extras and ffmpeg, " + "or set STEGASOO_VIDEO=1 to force enable." + ) + + from .video_utils import detect_video_format + + debug.print( + f"encode_video: mode={embed_mode}, " + f"passphrase length={len(passphrase.split())} words, " + f"pin={'set' if pin else 'none'}" + ) + + # Validate inputs + require_valid_payload(message) + require_valid_image(reference_photo, "Reference photo") + require_security_factors(pin, rsa_key_data) + + if pin: + require_valid_pin(pin) + if rsa_key_data: + require_valid_rsa_key(rsa_key_data, rsa_password) + + # Detect video format + video_format = detect_video_format(carrier_video) + debug.print(f"Detected video format: {video_format}") + + if video_format == "unknown": + raise VideoError("Could not detect video format. 
Supported: MP4, MKV, WebM, AVI, MOV.") + + # Encrypt message + encrypted = encrypt_message( + message, reference_photo, passphrase, pin, rsa_key_data, channel_key + ) + debug.print(f"Encrypted payload: {len(encrypted)} bytes") + + # Derive pixel/frame selection key + pixel_key = derive_pixel_key(reference_photo, passphrase, pin, rsa_key_data, channel_key) + + # Embed based on mode + if embed_mode == EMBED_MODE_VIDEO_LSB: + from .video_steganography import embed_in_video_lsb + + stego_video, stats = embed_in_video_lsb( + encrypted, carrier_video, pixel_key, progress_file=progress_file + ) + else: + raise ValueError(f"Invalid video embed mode: {embed_mode}") + + return stego_video, stats diff --git a/src/stegasoo/exceptions.py b/src/stegasoo/exceptions.py index 57be0fd..54b4905 100644 --- a/src/stegasoo/exceptions.py +++ b/src/stegasoo/exceptions.py @@ -243,3 +243,51 @@ class UnsupportedAudioFormatError(AudioError): """Audio format not supported.""" pass + + +# ============================================================================ +# VIDEO ERRORS +# ============================================================================ + + +class VideoError(SteganographyError): + """Base class for video steganography errors.""" + + pass + + +class VideoValidationError(ValidationError): + """Video validation failed.""" + + pass + + +class VideoCapacityError(CapacityError): + """Video carrier too small for message.""" + + def __init__(self, needed: int, available: int): + self.needed = needed + self.available = available + # Call SteganographyError.__init__ directly (skip CapacityError's image-specific message) + SteganographyError.__init__( + self, + f"Video carrier too small. 
Need {needed:,} bytes, have {available:,} bytes capacity.", + ) + + +class VideoExtractionError(ExtractionError): + """Failed to extract hidden data from video.""" + + pass + + +class VideoTranscodeError(VideoError): + """Video transcoding failed.""" + + pass + + +class UnsupportedVideoFormatError(VideoError): + """Video format not supported.""" + + pass diff --git a/src/stegasoo/models.py b/src/stegasoo/models.py index 4e71bc0..3ee72b4 100644 --- a/src/stegasoo/models.py +++ b/src/stegasoo/models.py @@ -336,3 +336,56 @@ class AudioCapacityInfo: chip_length: int | None = None # v4.4.0: samples per chip embeddable_channels: int | None = None # v4.4.0: channels used (excl. LFE) total_channels: int | None = None # v4.4.0: total channels in carrier + + +# ============================================================================= +# VIDEO STEGANOGRAPHY MODELS (v4.4.0) +# ============================================================================= + + +@dataclass +class VideoEmbedStats: + """Statistics from video embedding.""" + + frames_modified: int + total_frames: int + capacity_used: float # 0.0 - 1.0 + bytes_embedded: int + width: int + height: int + fps: float + duration_seconds: float + embed_mode: str # "video_lsb" + codec: str # Output codec (e.g., "ffv1") + + @property + def modification_percent(self) -> float: + """Percentage of frames modified.""" + return (self.frames_modified / self.total_frames) * 100 if self.total_frames > 0 else 0 + + +@dataclass +class VideoInfo: + """Information about a video file.""" + + width: int + height: int + fps: float + duration_seconds: float + total_frames: int + i_frame_count: int + format: str # "mp4", "mkv", "webm", etc. + codec: str # "h264", "vp9", "ffv1", etc. 
+ bitrate: int | None = None # For lossy formats + + +@dataclass +class VideoCapacityInfo: + """Capacity information for video steganography.""" + + total_frames: int + i_frames: int + usable_capacity_bytes: int + embed_mode: str + resolution: tuple[int, int] + duration_seconds: float diff --git a/src/stegasoo/video_steganography.py b/src/stegasoo/video_steganography.py new file mode 100644 index 0000000..37b5714 --- /dev/null +++ b/src/stegasoo/video_steganography.py @@ -0,0 +1,496 @@ +""" +Stegasoo Video Steganography — LSB Embedding/Extraction (v4.4.0) + +Frame-based LSB embedding for video files. + +Hides data in the least significant bits of video frame pixels. Uses the +existing image steganography engine for per-frame embedding, providing +high capacity across multiple I-frames. + +Strategy: +1. Extract I-frames (keyframes) from video using ffmpeg +2. Embed payload across I-frames using existing LSB engine +3. Re-encode video with modified frames using FFV1 lossless codec +4. Output: MKV container with embedded data + +Uses ChaCha20 as a CSPRNG for pseudo-random frame selection and pixel +selection within frames, ensuring that without the key an attacker cannot +determine which frames/pixels were modified. 
+""" + +import struct +import tempfile +from pathlib import Path + +from .constants import ( + EMBED_MODE_VIDEO_LSB, + VIDEO_MAGIC_LSB, + VIDEO_OUTPUT_CODEC, +) +from .debug import debug +from .exceptions import VideoCapacityError, VideoError +from .models import VideoEmbedStats +from .steganography import ENCRYPTION_OVERHEAD, _embed_lsb, _extract_lsb +from .video_utils import extract_frames, get_video_info, reassemble_video + +# Progress reporting interval — write every N frames +PROGRESS_INTERVAL = 5 + + +# ============================================================================= +# PROGRESS REPORTING +# ============================================================================= + + +def _write_progress(progress_file: str | None, current: int, total: int, phase: str = "embedding"): + """Write progress to file for frontend polling.""" + if progress_file is None: + return + try: + import json + + with open(progress_file, "w") as f: + json.dump( + { + "current": current, + "total": total, + "percent": round((current / total) * 100, 1) if total > 0 else 0, + "phase": phase, + }, + f, + ) + except Exception: + pass # Don't let progress writing break encoding + + +# ============================================================================= +# CAPACITY +# ============================================================================= + + +def calculate_video_lsb_capacity(video_data: bytes) -> int: + """ + Calculate the maximum bytes that can be embedded in a video via LSB. + + Calculates capacity based on I-frames (keyframes) only. Each I-frame + provides capacity proportional to its pixel count. + + Args: + video_data: Raw bytes of a video file. + + Returns: + Maximum embeddable payload size in bytes (after subtracting overhead). + + Raises: + VideoError: If the video cannot be read or is in an unsupported format. 
+ """ + from .video_utils import calculate_video_capacity + + capacity_info = calculate_video_capacity(video_data, EMBED_MODE_VIDEO_LSB) + + debug.print( + f"Video LSB capacity: {capacity_info.usable_capacity_bytes} bytes " + f"({capacity_info.i_frames} I-frames, {capacity_info.resolution[0]}x{capacity_info.resolution[1]})" + ) + + return capacity_info.usable_capacity_bytes + + +# ============================================================================= +# FRAME INDEX GENERATION (ChaCha20 CSPRNG) +# ============================================================================= + + +def generate_frame_indices(key: bytes, num_frames: int, num_needed: int) -> list[int]: + """ + Generate pseudo-random frame indices using ChaCha20 as a CSPRNG. + + Produces a deterministic sequence of unique frame indices so that + the same key always yields the same embedding locations. + + Args: + key: 32-byte key for the ChaCha20 cipher. + num_frames: Total number of frames available. + num_needed: How many unique frame indices are required. + + Returns: + List of ``num_needed`` unique indices in [0, num_frames). 
+ """ + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms + + debug.validate(len(key) == 32, f"Frame key must be 32 bytes, got {len(key)}") + debug.validate(num_frames > 0, f"Number of frames must be positive, got {num_frames}") + debug.validate(num_needed > 0, f"Number needed must be positive, got {num_needed}") + debug.validate( + num_needed <= num_frames, + f"Cannot select {num_needed} frames from {num_frames} available", + ) + + debug.print(f"Generating {num_needed} frame indices from {num_frames} total frames") + + # Use a different nonce offset for frame selection (vs pixel selection) + nonce = b"\x01" + b"\x00" * 15 # Different from pixel selection nonce + + if num_needed >= num_frames // 2: + # Full Fisher-Yates shuffle + cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend()) + encryptor = cipher.encryptor() + + indices = list(range(num_frames)) + random_bytes = encryptor.update(b"\x00" * (num_frames * 4)) + + for i in range(num_frames - 1, 0, -1): + j_bytes = random_bytes[(num_frames - 1 - i) * 4 : (num_frames - i) * 4] + j = int.from_bytes(j_bytes, "big") % (i + 1) + indices[i], indices[j] = indices[j], indices[i] + + return indices[:num_needed] + + # Direct sampling + selected: list[int] = [] + used: set[int] = set() + + cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend()) + encryptor = cipher.encryptor() + + bytes_needed = (num_needed * 2) * 4 + random_bytes = encryptor.update(b"\x00" * bytes_needed) + + byte_offset = 0 + while len(selected) < num_needed and byte_offset < len(random_bytes) - 4: + idx = int.from_bytes(random_bytes[byte_offset : byte_offset + 4], "big") % num_frames + byte_offset += 4 + + if idx not in used: + used.add(idx) + selected.append(idx) + + debug.validate( + len(selected) == num_needed, + f"Failed to generate enough indices: {len(selected)}/{num_needed}", + ) + return selected + + +# 
============================================================================= +# EMBEDDING +# ============================================================================= + + +@debug.time +def embed_in_video_lsb( + data: bytes, + carrier_video: bytes, + pixel_key: bytes, + progress_file: str | None = None, +) -> tuple[bytes, VideoEmbedStats]: + """ + Embed data into video frames using LSB steganography. + + The payload is prepended with a 4-byte magic header and a 4-byte + big-endian length prefix. Data is distributed across I-frames using + pseudo-random selection based on the pixel_key. + + The output video uses FFV1 lossless codec in MKV container to + preserve the embedded data perfectly. + + Args: + data: Encrypted payload bytes to embed. + carrier_video: Raw bytes of the carrier video file. + pixel_key: 32-byte key for frame and pixel selection. + progress_file: Optional path for progress JSON (frontend polling). + + Returns: + Tuple of (stego video bytes, VideoEmbedStats). + + Raises: + VideoCapacityError: If the payload is too large for the carrier. + VideoError: On any other embedding failure. 
+ """ + debug.print(f"Video LSB embedding {len(data)} bytes") + debug.data(pixel_key, "Pixel key for embedding") + debug.validate(len(pixel_key) == 32, f"Pixel key must be 32 bytes, got {len(pixel_key)}") + + try: + # Get video info + video_info = get_video_info(carrier_video) + debug.print( + f"Carrier video: {video_info.width}x{video_info.height}, " + f"{video_info.fps:.2f} fps, {video_info.duration_seconds:.1f}s, " + f"{video_info.i_frame_count} I-frames" + ) + + # Prepend magic + length prefix + header = VIDEO_MAGIC_LSB + struct.pack(">I", len(data)) + payload = header + data + debug.print(f"Payload with header: {len(payload)} bytes") + + # Calculate capacity and check fit + capacity = calculate_video_lsb_capacity(carrier_video) + if len(payload) > capacity + ENCRYPTION_OVERHEAD: + raise VideoCapacityError(len(payload), capacity) + + # Extract I-frames to temp directory + with tempfile.TemporaryDirectory(prefix="stegasoo_video_") as temp_dir_str: + temp_dir = Path(temp_dir_str) + + _write_progress(progress_file, 5, 100, "extracting_frames") + + frames, _ = extract_frames(carrier_video, temp_dir, keyframes_only=True) + num_frames = len(frames) + + debug.print(f"Extracted {num_frames} I-frames for embedding") + + if num_frames == 0: + raise VideoError("No I-frames found in video") + + # Calculate bytes per frame (minus 4 byte length prefix used by _embed_lsb) + pixels_per_frame = video_info.width * video_info.height + bytes_per_frame = (pixels_per_frame * 3) // 8 - 4 # 3 bits per pixel, minus len prefix + + # For simplicity, embed entire payload in first frame if it fits + # This makes extraction straightforward + if len(payload) <= bytes_per_frame: + debug.print(f"Payload fits in single frame ({len(payload)} <= {bytes_per_frame})") + frame_path = frames[0] + + with open(frame_path, "rb") as f: + frame_data = f.read() + + try: + stego_frame, stats, ext = _embed_lsb( + payload, + frame_data, + pixel_key, + bits_per_channel=1, + output_format="PNG", + ) + + with 
open(frame_path, "wb") as f: + f.write(stego_frame) + + modified_frames = 1 + + except Exception as e: + debug.print(f"Failed to embed in frame: {e}") + raise VideoError(f"Failed to embed in frame: {e}") + else: + # For larger payloads, we need to split across frames + # Each frame stores: 4-byte chunk length + chunk data + debug.print("Splitting payload across multiple frames") + + frames_needed = (len(payload) + bytes_per_frame - 1) // bytes_per_frame + frames_needed = min(frames_needed, num_frames) + + debug.print(f"Using {frames_needed} frames to embed {len(payload)} bytes") + + # For now, use sequential frames for simplicity + modified_frames = 0 + bytes_remaining = len(payload) + payload_offset = 0 + + for frame_idx in range(frames_needed): + if bytes_remaining <= 0: + break + + frame_path = frames[frame_idx] + + with open(frame_path, "rb") as f: + frame_data = f.read() + + chunk_size = min(bytes_remaining, bytes_per_frame) + chunk = payload[payload_offset : payload_offset + chunk_size] + + try: + stego_frame, stats, ext = _embed_lsb( + chunk, + frame_data, + pixel_key, + bits_per_channel=1, + output_format="PNG", + ) + + with open(frame_path, "wb") as f: + f.write(stego_frame) + + modified_frames += 1 + payload_offset += chunk_size + bytes_remaining -= chunk_size + + except Exception as e: + debug.print(f"Failed to embed in frame {frame_idx}: {e}") + raise VideoError(f"Failed to embed in frame {frame_idx}: {e}") + + if progress_file and frame_idx % PROGRESS_INTERVAL == 0: + pct = 10 + int((frame_idx / frames_needed) * 70) + _write_progress(progress_file, pct, 100, "embedding") + + _write_progress(progress_file, 80, 100, "reassembling") + + # Reassemble video with modified frames + stego_video = reassemble_video( + frames, + carrier_video, + fps=1.0, # I-frame only videos use 1 fps + ) + + _write_progress(progress_file, 100, 100, "complete") + + video_stats = VideoEmbedStats( + frames_modified=modified_frames, + total_frames=video_info.total_frames, + 
capacity_used=len(payload) / (capacity + ENCRYPTION_OVERHEAD), + bytes_embedded=len(payload), + width=video_info.width, + height=video_info.height, + fps=video_info.fps, + duration_seconds=video_info.duration_seconds, + embed_mode=EMBED_MODE_VIDEO_LSB, + codec=VIDEO_OUTPUT_CODEC, + ) + + debug.print( + f"Video LSB embedding complete: {len(stego_video)} bytes, " + f"{modified_frames} frames modified" + ) + + return stego_video, video_stats + + except VideoCapacityError: + raise + except VideoError: + raise + except Exception as e: + debug.exception(e, "embed_in_video_lsb") + raise VideoError(f"Failed to embed data in video: {e}") from e + + +# ============================================================================= +# EXTRACTION +# ============================================================================= + + +@debug.time +def extract_from_video_lsb( + video_data: bytes, + pixel_key: bytes, + progress_file: str | None = None, +) -> bytes | None: + """ + Extract hidden data from video using LSB steganography. + + Extracts I-frames, reads LSBs from the same pseudo-random locations + used during embedding, and reconstructs the payload. + + Args: + video_data: Raw bytes of the stego video file. + pixel_key: 32-byte key (must match the one used for embedding). + progress_file: Optional path for progress JSON. + + Returns: + Extracted payload bytes (without magic/length prefix), or ``None`` + if extraction fails (wrong key, no data, corrupted). 
+ """ + debug.print(f"Video LSB extracting from {len(video_data)} byte video") + debug.data(pixel_key, "Pixel key for extraction") + + try: + # Get video info + video_info = get_video_info(video_data) + debug.print( + f"Video: {video_info.width}x{video_info.height}, " + f"{video_info.i_frame_count} I-frames" + ) + + # Extract I-frames + with tempfile.TemporaryDirectory(prefix="stegasoo_video_extract_") as temp_dir_str: + temp_dir = Path(temp_dir_str) + + _write_progress(progress_file, 5, 100, "extracting_frames") + + frames, _ = extract_frames(video_data, temp_dir, keyframes_only=True) + num_frames = len(frames) + + if num_frames == 0: + debug.print("No I-frames found in video") + return None + + debug.print(f"Extracted {num_frames} I-frames for extraction") + + _write_progress(progress_file, 20, 100, "extracting_data") + + # First, try to extract from frame 0 to get magic and total length + frame_path = frames[0] + with open(frame_path, "rb") as f: + frame_data = f.read() + + first_chunk = _extract_lsb(frame_data, pixel_key, bits_per_channel=1) + if first_chunk is None or len(first_chunk) < 8: + debug.print("Failed to extract initial data from first frame") + return None + + # Check magic bytes + magic = first_chunk[:4] + if magic != VIDEO_MAGIC_LSB: + debug.print(f"Magic mismatch: got {magic!r}, expected {VIDEO_MAGIC_LSB!r}") + return None + + # Get total payload length + total_length = struct.unpack(">I", first_chunk[4:8])[0] + debug.print(f"Total payload length: {total_length} bytes") + + # Sanity check + pixels_per_frame = video_info.width * video_info.height + bytes_per_frame = (pixels_per_frame * 3) // 8 - 4 # minus length prefix + max_possible = bytes_per_frame * num_frames + + if total_length > max_possible or total_length < 1: + debug.print(f"Invalid payload length: {total_length}") + return None + + # If the entire payload fits in the first frame, return it directly + # This matches the simplified single-frame embedding approach + if len(first_chunk) >= 
8 + total_length: + debug.print("Payload fits in single frame, extracting directly") + payload = first_chunk[8 : 8 + total_length] + else: + # Multi-frame extraction + debug.print("Multi-frame extraction needed") + frames_needed = (total_length + 8 + bytes_per_frame - 1) // bytes_per_frame + frames_needed = min(frames_needed, num_frames) + + # Extract sequentially (matching the embedding approach) + extracted_chunks = [first_chunk] + for frame_idx in range(1, frames_needed): + frame_path = frames[frame_idx] + with open(frame_path, "rb") as f: + frame_data = f.read() + + chunk = _extract_lsb(frame_data, pixel_key, bits_per_channel=1) + if chunk: + extracted_chunks.append(chunk) + + if progress_file and frame_idx % PROGRESS_INTERVAL == 0: + pct = 20 + int((frame_idx / frames_needed) * 70) + _write_progress(progress_file, pct, 100, "extracting_data") + + # Combine chunks + combined = b"".join(extracted_chunks) + + if len(combined) < 8 + total_length: + debug.print( + f"Insufficient data: have {len(combined) - 8}, need {total_length}" + ) + return None + + payload = combined[8 : 8 + total_length] + + _write_progress(progress_file, 100, 100, "complete") + + debug.print(f"Video LSB successfully extracted {len(payload)} bytes") + return payload + + except Exception as e: + debug.exception(e, "extract_from_video_lsb") + return None diff --git a/src/stegasoo/video_utils.py b/src/stegasoo/video_utils.py new file mode 100644 index 0000000..2fa144e --- /dev/null +++ b/src/stegasoo/video_utils.py @@ -0,0 +1,732 @@ +""" +Stegasoo Video Utilities (v4.4.0) + +Video format detection, frame extraction, and transcoding for video steganography. 
+ +Dependencies: +- ffmpeg binary: Required for all video operations +- numpy: For frame data manipulation +- PIL/Pillow: For frame image handling + +Uses ffmpeg for: +- Format detection and metadata extraction +- I-frame extraction +- Video reassembly with FFV1 lossless codec +""" + +from __future__ import annotations + +import json +import os +import shutil +import subprocess +import tempfile +from pathlib import Path + +from .constants import ( + EMBED_MODE_VIDEO_AUTO, + EMBED_MODE_VIDEO_LSB, + MAX_VIDEO_DURATION, + MAX_VIDEO_FILE_SIZE, + MAX_VIDEO_RESOLUTION, + MIN_VIDEO_RESOLUTION, + VALID_VIDEO_EMBED_MODES, + VIDEO_OUTPUT_CODEC, + VIDEO_OUTPUT_CONTAINER, +) +from .debug import get_logger +from .exceptions import ( + UnsupportedVideoFormatError, + VideoTranscodeError, + VideoValidationError, +) +from .models import ValidationResult, VideoCapacityInfo, VideoInfo + +logger = get_logger(__name__) + + +# ============================================================================= +# FFMPEG AVAILABILITY +# ============================================================================= + + +def has_ffmpeg_support() -> bool: + """Check if ffmpeg is available on the system. + + Returns: + True if ffmpeg is found on PATH, False otherwise. + """ + return shutil.which("ffmpeg") is not None + + +def has_ffprobe_support() -> bool: + """Check if ffprobe is available on the system. + + Returns: + True if ffprobe is found on PATH, False otherwise. + """ + return shutil.which("ffprobe") is not None + + +def _require_ffmpeg() -> None: + """Raise error if ffmpeg is not available.""" + if not has_ffmpeg_support(): + raise VideoTranscodeError( + "ffmpeg is required for video operations. Install ffmpeg on your system." + ) + + +def _require_ffprobe() -> None: + """Raise error if ffprobe is not available.""" + if not has_ffprobe_support(): + raise VideoTranscodeError( + "ffprobe is required for video metadata. Install ffmpeg on your system." 
+ ) + + +# ============================================================================= +# FORMAT DETECTION +# ============================================================================= + + +def detect_video_format(video_data: bytes) -> str: + """Detect video format from magic bytes. + + Examines the first bytes of video data to identify the container format. + + Magic byte signatures: + - MP4/M4V: b"ftyp" at offset 4 + - MKV/WebM: b"\\x1a\\x45\\xdf\\xa3" (EBML header) + - AVI: b"RIFF" at offset 0 + b"AVI " at offset 8 + - MOV: b"ftyp" with "qt" brand or b"moov"/"mdat" early + + Args: + video_data: Raw video file bytes. + + Returns: + Format string: "mp4", "mkv", "webm", "avi", "mov", or "unknown". + """ + if len(video_data) < 12: + logger.debug("detect_video_format: data too short (%d bytes)", len(video_data)) + return "unknown" + + # MP4/M4V/MOV: "ftyp" atom at offset 4 + if video_data[4:8] == b"ftyp": + # Check brand for specific type + brand = video_data[8:12] + if brand in (b"qt ", b"mqt "): + return "mov" + if brand in (b"isom", b"iso2", b"mp41", b"mp42", b"avc1", b"M4V "): + return "mp4" + # Default to mp4 for ftyp containers + return "mp4" + + # MKV/WebM: EBML header + if video_data[:4] == b"\x1a\x45\xdf\xa3": + # Check doctype to distinguish MKV from WebM + # WebM uses "webm" doctype, MKV uses "matroska" + # Simple heuristic: search for doctype string in first 64 bytes + header = video_data[:64] + if b"webm" in header.lower(): + return "webm" + return "mkv" + + # AVI: RIFF....AVI + if video_data[:4] == b"RIFF" and video_data[8:12] == b"AVI ": + return "avi" + + # MOV without ftyp (older format): check for moov/mdat atoms + if video_data[4:8] in (b"moov", b"mdat", b"wide", b"free"): + return "mov" + + return "unknown" + + +# ============================================================================= +# METADATA EXTRACTION +# ============================================================================= + + +def get_video_info(video_data: bytes) -> 
VideoInfo: + """Extract video metadata from raw video bytes. + + Uses ffprobe to extract detailed video information including + resolution, frame rate, duration, codec, and I-frame count. + + Args: + video_data: Raw video file bytes. + + Returns: + VideoInfo dataclass with video metadata. + + Raises: + UnsupportedVideoFormatError: If the format cannot be detected. + VideoTranscodeError: If metadata extraction fails. + """ + _require_ffprobe() + + fmt = detect_video_format(video_data) + if fmt == "unknown": + raise UnsupportedVideoFormatError( + "Cannot detect video format. Supported: MP4, MKV, WebM, AVI, MOV." + ) + + # Write to temp file for ffprobe + with tempfile.NamedTemporaryFile(suffix=f".{fmt}", delete=False) as f: + f.write(video_data) + temp_path = f.name + + try: + # Get stream info + result = subprocess.run( + [ + "ffprobe", + "-v", + "quiet", + "-print_format", + "json", + "-show_format", + "-show_streams", + "-select_streams", + "v:0", + temp_path, + ], + capture_output=True, + text=True, + timeout=60, + ) + + if result.returncode != 0: + raise VideoTranscodeError(f"ffprobe failed: {result.stderr}") + + info = json.loads(result.stdout) + + # Extract video stream info + if not info.get("streams"): + raise VideoTranscodeError("No video stream found in file") + + stream = info["streams"][0] + format_info = info.get("format", {}) + + width = int(stream.get("width", 0)) + height = int(stream.get("height", 0)) + codec = stream.get("codec_name", "unknown") + + # Parse frame rate (can be "30/1" or "29.97") + fps_str = stream.get("r_frame_rate", "0/1") + if "/" in fps_str: + num, den = fps_str.split("/") + fps = float(num) / float(den) if float(den) > 0 else 0.0 + else: + fps = float(fps_str) + + # Get duration + duration = float(stream.get("duration", format_info.get("duration", 0))) + + # Get total frames + nb_frames = stream.get("nb_frames") + if nb_frames: + total_frames = int(nb_frames) + else: + # Estimate from duration and fps + total_frames = 
int(duration * fps) if fps > 0 else 0 + + # Get bitrate + bitrate = None + if format_info.get("bit_rate"): + bitrate = int(format_info["bit_rate"]) + + # Count I-frames using ffprobe + i_frame_count = _count_i_frames(temp_path, timeout=120) + + return VideoInfo( + width=width, + height=height, + fps=fps, + duration_seconds=duration, + total_frames=total_frames, + i_frame_count=i_frame_count, + format=fmt, + codec=codec, + bitrate=bitrate, + ) + + except json.JSONDecodeError as e: + raise VideoTranscodeError(f"Failed to parse ffprobe output: {e}") + except subprocess.TimeoutExpired: + raise VideoTranscodeError("ffprobe timed out") + finally: + os.unlink(temp_path) + + +def _count_i_frames(video_path: str, timeout: int = 120) -> int: + """Count I-frames (keyframes) in a video file. + + Args: + video_path: Path to video file. + timeout: Maximum time in seconds. + + Returns: + Number of I-frames in the video. + """ + try: + result = subprocess.run( + [ + "ffprobe", + "-v", + "quiet", + "-select_streams", + "v:0", + "-show_entries", + "frame=pict_type", + "-of", + "csv=p=0", + video_path, + ], + capture_output=True, + text=True, + timeout=timeout, + ) + + if result.returncode != 0: + logger.warning("Failed to count I-frames: %s", result.stderr) + return 0 + + # Count lines containing 'I' + return sum(1 for line in result.stdout.strip().split("\n") if line.strip() == "I") + + except subprocess.TimeoutExpired: + logger.warning("I-frame counting timed out") + return 0 + except Exception as e: + logger.warning("I-frame counting failed: %s", e) + return 0 + + +# ============================================================================= +# FRAME EXTRACTION +# ============================================================================= + + +def extract_frames( + video_data: bytes, + output_dir: Path | None = None, + keyframes_only: bool = True, +) -> tuple[list[Path], VideoInfo]: + """Extract frames from video as PNG images. 
+ + Uses ffmpeg to extract frames from the video. By default extracts only + I-frames (keyframes) which are more robust to re-encoding. + + Args: + video_data: Raw video file bytes. + output_dir: Directory to save frames (temp dir if None). + keyframes_only: If True, only extract I-frames (keyframes). + + Returns: + Tuple of (list of frame paths sorted by frame number, VideoInfo). + + Raises: + VideoTranscodeError: If frame extraction fails. + """ + _require_ffmpeg() + + fmt = detect_video_format(video_data) + if fmt == "unknown": + raise UnsupportedVideoFormatError( + "Cannot detect video format. Supported: MP4, MKV, WebM, AVI, MOV." + ) + + # Get video info first + video_info = get_video_info(video_data) + + # Create output directory + if output_dir is None: + output_dir = Path(tempfile.mkdtemp(prefix="stegasoo_frames_")) + else: + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + # Write video to temp file + with tempfile.NamedTemporaryFile(suffix=f".{fmt}", delete=False) as f: + f.write(video_data) + video_path = f.name + + try: + # Build ffmpeg command + cmd = [ + "ffmpeg", + "-i", + video_path, + "-vsync", + "0", + ] + + if keyframes_only: + # Extract only I-frames + cmd.extend(["-vf", "select='eq(pict_type,I)'"]) + + # Output as PNG with frame number + output_pattern = str(output_dir / "frame_%06d.png") + cmd.extend(["-start_number", "0", output_pattern]) + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=600, # 10 minute timeout + ) + + if result.returncode != 0: + raise VideoTranscodeError(f"Frame extraction failed: {result.stderr}") + + # Collect extracted frames + frames = sorted(output_dir.glob("frame_*.png")) + + if not frames: + raise VideoTranscodeError("No frames were extracted from video") + + logger.info( + "Extracted %d %s from video", + len(frames), + "I-frames" if keyframes_only else "frames", + ) + + return frames, video_info + + except subprocess.TimeoutExpired: + raise 
VideoTranscodeError("Frame extraction timed out") + finally: + os.unlink(video_path) + + +# ============================================================================= +# VIDEO REASSEMBLY +# ============================================================================= + + +def reassemble_video( + frames: list[Path], + original_video_data: bytes, + output_path: Path | None = None, + fps: float | None = None, + audio_data: bytes | None = None, +) -> bytes: + """Reassemble frames back into a video file. + + Creates a new video from the modified frames using FFV1 lossless codec + in an MKV container. This preserves the embedded data perfectly. + + Args: + frames: List of frame image paths in order. + original_video_data: Original video bytes (for audio track extraction). + output_path: Optional output path (temp file if None). + fps: Frame rate (auto-detected from original if None). + audio_data: Optional audio track data to mux in. + + Returns: + Video file bytes (MKV container with FFV1 codec). + + Raises: + VideoTranscodeError: If reassembly fails. 
+ """ + _require_ffmpeg() + + if not frames: + raise VideoTranscodeError("No frames provided for reassembly") + + # Get original video format + fmt = detect_video_format(original_video_data) + + if fps is None: + # Use a fixed low framerate for I-frame sequences + # since I-frames are sparse (typically 1 per 30-60 frames) + fps = 1.0 # 1 fps for I-frame only videos + + # Create temp directory for work + with tempfile.TemporaryDirectory(prefix="stegasoo_reassemble_") as temp_dir_str: + temp_dir = Path(temp_dir_str) + + # Write original video for audio extraction + original_path = temp_dir / f"original.{fmt}" + original_path.write_bytes(original_video_data) + + # Create frame list file for ffmpeg + frame_list = temp_dir / "frames.txt" + with open(frame_list, "w") as f: + for frame in frames: + # FFmpeg concat format + f.write(f"file '{frame.absolute()}'\n") + f.write(f"duration {1.0 / fps}\n") + + # Output path + if output_path is None: + output_file = temp_dir / f"output.{VIDEO_OUTPUT_CONTAINER}" + else: + output_file = Path(output_path) + + # Build ffmpeg command + cmd = [ + "ffmpeg", + "-y", # Overwrite output + "-f", + "concat", + "-safe", + "0", + "-i", + str(frame_list), + ] + + # Add audio from original video if available + # Check if original has audio + has_audio = _video_has_audio(original_path) + if has_audio: + cmd.extend(["-i", str(original_path)]) + + # Video encoding settings (FFV1 lossless) + cmd.extend( + [ + "-c:v", + VIDEO_OUTPUT_CODEC, + "-level", + "3", # FFV1 level 3 for better compression + "-coder", + "1", # Range coder + "-context", + "1", # Large context + "-slicecrc", + "1", # Error detection + ] + ) + + # Audio settings + if has_audio: + cmd.extend( + [ + "-map", + "0:v", # Video from frames + "-map", + "1:a?", # Audio from original (if exists) + "-c:a", + "copy", # Copy audio without re-encoding + ] + ) + + cmd.append(str(output_file)) + + logger.debug("Running ffmpeg: %s", " ".join(cmd)) + + result = subprocess.run( + cmd, + 
capture_output=True, + text=True, + timeout=600, + ) + + if result.returncode != 0: + raise VideoTranscodeError(f"Video reassembly failed: {result.stderr}") + + # Read output + return output_file.read_bytes() + + +def _video_has_audio(video_path: Path) -> bool: + """Check if a video file has an audio stream. + + Args: + video_path: Path to video file. + + Returns: + True if video has audio, False otherwise. + """ + try: + result = subprocess.run( + [ + "ffprobe", + "-v", + "quiet", + "-select_streams", + "a:0", + "-show_entries", + "stream=index", + "-of", + "csv=p=0", + str(video_path), + ], + capture_output=True, + text=True, + timeout=30, + ) + return bool(result.stdout.strip()) + except Exception: + return False + + +# ============================================================================= +# VALIDATION +# ============================================================================= + + +def validate_video( + video_data: bytes, + name: str = "Video", + check_duration: bool = True, +) -> ValidationResult: + """Validate video data for steganography. + + Checks: + - Not empty + - Not too large (MAX_VIDEO_FILE_SIZE) + - Valid video format (detectable via magic bytes) + - Duration within limits (MAX_VIDEO_DURATION) if check_duration=True + - Resolution within limits (MIN/MAX_VIDEO_RESOLUTION) + + Args: + video_data: Raw video file bytes. + name: Descriptive name for error messages (default: "Video"). + check_duration: Whether to enforce duration limit (default: True). + + Returns: + ValidationResult with video info in details on success. + """ + if not video_data: + return ValidationResult.error(f"{name} is required") + + if len(video_data) > MAX_VIDEO_FILE_SIZE: + size_gb = len(video_data) / (1024**3) + max_gb = MAX_VIDEO_FILE_SIZE / (1024**3) + return ValidationResult.error( + f"{name} too large ({size_gb:.1f} GB). 
Maximum: {max_gb:.0f} GB" + ) + + # Detect format + fmt = detect_video_format(video_data) + if fmt == "unknown": + return ValidationResult.error( + f"Could not detect {name} format. " "Supported formats: MP4, MKV, WebM, AVI, MOV." + ) + + # Check ffmpeg availability + if not has_ffmpeg_support(): + return ValidationResult.error( + "ffmpeg is required for video processing. Please install ffmpeg." + ) + + # Extract metadata for further validation + try: + info = get_video_info(video_data) + except (VideoTranscodeError, UnsupportedVideoFormatError) as e: + return ValidationResult.error(f"Could not read {name}: {e}") + except Exception as e: + return ValidationResult.error(f"Could not read {name}: {e}") + + # Check duration + if check_duration and info.duration_seconds > MAX_VIDEO_DURATION: + return ValidationResult.error( + f"{name} too long ({info.duration_seconds:.1f}s). " + f"Maximum: {MAX_VIDEO_DURATION}s ({MAX_VIDEO_DURATION // 60} minutes)" + ) + + # Check resolution + if info.width < MIN_VIDEO_RESOLUTION[0] or info.height < MIN_VIDEO_RESOLUTION[1]: + return ValidationResult.error( + f"{name} resolution too small ({info.width}x{info.height}). " + f"Minimum: {MIN_VIDEO_RESOLUTION[0]}x{MIN_VIDEO_RESOLUTION[1]}" + ) + + if info.width > MAX_VIDEO_RESOLUTION[0] or info.height > MAX_VIDEO_RESOLUTION[1]: + return ValidationResult.error( + f"{name} resolution too large ({info.width}x{info.height}). 
" + f"Maximum: {MAX_VIDEO_RESOLUTION[0]}x{MAX_VIDEO_RESOLUTION[1]}" + ) + + # Check I-frame count + if info.i_frame_count < 1: + return ValidationResult.error(f"{name} has no I-frames (keyframes) for embedding") + + return ValidationResult.ok( + width=info.width, + height=info.height, + fps=info.fps, + duration=info.duration_seconds, + total_frames=info.total_frames, + i_frame_count=info.i_frame_count, + format=info.format, + codec=info.codec, + bitrate=info.bitrate, + ) + + +def require_valid_video(video_data: bytes, name: str = "Video") -> None: + """Validate video, raising VideoValidationError on failure. + + Args: + video_data: Raw video file bytes. + name: Descriptive name for error messages. + + Raises: + VideoValidationError: If validation fails. + """ + result = validate_video(video_data, name) + if not result.is_valid: + raise VideoValidationError(result.error_message) + + +def validate_video_embed_mode(mode: str) -> ValidationResult: + """Validate video embedding mode string. + + Args: + mode: Embedding mode to validate. + + Returns: + ValidationResult with mode in details on success. + """ + valid_modes = VALID_VIDEO_EMBED_MODES | {EMBED_MODE_VIDEO_AUTO} + if mode not in valid_modes: + return ValidationResult.error( + f"Invalid video embed_mode: '{mode}'. " + f"Valid options: {', '.join(sorted(valid_modes))}" + ) + return ValidationResult.ok(mode=mode) + + +# ============================================================================= +# CAPACITY CALCULATION +# ============================================================================= + + +def calculate_video_capacity(video_data: bytes, embed_mode: str = EMBED_MODE_VIDEO_LSB) -> VideoCapacityInfo: + """Calculate steganographic capacity for a video file. + + Capacity is based on I-frames only (keyframes). Each I-frame provides + capacity similar to an image of the same dimensions. + + Args: + video_data: Raw video file bytes. + embed_mode: Embedding mode (currently only video_lsb). 
+ + Returns: + VideoCapacityInfo with capacity details. + """ + info = get_video_info(video_data) + + # Calculate capacity per I-frame + # RGB image: 3 bits per pixel (1 bit per channel) / 8 = 0.375 bytes per pixel + # Subtract overhead per frame for header + pixels_per_frame = info.width * info.height + bytes_per_frame = (pixels_per_frame * 3) // 8 # 3 bits per pixel + + # Total capacity across all I-frames + # Subtract 70 bytes overhead for the encrypted payload header + from .steganography import ENCRYPTION_OVERHEAD + + total_capacity = (bytes_per_frame * info.i_frame_count) - ENCRYPTION_OVERHEAD + + return VideoCapacityInfo( + total_frames=info.total_frames, + i_frames=info.i_frame_count, + usable_capacity_bytes=max(0, total_capacity), + embed_mode=embed_mode, + resolution=(info.width, info.height), + duration_seconds=info.duration_seconds, + ) diff --git a/tests/test_stegasoo.py b/tests/test_stegasoo.py index 3104eaf..1f8c6fc 100644 --- a/tests/test_stegasoo.py +++ b/tests/test_stegasoo.py @@ -451,3 +451,231 @@ class TestEdgeCases: ) assert decoded.message == special_msg + + +# ============================================================================= +# VIDEO STEGANOGRAPHY TESTS (v4.4.0) +# ============================================================================= + + +@pytest.fixture +def test_video_bytes(): + """Create a minimal test video using ffmpeg. + + Creates a 2-second test video with solid color frames. + Returns None if ffmpeg is not available. 
+ """ + import shutil + import subprocess + import tempfile + + if not shutil.which("ffmpeg"): + return None + + with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f: + output_path = f.name + + try: + # Create a simple 2-second video with colored frames + # Using lavfi (libavfilter) to generate test pattern + result = subprocess.run( + [ + "ffmpeg", + "-y", + "-f", + "lavfi", + "-i", + "color=c=blue:s=320x240:d=2:r=10", + "-c:v", + "libx264", + "-pix_fmt", + "yuv420p", + "-g", + "5", # GOP size - creates I-frames every 5 frames + output_path, + ], + capture_output=True, + timeout=30, + ) + + if result.returncode != 0: + return None + + with open(output_path, "rb") as f: + video_data = f.read() + + return video_data + except Exception: + return None + finally: + import os + + try: + os.unlink(output_path) + except OSError: + pass + + +class TestVideoSupport: + """Test video steganography support detection.""" + + def test_video_support_flag_exists(self): + """HAS_VIDEO_SUPPORT flag should exist.""" + assert hasattr(stegasoo, "HAS_VIDEO_SUPPORT") + assert isinstance(stegasoo.HAS_VIDEO_SUPPORT, bool) + + def test_video_constants_exist(self): + """Video-related constants should exist.""" + assert hasattr(stegasoo, "EMBED_MODE_VIDEO_LSB") + assert hasattr(stegasoo, "EMBED_MODE_VIDEO_AUTO") + + +@pytest.mark.skipif( + not stegasoo.HAS_VIDEO_SUPPORT, + reason="Video support not available (ffmpeg or dependencies missing)", +) +class TestVideoFormatDetection: + """Test video format detection.""" + + def test_detect_video_format_mp4(self, test_video_bytes): + """Should detect MP4 format from magic bytes.""" + if test_video_bytes is None: + pytest.skip("Could not create test video") + + from stegasoo import detect_video_format + + fmt = detect_video_format(test_video_bytes) + assert fmt in ("mp4", "mov") + + def test_detect_video_format_unknown(self): + """Should return 'unknown' for non-video data.""" + from stegasoo import detect_video_format + + fmt = 
detect_video_format(b"not a video") + assert fmt == "unknown" + + +@pytest.mark.skipif( + not stegasoo.HAS_VIDEO_SUPPORT, + reason="Video support not available (ffmpeg or dependencies missing)", +) +class TestVideoInfo: + """Test video metadata extraction.""" + + def test_get_video_info(self, test_video_bytes): + """Should extract video metadata.""" + if test_video_bytes is None: + pytest.skip("Could not create test video") + + from stegasoo import get_video_info + + info = get_video_info(test_video_bytes) + + assert info.width == 320 + assert info.height == 240 + assert info.fps > 0 + assert info.duration_seconds > 0 + assert info.total_frames > 0 + assert info.format in ("mp4", "mov") + + def test_validate_video(self, test_video_bytes): + """Should validate video data.""" + if test_video_bytes is None: + pytest.skip("Could not create test video") + + from stegasoo import validate_video + + result = validate_video(test_video_bytes, check_duration=False) + + assert result.is_valid + assert result.details.get("format") in ("mp4", "mov") + + +@pytest.mark.skipif( + not stegasoo.HAS_VIDEO_SUPPORT, + reason="Video support not available (ffmpeg or dependencies missing)", +) +class TestVideoCapacity: + """Test video capacity calculation.""" + + def test_calculate_video_capacity(self, test_video_bytes): + """Should calculate steganographic capacity.""" + if test_video_bytes is None: + pytest.skip("Could not create test video") + + from stegasoo import calculate_video_capacity + + capacity_info = calculate_video_capacity(test_video_bytes) + + assert capacity_info.total_frames > 0 + assert capacity_info.i_frames > 0 + assert capacity_info.usable_capacity_bytes > 0 + assert capacity_info.embed_mode == "video_lsb" + assert capacity_info.resolution == (320, 240) + + +@pytest.mark.skipif( + not stegasoo.HAS_VIDEO_SUPPORT, + reason="Video support not available (ffmpeg or dependencies missing)", +) +class TestVideoEncodeDecode: + """Test video steganography round-trip.""" + + def 
test_video_roundtrip(self, test_video_bytes, ref_bytes): + """Test encoding and decoding a message in video.""" + if test_video_bytes is None: + pytest.skip("Could not create test video") + + from stegasoo import decode_video, encode_video + + message = "Secret video message!" + + # Encode + stego_video, stats = encode_video( + message=message, + reference_photo=ref_bytes, + carrier_video=test_video_bytes, + passphrase=TEST_PASSPHRASE, + pin=TEST_PIN, + ) + + assert stego_video + assert len(stego_video) > 0 + assert stats.frames_modified > 0 + assert stats.codec == "ffv1" # Should use lossless codec + + # Decode + result = decode_video( + stego_video=stego_video, + reference_photo=ref_bytes, + passphrase=TEST_PASSPHRASE, + pin=TEST_PIN, + ) + + assert result.is_text + assert result.message == message + + def test_video_wrong_passphrase_fails(self, test_video_bytes, ref_bytes): + """Decoding with wrong passphrase should fail.""" + if test_video_bytes is None: + pytest.skip("Could not create test video") + + from stegasoo import decode_video, encode_video + + message = "Secret video message!" + + stego_video, _ = encode_video( + message=message, + reference_photo=ref_bytes, + carrier_video=test_video_bytes, + passphrase=TEST_PASSPHRASE, + pin=TEST_PIN, + ) + + with pytest.raises(Exception): + decode_video( + stego_video=stego_video, + reference_photo=ref_bytes, + passphrase="wrong passphrase words here", + pin=TEST_PIN, + )