Complete project rebrand for better positioning in the press freedom and digital security space. FieldWitness communicates both field deployment and evidence testimony — appropriate for the target audience of journalists, NGOs, and human rights organizations.

Rename mapping:
- soosef → fieldwitness (package, CLI, all imports)
- soosef.stegasoo → fieldwitness.stego
- soosef.verisoo → fieldwitness.attest
- ~/.soosef/ → ~/.fwmetadata/ (innocuous data dir name)
- SOOSEF_DATA_DIR → FIELDWITNESS_DATA_DIR
- SoosefConfig → FieldWitnessConfig
- SoosefError → FieldWitnessError

Also includes:
- License switch from MIT to GPL-3.0
- C2PA bridge module (Phase 0-2 MVP): cert.py, export.py, vendor_assertions.py
- README repositioned to lead with provenance/federation, stego backgrounded
- Threat model skeleton at docs/security/threat-model.md
- Planning docs: docs/planning/c2pa-integration.md, docs/planning/gtm-feasibility.md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
329 lines
11 KiB
Python
329 lines
11 KiB
Python
"""Tests for key rotation (rotate_identity, rotate_channel_key)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from click.testing import CliRunner
|
|
|
|
import fieldwitness.paths as _paths
|
|
from fieldwitness.cli import main
|
|
from fieldwitness.exceptions import KeystoreError
|
|
from fieldwitness.keystore.manager import KeystoreManager
|
|
from fieldwitness.keystore.models import RotationResult
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_manager(tmp_path: Path) -> KeystoreManager:
    """Build a KeystoreManager whose storage lives entirely under *tmp_path*.

    The identity directory is ``tmp_path / "identity"`` and the channel key
    file is ``tmp_path / "stego" / "channel.key"``, so every test that passes
    its own ``tmp_path`` gets a keystore isolated from the others.
    """
    return KeystoreManager(
        identity_dir=tmp_path / "identity",
        channel_key_file=tmp_path / "stego" / "channel.key",
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# rotate_identity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRotateIdentity:
    """Tests for ``KeystoreManager.rotate_identity``."""

    def test_raises_when_no_identity(self, tmp_path: Path):
        """Rotating before an identity exists raises KeystoreError."""
        manager = _make_manager(tmp_path)

        with pytest.raises(KeystoreError, match="No identity to rotate"):
            manager.rotate_identity()

    def test_returns_rotation_result(self, tmp_path: Path):
        """A rotation reports two non-empty, differing fingerprints."""
        manager = _make_manager(tmp_path)
        manager.generate_identity()

        rotation = manager.rotate_identity()

        assert isinstance(rotation, RotationResult)
        assert rotation.old_fingerprint
        assert rotation.new_fingerprint
        assert rotation.old_fingerprint != rotation.new_fingerprint

    def test_old_keys_archived(self, tmp_path: Path):
        """The archive holds the previous key pair and a rotation note."""
        import hashlib

        from cryptography.hazmat.primitives.serialization import (
            Encoding,
            PublicFormat,
            load_pem_public_key,
        )

        manager = _make_manager(tmp_path)
        previous_identity = manager.generate_identity()

        rotation = manager.rotate_identity()

        archive_dir = rotation.archive_path
        assert archive_dir.is_dir()
        for expected_name in ("private.pem", "public.pem", "rotation.txt"):
            assert (archive_dir / expected_name).exists()

        # The archived public key must correspond to the *old* fingerprint:
        # recompute it here as the first 32 hex characters of the SHA-256
        # digest of the raw public key bytes and compare.
        archived_public = load_pem_public_key((archive_dir / "public.pem").read_bytes())
        raw_public = archived_public.public_bytes(Encoding.Raw, PublicFormat.Raw)
        recomputed_fp = hashlib.sha256(raw_public).hexdigest()[:32]
        assert recomputed_fp == previous_identity.fingerprint

    def test_archive_private_key_permissions(self, tmp_path: Path):
        """The archived private key is stored with mode 0o600."""
        manager = _make_manager(tmp_path)
        manager.generate_identity()

        rotation = manager.rotate_identity()

        archived_private = rotation.archive_path / "private.pem"
        mode_bits = archived_private.stat().st_mode & 0o777
        assert oct(mode_bits) == oct(0o600)

    def test_archive_dir_permissions(self, tmp_path: Path):
        """The archive directory itself is created with mode 0o700."""
        manager = _make_manager(tmp_path)
        manager.generate_identity()

        rotation = manager.rotate_identity()

        mode_bits = rotation.archive_path.stat().st_mode & 0o777
        assert oct(mode_bits) == oct(0o700)

    def test_new_identity_active_after_rotation(self, tmp_path: Path):
        """After rotating, get_identity() returns the newly generated key."""
        manager = _make_manager(tmp_path)
        manager.generate_identity()

        rotation = manager.rotate_identity()

        active_identity = manager.get_identity()
        assert active_identity.fingerprint == rotation.new_fingerprint

    def test_rotation_txt_contains_old_fingerprint(self, tmp_path: Path):
        """rotation.txt in the archive mentions the retired fingerprint."""
        manager = _make_manager(tmp_path)
        previous_identity = manager.generate_identity()

        rotation = manager.rotate_identity()

        note_text = (rotation.archive_path / "rotation.txt").read_text()
        assert previous_identity.fingerprint in note_text

    def test_multiple_rotations_create_separate_archives(self, tmp_path: Path):
        """Two consecutive rotations never share an archive directory."""
        manager = _make_manager(tmp_path)
        manager.generate_identity()

        first = manager.rotate_identity()
        second = manager.rotate_identity()

        assert first.archive_path != second.archive_path
        assert first.archive_path.is_dir()
        assert second.archive_path.is_dir()

    def test_rotation_fingerprints_are_distinct_from_each_other(self, tmp_path: Path):
        """Rotations chain: the first new key is the second rotation's old key."""
        manager = _make_manager(tmp_path)
        manager.generate_identity()

        first = manager.rotate_identity()
        second = manager.rotate_identity()

        # Each rotation produces a unique new key
        assert first.new_fingerprint == second.old_fingerprint
        assert first.new_fingerprint != second.new_fingerprint
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# rotate_channel_key
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRotateChannelKey:
    """Tests for ``KeystoreManager.rotate_channel_key``."""

    @pytest.fixture(autouse=True)
    def _isolate_channel_key_env(self, monkeypatch: pytest.MonkeyPatch):
        """Remove any ambient FIELDWITNESS_CHANNEL_KEY before each test.

        The keystore consults this environment variable (see
        ``test_raises_for_env_var_key``). Without clearing it, a value
        exported in the developer's shell or CI job would leak into the
        file-based tests and make them exercise the wrong code path.
        """
        monkeypatch.delenv("FIELDWITNESS_CHANNEL_KEY", raising=False)

    def test_raises_when_no_channel_key(self, tmp_path: Path):
        """Rotating with no channel key at all raises KeystoreError."""
        ks = _make_manager(tmp_path)

        with pytest.raises(KeystoreError, match="No channel key to rotate"):
            ks.rotate_channel_key()

    def test_raises_for_env_var_key(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
        """A key supplied only via the environment cannot be rotated."""
        monkeypatch.setenv("FIELDWITNESS_CHANNEL_KEY", "a" * 64)
        ks = _make_manager(tmp_path)

        # has_channel_key() returns True (env), but file doesn't exist
        with pytest.raises(KeystoreError, match="environment variable"):
            ks.rotate_channel_key()

    def test_returns_rotation_result(self, tmp_path: Path):
        """A rotation reports two non-empty, differing fingerprints."""
        ks = _make_manager(tmp_path)
        ks.generate_channel_key()

        result = ks.rotate_channel_key()

        assert isinstance(result, RotationResult)
        assert result.old_fingerprint
        assert result.new_fingerprint
        assert result.old_fingerprint != result.new_fingerprint

    def test_old_key_archived(self, tmp_path: Path):
        """The archive holds the previous channel key and a rotation note."""
        ks = _make_manager(tmp_path)
        ks.generate_channel_key()

        result = ks.rotate_channel_key()

        archive = result.archive_path
        assert archive.is_dir()
        assert (archive / "channel.key").exists()
        assert (archive / "rotation.txt").exists()

    def test_archive_channel_key_permissions(self, tmp_path: Path):
        """The archived channel key is stored with mode 0o600."""
        ks = _make_manager(tmp_path)
        ks.generate_channel_key()

        result = ks.rotate_channel_key()

        key_file = result.archive_path / "channel.key"
        assert oct(key_file.stat().st_mode & 0o777) == oct(0o600)

    def test_archived_key_matches_old_fingerprint(self, tmp_path: Path):
        """The archived key file hashes to the reported old fingerprint."""
        from fieldwitness.stego.crypto import get_channel_fingerprint

        ks = _make_manager(tmp_path)
        ks.generate_channel_key()
        # Get fingerprint before rotation
        old_key = ks.get_channel_key()
        old_fp = get_channel_fingerprint(old_key)

        result = ks.rotate_channel_key()

        archived_key = (result.archive_path / "channel.key").read_text().strip()
        assert get_channel_fingerprint(archived_key) == old_fp
        assert old_fp == result.old_fingerprint

    def test_new_channel_key_active_after_rotation(self, tmp_path: Path):
        """After rotating, get_channel_key() returns the newly generated key."""
        from fieldwitness.stego.crypto import get_channel_fingerprint

        ks = _make_manager(tmp_path)
        ks.generate_channel_key()

        result = ks.rotate_channel_key()

        current_key = ks.get_channel_key()
        current_fp = get_channel_fingerprint(current_key)
        assert current_fp == result.new_fingerprint

    def test_multiple_rotations_create_separate_archives(self, tmp_path: Path):
        """Two consecutive rotations each keep their own archived key."""
        ks = _make_manager(tmp_path)
        ks.generate_channel_key()

        r1 = ks.rotate_channel_key()
        r2 = ks.rotate_channel_key()

        assert r1.archive_path != r2.archive_path
        assert (r1.archive_path / "channel.key").exists()
        assert (r2.archive_path / "channel.key").exists()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI integration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRotateCLI:
    """CLI integration tests for ``keys rotate-identity`` / ``keys rotate-channel``."""

    @pytest.fixture(autouse=True)
    def _isolate_channel_key_env(self, monkeypatch: pytest.MonkeyPatch):
        """Remove any ambient FIELDWITNESS_CHANNEL_KEY before each test.

        These tests rely on the channel key living in a file under the
        temporary data directory. A value exported in the caller's
        environment would otherwise leak into key setup and into the
        ``rotate-channel`` commands.
        """
        monkeypatch.delenv("FIELDWITNESS_CHANNEL_KEY", raising=False)

    def _init_fieldwitness(self, tmp_path: Path) -> Path:
        """Create the minimal directory + key structure for CLI tests.

        Temporarily sets paths.BASE_DIR so the lazy-resolved KeystoreManager
        writes keys to the same location the CLI will read from when invoked
        with --data-dir pointing at the same directory.

        Returns the data directory (``tmp_path / ".fieldwitness"``) to pass to
        ``--data-dir``.
        """
        data_dir = tmp_path / ".fieldwitness"
        original_base = _paths.BASE_DIR
        try:
            _paths.BASE_DIR = data_dir
            ks = KeystoreManager()  # uses lazy _paths resolution
            ks.generate_identity()
            ks.generate_channel_key()
        finally:
            # Always restore the module global so other tests are unaffected.
            _paths.BASE_DIR = original_base
        return data_dir

    def test_rotate_identity_cli_success(self, tmp_path: Path):
        """``rotate-identity --yes`` succeeds and prints both fingerprints."""
        data_dir = self._init_fieldwitness(tmp_path)
        runner = CliRunner()

        result = runner.invoke(
            main,
            ["--data-dir", str(data_dir), "keys", "rotate-identity", "--yes"],
            catch_exceptions=False,
        )

        assert result.exit_code == 0, result.output
        assert "rotated successfully" in result.output
        assert "Old fingerprint:" in result.output
        assert "New fingerprint:" in result.output
        assert "IMPORTANT:" in result.output

    def test_rotate_identity_cli_no_identity(self, tmp_path: Path):
        """``rotate-identity`` on an empty data dir exits non-zero with an error."""
        data_dir = tmp_path / ".fieldwitness"
        data_dir.mkdir()
        runner = CliRunner()

        result = runner.invoke(
            main,
            ["--data-dir", str(data_dir), "keys", "rotate-identity", "--yes"],
        )

        assert result.exit_code != 0
        assert "Error" in result.output

    def test_rotate_channel_cli_success(self, tmp_path: Path):
        """``rotate-channel --yes`` succeeds and prints both fingerprints."""
        data_dir = self._init_fieldwitness(tmp_path)
        runner = CliRunner()

        result = runner.invoke(
            main,
            ["--data-dir", str(data_dir), "keys", "rotate-channel", "--yes"],
            catch_exceptions=False,
        )

        assert result.exit_code == 0, result.output
        assert "rotated successfully" in result.output
        assert "Old fingerprint:" in result.output
        assert "New fingerprint:" in result.output
        assert "IMPORTANT:" in result.output

    def test_rotate_channel_cli_no_key(self, tmp_path: Path):
        """``rotate-channel`` on an empty data dir exits non-zero with an error."""
        data_dir = tmp_path / ".fieldwitness"
        data_dir.mkdir()
        runner = CliRunner()

        result = runner.invoke(
            main,
            ["--data-dir", str(data_dir), "keys", "rotate-channel", "--yes"],
        )

        assert result.exit_code != 0
        assert "Error" in result.output

    def test_rotate_identity_aborts_without_confirmation(self, tmp_path: Path):
        """Answering "n" at the prompt makes ``rotate-identity`` exit non-zero."""
        data_dir = self._init_fieldwitness(tmp_path)
        runner = CliRunner()

        # Simulate the user typing "n" at the confirmation prompt
        result = runner.invoke(
            main,
            ["--data-dir", str(data_dir), "keys", "rotate-identity"],
            input="n\n",
        )

        assert result.exit_code != 0

    def test_rotate_channel_aborts_without_confirmation(self, tmp_path: Path):
        """Answering "n" at the prompt makes ``rotate-channel`` exit non-zero."""
        data_dir = self._init_fieldwitness(tmp_path)
        runner = CliRunner()

        # Simulate the user typing "n" at the confirmation prompt
        result = runner.invoke(
            main,
            ["--data-dir", str(data_dir), "keys", "rotate-channel"],
            input="n\n",
        )

        assert result.exit_code != 0
|