diff --git a/PLAN-4.1.0.md b/PLAN-4.1.0.md index 0ecaf41..5ed57d4 100644 --- a/PLAN-4.1.0.md +++ b/PLAN-4.1.0.md @@ -429,20 +429,55 @@ Or simpler: detect on startup, update schema automatically (current pattern). - [x] Channel Key QR (Web UI) - added QR generator on About page - [x] CLI Channel Commands - [x] Saved Channel Keys (Web UI) - users can save/manage channel keys -- [ ] Advanced Tools (in progress) +- [x] Advanced Tools - Image Security Toolkit + - [x] CLI: `stegasoo tools capacity/strip/peek/exif` + - [x] API: `/api/tools/capacity`, `/api/tools/peek`, `/api/tools/exif/*` + - [x] WebUI: Tools page with tabbed interface + - [x] EXIF Editor with inline editing, clear all, save/download --- -## Action Item: Architectural Review +## Architectural Improvements (4.1.0) -Review other modules for consistency with the Library → CLI → API → WebUI pattern: +### Consolidated Channel Key Resolution -| Module | Library | CLI | API | WebUI | Notes | -|--------|---------|-----|-----|-------|-------| -| encode | ✓ | ✓ | ✓ | ✓ | Review for consistency | -| decode | ✓ | ✓ | ✓ | ✓ | Review for consistency | -| channel | ✓ | ✓ | - | ✓ | Needs API layer? | -| tools | ✓ | WIP | ✓ | WIP | Building now | -| generate | ✓ | ? | - | ✓ | CLI for credential gen? | +Moved `resolve_channel_key()` from 3 duplicate implementations to single source of truth in `src/stegasoo/channel.py`: + +```python +# Library: src/stegasoo/channel.py +def resolve_channel_key(value, *, file_path=None, no_channel=False) -> str | None: + """Unified channel key resolution - returns None (auto), "" (public), or key.""" + +def get_channel_response_info(channel_key) -> dict: + """Get channel info dict for API/WebUI responses.""" +``` + +Frontends now use thin wrappers that translate exceptions to their context (Click/HTTP). + +### DCT Payload Pre-Check + +Added `will_fit_by_mode()` pre-check to WebUI encode to fail fast with helpful error message instead of cryptic exception deep in DCT processing. + +### EXIF Tools (Library Layer) + +Added to `src/stegasoo/utils.py`: +- `read_image_exif(image_data)` - Read EXIF metadata as dict +- `write_image_exif(image_data, updates)` - Update EXIF fields (JPEG only) + +Dependencies added: `piexif>=1.1.0` + +--- + +## Action Item: Architectural Review ✅ DONE + +Reviewed modules for consistency with Library → CLI → API → WebUI pattern: + +| Module | Library | CLI | API | WebUI | Status | +|--------|---------|-----|-----|-------|--------| +| encode | ✓ | ✓ | ✓ | ✓ | Consistent | +| decode | ✓ | ✓ | ✓ | ✓ | Consistent | +| channel | ✓ | ✓ | ✓ | ✓ | Consolidated resolve_channel_key | +| tools | ✓ | ✓ | ✓ | ✓ | Complete | +| generate | ✓ | ✓ | - | ✓ | CLI has `stegasoo generate` | Priority order: Developer/CLI → API integrator → WebUI end-user diff --git a/frontends/api/main.py b/frontends/api/main.py index c2a3b63..8965671 100644 --- a/frontends/api/main.py +++ b/frontends/api/main.py @@ -49,7 +49,6 @@ from stegasoo import ( generate_credentials, get_channel_status, has_argon2, - has_channel_key, has_dct_support, set_channel_key, validate_channel_key, @@ -406,11 +405,7 @@ def _resolve_channel_key(channel_key: str | None) -> str | None: """ Resolve channel key from API parameter. - Args: - channel_key: API parameter value - - None: Use server-configured key (auto mode) - - "": Public mode (no channel key) - - "XXXX-...": Explicit key + Wrapper around library's resolve_channel_key with HTTP exception handling. Returns: Resolved channel key to pass to encode/decode @@ -418,44 +413,27 @@ def _resolve_channel_key(channel_key: str | None) -> str | None: Raises: HTTPException: If key format is invalid """ - if channel_key is None: - # Auto mode - use server config - return None + from stegasoo.channel import resolve_channel_key - if channel_key == "": - # Public mode - return "" - - # Explicit key - validate format - if not validate_channel_key(channel_key): - raise HTTPException( - 400, "Invalid channel key format. Expected: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX" - ) - - return channel_key + try: + return resolve_channel_key(channel_key) + except (ValueError, FileNotFoundError) as e: + raise HTTPException(400, str(e)) def _get_channel_info(channel_key: str | None) -> tuple[str, str | None]: """ Get channel mode and fingerprint for response. + Uses library's get_channel_response_info for consistent formatting. + Returns: (mode, fingerprint) tuple """ - if channel_key == "": - return "public", None + from stegasoo.channel import get_channel_response_info - if channel_key is not None: - # Explicit key - fingerprint = f"{channel_key[:4]}-••••-••••-••••-••••-••••-••••-{channel_key[-4:]}" - return "private", fingerprint - - # Auto mode - check server config - if has_channel_key(): - status = get_channel_status() - return "private", status.get("fingerprint") - - return "public", None + info = get_channel_response_info(channel_key) + return info["mode"], info.get("fingerprint") # ============================================================================ diff --git a/frontends/cli/main.py b/frontends/cli/main.py index b877e9f..a153405 100644 --- a/frontends/cli/main.py +++ b/frontends/cli/main.py @@ -168,37 +168,25 @@ def resolve_channel_key_option( """ Resolve channel key from CLI options. + Wrapper around library's resolve_channel_key with Click exception handling. + Returns: None: Use server-configured key (auto mode) "": Public mode (no channel key) str: Explicit channel key """ - if no_channel: - return "" # Public mode + from stegasoo.channel import resolve_channel_key - if channel_file: - # Load from file - path = Path(channel_file) - if not path.exists(): - raise click.ClickException(f"Channel key file not found: {channel_file}") - key = path.read_text().strip() - if not validate_channel_key(key): - raise click.ClickException(f"Invalid channel key format in file: {channel_file}") - return key - - if channel: - if channel.lower() == "auto": - return None # Use server config - # Explicit key provided - if not validate_channel_key(channel): - raise click.ClickException( - "Invalid channel key format. Expected: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX\n" - "Generate a new key with: stegasoo channel generate" - ) - return channel - - # Default: use server-configured key (auto mode) - return None + try: + return resolve_channel_key( + value=channel, + file_path=channel_file, + no_channel=no_channel, + ) + except FileNotFoundError as e: + raise click.ClickException(str(e)) + except ValueError as e: + raise click.ClickException(str(e)) def format_channel_status_line(quiet: bool = False) -> str | None: diff --git a/frontends/web/app.py b/frontends/web/app.py index 7b18261..6ec0647 100644 --- a/frontends/web/app.py +++ b/frontends/web/app.py @@ -277,23 +277,22 @@ def resolve_channel_key_form(channel_key_value: str) -> str: """ Resolve channel key from form input. - Args: - channel_key_value: Form value ('auto', 'none', or explicit key) - - Returns: - Value to pass to subprocess_stego ('auto', 'none', or explicit key) + Wrapper around library's resolve_channel_key for subprocess compatibility. + Returns string values for subprocess_stego ('auto', 'none', or explicit key). """ - if not channel_key_value or channel_key_value == "auto": - return "auto" - elif channel_key_value == "none": - return "none" - else: - # Explicit key - validate format - if validate_channel_key(channel_key_value): - return channel_key_value - else: - # Invalid format, fall back to auto + from stegasoo.channel import resolve_channel_key + + try: + result = resolve_channel_key(channel_key_value) + if result is None: return "auto" + elif result == "": + return "none" + else: + return result + except (ValueError, FileNotFoundError): + # Invalid format, fall back to auto + return "auto" def generate_thumbnail(image_data: bytes, size: tuple = THUMBNAIL_SIZE) -> bytes: @@ -928,6 +927,25 @@ def encode_page(): flash(result.error_message, "error") return render_template("encode.html", has_qrcode_read=HAS_QRCODE_READ) + # Pre-check payload capacity BEFORE encode (fail fast) + from stegasoo.steganography import will_fit_by_mode + + payload_size = len(payload.data) if hasattr(payload, "data") else len(payload.encode("utf-8")) + fit_check = will_fit_by_mode(payload_size, carrier_data, embed_mode=embed_mode) + if not fit_check.get("fits", True): + error_msg = ( + f"Payload too large for {embed_mode.upper()} mode. " + f"Payload: {payload_size:,} bytes, " + f"Capacity: {fit_check.get('capacity', 0):,} bytes" + ) + # Suggest alternative mode + if embed_mode == "dct": + alt_check = will_fit_by_mode(payload_size, carrier_data, embed_mode="lsb") + if alt_check.get("fits"): + error_msg += " - Try LSB mode instead." + flash(error_msg, "error") + return render_template("encode.html", has_qrcode_read=HAS_QRCODE_READ) + # v4.0.0: Include channel_key parameter # Use subprocess-isolated encode to prevent crashes if payload_type == "file" and payload_file and payload_file.filename: @@ -1370,6 +1388,109 @@ def api_tools_peek(): return jsonify({"success": False, "error": str(e)}), 400 +@app.route("/api/tools/exif", methods=["POST"]) +@login_required +def api_tools_exif(): + """Read EXIF metadata from image.""" + from stegasoo.utils import read_image_exif + + image_file = request.files.get("image") + if not image_file: + return jsonify({"success": False, "error": "No image provided"}), 400 + + try: + image_data = image_file.read() + exif = read_image_exif(image_data) + + # Check if it's a JPEG (editable) or not + is_jpeg = image_data[:2] == b"\xff\xd8" + + return jsonify({ + "success": True, + "filename": image_file.filename, + "exif": exif, + "editable": is_jpeg, + "field_count": len(exif), + }) + except Exception as e: + return jsonify({"success": False, "error": str(e)}), 400 + + +@app.route("/api/tools/exif/update", methods=["POST"]) +@login_required +def api_tools_exif_update(): + """Update EXIF fields in image.""" + from stegasoo.utils import write_image_exif + + image_file = request.files.get("image") + if not image_file: + return jsonify({"success": False, "error": "No image provided"}), 400 + + # Get updates from form data + updates_json = request.form.get("updates", "{}") + try: + import json + updates = json.loads(updates_json) + except json.JSONDecodeError: + return jsonify({"success": False, "error": "Invalid updates JSON"}), 400 + + if not updates: + return jsonify({"success": False, "error": "No updates provided"}), 400 + + try: + image_data = image_file.read() + updated_data = write_image_exif(image_data, updates) + + # Return as downloadable file + buffer = io.BytesIO(updated_data) + return send_file( + buffer, + mimetype="image/jpeg", + as_attachment=True, + download_name=f"exif_{image_file.filename}", + ) + except ValueError as e: + return jsonify({"success": False, "error": str(e)}), 400 + except Exception as e: + return jsonify({"success": False, "error": str(e)}), 500 + + +@app.route("/api/tools/exif/clear", methods=["POST"]) +@login_required +def api_tools_exif_clear(): + """Remove all EXIF metadata from image.""" + from stegasoo.utils import strip_image_metadata + + image_file = request.files.get("image") + if not image_file: + return jsonify({"success": False, "error": "No image provided"}), 400 + + # Get desired output format (default to PNG for lossless) + output_format = request.form.get("format", "PNG").upper() + if output_format not in ("PNG", "JPEG", "BMP"): + output_format = "PNG" + + try: + image_data = image_file.read() + clean_data = strip_image_metadata(image_data, output_format=output_format) + + # Determine extension and mimetype + ext_map = {"PNG": ("png", "image/png"), "JPEG": ("jpg", "image/jpeg"), "BMP": ("bmp", "image/bmp")} + ext, mimetype = ext_map.get(output_format, ("png", "image/png")) + + # Return as downloadable file + stem = image_file.filename.rsplit(".", 1)[0] if "." in image_file.filename else image_file.filename + buffer = io.BytesIO(clean_data) + return send_file( + buffer, + mimetype=mimetype, + as_attachment=True, + download_name=f"{stem}_clean.{ext}", + ) + except Exception as e: + return jsonify({"success": False, "error": str(e)}), 500 + + # Add these two test routes anywhere in app.py after the app = Flask(...) line: diff --git a/frontends/web/templates/tools.html b/frontends/web/templates/tools.html index 0944b19..6f0383d 100644 --- a/frontends/web/templates/tools.html +++ b/frontends/web/templates/tools.html @@ -4,7 +4,7 @@ {% block content %}
-
+

