fieldwitness/tests/test_chain.py
Aaron D. Lee fb0cc3e39d Implement 14 power-user feature requests for field deployment
Critical:
- FR-01: Chain verification now supports key rotation via signed rotation
  records (soosef/key-rotation-v1 content type). Old single-signer
  invariant replaced with authorized-signers set.
- FR-02: Carrier images stripped of EXIF metadata by default before
  steganographic encoding (strip_metadata=True). Prevents source
  location/device leakage.

High priority:
- FR-03: Session timeout (default 15min) + secure cookie flags
  (HttpOnly, SameSite=Strict, Secure when HTTPS)
- FR-04: CSRF protection via Flask-WTF on all POST forms. Killswitch
  now requires password re-authentication.
- FR-05: Collaborator trust store — trust_key(), get_trusted_keys(),
  resolve_attestor_name(), untrust_key() in KeystoreManager.
- FR-06: Production WSGI server (Waitress) by default, Flask dev
  server only with --debug flag.
- FR-07: Dead man's switch sends warning during grace period via
  local file + optional webhook before auto-purge.

Medium:
- FR-08: Geofence get_current_location() via gpsd for --here support.
- FR-09: Batch attestation endpoint (/attest/batch) with SHA-256
  dedup and per-file status reporting.
- FR-10: Key backup tracking with last_backup_info() and
  is_backup_overdue() + backup_reminder_days config.
- FR-11: Verification receipts signed with instance Ed25519 key
  (schema_version bumped to 2).
- FR-12: Login rate limiting with configurable lockout (5 attempts,
  15 min default).

Nice-to-have:
- FR-13: Unified `soosef status` pre-flight command showing identity,
  channel key, deadman, geofence, chain, and backup status.
- FR-14: `soosef chain export` produces ZIP with JSON manifest,
  public key, and raw chain.bin for legal discovery.

Tests: 157 passed, 1 skipped, 1 pre-existing flaky test.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 19:35:36 -04:00

327 lines
12 KiB
Python

"""Tests for the attestation hash chain store."""
from __future__ import annotations
import hashlib
from pathlib import Path
import pytest
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from soosef.exceptions import ChainError, ChainIntegrityError
from soosef.federation.chain import ChainStore
from soosef.federation.models import ChainState
from soosef.federation.serialization import canonical_bytes, compute_record_hash
def test_genesis_record(chain_dir: Path, private_key: Ed25519PrivateKey):
    """The first appended record is a well-formed genesis record.

    It must sit at chain_index 0, carry ``ChainState.GENESIS_PREV_HASH`` as
    its prev_hash, echo back the content hash and type it was given, and
    have a 16-byte record id, 32-byte signer pubkey and 64-byte signature.
    """
    chain = ChainStore(chain_dir)
    digest = hashlib.sha256(b"genesis content").digest()
    genesis = chain.append(digest, "test/plain", private_key)

    # Position and linkage of the genesis record.
    assert genesis.chain_index == 0
    assert genesis.prev_hash == ChainState.GENESIS_PREV_HASH
    assert genesis.version == 1

    # Payload is stored exactly as supplied.
    assert genesis.content_hash == digest
    assert genesis.content_type == "test/plain"

    # Fixed-size binary fields.
    assert len(genesis.record_id) == 16
    assert len(genesis.signer_pubkey) == 32
    assert len(genesis.signature) == 64
def test_chain_state_after_genesis(chain_dir: Path, private_key: Ed25519PrivateKey):
    """After a single append, the chain state describes a one-record chain.

    The head hash must equal the record hash of the genesis record, and the
    chain id must equal the SHA-256 of that record's canonical bytes.
    """
    chain = ChainStore(chain_dir)
    payload_digest = hashlib.sha256(b"test").digest()
    genesis = chain.append(payload_digest, "test/plain", private_key)

    snapshot = chain.state()
    assert snapshot is not None
    assert snapshot.record_count == 1
    assert snapshot.head_index == 0
    assert snapshot.head_hash == compute_record_hash(genesis)

    expected_chain_id = hashlib.sha256(canonical_bytes(genesis)).digest()
    assert snapshot.chain_id == expected_chain_id
