From f8d28cf78e6fdc1a3124b45fa3d24d672095e3b8 Mon Sep 17 00:00:00 2001 From: "Aaron D. Lee" Date: Fri, 3 Apr 2026 17:39:40 -0400 Subject: [PATCH] feat(F2): AES-256-CTR encryption module for recordings Co-Authored-By: Claude Sonnet 4.6 --- tests/unit/test_encryption.py | 58 +++++++++++++++++++++++++++++++++++ vigilar/storage/encryption.py | 43 ++++++++++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 tests/unit/test_encryption.py create mode 100644 vigilar/storage/encryption.py diff --git a/tests/unit/test_encryption.py b/tests/unit/test_encryption.py new file mode 100644 index 0000000..d586958 --- /dev/null +++ b/tests/unit/test_encryption.py @@ -0,0 +1,58 @@ +"""Tests for AES-256-CTR recording encryption.""" + +import os +from pathlib import Path + +from vigilar.storage.encryption import decrypt_stream, encrypt_file + + +def test_encrypt_file_creates_vge(tmp_path): + plain = tmp_path / "test.mp4" + plain.write_bytes(b"fake mp4 content here for testing") + key_hex = os.urandom(32).hex() + vge_path = encrypt_file(str(plain), key_hex) + assert vge_path.endswith(".vge") + assert Path(vge_path).exists() + assert not plain.exists() + + +def test_encrypt_file_prepends_iv(tmp_path): + plain = tmp_path / "test.mp4" + plain.write_bytes(b"x" * 100) + key_hex = os.urandom(32).hex() + vge_path = encrypt_file(str(plain), key_hex) + data = Path(vge_path).read_bytes() + assert len(data) == 16 + 100 + + +def test_decrypt_stream_roundtrip(tmp_path): + original = b"Hello, this is a recording file with some content." * 100 + plain = tmp_path / "test.mp4" + plain.write_bytes(original) + key_hex = os.urandom(32).hex() + vge_path = encrypt_file(str(plain), key_hex) + chunks = list(decrypt_stream(vge_path, key_hex)) + decrypted = b"".join(chunks) + assert decrypted == original + + +def test_decrypt_stream_yields_chunks(tmp_path): + original = b"A" * 100_000 + plain = tmp_path / "test.mp4" + plain.write_bytes(original) + key_hex = os.urandom(32).hex() + vge_path = encrypt_file(str(plain), key_hex) + chunks = list(decrypt_stream(vge_path, key_hex)) + assert len(chunks) > 1 + assert b"".join(chunks) == original + + +def test_encrypt_file_wrong_key_produces_garbage(tmp_path): + original = b"secret recording content" * 50 + plain = tmp_path / "test.mp4" + plain.write_bytes(original) + key1 = os.urandom(32).hex() + key2 = os.urandom(32).hex() + vge_path = encrypt_file(str(plain), key1) + decrypted = b"".join(decrypt_stream(vge_path, key2)) + assert decrypted != original diff --git a/vigilar/storage/encryption.py b/vigilar/storage/encryption.py new file mode 100644 index 0000000..538ef48 --- /dev/null +++ b/vigilar/storage/encryption.py @@ -0,0 +1,43 @@ +"""AES-256-CTR encryption for recording files.""" + +import os +from collections.abc import Generator +from pathlib import Path + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +CHUNK_SIZE = 64 * 1024 # 64KB + + +def encrypt_file(plain_path: str, key_hex: str) -> str: + key = bytes.fromhex(key_hex) + iv = os.urandom(16) + cipher = Cipher(algorithms.AES(key), modes.CTR(iv)) + encryptor = cipher.encryptor() + vge_path = Path(plain_path).with_suffix(".vge") + with open(plain_path, "rb") as src, open(vge_path, "wb") as dst: + dst.write(iv) + while True: + chunk = src.read(CHUNK_SIZE) + if not chunk: + break + dst.write(encryptor.update(chunk)) + dst.write(encryptor.finalize()) + Path(plain_path).unlink() + return str(vge_path) + + +def decrypt_stream(vge_path: str, key_hex: str) -> Generator[bytes, None, None]: + key = bytes.fromhex(key_hex) + with open(vge_path, "rb") as f: + iv = f.read(16) + cipher = Cipher(algorithms.AES(key), modes.CTR(iv)) + decryptor = cipher.decryptor() + while True: + chunk = f.read(CHUNK_SIZE) + if not chunk: + break + yield decryptor.update(chunk) + final = decryptor.finalize() + if final: + yield final