fieldwitness/tests/test_key_rotation.py
Aaron D. Lee e3bc1cce1f Consolidate stegasoo and verisoo into soosef monorepo
Merge stegasoo (v4.3.0, steganography) and verisoo (v0.1.0, attestation)
as subpackages under soosef.stegasoo and soosef.verisoo. This eliminates
cross-repo coordination and enables atomic changes across the full stack.

- Copy stegasoo (34 modules) and verisoo (15 modules) into src/soosef/
- Convert all verisoo absolute imports to relative imports
- Rewire ~50 import sites across soosef code (cli, web, keystore, tests)
- Replace stegasoo/verisoo pip deps with inlined code + pip extras
  (stego-dct, stego-audio, attest, web, api, cli, fieldkit, all, dev)
- Add _availability.py for runtime feature detection
- Add unified FastAPI mount point at soosef.api
- Copy and adapt tests from both repos (155 pass, 1 skip)
- Drop standalone CLI/web frontends; keep FastAPI as optional modules
- Both source repos tagged pre-monorepo-consolidation on GitHub

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 19:06:14 -04:00

329 lines
11 KiB
Python

"""Tests for key rotation (rotate_identity, rotate_channel_key)."""
from __future__ import annotations
from pathlib import Path
import pytest
from click.testing import CliRunner
import soosef.paths as _paths
from soosef.cli import main
from soosef.exceptions import KeystoreError
from soosef.keystore.manager import KeystoreManager
from soosef.keystore.models import RotationResult
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_manager(tmp_path: Path) -> KeystoreManager:
    """Build a KeystoreManager confined to *tmp_path*.

    The manager is given ``tmp_path / "identity"`` as its identity directory
    and ``tmp_path / "stegasoo" / "channel.key"`` as its channel key file.
    This helper only computes the two paths and hands them to the constructor.
    """
    return KeystoreManager(
        identity_dir=tmp_path / "identity",
        channel_key_file=tmp_path / "stegasoo" / "channel.key",
    )
# ---------------------------------------------------------------------------
# rotate_identity
# ---------------------------------------------------------------------------
class TestRotateIdentity:
    """Tests for ``KeystoreManager.rotate_identity``."""

    @staticmethod
    def _permission_bits(path: Path) -> str:
        """Return the low nine mode bits of *path* formatted with ``oct()``."""
        return oct(path.stat().st_mode & 0o777)

    def test_raises_when_no_identity(self, tmp_path: Path):
        """Rotating before any identity was generated raises KeystoreError."""
        manager = _make_manager(tmp_path)

        with pytest.raises(KeystoreError, match="No identity to rotate"):
            manager.rotate_identity()

    def test_returns_rotation_result(self, tmp_path: Path):
        """A rotation yields a RotationResult with two differing fingerprints."""
        manager = _make_manager(tmp_path)
        manager.generate_identity()

        rotation = manager.rotate_identity()

        assert isinstance(rotation, RotationResult)
        assert rotation.old_fingerprint
        assert rotation.new_fingerprint
        assert rotation.old_fingerprint != rotation.new_fingerprint

    def test_old_keys_archived(self, tmp_path: Path):
        """The archive directory holds the previous key pair and a rotation note."""
        manager = _make_manager(tmp_path)
        previous = manager.generate_identity()
        rotation = manager.rotate_identity()
        archive_dir = rotation.archive_path

        assert archive_dir.is_dir()
        for filename in ("private.pem", "public.pem", "rotation.txt"):
            assert (archive_dir / filename).exists()

        # The archived public key must correspond to the *old* fingerprint:
        # recompute it here as the first 32 hex chars of SHA-256 over the raw
        # public key bytes and compare with what generate_identity() reported.
        import hashlib

        from cryptography.hazmat.primitives.serialization import (
            Encoding,
            PublicFormat,
            load_pem_public_key,
        )

        archived_public = load_pem_public_key((archive_dir / "public.pem").read_bytes())
        raw_bytes = archived_public.public_bytes(Encoding.Raw, PublicFormat.Raw)
        recomputed_fp = hashlib.sha256(raw_bytes).hexdigest()[:32]

        assert recomputed_fp == previous.fingerprint

    def test_archive_private_key_permissions(self, tmp_path: Path):
        """The archived private key file has mode 0o600."""
        manager = _make_manager(tmp_path)
        manager.generate_identity()

        archived_private = manager.rotate_identity().archive_path / "private.pem"

        assert self._permission_bits(archived_private) == oct(0o600)

    def test_archive_dir_permissions(self, tmp_path: Path):
        """The archive directory itself has mode 0o700."""
        manager = _make_manager(tmp_path)
        manager.generate_identity()

        rotation = manager.rotate_identity()

        assert self._permission_bits(rotation.archive_path) == oct(0o700)

    def test_new_identity_active_after_rotation(self, tmp_path: Path):
        """After rotating, get_identity() reports the new fingerprint."""
        manager = _make_manager(tmp_path)
        manager.generate_identity()

        rotation = manager.rotate_identity()
        active = manager.get_identity()

        assert active.fingerprint == rotation.new_fingerprint

    def test_rotation_txt_contains_old_fingerprint(self, tmp_path: Path):
        """rotation.txt in the archive mentions the fingerprint being retired."""
        manager = _make_manager(tmp_path)
        previous = manager.generate_identity()

        rotation = manager.rotate_identity()
        note = (rotation.archive_path / "rotation.txt").read_text()

        assert previous.fingerprint in note

    def test_multiple_rotations_create_separate_archives(self, tmp_path: Path):
        """Two consecutive rotations archive into two distinct directories."""
        manager = _make_manager(tmp_path)
        manager.generate_identity()

        first = manager.rotate_identity()
        second = manager.rotate_identity()

        assert first.archive_path != second.archive_path
        for rotation in (first, second):
            assert rotation.archive_path.is_dir()

    def test_rotation_fingerprints_are_distinct_from_each_other(self, tmp_path: Path):
        """Rotations chain: the first's new key is the second's old key."""
        manager = _make_manager(tmp_path)
        manager.generate_identity()

        first = manager.rotate_identity()
        second = manager.rotate_identity()

        # Each rotation produces a unique new key
        assert first.new_fingerprint == second.old_fingerprint
        assert first.new_fingerprint != second.new_fingerprint
