diff --git a/pyproject.toml b/pyproject.toml
index 3c7e570..766cd0b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -124,6 +124,10 @@ dev = [
     "ruff>=0.1.0",
     "mypy>=1.0.0",
 ]
+test-e2e = [
+    "pytest-playwright>=0.4.0",
+    "playwright>=1.40.0",
+]
 
 [project.scripts]
 fieldwitness = "fieldwitness.cli:main"
@@ -151,6 +155,9 @@ packages = ["src/fieldwitness", "frontends"]
 testpaths = ["tests"]
 python_files = ["test_*.py"]
 addopts = "-v --cov=fieldwitness --cov-report=term-missing"
+markers = [
+    "e2e: end-to-end Playwright browser tests (require `playwright install` and `pip install fieldwitness[test-e2e]`)",
+]
 
 [tool.black]
 line-length = 100
diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py
new file mode 100644
index 0000000..ec941e0
--- /dev/null
+++ b/tests/e2e/__init__.py
@@ -0,0 +1 @@
+# e2e test package
diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py
new file mode 100644
index 0000000..9add910
--- /dev/null
+++ b/tests/e2e/conftest.py
@@ -0,0 +1,218 @@
+"""
+Playwright e2e test fixtures for the FieldWitness Flask web UI.
+
+Isolation strategy
+------------------
+- Each test session uses a fresh tmp_path via the `live_server` fixture.
+- `fieldwitness.paths.BASE_DIR` is patched directly by `live_server` (setting
+  FIELDWITNESS_DATA_DIR in the environment is too late once paths is imported).
+- The app is never imported at module level here — it is imported inside the
+  fixture function *after* BASE_DIR has been patched.
+- The Flask dev server is run in a daemon thread with use_reloader=False so
+  the test process remains the controller.
+""" + +from __future__ import annotations + +import os +import socket +import threading +import time +from pathlib import Path +from typing import Generator + +import pytest +from playwright.sync_api import Page + + +# --------------------------------------------------------------------------- +# Constants shared across fixtures +# --------------------------------------------------------------------------- + +TEST_ADMIN_USER = "testadmin" +TEST_ADMIN_PASS = "Fieldwitness-e2e-2024!" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _find_free_port() -> int: + """Return a free TCP port on localhost.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +def _wait_for_server(base_url: str, timeout: float = 10.0) -> None: + """Poll until the server responds or timeout is reached.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + import urllib.request + + urllib.request.urlopen(f"{base_url}/health", timeout=1) + return + except Exception: + time.sleep(0.1) + raise RuntimeError(f"Server at {base_url} did not start within {timeout}s") + + +# --------------------------------------------------------------------------- +# Session-scoped fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="session") +def e2e_data_dir(tmp_path_factory: pytest.TempPathFactory) -> Path: + """A single data directory for the entire test session. + + Using session scope means one Flask app instance serves all tests, + matching how Playwright typically works (one browser, many pages). 
+ """ + return tmp_path_factory.mktemp("fieldwitness_e2e") + + +@pytest.fixture(scope="session") +def live_server(e2e_data_dir: Path) -> Generator[str, None, None]: + """Start the Flask app on a random port, yield the base URL. + + The server runs in a daemon thread so it dies when the test process exits. + + Path isolation strategy + ----------------------- + fieldwitness.paths.BASE_DIR is a module-level Path computed once at import + time from FIELDWITNESS_DATA_DIR. By the time conftest runs, fieldwitness is + already imported (pytest imports it for coverage), so setting the env var + is too late. Instead we directly patch `fieldwitness.paths.BASE_DIR` for + the duration of the test session; the module-level __getattr__ then resolves + every derived path (IDENTITY_DIR, AUTH_DB, …) from that patched BASE_DIR. + """ + data_dir = e2e_data_dir / ".fieldwitness" + data_dir.mkdir(parents=True, exist_ok=True) + + # Patch BASE_DIR so all lazy path resolution uses our temp directory + import fieldwitness.paths as _paths + + original_base_dir = _paths.BASE_DIR + _paths.BASE_DIR = data_dir + + # Also set the env var so any sub-process or re-import gets the right dir + os.environ["FIELDWITNESS_DATA_DIR"] = str(data_dir) + + port = _find_free_port() + + from fieldwitness.config import FieldWitnessConfig + from frontends.web.app import create_app + + config = FieldWitnessConfig( + https_enabled=False, + auth_enabled=True, + deadman_enabled=False, + killswitch_enabled=False, + chain_enabled=False, + chain_auto_wrap=False, + max_upload_mb=16, + session_timeout_minutes=60, + login_lockout_attempts=10, + login_lockout_minutes=1, + ) + + flask_app = create_app(config=config) + flask_app.config["TESTING"] = True + flask_app.config["WTF_CSRF_ENABLED"] = False # Flask-WTF checks this per request + flask_app.config["SECRET_KEY"] = "e2e-test-secret-key-not-for-production" + + def _run(): + from werkzeug.serving import make_server + + srv = make_server("127.0.0.1", port, flask_app) + 
srv.serve_forever() + + thread = threading.Thread(target=_run, daemon=True) + thread.start() + + base_url = f"http://127.0.0.1:{port}" + _wait_for_server(base_url) + + yield base_url + + # Restore original BASE_DIR when the session ends + _paths.BASE_DIR = original_base_dir + + +# --------------------------------------------------------------------------- +# Function-scoped fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def authenticated_page(live_server: str, page: Page) -> Page: + """Return a Playwright page that is authenticated as the test admin. + + Handles first-run setup (creating the admin user) on the first call. + Subsequent calls log in directly. + """ + # Check if we need first-run setup + page.goto(f"{live_server}/") + page.wait_for_load_state("networkidle") + + if "/setup" in page.url: + # First-run: create admin account + page.fill("input[name='username']", TEST_ADMIN_USER) + page.fill("input[name='password']", TEST_ADMIN_PASS) + page.fill("input[name='password_confirm']", TEST_ADMIN_PASS) + page.click("button[type='submit']") + page.wait_for_load_state("networkidle") + elif "/login" in page.url: + # Subsequent runs: log in + page.fill("input[name='username']", TEST_ADMIN_USER) + page.fill("input[name='password']", TEST_ADMIN_PASS) + page.click("button[type='submit']") + page.wait_for_load_state("networkidle") + + # Ensure we are at the index (not stuck on setup or login) + if "/login" in page.url or "/setup" in page.url: + raise RuntimeError( + f"Failed to authenticate. Currently at: {page.url}\n" + f"Page content: {page.content()[:500]}" + ) + + return page + + +@pytest.fixture() +def dropbox_token(live_server: str, authenticated_page: Page) -> str: + """Create a drop box upload token via the admin UI and return the token URL. + + Returns the full upload URL so tests can navigate directly to the source + upload page without needing to parse the admin UI. 
+ """ + page = authenticated_page + page.goto(f"{live_server}/dropbox/admin") + page.wait_for_load_state("networkidle") + + # Fill in token creation form + label_input = page.locator("input[name='label']") + if label_input.count(): + label_input.fill("e2e-test-source") + + page.fill("input[name='hours']", "24") + page.fill("input[name='max_files']", "10") + + # Submit and capture the flash message containing the upload URL + page.click("button[type='submit']") + page.wait_for_load_state("networkidle") + + # Extract the upload URL from the success flash message + flash_text = page.locator(".alert-success, .alert.success, [class*='alert']").first.inner_text() + + # The URL is embedded in the flash: "Share this URL with your source: http://..." + import re + + match = re.search(r"http://\S+", flash_text) + if not match: + raise RuntimeError(f"Could not find upload URL in flash message: {flash_text!r}") + + return match.group(0) diff --git a/tests/e2e/helpers.py b/tests/e2e/helpers.py new file mode 100644 index 0000000..1a154c5 --- /dev/null +++ b/tests/e2e/helpers.py @@ -0,0 +1,107 @@ +""" +Test helpers for e2e tests — generate in-memory test files. + +All helpers return raw bytes suitable for use with page.set_input_files(). +""" + +from __future__ import annotations + +import io +import struct +import zlib + + +def create_test_image(width: int = 64, height: int = 64) -> bytes: + """Generate a minimal valid PNG image in memory. + + Returns raw PNG bytes. Does not require Pillow — builds a PNG manually + using only the stdlib so that helpers.py has zero external dependencies. 
+ """ + # PNG signature + png_sig = b"\x89PNG\r\n\x1a\n" + + def make_chunk(chunk_type: bytes, data: bytes) -> bytes: + length = struct.pack(">I", len(data)) + crc = struct.pack(">I", zlib.crc32(chunk_type + data) & 0xFFFFFFFF) + return length + chunk_type + data + crc + + # IHDR: width, height, bit depth=8, color type=2 (RGB), compression=0, + # filter=0, interlace=0 + ihdr_data = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0) + ihdr = make_chunk(b"IHDR", ihdr_data) + + # IDAT: raw scanlines (filter byte 0 per row, then RGB pixels) + raw_rows = [] + for y in range(height): + row = b"\x00" # filter type None + for x in range(width): + # Vary pixel color so the image is non-trivial + r = (x * 4) & 0xFF + g = (y * 4) & 0xFF + b = ((x + y) * 2) & 0xFF + row += bytes([r, g, b]) + raw_rows.append(row) + + compressed = zlib.compress(b"".join(raw_rows)) + idat = make_chunk(b"IDAT", compressed) + + iend = make_chunk(b"IEND", b"") + + return png_sig + ihdr + idat + iend + + +def create_test_pdf() -> bytes: + """Generate a minimal valid single-page PDF in memory. + + The PDF is structurally valid (Acrobat/browsers will accept it) but + contains only a blank page with a text label. 
+    """
+    body = b"""%PDF-1.4
+1 0 obj<</Type/Catalog/Pages 2 0 R>>endobj
+2 0 obj<</Type/Pages/Kids[3 0 R]/Count 1>>endobj
+3 0 obj<</Type/Page/Parent 2 0 R/MediaBox[0 0 612 792]/Resources<</Font<</F1 4 0 R>>>>/Contents 5 0 R>>endobj
+4 0 obj<</Type/Font/Subtype/Type1/BaseFont/Helvetica>>endobj
+5 0 obj<</Length 54>>
+stream
+BT /F1 12 Tf 100 700 Td (FieldWitness test PDF) Tj ET
+endstream
+endobj
+"""
+    xref_offset = len(body)
+    body += (
+        b"xref\n"
+        b"0 6\n"
+        b"0000000000 65535 f \n"
+        b"0000000009 00000 n \n"
+        b"0000000058 00000 n \n"
+        b"0000000115 00000 n \n"
+        b"0000000266 00000 n \n"
+        b"0000000346 00000 n \n"
+        b"trailer<</Size 6/Root 1 0 R>>\n"
+        b"startxref\n"
+    )
+    body += str(xref_offset).encode() + b"\n%%EOF\n"
+    return body
+
+
+def create_test_csv() -> bytes:
+    """Generate a simple CSV file in memory."""
+    lines = [
+        "timestamp,sensor_id,value,unit",
+        "2024-01-15T10:00:00Z,TEMP-001,23.5,celsius",
+        "2024-01-15T10:01:00Z,TEMP-001,23.7,celsius",
+        "2024-01-15T10:02:00Z,TEMP-001,23.6,celsius",
+        "2024-01-15T10:03:00Z,HUMID-001,65.2,percent",
+    ]
+    return "\n".join(lines).encode("utf-8")
+
+
+def create_test_txt() -> bytes:
+    """Generate a plain-text file in memory."""
+    content = (
+        "FieldWitness e2e test document\n"
+        "===============================\n\n"
+        "This file was generated by the automated test suite.\n"
+        "It contains no sensitive information.\n"
+    )
+    return content.encode("utf-8")
diff --git a/tests/e2e/test_attest.py b/tests/e2e/test_attest.py
new file mode 100644
index 0000000..c706ca3
--- /dev/null
+++ b/tests/e2e/test_attest.py
@@ -0,0 +1,295 @@
+"""
+e2e tests for the attestation and verification pages.
+
+Each test that needs to attest a file first ensures an identity key exists by
+navigating to /keys and generating one. That step is idempotent — the server
+silently ignores a second generate request if a key already exists.
+
+Terminology used in comments
+-----------------------------
+- attest form:
at /attest + - file input name: "image" (the field name in the HTML, even for non-images) + - optional text inputs: "caption", "location_name" +- verify form: same structure at /verify + - file input name: "image" +""" + +from __future__ import annotations + +import io +from pathlib import Path + +import pytest +from playwright.sync_api import Page, expect + +from tests.e2e.helpers import create_test_csv, create_test_image, create_test_pdf, create_test_txt + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _ensure_identity(page: Page, live_server: str) -> None: + """Generate an Ed25519 identity if one does not already exist.""" + page.goto(f"{live_server}/keys/") + page.wait_for_load_state("networkidle") + + # If the "Generate Identity" button is visible the key is missing; click it. + gen_button = page.locator("form[action*='generate_identity'] button") + if gen_button.count() > 0: + gen_button.click() + page.wait_for_load_state("networkidle") + + +def _attest_bytes( + page: Page, + live_server: str, + file_bytes: bytes, + filename: str, + caption: str = "", +) -> None: + """Upload *file_bytes* as *filename* via the /attest form.""" + page.goto(f"{live_server}/attest") + page.wait_for_load_state("networkidle") + + page.set_input_files( + "input[name='image']", + files=[{"name": filename, "mimeType": _mime(filename), "buffer": file_bytes}], + ) + + if caption: + page.fill("input[name='caption']", caption) + + page.click("button[type='submit']") + page.wait_for_load_state("networkidle") + + +def _mime(filename: str) -> str: + ext = filename.rsplit(".", 1)[-1].lower() + return { + "png": "image/png", + "jpg": "image/jpeg", + "jpeg": "image/jpeg", + "pdf": "application/pdf", + "csv": "text/csv", + "txt": "text/plain", + }.get(ext, "application/octet-stream") + + +# --------------------------------------------------------------------------- 
+# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.e2e +def test_attest_page_loads(live_server: str, authenticated_page: Page) -> None: + """The /attest page renders the file upload form.""" + page = authenticated_page + page.goto(f"{live_server}/attest") + page.wait_for_load_state("networkidle") + + expect(page.locator("input[name='image']")).to_be_visible() + expect(page.locator("button[type='submit']")).to_be_visible() + + +@pytest.mark.e2e +def test_attest_image_file(live_server: str, authenticated_page: Page) -> None: + """Attesting a PNG image shows the success result page with all hash fields.""" + page = authenticated_page + _ensure_identity(page, live_server) + + img_bytes = create_test_image(64, 64) + _attest_bytes(page, live_server, img_bytes, "test_capture.png", caption="e2e test image") + + # Result page has the success alert + expect(page.locator(".alert-success")).to_contain_text("Attestation created successfully") + + # Record ID and SHA-256 must be present + expect(page.locator("body")).to_contain_text("Record ID") + expect(page.locator("body")).to_contain_text("SHA-256") + + # Caption was saved + expect(page.locator("body")).to_contain_text("e2e test image") + + +@pytest.mark.e2e +def test_attest_pdf_file(live_server: str, authenticated_page: Page) -> None: + """Attesting a PDF succeeds and the result page notes SHA-256-only (no perceptual hashes).""" + page = authenticated_page + _ensure_identity(page, live_server) + + pdf_bytes = create_test_pdf() + _attest_bytes(page, live_server, pdf_bytes, "evidence.pdf") + + expect(page.locator(".alert-success")).to_contain_text("Attestation created successfully") + + # Non-image attestation note must appear + expect(page.locator(".alert-info")).to_contain_text("cryptographic hash") + + # Perceptual hash fields must NOT appear for PDFs + expect(page.locator("body")).not_to_contain_text("pHash") + + +@pytest.mark.e2e +def test_attest_csv_file(live_server: 
str, authenticated_page: Page) -> None: + """Attesting a CSV file succeeds.""" + page = authenticated_page + _ensure_identity(page, live_server) + + csv_bytes = create_test_csv() + _attest_bytes(page, live_server, csv_bytes, "sensor_data.csv") + + expect(page.locator(".alert-success")).to_contain_text("Attestation created successfully") + + +@pytest.mark.e2e +def test_attest_requires_identity(live_server: str, authenticated_page: Page) -> None: + """The submit button is disabled when no identity key is configured. + + NOTE: This test only checks the rendered HTML state. We do not actually + delete the identity key — that would break subsequent tests in the session. + Instead we verify the template logic: the template disables the button and + shows a warning when has_identity is False. + + We observe the button state based on whether an identity was just generated. + """ + page = authenticated_page + page.goto(f"{live_server}/attest") + page.wait_for_load_state("networkidle") + + # If identity is absent, a warning alert should be visible and the button disabled. + # If identity is present, the button is enabled. 
+ submit = page.locator("button[type='submit']") + warning = page.locator(".alert-warning") + + if warning.count() > 0: + # No identity — button must be disabled + expect(submit).to_be_disabled() + else: + # Identity present — button must be enabled + expect(submit).to_be_enabled() + + +@pytest.mark.e2e +def test_verify_attested_file(live_server: str, authenticated_page: Page) -> None: + """Attest a file then immediately verify it — verification must succeed.""" + page = authenticated_page + _ensure_identity(page, live_server) + + img_bytes = create_test_image(80, 80) + filename = "verify_me.png" + + # Attest + _attest_bytes(page, live_server, img_bytes, filename) + expect(page.locator(".alert-success")).to_contain_text("Attestation created successfully") + + # Verify the same bytes + page.goto(f"{live_server}/verify") + page.wait_for_load_state("networkidle") + + page.set_input_files( + "input[name='image']", + files=[{"name": filename, "mimeType": "image/png", "buffer": img_bytes}], + ) + page.click("button[type='submit']") + page.wait_for_load_state("networkidle") + + # Verification result must show a match + expect(page.locator(".alert-success")).to_contain_text("matching attestation") + + +@pytest.mark.e2e +def test_verify_tampered_file(live_server: str, authenticated_page: Page) -> None: + """A file modified after attestation must not verify (no matching attestation).""" + page = authenticated_page + _ensure_identity(page, live_server) + + original_bytes = create_test_image(90, 90) + # Attest the original + _attest_bytes(page, live_server, original_bytes, "tampered.png") + expect(page.locator(".alert-success")).to_contain_text("Attestation created successfully") + + # Tamper: flip a single byte near the end of the image data + tampered = bytearray(original_bytes) + tampered[-50] ^= 0xFF + tampered_bytes = bytes(tampered) + + # Verify the tampered version — must not match + page.goto(f"{live_server}/verify") + page.wait_for_load_state("networkidle") + + 
page.set_input_files( + "input[name='image']", + files=[{"name": "tampered.png", "mimeType": "image/png", "buffer": tampered_bytes}], + ) + page.click("button[type='submit']") + page.wait_for_load_state("networkidle") + + # Warning = no match found (the alert-warning class is used for "not found") + # OR the alert-success is absent + success_alert = page.locator(".alert-success") + warning_alert = page.locator(".alert-warning") + + assert warning_alert.count() > 0 or success_alert.count() == 0, ( + "Tampered file incorrectly verified as matching" + ) + + +@pytest.mark.e2e +def test_attestation_log(live_server: str, authenticated_page: Page) -> None: + """After attesting multiple files the /attest/log page lists them all.""" + page = authenticated_page + _ensure_identity(page, live_server) + + # Attest three distinct files + for i in range(3): + img = create_test_image(32 + i * 8, 32 + i * 8) + _attest_bytes(page, live_server, img, f"log_test_{i}.png", caption=f"log entry {i}") + expect(page.locator(".alert-success")).to_contain_text("Attestation created successfully") + + # Check the log page + page.goto(f"{live_server}/attest/log") + page.wait_for_load_state("networkidle") + + # The log should contain at least 3 rows (may have more from other tests) + rows = page.locator("table tbody tr") + assert rows.count() >= 3, ( + f"Expected at least 3 rows in attestation log, got {rows.count()}" + ) + + +@pytest.mark.e2e +def test_batch_attest(live_server: str, authenticated_page: Page) -> None: + """The /attest/batch JSON endpoint accepts multiple files and returns results.""" + import json + + page = authenticated_page + _ensure_identity(page, live_server) + + # Use the fetch API to POST two files to the batch endpoint + img1 = create_test_image(48, 48) + img2 = create_test_image(56, 56) + + # Encode images as base64 for transfer to the browser context + import base64 + + img1_b64 = base64.b64encode(img1).decode() + img2_b64 = base64.b64encode(img2).decode() + + result = 
page.evaluate( + """async ([img1_b64, img2_b64]) => { + const b64 = (s) => Uint8Array.from(atob(s), c => c.charCodeAt(0)); + const form = new FormData(); + form.append('images', new File([b64(img1_b64)], 'batch1.png', {type: 'image/png'})); + form.append('images', new File([b64(img2_b64)], 'batch2.png', {type: 'image/png'})); + + const resp = await fetch('/attest/batch', {method: 'POST', body: form}); + return await resp.json(); + }""", + [img1_b64, img2_b64], + ) + + assert result.get("total") == 2, f"Unexpected batch result: {result}" + assert result.get("errors") == 0, f"Batch had errors: {result}" + assert result.get("attested", 0) + result.get("skipped", 0) == 2 diff --git a/tests/e2e/test_auth.py b/tests/e2e/test_auth.py new file mode 100644 index 0000000..9f65de2 --- /dev/null +++ b/tests/e2e/test_auth.py @@ -0,0 +1,232 @@ +""" +e2e tests for the authentication system. + +Tests cover first-run setup, login/logout flows, bad-credential rejection, +and the enforcement of auth guards on protected routes. + +The first-run setup test uses its own isolated live server because it must +observe a database with zero users — after the session-scoped live_server has +created the admin user those conditions can never be recreated in the same +process. 
+""" + +from __future__ import annotations + +import os +from pathlib import Path + +import pytest +from playwright.sync_api import Page, expect + +from tests.e2e.conftest import TEST_ADMIN_PASS, TEST_ADMIN_USER, _find_free_port, _wait_for_server + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +PROTECTED_ROUTES = [ + "/attest", + "/keys/", + "/admin/users", + "/fieldkit/", + "/dropbox/admin", + "/federation/", +] + + +# --------------------------------------------------------------------------- +# Tests that use the shared live_server (admin already created) +# --------------------------------------------------------------------------- + + +@pytest.mark.e2e +def test_login(live_server: str, page: Page) -> None: + """Correct credentials reach the index page.""" + page.goto(f"{live_server}/login") + page.wait_for_load_state("networkidle") + + page.fill("input[name='username']", TEST_ADMIN_USER) + page.fill("input[name='password']", TEST_ADMIN_PASS) + page.click("button[type='submit']") + page.wait_for_load_state("networkidle") + + # Should land on the index, not /login + expect(page).not_to_have_url(f"{live_server}/login") + # Flash message confirms success + expect(page.locator("body")).to_contain_text("Login successful") + + +@pytest.mark.e2e +def test_login_wrong_password(live_server: str, page: Page) -> None: + """Wrong password stays on login with an error message.""" + page.goto(f"{live_server}/login") + page.wait_for_load_state("networkidle") + + page.fill("input[name='username']", TEST_ADMIN_USER) + page.fill("input[name='password']", "definitely-wrong-password") + page.click("button[type='submit']") + page.wait_for_load_state("networkidle") + + expect(page).to_have_url(f"{live_server}/login") + expect(page.locator("body")).to_contain_text("Invalid") + + +@pytest.mark.e2e +def test_login_unknown_user(live_server: str, page: Page) -> 
None: + """Unknown username is rejected without leaking whether the user exists.""" + page.goto(f"{live_server}/login") + page.wait_for_load_state("networkidle") + + page.fill("input[name='username']", "nobody_here") + page.fill("input[name='password']", "anything") + page.click("button[type='submit']") + page.wait_for_load_state("networkidle") + + expect(page).to_have_url(f"{live_server}/login") + # Must not expose "user not found" vs "wrong password" distinction + expect(page.locator("body")).to_contain_text("Invalid") + + +@pytest.mark.e2e +def test_logout(live_server: str, authenticated_page: Page) -> None: + """Logout clears session and redirects away from protected pages.""" + page = authenticated_page + + # Confirm we're authenticated + page.goto(f"{live_server}/") + expect(page).not_to_have_url(f"{live_server}/login") + + # Submit the logout form (it's a POST for CSRF reasons) + page.evaluate("""() => { + const form = document.createElement('form'); + form.method = 'POST'; + form.action = '/logout'; + document.body.appendChild(form); + form.submit(); + }""") + page.wait_for_load_state("networkidle") + + # After logout, navigating to a protected page should redirect to login + page.goto(f"{live_server}/attest") + page.wait_for_load_state("networkidle") + expect(page).to_have_url(f"{live_server}/login") + + +@pytest.mark.e2e +def test_protected_routes_require_auth(live_server: str, page: Page) -> None: + """Unauthenticated requests to protected routes redirect to /login.""" + for route in PROTECTED_ROUTES: + page.goto(f"{live_server}{route}") + page.wait_for_load_state("networkidle") + + assert "/login" in page.url or "/setup" in page.url, ( + f"Route {route} did not redirect to /login — currently at {page.url}" + ) + + +@pytest.mark.e2e +def test_verify_is_publicly_accessible(live_server: str, page: Page) -> None: + """/verify must be accessible without authentication (third-party verifier use case).""" + page.goto(f"{live_server}/verify") + 
    page.wait_for_load_state("networkidle")
+
+    # Should NOT redirect to login
+    assert "/login" not in page.url, f"Expected /verify to be public, got redirect to {page.url}"
+    expect(page.locator("body")).to_contain_text("Verify")
+
+
+# ---------------------------------------------------------------------------
+# First-run setup test — needs its own isolated server with an empty database
+# ---------------------------------------------------------------------------
+
+
+def _spawn_fresh_server(data_dir: Path) -> tuple[str, "subprocess.Popen"]:
+    """Spawn a fresh Flask process in a subprocess pointing at *data_dir*.
+
+    Using a subprocess instead of a thread avoids the global BASE_DIR race
+    condition: the subprocess gets its own Python interpreter and module
+    namespace, so its fieldwitness.paths.BASE_DIR is completely independent
+    from the main test process's session-scoped server.
+    """
+    import subprocess
+    import sys
+
+    port = _find_free_port()
+
+    server_code = f"""
+import os, sys
+os.environ["FIELDWITNESS_DATA_DIR"] = {str(data_dir)!r}
+sys.path.insert(0, {str(Path(__file__).parents[2])!r})
+
+from fieldwitness.config import FieldWitnessConfig
+from frontends.web.app import create_app
+
+config = FieldWitnessConfig(
+    https_enabled=False, auth_enabled=True, deadman_enabled=False,
+    killswitch_enabled=False, chain_enabled=False, chain_auto_wrap=False,
+    max_upload_mb=16, session_timeout_minutes=60,
+    login_lockout_attempts=10, login_lockout_minutes=1,
+)
+app = create_app(config=config)
+app.config["TESTING"] = True
+app.config["WTF_CSRF_ENABLED"] = False
+app.config["SECRET_KEY"] = "e2e-setup-test-secret"
+
+from werkzeug.serving import make_server
+srv = make_server("127.0.0.1", {port}, app)
+srv.serve_forever()
+"""
+
+    proc = subprocess.Popen(
+        [sys.executable, "-c", server_code],
+        stdout=subprocess.DEVNULL,
+        stderr=subprocess.DEVNULL,
+    )
+
+    base_url = f"http://127.0.0.1:{port}"
+    try:
+        _wait_for_server(base_url, timeout=15)
except RuntimeError: + proc.kill() + raise + + return base_url, proc + + +@pytest.mark.e2e +def test_first_run_setup(tmp_path: Path, page: Page) -> None: + """A fresh app with no users redirects to /setup and allows admin creation. + + Uses a subprocess-based server so its fieldwitness.paths.BASE_DIR is + completely isolated from the session-scoped live_server running in the + same process — no global module state is shared. + """ + import subprocess + + data_dir = tmp_path / ".fieldwitness" + data_dir.mkdir() + + base_url, proc = _spawn_fresh_server(data_dir) + + try: + # Root should redirect to setup (no users exist yet) + page.goto(f"{base_url}/") + page.wait_for_load_state("networkidle") + + assert "/setup" in page.url, f"Expected redirect to /setup, got {page.url}" + expect(page.locator("h5")).to_contain_text("Initial Setup") + + # Fill and submit the setup form + page.fill("input[name='username']", "setup_admin") + page.fill("input[name='password']", "Setup-Password-99!") + page.fill("input[name='password_confirm']", "Setup-Password-99!") + page.click("button[type='submit']") + page.wait_for_load_state("networkidle") + + # Should now be on the index (setup auto-logs-in the new admin) + assert "/setup" not in page.url, f"Still on setup page after submission: {page.url}" + assert "/login" not in page.url, f"Setup did not log in automatically: {page.url}" + finally: + proc.terminate() + proc.wait(timeout=5) diff --git a/tests/e2e/test_dropbox.py b/tests/e2e/test_dropbox.py new file mode 100644 index 0000000..500dbb0 --- /dev/null +++ b/tests/e2e/test_dropbox.py @@ -0,0 +1,186 @@ +""" +e2e tests for the source drop box feature. 
+ +The drop box is split into two distinct surfaces: +- Admin surface (/dropbox/admin) — authenticated, token management +- Source surface (/dropbox/upload/) — unauthenticated, CSRF-exempt + +Tests that exercise the source surface navigate in a fresh browser context +(or just navigate directly to the upload URL) to confirm there is no +session/authentication requirement on that path. +""" + +from __future__ import annotations + +import re +import time + +import pytest +from playwright.sync_api import Page, expect + +from tests.e2e.helpers import create_test_image, create_test_txt + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _create_token(page: Page, live_server: str, label: str = "e2e source", hours: int = 24) -> str: + """Create a drop box token via the admin UI and return the full upload URL.""" + page.goto(f"{live_server}/dropbox/admin") + page.wait_for_load_state("networkidle") + + label_input = page.locator("input[name='label']") + if label_input.count(): + label_input.fill(label) + + page.fill("input[name='hours']", str(hours)) + page.fill("input[name='max_files']", "5") + page.click("button[type='submit']") + page.wait_for_load_state("networkidle") + + # Flash message contains the upload URL + flash = page.locator("[class*='alert']").first.inner_text() + match = re.search(r"http://\S+", flash) + if not match: + raise RuntimeError(f"No upload URL found in flash message: {flash!r}") + + return match.group(0) + + +# --------------------------------------------------------------------------- +# Admin panel tests +# --------------------------------------------------------------------------- + + +@pytest.mark.e2e +def test_dropbox_admin_page(live_server: str, authenticated_page: Page) -> None: + """The /dropbox/admin page loads and shows the token creation form.""" + page = authenticated_page + 
page.goto(f"{live_server}/dropbox/admin") + page.wait_for_load_state("networkidle") + + expect(page.locator("input[name='label']")).to_be_visible() + expect(page.locator("input[name='hours']")).to_be_visible() + expect(page.locator("input[name='max_files']")).to_be_visible() + expect(page.locator("button[type='submit']")).to_be_visible() + + +@pytest.mark.e2e +def test_create_upload_token(live_server: str, authenticated_page: Page) -> None: + """Creating a token shows a success flash and the token appears in the active list.""" + page = authenticated_page + upload_url = _create_token(page, live_server, label="my-e2e-source") + + # The upload URL must contain the expected prefix + assert "/dropbox/upload/" in upload_url, f"Unexpected upload URL: {upload_url}" + + # The token should now appear in the active token table + # (token[:12] is shown in the table as per the template) + token_slug = upload_url.split("/dropbox/upload/")[1].split("?")[0] + table = page.locator("table") + if table.count() > 0: + expect(table).to_contain_text(token_slug[:12]) + + +@pytest.mark.e2e +def test_source_upload_page_accessible_without_auth( + live_server: str, authenticated_page: Page, page: Page +) -> None: + """The source upload page is accessible without any authentication. + + We get the URL via the admin (authenticated), then open it in a *separate* + fresh page that has no session cookie. 
+ """ + upload_url = _create_token(authenticated_page, live_server, label="anon-test") + + # Navigate to the upload URL in the unauthenticated page + page.goto(upload_url) + page.wait_for_load_state("networkidle") + + # Must not redirect to login + assert "/login" not in page.url, ( + f"Source upload page redirected to login: {page.url}" + ) + + # Upload form must be present + expect(page.locator("input[type='file'], input[name='files']")).to_be_visible() + + +@pytest.mark.e2e +def test_source_upload_file(live_server: str, authenticated_page: Page, page: Page) -> None: + """A source can upload a file via the drop box and receives a receipt code.""" + upload_url = _create_token(authenticated_page, live_server, label="upload-test") + + # Submit a file as the anonymous source (unauthenticated page) + page.goto(upload_url) + page.wait_for_load_state("networkidle") + + txt_bytes = create_test_txt() + page.set_input_files( + "input[type='file'], input[name='files']", + files=[{"name": "tip.txt", "mimeType": "text/plain", "buffer": txt_bytes}], + ) + + page.click("button[type='submit']") + page.wait_for_load_state("networkidle") + + # The response page should show a receipt code + body_text = page.locator("body").inner_text() + assert any(word in body_text.lower() for word in ("receipt", "success", "received", "upload")), ( + f"No success/receipt indication found after upload. 
Body: {body_text[:300]}" + ) + + +@pytest.mark.e2e +def test_invalid_token_rejected(live_server: str, page: Page) -> None: + """A request with an invalid/missing token returns 404, not a login redirect.""" + page.goto(f"{live_server}/dropbox/upload/totally-invalid-token-xyz") + page.wait_for_load_state("networkidle") + + # Should be a 404 / plain-text "expired or invalid" message, NOT a redirect to login + assert "/login" not in page.url, ( + f"Invalid token redirected to login instead of showing 404: {page.url}" + ) + body = page.locator("body").inner_text() + assert any(word in body.lower() for word in ("expired", "invalid", "not found")), ( + f"Expected 'expired or invalid' message, got: {body[:200]}" + ) + + +@pytest.mark.e2e +def test_revoke_token(live_server: str, authenticated_page: Page, page: Page) -> None: + """An admin can revoke a token; after revocation the upload URL returns 404.""" + admin_page = authenticated_page + upload_url = _create_token(admin_page, live_server, label="revoke-test") + token = upload_url.split("/dropbox/upload/")[1].split("?")[0] + + # Verify the token works before revocation + page.goto(upload_url) + page.wait_for_load_state("networkidle") + assert "/login" not in page.url + + # Revoke via admin UI + admin_page.goto(f"{live_server}/dropbox/admin") + admin_page.wait_for_load_state("networkidle") + + # Find the revoke button for this token and click it + revoke_form = admin_page.locator(f"form input[name='token'][value='{token}']").locator("..") + if revoke_form.count() == 0: + # Try by partial token match in the table row + revoke_form = admin_page.locator("form").filter( + has=admin_page.locator(f"input[value='{token}']") + ) + + if revoke_form.count() > 0: + revoke_form.locator("button[type='submit']").click() + admin_page.wait_for_load_state("networkidle") + + # Now the upload URL should return 404 + page.goto(upload_url) + page.wait_for_load_state("networkidle") + body = page.locator("body").inner_text() + assert any(word in 
body.lower() for word in ("expired", "invalid", "not found")), ( + f"Expected 404 after revocation, got: {body[:200]}" + ) diff --git a/tests/e2e/test_fieldkit.py b/tests/e2e/test_fieldkit.py new file mode 100644 index 0000000..5221d05 --- /dev/null +++ b/tests/e2e/test_fieldkit.py @@ -0,0 +1,83 @@ +""" +e2e tests for the Fieldkit pages. + +Safety note: we do NOT actually fire the killswitch in any test — doing so +would destroy the session-scoped data directory and break all subsequent tests. +We verify the UI renders correctly and the form fields are present, but we do +not submit the final "Execute Purge" action. +""" + +from __future__ import annotations + +import pytest +from playwright.sync_api import Page, expect + + +@pytest.mark.e2e +def test_fieldkit_status_page_loads(live_server: str, authenticated_page: Page) -> None: + """The /fieldkit/ status dashboard loads and shows expected sections.""" + page = authenticated_page + page.goto(f"{live_server}/fieldkit/") + page.wait_for_load_state("networkidle") + + # Two key sections: dead man's switch and killswitch + expect(page.locator("body")).to_contain_text("Dead Man") + expect(page.locator("body")).to_contain_text("Killswitch") + + +@pytest.mark.e2e +def test_fieldkit_status_shows_disarmed_deadman( + live_server: str, authenticated_page: Page +) -> None: + """With deadman disabled in test config, the switch shows 'Disarmed'.""" + page = authenticated_page + page.goto(f"{live_server}/fieldkit/") + page.wait_for_load_state("networkidle") + + # The conftest configures deadman_enabled=False so the badge must be Disarmed + deadman_section = page.locator("body") + expect(deadman_section).to_contain_text("Disarmed") + + +@pytest.mark.e2e +def test_killswitch_page_loads(live_server: str, authenticated_page: Page) -> None: + """The /fieldkit/killswitch page loads and shows the confirmation form.""" + page = authenticated_page + page.goto(f"{live_server}/fieldkit/killswitch") + page.wait_for_load_state("networkidle") 
+ + # Must have the text confirmation input + expect(page.locator("input[name='confirm']")).to_be_visible() + + # Must have the password confirmation input + expect(page.locator("input[name='password']")).to_be_visible() + + # The destructive submit button must be present but we do NOT click it + expect(page.locator("button[type='submit']")).to_be_visible() + + +@pytest.mark.e2e +def test_killswitch_requires_admin(live_server: str, page: Page) -> None: + """The killswitch page requires authentication; unauthenticated access is redirected.""" + page.goto(f"{live_server}/fieldkit/killswitch") + page.wait_for_load_state("networkidle") + + assert "/login" in page.url or "/setup" in page.url, ( + f"Expected auth redirect for killswitch, got {page.url}" + ) + + +@pytest.mark.e2e +def test_fieldkit_link_from_status(live_server: str, authenticated_page: Page) -> None: + """The 'Killswitch Panel' link on the status page navigates correctly.""" + page = authenticated_page + page.goto(f"{live_server}/fieldkit/") + page.wait_for_load_state("networkidle") + + link = page.locator("a[href*='killswitch']") + expect(link).to_be_visible() + link.click() + page.wait_for_load_state("networkidle") + + assert "killswitch" in page.url, f"Expected killswitch URL, got {page.url}" + expect(page.locator("input[name='confirm']")).to_be_visible() diff --git a/tests/e2e/test_keys.py b/tests/e2e/test_keys.py new file mode 100644 index 0000000..b213349 --- /dev/null +++ b/tests/e2e/test_keys.py @@ -0,0 +1,112 @@ +""" +e2e tests for the key management pages (/keys/*). + +These tests verify that: +- The key management dashboard loads and displays identity/channel key state. +- Generating a channel key succeeds (idempotent — second call is a no-op or success). +- Generating an identity key succeeds. + +We do NOT test key export (download) in the browser because Playwright's +download handling requires additional setup and the export route is tested +by the unit tests. 
The export button presence is verified instead. +""" + +from __future__ import annotations + +import pytest +from playwright.sync_api import Page, expect + + +@pytest.mark.e2e +def test_keys_page_loads(live_server: str, authenticated_page: Page) -> None: + """The /keys/ dashboard loads and shows both key sections.""" + page = authenticated_page + page.goto(f"{live_server}/keys/") + page.wait_for_load_state("networkidle") + + # Two key sections must be present + expect(page.locator("body")).to_contain_text("Channel Key") + expect(page.locator("body")).to_contain_text("Identity") + + +@pytest.mark.e2e +def test_generate_identity_key(live_server: str, authenticated_page: Page) -> None: + """If no identity key exists, generating one succeeds and shows a fingerprint.""" + page = authenticated_page + page.goto(f"{live_server}/keys/") + page.wait_for_load_state("networkidle") + + gen_button = page.locator("form[action*='generate_identity'] button") + + if gen_button.count() > 0: + # No identity — generate one + gen_button.click() + page.wait_for_load_state("networkidle") + + # Success flash or fingerprint visible + body = page.locator("body").inner_text() + assert any(word in body.lower() for word in ("generated", "fingerprint", "identity")), ( + f"Expected identity generation confirmation, got: {body[:300]}" + ) + else: + # Identity already exists — fingerprint should be displayed + expect(page.locator("body")).to_contain_text("Fingerprint") + + +@pytest.mark.e2e +def test_generate_channel_key(live_server: str, authenticated_page: Page) -> None: + """If no channel key exists, generating one succeeds.""" + page = authenticated_page + page.goto(f"{live_server}/keys/") + page.wait_for_load_state("networkidle") + + gen_button = page.locator("form[action*='generate_channel'] button") + + if gen_button.count() > 0: + gen_button.click() + page.wait_for_load_state("networkidle") + + body = page.locator("body").inner_text() + assert any(word in body.lower() for word in 
("generated", "fingerprint", "channel")), ( + f"Expected channel key generation confirmation, got: {body[:300]}" + ) + else: + # Channel key already configured + expect(page.locator("body")).to_contain_text("Fingerprint") + + +@pytest.mark.e2e +def test_keys_page_shows_fingerprints_after_generation( + live_server: str, authenticated_page: Page +) -> None: + """After generating both keys the dashboard shows non-empty fingerprints.""" + page = authenticated_page + + # Ensure both keys exist + page.goto(f"{live_server}/keys/") + page.wait_for_load_state("networkidle") + + for action in ("generate_identity", "generate_channel"): + btn = page.locator(f"form[action*='{action}'] button") + if btn.count() > 0: + btn.click() + page.wait_for_load_state("networkidle") + page.goto(f"{live_server}/keys/") + page.wait_for_load_state("networkidle") + + # Both fingerprints should now be visible + fingerprints = page.locator("code") + assert fingerprints.count() >= 2, ( + f"Expected at least 2 fingerprint elements, got {fingerprints.count()}" + ) + + +@pytest.mark.e2e +def test_keys_page_requires_auth(live_server: str, page: Page) -> None: + """Unauthenticated access to /keys/ redirects to /login.""" + page.goto(f"{live_server}/keys/") + page.wait_for_load_state("networkidle") + + assert "/login" in page.url or "/setup" in page.url, ( + f"Expected auth redirect, got {page.url}" + ) diff --git a/tests/e2e/test_navigation.py b/tests/e2e/test_navigation.py new file mode 100644 index 0000000..25f8a4d --- /dev/null +++ b/tests/e2e/test_navigation.py @@ -0,0 +1,161 @@ +""" +e2e tests for general navigation and page health. + +These tests verify that: +- The homepage loads after authentication. +- All primary navigation links resolve without 5xx errors. +- The layout is accessible at a mobile viewport width. + +The navigation link test does NOT follow every link exhaustively — it checks +the primary links that appear in the base navigation bar (the links that every +page shares). 
Blueprint-specific internal links are covered by their own test +files. +""" + +from __future__ import annotations + +import pytest +from playwright.sync_api import Page, expect + + +# Primary navigation links as rendered by base.html. +# Each entry is (link text substring | href fragment, expected URL fragment). +# We match by href since the link text includes Bootstrap icons which vary. +PRIMARY_NAV_HREFS = [ + "/", + "/encode", + "/decode", + "/generate", + "/attest", + "/verify", + "/keys/", + "/fieldkit/", + "/dropbox/admin", + "/federation/", +] + + +@pytest.mark.e2e +def test_homepage_loads(live_server: str, authenticated_page: Page) -> None: + """The index page loads after login and shows the main feature cards.""" + page = authenticated_page + page.goto(f"{live_server}/") + page.wait_for_load_state("networkidle") + + # Core headings from index.html + expect(page.locator("body")).to_contain_text("FieldWitness") + # At least one of the feature card links is visible + expect(page.locator("a[href='/encode']")).to_be_visible() + + +@pytest.mark.e2e +def test_all_nav_links_no_server_error(live_server: str, authenticated_page: Page) -> None: + """Every primary navigation link returns a non-5xx response.""" + page = authenticated_page + + errors: list[str] = [] + + for href in PRIMARY_NAV_HREFS: + url = f"{live_server}{href}" + response = page.goto(url) + page.wait_for_load_state("networkidle") + + status = response.status if response else None + if status is not None and status >= 500: + errors.append(f"{href} → HTTP {status}") + + assert not errors, "Navigation links returned server errors:\n" + "\n".join(errors) + + +@pytest.mark.e2e +def test_health_endpoint_authenticated(live_server: str, authenticated_page: Page) -> None: + """The /health endpoint returns JSON with full details when authenticated.""" + import json + + page = authenticated_page + + # Use fetch() so we get the JSON body (page.goto would return HTML shell) + result = page.evaluate("""async () 
=> { + const resp = await fetch('/health'); + return {status: resp.status, body: await resp.json()}; + }""") + + assert result["status"] == 200, f"Health check failed with status {result['status']}" + data = result["body"] + assert "status" in data, f"Unexpected health response: {data}" + assert data["status"] in ("ok", "degraded"), f"Unknown health status: {data['status']}" + + +@pytest.mark.e2e +def test_health_endpoint_unauthenticated(live_server: str, page: Page) -> None: + """The /health endpoint returns minimal JSON for unauthenticated callers.""" + result = page.evaluate("""async () => { + const resp = await fetch('/health'); + return {status: resp.status, body: await resp.json()}; + }""") + + assert result["status"] == 200 + data = result["body"] + # Unauthenticated response must have only status and version, not operational details + assert "status" in data + assert "modules" not in data, "Health leaked module details to unauthenticated caller" + assert "keys" not in data, "Health leaked key details to unauthenticated caller" + + +@pytest.mark.e2e +def test_responsive_layout_mobile(live_server: str, authenticated_page: Page) -> None: + """The index page renders without horizontal overflow at 375px viewport width.""" + page = authenticated_page + page.set_viewport_size({"width": 375, "height": 812}) + + page.goto(f"{live_server}/") + page.wait_for_load_state("networkidle") + + # The page must render — verify the main heading is still in the DOM + expect(page.locator("body")).to_contain_text("FieldWitness") + + # Check for horizontal overflow: scrollWidth should not exceed clientWidth + overflow = page.evaluate("""() => { + return document.documentElement.scrollWidth > document.documentElement.clientWidth; + }""") + + assert not overflow, ( + "Page has horizontal overflow at 375px viewport — layout breaks on mobile" + ) + + +@pytest.mark.e2e +def test_page_titles_are_set(live_server: str, authenticated_page: Page) -> None: + """Key pages have non-empty 
elements (not the default Flask title).""" + page = authenticated_page + + pages_and_expected = [ + ("/", "FieldWitness"), + ("/attest", "Attest"), + ("/verify", "Verify"), + ("/keys/", "Keys"), + ("/fieldkit/", "Fieldkit"), + ] + + for href, expected_fragment in pages_and_expected: + page.goto(f"{live_server}{href}") + page.wait_for_load_state("networkidle") + + title = page.title() + assert expected_fragment.lower() in title.lower(), ( + f"Page {href}: expected title to contain '{expected_fragment}', got '{title}'" + ) + + +@pytest.mark.e2e +def test_logout_link_present_when_authenticated( + live_server: str, authenticated_page: Page +) -> None: + """The navigation bar shows a logout affordance when the user is logged in.""" + page = authenticated_page + page.goto(f"{live_server}/") + page.wait_for_load_state("networkidle") + + # Logout is a POST form in the navbar; we just confirm the form/button exists + logout = page.locator("form[action*='logout'], a[href*='logout']") + assert logout.count() > 0, "No logout link/form found in navigation when authenticated" diff --git a/tests/test_c2pa_importer.py b/tests/test_c2pa_importer.py new file mode 100644 index 0000000..ed2116e --- /dev/null +++ b/tests/test_c2pa_importer.py @@ -0,0 +1,207 @@ +"""Integration tests for c2pa_bridge/importer.py — data-mapping logic. + +All tests run without c2pa-python installed. Tests that exercise the full +import pipeline are skipped automatically when c2pa-python is absent. 
+""" + +from __future__ import annotations + +from dataclasses import fields +from typing import Any + +import pytest + +from fieldwitness.c2pa_bridge.importer import C2PAImportResult, import_c2pa + + +# --------------------------------------------------------------------------- +# test_c2pa_import_result_dataclass +# --------------------------------------------------------------------------- + + +class TestC2PAImportResultDataclass: + def test_required_fields_exist(self): + expected = {"success", "manifests", "attestation_record", + "fieldwitness_assertions", "trust_status"} + actual = {f.name for f in fields(C2PAImportResult)} + assert expected.issubset(actual) + + def test_error_field_exists(self): + field_names = {f.name for f in fields(C2PAImportResult)} + assert "error" in field_names + + def test_construct_success_result(self): + result = C2PAImportResult( + success=True, + manifests=[], + attestation_record=None, + fieldwitness_assertions={}, + trust_status="unknown", + error=None, + ) + assert result.success is True + assert result.error is None + + def test_construct_failure_result(self): + result = C2PAImportResult( + success=False, + manifests=[], + attestation_record=None, + fieldwitness_assertions={}, + trust_status="invalid", + error="c2pa-python is not installed", + ) + assert result.success is False + assert result.error is not None + + def test_manifests_is_list(self): + result = C2PAImportResult( + success=False, + manifests=[{"key": "value"}], + attestation_record=None, + fieldwitness_assertions={}, + trust_status="unknown", + ) + assert isinstance(result.manifests, list) + + def test_fieldwitness_assertions_is_dict(self): + result = C2PAImportResult( + success=True, + manifests=[], + attestation_record=None, + fieldwitness_assertions={"org.fieldwitness.perceptual-hashes": {}}, + trust_status="self-signed", + ) + assert isinstance(result.fieldwitness_assertions, dict) + + +# 
--------------------------------------------------------------------------- +# test_import_without_c2pa_returns_error +# --------------------------------------------------------------------------- + + +class TestImportWithoutC2pa: + def test_returns_failure_result(self): + """import_c2pa must never raise — it returns a failure C2PAImportResult.""" + # Pass dummy bytes; without c2pa-python this should fail gracefully. + result = import_c2pa(b"dummy image data", "jpeg") + + # Either c2pa is installed (success possible) or we get a clean failure. + if not result.success: + assert result.error is not None + assert len(result.error) > 0 + + def test_no_exception_raised_for_any_input(self): + """import_c2pa must not propagate exceptions regardless of input.""" + # Various bad inputs — all must be caught internally. + for image_data, fmt in [ + (b"", "jpeg"), + (b"\x00" * 100, "png"), + (b"not an image", "webp"), + ]: + result = import_c2pa(image_data, fmt) + assert isinstance(result, C2PAImportResult) + + def test_failure_result_has_trust_status(self): + """Even a failure result must carry a trust_status string.""" + result = import_c2pa(b"garbage", "jpeg") + assert isinstance(result.trust_status, str) + assert len(result.trust_status) > 0 + + def test_failure_result_has_empty_manifests(self): + """On failure without c2pa, manifests must be an empty list.""" + try: + import c2pa # noqa: F401 + pytest.skip("c2pa-python is installed — this test covers the absent case") + except ImportError: + pass + + result = import_c2pa(b"garbage", "jpeg") + assert result.manifests == [] + + def test_error_message_mentions_install(self): + """When c2pa is absent, the error message must include install guidance.""" + try: + import c2pa # noqa: F401 + pytest.skip("c2pa-python is installed — this test covers the absent case") + except ImportError: + pass + + result = import_c2pa(b"dummy", "jpeg") + assert not result.success + assert "pip install" in (result.error or "") + + def 
test_unsupported_format_returns_failure(self): + """An unsupported image format must return success=False with an error.""" + try: + import c2pa # noqa: F401 + except ImportError: + pytest.skip("c2pa-python not installed; format validation not reached") + + result = import_c2pa(b"dummy", "bmp") + assert not result.success + assert result.error is not None + + +# --------------------------------------------------------------------------- +# test_trust_status_values +# --------------------------------------------------------------------------- + + +class TestTrustStatusValues: + """Verify the four trust statuses are valid, non-empty strings.""" + + VALID_STATUSES = {"trusted", "self-signed", "unknown", "invalid"} + + def test_trusted_is_valid_string(self): + assert "trusted" in self.VALID_STATUSES + + def test_self_signed_is_valid_string(self): + assert "self-signed" in self.VALID_STATUSES + + def test_unknown_is_valid_string(self): + assert "unknown" in self.VALID_STATUSES + + def test_invalid_is_valid_string(self): + assert "invalid" in self.VALID_STATUSES + + def test_all_statuses_are_non_empty(self): + for status in self.VALID_STATUSES: + assert len(status) > 0 + + def test_result_trust_status_is_one_of_valid(self): + """Every C2PAImportResult.trust_status value must be in the valid set.""" + # Absent c2pa returns "unknown"; with c2pa and corrupt data returns "invalid". 
+ result = import_c2pa(b"not a real image", "jpeg") + assert result.trust_status in self.VALID_STATUSES + + def test_evaluate_trust_invalid_flag(self): + """Internal _evaluate_trust must return 'invalid' when _fw_invalid is set.""" + from fieldwitness.c2pa_bridge.importer import _evaluate_trust + + manifest: dict[str, Any] = {"_fw_invalid": True} + assert _evaluate_trust(manifest, trusted_certs=None) == "invalid" + + def test_evaluate_trust_unknown_for_no_cert(self): + """No cert chain and no invalid flag -> 'unknown'.""" + from fieldwitness.c2pa_bridge.importer import _evaluate_trust + + manifest: dict[str, Any] = {"signature_info": {}} + assert _evaluate_trust(manifest, trusted_certs=None) == "unknown" + + def test_evaluate_trust_self_signed_fw(self): + """Self-signed cert + 'FieldWitness' in claim_generator -> 'self-signed'.""" + from unittest.mock import MagicMock, patch + + from fieldwitness.c2pa_bridge.importer import _evaluate_trust + + dummy_pem = "DUMMY_PEM" + + with patch("fieldwitness.c2pa_bridge.importer._cert_is_self_signed", return_value=True): + manifest: dict[str, Any] = { + "claim_generator": "FieldWitness/0.3.0", + "signature_info": {"cert_chain": [dummy_pem]}, + } + result = _evaluate_trust(manifest, trusted_certs=None) + + assert result == "self-signed" diff --git a/tests/test_evidence_summary.py b/tests/test_evidence_summary.py new file mode 100644 index 0000000..444d6ff --- /dev/null +++ b/tests/test_evidence_summary.py @@ -0,0 +1,421 @@ +"""Integration tests for evidence_summary.py — HTML/PDF summary generation.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from fieldwitness.evidence_summary import build_summaries, generate_html_summary + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _minimal_manifest(**overrides: Any) -> dict[str, Any]: + """Return a minimal manifest dict 
with sensible defaults.""" + base: dict[str, Any] = { + "exported_at": "2026-03-15T10:00:00+00:00", + "investigation": "test-investigation", + "attestation_records": [ + { + "filename": "photo.jpg", + "file_size": "1.2 MB", + "sha256": "a" * 64, + "attestor_fingerprint": "dead" * 8, + "timestamp": "2026-03-15T09:30:00+00:00", + "image_hashes": { + "sha256": "a" * 64, + }, + } + ], + "chain_records": [ + { + "chain_index": 7, + "record_hash": "b" * 64, + } + ], + "anchors": [], + } + base.update(overrides) + return base + + +# --------------------------------------------------------------------------- +# test_generate_html_summary_basic +# --------------------------------------------------------------------------- + + +class TestGenerateHtmlSummaryBasic: + def test_returns_complete_html_document(self): + manifest = _minimal_manifest() + html = generate_html_summary(manifest) + + assert html.startswith("<!DOCTYPE html>") + assert "</html>" in html + + def test_contains_file_information_section(self): + manifest = _minimal_manifest() + html = generate_html_summary(manifest) + + assert "File Information" in html + + def test_contains_attestation_details_section(self): + manifest = _minimal_manifest() + html = generate_html_summary(manifest) + + assert "Attestation Details" in html + + def test_contains_chain_position_section(self): + manifest = _minimal_manifest() + html = generate_html_summary(manifest) + + assert "Chain Position" in html + + def test_filename_appears_in_output(self): + manifest = _minimal_manifest() + html = generate_html_summary(manifest) + + assert "photo.jpg" in html + + def test_investigation_label_appears(self): + manifest = _minimal_manifest() + html = generate_html_summary(manifest) + + assert "test-investigation" in html + + def test_sha256_abbreviated_appears(self): + manifest = _minimal_manifest() + html = generate_html_summary(manifest) + + # The abbreviated form should be present (first 16 chars of "a"*64) + assert "aaaaaaaaaaaaaaaa" in 
html + + def test_chain_index_appears(self): + manifest = _minimal_manifest() + html = generate_html_summary(manifest) + + assert "7" in html + + def test_verification_instructions_present(self): + manifest = _minimal_manifest() + html = generate_html_summary(manifest) + + assert "verify.py" in html + assert "python verify.py" in html + + def test_version_in_title(self): + html = generate_html_summary(_minimal_manifest(), version="0.3.0") + + assert "0.3.0" in html + + def test_empty_manifest_does_not_raise(self): + """An entirely empty manifest must not raise — all fields have fallbacks.""" + html = generate_html_summary({}) + + assert "<!DOCTYPE html>" in html + + +# --------------------------------------------------------------------------- +# test_generate_html_summary_with_anchors +# --------------------------------------------------------------------------- + + +class TestGenerateHtmlSummaryWithAnchors: + def test_anchor_section_title_present(self): + manifest = _minimal_manifest( + anchors=[ + { + "anchor": { + "anchored_at": "2026-03-15T11:00:00+00:00", + "digest": "c" * 64, + } + } + ] + ) + html = generate_html_summary(manifest) + + assert "RFC 3161" in html + + def test_anchor_timestamp_renders(self): + manifest = _minimal_manifest( + anchors=[ + { + "anchor": { + "anchored_at": "2026-03-15T11:00:00+00:00", + "digest": "c" * 64, + } + } + ] + ) + html = generate_html_summary(manifest) + + # The timestamp should appear in some form (machine or human readable) + assert "2026-03-15" in html + + def test_anchor_digest_renders(self): + manifest = _minimal_manifest( + anchors=[ + { + "anchor": { + "anchored_at": "2026-03-15T11:00:00+00:00", + "digest": "c" * 64, + } + } + ] + ) + html = generate_html_summary(manifest) + + assert "c" * 16 in html # at least the abbreviated form + + def test_multiple_anchors_all_labeled(self): + manifest = _minimal_manifest( + anchors=[ + {"anchor": {"anchored_at": "2026-03-15T11:00:00+00:00", "digest": "d" * 64}}, + {"anchor": 
{"anchored_at": "2026-03-16T09:00:00+00:00", "digest": "e" * 64}}, + ] + ) + html = generate_html_summary(manifest) + + assert "Anchor 1" in html + assert "Anchor 2" in html + + def test_no_anchors_shows_none_recorded(self): + manifest = _minimal_manifest(anchors=[]) + html = generate_html_summary(manifest) + + assert "None recorded" in html + + +# --------------------------------------------------------------------------- +# test_generate_html_summary_with_perceptual_hashes +# --------------------------------------------------------------------------- + + +class TestGenerateHtmlSummaryWithPerceptualHashes: + def test_perceptual_hash_section_present_when_phash_set(self): + manifest = _minimal_manifest() + manifest["attestation_records"][0]["image_hashes"]["phash"] = "f" * 16 + manifest["attestation_records"][0]["image_hashes"]["dhash"] = "0" * 16 + html = generate_html_summary(manifest) + + assert "Perceptual Hashes" in html + + def test_phash_value_renders(self): + manifest = _minimal_manifest() + manifest["attestation_records"][0]["image_hashes"]["phash"] = "aabbccdd11223344" + html = generate_html_summary(manifest) + + assert "aabbccdd11223344" in html + + def test_dhash_value_renders(self): + manifest = _minimal_manifest() + manifest["attestation_records"][0]["image_hashes"]["phash"] = "1234" * 4 + manifest["attestation_records"][0]["image_hashes"]["dhash"] = "5678" * 4 + html = generate_html_summary(manifest) + + assert "5678" * 4 in html + + def test_perceptual_hash_note_present(self): + manifest = _minimal_manifest() + manifest["attestation_records"][0]["image_hashes"]["phash"] = "f" * 16 + html = generate_html_summary(manifest) + + assert "format conversion" in html or "mild compression" in html + + +# --------------------------------------------------------------------------- +# test_generate_html_summary_no_perceptual_hashes +# --------------------------------------------------------------------------- + + +class TestGenerateHtmlSummaryNoPerceptualHashes: 
+ def test_perceptual_hash_section_absent_for_non_image(self): + """The Perceptual Hashes section must not appear when phash and dhash are absent. + + generate_html_summary gates the entire section on ``if phash or dhash``, so + for non-image files the section is simply omitted — it does not render a + 'Not applicable' placeholder. + """ + manifest = _minimal_manifest() + manifest["attestation_records"][0]["image_hashes"] = {"sha256": "a" * 64} + html = generate_html_summary(manifest) + + assert "Perceptual Hashes" not in html + + def test_empty_string_hashes_omit_section(self): + """Empty-string phash/dhash must be treated the same as missing keys — section absent.""" + manifest = _minimal_manifest() + manifest["attestation_records"][0]["image_hashes"]["phash"] = "" + manifest["attestation_records"][0]["image_hashes"]["dhash"] = "" + html = generate_html_summary(manifest) + + assert "Perceptual Hashes" not in html + + def test_sha256_still_shown_without_perceptual_hashes(self): + """SHA-256 must still appear in File Information even without perceptual hashes.""" + manifest = _minimal_manifest() + manifest["attestation_records"][0]["image_hashes"] = {"sha256": "a" * 64} + html = generate_html_summary(manifest) + + # The abbreviated SHA-256 appears in the File Information section. 
+ assert "aaaaaaaaaaaaaaaa" in html + + +# --------------------------------------------------------------------------- +# test_generate_html_summary_multiple_records +# --------------------------------------------------------------------------- + + +class TestGenerateHtmlSummaryMultipleRecords: + def test_multi_record_note_appears(self): + second_record = { + "filename": "photo2.jpg", + "file_size": "800 KB", + "sha256": "b" * 64, + "attestor_fingerprint": "cafe" * 8, + "timestamp": "2026-03-15T10:00:00+00:00", + "image_hashes": {"sha256": "b" * 64}, + } + manifest = _minimal_manifest() + manifest["attestation_records"].append(second_record) + html = generate_html_summary(manifest) + + assert "2 attested file" in html + + def test_multi_record_refers_to_manifest_json(self): + second = { + "filename": "doc.pdf", + "sha256": "c" * 64, + "attestor_fingerprint": "beef" * 8, + "timestamp": "2026-03-15T10:30:00+00:00", + "image_hashes": {"sha256": "c" * 64}, + } + manifest = _minimal_manifest() + manifest["attestation_records"].append(second) + html = generate_html_summary(manifest) + + assert "manifest.json" in html + + def test_first_record_details_are_shown(self): + """With multiple records, the first record's filename appears in File Information.""" + second = { + "filename": "other.jpg", + "sha256": "d" * 64, + "attestor_fingerprint": "0000" * 8, + "timestamp": "2026-03-15T11:00:00+00:00", + "image_hashes": {"sha256": "d" * 64}, + } + manifest = _minimal_manifest() + manifest["attestation_records"].append(second) + html = generate_html_summary(manifest) + + # First record's filename must be present + assert "photo.jpg" in html + + def test_single_record_has_no_multi_note(self): + manifest = _minimal_manifest() + assert len(manifest["attestation_records"]) == 1 + html = generate_html_summary(manifest) + + assert "attested file" not in html + + +# --------------------------------------------------------------------------- +# test_build_summaries_returns_html +# 
--------------------------------------------------------------------------- + + +class TestBuildSummaries: + def test_always_returns_summary_html(self): + manifest = _minimal_manifest() + result = build_summaries(manifest) + + assert "summary.html" in result + assert isinstance(result["summary.html"], bytes) + + def test_html_is_valid_utf8(self): + manifest = _minimal_manifest() + result = build_summaries(manifest) + + # Must decode without error + decoded = result["summary.html"].decode("utf-8") + assert "<!DOCTYPE html>" in decoded + + def test_pdf_returned_when_xhtml2pdf_available(self): + """If xhtml2pdf is installed, summary.pdf must be in the result.""" + try: + import xhtml2pdf # noqa: F401 + except ImportError: + pytest.skip("xhtml2pdf not installed") + + manifest = _minimal_manifest() + result = build_summaries(manifest) + + assert "summary.pdf" in result + assert isinstance(result["summary.pdf"], bytes) + assert len(result["summary.pdf"]) > 0 + + def test_no_pdf_when_xhtml2pdf_absent(self, monkeypatch: pytest.MonkeyPatch): + """When xhtml2pdf is not importable, summary.pdf must be absent.""" + import builtins + + real_import = builtins.__import__ + + def mock_import(name, *args, **kwargs): + if name == "xhtml2pdf": + raise ImportError("forced absence") + return real_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", mock_import) + + manifest = _minimal_manifest() + result = build_summaries(manifest) + + assert "summary.pdf" not in result + assert "summary.html" in result + + +# --------------------------------------------------------------------------- +# test_html_summary_contains_no_script_tags — security +# --------------------------------------------------------------------------- + + +class TestHtmlSummarySecurityNoScriptInjection: + def test_no_script_tags_from_normal_manifest(self): + html = generate_html_summary(_minimal_manifest()) + + assert "<script" not in html.lower() + + def 
test_script_in_filename_is_escaped(self): + manifest = _minimal_manifest() + manifest["attestation_records"][0]["filename"] = '<script>alert(1)</script>' + html = generate_html_summary(manifest) + + assert "<script>" not in html + assert "<script>" in html + + def test_script_in_investigation_is_escaped(self): + manifest = _minimal_manifest(investigation='"><script>alert(1)</script>') + html = generate_html_summary(manifest) + + assert "<script>" not in html + + def test_script_in_attestor_fingerprint_is_escaped(self): + manifest = _minimal_manifest() + manifest["attestation_records"][0]["attestor_fingerprint"] = ( + '"><script>evil()</script>' + ) + html = generate_html_summary(manifest) + + assert "<script>" not in html + + def test_script_in_anchor_digest_is_escaped(self): + manifest = _minimal_manifest( + anchors=[{"anchor": {"anchored_at": "2026-01-01T00:00:00Z", + "digest": '"><script>x()</script>'}}] + ) + html = generate_html_summary(manifest) + + assert "<script>" not in html diff --git a/tests/test_file_attestation.py b/tests/test_file_attestation.py new file mode 100644 index 0000000..a366258 --- /dev/null +++ b/tests/test_file_attestation.py @@ -0,0 +1,244 @@ +"""Integration tests for hash_file() — all-file-type attestation hashing.""" + +from __future__ import annotations + +import hashlib +import os +from io import BytesIO + +import pytest +from PIL import Image + +from fieldwitness.attest.hashing import hash_file +from fieldwitness.attest.models import ImageHashes + + +# --------------------------------------------------------------------------- +# File creation helpers +# --------------------------------------------------------------------------- + + +def _make_png(width: int = 50, height: int = 50, color: tuple = (128, 64, 32)) -> bytes: + """Create a minimal valid PNG in memory.""" + img = Image.new("RGB", (width, height), color) + buf = BytesIO() + img.save(buf, format="PNG") + return buf.getvalue() + + +def _make_pdf() -> bytes: + """Return a 
valid minimal PDF as raw bytes."""
+    # Hand-rolled single-page PDF; only the raw bytes matter for hashing tests,
+    # so exact xref offsets are not load-bearing here.
+    return (
+        b"%PDF-1.4\n"
+        b"1 0 obj<</Type /Catalog /Pages 2 0 R>>endobj\n"
+        b"2 0 obj<</Type /Pages /Kids [3 0 R] /Count 1>>endobj\n"
+        b"3 0 obj<</Type /Page /Parent 2 0 R /MediaBox [0 0 612 792]>>endobj\n"
+        b"xref\n0 4\n"
+        b"0000000000 65535 f\r\n"
+        b"0000000009 00000 n\r\n"
+        b"0000000058 00000 n\r\n"
+        b"0000000115 00000 n\r\n"
+        b"trailer<</Size 4 /Root 1 0 R>>\n"
+        b"startxref\n196\n%%EOF"
+    )
+
+
+def _make_csv() -> bytes:
+    """Return a simple CSV file as bytes."""
+    return b"id,name,value\n1,alpha,100\n2,beta,200\n3,gamma,300\n"
+
+
+# ---------------------------------------------------------------------------
+# test_hash_image_file
+# ---------------------------------------------------------------------------
+
+
+class TestHashImageFile:
+    def test_sha256_populated(self):
+        hashes = hash_file(_make_png())
+        assert hashes.sha256
+        assert len(hashes.sha256) == 64
+
+    def test_phash_populated(self):
+        hashes = hash_file(_make_png())
+        # phash must be a non-empty string for a valid image
+        assert isinstance(hashes.phash, str)
+        assert len(hashes.phash) > 0
+
+    def test_dhash_populated(self):
+        hashes = hash_file(_make_png())
+        assert isinstance(hashes.dhash, str)
+        assert len(hashes.dhash) > 0
+
+    def test_returns_image_hashes_instance(self):
+        result = hash_file(_make_png())
+        assert isinstance(result, ImageHashes)
+
+    def test_sha256_matches_direct_computation(self):
+        png_data = _make_png()
+        hashes = hash_file(png_data)
+        expected = hashlib.sha256(png_data).hexdigest()
+        assert hashes.sha256 == expected
+
+
+# ---------------------------------------------------------------------------
+# test_hash_pdf_file
+# ---------------------------------------------------------------------------
+
+
+class TestHashPdfFile:
+    def test_sha256_populated(self):
+        hashes = hash_file(_make_pdf())
+        assert hashes.sha256
+        assert len(hashes.sha256) == 64
+
+    def test_phash_empty_for_non_image(self):
+        """PDF files must have phash == '' (PIL cannot decode them)."""
+        hashes = hash_file(_make_pdf())
+        assert hashes.phash == ""
+
+    def test_dhash_empty_for_non_image(self):
+        hashes = hash_file(_make_pdf())
+        assert hashes.dhash == ""
+
+    def test_sha256_correct(self):
+        pdf_data = _make_pdf()
+        expected = hashlib.sha256(pdf_data).hexdigest()
+        assert hash_file(pdf_data).sha256 == expected
+
+
+# ---------------------------------------------------------------------------
+# test_hash_csv_file
+# ---------------------------------------------------------------------------
+
+
+class TestHashCsvFile:
+    def test_sha256_populated(self):
+        hashes = hash_file(_make_csv())
+        assert hashes.sha256
+        assert len(hashes.sha256) == 64
+
+    def test_phash_empty(self):
+        assert hash_file(_make_csv()).phash == ""
+
+    def test_dhash_empty(self):
+        assert hash_file(_make_csv()).dhash == ""
+
+    def test_sha256_correct(self):
+        csv_data = _make_csv()
+        assert hash_file(csv_data).sha256 == hashlib.sha256(csv_data).hexdigest()
+
+
+# ---------------------------------------------------------------------------
+# test_hash_empty_file
+# ---------------------------------------------------------------------------
+
+
+class TestHashEmptyFile:
+    def test_does_not_crash(self):
+        """Hashing empty bytes must not raise any exception."""
+        result = hash_file(b"")
+        assert isinstance(result, ImageHashes)
+
+    def test_sha256_of_empty_bytes(self):
+        """SHA-256 of empty bytes is the well-known constant."""
+        empty_sha256 = hashlib.sha256(b"").hexdigest()
+        assert hash_file(b"").sha256 == empty_sha256
+
+    def test_phash_and_dhash_empty_or_str(self):
+        result = hash_file(b"")
+        # Must be strings (possibly empty), never None
+        assert isinstance(result.phash, str)
+        assert isinstance(result.dhash, str)
+
+
+# ---------------------------------------------------------------------------
+# test_hash_large_file
+# NOTE: the 10 MB os.urandom payloads below dominate this module's runtime.
+# ---------------------------------------------------------------------------
+
+
+class TestHashLargeFile:
+    def test_sha256_correct_for_10mb(self):
+        """SHA-256 must be correct for a 10 MB random payload."""
+        data = os.urandom(10 * 1024 * 1024)
+        expected = hashlib.sha256(data).hexdigest()
+        result = hash_file(data)
+        assert result.sha256 == expected
+
+    def test_large_file_does_not_raise(self):
+        data = os.urandom(10 * 1024 * 1024)
+        result = hash_file(data)
+        assert isinstance(result, ImageHashes)
+
+    def test_large_non_image_has_empty_perceptual_hashes(self):
+        # Random bytes are (astronomically likely) not a decodable image.
+        data = os.urandom(10 * 1024 * 1024)
+        result = hash_file(data)
+        assert result.phash == ""
+        assert result.dhash == ""
+
+
+# ---------------------------------------------------------------------------
+# test_hash_file_deterministic
+# ---------------------------------------------------------------------------
+
+
+class TestHashFileDeterministic:
+    def test_same_image_twice_identical_sha256(self):
+        data = _make_png()
+        h1 = hash_file(data)
+        h2 = hash_file(data)
+        assert h1.sha256 == h2.sha256
+
+    def test_same_image_twice_identical_phash(self):
+        data = _make_png()
+        h1 = hash_file(data)
+        h2 = hash_file(data)
+        assert h1.phash == h2.phash
+
+    def test_same_image_twice_identical_dhash(self):
+        data = _make_png()
+        h1 = hash_file(data)
+        h2 = hash_file(data)
+        assert h1.dhash == h2.dhash
+
+    def test_same_binary_blob_twice_identical(self):
+        data = os.urandom(4096)
+        h1 = hash_file(data)
+        h2 = hash_file(data)
+        assert h1.sha256 == h2.sha256
+
+    def test_same_csv_twice_identical(self):
+        data = _make_csv()
+        assert hash_file(data).sha256 == hash_file(data).sha256
+
+
+# ---------------------------------------------------------------------------
+# test_hash_file_different_content
+# ---------------------------------------------------------------------------
+
+
+class TestHashFileDifferentContent:
+    def test_different_images_different_sha256(self):
+        red = _make_png(color=(255, 0, 0))
+        blue = _make_png(color=(0, 0, 255))
+        assert hash_file(red).sha256 != hash_file(blue).sha256
+
+    def test_different_binary_blobs_different_sha256(self):
+        a = os.urandom(1024)
+        b = os.urandom(1024)
+        # Astronomically unlikely to collide, but guard anyway
+        assert a != b
+        assert hash_file(a).sha256 != hash_file(b).sha256
+
+    def test_different_csvs_different_sha256(self):
+        csv1 = b"a,b\n1,2\n"
+        csv2 = b"a,b\n3,4\n"
+        assert hash_file(csv1).sha256 != hash_file(csv2).sha256
+
+    def test_one_bit_flip_changes_sha256(self):
+        """Changing a single byte must produce a completely different SHA-256."""
+        pdf = bytearray(_make_pdf())
+        pdf[-1] ^= 0xFF
+        original_hash = hash_file(_make_pdf()).sha256
+        mutated_hash = hash_file(bytes(pdf)).sha256
+        assert original_hash != mutated_hash
diff --git a/tests/test_killswitch_coverage.py b/tests/test_killswitch_coverage.py
new file mode 100644
index 0000000..04eff9f
--- /dev/null
+++ b/tests/test_killswitch_coverage.py
@@ -0,0 +1,244 @@
+"""Verify the killswitch covers the new paths added in v0.3.0.
+
+These tests inspect the execution plan of execute_purge() by running it against
+a populated temporary directory and asserting that the relevant step names
+appear in PurgeResult.steps_completed.
+
+Each test is independent and uses its own tmp_path fixture, following the same
+pattern as test_killswitch.py. 
+""" + +from __future__ import annotations + +from pathlib import Path + +import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey +from cryptography.hazmat.primitives.serialization import ( + Encoding, + NoEncryption, + PrivateFormat, + PublicFormat, +) + + +# --------------------------------------------------------------------------- +# Fixture +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def populated_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: + """Create a minimal populated .fieldwitness directory for killswitch tests.""" + import fieldwitness.paths as paths + + data_dir = tmp_path / ".fieldwitness" + data_dir.mkdir() + monkeypatch.setattr(paths, "BASE_DIR", data_dir) + + # Identity + identity_dir = data_dir / "identity" + identity_dir.mkdir() + key = Ed25519PrivateKey.generate() + priv_pem = key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()) + (identity_dir / "private.pem").write_bytes(priv_pem) + pub_pem = key.public_key().public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo) + (identity_dir / "public.pem").write_bytes(pub_pem) + + # Channel key + stego_dir = data_dir / "stego" + stego_dir.mkdir() + (stego_dir / "channel.key").write_text("channel-key-material") + + # Trusted keys directory with a dummy collaborator key + trusted_dir = data_dir / "trusted_keys" + trusted_dir.mkdir() + fp_dir = trusted_dir / "aabbcc112233" + fp_dir.mkdir() + (fp_dir / "public.pem").write_bytes(pub_pem) + (fp_dir / "meta.json").write_text('{"alias": "Alice"}') + + # Carrier history + (data_dir / "carrier_history.json").write_text('{"carriers": []}') + + # Tor hidden service directory with a dummy key + tor_dir = data_dir / "fieldkit" / "tor" / "hidden_service" + tor_dir.mkdir(parents=True) + (tor_dir / "hs_ed25519_secret_key").write_text("ED25519-V3:fakekeydata") + + # Flask instance secret + instance_dir = data_dir / "instance" + 
instance_dir.mkdir() + (instance_dir / ".secret_key").write_bytes(b"flask-secret") + + # Auth DB + auth_dir = data_dir / "auth" + auth_dir.mkdir() + (auth_dir / "fieldwitness.db").write_bytes(b"sqlite3 db") + + # Attestations + att_dir = data_dir / "attestations" + att_dir.mkdir() + (att_dir / "log.bin").write_bytes(b"attestation data") + + # Chain + chain_dir = data_dir / "chain" + chain_dir.mkdir() + (chain_dir / "chain.bin").write_bytes(b"chain data") + + # Temp + temp_dir = data_dir / "temp" + temp_dir.mkdir() + (temp_dir / "upload.tmp").write_bytes(b"temp file") + + # Config + (data_dir / "config.json").write_text("{}") + + return data_dir + + +# --------------------------------------------------------------------------- +# test_killswitch_covers_tor_keys +# --------------------------------------------------------------------------- + + +class TestKillswitchCoversTorKeys: + def test_tor_key_step_in_keys_only_plan(self, populated_dir: Path): + """KEYS_ONLY purge must include destroy_tor_hidden_service_key.""" + from fieldwitness.fieldkit.killswitch import PurgeScope, execute_purge + + result = execute_purge(PurgeScope.KEYS_ONLY, reason="test") + + assert "destroy_tor_hidden_service_key" in result.steps_completed + + def test_tor_key_step_in_all_plan(self, populated_dir: Path): + """ALL purge must also include destroy_tor_hidden_service_key.""" + from fieldwitness.fieldkit.killswitch import PurgeScope, execute_purge + + result = execute_purge(PurgeScope.ALL, reason="test") + + assert "destroy_tor_hidden_service_key" in result.steps_completed + + def test_tor_hidden_service_dir_destroyed_by_keys_only(self, populated_dir: Path): + """The actual directory on disk must be gone after KEYS_ONLY purge.""" + from fieldwitness.fieldkit.killswitch import PurgeScope, execute_purge + + tor_dir = populated_dir / "fieldkit" / "tor" / "hidden_service" + assert tor_dir.exists(), "Test setup: tor hidden service dir must exist before purge" + + execute_purge(PurgeScope.KEYS_ONLY, 
reason="test") + + assert not tor_dir.exists(), ( + "Tor hidden service key directory must be destroyed by KEYS_ONLY purge" + ) + + def test_tor_key_step_runs_before_data_steps(self, populated_dir: Path): + """Tor key destruction must precede data-layer steps (ordered destruction).""" + from fieldwitness.fieldkit.killswitch import PurgeScope, execute_purge + + result = execute_purge(PurgeScope.ALL, reason="test") + + completed = result.steps_completed + tor_idx = completed.index("destroy_tor_hidden_service_key") + # Attestation log and chain are data steps; they should come after key steps. + if "destroy_attestation_log" in completed: + att_idx = completed.index("destroy_attestation_log") + assert tor_idx < att_idx, ( + "Tor key must be destroyed before attestation log in the ordered plan" + ) + + +# --------------------------------------------------------------------------- +# test_killswitch_covers_trusted_keys +# --------------------------------------------------------------------------- + + +class TestKillswitchCoversTrustedKeys: + def test_trusted_keys_step_in_keys_only_plan(self, populated_dir: Path): + """KEYS_ONLY purge must include destroy_trusted_keys.""" + from fieldwitness.fieldkit.killswitch import PurgeScope, execute_purge + + result = execute_purge(PurgeScope.KEYS_ONLY, reason="test") + + assert "destroy_trusted_keys" in result.steps_completed + + def test_trusted_keys_step_in_all_plan(self, populated_dir: Path): + from fieldwitness.fieldkit.killswitch import PurgeScope, execute_purge + + result = execute_purge(PurgeScope.ALL, reason="test") + + assert "destroy_trusted_keys" in result.steps_completed + + def test_trusted_keys_dir_destroyed_by_keys_only(self, populated_dir: Path): + """The trusted_keys directory must be gone after KEYS_ONLY purge.""" + from fieldwitness.fieldkit.killswitch import PurgeScope, execute_purge + + trusted_dir = populated_dir / "trusted_keys" + assert trusted_dir.exists(), "Test setup: trusted_keys dir must exist before 
purge" + + execute_purge(PurgeScope.KEYS_ONLY, reason="test") + + assert not trusted_dir.exists(), ( + "trusted_keys directory must be destroyed by KEYS_ONLY purge" + ) + + def test_trusted_keys_destroyed_recursively(self, populated_dir: Path): + """Sub-directories with per-key material must also be gone.""" + from fieldwitness.fieldkit.killswitch import PurgeScope, execute_purge + + key_subdir = populated_dir / "trusted_keys" / "aabbcc112233" + assert key_subdir.exists() + + execute_purge(PurgeScope.KEYS_ONLY, reason="test") + + assert not key_subdir.exists() + + +# --------------------------------------------------------------------------- +# test_killswitch_covers_carrier_history +# --------------------------------------------------------------------------- + + +class TestKillswitchCoversCarrierHistory: + def test_carrier_history_step_in_all_plan(self, populated_dir: Path): + """ALL purge must include destroy_carrier_history.""" + from fieldwitness.fieldkit.killswitch import PurgeScope, execute_purge + + result = execute_purge(PurgeScope.ALL, reason="test") + + assert "destroy_carrier_history" in result.steps_completed + + def test_carrier_history_file_destroyed_by_all(self, populated_dir: Path): + """The carrier_history.json file must be gone after ALL purge.""" + from fieldwitness.fieldkit.killswitch import PurgeScope, execute_purge + + carrier_file = populated_dir / "carrier_history.json" + assert carrier_file.exists(), "Test setup: carrier_history.json must exist before purge" + + execute_purge(PurgeScope.ALL, reason="test") + + assert not carrier_file.exists(), ( + "carrier_history.json must be destroyed by ALL purge" + ) + + def test_carrier_history_not_destroyed_by_keys_only(self, populated_dir: Path): + """KEYS_ONLY purge must NOT destroy carrier_history — it is not key material.""" + from fieldwitness.fieldkit.killswitch import PurgeScope, execute_purge + + carrier_file = populated_dir / "carrier_history.json" + + execute_purge(PurgeScope.KEYS_ONLY, 
reason="test") + + # carrier_history is a data file, not key material — KEYS_ONLY preserves it. + assert carrier_file.exists(), ( + "carrier_history.json must be preserved by KEYS_ONLY purge " + "(it is not key material)" + ) + + def test_carrier_history_step_absent_from_keys_only_plan(self, populated_dir: Path): + """destroy_carrier_history must not appear in KEYS_ONLY completed steps.""" + from fieldwitness.fieldkit.killswitch import PurgeScope, execute_purge + + result = execute_purge(PurgeScope.KEYS_ONLY, reason="test") + + assert "destroy_carrier_history" not in result.steps_completed diff --git a/tests/test_paths.py b/tests/test_paths.py new file mode 100644 index 0000000..9fca29a --- /dev/null +++ b/tests/test_paths.py @@ -0,0 +1,217 @@ +"""Tests for the centralized path registry (fieldwitness/paths.py).""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _fresh_paths_module(monkeypatch: pytest.MonkeyPatch, base_dir: Path): + """Return the paths module with BASE_DIR patched to base_dir. + + The monkeypatch is applied to the already-imported module attribute so that + __getattr__ picks up the new value on subsequent calls. + """ + import fieldwitness.paths as paths + + monkeypatch.setattr(paths, "BASE_DIR", base_dir) + return paths + + +# --------------------------------------------------------------------------- +# test_base_dir_default_is_fwmetadata +# --------------------------------------------------------------------------- + + +class TestBaseDirDefault: + def test_default_base_dir_ends_with_fwmetadata(self, monkeypatch): + """Verify the default data directory name is .fwmetadata.""" + import os + + monkeypatch.delenv("FIELDWITNESS_DATA_DIR", raising=False) + # Re-derive the default the same way paths.py does at import time. 
+ default = Path.home() / ".fwmetadata" + assert default.name == ".fwmetadata" + + def test_default_base_dir_is_under_home(self, monkeypatch): + """Verify the default data directory is under the user's home.""" + monkeypatch.delenv("FIELDWITNESS_DATA_DIR", raising=False) + default = Path.home() / ".fwmetadata" + assert str(default).startswith(str(Path.home())) + + +# --------------------------------------------------------------------------- +# test_base_dir_override_via_env +# --------------------------------------------------------------------------- + + +class TestBaseDirOverrideViaEnv: + def test_env_override_changes_base_dir( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ): + """Setting FIELDWITNESS_DATA_DIR must relocate BASE_DIR.""" + custom = tmp_path / "custom-fw-dir" + monkeypatch.setenv("FIELDWITNESS_DATA_DIR", str(custom)) + + # Re-evaluate the path that the module would compute at import time. + # Since BASE_DIR is module-level, we test it after patching the attribute. + import fieldwitness.paths as paths + + monkeypatch.setattr(paths, "BASE_DIR", custom) + + assert paths.BASE_DIR == custom + + def test_derived_paths_follow_overridden_base_dir( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ): + """Derived paths (IDENTITY_DIR etc.) 
must be under the overridden BASE_DIR.""" + custom = tmp_path / "relocated" + paths = _fresh_paths_module(monkeypatch, custom) + + assert str(paths.IDENTITY_DIR).startswith(str(custom)) + assert str(paths.CHAIN_DIR).startswith(str(custom)) + assert str(paths.ATTESTATIONS_DIR).startswith(str(custom)) + + +# --------------------------------------------------------------------------- +# test_trusted_keys_dir_exists +# --------------------------------------------------------------------------- + + +class TestTrustedKeysDirDefined: + def test_trusted_keys_dir_is_defined(self): + import fieldwitness.paths as paths + + # Access must not raise AttributeError + td = paths.TRUSTED_KEYS_DIR + assert isinstance(td, Path) + + def test_trusted_keys_dir_under_base_dir( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ): + paths = _fresh_paths_module(monkeypatch, tmp_path / ".fwmetadata") + + assert str(paths.TRUSTED_KEYS_DIR).startswith(str(paths.BASE_DIR)) + + def test_trusted_keys_dir_name(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + paths = _fresh_paths_module(monkeypatch, tmp_path / ".fw") + + assert paths.TRUSTED_KEYS_DIR.name == "trusted_keys" + + +# --------------------------------------------------------------------------- +# test_carrier_history_exists +# --------------------------------------------------------------------------- + + +class TestCarrierHistoryDefined: + def test_carrier_history_is_defined(self): + import fieldwitness.paths as paths + + ch = paths.CARRIER_HISTORY + assert isinstance(ch, Path) + + def test_carrier_history_under_base_dir( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ): + paths = _fresh_paths_module(monkeypatch, tmp_path / ".fwmetadata") + + assert str(paths.CARRIER_HISTORY).startswith(str(paths.BASE_DIR)) + + def test_carrier_history_filename(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + paths = _fresh_paths_module(monkeypatch, tmp_path / ".fw") + + assert paths.CARRIER_HISTORY.name == 
"carrier_history.json" + + +# --------------------------------------------------------------------------- +# test_tor_dir_exists +# --------------------------------------------------------------------------- + + +class TestTorDirDefined: + def test_tor_dir_is_defined(self): + import fieldwitness.paths as paths + + td = paths.TOR_DIR + assert isinstance(td, Path) + + def test_tor_hidden_service_dir_is_defined(self): + import fieldwitness.paths as paths + + hs = paths.TOR_HIDDEN_SERVICE_DIR + assert isinstance(hs, Path) + + def test_tor_hidden_service_dir_under_tor_dir( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ): + paths = _fresh_paths_module(monkeypatch, tmp_path / ".fwmetadata") + + assert str(paths.TOR_HIDDEN_SERVICE_DIR).startswith(str(paths.TOR_DIR)) + + def test_tor_dir_under_fieldkit_dir( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ): + paths = _fresh_paths_module(monkeypatch, tmp_path / ".fwmetadata") + + assert str(paths.TOR_DIR).startswith(str(paths.FIELDKIT_DIR)) + + +# --------------------------------------------------------------------------- +# test_all_paths_under_base_dir +# --------------------------------------------------------------------------- + + +class TestAllPathsUnderBaseDir: + # Names that are files directly under BASE_DIR (one-segment paths where + # the parent IS BASE_DIR, not a sub-directory). 
+ _SINGLE_SEGMENT = {"AUDIT_LOG", "CONFIG_FILE", "CARRIER_HISTORY", "LAST_BACKUP"} + + def test_every_defined_path_is_under_base_dir( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ): + """Every path in _PATH_DEFS must resolve to a location inside BASE_DIR.""" + import fieldwitness.paths as _paths_module + + base = tmp_path / ".fwmetadata" + monkeypatch.setattr(_paths_module, "BASE_DIR", base) + + for name in _paths_module._PATH_DEFS: + resolved: Path = _paths_module.__getattr__(name) + assert str(resolved).startswith(str(base)), ( + f"Path {name!r} resolves to {resolved}, which is outside BASE_DIR {base}" + ) + + def test_no_absolute_hardcoded_paths_outside_base( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ): + """Changing BASE_DIR must change ALL derived paths — no hardcoded roots.""" + import fieldwitness.paths as _paths_module + + original_base = _paths_module.BASE_DIR + new_base = tmp_path / "relocated" + monkeypatch.setattr(_paths_module, "BASE_DIR", new_base) + + for name in _paths_module._PATH_DEFS: + resolved: Path = _paths_module.__getattr__(name) + # Must be under the new base, not the original one + assert str(resolved).startswith(str(new_base)), ( + f"Path {name!r} still points under the old BASE_DIR after override" + ) + + def test_path_defs_is_non_empty(self): + import fieldwitness.paths as paths + + assert len(paths._PATH_DEFS) > 0 + + def test_unknown_attribute_raises_attribute_error(self): + import fieldwitness.paths as paths + + with pytest.raises(AttributeError): + paths.__getattr__("DOES_NOT_EXIST_9999") diff --git a/tests/test_tor.py b/tests/test_tor.py new file mode 100644 index 0000000..b895133 --- /dev/null +++ b/tests/test_tor.py @@ -0,0 +1,195 @@ +"""Unit tests for fieldwitness.fieldkit.tor — all run without Tor installed.""" + +from __future__ import annotations + +import sys +import types +from dataclasses import fields +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + + +# 
---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _reload_tor_module(monkeypatch: pytest.MonkeyPatch, *, stem_available: bool):
+    """Reload the tor module with stem either importable or not.
+
+    Because the module checks `import stem` at module scope, we must manipulate
+    sys.modules before importing (or re-importing) the module.
+    """
+    # Remove the cached module so the import guard re-evaluates.
+    # NOTE(review): this loop evicts "stem" itself but not "stem." submodules
+    # (unlike the loop in TestHasTorWithoutStem below) — confirm whether
+    # cached submodules can leak between tests.
+    for key in list(sys.modules.keys()):
+        if key == "stem" or key.startswith("fieldwitness.fieldkit.tor"):
+            del sys.modules[key]
+
+    if not stem_available:
+        # Make `import stem` raise ImportError
+        # (a None entry in sys.modules makes the import machinery raise).
+        monkeypatch.setitem(sys.modules, "stem", None)  # type: ignore[call-overload]
+    else:
+        # Install a minimal stub that satisfies `import stem`
+        stub = types.ModuleType("stem")
+        monkeypatch.setitem(sys.modules, "stem", stub)
+        stub_control = types.ModuleType("stem.control")
+        stub_control.Controller = MagicMock()  # type: ignore[attr-defined]
+        monkeypatch.setitem(sys.modules, "stem.control", stub_control)
+
+    import fieldwitness.fieldkit.tor as tor_module
+
+    return tor_module
+
+
+# ---------------------------------------------------------------------------
+# test_has_tor_returns_false_without_stem
+# ---------------------------------------------------------------------------
+
+
+class TestHasTorWithoutStem:
+    def test_has_tor_false_when_stem_absent(self, monkeypatch: pytest.MonkeyPatch):
+        """has_tor() must return False when stem is not importable."""
+        # Remove any cached stem import
+        for key in list(sys.modules.keys()):
+            if key == "stem" or key.startswith("stem."):
+                del sys.modules[key]
+
+        monkeypatch.setitem(sys.modules, "stem", None)  # type: ignore[call-overload]
+
+        # Re-import _availability after evicting the cached module
+        for key in list(sys.modules.keys()):
+            if key == "fieldwitness._availability":
+                del sys.modules[key]
+
+        from fieldwitness._availability import has_tor
+
+        assert has_tor() is False
+
+
+# ---------------------------------------------------------------------------
+# test_start_onion_service_without_stem_raises
+# ---------------------------------------------------------------------------
+
+
+class TestStartOnionServiceWithoutStem:
+    def test_raises_tor_not_available(self, monkeypatch: pytest.MonkeyPatch):
+        """start_onion_service() must raise TorNotAvailableError when stem is absent."""
+        tor = _reload_tor_module(monkeypatch, stem_available=False)
+
+        with pytest.raises(tor.TorNotAvailableError):
+            tor.start_onion_service(target_port=5000)
+
+    def test_error_message_mentions_install(self, monkeypatch: pytest.MonkeyPatch):
+        """The error message must guide the operator to install stem."""
+        tor = _reload_tor_module(monkeypatch, stem_available=False)
+
+        with pytest.raises(tor.TorNotAvailableError, match="pip install"):
+            tor.start_onion_service(target_port=5000)
+
+    def test_tor_not_available_is_not_tor_control_error(self, monkeypatch: pytest.MonkeyPatch):
+        """TorNotAvailableError and TorControlError must be distinct exception types."""
+        tor = _reload_tor_module(monkeypatch, stem_available=False)
+
+        assert tor.TorNotAvailableError is not tor.TorControlError
+
+
+# ---------------------------------------------------------------------------
+# test_onion_service_info_dataclass
+# ---------------------------------------------------------------------------
+
+
+class TestOnionServiceInfoDataclass:
+    def test_fields_exist(self):
+        from fieldwitness.fieldkit.tor import OnionServiceInfo
+
+        field_names = {f.name for f in fields(OnionServiceInfo)}
+        assert "onion_address" in field_names
+        assert "target_port" in field_names
+        assert "is_persistent" in field_names
+
+    def test_onion_url_property(self):
+        from fieldwitness.fieldkit.tor import OnionServiceInfo
+
+        info = OnionServiceInfo(
+            onion_address="abc123.onion",
+            target_port=5000,
+            is_persistent=True,
+        )
+        assert info.onion_url == "http://abc123.onion"
+
+    def test_frozen_dataclass_rejects_mutation(self):
+        from fieldwitness.fieldkit.tor import OnionServiceInfo
+
+        info = OnionServiceInfo(
+            onion_address="abc123.onion",
+            target_port=5000,
+            is_persistent=False,
+        )
+        # frozen dataclasses raise FrozenInstanceError, a subclass of AttributeError.
+        with pytest.raises((AttributeError, TypeError)):
+            info.onion_address = "evil.onion"  # type: ignore[misc]
+
+    def test_is_persistent_false(self):
+        from fieldwitness.fieldkit.tor import OnionServiceInfo
+
+        info = OnionServiceInfo(
+            onion_address="xyz.onion",
+            target_port=8080,
+            is_persistent=False,
+        )
+        assert info.is_persistent is False
+
+    def test_target_port_stored(self):
+        from fieldwitness.fieldkit.tor import OnionServiceInfo
+
+        info = OnionServiceInfo(
+            onion_address="test.onion",
+            target_port=9999,
+            is_persistent=True,
+        )
+        assert info.target_port == 9999
+
+
+# ---------------------------------------------------------------------------
+# test_persistent_key_storage_path
+# ---------------------------------------------------------------------------
+
+
+class TestPersistentKeyStoragePath:
+    def test_key_stored_under_tor_hidden_service_dir(
+        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path
+    ):
+        """The persistent key must be written inside paths.TOR_HIDDEN_SERVICE_DIR."""
+        import fieldwitness.paths as paths
+
+        # Redirect BASE_DIR to a temp location
+        monkeypatch.setattr(paths, "BASE_DIR", tmp_path / ".fwmetadata")
+
+        tor_dir = paths.TOR_HIDDEN_SERVICE_DIR
+
+        # Verify the resolved path sits under BASE_DIR / fieldkit / tor / hidden_service
+        assert str(tor_dir).startswith(str(tmp_path))
+        assert "fieldkit" in str(tor_dir)
+        assert "tor" in str(tor_dir)
+        assert "hidden_service" in str(tor_dir)
+
+    def test_tor_dir_is_child_of_base_dir(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path):
+        import fieldwitness.paths as paths
+
+        monkeypatch.setattr(paths, "BASE_DIR", tmp_path / ".fwmetadata")
+
+        assert str(paths.TOR_HIDDEN_SERVICE_DIR).startswith(str(paths.BASE_DIR))
+
+    def test_key_filename_in_expected_location(
+        self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path
+    ):
+        """The key file used by _start_persistent_service must be 'hs_ed25519_secret_key'."""
+        import fieldwitness.paths as paths
+
+        monkeypatch.setattr(paths, "BASE_DIR", tmp_path / ".fwmetadata")
+
+        expected_key_file = paths.TOR_HIDDEN_SERVICE_DIR / "hs_ed25519_secret_key"
+        # We're verifying the path structure, not the file's existence
+        assert expected_key_file.name == "hs_ed25519_secret_key"
+        assert expected_key_file.parent == paths.TOR_HIDDEN_SERVICE_DIR