fieldwitness/tests/test_chain.py
Aaron D. Lee 51c9b0a99a Fix 14 bugs and add features from power-user security audit
Critical fixes:
- Fix admin_delete_user missing current_user_id argument (TypeError on every delete)
- Fix self-signed cert OOM: bytes(2130706433) → IPv4Address("127.0.0.1")
- Add @login_required to attestation routes (attest, log); verify stays public
- Add auth guards to fieldkit (@admin_required on killswitch) and keys blueprints
- Fix cleanup_temp_files NameError in generate() route

Security hardening:
- Unify temp storage to ~/.soosef/temp/ so killswitch purge covers web uploads
- Replace Path.unlink() with secure deletion (shred fallback) in temp_storage
- Add structured audit log (audit.jsonl) for admin, key, and killswitch actions

New features:
- Dead man's switch background enforcement thread in serve + check-deadman CLI
- Key rotation: soosef keys rotate-identity/rotate-channel with archiving
- Batch attestation: soosef attest batch <dir> with progress and error handling
- Geofence CLI: set/check/clear commands with config persistence
- USB CLI: snapshot/check commands against device whitelist
- Verification receipt download (/verify/receipt JSON endpoint + UI button)
- IdentityInfo.created_at populated from sidecar meta.json (mtime fallback)

Data layer:
- ChainStore.get() now O(1) via byte-offset index built during state rebuild
- Add federation module (chain, models, serialization, entropy)

Includes 45+ new tests across chain, deadman, key rotation, killswitch, and
serialization modules.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 17:06:33 -04:00

288 lines
10 KiB
Python

"""Tests for the attestation hash chain store."""
from __future__ import annotations
import hashlib
from pathlib import Path
import pytest
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from soosef.exceptions import ChainError, ChainIntegrityError
from soosef.federation.chain import ChainStore
from soosef.federation.models import ChainState
from soosef.federation.serialization import canonical_bytes, compute_record_hash
def test_genesis_record(chain_dir: Path, private_key: Ed25519PrivateKey):
    """The first append to a fresh store yields a well-formed genesis record.

    Checks the record's position (index 0, genesis prev_hash, version 1),
    that the supplied content hash and type are echoed back, and that the
    identifier, signer key, and signature have the expected lengths.
    """
    digest = hashlib.sha256(b"genesis content").digest()
    genesis = ChainStore(chain_dir).append(digest, "test/plain", private_key)

    # Position and linkage of the very first record.
    assert genesis.chain_index == 0
    assert genesis.prev_hash == ChainState.GENESIS_PREV_HASH
    assert genesis.version == 1

    # The payload description must round-trip unchanged.
    assert genesis.content_hash == digest
    assert genesis.content_type == "test/plain"

    # Fixed-length fields: record id, signer public key, signature.
    assert len(genesis.record_id) == 16
    assert len(genesis.signer_pubkey) == 32
    assert len(genesis.signature) == 64
def test_chain_state_after_genesis(chain_dir: Path, private_key: Ed25519PrivateKey):
    """After a single append the store's state describes a one-record chain."""
    store = ChainStore(chain_dir)
    payload_hash = hashlib.sha256(b"test").digest()
    first = store.append(payload_hash, "test/plain", private_key)

    snapshot = store.state()
    assert snapshot is not None
    assert snapshot.record_count == 1
    assert snapshot.head_index == 0
    # With only one record, the head hash is that record's hash.
    assert snapshot.head_hash == compute_record_hash(first)
    # The chain id is expected to be SHA-256 over the first record's canonical bytes.
    expected_chain_id = hashlib.sha256(canonical_bytes(first)).digest()
    assert snapshot.chain_id == expected_chain_id
def test_chain_append_multiple(chain_dir: Path, private_key: Ed25519PrivateKey):
    """Five consecutive appends are numbered 0..4 and reflected in the state."""
    store = ChainStore(chain_dir)
    appended = [
        store.append(
            hashlib.sha256(f"content-{n}".encode()).digest(),
            "test/plain",
            private_key,
        )
        for n in range(5)
    ]

    # Indices must follow append order exactly.
    assert [rec.chain_index for rec in appended] == [0, 1, 2, 3, 4]

    snapshot = store.state()
    assert snapshot is not None
    assert snapshot.record_count == 5
    assert snapshot.head_index == 4