# ---------------------------------------------------------------------------
# rotate_channel_key
# ---------------------------------------------------------------------------
class TestRotateChannelKey:
    """Tests for ``KeystoreManager.rotate_channel_key``."""

    @pytest.fixture(autouse=True)
    def _clear_channel_key_env(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Remove STEGASOO_CHANNEL_KEY so ambient environment cannot leak in.

        With the variable set and no key file, rotate_channel_key() raises the
        "environment variable" error (see test_raises_for_env_var_key). Without
        this isolation, a developer or CI environment that exports the variable
        would make the file-based tests below take that path instead.
        monkeypatch restores the original value after each test.
        """
        monkeypatch.delenv("STEGASOO_CHANNEL_KEY", raising=False)

    def test_raises_when_no_channel_key(self, tmp_path: Path):
        """Rotating with neither a key file nor an env key raises KeystoreError."""
        ks = _make_manager(tmp_path)
        with pytest.raises(KeystoreError, match="No channel key to rotate"):
            ks.rotate_channel_key()

    def test_raises_for_env_var_key(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
        """A key supplied only via the environment cannot be rotated."""
        # Set after the autouse fixture cleared it; same monkeypatch instance,
        # so the value is undone at teardown.
        monkeypatch.setenv("STEGASOO_CHANNEL_KEY", "a" * 64)
        ks = _make_manager(tmp_path)
        # has_channel_key() returns True (env), but file doesn't exist
        with pytest.raises(KeystoreError, match="environment variable"):
            ks.rotate_channel_key()

    def test_returns_rotation_result(self, tmp_path: Path):
        """A rotation yields a RotationResult with two differing fingerprints."""
        ks = _make_manager(tmp_path)
        ks.generate_channel_key()
        result = ks.rotate_channel_key()
        assert isinstance(result, RotationResult)
        assert result.old_fingerprint
        assert result.new_fingerprint
        assert result.old_fingerprint != result.new_fingerprint

    def test_old_key_archived(self, tmp_path: Path):
        """The archive directory holds the previous key and a rotation note."""
        ks = _make_manager(tmp_path)
        ks.generate_channel_key()
        result = ks.rotate_channel_key()
        archive = result.archive_path
        assert archive.is_dir()
        assert (archive / "channel.key").exists()
        assert (archive / "rotation.txt").exists()

    def test_archive_channel_key_permissions(self, tmp_path: Path):
        """The archived channel key file has mode 0o600."""
        ks = _make_manager(tmp_path)
        ks.generate_channel_key()
        result = ks.rotate_channel_key()
        key_file = result.archive_path / "channel.key"
        assert oct(key_file.stat().st_mode & 0o777) == oct(0o600)

    def test_archived_key_matches_old_fingerprint(self, tmp_path: Path):
        """The archived key's fingerprint equals the pre-rotation fingerprint."""
        from soosef.stegasoo.crypto import get_channel_fingerprint

        ks = _make_manager(tmp_path)
        ks.generate_channel_key()
        # Get fingerprint before rotation
        old_key = ks.get_channel_key()
        old_fp = get_channel_fingerprint(old_key)
        result = ks.rotate_channel_key()
        archived_key = (result.archive_path / "channel.key").read_text().strip()
        assert get_channel_fingerprint(archived_key) == old_fp
        assert old_fp == result.old_fingerprint

    def test_new_channel_key_active_after_rotation(self, tmp_path: Path):
        """After rotating, get_channel_key() returns the newly generated key."""
        from soosef.stegasoo.crypto import get_channel_fingerprint

        ks = _make_manager(tmp_path)
        ks.generate_channel_key()
        result = ks.rotate_channel_key()
        current_key = ks.get_channel_key()
        current_fp = get_channel_fingerprint(current_key)
        assert current_fp == result.new_fingerprint

    def test_multiple_rotations_create_separate_archives(self, tmp_path: Path):
        """Two consecutive rotations archive into two distinct directories."""
        ks = _make_manager(tmp_path)
        ks.generate_channel_key()
        r1 = ks.rotate_channel_key()
        r2 = ks.rotate_channel_key()
        assert r1.archive_path != r2.archive_path
        assert (r1.archive_path / "channel.key").exists()
        assert (r2.archive_path / "channel.key").exists()
# ---------------------------------------------------------------------------
# CLI integration
# ---------------------------------------------------------------------------
class TestRotateCLI:
    """End-to-end tests for the ``keys rotate-identity`` / ``keys rotate-channel`` commands."""

    @staticmethod
    def _run_keys(data_dir: Path, *args: str, **invoke_kwargs):
        """Invoke ``main --data-dir <data_dir> keys <args...>`` with a fresh CliRunner.

        Extra keyword arguments (e.g. ``input``, ``catch_exceptions``) are
        forwarded unchanged to ``CliRunner.invoke``.
        """
        cli_runner = CliRunner()
        return cli_runner.invoke(
            main,
            ["--data-dir", str(data_dir), "keys", *args],
            **invoke_kwargs,
        )

    def _init_soosef(self, tmp_path: Path) -> Path:
        """Provision an identity and a channel key under ``tmp_path / ".soosef"``.

        ``paths.BASE_DIR`` is pointed at the data directory only for the
        duration of key generation, so that a default-constructed
        KeystoreManager (which resolves its paths lazily) writes where the CLI
        will later look when given ``--data-dir``. The previous BASE_DIR is
        always restored. Returns the data directory.
        """
        data_dir = tmp_path / ".soosef"
        saved_base = _paths.BASE_DIR
        _paths.BASE_DIR = data_dir
        try:
            keystore = KeystoreManager()  # uses lazy _paths resolution
            keystore.generate_identity()
            keystore.generate_channel_key()
        finally:
            _paths.BASE_DIR = saved_base
        return data_dir

    def test_rotate_identity_cli_success(self, tmp_path: Path):
        """rotate-identity --yes exits 0 and prints both fingerprints."""
        data_dir = self._init_soosef(tmp_path)

        outcome = self._run_keys(
            data_dir, "rotate-identity", "--yes", catch_exceptions=False
        )

        assert outcome.exit_code == 0, outcome.output
        for fragment in (
            "rotated successfully",
            "Old fingerprint:",
            "New fingerprint:",
            "IMPORTANT:",
        ):
            assert fragment in outcome.output

    def test_rotate_identity_cli_no_identity(self, tmp_path: Path):
        """rotate-identity on an empty data dir fails with an error message."""
        data_dir = tmp_path / ".soosef"
        data_dir.mkdir()

        outcome = self._run_keys(data_dir, "rotate-identity", "--yes")

        assert outcome.exit_code != 0
        assert "Error" in outcome.output

    def test_rotate_channel_cli_success(self, tmp_path: Path):
        """rotate-channel --yes exits 0 and prints both fingerprints."""
        data_dir = self._init_soosef(tmp_path)

        outcome = self._run_keys(
            data_dir, "rotate-channel", "--yes", catch_exceptions=False
        )

        assert outcome.exit_code == 0, outcome.output
        for fragment in (
            "rotated successfully",
            "Old fingerprint:",
            "New fingerprint:",
            "IMPORTANT:",
        ):
            assert fragment in outcome.output

    def test_rotate_channel_cli_no_key(self, tmp_path: Path):
        """rotate-channel on an empty data dir fails with an error message."""
        data_dir = tmp_path / ".soosef"
        data_dir.mkdir()

        outcome = self._run_keys(data_dir, "rotate-channel", "--yes")

        assert outcome.exit_code != 0
        assert "Error" in outcome.output

    def test_rotate_identity_aborts_without_confirmation(self, tmp_path: Path):
        """Answering "n" at the prompt makes rotate-identity exit non-zero."""
        data_dir = self._init_soosef(tmp_path)

        # Simulate the user typing "n" at the confirmation prompt
        outcome = self._run_keys(data_dir, "rotate-identity", input="n\n")

        assert outcome.exit_code != 0

    def test_rotate_channel_aborts_without_confirmation(self, tmp_path: Path):
        """Answering "n" at the prompt makes rotate-channel exit non-zero."""
        data_dir = self._init_soosef(tmp_path)

        outcome = self._run_keys(data_dir, "rotate-channel", input="n\n")

        assert outcome.exit_code != 0