fieldwitness/tests/test_chain.py
Aaron D. Lee 490f9d4a1d Rebrand SooSeF to FieldWitness
Complete project rebrand for better positioning in the press freedom
and digital security space. FieldWitness communicates both field
deployment and evidence testimony — appropriate for the target audience
of journalists, NGOs, and human rights organizations.

Rename mapping:
- soosef → fieldwitness (package, CLI, all imports)
- soosef.stegasoo → fieldwitness.stego
- soosef.verisoo → fieldwitness.attest
- ~/.soosef/ → ~/.fwmetadata/ (innocuous data dir name)
- SOOSEF_DATA_DIR → FIELDWITNESS_DATA_DIR
- SoosefConfig → FieldWitnessConfig
- SoosefError → FieldWitnessError

Also includes:
- License switch from MIT to GPL-3.0
- C2PA bridge module (Phase 0-2 MVP): cert.py, export.py, vendor_assertions.py
- README repositioned to lead with provenance/federation, stego backgrounded
- Threat model skeleton at docs/security/threat-model.md
- Planning docs: docs/planning/c2pa-integration.md, docs/planning/gtm-feasibility.md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 15:05:13 -04:00

327 lines
12 KiB
Python

"""Tests for the attestation hash chain store."""
from __future__ import annotations
import hashlib
from pathlib import Path
import pytest
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from fieldwitness.exceptions import ChainError, ChainIntegrityError
from fieldwitness.federation.chain import ChainStore
from fieldwitness.federation.models import ChainState
from fieldwitness.federation.serialization import canonical_bytes, compute_record_hash
def test_genesis_record(chain_dir: Path, private_key: Ed25519PrivateKey):
"""First record has chain_index=0 and prev_hash=0x00*32."""
store = ChainStore(chain_dir)
content_hash = hashlib.sha256(b"genesis content").digest()
record = store.append(content_hash, "test/plain", private_key)
assert record.chain_index == 0
assert record.prev_hash == ChainState.GENESIS_PREV_HASH
assert record.version == 1
assert record.content_hash == content_hash
assert record.content_type == "test/plain"
assert len(record.record_id) == 16
assert len(record.signer_pubkey) == 32
assert len(record.signature) == 64
def test_chain_state_after_genesis(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Chain state is correctly initialized after the first record."""
store = ChainStore(chain_dir)
record = store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key)
state = store.state()
assert state is not None
assert state.record_count == 1
assert state.head_index == 0
assert state.head_hash == compute_record_hash(record)
assert state.chain_id == hashlib.sha256(canonical_bytes(record)).digest()
def test_chain_append_multiple(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Appending multiple records produces monotonically increasing indices."""
store = ChainStore(chain_dir)
records = []
for i in range(5):
content = hashlib.sha256(f"content-{i}".encode()).digest()
record = store.append(content, "test/plain", private_key)
records.append(record)
for i, record in enumerate(records):
assert record.chain_index == i
state = store.state()
assert state is not None
assert state.record_count == 5
assert state.head_index == 4
def test_chain_hash_linkage(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Each record's prev_hash matches the hash of the previous record."""
store = ChainStore(chain_dir)
r0 = store.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key)
r1 = store.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key)
r2 = store.append(hashlib.sha256(b"r2").digest(), "test/plain", private_key)
assert r1.prev_hash == compute_record_hash(r0)
assert r2.prev_hash == compute_record_hash(r1)
def test_signature_verification(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Each record's Ed25519 signature is valid over canonical bytes."""
store = ChainStore(chain_dir)
record = store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key)
pub = Ed25519PublicKey.from_public_bytes(record.signer_pubkey)
# This will raise if invalid
pub.verify(record.signature, canonical_bytes(record))
def test_verify_chain_valid(chain_dir: Path, private_key: Ed25519PrivateKey):
"""verify_chain returns True on a valid chain."""
store = ChainStore(chain_dir)
for i in range(5):
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)
assert store.verify_chain() is True
def test_verify_chain_detects_tamper(chain_dir: Path, private_key: Ed25519PrivateKey):
"""verify_chain detects a tampered record in the middle."""
store = ChainStore(chain_dir)
for i in range(3):
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)
# Corrupt chain.bin by flipping a byte in the middle
chain_file = chain_dir / "chain.bin"
data = bytearray(chain_file.read_bytes())
midpoint = len(data) // 2
data[midpoint] ^= 0xFF
chain_file.write_bytes(bytes(data))
# Force state reload
store._state = None
with pytest.raises((ChainIntegrityError, ChainError)):
store.verify_chain()
def test_entropy_witnesses_populated(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Entropy witnesses are populated with non-trivial values."""
store = ChainStore(chain_dir)
record = store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key)
ew = record.entropy_witnesses
assert ew is not None
assert ew.sys_uptime > 0
assert len(ew.fs_snapshot) == 16
assert ew.proc_entropy > 0
assert len(ew.boot_id) > 0
def test_chain_persistence(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Chain survives close and reopen."""
store1 = ChainStore(chain_dir)
r0 = store1.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key)
r1 = store1.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key)
# Open a new store instance (simulates process restart)
store2 = ChainStore(chain_dir)
state = store2.state()
assert state is not None
assert state.record_count == 2
assert state.head_index == 1
assert state.head_hash == compute_record_hash(r1)
# Can read records back
loaded_r0 = store2.get(0)
assert loaded_r0.content_hash == r0.content_hash
assert loaded_r0.signature == r0.signature
def test_chain_state_rebuild(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Chain state is rebuilt from chain.bin if state.cbor is missing."""
store = ChainStore(chain_dir)
store.append(hashlib.sha256(b"r0").digest(), "test/plain", private_key)
r1 = store.append(hashlib.sha256(b"r1").digest(), "test/plain", private_key)
# Delete state file
state_file = chain_dir / "state.cbor"
state_file.unlink()
# Reopen — should rebuild
store2 = ChainStore(chain_dir)
state = store2.state()
assert state is not None
assert state.record_count == 2
assert state.head_index == 1
assert state.head_hash == compute_record_hash(r1)
def test_empty_chain(chain_dir: Path):
"""Empty chain reports correct state."""
store = ChainStore(chain_dir)
assert store.is_empty() is True
assert store.state() is None
assert store.head() is None
def test_get_nonexistent_index(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Accessing a nonexistent index raises ChainError."""
store = ChainStore(chain_dir)
store.append(hashlib.sha256(b"only").digest(), "test/plain", private_key)
with pytest.raises(ChainError, match="not found"):
store.get(99)
def test_iter_records_range(chain_dir: Path, private_key: Ed25519PrivateKey):
"""iter_records respects start and end bounds."""
store = ChainStore(chain_dir)
for i in range(10):
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)
records = list(store.iter_records(start=3, end=6))
assert len(records) == 4
assert records[0].chain_index == 3
assert records[-1].chain_index == 6
def test_metadata_in_chain(chain_dir: Path, private_key: Ed25519PrivateKey):
"""Metadata is preserved through append and retrieval."""
store = ChainStore(chain_dir)
meta = {"caption": "evidence photo", "backfilled": True}
store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key, metadata=meta)
loaded = store.get(0)
assert loaded.metadata == meta
def test_head_returns_latest(chain_dir: Path, private_key: Ed25519PrivateKey):
"""head() returns the most recently appended record."""
store = ChainStore(chain_dir)
for i in range(3):
store.append(hashlib.sha256(f"c-{i}".encode()).digest(), "test/plain", private_key)
head = store.head()
assert head is not None
assert head.chain_index == 2
def test_verify_chain_detects_signer_change(chain_dir: Path):
"""verify_chain flags a different signer as integrity violation."""
store = ChainStore(chain_dir)
key1 = Ed25519PrivateKey.generate()
key2 = Ed25519PrivateKey.generate()
store.append(hashlib.sha256(b"r0").digest(), "test/plain", key1)
# Manually bypass normal append to inject a record signed by key2.
# We need to build the record with correct prev_hash but wrong signer.
import fcntl
import struct
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from uuid_utils import uuid7
from fieldwitness.federation.entropy import collect_entropy_witnesses
from fieldwitness.federation.models import AttestationChainRecord
from fieldwitness.federation.serialization import canonical_bytes as cb
from fieldwitness.federation.serialization import serialize_record
state = store.state()
prev_hash = state.head_hash
pub2 = key2.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
entropy = collect_entropy_witnesses(chain_dir / "chain.bin")
record = AttestationChainRecord(
version=1,
record_id=uuid7().bytes,
chain_index=1,
prev_hash=prev_hash,
content_hash=hashlib.sha256(b"r1").digest(),
content_type="test/plain",
metadata={},
claimed_ts=int(__import__("time").time() * 1_000_000),
entropy_witnesses=entropy,
signer_pubkey=pub2,
signature=b"",
)
sig = key2.sign(cb(record))
record = AttestationChainRecord(
version=record.version,
record_id=record.record_id,
chain_index=record.chain_index,
prev_hash=record.prev_hash,
content_hash=record.content_hash,
content_type=record.content_type,
metadata=record.metadata,
claimed_ts=record.claimed_ts,
entropy_witnesses=record.entropy_witnesses,
signer_pubkey=record.signer_pubkey,
signature=sig,
)
record_bytes = serialize_record(record)
with open(chain_dir / "chain.bin", "ab") as f:
fcntl.flock(f, fcntl.LOCK_EX)
f.write(struct.pack(">I", len(record_bytes)))
f.write(record_bytes)
fcntl.flock(f, fcntl.LOCK_UN)
store._state = None
with pytest.raises(ChainIntegrityError, match="is not authorized"):
store.verify_chain()
def test_key_rotation_in_chain(chain_dir: Path):
"""Chain with a proper key rotation record verifies successfully."""
from fieldwitness.federation.chain import CONTENT_TYPE_KEY_ROTATION
store = ChainStore(chain_dir)
key1 = Ed25519PrivateKey.generate()
key2 = Ed25519PrivateKey.generate()
# Append records with key1
store.append(hashlib.sha256(b"r0").digest(), "test/plain", key1)
store.append(hashlib.sha256(b"r1").digest(), "test/plain", key1)
# Rotate: old key signs a rotation record introducing new key
rotation = store.append_key_rotation(old_private_key=key1, new_private_key=key2)
assert rotation.content_type == CONTENT_TYPE_KEY_ROTATION
assert rotation.metadata["new_pubkey"]
# New key can now sign records
store.append(hashlib.sha256(b"r3").digest(), "test/plain", key2)
store.append(hashlib.sha256(b"r4").digest(), "test/plain", key2)
# Full chain verifies
assert store.verify_chain() is True
def test_key_rotation_without_rotation_record_fails(chain_dir: Path):
"""Using a new key without a rotation record is rejected."""
store = ChainStore(chain_dir)
key1 = Ed25519PrivateKey.generate()
key2 = Ed25519PrivateKey.generate()
store.append(hashlib.sha256(b"r0").digest(), "test/plain", key1)
# Directly use key2 without rotation — should fail verification
store.append(hashlib.sha256(b"r1").digest(), "test/plain", key2)
with pytest.raises(ChainIntegrityError, match="is not authorized"):
store.verify_chain()