Fix 14 bugs and add features from power-user security audit

Critical fixes:
- Fix admin_delete_user missing current_user_id argument (TypeError on every delete)
- Fix self-signed cert OOM: bytes(2130706433) → IPv4Address("127.0.0.1")
- Add @login_required to attestation routes (attest, log); verify stays public
- Add auth guards to fieldkit (@admin_required on killswitch) and keys blueprints
- Fix cleanup_temp_files NameError in generate() route

Security hardening:
- Unify temp storage to ~/.soosef/temp/ so killswitch purge covers web uploads
- Replace Path.unlink() with secure deletion (shred fallback) in temp_storage
- Add structured audit log (audit.jsonl) for admin, key, and killswitch actions

New features:
- Dead man's switch background enforcement thread in serve + check-deadman CLI
- Key rotation: soosef keys rotate-identity/rotate-channel with archiving
- Batch attestation: soosef attest batch <dir> with progress and error handling
- Geofence CLI: set/check/clear commands with config persistence
- USB CLI: snapshot/check commands against device whitelist
- Verification receipt download (/verify/receipt JSON endpoint + UI button)
- IdentityInfo.created_at populated from sidecar meta.json (mtime fallback)

Data layer:
- ChainStore.get() now O(1) via byte-offset index built during state rebuild
- Add federation module (chain, models, serialization, entropy)

Includes 45+ new tests across chain, deadman, key rotation, killswitch, and
serialization modules.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee
2026-04-01 17:06:33 -04:00
parent fb2e036e66
commit 51c9b0a99a
28 changed files with 3749 additions and 168 deletions

37
tests/conftest.py Normal file
View File

@@ -0,0 +1,37 @@
"""Shared test fixtures for SooSeF tests."""
from __future__ import annotations
import os
from pathlib import Path
import pytest
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
@pytest.fixture()
def tmp_soosef_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
"""Set SOOSEF_DATA_DIR to a temporary directory.
This must be used before importing any module that reads soosef.paths
at import time. For modules that read paths lazily (most of them),
monkeypatching the paths module directly is more reliable.
"""
data_dir = tmp_path / ".soosef"
data_dir.mkdir()
monkeypatch.setenv("SOOSEF_DATA_DIR", str(data_dir))
return data_dir
@pytest.fixture()
def chain_dir(tmp_path: Path) -> Path:
"""A temporary chain directory."""
d = tmp_path / "chain"
d.mkdir()
return d
@pytest.fixture()
def private_key() -> Ed25519PrivateKey:
"""A fresh Ed25519 private key for testing."""
return Ed25519PrivateKey.generate()

287
tests/test_chain.py Normal file
View File

