8 Commits

Author SHA1 Message Date
Aaron D. Lee
cf247d207f v4.0.2: Add Web UI authentication and optional HTTPS
Some checks failed
Release / test (push) Failing after 43s
Release / publish (push) Has been skipped
Release / github-release (push) Has been skipped
- Add single-admin login with SQLite3 user storage
- First-run setup wizard for admin account creation
- Account management page for password changes
- Optional HTTPS with auto-generated self-signed certificates
- Configurable via STEGASOO_AUTH_ENABLED, STEGASOO_HTTPS_ENABLED env vars
- UI improvements: larger QR previews, consistent panel styling
- Update docker-compose.yml with auth config and persistent volumes
- Update all documentation for v4.0.2

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 20:00:47 -05:00
Aaron D. Lee
28d77957eb Bit of project management stuff. 2026-01-02 18:44:00 -05:00
Aaron D. Lee
89b4809489 Streamline README to focus on current features
Reduced from 433 to 123 lines by removing:
- Version history (now in CHANGELOG.md)
- Upgrade guides (now in CHANGELOG.md)
- Breaking changes sections
- Redundant examples
- Verbose project structure

README now focuses on: features, quick start, interfaces, security model.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 18:33:46 -05:00
Aaron D. Lee
79ab165b95 Add professional project structure and documentation
New files:
- LICENSE (MIT) - Required legal file
- CHANGELOG.md - Version history following Keep a Changelog
- CONTRIBUTING.md - Contributor guidelines
- CODE_OF_CONDUCT.md - Community standards
- .github/ISSUE_TEMPLATE/ - Bug report and feature request forms
- .github/PULL_REQUEST_TEMPLATE.md - PR checklist
- src/stegasoo/py.typed - PEP 561 type hint marker
- examples/ - Usage examples (basic, file embedding, channel keys)

Updated:
- README.md - Added CI status badges

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 18:23:08 -05:00
Aaron D. Lee
4194d6923a Remove backup files and add pattern to .gitignore
Deleted stale backup files:
- frontends/cli/main.py_old
- src/stegasoo/dct_steganography.py_old

Added gitignore patterns for common backup extensions.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 18:13:04 -05:00
Aaron D. Lee
08e19a3bfd Remove obsolete debug/diagnostic scripts
Deleted one-off debugging scripts that are no longer needed:
- debug_jpegio.py - DCT/jpegio extraction debugger
- test_compare_capacity_flow.py - API flow crash diagnostic
- test_dct_crash.py - DCT crash diagnostic tool

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 18:10:29 -05:00
Aaron D. Lee
dea7472018 Remove build.sh from repo and update .gitignore
Dev convenience script should not be tracked.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 18:08:15 -05:00
Aaron D. Lee
e8863d15d7 Remove dev scripts from repo (already in .gitignore)
These are local dev convenience scripts that should not be tracked.
- quick_web.sh - Flask dev server launcher
- rbld_containers.sh - Docker rebuild helper

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 18:06:45 -05:00
39 changed files with 1838 additions and 3190 deletions

98
.github/ISSUE_TEMPLATE/bug_report.yml vendored Normal file
View File

@@ -0,0 +1,98 @@
name: Bug Report
description: Report a bug or unexpected behavior
title: "[Bug]: "
labels: ["bug", "triage"]
body:
- type: markdown
attributes:
value: |
Thanks for taking the time to report a bug! Please fill out the form below.
- type: textarea
id: description
attributes:
label: Bug Description
description: A clear and concise description of what the bug is.
placeholder: Describe the bug...
validations:
required: true
- type: textarea
id: reproduction
attributes:
label: Steps to Reproduce
description: Steps to reproduce the behavior.
placeholder: |
1. Run command '...'
2. Upload image '...'
3. See error
validations:
required: true
- type: textarea
id: expected
attributes:
label: Expected Behavior
description: What did you expect to happen?
validations:
required: true
- type: textarea
id: actual
attributes:
label: Actual Behavior
description: What actually happened?
validations:
required: true
- type: dropdown
id: interface
attributes:
label: Interface
description: Which interface are you using?
options:
- CLI
- Web UI
- REST API
- Python Library
validations:
required: true
- type: input
id: version
attributes:
label: Stegasoo Version
description: Run `stegasoo --version` or check the web UI footer
placeholder: "4.0.1"
validations:
required: true
- type: input
id: python-version
attributes:
label: Python Version
description: Run `python --version`
placeholder: "3.11.0"
validations:
required: true
- type: input
id: os
attributes:
label: Operating System
placeholder: "Ubuntu 22.04 / Windows 11 / macOS 14"
validations:
required: true
- type: textarea
id: logs
attributes:
label: Error Logs
description: Paste any relevant error messages or tracebacks.
render: shell
- type: textarea
id: additional
attributes:
label: Additional Context
description: Add any other context, screenshots, or files here.

8
.github/ISSUE_TEMPLATE/config.yml vendored Normal file
View File

@@ -0,0 +1,8 @@
blank_issues_enabled: true
contact_links:
- name: Security Vulnerability
url: https://github.com/adlee-was-taken/stegasoo/security/advisories/new
about: Report security vulnerabilities privately
- name: Documentation
url: https://github.com/adlee-was-taken/stegasoo#readme
about: Check the documentation before opening an issue

View File

@@ -0,0 +1,62 @@
name: Feature Request
description: Suggest a new feature or enhancement
title: "[Feature]: "
labels: ["enhancement"]
body:
- type: markdown
attributes:
value: |
Thanks for suggesting a feature! Please fill out the form below.
- type: textarea
id: problem
attributes:
label: Problem Statement
description: Is your feature request related to a problem? Describe it.
placeholder: I'm always frustrated when...
validations:
required: true
- type: textarea
id: solution
attributes:
label: Proposed Solution
description: Describe the solution you'd like.
placeholder: I would like to be able to...
validations:
required: true
- type: textarea
id: alternatives
attributes:
label: Alternatives Considered
description: Describe any alternative solutions or features you've considered.
- type: dropdown
id: interface
attributes:
label: Affected Interface(s)
description: Which interface(s) would this feature affect?
multiple: true
options:
- CLI
- Web UI
- REST API
- Python Library
- Core Library
- type: dropdown
id: priority
attributes:
label: Priority
description: How important is this feature to you?
options:
- Nice to have
- Would improve my workflow
- Critical for my use case
- type: textarea
id: context
attributes:
label: Additional Context
description: Add any other context, mockups, or examples here.

46
.github/PULL_REQUEST_TEMPLATE.md vendored Normal file
View File

@@ -0,0 +1,46 @@
## Description
<!-- Describe your changes in detail -->
## Type of Change
<!-- Mark the relevant option with an 'x' -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
- [ ] Documentation update
- [ ] Refactoring (no functional changes)
- [ ] CI/CD or build changes
## Related Issues
<!-- Link any related issues here -->
Fixes #
## Testing
<!-- Describe how you tested your changes -->
- [ ] I have added tests that prove my fix/feature works
- [ ] Existing tests pass locally with my changes
- [ ] I have tested manually (describe below)
### Manual Testing Steps
<!-- If applicable, describe manual testing performed -->
## Checklist
- [ ] My code follows the project's style guidelines
- [ ] I have performed a self-review of my code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have updated the documentation accordingly
- [ ] I have updated CHANGELOG.md (if user-facing changes)
- [ ] My changes generate no new warnings
- [ ] Any dependent changes have been merged and published
## Screenshots
<!-- If applicable, add screenshots to help explain your changes -->

14
.gitignore vendored
View File