def test_chain_hash_linkage(chain_dir: Path, private_key: Ed25519PrivateKey):
    """Every record after the first links to the hash of its predecessor."""
    store = ChainStore(chain_dir)
    chain = [
        store.append(hashlib.sha256(tag).digest(), "test/plain", private_key)
        for tag in (b"r0", b"r1", b"r2")
    ]

    # Walk adjacent pairs: (r0, r1) then (r1, r2).
    for earlier, later in zip(chain, chain[1:]):
        assert later.prev_hash == compute_record_hash(earlier)
def test_signature_verification(chain_dir: Path, private_key: Ed25519PrivateKey):
    """A record's signature verifies against its own canonical bytes.

    The public key is reconstructed from the record's signer_pubkey field,
    so the record alone is enough to perform the check.
    """
    store = ChainStore(chain_dir)
    signed = store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key)

    verifier = Ed25519PublicKey.from_public_bytes(signed.signer_pubkey)
    message = canonical_bytes(signed)
    # No assert needed: verify() raises if the signature does not match.
    verifier.verify(signed.signature, message)
def test_verify_chain_valid(chain_dir: Path, private_key: Ed25519PrivateKey):
    """verify_chain() returns True for an untouched five-record chain."""
    store = ChainStore(chain_dir)
    payload_hashes = (hashlib.sha256(f"c-{n}".encode()).digest() for n in range(5))
    for payload_hash in payload_hashes:
        store.append(payload_hash, "test/plain", private_key)

    assert store.verify_chain() is True
def test_verify_chain_detects_tamper(chain_dir: Path, private_key: Ed25519PrivateKey):
    """verify_chain() raises once a byte inside chain.bin has been altered."""
    store = ChainStore(chain_dir)
    for n in range(3):
        payload_hash = hashlib.sha256(f"c-{n}".encode()).digest()
        store.append(payload_hash, "test/plain", private_key)

    # Invert every bit of the byte at the file's midpoint and write it back.
    chain_path = chain_dir / "chain.bin"
    raw = bytearray(chain_path.read_bytes())
    raw[len(raw) // 2] ^= 0xFF
    chain_path.write_bytes(bytes(raw))

    # Clear the store's cached state so it is reloaded on the next call.
    store._state = None

    # Either exception type counts as detection of the corruption.
    with pytest.raises((ChainIntegrityError, ChainError)):
        store.verify_chain()
def test_entropy_witnesses_populated(chain_dir: Path, private_key: Ed25519PrivateKey):
    """An appended record carries entropy witnesses with non-trivial values."""
    store = ChainStore(chain_dir)
    payload_hash = hashlib.sha256(b"test").digest()
    rec = store.append(payload_hash, "test/plain", private_key)

    witnesses = rec.entropy_witnesses
    assert witnesses is not None
    # Each witness field must hold something other than an empty/zero default.
    assert witnesses.sys_uptime > 0
    assert len(witnesses.fs_snapshot) == 16
    assert witnesses.proc_entropy > 0
    assert len(witnesses.boot_id) > 0
def test_chain_persistence(chain_dir: Path, private_key: Ed25519PrivateKey):
    """Records written by one store instance are visible to a new instance.

    A second ChainStore on the same directory stands in for a process
    restart; it must report the same state and return the stored records.
    """
    writer = ChainStore(chain_dir)
    first = writer.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key)
    second = writer.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key)

    # Fresh instance over the same directory.
    reader = ChainStore(chain_dir)
    reopened_state = reader.state()
    assert reopened_state is not None
    assert reopened_state.record_count == 2
    assert reopened_state.head_index == 1
    assert reopened_state.head_hash == compute_record_hash(second)

    # Individual records can be read back intact.
    restored = reader.get(0)
    assert restored.content_hash == first.content_hash
    assert restored.signature == first.signature
def test_chain_state_rebuild(chain_dir: Path, private_key: Ed25519PrivateKey):
    """A store opened without state.cbor still reports the correct state."""
    original = ChainStore(chain_dir)
    original.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key)
    latest = original.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key)

    # Remove the state file so the next store cannot simply load it.
    (chain_dir / "state.cbor").unlink()

    # A new instance is expected to reconstruct the state on its own.
    reopened = ChainStore(chain_dir)
    rebuilt = reopened.state()
    assert rebuilt is not None
    assert rebuilt.record_count == 2
    assert rebuilt.head_index == 1
    assert rebuilt.head_hash == compute_record_hash(latest)
def test_empty_chain(chain_dir: Path):
    """A store with no appended records is empty and has no state or head."""
    fresh = ChainStore(chain_dir)

    assert fresh.is_empty() is True
    # Both accessors return None rather than raising on an empty chain.
    assert fresh.state() is None
    assert fresh.head() is None
