Fix 3 architectural bottlenecks blocking cross-domain adoption
Bottleneck 1: ImageHashes generalization
- phash and dhash now default to "" (optional), enabling attestation
of CSV datasets, sensor logs, documents, and any non-image file
- Added ImageHashes.from_file() for arbitrary file attestation
(SHA-256 only, no perceptual hashes)
- Added ImageHashes.is_image property to check if perceptual matching
is meaningful
- Added content_type field to AttestationRecord ("image", "document",
"data", "audio", "video") — backward compatible, defaults to "image"
- from_dict() now tolerates missing phash/dhash fields
Bottleneck 2: Lazy path resolution
- Converted 5 modules from eager top-level path imports to lazy
access via `import soosef.paths as _paths`:
config.py, deadman.py, usb_monitor.py, tamper.py, anchors.py
- Paths now resolve at use-time, not import-time, so --data-dir
and SOOSEF_DATA_DIR overrides propagate correctly to all modules
- Enables portable mode (run entirely from USB stick)
- Updated deadman enforcement tests for new path access pattern
Bottleneck 3: Delivery acknowledgment chain records
- New CONTENT_TYPE_DELIVERY_ACK = "soosef/delivery-ack-v1"
- ChainStore.append_delivery_ack() records bundle receipt with
sender fingerprint and record count
- import_attestation_bundle() auto-generates ack when chain store
and private key are provided
- Enables two-way federation handshakes (art provenance, legal
chain of custody, multi-org evidence exchange)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
f557cac45a
commit
fef552b9c1
@ -9,7 +9,7 @@ import json
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from soosef.paths import CONFIG_FILE
|
import soosef.paths as _paths
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@ -55,7 +55,7 @@ class SoosefConfig:
|
|||||||
@classmethod
|
@classmethod
|
||||||
def load(cls, path: Path | None = None) -> "SoosefConfig":
|
def load(cls, path: Path | None = None) -> "SoosefConfig":
|
||||||
"""Load config from JSON file, falling back to defaults."""
|
"""Load config from JSON file, falling back to defaults."""
|
||||||
config_path = path or CONFIG_FILE
|
config_path = path or _paths.CONFIG_FILE
|
||||||
if config_path.exists():
|
if config_path.exists():
|
||||||
with open(config_path) as f:
|
with open(config_path) as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
@ -64,7 +64,7 @@ class SoosefConfig:
|
|||||||
|
|
||||||
def save(self, path: Path | None = None) -> None:
|
def save(self, path: Path | None = None) -> None:
|
||||||
"""Persist config to JSON file."""
|
"""Persist config to JSON file."""
|
||||||
config_path = path or CONFIG_FILE
|
config_path = path or _paths.CONFIG_FILE
|
||||||
config_path.parent.mkdir(parents=True, exist_ok=True)
|
config_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
from dataclasses import asdict
|
from dataclasses import asdict
|
||||||
|
|
||||||
|
|||||||
@ -19,7 +19,7 @@ import json
|
|||||||
from datetime import UTC, datetime
|
from datetime import UTC, datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from soosef.paths import CHAIN_DIR
|
import soosef.paths as _paths
|
||||||
|
|
||||||
|
|
||||||
def get_chain_head_anchor() -> dict:
|
def get_chain_head_anchor() -> dict:
|
||||||
@ -29,7 +29,7 @@ def get_chain_head_anchor() -> dict:
|
|||||||
"""
|
"""
|
||||||
from soosef.federation.chain import ChainStore
|
from soosef.federation.chain import ChainStore
|
||||||
|
|
||||||
store = ChainStore(CHAIN_DIR)
|
store = ChainStore(_paths.CHAIN_DIR)
|
||||||
state = store.state()
|
state = store.state()
|
||||||
|
|
||||||
if state is None:
|
if state is None:
|
||||||
@ -102,7 +102,7 @@ def save_anchor(anchor: dict, tsa_response: dict | None = None) -> Path:
|
|||||||
|
|
||||||
Returns the path to the saved anchor file.
|
Returns the path to the saved anchor file.
|
||||||
"""
|
"""
|
||||||
anchors_dir = CHAIN_DIR / "anchors"
|
anchors_dir = _paths.CHAIN_DIR / "anchors"
|
||||||
anchors_dir.mkdir(parents=True, exist_ok=True)
|
anchors_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
record = {
|
record = {
|
||||||
@ -119,7 +119,7 @@ def save_anchor(anchor: dict, tsa_response: dict | None = None) -> Path:
|
|||||||
|
|
||||||
def load_anchors() -> list[dict]:
|
def load_anchors() -> list[dict]:
|
||||||
"""Load all saved anchor records, newest first."""
|
"""Load all saved anchor records, newest first."""
|
||||||
anchors_dir = CHAIN_DIR / "anchors"
|
anchors_dir = _paths.CHAIN_DIR / "anchors"
|
||||||
if not anchors_dir.exists():
|
if not anchors_dir.exists():
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
|||||||
@ -48,6 +48,11 @@ CONTENT_TYPE_KEY_ROTATION = "soosef/key-rotation-v1"
|
|||||||
# includes the old pubkey fingerprint and cosigner fingerprints in metadata.
|
# includes the old pubkey fingerprint and cosigner fingerprints in metadata.
|
||||||
CONTENT_TYPE_KEY_RECOVERY = "soosef/key-recovery-v1"
|
CONTENT_TYPE_KEY_RECOVERY = "soosef/key-recovery-v1"
|
||||||
|
|
||||||
|
# Content type for delivery acknowledgments. When Party B receives an
|
||||||
|
# attestation bundle from Party A, B signs an acknowledgment that can be
|
||||||
|
# appended to A's chain, creating a two-way federation handshake.
|
||||||
|
CONTENT_TYPE_DELIVERY_ACK = "soosef/delivery-ack-v1"
|
||||||
|
|
||||||
|
|
||||||
def _now_us() -> int:
|
def _now_us() -> int:
|
||||||
"""Current time as Unix microseconds."""
|
"""Current time as Unix microseconds."""
|
||||||
@ -488,6 +493,41 @@ class ChainStore:
|
|||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def append_delivery_ack(
|
||||||
|
self,
|
||||||
|
private_key: Ed25519PrivateKey,
|
||||||
|
bundle_hash: str,
|
||||||
|
sender_fingerprint: str,
|
||||||
|
records_received: int,
|
||||||
|
) -> AttestationChainRecord:
|
||||||
|
"""Record that an attestation bundle was received from a peer.
|
||||||
|
|
||||||
|
Creates a two-way federation handshake: the sender can later
|
||||||
|
import this acknowledgment into their chain as proof that the
|
||||||
|
recipient received the material.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
private_key: Receiver's signing key.
|
||||||
|
bundle_hash: SHA-256 of the imported bundle file.
|
||||||
|
sender_fingerprint: Fingerprint of the sending organization.
|
||||||
|
records_received: Number of attestation records imported.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The acknowledgment record.
|
||||||
|
"""
|
||||||
|
content_hash = hashlib.sha256(bundle_hash.encode()).digest()
|
||||||
|
|
||||||
|
return self.append(
|
||||||
|
content_hash=content_hash,
|
||||||
|
content_type=CONTENT_TYPE_DELIVERY_ACK,
|
||||||
|
private_key=private_key,
|
||||||
|
metadata={
|
||||||
|
"bundle_hash": bundle_hash,
|
||||||
|
"sender_fingerprint": sender_fingerprint,
|
||||||
|
"records_received": records_received,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
def verify_chain(self, start: int = 0, end: int | None = None) -> bool:
|
def verify_chain(self, start: int = 0, end: int | None = None) -> bool:
|
||||||
"""Verify hash chain integrity and signatures over a range.
|
"""Verify hash chain integrity and signatures over a range.
|
||||||
|
|
||||||
|
|||||||
@ -106,6 +106,8 @@ def import_attestation_bundle(
|
|||||||
bundle_path: Path,
|
bundle_path: Path,
|
||||||
storage,
|
storage,
|
||||||
trusted_fingerprints: set[str] | None = None,
|
trusted_fingerprints: set[str] | None = None,
|
||||||
|
chain_store=None,
|
||||||
|
private_key=None,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""Import attestation records from a federation bundle.
|
"""Import attestation records from a federation bundle.
|
||||||
|
|
||||||
@ -174,10 +176,32 @@ def import_attestation_bundle(
|
|||||||
except Exception:
|
except Exception:
|
||||||
rejected += 1
|
rejected += 1
|
||||||
|
|
||||||
|
# Generate delivery acknowledgment if chain store and private key provided
|
||||||
|
ack_record = None
|
||||||
|
if chain_store is not None and private_key is not None and imported > 0:
|
||||||
|
try:
|
||||||
|
bundle_hash = hashlib.sha256(bundle_path.read_bytes()).hexdigest()
|
||||||
|
# Determine sender fingerprint from first imported record
|
||||||
|
sender_fp = ""
|
||||||
|
for rec_data in records:
|
||||||
|
fp = rec_data.get("attestor_fingerprint", "")
|
||||||
|
if fp:
|
||||||
|
sender_fp = fp
|
||||||
|
break
|
||||||
|
ack_record = chain_store.append_delivery_ack(
|
||||||
|
private_key=private_key,
|
||||||
|
bundle_hash=bundle_hash,
|
||||||
|
sender_fingerprint=sender_fp,
|
||||||
|
records_received=imported,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"imported": imported,
|
"imported": imported,
|
||||||
"skipped": skipped,
|
"skipped": skipped,
|
||||||
"rejected": rejected,
|
"rejected": rejected,
|
||||||
"total": len(records),
|
"total": len(records),
|
||||||
"investigation": bundle.get("investigation"),
|
"investigation": bundle.get("investigation"),
|
||||||
|
"ack_chain_index": ack_record.chain_index if ack_record else None,
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,7 @@ import logging
|
|||||||
from datetime import UTC, datetime, timedelta
|
from datetime import UTC, datetime, timedelta
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from soosef.paths import DEADMAN_STATE
|
import soosef.paths as _paths
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -21,7 +21,7 @@ class DeadmanSwitch:
|
|||||||
"""Timer-based auto-purge if operator fails to check in."""
|
"""Timer-based auto-purge if operator fails to check in."""
|
||||||
|
|
||||||
def __init__(self, state_file: Path | None = None):
|
def __init__(self, state_file: Path | None = None):
|
||||||
self._state_file = state_file or DEADMAN_STATE
|
self._state_file = state_file or _paths.DEADMAN_STATE
|
||||||
|
|
||||||
def _load_state(self) -> dict:
|
def _load_state(self) -> dict:
|
||||||
if self._state_file.exists():
|
if self._state_file.exists():
|
||||||
|
|||||||
@ -12,7 +12,7 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from soosef.paths import TAMPER_BASELINE
|
import soosef.paths as _paths
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -28,7 +28,7 @@ def _hash_file(path: Path) -> str:
|
|||||||
|
|
||||||
def create_baseline(watch_paths: list[Path], baseline_file: Path | None = None) -> dict:
|
def create_baseline(watch_paths: list[Path], baseline_file: Path | None = None) -> dict:
|
||||||
"""Create a SHA-256 baseline of watched files."""
|
"""Create a SHA-256 baseline of watched files."""
|
||||||
baseline_file = baseline_file or TAMPER_BASELINE
|
baseline_file = baseline_file or _paths.TAMPER_BASELINE
|
||||||
baseline = {}
|
baseline = {}
|
||||||
|
|
||||||
for path in watch_paths:
|
for path in watch_paths:
|
||||||
@ -49,7 +49,7 @@ def create_baseline(watch_paths: list[Path], baseline_file: Path | None = None)
|
|||||||
|
|
||||||
def check_baseline(baseline_file: Path | None = None) -> list[dict]:
|
def check_baseline(baseline_file: Path | None = None) -> list[dict]:
|
||||||
"""Check current files against baseline. Returns list of violations."""
|
"""Check current files against baseline. Returns list of violations."""
|
||||||
baseline_file = baseline_file or TAMPER_BASELINE
|
baseline_file = baseline_file or _paths.TAMPER_BASELINE
|
||||||
if not baseline_file.exists():
|
if not baseline_file.exists():
|
||||||
return [{"type": "error", "message": "No baseline exists"}]
|
return [{"type": "error", "message": "No baseline exists"}]
|
||||||
|
|
||||||
|
|||||||
@ -13,7 +13,7 @@ from collections.abc import Callable
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from soosef.paths import USB_WHITELIST
|
import soosef.paths as _paths
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -27,7 +27,7 @@ except ImportError:
|
|||||||
|
|
||||||
def load_whitelist(path: Path | None = None) -> set[str]:
|
def load_whitelist(path: Path | None = None) -> set[str]:
|
||||||
"""Load USB whitelist as set of 'vendor_id:product_id' strings."""
|
"""Load USB whitelist as set of 'vendor_id:product_id' strings."""
|
||||||
wl_path = path or USB_WHITELIST
|
wl_path = path or _paths.USB_WHITELIST
|
||||||
if wl_path.exists():
|
if wl_path.exists():
|
||||||
with open(wl_path) as f:
|
with open(wl_path) as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
@ -37,7 +37,7 @@ def load_whitelist(path: Path | None = None) -> set[str]:
|
|||||||
|
|
||||||
def save_whitelist(devices: set[str], path: Path | None = None) -> None:
|
def save_whitelist(devices: set[str], path: Path | None = None) -> None:
|
||||||
"""Save USB whitelist."""
|
"""Save USB whitelist."""
|
||||||
wl_path = path or USB_WHITELIST
|
wl_path = path or _paths.USB_WHITELIST
|
||||||
wl_path.parent.mkdir(parents=True, exist_ok=True)
|
wl_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
with open(wl_path, "w") as f:
|
with open(wl_path, "w") as f:
|
||||||
json.dump({"allowed": sorted(devices)}, f, indent=2)
|
json.dump({"allowed": sorted(devices)}, f, indent=2)
|
||||||
|
|||||||
@ -190,21 +190,21 @@ class CaptureMetadata:
|
|||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class ImageHashes:
|
class ImageHashes:
|
||||||
"""
|
"""
|
||||||
Multi-algorithm image fingerprinting for robust matching.
|
Multi-algorithm content fingerprinting.
|
||||||
|
|
||||||
Designed to survive social media mangling:
|
For images: perceptual hashes (phash, dhash) enable fuzzy matching
|
||||||
- JPEG recompression
|
that survives compression, resizing, format conversion, and cropping.
|
||||||
- Resizing
|
|
||||||
- Format conversion
|
For non-image files (CSV, documents, sensor data): only sha256 is
|
||||||
- Cropping
|
populated; perceptual fields are empty strings. This enables the
|
||||||
- Color adjustments
|
same attestation pipeline for any file type.
|
||||||
|
|
||||||
Match if ANY hash is within threshold - defense in depth.
|
Match if ANY hash is within threshold - defense in depth.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
sha256: str # Exact match only - rarely survives sharing
|
sha256: str # Exact match — works for all file types
|
||||||
phash: str # DCT-based perceptual hash - survives compression
|
phash: str = "" # DCT-based perceptual hash — images only
|
||||||
dhash: str # Difference hash - survives resizing
|
dhash: str = "" # Difference hash — images only
|
||||||
ahash: str | None = None # Average hash - very tolerant
|
ahash: str | None = None # Average hash - very tolerant
|
||||||
colorhash: str | None = None # Color distribution - survives crops
|
colorhash: str | None = None # Color distribution - survives crops
|
||||||
crop_resistant: str | None = None # Center-region hash
|
crop_resistant: str | None = None # Center-region hash
|
||||||
@ -263,13 +263,25 @@ class ImageHashes:
|
|||||||
def from_dict(cls, d: dict[str, Any]) -> ImageHashes:
|
def from_dict(cls, d: dict[str, Any]) -> ImageHashes:
|
||||||
return cls(
|
return cls(
|
||||||
sha256=d["sha256"],
|
sha256=d["sha256"],
|
||||||
phash=d["phash"],
|
phash=d.get("phash", ""),
|
||||||
dhash=d["dhash"],
|
dhash=d.get("dhash", ""),
|
||||||
ahash=d.get("ahash"),
|
ahash=d.get("ahash"),
|
||||||
colorhash=d.get("colorhash"),
|
colorhash=d.get("colorhash"),
|
||||||
crop_resistant=d.get("crop_resistant"),
|
crop_resistant=d.get("crop_resistant"),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_image(self) -> bool:
|
||||||
|
"""True if perceptual hashes are populated (image content)."""
|
||||||
|
return bool(self.phash or self.dhash)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_file(cls, file_data: bytes) -> ImageHashes:
|
||||||
|
"""Create hashes for an arbitrary file (SHA-256 only, no perceptual)."""
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
return cls(sha256=hashlib.sha256(file_data).hexdigest())
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class AttestationRecord:
|
class AttestationRecord:
|
||||||
@ -282,11 +294,12 @@ class AttestationRecord:
|
|||||||
Once in the log, it cannot be modified or deleted.
|
Once in the log, it cannot be modified or deleted.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
image_hashes: ImageHashes
|
image_hashes: ImageHashes # Named for backward compat; works for any file type
|
||||||
signature: bytes
|
signature: bytes
|
||||||
attestor_fingerprint: str
|
attestor_fingerprint: str
|
||||||
timestamp: datetime # When attestation was created
|
timestamp: datetime # When attestation was created
|
||||||
metadata: dict[str, Any] = field(default_factory=dict) # CaptureMetadata.to_dict()
|
metadata: dict[str, Any] = field(default_factory=dict) # CaptureMetadata.to_dict()
|
||||||
|
content_type: str = "image" # "image", "document", "data", "audio", "video"
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def record_id(self) -> str:
|
def record_id(self) -> str:
|
||||||
|
|||||||
@ -64,7 +64,7 @@ def test_enforcement_loop_no_op_when_disarmed(tmp_path: Path, monkeypatch: pytes
|
|||||||
|
|
||||||
# Redirect the module-level DEADMAN_STATE constant so DeadmanSwitch() default is our tmp file
|
# Redirect the module-level DEADMAN_STATE constant so DeadmanSwitch() default is our tmp file
|
||||||
state_file = tmp_path / "deadman.json"
|
state_file = tmp_path / "deadman.json"
|
||||||
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
|
monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file}))
|
||||||
|
|
||||||
check_calls = []
|
check_calls = []
|
||||||
|
|
||||||
@ -94,7 +94,7 @@ def test_enforcement_loop_fires_when_overdue(tmp_path: Path, monkeypatch: pytest
|
|||||||
from soosef.fieldkit import deadman as deadman_mod
|
from soosef.fieldkit import deadman as deadman_mod
|
||||||
|
|
||||||
state_file = tmp_path / "deadman.json"
|
state_file = tmp_path / "deadman.json"
|
||||||
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
|
monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file}))
|
||||||
|
|
||||||
last_checkin = datetime.now(UTC) - timedelta(hours=100)
|
last_checkin = datetime.now(UTC) - timedelta(hours=100)
|
||||||
_write_deadman_state(
|
_write_deadman_state(
|
||||||
@ -124,7 +124,7 @@ def test_enforcement_loop_exits_after_firing(tmp_path: Path, monkeypatch: pytest
|
|||||||
from soosef.fieldkit import deadman as deadman_mod
|
from soosef.fieldkit import deadman as deadman_mod
|
||||||
|
|
||||||
state_file = tmp_path / "deadman.json"
|
state_file = tmp_path / "deadman.json"
|
||||||
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
|
monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file}))
|
||||||
|
|
||||||
last_checkin = datetime.now(UTC) - timedelta(hours=100)
|
last_checkin = datetime.now(UTC) - timedelta(hours=100)
|
||||||
_write_deadman_state(state_file, armed=True, last_checkin=last_checkin)
|
_write_deadman_state(state_file, armed=True, last_checkin=last_checkin)
|
||||||
@ -149,7 +149,7 @@ def test_enforcement_loop_tolerates_exceptions(tmp_path: Path, monkeypatch: pyte
|
|||||||
from soosef.fieldkit import deadman as deadman_mod
|
from soosef.fieldkit import deadman as deadman_mod
|
||||||
|
|
||||||
state_file = tmp_path / "deadman.json"
|
state_file = tmp_path / "deadman.json"
|
||||||
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
|
monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file}))
|
||||||
|
|
||||||
call_count = [0]
|
call_count = [0]
|
||||||
|
|
||||||
@ -212,7 +212,7 @@ def test_check_deadman_disarmed(
|
|||||||
|
|
||||||
# Point at an empty tmp dir so the real ~/.soosef/fieldkit/deadman.json isn't read
|
# Point at an empty tmp dir so the real ~/.soosef/fieldkit/deadman.json isn't read
|
||||||
state_file = tmp_path / "deadman.json"
|
state_file = tmp_path / "deadman.json"
|
||||||
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
|
monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file}))
|
||||||
|
|
||||||
result = cli_runner.invoke(main, ["fieldkit", "check-deadman"])
|
result = cli_runner.invoke(main, ["fieldkit", "check-deadman"])
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
@ -227,7 +227,7 @@ def test_check_deadman_armed_ok(
|
|||||||
from soosef.fieldkit import deadman as deadman_mod
|
from soosef.fieldkit import deadman as deadman_mod
|
||||||
|
|
||||||
state_file = tmp_path / "deadman.json"
|
state_file = tmp_path / "deadman.json"
|
||||||
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
|
monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file}))
|
||||||
|
|
||||||
last_checkin = datetime.now(UTC) - timedelta(hours=1)
|
last_checkin = datetime.now(UTC) - timedelta(hours=1)
|
||||||
_write_deadman_state(
|
_write_deadman_state(
|
||||||
@ -251,7 +251,7 @@ def test_check_deadman_overdue_in_grace(
|
|||||||
from soosef.fieldkit import deadman as deadman_mod
|
from soosef.fieldkit import deadman as deadman_mod
|
||||||
|
|
||||||
state_file = tmp_path / "deadman.json"
|
state_file = tmp_path / "deadman.json"
|
||||||
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
|
monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file}))
|
||||||
|
|
||||||
# Past 24h interval but within 26h total (grace=2)
|
# Past 24h interval but within 26h total (grace=2)
|
||||||
last_checkin = datetime.now(UTC) - timedelta(hours=25)
|
last_checkin = datetime.now(UTC) - timedelta(hours=25)
|
||||||
@ -277,7 +277,7 @@ def test_check_deadman_fires_when_expired(
|
|||||||
from soosef.fieldkit import deadman as deadman_mod
|
from soosef.fieldkit import deadman as deadman_mod
|
||||||
|
|
||||||
state_file = tmp_path / "deadman.json"
|
state_file = tmp_path / "deadman.json"
|
||||||
monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)
|
monkeypatch.setattr(deadman_mod, "_paths", type("P", (), {"DEADMAN_STATE": state_file}))
|
||||||
|
|
||||||
last_checkin = datetime.now(UTC) - timedelta(hours=100)
|
last_checkin = datetime.now(UTC) - timedelta(hours=100)
|
||||||
_write_deadman_state(
|
_write_deadman_state(
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user