"""Tests for key rotation (rotate_identity, rotate_channel_key).""" from __future__ import annotations from pathlib import Path import pytest from click.testing import CliRunner import fieldwitness.paths as _paths from fieldwitness.cli import main from fieldwitness.exceptions import KeystoreError from fieldwitness.keystore.manager import KeystoreManager from fieldwitness.keystore.models import RotationResult # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_manager(tmp_path: Path) -> KeystoreManager: """Return a KeystoreManager pointing at isolated temp directories.""" identity_dir = tmp_path / "identity" channel_key_file = tmp_path / "stego" / "channel.key" return KeystoreManager(identity_dir=identity_dir, channel_key_file=channel_key_file) # --------------------------------------------------------------------------- # rotate_identity # --------------------------------------------------------------------------- class TestRotateIdentity: def test_raises_when_no_identity(self, tmp_path: Path): ks = _make_manager(tmp_path) with pytest.raises(KeystoreError, match="No identity to rotate"): ks.rotate_identity() def test_returns_rotation_result(self, tmp_path: Path): ks = _make_manager(tmp_path) ks.generate_identity() result = ks.rotate_identity() assert isinstance(result, RotationResult) assert result.old_fingerprint assert result.new_fingerprint assert result.old_fingerprint != result.new_fingerprint def test_old_keys_archived(self, tmp_path: Path): ks = _make_manager(tmp_path) original = ks.generate_identity() result = ks.rotate_identity() archive = result.archive_path assert archive.is_dir() assert (archive / "private.pem").exists() assert (archive / "public.pem").exists() assert (archive / "rotation.txt").exists() # The archived public key must correspond to the *old* fingerprint import hashlib from cryptography.hazmat.primitives.serialization import ( Encoding, PublicFormat, load_pem_public_key, ) pub_pem = (archive / "public.pem").read_bytes() pub_key = load_pem_public_key(pub_pem) pub_raw = pub_key.public_bytes(Encoding.Raw, PublicFormat.Raw) archived_fp = hashlib.sha256(pub_raw).hexdigest()[:32] assert archived_fp == original.fingerprint def test_archive_private_key_permissions(self, tmp_path: Path): ks = _make_manager(tmp_path) ks.generate_identity() result = ks.rotate_identity() priv = result.archive_path / "private.pem" assert oct(priv.stat().st_mode & 0o777) == oct(0o600) def test_archive_dir_permissions(self, tmp_path: Path): ks = _make_manager(tmp_path) ks.generate_identity() result = ks.rotate_identity() assert oct(result.archive_path.stat().st_mode & 0o777) == oct(0o700) def test_new_identity_active_after_rotation(self, tmp_path: Path): ks = _make_manager(tmp_path) ks.generate_identity() result = ks.rotate_identity() current = ks.get_identity() assert current.fingerprint == result.new_fingerprint def test_rotation_txt_contains_old_fingerprint(self, tmp_path: Path): ks = _make_manager(tmp_path) original = ks.generate_identity() result = ks.rotate_identity() txt = (result.archive_path / "rotation.txt").read_text() assert original.fingerprint in txt def test_multiple_rotations_create_separate_archives(self, tmp_path: Path): ks = _make_manager(tmp_path) ks.generate_identity() r1 = ks.rotate_identity() r2 = ks.rotate_identity() assert r1.archive_path != r2.archive_path assert r1.archive_path.is_dir() assert r2.archive_path.is_dir() def test_rotation_fingerprints_are_distinct_from_each_other(self, tmp_path: Path): ks = _make_manager(tmp_path) ks.generate_identity() r1 = ks.rotate_identity() r2 = ks.rotate_identity() # Each rotation produces a unique new key assert r1.new_fingerprint == r2.old_fingerprint assert r1.new_fingerprint != r2.new_fingerprint # --------------------------------------------------------------------------- # rotate_channel_key # --------------------------------------------------------------------------- class TestRotateChannelKey: def test_raises_when_no_channel_key(self, tmp_path: Path): ks = _make_manager(tmp_path) with pytest.raises(KeystoreError, match="No channel key to rotate"): ks.rotate_channel_key() def test_raises_for_env_var_key(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): monkeypatch.setenv("FIELDWITNESS_CHANNEL_KEY", "a" * 64) ks = _make_manager(tmp_path) # has_channel_key() returns True (env), but file doesn't exist with pytest.raises(KeystoreError, match="environment variable"): ks.rotate_channel_key() def test_returns_rotation_result(self, tmp_path: Path): ks = _make_manager(tmp_path) ks.generate_channel_key() result = ks.rotate_channel_key() assert isinstance(result, RotationResult) assert result.old_fingerprint assert result.new_fingerprint assert result.old_fingerprint != result.new_fingerprint def test_old_key_archived(self, tmp_path: Path): ks = _make_manager(tmp_path) ks.generate_channel_key() result = ks.rotate_channel_key() archive = result.archive_path assert archive.is_dir() assert (archive / "channel.key").exists() assert (archive / "rotation.txt").exists() def test_archive_channel_key_permissions(self, tmp_path: Path): ks = _make_manager(tmp_path) ks.generate_channel_key() result = ks.rotate_channel_key() key_file = result.archive_path / "channel.key" assert oct(key_file.stat().st_mode & 0o777) == oct(0o600) def test_archived_key_matches_old_fingerprint(self, tmp_path: Path): from fieldwitness.stego.crypto import get_channel_fingerprint ks = _make_manager(tmp_path) ks.generate_channel_key() # Get fingerprint before rotation old_key = ks.get_channel_key() old_fp = get_channel_fingerprint(old_key) result = ks.rotate_channel_key() archived_key = (result.archive_path / "channel.key").read_text().strip() assert get_channel_fingerprint(archived_key) == old_fp assert old_fp == result.old_fingerprint def test_new_channel_key_active_after_rotation(self, tmp_path: Path): from fieldwitness.stego.crypto import get_channel_fingerprint ks = _make_manager(tmp_path) ks.generate_channel_key() result = ks.rotate_channel_key() current_key = ks.get_channel_key() current_fp = get_channel_fingerprint(current_key) assert current_fp == result.new_fingerprint def test_multiple_rotations_create_separate_archives(self, tmp_path: Path): ks = _make_manager(tmp_path) ks.generate_channel_key() r1 = ks.rotate_channel_key() r2 = ks.rotate_channel_key() assert r1.archive_path != r2.archive_path assert (r1.archive_path / "channel.key").exists() assert (r2.archive_path / "channel.key").exists() # --------------------------------------------------------------------------- # CLI integration # --------------------------------------------------------------------------- class TestRotateCLI: def _init_fieldwitness(self, tmp_path: Path) -> Path: """Create the minimal directory + key structure for CLI tests. Temporarily sets paths.BASE_DIR so the lazy-resolved KeystoreManager writes keys to the same location the CLI will read from when invoked with --data-dir pointing at the same directory. """ data_dir = tmp_path / ".fieldwitness" original_base = _paths.BASE_DIR try: _paths.BASE_DIR = data_dir ks = KeystoreManager() # uses lazy _paths resolution ks.generate_identity() ks.generate_channel_key() finally: _paths.BASE_DIR = original_base return data_dir def test_rotate_identity_cli_success(self, tmp_path: Path): data_dir = self._init_fieldwitness(tmp_path) runner = CliRunner() result = runner.invoke( main, ["--data-dir", str(data_dir), "keys", "rotate-identity", "--yes"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert "rotated successfully" in result.output assert "Old fingerprint:" in result.output assert "New fingerprint:" in result.output assert "IMPORTANT:" in result.output def test_rotate_identity_cli_no_identity(self, tmp_path: Path): data_dir = tmp_path / ".fieldwitness" data_dir.mkdir() runner = CliRunner() result = runner.invoke( main, ["--data-dir", str(data_dir), "keys", "rotate-identity", "--yes"], ) assert result.exit_code != 0 assert "Error" in result.output def test_rotate_channel_cli_success(self, tmp_path: Path): data_dir = self._init_fieldwitness(tmp_path) runner = CliRunner() result = runner.invoke( main, ["--data-dir", str(data_dir), "keys", "rotate-channel", "--yes"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert "rotated successfully" in result.output assert "Old fingerprint:" in result.output assert "New fingerprint:" in result.output assert "IMPORTANT:" in result.output def test_rotate_channel_cli_no_key(self, tmp_path: Path): data_dir = tmp_path / ".fieldwitness" data_dir.mkdir() runner = CliRunner() result = runner.invoke( main, ["--data-dir", str(data_dir), "keys", "rotate-channel", "--yes"], ) assert result.exit_code != 0 assert "Error" in result.output def test_rotate_identity_aborts_without_confirmation(self, tmp_path: Path): data_dir = self._init_fieldwitness(tmp_path) runner = CliRunner() # Simulate the user typing "n" at the confirmation prompt result = runner.invoke( main, ["--data-dir", str(data_dir), "keys", "rotate-identity"], input="n\n", ) assert result.exit_code != 0 def test_rotate_channel_aborts_without_confirmation(self, tmp_path: Path): data_dir = self._init_fieldwitness(tmp_path) runner = CliRunner() result = runner.invoke( main, ["--data-dir", str(data_dir), "keys", "rotate-channel"], input="n\n", ) assert result.exit_code != 0