@@ -0,0 +1,287 @@
"""Tests for the attestation hash chain store."""
from __future__ import annotations
import hashlib
from pathlib import Path
import pytest
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from soosef.exceptions import ChainError, ChainIntegrityError
from soosef.federation.chain import ChainStore
from soosef.federation.models import ChainState
from soosef.federation.serialization import canonical_bytes, compute_record_hash
def test_genesis_record(chain_dir: Path, private_key: Ed25519PrivateKey):
"""First record has chain_index=0 and prev_hash=0x00*32."""
store = ChainStore(chain_dir)
content_hash = hashlib.sha256(b"genesis content").digest()
record = store.append(content_hash, "test/plain", private_key)
assert record.chain_index == 0
assert record.prev_hash == ChainState.GENESIS_PREV_HASH
assert record.version == 1
assert record.content_hash == content_hash
assert record.content_type == "test/plain"
assert len(record.record_id) == 16
assert len(record.signer_pubkey) == 32
assert len(record.signature) == 64
def test_chain_state_after_genesis(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Chain state is correctly initialized after the first record."""
store = ChainStore(chain_dir)
record = store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key)
state = store.state()
assert state is not None
assert state.record_count == 1
assert state.head_index == 0
assert state.head_hash == compute_record_hash(record)
assert state.chain_id == hashlib.sha256(canonical_bytes(record)).digest()
def test_chain_append_multiple(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Appending multiple records produces monotonically increasing indices."""
store = ChainStore(chain_dir)
records = []
for i in range(5):
content = hashlib.sha256(f"content-{i}".encode()).digest()
record = store.append(content, "test/plain", private_key)
records.append(record)
for i, record in enumerate(records):
assert record.chain_index == i
state = store.state()
assert state is not None
assert state.record_count == 5
assert state.head_index == 4
def test_chain_hash_linkage(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Each record's prev_hash matches the hash of the previous record."""
store = ChainStore(chain_dir)
r0 = store.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key)
r1 = store.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key)
r2 = store.append(hashlib.sha256(b"r2").digest(), "test/plain", private_key)
assert r1.prev_hash == compute_record_hash(r0)
assert r2.prev_hash == compute_record_hash(r1)
def test_signature_verification(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Each record's Ed25519 signature is valid over canonical bytes."""
store = ChainStore(chain_dir)
record = store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key)
pub = Ed25519PublicKey.from_public_bytes(record.signer_pubkey)
# This will raise if invalid
pub.verify(record.signature, canonical_bytes(record))
def test_verify_chain_valid(chain_dir: Path, private_key: Ed25519PrivateKey):
"""verify_chain returns True on a valid chain."""
store = ChainStore(chain_dir)
for i in range(5):
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)
assert store.verify_chain() is True
def test_verify_chain_detects_tamper(chain_dir: Path, private_key: Ed25519PrivateKey):
"""verify_chain detects a tampered record in the middle."""
store = ChainStore(chain_dir)
for i in range(3):
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)
# Corrupt chain.bin by flipping a byte in the middle
chain_file = chain_dir / "chain.bin"
data = bytearray(chain_file.read_bytes())
midpoint = len(data) // 2
data[midpoint] ^= 0xFF
chain_file.write_bytes(bytes(data))
# Force state reload
store._state = None
with pytest.raises((ChainIntegrityError, ChainError)):
store.verify_chain()
def test_entropy_witnesses_populated(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Entropy witnesses are populated with non-trivial values."""
store = ChainStore(chain_dir)
record = store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key)
ew = record.entropy_witnesses
assert ew is not None
assert ew.sys_uptime > 0
assert len(ew.fs_snapshot) == 16
assert ew.proc_entropy > 0
assert len(ew.boot_id) > 0
def test_chain_persistence(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Chain survives close and reopen."""
store1 = ChainStore(chain_dir)
r0 = store1.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key)
r1 = store1.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key)
# Open a new store instance (simulates process restart)
store2 = ChainStore(chain_dir)
state = store2.state()
assert state is not None
assert state.record_count == 2
assert state.head_index == 1
assert state.head_hash == compute_record_hash(r1)
# Can read records back
loaded_r0 = store2.get(0)
assert loaded_r0.content_hash == r0.content_hash
assert loaded_r0.signature == r0.signature
def test_chain_state_rebuild(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Chain state is rebuilt from chain.bin if state.cbor is missing."""
store = ChainStore(chain_dir)
store.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key)
r1 = store.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key)
# Delete state file
state_file = chain_dir / "state.cbor"
state_file.unlink()
# Reopen — should rebuild
store2 = ChainStore(chain_dir)
state = store2.state()
assert state is not None
assert state.record_count == 2
assert state.head_index == 1
assert state.head_hash == compute_record_hash(r1)
def test_empty_chain(chain_dir: Path):
"""Empty chain reports correct state."""
store = ChainStore(chain_dir)
assert store.is_empty() is True
assert store.state() is None
assert store.head() is None
def test_get_nonexistent_index(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Accessing a nonexistent index raises ChainError."""
store = ChainStore(chain_dir)
store.append(hashlib.sha256(b"only").digest(), "test/plain", private_key)
with pytest.raises(ChainError, match="not found"):
store.get(99)
def test_iter_records_range(chain_dir: Path, private_key: Ed25519PrivateKey):
"""iter_records respects start and end bounds."""
store = ChainStore(chain_dir)
for i in range(10):
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)
records = list(store.iter_records(start=3, end=6))
assert len(records) == 4
assert records[0].chain_index == 3
assert records[-1].chain_index == 6
def test_metadata_in_chain(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Metadata is preserved through append and retrieval."""
store = ChainStore(chain_dir)
meta = {"caption": "evidence photo", "backfilled": True}
record = store.append(
hashlib.sha256(b"test").digest(), "test/plain", private_key, metadata=meta
)
loaded = store.get(0)
assert loaded.metadata == meta
def test_head_returns_latest(chain_dir: Path, private_key: Ed25519PrivateKey):
"""head() returns the most recently appended record."""
store = ChainStore(chain_dir)
for i in range(3):
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)
head = store.head()
assert head is not None
assert head.chain_index == 2
def test_verify_chain_detects_signer_change(chain_dir: Path):
"""verify_chain flags a different signer as integrity violation."""
store = ChainStore(chain_dir)
key1 = Ed25519PrivateKey.generate()
key2 = Ed25519PrivateKey.generate()
store.append(hashlib.sha256(b"r0").digest(), "test/plain", key1)
# Manually bypass normal append to inject a record signed by key2.
# We need to build the record with correct prev_hash but wrong signer.
import struct
import fcntl
from soosef.federation.serialization import serialize_record
from soosef.federation.models import AttestationChainRecord
from soosef.federation.entropy import collect_entropy_witnesses
from uuid_utils import uuid7
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from soosef.federation.serialization import canonical_bytes as cb
state = store.state()
prev_hash = state.head_hash
pub2 = key2.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
entropy = collect_entropy_witnesses(chain_dir / "chain.bin")
record = AttestationChainRecord(
version=1,
record_id=uuid7().bytes,
chain_index=1,
prev_hash=prev_hash,
content_hash=hashlib.sha256(b"r1").digest(),
content_type="test/plain",
metadata={},
claimed_ts=int(__import__("time").time() * 1_000_000),
entropy_witnesses=entropy,
signer_pubkey=pub2,
signature=b"",
)
sig = key2.sign(cb(record))
record = AttestationChainRecord(
version=record.version,
record_id=record.record_id,
chain_index=record.chain_index,
prev_hash=record.prev_hash,
content_hash=record.content_hash,
content_type=record.content_type,
metadata=record.metadata,
claimed_ts=record.claimed_ts,
entropy_witnesses=record.entropy_witnesses,
signer_pubkey=record.signer_pubkey,
signature=sig,
)
record_bytes = serialize_record(record)
with open(chain_dir / "chain.bin", "ab") as f:
fcntl.flock(f, fcntl.LOCK_EX)
f.write(struct.pack(">I", len(record_bytes)))
f.write(record_bytes)
fcntl.flock(f, fcntl.LOCK_UN)
store._state = None
with pytest.raises(ChainIntegrityError, match="signer changed"):
store.verify_chain()

View File

@@ -0,0 +1,162 @@
"""Security-focused tests for the attestation chain.
Tests concurrent access, oversized records, and edge cases that could
compromise chain integrity.
"""
from __future__ import annotations
import hashlib
import struct
import threading
from pathlib import Path
import pytest
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from soosef.exceptions import ChainError
from soosef.federation.chain import ChainStore, MAX_RECORD_SIZE
from soosef.federation.serialization import compute_record_hash
def test_concurrent_append_no_fork(chain_dir: Path):
"""Concurrent appends must not fork the chain — indices must be unique."""
private_key = Ed25519PrivateKey.generate()
num_threads = 8
records_per_thread = 5
results: list[list] = [[] for _ in range(num_threads)]
errors: list[Exception] = []
def worker(thread_id: int):
try:
store = ChainStore(chain_dir)
for i in range(records_per_thread):
content = hashlib.sha256(f"t{thread_id}-r{i}".encode()).digest()
record = store.append(content, "test/plain", private_key)
results[thread_id].append(record.chain_index)
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=worker, args=(t,)) for t in range(num_threads)]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors, f"Thread errors: {errors}"
# Collect all indices
all_indices = []
for r in results:
all_indices.extend(r)
# Every index must be unique (no fork)
assert len(all_indices) == len(set(all_indices)), (
f"Duplicate chain indices detected — chain forked! "
f"Indices: {sorted(all_indices)}"
)
# Indices should be 0..N-1 contiguous
total = num_threads * records_per_thread
assert sorted(all_indices) == list(range(total))
# Full chain verification should pass
store = ChainStore(chain_dir)
assert store.verify_chain() is True
assert store.state().record_count == total
def test_oversized_record_rejected(chain_dir: Path):
"""A corrupted length prefix exceeding MAX_RECORD_SIZE must raise ChainError."""
chain_file = chain_dir / "chain.bin"
# Write a length prefix claiming a 100 MB record
bogus_length = 100 * 1024 * 1024
chain_file.write_bytes(struct.pack(">I", bogus_length) + b"\x00" * 100)
store = ChainStore(chain_dir)
with pytest.raises(ChainError, match="exceeds maximum"):
list(store._iter_raw())
def test_max_record_size_boundary(chain_dir: Path):
"""Records at exactly MAX_RECORD_SIZE should be rejected (real records are <1KB)."""
chain_file = chain_dir / "chain.bin"
chain_file.write_bytes(struct.pack(">I", MAX_RECORD_SIZE + 1) + b"\x00" * 100)
store = ChainStore(chain_dir)
with pytest.raises(ChainError, match="exceeds maximum"):
list(store._iter_raw())
def test_truncated_chain_file(chain_dir: Path, private_key: Ed25519PrivateKey):
"""A truncated chain.bin still yields complete records before the truncation."""
store = ChainStore(chain_dir)
for i in range(3):
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)
# Truncate the file mid-record
chain_file = chain_dir / "chain.bin"
data = chain_file.read_bytes()
chain_file.write_bytes(data[:len(data) - 50])
store2 = ChainStore(chain_dir)
records = list(store2._iter_raw())
# Should get at least the first 2 complete records
assert len(records) >= 2
assert records[0].chain_index == 0
assert records[1].chain_index == 1
def test_empty_chain_file(chain_dir: Path):
"""An empty chain.bin (0 bytes) yields no records without error."""
chain_file = chain_dir / "chain.bin"
chain_file.write_bytes(b"")
store = ChainStore(chain_dir)
records = list(store._iter_raw())
assert records == []
def test_concurrent_read_during_write(chain_dir: Path):
"""Reading the chain while appending should not crash."""
private_key = Ed25519PrivateKey.generate()
store = ChainStore(chain_dir)
# Seed with some records
for i in range(5):
store.append(hashlib.sha256(f"seed-{i}".encode()).digest(), "test/plain", private_key)
read_errors: list[Exception] = []
write_errors: list[Exception] = []
def reader():
try:
s = ChainStore(chain_dir)
for _ in range(20):
list(s.iter_records())
except Exception as e:
read_errors.append(e)
def writer():
try:
s = ChainStore(chain_dir)
for i in range(10):
s.append(hashlib.sha256(f"w-{i}".encode()).digest(), "test/plain", private_key)
except Exception as e:
write_errors.append(e)
threads = [
threading.Thread(target=reader),
threading.Thread(target=reader),
threading.Thread(target=writer),
]
for t in threads:
t.start()
for t in threads:
t.join()
assert not read_errors, f"Read errors during concurrent access: {read_errors}"
assert not write_errors, f"Write errors during concurrent access: {write_errors}"

View File

@@ -0,0 +1,292 @@
"""
Tests for dead man's switch background enforcement and CLI command.
Covers:
- _deadman_enforcement_loop: does not call execute_purge when not armed
- _deadman_enforcement_loop: calls execute_purge when armed and overdue
- _deadman_enforcement_loop: exits after firing so execute_purge is not called twice
- _start_deadman_thread: returns a live daemon thread
- check-deadman CLI: exits 0 when disarmed
- check-deadman CLI: exits 0 and prints OK when armed but not overdue
- check-deadman CLI: exits 0 and prints OVERDUE warning when past interval but in grace
- check-deadman CLI: exits 2 when fully expired (past interval + grace)
"""
from __future__ import annotations
import json
import time
from datetime import UTC, datetime, timedelta
from pathlib import Path
import pytest
from click.testing import CliRunner
# ── Fixtures ────────────────────────────────────────────────────────────────
@pytest.fixture()
def soosef_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
"""Redirect soosef paths to a tmp directory."""
import soosef.paths as paths
data_dir = tmp_path / ".soosef"
data_dir.mkdir()
monkeypatch.setattr(paths, "BASE_DIR", data_dir)
return data_dir
def _write_deadman_state(
state_file: Path,
*,
armed: bool,
last_checkin: datetime,
interval_hours: int = 24,
grace_hours: int = 2,
) -> None:
state_file.parent.mkdir(parents=True, exist_ok=True)
state = {
"armed": armed,
"last_checkin": last_checkin.isoformat(),
"interval_hours": interval_hours,
"grace_hours": grace_hours,
}
state_file.write_text(json.dumps(state))
# ── Unit tests: enforcement loop ─────────────────────────────────────────────
def test_enforcement_loop_no_op_when_disarmed(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
"""Loop should not call check() when the switch is not armed."""
from soosef.cli import _deadman_enforcement_loop
from soosef.fieldkit import deadman as deadman_mod
# Redirect the module-level DEADMAN_STATE constant so DeadmanSwitch() default is our tmp file
state_file = tmp_path / "deadman.json"
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
check_calls = []
def fake_check(self):
check_calls.append("fired")
monkeypatch.setattr(deadman_mod.DeadmanSwitch, "check", fake_check)
iterations = [0]
def one_shot_sleep(n):
iterations[0] += 1
if iterations[0] >= 2:
raise StopIteration("stop test loop")
monkeypatch.setattr(time, "sleep", one_shot_sleep)
with pytest.raises(StopIteration):
_deadman_enforcement_loop(interval_seconds=0)
assert check_calls == []
def test_enforcement_loop_fires_when_overdue(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
"""Loop must call DeadmanSwitch.check() when armed and past interval + grace."""
from soosef.cli import _deadman_enforcement_loop
from soosef.fieldkit import deadman as deadman_mod
state_file = tmp_path / "deadman.json"
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
last_checkin = datetime.now(UTC) - timedelta(hours=100)
_write_deadman_state(
state_file,
armed=True,
last_checkin=last_checkin,
interval_hours=24,
grace_hours=2,
)
check_calls = []
def fake_check(self):
check_calls.append("fired")
monkeypatch.setattr(deadman_mod.DeadmanSwitch, "check", fake_check)
monkeypatch.setattr(time, "sleep", lambda n: None)
_deadman_enforcement_loop(interval_seconds=0)
assert len(check_calls) == 1
def test_enforcement_loop_exits_after_firing(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
"""After firing, the loop must return and not call check() a second time."""
from soosef.cli import _deadman_enforcement_loop
from soosef.fieldkit import deadman as deadman_mod
state_file = tmp_path / "deadman.json"
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
last_checkin = datetime.now(UTC) - timedelta(hours=100)
_write_deadman_state(state_file, armed=True, last_checkin=last_checkin)
check_calls = []
def fake_check(self):
check_calls.append("fired")
monkeypatch.setattr(deadman_mod.DeadmanSwitch, "check", fake_check)
monkeypatch.setattr(time, "sleep", lambda n: None)
_deadman_enforcement_loop(interval_seconds=0)
# Called exactly once — loop exited after firing
assert len(check_calls) == 1
def test_enforcement_loop_tolerates_exceptions(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
"""Transient errors in check() must not kill the loop."""
from soosef.cli import _deadman_enforcement_loop
from soosef.fieldkit import deadman as deadman_mod
state_file = tmp_path / "deadman.json"
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
call_count = [0]
def counting_sleep(n):
call_count[0] += 1
if call_count[0] >= 3:
raise StopIteration("stop test loop")
monkeypatch.setattr(time, "sleep", counting_sleep)
error_calls = [0]
def flaky_is_armed(self):
error_calls[0] += 1
if error_calls[0] == 1:
raise OSError("state file temporarily unreadable")
return False # not armed — loop just skips
monkeypatch.setattr(deadman_mod.DeadmanSwitch, "is_armed", flaky_is_armed)
with pytest.raises(StopIteration):
_deadman_enforcement_loop(interval_seconds=0)
# Should have survived the first exception and continued
assert call_count[0] >= 2
# ── Unit tests: _start_deadman_thread ────────────────────────────────────────
def test_start_deadman_thread_is_daemon(monkeypatch: pytest.MonkeyPatch):
"""Thread must be a daemon so it dies with the process."""
from soosef.cli import _start_deadman_thread
# Patch the loop to exit immediately so the thread doesn't hang in tests
import soosef.cli as cli_mod
monkeypatch.setattr(cli_mod, "_deadman_enforcement_loop", lambda interval_seconds: None)
t = _start_deadman_thread(interval_seconds=60)
assert t is not None
assert t.daemon is True
assert t.name == "deadman-enforcement"
t.join(timeout=2)
# ── CLI integration: check-deadman ───────────────────────────────────────────
@pytest.fixture()
def cli_runner():
return CliRunner()
def test_check_deadman_disarmed(tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch):
"""check-deadman exits 0 and prints helpful message when not armed."""
from soosef.fieldkit import deadman as deadman_mod
from soosef.cli import main
# Point at an empty tmp dir so the real ~/.soosef/fieldkit/deadman.json isn't read
state_file = tmp_path / "deadman.json"
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
result = cli_runner.invoke(main, ["fieldkit", "check-deadman"])
assert result.exit_code == 0
assert "not armed" in result.output
def test_check_deadman_armed_ok(tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch):
"""check-deadman exits 0 when armed and check-in is current."""
from soosef.fieldkit import deadman as deadman_mod
from soosef.cli import main
state_file = tmp_path / "deadman.json"
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
last_checkin = datetime.now(UTC) - timedelta(hours=1)
_write_deadman_state(
state_file,
armed=True,
last_checkin=last_checkin,
interval_hours=24,
grace_hours=2,
)
result = cli_runner.invoke(main, ["fieldkit", "check-deadman"])
assert result.exit_code == 0
assert "OK" in result.output
def test_check_deadman_overdue_in_grace(tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch):
"""check-deadman exits 0 but prints OVERDUE warning when past interval but in grace."""
from soosef.fieldkit import deadman as deadman_mod
from soosef.cli import main
state_file = tmp_path / "deadman.json"
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
# Past 24h interval but within 26h total (grace=2)
last_checkin = datetime.now(UTC) - timedelta(hours=25)
_write_deadman_state(
state_file,
armed=True,
last_checkin=last_checkin,
interval_hours=24,
grace_hours=2,
)
result = cli_runner.invoke(main, ["fieldkit", "check-deadman"])
# Not yet fired (grace not expired), so exit code is 0
assert result.exit_code == 0
assert "OVERDUE" in result.output
def test_check_deadman_fires_when_expired(
tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch
):
"""check-deadman exits 2 when the switch has fully expired."""
from soosef.fieldkit import deadman as deadman_mod
from soosef.cli import main
state_file = tmp_path / "deadman.json"
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
last_checkin = datetime.now(UTC) - timedelta(hours=100)
_write_deadman_state(
state_file,
armed=True,
last_checkin=last_checkin,
interval_hours=24,
grace_hours=2,
)
# Patch check() so we don't invoke the real killswitch during tests
monkeypatch.setattr(deadman_mod.DeadmanSwitch, "check", lambda self: None)
result = cli_runner.invoke(main, ["fieldkit", "check-deadman"])
assert result.exit_code == 2
assert "killswitch triggered" in result.output.lower() or "expired" in result.output.lower()

328
tests/test_key_rotation.py Normal file
View File

@@ -0,0 +1,328 @@
"""Tests for key rotation (rotate_identity, rotate_channel_key)."""
from __future__ import annotations
from pathlib import Path
import pytest
from click.testing import CliRunner
import soosef.paths as _paths
from soosef.cli import main
from soosef.exceptions import KeystoreError
from soosef.keystore.manager import KeystoreManager
from soosef.keystore.models import RotationResult
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_manager(tmp_path: Path) -> KeystoreManager:
"""Return a KeystoreManager pointing at isolated temp directories."""
identity_dir = tmp_path / "identity"
channel_key_file = tmp_path / "stegasoo" / "channel.key"
return KeystoreManager(identity_dir=identity_dir, channel_key_file=channel_key_file)
# ---------------------------------------------------------------------------
# rotate_identity
# ---------------------------------------------------------------------------
class TestRotateIdentity:
def test_raises_when_no_identity(self, tmp_path: Path):
ks = _make_manager(tmp_path)
with pytest.raises(KeystoreError, match="No identity to rotate"):
ks.rotate_identity()
def test_returns_rotation_result(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_identity()
result = ks.rotate_identity()
assert isinstance(result, RotationResult)
assert result.old_fingerprint
assert result.new_fingerprint
assert result.old_fingerprint != result.new_fingerprint
def test_old_keys_archived(self, tmp_path: Path):
ks = _make_manager(tmp_path)
original = ks.generate_identity()
result = ks.rotate_identity()
archive = result.archive_path
assert archive.is_dir()
assert (archive / "private.pem").exists()
assert (archive / "public.pem").exists()
assert (archive / "rotation.txt").exists()
# The archived public key must correspond to the *old* fingerprint
import hashlib
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
load_pem_public_key,
)
pub_pem = (archive / "public.pem").read_bytes()
pub_key = load_pem_public_key(pub_pem)
pub_raw = pub_key.public_bytes(Encoding.Raw, PublicFormat.Raw)
archived_fp = hashlib.sha256(pub_raw).hexdigest()[:32]
assert archived_fp == original.fingerprint
def test_archive_private_key_permissions(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_identity()
result = ks.rotate_identity()
priv = result.archive_path / "private.pem"
assert oct(priv.stat().st_mode & 0o777) == oct(0o600)
def test_archive_dir_permissions(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_identity()
result = ks.rotate_identity()
assert oct(result.archive_path.stat().st_mode & 0o777) == oct(0o700)
def test_new_identity_active_after_rotation(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_identity()
result = ks.rotate_identity()
current = ks.get_identity()
assert current.fingerprint == result.new_fingerprint
def test_rotation_txt_contains_old_fingerprint(self, tmp_path: Path):
ks = _make_manager(tmp_path)
original = ks.generate_identity()
result = ks.rotate_identity()
txt = (result.archive_path / "rotation.txt").read_text()
assert original.fingerprint in txt
def test_multiple_rotations_create_separate_archives(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_identity()
r1 = ks.rotate_identity()
r2 = ks.rotate_identity()
assert r1.archive_path != r2.archive_path
assert r1.archive_path.is_dir()
assert r2.archive_path.is_dir()
def test_rotation_fingerprints_are_distinct_from_each_other(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_identity()
r1 = ks.rotate_identity()
r2 = ks.rotate_identity()
# Each rotation produces a unique new key
assert r1.new_fingerprint == r2.old_fingerprint
assert r1.new_fingerprint != r2.new_fingerprint
# ---------------------------------------------------------------------------
# rotate_channel_key
# ---------------------------------------------------------------------------
class TestRotateChannelKey:
def test_raises_when_no_channel_key(self, tmp_path: Path):
ks = _make_manager(tmp_path)
with pytest.raises(KeystoreError, match="No channel key to rotate"):
ks.rotate_channel_key()
def test_raises_for_env_var_key(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setenv("STEGASOO_CHANNEL_KEY", "a" * 64)
ks = _make_manager(tmp_path)
# has_channel_key() returns True (env), but file doesn't exist
with pytest.raises(KeystoreError, match="environment variable"):
ks.rotate_channel_key()
def test_returns_rotation_result(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_channel_key()
result = ks.rotate_channel_key()
assert isinstance(result, RotationResult)
assert result.old_fingerprint
assert result.new_fingerprint
assert result.old_fingerprint != result.new_fingerprint
def test_old_key_archived(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_channel_key()
result = ks.rotate_channel_key()
archive = result.archive_path
assert archive.is_dir()
assert (archive / "channel.key").exists()
assert (archive / "rotation.txt").exists()
def test_archive_channel_key_permissions(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_channel_key()
result = ks.rotate_channel_key()
key_file = result.archive_path / "channel.key"
assert oct(key_file.stat().st_mode & 0o777) == oct(0o600)
def test_archived_key_matches_old_fingerprint(self, tmp_path: Path):
from stegasoo.crypto import get_channel_fingerprint
ks = _make_manager(tmp_path)
ks.generate_channel_key()
# Get fingerprint before rotation
old_key = ks.get_channel_key()
old_fp = get_channel_fingerprint(old_key)
result = ks.rotate_channel_key()
archived_key = (result.archive_path / "channel.key").read_text().strip()
assert get_channel_fingerprint(archived_key) == old_fp
assert old_fp == result.old_fingerprint
def test_new_channel_key_active_after_rotation(self, tmp_path: Path):
from stegasoo.crypto import get_channel_fingerprint
ks = _make_manager(tmp_path)
ks.generate_channel_key()
result = ks.rotate_channel_key()
current_key = ks.get_channel_key()
current_fp = get_channel_fingerprint(current_key)
assert current_fp == result.new_fingerprint
def test_multiple_rotations_create_separate_archives(self, tmp_path: Path):
ks = _make_manager(tmp_path)
ks.generate_channel_key()
r1 = ks.rotate_channel_key()
r2 = ks.rotate_channel_key()
assert r1.archive_path != r2.archive_path
assert (r1.archive_path / "channel.key").exists()
assert (r2.archive_path / "channel.key").exists()
# ---------------------------------------------------------------------------
# CLI integration
# ---------------------------------------------------------------------------
class TestRotateCLI:
def _init_soosef(self, tmp_path: Path) -> Path:
"""Create the minimal directory + key structure for CLI tests.
Temporarily sets paths.BASE_DIR so the lazy-resolved KeystoreManager
writes keys to the same location the CLI will read from when invoked
with --data-dir pointing at the same directory.
"""
data_dir = tmp_path / ".soosef"
original_base = _paths.BASE_DIR
try:
_paths.BASE_DIR = data_dir
ks = KeystoreManager() # uses lazy _paths resolution
ks.generate_identity()
ks.generate_channel_key()
finally:
_paths.BASE_DIR = original_base
return data_dir
def test_rotate_identity_cli_success(self, tmp_path: Path):
data_dir = self._init_soosef(tmp_path)
runner = CliRunner()
result = runner.invoke(
main,
["--data-dir", str(data_dir), "keys", "rotate-identity", "--yes"],
catch_exceptions=False,
)
assert result.exit_code == 0, result.output
assert "rotated successfully" in result.output
assert "Old fingerprint:" in result.output
assert "New fingerprint:" in result.output
assert "IMPORTANT:" in result.output
def test_rotate_identity_cli_no_identity(self, tmp_path: Path):
data_dir = tmp_path / ".soosef"
data_dir.mkdir()
runner = CliRunner()
result = runner.invoke(
main,
["--data-dir", str(data_dir), "keys", "rotate-identity", "--yes"],
)
assert result.exit_code != 0
assert "Error" in result.output
def test_rotate_channel_cli_success(self, tmp_path: Path):
data_dir = self._init_soosef(tmp_path)
runner = CliRunner()
result = runner.invoke(
main,
["--data-dir", str(data_dir), "keys", "rotate-channel", "--yes"],
catch_exceptions=False,
)
assert result.exit_code == 0, result.output
assert "rotated successfully" in result.output
assert "Old fingerprint:" in result.output
assert "New fingerprint:" in result.output
assert "IMPORTANT:" in result.output
def test_rotate_channel_cli_no_key(self, tmp_path: Path):
data_dir = tmp_path / ".soosef"
data_dir.mkdir()
runner = CliRunner()
result = runner.invoke(
main,
["--data-dir", str(data_dir), "keys", "rotate-channel", "--yes"],
)
assert result.exit_code != 0
assert "Error" in result.output
def test_rotate_identity_aborts_without_confirmation(self, tmp_path: Path):
data_dir = self._init_soosef(tmp_path)
runner = CliRunner()
# Simulate the user typing "n" at the confirmation prompt
result = runner.invoke(
main,
["--data-dir", str(data_dir), "keys", "rotate-identity"],
input="n\n",
)
assert result.exit_code != 0
def test_rotate_channel_aborts_without_confirmation(self, tmp_path: Path):
data_dir = self._init_soosef(tmp_path)
runner = CliRunner()
result = runner.invoke(
main,
["--data-dir", str(data_dir), "keys", "rotate-channel"],
input="n\n",
)
assert result.exit_code != 0

134
tests/test_killswitch.py Normal file
View File

@@ -0,0 +1,134 @@
"""Tests for killswitch — verifies emergency purge destroys all sensitive data."""
from __future__ import annotations
import hashlib
from pathlib import Path
import pytest
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import (
Encoding,
NoEncryption,
PrivateFormat,
PublicFormat,
)
@pytest.fixture()
def populated_soosef(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
"""Create a populated ~/.soosef directory with identity, chain, attestations, etc."""
import soosef.paths as paths
data_dir = tmp_path / ".soosef"
data_dir.mkdir()
monkeypatch.setattr(paths, "BASE_DIR", data_dir)
# Create identity
identity_dir = data_dir / "identity"
identity_dir.mkdir()
key = Ed25519PrivateKey.generate()
priv_pem = key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
(identity_dir / "private.pem").write_bytes(priv_pem)
pub_pem = key.public_key().public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
(identity_dir / "public.pem").write_bytes(pub_pem)
# Create channel key
stegasoo_dir = data_dir / "stegasoo"
stegasoo_dir.mkdir()
(stegasoo_dir / "channel.key").write_text("test-channel-key")
# Create chain data
from soosef.federation.chain import ChainStore
chain_dir = data_dir / "chain"
store = ChainStore(chain_dir)
for i in range(3):
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", key)
# Create attestation dir with a dummy file
att_dir = data_dir / "attestations"
att_dir.mkdir()
(att_dir / "log.bin").write_bytes(b"dummy attestation data")
# Create other dirs
(data_dir / "auth").mkdir()
(data_dir / "auth" / "soosef.db").write_bytes(b"dummy db")
(data_dir / "temp").mkdir()
(data_dir / "temp" / "file.tmp").write_bytes(b"tmp")
(data_dir / "instance").mkdir()
(data_dir / "instance" / ".secret_key").write_bytes(b"secret")
(data_dir / "config.json").write_text("{}")
return data_dir
def test_purge_all_destroys_chain_data(populated_soosef: Path):
"""CRITICAL: execute_purge(ALL) must destroy chain directory."""
from soosef.fieldkit.killswitch import PurgeScope, execute_purge
chain_dir = populated_soosef / "chain"
assert chain_dir.exists()
assert (chain_dir / "chain.bin").exists()
result = execute_purge(PurgeScope.ALL, reason="test")
assert not chain_dir.exists(), "Chain directory must be destroyed by killswitch"
assert "destroy_chain_data" in result.steps_completed
def test_purge_all_destroys_identity(populated_soosef: Path):
"""execute_purge(ALL) must destroy identity keys."""
from soosef.fieldkit.killswitch import PurgeScope, execute_purge
assert (populated_soosef / "identity" / "private.pem").exists()
result = execute_purge(PurgeScope.ALL, reason="test")
assert not (populated_soosef / "identity").exists()
assert "destroy_identity_keys" in result.steps_completed
def test_purge_all_destroys_attestation_log(populated_soosef: Path):
"""execute_purge(ALL) must destroy the Verisoo attestation log."""
from soosef.fieldkit.killswitch import PurgeScope, execute_purge
result = execute_purge(PurgeScope.ALL, reason="test")
assert not (populated_soosef / "attestations").exists()
assert "destroy_attestation_log" in result.steps_completed
def test_purge_keys_only_preserves_chain(populated_soosef: Path):
"""KEYS_ONLY purge destroys keys but preserves chain and attestation data."""
from soosef.fieldkit.killswitch import PurgeScope, execute_purge
result = execute_purge(PurgeScope.KEYS_ONLY, reason="test")
# Keys gone
assert not (populated_soosef / "identity").exists()
assert "destroy_identity_keys" in result.steps_completed
# Chain and attestations preserved (KEYS_ONLY doesn't touch data)
assert (populated_soosef / "chain" / "chain.bin").exists()
assert (populated_soosef / "attestations" / "log.bin").exists()
def test_purge_reports_all_steps(populated_soosef: Path):
"""execute_purge(ALL) reports all expected steps including chain."""
from soosef.fieldkit.killswitch import PurgeScope, execute_purge
result = execute_purge(PurgeScope.ALL, reason="test")
expected_steps = [
"destroy_identity_keys",
"destroy_channel_key",
"destroy_flask_secret",
"destroy_auth_db",
"destroy_attestation_log",
"destroy_chain_data",
"destroy_temp_files",
"destroy_config",
]
for step in expected_steps:
assert step in result.steps_completed, f"Missing purge step: {step}"

123
tests/test_serialization.py Normal file
View File

@@ -0,0 +1,123 @@
"""Tests for CBOR serialization of chain records."""
from __future__ import annotations
import hashlib
from soosef.federation.models import AttestationChainRecord, ChainState, EntropyWitnesses
from soosef.federation.serialization import (
canonical_bytes,
compute_record_hash,
deserialize_record,
serialize_record,
)
def _make_record(**overrides) -> AttestationChainRecord:
"""Create a minimal test record with sensible defaults."""
defaults = {
"version": 1,
"record_id": b"\x01" * 16,
"chain_index": 0,
"prev_hash": ChainState.GENESIS_PREV_HASH,
"content_hash": hashlib.sha256(b"test content").digest(),
"content_type": "test/plain",
"metadata": {},
"claimed_ts": 1_700_000_000_000_000,
"entropy_witnesses": EntropyWitnesses(
sys_uptime=12345.678,
fs_snapshot=b"\xab" * 16,
proc_entropy=256,
boot_id="test-boot-id",
),
"signer_pubkey": b"\x02" * 32,
"signature": b"\x03" * 64,
}
defaults.update(overrides)
return AttestationChainRecord(**defaults)
def test_canonical_bytes_deterministic():
"""Same record always produces the same canonical bytes."""
record = _make_record()
b1 = canonical_bytes(record)
b2 = canonical_bytes(record)
assert b1 == b2
def test_canonical_bytes_excludes_signature():
"""Canonical bytes must not include the signature field."""
record_a = _make_record(signature=b"\x03" * 64)
record_b = _make_record(signature=b"\x04" * 64)
assert canonical_bytes(record_a) == canonical_bytes(record_b)
def test_canonical_bytes_sensitive_to_content():
"""Different content_hash must produce different canonical bytes."""
record_a = _make_record(content_hash=hashlib.sha256(b"a").digest())
record_b = _make_record(content_hash=hashlib.sha256(b"b").digest())
assert canonical_bytes(record_a) != canonical_bytes(record_b)
def test_serialize_deserialize_round_trip():
"""A record survives serialization and deserialization intact."""
original = _make_record()
data = serialize_record(original)
restored = deserialize_record(data)
assert restored.version == original.version
assert restored.record_id == original.record_id
assert restored.chain_index == original.chain_index
assert restored.prev_hash == original.prev_hash
assert restored.content_hash == original.content_hash
assert restored.content_type == original.content_type
assert restored.metadata == original.metadata
assert restored.claimed_ts == original.claimed_ts
assert restored.signer_pubkey == original.signer_pubkey
assert restored.signature == original.signature
# Entropy witnesses
assert restored.entropy_witnesses is not None
assert restored.entropy_witnesses.sys_uptime == original.entropy_witnesses.sys_uptime
assert restored.entropy_witnesses.fs_snapshot == original.entropy_witnesses.fs_snapshot
assert restored.entropy_witnesses.proc_entropy == original.entropy_witnesses.proc_entropy
assert restored.entropy_witnesses.boot_id == original.entropy_witnesses.boot_id
def test_serialize_includes_signature():
"""Full serialization must include the signature."""
record = _make_record(signature=b"\xaa" * 64)
data = serialize_record(record)
restored = deserialize_record(data)
assert restored.signature == b"\xaa" * 64
def test_compute_record_hash():
"""Record hash is SHA-256 of canonical bytes."""
record = _make_record()
expected = hashlib.sha256(canonical_bytes(record)).digest()
assert compute_record_hash(record) == expected
def test_record_hash_changes_with_content():
"""Different records produce different hashes."""
a = _make_record(content_hash=hashlib.sha256(b"a").digest())
b = _make_record(content_hash=hashlib.sha256(b"b").digest())
assert compute_record_hash(a) != compute_record_hash(b)
def test_metadata_preserved():
"""Arbitrary metadata survives round-trip."""
meta = {"backfilled": True, "caption": "test photo", "tags": ["evidence", "urgent"]}
record = _make_record(metadata=meta)
data = serialize_record(record)
restored = deserialize_record(data)
assert restored.metadata == meta
def test_empty_entropy_witnesses():
"""Record with no entropy witnesses round-trips correctly."""
record = _make_record(entropy_witnesses=None)
data = serialize_record(record)
restored = deserialize_record(data)
assert restored.entropy_witnesses is None