def test_chain_append_multiple(chain_dir: Path, private_key: Ed25519PrivateKey):
    """Five consecutive appends are assigned chain indices 0 through 4 in order."""
    chain = ChainStore(chain_dir)

    # The comprehension evaluates left to right, so appends happen in order.
    appended = [
        chain.append(hashlib.sha256(f"content-{i}".encode()).digest(), "test/plain", private_key)
        for i in range(5)
    ]

    for position, rec in enumerate(appended):
        assert rec.chain_index == position

    snapshot = chain.state()
    assert snapshot is not None
    assert snapshot.record_count == 5
    assert snapshot.head_index == 4
def test_chain_hash_linkage(chain_dir: Path, private_key: Ed25519PrivateKey):
    """Every record after the first stores the record hash of its predecessor."""
    chain = ChainStore(chain_dir)
    linked = [
        chain.append(hashlib.sha256(payload).digest(), "test/plain", private_key)
        for payload in (b"r0", b"r1", b"r2")
    ]

    # Walk adjacent pairs: (r0, r1), then (r1, r2).
    for earlier, later in zip(linked, linked[1:]):
        assert later.prev_hash == compute_record_hash(earlier)
def test_signature_verification(chain_dir: Path, private_key: Ed25519PrivateKey):
    """The stored Ed25519 signature verifies over the record's canonical bytes.

    The public key is rebuilt from the record's own ``signer_pubkey`` field,
    so the record is checked purely against what it carries.
    """
    chain = ChainStore(chain_dir)
    signed = chain.append(hashlib.sha256(b"test").digest(), "test/plain", private_key)

    verifier = Ed25519PublicKey.from_public_bytes(signed.signer_pubkey)
    # No assert needed: verify() raises when the signature does not match.
    verifier.verify(signed.signature, canonical_bytes(signed))
def test_verify_chain_valid(chain_dir: Path, private_key: Ed25519PrivateKey):
    """verify_chain() returns exactly True for an untouched five-record chain."""
    chain = ChainStore(chain_dir)
    digests = [hashlib.sha256(f"c-{i}".encode()).digest() for i in range(5)]
    for digest in digests:
        chain.append(digest, "test/plain", private_key)

    outcome = chain.verify_chain()
    assert outcome is True
def test_verify_chain_detects_tamper(chain_dir: Path, private_key: Ed25519PrivateKey):
    """verify_chain() raises once a byte in the middle of chain.bin is corrupted.

    Either ChainIntegrityError or ChainError is accepted.
    """
    chain = ChainStore(chain_dir)
    for i in range(3):
        chain.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)

    # Invert every bit of the byte at the file's midpoint and write it back.
    chain_path = chain_dir / "chain.bin"
    raw = bytearray(chain_path.read_bytes())
    target = len(raw) // 2
    raw[target] = raw[target] ^ 0xFF
    chain_path.write_bytes(bytes(raw))

    # Clear the private state attribute — presumably a cache, so that
    # verification re-reads from disk (confirm against ChainStore).
    chain._state = None

    with pytest.raises((ChainIntegrityError, ChainError)):
        chain.verify_chain()
def test_entropy_witnesses_populated(chain_dir: Path, private_key: Ed25519PrivateKey):
    """An appended record carries entropy witnesses with non-trivial values.

    Checked fields: positive ``sys_uptime`` and ``proc_entropy``, a
    16-element ``fs_snapshot``, and a non-empty ``boot_id``.
    """
    chain = ChainStore(chain_dir)
    digest = hashlib.sha256(b"test").digest()
    witnesses = chain.append(digest, "test/plain", private_key).entropy_witnesses

    assert witnesses is not None
    assert witnesses.sys_uptime > 0
    assert len(witnesses.fs_snapshot) == 16
    assert witnesses.proc_entropy > 0
    assert len(witnesses.boot_id) > 0