def test_get_nonexistent_index(chain_dir: Path, private_key: Ed25519PrivateKey):
    """get() raises ChainError mentioning 'not found' for an index past the head."""
    store = ChainStore(chain_dir)
    only_hash = hashlib.sha256(b"only").digest()
    store.append(only_hash, "test/plain", private_key)

    # The chain holds just index 0, so 99 cannot exist.
    with pytest.raises(ChainError, match="not found"):
        store.get(99)
def test_iter_records_range(chain_dir: Path, private_key: Ed25519PrivateKey):
    """iter_records(start=3, end=6) yields indices 3 through 6 inclusive."""
    store = ChainStore(chain_dir)
    for n in range(10):
        payload_hash = hashlib.sha256(f"c-{n}".encode()).digest()
        store.append(payload_hash, "test/plain", private_key)

    selected = list(store.iter_records(start=3, end=6))

    # Four records means both bounds are included.
    assert len(selected) == 4
    first, last = selected[0], selected[-1]
    assert first.chain_index == 3
    assert last.chain_index == 6
def test_metadata_in_chain(chain_dir: Path, private_key: Ed25519PrivateKey):
    """Metadata passed to append() is returned unchanged by get().

    The record returned by append() is deliberately not bound to a name:
    the check is on what is read back from the store, not on the in-memory
    record handed back to the caller.
    """
    store = ChainStore(chain_dir)
    meta = {"caption": "evidence photo", "backfilled": True}
    store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key, metadata=meta)

    # Index 0 is the record just appended to the fresh store.
    loaded = store.get(0)
    assert loaded.metadata == meta
def test_head_returns_latest(chain_dir: Path, private_key: Ed25519PrivateKey):
    """head() yields the last of three appended records (index 2)."""
    store = ChainStore(chain_dir)
    for n in range(3):
        payload_hash = hashlib.sha256(f"c-{n}".encode()).digest()
        store.append(payload_hash, "test/plain", private_key)

    latest = store.head()
    assert latest is not None
    assert latest.chain_index == 2
def test_verify_chain_detects_signer_change(chain_dir: Path):
    """verify_chain flags a different signer as an integrity violation.

    The first record is appended normally with ``key1``. The second record
    bypasses ``ChainStore.append``: it is built by hand with a correct
    ``chain_index`` and ``prev_hash`` but signed by ``key2``, then written
    straight into ``chain.bin``. ``verify_chain`` must then raise
    ``ChainIntegrityError`` with a message matching "signer changed".
    """
    # Function-scope imports: only this test needs the low-level helpers.
    import fcntl
    import struct
    import time

    from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
    from uuid_utils import uuid7

    from soosef.federation.entropy import collect_entropy_witnesses
    from soosef.federation.models import AttestationChainRecord
    from soosef.federation.serialization import serialize_record

    store = ChainStore(chain_dir)
    key1 = Ed25519PrivateKey.generate()
    key2 = Ed25519PrivateKey.generate()
    store.append(hashlib.sha256(b"r0").digest(), "test/plain", key1)

    state = store.state()
    # Fail with a clear assertion instead of an AttributeError on None below.
    assert state is not None

    chain_file = chain_dir / "chain.bin"
    pub2 = key2.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
    entropy = collect_entropy_witnesses(chain_file)

    # Every field except the signature; shared by the unsigned and signed records
    # so the two constructions cannot drift apart.
    fields = dict(
        version=1,
        record_id=uuid7().bytes,
        chain_index=1,
        prev_hash=state.head_hash,  # correct linkage: only the signer is wrong
        content_hash=hashlib.sha256(b"r1").digest(),
        content_type="test/plain",
        metadata={},
        claimed_ts=int(time.time() * 1_000_000),  # microseconds since the epoch
        entropy_witnesses=entropy,
        signer_pubkey=pub2,
    )

    # Sign the canonical bytes of the record built with an empty signature,
    # then rebuild the record with the real signature filled in.
    unsigned = AttestationChainRecord(**fields, signature=b"")
    record = AttestationChainRecord(**fields, signature=key2.sign(canonical_bytes(unsigned)))

    # Append directly to the chain file: 4-byte big-endian length prefix + record bytes.
    record_bytes = serialize_record(record)
    with open(chain_file, "ab") as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            f.write(struct.pack(">I", len(record_bytes)))
            f.write(record_bytes)
            # Flush while the lock is held; otherwise the buffered bytes would
            # only reach the file at close, after the lock was released.
            f.flush()
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)

    # Clear the cached state so the store picks up the injected record.
    store._state = None

    with pytest.raises(ChainIntegrityError, match="signer changed"):
        store.verify_chain()