1244 lines
39 KiB
Python
1244 lines
39 KiB
Python
"""
|
|
Admin service for Golf game.
|
|
|
|
Provides admin capabilities: user management, game moderation,
|
|
system monitoring, audit logging, and invite code management.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import secrets
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Optional, List
|
|
|
|
import asyncpg
|
|
|
|
from models.user import User, UserRole
|
|
from stores.user_store import UserStore
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class UserDetails:
|
|
"""Extended user info for admin view."""
|
|
id: str
|
|
username: str
|
|
email: Optional[str]
|
|
role: str
|
|
email_verified: bool
|
|
is_banned: bool
|
|
ban_reason: Optional[str]
|
|
force_password_reset: bool
|
|
created_at: datetime
|
|
last_login: Optional[datetime]
|
|
last_seen_at: Optional[datetime]
|
|
is_active: bool
|
|
games_played: int
|
|
games_won: int
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"id": self.id,
|
|
"username": self.username,
|
|
"email": self.email,
|
|
"role": self.role,
|
|
"email_verified": self.email_verified,
|
|
"is_banned": self.is_banned,
|
|
"ban_reason": self.ban_reason,
|
|
"force_password_reset": self.force_password_reset,
|
|
"created_at": self.created_at.isoformat() if self.created_at else None,
|
|
"last_login": self.last_login.isoformat() if self.last_login else None,
|
|
"last_seen_at": self.last_seen_at.isoformat() if self.last_seen_at else None,
|
|
"is_active": self.is_active,
|
|
"games_played": self.games_played,
|
|
"games_won": self.games_won,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class AuditEntry:
|
|
"""Admin audit log entry."""
|
|
id: int
|
|
admin_username: str
|
|
admin_user_id: str
|
|
action: str
|
|
target_type: Optional[str]
|
|
target_id: Optional[str]
|
|
details: dict
|
|
ip_address: Optional[str]
|
|
created_at: datetime
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"id": self.id,
|
|
"admin_username": self.admin_username,
|
|
"admin_user_id": self.admin_user_id,
|
|
"action": self.action,
|
|
"target_type": self.target_type,
|
|
"target_id": self.target_id,
|
|
"details": self.details,
|
|
"ip_address": self.ip_address,
|
|
"created_at": self.created_at.isoformat() if self.created_at else None,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class SystemStats:
|
|
"""System statistics snapshot."""
|
|
active_users_now: int
|
|
active_games_now: int
|
|
total_users: int
|
|
total_games_completed: int
|
|
registrations_today: int
|
|
registrations_week: int
|
|
games_today: int
|
|
events_last_hour: int
|
|
top_players: List[dict]
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"active_users_now": self.active_users_now,
|
|
"active_games_now": self.active_games_now,
|
|
"total_users": self.total_users,
|
|
"total_games_completed": self.total_games_completed,
|
|
"registrations_today": self.registrations_today,
|
|
"registrations_week": self.registrations_week,
|
|
"games_today": self.games_today,
|
|
"events_last_hour": self.events_last_hour,
|
|
"top_players": self.top_players,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class InviteCode:
|
|
"""Invite code details."""
|
|
code: str
|
|
created_by: str
|
|
created_by_username: str
|
|
created_at: datetime
|
|
expires_at: datetime
|
|
max_uses: int
|
|
use_count: int
|
|
is_active: bool
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"code": self.code,
|
|
"created_by": self.created_by,
|
|
"created_by_username": self.created_by_username,
|
|
"created_at": self.created_at.isoformat() if self.created_at else None,
|
|
"expires_at": self.expires_at.isoformat() if self.expires_at else None,
|
|
"max_uses": self.max_uses,
|
|
"use_count": self.use_count,
|
|
"is_active": self.is_active,
|
|
"remaining_uses": max(0, self.max_uses - self.use_count),
|
|
}
|
|
|
|
|
|
class AdminService:
|
|
"""
|
|
Admin operations and moderation service.
|
|
|
|
Provides methods for:
|
|
- Audit logging
|
|
- User management (search, ban, unban, force password reset)
|
|
- Game moderation (view active games, end stuck games)
|
|
- System statistics
|
|
- Invite code management
|
|
- User impersonation (read-only)
|
|
"""
|
|
|
|
def __init__(self, pool: asyncpg.Pool, user_store: UserStore, state_cache=None):
|
|
"""
|
|
Initialize admin service.
|
|
|
|
Args:
|
|
pool: asyncpg connection pool.
|
|
user_store: User persistence store.
|
|
state_cache: Optional Redis state cache for game operations.
|
|
"""
|
|
self.pool = pool
|
|
self.user_store = user_store
|
|
self.state_cache = state_cache
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Audit Logging
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def audit(
|
|
self,
|
|
admin_id: str,
|
|
action: str,
|
|
target_type: Optional[str] = None,
|
|
target_id: Optional[str] = None,
|
|
details: Optional[dict] = None,
|
|
ip_address: Optional[str] = None,
|
|
) -> int:
|
|
"""
|
|
Log an admin action.
|
|
|
|
Args:
|
|
admin_id: Admin user ID.
|
|
action: Action name (e.g., "ban_user", "end_game").
|
|
target_type: Type of target (e.g., "user", "game", "invite_code").
|
|
target_id: ID of the target.
|
|
details: Additional details as JSON.
|
|
ip_address: Admin's IP address.
|
|
|
|
Returns:
|
|
Audit log entry ID.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
row = await conn.fetchrow(
|
|
"""
|
|
INSERT INTO admin_audit_log
|
|
(admin_user_id, action, target_type, target_id, details, ip_address)
|
|
VALUES ($1, $2, $3, $4, $5, $6::inet)
|
|
RETURNING id
|
|
""",
|
|
admin_id,
|
|
action,
|
|
target_type,
|
|
target_id,
|
|
json.dumps(details or {}),
|
|
ip_address,
|
|
)
|
|
return row["id"]
|
|
|
|
async def get_audit_log(
|
|
self,
|
|
limit: int = 100,
|
|
offset: int = 0,
|
|
admin_id: Optional[str] = None,
|
|
action: Optional[str] = None,
|
|
target_type: Optional[str] = None,
|
|
) -> List[AuditEntry]:
|
|
"""
|
|
Get audit log entries with optional filtering.
|
|
|
|
Args:
|
|
limit: Maximum number of entries to return.
|
|
offset: Number of entries to skip.
|
|
admin_id: Filter by admin user ID.
|
|
action: Filter by action name.
|
|
target_type: Filter by target type.
|
|
|
|
Returns:
|
|
List of audit entries.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
query = """
|
|
SELECT a.id, u.username as admin_username, a.admin_user_id,
|
|
a.action, a.target_type, a.target_id, a.details,
|
|
a.ip_address, a.created_at
|
|
FROM admin_audit_log a
|
|
JOIN users_v2 u ON a.admin_user_id = u.id
|
|
WHERE 1=1
|
|
"""
|
|
params = []
|
|
param_num = 1
|
|
|
|
if admin_id:
|
|
query += f" AND a.admin_user_id = ${param_num}"
|
|
params.append(admin_id)
|
|
param_num += 1
|
|
|
|
if action:
|
|
query += f" AND a.action = ${param_num}"
|
|
params.append(action)
|
|
param_num += 1
|
|
|
|
if target_type:
|
|
query += f" AND a.target_type = ${param_num}"
|
|
params.append(target_type)
|
|
param_num += 1
|
|
|
|
query += f" ORDER BY a.created_at DESC LIMIT ${param_num} OFFSET ${param_num + 1}"
|
|
params.extend([limit, offset])
|
|
|
|
rows = await conn.fetch(query, *params)
|
|
|
|
return [
|
|
AuditEntry(
|
|
id=row["id"],
|
|
admin_username=row["admin_username"],
|
|
admin_user_id=str(row["admin_user_id"]),
|
|
action=row["action"],
|
|
target_type=row["target_type"],
|
|
target_id=row["target_id"],
|
|
details=json.loads(row["details"]) if row["details"] else {},
|
|
ip_address=str(row["ip_address"]) if row["ip_address"] else None,
|
|
created_at=row["created_at"].replace(tzinfo=timezone.utc) if row["created_at"] else None,
|
|
)
|
|
for row in rows
|
|
]
|
|
|
|
# -------------------------------------------------------------------------
|
|
# User Management
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def search_users(
|
|
self,
|
|
query: str = "",
|
|
limit: int = 50,
|
|
offset: int = 0,
|
|
include_banned: bool = True,
|
|
include_deleted: bool = False,
|
|
) -> List[UserDetails]:
|
|
"""
|
|
Search users by username or email.
|
|
|
|
Args:
|
|
query: Search query (matches username or email).
|
|
limit: Maximum number of results.
|
|
offset: Number of results to skip.
|
|
include_banned: Include banned users.
|
|
include_deleted: Include soft-deleted users.
|
|
|
|
Returns:
|
|
List of user details.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
sql = """
|
|
SELECT u.id, u.username, u.email, u.role,
|
|
u.email_verified, u.is_banned, u.ban_reason,
|
|
u.force_password_reset, u.created_at, u.last_login,
|
|
u.last_seen_at, u.is_active,
|
|
COALESCE(s.games_played, 0) as games_played,
|
|
COALESCE(s.games_won, 0) as games_won
|
|
FROM users_v2 u
|
|
LEFT JOIN player_stats s ON u.id = s.user_id
|
|
WHERE 1=1
|
|
"""
|
|
params = []
|
|
param_num = 1
|
|
|
|
if query:
|
|
sql += f" AND (u.username ILIKE ${param_num} OR u.email ILIKE ${param_num})"
|
|
params.append(f"%{query}%")
|
|
param_num += 1
|
|
|
|
if not include_banned:
|
|
sql += " AND (u.is_banned = false OR u.is_banned IS NULL)"
|
|
|
|
if not include_deleted:
|
|
sql += " AND u.deleted_at IS NULL"
|
|
|
|
sql += f" ORDER BY u.created_at DESC LIMIT ${param_num} OFFSET ${param_num + 1}"
|
|
params.extend([limit, offset])
|
|
|
|
rows = await conn.fetch(sql, *params)
|
|
|
|
return [
|
|
UserDetails(
|
|
id=str(row["id"]),
|
|
username=row["username"],
|
|
email=row["email"],
|
|
role=row["role"],
|
|
email_verified=row["email_verified"],
|
|
is_banned=row["is_banned"] or False,
|
|
ban_reason=row["ban_reason"],
|
|
force_password_reset=row["force_password_reset"] or False,
|
|
created_at=row["created_at"].replace(tzinfo=timezone.utc) if row["created_at"] else None,
|
|
last_login=row["last_login"].replace(tzinfo=timezone.utc) if row["last_login"] else None,
|
|
last_seen_at=row["last_seen_at"].replace(tzinfo=timezone.utc) if row["last_seen_at"] else None,
|
|
is_active=row["is_active"],
|
|
games_played=row["games_played"] or 0,
|
|
games_won=row["games_won"] or 0,
|
|
)
|
|
for row in rows
|
|
]
|
|
|
|
async def get_user(self, user_id: str) -> Optional[UserDetails]:
|
|
"""
|
|
Get detailed user info by ID.
|
|
|
|
Args:
|
|
user_id: User UUID.
|
|
|
|
Returns:
|
|
User details, or None if not found.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
row = await conn.fetchrow(
|
|
"""
|
|
SELECT u.id, u.username, u.email, u.role,
|
|
u.email_verified, u.is_banned, u.ban_reason,
|
|
u.force_password_reset, u.created_at, u.last_login,
|
|
u.last_seen_at, u.is_active,
|
|
COALESCE(s.games_played, 0) as games_played,
|
|
COALESCE(s.games_won, 0) as games_won
|
|
FROM users_v2 u
|
|
LEFT JOIN player_stats s ON u.id = s.user_id
|
|
WHERE u.id = $1
|
|
""",
|
|
user_id,
|
|
)
|
|
|
|
if not row:
|
|
return None
|
|
|
|
return UserDetails(
|
|
id=str(row["id"]),
|
|
username=row["username"],
|
|
email=row["email"],
|
|
role=row["role"],
|
|
email_verified=row["email_verified"],
|
|
is_banned=row["is_banned"] or False,
|
|
ban_reason=row["ban_reason"],
|
|
force_password_reset=row["force_password_reset"] or False,
|
|
created_at=row["created_at"].replace(tzinfo=timezone.utc) if row["created_at"] else None,
|
|
last_login=row["last_login"].replace(tzinfo=timezone.utc) if row["last_login"] else None,
|
|
last_seen_at=row["last_seen_at"].replace(tzinfo=timezone.utc) if row["last_seen_at"] else None,
|
|
is_active=row["is_active"],
|
|
games_played=row["games_played"] or 0,
|
|
games_won=row["games_won"] or 0,
|
|
)
|
|
|
|
async def ban_user(
|
|
self,
|
|
admin_id: str,
|
|
user_id: str,
|
|
reason: str,
|
|
duration_days: Optional[int] = None,
|
|
ip_address: Optional[str] = None,
|
|
) -> bool:
|
|
"""
|
|
Ban a user.
|
|
|
|
Args:
|
|
admin_id: Admin performing the ban.
|
|
user_id: User to ban.
|
|
reason: Reason for ban.
|
|
duration_days: Optional ban duration (None = permanent).
|
|
ip_address: Admin's IP address for audit.
|
|
|
|
Returns:
|
|
True if ban was successful.
|
|
"""
|
|
expires_at = None
|
|
if duration_days:
|
|
expires_at = datetime.now(timezone.utc) + timedelta(days=duration_days)
|
|
|
|
async with self.pool.acquire() as conn:
|
|
# Check user exists and isn't admin
|
|
user = await conn.fetchrow(
|
|
"SELECT role FROM users_v2 WHERE id = $1",
|
|
user_id,
|
|
)
|
|
|
|
if not user:
|
|
return False
|
|
|
|
if user["role"] == "admin":
|
|
logger.warning(f"Admin {admin_id} attempted to ban admin {user_id}")
|
|
return False # Can't ban admins
|
|
|
|
# Create ban record
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO user_bans (user_id, banned_by, reason, expires_at)
|
|
VALUES ($1, $2, $3, $4)
|
|
""",
|
|
user_id,
|
|
admin_id,
|
|
reason,
|
|
expires_at,
|
|
)
|
|
|
|
# Update user
|
|
await conn.execute(
|
|
"""
|
|
UPDATE users_v2
|
|
SET is_banned = true, ban_reason = $1
|
|
WHERE id = $2
|
|
""",
|
|
reason,
|
|
user_id,
|
|
)
|
|
|
|
# Revoke all sessions
|
|
await conn.execute(
|
|
"""
|
|
UPDATE user_sessions
|
|
SET revoked_at = NOW()
|
|
WHERE user_id = $1 AND revoked_at IS NULL
|
|
""",
|
|
user_id,
|
|
)
|
|
|
|
# Kick from any active games (if state cache available)
|
|
if self.state_cache:
|
|
await self._kick_from_games(user_id)
|
|
|
|
# Audit log
|
|
await self.audit(
|
|
admin_id,
|
|
"ban_user",
|
|
"user",
|
|
user_id,
|
|
{"reason": reason, "duration_days": duration_days},
|
|
ip_address,
|
|
)
|
|
|
|
logger.info(f"Admin {admin_id} banned user {user_id}: {reason}")
|
|
return True
|
|
|
|
async def unban_user(
|
|
self,
|
|
admin_id: str,
|
|
user_id: str,
|
|
ip_address: Optional[str] = None,
|
|
) -> bool:
|
|
"""
|
|
Unban a user.
|
|
|
|
Args:
|
|
admin_id: Admin performing the unban.
|
|
user_id: User to unban.
|
|
ip_address: Admin's IP address for audit.
|
|
|
|
Returns:
|
|
True if unban was successful.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
# Update ban record
|
|
await conn.execute(
|
|
"""
|
|
UPDATE user_bans
|
|
SET unbanned_at = NOW(), unbanned_by = $1
|
|
WHERE user_id = $2 AND unbanned_at IS NULL
|
|
""",
|
|
admin_id,
|
|
user_id,
|
|
)
|
|
|
|
# Update user
|
|
result = await conn.execute(
|
|
"""
|
|
UPDATE users_v2
|
|
SET is_banned = false, ban_reason = NULL
|
|
WHERE id = $1
|
|
""",
|
|
user_id,
|
|
)
|
|
|
|
if result == "UPDATE 0":
|
|
return False
|
|
|
|
await self.audit(
|
|
admin_id,
|
|
"unban_user",
|
|
"user",
|
|
user_id,
|
|
ip_address=ip_address,
|
|
)
|
|
|
|
logger.info(f"Admin {admin_id} unbanned user {user_id}")
|
|
return True
|
|
|
|
async def force_password_reset(
|
|
self,
|
|
admin_id: str,
|
|
user_id: str,
|
|
ip_address: Optional[str] = None,
|
|
) -> bool:
|
|
"""
|
|
Force user to reset password on next login.
|
|
|
|
Args:
|
|
admin_id: Admin performing the action.
|
|
user_id: User to force reset.
|
|
ip_address: Admin's IP address for audit.
|
|
|
|
Returns:
|
|
True if successful.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
result = await conn.execute(
|
|
"""
|
|
UPDATE users_v2
|
|
SET force_password_reset = true
|
|
WHERE id = $1
|
|
""",
|
|
user_id,
|
|
)
|
|
|
|
if result == "UPDATE 0":
|
|
return False
|
|
|
|
# Revoke all sessions to force re-login
|
|
await conn.execute(
|
|
"""
|
|
UPDATE user_sessions
|
|
SET revoked_at = NOW()
|
|
WHERE user_id = $1 AND revoked_at IS NULL
|
|
""",
|
|
user_id,
|
|
)
|
|
|
|
await self.audit(
|
|
admin_id,
|
|
"force_password_reset",
|
|
"user",
|
|
user_id,
|
|
ip_address=ip_address,
|
|
)
|
|
|
|
logger.info(f"Admin {admin_id} forced password reset for user {user_id}")
|
|
return True
|
|
|
|
async def change_user_role(
|
|
self,
|
|
admin_id: str,
|
|
user_id: str,
|
|
new_role: str,
|
|
ip_address: Optional[str] = None,
|
|
) -> bool:
|
|
"""
|
|
Change user role.
|
|
|
|
Args:
|
|
admin_id: Admin performing the action.
|
|
user_id: User to modify.
|
|
new_role: New role ("user" or "admin").
|
|
ip_address: Admin's IP address for audit.
|
|
|
|
Returns:
|
|
True if successful.
|
|
"""
|
|
if new_role not in ("user", "admin"):
|
|
return False
|
|
|
|
async with self.pool.acquire() as conn:
|
|
# Get old role for audit
|
|
old = await conn.fetchrow(
|
|
"SELECT role FROM users_v2 WHERE id = $1",
|
|
user_id,
|
|
)
|
|
|
|
if not old:
|
|
return False
|
|
|
|
await conn.execute(
|
|
"UPDATE users_v2 SET role = $1 WHERE id = $2",
|
|
new_role,
|
|
user_id,
|
|
)
|
|
|
|
await self.audit(
|
|
admin_id,
|
|
"change_role",
|
|
"user",
|
|
user_id,
|
|
{"old_role": old["role"], "new_role": new_role},
|
|
ip_address,
|
|
)
|
|
|
|
logger.info(f"Admin {admin_id} changed role for user {user_id}: {old['role']} -> {new_role}")
|
|
return True
|
|
|
|
async def get_user_ban_history(self, user_id: str) -> List[dict]:
|
|
"""
|
|
Get ban history for a user.
|
|
|
|
Args:
|
|
user_id: User UUID.
|
|
|
|
Returns:
|
|
List of ban records.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
rows = await conn.fetch(
|
|
"""
|
|
SELECT b.id, b.reason, b.banned_at, b.expires_at,
|
|
b.unbanned_at, u1.username as banned_by_username,
|
|
u2.username as unbanned_by_username
|
|
FROM user_bans b
|
|
JOIN users_v2 u1 ON b.banned_by = u1.id
|
|
LEFT JOIN users_v2 u2 ON b.unbanned_by = u2.id
|
|
WHERE b.user_id = $1
|
|
ORDER BY b.banned_at DESC
|
|
""",
|
|
user_id,
|
|
)
|
|
|
|
return [
|
|
{
|
|
"id": row["id"],
|
|
"reason": row["reason"],
|
|
"banned_at": row["banned_at"].isoformat() if row["banned_at"] else None,
|
|
"expires_at": row["expires_at"].isoformat() if row["expires_at"] else None,
|
|
"unbanned_at": row["unbanned_at"].isoformat() if row["unbanned_at"] else None,
|
|
"banned_by": row["banned_by_username"],
|
|
"unbanned_by": row["unbanned_by_username"],
|
|
}
|
|
for row in rows
|
|
]
|
|
|
|
# -------------------------------------------------------------------------
|
|
# User Impersonation (Read-Only)
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def impersonate_user(
|
|
self,
|
|
admin_id: str,
|
|
user_id: str,
|
|
ip_address: Optional[str] = None,
|
|
) -> Optional[User]:
|
|
"""
|
|
Get user object for read-only impersonation.
|
|
|
|
This allows an admin to view the app as another user would see it,
|
|
without being able to make changes. The returned User object should
|
|
only be used for read operations.
|
|
|
|
Args:
|
|
admin_id: Admin performing impersonation.
|
|
user_id: User to impersonate.
|
|
ip_address: Admin's IP address for audit.
|
|
|
|
Returns:
|
|
User object for impersonation, or None if user not found.
|
|
"""
|
|
user = await self.user_store.get_user_by_id(user_id)
|
|
|
|
if user:
|
|
await self.audit(
|
|
admin_id,
|
|
"impersonate_user",
|
|
"user",
|
|
user_id,
|
|
ip_address=ip_address,
|
|
)
|
|
logger.info(f"Admin {admin_id} started impersonating user {user_id}")
|
|
|
|
return user
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Game Moderation
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def get_active_games(self) -> List[dict]:
|
|
"""
|
|
Get all active games.
|
|
|
|
Returns:
|
|
List of active game info dicts.
|
|
"""
|
|
if not self.state_cache:
|
|
# Fall back to database
|
|
async with self.pool.acquire() as conn:
|
|
rows = await conn.fetch(
|
|
"""
|
|
SELECT id, room_code, status, created_at, started_at,
|
|
num_players, num_rounds, host_id
|
|
FROM games_v2
|
|
WHERE status = 'active'
|
|
ORDER BY created_at DESC
|
|
"""
|
|
)
|
|
return [
|
|
{
|
|
"game_id": str(row["id"]),
|
|
"room_code": row["room_code"],
|
|
"status": row["status"],
|
|
"created_at": row["created_at"].isoformat() if row["created_at"] else None,
|
|
"started_at": row["started_at"].isoformat() if row["started_at"] else None,
|
|
"player_count": row["num_players"] or 0,
|
|
"num_rounds": row["num_rounds"] or 0,
|
|
"host_id": row["host_id"],
|
|
}
|
|
for row in rows
|
|
]
|
|
|
|
# Use Redis state cache for live data
|
|
rooms = await self.state_cache.get_active_rooms()
|
|
games = []
|
|
|
|
for room_code in rooms:
|
|
room = await self.state_cache.get_room(room_code)
|
|
if room:
|
|
game_id = room.get("game_id")
|
|
state = None
|
|
if game_id:
|
|
state = await self.state_cache.get_game_state(game_id)
|
|
|
|
players = await self.state_cache.get_room_players(room_code)
|
|
|
|
games.append({
|
|
"room_code": room_code,
|
|
"game_id": game_id,
|
|
"status": room.get("status"),
|
|
"created_at": room.get("created_at"),
|
|
"player_count": len(players),
|
|
"phase": state.get("phase") if state else None,
|
|
"current_round": state.get("current_round") if state else None,
|
|
})
|
|
|
|
return games
|
|
|
|
async def get_game_details(
|
|
self,
|
|
admin_id: str,
|
|
game_id: str,
|
|
ip_address: Optional[str] = None,
|
|
) -> Optional[dict]:
|
|
"""
|
|
Get full game state (admin view).
|
|
|
|
Args:
|
|
admin_id: Admin requesting the view.
|
|
game_id: Game UUID.
|
|
ip_address: Admin's IP address for audit.
|
|
|
|
Returns:
|
|
Full game state dict, or None if not found.
|
|
"""
|
|
state = None
|
|
|
|
if self.state_cache:
|
|
state = await self.state_cache.get_game_state(game_id)
|
|
|
|
if not state:
|
|
# Try database
|
|
async with self.pool.acquire() as conn:
|
|
row = await conn.fetchrow(
|
|
"""
|
|
SELECT id, room_code, status, created_at, started_at,
|
|
completed_at, num_players, num_rounds, options,
|
|
winner_id, host_id, player_ids
|
|
FROM games_v2
|
|
WHERE id = $1
|
|
""",
|
|
game_id,
|
|
)
|
|
if row:
|
|
state = {
|
|
"game_id": str(row["id"]),
|
|
"room_code": row["room_code"],
|
|
"status": row["status"],
|
|
"created_at": row["created_at"].isoformat() if row["created_at"] else None,
|
|
"started_at": row["started_at"].isoformat() if row["started_at"] else None,
|
|
"completed_at": row["completed_at"].isoformat() if row["completed_at"] else None,
|
|
"num_players": row["num_players"],
|
|
"num_rounds": row["num_rounds"],
|
|
"options": json.loads(row["options"]) if row["options"] else {},
|
|
"winner_id": row["winner_id"],
|
|
"host_id": row["host_id"],
|
|
"player_ids": row["player_ids"] or [],
|
|
}
|
|
|
|
if state:
|
|
await self.audit(
|
|
admin_id,
|
|
"view_game",
|
|
"game",
|
|
game_id,
|
|
ip_address=ip_address,
|
|
)
|
|
|
|
return state
|
|
|
|
async def end_game(
|
|
self,
|
|
admin_id: str,
|
|
game_id: str,
|
|
reason: str,
|
|
ip_address: Optional[str] = None,
|
|
) -> bool:
|
|
"""
|
|
Force-end a stuck game.
|
|
|
|
Args:
|
|
admin_id: Admin ending the game.
|
|
game_id: Game UUID.
|
|
reason: Reason for ending.
|
|
ip_address: Admin's IP address for audit.
|
|
|
|
Returns:
|
|
True if game was ended.
|
|
"""
|
|
room_code = None
|
|
|
|
if self.state_cache:
|
|
state = await self.state_cache.get_game_state(game_id)
|
|
if state:
|
|
room_code = state.get("room_code")
|
|
# Mark game as ended in cache
|
|
state["phase"] = "game_over"
|
|
state["admin_ended"] = True
|
|
state["admin_end_reason"] = reason
|
|
await self.state_cache.save_game_state(game_id, state)
|
|
|
|
# Update database
|
|
async with self.pool.acquire() as conn:
|
|
result = await conn.execute(
|
|
"""
|
|
UPDATE games_v2
|
|
SET status = 'abandoned', completed_at = NOW()
|
|
WHERE id = $1 AND status = 'active'
|
|
""",
|
|
game_id,
|
|
)
|
|
|
|
if result == "UPDATE 0" and not room_code:
|
|
return False
|
|
|
|
# Get room code if we didn't have it
|
|
if not room_code:
|
|
row = await conn.fetchrow(
|
|
"SELECT room_code FROM games_v2 WHERE id = $1",
|
|
game_id,
|
|
)
|
|
if row:
|
|
room_code = row["room_code"]
|
|
|
|
await self.audit(
|
|
admin_id,
|
|
"end_game",
|
|
"game",
|
|
game_id,
|
|
{"reason": reason, "room_code": room_code},
|
|
ip_address,
|
|
)
|
|
|
|
logger.info(f"Admin {admin_id} ended game {game_id}: {reason}")
|
|
return True
|
|
|
|
async def _kick_from_games(self, user_id: str) -> None:
|
|
"""
|
|
Kick user from any active games.
|
|
|
|
Args:
|
|
user_id: User to kick.
|
|
"""
|
|
if not self.state_cache:
|
|
return
|
|
|
|
player_room = await self.state_cache.get_player_room(user_id)
|
|
if player_room:
|
|
await self.state_cache.remove_player_from_room(player_room, user_id)
|
|
logger.info(f"Kicked user {user_id} from room {player_room}")
|
|
|
|
# -------------------------------------------------------------------------
|
|
# System Stats
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def get_system_stats(self) -> SystemStats:
|
|
"""
|
|
Get current system statistics.
|
|
|
|
Returns:
|
|
SystemStats snapshot.
|
|
"""
|
|
# Active games from Redis
|
|
active_games = 0
|
|
if self.state_cache:
|
|
active_rooms = await self.state_cache.get_active_rooms()
|
|
active_games = len(active_rooms)
|
|
|
|
async with self.pool.acquire() as conn:
|
|
# Total users
|
|
total_users = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM users_v2 WHERE deleted_at IS NULL"
|
|
)
|
|
|
|
# Total completed games
|
|
total_games = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM games_v2 WHERE status = 'completed'"
|
|
)
|
|
|
|
# Registrations today
|
|
reg_today = await conn.fetchval(
|
|
"""
|
|
SELECT COUNT(*) FROM users_v2
|
|
WHERE created_at >= CURRENT_DATE
|
|
AND deleted_at IS NULL
|
|
"""
|
|
)
|
|
|
|
# Registrations this week
|
|
reg_week = await conn.fetchval(
|
|
"""
|
|
SELECT COUNT(*) FROM users_v2
|
|
WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
|
|
AND deleted_at IS NULL
|
|
"""
|
|
)
|
|
|
|
# Games today
|
|
games_today = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM games_v2 WHERE created_at >= CURRENT_DATE"
|
|
)
|
|
|
|
# Events last hour
|
|
events_hour = await conn.fetchval(
|
|
"""
|
|
SELECT COUNT(*) FROM events
|
|
WHERE created_at >= NOW() - INTERVAL '1 hour'
|
|
"""
|
|
) or 0
|
|
|
|
# Top players (by wins)
|
|
top_players = await conn.fetch(
|
|
"""
|
|
SELECT u.username, s.games_won, s.games_played
|
|
FROM player_stats s
|
|
JOIN users_v2 u ON s.user_id = u.id
|
|
WHERE s.games_played >= 3
|
|
ORDER BY s.games_won DESC
|
|
LIMIT 10
|
|
"""
|
|
)
|
|
|
|
# Active users (sessions used in last hour)
|
|
active_users = await conn.fetchval(
|
|
"""
|
|
SELECT COUNT(DISTINCT user_id)
|
|
FROM user_sessions
|
|
WHERE last_used_at >= NOW() - INTERVAL '1 hour'
|
|
AND revoked_at IS NULL
|
|
"""
|
|
)
|
|
|
|
return SystemStats(
|
|
active_users_now=active_users or 0,
|
|
active_games_now=active_games,
|
|
total_users=total_users or 0,
|
|
total_games_completed=total_games or 0,
|
|
registrations_today=reg_today or 0,
|
|
registrations_week=reg_week or 0,
|
|
games_today=games_today or 0,
|
|
events_last_hour=events_hour,
|
|
top_players=[
|
|
{
|
|
"username": p["username"],
|
|
"games_won": p["games_won"],
|
|
"games_played": p["games_played"],
|
|
}
|
|
for p in top_players
|
|
],
|
|
)
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Invite Codes
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def create_invite_code(
|
|
self,
|
|
admin_id: str,
|
|
max_uses: int = 1,
|
|
expires_days: int = 7,
|
|
ip_address: Optional[str] = None,
|
|
) -> str:
|
|
"""
|
|
Create a new invite code.
|
|
|
|
Args:
|
|
admin_id: Admin creating the code.
|
|
max_uses: Maximum number of uses.
|
|
expires_days: Days until expiration.
|
|
ip_address: Admin's IP address for audit.
|
|
|
|
Returns:
|
|
The generated invite code.
|
|
"""
|
|
code = secrets.token_urlsafe(6).upper()[:8]
|
|
expires_at = datetime.now(timezone.utc) + timedelta(days=expires_days)
|
|
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO invite_codes (code, created_by, expires_at, max_uses)
|
|
VALUES ($1, $2, $3, $4)
|
|
""",
|
|
code,
|
|
admin_id,
|
|
expires_at,
|
|
max_uses,
|
|
)
|
|
|
|
await self.audit(
|
|
admin_id,
|
|
"create_invite",
|
|
"invite_code",
|
|
code,
|
|
{"max_uses": max_uses, "expires_days": expires_days},
|
|
ip_address,
|
|
)
|
|
|
|
logger.info(f"Admin {admin_id} created invite code {code}")
|
|
return code
|
|
|
|
async def get_invite_codes(self, include_expired: bool = False) -> List[InviteCode]:
|
|
"""
|
|
Get all invite codes.
|
|
|
|
Args:
|
|
include_expired: Include expired/inactive codes.
|
|
|
|
Returns:
|
|
List of invite codes.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
query = """
|
|
SELECT c.code, c.created_by, c.created_at, c.expires_at,
|
|
c.max_uses, c.use_count, c.is_active,
|
|
u.username as created_by_username
|
|
FROM invite_codes c
|
|
JOIN users_v2 u ON c.created_by = u.id
|
|
"""
|
|
if not include_expired:
|
|
query += " WHERE c.expires_at > NOW() AND c.is_active = true"
|
|
|
|
query += " ORDER BY c.created_at DESC"
|
|
|
|
rows = await conn.fetch(query)
|
|
|
|
return [
|
|
InviteCode(
|
|
code=row["code"],
|
|
created_by=str(row["created_by"]),
|
|
created_by_username=row["created_by_username"],
|
|
created_at=row["created_at"].replace(tzinfo=timezone.utc) if row["created_at"] else None,
|
|
expires_at=row["expires_at"].replace(tzinfo=timezone.utc) if row["expires_at"] else None,
|
|
max_uses=row["max_uses"],
|
|
use_count=row["use_count"],
|
|
is_active=row["is_active"],
|
|
)
|
|
for row in rows
|
|
]
|
|
|
|
async def revoke_invite_code(
|
|
self,
|
|
admin_id: str,
|
|
code: str,
|
|
ip_address: Optional[str] = None,
|
|
) -> bool:
|
|
"""
|
|
Revoke an invite code.
|
|
|
|
Args:
|
|
admin_id: Admin revoking the code.
|
|
code: Code to revoke.
|
|
ip_address: Admin's IP address for audit.
|
|
|
|
Returns:
|
|
True if code was revoked.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
result = await conn.execute(
|
|
"UPDATE invite_codes SET is_active = false WHERE code = $1",
|
|
code,
|
|
)
|
|
|
|
if result == "UPDATE 0":
|
|
return False
|
|
|
|
await self.audit(
|
|
admin_id,
|
|
"revoke_invite",
|
|
"invite_code",
|
|
code,
|
|
ip_address=ip_address,
|
|
)
|
|
|
|
logger.info(f"Admin {admin_id} revoked invite code {code}")
|
|
return True
|
|
|
|
async def validate_invite_code(self, code: str) -> bool:
|
|
"""
|
|
Check if an invite code is valid.
|
|
|
|
Args:
|
|
code: Code to validate.
|
|
|
|
Returns:
|
|
True if code is valid and has remaining uses.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
row = await conn.fetchrow(
|
|
"""
|
|
SELECT max_uses, use_count, expires_at, is_active
|
|
FROM invite_codes
|
|
WHERE code = $1
|
|
""",
|
|
code,
|
|
)
|
|
|
|
if not row:
|
|
return False
|
|
|
|
if not row["is_active"]:
|
|
return False
|
|
|
|
if row["expires_at"] and row["expires_at"] < datetime.now(timezone.utc):
|
|
return False
|
|
|
|
if row["use_count"] >= row["max_uses"]:
|
|
return False
|
|
|
|
return True
|
|
|
|
async def use_invite_code(self, code: str) -> bool:
|
|
"""
|
|
Use an invite code (increment use count).
|
|
|
|
Args:
|
|
code: Code to use.
|
|
|
|
Returns:
|
|
True if code was successfully used.
|
|
"""
|
|
if not await self.validate_invite_code(code):
|
|
return False
|
|
|
|
async with self.pool.acquire() as conn:
|
|
result = await conn.execute(
|
|
"""
|
|
UPDATE invite_codes
|
|
SET use_count = use_count + 1
|
|
WHERE code = $1 AND is_active = true
|
|
AND use_count < max_uses
|
|
AND expires_at > NOW()
|
|
""",
|
|
code,
|
|
)
|
|
|
|
return result != "UPDATE 0"
|
|
|
|
|
|
# Global admin service instance
|
|
_admin_service: Optional[AdminService] = None
|
|
|
|
|
|
async def get_admin_service(
|
|
pool: asyncpg.Pool,
|
|
user_store: UserStore,
|
|
state_cache=None,
|
|
) -> AdminService:
|
|
"""
|
|
Get or create the global admin service instance.
|
|
|
|
Args:
|
|
pool: asyncpg connection pool.
|
|
user_store: User persistence store.
|
|
state_cache: Optional Redis state cache.
|
|
|
|
Returns:
|
|
AdminService instance.
|
|
"""
|
|
global _admin_service
|
|
if _admin_service is None:
|
|
_admin_service = AdminService(pool, user_store, state_cache)
|
|
return _admin_service
|
|
|
|
|
|
def close_admin_service() -> None:
|
|
"""Close the global admin service."""
|
|
global _admin_service
|
|
_admin_service = None
|