diff --git a/frontends/web/app.py b/frontends/web/app.py index 34289ee..4d97090 100644 --- a/frontends/web/app.py +++ b/frontends/web/app.py @@ -25,13 +25,9 @@ SooSeF-native features (attest, fieldkit, keys) are clean blueprints. """ import io -import mimetypes import os import secrets import sys -import threading -import time -from concurrent.futures import ThreadPoolExecutor from pathlib import Path from flask import ( @@ -45,11 +41,10 @@ from flask import ( session, url_for, ) -from PIL import Image import soosef from soosef.config import SoosefConfig -from soosef.paths import AUTH_DB, INSTANCE_DIR, SECRET_KEY_FILE, TEMP_DIR, ensure_dirs +from soosef.paths import INSTANCE_DIR, SECRET_KEY_FILE, TEMP_DIR, ensure_dirs # Suppress numpy/scipy warnings in subprocesses os.environ["NUMPY_MADVISE_HUGEPAGE"] = "0" @@ -89,7 +84,8 @@ def create_app(config: SoosefConfig | None = None) -> Flask: # Add web dir to path so auth.py and support modules are importable sys.path.insert(0, str(web_dir)) - from auth import init_app as init_auth, is_authenticated, is_admin, get_username + from auth import get_username, is_admin, is_authenticated + from auth import init_app as init_auth init_auth(app) @@ -127,19 +123,17 @@ def create_app(config: SoosefConfig | None = None) -> Flask: # Stegasoo capabilities try: - from stegasoo import has_dct_support, HAS_AUDIO_SUPPORT - from stegasoo import get_channel_status + from stegasoo import HAS_AUDIO_SUPPORT, get_channel_status, has_dct_support from stegasoo.constants import ( - MAX_MESSAGE_CHARS, - MAX_FILE_PAYLOAD_SIZE, - MAX_UPLOAD_SIZE, - TEMP_FILE_EXPIRY_MINUTES, - MIN_PIN_LENGTH, - MAX_PIN_LENGTH, - MIN_PASSPHRASE_WORDS, - RECOMMENDED_PASSPHRASE_WORDS, DEFAULT_PASSPHRASE_WORDS, - __version__ as stegasoo_version, + MAX_FILE_PAYLOAD_SIZE, + MAX_MESSAGE_CHARS, + MAX_PIN_LENGTH, + MAX_UPLOAD_SIZE, + MIN_PASSPHRASE_WORDS, + MIN_PIN_LENGTH, + RECOMMENDED_PASSPHRASE_WORDS, + TEMP_FILE_EXPIRY_MINUTES, ) has_dct = has_dct_support() @@ -246,68 +240,30 @@ def _register_stegasoo_routes(app: Flask) -> None: The stegasoo templates are in templates/stego/ and extend our base.html. """ import temp_storage - from soosef.audit import log_action - from subprocess_stego import ( - SubprocessStego, - cleanup_progress_file, - generate_job_id, - get_progress_file_path, - read_progress, - ) - from auth import login_required, admin_required - - import stegasoo + from auth import admin_required, login_required from stegasoo import ( - HAS_AUDIO_SUPPORT, - CapacityError, - DecryptionError, - FilePayload, - InvalidHeaderError, - InvalidMagicBytesError, - ReedSolomonError, - StegasooError, export_rsa_key_pem, generate_credentials, - generate_filename, get_channel_status, - has_argon2, - has_dct_support, load_rsa_key, - validate_channel_key, - validate_file_payload, - validate_image, - validate_message, - validate_passphrase, - validate_pin, - validate_rsa_key, - validate_security_factors, ) from stegasoo.constants import ( DEFAULT_PASSPHRASE_WORDS, - MAX_FILE_PAYLOAD_SIZE, - MAX_FILE_SIZE, - MAX_MESSAGE_CHARS, MAX_PIN_LENGTH, - MAX_UPLOAD_SIZE, MIN_PASSPHRASE_WORDS, MIN_PIN_LENGTH, - RECOMMENDED_PASSPHRASE_WORDS, TEMP_FILE_EXPIRY, - TEMP_FILE_EXPIRY_MINUTES, - THUMBNAIL_QUALITY, - THUMBNAIL_SIZE, VALID_RSA_SIZES, - __version__, ) from stegasoo.qr_utils import ( can_fit_in_qr, - decompress_data, - detect_and_crop_qr, - extract_key_from_qr, generate_qr_code, - is_compressed, ) - from stegasoo.channel import resolve_channel_key + from subprocess_stego import ( + SubprocessStego, + ) + + from soosef.audit import log_action # Initialize subprocess wrapper subprocess_stego = SubprocessStego(timeout=180) @@ -315,36 +271,36 @@ def _register_stegasoo_routes(app: Flask) -> None: # ── Auth routes (setup, login, logout, account) ──────────────── from auth import ( - create_admin_user, - verify_user_password, - login_user as auth_login_user, - logout_user as auth_logout_user, - is_authenticated as auth_is_authenticated, - user_exists as auth_user_exists, - get_current_user, - get_recovery_key_hash, - has_recovery_key, - set_recovery_key_hash, - verify_and_reset_admin_password, + MAX_CHANNEL_KEYS, + MAX_USERS, + can_create_user, + can_save_channel_key, change_password, - get_all_users, + create_admin_user, create_user, delete_user, - get_user_by_id, - reset_user_password, generate_temp_password, - can_create_user, - get_non_admin_count, + get_all_users, + get_current_user, + get_recovery_key_hash, + get_user_by_id, get_user_channel_keys, - save_channel_key, - delete_channel_key, - can_save_channel_key, - update_channel_key_name, - update_channel_key_last_used, - get_channel_key_by_id, - clear_recovery_key, - MAX_USERS, - MAX_CHANNEL_KEYS, + has_recovery_key, + reset_user_password, + verify_and_reset_admin_password, + verify_user_password, + ) + from auth import ( + is_authenticated as auth_is_authenticated, + ) + from auth import ( + login_user as auth_login_user, + ) + from auth import ( + logout_user as auth_logout_user, + ) + from auth import ( + user_exists as auth_user_exists, ) @app.route("/login", methods=["GET", "POST"]) @@ -469,7 +425,7 @@ def _register_stegasoo_routes(app: Flask) -> None: temp_password = generate_temp_password() success, message = create_user(username, temp_password) log_action( - actor=get_username(), + actor=get_username(), # noqa: F821 action="user.create", target=f"user:{username}", outcome="success" if success else "failure", @@ -492,7 +448,7 @@ def _register_stegasoo_routes(app: Flask) -> None: target_name = target_user.username if target_user else str(user_id) success, message = delete_user(user_id, get_current_user().id) log_action( - actor=get_username(), + actor=get_username(), # noqa: F821 action="user.delete", target=f"user:{target_name}", outcome="success" if success else "failure", @@ -510,7 +466,7 @@ def _register_stegasoo_routes(app: Flask) -> None: target_user = get_user_by_id(user_id) target_name = target_user.username if target_user else str(user_id) log_action( - actor=get_username(), + actor=get_username(), # noqa: F821 action="user.password_reset", target=f"user:{target_name}", outcome="success" if success else "failure", diff --git a/frontends/web/blueprints/attest.py b/frontends/web/blueprints/attest.py index d229214..c3d35f0 100644 --- a/frontends/web/blueprints/attest.py +++ b/frontends/web/blueprints/attest.py @@ -13,9 +13,8 @@ import json import socket from datetime import UTC, datetime -from flask import Blueprint, Response, flash, redirect, render_template, request, url_for - from auth import login_required +from flask import Blueprint, Response, flash, redirect, render_template, request, url_for bp = Blueprint("attest", __name__) @@ -23,6 +22,7 @@ bp = Blueprint("attest", __name__) def _get_storage(): """Get verisoo LocalStorage pointed at soosef's attestation directory.""" from verisoo.storage import LocalStorage + from soosef.paths import ATTESTATIONS_DIR return LocalStorage(base_path=ATTESTATIONS_DIR) @@ -31,6 +31,7 @@ def _get_storage(): def _get_private_key(): """Load the Ed25519 private key from soosef identity directory.""" from verisoo.crypto import load_private_key + from soosef.paths import IDENTITY_PRIVATE_KEY if not IDENTITY_PRIVATE_KEY.exists(): @@ -165,8 +166,8 @@ def attest(): ) # Save our own identity so we can look it up during verification - from verisoo.models import Identity from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat + from verisoo.models import Identity pub_key = private_key.public_key() pub_bytes = pub_key.public_bytes(Encoding.Raw, PublicFormat.Raw) diff --git a/frontends/web/blueprints/fieldkit.py b/frontends/web/blueprints/fieldkit.py index c4a2ada..24603bd 100644 --- a/frontends/web/blueprints/fieldkit.py +++ b/frontends/web/blueprints/fieldkit.py @@ -2,9 +2,9 @@ Fieldkit blueprint — killswitch, dead man's switch, status dashboard. """ +from auth import admin_required, get_username, login_required from flask import Blueprint, flash, redirect, render_template, request, url_for -from auth import admin_required, get_username, login_required from soosef.audit import log_action bp = Blueprint("fieldkit", __name__, url_prefix="/fieldkit") diff --git a/frontends/web/blueprints/keys.py b/frontends/web/blueprints/keys.py index be2346e..f26e481 100644 --- a/frontends/web/blueprints/keys.py +++ b/frontends/web/blueprints/keys.py @@ -2,9 +2,9 @@ Key management blueprint — unified view of all key material. """ -from flask import Blueprint, flash, redirect, render_template, request, url_for - from auth import get_username, login_required +from flask import Blueprint, flash, redirect, render_template, url_for + from soosef.audit import log_action bp = Blueprint("keys", __name__, url_prefix="/keys") diff --git a/frontends/web/stego_routes.py b/frontends/web/stego_routes.py index 98250ea..d3d49cb 100644 --- a/frontends/web/stego_routes.py +++ b/frontends/web/stego_routes.py @@ -14,6 +14,7 @@ All routes use subprocess isolation via SubprocessStego for crash safety. import io import mimetypes +import os import secrets import threading import time @@ -38,7 +39,7 @@ def register_stego_routes(app, **deps): login_required = deps["login_required"] subprocess_stego = deps["subprocess_stego"] temp_storage = deps["temp_storage"] - _HAS_QRCODE_READ = deps.get("has_qrcode_read", False) + _has_qrcode_read = deps.get("has_qrcode_read", False) from stegasoo import ( HAS_AUDIO_SUPPORT, @@ -59,14 +60,12 @@ def register_stego_routes(app, **deps): validate_rsa_key, validate_security_factors, ) + from stegasoo.channel import resolve_channel_key from stegasoo.constants import ( - MAX_FILE_PAYLOAD_SIZE, - MAX_MESSAGE_CHARS, TEMP_FILE_EXPIRY, THUMBNAIL_QUALITY, THUMBNAIL_SIZE, ) - from stegasoo.channel import resolve_channel_key from stegasoo.qr_utils import ( decompress_data, extract_key_from_qr, @@ -361,7 +360,7 @@ def register_stego_routes(app, **deps): if is_async: return jsonify({"error": msg}), 400 flash(msg, "error") - return render_template("stego/encode.html", has_qrcode_read=_HAS_QRCODE_READ) + return render_template("stego/encode.html", has_qrcode_read=_has_qrcode_read) try: # Get files @@ -457,7 +456,7 @@ def register_stego_routes(app, **deps): rsa_key_from_qr = True elif rsa_key_file and rsa_key_file.filename: rsa_key_data = rsa_key_file.read() - elif rsa_key_qr and rsa_key_qr.filename and _HAS_QRCODE_READ: + elif rsa_key_qr and rsa_key_qr.filename and _has_qrcode_read: qr_image_data = rsa_key_qr.read() key_pem = extract_key_from_qr(qr_image_data) if key_pem: @@ -652,7 +651,7 @@ def register_stego_routes(app, **deps): rsa_key_from_qr = True elif rsa_key_file and rsa_key_file.filename: rsa_key_data = rsa_key_file.read() - elif rsa_key_qr and rsa_key_qr.filename and _HAS_QRCODE_READ: + elif rsa_key_qr and rsa_key_qr.filename and _has_qrcode_read: qr_image_data = rsa_key_qr.read() key_pem = extract_key_from_qr(qr_image_data) if key_pem: @@ -819,7 +818,7 @@ def register_stego_routes(app, **deps): except Exception as e: return _error_response(f"Error: {e}") - return render_template("stego/encode.html", has_qrcode_read=_HAS_QRCODE_READ) + return render_template("stego/encode.html", has_qrcode_read=_has_qrcode_read) # ============================================================================ # ENCODE PROGRESS ENDPOINTS (v4.1.2) @@ -1134,25 +1133,25 @@ def register_stego_routes(app, **deps): if not HAS_AUDIO_SUPPORT: flash("Audio steganography is not available.", "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) if not ref_photo or not stego_image: flash("Both reference photo and stego audio are required", "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) if not allowed_image(ref_photo.filename): flash("Reference must be an image", "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) if not allowed_audio(stego_image.filename): flash("Invalid audio format", "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) passphrase = request.form.get("passphrase", "") @@ -1168,7 +1167,7 @@ def register_stego_routes(app, **deps): if not passphrase: flash("Passphrase is required", "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) ref_data = ref_photo.read() @@ -1187,7 +1186,7 @@ def register_stego_routes(app, **deps): rsa_key_from_qr = True elif rsa_key_file and rsa_key_file.filename: rsa_key_data = rsa_key_file.read() - elif rsa_key_qr and rsa_key_qr.filename and _HAS_QRCODE_READ: + elif rsa_key_qr and rsa_key_qr.filename and _has_qrcode_read: qr_image_data = rsa_key_qr.read() key_pem = extract_key_from_qr(qr_image_data) if key_pem: @@ -1196,14 +1195,14 @@ def register_stego_routes(app, **deps): else: flash("Could not extract RSA key from QR code image.", "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) result = validate_security_factors(pin, rsa_key_data) if not result.is_valid: flash(result.error_message, "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) if pin: @@ -1211,7 +1210,7 @@ def register_stego_routes(app, **deps): if not result.is_valid: flash(result.error_message, "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) key_password = ( @@ -1223,7 +1222,7 @@ def register_stego_routes(app, **deps): if not result.is_valid: flash(result.error_message, "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) is_async = ( @@ -1274,7 +1273,7 @@ def register_stego_routes(app, **deps): else: flash(error_msg, "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) if decode_result.is_file: @@ -1296,19 +1295,19 @@ def register_stego_routes(app, **deps): filename=filename, file_size=format_size(len(decode_result.file_data)), mime_type=decode_result.mime_type, - has_qrcode_read=_HAS_QRCODE_READ, + has_qrcode_read=_has_qrcode_read, ) else: return render_template( "decode.html", decoded_message=decode_result.message, - has_qrcode_read=_HAS_QRCODE_READ, + has_qrcode_read=_has_qrcode_read, ) # ========== IMAGE DECODE PATH (original) ========== if not ref_photo or not stego_image: flash("Both reference photo and stego image are required", "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) + return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) # Get form data - v3.2.0: renamed from day_phrase to passphrase passphrase = request.form.get("passphrase", "") # v3.2.0: Renamed @@ -1326,14 +1325,14 @@ def register_stego_routes(app, **deps): # Check DCT availability if embed_mode == "dct" and not has_dct_support(): flash("DCT mode requires scipy. Install with: pip install scipy", "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) + return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) # v3.2.0: Removed date handling (no stego_date needed) # v3.2.0: Renamed from day_phrase if not passphrase: flash("Passphrase is required", "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) + return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) # Read files ref_data = ref_photo.read() @@ -1353,7 +1352,7 @@ def register_stego_routes(app, **deps): rsa_key_from_qr = True elif rsa_key_file and rsa_key_file.filename: rsa_key_data = rsa_key_file.read() - elif rsa_key_qr and rsa_key_qr.filename and _HAS_QRCODE_READ: + elif rsa_key_qr and rsa_key_qr.filename and _has_qrcode_read: qr_image_data = rsa_key_qr.read() key_pem = extract_key_from_qr(qr_image_data) if key_pem: @@ -1362,14 +1361,14 @@ def register_stego_routes(app, **deps): else: flash("Could not extract RSA key from QR code image.", "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) # Validate security factors result = validate_security_factors(pin, rsa_key_data) if not result.is_valid: flash(result.error_message, "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) + return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) # Validate PIN if provided if pin: @@ -1377,7 +1376,7 @@ def register_stego_routes(app, **deps): if not result.is_valid: flash(result.error_message, "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) # Determine key password @@ -1389,7 +1388,7 @@ def register_stego_routes(app, **deps): if not result.is_valid: flash(result.error_message, "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) # Check for async mode (v4.1.5) @@ -1437,7 +1436,7 @@ def register_stego_routes(app, **deps): if "channel key" in error_msg.lower(): flash(error_msg, "error") return render_template( - "stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ + "stego/decode.html", has_qrcode_read=_has_qrcode_read ) if ( "decrypt" in error_msg.lower() @@ -1468,14 +1467,14 @@ def register_stego_routes(app, **deps): filename=filename, file_size=format_size(len(decode_result.file_data)), mime_type=decode_result.mime_type, - has_qrcode_read=_HAS_QRCODE_READ, + has_qrcode_read=_has_qrcode_read, ) else: # Text content return render_template( "decode.html", decoded_message=decode_result.message, - has_qrcode_read=_HAS_QRCODE_READ, + has_qrcode_read=_has_qrcode_read, ) except InvalidMagicBytesError: @@ -1483,33 +1482,33 @@ def register_stego_routes(app, **deps): "This doesn't appear to be a Stegasoo image. Try a different mode (LSB/DCT).", "warning", ) - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) + return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) except ReedSolomonError: flash( "Image too corrupted to decode. It may have been re-saved or compressed.", "error", ) - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) + return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) except InvalidHeaderError: flash( "Invalid or corrupted header. The image may have been modified.", "error", ) - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) + return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) except DecryptionError: flash( "Wrong credentials. Double-check your reference photo, passphrase, PIN, and channel key.", "warning", ) - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) + return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) except StegasooError as e: flash(str(e), "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) + return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) except Exception as e: flash(f"Error: {e}", "error") - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) + return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) - return render_template("stego/decode.html", has_qrcode_read=_HAS_QRCODE_READ) + return render_template("stego/decode.html", has_qrcode_read=_has_qrcode_read) @app.route("/decode/download/") @login_required @@ -1602,20 +1601,20 @@ def register_stego_routes(app, **deps): filename=job.get("filename"), file_size=format_size(job.get("file_size", 0)), mime_type=job.get("mime_type"), - has_qrcode_read=_HAS_QRCODE_READ, + has_qrcode_read=_has_qrcode_read, ) else: return render_template( "decode.html", decoded_message=job.get("message"), - has_qrcode_read=_HAS_QRCODE_READ, + has_qrcode_read=_has_qrcode_read, ) @app.route("/about") def about(): - from stegasoo.channel import get_channel_status - from stegasoo import has_argon2 from auth import get_current_user + from stegasoo import has_argon2 + from stegasoo.channel import get_channel_status channel_status = get_channel_status() current_user = get_current_user() @@ -1624,7 +1623,7 @@ def register_stego_routes(app, **deps): return render_template( "stego/about.html", has_argon2=has_argon2(), - has_qrcode_read=_HAS_QRCODE_READ, + has_qrcode_read=_has_qrcode_read, channel_configured=channel_status["configured"], channel_fingerprint=channel_status.get("fingerprint"), channel_source=channel_status.get("source"), diff --git a/src/soosef/audit.py b/src/soosef/audit.py index 9cd1971..941f525 100644 --- a/src/soosef/audit.py +++ b/src/soosef/audit.py @@ -36,7 +36,7 @@ from __future__ import annotations import json import logging import threading -from datetime import datetime, timezone +from datetime import UTC, datetime from pathlib import Path from typing import Literal @@ -82,7 +82,7 @@ def log_action( detail: Optional free-text annotation (avoid PII where possible). """ entry: dict[str, str] = { - "timestamp": datetime.now(tz=timezone.utc).isoformat(), + "timestamp": datetime.now(tz=UTC).isoformat(), "actor": actor, "action": action, "target": target, diff --git a/src/soosef/cli.py b/src/soosef/cli.py index 4e63d3f..48f8c69 100644 --- a/src/soosef/cli.py +++ b/src/soosef/cli.py @@ -48,9 +48,9 @@ def main(ctx, data_dir, json_output): @click.pass_context def init(ctx, no_identity, no_channel): """Initialize a new SooSeF instance — generate keys and create directory structure.""" - from soosef.paths import ensure_dirs - from soosef.keystore.manager import KeystoreManager from soosef.config import SoosefConfig + from soosef.keystore.manager import KeystoreManager + from soosef.paths import ensure_dirs click.echo("Initializing SooSeF...") ensure_dirs() @@ -103,7 +103,7 @@ def serve(host, port, no_https, debug): ssl_context = None if config.https_enabled: - from soosef.paths import SSL_CERT, SSL_KEY, CERTS_DIR + from soosef.paths import CERTS_DIR, SSL_CERT, SSL_KEY CERTS_DIR.mkdir(parents=True, exist_ok=True) if not SSL_CERT.exists(): @@ -173,11 +173,12 @@ def _start_deadman_thread(interval_seconds: int = 60) -> threading.Thread | None def _generate_self_signed_cert(cert_path: Path, key_path: Path) -> None: """Generate a self-signed certificate for development/local use.""" + from datetime import UTC, datetime, timedelta + from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.x509.oid import NameOID - from datetime import datetime, timedelta, UTC key = rsa.generate_private_key(public_exponent=65537, key_size=2048) subject = issuer = x509.Name( @@ -738,7 +739,7 @@ def show(): def export_keys(output, password): """Export all keys to an encrypted bundle file.""" from soosef.keystore.export import export_bundle - from soosef.paths import IDENTITY_DIR, CHANNEL_KEY_FILE + from soosef.paths import CHANNEL_KEY_FILE, IDENTITY_DIR export_bundle(IDENTITY_DIR, CHANNEL_KEY_FILE, output, password.encode()) click.echo(f"Key bundle exported to: {output}") @@ -750,7 +751,7 @@ def export_keys(output, password): def import_keys(bundle, password): """Import keys from an encrypted bundle file.""" from soosef.keystore.export import import_bundle - from soosef.paths import IDENTITY_DIR, CHANNEL_KEY_FILE + from soosef.paths import CHANNEL_KEY_FILE, IDENTITY_DIR imported = import_bundle(bundle, IDENTITY_DIR, CHANNEL_KEY_FILE, password.encode()) click.echo(f"Imported: {', '.join(imported.keys())}") @@ -836,9 +837,9 @@ def chain(): pass -@chain.command() +@chain.command("status") @click.pass_context -def status(ctx): +def chain_status(ctx): """Show chain status — head index, chain ID, record count.""" from soosef.federation.chain import ChainStore from soosef.paths import CHAIN_DIR @@ -899,10 +900,10 @@ def verify(): raise SystemExit(1) -@chain.command() +@chain.command("show") @click.argument("index", type=int) @click.pass_context -def show(ctx, index): +def chain_show(ctx, index): """Show a specific chain record by index.""" from soosef.exceptions import ChainError from soosef.federation.chain import ChainStore diff --git a/src/soosef/config.py b/src/soosef/config.py index 5dccc29..8448246 100644 --- a/src/soosef/config.py +++ b/src/soosef/config.py @@ -6,7 +6,7 @@ Config is intentionally simple — a flat JSON file with sensible defaults. """ import json -from dataclasses import dataclass, field +from dataclasses import dataclass from pathlib import Path from soosef.paths import CONFIG_FILE diff --git a/src/soosef/fieldkit/__init__.py b/src/soosef/fieldkit/__init__.py index 264b34b..f0cd0a8 100644 --- a/src/soosef/fieldkit/__init__.py +++ b/src/soosef/fieldkit/__init__.py @@ -5,7 +5,7 @@ Killswitch, dead man's switch, tamper detection, USB monitoring. All features are opt-in and disabled by default. """ -from soosef.fieldkit.killswitch import PurgeScope, execute_purge from soosef.fieldkit.deadman import DeadmanSwitch +from soosef.fieldkit.killswitch import PurgeScope, execute_purge __all__ = ["PurgeScope", "execute_purge", "DeadmanSwitch"] diff --git a/src/soosef/fieldkit/geofence.py b/src/soosef/fieldkit/geofence.py index 778b226..2f341f7 100644 --- a/src/soosef/fieldkit/geofence.py +++ b/src/soosef/fieldkit/geofence.py @@ -28,14 +28,14 @@ class GeoCircle: def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Distance in meters between two lat/lon points.""" - R = 6371000 # Earth radius in meters + earth_r = 6371000 # Earth radius in meters phi1 = math.radians(lat1) phi2 = math.radians(lat2) dphi = math.radians(lat2 - lat1) dlambda = math.radians(lon2 - lon1) a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2 - return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + return earth_r * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) def is_inside(fence: GeoCircle, lat: float, lon: float) -> bool: diff --git a/src/soosef/fieldkit/killswitch.py b/src/soosef/fieldkit/killswitch.py index 9289921..90d4721 100644 --- a/src/soosef/fieldkit/killswitch.py +++ b/src/soosef/fieldkit/killswitch.py @@ -18,7 +18,6 @@ import subprocess from dataclasses import dataclass, field from pathlib import Path -from soosef.exceptions import KillswitchError import soosef.paths as paths logger = logging.getLogger(__name__) diff --git a/tests/conftest.py b/tests/conftest.py index 03b0b8c..29848dc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,7 +2,6 @@ from __future__ import annotations -import os from pathlib import Path import pytest diff --git a/tests/test_chain.py b/tests/test_chain.py index bfc1e06..f3f7111 100644 --- a/tests/test_chain.py +++ b/tests/test_chain.py @@ -203,9 +203,7 @@ def test_metadata_in_chain(chain_dir: Path, private_key: Ed25519PrivateKey): """Metadata is preserved through append and retrieval.""" store = ChainStore(chain_dir) meta = {"caption": "evidence photo", "backfilled": True} - record = store.append( - hashlib.sha256(b"test").digest(), "test/plain", private_key, metadata=meta - ) + store.append(hashlib.sha256(b"test").digest(), "test/plain", private_key, metadata=meta) loaded = store.get(0) assert loaded.metadata == meta @@ -232,14 +230,16 @@ def test_verify_chain_detects_signer_change(chain_dir: Path): # Manually bypass normal append to inject a record signed by key2. # We need to build the record with correct prev_hash but wrong signer. - import struct import fcntl - from soosef.federation.serialization import serialize_record - from soosef.federation.models import AttestationChainRecord - from soosef.federation.entropy import collect_entropy_witnesses - from uuid_utils import uuid7 + import struct + from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat + from uuid_utils import uuid7 + + from soosef.federation.entropy import collect_entropy_witnesses + from soosef.federation.models import AttestationChainRecord from soosef.federation.serialization import canonical_bytes as cb + from soosef.federation.serialization import serialize_record state = store.state() prev_hash = state.head_hash diff --git a/tests/test_chain_security.py b/tests/test_chain_security.py index 78a7ca0..1cf5ef1 100644 --- a/tests/test_chain_security.py +++ b/tests/test_chain_security.py @@ -15,8 +15,7 @@ import pytest from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from soosef.exceptions import ChainError -from soosef.federation.chain import ChainStore, MAX_RECORD_SIZE -from soosef.federation.serialization import compute_record_hash +from soosef.federation.chain import MAX_RECORD_SIZE, ChainStore def test_concurrent_append_no_fork(chain_dir: Path): diff --git a/tests/test_deadman_enforcement.py b/tests/test_deadman_enforcement.py index d2697ac..e6aae2f 100644 --- a/tests/test_deadman_enforcement.py +++ b/tests/test_deadman_enforcement.py @@ -182,10 +182,9 @@ def test_enforcement_loop_tolerates_exceptions(tmp_path: Path, monkeypatch: pyte def test_start_deadman_thread_is_daemon(monkeypatch: pytest.MonkeyPatch): """Thread must be a daemon so it dies with the process.""" - from soosef.cli import _start_deadman_thread - # Patch the loop to exit immediately so the thread doesn't hang in tests import soosef.cli as cli_mod + from soosef.cli import _start_deadman_thread monkeypatch.setattr(cli_mod, "_deadman_enforcement_loop", lambda interval_seconds: None) @@ -208,8 +207,8 @@ def test_check_deadman_disarmed( tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch ): """check-deadman exits 0 and prints helpful message when not armed.""" - from soosef.fieldkit import deadman as deadman_mod from soosef.cli import main + from soosef.fieldkit import deadman as deadman_mod # Point at an empty tmp dir so the real ~/.soosef/fieldkit/deadman.json isn't read state_file = tmp_path / "deadman.json" @@ -224,8 +223,8 @@ def test_check_deadman_armed_ok( tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch ): """check-deadman exits 0 when armed and check-in is current.""" - from soosef.fieldkit import deadman as deadman_mod from soosef.cli import main + from soosef.fieldkit import deadman as deadman_mod state_file = tmp_path / "deadman.json" monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) @@ -248,8 +247,8 @@ def test_check_deadman_overdue_in_grace( tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch ): """check-deadman exits 0 but prints OVERDUE warning when past interval but in grace.""" - from soosef.fieldkit import deadman as deadman_mod from soosef.cli import main + from soosef.fieldkit import deadman as deadman_mod state_file = tmp_path / "deadman.json" monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file) @@ -274,8 +273,8 @@ def test_check_deadman_fires_when_expired( tmp_path: Path, cli_runner: CliRunner, monkeypatch: pytest.MonkeyPatch ): """check-deadman exits 2 when the switch has fully expired.""" - from soosef.fieldkit import deadman as deadman_mod from soosef.cli import main + from soosef.fieldkit import deadman as deadman_mod state_file = tmp_path / "deadman.json" monkeypatch.setattr(deadman_mod, "DEADMAN_STATE", state_file)