fieldwitness/tests/test_key_rotation.py
Aaron D. Lee 51c9b0a99a Fix 14 bugs and add features from power-user security audit
Critical fixes:
- Fix admin_delete_user missing current_user_id argument (TypeError on every delete)
- Fix self-signed cert OOM: bytes(2130706433) → IPv4Address("127.0.0.1")
- Add @login_required to attestation routes (attest, log); verify stays public
- Add auth guards to fieldkit (@admin_required on killswitch) and keys blueprints
- Fix cleanup_temp_files NameError in generate() route

Security hardening:
- Unify temp storage to ~/.soosef/temp/ so killswitch purge covers web uploads
- Replace Path.unlink() with secure deletion (shred fallback) in temp_storage
- Add structured audit log (audit.jsonl) for admin, key, and killswitch actions

New features:
- Dead man's switch background enforcement thread in serve + check-deadman CLI
- Key rotation: soosef keys rotate-identity/rotate-channel with archiving
- Batch attestation: soosef attest batch <dir> with progress and error handling
- Geofence CLI: set/check/clear commands with config persistence
- USB CLI: snapshot/check commands against device whitelist
- Verification receipt download (/verify/receipt JSON endpoint + UI button)
- IdentityInfo.created_at populated from sidecar meta.json (mtime fallback)

Data layer:
- ChainStore.get() now O(1) via byte-offset index built during state rebuild
- Add federation module (chain, models, serialization, entropy)

Includes 45+ new tests across chain, deadman, key rotation, killswitch, and
serialization modules.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 17:06:33 -04:00

329 lines
11 KiB
Python