@@ -35,6 +35,12 @@ old_files/
*.swp *.swp
*.swo *.swo
# Backup files
*_old
*_old.*
*.bak
*.orig
# Testing # Testing
.pytest_cache/ .pytest_cache/
.coverage .coverage
@@ -58,6 +64,12 @@ htmlcov/
# Output test files. # Output test files.
test_data/*.png test_data/*.png
#Project root scripts. # Dev scripts (local convenience scripts)
build.sh
rbld_containers.sh rbld_containers.sh
quick_web.sh quick_web.sh
project_stats.sh
# Web UI auth database and SSL certs
frontends/web/instance/
frontends/web/certs/

6
API.md
View File

@@ -1,4 +1,4 @@
# Stegasoo REST API Documentation (v4.0.1) # Stegasoo REST API Documentation (v4.0.2)
Complete REST API reference for Stegasoo steganography operations. Complete REST API reference for Stegasoo steganography operations.
@@ -113,7 +113,7 @@ Check API status and configuration.
```json ```json
{ {
"version": "4.0.1", "version": "4.0.2",
"has_argon2": true, "has_argon2": true,
"has_qrcode_read": true, "has_qrcode_read": true,
"has_dct": true, "has_dct": true,
@@ -462,7 +462,7 @@ X-Stegasoo-Capacity-Percent: 12.4
X-Stegasoo-Embed-Mode: lsb X-Stegasoo-Embed-Mode: lsb
X-Stegasoo-Channel-Mode: private X-Stegasoo-Channel-Mode: private
X-Stegasoo-Channel-Fingerprint: ABCD-••••-...-3456 X-Stegasoo-Channel-Fingerprint: ABCD-••••-...-3456
X-Stegasoo-Version: 4.0.1 X-Stegasoo-Version: 4.0.2
<binary image data> <binary image data>
``` ```

120
CHANGELOG.md Normal file
View File

@@ -0,0 +1,120 @@
# Changelog
All notable changes to Stegasoo will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org).
## [4.0.2] - 2026-01-02
### Added
- **Web UI Authentication**: Single-admin login with SQLite3 user storage
- First-run setup wizard for admin account creation
- Account management page for password changes
- `@login_required` decorator protects encode/decode/generate routes
- Argon2id password hashing (lighter 64MB for fast login)
- **Optional HTTPS**: Auto-generated self-signed certificates for home network deployment
- Configurable via `STEGASOO_HTTPS_ENABLED` environment variable
- Certificates stored in `frontends/web/certs/`
- New environment variables: `STEGASOO_AUTH_ENABLED`, `STEGASOO_HTTPS_ENABLED`, `STEGASOO_HOSTNAME`
### Changed
- PIN entry column widened in encode/decode forms (col-md-4 → col-md-6)
- Channel options column narrowed (col-md-8 → col-md-6)
- QR preview panels enlarged for better text readability
- Consistent font sizing across all preview panel banners (0.7rem filename, 0.6rem data, 0.65rem badges)
### Fixed
- QR preview text too small to read in encode/decode templates
- Inconsistent label sizes between reference/carrier/stego panels
## [4.0.1] - 2025-01-02
### Fixed
- Fixed numpy binary incompatibility on Python 3.10 (jpegio/scipy)
- Fixed BatchCredentials test failures with missing `reference_photo` parameter
- Graceful handling when DCT dependencies have version mismatches
### Changed
- Applied `ruff` linter fixes across entire codebase (~400 issues)
- Applied `black` formatter to all Python files
- Modernized type hints: `Optional[X]``X | None`
- Updated ruff config to use `[tool.ruff.lint]` section
- Moved documentation files to repository root
### Removed
- Removed obsolete debug/diagnostic scripts
- Cleaned up backup files and dev scripts
## [4.0.0] - 2024-12-29
### Added
- Refreshed Web UI with modern, snazzy interface
- Improved user experience across all pages
### Changed
- Major version bump for breaking API changes
- Simplified passphrase handling (single passphrase instead of day-based)
- Removed date_str parameter from encoding
### Fixed
- Various bug fixes for Web UI
- CLI updates and improvements
## [3.2.0] - 2024-12-28
### Added
- Big revamp of the encoding system
- Home and about page improvements
- UNDER_THE_HOOD.md documentation
### Changed
- Renamed `phrase``passphrase` in API
- Updated Web UI styling
## [3.0.2] - 2024-12-27
### Added
- Full experimental DCT steganography support
- jpegio integration for better JPEG manipulation
- DCT/LSB mode selector in Web UI
## [3.0.0] - 2024-12-25
### Added
- DCT (Discrete Cosine Transform) steganography mode
- Support for JPEG carriers without quality loss
- Channel key feature for private messaging
### Changed
- Complete rewrite of steganography engine
- New hybrid authentication system
## [2.0.0] - 2024-12-20
### Added
- Web UI frontend
- REST API (FastAPI)
- Batch processing support
- RSA key authentication option
### Changed
- Migrated to hybrid photo + passphrase + PIN authentication
## [1.0.0] - 2024-12-15
### Added
- Initial release
- LSB steganography
- AES-256-GCM encryption
- CLI interface
- Basic PIN authentication
[4.0.2]: https://github.com/adlee-was-taken/stegasoo/compare/v4.0.1...v4.0.2
[4.0.1]: https://github.com/adlee-was-taken/stegasoo/compare/v4.0.0...v4.0.1
[4.0.0]: https://github.com/adlee-was-taken/stegasoo/compare/v3.2.0...v4.0.0
[3.2.0]: https://github.com/adlee-was-taken/stegasoo/compare/v3.0.2...v3.2.0
[3.0.2]: https://github.com/adlee-was-taken/stegasoo/compare/v3.0.0...v3.0.2
[3.0.0]: https://github.com/adlee-was-taken/stegasoo/compare/v2.0.0...v3.0.0
[2.0.0]: https://github.com/adlee-was-taken/stegasoo/compare/v1.0.0...v2.0.0
[1.0.0]: https://github.com/adlee-was-taken/stegasoo/releases/tag/v1.0.0

2
CLI.md
View File

@@ -1,4 +1,4 @@
# Stegasoo CLI Documentation (v4.0.1) # Stegasoo CLI Documentation (v4.0.2)
Complete command-line interface reference for Stegasoo steganography operations. Complete command-line interface reference for Stegasoo steganography operations.

54
CODE_OF_CONDUCT.md Normal file
View File

@@ -0,0 +1,54 @@
# Code of Conduct
## Our Pledge
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, religion, or sexual identity
and orientation.
We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.
## Our Standards
Examples of behavior that contributes to a positive environment:
* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior:
* The use of sexualized language or imagery, and sexual attention or advances
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information without explicit permission
* Other conduct which could reasonably be considered inappropriate
## Enforcement Responsibilities
Project maintainers are responsible for clarifying and enforcing our standards
of acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.
## Scope
This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the project maintainers. All complaints will be reviewed and
investigated promptly and fairly.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant](https://www.contributor-covenant.org),
version 2.0.

165
CONTRIBUTING.md Normal file
View File

@@ -0,0 +1,165 @@
# Contributing to Stegasoo
Thank you for your interest in contributing to Stegasoo! This document provides guidelines and information for contributors.
## Getting Started
### Prerequisites
- Python 3.10 or higher
- Git
- Docker (optional, for container testing)
### Development Setup
1. **Clone the repository**
```bash
git clone https://github.com/adlee-was-taken/stegasoo.git
cd stegasoo
```
2. **Create a virtual environment**
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
3. **Install development dependencies**
```bash
pip install -e ".[dev]"
```
4. **Install pre-commit hooks**
```bash
pre-commit install
```
## Development Workflow
### Code Style
We use the following tools to maintain code quality:
- **Black** - Code formatting (line length: 100)
- **Ruff** - Linting
- **MyPy** - Type checking
Run all checks before committing:
```bash
black src/ tests/ frontends/
ruff check src/ tests/ frontends/
mypy src/
```
### Running Tests
```bash
# Run all tests
pytest
# Run with coverage
pytest --cov=stegasoo --cov-report=term-missing
# Run specific test file
pytest tests/test_stegasoo.py
```
### Type Hints
All new code should include type hints:
```python
def encode_message(
message: str,
carrier_image: bytes,
passphrase: str,
pin: str = "",
) -> EncodeResult:
...
```
## Making Changes
### Branch Naming
- `feature/description` - New features
- `fix/description` - Bug fixes
- `docs/description` - Documentation updates
- `refactor/description` - Code refactoring
### Commit Messages
Write clear, concise commit messages:
```
Add channel key validation for private messaging
- Implement validate_channel_key() function
- Add tests for valid/invalid key formats
- Update CLI to support --channel-key flag
```
### Pull Request Process
1. **Create a feature branch** from `main`
2. **Make your changes** with appropriate tests
3. **Ensure all checks pass** (tests, linting, formatting)
4. **Submit a PR** with a clear description
5. **Address review feedback** promptly
### PR Checklist
- [ ] Tests added/updated for changes
- [ ] Documentation updated if needed
- [ ] CHANGELOG.md updated for user-facing changes
- [ ] All CI checks passing
- [ ] No merge conflicts with `main`
## Project Structure
```
stegasoo/
├── src/stegasoo/ # Core library
│ ├── crypto.py # Encryption/decryption
│ ├── steganography.py # LSB embedding
│ ├── dct_steganography.py # DCT embedding
│ └── ...
├── frontends/
│ ├── cli/ # Command-line interface
│ ├── web/ # Flask web UI
│ └── api/ # FastAPI REST API
├── tests/ # Test suite
└── examples/ # Usage examples
```
## Reporting Issues
### Bug Reports
Please include:
- Python version and OS
- Stegasoo version (`stegasoo --version`)
- Minimal reproduction steps
- Expected vs actual behavior
- Error messages/tracebacks
### Feature Requests
Please include:
- Use case description
- Proposed solution (if any)
- Alternatives considered
## Security
If you discover a security vulnerability, please see [SECURITY.md](SECURITY.md) for responsible disclosure guidelines. **Do not open a public issue for security vulnerabilities.**
## License
By contributing, you agree that your contributions will be licensed under the MIT License.
## Questions?
Feel free to open a discussion or issue if you have questions about contributing.
Thank you for helping make Stegasoo better!

View File

@@ -227,6 +227,23 @@ docker-compose logs -f
docker-compose down docker-compose down
``` ```
#### Authentication Configuration (v4.0.2)
The Web UI supports optional authentication. Configure via environment variables:
```bash
# .env file (create in project root)
STEGASOO_AUTH_ENABLED=true # Enable login (default: true)
STEGASOO_HTTPS_ENABLED=false # Enable HTTPS (default: false)
STEGASOO_HOSTNAME=localhost # Hostname for SSL cert
STEGASOO_CHANNEL_KEY= # Optional channel key
# Then run
docker-compose up -d web
```
On first access, you'll be prompted to create an admin account. The database and SSL certs are persisted in Docker volumes.
#### Services #### Services
| Service | URL | Description | | Service | URL | Description |
@@ -424,6 +441,10 @@ Stegasoo works on Raspberry Pi 4 (2GB+ RAM recommended):
# System dependencies # System dependencies
sudo apt-get install python3-dev libzbar0 libjpeg-dev sudo apt-get install python3-dev libzbar0 libjpeg-dev
# Create venv with Python 3.12 (if available, or 3.11)
python3 -m venv venv
source venv/bin/activate
# Install (may take a while to compile) # Install (may take a while to compile)
pip install stegasoo[cli] pip install stegasoo[cli]
@@ -431,7 +452,25 @@ pip install stegasoo[cli]
pip install stegasoo[web] # Needs ~768MB free pip install stegasoo[web] # Needs ~768MB free
``` ```
**Note:** Argon2 operations will be slower on Pi due to memory-hardness. **Running the Web UI on Pi:**
```bash
cd frontends/web
# Optional: Enable authentication
export STEGASOO_AUTH_ENABLED=true
# Optional: Enable HTTPS for local network security
export STEGASOO_HTTPS_ENABLED=true
export STEGASOO_HOSTNAME=raspberrypi.local
# Start server
python app.py
```
**Notes:**
- Argon2 operations will be slower on Pi due to memory-hardness
- First run will prompt you to create an admin account
- HTTPS generates a self-signed certificate (browsers will warn)
--- ---

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024-2025 Aaron D. Lee
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

430
README.md
View File

@@ -2,429 +2,121 @@
A secure steganography system for hiding encrypted messages in images using hybrid authentication. A secure steganography system for hiding encrypted messages in images using hybrid authentication.
[![Tests](https://github.com/adlee-was-taken/stegasoo/actions/workflows/test.yml/badge.svg)](https://github.com/adlee-was-taken/stegasoo/actions/workflows/test.yml)
[![Lint](https://github.com/adlee-was-taken/stegasoo/actions/workflows/lint.yml/badge.svg)](https://github.com/adlee-was-taken/stegasoo/actions/workflows/lint.yml)
![Python](https://img.shields.io/badge/Python-3.10--3.12-blue) ![Python](https://img.shields.io/badge/Python-3.10--3.12-blue)
![License](https://img.shields.io/badge/License-MIT-green) [![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](LICENSE)
![Security](https://img.shields.io/badge/Security-AES--256--GCM-red) ![Security](https://img.shields.io/badge/Security-AES--256--GCM-red)
![Version](https://img.shields.io/badge/Version-4.0.1-purple)
## Features ## Features
- 🔐 **AES-256-GCM** authenticated encryption - **AES-256-GCM** authenticated encryption
- 🧠 **Argon2id** memory-hard key derivation (256MB RAM requirement) - **Argon2id** memory-hard key derivation (256MB RAM requirement)
- 🎲 **Pseudo-random pixel selection** defeats steganalysis - **Pseudo-random pixel selection** defeats steganalysis
- 🔑 **Multi-factor authentication**: PIN, RSA key, or both - **Multi-factor authentication**: Reference photo + passphrase + PIN/RSA key
- 🖼️ **Reference photo** as "something you have" - **Multiple interfaces**: CLI, Web UI, REST API
- 🌐 **Multiple interfaces**: CLI, Web UI, REST API - **File embedding**: Hide any file type (PDF, ZIP, documents)
- 📁 **File embedding** - Hide any file type (PDF, ZIP, documents) - **DCT steganography**: JPEG-resilient embedding for social media
- 📱 **QR code support** - Encode/decode RSA keys via QR codes - **Channel keys**: Private group communication channels
- 🆕 **DCT steganography** - JPEG-resilient embedding for social media
- 🆕 **Large image support** - Process images up to 14MB+
## What's New in v4.0.0 ## Embedding Modes
| Feature | Description |
|---------|-------------|
| **Simplified Auth** | Removed date dependency - encode/decode anytime without tracking dates |
| **Passphrase** | Renamed from "day phrase" to "passphrase" (no more daily rotation) |
| **Python 3.12** | Requires Python 3.10-3.12 (jpegio incompatible with 3.13) |
| **Large Image Fix** | JPEG normalization prevents crashes with quality=100 images |
| **Subprocess Isolation** | WebUI runs encode/decode in subprocesses for stability |
| **4-Word Default** | Default passphrase increased from 3 to 4 words |
### Breaking Changes from v3.x
- `day_phrase` parameter renamed to `passphrase` in all APIs
- `date_str` parameter removed from encode/decode functions
- Python 3.13 not supported (jpegio C extension incompatibility)
### Embedding Mode Comparison
| Mode | Capacity (1080p) | JPEG Resilient | Best For | | Mode | Capacity (1080p) | JPEG Resilient | Best For |
|------|------------------|----------------|----------| |------|------------------|----------------|----------|
| **DCT** (default) | ~150 KB | Yes | Social media, messaging apps | | **DCT** (default) | ~150 KB | Yes | Social media, messaging apps |
| **LSB** | ~750 KB | No | Email, file transfer | | **LSB** | ~750 KB | No | Email, direct file transfer |
## WebUI Preview ## Web UI
| Front Page | Encode | Decode | Generate | | Home | Encode | Decode | Generate |
|:----------:|:------:|:------:|:--------:| |:----:|:------:|:------:|:--------:|
| ![Screenshot](https://github.com/adlee-was-taken/stegasoo/blob/main/data/WebUI.webp) | ![Screenshot](https://github.com/adlee-was-taken/stegasoo/blob/main/data/WebUI_Encode.webp) | ![Screenshot](https://github.com/adlee-was-taken/stegasoo/blob/main/data/WebUI_Decode.webp) | ![Screenshot](https://github.com/adlee-was-taken/stegasoo/blob/main/data/WebUI_Generate.webp) | | ![Home](data/WebUI.webp) | ![Encode](data/WebUI_Encode.webp) | ![Decode](data/WebUI_Decode.webp) | ![Generate](data/WebUI_Generate.webp) |
## Quick Start ## Quick Start
```bash ```bash
# Install with all features (requires Python 3.10-3.12) # Install (Python 3.10-3.12)
pip install -e ".[all]" pip install -e ".[all]"
# Generate credentials (memorize these!) # Generate credentials
stegasoo generate --pin --words 4 stegasoo generate --pin --words 4
# Encode a message (DCT mode - default, best for social media) # Encode a message
stegasoo encode \ stegasoo encode \
--ref photo.jpg \ --ref my_photo.jpg \
--carrier meme.jpg \ --carrier meme.jpg \
--passphrase "apple forest thunder mountain" \ --passphrase "apple forest thunder mountain" \
--pin 123456 \ --pin 123456 \
--message "Secret message" --message "Secret message"
# Encode with LSB mode (higher capacity, for email/file transfer) # Decode
stegasoo encode \
--ref photo.jpg \
--carrier meme.png \
--passphrase "apple forest thunder mountain" \
--pin 123456 \
--message "Secret message" \
--mode lsb
# Decode (auto-detects mode)
stegasoo decode \ stegasoo decode \
--ref photo.jpg \ --ref my_photo.jpg \
--stego stego.png \ --stego stego_image.png \
--passphrase "apple forest thunder mountain" \ --passphrase "apple forest thunder mountain" \
--pin 123456 --pin 123456
``` ```
For detailed installation instructions, see **[INSTALL.md](INSTALL.md)**. ## Interfaces
--- | Interface | Start Command | Documentation |
|-----------|---------------|---------------|
| **CLI** | `stegasoo --help` | [CLI.md](CLI.md) |
| **Web UI** | `cd frontends/web && python app.py` | [WEB_UI.md](WEB_UI.md) |
| **REST API** | `cd frontends/api && uvicorn main:app` | [API.md](API.md) |
## Security Model ## Security Model
Stegasoo uses multiple authentication factors combined with strong cryptography:
``` ```
┌─────────────────────────────────────────────────────────────────┐ Reference Photo ──┐
│ AUTHENTICATION LAYERS (~80-256 bits)
├─────────────────────────────────────────────────────────────────┤ ├──► Argon2id KDF ──► AES-256-GCM Key
│ │ Passphrase ───────┤ (256MB RAM)
│ Reference Photo ──┐ (~43-132 bits)
│ (~80-256 bits) │
│ ├──► Argon2id KDF ──► AES-256-GCM Key │ PIN ──────────────┤
│ Passphrase ───────┤ (256MB RAM) (~20-30 bits)
│ (~43-132 bits) │
│ │ │ RSA Key ──────────┘
│ Static PIN ───────┤ │ (optional)
│ (~20-30 bits) │ │
│ │ │
│ RSA Key ──────────┘ │
│ (~128 bits) (optional, adds another factor) │
│ │
└─────────────────────────────────────────────────────────────────┘
``` ```
### Entropy Summary
| Component | Entropy | Purpose |
|-----------|---------|---------|
| Reference Photo | ~80-256 bits | Something you have |
| Passphrase (3-12 words) | ~33-132 bits | Something you know |
| PIN (6-9 digits) | ~20-30 bits | Something you know |
| RSA Key (2048-4096 bit) | ~112-128 bits | Something you have (optional) |
| **Combined** | **133-400+ bits** | **Beyond brute force** |
### Attack Resistance
| Attack | Protection |
|--------|------------|
| Brute force | 2^133+ combinations minimum |
| Rainbow tables | Random 16-byte salt per message |
| Steganalysis | Pseudo-random pixel/coefficient selection |
| GPU cracking | Argon2id requires 256MB RAM per attempt |
| Side-channel | Constant-time operations in cryptography library |
| JPEG recompression | DCT mode embeds in frequency domain |
### Security Configurations
| Configuration | Entropy | Use Case | | Configuration | Entropy | Use Case |
|--------------|---------|----------| |--------------|---------|----------|
| 3-word passphrase + 6-digit PIN | ~133 bits | Casual private messaging | | 4-word passphrase + 6-digit PIN | ~153 bits | Standard security |
| 4-word passphrase + 9-digit PIN | ~176 bits | Standard security (recommended) | | 4-word passphrase + PIN + RSA | ~280+ bits | Maximum security |
| 4-word passphrase + RSA 2048 | ~241 bits | File-based authentication |
| 6-word passphrase + PIN + RSA 4096 | ~304 bits | Maximum security |
---
## Interfaces
### Command-Line Interface (CLI)
Full-featured CLI with piping support:
```bash
# Generate with RSA key
stegasoo generate --rsa --rsa-bits 4096 -o mykey.pem -p "password"
# Encode (DCT mode is now default)
stegasoo encode -r ref.jpg -c carrier.jpg -p "passphrase words here" --pin 123456 -m "Message"
# Encode with LSB mode for higher capacity
stegasoo encode -r ref.jpg -c carrier.png -p "passphrase words here" --pin 123456 \
-m "Message" --mode lsb
# Encode a file
stegasoo encode -r ref.jpg -c carrier.png -p "passphrase words here" --pin 123456 -f secret.txt
# Decode to stdout (quiet mode)
stegasoo decode -r ref.jpg -s stego.png -p "passphrase words here" --pin 123456 -q
# Compare LSB vs DCT capacity for an image
stegasoo compare carrier.png
# Check available modes
stegasoo modes
```
📖 Full documentation: **[CLI.md](CLI.md)**
### Web UI
Browser-based interface with drag-and-drop uploads:
```bash
# Start the server
cd frontends/web
python app.py
# Visit http://localhost:5000
```
Features:
- Drag-and-drop image uploads with scan animations
- Real-time entropy calculator
- Native mobile sharing (Web Share API)
- DCT mode default with compact mode selector
- Subprocess isolation for stability
- Large image support (14MB+ tested)
- Streamlined form flow (v3.3.0)
📖 Full documentation: **[WEB_UI.md](WEB_UI.md)**
### REST API
FastAPI-powered REST API with OpenAPI documentation:
```bash
# Start the server
cd frontends/api
uvicorn main:app --host 0.0.0.0 --port 8000
# Docs at http://localhost:8000/docs
```
Example API calls:
```bash
# Generate credentials
curl -X POST http://localhost:8000/generate \
-H "Content-Type: application/json" \
-d '{"use_pin": true, "passphrase_words": 4}'
# Encode (DCT mode is default)
curl -X POST http://localhost:8000/encode/multipart \
-F "message=Secret" \
-F "passphrase=apple forest thunder mountain" \
-F "pin=123456" \
-F "reference_photo=@photo.jpg" \
-F "carrier=@meme.jpg" \
--output stego.jpg
# Encode with LSB mode
curl -X POST http://localhost:8000/encode/multipart \
-F "message=Secret" \
-F "passphrase=apple forest thunder mountain" \
-F "pin=123456" \
-F "embed_mode=lsb" \
-F "reference_photo=@photo.jpg" \
-F "carrier=@meme.png" \
--output stego.png
# Decode (auto-detects mode)
curl -X POST http://localhost:8000/decode/multipart \
-F "passphrase=apple forest thunder mountain" \
-F "pin=123456" \
-F "reference_photo=@photo.jpg" \
-F "stego_image=@stego.jpg"
```
📖 Full documentation: **[API.md](API.md)**
---
## Project Structure
```
stegasoo/
├── src/stegasoo/ # Core library
│ ├── __init__.py # Public API
│ ├── constants.py # Configuration
│ ├── crypto.py # Encryption/decryption
│ ├── steganography.py # LSB image embedding
│ ├── dct_steganography.py # DCT embedding
│ ├── keygen.py # Credential generation
│ ├── validation.py # Input validation
│ ├── models.py # Data classes
│ ├── exceptions.py # Custom exceptions
│ ├── qr_utils.py # QR code utilities
│ └── utils.py # Utilities
├── frontends/
│ ├── web/ # Flask web UI
│ │ ├── app.py
│ │ ├── subprocess_stego.py # Subprocess isolation
│ │ └── stego_worker.py # Worker script
│ ├── cli/ # Command-line interface
│ └── api/ # FastAPI REST API
├── data/
│ └── bip39-words.txt # BIP-39 wordlist
├── pyproject.toml # Package configuration
├── requirements.txt # Dependencies
├── Dockerfile # Multi-stage Docker build
├── docker-compose.yml # Container orchestration
├── README.md # This file
├── INSTALL.md # Installation guide
├── CLI.md # CLI documentation
├── API.md # API documentation
├── WEB_UI.md # Web UI documentation
├── SECURITY.md # Security documentation
└── UNDER_THE_HOOD.md # Technical deep-dive
```
---
## Requirements ## Requirements
| Requirement | Version | Notes | | Requirement | Version |
|-------------|---------|-------| |-------------|---------|
| Python | 3.10-3.12 | **3.13 not supported** (jpegio incompatibility) | | Python | 3.10-3.12 |
| RAM | 512 MB+ | 256MB for Argon2 operations | | RAM | 512 MB+ |
| Disk | ~100 MB | |
### Key Dependencies
| Package | Purpose |
|---------|---------|
| `cryptography` | AES-256-GCM encryption |
| `Pillow` | Image processing |
| `argon2-cffi` | Memory-hard key derivation |
| `scipy` | DCT transforms |
| `jpegio` | JPEG coefficient manipulation |
| `numpy` | Array operations |
---
## Configuration
### Limits
| Limit | Value |
|-------|-------|
| Max image size | Tested up to 14MB |
| Max message size | 50 KB |
| Max file upload | 5 MB |
| PIN length | 6-9 digits |
| Passphrase length | 3-12 words |
| RSA key sizes | 2048, 3072, 4096 bits |
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `FLASK_ENV` | production | Flask environment |
| `PYTHONPATH` | - | Include `src/` for development |
---
## Development ## Development
```bash ```bash
# Install dev dependencies
pip install -e ".[dev]" pip install -e ".[dev]"
# Run tests
pytest pytest
black src/ tests/ frontends/
# Format code ruff check src/ tests/ frontends/
black src/ frontends/
ruff check src/ frontends/
# Type checking
mypy src/
# Check DCT support
python -c "from stegasoo import has_dct_support; print(f'DCT: {has_dct_support()}')"
python -c "from stegasoo.dct_steganography import has_jpegio_support; print(f'jpegio: {has_jpegio_support()}')"
``` ```
--- ## Documentation
## Version History - [INSTALL.md](INSTALL.md) - Installation guide
- [CLI.md](CLI.md) - Command-line reference
| Version | Changes | - [API.md](API.md) - REST API documentation
|---------|---------| - [WEB_UI.md](WEB_UI.md) - Web interface guide
| **4.0.1** | Lint cleanup, test fixes, Web UI improvements (channel key dropdown, LED indicators) | - [SECURITY.md](SECURITY.md) - Security model details
| **4.0.0** | Channel key support for deployment isolation, removed date dependency, renamed day_phrase→passphrase, Python 3.12 requirement, JPEG normalization fix, subprocess isolation, large image support | - [UNDER_THE_HOOD.md](UNDER_THE_HOOD.md) - Technical deep-dive
| **3.2.x** | DCT color mode, JPEG output fixes | - [CHANGELOG.md](CHANGELOG.md) - Version history
| **3.0.x** | Added DCT steganography mode | - [CONTRIBUTING.md](CONTRIBUTING.md) - Contributor guide
| **2.2.x** | QR code support, file embedding |
| **2.0.x** | Web UI, REST API, RSA keys |
| **1.0.x** | Initial release, CLI only |
---
## Upgrading from v3.x
### Code Changes Required
```python
# Old (v3.x)
result = encode(
message="secret",
day_phrase="apple forest thunder",
date_str="2024-01-15",
...
)
# New (v4.0)
result = encode(
message="secret",
passphrase="apple forest thunder mountain",
# No date_str needed!
...
)
```
### CLI Changes
```bash
# Old (v3.x)
stegasoo encode --phrase "words" --date 2024-01-15 ...
# New (v4.0)
stegasoo encode --passphrase "words here more" ...
# or short form
stegasoo encode -p "words here more" ...
```
---
## License ## License
MIT License - Use responsibly. MIT License - see [LICENSE](LICENSE). Use responsibly.
--- ---
## ⚠️ Disclaimer *This tool is for educational and legitimate privacy purposes. Users are responsible for complying with applicable laws.*
This tool is for educational and legitimate privacy purposes only. Users are responsible for complying with applicable laws in their jurisdiction.
---
## See Also
- **[INSTALL.md](INSTALL.md)** - Detailed installation instructions
- **[CLI.md](CLI.md)** - Command-line interface reference
- **[API.md](API.md)** - REST API documentation
- **[WEB_UI.md](WEB_UI.md)** - Web interface guide
- **[SECURITY.md](SECURITY.md)** - Security model and threat analysis
- **[UNDER_THE_HOOD.md](UNDER_THE_HOOD.md)** - Technical implementation details

138
WEB_UI.md
View File

@@ -1,11 +1,12 @@
# Stegasoo Web UI Documentation (v4.0.1) # Stegasoo Web UI Documentation (v4.0.2)
Complete guide for the Stegasoo web-based steganography interface. Complete guide for the Stegasoo web-based steganography interface.
## Table of Contents ## Table of Contents
- [Overview](#overview) - [Overview](#overview)
- [What's New in v4.0.1](#whats-new-in-v401) - [What's New in v4.0.2](#whats-new-in-v402)
- [Authentication & HTTPS](#authentication--https)
- [Installation & Setup](#installation--setup) - [Installation & Setup](#installation--setup)
- [Pages & Features](#pages--features) - [Pages & Features](#pages--features)
- [Home Page](#home-page) - [Home Page](#home-page)
@@ -53,24 +54,118 @@ Built with Flask, Bootstrap 5, and a modern dark theme.
--- ---
## What's New in v4.0.1 ## What's New in v4.0.2
Version 4.0.1 adds channel key support and UI improvements: Version 4.0.2 adds authentication and HTTPS support for secure home network deployment:
| Feature | Description | | Feature | Description |
|---------|-------------| |---------|-------------|
| Channel keys | 256-bit keys for deployment/group isolation | | **Authentication** | Single-admin login with SQLite3 user storage |
| Channel dropdown | Select channel mode (Auto/Public/Custom) | | **First-run setup** | Wizard to create admin account on first access |
| LED indicators | Visual status indicators for form fields | | **Account management** | Change password page |
| Key capsule styling | Improved RSA key display | | **Optional HTTPS** | Auto-generated self-signed certificates |
| Streamlined layout | PIN + Channel key in same row | | **UI improvements** | Larger QR previews, consistent panel styling |
**Key benefits:** **Key benefits:**
-Channel key isolation - Different teams/deployments can't read each other's messages -Secure your Web UI with username/password
-Dropdown selection for channel mode instead of radio buttons -No manual database setup - automatic on first run
-Visual LED indicators show field status -HTTPS with auto-generated certs for home networks
- ✅ Cleaner form layout with improved spacing - ✅ Configurable via environment variables
-Backward compatible - public mode works without channel key -Improved readability of QR preview panels
---
## Authentication & HTTPS
### Overview
v4.0.2 adds optional authentication and HTTPS for secure home network deployment.
### First-Run Setup
On first access, you'll be prompted to create an admin account:
1. Navigate to `http://localhost:5000`
2. You'll be redirected to `/setup`
3. Enter a username (e.g., "admin")
4. Enter a password (minimum 8 characters)
5. Confirm the password
6. Click "Create Admin Account"
The admin account is stored in `frontends/web/instance/stegasoo.db` (SQLite).
### Login
After setup, protected pages require login:
- **Protected routes:** `/encode`, `/decode`, `/generate`, `/account`, `/api/*`
- **Public routes:** `/`, `/about`, `/login`, `/setup`
### Account Management
Access `/account` to:
- View current username
- Change your password
- Logout
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `STEGASOO_AUTH_ENABLED` | `true` | Enable/disable authentication |
| `STEGASOO_HTTPS_ENABLED` | `false` | Enable HTTPS with self-signed certs |
| `STEGASOO_HOSTNAME` | `localhost` | Hostname for certificate generation |
### Enabling HTTPS
```bash
# Enable HTTPS
export STEGASOO_HTTPS_ENABLED=true
export STEGASOO_HOSTNAME=stegasoo.local # Optional: your hostname
cd frontends/web
python app.py
```
On first run with HTTPS enabled:
- Generates RSA 2048-bit private key
- Creates self-signed X.509 certificate (365 days validity)
- Stores in `frontends/web/certs/`
- Server starts on https://localhost:5000
**Note:** Browsers will show a security warning for self-signed certificates. This is expected for home network use.
### Disabling Authentication
For development or trusted networks:
```bash
export STEGASOO_AUTH_ENABLED=false
cd frontends/web
python app.py
```
### Docker Configuration
```yaml
# docker-compose.yml
services:
web:
environment:
STEGASOO_AUTH_ENABLED: "true"
STEGASOO_HTTPS_ENABLED: "true"
STEGASOO_HOSTNAME: "stegasoo.local"
volumes:
- ./instance:/app/frontends/web/instance # Persist user database
- ./certs:/app/frontends/web/certs # Persist SSL certs
```
### Security Notes
- Passwords are hashed with Argon2id (time_cost=3, memory_cost=64MB)
- Single admin user only (no registration)
- Session-based authentication using Flask sessions
- Database stored in `instance/stegasoo.db` (add to `.gitignore`)
--- ---
@@ -752,6 +847,10 @@ Both modes use the same strong encryption (AES-256-GCM with Argon2id key derivat
|----------|---------|-------------| |----------|---------|-------------|
| `FLASK_ENV` | production | Flask environment | | `FLASK_ENV` | production | Flask environment |
| `PYTHONPATH` | - | Include `src/` for development | | `PYTHONPATH` | - | Include `src/` for development |
| `STEGASOO_AUTH_ENABLED` | `true` | Enable/disable authentication (v4.0.2) |
| `STEGASOO_HTTPS_ENABLED` | `false` | Enable HTTPS with self-signed certs (v4.0.2) |
| `STEGASOO_HOSTNAME` | `localhost` | Hostname for certificate CN (v4.0.2) |
| `STEGASOO_CHANNEL_KEY` | - | Channel key for deployment isolation |
### Application Limits ### Application Limits
@@ -808,12 +907,23 @@ services:
target: web target: web
ports: ports:
- "5000:5000" - "5000:5000"
environment:
STEGASOO_AUTH_ENABLED: "true"
STEGASOO_HTTPS_ENABLED: "false"
STEGASOO_CHANNEL_KEY: ${STEGASOO_CHANNEL_KEY:-}
volumes:
- stegasoo-web-data:/app/frontends/web/instance
- stegasoo-web-certs:/app/frontends/web/certs
deploy: deploy:
resources: resources:
limits: limits:
memory: 768M memory: 768M
reservations: reservations:
memory: 384M memory: 384M
volumes:
stegasoo-web-data:
stegasoo-web-certs:
``` ```
--- ---

View File

@@ -1,61 +0,0 @@
#!/bin/bash
# Stegasoo Build Script
# Usage: ./build.sh [base|fast|full|clean]
set -e
case "${1:-fast}" in
base)
# Build base image with all dependencies (run once, or when deps change)
echo "🔨 Building base image (this takes 5-10 minutes)..."
docker build -f Dockerfile.base -t stegasoo-base:latest .
echo "✅ Base image built! Future builds will be fast."
echo ""
echo "Optional: Push to registry for team use:"
echo " docker tag stegasoo-base:latest yourregistry/stegasoo-base:latest"
echo " docker push yourregistry/stegasoo-base:latest"
;;
fast)
# Fast build using pre-built base image
if ! docker image inspect stegasoo-base:latest >/dev/null 2>&1; then
echo "⚠️ Base image not found. Building it first (one-time)..."
$0 base
fi
echo "🚀 Fast build using base image..."
docker-compose build
echo "✅ Done! Start with: docker-compose up -d"
;;
full)
# Full rebuild from scratch (slow, but no base image needed)
echo "🐢 Full build from scratch (slow)..."
docker-compose build --no-cache
echo "✅ Done! Start with: docker-compose up -d"
;;
clean)
# Clean up everything
echo "🧹 Cleaning up..."
docker-compose down --rmi local -v 2>/dev/null || true
docker rmi stegasoo-base:latest 2>/dev/null || true
echo "✅ Cleaned!"
;;
*)
echo "Stegasoo Build Script"
echo ""
echo "Usage: $0 [command]"
echo ""
echo "Commands:"
echo " base Build the base image (one-time, 5-10 min)"
echo " fast Fast build using base image (default, ~10 sec)"
echo " full Full rebuild from scratch (slow, no base needed)"
echo " clean Remove all images and volumes"
echo ""
echo "Typical workflow:"
echo " 1. First time: $0 base"
echo " 2. Daily dev: $0 fast (or just 'docker-compose build')"
echo " 3. Deps change: $0 base (rebuild base image)"
;;
esac

View File

@@ -1,215 +0,0 @@
#!/usr/bin/env python3
"""
Debug script for DCT/jpegio extraction issues.
Run from the stegasoo directory.
"""
import sys
import struct
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / 'src'))
import hashlib
import numpy as np
# Check for jpegio
try:
import jpegio as jio
print("✓ jpegio available")
except ImportError:
print("✗ jpegio NOT available")
sys.exit(1)
def get_usable_positions(coef_array, min_magnitude=2):
"""Get positions of usable coefficients."""
positions = []
h, w = coef_array.shape
for row in range(h):
for col in range(w):
# Skip DC coefficients (top-left of each 8x8 block)
if (row % 8 == 0) and (col % 8 == 0):
continue
if abs(coef_array[row, col]) >= min_magnitude:
positions.append((row, col))
return positions
def generate_order(num_positions, seed):
"""Generate pseudo-random order for coefficient selection."""
hash_bytes = hashlib.sha256(seed + b"jpeg_coef_order").digest()
rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], 'big'))
order = list(range(num_positions))
rng.shuffle(order)
return order
def extract_bits(coef_array, positions, order, num_bits):
"""Extract bits from coefficients."""
bits = []
for i, pos_idx in enumerate(order):
if i >= num_bits:
break
row, col = positions[pos_idx]
coef = coef_array[row, col]
bits.append(coef & 1)
return bits
def bits_to_bytes(bits):
"""Convert list of bits to bytes."""
result = []
for i in range(0, len(bits), 8):
byte_bits = bits[i:i+8]
if len(byte_bits) == 8:
byte_val = sum(byte_bits[j] << (7-j) for j in range(8))
result.append(byte_val)
return bytes(result)
def main():
if len(sys.argv) < 3:
print("Usage: python debug_jpegio.py <stego_image.jpg> <reference_photo>")
print("\nOptional: add passphrase, pin, key path")
print(" python debug_jpegio.py stego.jpg ref.jpg 'passphrase' '123456' key.pem")
sys.exit(1)
stego_path = sys.argv[1]
ref_path = sys.argv[2]
passphrase = sys.argv[3] if len(sys.argv) > 3 else "test"
pin = sys.argv[4] if len(sys.argv) > 4 else ""
key_path = sys.argv[5] if len(sys.argv) > 5 else None
print(f"\n{'='*60}")
print("JPEGIO DCT EXTRACTION DEBUG")
print(f"{'='*60}")
print(f"Stego image: {stego_path}")
print(f"Reference: {ref_path}")
print(f"Passphrase: '{passphrase}'")
print(f"PIN: '{pin}'")
print(f"Key: {key_path}")
# Load stego image with jpegio
print(f"\n[1] Loading stego image with jpegio...")
try:
jpeg = jio.read(stego_path)
print(f" ✓ jpegio.read() succeeded")
print(f" Number of components: {len(jpeg.coef_arrays)}")
for i, arr in enumerate(jpeg.coef_arrays):
print(f" Component {i}: shape={arr.shape}, dtype={arr.dtype}")
except Exception as e:
print(f" ✗ Failed: {e}")
sys.exit(1)
# Get coefficient array (channel 0)
coef_array = jpeg.coef_arrays[0]
print(f"\n[2] Coefficient array analysis...")
print(f" Shape: {coef_array.shape}")
print(f" Non-zero coefficients: {np.count_nonzero(coef_array)}")
print(f" Min value: {coef_array.min()}")
print(f" Max value: {coef_array.max()}")
# Get usable positions
print(f"\n[3] Finding usable positions (|coef| >= 2, non-DC)...")
positions = get_usable_positions(coef_array)
print(f" Usable positions: {len(positions)}")
print(f" Capacity: ~{len(positions) // 8} bytes")
# Generate seed (this needs to match the encode seed!)
print(f"\n[4] Generating seed...")
# Load reference photo
ref_data = Path(ref_path).read_bytes()
ref_hash = hashlib.sha256(ref_data).digest()
print(f" Reference hash: {ref_hash[:8].hex()}...")
# Load RSA key if provided
rsa_component = b""
if key_path:
try:
from stegasoo import load_rsa_key
key_data = Path(key_path).read_bytes()
# Try without password first
try:
rsa_key = load_rsa_key(key_data, password=None)
except:
rsa_key = load_rsa_key(key_data, password="testpass")
# Get public key bytes for seed
from cryptography.hazmat.primitives import serialization
pub_bytes = rsa_key.public_key().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
rsa_component = hashlib.sha256(pub_bytes).digest()
print(f" RSA key loaded, hash: {rsa_component[:8].hex()}...")
except Exception as e:
print(f" ✗ Could not load RSA key: {e}")
# Build seed like stegasoo does
# This is the critical part - must match encoding!
seed_parts = [
ref_hash,
passphrase.encode('utf-8'),
pin.encode('utf-8') if pin else b"",
rsa_component,
]
seed = hashlib.sha256(b"".join(seed_parts)).digest()
print(f" Combined seed: {seed[:8].hex()}...")
# Generate order
print(f"\n[5] Generating coefficient order...")
order = generate_order(len(positions), seed)
print(f" First 10 indices: {order[:10]}")
# Try to extract header
print(f"\n[6] Extracting header (first 80 bits = 10 bytes)...")
HEADER_SIZE = 10
header_bits = extract_bits(coef_array, positions, order, HEADER_SIZE * 8)
header_bytes = bits_to_bytes(header_bits)
print(f" Raw header bytes: {header_bytes.hex()}")
print(f" As ASCII (if printable): {repr(header_bytes)}")
# Check for JPGS magic
JPEGIO_MAGIC = b'JPGS'
if header_bytes[:4] == JPEGIO_MAGIC:
print(f" ✓ Found JPEGIO magic bytes!")
version = header_bytes[4]
flags = header_bytes[5]
data_length = struct.unpack('>I', header_bytes[6:10])[0]
print(f" Version: {version}")
print(f" Flags: {flags}")
print(f" Data length: {data_length} bytes")
if data_length > 0 and data_length < len(positions) // 8:
print(f"\n[7] Extracting payload ({data_length} bytes)...")
total_bits = (HEADER_SIZE + data_length) * 8
all_bits = extract_bits(coef_array, positions, order, total_bits)
data_bits = all_bits[HEADER_SIZE * 8:]
payload = bits_to_bytes(data_bits)
print(f" Payload (first 64 bytes): {payload[:64].hex()}")
print(f" This should be encrypted data starting with salt/IV")
else:
print(f" ✗ Invalid data length: {data_length}")
else:
print(f" ✗ No JPEGIO magic found")
print(f" Expected: {JPEGIO_MAGIC.hex()} ('JPGS')")
print(f" Got: {header_bytes[:4].hex()} ('{header_bytes[:4]}')")
# Try alternate interpretations
print(f"\n[7] Trying alternate header interpretations...")
# Maybe it's scipy DCT format?
DCT_MAGIC = b'DCTS'
if header_bytes[:4] == DCT_MAGIC:
print(f" Found SCIPY DCT magic - wrong extraction method!")
else:
# Show bit distribution
print(f" First 32 extracted bits: {header_bits[:32]}")
# Check if bits look random or patterned
ones = sum(header_bits[:80])
print(f" Bit distribution: {ones}/80 ones ({100*ones/80:.1f}%)")
print(f"\n{'='*60}")
print("DEBUG COMPLETE")
print(f"{'='*60}\n")
if __name__ == '__main__':
main()

View File

@@ -18,6 +18,14 @@ services:
environment: environment:
<<: *common-env <<: *common-env
FLASK_ENV: production FLASK_ENV: production
# Authentication (v4.0.2)
STEGASOO_AUTH_ENABLED: ${STEGASOO_AUTH_ENABLED:-true}
STEGASOO_HTTPS_ENABLED: ${STEGASOO_HTTPS_ENABLED:-false}
STEGASOO_HOSTNAME: ${STEGASOO_HOSTNAME:-localhost}
volumes:
# Persist auth database and SSL certs (v4.0.2)
- stegasoo-web-data:/app/frontends/web/instance
- stegasoo-web-certs:/app/frontends/web/certs
restart: unless-stopped restart: unless-stopped
deploy: deploy:
resources: resources:
@@ -45,3 +53,10 @@ services:
memory: 768M memory: 768M
reservations: reservations:
memory: 384M memory: 384M
# Named volumes for persistent data
volumes:
stegasoo-web-data:
driver: local
stegasoo-web-certs:
driver: local

48
examples/README.md Normal file
View File

@@ -0,0 +1,48 @@
# Stegasoo Examples
This directory contains example scripts demonstrating how to use Stegasoo.
## Prerequisites
Install Stegasoo first:
```bash
pip install stegasoo
# Or for development:
pip install -e ".[all]"
```
## Examples
### basic_usage.py
Basic encode/decode workflow with a text message.
```bash
python basic_usage.py
```
### embed_file.py
Embed and extract files (documents, images, etc.) inside carrier images.
```bash
python embed_file.py
```
### channel_keys.py
Use channel keys to create private communication channels for groups.
```bash
python channel_keys.py
```
## Test Images
You'll need to provide your own images:
- `my_secret_photo.png` - Your reference photo (keep this secret!)
- `carrier.png` - The image that will carry your hidden message
For testing, you can use any PNG or BMP image. JPEG carriers are supported with DCT mode.

59
examples/basic_usage.py Normal file
View File

@@ -0,0 +1,59 @@
#!/usr/bin/env python3
"""
Basic Stegasoo Usage Example
This example demonstrates how to encode and decode a secret message
using the Stegasoo library.
"""
from pathlib import Path
import stegasoo
def main():
# Load your images
# The reference photo is your "key" - keep it secret!
reference_photo = Path("my_secret_photo.png").read_bytes()
carrier_image = Path("carrier.png").read_bytes()
# Your secret message
message = "This is my secret message!"
# Your credentials
passphrase = "correct horse battery staple" # Use 4+ words
pin = "123456" # 6-9 digits
# === ENCODE ===
print("Encoding message...")
result = stegasoo.encode(
message=message,
reference_photo=reference_photo,
carrier_image=carrier_image,
passphrase=passphrase,
pin=pin,
)
# Save the stego image
output_path = Path(f"secret_{result.suggested_filename}")
output_path.write_bytes(result.stego_image)
print(f"Saved to: {output_path}")
print(f"Capacity used: {result.capacity_used_percent:.1f}%")
# === DECODE ===
print("\nDecoding message...")
stego_image = output_path.read_bytes()
decoded = stegasoo.decode(
stego_image=stego_image,
reference_photo=reference_photo,
passphrase=passphrase,
pin=pin,
)
print(f"Decoded message: {decoded.message}")
print(f"Message type: {decoded.payload_type}")
if __name__ == "__main__":
main()

72
examples/channel_keys.py Normal file
View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python3
"""
Channel Keys Example
Channel keys allow you to create private communication channels.
Only people with the same channel key can decode messages.
"""
from pathlib import Path
import stegasoo
from stegasoo.channel import generate_channel_key, get_channel_fingerprint
def main():
# Generate a channel key for your group
channel_key = generate_channel_key()
fingerprint = get_channel_fingerprint(channel_key)
print("=== Channel Key Generated ===")
print(f"Key: {channel_key}")
print(f"Fingerprint: {fingerprint}")
print("\nShare this key securely with your group members!")
print("-" * 40)
# Load images
reference_photo = Path("my_secret_photo.png").read_bytes()
carrier_image = Path("carrier.png").read_bytes()
# Encode with channel key
print("\nEncoding message with channel key...")
result = stegasoo.encode(
message="Secret group message!",
reference_photo=reference_photo,
carrier_image=carrier_image,
passphrase="correct horse battery staple",
pin="123456",
channel_key=channel_key, # Add the channel key
)
stego_data = result.stego_image
print(f"Encoded successfully!")
# Decode with correct channel key
print("\nDecoding with correct channel key...")
decoded = stegasoo.decode(
stego_image=stego_data,
reference_photo=reference_photo,
passphrase="correct horse battery staple",
pin="123456",
channel_key=channel_key, # Same channel key
)
print(f"Message: {decoded.message}")
# Try to decode with wrong channel key
print("\nTrying to decode with wrong channel key...")
wrong_key = generate_channel_key()
try:
stegasoo.decode(
stego_image=stego_data,
reference_photo=reference_photo,
passphrase="correct horse battery staple",
pin="123456",
channel_key=wrong_key, # Different channel key
)
print("ERROR: Should have failed!")
except (stegasoo.DecryptionError, stegasoo.ExtractionError):
print("Correctly rejected - wrong channel key!")
if __name__ == "__main__":
main()

78
examples/embed_file.py Normal file
View File

@@ -0,0 +1,78 @@
#!/usr/bin/env python3
"""
File Embedding Example
This example demonstrates how to embed a file (like a document or image)
inside a carrier image using Stegasoo.
"""
from pathlib import Path
import stegasoo
from stegasoo.models import FilePayload
def main():
# Load images
reference_photo = Path("my_secret_photo.png").read_bytes()
carrier_image = Path("carrier.png").read_bytes()
# Load the file to embed
secret_file = Path("secret_document.pdf")
file_data = secret_file.read_bytes()
# Create a FilePayload
payload = FilePayload(
filename=secret_file.name,
data=file_data,
mime_type="application/pdf",
)
# Credentials
passphrase = "correct horse battery staple"
pin = "123456"
# Check capacity first
capacity = stegasoo.calculate_capacity(carrier_image)
print(f"Carrier capacity: {capacity['capacity_bytes']:,} bytes")
print(f"File size: {len(file_data):,} bytes")
if len(file_data) > capacity["capacity_bytes"]:
print("Error: File too large for this carrier!")
return
# Encode the file
print("\nEmbedding file...")
result = stegasoo.encode(
file_payload=payload,
reference_photo=reference_photo,
carrier_image=carrier_image,
passphrase=passphrase,
pin=pin,
)
output_path = Path(f"contains_file_{result.suggested_filename}")
output_path.write_bytes(result.stego_image)
print(f"Saved to: {output_path}")
# Decode and extract the file
print("\nExtracting file...")
decoded = stegasoo.decode(
stego_image=output_path.read_bytes(),
reference_photo=reference_photo,
passphrase=passphrase,
pin=pin,
)
if decoded.payload_type == "file":
extracted_path = Path(f"extracted_{decoded.filename}")
extracted_path.write_bytes(decoded.file_data)
print(f"Extracted: {extracted_path}")
print(f"Original filename: {decoded.filename}")
print(f"MIME type: {decoded.mime_type}")
else:
print(f"Unexpected payload type: {decoded.payload_type}")
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff

View File

@@ -29,8 +29,31 @@ import sys
import time import time
from pathlib import Path from pathlib import Path
from flask import Flask, flash, jsonify, redirect, render_template, request, send_file, url_for from auth import (
change_password,
create_user,
get_username,
is_authenticated,
login_required,
user_exists,
verify_password,
)
from auth import (
init_app as init_auth,
)
from flask import (
Flask,
flash,
jsonify,
redirect,
render_template,
request,
send_file,
session,
url_for,
)
from PIL import Image from PIL import Image
from ssl_utils import ensure_certs
os.environ["NUMPY_MADVISE_HUGEPAGE"] = "0" os.environ["NUMPY_MADVISE_HUGEPAGE"] = "0"
os.environ["OMP_NUM_THREADS"] = "1" os.environ["OMP_NUM_THREADS"] = "1"
@@ -124,6 +147,13 @@ app = Flask(__name__)
app.secret_key = secrets.token_hex(32) app.secret_key = secrets.token_hex(32)
app.config["MAX_CONTENT_LENGTH"] = MAX_FILE_SIZE app.config["MAX_CONTENT_LENGTH"] = MAX_FILE_SIZE
# Auth configuration from environment
app.config["AUTH_ENABLED"] = os.environ.get("STEGASOO_AUTH_ENABLED", "true").lower() == "true"
app.config["HTTPS_ENABLED"] = os.environ.get("STEGASOO_HTTPS_ENABLED", "false").lower() == "true"
# Initialize auth module
init_auth(app)
# Temporary file storage for sharing (file_id -> {data, timestamp, filename}) # Temporary file storage for sharing (file_id -> {data, timestamp, filename})
TEMP_FILES: dict[str, dict] = {} TEMP_FILES: dict[str, dict] = {}
THUMBNAIL_FILES: dict[str, bytes] = {} THUMBNAIL_FILES: dict[str, bytes] = {}
@@ -159,6 +189,10 @@ def inject_globals():
"channel_configured": channel_status["configured"], "channel_configured": channel_status["configured"],
"channel_fingerprint": channel_status.get("fingerprint"), "channel_fingerprint": channel_status.get("fingerprint"),
"channel_source": channel_status.get("source"), "channel_source": channel_status.get("source"),
# NEW in v4.0.2 - Auth state
"auth_enabled": app.config.get("AUTH_ENABLED", True),
"is_authenticated": is_authenticated(),
"username": get_username() if is_authenticated() else None,
} }
@@ -296,6 +330,7 @@ def index():
@app.route("/api/channel/status") @app.route("/api/channel/status")
@login_required
def api_channel_status(): def api_channel_status():
""" """
Get current channel key status (v4.0.0). Get current channel key status (v4.0.0).
@@ -330,6 +365,7 @@ def api_channel_status():
@app.route("/api/channel/validate", methods=["POST"]) @app.route("/api/channel/validate", methods=["POST"])
@login_required
def api_channel_validate(): def api_channel_validate():
""" """
Validate a channel key format (v4.0.0). Validate a channel key format (v4.0.0).
@@ -366,6 +402,7 @@ def api_channel_validate():
@app.route("/generate", methods=["GET", "POST"]) @app.route("/generate", methods=["GET", "POST"])
@login_required
def generate(): def generate():
if request.method == "POST": if request.method == "POST":
# v3.2.0: Changed from words_per_phrase to words_per_passphrase, default increased to 4 # v3.2.0: Changed from words_per_phrase to words_per_passphrase, default increased to 4
@@ -450,6 +487,7 @@ def generate():
@app.route("/generate/qr/<token>") @app.route("/generate/qr/<token>")
@login_required
def generate_qr(token): def generate_qr(token):
"""Generate QR code for RSA key.""" """Generate QR code for RSA key."""
if not HAS_QRCODE: if not HAS_QRCODE:
@@ -473,6 +511,7 @@ def generate_qr(token):
@app.route("/generate/qr-download/<token>") @app.route("/generate/qr-download/<token>")
@login_required
def generate_qr_download(token): def generate_qr_download(token):
"""Download QR code as PNG file.""" """Download QR code as PNG file."""
if not HAS_QRCODE: if not HAS_QRCODE:
@@ -501,6 +540,7 @@ def generate_qr_download(token):
@app.route("/qr/crop", methods=["POST"]) @app.route("/qr/crop", methods=["POST"])
@login_required
def qr_crop(): def qr_crop():
""" """
Detect and crop QR code from an image. Detect and crop QR code from an image.
@@ -538,6 +578,7 @@ def qr_crop():
@app.route("/generate/download-key", methods=["POST"]) @app.route("/generate/download-key", methods=["POST"])
@login_required
def download_key(): def download_key():
"""Download RSA key as password-protected PEM file.""" """Download RSA key as password-protected PEM file."""
key_pem = request.form.get("key_pem", "") key_pem = request.form.get("key_pem", "")
@@ -570,6 +611,7 @@ def download_key():
@app.route("/extract-key-from-qr", methods=["POST"]) @app.route("/extract-key-from-qr", methods=["POST"])
@login_required
def extract_key_from_qr_route(): def extract_key_from_qr_route():
""" """
Extract RSA key from uploaded QR code image. Extract RSA key from uploaded QR code image.
@@ -609,6 +651,7 @@ def extract_key_from_qr_route():
@app.route("/api/compare-capacity", methods=["POST"]) @app.route("/api/compare-capacity", methods=["POST"])
@login_required
def api_compare_capacity(): def api_compare_capacity():
""" """
Compare LSB and DCT capacity for an uploaded carrier image. Compare LSB and DCT capacity for an uploaded carrier image.
@@ -652,6 +695,7 @@ def api_compare_capacity():
@app.route("/api/check-fit", methods=["POST"]) @app.route("/api/check-fit", methods=["POST"])
@login_required
def api_check_fit(): def api_check_fit():
""" """
Check if a payload will fit in the carrier with selected mode. Check if a payload will fit in the carrier with selected mode.
@@ -705,6 +749,7 @@ def api_check_fit():
@app.route("/encode", methods=["GET", "POST"]) @app.route("/encode", methods=["GET", "POST"])
@login_required
def encode_page(): def encode_page():
if request.method == "POST": if request.method == "POST":
try: try:
@@ -926,6 +971,7 @@ def encode_page():
@app.route("/encode/result/<file_id>") @app.route("/encode/result/<file_id>")
@login_required
def encode_result(file_id): def encode_result(file_id):
if file_id not in TEMP_FILES: if file_id not in TEMP_FILES:
flash("File expired or not found. Please encode again.", "error") flash("File expired or not found. Please encode again.", "error")
@@ -956,6 +1002,7 @@ def encode_result(file_id):
@app.route("/encode/thumbnail/<thumb_id>") @app.route("/encode/thumbnail/<thumb_id>")
@login_required
def encode_thumbnail(thumb_id): def encode_thumbnail(thumb_id):
"""Serve thumbnail image.""" """Serve thumbnail image."""
if thumb_id not in THUMBNAIL_FILES: if thumb_id not in THUMBNAIL_FILES:
@@ -967,6 +1014,7 @@ def encode_thumbnail(thumb_id):
@app.route("/encode/download/<file_id>") @app.route("/encode/download/<file_id>")
@login_required
def encode_download(file_id): def encode_download(file_id):
if file_id not in TEMP_FILES: if file_id not in TEMP_FILES:
flash("File expired or not found.", "error") flash("File expired or not found.", "error")
@@ -984,6 +1032,7 @@ def encode_download(file_id):
@app.route("/encode/file/<file_id>") @app.route("/encode/file/<file_id>")
@login_required
def encode_file_route(file_id): def encode_file_route(file_id):
"""Serve file for Web Share API.""" """Serve file for Web Share API."""
if file_id not in TEMP_FILES: if file_id not in TEMP_FILES:
@@ -1001,6 +1050,7 @@ def encode_file_route(file_id):
@app.route("/encode/cleanup/<file_id>", methods=["POST"]) @app.route("/encode/cleanup/<file_id>", methods=["POST"])
@login_required
def encode_cleanup(file_id): def encode_cleanup(file_id):
"""Manually cleanup a file after sharing.""" """Manually cleanup a file after sharing."""
TEMP_FILES.pop(file_id, None) TEMP_FILES.pop(file_id, None)
@@ -1018,6 +1068,7 @@ def encode_cleanup(file_id):
@app.route("/decode", methods=["GET", "POST"]) @app.route("/decode", methods=["GET", "POST"])
@login_required
def decode_page(): def decode_page():
if request.method == "POST": if request.method == "POST":
try: try:
@@ -1170,6 +1221,7 @@ def decode_page():
@app.route("/decode/download/<file_id>") @app.route("/decode/download/<file_id>")
@login_required
def decode_download(file_id): def decode_download(file_id):
"""Download decoded file.""" """Download decoded file."""
if file_id not in TEMP_FILES: if file_id not in TEMP_FILES:
@@ -1245,9 +1297,117 @@ def test_capacity_nopil():
) )
# ============================================================================
# AUTHENTICATION ROUTES (v4.0.2)
# ============================================================================
@app.route("/login", methods=["GET", "POST"])
def login():
"""Login page."""
if not app.config.get("AUTH_ENABLED", True):
return redirect(url_for("index"))
if not user_exists():
return redirect(url_for("setup"))
if is_authenticated():
return redirect(url_for("index"))
if request.method == "POST":
password = request.form.get("password", "")
if verify_password(password):
session["authenticated"] = True
session.permanent = True
flash("Login successful", "success")
return redirect(url_for("index"))
else:
flash("Invalid password", "error")
return render_template("login.html", username=get_username())
@app.route("/logout")
def logout():
"""Logout and clear session."""
session.clear()
flash("Logged out successfully", "success")
return redirect(url_for("index"))
@app.route("/setup", methods=["GET", "POST"])
def setup():
"""First-run setup page."""
if not app.config.get("AUTH_ENABLED", True):
return redirect(url_for("index"))
if user_exists():
return redirect(url_for("login"))
if request.method == "POST":
username = request.form.get("username", "admin")
password = request.form.get("password", "")
password_confirm = request.form.get("password_confirm", "")
if len(password) < 8:
flash("Password must be at least 8 characters", "error")
elif password != password_confirm:
flash("Passwords do not match", "error")
else:
try:
create_user(username, password)
session["authenticated"] = True
session.permanent = True
flash("Admin account created successfully!", "success")
return redirect(url_for("index"))
except Exception as e:
flash(f"Error creating account: {e}", "error")
return render_template("setup.html")
@app.route("/account", methods=["GET", "POST"])
@login_required
def account():
"""Account management page."""
if request.method == "POST":
current = request.form.get("current_password", "")
new = request.form.get("new_password", "")
new_confirm = request.form.get("new_password_confirm", "")
if new != new_confirm:
flash("New passwords do not match", "error")
else:
success, message = change_password(current, new)
flash(message, "success" if success else "error")
return render_template("account.html", username=get_username())
# ============================================================================ # ============================================================================
# MAIN # MAIN
# ============================================================================ # ============================================================================
if __name__ == "__main__": if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=False) base_dir = Path(__file__).parent
# HTTPS configuration
ssl_context = None
if app.config.get("HTTPS_ENABLED", False):
hostname = os.environ.get("STEGASOO_HOSTNAME", "localhost")
cert_path, key_path = ensure_certs(base_dir, hostname)
ssl_context = (str(cert_path), str(key_path))
print(f"HTTPS enabled with self-signed certificate for {hostname}")
# Auth status
if app.config.get("AUTH_ENABLED", True):
print("Authentication enabled")
else:
print("Authentication disabled")
app.run(
host="0.0.0.0",
port=5000,
debug=False,
ssl_context=ssl_context,
)

162
frontends/web/auth.py Normal file
View File

@@ -0,0 +1,162 @@
"""
Stegasoo Authentication Module
Single-admin authentication with Argon2 password hashing.
Uses Flask sessions for authentication state and SQLite3 for storage.
"""
import functools
import sqlite3
from pathlib import Path
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from flask import current_app, g, redirect, session, url_for
# Argon2 password hasher (lighter than stegasoo's 256MB for faster login)
ph = PasswordHasher(
time_cost=3,
memory_cost=65536, # 64MB
parallelism=4,
hash_len=32,
salt_len=16,
)
def get_db_path() -> Path:
"""Get database path in Flask instance folder."""
instance_path = Path(current_app.instance_path)
instance_path.mkdir(parents=True, exist_ok=True)
return instance_path / "stegasoo.db"
def get_db() -> sqlite3.Connection:
"""Get database connection, cached on Flask g object."""
if "db" not in g:
g.db = sqlite3.connect(get_db_path())
g.db.row_factory = sqlite3.Row
return g.db
def close_db(e=None):
"""Close database connection at end of request."""
db = g.pop("db", None)
if db is not None:
db.close()
def init_db():
"""Initialize database schema."""
db = get_db()
db.executescript("""
CREATE TABLE IF NOT EXISTS admin_user (
id INTEGER PRIMARY KEY CHECK (id = 1),
username TEXT NOT NULL DEFAULT 'admin',
password_hash TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
);
""")
db.commit()
def user_exists() -> bool:
"""Check if admin user has been created."""
db = get_db()
result = db.execute("SELECT 1 FROM admin_user WHERE id = 1").fetchone()
return result is not None
def create_user(username: str, password: str):
"""Create admin user (first-run setup)."""
if user_exists():
raise ValueError("Admin user already exists")
password_hash = ph.hash(password)
db = get_db()
db.execute(
"INSERT INTO admin_user (id, username, password_hash) VALUES (1, ?, ?)",
(username, password_hash),
)
db.commit()
def get_username() -> str:
"""Get the admin username."""
db = get_db()
row = db.execute("SELECT username FROM admin_user WHERE id = 1").fetchone()
return row["username"] if row else "admin"
def verify_password(password: str) -> bool:
"""Verify password against stored hash."""
db = get_db()
row = db.execute("SELECT password_hash FROM admin_user WHERE id = 1").fetchone()
if not row:
return False
try:
ph.verify(row["password_hash"], password)
# Rehash if parameters changed
if ph.check_needs_rehash(row["password_hash"]):
new_hash = ph.hash(password)
db.execute(
"UPDATE admin_user SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = 1",
(new_hash,),
)
db.commit()
return True
except VerifyMismatchError:
return False
def change_password(current_password: str, new_password: str) -> tuple[bool, str]:
"""Change admin password. Returns (success, message)."""
if not verify_password(current_password):
return False, "Current password is incorrect"
if len(new_password) < 8:
return False, "New password must be at least 8 characters"
new_hash = ph.hash(new_password)
db = get_db()
db.execute(
"UPDATE admin_user SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = 1",
(new_hash,),
)
db.commit()
return True, "Password changed successfully"
def is_authenticated() -> bool:
"""Check if current session is authenticated."""
return session.get("authenticated", False)
def login_required(f):
"""Decorator to require login for a route."""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
# Check if auth is enabled
if not current_app.config.get("AUTH_ENABLED", True):
return f(*args, **kwargs)
# Check for first-run setup
if not user_exists():
return redirect(url_for("setup"))
# Check authentication
if not is_authenticated():
return redirect(url_for("login"))
return f(*args, **kwargs)
return decorated_function
def init_app(app):
"""Initialize auth module with Flask app."""
app.teardown_appcontext(close_db)
with app.app_context():
init_db()

111
frontends/web/ssl_utils.py Normal file
View File

@@ -0,0 +1,111 @@
"""
SSL Certificate Utilities
Auto-generates self-signed certificates for HTTPS.
Uses cryptography library (already a dependency).
"""
import datetime
import ipaddress
from pathlib import Path
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
def get_cert_paths(base_dir: Path) -> tuple[Path, Path]:
"""Get paths for cert and key files."""
cert_dir = base_dir / "certs"
cert_dir.mkdir(parents=True, exist_ok=True)
return cert_dir / "server.crt", cert_dir / "server.key"
def certs_exist(base_dir: Path) -> bool:
"""Check if both cert files exist."""
cert_path, key_path = get_cert_paths(base_dir)
return cert_path.exists() and key_path.exists()
def generate_self_signed_cert(
base_dir: Path,
hostname: str = "localhost",
days_valid: int = 365,
) -> tuple[Path, Path]:
"""
Generate self-signed SSL certificate.
Args:
base_dir: Base directory for certs folder
hostname: Server hostname for certificate
days_valid: Certificate validity in days
Returns:
Tuple of (cert_path, key_path)
"""
cert_path, key_path = get_cert_paths(base_dir)
# Generate RSA key
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Create certificate
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Stegasoo"),
x509.NameAttribute(NameOID.COMMON_NAME, hostname),
])
# Subject Alternative Names
san_list = [
x509.DNSName(hostname),
x509.DNSName("localhost"),
x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")),
]
# Add the hostname as IP if it looks like one
try:
san_list.append(x509.IPAddress(ipaddress.IPv4Address(hostname)))
except ipaddress.AddressValueError:
pass
now = datetime.datetime.now(datetime.timezone.utc)
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + datetime.timedelta(days=days_valid))
.add_extension(
x509.SubjectAlternativeName(san_list),
critical=False,
)
.sign(key, hashes.SHA256())
)
# Write key file (chmod 600)
key_path.write_bytes(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
)
key_path.chmod(0o600)
# Write cert file
cert_path.write_bytes(cert.public_bytes(serialization.Encoding.PEM))
return cert_path, key_path
def ensure_certs(base_dir: Path, hostname: str = "localhost") -> tuple[Path, Path]:
"""Ensure certificates exist, generating if needed."""
if certs_exist(base_dir):
return get_cert_paths(base_dir)
print(f"Generating self-signed SSL certificate for {hostname}...")
return generate_self_signed_cert(base_dir, hostname)

View File

@@ -724,6 +724,7 @@ footer {
.scan-data-value { .scan-data-value {
color: rgba(0, 255, 170, 1); color: rgba(0, 255, 170, 1);
font-weight: 600; font-weight: 600;
font-size: 0.65rem;
} }
.scan-hash-preview { .scan-hash-preview {
@@ -744,7 +745,7 @@ footer {
border: 1px solid rgba(0, 255, 170, 0.4); border: 1px solid rgba(0, 255, 170, 0.4);
border-radius: 3px; border-radius: 3px;
padding: 2px 6px; padding: 2px 6px;
font-size: 0.5rem; font-size: 0.65rem;
color: rgba(0, 255, 170, 1); color: rgba(0, 255, 170, 1);
text-transform: uppercase; text-transform: uppercase;
letter-spacing: 0.3px; letter-spacing: 0.3px;
@@ -1001,6 +1002,7 @@ footer {
.pixel-data-value { .pixel-data-value {
color: #d4e157; color: #d4e157;
font-weight: 600; font-weight: 600;
font-size: 0.65rem;
} }
.pixel-status-badge { .pixel-status-badge {
@@ -1010,7 +1012,7 @@ footer {
border: 1px solid rgba(212, 225, 87, 0.4); border: 1px solid rgba(212, 225, 87, 0.4);
border-radius: 3px; border-radius: 3px;
padding: 2px 6px; padding: 2px 6px;
font-size: 0.55rem; font-size: 0.65rem;
color: #d4e157; color: #d4e157;
text-transform: uppercase; text-transform: uppercase;
letter-spacing: 0.5px; letter-spacing: 0.5px;
@@ -1047,10 +1049,10 @@ footer {
/* Expand drop zone when showing scanned QR result */ /* Expand drop zone when showing scanned QR result */
#rsaQrSection .drop-zone:has(.qr-scan-container:not(.d-none)) { #rsaQrSection .drop-zone:has(.qr-scan-container:not(.d-none)) {
width: auto; width: auto;
min-width: 200px; min-width: 280px;
max-width: 280px; max-width: 400px;
height: auto; height: auto;
min-height: 200px; min-height: 280px;
aspect-ratio: auto; aspect-ratio: auto;
} }
@@ -1070,9 +1072,9 @@ footer {
overflow: visible; overflow: visible;
border-radius: 8px; border-radius: 8px;
background: rgba(0, 0, 0, 0.3); background: rgba(0, 0, 0, 0.3);
min-height: 160px; min-height: 220px;
min-width: 160px; min-width: 220px;
padding: 10px; padding: 12px;
display: flex; display: flex;
justify-content: center; justify-content: center;
align-items: center; align-items: center;
@@ -1090,10 +1092,10 @@ footer {
/* Cropped image - hidden until loaded, scales UP to fill container */ /* Cropped image - hidden until loaded, scales UP to fill container */
.qr-scan-container .qr-cropped { .qr-scan-container .qr-cropped {
max-height: 180px; max-height: 240px;
max-width: 180px; max-width: 240px;
min-width: 140px; min-width: 180px;
min-height: 140px; min-height: 180px;
width: auto; width: auto;
height: auto; height: auto;
object-fit: contain; object-fit: contain;
@@ -1255,11 +1257,11 @@ footer {
left: 0; left: 0;
right: 0; right: 0;
z-index: 10; z-index: 10;
background: linear-gradient(to top, background: linear-gradient(to top,
rgba(10, 15, 30, 0.95) 0%, rgba(10, 15, 30, 0.95) 0%,
rgba(10, 15, 30, 0.6) 80%, rgba(10, 15, 30, 0.6) 80%,
transparent 100%); transparent 100%);
padding: 4px 6px 3px 6px; padding: 8px 10px 6px 10px;
opacity: 0; opacity: 0;
transition: opacity 0.3s ease; transition: opacity 0.3s ease;
border-radius: 0 0 6px 6px; border-radius: 0 0 6px 6px;
@@ -1282,10 +1284,10 @@ footer {
/* QR Data Panel text styles */ /* QR Data Panel text styles */
.qr-data-filename { .qr-data-filename {
font-family: 'Courier New', monospace; font-family: 'Courier New', monospace;
font-size: 0.6rem; font-size: 0.7rem;
color: #fff; color: #fff;
text-align: center; text-align: center;
margin-bottom: 2px; margin-bottom: 3px;
white-space: nowrap; white-space: nowrap;
overflow: hidden; overflow: hidden;
text-overflow: ellipsis; text-overflow: ellipsis;
@@ -1301,7 +1303,7 @@ footer {
justify-content: space-between; justify-content: space-between;
align-items: center; align-items: center;
font-family: 'Courier New', monospace; font-family: 'Courier New', monospace;
font-size: 0.5rem; font-size: 0.6rem;
white-space: nowrap; white-space: nowrap;
} }
@@ -1310,9 +1312,9 @@ footer {
align-items: center; align-items: center;
background: rgba(0, 255, 170, 0.15); background: rgba(0, 255, 170, 0.15);
border: 1px solid rgba(0, 255, 170, 0.4); border: 1px solid rgba(0, 255, 170, 0.4);
border-radius: 2px; border-radius: 3px;
padding: 1px 4px; padding: 2px 6px;
font-size: 0.45rem; font-size: 0.65rem;
color: rgba(0, 255, 170, 1); color: rgba(0, 255, 170, 1);
text-transform: uppercase; text-transform: uppercase;
letter-spacing: 0.3px; letter-spacing: 0.3px;
@@ -1321,7 +1323,7 @@ footer {
.qr-data-value { .qr-data-value {
color: rgba(0, 255, 170, 1); color: rgba(0, 255, 170, 1);
font-weight: 600; font-weight: 600;
font-size: 0.5rem; font-size: 0.65rem;
} }
/* ---------------------------------------------------------------------------- /* ----------------------------------------------------------------------------

View File

@@ -0,0 +1,102 @@
{% extends "base.html" %}
{% block title %}Account - Stegasoo{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-6 col-lg-5">
<div class="card">
<div class="card-header">
<h5 class="mb-0"><i class="bi bi-person-gear me-2"></i>Account Settings</h5>
</div>
<div class="card-body">
<p class="text-muted mb-4">
Logged in as <strong>{{ username }}</strong>
</p>
<h6 class="text-muted mb-3">Change Password</h6>
<form method="POST" action="{{ url_for('account') }}" id="accountForm">
<div class="mb-3">
<label class="form-label">
<i class="bi bi-key me-1"></i> Current Password
</label>
<div class="input-group">
<input type="password" name="current_password" class="form-control"
id="currentPasswordInput" required>
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('currentPasswordInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
</div>
<div class="mb-3">
<label class="form-label">
<i class="bi bi-key-fill me-1"></i> New Password
</label>
<div class="input-group">
<input type="password" name="new_password" class="form-control"
id="newPasswordInput" required minlength="8">
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('newPasswordInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
<div class="form-text">Minimum 8 characters</div>
</div>
<div class="mb-4">
<label class="form-label">
<i class="bi bi-key-fill me-1"></i> Confirm New Password
</label>
<div class="input-group">
<input type="password" name="new_password_confirm" class="form-control"
id="newPasswordConfirmInput" required minlength="8">
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('newPasswordConfirmInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
</div>
<button type="submit" class="btn btn-primary w-100">
<i class="bi bi-check-lg me-2"></i>Update Password
</button>
</form>
<hr class="my-4">
<a href="{{ url_for('logout') }}" class="btn btn-outline-danger w-100">
<i class="bi bi-box-arrow-left me-2"></i>Logout
</a>
</div>
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script>
function togglePassword(inputId, btn) {
const input = document.getElementById(inputId);
const icon = btn.querySelector('i');
if (input.type === 'password') {
input.type = 'text';
icon.classList.replace('bi-eye', 'bi-eye-slash');
} else {
input.type = 'password';
icon.classList.replace('bi-eye-slash', 'bi-eye');
}
}
document.getElementById('accountForm')?.addEventListener('submit', function(e) {
const newPass = document.getElementById('newPasswordInput').value;
const confirm = document.getElementById('newPasswordConfirmInput').value;
if (newPass !== confirm) {
e.preventDefault();
alert('New passwords do not match');
}
});
</script>
{% endblock %}

View File

@@ -24,20 +24,38 @@
<li class="nav-item"> <li class="nav-item">
<a class="nav-link" href="/"><i class="bi bi-house me-1"></i> Home</a> <a class="nav-link" href="/"><i class="bi bi-house me-1"></i> Home</a>
</li> </li>
{% if not auth_enabled or is_authenticated %}
<li class="nav-item"> <li class="nav-item">
<a class="nav-link" href="/encode"><i class="bi bi-lock me-1"></i> Encode</a> <a class="nav-link" href="/encode"><i class="bi bi-lock me-1"></i> Encode</a>
</li> </li>
<li class="nav-item"> <li class="nav-item">
<a class="nav-link" href="/decode"><i class="bi bi-unlock me-1"></i> Decode</a> <a class="nav-link" href="/decode"><i class="bi bi-unlock me-1"></i> Decode</a>
</li> </li>
<li class="nav-item"> <li class="nav-item">
<a class="nav-link" href="/generate"><i class="bi bi-key me-1"></i> Generate</a> <a class="nav-link" href="/generate"><i class="bi bi-key me-1"></i> Generate</a>
</li> </li>
{% endif %}
<li class="nav-item"> <li class="nav-item">
<a class="nav-link" href="/about"><i class="bi bi-info-circle me-1"></i> About</a> <a class="nav-link" href="/about"><i class="bi bi-info-circle me-1"></i> About</a>
</li> </li>
{% if auth_enabled %}
{% if is_authenticated %}
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown">
<i class="bi bi-person-circle me-1"></i> {{ username }}
</a>
<ul class="dropdown-menu dropdown-menu-end dropdown-menu-dark">
<li><a class="dropdown-item" href="/account"><i class="bi bi-gear me-2"></i>Account</a></li>
<li><hr class="dropdown-divider"></li>
<li><a class="dropdown-item" href="/logout"><i class="bi bi-box-arrow-left me-2"></i>Logout</a></li>
</ul>
</li>
{% else %}
<li class="nav-item">
<a class="nav-link" href="/login"><i class="bi bi-box-arrow-in-right me-1"></i> Login</a>
</li>
{% endif %}
{% endif %}
</ul> </ul>
</div> </div>
</div> </div>

View File

@@ -327,11 +327,11 @@
<!-- PIN + Channel Row --> <!-- PIN + Channel Row -->
<div class="row"> <div class="row">
<div class="col-md-4 mb-3"> <div class="col-md-6 mb-3">
<div class="security-box h-100"> <div class="security-box h-100">
<label class="form-label"><i class="bi bi-123 me-1"></i> PIN</label> <label class="form-label"><i class="bi bi-123 me-1"></i> PIN</label>
<div class="input-group pin-input-container"> <div class="input-group pin-input-container">
<input type="password" name="pin" class="form-control" id="pinInput" placeholder="••••••" maxlength="9" style="max-width: 180px;"> <input type="password" name="pin" class="form-control" id="pinInput" placeholder="••••••" maxlength="9">
<button class="btn btn-outline-secondary" type="button" data-toggle-password="pinInput"> <button class="btn btn-outline-secondary" type="button" data-toggle-password="pinInput">
<i class="bi bi-eye"></i> <i class="bi bi-eye"></i>
</button> </button>
@@ -340,7 +340,7 @@
</div> </div>
</div> </div>
<div class="col-md-8 mb-3"> <div class="col-md-6 mb-3">
<div class="security-box h-100"> <div class="security-box h-100">
<label class="form-label"> <label class="form-label">
<i class="bi bi-broadcast me-1"></i> Channel <i class="bi bi-broadcast me-1"></i> Channel

View File

@@ -394,11 +394,11 @@
<!-- PIN + Channel Row --> <!-- PIN + Channel Row -->
<div class="row"> <div class="row">
<div class="col-md-4 mb-3"> <div class="col-md-6 mb-3">
<div class="security-box h-100"> <div class="security-box h-100">
<label class="form-label"><i class="bi bi-123 me-1"></i> PIN</label> <label class="form-label"><i class="bi bi-123 me-1"></i> PIN</label>
<div class="input-group pin-input-container"> <div class="input-group pin-input-container">
<input type="password" name="pin" class="form-control" id="pinInput" placeholder="••••••" maxlength="9" style="max-width: 180px;"> <input type="password" name="pin" class="form-control" id="pinInput" placeholder="••••••" maxlength="9">
<button class="btn btn-outline-secondary" type="button" data-toggle-password="pinInput"> <button class="btn btn-outline-secondary" type="button" data-toggle-password="pinInput">
<i class="bi bi-eye"></i> <i class="bi bi-eye"></i>
</button> </button>
@@ -407,7 +407,7 @@
</div> </div>
</div> </div>
<div class="col-md-8 mb-3"> <div class="col-md-6 mb-3">
<div class="security-box h-100"> <div class="security-box h-100">
<label class="form-label"> <label class="form-label">
<i class="bi bi-broadcast me-1"></i> Channel <i class="bi bi-broadcast me-1"></i> Channel

View File

@@ -0,0 +1,61 @@
{% extends "base.html" %}
{% block title %}Login - Stegasoo{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-5 col-lg-4">
<div class="card">
<div class="card-header text-center">
<i class="bi bi-shield-lock fs-1 d-block mb-2"></i>
<h5 class="mb-0">Login</h5>
</div>
<div class="card-body">
<form method="POST" action="{{ url_for('login') }}">
<div class="mb-3">
<label class="form-label">
<i class="bi bi-person me-1"></i> Username
</label>
<input type="text" name="username" class="form-control"
value="{{ username }}" readonly>
</div>
<div class="mb-4">
<label class="form-label">
<i class="bi bi-key me-1"></i> Password
</label>
<div class="input-group">
<input type="password" name="password" class="form-control"
id="passwordInput" required autofocus>
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('passwordInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
</div>
<button type="submit" class="btn btn-primary w-100">
<i class="bi bi-box-arrow-in-right me-2"></i>Login
</button>
</form>
</div>
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script>
function togglePassword(inputId, btn) {
const input = document.getElementById(inputId);
const icon = btn.querySelector('i');
if (input.type === 'password') {
input.type = 'text';
icon.classList.replace('bi-eye', 'bi-eye-slash');
} else {
input.type = 'password';
icon.classList.replace('bi-eye-slash', 'bi-eye');
}
}
</script>
{% endblock %}

View File

@@ -0,0 +1,94 @@
{% extends "base.html" %}
{% block title %}Setup - Stegasoo{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-6 col-lg-5">
<div class="card">
<div class="card-header text-center">
<i class="bi bi-gear-fill fs-1 d-block mb-2"></i>
<h5 class="mb-0">Initial Setup</h5>
</div>
<div class="card-body">
<p class="text-muted text-center mb-4">
Welcome to Stegasoo! Create your admin account to get started.
</p>
<form method="POST" action="{{ url_for('setup') }}" id="setupForm">
<div class="mb-3">
<label class="form-label">
<i class="bi bi-person me-1"></i> Username
</label>
<input type="text" name="username" class="form-control"
value="admin" required minlength="3">
</div>
<div class="mb-3">
<label class="form-label">
<i class="bi bi-key me-1"></i> Password
</label>
<div class="input-group">
<input type="password" name="password" class="form-control"
id="passwordInput" required minlength="8">
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('passwordInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
<div class="form-text">Minimum 8 characters</div>
</div>
<div class="mb-4">
<label class="form-label">
<i class="bi bi-key-fill me-1"></i> Confirm Password
</label>
<div class="input-group">
<input type="password" name="password_confirm" class="form-control"
id="passwordConfirmInput" required minlength="8">
<button class="btn btn-outline-secondary" type="button"
onclick="togglePassword('passwordConfirmInput', this)">
<i class="bi bi-eye"></i>
</button>
</div>
</div>
<button type="submit" class="btn btn-primary w-100">
<i class="bi bi-check-lg me-2"></i>Create Admin Account
</button>
</form>
</div>
</div>
<div class="alert alert-info mt-4 small">
<i class="bi bi-info-circle me-2"></i>
This is a single-user setup. The admin account has full access to all features.
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script>
function togglePassword(inputId, btn) {
const input = document.getElementById(inputId);
const icon = btn.querySelector('i');
if (input.type === 'password') {
input.type = 'text';
icon.classList.replace('bi-eye', 'bi-eye-slash');
} else {
input.type = 'password';
icon.classList.replace('bi-eye-slash', 'bi-eye');
}
}
document.getElementById('setupForm')?.addEventListener('submit', function(e) {
const pass = document.getElementById('passwordInput').value;
const confirm = document.getElementById('passwordConfirmInput').value;
if (pass !== confirm) {
e.preventDefault();
alert('Passwords do not match');
}
});
</script>
{% endblock %}

View File

@@ -1,3 +0,0 @@
#!/bin/bash
cd ./frontends/web/
python app.py

View File

@@ -1,4 +0,0 @@
#!/bin/bash
sudo docker-compose down
sudo docker-compose build
sudo docker-compose up -d

View File

@@ -1,9 +1,14 @@
""" """
Stegasoo Constants and Configuration (v4.0.1 - Channel Key Support) Stegasoo Constants and Configuration (v4.0.2 - Web UI Authentication)
Central location for all magic numbers, limits, and crypto parameters. Central location for all magic numbers, limits, and crypto parameters.
All version numbers, limits, and configuration values should be defined here. All version numbers, limits, and configuration values should be defined here.
CHANGES in v4.0.2:
- Added Web UI authentication with SQLite3 user storage
- Added optional HTTPS with auto-generated self-signed certificates
- UI improvements for QR preview panels and PIN/channel columns
BREAKING CHANGES in v4.0.0: BREAKING CHANGES in v4.0.0:
- Added channel key support for deployment/group isolation - Added channel key support for deployment/group isolation
- FORMAT_VERSION bumped to 5 (adds flags byte to header) - FORMAT_VERSION bumped to 5 (adds flags byte to header)
@@ -20,7 +25,7 @@ from pathlib import Path
# VERSION # VERSION
# ============================================================================ # ============================================================================
__version__ = "4.0.1" __version__ = "4.0.2"
# ============================================================================ # ============================================================================
# FILE FORMAT # FILE FORMAT

View File

@@ -1,974 +0,0 @@
"""
DCT Domain Steganography Module (v3.2.0)
Embeds data in DCT coefficients with two approaches:
1. PNG output: Scipy-based DCT transform (grayscale or color)
2. JPEG output: jpegio-based coefficient manipulation (if available)
The JPEG approach is the "correct" way to do JPEG steganography because
it directly modifies the already-quantized coefficients without re-encoding.
Changes in v3.0.2:
- jpegio integration for proper JPEG coefficient embedding
- Falls back to warning if jpegio not available for JPEG output
- Maintains backward compatibility with v3.0.1
Changes in v3.2.0:
- Fixed color-mode extraction to properly extract from Y channel
- Added _extract_from_y_channel() for accurate color-mode extraction
- Improved extraction robustness for both grayscale and color modes
Requires: scipy (for PNG mode), optionally jpegio (for JPEG mode)
"""
import io
import struct
import hashlib
from dataclasses import dataclass
from typing import Optional, Literal, Tuple
from enum import Enum
import numpy as np
from PIL import Image
# Check for scipy availability (for PNG/DCT mode)
try:
from scipy.fftpack import dct, idct
HAS_SCIPY = True
except ImportError:
HAS_SCIPY = False
dct = None
idct = None
# Check for jpegio availability (for proper JPEG mode)
try:
import jpegio as jio
HAS_JPEGIO = True
except ImportError:
HAS_JPEGIO = False
jio = None
# ============================================================================
# CONSTANTS
# ============================================================================
# DCT block size (standard 8x8 like JPEG)
BLOCK_SIZE = 8
# Coefficients to use for embedding (mid-frequency, zig-zag order positions)
EMBED_POSITIONS = [
(0, 1), (1, 0), (2, 0), (1, 1), (0, 2), (0, 3), (1, 2), (2, 1), (3, 0),
(4, 0), (3, 1), (2, 2), (1, 3), (0, 4), (0, 5), (1, 4), (2, 3), (3, 2),
(4, 1), (5, 0), (5, 1), (4, 2), (3, 3), (2, 4), (1, 5), (0, 6), (0, 7),
(1, 6), (2, 5), (3, 4), (4, 3), (5, 2), (6, 1), (7, 0),
]
# Use subset of mid-frequency coefficients for better robustness
DEFAULT_EMBED_POSITIONS = EMBED_POSITIONS[4:20] # 16 coefficients per block
# Quantization step for QIM embedding (larger = more robust, more visible)
QUANT_STEP = 25
# Magic bytes for DCT stego identification
DCT_MAGIC = b'DCTS'
# Header size: magic(4) + version(1) + flags(1) + length(4) = 10 bytes
HEADER_SIZE = 10
# Output format options
OUTPUT_FORMAT_PNG = 'png'
OUTPUT_FORMAT_JPEG = 'jpeg'
# JPEG output quality (only for fallback mode, not jpegio)
JPEG_OUTPUT_QUALITY = 95
# jpegio constants for JPEG coefficient embedding
JPEGIO_MAGIC = b'JPGS'
JPEGIO_MIN_COEF_MAGNITUDE = 2
JPEGIO_EMBED_CHANNEL = 0 # Y channel
# Flag bits for header
FLAG_COLOR_MODE = 0x01 # Set if embedded in color mode (Y channel of YCbCr)
# ============================================================================
# DATA CLASSES
# ============================================================================
class DCTOutputFormat(Enum):
"""Output format for DCT stego images."""
PNG = 'png'
JPEG = 'jpeg'
@dataclass
class DCTEmbedStats:
"""Statistics from DCT embedding operation."""
blocks_used: int
blocks_available: int
bits_embedded: int
capacity_bits: int
usage_percent: float
image_width: int
image_height: int
output_format: str
jpeg_native: bool = False # True if used jpegio for proper JPEG embedding
color_mode: str = 'grayscale' # 'color' or 'grayscale' (v3.0.1+)
@dataclass
class DCTCapacityInfo:
"""Capacity information for a carrier image."""
width: int
height: int
blocks_x: int
blocks_y: int
total_blocks: int
bits_per_block: int
total_capacity_bits: int
total_capacity_bytes: int
usable_capacity_bytes: int
# ============================================================================
# AVAILABILITY CHECKS
# ============================================================================
def _check_scipy():
"""Raise ImportError if scipy is not available."""
if not HAS_SCIPY:
raise ImportError(
"DCT steganography requires scipy. "
"Install with: pip install scipy"
)
def has_dct_support() -> bool:
"""Check if DCT steganography is available (scipy installed)."""
return HAS_SCIPY
def has_jpegio_support() -> bool:
"""Check if jpegio is available for proper JPEG coefficient embedding."""
return HAS_JPEGIO
# ============================================================================
# SCIPY DCT HELPERS (for PNG output)
# ============================================================================
def _dct2(block: np.ndarray) -> np.ndarray:
"""Apply 2D DCT to a block."""
return dct(dct(block.T, norm='ortho').T, norm='ortho')
def _idct2(block: np.ndarray) -> np.ndarray:
"""Apply 2D inverse DCT to a block."""
return idct(idct(block.T, norm='ortho').T, norm='ortho')
def _to_grayscale(image_data: bytes) -> np.ndarray:
"""Convert image bytes to grayscale numpy array."""
img = Image.open(io.BytesIO(image_data))
gray = img.convert('L')
return np.array(gray, dtype=np.float64)
def _extract_y_channel(image_data: bytes) -> np.ndarray:
"""
Extract Y (luminance) channel from image for color-mode extraction.
This uses the same YCbCr conversion as embedding to ensure
accurate extraction from color-mode stego images.
Args:
image_data: Image file bytes
Returns:
Y channel as float64 numpy array
"""
img = Image.open(io.BytesIO(image_data))
# Convert to RGB if needed
if img.mode != 'RGB':
img = img.convert('RGB')
rgb_array = np.array(img, dtype=np.float64)
# Extract Y channel using ITU-R BT.601 (same as embedding)
R = rgb_array[:, :, 0]
G = rgb_array[:, :, 1]
B = rgb_array[:, :, 2]
Y = 0.299 * R + 0.587 * G + 0.114 * B
return Y
def _pad_to_blocks(image: np.ndarray) -> Tuple[np.ndarray, Tuple[int, int]]:
"""Pad image dimensions to be divisible by block size."""
h, w = image.shape
new_h = ((h + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE
new_w = ((w + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE
if new_h == h and new_w == w:
return image, (h, w)
padded = np.zeros((new_h, new_w), dtype=image.dtype)
padded[:h, :w] = image
if new_h > h:
padded[h:, :w] = image[h-(new_h-h):h, :w][::-1, :]
if new_w > w:
padded[:h, w:] = image[:h, w-(new_w-w):w][:, ::-1]
if new_h > h and new_w > w:
padded[h:, w:] = image[h-(new_h-h):h, w-(new_w-w):w][::-1, ::-1]
return padded, (h, w)
def _unpad_image(image: np.ndarray, original_size: Tuple[int, int]) -> np.ndarray:
"""Remove padding from image."""
h, w = original_size
return image[:h, :w]
def _embed_bit_in_coeff(coef: float, bit: int, quant_step: int = QUANT_STEP) -> float:
"""Embed a single bit into a DCT coefficient using QIM."""
quantized = round(coef / quant_step)
if (quantized % 2) != bit:
if quantized % 2 == 0 and bit == 1:
quantized += 1 if coef >= quantized * quant_step else -1
elif quantized % 2 == 1 and bit == 0:
quantized += 1 if coef >= quantized * quant_step else -1
return quantized * quant_step
def _extract_bit_from_coeff(coef: float, quant_step: int = QUANT_STEP) -> int:
"""Extract a single bit from a DCT coefficient."""
quantized = round(coef / quant_step)
return quantized % 2
def _generate_block_order(num_blocks: int, seed: bytes) -> list:
"""Generate pseudo-random block order from seed."""
hash_bytes = hashlib.sha256(seed).digest()
rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], 'big'))
order = list(range(num_blocks))
rng.shuffle(order)
return order
def _save_stego_image(image: np.ndarray, output_format: str = OUTPUT_FORMAT_PNG) -> bytes:
"""Save stego image in specified format (grayscale)."""
clipped = np.clip(image, 0, 255).astype(np.uint8)
img = Image.fromarray(clipped, mode='L')
buffer = io.BytesIO()
if output_format == OUTPUT_FORMAT_JPEG:
img.save(buffer, format='JPEG', quality=JPEG_OUTPUT_QUALITY,
subsampling=0, optimize=True)
else:
img.save(buffer, format='PNG', optimize=True)
return buffer.getvalue()
def _save_color_image(rgb_array: np.ndarray, output_format: str = OUTPUT_FORMAT_PNG) -> bytes:
"""Save color RGB image in specified format."""
clipped = np.clip(rgb_array, 0, 255).astype(np.uint8)
img = Image.fromarray(clipped, mode='RGB')
buffer = io.BytesIO()
if output_format == OUTPUT_FORMAT_JPEG:
img.save(buffer, format='JPEG', quality=JPEG_OUTPUT_QUALITY,
subsampling=0, optimize=True)
else:
img.save(buffer, format='PNG', optimize=True)
return buffer.getvalue()
def _rgb_to_ycbcr(rgb: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Convert RGB array to YCbCr components.
Uses ITU-R BT.601 conversion (standard for JPEG).
Args:
rgb: RGB image array (H, W, 3), float64
Returns:
Tuple of (Y, Cb, Cr) arrays
"""
R = rgb[:, :, 0]
G = rgb[:, :, 1]
B = rgb[:, :, 2]
# ITU-R BT.601 conversion
Y = 0.299 * R + 0.587 * G + 0.114 * B
Cb = 128 - 0.168736 * R - 0.331264 * G + 0.5 * B
Cr = 128 + 0.5 * R - 0.418688 * G - 0.081312 * B
return Y, Cb, Cr
def _ycbcr_to_rgb(Y: np.ndarray, Cb: np.ndarray, Cr: np.ndarray) -> np.ndarray:
"""
Convert YCbCr components back to RGB array.
Args:
Y: Luminance channel
Cb: Blue-difference chroma
Cr: Red-difference chroma
Returns:
RGB array (H, W, 3)
"""
R = Y + 1.402 * (Cr - 128)
G = Y - 0.344136 * (Cb - 128) - 0.714136 * (Cr - 128)
B = Y + 1.772 * (Cb - 128)
rgb = np.stack([R, G, B], axis=-1)
return rgb
def _create_header(data_length: int, flags: int = 0) -> bytes:
"""Create DCT stego header."""
version = 1
return struct.pack('>4sBBI', DCT_MAGIC, version, flags, data_length)
def _parse_header(header_bits: list) -> Tuple[int, int, int]:
"""Parse header from extracted bits. Returns (version, flags, data_length)."""
if len(header_bits) < HEADER_SIZE * 8:
raise ValueError("Insufficient header data")
header_bytes = bytes([
sum(header_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8))
for i in range(HEADER_SIZE)
])
magic, version, flags, length = struct.unpack('>4sBBI', header_bytes)
if magic != DCT_MAGIC:
raise ValueError("Invalid DCT stego magic bytes")
return version, flags, length
# ============================================================================
# JPEGIO HELPERS (for proper JPEG output)
# ============================================================================
def _jpegio_bytes_to_file(data: bytes, suffix: str = '.jpg') -> str:
"""Write bytes to temp file for jpegio."""
import tempfile
import os
fd, path = tempfile.mkstemp(suffix=suffix)
try:
os.write(fd, data)
finally:
os.close(fd)
return path
def _jpegio_file_to_bytes(path: str) -> bytes:
"""Read file to bytes and delete it."""
import os
try:
with open(path, 'rb') as f:
return f.read()
finally:
try:
os.unlink(path)
except OSError:
pass
def _jpegio_get_usable_positions(coef_array: np.ndarray) -> list:
"""Get usable coefficient positions for jpegio embedding."""
positions = []
h, w = coef_array.shape
for row in range(h):
for col in range(w):
# Skip DC coefficients
if (row % BLOCK_SIZE == 0) and (col % BLOCK_SIZE == 0):
continue
# Check magnitude
if abs(coef_array[row, col]) >= JPEGIO_MIN_COEF_MAGNITUDE:
positions.append((row, col))
return positions
def _jpegio_generate_order(num_positions: int, seed: bytes) -> list:
"""Generate pseudo-random order for jpegio embedding."""
hash_bytes = hashlib.sha256(seed + b"jpeg_coef_order").digest()
rng = np.random.RandomState(int.from_bytes(hash_bytes[:4], 'big'))
order = list(range(num_positions))
rng.shuffle(order)
return order
def _jpegio_create_header(data_length: int, flags: int = 0) -> bytes:
"""Create header for jpegio embedding."""
return struct.pack('>4sBBI', JPEGIO_MAGIC, 1, flags, data_length)
def _jpegio_parse_header(header_bytes: bytes) -> Tuple[int, int, int]:
"""Parse jpegio header."""
if len(header_bytes) < HEADER_SIZE:
raise ValueError("Insufficient header data")
magic, version, flags, length = struct.unpack('>4sBBI', header_bytes[:HEADER_SIZE])
if magic != JPEGIO_MAGIC:
raise ValueError(f"Invalid JPEG stego magic: {magic}")
return version, flags, length
# ============================================================================
# PUBLIC API
# ============================================================================
def calculate_dct_capacity(image_data: bytes) -> DCTCapacityInfo:
"""
Calculate the DCT embedding capacity of an image.
Args:
image_data: Image file bytes
Returns:
DCTCapacityInfo with capacity details
"""
_check_scipy()
img = Image.open(io.BytesIO(image_data))
width, height = img.size
blocks_x = width // BLOCK_SIZE
blocks_y = height // BLOCK_SIZE
total_blocks = blocks_x * blocks_y
bits_per_block = len(DEFAULT_EMBED_POSITIONS)
total_bits = total_blocks * bits_per_block
total_bytes = total_bits // 8
usable_bytes = max(0, total_bytes - HEADER_SIZE)
return DCTCapacityInfo(
width=width,
height=height,
blocks_x=blocks_x,
blocks_y=blocks_y,
total_blocks=total_blocks,
bits_per_block=bits_per_block,
total_capacity_bits=total_bits,
total_capacity_bytes=total_bytes,
usable_capacity_bytes=usable_bytes
)
def will_fit_dct(data_length: int, image_data: bytes) -> bool:
"""Check if data will fit in the image using DCT embedding."""
capacity = calculate_dct_capacity(image_data)
return data_length <= capacity.usable_capacity_bytes
def estimate_capacity_comparison(image_data: bytes) -> dict:
"""Compare LSB and DCT capacity for an image."""
img = Image.open(io.BytesIO(image_data))
width, height = img.size
pixels = width * height
lsb_bytes = (pixels * 3) // 8
if HAS_SCIPY:
dct_info = calculate_dct_capacity(image_data)
dct_bytes = dct_info.usable_capacity_bytes
else:
blocks = (width // 8) * (height // 8)
dct_bytes = (blocks * 16) // 8 - HEADER_SIZE
return {
'width': width,
'height': height,
'lsb': {
'capacity_bytes': lsb_bytes,
'capacity_kb': lsb_bytes / 1024,
'output': 'PNG/BMP (color)',
},
'dct': {
'capacity_bytes': dct_bytes,
'capacity_kb': dct_bytes / 1024,
'output': 'PNG or JPEG (grayscale)',
'ratio_vs_lsb': (dct_bytes / lsb_bytes * 100) if lsb_bytes > 0 else 0,
'available': HAS_SCIPY,
},
'jpeg_native': {
'available': HAS_JPEGIO,
'note': 'Uses jpegio for proper JPEG coefficient embedding',
}
}
def embed_in_dct(
data: bytes,
carrier_image: bytes,
seed: bytes,
output_format: str = OUTPUT_FORMAT_PNG,
color_mode: str = 'color', # v3.0.1: 'color' or 'grayscale'
) -> Tuple[bytes, DCTEmbedStats]:
"""
Embed data into image using DCT coefficient modification.
For PNG output: Uses scipy DCT transform
For JPEG output: Uses jpegio if available for proper coefficient embedding
Args:
data: Data to embed
carrier_image: Carrier image bytes
seed: Seed for pseudo-random selection
output_format: 'png' (default, lossless) or 'jpeg'
color_mode: 'color' (preserve colors) or 'grayscale' (v3.0.1+)
Returns:
Tuple of (stego_image_bytes, stats)
"""
# Validate output format
if output_format not in (OUTPUT_FORMAT_PNG, OUTPUT_FORMAT_JPEG):
raise ValueError(f"Invalid output format: {output_format}")
# Validate color mode
if color_mode not in ('color', 'grayscale'):
color_mode = 'color' # Default to color
# For JPEG output, try to use jpegio for proper coefficient embedding
# Note: jpegio naturally preserves color (works in YCbCr space)
if output_format == OUTPUT_FORMAT_JPEG:
if HAS_JPEGIO:
return _embed_jpegio(data, carrier_image, seed, color_mode)
else:
# Fall back to scipy + PIL JPEG (WARNING: may not decode properly)
import warnings
warnings.warn(
"jpegio not available. JPEG output may not decode correctly. "
"Install jpegio for proper JPEG steganography support.",
RuntimeWarning
)
# Continue with scipy method but output as JPEG
# PNG output or JPEG fallback: use scipy DCT method
_check_scipy()
return _embed_scipy_dct(data, carrier_image, seed, output_format, color_mode)
def _embed_scipy_dct(
data: bytes,
carrier_image: bytes,
seed: bytes,
output_format: str,
color_mode: str = 'color',
) -> Tuple[bytes, DCTEmbedStats]:
"""Embed using scipy DCT (for PNG output), with color preservation option."""
capacity_info = calculate_dct_capacity(carrier_image)
if len(data) > capacity_info.usable_capacity_bytes:
raise ValueError(
f"Data too large ({len(data)} bytes) for carrier "
f"(capacity: {capacity_info.usable_capacity_bytes} bytes)"
)
# Load image
img = Image.open(io.BytesIO(carrier_image))
width, height = img.size
# Set flags for header
flags = FLAG_COLOR_MODE if color_mode == 'color' else 0
if color_mode == 'color' and img.mode in ('RGB', 'RGBA'):
# Color mode: convert to YCbCr, embed in Y only, preserve Cb/Cr
if img.mode == 'RGBA':
img = img.convert('RGB')
rgb_array = np.array(img, dtype=np.float64)
Y, Cb, Cr = _rgb_to_ycbcr(rgb_array)
# Pad Y channel
Y_padded, original_size = _pad_to_blocks(Y)
# Embed in Y channel (with color flag)
Y_embedded = _embed_in_channel(Y_padded, data, seed, capacity_info, flags)
# Unpad
Y_result = _unpad_image(Y_embedded, original_size)
# Convert back to RGB
result_rgb = _ycbcr_to_rgb(Y_result, Cb, Cr)
# Save as color image
stego_bytes = _save_color_image(result_rgb, output_format)
else:
# Grayscale mode: original behavior
image = _to_grayscale(carrier_image)
padded, original_size = _pad_to_blocks(image)
embedded = _embed_in_channel(padded, data, seed, capacity_info, flags)
result = _unpad_image(embedded, original_size)
stego_bytes = _save_stego_image(result, output_format)
# Calculate stats
header = _create_header(len(data), flags)
payload = header + data
bits = len(payload) * 8
stats = DCTEmbedStats(
blocks_used=(bits + len(DEFAULT_EMBED_POSITIONS) - 1) // len(DEFAULT_EMBED_POSITIONS),
blocks_available=capacity_info.total_blocks,
bits_embedded=bits,
capacity_bits=capacity_info.total_capacity_bits,
usage_percent=(bits / capacity_info.total_capacity_bits) * 100,
image_width=width,
image_height=height,
output_format=output_format,
jpeg_native=False,
color_mode=color_mode,
)
return stego_bytes, stats
def _embed_in_channel(
channel: np.ndarray,
data: bytes,
seed: bytes,
capacity_info: DCTCapacityInfo,
flags: int = 0,
) -> np.ndarray:
"""Embed data in a single channel using DCT."""
header = _create_header(len(data), flags)
payload = header + data
bits = []
for byte in payload:
for i in range(7, -1, -1):
bits.append((byte >> i) & 1)
num_blocks = capacity_info.total_blocks
block_order = _generate_block_order(num_blocks, seed)
h, w = channel.shape
result = channel.copy()
bit_idx = 0
for block_num in block_order:
if bit_idx >= len(bits):
break
by = (block_num // (w // BLOCK_SIZE)) * BLOCK_SIZE
bx = (block_num % (w // BLOCK_SIZE)) * BLOCK_SIZE
block = result[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE].copy()
dct_block = _dct2(block)
for pos in DEFAULT_EMBED_POSITIONS:
if bit_idx >= len(bits):
break
dct_block[pos] = _embed_bit_in_coeff(dct_block[pos], bits[bit_idx])
bit_idx += 1
modified_block = _idct2(dct_block)
result[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE] = modified_block
return result
def _embed_jpegio(
data: bytes,
carrier_image: bytes,
seed: bytes,
color_mode: str = 'color',
) -> Tuple[bytes, DCTEmbedStats]:
"""
Embed using jpegio for proper JPEG coefficient modification.
Note: jpegio naturally preserves color since JPEG stores YCbCr
and we only modify Y channel coefficients.
"""
import tempfile
import os
# Check if carrier is JPEG - if not, convert it
img = Image.open(io.BytesIO(carrier_image))
width, height = img.size
if img.format != 'JPEG':
# Convert to JPEG first
buffer = io.BytesIO()
if img.mode != 'RGB':
img = img.convert('RGB')
img.save(buffer, format='JPEG', quality=95, subsampling=0)
carrier_image = buffer.getvalue()
# Write carrier to temp file
input_path = _jpegio_bytes_to_file(carrier_image, suffix='.jpg')
output_path = tempfile.mktemp(suffix='.jpg')
# Set flags
flags = FLAG_COLOR_MODE if color_mode == 'color' else 0
try:
# Read JPEG with jpegio
jpeg = jio.read(input_path)
# Get Y channel coefficients (channel 0)
coef_array = jpeg.coef_arrays[JPEGIO_EMBED_CHANNEL]
# Find usable positions
all_positions = _jpegio_get_usable_positions(coef_array)
# Generate pseudo-random order
order = _jpegio_generate_order(len(all_positions), seed)
# Create payload with flags
header = _jpegio_create_header(len(data), flags)
payload = header + data
# Convert to bits
bits = []
for byte in payload:
for i in range(7, -1, -1):
bits.append((byte >> i) & 1)
if len(bits) > len(all_positions):
raise ValueError(
f"Payload too large: {len(bits)} bits, "
f"only {len(all_positions)} usable coefficients"
)
# Embed using LSB
coefs_used = 0
for bit_idx, pos_idx in enumerate(order):
if bit_idx >= len(bits):
break
row, col = all_positions[pos_idx]
coef = coef_array[row, col]
# Embed bit in LSB
if (coef & 1) != bits[bit_idx]:
if coef > 0:
coef_array[row, col] = coef - 1 if (coef & 1) else coef + 1
else:
coef_array[row, col] = coef + 1 if (coef & 1) else coef - 1
coefs_used += 1
# Write modified JPEG
jio.write(jpeg, output_path)
# Read back as bytes
with open(output_path, 'rb') as f:
stego_bytes = f.read()
stats = DCTEmbedStats(
blocks_used=coefs_used // 63, # Approximate blocks
blocks_available=len(all_positions) // 63,
bits_embedded=len(bits),
capacity_bits=len(all_positions),
usage_percent=(len(bits) / len(all_positions)) * 100 if all_positions else 0,
image_width=width,
image_height=height,
output_format=OUTPUT_FORMAT_JPEG,
jpeg_native=True,
color_mode=color_mode, # JPEG naturally preserves color
)
return stego_bytes, stats
finally:
for path in [input_path, output_path]:
try:
os.unlink(path)
except OSError:
pass
def extract_from_dct(
stego_image: bytes,
seed: bytes,
) -> bytes:
"""
Extract data from DCT stego image.
Automatically detects whether image uses scipy DCT or jpegio embedding,
and handles both grayscale and color modes.
Args:
stego_image: Stego image bytes
seed: Same seed used for embedding
Returns:
Extracted data bytes
"""
# Check image format
img = Image.open(io.BytesIO(stego_image))
if img.format == 'JPEG' and HAS_JPEGIO:
# Try jpegio extraction first
try:
return _extract_jpegio(stego_image, seed)
except ValueError:
# If jpegio magic not found, fall back to scipy method
pass
# PNG or fallback: use scipy DCT method
_check_scipy()
return _extract_scipy_dct(stego_image, seed)
def _extract_scipy_dct(stego_image: bytes, seed: bytes) -> bytes:
"""
Extract using scipy DCT (for PNG images).
v3.2.0: Now properly handles both grayscale and color modes by
first trying to detect the mode from header flags, then extracting
from the appropriate channel.
"""
# First, try extracting from grayscale to get header and detect mode
# This works because even color-mode images can be converted to grayscale
# and the Y channel ≈ grayscale for extraction purposes
# Try Y channel extraction first (works for both color and grayscale)
img = Image.open(io.BytesIO(stego_image))
if img.mode in ('RGB', 'RGBA'):
# Extract from Y channel (more accurate for color-mode images)
channel = _extract_y_channel(stego_image)
else:
# Grayscale image
channel = _to_grayscale(stego_image)
padded, original_size = _pad_to_blocks(channel)
h, w = padded.shape
blocks_x = w // BLOCK_SIZE
blocks_y = h // BLOCK_SIZE
num_blocks = blocks_x * blocks_y
block_order = _generate_block_order(num_blocks, seed)
all_bits = []
for block_num in block_order:
by = (block_num // blocks_x) * BLOCK_SIZE
bx = (block_num % blocks_x) * BLOCK_SIZE
block = padded[by:by+BLOCK_SIZE, bx:bx+BLOCK_SIZE]
dct_block = _dct2(block)
for pos in DEFAULT_EMBED_POSITIONS:
bit = _extract_bit_from_coeff(dct_block[pos])
all_bits.append(bit)
if len(all_bits) >= HEADER_SIZE * 8:
try:
_, flags, data_length = _parse_header(all_bits[:HEADER_SIZE * 8])
total_needed = (HEADER_SIZE + data_length) * 8
if len(all_bits) >= total_needed:
break
except ValueError:
pass
version, flags, data_length = _parse_header(all_bits)
# Check if color mode flag is set (for informational purposes)
is_color_mode = bool(flags & FLAG_COLOR_MODE)
data_bits = all_bits[HEADER_SIZE * 8:(HEADER_SIZE + data_length) * 8]
data = bytes([
sum(data_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8))
for i in range(data_length)
])
return data
def _extract_jpegio(stego_image: bytes, seed: bytes) -> bytes:
"""Extract using jpegio for JPEG images."""
import os
temp_path = _jpegio_bytes_to_file(stego_image, suffix='.jpg')
try:
jpeg = jio.read(temp_path)
coef_array = jpeg.coef_arrays[JPEGIO_EMBED_CHANNEL]
all_positions = _jpegio_get_usable_positions(coef_array)
order = _jpegio_generate_order(len(all_positions), seed)
# Extract header bits
header_bits = []
for pos_idx in order[:HEADER_SIZE * 8]:
row, col = all_positions[pos_idx]
coef = coef_array[row, col]
header_bits.append(coef & 1)
header_bytes = bytes([
sum(header_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8))
for i in range(HEADER_SIZE)
])
version, flags, data_length = _jpegio_parse_header(header_bytes)
# Extract all needed bits
total_bits_needed = (HEADER_SIZE + data_length) * 8
all_bits = []
for bit_idx, pos_idx in enumerate(order):
if bit_idx >= total_bits_needed:
break
row, col = all_positions[pos_idx]
coef = coef_array[row, col]
all_bits.append(coef & 1)
# Extract data
data_bits = all_bits[HEADER_SIZE * 8:]
data = bytes([
sum(data_bits[i*8:(i+1)*8][j] << (7-j) for j in range(8))
for i in range(data_length)
])
return data
finally:
try:
os.unlink(temp_path)
except OSError:
pass
# ============================================================================
# CONVENIENCE FUNCTIONS
# ============================================================================
def get_output_extension(output_format: str) -> str:
"""Get file extension for output format."""
if output_format == OUTPUT_FORMAT_JPEG:
return '.jpg'
return '.png'
def get_output_mimetype(output_format: str) -> str:
"""Get MIME type for output format."""
if output_format == OUTPUT_FORMAT_JPEG:
return 'image/jpeg'
return 'image/png'

0
src/stegasoo/py.typed Normal file
View File

View File

@@ -1,205 +0,0 @@
#!/usr/bin/env python3
"""
Test that mimics the exact /api/compare-capacity flow.
Run with: python test_compare_capacity_flow.py ./xx_2.jpg
"""
import sys
import io
import gc
import json
import time
print("=" * 60)
print("COMPARE-CAPACITY FLOW TEST")
print("=" * 60)
if len(sys.argv) < 2:
print("Usage: python test_compare_capacity_flow.py <image_path>")
sys.exit(1)
image_path = sys.argv[1]
# Read the file
with open(image_path, 'rb') as f:
carrier_data = f.read()
print(f"Loaded {len(carrier_data)} bytes from {image_path}")
# Import everything like Flask does
print("\n[1] Importing modules...")
from PIL import Image
import numpy as np
try:
import jpegio as jio
HAS_JPEGIO = True
print(f" jpegio: available")
except ImportError:
HAS_JPEGIO = False
print(f" jpegio: NOT available")
try:
from scipy.fft import dct, idct
print(f" scipy.fft: available")
except ImportError:
from scipy.fftpack import dct, idct
print(f" scipy.fftpack: available (fallback)")
print(" Imports complete")
# Simulate the compare_modes function
print("\n[2] Opening image (1st time - for dimensions)...")
img1 = Image.open(io.BytesIO(carrier_data))
width, height = img1.size
print(f" Size: {width}x{height}")
img1.close()
print(" Closed img1")
gc.collect()
print("\n[3] Opening image (2nd time - for LSB capacity)...")
img2 = Image.open(io.BytesIO(carrier_data))
num_pixels = img2.size[0] * img2.size[1]
lsb_bytes = (num_pixels * 3) // 8 - 69
print(f" LSB capacity: {lsb_bytes} bytes")
img2.close()
print(" Closed img2")
gc.collect()
print("\n[4] Opening image (3rd time - for DCT capacity)...")
img3 = Image.open(io.BytesIO(carrier_data))
w, h = img3.size
blocks_x = w // 8
blocks_y = h // 8
total_blocks = blocks_x * blocks_y
dct_bits = total_blocks * 16
dct_bytes = dct_bits // 8 - 10
print(f" DCT capacity: {dct_bytes} bytes ({total_blocks} blocks)")
img3.close()
print(" Closed img3")
gc.collect()
print("\n[5] Building response dict...")
response = {
'success': True,
'width': width,
'height': height,
'lsb': {
'capacity_bytes': lsb_bytes,
'capacity_kb': round(lsb_bytes / 1024, 1),
'output': 'PNG',
},
'dct': {
'capacity_bytes': dct_bytes,
'capacity_kb': round(dct_bytes / 1024, 1),
'output': 'JPEG',
'available': True,
'ratio': round(dct_bytes / lsb_bytes * 100, 1),
}
}
print(f" Response built")
print("\n[6] Serializing to JSON...")
json_str = json.dumps(response)
print(f" JSON length: {len(json_str)} bytes")
print(f" Content: {json_str[:200]}...")
print("\n[7] Simulating Flask response completion...")
# In Flask, after the response is sent, Python may garbage collect
del carrier_data
del response
del json_str
gc.collect()
print(" GC after response simulation")
print("\n[8] Additional cleanup (simulating request end)...")
gc.collect()
gc.collect()
print(" Multiple GC cycles complete")
print("\n[9] Waiting for delayed crash...")
for i in range(3):
time.sleep(1)
print(f" {i+1}s...")
gc.collect()
print("\n" + "=" * 60)
print("TEST PASSED - No crash detected")
print("=" * 60)
# Now test with jpegio if available
if HAS_JPEGIO:
print("\n" + "=" * 60)
print("JPEGIO SPECIFIC TEST")
print("=" * 60)
import tempfile
import os
# Reload image data
with open(image_path, 'rb') as f:
carrier_data = f.read()
print("\n[J1] Checking if image is JPEG...")
img = Image.open(io.BytesIO(carrier_data))
is_jpeg = img.format == 'JPEG'
img.close()
print(f" Is JPEG: {is_jpeg}")
if is_jpeg:
print("\n[J2] Writing to temp file...")
fd, temp_path = tempfile.mkstemp(suffix='.jpg')
os.write(fd, carrier_data)
os.close(fd)
print(f" Temp file: {temp_path}")
print("\n[J3] Reading with jpegio...")
try:
jpeg = jio.read(temp_path)
print(f" jpegio.read() OK")
print("\n[J4] Accessing coefficient arrays...")
coef = jpeg.coef_arrays[0]
print(f" Coef shape: {coef.shape}, dtype: {coef.dtype}")
print("\n[J5] Counting usable positions...")
positions = []
h, w = coef.shape
for row in range(h):
for col in range(w):
if (row % 8 == 0) and (col % 8 == 0):
continue
if abs(coef[row, col]) >= 2:
positions.append((row, col))
print(f" Usable positions: {len(positions)}")
print("\n[J6] Cleaning up jpegio object...")
del coef
del jpeg
gc.collect()
print(" Deleted jpeg object")
print("\n[J7] Removing temp file...")
os.unlink(temp_path)
print(" Temp file removed")
gc.collect()
print("\n[J8] Final GC...")
except Exception as e:
print(f" ERROR: {e}")
import traceback
traceback.print_exc()
print("\n[J9] Waiting for delayed crash...")
for i in range(3):
time.sleep(1)
print(f" {i+1}s...")
gc.collect()
print("\n" + "=" * 60)
print("JPEGIO TEST PASSED - No crash detected")
print("=" * 60)
else:
print(" Skipping jpegio test (not a JPEG)")
print("\n\nAll tests completed successfully!")

View File

@@ -1,231 +0,0 @@
#!/usr/bin/env python3
"""
Standalone DCT crash diagnostic script.
Run this outside of Flask to isolate the issue.
Usage:
python test_dct_crash.py /path/to/your/large_image.jpg
"""
import sys
import gc
import traceback
import io
print("=" * 60)
print("DCT CRASH DIAGNOSTIC TOOL")
print("=" * 60)
# Step 1: Check Python and library versions
print("\n[1] ENVIRONMENT INFO")
print(f"Python: {sys.version}")
try:
import numpy as np
print(f"NumPy: {np.__version__}")
except ImportError as e:
print(f"NumPy: NOT INSTALLED - {e}")
sys.exit(1)
try:
import scipy
print(f"SciPy: {scipy.__version__}")
except ImportError as e:
print(f"SciPy: NOT INSTALLED - {e}")
sys.exit(1)
try:
from PIL import Image
import PIL
print(f"Pillow: {PIL.__version__}")
except ImportError as e:
print(f"Pillow: NOT INSTALLED - {e}")
sys.exit(1)
# Step 2: Check which DCT module we're using
print("\n[2] DCT MODULE CHECK")
try:
from scipy.fft import dct, idct
print("Using: scipy.fft (preferred)")
DCT_MODULE = "scipy.fft"
except ImportError:
try:
from scipy.fftpack import dct, idct
print("Using: scipy.fftpack (legacy)")
DCT_MODULE = "scipy.fftpack"
except ImportError:
print("ERROR: No DCT module available!")
sys.exit(1)
# Step 3: Test basic DCT on small array
print("\n[3] BASIC DCT TEST (8x8 block)")
try:
test_block = np.random.rand(8, 8).astype(np.float64)
# 1D DCT on rows
result = dct(test_block[0, :], norm='ortho')
print(f" 1D DCT: OK (output shape: {result.shape})")
# 1D IDCT
recovered = idct(result, norm='ortho')
error = np.max(np.abs(test_block[0, :] - recovered))
print(f" 1D IDCT: OK (roundtrip error: {error:.2e})")
# 2D via separable
temp = np.zeros_like(test_block)
for i in range(8):
temp[:, i] = dct(test_block[:, i], norm='ortho')
result2d = np.zeros_like(temp)
for i in range(8):
result2d[i, :] = dct(temp[i, :], norm='ortho')
print(f" 2D DCT: OK")
gc.collect()
print(" GC after basic test: OK")
except Exception as e:
print(f" FAILED: {e}")
traceback.print_exc()
# Step 4: Test with larger arrays (stress test)
print("\n[4] STRESS TEST (many 8x8 blocks)")
try:
NUM_BLOCKS = 10000
print(f" Processing {NUM_BLOCKS} blocks...")
for i in range(NUM_BLOCKS):
block = np.random.rand(8, 8).astype(np.float64)
# Forward DCT
temp = np.zeros_like(block)
for j in range(8):
temp[:, j] = dct(block[:, j], norm='ortho')
result = np.zeros_like(temp)
for j in range(8):
result[j, :] = dct(temp[j, :], norm='ortho')
# Inverse DCT
temp2 = np.zeros_like(result)
for j in range(8):
temp2[j, :] = idct(result[j, :], norm='ortho')
recovered = np.zeros_like(temp2)
for j in range(8):
recovered[:, j] = idct(temp2[:, j], norm='ortho')
if i % 1000 == 0:
gc.collect()
print(f" {i}/{NUM_BLOCKS} blocks processed...")
gc.collect()
print(f" Stress test PASSED")
except Exception as e:
print(f" FAILED at block {i}: {e}")
traceback.print_exc()
# Step 5: Test with actual image if provided
if len(sys.argv) > 1:
image_path = sys.argv[1]
print(f"\n[5] IMAGE TEST: {image_path}")
try:
with open(image_path, 'rb') as f:
image_data = f.read()
print(f" File size: {len(image_data) / 1024 / 1024:.2f} MB")
img = Image.open(io.BytesIO(image_data))
width, height = img.size
print(f" Dimensions: {width}x{height}")
print(f" Format: {img.format}")
print(f" Mode: {img.mode}")
# Convert to grayscale float array
gray = img.convert('L')
arr = np.array(gray, dtype=np.float64)
img.close()
gray.close()
print(f" Array shape: {arr.shape}")
print(f" Array dtype: {arr.dtype}")
# Pad to block boundary
BLOCK_SIZE = 8
h, w = arr.shape
new_h = ((h + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE
new_w = ((w + BLOCK_SIZE - 1) // BLOCK_SIZE) * BLOCK_SIZE
if new_h != h or new_w != w:
padded = np.zeros((new_h, new_w), dtype=np.float64)
padded[:h, :w] = arr
arr = padded
print(f" Padded to: {arr.shape}")
blocks_y = arr.shape[0] // BLOCK_SIZE
blocks_x = arr.shape[1] // BLOCK_SIZE
total_blocks = blocks_y * blocks_x
print(f" Total 8x8 blocks: {total_blocks}")
# Process ALL blocks
print(f" Processing all blocks with DCT...")
processed = 0
for by in range(blocks_y):
for bx in range(blocks_x):
y = by * BLOCK_SIZE
x = bx * BLOCK_SIZE
block = arr[y:y+BLOCK_SIZE, x:x+BLOCK_SIZE].copy()
# Forward DCT
temp = np.zeros((8, 8), dtype=np.float64)
for i in range(8):
temp[:, i] = dct(block[:, i], norm='ortho')
dct_block = np.zeros((8, 8), dtype=np.float64)
for i in range(8):
dct_block[i, :] = dct(temp[i, :], norm='ortho')
# Inverse DCT
temp2 = np.zeros((8, 8), dtype=np.float64)
for i in range(8):
temp2[i, :] = idct(dct_block[i, :], norm='ortho')
recovered = np.zeros((8, 8), dtype=np.float64)
for i in range(8):
recovered[:, i] = idct(temp2[:, i], norm='ortho')
processed += 1
# GC after each row of blocks
if by % 50 == 0:
gc.collect()
print(f" Row {by}/{blocks_y} ({processed}/{total_blocks} blocks)")
gc.collect()
print(f" Image DCT test PASSED ({processed} blocks)")
except Exception as e:
print(f" FAILED: {e}")
traceback.print_exc()
else:
print("\n[5] IMAGE TEST: Skipped (no image path provided)")
print(" Usage: python test_dct_crash.py /path/to/image.jpg")
# Step 6: Final cleanup test
print("\n[6] FINAL CLEANUP TEST")
try:
gc.collect()
gc.collect()
gc.collect()
print(" Multiple GC cycles: OK")
except Exception as e:
print(f" FAILED: {e}")
print("\n" + "=" * 60)
print("If this script completes without 'free(): invalid size',")
print("the issue is likely in PIL/jpegio interaction, not scipy DCT.")
print("=" * 60)
# Keep process alive briefly to catch delayed crashes
import time
print("\nWaiting 2 seconds for delayed crashes...")
time.sleep(2)
print("Done - no crash detected!")