Image Security Toolkit

@@ -15,8 +15,8 @@
- -
+ +
-

Remove metadata (camera info, GPS, timestamps) from images.

+

View, edit, or remove image metadata (EXIF, GPS, camera info).

-
-
- +
+ +
+ +
+
+ +
+ +
+ +
+
+ 0 metadata fields +
+ Non-JPEG: read-only +
+
- - + + +
+ + + + + + + + + +
FieldValue
+
+ +
+ No metadata found +
+ + +
+ +
+ + +
+
+
@@ -142,6 +187,199 @@ document.getElementById('capacityFile')?.addEventListener('change', async functi } }); +// EXIF Editor +let exifOriginalData = {}; +let exifCurrentData = {}; +let exifEditable = false; +let exifCurrentFile = null; + +document.getElementById('exifFile')?.addEventListener('change', async function() { + const file = this.files[0]; + if (!file) return; + + exifCurrentFile = file; + const formData = new FormData(); + formData.append('image', file); + + try { + const res = await fetch('/api/tools/exif', { method: 'POST', body: formData }); + const data = await res.json(); + + if (data.success) { + // Show thumbnail + const reader = new FileReader(); + reader.onload = e => document.getElementById('exifThumb').src = e.target.result; + reader.readAsDataURL(file); + + // Store data + exifOriginalData = JSON.parse(JSON.stringify(data.exif)); + exifCurrentData = JSON.parse(JSON.stringify(data.exif)); + exifEditable = data.editable; + + // Update UI + document.getElementById('exifFilename').textContent = data.filename; + document.getElementById('exifFieldCount').textContent = data.field_count; + document.getElementById('exifNotEditable').classList.toggle('d-none', data.editable); + document.getElementById('exifEditor').classList.remove('d-none'); + + renderExifTable(); + updateSaveButton(); + } + } catch (err) { + console.error(err); + } +}); + +function renderExifTable() { + const tbody = document.getElementById('exifTable'); + const empty = document.getElementById('exifEmpty'); + const entries = Object.entries(exifCurrentData).sort((a, b) => a[0].localeCompare(b[0])); + + if (entries.length === 0) { + tbody.innerHTML = ''; + empty.classList.remove('d-none'); + return; + } + + empty.classList.add('d-none'); + tbody.innerHTML = entries.map(([key, value]) => { + // Format value for display + let displayVal = value; + if (typeof value === 'object') { + displayVal = JSON.stringify(value); + } + if (typeof displayVal === 'string' && displayVal.length > 60) { + displayVal = displayVal.substring(0, 57) + '...'; + } + + // Check if field is editable (common string fields) + const editableFields = ['Make', 'Model', 'Software', 'Artist', 'Copyright', 'ImageDescription', 'DateTime', 'DateTimeOriginal', 'DateTimeDigitized', 'UserComment', 'LensMake', 'LensModel']; + const canEdit = exifEditable && editableFields.includes(key) && typeof value === 'string'; + + return ` + + ${key} + + ${canEdit + ? `` + : `${displayVal}` + } + + + ${canEdit + ? `` + : '' + } + + + `; + }).join(''); + + // Add event listeners for edits + tbody.querySelectorAll('.exif-input').forEach(input => { + input.addEventListener('input', function() { + exifCurrentData[this.dataset.field] = this.value; + updateSaveButton(); + }); + }); + + tbody.querySelectorAll('.exif-delete').forEach(btn => { + btn.addEventListener('click', function() { + delete exifCurrentData[this.dataset.field]; + renderExifTable(); + updateSaveButton(); + }); + }); +} + +function updateSaveButton() { + const changed = JSON.stringify(exifCurrentData) !== JSON.stringify(exifOriginalData); + document.getElementById('exifSave').disabled = !changed; +} + +// Clear All button +document.getElementById('exifClearAll')?.addEventListener('click', async function() { + if (!exifCurrentFile) return; + if (!confirm('Remove all metadata from this image?')) return; + + const formData = new FormData(); + formData.append('image', exifCurrentFile); + formData.append('format', 'PNG'); + + try { + const res = await fetch('/api/tools/exif/clear', { method: 'POST', body: formData }); + if (res.ok) { + const blob = await res.blob(); + const url = URL.createObjectURL(blob); + const a = document.createElement('a'); + a.href = url; + a.download = res.headers.get('Content-Disposition')?.split('filename=')[1]?.replace(/"/g, '') || 'clean.png'; + a.click(); + URL.revokeObjectURL(url); + } + } catch (err) { + console.error(err); + } +}); + +// Discard button +document.getElementById('exifDiscard')?.addEventListener('click', function() { + exifCurrentData = JSON.parse(JSON.stringify(exifOriginalData)); + renderExifTable(); + updateSaveButton(); +}); + +// Save button +document.getElementById('exifSave')?.addEventListener('click', async function() { + if (!exifCurrentFile || !exifEditable) return; + + // Find what changed + const updates = {}; + for (const [key, val] of Object.entries(exifCurrentData)) { + if (exifOriginalData[key] !== val) { + updates[key] = val; + } + } + // Mark deleted fields + for (const key of Object.keys(exifOriginalData)) { + if (!(key in exifCurrentData)) { + updates[key] = null; + } + } + + if (Object.keys(updates).length === 0) return; + + const formData = new FormData(); + formData.append('image', exifCurrentFile); + formData.append('updates', JSON.stringify(updates)); + + try { + const res = await fetch('/api/tools/exif/update', { method: 'POST', body: formData }); + if (res.ok) { + const blob = await res.blob(); + const url = URL.createObjectURL(blob); + const a = document.createElement('a'); + a.href = url; + a.download = res.headers.get('Content-Disposition')?.split('filename=')[1]?.replace(/"/g, '') || 'updated.jpg'; + a.click(); + URL.revokeObjectURL(url); + + // Update original to match current + exifOriginalData = JSON.parse(JSON.stringify(exifCurrentData)); + updateSaveButton(); + } else { + const err = await res.json(); + alert(err.error || 'Failed to save'); + } + } catch (err) { + console.error(err); + alert('Failed to save changes'); + } +}); + // Peek (Header Detection) document.getElementById('peekFile')?.addEventListener('change', async function() { const file = this.files[0]; diff --git a/pyproject.toml b/pyproject.toml index 515bc80..9b66a24 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,8 @@ dct = [ ] cli = [ "click>=8.0.0", - "qrcode>=7.30" + "qrcode>=7.30", + "piexif>=1.1.0", ] compression = [ "lz4>=4.0.0", @@ -61,6 +62,7 @@ web = [ "gunicorn>=21.0.0", "qrcode>=7.3.0", "pyzbar>=0.1.9", + "piexif>=1.1.0", # Include DCT support for web UI "numpy>=2.0.0", "scipy>=1.10.0", diff --git a/src/stegasoo/channel.py b/src/stegasoo/channel.py index d81c70d..ec77bfe 100644 --- a/src/stegasoo/channel.py +++ b/src/stegasoo/channel.py @@ -372,6 +372,124 @@ def has_channel_key() -> bool: return get_channel_key() is not None +def resolve_channel_key( + value: str | None = None, + *, + file_path: str | Path | None = None, + no_channel: bool = False, +) -> str | None: + """ + Resolve a channel key from user input (unified for all frontends). + + This consolidates channel key resolution logic used by CLI, API, and WebUI. + + Args: + value: Input value: + - 'auto' or None: Use server-configured key + - 'none' or '': Public mode (no channel key) + - explicit key: Validate and use + file_path: Path to file containing channel key + no_channel: If True, return "" for public mode (overrides value) + + Returns: + None: Use server-configured key (auto mode) + "": Public mode (no channel key) + str: Explicit valid channel key + + Raises: + ValueError: If key format is invalid + FileNotFoundError: If file_path doesn't exist + + Example: + >>> resolve_channel_key("auto") # -> None + >>> resolve_channel_key("none") # -> "" + >>> resolve_channel_key(no_channel=True) # -> "" + >>> resolve_channel_key("ABCD-1234-...") # -> "ABCD-1234-..." + >>> resolve_channel_key(file_path="key.txt") # reads from file + """ + debug.print(f"resolve_channel_key: value={value}, file_path={file_path}, no_channel={no_channel}") + + # no_channel flag takes precedence + if no_channel: + debug.print("resolve_channel_key: public mode (no_channel=True)") + return "" + + # Read from file if provided + if file_path: + path = Path(file_path) + if not path.exists(): + raise FileNotFoundError(f"Channel key file not found: {file_path}") + key = path.read_text().strip() + if not validate_channel_key(key): + raise ValueError(f"Invalid channel key format in file: {file_path}") + debug.print(f"resolve_channel_key: from file -> {get_channel_fingerprint(key)}") + return format_channel_key(key) + + # Handle value string + if value is None or value.lower() == "auto": + debug.print("resolve_channel_key: auto mode (server config)") + return None + + if value == "" or value.lower() == "none": + debug.print("resolve_channel_key: public mode (explicit none)") + return "" + + # Explicit key - validate + if validate_channel_key(value): + formatted = format_channel_key(value) + debug.print(f"resolve_channel_key: explicit key -> {get_channel_fingerprint(formatted)}") + return formatted + + raise ValueError( + "Invalid channel key format. Expected: XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX\n" + "Generate a new key with: stegasoo channel generate" + ) + + +def get_channel_response_info(channel_key: str | None) -> dict: + """ + Get channel info for API/WebUI responses. + + Args: + channel_key: Resolved channel key (None=auto, ""=public, str=explicit) + + Returns: + Dict with mode, fingerprint, and display info + + Example: + >>> info = get_channel_response_info("ABCD-1234-...") + >>> info['mode'] + 'explicit' + """ + if channel_key is None: + # Auto mode - check server config + server_key = get_channel_key() + if server_key: + return { + "mode": "private", + "fingerprint": get_channel_fingerprint(server_key), + "source": "server", + } + return { + "mode": "public", + "fingerprint": None, + "source": "server", + } + + if channel_key == "": + return { + "mode": "public", + "fingerprint": None, + "source": "explicit", + } + + return { + "mode": "private", + "fingerprint": get_channel_fingerprint(channel_key), + "source": "explicit", + } + + # ============================================================================= # CLI SUPPORT # ============================================================================= diff --git a/src/stegasoo/cli.py b/src/stegasoo/cli.py index b1fb72d..6c66a83 100644 --- a/src/stegasoo/cli.py +++ b/src/stegasoo/cli.py @@ -887,6 +887,83 @@ def tools_peek(image, as_json): click.echo() +@tools.command("exif") +@click.argument("image", type=click.Path(exists=True)) +@click.option("--clear", is_flag=True, help="Remove all EXIF metadata") +@click.option("--set", "set_fields", multiple=True, help="Set EXIF field (e.g. --set Artist=John)") +@click.option("-o", "--output", type=click.Path(), help="Output file (required for modifications)") +@click.option("--json", "as_json", is_flag=True, help="Output as JSON") +def tools_exif(image, clear, set_fields, output, as_json): + """View or edit EXIF metadata. + + Examples: + + stegasoo tools exif photo.jpg + + stegasoo tools exif photo.jpg --clear -o clean.jpg + + stegasoo tools exif photo.jpg --set Artist="John Doe" -o updated.jpg + """ + from .utils import read_image_exif, strip_image_metadata, write_image_exif + + with open(image, "rb") as f: + image_data = f.read() + + # View mode (no modifications) + if not clear and not set_fields: + exif = read_image_exif(image_data) + + if as_json: + click.echo(json.dumps(exif, indent=2, default=str)) + else: + click.echo(f"\n EXIF Metadata: {Path(image).name}") + click.echo(f" {'─' * 45}") + if not exif: + click.echo(" No EXIF metadata found") + else: + for key, value in sorted(exif.items()): + # Skip complex nested structures for display + if isinstance(value, dict): + click.echo(f" {key}: [complex data]") + elif isinstance(value, list): + click.echo(f" {key}: {value}") + else: + # Truncate long values + str_val = str(value) + if len(str_val) > 50: + str_val = str_val[:47] + "..." + click.echo(f" {key}: {str_val}") + click.echo() + return + + # Modification mode - require output file + if not output: + raise click.UsageError("Output file required for modifications (use -o/--output)") + + if clear: + # Strip all metadata + clean_data = strip_image_metadata(image_data, output_format="JPEG") + with open(output, "wb") as f: + f.write(clean_data) + click.echo(f"Cleared EXIF metadata, saved to: {output}") + elif set_fields: + # Parse field=value pairs + updates = {} + for field in set_fields: + if "=" not in field: + raise click.UsageError(f"Invalid format: {field} (use Field=Value)") + key, val = field.split("=", 1) + updates[key.strip()] = val.strip() + + try: + updated_data = write_image_exif(image_data, updates) + with open(output, "wb") as f: + f.write(updated_data) + click.echo(f"Updated {len(updates)} EXIF field(s), saved to: {output}") + except ValueError as e: + raise click.UsageError(str(e)) + + def main(): """Entry point for CLI.""" cli(obj={}) diff --git a/src/stegasoo/utils.py b/src/stegasoo/utils.py index 9e99c1f..e47c25d 100644 --- a/src/stegasoo/utils.py +++ b/src/stegasoo/utils.py @@ -18,6 +18,159 @@ from .constants import DAY_NAMES from .debug import debug +def read_image_exif(image_data: bytes) -> dict: + """ + Read EXIF metadata from an image. + + Args: + image_data: Raw image bytes + + Returns: + Dict with EXIF fields (tag names as keys) + + Example: + >>> exif = read_image_exif(photo_bytes) + >>> print(exif.get('Make')) # Camera manufacturer + """ + from PIL.ExifTags import GPSTAGS, TAGS + + result = {} + + try: + img = Image.open(io.BytesIO(image_data)) + exif_data = img._getexif() + + if exif_data: + for tag_id, value in exif_data.items(): + tag = TAGS.get(tag_id, str(tag_id)) + + # Handle GPS data specially + if tag == "GPSInfo" and isinstance(value, dict): + gps = {} + for gps_tag_id, gps_value in value.items(): + gps_tag = GPSTAGS.get(gps_tag_id, str(gps_tag_id)) + # Convert tuples/IFDRational to simple types + if hasattr(gps_value, "numerator"): + gps[gps_tag] = float(gps_value) + elif isinstance(gps_value, tuple): + gps[gps_tag] = [ + float(v) if hasattr(v, "numerator") else v + for v in gps_value + ] + else: + gps[gps_tag] = gps_value + result[tag] = gps + # Convert IFDRational to float + elif hasattr(value, "numerator"): + result[tag] = float(value) + # Convert bytes to string if possible + elif isinstance(value, bytes): + try: + result[tag] = value.decode("utf-8", errors="replace").strip("\x00") + except Exception: + result[tag] = f"<{len(value)} bytes>" + # Handle tuples of IFDRational + elif isinstance(value, tuple) and value and hasattr(value[0], "numerator"): + result[tag] = [float(v) for v in value] + else: + result[tag] = value + + img.close() + except Exception as e: + debug.print(f"Error reading EXIF: {e}") + + return result + + +def write_image_exif(image_data: bytes, exif_updates: dict) -> bytes: + """ + Write/update EXIF metadata in a JPEG image. + + Args: + image_data: Raw JPEG image bytes + exif_updates: Dict of EXIF fields to update (tag names as keys) + Use None as value to delete a field + + Returns: + Image bytes with updated EXIF + + Raises: + ValueError: If image is not JPEG or piexif not available + + Example: + >>> updated = write_image_exif(jpeg_bytes, {"Artist": "John Doe"}) + """ + try: + import piexif + except ImportError: + raise ValueError("piexif required for EXIF editing: pip install piexif") + + # Verify it's a JPEG + if not image_data[:2] == b"\xff\xd8": + raise ValueError("EXIF editing only supported for JPEG images") + + debug.print(f"Writing EXIF updates: {list(exif_updates.keys())}") + + # Load existing EXIF + try: + exif_dict = piexif.load(image_data) + except Exception: + # No existing EXIF, start fresh + exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "1st": {}, "thumbnail": None} + + # Map common tag names to piexif IFD and tag IDs + tag_mapping = { + # 0th IFD (main image) + "Make": (piexif.ImageIFD.Make, "0th"), + "Model": (piexif.ImageIFD.Model, "0th"), + "Software": (piexif.ImageIFD.Software, "0th"), + "Artist": (piexif.ImageIFD.Artist, "0th"), + "Copyright": (piexif.ImageIFD.Copyright, "0th"), + "ImageDescription": (piexif.ImageIFD.ImageDescription, "0th"), + "DateTime": (piexif.ImageIFD.DateTime, "0th"), + "Orientation": (piexif.ImageIFD.Orientation, "0th"), + # Exif IFD + "DateTimeOriginal": (piexif.ExifIFD.DateTimeOriginal, "Exif"), + "DateTimeDigitized": (piexif.ExifIFD.DateTimeDigitized, "Exif"), + "UserComment": (piexif.ExifIFD.UserComment, "Exif"), + "ExposureTime": (piexif.ExifIFD.ExposureTime, "Exif"), + "FNumber": (piexif.ExifIFD.FNumber, "Exif"), + "ISOSpeedRatings": (piexif.ExifIFD.ISOSpeedRatings, "Exif"), + "FocalLength": (piexif.ExifIFD.FocalLength, "Exif"), + "LensMake": (piexif.ExifIFD.LensMake, "Exif"), + "LensModel": (piexif.ExifIFD.LensModel, "Exif"), + } + + for tag_name, value in exif_updates.items(): + if tag_name not in tag_mapping: + debug.print(f"Unknown EXIF tag: {tag_name}, skipping") + continue + + tag_id, ifd = tag_mapping[tag_name] + + if value is None: + # Delete the tag + if tag_id in exif_dict[ifd]: + del exif_dict[ifd][tag_id] + debug.print(f"Deleted EXIF tag: {tag_name}") + else: + # Set the tag (encode strings as bytes) + if isinstance(value, str): + value = value.encode("utf-8") + exif_dict[ifd][tag_id] = value + debug.print(f"Set EXIF tag: {tag_name}") + + # Serialize EXIF and insert into image + exif_bytes = piexif.dump(exif_dict) + output = io.BytesIO() + img = Image.open(io.BytesIO(image_data)) + img.save(output, "JPEG", exif=exif_bytes, quality=95) + output.seek(0) + + debug.print(f"EXIF updated: {len(image_data)} -> {len(output.getvalue())} bytes") + return output.getvalue() + + def strip_image_metadata(image_data: bytes, output_format: str = "PNG") -> bytes: """ Remove all metadata (EXIF, ICC profiles, etc.) from an image.