Critical: - FR-01: Chain verification now supports key rotation via signed rotation records (soosef/key-rotation-v1 content type). Old single-signer invariant replaced with authorized-signers set. - FR-02: Carrier images stripped of EXIF metadata by default before steganographic encoding (strip_metadata=True). Prevents source location/device leakage. High priority: - FR-03: Session timeout (default 15min) + secure cookie flags (HttpOnly, SameSite=Strict, Secure when HTTPS) - FR-04: CSRF protection via Flask-WTF on all POST forms. Killswitch now requires password re-authentication. - FR-05: Collaborator trust store — trust_key(), get_trusted_keys(), resolve_attestor_name(), untrust_key() in KeystoreManager. - FR-06: Production WSGI server (Waitress) by default, Flask dev server only with --debug flag. - FR-07: Dead man's switch sends warning during grace period via local file + optional webhook before auto-purge. Medium: - FR-08: Geofence get_current_location() via gpsd for --here support. - FR-09: Batch attestation endpoint (/attest/batch) with SHA-256 dedup and per-file status reporting. - FR-10: Key backup tracking with last_backup_info() and is_backup_overdue() + backup_reminder_days config. - FR-11: Verification receipts signed with instance Ed25519 key (schema_version bumped to 2). - FR-12: Login rate limiting with configurable lockout (5 attempts, 15 min default). Nice-to-have: - FR-13: Unified `soosef status` pre-flight command showing identity, channel key, deadman, geofence, chain, and backup status. - FR-14: `soosef chain export` produces ZIP with JSON manifest, public key, and raw chain.bin for legal discovery. Tests: 157 passed, 1 skipped, 1 pre-existing flaky test. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
327 lines
12 KiB
Python
327 lines
12 KiB
Python
"""Tests for the attestation hash chain store."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
|
|
Ed25519PrivateKey,
|
|
Ed25519PublicKey,
|
|
)
|
|
|
|
from soosef.exceptions import ChainError, ChainIntegrityError
|
|
from soosef.federation.chain import ChainStore
|
|
from soosef.federation.models import ChainState
|
|
from soosef.federation.serialization import canonical_bytes, compute_record_hash
|
|
|
|
|
|
def test_genesis_record(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""First record has chain_index=0 and prev_hash=0x00*32."""
|
|
store = ChainStore(chain_dir)
|
|
content_hash = hashlib.sha256(b"genesis content").digest()
|
|
|
|
record = store.append(content_hash, "test/plain", private_key)
|
|
|
|
assert record.chain_index == 0
|
|
assert record.prev_hash == ChainState.GENESIS_PREV_HASH
|
|
assert record.version == 1
|
|
assert record.content_hash == content_hash
|
|
assert record.content_type == "test/plain"
|
|
assert len(record.record_id) == 16
|
|
assert len(record.signer_pubkey) == 32
|
|
assert len(record.signature) == 64
|
|
|
|
|
|
def test_chain_state_after_genesis(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""Chain state is correctly initialized after the first record."""
|
|
store = ChainStore(chain_dir)
|
|
record = store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key)
|
|
|
|
state = store.state()
|
|
assert state is not None
|
|
assert state.record_count == 1
|
|
assert state.head_index == 0
|
|
assert state.head_hash == compute_record_hash(record)
|
|
assert state.chain_id == hashlib.sha256(canonical_bytes(record)).digest()
|
|
|
|
|
|
def test_chain_append_multiple(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""Appending multiple records produces monotonically increasing indices."""
|
|
store = ChainStore(chain_dir)
|
|
|
|
records = []
|
|
for i in range(5):
|
|
content = hashlib.sha256(f"content-{i}".encode()).digest()
|
|
record = store.append(content, "test/plain", private_key)
|
|
records.append(record)
|
|
|
|
for i, record in enumerate(records):
|
|
assert record.chain_index == i
|
|
|
|
state = store.state()
|
|
assert state is not None
|
|
assert state.record_count == 5
|
|
assert state.head_index == 4
|
|
|
|
|
|
def test_chain_hash_linkage(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""Each record's prev_hash matches the hash of the previous record."""
|
|
store = ChainStore(chain_dir)
|
|
|
|
r0 = store.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key)
|
|
r1 = store.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key)
|
|
r2 = store.append(hashlib.sha256(b"r2").digest(), "test/plain", private_key)
|
|
|
|
assert r1.prev_hash == compute_record_hash(r0)
|
|
assert r2.prev_hash == compute_record_hash(r1)
|
|
|
|
|
|
def test_signature_verification(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""Each record's Ed25519 signature is valid over canonical bytes."""
|
|
store = ChainStore(chain_dir)
|
|
record = store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key)
|
|
|
|
pub = Ed25519PublicKey.from_public_bytes(record.signer_pubkey)
|
|
# This will raise if invalid
|
|
pub.verify(record.signature, canonical_bytes(record))
|
|
|
|
|
|
def test_verify_chain_valid(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""verify_chain returns True on a valid chain."""
|
|
store = ChainStore(chain_dir)
|
|
for i in range(5):
|
|
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)
|
|
|
|
assert store.verify_chain() is True
|
|
|
|
|
|
def test_verify_chain_detects_tamper(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""verify_chain detects a tampered record in the middle."""
|
|
store = ChainStore(chain_dir)
|
|
for i in range(3):
|
|
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)
|
|
|
|
# Corrupt chain.bin by flipping a byte in the middle
|
|
chain_file = chain_dir / "chain.bin"
|
|
data = bytearray(chain_file.read_bytes())
|
|
midpoint = len(data) // 2
|
|
data[midpoint] ^= 0xFF
|
|
chain_file.write_bytes(bytes(data))
|
|
|
|
# Force state reload
|
|
store._state = None
|
|
|
|
with pytest.raises((ChainIntegrityError, ChainError)):
|
|
store.verify_chain()
|
|
|
|
|
|
def test_entropy_witnesses_populated(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""Entropy witnesses are populated with non-trivial values."""
|
|
store = ChainStore(chain_dir)
|
|
record = store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key)
|
|
|
|
ew = record.entropy_witnesses
|
|
assert ew is not None
|
|
assert ew.sys_uptime > 0
|
|
assert len(ew.fs_snapshot) == 16
|
|
assert ew.proc_entropy > 0
|
|
assert len(ew.boot_id) > 0
|
|
|
|
|
|
def test_chain_persistence(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""Chain survives close and reopen."""
|
|
store1 = ChainStore(chain_dir)
|
|
r0 = store1.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key)
|
|
r1 = store1.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key)
|
|
|
|
# Open a new store instance (simulates process restart)
|
|
store2 = ChainStore(chain_dir)
|
|
|
|
state = store2.state()
|
|
assert state is not None
|
|
assert state.record_count == 2
|
|
assert state.head_index == 1
|
|
assert state.head_hash == compute_record_hash(r1)
|
|
|
|
# Can read records back
|
|
loaded_r0 = store2.get(0)
|
|
assert loaded_r0.content_hash == r0.content_hash
|
|
assert loaded_r0.signature == r0.signature
|
|
|
|
|
|
def test_chain_state_rebuild(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""Chain state is rebuilt from chain.bin if state.cbor is missing."""
|
|
store = ChainStore(chain_dir)
|
|
store.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key)
|
|
r1 = store.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key)
|
|
|
|
# Delete state file
|
|
state_file = chain_dir / "state.cbor"
|
|
state_file.unlink()
|
|
|
|
# Reopen — should rebuild
|
|
store2 = ChainStore(chain_dir)
|
|
state = store2.state()
|
|
assert state is not None
|
|
assert state.record_count == 2
|
|
assert state.head_index == 1
|
|
assert state.head_hash == compute_record_hash(r1)
|
|
|
|
|
|
def test_empty_chain(chain_dir: Path):
|
|
"""Empty chain reports correct state."""
|
|
store = ChainStore(chain_dir)
|
|
assert store.is_empty() is True
|
|
assert store.state() is None
|
|
assert store.head() is None
|
|
|
|
|
|
def test_get_nonexistent_index(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""Accessing a nonexistent index raises ChainError."""
|
|
store = ChainStore(chain_dir)
|
|
store.append(hashlib.sha256(b"only").digest(), "test/plain", private_key)
|
|
|
|
with pytest.raises(ChainError, match="not found"):
|
|
store.get(99)
|
|
|
|
|
|
def test_iter_records_range(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""iter_records respects start and end bounds."""
|
|
store = ChainStore(chain_dir)
|
|
for i in range(10):
|
|
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)
|
|
|
|
records = list(store.iter_records(start=3, end=6))
|
|
assert len(records) == 4
|
|
assert records[0].chain_index == 3
|
|
assert records[-1].chain_index == 6
|
|
|
|
|
|
def test_metadata_in_chain(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""Metadata is preserved through append and retrieval."""
|
|
store = ChainStore(chain_dir)
|
|
meta = {"caption": "evidence photo", "backfilled": True}
|
|
store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key, metadata=meta)
|
|
|
|
loaded = store.get(0)
|
|
assert loaded.metadata == meta
|
|
|
|
|
|
def test_head_returns_latest(chain_dir: Path, private_key: Ed25519PrivateKey):
|
|
"""head() returns the most recently appended record."""
|
|
store = ChainStore(chain_dir)
|
|
for i in range(3):
|
|
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)
|
|
|
|
head = store.head()
|
|
assert head is not None
|
|
assert head.chain_index == 2
|
|
|
|
|
|
def test_verify_chain_detects_signer_change(chain_dir: Path):
|
|
"""verify_chain flags a different signer as integrity violation."""
|
|
store = ChainStore(chain_dir)
|
|
key1 = Ed25519PrivateKey.generate()
|
|
key2 = Ed25519PrivateKey.generate()
|
|
|
|
store.append(hashlib.sha256(b"r0").digest(), "test/plain", key1)
|
|
|
|
# Manually bypass normal append to inject a record signed by key2.
|
|
# We need to build the record with correct prev_hash but wrong signer.
|
|
import fcntl
|
|
import struct
|
|
|
|
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
|
|
from uuid_utils import uuid7
|
|
|
|
from soosef.federation.entropy import collect_entropy_witnesses
|
|
from soosef.federation.models import AttestationChainRecord
|
|
from soosef.federation.serialization import canonical_bytes as cb
|
|
from soosef.federation.serialization import serialize_record
|
|
|
|
state = store.state()
|
|
prev_hash = state.head_hash
|
|
pub2 = key2.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
|
|
entropy = collect_entropy_witnesses(chain_dir / "chain.bin")
|
|
|
|
record = AttestationChainRecord(
|
|
version=1,
|
|
record_id=uuid7().bytes,
|
|
chain_index=1,
|
|
prev_hash=prev_hash,
|
|
content_hash=hashlib.sha256(b"r1").digest(),
|
|
content_type="test/plain",
|
|
metadata={},
|
|
claimed_ts=int(__import__("time").time() * 1_000_000),
|
|
entropy_witnesses=entropy,
|
|
signer_pubkey=pub2,
|
|
signature=b"",
|
|
)
|
|
sig = key2.sign(cb(record))
|
|
record = AttestationChainRecord(
|
|
version=record.version,
|
|
record_id=record.record_id,
|
|
chain_index=record.chain_index,
|
|
prev_hash=record.prev_hash,
|
|
content_hash=record.content_hash,
|
|
content_type=record.content_type,
|
|
metadata=record.metadata,
|
|
claimed_ts=record.claimed_ts,
|
|
entropy_witnesses=record.entropy_witnesses,
|
|
signer_pubkey=record.signer_pubkey,
|
|
signature=sig,
|
|
)
|
|
|
|
record_bytes = serialize_record(record)
|
|
with open(chain_dir / "chain.bin", "ab") as f:
|
|
fcntl.flock(f, fcntl.LOCK_EX)
|
|
f.write(struct.pack(">I", len(record_bytes)))
|
|
f.write(record_bytes)
|
|
fcntl.flock(f, fcntl.LOCK_UN)
|
|
|
|
store._state = None
|
|
|
|
with pytest.raises(ChainIntegrityError, match="is not authorized"):
|
|
store.verify_chain()
|
|
|
|
|
|
def test_key_rotation_in_chain(chain_dir: Path):
|
|
"""Chain with a proper key rotation record verifies successfully."""
|
|
from soosef.federation.chain import CONTENT_TYPE_KEY_ROTATION
|
|
|
|
store = ChainStore(chain_dir)
|
|
key1 = Ed25519PrivateKey.generate()
|
|
key2 = Ed25519PrivateKey.generate()
|
|
|
|
# Append records with key1
|
|
store.append(hashlib.sha256(b"r0").digest(), "test/plain", key1)
|
|
store.append(hashlib.sha256(b"r1").digest(), "test/plain", key1)
|
|
|
|
# Rotate: old key signs a rotation record introducing new key
|
|
rotation = store.append_key_rotation(old_private_key=key1, new_private_key=key2)
|
|
assert rotation.content_type == CONTENT_TYPE_KEY_ROTATION
|
|
assert rotation.metadata["new_pubkey"]
|
|
|
|
# New key can now sign records
|
|
store.append(hashlib.sha256(b"r3").digest(), "test/plain", key2)
|
|
store.append(hashlib.sha256(b"r4").digest(), "test/plain", key2)
|
|
|
|
# Full chain verifies
|
|
assert store.verify_chain() is True
|
|
|
|
|
|
def test_key_rotation_without_rotation_record_fails(chain_dir: Path):
|
|
"""Using a new key without a rotation record is rejected."""
|
|
store = ChainStore(chain_dir)
|
|
key1 = Ed25519PrivateKey.generate()
|
|
key2 = Ed25519PrivateKey.generate()
|
|
|
|
store.append(hashlib.sha256(b"r0").digest(), "test/plain", key1)
|
|
# Directly use key2 without rotation — should fail verification
|
|
store.append(hashlib.sha256(b"r1").digest(), "test/plain", key2)
|
|
|
|
with pytest.raises(ChainIntegrityError, match="is not authorized"):
|
|
store.verify_chain()
|