def test_chain_persistence(chain_dir: Path, private_key: Ed25519PrivateKey):
    """Records written by one ChainStore are visible to a fresh instance.

    A second store opened on the same directory must report the same head
    and return the first record with its content hash and signature intact.
    """
    writer = ChainStore(chain_dir)
    first = writer.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key)
    second = writer.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key)

    # A brand-new instance on the same directory stands in for a process restart.
    reader = ChainStore(chain_dir)
    snapshot = reader.state()
    assert snapshot is not None
    assert snapshot.record_count == 2
    assert snapshot.head_index == 1
    assert snapshot.head_hash == compute_record_hash(second)

    # Records can be read back through the new instance.
    reloaded = reader.get(0)
    assert reloaded.content_hash == first.content_hash
    assert reloaded.signature == first.signature
def test_chain_state_rebuild(chain_dir: Path, private_key: Ed25519PrivateKey):
    """A store reopened without state.cbor still reports the correct chain state.

    After two appends the state file is deleted. A new ChainStore on the same
    directory must still report two records with the second one as head.
    """
    original = ChainStore(chain_dir)
    original.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key)
    head_record = original.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key)

    # Remove the state file so only chain.bin is left to recover from.
    (chain_dir / "state.cbor").unlink()

    reopened = ChainStore(chain_dir)
    snapshot = reopened.state()
    assert snapshot is not None
    assert snapshot.record_count == 2
    assert snapshot.head_index == 1
    assert snapshot.head_hash == compute_record_hash(head_record)
def test_empty_chain(chain_dir: Path):
    """A store with no records is empty and has neither a state nor a head."""
    fresh = ChainStore(chain_dir)

    emptiness = fresh.is_empty()
    assert emptiness is True

    current_state = fresh.state()
    assert current_state is None

    current_head = fresh.head()
    assert current_head is None
def test_get_nonexistent_index(chain_dir: Path, private_key: Ed25519PrivateKey):
    """get() raises ChainError matching "not found" for an index beyond the head."""
    chain = ChainStore(chain_dir)
    only_digest = hashlib.sha256(b"only").digest()
    chain.append(only_digest, "test/plain", private_key)

    # The chain holds a single record (index 0), so index 99 cannot exist.
    missing_index = 99
    with pytest.raises(ChainError, match="not found"):
        chain.get(missing_index)
def test_iter_records_range(chain_dir: Path, private_key: Ed25519PrivateKey):
    """iter_records(start=3, end=6) yields records 3 through 6.

    Both bounds are expected to be inclusive: four records come back.
    """
    chain = ChainStore(chain_dir)
    for i in range(10):
        chain.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)

    window = list(chain.iter_records(start=3, end=6))
    assert len(window) == 4

    first_in_window = window[0]
    last_in_window = window[-1]
    assert first_in_window.chain_index == 3
    assert last_in_window.chain_index == 6
def test_metadata_in_chain(chain_dir: Path, private_key: Ed25519PrivateKey):
    """Metadata passed to append() comes back unchanged from get()."""
    chain = ChainStore(chain_dir)
    expected_meta = {"caption": "evidence photo", "backfilled": True}
    digest = hashlib.sha256(b"test").digest()
    chain.append(digest, "test/plain", private_key, metadata=expected_meta)

    reloaded = chain.get(0)
    assert reloaded.metadata == expected_meta
def test_head_returns_latest(chain_dir: Path, private_key: Ed25519PrivateKey):
    """After three appends, head() is the record at chain_index 2."""
    chain = ChainStore(chain_dir)
    digests = [hashlib.sha256(f"c-{i}".encode()).digest() for i in range(3)]
    for digest in digests:
        chain.append(digest, "test/plain", private_key)

    latest = chain.head()
    assert latest is not None
    assert latest.chain_index == 2
