141 lines
4.3 KiB
Python
141 lines
4.3 KiB
Python
"""
|
|
Security headers middleware for FastAPI.

Adds security headers to all responses:
- Content-Security-Policy (CSP)
- X-Content-Type-Options
- X-Frame-Options
- X-XSS-Protection
- Referrer-Policy
- Permissions-Policy

Added only in the "production" environment when the request arrived over HTTPS:
- Strict-Transport-Security (HSTS)
|
|
"""
|
|
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from fastapi import Request
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
from starlette.responses import Response
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """
    HTTP middleware for adding security headers.

    Configurable CSP and HSTS settings for different environments.

    Attributes:
        environment: Environment name; the value "production" enables HSTS
            and restricts WebSocket sources in the CSP to wss://.
        csp_report_uri: URI appended to the CSP as a report-uri directive,
            or None to omit the directive.
        allowed_hosts: Extra hosts added to connect-src as wss:// sources
            (used in the production branch only).
    """

    # Characters accepted in a Host header value before it is echoed into the
    # CSP: letters, digits, dot, hyphen, colon (port) and brackets (IPv6
    # literal). Whitespace, ';' and ',' are excluded on purpose because they
    # separate sources and directives inside a policy string.
    _SAFE_HOST_CHARS = frozenset(
        "abcdefghijklmnopqrstuvwxyz"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "0123456789"
        ".-:[]"
    )

    def __init__(
        self,
        app,
        environment: str = "development",
        csp_report_uri: Optional[str] = None,
        allowed_hosts: Optional[list[str]] = None,
    ):
        """
        Initialize security headers middleware.

        Args:
            app: FastAPI application.
            environment: Environment name (production enables HSTS).
            csp_report_uri: Optional URI for CSP violation reports.
            allowed_hosts: List of allowed hosts for connect-src directive.
        """
        super().__init__(app)
        self.environment = environment
        self.csp_report_uri = csp_report_uri
        self.allowed_hosts = allowed_hosts or []

    async def dispatch(self, request: Request, call_next) -> Response:
        """
        Add security headers to response.

        Args:
            request: Incoming HTTP request.
            call_next: Next middleware/handler in chain.

        Returns:
            HTTP response with security headers.
        """
        response = await call_next(request)

        # Basic security headers
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # Permissions Policy (formerly Feature-Policy)
        response.headers["Permissions-Policy"] = (
            "geolocation=(), "
            "microphone=(), "
            "camera=(), "
            "payment=(), "
            "usb=()"
        )

        # Content Security Policy
        response.headers["Content-Security-Policy"] = self._build_csp(request)

        # HSTS (only in production, and only if the request came via HTTPS)
        if self.environment == "production" and self._is_https(request):
            response.headers["Strict-Transport-Security"] = (
                "max-age=31536000; includeSubDomains; preload"
            )

        return response

    @staticmethod
    def _is_https(request: Request) -> bool:
        """
        Tell whether the request was made over HTTPS.

        Checks the X-Forwarded-Proto header first, then the scheme of the
        request URL. The header is matched case-insensitively and, when it
        holds a comma-separated list, only its first entry is considered.

        Args:
            request: Incoming HTTP request.

        Returns:
            True if either check reports "https".
        """
        forwarded_proto = request.headers.get("X-Forwarded-Proto", "")
        client_proto = forwarded_proto.split(",")[0].strip().lower()
        return client_proto == "https" or request.url.scheme == "https"

    @classmethod
    def _is_safe_host(cls, host: str) -> bool:
        """
        Check that a Host value can be embedded in the CSP unchanged.

        Args:
            host: Raw Host header value.

        Returns:
            True if host is non-empty and consists solely of characters in
            _SAFE_HOST_CHARS.
        """
        return bool(host) and all(ch in cls._SAFE_HOST_CHARS for ch in host)

    def _build_csp(self, request: Request) -> str:
        """
        Build Content-Security-Policy header.

        The request's Host header is used to add WebSocket sources to
        connect-src. Because that header is supplied by the client, it is
        only used when it passes _is_safe_host; otherwise the host-specific
        sources are left out and a warning is logged.

        Args:
            request: HTTP request (for host-specific directives).

        Returns:
            CSP header value string.
        """
        # Get the host for WebSocket connections
        host: Optional[str] = request.headers.get("host", "localhost")
        if not self._is_safe_host(host):
            # An unvalidated value could smuggle extra sources or directives
            # into the policy, so drop it rather than echo it back.
            logger.warning(
                "Ignoring malformed Host header when building CSP: %r", host
            )
            host = None

        # Build connect-src directive
        connect_sources = ["'self'"]

        # Add WebSocket URLs
        if self.environment == "production":
            if host is not None:
                connect_sources.append(f"wss://{host}")
            connect_sources.extend(
                f"wss://{allowed_host}" for allowed_host in self.allowed_hosts
            )
        else:
            # Development - allow ws:// and wss://
            if host is not None:
                connect_sources.append(f"ws://{host}")
                connect_sources.append(f"wss://{host}")
            connect_sources.append("ws://localhost:*")
            connect_sources.append("wss://localhost:*")

        directives = [
            "default-src 'self'",
            "script-src 'self'",
            # Allow inline styles for UI (cards, animations)
            "style-src 'self' 'unsafe-inline'",
            "img-src 'self' data:",
            "font-src 'self'",
            f"connect-src {' '.join(connect_sources)}",
            "frame-ancestors 'none'",
            "base-uri 'self'",
            "form-action 'self'",
        ]

        # Add report-uri if configured
        if self.csp_report_uri:
            directives.append(f"report-uri {self.csp_report_uri}")

        return "; ".join(directives)
|