golfgame/server/services/admin_service.py
Aaron D. Lee bea85e6b28 Huge v2 uplift, now deployable with real user management and tooling!
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-27 11:32:15 -05:00

1244 lines
39 KiB
Python

"""
Admin service for Golf game.
Provides admin capabilities: user management, game moderation,
system monitoring, audit logging, and invite code management.
"""
import json
import logging
import secrets
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Optional, List
import asyncpg
from models.user import User, UserRole
from stores.user_store import UserStore
logger = logging.getLogger(__name__)
@dataclass
class UserDetails:
"""Extended user info for admin view."""
id: str
username: str
email: Optional[str]
role: str
email_verified: bool
is_banned: bool
ban_reason: Optional[str]
force_password_reset: bool
created_at: datetime
last_login: Optional[datetime]
last_seen_at: Optional[datetime]
is_active: bool
games_played: int
games_won: int
def to_dict(self) -> dict:
return {
"id": self.id,
"username": self.username,
"email": self.email,
"role": self.role,
"email_verified": self.email_verified,
"is_banned": self.is_banned,
"ban_reason": self.ban_reason,
"force_password_reset": self.force_password_reset,
"created_at": self.created_at.isoformat() if self.created_at else None,
"last_login": self.last_login.isoformat() if self.last_login else None,
"last_seen_at": self.last_seen_at.isoformat() if self.last_seen_at else None,
"is_active": self.is_active,
"games_played": self.games_played,
"games_won": self.games_won,
}
@dataclass
class AuditEntry:
"""Admin audit log entry."""
id: int
admin_username: str
admin_user_id: str
action: str
target_type: Optional[str]
target_id: Optional[str]
details: dict
ip_address: Optional[str]
created_at: datetime
def to_dict(self) -> dict:
return {
"id": self.id,
"admin_username": self.admin_username,
"admin_user_id": self.admin_user_id,
"action": self.action,
"target_type": self.target_type,
"target_id": self.target_id,
"details": self.details,
"ip_address": self.ip_address,
"created_at": self.created_at.isoformat() if self.created_at else None,
}
@dataclass
class SystemStats:
"""System statistics snapshot."""
active_users_now: int
active_games_now: int
total_users: int
total_games_completed: int
registrations_today: int
registrations_week: int
games_today: int
events_last_hour: int
top_players: List[dict]
def to_dict(self) -> dict:
return {
"active_users_now": self.active_users_now,
"active_games_now": self.active_games_now,
"total_users": self.total_users,
"total_games_completed": self.total_games_completed,
"registrations_today": self.registrations_today,
"registrations_week": self.registrations_week,
"games_today": self.games_today,
"events_last_hour": self.events_last_hour,
"top_players": self.top_players,
}
@dataclass
class InviteCode:
"""Invite code details."""
code: str
created_by: str
created_by_username: str
created_at: datetime
expires_at: datetime
max_uses: int
use_count: int
is_active: bool
def to_dict(self) -> dict:
return {
"code": self.code,
"created_by": self.created_by,
"created_by_username": self.created_by_username,
"created_at": self.created_at.isoformat() if self.created_at else None,
"expires_at": self.expires_at.isoformat() if self.expires_at else None,
"max_uses": self.max_uses,
"use_count": self.use_count,
"is_active": self.is_active,
"remaining_uses": max(0, self.max_uses - self.use_count),
}
class AdminService:
"""
Admin operations and moderation service.
Provides methods for:
- Audit logging
- User management (search, ban, unban, force password reset)
- Game moderation (view active games, end stuck games)
- System statistics
- Invite code management
- User impersonation (read-only)
"""
def __init__(self, pool: asyncpg.Pool, user_store: UserStore, state_cache=None):
"""
Initialize admin service.
Args:
pool: asyncpg connection pool.
user_store: User persistence store.
state_cache: Optional Redis state cache for game operations.
"""
self.pool = pool
self.user_store = user_store
self.state_cache = state_cache
# -------------------------------------------------------------------------
# Audit Logging
# -------------------------------------------------------------------------
async def audit(
self,
admin_id: str,
action: str,
target_type: Optional[str] = None,
target_id: Optional[str] = None,
details: Optional[dict] = None,
ip_address: Optional[str] = None,
) -> int:
"""
Log an admin action.
Args:
admin_id: Admin user ID.
action: Action name (e.g., "ban_user", "end_game").
target_type: Type of target (e.g., "user", "game", "invite_code").
target_id: ID of the target.
details: Additional details as JSON.
ip_address: Admin's IP address.
Returns:
Audit log entry ID.
"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO admin_audit_log
(admin_user_id, action, target_type, target_id, details, ip_address)
VALUES ($1, $2, $3, $4, $5, $6::inet)
RETURNING id
""",
admin_id,
action,
target_type,
target_id,
json.dumps(details or {}),
ip_address,
)
return row["id"]
async def get_audit_log(
self,
limit: int = 100,
offset: int = 0,
admin_id: Optional[str] = None,
action: Optional[str] = None,
target_type: Optional[str] = None,
) -> List[AuditEntry]:
"""
Get audit log entries with optional filtering.
Args:
limit: Maximum number of entries to return.
offset: Number of entries to skip.
admin_id: Filter by admin user ID.
action: Filter by action name.
target_type: Filter by target type.
Returns:
List of audit entries.
"""
async with self.pool.acquire() as conn:
query = """
SELECT a.id, u.username as admin_username, a.admin_user_id,
a.action, a.target_type, a.target_id, a.details,
a.ip_address, a.created_at
FROM admin_audit_log a
JOIN users_v2 u ON a.admin_user_id = u.id
WHERE 1=1
"""
params = []
param_num = 1
if admin_id:
query += f" AND a.admin_user_id = ${param_num}"
params.append(admin_id)
param_num += 1
if action:
query += f" AND a.action = ${param_num}"
params.append(action)
param_num += 1
if target_type:
query += f" AND a.target_type = ${param_num}"
params.append(target_type)
param_num += 1
query += f" ORDER BY a.created_at DESC LIMIT ${param_num} OFFSET ${param_num + 1}"
params.extend([limit, offset])
rows = await conn.fetch(query, *params)
return [
AuditEntry(
id=row["id"],
admin_username=row["admin_username"],
admin_user_id=str(row["admin_user_id"]),
action=row["action"],
target_type=row["target_type"],
target_id=row["target_id"],
details=json.loads(row["details"]) if row["details"] else {},
ip_address=str(row["ip_address"]) if row["ip_address"] else None,
created_at=row["created_at"].replace(tzinfo=timezone.utc) if row["created_at"] else None,
)
for row in rows
]
# -------------------------------------------------------------------------
# User Management
# -------------------------------------------------------------------------
async def search_users(
self,
query: str = "",
limit: int = 50,
offset: int = 0,
include_banned: bool = True,
include_deleted: bool = False,
) -> List[UserDetails]:
"""
Search users by username or email.
Args:
query: Search query (matches username or email).
limit: Maximum number of results.
offset: Number of results to skip.
include_banned: Include banned users.
include_deleted: Include soft-deleted users.
Returns:
List of user details.
"""
async with self.pool.acquire() as conn:
sql = """
SELECT u.id, u.username, u.email, u.role,
u.email_verified, u.is_banned, u.ban_reason,
u.force_password_reset, u.created_at, u.last_login,
u.last_seen_at, u.is_active,
COALESCE(s.games_played, 0) as games_played,
COALESCE(s.games_won, 0) as games_won
FROM users_v2 u
LEFT JOIN player_stats s ON u.id = s.user_id
WHERE 1=1
"""
params = []
param_num = 1
if query:
sql += f" AND (u.username ILIKE ${param_num} OR u.email ILIKE ${param_num})"
params.append(f"%{query}%")
param_num += 1
if not include_banned:
sql += " AND (u.is_banned = false OR u.is_banned IS NULL)"
if not include_deleted:
sql += " AND u.deleted_at IS NULL"
sql += f" ORDER BY u.created_at DESC LIMIT ${param_num} OFFSET ${param_num + 1}"
params.extend([limit, offset])
rows = await conn.fetch(sql, *params)
return [
UserDetails(
id=str(row["id"]),
username=row["username"],
email=row["email"],
role=row["role"],
email_verified=row["email_verified"],
is_banned=row["is_banned"] or False,
ban_reason=row["ban_reason"],
force_password_reset=row["force_password_reset"] or False,
created_at=row["created_at"].replace(tzinfo=timezone.utc) if row["created_at"] else None,
last_login=row["last_login"].replace(tzinfo=timezone.utc) if row["last_login"] else None,
last_seen_at=row["last_seen_at"].replace(tzinfo=timezone.utc) if row["last_seen_at"] else None,
is_active=row["is_active"],
games_played=row["games_played"] or 0,
games_won=row["games_won"] or 0,
)
for row in rows
]
async def get_user(self, user_id: str) -> Optional[UserDetails]:
"""
Get detailed user info by ID.
Args:
user_id: User UUID.
Returns:
User details, or None if not found.
"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT u.id, u.username, u.email, u.role,
u.email_verified, u.is_banned, u.ban_reason,
u.force_password_reset, u.created_at, u.last_login,
u.last_seen_at, u.is_active,
COALESCE(s.games_played, 0) as games_played,
COALESCE(s.games_won, 0) as games_won
FROM users_v2 u
LEFT JOIN player_stats s ON u.id = s.user_id
WHERE u.id = $1
""",
user_id,
)
if not row:
return None
return UserDetails(
id=str(row["id"]),
username=row["username"],
email=row["email"],
role=row["role"],
email_verified=row["email_verified"],
is_banned=row["is_banned"] or False,
ban_reason=row["ban_reason"],
force_password_reset=row["force_password_reset"] or False,
created_at=row["created_at"].replace(tzinfo=timezone.utc) if row["created_at"] else None,
last_login=row["last_login"].replace(tzinfo=timezone.utc) if row["last_login"] else None,
last_seen_at=row["last_seen_at"].replace(tzinfo=timezone.utc) if row["last_seen_at"] else None,
is_active=row["is_active"],
games_played=row["games_played"] or 0,
games_won=row["games_won"] or 0,
)
async def ban_user(
self,
admin_id: str,
user_id: str,
reason: str,
duration_days: Optional[int] = None,
ip_address: Optional[str] = None,
) -> bool:
"""
Ban a user.
Args:
admin_id: Admin performing the ban.
user_id: User to ban.
reason: Reason for ban.
duration_days: Optional ban duration (None = permanent).
ip_address: Admin's IP address for audit.
Returns:
True if ban was successful.
"""
expires_at = None
if duration_days:
expires_at = datetime.now(timezone.utc) + timedelta(days=duration_days)
async with self.pool.acquire() as conn:
# Check user exists and isn't admin
user = await conn.fetchrow(
"SELECT role FROM users_v2 WHERE id = $1",
user_id,
)
if not user:
return False
if user["role"] == "admin":
logger.warning(f"Admin {admin_id} attempted to ban admin {user_id}")
return False # Can't ban admins
# Create ban record
await conn.execute(
"""
INSERT INTO user_bans (user_id, banned_by, reason, expires_at)
VALUES ($1, $2, $3, $4)
""",
user_id,
admin_id,
reason,
expires_at,
)
# Update user
await conn.execute(
"""
UPDATE users_v2
SET is_banned = true, ban_reason = $1
WHERE id = $2
""",
reason,
user_id,
)
# Revoke all sessions
await conn.execute(
"""
UPDATE user_sessions
SET revoked_at = NOW()
WHERE user_id = $1 AND revoked_at IS NULL
""",
user_id,
)
# Kick from any active games (if state cache available)
if self.state_cache:
await self._kick_from_games(user_id)
# Audit log
await self.audit(
admin_id,
"ban_user",
"user",
user_id,
{"reason": reason, "duration_days": duration_days},
ip_address,
)
logger.info(f"Admin {admin_id} banned user {user_id}: {reason}")
return True
async def unban_user(
self,
admin_id: str,
user_id: str,
ip_address: Optional[str] = None,
) -> bool:
"""
Unban a user.
Args:
admin_id: Admin performing the unban.
user_id: User to unban.
ip_address: Admin's IP address for audit.
Returns:
True if unban was successful.
"""
async with self.pool.acquire() as conn:
# Update ban record
await conn.execute(
"""
UPDATE user_bans
SET unbanned_at = NOW(), unbanned_by = $1
WHERE user_id = $2 AND unbanned_at IS NULL
""",
admin_id,
user_id,
)
# Update user
result = await conn.execute(
"""
UPDATE users_v2
SET is_banned = false, ban_reason = NULL
WHERE id = $1
""",
user_id,
)
if result == "UPDATE 0":
return False
await self.audit(
admin_id,
"unban_user",
"user",
user_id,
ip_address=ip_address,
)
logger.info(f"Admin {admin_id} unbanned user {user_id}")
return True
async def force_password_reset(
self,
admin_id: str,
user_id: str,
ip_address: Optional[str] = None,
) -> bool:
"""
Force user to reset password on next login.
Args:
admin_id: Admin performing the action.
user_id: User to force reset.
ip_address: Admin's IP address for audit.
Returns:
True if successful.
"""
async with self.pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE users_v2
SET force_password_reset = true
WHERE id = $1
""",
user_id,
)
if result == "UPDATE 0":
return False
# Revoke all sessions to force re-login
await conn.execute(
"""
UPDATE user_sessions
SET revoked_at = NOW()
WHERE user_id = $1 AND revoked_at IS NULL
""",
user_id,
)
await self.audit(
admin_id,
"force_password_reset",
"user",
user_id,
ip_address=ip_address,
)
logger.info(f"Admin {admin_id} forced password reset for user {user_id}")
return True
async def change_user_role(
self,
admin_id: str,
user_id: str,
new_role: str,
ip_address: Optional[str] = None,
) -> bool:
"""
Change user role.
Args:
admin_id: Admin performing the action.
user_id: User to modify.
new_role: New role ("user" or "admin").
ip_address: Admin's IP address for audit.
Returns:
True if successful.
"""
if new_role not in ("user", "admin"):
return False
async with self.pool.acquire() as conn:
# Get old role for audit
old = await conn.fetchrow(
"SELECT role FROM users_v2 WHERE id = $1",
user_id,
)
if not old:
return False
await conn.execute(
"UPDATE users_v2 SET role = $1 WHERE id = $2",
new_role,
user_id,
)
await self.audit(
admin_id,
"change_role",
"user",
user_id,
{"old_role": old["role"], "new_role": new_role},
ip_address,
)
logger.info(f"Admin {admin_id} changed role for user {user_id}: {old['role']} -> {new_role}")
return True
async def get_user_ban_history(self, user_id: str) -> List[dict]:
"""
Get ban history for a user.
Args:
user_id: User UUID.
Returns:
List of ban records.
"""
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT b.id, b.reason, b.banned_at, b.expires_at,
b.unbanned_at, u1.username as banned_by_username,
u2.username as unbanned_by_username
FROM user_bans b
JOIN users_v2 u1 ON b.banned_by = u1.id
LEFT JOIN users_v2 u2 ON b.unbanned_by = u2.id
WHERE b.user_id = $1
ORDER BY b.banned_at DESC
""",
user_id,
)
return [
{
"id": row["id"],
"reason": row["reason"],
"banned_at": row["banned_at"].isoformat() if row["banned_at"] else None,
"expires_at": row["expires_at"].isoformat() if row["expires_at"] else None,
"unbanned_at": row["unbanned_at"].isoformat() if row["unbanned_at"] else None,
"banned_by": row["banned_by_username"],
"unbanned_by": row["unbanned_by_username"],
}
for row in rows
]
# -------------------------------------------------------------------------
# User Impersonation (Read-Only)
# -------------------------------------------------------------------------
async def impersonate_user(
self,
admin_id: str,
user_id: str,
ip_address: Optional[str] = None,
) -> Optional[User]:
"""
Get user object for read-only impersonation.
This allows an admin to view the app as another user would see it,
without being able to make changes. The returned User object should
only be used for read operations.
Args:
admin_id: Admin performing impersonation.
user_id: User to impersonate.
ip_address: Admin's IP address for audit.
Returns:
User object for impersonation, or None if user not found.
"""
user = await self.user_store.get_user_by_id(user_id)
if user:
await self.audit(
admin_id,
"impersonate_user",
"user",
user_id,
ip_address=ip_address,
)
logger.info(f"Admin {admin_id} started impersonating user {user_id}")
return user
# -------------------------------------------------------------------------
# Game Moderation
# -------------------------------------------------------------------------
async def get_active_games(self) -> List[dict]:
"""
Get all active games.
Returns:
List of active game info dicts.
"""
if not self.state_cache:
# Fall back to database
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, room_code, status, created_at, started_at,
num_players, num_rounds, host_id
FROM games_v2
WHERE status = 'active'
ORDER BY created_at DESC
"""
)
return [
{
"game_id": str(row["id"]),
"room_code": row["room_code"],
"status": row["status"],
"created_at": row["created_at"].isoformat() if row["created_at"] else None,
"started_at": row["started_at"].isoformat() if row["started_at"] else None,
"player_count": row["num_players"] or 0,
"num_rounds": row["num_rounds"] or 0,
"host_id": row["host_id"],
}
for row in rows
]
# Use Redis state cache for live data
rooms = await self.state_cache.get_active_rooms()
games = []
for room_code in rooms:
room = await self.state_cache.get_room(room_code)
if room:
game_id = room.get("game_id")
state = None
if game_id:
state = await self.state_cache.get_game_state(game_id)
players = await self.state_cache.get_room_players(room_code)
games.append({
"room_code": room_code,
"game_id": game_id,
"status": room.get("status"),
"created_at": room.get("created_at"),
"player_count": len(players),
"phase": state.get("phase") if state else None,
"current_round": state.get("current_round") if state else None,
})
return games
async def get_game_details(
self,
admin_id: str,
game_id: str,
ip_address: Optional[str] = None,
) -> Optional[dict]:
"""
Get full game state (admin view).
Args:
admin_id: Admin requesting the view.
game_id: Game UUID.
ip_address: Admin's IP address for audit.
Returns:
Full game state dict, or None if not found.
"""
state = None
if self.state_cache:
state = await self.state_cache.get_game_state(game_id)
if not state:
# Try database
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT id, room_code, status, created_at, started_at,
completed_at, num_players, num_rounds, options,
winner_id, host_id, player_ids
FROM games_v2
WHERE id = $1
""",
game_id,
)
if row:
state = {
"game_id": str(row["id"]),
"room_code": row["room_code"],
"status": row["status"],
"created_at": row["created_at"].isoformat() if row["created_at"] else None,
"started_at": row["started_at"].isoformat() if row["started_at"] else None,
"completed_at": row["completed_at"].isoformat() if row["completed_at"] else None,
"num_players": row["num_players"],
"num_rounds": row["num_rounds"],
"options": json.loads(row["options"]) if row["options"] else {},
"winner_id": row["winner_id"],
"host_id": row["host_id"],
"player_ids": row["player_ids"] or [],
}
if state:
await self.audit(
admin_id,
"view_game",
"game",
game_id,
ip_address=ip_address,
)
return state
async def end_game(
self,
admin_id: str,
game_id: str,
reason: str,
ip_address: Optional[str] = None,
) -> bool:
"""
Force-end a stuck game.
Args:
admin_id: Admin ending the game.
game_id: Game UUID.
reason: Reason for ending.
ip_address: Admin's IP address for audit.
Returns:
True if game was ended.
"""
room_code = None
if self.state_cache:
state = await self.state_cache.get_game_state(game_id)
if state:
room_code = state.get("room_code")
# Mark game as ended in cache
state["phase"] = "game_over"
state["admin_ended"] = True
state["admin_end_reason"] = reason
await self.state_cache.save_game_state(game_id, state)
# Update database
async with self.pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE games_v2
SET status = 'abandoned', completed_at = NOW()
WHERE id = $1 AND status = 'active'
""",
game_id,
)
if result == "UPDATE 0" and not room_code:
return False
# Get room code if we didn't have it
if not room_code:
row = await conn.fetchrow(
"SELECT room_code FROM games_v2 WHERE id = $1",
game_id,
)
if row:
room_code = row["room_code"]
await self.audit(
admin_id,
"end_game",
"game",
game_id,
{"reason": reason, "room_code": room_code},
ip_address,
)
logger.info(f"Admin {admin_id} ended game {game_id}: {reason}")
return True
async def _kick_from_games(self, user_id: str) -> None:
"""
Kick user from any active games.
Args:
user_id: User to kick.
"""
if not self.state_cache:
return
player_room = await self.state_cache.get_player_room(user_id)
if player_room:
await self.state_cache.remove_player_from_room(player_room, user_id)
logger.info(f"Kicked user {user_id} from room {player_room}")
# -------------------------------------------------------------------------
# System Stats
# -------------------------------------------------------------------------
async def get_system_stats(self) -> SystemStats:
"""
Get current system statistics.
Returns:
SystemStats snapshot.
"""
# Active games from Redis
active_games = 0
if self.state_cache:
active_rooms = await self.state_cache.get_active_rooms()
active_games = len(active_rooms)
async with self.pool.acquire() as conn:
# Total users
total_users = await conn.fetchval(
"SELECT COUNT(*) FROM users_v2 WHERE deleted_at IS NULL"
)
# Total completed games
total_games = await conn.fetchval(
"SELECT COUNT(*) FROM games_v2 WHERE status = 'completed'"
)
# Registrations today
reg_today = await conn.fetchval(
"""
SELECT COUNT(*) FROM users_v2
WHERE created_at >= CURRENT_DATE
AND deleted_at IS NULL
"""
)
# Registrations this week
reg_week = await conn.fetchval(
"""
SELECT COUNT(*) FROM users_v2
WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
AND deleted_at IS NULL
"""
)
# Games today
games_today = await conn.fetchval(
"SELECT COUNT(*) FROM games_v2 WHERE created_at >= CURRENT_DATE"
)
# Events last hour
events_hour = await conn.fetchval(
"""
SELECT COUNT(*) FROM events
WHERE created_at >= NOW() - INTERVAL '1 hour'
"""
) or 0
# Top players (by wins)
top_players = await conn.fetch(
"""
SELECT u.username, s.games_won, s.games_played
FROM player_stats s
JOIN users_v2 u ON s.user_id = u.id
WHERE s.games_played >= 3
ORDER BY s.games_won DESC
LIMIT 10
"""
)
# Active users (sessions used in last hour)
active_users = await conn.fetchval(
"""
SELECT COUNT(DISTINCT user_id)
FROM user_sessions
WHERE last_used_at >= NOW() - INTERVAL '1 hour'
AND revoked_at IS NULL
"""
)
return SystemStats(
active_users_now=active_users or 0,
active_games_now=active_games,
total_users=total_users or 0,
total_games_completed=total_games or 0,
registrations_today=reg_today or 0,
registrations_week=reg_week or 0,
games_today=games_today or 0,
events_last_hour=events_hour,
top_players=[
{
"username": p["username"],
"games_won": p["games_won"],
"games_played": p["games_played"],
}
for p in top_players
],
)
# -------------------------------------------------------------------------
# Invite Codes
# -------------------------------------------------------------------------
async def create_invite_code(
self,
admin_id: str,
max_uses: int = 1,
expires_days: int = 7,
ip_address: Optional[str] = None,
) -> str:
"""
Create a new invite code.
Args:
admin_id: Admin creating the code.
max_uses: Maximum number of uses.
expires_days: Days until expiration.
ip_address: Admin's IP address for audit.
Returns:
The generated invite code.
"""
code = secrets.token_urlsafe(6).upper()[:8]
expires_at = datetime.now(timezone.utc) + timedelta(days=expires_days)
async with self.pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO invite_codes (code, created_by, expires_at, max_uses)
VALUES ($1, $2, $3, $4)
""",
code,
admin_id,
expires_at,
max_uses,
)
await self.audit(
admin_id,
"create_invite",
"invite_code",
code,
{"max_uses": max_uses, "expires_days": expires_days},
ip_address,
)
logger.info(f"Admin {admin_id} created invite code {code}")
return code
async def get_invite_codes(self, include_expired: bool = False) -> List[InviteCode]:
"""
Get all invite codes.
Args:
include_expired: Include expired/inactive codes.
Returns:
List of invite codes.
"""
async with self.pool.acquire() as conn:
query = """
SELECT c.code, c.created_by, c.created_at, c.expires_at,
c.max_uses, c.use_count, c.is_active,
u.username as created_by_username
FROM invite_codes c
JOIN users_v2 u ON c.created_by = u.id
"""
if not include_expired:
query += " WHERE c.expires_at > NOW() AND c.is_active = true"
query += " ORDER BY c.created_at DESC"
rows = await conn.fetch(query)
return [
InviteCode(
code=row["code"],
created_by=str(row["created_by"]),
created_by_username=row["created_by_username"],
created_at=row["created_at"].replace(tzinfo=timezone.utc) if row["created_at"] else None,
expires_at=row["expires_at"].replace(tzinfo=timezone.utc) if row["expires_at"] else None,
max_uses=row["max_uses"],
use_count=row["use_count"],
is_active=row["is_active"],
)
for row in rows
]
async def revoke_invite_code(
self,
admin_id: str,
code: str,
ip_address: Optional[str] = None,
) -> bool:
"""
Revoke an invite code.
Args:
admin_id: Admin revoking the code.
code: Code to revoke.
ip_address: Admin's IP address for audit.
Returns:
True if code was revoked.
"""
async with self.pool.acquire() as conn:
result = await conn.execute(
"UPDATE invite_codes SET is_active = false WHERE code = $1",
code,
)
if result == "UPDATE 0":
return False
await self.audit(
admin_id,
"revoke_invite",
"invite_code",
code,
ip_address=ip_address,
)
logger.info(f"Admin {admin_id} revoked invite code {code}")
return True
async def validate_invite_code(self, code: str) -> bool:
"""
Check if an invite code is valid.
Args:
code: Code to validate.
Returns:
True if code is valid and has remaining uses.
"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT max_uses, use_count, expires_at, is_active
FROM invite_codes
WHERE code = $1
""",
code,
)
if not row:
return False
if not row["is_active"]:
return False
if row["expires_at"] and row["expires_at"] < datetime.now(timezone.utc):
return False
if row["use_count"] >= row["max_uses"]:
return False
return True
async def use_invite_code(self, code: str) -> bool:
"""
Use an invite code (increment use count).
Args:
code: Code to use.
Returns:
True if code was successfully used.
"""
if not await self.validate_invite_code(code):
return False
async with self.pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE invite_codes
SET use_count = use_count + 1
WHERE code = $1 AND is_active = true
AND use_count < max_uses
AND expires_at > NOW()
""",
code,
)
return result != "UPDATE 0"
# Global admin service instance
_admin_service: Optional[AdminService] = None
async def get_admin_service(
pool: asyncpg.Pool,
user_store: UserStore,
state_cache=None,
) -> AdminService:
"""
Get or create the global admin service instance.
Args:
pool: asyncpg connection pool.
user_store: User persistence store.
state_cache: Optional Redis state cache.
Returns:
AdminService instance.
"""
global _admin_service
if _admin_service is None:
_admin_service = AdminService(pool, user_store, state_cache)
return _admin_service
def close_admin_service() -> None:
"""Close the global admin service."""
global _admin_service
_admin_service = None