def test_verify_chain_detects_signer_change(chain_dir: Path):
    """verify_chain flags a record from an unauthorized signer as an integrity violation.

    A genesis record is appended normally with ``key1``. A second record,
    correctly linked through ``prev_hash`` and validly signed by ``key2``,
    is then written straight into ``chain.bin`` without going through
    ``ChainStore.append``. No rotation record introduces ``key2``, so
    ``verify_chain`` is expected to raise ``ChainIntegrityError`` with a
    message matching "is not authorized".
    """
    import fcntl
    import struct
    import time

    from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
    from uuid_utils import uuid7

    from soosef.federation.entropy import collect_entropy_witnesses
    from soosef.federation.models import AttestationChainRecord
    from soosef.federation.serialization import serialize_record

    store = ChainStore(chain_dir)
    key1 = Ed25519PrivateKey.generate()
    key2 = Ed25519PrivateKey.generate()
    store.append(hashlib.sha256(b"r0").digest(), "test/plain", key1)

    # Fail clearly if the genesis append did not produce a state, rather
    # than with an AttributeError on None below.
    state = store.state()
    assert state is not None

    # Every field except the signature is identical between the unsigned
    # draft and the final record, so define them once.
    record_fields = {
        "version": 1,
        "record_id": uuid7().bytes,
        "chain_index": 1,
        "prev_hash": state.head_hash,  # correct linkage: only the signer is wrong
        "content_hash": hashlib.sha256(b"r1").digest(),
        "content_type": "test/plain",
        "metadata": {},
        "claimed_ts": int(time.time() * 1_000_000),  # microseconds since the epoch
        "entropy_witnesses": collect_entropy_witnesses(chain_dir / "chain.bin"),
        "signer_pubkey": key2.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw),
    }

    # Sign the canonical bytes of a draft with an empty signature, then build
    # the final record carrying that signature.
    # NOTE(review): assumes canonical_bytes() does not cover the signature
    # field — confirm against soosef.federation.serialization.
    unsigned = AttestationChainRecord(**record_fields, signature=b"")
    sig = key2.sign(canonical_bytes(unsigned))
    record = AttestationChainRecord(**record_fields, signature=sig)

    # Append one frame by hand: a 4-byte big-endian length prefix followed by
    # the serialized record.
    record_bytes = serialize_record(record)
    with open(chain_dir / "chain.bin", "ab") as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            f.write(struct.pack(">I", len(record_bytes)))
            f.write(record_bytes)
            # Flush while the lock is still held; otherwise the buffered
            # bytes reach the file only at close, after LOCK_UN.
            f.flush()
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)

    # Clear the private state attribute — presumably a cache, so that
    # verification re-reads from disk (confirm against ChainStore).
    store._state = None

    with pytest.raises(ChainIntegrityError, match="is not authorized"):
        store.verify_chain()
def test_key_rotation_in_chain(chain_dir: Path):
    """A chain containing a proper key rotation record verifies successfully.

    Two records are signed with the first key, a rotation record introduces
    the second key, two more records are signed with the second key, and
    the whole chain must then verify.
    """
    from soosef.federation.chain import CONTENT_TYPE_KEY_ROTATION

    chain = ChainStore(chain_dir)
    old_key = Ed25519PrivateKey.generate()
    new_key = Ed25519PrivateKey.generate()

    # Records signed by the original key.
    for payload in (b"r0", b"r1"):
        chain.append(hashlib.sha256(payload).digest(), "test/plain", old_key)

    # Rotation step: both keys are handed to append_key_rotation.
    rotation_record = chain.append_key_rotation(old_private_key=old_key, new_private_key=new_key)
    assert rotation_record.content_type == CONTENT_TYPE_KEY_ROTATION
    assert rotation_record.metadata["new_pubkey"]

    # Records signed by the newly introduced key.
    for payload in (b"r3", b"r4"):
        chain.append(hashlib.sha256(payload).digest(), "test/plain", new_key)

    # The full chain, across the rotation, must verify.
    assert chain.verify_chain() is True
def test_key_rotation_without_rotation_record_fails(chain_dir: Path):
    """Signing with a second key that no rotation record introduced is rejected.

    Both appends happen outside the ``pytest.raises`` block; the failure is
    expected only from ``verify_chain``, as a ChainIntegrityError matching
    "is not authorized".
    """
    chain = ChainStore(chain_dir)
    first_key = Ed25519PrivateKey.generate()
    second_key = Ed25519PrivateKey.generate()

    # The second append switches keys with no rotation record in between.
    for payload, signing_key in ((b"r0", first_key), (b"r1", second_key)):
        chain.append(hashlib.sha256(payload).digest(), "test/plain", signing_key)

    with pytest.raises(ChainIntegrityError, match="is not authorized"):
        chain.verify_chain()