"""Tests for key rotation (rotate_identity, rotate_channel_key)."""
from __future__ import annotations
from pathlib import Path
import pytest
from click.testing import CliRunner
import soosef.paths as _paths
from soosef.cli import main
from soosef.exceptions import KeystoreError
from soosef.keystore.manager import KeystoreManager
from soosef.keystore.models import RotationResult
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_manager(tmp_path: Path) -> KeystoreManager:
"""Return a KeystoreManager pointing at isolated temp directories."""
identity_dir = tmp_path / "identity"
channel_key_file = tmp_path / "stegasoo" / "channel.key"
return KeystoreManager(identity_dir=identity_dir, channel_key_file=channel_key_file)
# ---------------------------------------------------------------------------
# rotate_identity
# ---------------------------------------------------------------------------
class TestRotateIdentity:
def test_raises_when_no_identity(self, tmp_path: Path):
ks = _make_manager(tmp_path)
with pytest.raises(KeystoreError, match="No identity to rotate"):
ks.rotate_identity()
def test_returns_rotation_result(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_identity()
result = ks.rotate_identity()
assert isinstance(result, RotationResult)
assert result.old_fingerprint
assert result.new_fingerprint
assert result.old_fingerprint != result.new_fingerprint
def test_old_keys_archived(self, tmp_path: Path):
ks = _make_manager(tmp_path)
original = ks.generate_identity()
result = ks.rotate_identity()
archive = result.archive_path
assert archive.is_dir()
assert (archive / "private.pem").exists()
assert (archive / "public.pem").exists()
assert (archive / "rotation.txt").exists()
# The archived public key must correspond to the *old* fingerprint
import hashlib
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
load_pem_public_key,
)
pub_pem = (archive / "public.pem").read_bytes()
pub_key = load_pem_public_key(pub_pem)
pub_raw = pub_key.public_bytes(Encoding.Raw, PublicFormat.Raw)
archived_fp = hashlib.sha256(pub_raw).hexdigest()[:32]
assert archived_fp == original.fingerprint
def test_archive_private_key_permissions(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_identity()
result = ks.rotate_identity()
priv = result.archive_path / "private.pem"
assert oct(priv.stat().st_mode & 0o777) == oct(0o600)
def test_archive_dir_permissions(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_identity()
result = ks.rotate_identity()
assert oct(result.archive_path.stat().st_mode & 0o777) == oct(0o700)
def test_new_identity_active_after_rotation(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_identity()
result = ks.rotate_identity()
current = ks.get_identity()
assert current.fingerprint == result.new_fingerprint
def test_rotation_txt_contains_old_fingerprint(self, tmp_path: Path):
ks = _make_manager(tmp_path)
original = ks.generate_identity()
result = ks.rotate_identity()
txt = (result.archive_path / "rotation.txt").read_text()
assert original.fingerprint in txt
def test_multiple_rotations_create_separate_archives(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_identity()
r1 = ks.rotate_identity()
r2 = ks.rotate_identity()
assert r1.archive_path != r2.archive_path
assert r1.archive_path.is_dir()
assert r2.archive_path.is_dir()
def test_rotation_fingerprints_are_distinct_from_each_other(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_identity()
r1 = ks.rotate_identity()
r2 = ks.rotate_identity()
# Each rotation produces a unique new key
assert r1.new_fingerprint == r2.old_fingerprint
assert r1.new_fingerprint != r2.new_fingerprint
# ---------------------------------------------------------------------------
# rotate_channel_key
# ---------------------------------------------------------------------------
class TestRotateChannelKey:
def test_raises_when_no_channel_key(self, tmp_path: Path):
ks = _make_manager(tmp_path)
with pytest.raises(KeystoreError, match="No channel key to rotate"):
ks.rotate_channel_key()
def test_raises_for_env_var_key(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setenv("STEGASOO_CHANNEL_KEY", "a" * 64)
ks = _make_manager(tmp_path)
# has_channel_key() returns True (env), but file doesn't exist
with pytest.raises(KeystoreError, match="environment variable"):
ks.rotate_channel_key()
def test_returns_rotation_result(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_channel_key()
result = ks.rotate_channel_key()
assert isinstance(result, RotationResult)
assert result.old_fingerprint
assert result.new_fingerprint
assert result.old_fingerprint != result.new_fingerprint
def test_old_key_archived(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_channel_key()
result = ks.rotate_channel_key()
archive = result.archive_path
assert archive.is_dir()
assert (archive / "channel.key").exists()
assert (archive / "rotation.txt").exists()
def test_archive_channel_key_permissions(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_channel_key()
result = ks.rotate_channel_key()
key_file = result.archive_path / "channel.key"
assert oct(key_file.stat().st_mode & 0o777) == oct(0o600)
def test_archived_key_matches_old_fingerprint(self, tmp_path: Path):
from stegasoo.crypto import get_channel_fingerprint
ks = _make_manager(tmp_path)
ks.generate_channel_key()
# Get fingerprint before rotation
old_key = ks.get_channel_key()
old_fp = get_channel_fingerprint(old_key)
result = ks.rotate_channel_key()
archived_key = (result.archive_path / "channel.key").read_text().strip()
assert get_channel_fingerprint(archived_key) == old_fp
assert old_fp == result.old_fingerprint
def test_new_channel_key_active_after_rotation(self, tmp_path: Path):
from stegasoo.crypto import get_channel_fingerprint
ks = _make_manager(tmp_path)
ks.generate_channel_key()
result = ks.rotate_channel_key()
current_key = ks.get_channel_key()
current_fp = get_channel_fingerprint(current_key)
assert current_fp == result.new_fingerprint
def test_multiple_rotations_create_separate_archives(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_channel_key()
r1 = ks.rotate_channel_key()
r2 = ks.rotate_channel_key()
assert r1.archive_path != r2.archive_path
assert (r1.archive_path / "channel.key").exists()
assert (r2.archive_path / "channel.key").exists()
# ---------------------------------------------------------------------------
# CLI integration
# ---------------------------------------------------------------------------
class TestRotateCLI:
def _init_soosef(self, tmp_path: Path) -> Path:
"""Create the minimal directory + key structure for CLI tests.
Temporarily sets paths.BASE_DIR so the lazy-resolved KeystoreManager
writes keys to the same location the CLI will read from when invoked
with --data-dir pointing at the same directory.
"""
data_dir = tmp_path / ".soosef"
original_base = _paths.BASE_DIR
try:
_paths.BASE_DIR = data_dir
ks = KeystoreManager() # uses lazy _paths resolution
ks.generate_identity()
ks.generate_channel_key()
finally:
_paths.BASE_DIR = original_base
return data_dir
def test_rotate_identity_cli_success(self, tmp_path: Path):
data_dir = self._init_soosef(tmp_path)
runner = CliRunner()
result = runner.invoke(
main,
["--data-dir", str(data_dir), "keys", "rotate-identity", "--yes"],
catch_exceptions=False,
)
assert result.exit_code == 0, result.output
assert "rotated successfully" in result.output
assert "Old fingerprint:" in result.output
assert "New fingerprint:" in result.output
assert "IMPORTANT:" in result.output
def test_rotate_identity_cli_no_identity(self, tmp_path: Path):
data_dir = tmp_path / ".soosef"
data_dir.mkdir()
runner = CliRunner()
result = runner.invoke(
main,
["--data-dir", str(data_dir), "keys", "rotate-identity", "--yes"],
)
assert result.exit_code != 0
assert "Error" in result.output
def test_rotate_channel_cli_success(self, tmp_path: Path):
data_dir = self._init_soosef(tmp_path)
runner = CliRunner()
result = runner.invoke(
main,
["--data-dir", str(data_dir), "keys", "rotate-channel", "--yes"],
catch_exceptions=False,
)
assert result.exit_code == 0, result.output
assert "rotated successfully" in result.output
assert "Old fingerprint:" in result.output
assert "New fingerprint:" in result.output
assert "IMPORTANT:" in result.output
def test_rotate_channel_cli_no_key(self, tmp_path: Path):
data_dir = tmp_path / ".soosef"
data_dir.mkdir()
runner = CliRunner()
result = runner.invoke(
main,
["--data-dir", str(data_dir), "keys", "rotate-channel", "--yes"],
)
assert result.exit_code != 0
assert "Error" in result.output
def test_rotate_identity_aborts_without_confirmation(self, tmp_path: Path):
data_dir = self._init_soosef(tmp_path)
runner = CliRunner()
# Simulate the user typing "n" at the confirmation prompt
result = runner.invoke(
main,
["--data-dir", str(data_dir), "keys", "rotate-identity"],
input="n\n",
)
assert result.exit_code != 0
def test_rotate_channel_aborts_without_confirmation(self, tmp_path: Path):
data_dir = self._init_soosef(tmp_path)
runner = CliRunner()
result = runner.invoke(
main,
["--data-dir", str(data_dir), "keys", "rotate-channel"],
input="n\n",
)
assert result.exit_code != 0