From fef552b9c120af487d430ff994ea07a847527a28 Mon Sep 17 00:00:00 2001 From: "Aaron D. Lee" Date: Wed, 1 Apr 2026 21:27:15 -0400 Subject: [PATCH] Fix 3 architectural bottlenecks blocking cross-domain adoption MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bottleneck 1: ImageHashes generalization - phash and dhash now default to "" (optional), enabling attestation of CSV datasets, sensor logs, documents, and any non-image file - Added ImageHashes.from_file() for arbitrary file attestation (SHA-256 only, no perceptual hashes) - Added ImageHashes.is_image property to check if perceptual matching is meaningful - Added content_type field to AttestationRecord ("image", "document", "data", "audio", "video") — backward compatible, defaults to "image" - from_dict() now tolerates missing phash/dhash fields Bottleneck 2: Lazy path resolution - Converted 5 modules from eager top-level path imports to lazy access via `import soosef.paths as _paths`: config.py, deadman.py, usb_monitor.py, tamper.py, anchors.py - Paths now resolve at use-time, not import-time, so --data-dir and SOOSEF_DATA_DIR overrides propagate correctly to all modules - Enables portable mode (run entirely from USB stick) - Updated deadman enforcement tests for new path access pattern Bottleneck 3: Delivery acknowledgment chain records - New CONTENT_TYPE_DELIVERY_ACK = "soosef/delivery-ack-v1" - ChainStore.append_delivery_ack() records bundle receipt with sender fingerprint and record count - import_attestation_bundle() auto-generates ack when chain store and private key are provided - Enables two-way federation handshakes (art provenance, legal chain of custody, multi-org evidence exchange) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/soosef/config.py | 6 ++--- src/soosef/federation/anchors.py | 8 +++--- src/soosef/federation/chain.py | 40 ++++++++++++++++++++++++++++++ src/soosef/federation/exchange.py | 24 ++++++++++++++++++ src/soosef/fieldkit/deadman.py | 
4 +-- src/soosef/fieldkit/tamper.py | 6 ++--- src/soosef/fieldkit/usb_monitor.py | 6 ++--- src/soosef/verisoo/models.py | 39 +++++++++++++++++++---------- tests/test_deadman_enforcement.py | 16 ++++++------ 9 files changed, 113 insertions(+), 36 deletions(-) diff --git a/src/soosef/config.py b/src/soosef/config.py index 8bedeaf..19a1e16 100644 --- a/src/soosef/config.py +++ b/src/soosef/config.py @@ -9,7 +9,7 @@ import json from dataclasses import dataclass from pathlib import Path -from soosef.paths import CONFIG_FILE +import soosef.paths as _paths @dataclass @@ -55,7 +55,7 @@ class SoosefConfig: @classmethod def load(cls, path: Path | None = None) -> "SoosefConfig": """Load config from JSON file, falling back to defaults.""" - config_path = path or CONFIG_FILE + config_path = path or _paths.CONFIG_FILE if config_path.exists(): with open(config_path) as f: data = json.load(f) @@ -64,7 +64,7 @@ class SoosefConfig: def save(self, path: Path | None = None) -> None: """Persist config to JSON file.""" - config_path = path or CONFIG_FILE + config_path = path or _paths.CONFIG_FILE config_path.parent.mkdir(parents=True, exist_ok=True) from dataclasses import asdict diff --git a/src/soosef/federation/anchors.py b/src/soosef/federation/anchors.py index 3047140..57d2ef3 100644 --- a/src/soosef/federation/anchors.py +++ b/src/soosef/federation/anchors.py @@ -19,7 +19,7 @@ import json from datetime import UTC, datetime from pathlib import Path -from soosef.paths import CHAIN_DIR +import soosef.paths as _paths def get_chain_head_anchor() -> dict: @@ -29,7 +29,7 @@ def get_chain_head_anchor() -> dict: """ from soosef.federation.chain import ChainStore - store = ChainStore(CHAIN_DIR) + store = ChainStore(_paths.CHAIN_DIR) state = store.state() if state is None: @@ -102,7 +102,7 @@ def save_anchor(anchor: dict, tsa_response: dict | None = None) -> Path: Returns the path to the saved anchor file. 
""" - anchors_dir = CHAIN_DIR / "anchors" + anchors_dir = _paths.CHAIN_DIR / "anchors" anchors_dir.mkdir(parents=True, exist_ok=True) record = { @@ -119,7 +119,7 @@ def save_anchor(anchor: dict, tsa_response: dict | None = None) -> Path: def load_anchors() -> list[dict]: """Load all saved anchor records, newest first.""" - anchors_dir = CHAIN_DIR / "anchors" + anchors_dir = _paths.CHAIN_DIR / "anchors" if not anchors_dir.exists(): return [] diff --git a/src/soosef/federation/chain.py b/src/soosef/federation/chain.py index f530281..97c1ebf 100644 --- a/src/soosef/federation/chain.py +++ b/src/soosef/federation/chain.py @@ -48,6 +48,11 @@ CONTENT_TYPE_KEY_ROTATION = "soosef/key-rotation-v1" # includes the old pubkey fingerprint and cosigner fingerprints in metadata. CONTENT_TYPE_KEY_RECOVERY = "soosef/key-recovery-v1" +# Content type for delivery acknowledgments. When Party B receives an +# attestation bundle from Party A, B signs an acknowledgment that can be +# appended to A's chain, creating a two-way federation handshake. +CONTENT_TYPE_DELIVERY_ACK = "soosef/delivery-ack-v1" + def _now_us() -> int: """Current time as Unix microseconds.""" @@ -488,6 +493,41 @@ class ChainStore: metadata=metadata, ) + def append_delivery_ack( + self, + private_key: Ed25519PrivateKey, + bundle_hash: str, + sender_fingerprint: str, + records_received: int, + ) -> AttestationChainRecord: + """Record that an attestation bundle was received from a peer. + + Creates a two-way federation handshake: the sender can later + import this acknowledgment into their chain as proof that the + recipient received the material. + + Args: + private_key: Receiver's signing key. + bundle_hash: SHA-256 of the imported bundle file. + sender_fingerprint: Fingerprint of the sending organization. + records_received: Number of attestation records imported. + + Returns: + The acknowledgment record. 
+ """ + content_hash = hashlib.sha256(bundle_hash.encode()).digest() + + return self.append( + content_hash=content_hash, + content_type=CONTENT_TYPE_DELIVERY_ACK, + private_key=private_key, + metadata={ + "bundle_hash": bundle_hash, + "sender_fingerprint": sender_fingerprint, + "records_received": records_received, + }, + ) + def verify_chain(self, start: int = 0, end: int | None = None) -> bool: """Verify hash chain integrity and signatures over a range. diff --git a/src/soosef/federation/exchange.py b/src/soosef/federation/exchange.py index d89989a..4d6f922 100644 --- a/src/soosef/federation/exchange.py +++ b/src/soosef/federation/exchange.py @@ -106,6 +106,8 @@ def import_attestation_bundle( bundle_path: Path, storage, trusted_fingerprints: set[str] | None = None, + chain_store=None, + private_key=None, ) -> dict: """Import attestation records from a federation bundle. @@ -174,10 +176,32 @@ def import_attestation_bundle( except Exception: rejected += 1 + # Generate delivery acknowledgment if chain store and private key provided + ack_record = None + if chain_store is not None and private_key is not None and imported > 0: + try: + bundle_hash = hashlib.sha256(bundle_path.read_bytes()).hexdigest() + # Use the first non-empty attestor_fingerprint in the bundle as the sender fingerprint + sender_fp = "" + for rec_data in records: + fp = rec_data.get("attestor_fingerprint", "") + if fp: + sender_fp = fp + break + ack_record = chain_store.append_delivery_ack( + private_key=private_key, + bundle_hash=bundle_hash, + sender_fingerprint=sender_fp, + records_received=imported, + ) + except Exception: + pass + return { "imported": imported, "skipped": skipped, "rejected": rejected, "total": len(records), "investigation": bundle.get("investigation"), + "ack_chain_index": ack_record.chain_index if ack_record else None, } diff --git a/src/soosef/fieldkit/deadman.py b/src/soosef/fieldkit/deadman.py index e269bce..0a68d6b 100644 --- a/src/soosef/fieldkit/deadman.py +++ b/src/soosef/fieldkit/deadman.py @@ 
-12,7 +12,7 @@ import logging from datetime import UTC, datetime, timedelta from pathlib import Path -from soosef.paths import DEADMAN_STATE +import soosef.paths as _paths logger = logging.getLogger(__name__) @@ -21,7 +21,7 @@ class DeadmanSwitch: """Timer-based auto-purge if operator fails to check in.""" def __init__(self, state_file: Path | None = None): - self._state_file = state_file or DEADMAN_STATE + self._state_file = state_file or _paths.DEADMAN_STATE def _load_state(self) -> dict: if self._state_file.exists(): diff --git a/src/soosef/fieldkit/tamper.py b/src/soosef/fieldkit/tamper.py index 37a1802..069d117 100644 --- a/src/soosef/fieldkit/tamper.py +++ b/src/soosef/fieldkit/tamper.py @@ -12,7 +12,7 @@ import json import logging from pathlib import Path -from soosef.paths import TAMPER_BASELINE +import soosef.paths as _paths logger = logging.getLogger(__name__) @@ -28,7 +28,7 @@ def _hash_file(path: Path) -> str: def create_baseline(watch_paths: list[Path], baseline_file: Path | None = None) -> dict: """Create a SHA-256 baseline of watched files.""" - baseline_file = baseline_file or TAMPER_BASELINE + baseline_file = baseline_file or _paths.TAMPER_BASELINE baseline = {} for path in watch_paths: @@ -49,7 +49,7 @@ def create_baseline(watch_paths: list[Path], baseline_file: Path | None = None) def check_baseline(baseline_file: Path | None = None) -> list[dict]: """Check current files against baseline. 
Returns list of violations.""" - baseline_file = baseline_file or TAMPER_BASELINE + baseline_file = baseline_file or _paths.TAMPER_BASELINE if not baseline_file.exists(): return [{"type": "error", "message": "No baseline exists"}] diff --git a/src/soosef/fieldkit/usb_monitor.py b/src/soosef/fieldkit/usb_monitor.py index 7bc8e12..dc78999 100644 --- a/src/soosef/fieldkit/usb_monitor.py +++ b/src/soosef/fieldkit/usb_monitor.py @@ -13,7 +13,7 @@ from collections.abc import Callable from pathlib import Path from typing import Any -from soosef.paths import USB_WHITELIST +import soosef.paths as _paths logger = logging.getLogger(__name__) @@ -27,7 +27,7 @@ except ImportError: def load_whitelist(path: Path | None = None) -> set[str]: """Load USB whitelist as set of 'vendor_id:product_id' strings.""" - wl_path = path or USB_WHITELIST + wl_path = path or _paths.USB_WHITELIST if wl_path.exists(): with open(wl_path) as f: data = json.load(f) @@ -37,7 +37,7 @@ def load_whitelist(path: Path | None = None) -> set[str]: def save_whitelist(devices: set[str], path: Path | None = None) -> None: """Save USB whitelist.""" - wl_path = path or USB_WHITELIST + wl_path = path or _paths.USB_WHITELIST wl_path.parent.mkdir(parents=True, exist_ok=True) with open(wl_path, "w") as f: json.dump({"allowed": sorted(devices)}, f, indent=2) diff --git a/src/soosef/verisoo/models.py b/src/soosef/verisoo/models.py index 0fc74e7..8f3f51a 100644 --- a/src/soosef/verisoo/models.py +++ b/src/soosef/verisoo/models.py @@ -190,21 +190,21 @@ class CaptureMetadata: @dataclass(frozen=True) class ImageHashes: """ - Multi-algorithm image fingerprinting for robust matching. + Multi-algorithm content fingerprinting. - Designed to survive social media mangling: - - JPEG recompression - - Resizing - - Format conversion - - Cropping - - Color adjustments + For images: perceptual hashes (phash, dhash) enable fuzzy matching + that survives compression, resizing, format conversion, and cropping. 
+ + For non-image files (CSV, documents, sensor data): only sha256 is + populated; perceptual fields are empty strings. This enables the + same attestation pipeline for any file type. Match if ANY hash is within threshold - defense in depth. """ - sha256: str # Exact match only - rarely survives sharing - phash: str # DCT-based perceptual hash - survives compression - dhash: str # Difference hash - survives resizing + sha256: str # Exact match — works for all file types + phash: str = "" # DCT-based perceptual hash — images only + dhash: str = "" # Difference hash — images only ahash: str | None = None # Average hash - very tolerant colorhash: str | None = None # Color distribution - survives crops crop_resistant: str | None = None # Center-region hash @@ -263,13 +263,25 @@ class ImageHashes: def from_dict(cls, d: dict[str, Any]) -> ImageHashes: return cls( sha256=d["sha256"], - phash=d["phash"], - dhash=d["dhash"], + phash=d.get("phash", ""), + dhash=d.get("dhash", ""), ahash=d.get("ahash"), colorhash=d.get("colorhash"), crop_resistant=d.get("crop_resistant"), ) + @property + def is_image(self) -> bool: + """True if perceptual hashes are populated (image content).""" + return bool(self.phash or self.dhash) + + @classmethod + def from_file(cls, file_data: bytes) -> ImageHashes: + """Create hashes for an arbitrary file (SHA-256 only, no perceptual).""" + import hashlib + + return cls(sha256=hashlib.sha256(file_data).hexdigest()) + @dataclass(frozen=True) class AttestationRecord: @@ -282,11 +294,12 @@ class AttestationRecord: Once in the log, it cannot be modified or deleted. 
""" - image_hashes: ImageHashes + image_hashes: ImageHashes # Named for backward compat; works for any file type signature: bytes attestor_fingerprint: str timestamp: datetime # When attestation was created metadata: dict[str, Any] = field(default_factory=dict) # CaptureMetadata.to_dict() + content_type: str = "image" # "image", "document", "data", "audio", "video" @property def record_id(self) -> str: diff --git a/tests/test_deadman_enforcement.py b/tests/test_deadman_enforcement.py index e6aae2f..7323bdf 100644 --- a/tests/test_deadman_enforcement.py +++ b/tests/test_deadman_enforcement.py @@ -64,7 +64,7 @@ def test_enforcement_loop_no_op_when_disarmed(tmp_path: Path, monkeypatch: pytes # Redirect the module-level DEADMAN_STATE constant so DeadmanSwitch() default is our tmp file state_file = tmp_path / "deadman.json" - monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file})) check_calls = [] @@ -94,7 +94,7 @@ def test_enforcement_loop_fires_when_overdue(tmp_path: Path, monkeypatch: pytest from soosef.fieldkit import deadman as deadman_mod state_file = tmp_path / "deadman.json" - monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file})) last_checkin = datetime.now(UTC) - timedelta(hours=100) _write_deadman_state( @@ -124,7 +124,7 @@ def test_enforcement_loop_exits_after_firing(tmp_path: Path, monkeypatch: pytest from soosef.fieldkit import deadman as deadman_mod state_file = tmp_path / "deadman.json" - monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file})) last_checkin = datetime.now(UTC) - timedelta(hours=100) _write_deadman_state(state_file, armed=True, last_checkin=last_checkin) @@ -149,7 +149,7 @@ def test_enforcement_loop_tolerates_exceptions(tmp_path: Path, monkeypatch: pyte 
from soosef.fieldkit import deadman as deadman_mod state_file = tmp_path / "deadman.json" - monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file})) call_count = [0] @@ -212,7 +212,7 @@ def test_check_deadman_disarmed( # Point at an empty tmp dir so the real ~/.soosef/fieldkit/deadman.json isn't read state_file = tmp_path / "deadman.json" - monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file})) result = cli_runner.invoke(main, ["fieldkit", "check-deadman"]) assert result.exit_code == 0 @@ -227,7 +227,7 @@ def test_check_deadman_armed_ok( from soosef.fieldkit import deadman as deadman_mod state_file = tmp_path / "deadman.json" - monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file})) last_checkin = datetime.now(UTC) - timedelta(hours=1) _write_deadman_state( @@ -251,7 +251,7 @@ def test_check_deadman_overdue_in_grace( from soosef.fieldkit import deadman as deadman_mod state_file = tmp_path / "deadman.json" - monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file})) # Past 24h interval but within 26h total (grace=2) last_checkin = datetime.now(UTC) - timedelta(hours=25) @@ -277,7 +277,7 @@ def test_check_deadman_fires_when_expired( from soosef.fieldkit import deadman as deadman_mod state_file = tmp_path / "deadman.json" - monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) + monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file})) last_checkin = datetime.now(UTC) - timedelta(hours=100) _write_deadman_state(