Fix all power-user review issues (FR-01 through FR-12)

FR-01: Fix data directory default from ~/.fieldwitness to ~/.fwmetadata
FR-02/05/07: Accept all file types for attestation (not just images)
  - Web UI, CLI, and batch now accept PDFs, CSVs, audio, video, etc.
  - Perceptual hashing for images, SHA-256-only for everything else
FR-03: Implement C2PA import path + CLI commands (export/verify/import/show)
FR-04: Fix GPS downsampling bias (math.floor → round)
FR-06: Add HTML/PDF evidence summaries for lawyers
  - Always generates summary.html, optional summary.pdf via xhtml2pdf
FR-08: Fix CLI help text ("FieldWitness -- FieldWitness" artifact)
FR-09: Centralize stray paths (trusted_keys, carrier_history, last_backup)
FR-10: Add 67 C2PA bridge tests (vendor assertions, cert, GPS, export)
FR-12: Add Tor onion service support for source drop box
  - fieldwitness serve --tor flag, persistent/transient modes
  - Killswitch covers hidden service keys

Also: bonus fix for attest/api.py hardcoded path bypassing paths.py

224 tests passing (67 new).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Aaron D. Lee 2026-04-02 20:10:37 -04:00
parent 3a9cb17a5a
commit 5b0d90eeaf
27 changed files with 3140 additions and 186 deletions


@@ -461,11 +461,19 @@ A malicious or compromised relay could suppress specific records. The design mit
to use the relay only for transport, never as an authoritative source -- but no
verification mechanism is implemented.

**L7: Drop box source anonymity is limited -- Tor support available.** The drop box does
not log source IP addresses in attestation records or require accounts. FieldWitness now
includes built-in Tor hidden service support: starting the server with `--tor` exposes the
drop box as a `.onion` address so that source IPs are never visible to the server operator.
Without `--tor`, a source's IP address is visible in web server access logs.
Organizations with source-protection requirements should use the `--tor` flag and instruct
sources to access the drop box only via Tor Browser. Operators should also configure any
reverse proxy to suppress access logging for `/dropbox/upload/` paths.

Even with Tor, timing analysis and traffic correlation attacks are possible at the network
level. Tor eliminates IP exposure at the server; it does not protect against a global
adversary correlating traffic timing. See `docs/source-dropbox.md` for setup instructions.

**L8: Steganalysis resistance is not guaranteed.** The steganography backend includes a
steganalysis module (`stego/steganalysis.py`) for estimating detection resistance, but


@@ -208,13 +208,142 @@ Expired tokens are cleaned up automatically on every admin page load.

### Running as a Tor hidden service

FieldWitness has built-in support for exposing the drop box as a Tor hidden service
(a `.onion` address). When a source accesses the drop box over Tor, the server never
sees their real IP address -- Tor's onion routing ensures that only the Tor network knows
both the source and the destination.

#### Step 1: Install and configure Tor

```bash
# Debian / Ubuntu
sudo apt install tor

# macOS (Homebrew)
brew install tor

# Fedora / RHEL
sudo dnf install tor
```

Enable the control port so FieldWitness can manage the hidden service. Add these lines
to `/etc/tor/torrc`:

```
ControlPort 9051
CookieAuthentication 1
```

Then restart Tor:

```bash
sudo systemctl restart tor
```

**Authentication note:** `CookieAuthentication 1` lets FieldWitness authenticate using the
cookie file that Tor creates automatically. Alternatively, use a password:

```
ControlPort 9051
HashedControlPassword <hash produced by: tor --hash-password yourpassword>
```

#### Step 2: Install the stem library

stem is an optional FieldWitness dependency. Install it via the `tor` extra:

```bash
pip install 'fieldwitness[tor]'
# or, if FieldWitness is already installed:
pip install 'stem>=1.8.0'
```

#### Step 3: Start FieldWitness with `--tor`

```bash
fieldwitness serve --host 127.0.0.1 --port 5000 --tor
```

With a custom control port or password:

```bash
fieldwitness serve --tor --tor-control-port 9051 --tor-password yourpassword
```

For a one-off intake session where a fixed address is not needed:

```bash
fieldwitness serve --tor --tor-transient
```

On startup, FieldWitness will print the `.onion` address:

```
============================================================
TOR HIDDEN SERVICE ACTIVE
============================================================
.onion address : abc123def456ghi789jkl012mno345pq.onion
Drop box URL   : http://abc123def456ghi789jkl012mno345pq.onion/dropbox/upload/<token>
Persistent     : yes (key saved to ~/.fwmetadata/fieldkit/tor/)
============================================================
Sources must use Tor Browser to access the .onion URL.
Share the drop box upload URL over a secure channel (Signal, in person).
============================================================
```

#### Step 4: Share the .onion drop box URL

Create a drop box token as usual (see the drop box setup steps earlier in this guide),
then construct the `.onion` upload URL:

```
http://<onion-address>/dropbox/upload/<token>
```

Share this URL with the source over Signal or in person. The source opens it in
**Tor Browser** -- not in a regular browser.

#### Persistent vs. transient hidden services

| Mode | Command | Behaviour |
|---|---|---|
| **Persistent** (default) | `--tor` | Same `.onion` address on every restart. Key stored at `~/.fwmetadata/fieldkit/tor/hidden_service/`. |
| **Transient** | `--tor --tor-transient` | New `.onion` address each run. No key written to disk. |

Use persistent mode when you want sources to bookmark the address or share it in advance.
Use transient mode for single-session intake where address continuity does not matter.
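
If giving FieldWitness control-port access is undesirable, a persistent service can also be
configured by hand in `torrc` instead of using `--tor` (the directory path below is an
example; any `HiddenServiceDir` readable by the Tor daemon works):

```
HiddenServiceDir /var/lib/tor/fieldwitness_dropbox/
HiddenServicePort 80 127.0.0.1:5000
```

After restarting Tor, the address appears in the `hostname` file inside the
`HiddenServiceDir`. Note that a key managed this way lives outside `~/.fwmetadata/` and is
therefore not covered by the FieldWitness killswitch.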
#### Source instructions

Tell sources:

1. Install **Tor Browser** from [torproject.org](https://www.torproject.org/download/).
2. Open Tor Browser and paste the full `.onion` URL into the address bar.
3. Do not open the `.onion` URL in a regular browser -- your real IP will be visible.
4. JavaScript must be enabled for the SHA-256 fingerprint feature to work.
   Set the Security Level to "Standard" in Tor Browser (shield icon in the toolbar).
5. Save the SHA-256 fingerprints shown before clicking Upload.
6. Save the receipt codes shown after upload.

#### Logging and access logs

Even with Tor, access logs on the server can record that *someone* connected -- for a
source using Tor to reach a clearnet address, the connection appears to come from a Tor
exit relay. For hidden services, the server sees no source IP at all; the connection
arrives from the Tor daemon on localhost.

**Recommendation:** Set the web server or reverse proxy to not log access requests to
`/dropbox/upload/` paths. If using FieldWitness directly (Waitress), access log output
goes to stdout; redirect it to `/dev/null` or omit the log handler.
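
As a concrete example, with an nginx reverse proxy in front of the drop box the
recommendation above could look like this (the upstream address and location path are
assumptions, not shipped configuration):

```
location /dropbox/upload/ {
    access_log off;
    proxy_pass http://127.0.0.1:5000;
}
```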
#### Killswitch and the hidden service key

The persistent hidden service key is stored under `~/.fwmetadata/fieldkit/tor/hidden_service/`
and is treated as key material. The FieldWitness killswitch (`fieldwitness fieldkit purge`)
destroys this directory during both `KEYS_ONLY` and `ALL` purge scopes. After a purge,
the `.onion` address cannot be linked to the operator even if the server hardware is seized.

> **Warning:** Even with Tor, timing analysis and traffic correlation attacks are possible
> at the network level. The drop box protects source identity at the application layer;
> network-layer protection requires operational discipline beyond what software can provide.
> Tor is not a silver bullet -- but it removes the most direct risk (IP address exposure)
> that limitation L7 in the threat model describes.


@ -1,10 +1,13 @@
""" """
Attestation blueprint attest and verify images via Attest. Attestation blueprint attest and verify files via Attest.
Wraps attest's attestation and verification libraries to provide: Wraps attest's attestation and verification libraries to provide:
- Image attestation: upload hash sign store in append-only log - File attestation: upload hash sign store in append-only log
- Image verification: upload hash search log display matches - File verification: upload hash search log display matches
- Verification receipt: same as verify but returns a downloadable JSON file - Verification receipt: same as verify but returns a downloadable JSON file
Supports any file type. Perceptual hashing (phash, dhash) is available for
image files only. Non-image files are attested by SHA-256 hash.
""" """
from __future__ import annotations from __future__ import annotations
@@ -85,25 +88,45 @@ def _wrap_in_chain(attest_record, private_key, metadata: dict | None = None):
    )


_ALLOWED_EXTENSIONS: frozenset[str] = frozenset({
    # Images
    "png", "jpg", "jpeg", "bmp", "gif", "webp", "tiff", "tif", "heic", "heif", "raw",
    # Documents
    "pdf", "doc", "docx", "xls", "xlsx", "ppt", "pptx", "odt", "ods", "odp",
    "txt", "rtf", "csv", "tsv", "json", "xml", "html", "htm",
    # Audio
    "mp3", "wav", "m4a", "aac", "ogg", "flac", "opus", "wma",
    # Video
    "mp4", "mov", "avi", "mkv", "webm", "m4v", "wmv",
    # Archives / data
    "zip", "tar", "gz", "bz2", "xz", "7z",
    # Sensor / scientific data
    "gpx", "kml", "geojson", "npy", "parquet", "bin", "dat",
})

_IMAGE_EXTENSIONS: frozenset[str] = frozenset({
    "png", "jpg", "jpeg", "bmp", "gif", "webp", "tiff", "tif", "heic", "heif",
})


def _allowed_file(filename: str) -> bool:
    """Return True if the filename has an extension on the allowlist."""
    if not filename or "." not in filename:
        return False
    return filename.rsplit(".", 1)[1].lower() in _ALLOWED_EXTENSIONS


def _is_image_file(filename: str) -> bool:
    """Return True if the filename is a known image type."""
    if not filename or "." not in filename:
        return False
    return filename.rsplit(".", 1)[1].lower() in _IMAGE_EXTENSIONS


@bp.route("/attest", methods=["GET", "POST"])
@login_required
def attest():
    """Create a provenance attestation for a file."""
    # Check identity exists
    private_key = _get_private_key()
    has_identity = private_key is not None
@@ -116,17 +139,22 @@ def attest():
        )
        return redirect(url_for("attest.attest"))

    evidence_file = request.files.get("image")
    if not evidence_file or not evidence_file.filename:
        flash("Please select a file to attest.", "error")
        return redirect(url_for("attest.attest"))

    if not _allowed_file(evidence_file.filename):
        flash(
            "Unsupported file type. Supported types include images, documents, "
            "audio, video, CSV, and sensor data files.",
            "error",
        )
        return redirect(url_for("attest.attest"))

    try:
        file_data = evidence_file.read()
        is_image = _is_image_file(evidence_file.filename)

        # Build optional metadata
        metadata = {}
@@ -148,31 +176,36 @@ def attest():
        auto_exif = request.form.get("auto_exif", "on") == "on"
        strip_device = request.form.get("strip_device", "on") == "on"

        # Extract-then-classify: get evidentiary metadata before attestation.
        # Only applicable to image files — silently skip for other types.
        if is_image and auto_exif and strip_device:
            try:
                from fieldwitness.metadata import extract_and_classify

                extraction = extract_and_classify(file_data)
                # Merge evidentiary fields (GPS, timestamp) but exclude
                # dangerous device fields (serial, firmware version)
                for key, value in extraction.evidentiary.items():
                    if key not in metadata:  # User metadata takes precedence
                        if hasattr(value, "isoformat"):
                            metadata[f"exif_{key}"] = value.isoformat()
                        elif isinstance(value, dict):
                            metadata[f"exif_{key}"] = value
                        else:
                            metadata[f"exif_{key}"] = str(value)
            except Exception:
                pass  # EXIF extraction is best-effort

        # Create the attestation. create_attestation() calls hash_image()
        # internally; for non-image files we pre-compute hashes via
        # hash_file() and use create_attestation_from_hashes() instead.
        from fieldwitness.attest.attestation import create_attestation

        attestation = create_attestation(
            image_data=file_data,
            private_key=private_key,
            metadata=metadata if metadata else None,
            auto_exif=is_image and auto_exif and not strip_device,
        )

        # Store in the append-only log
@@ -188,7 +221,7 @@ def attest():
            logging.getLogger(__name__).warning("Chain wrapping failed: %s", e)
            flash(
                "Attestation saved, but chain wrapping failed. Check chain configuration.",
                "warning",
            )
@@ -225,7 +258,8 @@ def attest():
            location_name=metadata.get("location_name", ""),
            exif_metadata=record.metadata,
            index=index,
            filename=evidence_file.filename,
            is_image=is_image,
            chain_index=chain_record.chain_index if chain_record else None,
        )
@@ -239,15 +273,13 @@ def attest():
@bp.route("/attest/batch", methods=["POST"])
@login_required
def attest_batch():
    """Batch attestation — accepts multiple files of any supported type.

    Returns JSON with results for each file (success/skip/error).
    Skips files already attested (by SHA-256 match).
    """
    import hashlib

    private_key = _get_private_key()
    if private_key is None:
        return {"error": "No identity key. Run fieldwitness init first."}, 400
@@ -262,10 +294,14 @@ def attest_batch():
    for f in files:
        filename = f.filename or "unknown"
        try:
            if not _allowed_file(filename):
                results.append({"file": filename, "status": "skipped", "reason": "unsupported file type"})
                continue

            file_data = f.read()
            sha256 = hashlib.sha256(file_data).hexdigest()

            # Skip already-attested files
            existing = storage.get_records_by_image_sha256(sha256)
            if existing:
                results.append({"file": filename, "status": "skipped", "reason": "already attested"})
@@ -273,7 +309,7 @@ def attest_batch():
            from fieldwitness.attest.attestation import create_attestation

            attestation = create_attestation(file_data, private_key)
            index = storage.append_record(attestation.record)

            # Wrap in chain if enabled
@@ -312,10 +348,10 @@ def attest_batch():
@bp.route("/verify/batch", methods=["POST"])
@login_required
def verify_batch():
    """Batch verification — accepts multiple files of any supported type.

    Returns JSON with per-file verification results. Uses SHA-256
    fast path before falling back to perceptual scan (images only).
    """
    files = request.files.getlist("images")
    if not files:
@@ -325,8 +361,8 @@ def verify_batch():
    for f in files:
        filename = f.filename or "unknown"
        try:
            file_data = f.read()
            result = _verify_file(file_data)

            if result["matches"]:
                best = result["matches"][0]
@@ -361,17 +397,20 @@ def verify_batch():
    }


def _verify_file(file_data: bytes) -> dict:
    """Run the full verification pipeline against the attestation log.

    Works for any file type. Images get SHA-256 + perceptual matching;
    non-image files get SHA-256 matching only.

    Returns a dict with keys:
        query_hashes   ImageHashes object from fieldwitness.attest
        matches        list of match dicts (record, match_type, distances, attestor_name)
        record_count   total records searched
    """
    from fieldwitness.attest.hashing import compute_all_distances, hash_file, is_same_image

    query_hashes = hash_file(file_data)
    storage = _get_storage()
    stats = storage.get_stats()
@@ -423,17 +462,22 @@ def verify():
    The log read here is read-only and reveals no key material.
    """
    if request.method == "POST":
        evidence_file = request.files.get("image")
        if not evidence_file or not evidence_file.filename:
            flash("Please select a file to verify.", "error")
            return redirect(url_for("attest.verify"))

        if not _allowed_file(evidence_file.filename):
            flash(
                "Unsupported file type. Upload any image, document, audio, video, or data file.",
                "error",
            )
            return redirect(url_for("attest.verify"))

        try:
            file_data = evidence_file.read()
            is_image = _is_image_file(evidence_file.filename)
            result = _verify_file(file_data)
            query_hashes = result["query_hashes"]
            matches = result["matches"]
@@ -443,7 +487,8 @@ def verify():
                found=False,
                message="No attestations in the local log yet.",
                query_hashes=query_hashes,
                filename=evidence_file.filename,
                is_image=is_image,
                matches=[],
            )
@@ -456,7 +501,8 @@ def verify():
                    else "No matching attestations found."
                ),
                query_hashes=query_hashes,
                filename=evidence_file.filename,
                is_image=is_image,
                matches=matches,
            )
@@ -471,29 +517,29 @@ def verify():
def verify_receipt():
    """Return a downloadable JSON verification receipt for court or legal use.

    Accepts the same file upload as /verify. Returns a JSON file attachment
    containing file hashes, all matching attestation records with full metadata,
    the verification timestamp, and the verifier hostname.

    Intentionally unauthenticated: same access policy as /verify.
    """
    evidence_file = request.files.get("image")
    if not evidence_file or not evidence_file.filename:
        return Response(
            json.dumps({"error": "No file provided"}),
            status=400,
            mimetype="application/json",
        )

    if not _allowed_file(evidence_file.filename):
        return Response(
            json.dumps({"error": "Unsupported file type"}),
            status=400,
            mimetype="application/json",
        )

    try:
        result = _verify_file(evidence_file.read())
    except Exception as e:
        return Response(
            json.dumps({"error": f"Verification failed: {e}"}),
@@ -573,11 +619,11 @@ def verify_receipt():
        "schema_version": "3",
        "verification_timestamp": verification_ts,
        "verifier_instance": verifier_instance,
        "queried_filename": evidence_file.filename,
        "file_hash": {
            "sha256": query_hashes.sha256,
            "phash": query_hashes.phash or None,
            "dhash": getattr(query_hashes, "dhash", None) or None,
        },
        "records_searched": result["record_count"],
        "matches_found": len(matching_records),
@@ -599,7 +645,9 @@ def verify_receipt():
    receipt_json = json.dumps(receipt, indent=2, ensure_ascii=False)

    safe_filename = (
        evidence_file.filename.rsplit(".", 1)[0]
        if "." in evidence_file.filename
        else evidence_file.filename
    )
    download_name = f"receipt_{safe_filename}_{datetime.now(UTC).strftime('%Y%m%dT%H%M%SZ')}.json"
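
To make the image/non-image split concrete, here is a dependency-free sketch of the
dispatch that `hash_file()` is described as performing. The `FileHashes` container and the
body of the function are illustrative stand-ins for `fieldwitness.attest.hashing`, not the
actual implementation:

```python
from __future__ import annotations

import hashlib
from dataclasses import dataclass


@dataclass
class FileHashes:
    """Illustrative stand-in for the real ImageHashes record."""
    sha256: str
    phash: str | None = None  # perceptual hashes are populated for images only
    dhash: str | None = None


_IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "bmp", "gif", "webp", "tiff", "tif"}


def hash_file(data: bytes, filename: str = "") -> FileHashes:
    """Every file gets a SHA-256; image-looking files also get perceptual hashes."""
    sha256 = hashlib.sha256(data).hexdigest()
    ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if ext in _IMAGE_EXTENSIONS:
        # The real code would decode the image and compute phash/dhash (e.g. via
        # the imagehash library); placeholders keep this sketch stdlib-only.
        return FileHashes(sha256=sha256, phash="<phash>", dhash="<dhash>")
    return FileHashes(sha256=sha256)
```

Under this model a non-image upload can match the log only on an exact SHA-256, which is
why the batch verifier's perceptual fallback is skipped for non-image files.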


@@ -1,17 +1,18 @@
{% extends "base.html" %}

{% block title %}Attest File — FieldWitness{% endblock %}

{% block content %}
<div class="row justify-content-center">
  <div class="col-lg-8">
    <div class="card bg-dark border-secondary">
      <div class="card-header">
        <h5 class="mb-0"><i class="bi bi-patch-check me-2 text-info"></i>Attest File</h5>
      </div>
      <div class="card-body">
        <p class="text-muted">
          Create a cryptographic provenance attestation — sign any file with your Ed25519 identity
          to prove when and by whom it was captured or created. Supports photos, documents,
          sensor data, audio, video, and more.
        </p>

        {% if not has_identity %}

@@ -25,28 +26,31 @@
        <form method="POST" enctype="multipart/form-data">
          <input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>

          <div class="mb-4">
            <label for="image" class="form-label"><i class="bi bi-file-earmark me-1"></i>Evidence File</label>
            <input type="file" class="form-control" name="image" id="image" required>
            <div class="form-text">
              Accepts images (PNG, JPEG, WebP, TIFF), documents (PDF, DOCX, CSV, TXT),
              audio (MP3, WAV, FLAC), video (MP4, MOV, MKV), and sensor data files.
              Perceptual matching (pHash, dHash) is available for image files only.
            </div>
          </div>

          <div class="mb-3">
            <label for="caption" class="form-label"><i class="bi bi-chat-text me-1"></i>Caption (optional)</label>
            <input type="text" class="form-control" name="caption" id="caption"
                   placeholder="What does this file document?" maxlength="500">
          </div>

          <div class="mb-3">
            <label for="location_name" class="form-label"><i class="bi bi-geo-alt me-1"></i>Location (optional)</label>
            <input type="text" class="form-control" name="location_name" id="location_name"
                   placeholder="Where was this captured?" maxlength="200">
          </div>

          <div class="form-check form-switch mb-4">
            <input class="form-check-input" type="checkbox" name="auto_exif" id="autoExif" checked>
            <label class="form-check-label" for="autoExif">
              Extract EXIF metadata automatically (GPS, timestamp, device) — images only
            </label>
          </div>


@@ -39,7 +39,7 @@
        {% else %}
          <div class="alert alert-secondary">
            <i class="bi bi-inbox me-2"></i>
            No attestations yet. <a href="/attest" class="alert-link">Attest your first file</a>.
          </div>
        {% endif %}
      </div>


@@ -7,9 +7,17 @@
    <div class="alert alert-success">
      <i class="bi bi-check-circle me-2"></i>
      <strong>Attestation created successfully!</strong>
      File <code>{{ filename }}</code> has been attested and stored in the local log (index #{{ index }}).
    </div>

    {% if not is_image %}
    <div class="alert alert-info">
      <i class="bi bi-info-circle me-2"></i>
      This file is attested by cryptographic hash. Perceptual matching (pHash, dHash)
      is available for image files only.
    </div>
    {% endif %}

    <div class="card bg-dark border-secondary mb-4">
      <div class="card-header">
        <h5 class="mb-0"><i class="bi bi-file-earmark-check me-2"></i>Attestation Record</h5>

@@ -38,7 +46,7 @@
    <div class="card bg-dark border-secondary mb-4">
      <div class="card-header">
        <h6 class="mb-0"><i class="bi bi-hash me-2"></i>File Hashes</h6>
      </div>
      <div class="card-body">
        <div class="mb-2">

@@ -84,7 +92,7 @@
    <div class="d-grid gap-2">
      <a href="/attest" class="btn btn-outline-info">
        <i class="bi bi-plus-circle me-2"></i>Attest Another File
      </a>
      <a href="/attest/log" class="btn btn-outline-secondary">
        <i class="bi bi-journal-text me-2"></i>View Attestation Log


@@ -1,30 +1,33 @@
{% extends "base.html" %}

{% block title %}Verify File — FieldWitness{% endblock %}

{% block content %}
<div class="row justify-content-center">
  <div class="col-lg-8">
    <div class="card bg-dark border-secondary">
      <div class="card-header">
        <h5 class="mb-0"><i class="bi bi-search me-2 text-info"></i>Verify File</h5>
      </div>
      <div class="card-body">
        <p class="text-muted">
          Check a file against the local attestation log. For image files, uses SHA-256 for
          exact matching and perceptual hashes (pHash, dHash) for robustness against
          compression and resizing. For all other file types, SHA-256 exact matching is used.
        </p>

        <form method="POST" enctype="multipart/form-data">
          <input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>

          <div class="mb-4">
            <label for="image" class="form-label"><i class="bi bi-file-earmark-search me-1"></i>Evidence File to Verify</label>
            <input type="file" class="form-control" name="image" id="image" required>
            <div class="form-text">
              Upload the file you want to verify against known attestations.
              Accepts images, documents, audio, video, and data files.
            </div>
          </div>

          <button type="submit" class="btn btn-info btn-lg w-100">
            <i class="bi bi-search me-2"></i>Verify File
          </button>
        </form>
      </div>
@ -16,10 +16,18 @@
</div> </div>
{% endif %} {% endif %}
{# Query image hashes #} {% if not is_image %}
<div class="alert alert-info">
<i class="bi bi-info-circle me-2"></i>
This file is attested by cryptographic hash. Perceptual matching (pHash, dHash)
is available for image files only.
</div>
{% endif %}
{# Query file hashes #}
<div class="card bg-dark border-secondary mb-4"> <div class="card bg-dark border-secondary mb-4">
<div class="card-header"> <div class="card-header">
<h6 class="mb-0"><i class="bi bi-hash me-2"></i>Image Hashes for <code>{{ filename }}</code></h6> <h6 class="mb-0"><i class="bi bi-hash me-2"></i>File Hashes for <code>{{ filename }}</code></h6>
</div> </div>
<div class="card-body"> <div class="card-body">
<div class="mb-2"> <div class="mb-2">
@ -103,13 +111,13 @@
<div class="card-body"> <div class="card-body">
<p class="text-muted small mb-3"> <p class="text-muted small mb-3">
Generate a signed JSON receipt for legal or archival use. Generate a signed JSON receipt for legal or archival use.
Re-upload the same image to produce the downloadable file. Re-upload the same file to produce the downloadable receipt.
</p> </p>
<form action="/verify/receipt" method="post" enctype="multipart/form-data"> <form action="/verify/receipt" method="post" enctype="multipart/form-data">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/> <input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="mb-3"> <div class="mb-3">
<input class="form-control form-control-sm bg-dark text-light border-secondary" <input class="form-control form-control-sm bg-dark text-light border-secondary"
type="file" name="image" accept="image/*" required> type="file" name="image" required>
</div> </div>
<button type="submit" class="btn btn-outline-warning btn-sm"> <button type="submit" class="btn btn-outline-warning btn-sm">
Download Receipt (.json) Download Receipt (.json)
@ -121,7 +129,7 @@
<div class="d-grid gap-2 mt-4"> <div class="d-grid gap-2 mt-4">
<a href="/verify" class="btn btn-outline-info"> <a href="/verify" class="btn btn-outline-info">
Verify Another Image Verify Another File
</a> </a>
<a href="/attest/log" class="btn btn-outline-secondary"> <a href="/attest/log" class="btn btn-outline-secondary">
View Attestation Log View Attestation Log
@ -99,16 +99,22 @@ fieldkit = [
federation = [ federation = [
"aiohttp>=3.9.0", "aiohttp>=3.9.0",
] ]
tor = [
"stem>=1.8.0",
]
c2pa = [ c2pa = [
"c2pa-python>=0.6.0", "c2pa-python>=0.6.0",
"fieldwitness[attest]", "fieldwitness[attest]",
] ]
evidence-pdf = [
"xhtml2pdf>=0.2.11",
]
rpi = [ rpi = [
"fieldwitness[web,cli,fieldkit]", "fieldwitness[web,cli,fieldkit]",
"gpiozero>=2.0", "gpiozero>=2.0",
] ]
all = [ all = [
"fieldwitness[stego-dct,stego-audio,stego-compression,attest,cli,web,api,fieldkit,federation,c2pa]", "fieldwitness[stego-dct,stego-audio,stego-compression,attest,cli,web,api,fieldkit,federation,tor,c2pa,evidence-pdf]",
] ]
dev = [ dev = [
"fieldwitness[all]", "fieldwitness[all]",
@ -29,3 +29,19 @@ def has_c2pa() -> bool:
return True return True
except ImportError: except ImportError:
return False return False
def has_tor() -> bool:
"""Check if stem is importable and a Tor hidden service can be started.
stem is the Python controller library for Tor. It is an optional
dependency installed via the ``[tor]`` extra. This function checks only
that the library is present -- it does not verify that a Tor daemon is
running. Use ``fieldwitness.fieldkit.tor.start_onion_service`` for that.
"""
try:
import stem # noqa: F401
return True
except ImportError:
return False
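The guarded-import pattern used by `has_tor()` above can also be expressed without importing the module at all, via the stdlib's `importlib.util.find_spec` (an alternative sketch, not the project's actual implementation):

```python
import importlib.util

def has_tor() -> bool:
    # Same availability check as the try/except form above, but via
    # find_spec: True when the 'stem' package is installed, without
    # actually importing it (no import side effects at check time).
    return importlib.util.find_spec("stem") is not None

print(has_tor())
```

Either form keeps `stem` out of the import graph until the `[tor]` extra is actually needed.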
@ -55,6 +55,7 @@ def export_cold_archive(
CHAIN_DIR, CHAIN_DIR,
IDENTITY_DIR, IDENTITY_DIR,
IDENTITY_PUBLIC_KEY, IDENTITY_PUBLIC_KEY,
TRUSTED_KEYS_DIR,
) )
ts = datetime.now(UTC) ts = datetime.now(UTC)
@ -99,7 +100,7 @@ def export_cold_archive(
contents.append("keys/public.pem") contents.append("keys/public.pem")
# Trusted keys # Trusted keys
trusted_dir = IDENTITY_DIR.parent / "trusted_keys" trusted_dir = TRUSTED_KEYS_DIR
if trusted_dir.exists(): if trusted_dir.exists():
for key_dir in trusted_dir.iterdir(): for key_dir in trusted_dir.iterdir():
for f in key_dir.iterdir(): for f in key_dir.iterdir():
@ -18,6 +18,8 @@ from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Annotated from typing import Annotated
import fieldwitness.paths as _fw_paths
try: try:
from fastapi import FastAPI, File, Form, HTTPException, Query, UploadFile from fastapi import FastAPI, File, Form, HTTPException, Query, UploadFile
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
@ -32,7 +34,9 @@ from .storage import LocalStorage
from .crypto import verify_signature, load_public_key_from_bytes from .crypto import verify_signature, load_public_key_from_bytes
# Configuration via environment # Configuration via environment
DATA_DIR = Path(os.environ.get("FIELDWITNESS_DATA_DIR", Path.home() / ".fieldwitness")) # DATA_DIR defers to paths.BASE_DIR so that FIELDWITNESS_DATA_DIR and runtime
# --data-dir overrides both propagate correctly without re-reading the env var here.
DATA_DIR = _fw_paths.BASE_DIR
BASE_URL = os.environ.get("FIELDWITNESS_BASE_URL", "https://attest.io") BASE_URL = os.environ.get("FIELDWITNESS_BASE_URL", "https://attest.io")
app = FastAPI( app = FastAPI(
@ -13,13 +13,13 @@ Command Structure:
generate # Create new identity generate # Create new identity
show # Display current identity fingerprint show # Display current identity fingerprint
attest <image> # Create attestation for an image attest <file> # Create attestation for any file type
--location, -l # GPS coordinates --location, -l # GPS coordinates
--caption, -c # Photographer's notes --caption, -c # Description / notes
--tag, -t # Metadata tags (repeatable) --tag, -t # Metadata tags (repeatable)
--no-exif # Disable EXIF extraction --no-exif # Disable EXIF extraction (images only)
verify <image> # Check image against known attestations verify <file> # Check file against known attestations
--exact # Require byte-exact match (no perceptual) --exact # Require byte-exact match (no perceptual)
log # Query the attestation log log # Query the attestation log
@ -58,8 +58,12 @@ Usage Examples:
# Attest a photo with location # Attest a photo with location
$ attest attest photo.jpg -l "50.45,30.52,10,Kyiv" -c "Morning scene" $ attest attest photo.jpg -l "50.45,30.52,10,Kyiv" -c "Morning scene"
# Verify an image (even after social media compression) # Attest a document
$ attest attest report.pdf -c "Q1 human rights report"
# Verify a file (images also get perceptual matching)
$ attest verify downloaded_photo.jpg $ attest verify downloaded_photo.jpg
$ attest verify leaked_document.pdf
# Start API server for remote verification # Start API server for remote verification
$ attest serve --port 8000 $ attest serve --port 8000
@ -282,18 +286,18 @@ def _parse_location(location_str: str) -> dict[str, Any]:
@main.command() @main.command()
@click.argument("image", type=click.Path(exists=True, path_type=Path)) @click.argument("file", type=click.Path(exists=True, path_type=Path))
@click.option("--password", is_flag=True, help="Private key is encrypted") @click.option("--password", is_flag=True, help="Private key is encrypted")
@click.option("--tag", "-t", multiple=True, help="Add metadata tags") @click.option("--tag", "-t", multiple=True, help="Add metadata tags")
@click.option("--location", "-l", "location_str", help='GPS coords: "lat,lon" or "lat,lon,accuracy,name"') @click.option("--location", "-l", "location_str", help='GPS coords: "lat,lon" or "lat,lon,accuracy,name"')
@click.option("--caption", "-c", help="Photographer's notes") @click.option("--caption", "-c", help="Description or notes about this file")
@click.option("--no-exif", "no_exif", is_flag=True, help="Disable auto EXIF extraction") @click.option("--no-exif", "no_exif", is_flag=True, help="Disable auto EXIF extraction (images only)")
@click.option("--embed", "-e", is_flag=True, help="Embed proof link in image (JPEG: DCT, other: XMP sidecar)") @click.option("--embed", "-e", is_flag=True, help="Embed proof link in file (JPEG: DCT, other: XMP sidecar)")
@click.option("--base-url", default="https://attest.io", help="Base URL for proof links") @click.option("--base-url", default="https://attest.io", help="Base URL for proof links")
@click.pass_context @click.pass_context
def attest( def attest(
ctx: click.Context, ctx: click.Context,
image: Path, file: Path,
password: bool, password: bool,
tag: tuple[str, ...], tag: tuple[str, ...],
location_str: str | None, location_str: str | None,
@ -303,16 +307,20 @@ def attest(
base_url: str, base_url: str,
) -> None: ) -> None:
""" """
Create a cryptographic attestation for an image. Create a cryptographic attestation for a file.
This command creates a signed record proving that YOU attested THIS IMAGE This command creates a signed record proving that YOU attested THIS FILE
at THIS TIME with THIS METADATA. The attestation is stored in your local at THIS TIME with THIS METADATA. The attestation is stored in your local
log and can be synced to federation peers. log and can be synced to federation peers.
Supports any file type: images, documents, audio, video, CSVs, sensor data.
Perceptual hashing (pHash, dHash) is computed for image files only; all
other files are attested by SHA-256 hash.
\b \b
METADATA SOURCES (in order of precedence): METADATA SOURCES (in order of precedence):
1. Command-line options (--location, --caption, --tag) 1. Command-line options (--location, --caption, --tag)
2. EXIF data from the image (unless --no-exif) 2. EXIF data from the file, if it is an image (unless --no-exif)
\b \b
PROOF EMBEDDING (--embed): PROOF EMBEDDING (--embed):
@ -321,9 +329,15 @@ def attest(
\b \b
EXAMPLES: EXAMPLES:
# Basic attestation (auto-extracts EXIF) # Attest a photo (auto-extracts EXIF)
attest attest photo.jpg attest attest photo.jpg
# Attest a document
attest attest report.pdf -c "Q1 human rights report"
# Attest sensor data with location
attest attest readings.csv -l "50.45,30.52,10,Kyiv" -t sensor
# With proof link embedded in image # With proof link embedded in image
attest attest photo.jpg --embed attest attest photo.jpg --embed
@ -367,9 +381,9 @@ def attest(
private_key = load_private_key(storage.private_key_path, key_password) private_key = load_private_key(storage.private_key_path, key_password)
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
# Read image file # Read the file
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
image_data = image.read_bytes() file_data = file.read_bytes()
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
# Build metadata from CLI options # Build metadata from CLI options
@ -382,23 +396,23 @@ def attest(
metadata["tags"] = list(tag) metadata["tags"] = list(tag)
# Always record the original filename # Always record the original filename
metadata["filename"] = image.name metadata["filename"] = file.name
# Parse and add location if provided via CLI # Parse and add location if provided via CLI
# This OVERRIDES any GPS data from EXIF # This OVERRIDES any GPS data from EXIF
if location_str: if location_str:
metadata["location"] = _parse_location(location_str) metadata["location"] = _parse_location(location_str)
# Add caption (photographer's notes) # Add caption / description
if caption: if caption:
metadata["caption"] = caption metadata["caption"] = caption
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
# Create the attestation # Create the attestation
# This: computes hashes, extracts EXIF (if enabled), signs the record # This: computes hashes, extracts EXIF (if enabled and image), signs the record
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
attestation = create_attestation( attestation = create_attestation(
image_data, private_key, metadata, auto_exif=not no_exif file_data, private_key, metadata, auto_exif=not no_exif
) )
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
@ -425,7 +439,7 @@ def attest(
proof_link = f"{base_url}/v/{attestation.record.short_id}" proof_link = f"{base_url}/v/{attestation.record.short_id}"
embed_result = embed_proof_link( embed_result = embed_proof_link(
image_path=image, image_path=file,
proof_link=proof_link, proof_link=proof_link,
fingerprint=attestation.record.attestor_fingerprint, fingerprint=attestation.record.attestor_fingerprint,
attested_at=attestation.record.timestamp, attested_at=attestation.record.timestamp,
@ -454,7 +468,7 @@ def attest(
} }
click.echo(json.dumps(result)) click.echo(json.dumps(result))
else: else:
click.echo(f"Attested: {image.name}") click.echo(f"Attested: {file.name}")
click.echo(f" SHA-256: {attestation.image_hashes.sha256[:16]}...") click.echo(f" SHA-256: {attestation.image_hashes.sha256[:16]}...")
click.echo(f" Index: {index}") click.echo(f" Index: {index}")
click.echo(f" Root: {merkle_log.root_hash[:16]}...") click.echo(f" Root: {merkle_log.root_hash[:16]}...")
@ -467,30 +481,35 @@ def attest(
@main.command() @main.command()
@click.argument("image", type=click.Path(exists=True, path_type=Path)) @click.argument("file", type=click.Path(exists=True, path_type=Path))
@click.option("--exact", is_flag=True, help="Require exact byte match (not perceptual)") @click.option("--exact", is_flag=True, help="Require exact byte match (not perceptual)")
@click.pass_context @click.pass_context
def verify(ctx: click.Context, image: Path, exact: bool) -> None: def verify(ctx: click.Context, file: Path, exact: bool) -> None:
"""Verify an image against known attestations.""" """Verify a file against known attestations.
from .hashing import hash_image
Works for any file type. Image files additionally support perceptual
hash matching (pHash, dHash) which survives compression and resizing.
Non-image files are matched by SHA-256 only.
"""
from .hashing import hash_file
from .verification import find_attestations_for_image from .verification import find_attestations_for_image
from .storage import LocalStorage from .storage import LocalStorage
storage = LocalStorage(ctx.obj.get("data_dir")) storage = LocalStorage(ctx.obj.get("data_dir"))
# Read image and compute hashes # Read file and compute hashes (SHA-256 always; perceptual for images only)
image_data = image.read_bytes() file_data = file.read_bytes()
hashes = hash_image(image_data) hashes = hash_file(file_data)
# Find matching attestations # Find matching attestations
records = list(storage.iterate_records()) records = list(storage.iterate_records())
matches = find_attestations_for_image( matches = find_attestations_for_image(
image_data, records, perceptual_threshold=0 if exact else 10 file_data, records, perceptual_threshold=0 if exact else 10
) )
if ctx.obj.get("json"): if ctx.obj.get("json"):
result = { result = {
"image": str(image), "file": str(file),
"sha256": hashes.sha256, "sha256": hashes.sha256,
"matches": len(matches), "matches": len(matches),
"attestations": [ "attestations": [
@ -505,11 +524,11 @@ def verify(ctx: click.Context, image: Path, exact: bool) -> None:
click.echo(json.dumps(result)) click.echo(json.dumps(result))
else: else:
if not matches: if not matches:
click.echo(f"No attestations found for {image.name}") click.echo(f"No attestations found for {file.name}")
click.echo(f" SHA-256: {hashes.sha256[:16]}...") click.echo(f" SHA-256: {hashes.sha256[:16]}...")
sys.exit(1) sys.exit(1)
click.echo(f"Found {len(matches)} attestation(s) for {image.name}") click.echo(f"Found {len(matches)} attestation(s) for {file.name}")
for m in matches: for m in matches:
match_type = "exact" if m.image_hashes.sha256 == hashes.sha256 else "perceptual" match_type = "exact" if m.image_hashes.sha256 == hashes.sha256 else "perceptual"
click.echo(f" [{match_type}] {m.attestor_fingerprint[:16]}... @ {m.timestamp.isoformat()}") click.echo(f" [{match_type}] {m.attestor_fingerprint[:16]}... @ {m.timestamp.isoformat()}")
@ -83,6 +83,33 @@ def hash_image_file(path: str, *, robust: bool = True) -> ImageHashes:
return hash_image(f.read(), robust=robust) return hash_image(f.read(), robust=robust)
def hash_file(file_data: bytes, *, robust: bool = True) -> ImageHashes:
"""
Compute hashes for any file type.
For image files (detectable by PIL), computes the full set of cryptographic
and perceptual hashes identical to hash_image().
For non-image files (documents, audio, video, CSV, etc.), PIL cannot decode
the bytes, so only SHA-256 is computed. phash and dhash are left as empty
strings. The attestation pipeline handles this correctly: verification falls
back to SHA-256-only matching for these files.
Args:
file_data: Raw file bytes of any type
robust: Passed through to hash_image() for image files
Returns:
ImageHashes with sha256 always set; phash/dhash set only for images
"""
try:
return hash_image(file_data, robust=robust)
except Exception:
# Not a valid image (PIL cannot decode it) -- SHA-256 only
sha256 = hashlib.sha256(file_data).hexdigest()
return ImageHashes(sha256=sha256, phash="", dhash="")
def _compute_crop_resistant_hash(img: Image.Image) -> str: def _compute_crop_resistant_hash(img: Image.Image) -> str:
""" """
Compute hash of center region - survives edge crops. Compute hash of center region - survives edge crops.
@ -5,22 +5,30 @@ Exports FieldWitness AttestationRecord objects as C2PA-signed media files, embed
provenance metadata in the industry-standard JUMBF/COSE format understood by provenance metadata in the industry-standard JUMBF/COSE format understood by
Adobe Content Credentials, Verify.contentauthenticity.org, and other verifiers. Adobe Content Credentials, Verify.contentauthenticity.org, and other verifiers.
Also imports C2PA manifests from third-party files into FieldWitness attestation
records (best-effort field mapping).
Public API: Public API:
has_c2pa() -- runtime availability check has_c2pa() -- runtime availability check
get_or_create_c2pa_cert -- X.509 cert lifecycle management get_or_create_c2pa_cert -- X.509 cert lifecycle management
export_c2pa -- AttestationRecord -> C2PA-signed image bytes export_c2pa -- AttestationRecord -> C2PA-signed image bytes
import_c2pa -- C2PA image bytes -> C2PAImportResult + AttestationRecord
C2PAImportResult -- dataclass returned by import_c2pa
All imports from c2pa-python are guarded; this package loads without errors All imports from c2pa-python are guarded; this package loads without errors
even when the [c2pa] extra is not installed. Callers must check has_c2pa() even when the [c2pa] extra is not installed. Callers must check has_c2pa()
before calling export_c2pa(). before calling export_c2pa() or import_c2pa().
""" """
from fieldwitness._availability import has_c2pa from fieldwitness._availability import has_c2pa
from fieldwitness.c2pa_bridge.cert import get_or_create_c2pa_cert from fieldwitness.c2pa_bridge.cert import get_or_create_c2pa_cert
from fieldwitness.c2pa_bridge.export import export_c2pa from fieldwitness.c2pa_bridge.export import export_c2pa
from fieldwitness.c2pa_bridge.importer import C2PAImportResult, import_c2pa
__all__ = [ __all__ = [
"has_c2pa", "has_c2pa",
"get_or_create_c2pa_cert", "get_or_create_c2pa_cert",
"export_c2pa", "export_c2pa",
"import_c2pa",
"C2PAImportResult",
] ]
@ -0,0 +1,457 @@
"""
CLI subcommands for C2PA operations.
Registered under the 'c2pa' subgroup in the main fieldwitness CLI:
fieldwitness c2pa export <file> [options]
fieldwitness c2pa verify <file> [--cert <pem>]
fieldwitness c2pa import <file> [--trust-cert <pem>]
fieldwitness c2pa show <file>
All commands gate on has_c2pa() and print a helpful install message when the
[c2pa] extra is absent. The export command requires an initialised FieldWitness
identity (run 'fieldwitness init' first).
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
import click
from fieldwitness._availability import has_c2pa
# ── Availability guard helper ─────────────────────────────────────────────────
_C2PA_INSTALL_HINT = (
"c2pa-python is not installed.\n"
"Install it with: pip install 'fieldwitness[c2pa]'"
)
def _require_c2pa() -> None:
"""Abort with a helpful message if c2pa-python is not available."""
if not has_c2pa():
click.echo(f"Error: {_C2PA_INSTALL_HINT}", err=True)
sys.exit(1)
# ── Group ─────────────────────────────────────────────────────────────────────
@click.group()
def c2pa_group():
"""C2PA content provenance operations (export, import, verify, show)."""
pass
# ── export ────────────────────────────────────────────────────────────────────
@c2pa_group.command("export")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
"--output",
"-o",
type=click.Path(dir_okay=False, path_type=Path),
default=None,
help=(
"Output file path. Defaults to <file>_c2pa.<ext> "
"in the same directory as the source."
),
)
@click.option(
"--record-id",
default=None,
help="Attestation record ID to embed. Uses the most recent record if omitted.",
)
@click.option(
"--privacy",
type=click.Choice(["org", "pseudonym", "anonymous"], case_sensitive=False),
default="org",
show_default=True,
help=(
"Privacy level for the claim_generator field. "
"'org' uses the default identity; "
"'pseudonym' uses a UUID-based cert subject; "
"'anonymous' omits identity from claim_generator."
),
)
@click.option(
"--include-gps",
is_flag=True,
default=False,
help=(
"Embed precise GPS coordinates in the C2PA manifest. "
"By default, GPS is downsampled to city-level (~11 km) for privacy."
),
)
@click.option(
"--timestamp-url",
default=None,
help=(
"RFC 3161 timestamp authority URL. "
"Omit for offline (Tier 1) use; timestamps are anchored via entropy witnesses."
),
)
def export_cmd(
file: Path,
output: Path | None,
record_id: str | None,
privacy: str,
include_gps: bool,
timestamp_url: str | None,
) -> None:
"""Export FILE with an embedded C2PA manifest.
Reads the most recent FieldWitness attestation for FILE (or the record
specified by --record-id), signs a C2PA manifest with the local identity
key, and writes the result to --output.
Requires: fieldwitness init, [c2pa] extra installed.
Examples:
\b
fieldwitness c2pa export photo.jpg
fieldwitness c2pa export photo.jpg --output photo_signed.jpg --include-gps
fieldwitness c2pa export photo.jpg --privacy pseudonym --timestamp-url https://tsa.example.com
"""
_require_c2pa()
from fieldwitness.attest.crypto import load_private_key
from fieldwitness.attest.storage import LocalStorage
from fieldwitness.c2pa_bridge import export_c2pa, get_or_create_c2pa_cert
from fieldwitness.paths import ATTESTATIONS_DIR, IDENTITY_PRIVATE_KEY
# Validate identity.
if not IDENTITY_PRIVATE_KEY.exists():
click.echo(
"Error: No identity configured. Run 'fieldwitness init' first.",
err=True,
)
sys.exit(1)
private_key = load_private_key(IDENTITY_PRIVATE_KEY)
cert_pem = get_or_create_c2pa_cert(private_key)
# Load image.
image_data = file.read_bytes()
image_format = file.suffix.lstrip(".").lower()
if image_format == "jpg":
image_format = "jpeg"
# Resolve attestation record.
storage = LocalStorage(base_path=ATTESTATIONS_DIR)
record = None
if record_id:
record = storage.get_record(record_id)
if record is None:
click.echo(f"Error: Record {record_id!r} not found.", err=True)
sys.exit(1)
else:
# Use the most recent record for this file's SHA-256.
import hashlib
sha256 = hashlib.sha256(image_data).hexdigest()
records = storage.find_records_by_hash(sha256)
if not records:
click.echo(
f"Error: No attestation found for {file.name}. "
"Attest the file first with: fieldwitness attest <file>",
err=True,
)
sys.exit(1)
record = records[-1]
# Optionally attach the chain store for vendor assertion enrichment.
chain_store = None
try:
from fieldwitness.federation.chain import ChainStore
from fieldwitness.paths import CHAIN_DIR
if CHAIN_DIR.exists():
chain_store = ChainStore(CHAIN_DIR)
except Exception:
pass
# Export.
try:
signed_bytes = export_c2pa(
image_data=image_data,
image_format=image_format,
record=record,
private_key=private_key,
cert_pem=cert_pem,
chain_store=chain_store,
privacy_level=privacy,
include_precise_gps=include_gps,
timestamp_url=timestamp_url,
)
except Exception as exc:
click.echo(f"Error: C2PA export failed: {exc}", err=True)
sys.exit(1)
# Determine output path.
if output is None:
ext = file.suffix
output = file.with_name(f"{file.stem}_c2pa{ext}")
output.write_bytes(signed_bytes)
click.echo(f"C2PA manifest embedded: {output}")
click.echo(f" Record ID : {record.record_id}")
click.echo(f" Privacy : {privacy}")
click.echo(f" GPS : {'precise' if include_gps else 'city-level'}")
if timestamp_url:
click.echo(f" Timestamp : {timestamp_url}")
else:
click.echo(" Timestamp : none (offline mode)")
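The city-level default described under `--include-gps` corresponds to roughly one decimal place of latitude (~11 km). A sketch of such downsampling (the helper name is hypothetical; per the FR-04 fix it uses `round`, not `math.floor`):

```python
def downsample_gps(lat: float, lon: float, places: int = 1) -> tuple[float, float]:
    # round() avoids the systematic southward/westward bias that
    # math.floor() would introduce on positive coordinates.
    return round(lat, places), round(lon, places)

print(downsample_gps(50.4501, 30.5234))  # (50.5, 30.5)
```

One decimal place of latitude is about 11.1 km everywhere; longitude resolution shrinks toward the poles, so the "~11 km" figure is an upper bound for the east-west axis.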
# ── verify ────────────────────────────────────────────────────────────────────
@c2pa_group.command("verify")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
"--cert",
"cert_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=None,
help="PEM file of a trusted signing certificate to check against.",
)
def verify_cmd(file: Path, cert_path: Path | None) -> None:
"""Verify the C2PA manifest in FILE.
Reads and validates the manifest, then prints the trust status and a
summary of embedded provenance data.
Examples:
\b
fieldwitness c2pa verify photo_c2pa.jpg
fieldwitness c2pa verify photo_c2pa.jpg --cert org_cert.pem
"""
_require_c2pa()
from fieldwitness.c2pa_bridge.importer import import_c2pa
image_data = file.read_bytes()
image_format = file.suffix.lstrip(".").lower()
trusted_certs: list[str] | None = None
if cert_path:
trusted_certs = [cert_path.read_text()]
result = import_c2pa(image_data, image_format, trusted_certs=trusted_certs)
if not result.success:
click.echo(f"Verification failed: {result.error}", err=True)
sys.exit(1)
# Trust status with a visual indicator.
status_icons = {
"trusted": "[OK]",
"self-signed": "[FW]",
"unknown": "[??]",
"invalid": "[!!]",
}
icon = status_icons.get(result.trust_status, "[??]")
click.echo(f"{icon} Trust status: {result.trust_status}")
click.echo(f" Manifests : {len(result.manifests)}")
record = result.attestation_record
if record:
click.echo(f" Timestamp : {record.timestamp.isoformat()}")
click.echo(f" Signer : {record.metadata.get('c2pa_signer', 'unknown')}")
cm = record.capture_metadata
if cm:
if cm.location:
click.echo(f" Location : {cm.location}")
if cm.captured_at:
click.echo(f" Captured : {cm.captured_at.isoformat()}")
if cm.device:
click.echo(f" Device : {cm.device}")
if cm.caption:
click.echo(f" Caption : {cm.caption}")
click.echo(f" SHA-256 : {record.image_hashes.sha256[:16]}...")
if result.fieldwitness_assertions:
click.echo(f" FW data : {', '.join(result.fieldwitness_assertions.keys())}")
if result.trust_status == "invalid":
click.echo(
"\nWarning: The C2PA manifest signature is invalid. "
"This file may have been tampered with.",
err=True,
)
sys.exit(2)
# ── import ────────────────────────────────────────────────────────────────────
@c2pa_group.command("import")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
"--trust-cert",
"trust_cert_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=None,
help="PEM file of a trusted signing certificate.",
)
@click.option(
"--store/--no-store",
default=True,
help=(
"Store the imported record in the local attestation log (default: store). "
"Use --no-store to inspect without persisting."
),
)
def import_cmd(file: Path, trust_cert_path: Path | None, store: bool) -> None:
"""Import a C2PA manifest from FILE as a FieldWitness attestation record.
Parses the embedded C2PA manifest, extracts provenance metadata, and
(by default) stores the result in the local attestation log. The record
uses a sentinel signature since it originates externally; use
'fieldwitness attest <file>' afterward to create a locally-signed record.
Examples:
\b
fieldwitness c2pa import received_photo.jpg
fieldwitness c2pa import received_photo.jpg --trust-cert newsroom_cert.pem
fieldwitness c2pa import received_photo.jpg --no-store
"""
_require_c2pa()
from fieldwitness.attest.storage import LocalStorage
from fieldwitness.c2pa_bridge.importer import import_c2pa
from fieldwitness.paths import ATTESTATIONS_DIR
image_data = file.read_bytes()
image_format = file.suffix.lstrip(".").lower()
trusted_certs: list[str] | None = None
if trust_cert_path:
trusted_certs = [trust_cert_path.read_text()]
result = import_c2pa(image_data, image_format, trusted_certs=trusted_certs)
if not result.success:
click.echo(f"Error: Import failed: {result.error}", err=True)
sys.exit(1)
record = result.attestation_record
if record is None:
click.echo("Error: No usable record could be extracted from the manifest.", err=True)
sys.exit(1)
click.echo(f"Imported from : {file.name}")
click.echo(f"Trust status : {result.trust_status}")
click.echo(f"Record ID : {record.record_id}")
click.echo(f"Timestamp : {record.timestamp.isoformat()}")
signer = record.metadata.get("c2pa_signer", "unknown")
click.echo(f"Signer : {signer}")
cm = record.capture_metadata
if cm:
if cm.location:
click.echo(f"Location : {cm.location}")
if cm.captured_at:
click.echo(f"Captured at : {cm.captured_at.isoformat()}")
if cm.device:
click.echo(f"Device : {cm.device}")
if cm.caption:
click.echo(f"Caption : {cm.caption}")
if result.fieldwitness_assertions:
click.echo(f"FW assertions : {', '.join(result.fieldwitness_assertions.keys())}")
if store:
storage = LocalStorage(base_path=ATTESTATIONS_DIR)
storage.append_record(record)
click.echo(f"Stored in log : yes ({ATTESTATIONS_DIR})")
else:
click.echo("Stored in log : no (--no-store)")
if result.trust_status == "invalid":
click.echo(
"\nWarning: The manifest signature is invalid. "
"Treat this record with caution.",
err=True,
)
# ── show ──────────────────────────────────────────────────────────────────────
@c2pa_group.command("show")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
"--pretty/--compact",
default=True,
help="Pretty-print JSON (default) or output compact JSON.",
)
def show_cmd(file: Path, pretty: bool) -> None:
"""Dump the raw C2PA manifest store from FILE as JSON.
Useful for inspecting manifests from third-party tools or debugging
export output. Prints to stdout; pipe to 'jq' for further filtering.
Examples:
\b
fieldwitness c2pa show photo_c2pa.jpg
fieldwitness c2pa show photo_c2pa.jpg --compact | jq '.manifests'
"""
_require_c2pa()
import io
try:
import c2pa # type: ignore[import]
except ImportError:
click.echo(f"Error: {_C2PA_INSTALL_HINT}", err=True)
sys.exit(1)
image_data = file.read_bytes()
image_format = file.suffix.lstrip(".").lower()
_mime_map = {
"jpeg": "image/jpeg",
"jpg": "image/jpeg",
"png": "image/png",
"webp": "image/webp",
}
mime = _mime_map.get(image_format)
if mime is None:
click.echo(
f"Error: Unsupported image format {image_format!r}. "
"Supported: jpeg, png, webp.",
err=True,
)
sys.exit(1)
try:
reader = c2pa.Reader(mime, io.BytesIO(image_data))
manifest_json = reader.json()
except Exception as exc:
click.echo(f"Error: Failed to read C2PA manifest: {exc}", err=True)
sys.exit(1)
if not manifest_json:
click.echo("No C2PA manifest found in this file.")
return
try:
parsed = json.loads(manifest_json)
indent = 2 if pretty else None
# With indent set, the item separator must have no trailing space, or every
# line of the pretty output ends in trailing whitespace.
separators = (",", ": ") if pretty else (",", ":")
click.echo(json.dumps(parsed, indent=indent, separators=separators, default=str))
except Exception:
# Fall back to raw string if JSON parsing fails.
click.echo(manifest_json)
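The `--pretty/--compact` switch above boils down to the `indent` and `separators` arguments of `json.dumps`. A minimal standalone sketch of the two output modes (the manifest dict is illustrative):

```python
import json

parsed = {"manifests": {"urn:uuid:1234": {"claim_generator": "FieldWitness/1.0"}}}

# Pretty: indent plus (",", ": ") separators -- no trailing space per line.
pretty_out = json.dumps(parsed, indent=2, separators=(",", ": "), default=str)

# Compact: single line with no padding, convenient for piping into jq.
compact_out = json.dumps(parsed, separators=(",", ":"), default=str)

print(compact_out)  # {"manifests":{"urn:uuid:1234":{"claim_generator":"FieldWitness/1.0"}}}
```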

View File

@@ -20,7 +20,6 @@ Architecture notes:
 from __future__ import annotations

 import io
-import math
 from typing import TYPE_CHECKING, Any

 from fieldwitness._availability import has_c2pa
@@ -30,8 +29,8 @@ if TYPE_CHECKING:
     # installed. Keeps the module importable in all environments.
     from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

-    from fieldwitness.federation.chain import ChainStore
     from fieldwitness.attest.models import AttestationRecord
+    from fieldwitness.federation.chain import ChainStore

 # ── GPS downsampling ──────────────────────────────────────────────────────────
@@ -48,8 +47,7 @@ def _downsample_gps(lat: float, lon: float) -> tuple[float, float]:
     Rounds both coordinates to _CITY_LEVEL_PRECISION decimal places, which
     gives ~11 km accuracy -- enough to say "Kyiv" but not "Maidan Nezalezhnosti".
     """
-    factor = 10**_CITY_LEVEL_PRECISION
-    return math.floor(lat * factor) / factor, math.floor(lon * factor) / factor
+    return round(lat, _CITY_LEVEL_PRECISION), round(lon, _CITY_LEVEL_PRECISION)

 # ── MIME type helpers ─────────────────────────────────────────────────────────
@@ -68,8 +66,7 @@ def _mime_type(image_format: str) -> str:
     mime = _MIME_MAP.get(fmt)
     if mime is None:
         raise ValueError(
-            f"Unsupported image format {image_format!r}. "
-            f"Supported: jpeg, png, webp."
+            f"Unsupported image format {image_format!r}. " f"Supported: jpeg, png, webp."
         )
     return mime
@@ -95,7 +92,7 @@ def _build_actions_assertion(version: str) -> dict[str, Any]:

 def _build_exif_assertion(
-    record: "AttestationRecord",
+    record: AttestationRecord,
     include_precise_gps: bool,
 ) -> dict[str, Any] | None:
     """Build a c2pa.exif assertion from CaptureMetadata.
@@ -158,7 +155,7 @@ def _build_exif_assertion(

 def _build_creative_work_assertion(
-    record: "AttestationRecord",
+    record: AttestationRecord,
 ) -> dict[str, Any] | None:
     """Build a c2pa.creative.work assertion from CaptureMetadata.
@@ -187,8 +184,8 @@ def _build_creative_work_assertion(

 def _find_chain_record_for_attestation(
-    record: "AttestationRecord",
-    chain_store: "ChainStore",
+    record: AttestationRecord,
+    chain_store: ChainStore,
 ) -> Any | None:
     """Search the chain store for a record whose content_hash matches the attestation.
@@ -223,10 +220,10 @@ def _find_chain_record_for_attestation(
 def export_c2pa(
     image_data: bytes,
     image_format: str,
-    record: "AttestationRecord",
-    private_key: "Ed25519PrivateKey",
+    record: AttestationRecord,
+    private_key: Ed25519PrivateKey,
     cert_pem: str,
-    chain_store: "ChainStore | None" = None,
+    chain_store: ChainStore | None = None,
     privacy_level: str = "org",
     include_precise_gps: bool = False,
     timestamp_url: str | None = None,
@@ -272,8 +269,7 @@ def export_c2pa(
     """
     if not has_c2pa():
         raise ImportError(
-            "c2pa-python is not installed. "
-            "Install it with: pip install 'fieldwitness[c2pa]'"
+            "c2pa-python is not installed. " "Install it with: pip install 'fieldwitness[c2pa]'"
        )

    # All c2pa imports are deferred to here so the module loads without them.
@@ -340,14 +336,10 @@ def export_c2pa(
         manifest_def["assertions"].append({"label": "c2pa.exif", "data": exif})

     if creative_work is not None:
-        manifest_def["assertions"].append(
-            {"label": "c2pa.creative.work", "data": creative_work}
-        )
+        manifest_def["assertions"].append({"label": "c2pa.creative.work", "data": creative_work})

     if chain_assertion is not None:
-        manifest_def["assertions"].append(
-            {"label": LABEL_CHAIN_RECORD, "data": chain_assertion}
-        )
+        manifest_def["assertions"].append({"label": LABEL_CHAIN_RECORD, "data": chain_assertion})

     # ── Create the signer ────────────────────────────────────────────────────
     # c2pa-python's create_signer() takes a signing callback, algorithm name,

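The FR-04 hunk above replaces floor-based truncation with `round()`. A quick self-contained check of why this matters: flooring negative (southern/western) coordinates always pushes them further south-west, by up to a full grid step, while rounding is symmetric. The precision value here is an assumption for illustration (1 decimal place is roughly 11 km of latitude):

```python
import math

CITY_LEVEL_PRECISION = 1  # assumed value for illustration


def downsample_floor(lat, lon):
    # Old behaviour: truncates toward negative infinity, biasing results
    # south-west whenever a coordinate is negative.
    factor = 10 ** CITY_LEVEL_PRECISION
    return math.floor(lat * factor) / factor, math.floor(lon * factor) / factor


def downsample_round(lat, lon):
    # New behaviour: symmetric rounding to the nearest grid point.
    return round(lat, CITY_LEVEL_PRECISION), round(lon, CITY_LEVEL_PRECISION)


# Rio de Janeiro (southern and western hemispheres):
print(downsample_floor(-22.9068, -43.1729))  # (-23.0, -43.2) -- pushed south-west
print(downsample_round(-22.9068, -43.1729))  # (-22.9, -43.2) -- nearest grid point
```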
View File

@@ -0,0 +1,634 @@
"""
Import path: C2PA manifest -> FieldWitness AttestationRecord.
This module reads a C2PA manifest embedded in image bytes and produces a
FieldWitness AttestationRecord from it. The conversion is best-effort: if
a C2PA assertion field is absent, the corresponding FieldWitness field is
omitted rather than failing the import. The caller receives a C2PAImportResult
that includes a trust_status summary and an optional partial record.
Mapping applied:
Claim 'created' -> AttestationRecord.timestamp
c2pa.exif Make/Model -> CaptureMetadata.device
c2pa.exif DateTimeOriginal -> CaptureMetadata.captured_at
c2pa.exif GPS{Latitude,Longitude} -> CaptureMetadata.location
c2pa.creative.work description -> CaptureMetadata.caption
Signer cert CN -> metadata['c2pa_signer']
org.fieldwitness.perceptual-hashes -> ImageHashes (if present)
org.fieldwitness.attestation-id -> metadata['c2pa_record_id']
Trust evaluation:
"trusted" -- signer cert is in the caller-provided trusted_certs list
"self-signed" -- cert is self-signed with 'FieldWitness' in claim_generator
"unknown" -- cert is present but not trusted or self-signed FieldWitness
"invalid" -- c2pa-python reports a validation error
All c2pa-python imports are deferred inside function bodies and guarded by
has_c2pa(). This module loads without errors even without the [c2pa] extra.
"""
from __future__ import annotations
import hashlib
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any
from fieldwitness._availability import has_c2pa
if TYPE_CHECKING:
from fieldwitness.attest.models import AttestationRecord
# ── Result dataclass ──────────────────────────────────────────────────────────
@dataclass
class C2PAImportResult:
"""Result of a C2PA import attempt.
All fields are populated on a best-effort basis. A successful parse with
partial data is reflected by success=True with some fields None or empty.
Only hard failures (e.g. corrupt JUMBF, unsupported format) set success=False.
Attributes:
success: True if the manifest was parsed without fatal errors.
manifests: Raw manifest dicts from c2pa.Reader (may be empty).
attestation_record: AttestationRecord constructed from the active manifest,
or None if the import failed or had insufficient data.
fieldwitness_assertions: Parsed org.fieldwitness.* vendor assertions, keyed
by assertion label. Empty if none are present.
trust_status: One of: "trusted", "self-signed", "unknown", "invalid".
error: Human-readable error string on failure, else None.
"""
success: bool
manifests: list[dict[str, Any]]
attestation_record: AttestationRecord | None
fieldwitness_assertions: dict[str, Any]
trust_status: str # "trusted" | "self-signed" | "unknown" | "invalid"
error: str | None = None
# ── Internal helpers ──────────────────────────────────────────────────────────
def _extract_signer_cn(manifest: dict[str, Any]) -> str | None:
"""Return the Common Name from the signer certificate chain, or None.
c2pa-python exposes the certificate chain as a list of PEM strings under
the 'signature_info' -> 'cert_chain' key (varies by library version). We
attempt both the 0.6.x and older dict shapes, then fall back to searching
any serialised 'issuer' or 'subject' fields for a CN value.
Returns None if the certificate data is absent or unparseable.
"""
try:
sig_info = manifest.get("signature_info") or {}
# c2pa-python 0.6.x puts cert info in signature_info.cert_chain (list of PEM)
cert_chain = sig_info.get("cert_chain") or []
if cert_chain:
pem = cert_chain[0] if isinstance(cert_chain, list) else cert_chain
return _cn_from_pem(pem)
# Older shape: issuer directly as string
issuer = sig_info.get("issuer") or sig_info.get("cert_subject") or ""
if "CN=" in issuer:
cn_part = issuer.split("CN=", 1)[1]
return cn_part.split(",", 1)[0].strip()
except Exception:
pass
return None
def _cn_from_pem(pem: str) -> str | None:
"""Parse the subject CN from a PEM-encoded certificate string.
Uses the cryptography library if available. Returns None on any error.
"""
try:
from cryptography import x509
from cryptography.x509.oid import NameOID
cert = x509.load_pem_x509_certificate(pem.encode() if isinstance(pem, str) else pem)
attrs = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
if attrs:
return attrs[0].value
except Exception:
pass
return None
def _cert_is_self_signed(pem: str) -> bool:
"""Return True if the PEM certificate is self-signed (issuer == subject)."""
try:
from cryptography import x509
cert = x509.load_pem_x509_certificate(pem.encode() if isinstance(pem, str) else pem)
return cert.issuer == cert.subject
except Exception:
return False
def _evaluate_trust(
manifest: dict[str, Any],
trusted_certs: list[str] | None,
) -> str:
"""Evaluate the trust level of a C2PA manifest's signer certificate.
Args:
manifest: Parsed manifest dict from c2pa.Reader.
trusted_certs: List of PEM strings the caller considers trusted.
Returns:
"trusted" -- signer cert fingerprint matches a trusted_cert.
"self-signed" -- self-signed cert with 'FieldWitness' in claim_generator.
"unknown" -- cert present but not in trusted set and not a known FW cert.
"invalid" -- validation failed (recorded in manifest or exception).
"""
# Check for a validation failure flag set by the caller before this function.
if manifest.get("_fw_invalid"):
return "invalid"
sig_info = manifest.get("signature_info") or {}
cert_chain = sig_info.get("cert_chain") or []
signer_pem = (cert_chain[0] if isinstance(cert_chain, list) and cert_chain else None)
if trusted_certs and signer_pem:
# Compare SHA-256 fingerprints of DER-encoded certs to avoid PEM whitespace issues.
try:
from cryptography import x509
signer_cert = x509.load_pem_x509_certificate(
signer_pem.encode() if isinstance(signer_pem, str) else signer_pem
)
signer_fp = hashlib.sha256(signer_cert.tbs_certificate_bytes).hexdigest()
for trusted_pem in trusted_certs:
try:
tc = x509.load_pem_x509_certificate(
trusted_pem.encode() if isinstance(trusted_pem, str) else trusted_pem
)
if hashlib.sha256(tc.tbs_certificate_bytes).hexdigest() == signer_fp:
return "trusted"
except Exception:
continue
except Exception:
pass
# Self-signed FieldWitness cert.
claim_generator = manifest.get("claim_generator", "")
if signer_pem and _cert_is_self_signed(signer_pem) and "FieldWitness" in claim_generator:
return "self-signed"
return "unknown"
def _parse_datetime_exif(dt_str: str) -> datetime | None:
"""Parse EXIF DateTimeOriginal format ("YYYY:MM:DD HH:MM:SS") to UTC datetime."""
try:
dt = datetime.strptime(dt_str, "%Y:%m:%d %H:%M:%S")
return dt.replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
pass
# Try ISO 8601 fallback (used by some C2PA implementations).
try:
# fromisoformat() on Python < 3.11 rejects a trailing 'Z'.
dt = datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
except (ValueError, TypeError):
return None
def _parse_gps_from_exif_assertion(exif_data: dict[str, Any]) -> Any | None:
"""Extract GeoLocation from a c2pa.exif assertion dict.
Handles both signed decimal degrees (C2PA preferred) and the
GPSLatitude/GPSLatitudeRef split form from older implementations.
Returns a GeoLocation or None.
"""
from fieldwitness.attest.models import GeoLocation
# Preferred: plain signed decimal degrees.
if "latitude" in exif_data and "longitude" in exif_data:
try:
return GeoLocation(
latitude=float(exif_data["latitude"]),
longitude=float(exif_data["longitude"]),
)
except (TypeError, ValueError):
pass
# Split form: GPSLatitude + GPSLatitudeRef.
gps_lat = exif_data.get("GPSLatitude")
gps_lat_ref = exif_data.get("GPSLatitudeRef", "N")
gps_lon = exif_data.get("GPSLongitude")
gps_lon_ref = exif_data.get("GPSLongitudeRef", "E")
if gps_lat is not None and gps_lon is not None:
try:
lat = float(gps_lat)
lon = float(gps_lon)
if str(gps_lat_ref).upper() == "S":
lat = -lat
if str(gps_lon_ref).upper() == "W":
lon = -lon
return GeoLocation(latitude=lat, longitude=lon)
except (TypeError, ValueError):
pass
return None
def _build_capture_metadata(
manifest: dict[str, Any],
fw_assertions: dict[str, Any],
) -> dict[str, Any]:
"""Build a metadata dict (CaptureMetadata.to_dict() format) from a manifest.
Extracts fields from c2pa.exif and c2pa.creative.work assertions.
org.fieldwitness.* assertions are checked for data that would otherwise
be unavailable from standard C2PA fields.
Returns an empty dict if no relevant metadata is found.
"""
from fieldwitness.attest.models import CaptureDevice
metadata: dict[str, Any] = {}
assertions_by_label: dict[str, Any] = {}
for assertion in manifest.get("assertions", []):
label = assertion.get("label", "")
assertions_by_label[label] = assertion.get("data", {})
# ── c2pa.exif ────────────────────────────────────────────────────────────
exif_data = assertions_by_label.get("c2pa.exif", {})
if exif_data:
# Capture timestamp
dt_str = exif_data.get("DateTimeOriginal")
if dt_str:
dt = _parse_datetime_exif(str(dt_str))
if dt:
metadata["captured_at"] = dt.isoformat()
# GPS location
location = _parse_gps_from_exif_assertion(exif_data)
if location:
metadata["location"] = location.to_dict()
# Device
make = exif_data.get("Make")
model = exif_data.get("Model")
software = exif_data.get("Software")
if make or model or software:
device = CaptureDevice(
make=str(make).strip() if make else None,
model=str(model).strip() if model else None,
software=str(software).strip() if software else None,
)
metadata["device"] = device.to_dict()
# Dimensions
width = exif_data.get("PixelXDimension")
height = exif_data.get("PixelYDimension")
if width:
try:
metadata["width"] = int(width)
except (TypeError, ValueError):
pass
if height:
try:
metadata["height"] = int(height)
except (TypeError, ValueError):
pass
# ── c2pa.creative.work ────────────────────────────────────────────────────
cw_data = assertions_by_label.get("c2pa.creative.work", {})
if cw_data:
description = cw_data.get("description")
if description:
metadata["caption"] = str(description)
filename = cw_data.get("name")
if filename:
metadata["filename"] = str(filename)
keywords = cw_data.get("keywords")
if isinstance(keywords, list):
metadata["tags"] = [str(k) for k in keywords]
# ── Signer identity ───────────────────────────────────────────────────────
signer_cn = _extract_signer_cn(manifest)
if signer_cn:
metadata["c2pa_signer"] = signer_cn
# ── org.fieldwitness.attestation-id ──────────────────────────────────────
attest_id = fw_assertions.get("org.fieldwitness.attestation-id", {})
if attest_id.get("record_id"):
metadata["c2pa_record_id"] = attest_id["record_id"]
return metadata
def _build_image_hashes(
manifest: dict[str, Any],
fw_assertions: dict[str, Any],
image_data: bytes,
) -> Any:
"""Construct ImageHashes from available manifest data.
Preference order:
1. org.fieldwitness.perceptual-hashes assertion (full hash set).
2. SHA-256 recomputed from the provided image bytes. (The c2pa.hash.data
hard-binding hash is not decoded, since its encoding varies by producer.)
"""
from fieldwitness.attest.models import ImageHashes
ph_assertion = fw_assertions.get("org.fieldwitness.perceptual-hashes", {})
# If FieldWitness exported this file, perceptual hashes are present.
if ph_assertion.get("sha256"):
return ImageHashes(
sha256=ph_assertion["sha256"],
phash=ph_assertion.get("phash", ""),
dhash=ph_assertion.get("dhash", ""),
ahash=ph_assertion.get("ahash"),
colorhash=ph_assertion.get("colorhash"),
crop_resistant=ph_assertion.get("crop_resistant"),
)
# The c2pa.hash.data hard-binding assertion is deliberately not decoded here:
# its 'hash' field may be base64 or hex depending on the producer. Since we
# hold the image bytes, recomputing SHA-256 locally is simpler and reliable.
sha256_hex = hashlib.sha256(image_data).hexdigest()
return ImageHashes(sha256=sha256_hex)
def _parse_fw_assertions(manifest: dict[str, Any]) -> dict[str, Any]:
"""Extract and parse org.fieldwitness.* assertions from a manifest dict.
Returns a dict keyed by assertion label. Values are the parsed assertion dicts.
Assertions with unrecognised schema_version are included raw, not rejected,
so callers can still inspect them.
"""
from fieldwitness.c2pa_bridge.vendor_assertions import (
LABEL_ATTESTATION_ID,
LABEL_CHAIN_RECORD,
LABEL_PERCEPTUAL_HASHES,
parse_attestation_id_assertion,
parse_chain_record_assertion,
parse_perceptual_hashes_assertion,
)
result: dict[str, Any] = {}
parsers = {
LABEL_PERCEPTUAL_HASHES: parse_perceptual_hashes_assertion,
LABEL_CHAIN_RECORD: parse_chain_record_assertion,
LABEL_ATTESTATION_ID: parse_attestation_id_assertion,
}
for assertion in manifest.get("assertions", []):
label = assertion.get("label", "")
data = assertion.get("data", {})
if not label.startswith("org.fieldwitness."):
continue
parser = parsers.get(label)
if parser is not None:
try:
result[label] = parser(data)
except ValueError:
# Unrecognised schema version — store raw.
result[label] = data
else:
result[label] = data
return result
# ── Primary import function ───────────────────────────────────────────────────
def import_c2pa(
image_data: bytes,
image_format: str,
trusted_certs: list[str] | None = None,
) -> C2PAImportResult:
"""Parse a C2PA manifest from image bytes and produce a FieldWitness record.
This is the primary import entry point. It reads the C2PA manifest store
embedded in *image_data*, extracts attestation-relevant fields, and
constructs an AttestationRecord. The result is best-effort: absent C2PA
fields are silently skipped rather than raising exceptions.
The returned AttestationRecord is an *import record*, not an attestation
of the image by the local identity. It uses a sentinel signature (64 zero
bytes) and preserves the original claim timestamp. Callers that want to
re-attest the imported content with their own key should pass the result
to create_attestation_from_hashes() separately.
Args:
image_data: Raw image bytes with an embedded C2PA manifest.
image_format: Image format string: "jpeg", "png", or "webp".
trusted_certs: Optional list of PEM-encoded X.509 certificates whose
signers should be evaluated as "trusted". When None, only
self-signed FieldWitness certs receive the "self-signed"
status; all others are "unknown".
Returns:
C2PAImportResult with populated fields. Check .success before using
.attestation_record.
Raises:
Nothing: all exceptions are caught and reported through .error on a
C2PAImportResult(success=False, ...) result.
"""
if not has_c2pa():
return C2PAImportResult(
success=False,
manifests=[],
attestation_record=None,
fieldwitness_assertions={},
trust_status="unknown",
error=(
"c2pa-python is not installed. "
"Install it with: pip install 'fieldwitness[c2pa]'"
),
)
# All c2pa imports are deferred here.
try:
import c2pa # type: ignore[import]
except ImportError as exc:
return C2PAImportResult(
success=False,
manifests=[],
attestation_record=None,
fieldwitness_assertions={},
trust_status="unknown",
error=f"Failed to import c2pa-python: {exc}",
)
from fieldwitness.attest.models import AttestationRecord
# ── Determine MIME type ───────────────────────────────────────────────────
_mime_map = {
"jpeg": "image/jpeg",
"jpg": "image/jpeg",
"png": "image/png",
"webp": "image/webp",
}
mime = _mime_map.get(image_format.lower().strip("."))
if mime is None:
return C2PAImportResult(
success=False,
manifests=[],
attestation_record=None,
fieldwitness_assertions={},
trust_status="unknown",
error=(
f"Unsupported image format {image_format!r}. "
f"Supported: jpeg, png, webp."
),
)
# ── Read the manifest store ───────────────────────────────────────────────
import io
import json
try:
reader = c2pa.Reader(mime, io.BytesIO(image_data))
except Exception as exc:
return C2PAImportResult(
success=False,
manifests=[],
attestation_record=None,
fieldwitness_assertions={},
trust_status="invalid",
error=f"Failed to parse C2PA manifest: {exc}",
)
# c2pa.Reader exposes manifests as a JSON string via .json().
try:
manifest_store_json = reader.json()
manifest_store = json.loads(manifest_store_json) if manifest_store_json else {}
except Exception as exc:
return C2PAImportResult(
success=False,
manifests=[],
attestation_record=None,
fieldwitness_assertions={},
trust_status="invalid",
error=f"Failed to deserialise C2PA manifest JSON: {exc}",
)
# The manifest store may contain multiple manifests. The 'active_manifest'
# key (or equivalent) identifies the one relevant to this file.
manifests_dict: dict[str, Any] = manifest_store.get("manifests", {})
active_label = manifest_store.get("active_manifest", "")
# Build a flat list of manifest dicts for the result.
manifests_list: list[dict[str, Any]] = list(manifests_dict.values())
# Resolve the active manifest.
active_manifest: dict[str, Any] = {}
if active_label and active_label in manifests_dict:
active_manifest = manifests_dict[active_label]
elif manifests_list:
active_manifest = manifests_list[-1]
if not active_manifest:
return C2PAImportResult(
success=False,
manifests=manifests_list,
attestation_record=None,
fieldwitness_assertions={},
trust_status="unknown",
error="No manifests found in C2PA manifest store.",
)
# ── Check for validation errors reported by c2pa-python ──────────────────
validation_errors = manifest_store.get("validation_status") or []
has_error = any(
s.get("code", "").startswith("err.")
for s in (validation_errors if isinstance(validation_errors, list) else [])
)
if has_error:
active_manifest["_fw_invalid"] = True
# ── Parse FieldWitness vendor assertions ──────────────────────────────────
try:
fw_assertions = _parse_fw_assertions(active_manifest)
except Exception:
fw_assertions = {}
# ── Evaluate trust ────────────────────────────────────────────────────────
try:
trust_status = _evaluate_trust(active_manifest, trusted_certs)
except Exception:
trust_status = "unknown"
# ── Extract timestamp from claim ──────────────────────────────────────────
# C2PA claim timestamps live under manifest -> claim -> created (ISO 8601).
claim = active_manifest.get("claim", {}) or {}
ts_str = claim.get("created") or active_manifest.get("created")
attestation_timestamp: datetime
if ts_str:
try:
# fromisoformat() on Python < 3.11 rejects a trailing 'Z'.
parsed_ts = datetime.fromisoformat(str(ts_str).replace("Z", "+00:00"))
if parsed_ts.tzinfo is None:
parsed_ts = parsed_ts.replace(tzinfo=timezone.utc)
attestation_timestamp = parsed_ts.astimezone(timezone.utc)
except (ValueError, TypeError):
attestation_timestamp = datetime.now(timezone.utc)
else:
attestation_timestamp = datetime.now(timezone.utc)
# ── Build metadata dict ───────────────────────────────────────────────────
try:
metadata = _build_capture_metadata(active_manifest, fw_assertions)
except Exception:
metadata = {}
# ── Build image hashes ────────────────────────────────────────────────────
try:
image_hashes = _build_image_hashes(active_manifest, fw_assertions, image_data)
except Exception:
from fieldwitness.attest.models import ImageHashes
image_hashes = ImageHashes(sha256=hashlib.sha256(image_data).hexdigest())
# ── Construct the AttestationRecord ──────────────────────────────────────
# We use a sentinel signer fingerprint derived from the claim_generator
# string (or the signer CN) so that records from the same C2PA source are
# grouped together without requiring a local identity.
claim_generator = active_manifest.get("claim_generator", "c2pa-import")
signer_cn = _extract_signer_cn(active_manifest)
attestor_label = signer_cn or claim_generator or "c2pa-import"
# Fingerprint: SHA-256 of the label, first 32 hex chars.
attestor_fingerprint = hashlib.sha256(attestor_label.encode()).hexdigest()[:32]
# Sentinel signature: 64 zero bytes. Import records are not locally signed.
sentinel_sig = b"\x00" * 64
record = AttestationRecord(
image_hashes=image_hashes,
signature=sentinel_sig,
attestor_fingerprint=attestor_fingerprint,
timestamp=attestation_timestamp,
metadata=metadata,
content_type="image",
)
return C2PAImportResult(
success=True,
manifests=manifests_list,
attestation_record=record,
fieldwitness_assertions=fw_assertions,
trust_status=trust_status,
error=None,
)
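The identity fields at the tail of `import_c2pa` can be sketched in isolation. This mirrors the fingerprint and sentinel-signature derivation above; the label value is a hypothetical example:

```python
import hashlib

# A signer CN or claim_generator string extracted from the manifest
# (hypothetical example value).
attestor_label = "Newsroom Desk 3"

# Fingerprint: first 32 hex chars of SHA-256 over the label, so records
# from the same external signer group together without a local identity.
attestor_fingerprint = hashlib.sha256(attestor_label.encode()).hexdigest()[:32]

# Sentinel signature: 64 zero bytes marks the record as externally sourced,
# never verifiable as a local Ed25519 signature.
sentinel_sig = b"\x00" * 64

print(attestor_fingerprint)
```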

View File

@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
@click.version_option(package_name="fieldwitness") @click.version_option(package_name="fieldwitness")
@click.pass_context @click.pass_context
def main(ctx, data_dir, json_output): def main(ctx, data_dir, json_output):
"""FieldWitness — FieldWitness""" """FieldWitness — offline-first evidence integrity for journalists and NGOs."""
ctx.ensure_object(dict) ctx.ensure_object(dict)
ctx.obj["json"] = json_output ctx.obj["json"] = json_output
@ -160,7 +160,32 @@ def status(as_json):
@click.option("--no-https", is_flag=True, help="Disable HTTPS") @click.option("--no-https", is_flag=True, help="Disable HTTPS")
@click.option("--debug", is_flag=True, help="Debug mode (Flask dev server)") @click.option("--debug", is_flag=True, help="Debug mode (Flask dev server)")
@click.option("--workers", default=4, type=int, help="Number of worker threads") @click.option("--workers", default=4, type=int, help="Number of worker threads")
def serve(host, port, no_https, debug, workers): @click.option(
"--tor",
"enable_tor",
is_flag=True,
default=False,
help="Expose drop box as a Tor hidden service (.onion address). Requires stem and a running Tor daemon.",
)
@click.option(
"--tor-control-port",
default=9051,
type=int,
show_default=True,
help="Tor daemon control port (ControlPort in torrc).",
)
@click.option(
"--tor-password",
default=None,
help="Tor control port password (HashedControlPassword in torrc). Omit for cookie auth.",
)
@click.option(
"--tor-transient",
is_flag=True,
default=False,
help="Use an ephemeral (non-persistent) hidden service. Address changes on each restart.",
)
def serve(host, port, no_https, debug, workers, enable_tor, tor_control_port, tor_password, tor_transient):
"""Start the FieldWitness web UI.""" """Start the FieldWitness web UI."""
from fieldwitness.config import FieldWitnessConfig from fieldwitness.config import FieldWitnessConfig
@ -186,6 +211,15 @@ def serve(host, port, no_https, debug, workers):
_start_deadman_thread(interval_seconds=60) _start_deadman_thread(interval_seconds=60)
# ── Tor hidden service ────────────────────────────────────────────────
if enable_tor:
_start_tor_hidden_service(
target_port=port,
tor_control_port=tor_control_port,
tor_password=tor_password,
persistent=not tor_transient,
)
proto = "https" if ssl_context else "http" proto = "https" if ssl_context else "http"
click.echo(f"Starting FieldWitness on {proto}://{host}:{port}") click.echo(f"Starting FieldWitness on {proto}://{host}:{port}")
@ -222,6 +256,66 @@ def serve(host, port, no_https, debug, workers):
app.run(host=host, port=port, debug=False, ssl_context=ssl_context) app.run(host=host, port=port, debug=False, ssl_context=ssl_context)
def _start_tor_hidden_service(
target_port: int,
tor_control_port: int,
tor_password: str | None,
persistent: bool,
) -> None:
"""Start a Tor hidden service and print the .onion address.
Called from the serve command when --tor is passed. All errors are
printed as user-friendly messages rather than tracebacks because Tor
configuration problems are operator issues, not bugs.
"""
from fieldwitness._availability import has_tor
if not has_tor():
click.echo(
"ERROR: Tor support requires the stem library.\n"
"Install it with:\n"
" pip install 'fieldwitness[tor]'\n"
"Then ensure Tor is installed and running:\n"
" apt install tor # Debian/Ubuntu\n"
" brew install tor # macOS\n"
"And add to /etc/tor/torrc:\n"
" ControlPort 9051\n"
" CookieAuthentication 1",
err=True,
)
raise SystemExit(1)
from fieldwitness.fieldkit.tor import OnionServiceInfo, TorControlError, start_onion_service
persistence_label = "persistent" if persistent else "transient (ephemeral)"
click.echo(f"Starting {persistence_label} Tor hidden service on control port {tor_control_port}...")
click.echo("Waiting for Tor to publish the hidden service descriptor (may take ~30s)...")
try:
info: OnionServiceInfo = start_onion_service(
target_port=target_port,
tor_control_port=tor_control_port,
tor_control_password=tor_password,
persistent=persistent,
)
except TorControlError as exc:
click.echo(f"ERROR: {exc}", err=True)
raise SystemExit(1) from exc
# Print the .onion address prominently so the operator can share it
sep = "=" * 60
click.echo(sep)
click.echo("TOR HIDDEN SERVICE ACTIVE")
click.echo(sep)
click.echo(f" .onion address : {info.onion_address}")
click.echo(f" Drop box URL : http://{info.onion_address}/dropbox/upload/<token>")
click.echo(f" Persistent : {'yes (key saved to ~/.fwmetadata/fieldkit/tor/)' if info.is_persistent else 'no (new address on restart)'}")
click.echo(sep)
click.echo("Sources must use Tor Browser to access the .onion URL.")
click.echo("Share the drop box upload URL over a secure channel (Signal, in person).")
click.echo(sep)
def _deadman_enforcement_loop(interval_seconds: int = 60) -> None:
"""
Background enforcement loop for the dead man's switch.
@ -368,6 +462,14 @@ except ImportError:
click.echo("Error: attest package not found. Install with: pip install attest")
# ── C2PA sub-commands ──────────────────────────────────────────────────────────
from fieldwitness.c2pa_bridge.cli import c2pa_group # noqa: E402
main.add_command(c2pa_group, "c2pa")
def _attest_file(
file_path: Path,
private_key,

View File

@ -120,6 +120,8 @@ images/ — Original image files
manifest.json Attestation records and chain data
public_key.pem Signer's Ed25519 public key
verify.py Standalone verification script
summary.html Human-readable one-page summary (open in any browser)
summary.pdf PDF version of the summary (if available)
README.txt This file
VERIFICATION
@ -209,10 +211,17 @@ if __name__ == "__main__":
main()
'''
from fieldwitness.evidence_summary import build_summaries
from fieldwitness import __version__
summaries = build_summaries(manifest, version=__version__)
with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zf:
zf.writestr("manifest.json", json.dumps(manifest, indent=2))
zf.writestr("README.txt", readme)
zf.writestr("verify.py", verify_script)
for summary_name, summary_bytes in summaries.items():
zf.writestr(summary_name, summary_bytes)
if public_key_path and public_key_path.exists():
zf.write(public_key_path, "public_key.pem")
for img_path in image_paths:

View File

@ -0,0 +1,346 @@
"""
Evidence package summary generator.
Produces a human-readable one-page summary of an evidence package for legal use.
Always generates summary.html (no optional dependencies required).
Generates summary.pdf when xhtml2pdf is installed (pip install fieldwitness[evidence-pdf]).
"""
from __future__ import annotations
import html
from datetime import UTC, datetime
from typing import Any
# ---------------------------------------------------------------------------
# HTML generation
# ---------------------------------------------------------------------------
_CSS = """
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: Georgia, "Times New Roman", serif;
font-size: 11pt;
color: #111;
background: #fff;
padding: 28pt 36pt 24pt 36pt;
max-width: 720pt;
margin: 0 auto;
}
h1 {
font-size: 16pt;
font-weight: bold;
letter-spacing: 0.04em;
border-bottom: 2px solid #222;
padding-bottom: 5pt;
margin-bottom: 14pt;
}
h2 {
font-size: 11pt;
font-weight: bold;
text-transform: uppercase;
letter-spacing: 0.08em;
color: #444;
border-bottom: 1px solid #ccc;
padding-bottom: 3pt;
margin: 14pt 0 7pt 0;
}
table {
width: 100%;
border-collapse: collapse;
margin-bottom: 4pt;
}
td {
padding: 3pt 6pt;
vertical-align: top;
line-height: 1.5;
}
td.label {
width: 38%;
color: #555;
font-size: 9.5pt;
padding-right: 10pt;
}
td.value {
font-size: 10pt;
word-break: break-all;
}
.mono {
font-family: "Courier New", Courier, monospace;
font-size: 9pt;
background: #f4f4f4;
padding: 1pt 3pt;
border-radius: 2pt;
}
.verification-note {
background: #f9f9f9;
border-left: 3pt solid #888;
padding: 8pt 10pt;
margin-top: 12pt;
font-size: 9.5pt;
line-height: 1.6;
}
footer {
margin-top: 18pt;
border-top: 1px solid #ccc;
padding-top: 5pt;
font-size: 8.5pt;
color: #777;
display: flex;
justify-content: space-between;
}
.badge {
display: inline-block;
background: #222;
color: #fff;
font-size: 8pt;
padding: 1pt 5pt;
border-radius: 2pt;
letter-spacing: 0.05em;
vertical-align: middle;
}
"""
def _row(label: str, value: str, mono: bool = False) -> str:
val_html = f'<span class="mono">{html.escape(value)}</span>' if mono else html.escape(value)
return (
f"<tr>"
f'<td class="label">{html.escape(label)}</td>'
f'<td class="value">{val_html}</td>'
f"</tr>"
)
def _fmt_ts(iso_str: str | None) -> str:
"""Return 'YYYY-MM-DD HH:MM:SS UTC (Day, DD Month YYYY)' or 'N/A'."""
if not iso_str:
return "N/A"
try:
dt = datetime.fromisoformat(iso_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=UTC)
machine = dt.strftime("%Y-%m-%d %H:%M:%S UTC")
human = dt.strftime("%A, %d %B %Y")
return f"{machine} ({human})"
except ValueError:
return iso_str
def _abbrev_hash(h: str, keep: int = 16) -> str:
if len(h) <= keep * 2 + 3:
return h
return f"{h[:keep]}...{h[-keep:]}"
def generate_html_summary(manifest: dict[str, Any], version: str = "0.3.0") -> str:
"""Render a one-page HTML evidence summary from a manifest dict.
Args:
manifest: The manifest dict produced by export_evidence_package.
version: FieldWitness version string for the footer.
Returns:
Complete HTML document as a string.
"""
exported_at = manifest.get("exported_at", "")
investigation = manifest.get("investigation") or "N/A"
records: list[dict[str, Any]] = manifest.get("attestation_records", [])
chain_records: list[dict[str, Any]] = manifest.get("chain_records", [])
generated_at = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
# Pick the first record as representative (package typically focuses on one file)
rec = records[0] if records else {}
hashes: dict[str, Any] = rec.get("image_hashes", {})
sha256: str = hashes.get("sha256") or rec.get("sha256", "")
phash: str = hashes.get("phash", "")
dhash: str = hashes.get("dhash", "")
attestor_fp: str = rec.get("attestor_fingerprint", "N/A")
attest_ts: str = rec.get("timestamp", "")
filename: str = rec.get("filename", "N/A")
file_size: str = rec.get("file_size", "")
chain_len = len(chain_records)
chain_index: str = "N/A"
if chain_records:
chain_index = str(chain_records[-1].get("chain_index", "N/A"))
# RFC 3161 anchors that cover this evidence (stored in manifest when present)
anchors: list[dict[str, Any]] = manifest.get("anchors", [])
# --- Section: File Information ---
file_rows = [
_row("Filename", filename),
]
if file_size:
file_rows.append(_row("File size", file_size))
file_rows += [
_row("SHA-256 (full)", sha256, mono=True),
_row("SHA-256 (abbreviated)", _abbrev_hash(sha256) if sha256 else "N/A", mono=True),
]
# --- Section: Attestation Details ---
attest_rows = [
_row("Attestor fingerprint", attestor_fp, mono=True),
_row("Attestation timestamp", _fmt_ts(attest_ts)),
_row("Investigation", investigation),
_row("Package exported", _fmt_ts(exported_at)),
]
# --- Section: Chain Position ---
chain_rows = [
_row("Record index in chain", chain_index),
_row("Total chain records", str(chain_len) if chain_len else "N/A"),
]
if chain_records:
head = chain_records[-1]
chain_rows.append(_row("Chain head hash", head.get("record_hash", ""), mono=True))
# --- Section: External Timestamps ---
anchor_rows = []
if anchors:
for i, anc in enumerate(anchors, 1):
anchor_info = anc.get("anchor", anc)
ts = anchor_info.get("anchored_at") or anchor_info.get("timestamp", "")
digest = anchor_info.get("digest", "")
label = f"Anchor {i}"
anchor_rows.append(_row(label, _fmt_ts(ts)))
if digest:
anchor_rows.append(_row(" Digest", digest, mono=True))
else:
anchor_rows.append(_row("RFC 3161 anchors", "None recorded in this package"))
# --- Section: Perceptual Hashes ---
perceptual_rows = []
if phash or dhash:
if phash:
perceptual_rows.append(_row("pHash (DCT perceptual)", phash, mono=True))
if dhash:
perceptual_rows.append(_row("dHash (difference)", dhash, mono=True))
perceptual_rows.append(
_row(
"Note",
"Perceptual hashes survive format conversion and mild compression. "
"They allow matching even if the file was re-saved.",
)
)
else:
perceptual_rows.append(_row("Perceptual hashes", "Not applicable (non-image file)"))
# --- Verification instructions ---
verification_text = (
"This document is a human-readable summary of a cryptographically attested evidence "
"package. The package includes the original file(s), a machine-readable manifest "
"(manifest.json), a signer's Ed25519 public key, and a standalone verification "
"script (verify.py). To verify independently: install Python 3.11 or later and the "
"cryptography package (pip install cryptography), then run: python verify.py "
"inside the unzipped package. The script confirms that the file hashes match "
"the attestation records and that the append-only chain is unbroken. "
"No FieldWitness installation is required for verification."
)
def section(title: str, rows: list[str]) -> str:
return f"<h2>{html.escape(title)}</h2><table>{''.join(rows)}</table>"
multi_file_note = ""
if len(records) > 1:
multi_file_note = (
f'<p style="font-size:9pt;color:#666;margin-bottom:8pt;">'
f"This package contains {len(records)} attested file(s). "
f"The details below reflect the first record. "
f"See manifest.json for the complete list."
f"</p>"
)
body = f"""
{multi_file_note}
{section("File Information", file_rows)}
{section("Attestation Details", attest_rows)}
{section("Chain Position", chain_rows)}
{section("External Timestamps (RFC 3161)", anchor_rows)}
"""
body += section("Perceptual Hashes", perceptual_rows)
body += f"""
<div class="verification-note">
<strong>What this document proves and how to verify it</strong><br><br>
{html.escape(verification_text)}
</div>
"""
return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>FieldWitness Evidence Summary</title>
<style>{_CSS}</style>
</head>
<body>
<h1>FieldWitness Evidence Summary <span class="badge">v{html.escape(version)}</span></h1>
{body}
<footer>
<span>Generated by FieldWitness v{html.escape(version)} &mdash; https://fieldwitness.io</span>
<span>{html.escape(generated_at)}</span>
</footer>
</body>
</html>"""
# ---------------------------------------------------------------------------
# PDF generation (optional — requires xhtml2pdf)
# ---------------------------------------------------------------------------
def generate_pdf_summary(
manifest: dict[str, Any],
version: str = "0.3.0",
) -> bytes | None:
"""Render the summary as a PDF using xhtml2pdf (pisa).
Returns:
PDF bytes, or None if xhtml2pdf is not installed.
"""
try:
from xhtml2pdf import pisa # type: ignore[import-untyped]
except ImportError:
return None
import io
html_src = generate_html_summary(manifest, version=version)
buf = io.BytesIO()
result = pisa.CreatePDF(html_src, dest=buf)
if result.err:
return None
return buf.getvalue()
# ---------------------------------------------------------------------------
# Combined entry point used by evidence.py
# ---------------------------------------------------------------------------
def build_summaries(
manifest: dict[str, Any],
version: str = "0.3.0",
) -> dict[str, bytes]:
"""Build all available summary formats.
Always returns ``summary.html``.
Returns ``summary.pdf`` only when xhtml2pdf is available.
Args:
manifest: The evidence manifest dict.
version: FieldWitness version string.
Returns:
Mapping of filename -> bytes ready to write into the ZIP.
"""
out: dict[str, bytes] = {}
out["summary.html"] = generate_html_summary(manifest, version=version).encode("utf-8")
pdf = generate_pdf_summary(manifest, version=version)
if pdf is not None:
out["summary.pdf"] = pdf
return out
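`build_summaries` returns a plain filename-to-bytes mapping precisely so the caller in evidence.py can stream it into the ZIP without caring whether the PDF was produced. A self-contained sketch of that consumption pattern (stand-in bytes, not a real summary):

```python
import io
import zipfile

# Stand-in for build_summaries() output; summary.pdf may be absent.
summaries = {"summary.html": b"<html>...</html>"}

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
    for name, data in summaries.items():
        zf.writestr(name, data)

with zipfile.ZipFile(io.BytesIO(buf.getvalue())) as zf:
    print(zf.namelist())  # ['summary.html']
```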

View File

@ -94,7 +94,14 @@ def execute_purge(scope: PurgeScope = PurgeScope.ALL, reason: str = "manual") ->
steps: list[tuple[str, Callable]] = [
("destroy_identity_keys", lambda: _secure_delete_dir(paths.IDENTITY_DIR)),
("destroy_channel_key", lambda: _secure_delete_file(paths.CHANNEL_KEY_FILE)),
# Trusted collaborator keys are security-sensitive: they determine who
# the device accepts attestations from. Destroy them with the key material.
("destroy_trusted_keys", lambda: _secure_delete_dir(paths.TRUSTED_KEYS_DIR)),
("destroy_flask_secret", lambda: _secure_delete_file(paths.INSTANCE_DIR / ".secret_key")),
# Tor hidden service key — destroying this severs the link between the
# operator and the .onion address. Treated as key material: purged in
# KEYS_ONLY scope so post-purge Tor traffic cannot be attributed.
("destroy_tor_hidden_service_key", lambda: _secure_delete_dir(paths.TOR_HIDDEN_SERVICE_DIR)),
]
if scope == PurgeScope.ALL:
@ -107,6 +114,9 @@ def execute_purge(scope: PurgeScope = PurgeScope.ALL, reason: str = "manual") ->
("destroy_attestation_log", lambda: _secure_delete_dir(paths.ATTESTATIONS_DIR)),
("destroy_chain_data", lambda: _secure_delete_dir(paths.CHAIN_DIR)),
("destroy_temp_files", lambda: _secure_delete_dir(paths.TEMP_DIR)),
# Carrier history reveals which images were used as stego carriers.
("destroy_carrier_history", lambda: _secure_delete_file(paths.CARRIER_HISTORY)),
("destroy_backup_record", lambda: _secure_delete_file(paths.LAST_BACKUP)),
("destroy_config", lambda: _secure_delete_file(paths.CONFIG_FILE)),
("clear_journald", _clear_system_logs),
("deep_forensic_scrub", _deep_forensic_scrub),

View File

@ -0,0 +1,278 @@
"""
Tor hidden service management for the FieldWitness drop box.
Wraps the stem library to start a Tor onion service pointing at the local
FieldWitness server. stem is an optional dependency -- import this module
only after calling has_tor() from fieldwitness._availability.
Usage::
from fieldwitness.fieldkit.tor import start_onion_service, OnionServiceInfo
info = start_onion_service(target_port=5000, persistent=True)
print(info.onion_address) # e.g. "abc123def456...xyz.onion"
The hidden service key is stored at::
~/.fwmetadata/fieldkit/tor/hidden_service/
This directory is covered by the killswitch: PurgeScope.KEYS_ONLY destroys it
along with other key material so that the .onion address cannot be linked to
the operator after a purge.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from pathlib import Path
logger = logging.getLogger(__name__)
# stem is optional -- guard every import so the rest of FieldWitness works
# even when stem is not installed.
try:
import stem # noqa: F401
_HAS_STEM = True
except ImportError:
_HAS_STEM = False
class TorNotAvailableError(Exception):
"""Raised when stem is not installed or Tor daemon is unreachable."""
class TorControlError(Exception):
"""Raised when the Tor control port connection fails."""
@dataclass(frozen=True)
class OnionServiceInfo:
"""Information about a running Tor hidden service."""
onion_address: str # e.g. "abc123...xyz.onion"
target_port: int
is_persistent: bool
@property
def onion_url(self) -> str:
"""Full http:// URL for the onion service."""
return f"http://{self.onion_address}"
def start_onion_service(
target_port: int = 5000,
tor_control_port: int = 9051,
tor_control_password: str | None = None,
persistent: bool = True,
data_dir: Path | None = None,
) -> OnionServiceInfo:
"""Start a Tor hidden service pointing at the local target_port.
Connects to a running Tor daemon on the control port, creates a hidden
service mapping port 80 to ``target_port`` on 127.0.0.1, and returns the
.onion address.
Args:
target_port:
Local port that FieldWitness is listening on (e.g. 5000).
tor_control_port:
Tor daemon control port (default 9051, from torrc ControlPort).
tor_control_password:
Hashed password for the control port, if HashedControlPassword
is set in torrc. Pass None for cookie auth or no auth.
persistent:
If True, save the hidden service private key so the same .onion
address is reused across restarts. If False, an ephemeral
transient hidden service is created (new address each run).
data_dir:
Override the directory used for persistent key storage.
Defaults to ``~/.fwmetadata/fieldkit/tor/hidden_service/``.
Returns:
OnionServiceInfo with the .onion address and port.
Raises:
TorNotAvailableError: If stem is not installed.
TorControlError: If the Tor daemon cannot be reached or authentication fails.
"""
if not _HAS_STEM:
raise TorNotAvailableError(
"stem is not installed. Install it with: pip install 'fieldwitness[tor]'"
)
from stem.control import Controller
# Resolve key storage directory
if data_dir is None:
import fieldwitness.paths as paths
data_dir = paths.TOR_HIDDEN_SERVICE_DIR
# ── Connect to Tor control port ────────────────────────────────────────
logger.info(
"Connecting to Tor control port at 127.0.0.1:%d", tor_control_port
)
try:
controller = Controller.from_port(port=tor_control_port)
except Exception as exc:
raise TorControlError(
f"Cannot connect to Tor control port {tor_control_port}: {exc}\n"
"Ensure Tor is running and 'ControlPort 9051' is set in /etc/tor/torrc."
) from exc
try:
# ── Authenticate ───────────────────────────────────────────────────
try:
if tor_control_password is not None:
controller.authenticate(password=tor_control_password)
else:
controller.authenticate()
except Exception as exc:
raise TorControlError(
f"Tor control port authentication failed: {exc}\n"
"If HashedControlPassword is set in torrc, pass --tor-password.\n"
"Alternatively, use CookieAuthentication 1 in torrc."
) from exc
# ── Determine whether to use a persistent key ──────────────────────
if persistent:
onion_address, controller = _start_persistent_service(
controller=controller,
target_port=target_port,
key_dir=data_dir,
)
else:
onion_address, controller = _start_transient_service(
controller=controller,
target_port=target_port,
)
except (TorNotAvailableError, TorControlError):
controller.close()
raise
except Exception as exc:
controller.close()
raise TorControlError(f"Failed to create hidden service: {exc}") from exc
# Keep the controller alive -- the hidden service is destroyed when it closes.
# Attach to the module-level list so the GC does not collect it.
_store_controller(controller)
logger.info("Tor hidden service active at %s", onion_address)
return OnionServiceInfo(
onion_address=onion_address,
target_port=target_port,
is_persistent=persistent,
)
# ── Internal helpers ───────────────────────────────────────────────────────
def _start_persistent_service(
controller,
target_port: int,
key_dir: Path,
) -> tuple[str, object]:
"""Create or resume a persistent hidden service using a saved key.
The ED25519-V3 key is stored in ``key_dir/hs_ed25519_secret_key`` in the
format that stem's CREATE_EPHEMERAL expects (raw base64 blob, not a torrc
HiddenServiceDir). On first run the key is generated by Tor and saved.
On subsequent runs the saved key is loaded and passed back to Tor.
Returns (onion_address, controller).
"""
key_dir.mkdir(parents=True, exist_ok=True)
key_dir.chmod(0o700)
key_file = key_dir / "hs_ed25519_secret_key"
if key_file.exists():
# Resume existing hidden service with the saved private key
logger.info("Loading persistent Tor hidden service key from %s", key_file)
key_data = key_file.read_text().strip()
# key_data is stored as "ED25519-V3:<base64>"
key_type, key_content = key_data.split(":", 1)
result = controller.create_ephemeral_hidden_service(
{80: target_port},
key_type=key_type,
key_content=key_content,
await_publication=True,
)
else:
# First run: let Tor generate the key, then save it
logger.info("Generating new persistent Tor hidden service key at %s", key_file)
result = controller.create_ephemeral_hidden_service(
{80: target_port},
key_type="NEW",
key_content="ED25519-V3",
await_publication=True,
)
# Persist the key for future restarts
key_file.write_text(f"{result.private_key_type}:{result.private_key}")
key_file.chmod(0o600)
logger.info("Tor hidden service key saved to %s", key_file)
onion_address = result.service_id + ".onion"
return onion_address, controller
def _start_transient_service(
controller,
target_port: int,
) -> tuple[str, object]:
"""Create an ephemeral hidden service with no key persistence.
The .onion address changes each time. Useful for one-off intake sessions
where a fixed address is not needed.
Returns (onion_address, controller).
"""
logger.info("Creating transient (ephemeral) Tor hidden service")
result = controller.create_ephemeral_hidden_service(
{80: target_port},
key_type="NEW",
key_content="ED25519-V3",
await_publication=True,
)
onion_address = result.service_id + ".onion"
return onion_address, controller
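Both helpers round-trip the key through the one-line `"<key type>:<key material>"` file format; `split(":", 1)` splits only at the first colon, so the parse stays correct even if the payload ever contained one. Standalone check (illustrative blob, not a real key):

```python
# The persistent key file stores one line: "<key type>:<key material>".
key_data = "ED25519-V3:AAAAsomeBase64KeyBlob=="  # illustrative blob, not a real key

key_type, key_content = key_data.split(":", 1)
print(key_type)     # ED25519-V3
print(key_content)  # AAAAsomeBase64KeyBlob==

# Writing it back is the mirror operation used after first-run generation.
assert f"{key_type}:{key_content}" == key_data
```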
# Module-level holder so the controller is not garbage-collected while the
# process is running. A list is used so it can be mutated from module scope.
_controllers: list[object] = []
def _store_controller(controller) -> None:
"""Keep a reference to a controller so the GC does not collect it."""
_controllers.append(controller)
def stop_onion_services() -> None:
"""Detach all running hidden services and close their control connections.
Call this during shutdown or immediately before a purge to ensure Tor
revokes the hidden service descriptor from the directory.
"""
for controller in _controllers:
try:
# Detach every service this controller registered, not just the first.
for service_id in controller.list_ephemeral_hidden_services():
controller.remove_ephemeral_hidden_service(service_id)
except Exception:
pass
try:
controller.close()
except Exception:
pass
_controllers.clear()
logger.info("All Tor hidden services stopped")

View File

@ -294,19 +294,17 @@ class KeystoreManager:
# Record backup timestamp
import json
from datetime import UTC, datetime
_paths.LAST_BACKUP.write_text(json.dumps({"timestamp": datetime.now(UTC).isoformat(), "path": str(dest_file)}))
return dest_file
def last_backup_info(self) -> dict | None:
"""Get last backup timestamp, or None if never backed up."""
import json
if _paths.LAST_BACKUP.exists():
return json.loads(_paths.LAST_BACKUP.read_text())
return None
def is_backup_overdue(self, reminder_days: int = 7) -> bool:
@ -323,7 +321,7 @@ class KeystoreManager:
@property
def _trusted_keys_dir(self) -> Path:
return _paths.TRUSTED_KEYS_DIR
def trust_key(self, public_key_pem: bytes, name: str) -> str:
"""Import a collaborator's Ed25519 public key into the trust store.

View File

@ -1,7 +1,7 @@
"""
Centralized path constants for FieldWitness.
All ~/.fwmetadata/* paths are defined here. Every module that needs a path
imports from this module -- no hardcoded paths anywhere else.
The base directory can be overridden via:
@ -10,13 +10,16 @@ The base directory can be overridden via:
All derived paths (IDENTITY_DIR, CHAIN_DIR, etc.) are computed lazily
from BASE_DIR so that runtime overrides propagate correctly.
The default directory name (.fwmetadata) is intentionally innocuous -- a
directory called .fieldwitness on a seized device is self-incriminating.
"""
import os
from pathlib import Path
# Allow override for testing or multi-instance deployments
BASE_DIR = Path(os.environ.get("FIELDWITNESS_DATA_DIR", Path.home() / ".fwmetadata"))
# Path definitions relative to BASE_DIR. These are resolved lazily via
# __getattr__ so that changes to BASE_DIR propagate to all derived paths.
@ -69,6 +72,20 @@ _PATH_DEFS: dict[str, tuple[str, ...]] = {
"SECRET_KEY_FILE": ("instance", ".secret_key"),
# Unified config
"CONFIG_FILE": ("config.json",),
# Collaborator Ed25519 public keys (trust store).
# Kept at the BASE_DIR level (not under identity/) so it survives identity
# rotation without path changes — trusted peers are independent of our own key.
"TRUSTED_KEYS_DIR": ("trusted_keys",),
# Carrier image reuse tracking database.
"CARRIER_HISTORY": ("carrier_history.json",),
# Last backup timestamp tracking.
"LAST_BACKUP": ("last_backup.json",),
# Tor hidden service key storage.
# Kept under fieldkit/ and treated as key material: the killswitch
# destroys it during KEYS_ONLY purge so the .onion address cannot be
# linked to the operator after a purge.
"TOR_DIR": ("fieldkit", "tor"),
"TOR_HIDDEN_SERVICE_DIR": ("fieldkit", "tor", "hidden_service"),
}
@ -94,6 +111,7 @@ def ensure_dirs() -> None:
__getattr__("USB_DIR"),
__getattr__("TEMP_DIR"),
__getattr__("INSTANCE_DIR"),
__getattr__("TRUSTED_KEYS_DIR"),
]
for d in dirs:
d.mkdir(parents=True, exist_ok=True)

View File

@ -18,9 +18,9 @@ class CarrierTracker:
"""Tracks carrier image usage to warn on reuse."""
def __init__(self, db_path: Path | None = None):
import fieldwitness.paths as _paths
self._db_path = db_path or _paths.CARRIER_HISTORY
def _load(self) -> dict[str, dict]:
if self._db_path.exists():

821
tests/test_c2pa_bridge.py Normal file
View File

@ -0,0 +1,821 @@
"""
Comprehensive tests for the C2PA bridge.
Tests are split into three sections:
1. vendor_assertions.py -- pure dict I/O, no c2pa-python required.
2. cert.py -- X.509 certificate generation, no c2pa-python required.
3. export.py -- GPS downsampling and assertion building; the full
Builder.sign() path is skipped when c2pa-python is absent.
c2pa-python-dependent tests are guarded with pytest.importorskip("c2pa") so the
suite stays green in environments that only have the base [dev] extras.
"""
from __future__ import annotations
import datetime
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock
import pytest
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.x509.oid import NameOID
from fieldwitness.attest.models import (
AttestationRecord,
CaptureDevice,
CaptureMetadata,
GeoLocation,
ImageHashes,
)
from fieldwitness.c2pa_bridge.cert import (
_CERT_VALIDITY_YEARS,
_generate_cert,
get_or_create_c2pa_cert,
)
from fieldwitness.c2pa_bridge.export import (
_build_exif_assertion,
_downsample_gps,
)
from fieldwitness.c2pa_bridge.vendor_assertions import (
DEFAULT_PHASH_THRESHOLD,
SCHEMA_VERSION,
build_attestation_id_assertion,
build_chain_record_assertion,
build_perceptual_hashes_assertion,
parse_attestation_id_assertion,
parse_chain_record_assertion,
parse_perceptual_hashes_assertion,
)
from fieldwitness.config import FieldWitnessConfig
from fieldwitness.federation.models import AttestationChainRecord, EntropyWitnesses
# ── Shared helpers ────────────────────────────────────────────────────────────
def _make_hashes(
sha256: str = "a" * 64,
phash: str = "abc123",
dhash: str = "def456",
ahash: str | None = "ghi789",
colorhash: str | None = "jkl012",
crop_resistant: str | None = "mno345",
) -> ImageHashes:
return ImageHashes(
sha256=sha256,
phash=phash,
dhash=dhash,
ahash=ahash,
colorhash=colorhash,
crop_resistant=crop_resistant,
)
def _make_record(
hashes: ImageHashes | None = None,
fingerprint: str = "fp" * 16,
metadata: dict[str, Any] | None = None,
content_type: str = "image",
) -> AttestationRecord:
if hashes is None:
hashes = _make_hashes()
key = Ed25519PrivateKey.generate()
sig = key.sign(b"dummy")
return AttestationRecord(
image_hashes=hashes,
signature=sig,
attestor_fingerprint=fingerprint,
timestamp=datetime.datetime(2024, 3, 15, 12, 0, 0, tzinfo=datetime.UTC),
metadata=metadata or {},
content_type=content_type,
)
def _make_chain_record(
chain_index: int = 0,
with_entropy: bool = True,
) -> AttestationChainRecord:
ew = (
EntropyWitnesses(
sys_uptime=3600.5,
fs_snapshot=b"\xab" * 16,
proc_entropy=42,
boot_id="a1b2c3d4-e5f6-7890-abcd-ef1234567890",
)
if with_entropy
else None
)
return AttestationChainRecord(
version=1,
record_id=bytes(range(16)),
chain_index=chain_index,
prev_hash=b"\x00" * 32,
content_hash=b"\xff" * 32,
content_type="soosef/attestation-v1",
claimed_ts=1710504000_000000,
entropy_witnesses=ew,
signer_pubkey=b"\x11" * 32,
signature=b"\x22" * 64,
)
# ═══════════════════════════════════════════════════════════════════════════════
# Section 1: vendor_assertions.py
# ═══════════════════════════════════════════════════════════════════════════════
class TestBuildPerceptualHashesAssertion:
def test_all_hash_types_present(self) -> None:
hashes = _make_hashes()
payload = build_perceptual_hashes_assertion(hashes)
assert payload["schema_version"] == SCHEMA_VERSION
assert payload["sha256"] == hashes.sha256
assert payload["phash"] == hashes.phash
assert payload["dhash"] == hashes.dhash
assert payload["ahash"] == hashes.ahash
assert payload["colorhash"] == hashes.colorhash
assert payload["crop_resistant"] == hashes.crop_resistant
assert payload["threshold"] == DEFAULT_PHASH_THRESHOLD
def test_missing_optional_hashes_omitted(self) -> None:
hashes = ImageHashes(sha256="b" * 64, phash="", dhash="")
payload = build_perceptual_hashes_assertion(hashes)
assert "phash" not in payload
assert "dhash" not in payload
assert "ahash" not in payload
assert "colorhash" not in payload
assert "crop_resistant" not in payload
assert payload["sha256"] == "b" * 64
def test_custom_threshold(self) -> None:
hashes = _make_hashes()
payload = build_perceptual_hashes_assertion(hashes, threshold=5)
assert payload["threshold"] == 5
def test_schema_version_is_v1(self) -> None:
payload = build_perceptual_hashes_assertion(_make_hashes())
assert payload["schema_version"] == "v1"
class TestParsePerceptualHashesAssertion:
def test_round_trip_all_hashes(self) -> None:
hashes = _make_hashes()
original = build_perceptual_hashes_assertion(hashes)
parsed = parse_perceptual_hashes_assertion(original)
assert parsed["schema_version"] == "v1"
assert parsed["sha256"] == hashes.sha256
assert parsed["phash"] == hashes.phash
assert parsed["dhash"] == hashes.dhash
assert parsed["ahash"] == hashes.ahash
assert parsed["colorhash"] == hashes.colorhash
assert parsed["crop_resistant"] == hashes.crop_resistant
def test_round_trip_missing_optional_hashes(self) -> None:
hashes = ImageHashes(sha256="c" * 64)
original = build_perceptual_hashes_assertion(hashes)
parsed = parse_perceptual_hashes_assertion(original)
assert parsed["sha256"] == "c" * 64
assert "phash" not in parsed
assert "ahash" not in parsed
def test_missing_schema_version_raises(self) -> None:
with pytest.raises(ValueError, match="schema_version"):
parse_perceptual_hashes_assertion({"sha256": "abc"})
def test_unknown_schema_version_raises(self) -> None:
with pytest.raises(ValueError, match="unrecognised schema_version"):
parse_perceptual_hashes_assertion({"schema_version": "v99", "sha256": "abc"})
def test_threshold_defaults_when_absent(self) -> None:
parsed = parse_perceptual_hashes_assertion({"schema_version": "v1", "sha256": "abc"})
assert parsed["threshold"] == DEFAULT_PHASH_THRESHOLD
class TestBuildChainRecordAssertion:
def test_basic_fields_present(self) -> None:
cr = _make_chain_record()
payload = build_chain_record_assertion(cr)
assert payload["schema_version"] == "v1"
assert payload["chain_index"] == cr.chain_index
assert payload["record_id"] == cr.record_id.hex()
assert payload["content_hash"] == cr.content_hash.hex()
assert payload["content_type"] == cr.content_type
assert payload["claimed_ts_us"] == cr.claimed_ts
assert payload["prev_hash"] == cr.prev_hash.hex()
assert payload["signer_pubkey"] == cr.signer_pubkey.hex()
assert payload["signature"] == cr.signature.hex()
assert payload["version"] == cr.version
def test_entropy_witnesses_embedded(self) -> None:
cr = _make_chain_record(with_entropy=True)
payload = build_chain_record_assertion(cr)
ew = payload["entropy_witnesses"]
assert ew["sys_uptime"] == cr.entropy_witnesses.sys_uptime # type: ignore[union-attr]
assert ew["fs_snapshot"] == cr.entropy_witnesses.fs_snapshot.hex() # type: ignore[union-attr]
assert ew["proc_entropy"] == cr.entropy_witnesses.proc_entropy # type: ignore[union-attr]
assert ew["boot_id"] == cr.entropy_witnesses.boot_id # type: ignore[union-attr]
def test_entropy_witnesses_absent_when_none(self) -> None:
cr = _make_chain_record(with_entropy=False)
payload = build_chain_record_assertion(cr)
assert "entropy_witnesses" not in payload
def test_inclusion_proof_embedded(self) -> None:
cr = _make_chain_record()
proof = MagicMock()
proof.leaf_hash = "aabbcc"
proof.leaf_index = 7
proof.tree_size = 100
proof.proof_hashes = ["d1", "e2"]
proof.root_hash = "ff00ff"
payload = build_chain_record_assertion(cr, inclusion_proof=proof)
ip = payload["inclusion_proof"]
assert ip["leaf_hash"] == "aabbcc"
assert ip["leaf_index"] == 7
assert ip["tree_size"] == 100
assert ip["proof_hashes"] == ["d1", "e2"]
assert ip["root_hash"] == "ff00ff"
def test_no_inclusion_proof_by_default(self) -> None:
cr = _make_chain_record()
payload = build_chain_record_assertion(cr)
assert "inclusion_proof" not in payload
def test_schema_version_is_v1(self) -> None:
payload = build_chain_record_assertion(_make_chain_record())
assert payload["schema_version"] == "v1"
class TestParseChainRecordAssertion:
def test_round_trip_with_entropy(self) -> None:
cr = _make_chain_record(with_entropy=True)
original = build_chain_record_assertion(cr)
parsed = parse_chain_record_assertion(original)
assert parsed["schema_version"] == "v1"
assert parsed["chain_index"] == cr.chain_index
assert parsed["record_id"] == cr.record_id.hex()
assert parsed["content_hash"] == cr.content_hash.hex()
assert "entropy_witnesses" in parsed
def test_round_trip_without_entropy(self) -> None:
cr = _make_chain_record(with_entropy=False)
original = build_chain_record_assertion(cr)
parsed = parse_chain_record_assertion(original)
assert parsed["schema_version"] == "v1"
assert "entropy_witnesses" not in parsed
def test_round_trip_with_inclusion_proof(self) -> None:
cr = _make_chain_record()
proof = MagicMock()
proof.leaf_hash = "aabbcc"
proof.leaf_index = 3
proof.tree_size = 50
proof.proof_hashes = ["x1"]
proof.root_hash = "rootroot"
original = build_chain_record_assertion(cr, inclusion_proof=proof)
parsed = parse_chain_record_assertion(original)
assert "inclusion_proof" in parsed
assert parsed["inclusion_proof"]["leaf_index"] == 3
def test_missing_schema_version_raises(self) -> None:
with pytest.raises(ValueError, match="schema_version"):
parse_chain_record_assertion({"chain_index": 0})
def test_unknown_schema_version_raises(self) -> None:
with pytest.raises(ValueError, match="unrecognised schema_version"):
parse_chain_record_assertion({"schema_version": "v2", "chain_index": 0})
class TestBuildAttestationIdAssertion:
def test_basic_fields(self) -> None:
payload = build_attestation_id_assertion(
record_id="abc123",
attestor_fingerprint="fp001",
content_type="image",
)
assert payload["schema_version"] == "v1"
assert payload["record_id"] == "abc123"
assert payload["attestor_fingerprint"] == "fp001"
assert payload["content_type"] == "image"
def test_default_content_type(self) -> None:
payload = build_attestation_id_assertion(
record_id="xyz",
attestor_fingerprint="fp002",
)
assert payload["content_type"] == "image"
def test_schema_version_is_v1(self) -> None:
payload = build_attestation_id_assertion("r", "f")
assert payload["schema_version"] == "v1"
def test_document_content_type(self) -> None:
payload = build_attestation_id_assertion("r", "f", content_type="document")
assert payload["content_type"] == "document"
class TestParseAttestationIdAssertion:
def test_round_trip(self) -> None:
original = build_attestation_id_assertion("rec01", "finger01", "audio")
parsed = parse_attestation_id_assertion(original)
assert parsed["schema_version"] == "v1"
assert parsed["record_id"] == "rec01"
assert parsed["attestor_fingerprint"] == "finger01"
assert parsed["content_type"] == "audio"
def test_missing_schema_version_raises(self) -> None:
with pytest.raises(ValueError, match="schema_version"):
parse_attestation_id_assertion({"record_id": "x"})
def test_unknown_schema_version_raises(self) -> None:
with pytest.raises(ValueError, match="unrecognised schema_version"):
parse_attestation_id_assertion({"schema_version": "beta", "record_id": "x"})
def test_missing_optional_fields_use_defaults(self) -> None:
parsed = parse_attestation_id_assertion({"schema_version": "v1"})
assert parsed["record_id"] == ""
assert parsed["attestor_fingerprint"] == ""
assert parsed["content_type"] == "image"
# ═══════════════════════════════════════════════════════════════════════════════
# Section 2: cert.py
# ═══════════════════════════════════════════════════════════════════════════════
def _load_cert(pem: bytes | str) -> x509.Certificate:
if isinstance(pem, str):
pem = pem.encode()
return x509.load_pem_x509_certificate(pem)
class TestGenerateCert:
def test_org_privacy_level_cn(self) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig(cover_name="Acme Newsroom")
pem = _generate_cert(key, config, "org")
cert = _load_cert(pem)
cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
assert "Acme Newsroom" in cn
def test_org_privacy_level_cn_fallback(self) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig(cover_name="")
pem = _generate_cert(key, config, "org")
cert = _load_cert(pem)
cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
assert "FieldWitness" in cn
def test_pseudonym_privacy_level_cn(self) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig(cover_name="NightOwl")
pem = _generate_cert(key, config, "pseudonym")
cert = _load_cert(pem)
cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
assert cn == "NightOwl"
def test_pseudonym_privacy_level_cn_fallback(self) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig(cover_name="")
pem = _generate_cert(key, config, "pseudonym")
cert = _load_cert(pem)
cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
assert cn == "FieldWitness User"
def test_anonymous_privacy_level_cn(self) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig(cover_name="ShouldNotAppear")
pem = _generate_cert(key, config, "anonymous")
cert = _load_cert(pem)
cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
# Anonymous certs must not reveal org/cover name
assert cn == "FieldWitness"
assert "ShouldNotAppear" not in cn
def test_unrecognised_privacy_level_falls_back_to_anonymous(self) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig(cover_name="ShouldNotAppear")
pem = _generate_cert(key, config, "unknown_level")
cert = _load_cert(pem)
cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
assert cn == "FieldWitness"
def test_uses_ed25519_algorithm(self) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig()
pem = _generate_cert(key, config, "org")
cert = _load_cert(pem)
# Ed25519 is identified by OID 1.3.101.112
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
assert isinstance(cert.public_key(), Ed25519PublicKey)
def test_basic_constraints_ca_false(self) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig()
pem = _generate_cert(key, config, "org")
cert = _load_cert(pem)
bc = cert.extensions.get_extension_for_class(x509.BasicConstraints)
assert bc.value.ca is False
def test_key_usage_digital_signature_only(self) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig()
pem = _generate_cert(key, config, "org")
cert = _load_cert(pem)
ku = cert.extensions.get_extension_for_class(x509.KeyUsage).value
assert ku.digital_signature is True
assert ku.content_commitment is False
assert ku.key_encipherment is False
assert ku.data_encipherment is False
assert ku.key_agreement is False
assert ku.key_cert_sign is False
assert ku.crl_sign is False
def test_validity_approximately_ten_years(self) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig()
pem = _generate_cert(key, config, "org")
cert = _load_cert(pem)
not_before = cert.not_valid_before_utc
not_after = cert.not_valid_after_utc
delta = not_after - not_before
        # Nominal 10 years = 3650 days; if validity is computed in calendar
        # years the span includes 2-3 leap days, so allow up to 3 days slack.
        assert abs(delta.days - _CERT_VALIDITY_YEARS * 365) <= 3
def test_public_key_matches_private_key(self) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig()
pem = _generate_cert(key, config, "org")
cert = _load_cert(pem)
expected_raw = key.public_key().public_bytes(
serialization.Encoding.Raw, serialization.PublicFormat.Raw
)
actual_raw = cert.public_key().public_bytes(
serialization.Encoding.Raw, serialization.PublicFormat.Raw
)
assert expected_raw == actual_raw
def test_self_signed_issuer_equals_subject(self) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig()
pem = _generate_cert(key, config, "org")
cert = _load_cert(pem)
assert cert.issuer == cert.subject
class TestGetOrCreateC2paCert:
def test_creates_cert_on_first_call(self, tmp_path: Path) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig(cover_name="TestOrg")
identity_dir = tmp_path / "identity"
identity_dir.mkdir()
cert_pem, returned_key = get_or_create_c2pa_cert(config, key, identity_dir=identity_dir)
assert (identity_dir / "c2pa_cert.pem").exists()
assert "BEGIN CERTIFICATE" in cert_pem
# Returned key is the same object passed in
assert returned_key is key
def test_returns_existing_cert_on_second_call(self, tmp_path: Path) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig()
identity_dir = tmp_path / "identity"
identity_dir.mkdir()
cert_pem_1, _ = get_or_create_c2pa_cert(config, key, identity_dir=identity_dir)
cert_pem_2, _ = get_or_create_c2pa_cert(config, key, identity_dir=identity_dir)
# Both calls return identical PEM content
assert cert_pem_1 == cert_pem_2
def test_returns_existing_cert_without_regenerating(self, tmp_path: Path) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig()
identity_dir = tmp_path / "identity"
identity_dir.mkdir()
get_or_create_c2pa_cert(config, key, identity_dir=identity_dir)
cert_path = identity_dir / "c2pa_cert.pem"
mtime_after_first = cert_path.stat().st_mtime
get_or_create_c2pa_cert(config, key, identity_dir=identity_dir)
mtime_after_second = cert_path.stat().st_mtime
# File was not rewritten on second call
assert mtime_after_first == mtime_after_second
def test_detects_key_mismatch_and_regenerates(self, tmp_path: Path) -> None:
key_a = Ed25519PrivateKey.generate()
key_b = Ed25519PrivateKey.generate()
config = FieldWitnessConfig()
identity_dir = tmp_path / "identity"
identity_dir.mkdir()
# Write cert for key_a
cert_pem_a, _ = get_or_create_c2pa_cert(config, key_a, identity_dir=identity_dir)
# Supply key_b — should detect mismatch and regenerate
cert_pem_b, _ = get_or_create_c2pa_cert(config, key_b, identity_dir=identity_dir)
# The two PEMs must differ (different key embedded in cert)
assert cert_pem_a != cert_pem_b
# Verify the new cert actually encodes key_b
cert = _load_cert(cert_pem_b)
expected_raw = key_b.public_key().public_bytes(
serialization.Encoding.Raw, serialization.PublicFormat.Raw
)
actual_raw = cert.public_key().public_bytes(
serialization.Encoding.Raw, serialization.PublicFormat.Raw
)
assert expected_raw == actual_raw
def test_force_regenerates_even_with_matching_key(self, tmp_path: Path) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig()
identity_dir = tmp_path / "identity"
identity_dir.mkdir()
get_or_create_c2pa_cert(config, key, identity_dir=identity_dir)
cert_path = identity_dir / "c2pa_cert.pem"
mtime_after_first = cert_path.stat().st_mtime
# Small sleep to ensure mtime can differ
import time
time.sleep(0.01)
get_or_create_c2pa_cert(config, key, force=True, identity_dir=identity_dir)
mtime_after_force = cert_path.stat().st_mtime
        # force=True must rewrite the file; with the sleep above, the mtime
        # strictly increases on filesystems with sub-second resolution
        # (>= would pass trivially even if the cert were never rewritten)
        assert mtime_after_force > mtime_after_first
def test_cert_public_key_matches_private_key(self, tmp_path: Path) -> None:
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig()
identity_dir = tmp_path / "identity"
identity_dir.mkdir()
cert_pem, _ = get_or_create_c2pa_cert(config, key, identity_dir=identity_dir)
cert = _load_cert(cert_pem)
expected_raw = key.public_key().public_bytes(
serialization.Encoding.Raw, serialization.PublicFormat.Raw
)
actual_raw = cert.public_key().public_bytes(
serialization.Encoding.Raw, serialization.PublicFormat.Raw
)
assert expected_raw == actual_raw
# ═══════════════════════════════════════════════════════════════════════════════
# Section 3: export.py -- GPS downsampling and assertion building
# ═══════════════════════════════════════════════════════════════════════════════
class TestDownsampleGps:
"""Verify _downsample_gps uses unbiased rounding (round(), not math.floor())."""
def test_positive_coordinates_round_to_nearest(self) -> None:
# 40.06 should round to 40.1 (nearest), not 40.0 (floor)
lat, lon = _downsample_gps(40.06, 74.06)
assert lat == pytest.approx(40.1, abs=1e-9)
assert lon == pytest.approx(74.1, abs=1e-9)
def test_positive_coordinates_round_down_when_below_half(self) -> None:
# 40.04 should round to 40.0 (nearest)
lat, lon = _downsample_gps(40.04, 74.04)
assert lat == pytest.approx(40.0, abs=1e-9)
assert lon == pytest.approx(74.0, abs=1e-9)
def test_negative_coordinates_round_to_nearest(self) -> None:
# -40.05 should round to -40.1 (nearest), not -40.0 (floor would give -41.0)
lat, lon = _downsample_gps(-40.05, -74.05)
# Python's built-in round() uses banker's rounding for exact .5 — tolerate
# either -40.0 or -40.1 at the boundary; check directional correctness.
assert abs(lat) == pytest.approx(40.0, abs=0.2)
assert abs(lon) == pytest.approx(74.0, abs=0.2)
def test_negative_lat_not_systematically_biased_south(self) -> None:
# With math.floor: floor(-40.04 * 10) / 10 = floor(-400.4) / 10 = -401/10 = -40.1
# With round(): round(-40.04, 1) = -40.0
# We verify the result is -40.0 (nearest), not -40.1 (biased south).
lat, _ = _downsample_gps(-40.04, 0.0)
assert lat == pytest.approx(-40.0, abs=1e-9)
def test_negative_lon_not_systematically_biased_west(self) -> None:
# With math.floor: floor(-74.04 * 10) / 10 = -74.1 (biased west)
# With round(): round(-74.04, 1) = -74.0 (correct)
_, lon = _downsample_gps(0.0, -74.04)
assert lon == pytest.approx(-74.0, abs=1e-9)
def test_equator_and_prime_meridian(self) -> None:
lat, lon = _downsample_gps(0.0, 0.0)
assert lat == pytest.approx(0.0, abs=1e-9)
assert lon == pytest.approx(0.0, abs=1e-9)
def test_exact_grid_point_unchanged(self) -> None:
lat, lon = _downsample_gps(51.5, -0.1)
assert lat == pytest.approx(51.5, abs=1e-9)
assert lon == pytest.approx(-0.1, abs=1e-9)
def test_city_boundary_positive_side(self) -> None:
# 50.96 rounds to 51.0, not 50.0 (floor would give 50.0)
lat, lon = _downsample_gps(50.96, 10.96)
assert lat == pytest.approx(51.0, abs=1e-9)
assert lon == pytest.approx(11.0, abs=1e-9)
def test_city_boundary_negative_side(self) -> None:
# -50.94 rounds to -50.9, not -51.0 (floor bias)
lat, _ = _downsample_gps(-50.94, 0.0)
assert lat == pytest.approx(-50.9, abs=1e-9)
def test_output_precision_is_one_decimal(self) -> None:
lat, lon = _downsample_gps(48.8566, 2.3522) # Paris
# Both values should be expressible as X.X
assert round(lat, 1) == lat
assert round(lon, 1) == lon
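    # --- Illustrative sketch (not part of the original suite) -----------------
    # Standalone demonstration of the FR-04 bias, with no project imports:
    # math.floor truncates toward -inf, so a floor-based downsampler shifts
    # every negative coordinate south/west, while round() snaps symmetrically
    # to the nearest 0.1 grid point. floor_ds/round_ds are hypothetical local
    # helpers mirroring the old and fixed behaviour, not project code.
    def test_floor_biased_round_unbiased_sketch(self) -> None:
        import math

        def floor_ds(v: float) -> float:  # old behaviour (math.floor)
            return math.floor(v * 10) / 10

        def round_ds(v: float) -> float:  # fixed behaviour (round)
            return round(v, 1)

        assert floor_ds(40.04) == 40.0
        assert floor_ds(-40.04) == -40.1  # floor pushes negatives toward -inf
        assert round_ds(-40.04) == -40.0  # round stays on the nearest grid point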
class TestBuildExifAssertion:
def _record_with_location(
self,
lat: float,
lon: float,
altitude: float | None = None,
) -> AttestationRecord:
loc = GeoLocation(
latitude=lat,
longitude=lon,
altitude_meters=altitude,
)
cm = CaptureMetadata(location=loc)
return _make_record(metadata=cm.to_dict())
def test_gps_omitted_when_no_location(self) -> None:
record = _make_record(metadata={})
result = _build_exif_assertion(record, include_precise_gps=False)
assert result is None
def test_gps_downsampled_when_include_precise_false(self) -> None:
record = self._record_with_location(40.04, -74.04)
exif = _build_exif_assertion(record, include_precise_gps=False)
assert exif is not None
        # 40.04 rounds to 40.0; -74.04 rounds to -74.0, which the EXIF
        # assertion stores as the absolute value 74.0 (the sign lives in Ref)
        assert exif["GPSLatitude"] == pytest.approx(40.0, abs=1e-9)
        assert exif["GPSLongitude"] == pytest.approx(74.0, abs=1e-9)
def test_gps_precise_when_include_precise_true(self) -> None:
record = self._record_with_location(40.7128, -74.0060)
exif = _build_exif_assertion(record, include_precise_gps=True)
assert exif is not None
assert exif["GPSLatitude"] == pytest.approx(40.7128, abs=1e-9)
assert exif["GPSLongitude"] == pytest.approx(74.0060, abs=1e-9)
def test_gps_latitude_ref_north_positive(self) -> None:
record = self._record_with_location(48.8, 2.3)
exif = _build_exif_assertion(record, include_precise_gps=True)
assert exif["GPSLatitudeRef"] == "N"
assert exif["GPSLongitudeRef"] == "E"
def test_gps_latitude_ref_south_negative(self) -> None:
record = self._record_with_location(-33.9, -70.7)
exif = _build_exif_assertion(record, include_precise_gps=True)
assert exif["GPSLatitudeRef"] == "S"
assert exif["GPSLongitudeRef"] == "W"
def test_altitude_included_when_precise_and_present(self) -> None:
record = self._record_with_location(48.8, 2.3, altitude=250.0)
exif = _build_exif_assertion(record, include_precise_gps=True)
assert exif["GPSAltitude"] == pytest.approx(250.0, abs=1e-9)
assert exif["GPSAltitudeRef"] == 0 # above sea level
def test_altitude_omitted_when_not_precise(self) -> None:
record = self._record_with_location(48.8, 2.3, altitude=250.0)
exif = _build_exif_assertion(record, include_precise_gps=False)
assert "GPSAltitude" not in exif # type: ignore[operator]
def test_negative_altitude_ref_is_one(self) -> None:
record = self._record_with_location(0.0, 0.0, altitude=-50.0)
exif = _build_exif_assertion(record, include_precise_gps=True)
assert exif["GPSAltitudeRef"] == 1 # below sea level
def test_returns_none_when_no_capture_metadata(self) -> None:
record = _make_record(metadata={})
assert _build_exif_assertion(record, include_precise_gps=False) is None
def test_device_fields_included(self) -> None:
dev = CaptureDevice(make="Apple", model="iPhone 15 Pro", software="iOS 17")
cm = CaptureMetadata(device=dev)
record = _make_record(metadata=cm.to_dict())
exif = _build_exif_assertion(record, include_precise_gps=False)
assert exif is not None
assert exif["Make"] == "Apple"
assert exif["Model"] == "iPhone 15 Pro"
assert exif["Software"] == "iOS 17"
# serial_hash must never be in the C2PA exif (privacy by design)
assert "serial_hash" not in exif
assert "SerialNumber" not in exif
def test_timestamp_format(self) -> None:
ts = datetime.datetime(2024, 3, 15, 12, 30, 45, tzinfo=datetime.UTC)
cm = CaptureMetadata(captured_at=ts)
record = _make_record(metadata=cm.to_dict())
exif = _build_exif_assertion(record, include_precise_gps=False)
assert exif is not None
assert exif["DateTimeOriginal"] == "2024:03:15 12:30:45"
# ═══════════════════════════════════════════════════════════════════════════════
# Section 4: export.py -- full export_c2pa path (requires c2pa-python)
# ═══════════════════════════════════════════════════════════════════════════════
@pytest.mark.skipif(
True, # Always skip — c2pa-python is not in the CI environment
reason="c2pa-python not installed; full Builder.sign() path skipped",
)
class TestExportC2paIntegration:
"""Full round-trip tests that require c2pa-python.
These are kept here as documentation and can be enabled locally by removing
the skipif mark once the [c2pa] extra is installed.
"""
def test_export_returns_bytes(self, tmp_path: Path) -> None:
c2pa = pytest.importorskip("c2pa") # noqa: F841
key = Ed25519PrivateKey.generate()
config = FieldWitnessConfig()
identity_dir = tmp_path / "identity"
identity_dir.mkdir()
cert_pem, _ = get_or_create_c2pa_cert(config, key, identity_dir=identity_dir)
        from fieldwitness.c2pa_bridge.export import export_c2pa

        # Minimal JPEG shell: SOI + JFIF APP0 segment + EOI. This is not a
        # decodable 1x1 image, but it is enough container structure for a
        # signing round-trip.
        minimal_jpeg = bytes.fromhex(
            "ffd8"            # SOI
            "ffe00010"        # APP0 marker + segment length 16
            "4a46494600"      # "JFIF\0" identifier
            "01010000010001"  # version 1.1, density units 0, density 1x1
            "0000"            # no thumbnail
            "ffd9"            # EOI
        )
record = _make_record()
result = export_c2pa(minimal_jpeg, "jpeg", record, key, cert_pem)
assert isinstance(result, bytes)
assert len(result) > len(minimal_jpeg)