feat(F2): AES-256-CTR encryption module for recordings
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
0544f7218a
commit
f8d28cf78e
58
tests/unit/test_encryption.py
Normal file
58
tests/unit/test_encryption.py
Normal file
@ -0,0 +1,58 @@
|
||||
"""Tests for AES-256-CTR recording encryption."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from vigilar.storage.encryption import decrypt_stream, encrypt_file
|
||||
|
||||
|
||||
def test_encrypt_file_creates_vge(tmp_path):
|
||||
plain = tmp_path / "test.mp4"
|
||||
plain.write_bytes(b"fake mp4 content here for testing")
|
||||
key_hex = os.urandom(32).hex()
|
||||
vge_path = encrypt_file(str(plain), key_hex)
|
||||
assert vge_path.endswith(".vge")
|
||||
assert Path(vge_path).exists()
|
||||
assert not plain.exists()
|
||||
|
||||
|
||||
def test_encrypt_file_prepends_iv(tmp_path):
|
||||
plain = tmp_path / "test.mp4"
|
||||
plain.write_bytes(b"x" * 100)
|
||||
key_hex = os.urandom(32).hex()
|
||||
vge_path = encrypt_file(str(plain), key_hex)
|
||||
data = Path(vge_path).read_bytes()
|
||||
assert len(data) == 16 + 100
|
||||
|
||||
|
||||
def test_decrypt_stream_roundtrip(tmp_path):
|
||||
original = b"Hello, this is a recording file with some content." * 100
|
||||
plain = tmp_path / "test.mp4"
|
||||
plain.write_bytes(original)
|
||||
key_hex = os.urandom(32).hex()
|
||||
vge_path = encrypt_file(str(plain), key_hex)
|
||||
chunks = list(decrypt_stream(vge_path, key_hex))
|
||||
decrypted = b"".join(chunks)
|
||||
assert decrypted == original
|
||||
|
||||
|
||||
def test_decrypt_stream_yields_chunks(tmp_path):
|
||||
original = b"A" * 100_000
|
||||
plain = tmp_path / "test.mp4"
|
||||
plain.write_bytes(original)
|
||||
key_hex = os.urandom(32).hex()
|
||||
vge_path = encrypt_file(str(plain), key_hex)
|
||||
chunks = list(decrypt_stream(vge_path, key_hex))
|
||||
assert len(chunks) > 1
|
||||
assert b"".join(chunks) == original
|
||||
|
||||
|
||||
def test_encrypt_file_wrong_key_produces_garbage(tmp_path):
|
||||
original = b"secret recording content" * 50
|
||||
plain = tmp_path / "test.mp4"
|
||||
plain.write_bytes(original)
|
||||
key1 = os.urandom(32).hex()
|
||||
key2 = os.urandom(32).hex()
|
||||
vge_path = encrypt_file(str(plain), key1)
|
||||
decrypted = b"".join(decrypt_stream(vge_path, key2))
|
||||
assert decrypted != original
|
||||
43
vigilar/storage/encryption.py
Normal file
43
vigilar/storage/encryption.py
Normal file
@ -0,0 +1,43 @@
|
||||
"""AES-256-CTR encryption for recording files."""
|
||||
|
||||
import os
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
|
||||
CHUNK_SIZE = 64 * 1024 # 64KB
|
||||
|
||||
|
||||
def encrypt_file(plain_path: str, key_hex: str) -> str:
|
||||
key = bytes.fromhex(key_hex)
|
||||
iv = os.urandom(16)
|
||||
cipher = Cipher(algorithms.AES(key), modes.CTR(iv))
|
||||
encryptor = cipher.encryptor()
|
||||
vge_path = Path(plain_path).with_suffix(".vge")
|
||||
with open(plain_path, "rb") as src, open(vge_path, "wb") as dst:
|
||||
dst.write(iv)
|
||||
while True:
|
||||
chunk = src.read(CHUNK_SIZE)
|
||||
if not chunk:
|
||||
break
|
||||
dst.write(encryptor.update(chunk))
|
||||
dst.write(encryptor.finalize())
|
||||
Path(plain_path).unlink()
|
||||
return str(vge_path)
|
||||
|
||||
|
||||
def decrypt_stream(vge_path: str, key_hex: str) -> Generator[bytes, None, None]:
|
||||
key = bytes.fromhex(key_hex)
|
||||
with open(vge_path, "rb") as f:
|
||||
iv = f.read(16)
|
||||
cipher = Cipher(algorithms.AES(key), modes.CTR(iv))
|
||||
decryptor = cipher.decryptor()
|
||||
while True:
|
||||
chunk = f.read(CHUNK_SIZE)
|
||||
if not chunk:
|
||||
break
|
||||
yield decryptor.update(chunk)
|
||||
final = decryptor.finalize()
|
||||
if final:
|
||||
yield final
|
||||
Loading…
Reference in New Issue
Block a user