v3.0.0: V3 features, server refactoring, and documentation overhaul

- Extract WebSocket handlers from main.py into handlers.py
- Add V3 feature docs (dealer rotation, dealing animation, round end reveal,
  column pair celebration, final turn urgency, opponent thinking, score tallying,
  card hover/selection, knock early drama, column pair indicator, swap animation
  improvements, draw source distinction, card value tooltips, active rules context,
  discard pile history, realistic card sounds)
- Add V3 refactoring docs (ai.py, main.py/game.py, misc improvements)
- Add installation guide with Docker, systemd, and nginx setup
- Add helper scripts (install.sh, dev-server.sh, docker-build.sh)
- Add animation flow diagrams documentation
- Add test files for handlers, rooms, and V3 features
- Add e2e test specs for V3 features
- Update README with complete project structure and current tech stack
- Update CLAUDE.md with full architecture tree and server layer descriptions
- Update .env.example to reflect PostgreSQL (remove SQLite references)
- Update .gitignore to exclude virtualenv files, .claude/, and .db files
- Remove tracked virtualenv files (bin/, lib64, pyvenv.cfg)
- Remove obsolete game_log.py (SQLite) and games.db

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
adlee-was-taken
2026-02-14 10:03:45 -05:00
parent 13ab5b9017
commit 9fc6b83bba
60 changed files with 11791 additions and 1639 deletions

View File

@@ -1,239 +0,0 @@
"""SQLite game logging for AI decision analysis."""
import json
import sqlite3
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
from dataclasses import asdict
from game import Card, Player, Game, GameOptions
class GameLogger:
"""Logs game state and AI decisions to SQLite for post-game analysis."""
def __init__(self, db_path: str = "games.db"):
self.db_path = Path(db_path)
self._init_db()
def _init_db(self):
"""Initialize database schema."""
with sqlite3.connect(self.db_path) as conn:
conn.executescript("""
-- Games table
CREATE TABLE IF NOT EXISTS games (
id TEXT PRIMARY KEY,
room_code TEXT,
started_at TIMESTAMP,
ended_at TIMESTAMP,
num_players INTEGER,
options_json TEXT
);
-- Moves table (one per AI decision)
CREATE TABLE IF NOT EXISTS moves (
id INTEGER PRIMARY KEY AUTOINCREMENT,
game_id TEXT REFERENCES games(id),
move_number INTEGER,
timestamp TIMESTAMP,
player_id TEXT,
player_name TEXT,
is_cpu BOOLEAN,
-- Decision context
action TEXT,
-- Cards involved
card_rank TEXT,
card_suit TEXT,
position INTEGER,
-- Full state snapshot
hand_json TEXT,
discard_top_json TEXT,
visible_opponents_json TEXT,
-- AI reasoning
decision_reason TEXT
);
CREATE INDEX IF NOT EXISTS idx_moves_game_id ON moves(game_id);
CREATE INDEX IF NOT EXISTS idx_moves_action ON moves(action);
CREATE INDEX IF NOT EXISTS idx_moves_is_cpu ON moves(is_cpu);
""")
def log_game_start(
self, room_code: str, num_players: int, options: GameOptions
) -> str:
"""Log start of a new game. Returns game_id."""
game_id = str(uuid.uuid4())
options_dict = {
"flip_mode": options.flip_mode,
"initial_flips": options.initial_flips,
"knock_penalty": options.knock_penalty,
"use_jokers": options.use_jokers,
"lucky_swing": options.lucky_swing,
"super_kings": options.super_kings,
"ten_penny": options.ten_penny,
"knock_bonus": options.knock_bonus,
"underdog_bonus": options.underdog_bonus,
"tied_shame": options.tied_shame,
"blackjack": options.blackjack,
"eagle_eye": options.eagle_eye,
}
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
INSERT INTO games (id, room_code, started_at, num_players, options_json)
VALUES (?, ?, ?, ?, ?)
""",
(game_id, room_code, datetime.now(), num_players, json.dumps(options_dict)),
)
return game_id
def log_move(
self,
game_id: str,
player: Player,
is_cpu: bool,
action: str,
card: Optional[Card] = None,
position: Optional[int] = None,
game: Optional[Game] = None,
decision_reason: Optional[str] = None,
):
"""Log a single move/decision."""
# Get current move number
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"SELECT COALESCE(MAX(move_number), 0) + 1 FROM moves WHERE game_id = ?",
(game_id,),
)
move_number = cursor.fetchone()[0]
# Serialize hand
hand_data = []
for c in player.cards:
hand_data.append({
"rank": c.rank.value,
"suit": c.suit.value,
"face_up": c.face_up,
})
# Serialize discard top
discard_top_data = None
if game:
discard_top = game.discard_top()
if discard_top:
discard_top_data = {
"rank": discard_top.rank.value,
"suit": discard_top.suit.value,
}
# Serialize visible opponent cards
visible_opponents = {}
if game:
for p in game.players:
if p.id != player.id:
visible = []
for c in p.cards:
if c.face_up:
visible.append({
"rank": c.rank.value,
"suit": c.suit.value,
})
visible_opponents[p.name] = visible
conn.execute(
"""
INSERT INTO moves (
game_id, move_number, timestamp, player_id, player_name, is_cpu,
action, card_rank, card_suit, position,
hand_json, discard_top_json, visible_opponents_json, decision_reason
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
game_id,
move_number,
datetime.now(),
player.id,
player.name,
is_cpu,
action,
card.rank.value if card else None,
card.suit.value if card else None,
position,
json.dumps(hand_data),
json.dumps(discard_top_data) if discard_top_data else None,
json.dumps(visible_opponents),
decision_reason,
),
)
def log_game_end(self, game_id: str):
"""Mark game as ended."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"UPDATE games SET ended_at = ? WHERE id = ?",
(datetime.now(), game_id),
)
# Query helpers for analysis
def find_suspicious_discards(db_path: str = "games.db") -> list[dict]:
"""Find cases where AI discarded good cards (Ace, 2, King)."""
with sqlite3.connect(db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("""
SELECT m.*, g.room_code
FROM moves m
JOIN games g ON m.game_id = g.id
WHERE m.action = 'discard'
AND m.card_rank IN ('A', '2', 'K')
AND m.is_cpu = 1
ORDER BY m.timestamp DESC
""")
return [dict(row) for row in cursor.fetchall()]
def get_player_decisions(db_path: str, game_id: str, player_name: str) -> list[dict]:
"""Get all decisions made by a specific player in a game."""
with sqlite3.connect(db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("""
SELECT * FROM moves
WHERE game_id = ? AND player_name = ?
ORDER BY move_number
""", (game_id, player_name))
return [dict(row) for row in cursor.fetchall()]
def get_recent_games(db_path: str = "games.db", limit: int = 10) -> list[dict]:
"""Get list of recent games."""
with sqlite3.connect(db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("""
SELECT g.*, COUNT(m.id) as total_moves
FROM games g
LEFT JOIN moves m ON g.id = m.game_id
GROUP BY g.id
ORDER BY g.started_at DESC
LIMIT ?
""", (limit,))
return [dict(row) for row in cursor.fetchall()]
# Global logger instance (lazy initialization)
_logger: Optional[GameLogger] = None
def get_logger() -> GameLogger:
"""Get or create the global game logger instance."""
global _logger
if _logger is None:
_logger = GameLogger()
return _logger

Binary file not shown.

506
server/handlers.py Normal file
View File

@@ -0,0 +1,506 @@
"""WebSocket message handlers for the Golf card game.
Each handler corresponds to a single message type from the client.
Handlers are dispatched via the HANDLERS dict in main.py.
"""
import asyncio
import logging
import uuid
from dataclasses import dataclass
from typing import Optional
from fastapi import WebSocket
from game import GamePhase, GameOptions
from ai import GolfAI, get_all_profiles
from room import Room
from services.game_logger import get_logger
logger = logging.getLogger(__name__)
@dataclass
class ConnectionContext:
"""State tracked per WebSocket connection."""
websocket: WebSocket
connection_id: str
player_id: str
auth_user_id: Optional[str]
authenticated_user: object # Optional[User]
current_room: Optional[Room] = None
def log_human_action(room: Room, player, action: str, card=None, position=None, reason: str = ""):
"""Log a human player's game action (shared helper for all handlers)."""
game_logger = get_logger()
if game_logger and room.game_log_id and player:
game_logger.log_move(
game_id=room.game_log_id,
player=player,
is_cpu=False,
action=action,
card=card,
position=position,
game=room.game,
decision_reason=reason,
)
# ---------------------------------------------------------------------------
# Lobby / Room handlers
# ---------------------------------------------------------------------------
async def handle_create_room(data: dict, ctx: ConnectionContext, *, room_manager, count_user_games, max_concurrent, **kw) -> None:
if ctx.auth_user_id and count_user_games(ctx.auth_user_id) >= max_concurrent:
await ctx.websocket.send_json({
"type": "error",
"message": f"Maximum {max_concurrent} concurrent games allowed",
})
return
player_name = data.get("player_name", "Player")
if ctx.authenticated_user and ctx.authenticated_user.display_name:
player_name = ctx.authenticated_user.display_name
room = room_manager.create_room()
room.add_player(ctx.player_id, player_name, ctx.websocket, ctx.auth_user_id)
ctx.current_room = room
await ctx.websocket.send_json({
"type": "room_created",
"room_code": room.code,
"player_id": ctx.player_id,
"authenticated": ctx.authenticated_user is not None,
})
await room.broadcast({
"type": "player_joined",
"players": room.player_list(),
})
async def handle_join_room(data: dict, ctx: ConnectionContext, *, room_manager, count_user_games, max_concurrent, **kw) -> None:
room_code = data.get("room_code", "").upper()
player_name = data.get("player_name", "Player")
if ctx.auth_user_id and count_user_games(ctx.auth_user_id) >= max_concurrent:
await ctx.websocket.send_json({
"type": "error",
"message": f"Maximum {max_concurrent} concurrent games allowed",
})
return
room = room_manager.get_room(room_code)
if not room:
await ctx.websocket.send_json({"type": "error", "message": "Room not found"})
return
if len(room.players) >= 6:
await ctx.websocket.send_json({"type": "error", "message": "Room is full"})
return
if room.game.phase != GamePhase.WAITING:
await ctx.websocket.send_json({"type": "error", "message": "Game already in progress"})
return
if ctx.authenticated_user and ctx.authenticated_user.display_name:
player_name = ctx.authenticated_user.display_name
room.add_player(ctx.player_id, player_name, ctx.websocket, ctx.auth_user_id)
ctx.current_room = room
await ctx.websocket.send_json({
"type": "room_joined",
"room_code": room.code,
"player_id": ctx.player_id,
"authenticated": ctx.authenticated_user is not None,
})
await room.broadcast({
"type": "player_joined",
"players": room.player_list(),
})
async def handle_get_cpu_profiles(data: dict, ctx: ConnectionContext, **kw) -> None:
if not ctx.current_room:
return
await ctx.websocket.send_json({
"type": "cpu_profiles",
"profiles": get_all_profiles(),
})
async def handle_add_cpu(data: dict, ctx: ConnectionContext, **kw) -> None:
if not ctx.current_room:
return
room_player = ctx.current_room.get_player(ctx.player_id)
if not room_player or not room_player.is_host:
await ctx.websocket.send_json({"type": "error", "message": "Only the host can add CPU players"})
return
if len(ctx.current_room.players) >= 6:
await ctx.websocket.send_json({"type": "error", "message": "Room is full"})
return
cpu_id = f"cpu_{uuid.uuid4().hex[:8]}"
profile_name = data.get("profile_name")
cpu_player = ctx.current_room.add_cpu_player(cpu_id, profile_name)
if not cpu_player:
await ctx.websocket.send_json({"type": "error", "message": "CPU profile not available"})
return
await ctx.current_room.broadcast({
"type": "player_joined",
"players": ctx.current_room.player_list(),
})
async def handle_remove_cpu(data: dict, ctx: ConnectionContext, **kw) -> None:
if not ctx.current_room:
return
room_player = ctx.current_room.get_player(ctx.player_id)
if not room_player or not room_player.is_host:
return
cpu_players = ctx.current_room.get_cpu_players()
if cpu_players:
ctx.current_room.remove_player(cpu_players[-1].id)
await ctx.current_room.broadcast({
"type": "player_joined",
"players": ctx.current_room.player_list(),
})
# ---------------------------------------------------------------------------
# Game lifecycle handlers
# ---------------------------------------------------------------------------
async def handle_start_game(data: dict, ctx: ConnectionContext, *, broadcast_game_state, check_and_run_cpu_turn, **kw) -> None:
if not ctx.current_room:
return
room_player = ctx.current_room.get_player(ctx.player_id)
if not room_player or not room_player.is_host:
await ctx.websocket.send_json({"type": "error", "message": "Only the host can start the game"})
return
if len(ctx.current_room.players) < 2:
await ctx.websocket.send_json({"type": "error", "message": "Need at least 2 players"})
return
num_decks = max(1, min(3, data.get("decks", 1)))
num_rounds = max(1, min(18, data.get("rounds", 1)))
options = GameOptions.from_client_data(data)
async with ctx.current_room.game_lock:
ctx.current_room.game.start_game(num_decks, num_rounds, options)
game_logger = get_logger()
if game_logger:
ctx.current_room.game_log_id = game_logger.log_game_start(
room_code=ctx.current_room.code,
num_players=len(ctx.current_room.players),
options=options,
)
# CPU players do their initial flips immediately
if options.initial_flips > 0:
for cpu in ctx.current_room.get_cpu_players():
positions = GolfAI.choose_initial_flips(options.initial_flips)
ctx.current_room.game.flip_initial_cards(cpu.id, positions)
# Send game started to all human players
for pid, player in ctx.current_room.players.items():
if player.websocket and not player.is_cpu:
game_state = ctx.current_room.game.get_state(pid)
await player.websocket.send_json({
"type": "game_started",
"game_state": game_state,
})
await check_and_run_cpu_turn(ctx.current_room)
async def handle_flip_initial(data: dict, ctx: ConnectionContext, *, broadcast_game_state, check_and_run_cpu_turn, **kw) -> None:
if not ctx.current_room:
return
positions = data.get("positions", [])
async with ctx.current_room.game_lock:
if ctx.current_room.game.flip_initial_cards(ctx.player_id, positions):
await broadcast_game_state(ctx.current_room)
await check_and_run_cpu_turn(ctx.current_room)
# ---------------------------------------------------------------------------
# Turn action handlers
# ---------------------------------------------------------------------------
async def handle_draw(data: dict, ctx: ConnectionContext, *, broadcast_game_state, **kw) -> None:
if not ctx.current_room:
return
source = data.get("source", "deck")
async with ctx.current_room.game_lock:
discard_before_draw = ctx.current_room.game.discard_top()
card = ctx.current_room.game.draw_card(ctx.player_id, source)
if card:
player = ctx.current_room.game.get_player(ctx.player_id)
reason = f"took {discard_before_draw.rank.value} from discard" if source == "discard" else "drew from deck"
log_human_action(
ctx.current_room, player,
"take_discard" if source == "discard" else "draw_deck",
card=card, reason=reason,
)
await ctx.websocket.send_json({
"type": "card_drawn",
"card": card.to_dict(),
"source": source,
})
await broadcast_game_state(ctx.current_room)
async def handle_swap(data: dict, ctx: ConnectionContext, *, broadcast_game_state, check_and_run_cpu_turn, **kw) -> None:
if not ctx.current_room:
return
position = data.get("position", 0)
async with ctx.current_room.game_lock:
drawn_card = ctx.current_room.game.drawn_card
player = ctx.current_room.game.get_player(ctx.player_id)
old_card = player.cards[position] if player and 0 <= position < len(player.cards) else None
discarded = ctx.current_room.game.swap_card(ctx.player_id, position)
if discarded:
if drawn_card and player:
old_rank = old_card.rank.value if old_card else "?"
log_human_action(
ctx.current_room, player, "swap",
card=drawn_card, position=position,
reason=f"swapped {drawn_card.rank.value} into position {position}, replaced {old_rank}",
)
await broadcast_game_state(ctx.current_room)
await asyncio.sleep(1.0)
await check_and_run_cpu_turn(ctx.current_room)
async def handle_discard(data: dict, ctx: ConnectionContext, *, broadcast_game_state, check_and_run_cpu_turn, **kw) -> None:
if not ctx.current_room:
return
async with ctx.current_room.game_lock:
drawn_card = ctx.current_room.game.drawn_card
player = ctx.current_room.game.get_player(ctx.player_id)
if ctx.current_room.game.discard_drawn(ctx.player_id):
if drawn_card and player:
log_human_action(
ctx.current_room, player, "discard",
card=drawn_card,
reason=f"discarded {drawn_card.rank.value}",
)
await broadcast_game_state(ctx.current_room)
if ctx.current_room.game.flip_on_discard:
player = ctx.current_room.game.get_player(ctx.player_id)
has_face_down = player and any(not c.face_up for c in player.cards)
if has_face_down:
await ctx.websocket.send_json({
"type": "can_flip",
"optional": ctx.current_room.game.flip_is_optional,
})
else:
await asyncio.sleep(0.5)
await check_and_run_cpu_turn(ctx.current_room)
else:
logger.debug("Player discarded, waiting 0.5s before CPU turn")
await asyncio.sleep(0.5)
logger.debug("Post-discard delay complete, checking for CPU turn")
await check_and_run_cpu_turn(ctx.current_room)
async def handle_cancel_draw(data: dict, ctx: ConnectionContext, *, broadcast_game_state, **kw) -> None:
if not ctx.current_room:
return
async with ctx.current_room.game_lock:
if ctx.current_room.game.cancel_discard_draw(ctx.player_id):
await broadcast_game_state(ctx.current_room)
async def handle_flip_card(data: dict, ctx: ConnectionContext, *, broadcast_game_state, check_and_run_cpu_turn, **kw) -> None:
if not ctx.current_room:
return
position = data.get("position", 0)
async with ctx.current_room.game_lock:
player = ctx.current_room.game.get_player(ctx.player_id)
ctx.current_room.game.flip_and_end_turn(ctx.player_id, position)
if player and 0 <= position < len(player.cards):
flipped_card = player.cards[position]
log_human_action(
ctx.current_room, player, "flip",
card=flipped_card, position=position,
reason=f"flipped card at position {position}",
)
await broadcast_game_state(ctx.current_room)
await check_and_run_cpu_turn(ctx.current_room)
async def handle_skip_flip(data: dict, ctx: ConnectionContext, *, broadcast_game_state, check_and_run_cpu_turn, **kw) -> None:
if not ctx.current_room:
return
async with ctx.current_room.game_lock:
player = ctx.current_room.game.get_player(ctx.player_id)
if ctx.current_room.game.skip_flip_and_end_turn(ctx.player_id):
log_human_action(
ctx.current_room, player, "skip_flip",
reason="skipped optional flip (endgame mode)",
)
await broadcast_game_state(ctx.current_room)
await check_and_run_cpu_turn(ctx.current_room)
async def handle_flip_as_action(data: dict, ctx: ConnectionContext, *, broadcast_game_state, check_and_run_cpu_turn, **kw) -> None:
if not ctx.current_room:
return
position = data.get("position", 0)
async with ctx.current_room.game_lock:
player = ctx.current_room.game.get_player(ctx.player_id)
if ctx.current_room.game.flip_card_as_action(ctx.player_id, position):
if player and 0 <= position < len(player.cards):
flipped_card = player.cards[position]
log_human_action(
ctx.current_room, player, "flip_as_action",
card=flipped_card, position=position,
reason=f"used flip-as-action to reveal position {position}",
)
await broadcast_game_state(ctx.current_room)
await check_and_run_cpu_turn(ctx.current_room)
async def handle_knock_early(data: dict, ctx: ConnectionContext, *, broadcast_game_state, check_and_run_cpu_turn, **kw) -> None:
if not ctx.current_room:
return
async with ctx.current_room.game_lock:
player = ctx.current_room.game.get_player(ctx.player_id)
if ctx.current_room.game.knock_early(ctx.player_id):
if player:
face_down_count = sum(1 for c in player.cards if not c.face_up)
log_human_action(
ctx.current_room, player, "knock_early",
reason=f"knocked early, revealing {face_down_count} hidden cards",
)
await broadcast_game_state(ctx.current_room)
await check_and_run_cpu_turn(ctx.current_room)
async def handle_next_round(data: dict, ctx: ConnectionContext, *, broadcast_game_state, check_and_run_cpu_turn, **kw) -> None:
if not ctx.current_room:
return
room_player = ctx.current_room.get_player(ctx.player_id)
if not room_player or not room_player.is_host:
return
async with ctx.current_room.game_lock:
if ctx.current_room.game.start_next_round():
for cpu in ctx.current_room.get_cpu_players():
positions = GolfAI.choose_initial_flips()
ctx.current_room.game.flip_initial_cards(cpu.id, positions)
for pid, player in ctx.current_room.players.items():
if player.websocket and not player.is_cpu:
game_state = ctx.current_room.game.get_state(pid)
await player.websocket.send_json({
"type": "round_started",
"game_state": game_state,
})
await check_and_run_cpu_turn(ctx.current_room)
else:
await broadcast_game_state(ctx.current_room)
# ---------------------------------------------------------------------------
# Leave / End handlers
# ---------------------------------------------------------------------------
async def handle_leave_room(data: dict, ctx: ConnectionContext, *, handle_player_leave, **kw) -> None:
if ctx.current_room:
await handle_player_leave(ctx.current_room, ctx.player_id)
ctx.current_room = None
async def handle_leave_game(data: dict, ctx: ConnectionContext, *, handle_player_leave, **kw) -> None:
if ctx.current_room:
await handle_player_leave(ctx.current_room, ctx.player_id)
ctx.current_room = None
async def handle_end_game(data: dict, ctx: ConnectionContext, *, room_manager, cleanup_room_profiles, **kw) -> None:
if not ctx.current_room:
return
room_player = ctx.current_room.get_player(ctx.player_id)
if not room_player or not room_player.is_host:
await ctx.websocket.send_json({"type": "error", "message": "Only the host can end the game"})
return
await ctx.current_room.broadcast({
"type": "game_ended",
"reason": "Host ended the game",
})
room_code = ctx.current_room.code
for cpu in list(ctx.current_room.get_cpu_players()):
ctx.current_room.remove_player(cpu.id)
cleanup_room_profiles(room_code)
room_manager.remove_room(room_code)
ctx.current_room = None
# ---------------------------------------------------------------------------
# Handler dispatch table
# ---------------------------------------------------------------------------
HANDLERS = {
"create_room": handle_create_room,
"join_room": handle_join_room,
"get_cpu_profiles": handle_get_cpu_profiles,
"add_cpu": handle_add_cpu,
"remove_cpu": handle_remove_cpu,
"start_game": handle_start_game,
"flip_initial": handle_flip_initial,
"draw": handle_draw,
"swap": handle_swap,
"discard": handle_discard,
"cancel_draw": handle_cancel_draw,
"flip_card": handle_flip_card,
"skip_flip": handle_skip_flip,
"flip_as_action": handle_flip_as_action,
"knock_early": handle_knock_early,
"next_round": handle_next_round,
"leave_room": handle_leave_room,
"leave_game": handle_leave_game,
"end_game": handle_end_game,
}

View File

@@ -9,6 +9,7 @@ from typing import Optional
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends, Header
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import redis.asyncio as redis
@@ -16,6 +17,7 @@ from config import config
from room import RoomManager, Room
from game import GamePhase, GameOptions
from ai import GolfAI, process_cpu_turn, get_all_profiles, reset_all_profiles, cleanup_room_profiles
from handlers import HANDLERS, ConnectionContext
from services.game_logger import GameLogger, get_logger, set_logger
# Import production components
@@ -79,127 +81,97 @@ async def _periodic_leaderboard_refresh():
logger.error(f"Leaderboard refresh failed: {e}")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler for async service initialization."""
global _user_store, _auth_service, _admin_service, _stats_service, _replay_service
global _spectator_manager, _leaderboard_refresh_task, _redis_client, _rate_limiter
async def _init_redis():
"""Initialize Redis client and rate limiter."""
global _redis_client, _rate_limiter
try:
_redis_client = redis.from_url(config.REDIS_URL, decode_responses=False)
await _redis_client.ping()
logger.info("Redis client connected")
# Note: Uvicorn handles SIGINT/SIGTERM and triggers lifespan cleanup automatically
if config.RATE_LIMIT_ENABLED:
from services.ratelimit import get_rate_limiter
_rate_limiter = await get_rate_limiter(_redis_client)
logger.info("Rate limiter initialized")
except Exception as e:
logger.warning(f"Redis connection failed: {e} - rate limiting disabled")
_redis_client = None
_rate_limiter = None
# Initialize Redis client (for rate limiting, health checks, etc.)
if config.REDIS_URL:
try:
_redis_client = redis.from_url(config.REDIS_URL, decode_responses=False)
await _redis_client.ping()
logger.info("Redis client connected")
# Initialize rate limiter
if config.RATE_LIMIT_ENABLED:
from services.ratelimit import get_rate_limiter
_rate_limiter = await get_rate_limiter(_redis_client)
logger.info("Rate limiter initialized")
except Exception as e:
logger.warning(f"Redis connection failed: {e} - rate limiting disabled")
_redis_client = None
_rate_limiter = None
async def _init_database_services():
"""Initialize all PostgreSQL-dependent services."""
global _user_store, _auth_service, _admin_service, _stats_service
global _replay_service, _spectator_manager, _leaderboard_refresh_task
# Initialize auth, admin, and stats services (requires PostgreSQL)
if config.POSTGRES_URL:
try:
from stores.user_store import get_user_store
from stores.event_store import get_event_store
from services.auth_service import get_auth_service
from services.admin_service import get_admin_service
from services.stats_service import StatsService, set_stats_service
from routers.auth import set_auth_service
from routers.admin import set_admin_service
from routers.stats import set_stats_service as set_stats_router_service
from routers.stats import set_auth_service as set_stats_auth_service
from stores.user_store import get_user_store
from stores.event_store import get_event_store
from services.auth_service import get_auth_service
from services.admin_service import get_admin_service
from services.stats_service import StatsService, set_stats_service
from routers.auth import set_auth_service
from routers.admin import set_admin_service
from routers.stats import set_stats_service as set_stats_router_service
from routers.stats import set_auth_service as set_stats_auth_service
logger.info("Initializing auth services...")
_user_store = await get_user_store(config.POSTGRES_URL)
_auth_service = await get_auth_service(_user_store)
set_auth_service(_auth_service)
logger.info("Auth services initialized successfully")
# Auth
_user_store = await get_user_store(config.POSTGRES_URL)
_auth_service = await get_auth_service(_user_store)
set_auth_service(_auth_service)
logger.info("Auth services initialized")
# Initialize admin service
logger.info("Initializing admin services...")
_admin_service = await get_admin_service(
pool=_user_store.pool,
user_store=_user_store,
state_cache=None, # Will add Redis state cache when available
)
set_admin_service(_admin_service)
logger.info("Admin services initialized successfully")
# Initialize stats service
logger.info("Initializing stats services...")
_event_store = await get_event_store(config.POSTGRES_URL)
_stats_service = StatsService(_user_store.pool, _event_store)
set_stats_service(_stats_service)
set_stats_router_service(_stats_service)
set_stats_auth_service(_auth_service)
logger.info("Stats services initialized successfully")
# Initialize game logger (uses event_store for move logging)
logger.info("Initializing game logger...")
_game_logger = GameLogger(_event_store)
set_logger(_game_logger)
logger.info("Game logger initialized with PostgreSQL backend")
# Initialize replay service
logger.info("Initializing replay services...")
from services.replay_service import get_replay_service, set_replay_service
from services.spectator import get_spectator_manager
from routers.replay import (
set_replay_service as set_replay_router_service,
set_auth_service as set_replay_auth_service,
set_spectator_manager as set_replay_spectator,
set_room_manager as set_replay_room_manager,
)
_replay_service = await get_replay_service(_user_store.pool, _event_store)
_spectator_manager = get_spectator_manager()
set_replay_service(_replay_service)
set_replay_router_service(_replay_service)
set_replay_auth_service(_auth_service)
set_replay_spectator(_spectator_manager)
set_replay_room_manager(room_manager)
logger.info("Replay services initialized successfully")
# Start periodic leaderboard refresh task
_leaderboard_refresh_task = asyncio.create_task(_periodic_leaderboard_refresh())
logger.info("Leaderboard refresh task started")
except Exception as e:
logger.error(f"Failed to initialize services: {e}")
raise
else:
logger.warning("POSTGRES_URL not configured - auth/admin/stats endpoints will not work")
# Set up health check dependencies
from routers.health import set_health_dependencies
db_pool = _user_store.pool if _user_store else None
set_health_dependencies(
db_pool=db_pool,
redis_client=_redis_client,
room_manager=room_manager,
# Admin
_admin_service = await get_admin_service(
pool=_user_store.pool,
user_store=_user_store,
state_cache=None,
)
set_admin_service(_admin_service)
logger.info("Admin services initialized")
logger.info(f"Golf server started (environment={config.ENVIRONMENT})")
# Stats + event store
_event_store = await get_event_store(config.POSTGRES_URL)
_stats_service = StatsService(_user_store.pool, _event_store)
set_stats_service(_stats_service)
set_stats_router_service(_stats_service)
set_stats_auth_service(_auth_service)
logger.info("Stats services initialized")
yield
# Game logger
_game_logger = GameLogger(_event_store)
set_logger(_game_logger)
logger.info("Game logger initialized")
# Graceful shutdown
logger.info("Shutdown initiated...")
# Replay + spectator
from services.replay_service import get_replay_service, set_replay_service
from services.spectator import get_spectator_manager
from routers.replay import (
set_replay_service as set_replay_router_service,
set_auth_service as set_replay_auth_service,
set_spectator_manager as set_replay_spectator,
set_room_manager as set_replay_room_manager,
)
_replay_service = await get_replay_service(_user_store.pool, _event_store)
_spectator_manager = get_spectator_manager()
set_replay_service(_replay_service)
set_replay_router_service(_replay_service)
set_replay_auth_service(_auth_service)
set_replay_spectator(_spectator_manager)
set_replay_room_manager(room_manager)
logger.info("Replay services initialized")
# Signal shutdown to all components
# Periodic leaderboard refresh
_leaderboard_refresh_task = asyncio.create_task(_periodic_leaderboard_refresh())
logger.info("Leaderboard refresh task started")
async def _shutdown_services():
"""Gracefully shut down all services."""
_shutdown_event.set()
# Close all WebSocket connections gracefully
await _close_all_websockets()
# Clean up all rooms and release CPU profiles
# Clean up rooms and CPU profiles
for room in list(room_manager.rooms.values()):
for cpu in list(room.get_cpu_players()):
room.remove_player(cpu.id)
@@ -207,7 +179,6 @@ async def lifespan(app: FastAPI):
reset_all_profiles()
logger.info("All rooms and CPU profiles cleaned up")
# Cancel background tasks
if _leaderboard_refresh_task:
_leaderboard_refresh_task.cancel()
try:
@@ -234,11 +205,40 @@ async def lifespan(app: FastAPI):
close_admin_service()
await close_user_store()
# Close Redis connection
if _redis_client:
await _redis_client.close()
logger.info("Redis connection closed")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler for async service initialization."""
if config.REDIS_URL:
await _init_redis()
if config.POSTGRES_URL:
try:
await _init_database_services()
except Exception as e:
logger.error(f"Failed to initialize services: {e}")
raise
else:
logger.warning("POSTGRES_URL not configured - auth/admin/stats endpoints will not work")
# Set up health check dependencies
from routers.health import set_health_dependencies
set_health_dependencies(
db_pool=_user_store.pool if _user_store else None,
redis_client=_redis_client,
room_manager=room_manager,
)
logger.info(f"Golf server started (environment={config.ENVIRONMENT})")
yield
logger.info("Shutdown initiated...")
await _shutdown_services()
logger.info("Shutdown complete")
@@ -467,11 +467,7 @@ async def websocket_endpoint(websocket: WebSocket):
except Exception as e:
logger.debug(f"WebSocket auth failed: {e}")
# Each connection gets a unique ID (allows multi-tab play)
connection_id = str(uuid.uuid4())
player_id = connection_id
# Track auth user separately for stats/limits (can be None)
auth_user_id = str(authenticated_user.id) if authenticated_user else None
if authenticated_user:
@@ -479,556 +475,34 @@ async def websocket_endpoint(websocket: WebSocket):
else:
logger.debug(f"WebSocket connected anonymously as {connection_id}")
current_room: Room | None = None
ctx = ConnectionContext(
websocket=websocket,
connection_id=connection_id,
player_id=connection_id,
auth_user_id=auth_user_id,
authenticated_user=authenticated_user,
)
# Shared dependencies passed to every handler
handler_deps = dict(
room_manager=room_manager,
count_user_games=count_user_games,
max_concurrent=MAX_CONCURRENT_GAMES,
broadcast_game_state=broadcast_game_state,
check_and_run_cpu_turn=check_and_run_cpu_turn,
handle_player_leave=handle_player_leave,
cleanup_room_profiles=cleanup_room_profiles,
)
try:
while True:
data = await websocket.receive_json()
msg_type = data.get("type")
if msg_type == "create_room":
# Check concurrent game limit for authenticated users
if auth_user_id and count_user_games(auth_user_id) >= MAX_CONCURRENT_GAMES:
await websocket.send_json({
"type": "error",
"message": f"Maximum {MAX_CONCURRENT_GAMES} concurrent games allowed",
})
continue
player_name = data.get("player_name", "Player")
# Use authenticated user's name if available
if authenticated_user and authenticated_user.display_name:
player_name = authenticated_user.display_name
room = room_manager.create_room()
room.add_player(player_id, player_name, websocket, auth_user_id)
current_room = room
await websocket.send_json({
"type": "room_created",
"room_code": room.code,
"player_id": player_id,
"authenticated": authenticated_user is not None,
})
await room.broadcast({
"type": "player_joined",
"players": room.player_list(),
})
elif msg_type == "join_room":
room_code = data.get("room_code", "").upper()
player_name = data.get("player_name", "Player")
# Check concurrent game limit for authenticated users
if auth_user_id and count_user_games(auth_user_id) >= MAX_CONCURRENT_GAMES:
await websocket.send_json({
"type": "error",
"message": f"Maximum {MAX_CONCURRENT_GAMES} concurrent games allowed",
})
continue
room = room_manager.get_room(room_code)
if not room:
await websocket.send_json({
"type": "error",
"message": "Room not found",
})
continue
if len(room.players) >= 6:
await websocket.send_json({
"type": "error",
"message": "Room is full",
})
continue
if room.game.phase != GamePhase.WAITING:
await websocket.send_json({
"type": "error",
"message": "Game already in progress",
})
continue
# Use authenticated user's name if available
if authenticated_user and authenticated_user.display_name:
player_name = authenticated_user.display_name
room.add_player(player_id, player_name, websocket, auth_user_id)
current_room = room
await websocket.send_json({
"type": "room_joined",
"room_code": room.code,
"player_id": player_id,
"authenticated": authenticated_user is not None,
})
await room.broadcast({
"type": "player_joined",
"players": room.player_list(),
})
elif msg_type == "get_cpu_profiles":
if not current_room:
continue
await websocket.send_json({
"type": "cpu_profiles",
"profiles": get_all_profiles(),
})
elif msg_type == "add_cpu":
if not current_room:
continue
room_player = current_room.get_player(player_id)
if not room_player or not room_player.is_host:
await websocket.send_json({
"type": "error",
"message": "Only the host can add CPU players",
})
continue
if len(current_room.players) >= 6:
await websocket.send_json({
"type": "error",
"message": "Room is full",
})
continue
cpu_id = f"cpu_{uuid.uuid4().hex[:8]}"
profile_name = data.get("profile_name")
cpu_player = current_room.add_cpu_player(cpu_id, profile_name)
if not cpu_player:
await websocket.send_json({
"type": "error",
"message": "CPU profile not available",
})
continue
await current_room.broadcast({
"type": "player_joined",
"players": current_room.player_list(),
})
elif msg_type == "remove_cpu":
if not current_room:
continue
room_player = current_room.get_player(player_id)
if not room_player or not room_player.is_host:
continue
# Remove the last CPU player
cpu_players = current_room.get_cpu_players()
if cpu_players:
current_room.remove_player(cpu_players[-1].id)
await current_room.broadcast({
"type": "player_joined",
"players": current_room.player_list(),
})
elif msg_type == "start_game":
if not current_room:
continue
room_player = current_room.get_player(player_id)
if not room_player or not room_player.is_host:
await websocket.send_json({
"type": "error",
"message": "Only the host can start the game",
})
continue
if len(current_room.players) < 2:
await websocket.send_json({
"type": "error",
"message": "Need at least 2 players",
})
continue
num_decks = data.get("decks", 1)
num_rounds = data.get("rounds", 1)
# Parse deck colors (validate against allowed colors)
allowed_colors = {
"red", "blue", "gold", "teal", "purple", "orange", "yellow",
"green", "pink", "cyan", "brown", "slate"
}
raw_deck_colors = data.get("deck_colors", ["red", "blue", "gold"])
deck_colors = [c for c in raw_deck_colors if c in allowed_colors]
if not deck_colors:
deck_colors = ["red", "blue", "gold"]
# Build game options
options = GameOptions(
# Standard options
flip_mode=data.get("flip_mode", "never"),
initial_flips=max(0, min(2, data.get("initial_flips", 2))),
knock_penalty=data.get("knock_penalty", False),
use_jokers=data.get("use_jokers", False),
# House Rules - Point Modifiers
lucky_swing=data.get("lucky_swing", False),
super_kings=data.get("super_kings", False),
ten_penny=data.get("ten_penny", False),
# House Rules - Bonuses/Penalties
knock_bonus=data.get("knock_bonus", False),
underdog_bonus=data.get("underdog_bonus", False),
tied_shame=data.get("tied_shame", False),
blackjack=data.get("blackjack", False),
eagle_eye=data.get("eagle_eye", False),
wolfpack=data.get("wolfpack", False),
# House Rules - New Variants
flip_as_action=data.get("flip_as_action", False),
four_of_a_kind=data.get("four_of_a_kind", False),
negative_pairs_keep_value=data.get("negative_pairs_keep_value", False),
one_eyed_jacks=data.get("one_eyed_jacks", False),
knock_early=data.get("knock_early", False),
# Multi-deck card back colors
deck_colors=deck_colors,
)
# Validate settings
num_decks = max(1, min(3, num_decks))
num_rounds = max(1, min(18, num_rounds))
async with current_room.game_lock:
current_room.game.start_game(num_decks, num_rounds, options)
# Log game start for AI analysis
game_logger = get_logger()
if game_logger:
current_room.game_log_id = game_logger.log_game_start(
room_code=current_room.code,
num_players=len(current_room.players),
options=options,
)
# CPU players do their initial flips immediately (if required)
if options.initial_flips > 0:
for cpu in current_room.get_cpu_players():
positions = GolfAI.choose_initial_flips(options.initial_flips)
current_room.game.flip_initial_cards(cpu.id, positions)
# Send game started to all human players with their personal view
for pid, player in current_room.players.items():
if player.websocket and not player.is_cpu:
game_state = current_room.game.get_state(pid)
await player.websocket.send_json({
"type": "game_started",
"game_state": game_state,
})
# Check if it's a CPU's turn to start
await check_and_run_cpu_turn(current_room)
elif msg_type == "flip_initial":
if not current_room:
continue
positions = data.get("positions", [])
async with current_room.game_lock:
if current_room.game.flip_initial_cards(player_id, positions):
await broadcast_game_state(current_room)
# Check if it's a CPU's turn
await check_and_run_cpu_turn(current_room)
elif msg_type == "draw":
if not current_room:
continue
source = data.get("source", "deck")
async with current_room.game_lock:
# Capture discard top before draw (for logging decision context)
discard_before_draw = current_room.game.discard_top()
card = current_room.game.draw_card(player_id, source)
if card:
# Log draw decision for human player
game_logger = get_logger()
if game_logger and current_room.game_log_id:
player = current_room.game.get_player(player_id)
if player:
reason = f"took {discard_before_draw.rank.value} from discard" if source == "discard" else "drew from deck"
game_logger.log_move(
game_id=current_room.game_log_id,
player=player,
is_cpu=False,
action="take_discard" if source == "discard" else "draw_deck",
card=card,
game=current_room.game,
decision_reason=reason,
)
# Send drawn card only to the player who drew
await websocket.send_json({
"type": "card_drawn",
"card": card.to_dict(),
"source": source,
})
await broadcast_game_state(current_room)
elif msg_type == "swap":
if not current_room:
continue
position = data.get("position", 0)
async with current_room.game_lock:
# Capture drawn card before swap for logging
drawn_card = current_room.game.drawn_card
player = current_room.game.get_player(player_id)
old_card = player.cards[position] if player and 0 <= position < len(player.cards) else None
discarded = current_room.game.swap_card(player_id, position)
if discarded:
# Log swap decision for human player
game_logger = get_logger()
if game_logger and current_room.game_log_id and drawn_card and player:
old_rank = old_card.rank.value if old_card else "?"
game_logger.log_move(
game_id=current_room.game_log_id,
player=player,
is_cpu=False,
action="swap",
card=drawn_card,
position=position,
game=current_room.game,
decision_reason=f"swapped {drawn_card.rank.value} into position {position}, replaced {old_rank}",
)
await broadcast_game_state(current_room)
# Let client swap animation complete (~550ms), then pause to show result
# Total 1.0s = 550ms animation + 450ms visible pause
await asyncio.sleep(1.0)
await check_and_run_cpu_turn(current_room)
elif msg_type == "discard":
if not current_room:
continue
async with current_room.game_lock:
# Capture drawn card before discard for logging
drawn_card = current_room.game.drawn_card
player = current_room.game.get_player(player_id)
if current_room.game.discard_drawn(player_id):
# Log discard decision for human player
game_logger = get_logger()
if game_logger and current_room.game_log_id and drawn_card and player:
game_logger.log_move(
game_id=current_room.game_log_id,
player=player,
is_cpu=False,
action="discard",
card=drawn_card,
game=current_room.game,
decision_reason=f"discarded {drawn_card.rank.value}",
)
await broadcast_game_state(current_room)
if current_room.game.flip_on_discard:
# Check if player has face-down cards to flip
player = current_room.game.get_player(player_id)
has_face_down = player and any(not c.face_up for c in player.cards)
if has_face_down:
await websocket.send_json({
"type": "can_flip",
"optional": current_room.game.flip_is_optional,
})
else:
# Let client animation complete before CPU turn
await asyncio.sleep(0.5)
await check_and_run_cpu_turn(current_room)
else:
# Turn ended - let client animation complete before CPU turn
# (player discard swoop animation is ~500ms: 350ms swoop + 150ms settle)
logger.debug(f"Player discarded, waiting 0.5s before CPU turn")
await asyncio.sleep(0.5)
logger.debug(f"Post-discard delay complete, checking for CPU turn")
await check_and_run_cpu_turn(current_room)
elif msg_type == "cancel_draw":
if not current_room:
continue
async with current_room.game_lock:
if current_room.game.cancel_discard_draw(player_id):
await broadcast_game_state(current_room)
elif msg_type == "flip_card":
if not current_room:
continue
position = data.get("position", 0)
async with current_room.game_lock:
player = current_room.game.get_player(player_id)
current_room.game.flip_and_end_turn(player_id, position)
# Log flip decision for human player
game_logger = get_logger()
if game_logger and current_room.game_log_id and player and 0 <= position < len(player.cards):
flipped_card = player.cards[position]
game_logger.log_move(
game_id=current_room.game_log_id,
player=player,
is_cpu=False,
action="flip",
card=flipped_card,
position=position,
game=current_room.game,
decision_reason=f"flipped card at position {position}",
)
await broadcast_game_state(current_room)
await check_and_run_cpu_turn(current_room)
elif msg_type == "skip_flip":
if not current_room:
continue
async with current_room.game_lock:
player = current_room.game.get_player(player_id)
if current_room.game.skip_flip_and_end_turn(player_id):
# Log skip flip decision for human player
game_logger = get_logger()
if game_logger and current_room.game_log_id and player:
game_logger.log_move(
game_id=current_room.game_log_id,
player=player,
is_cpu=False,
action="skip_flip",
card=None,
game=current_room.game,
decision_reason="skipped optional flip (endgame mode)",
)
await broadcast_game_state(current_room)
await check_and_run_cpu_turn(current_room)
elif msg_type == "flip_as_action":
if not current_room:
continue
position = data.get("position", 0)
async with current_room.game_lock:
player = current_room.game.get_player(player_id)
if current_room.game.flip_card_as_action(player_id, position):
# Log flip-as-action for human player
game_logger = get_logger()
if game_logger and current_room.game_log_id and player and 0 <= position < len(player.cards):
flipped_card = player.cards[position]
game_logger.log_move(
game_id=current_room.game_log_id,
player=player,
is_cpu=False,
action="flip_as_action",
card=flipped_card,
position=position,
game=current_room.game,
decision_reason=f"used flip-as-action to reveal position {position}",
)
await broadcast_game_state(current_room)
await check_and_run_cpu_turn(current_room)
elif msg_type == "knock_early":
if not current_room:
continue
async with current_room.game_lock:
player = current_room.game.get_player(player_id)
if current_room.game.knock_early(player_id):
# Log knock early for human player
game_logger = get_logger()
if game_logger and current_room.game_log_id and player:
face_down_count = sum(1 for c in player.cards if not c.face_up)
game_logger.log_move(
game_id=current_room.game_log_id,
player=player,
is_cpu=False,
action="knock_early",
card=None,
game=current_room.game,
decision_reason=f"knocked early, revealing {face_down_count} hidden cards",
)
await broadcast_game_state(current_room)
await check_and_run_cpu_turn(current_room)
elif msg_type == "next_round":
if not current_room:
continue
room_player = current_room.get_player(player_id)
if not room_player or not room_player.is_host:
continue
async with current_room.game_lock:
if current_room.game.start_next_round():
# CPU players do their initial flips
for cpu in current_room.get_cpu_players():
positions = GolfAI.choose_initial_flips()
current_room.game.flip_initial_cards(cpu.id, positions)
for pid, player in current_room.players.items():
if player.websocket and not player.is_cpu:
game_state = current_room.game.get_state(pid)
await player.websocket.send_json({
"type": "round_started",
"game_state": game_state,
})
await check_and_run_cpu_turn(current_room)
else:
# Game over
await broadcast_game_state(current_room)
elif msg_type == "leave_room":
if current_room:
await handle_player_leave(current_room, player_id)
current_room = None
elif msg_type == "leave_game":
# Player leaves during an active game
if current_room:
await handle_player_leave(current_room, player_id)
current_room = None
elif msg_type == "end_game":
# Host ends the game for everyone
if not current_room:
continue
room_player = current_room.get_player(player_id)
if not room_player or not room_player.is_host:
await websocket.send_json({
"type": "error",
"message": "Only the host can end the game",
})
continue
# Notify all players that the game has ended
await current_room.broadcast({
"type": "game_ended",
"reason": "Host ended the game",
})
# Clean up the room
room_code = current_room.code
for cpu in list(current_room.get_cpu_players()):
current_room.remove_player(cpu.id)
cleanup_room_profiles(room_code)
room_manager.remove_room(room_code)
current_room = None
handler = HANDLERS.get(data.get("type"))
if handler:
await handler(data, ctx, **handler_deps)
except WebSocketDisconnect:
if current_room:
await handle_player_leave(current_room, player_id)
if ctx.current_room:
await handle_player_leave(ctx.current_room, ctx.player_id)
async def _process_stats_safe(room: Room):
@@ -1192,68 +666,17 @@ if os.path.exists(client_path):
async def serve_index():
return FileResponse(os.path.join(client_path, "index.html"))
@app.get("/style.css")
async def serve_css():
return FileResponse(os.path.join(client_path, "style.css"), media_type="text/css")
@app.get("/app.js")
async def serve_js():
return FileResponse(os.path.join(client_path, "app.js"), media_type="application/javascript")
@app.get("/card-manager.js")
async def serve_card_manager():
return FileResponse(os.path.join(client_path, "card-manager.js"), media_type="application/javascript")
@app.get("/state-differ.js")
async def serve_state_differ():
return FileResponse(os.path.join(client_path, "state-differ.js"), media_type="application/javascript")
@app.get("/animation-queue.js")
async def serve_animation_queue():
return FileResponse(os.path.join(client_path, "animation-queue.js"), media_type="application/javascript")
@app.get("/timing-config.js")
async def serve_timing_config():
return FileResponse(os.path.join(client_path, "timing-config.js"), media_type="application/javascript")
@app.get("/leaderboard.js")
async def serve_leaderboard_js():
return FileResponse(os.path.join(client_path, "leaderboard.js"), media_type="application/javascript")
@app.get("/golfball-logo.svg")
async def serve_golfball_logo():
return FileResponse(os.path.join(client_path, "golfball-logo.svg"), media_type="image/svg+xml")
# Admin dashboard
@app.get("/admin")
async def serve_admin():
return FileResponse(os.path.join(client_path, "admin.html"))
@app.get("/admin.css")
async def serve_admin_css():
return FileResponse(os.path.join(client_path, "admin.css"), media_type="text/css")
@app.get("/admin.js")
async def serve_admin_js():
return FileResponse(os.path.join(client_path, "admin.js"), media_type="application/javascript")
@app.get("/replay.js")
async def serve_replay_js():
return FileResponse(os.path.join(client_path, "replay.js"), media_type="application/javascript")
@app.get("/card-animations.js")
async def serve_card_animations_js():
return FileResponse(os.path.join(client_path, "card-animations.js"), media_type="application/javascript")
@app.get("/anime.min.js")
async def serve_anime_js():
return FileResponse(os.path.join(client_path, "anime.min.js"), media_type="application/javascript")
# Serve replay page for share links
@app.get("/replay/{share_code}")
async def serve_replay_page(share_code: str):
return FileResponse(os.path.join(client_path, "index.html"))
# Mount static files for everything else (JS, CSS, SVG, etc.)
app.mount("/", StaticFiles(directory=client_path), name="static")
def run():
"""Run the server using uvicorn."""

View File

@@ -1,16 +1,25 @@
# Core dependencies
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
websockets>=12.0
python-dotenv>=1.0.0
# V2: Event sourcing infrastructure
# Database & caching
asyncpg>=0.29.0
redis>=5.0.0
# V2: Authentication
resend>=2.0.0
# Authentication
bcrypt>=4.1.0
# V2: Production monitoring (optional)
# Email service
resend>=2.0.0
# Production monitoring (optional)
sentry-sdk[fastapi]>=1.40.0
# Testing
pytest>=8.0.0
pytest-asyncio>=0.23.0
pytest-cov>=4.1.0
ruff>=0.1.0
mypy>=1.8.0

View File

@@ -254,12 +254,13 @@ class RoomManager:
"""Initialize an empty room manager."""
self.rooms: dict[str, Room] = {}
def _generate_code(self) -> str:
def _generate_code(self, max_attempts: int = 100) -> str:
"""Generate a unique 4-letter room code."""
while True:
for _ in range(max_attempts):
code = "".join(random.choices(string.ascii_uppercase, k=4))
if code not in self.rooms:
return code
raise RuntimeError("Could not generate unique room code")
def create_room(self) -> Room:
"""

View File

@@ -196,10 +196,12 @@ class TestDrawDiscardMechanics:
self.game.add_player(Player(id="p2", name="Player 2"))
# Skip initial flip phase to test draw/discard mechanics directly
self.game.start_game(options=GameOptions(initial_flips=0))
# Get the actual current player (after dealer rotation, it's p2)
self.current_player_id = self.game.current_player().id
def test_can_draw_from_deck(self):
"""Player can draw from deck."""
card = self.game.draw_card("p1", "deck")
card = self.game.draw_card(self.current_player_id, "deck")
assert card is not None
assert self.game.drawn_card == card
assert self.game.drawn_from_discard is False
@@ -207,7 +209,7 @@ class TestDrawDiscardMechanics:
def test_can_draw_from_discard(self):
"""Player can draw from discard pile."""
discard_top = self.game.discard_top()
card = self.game.draw_card("p1", "discard")
card = self.game.draw_card(self.current_player_id, "discard")
assert card is not None
assert card == discard_top
assert self.game.drawn_card == card
@@ -215,40 +217,40 @@ class TestDrawDiscardMechanics:
def test_can_discard_deck_draw(self):
"""Card drawn from deck CAN be discarded."""
self.game.draw_card("p1", "deck")
self.game.draw_card(self.current_player_id, "deck")
assert self.game.can_discard_drawn() is True
result = self.game.discard_drawn("p1")
result = self.game.discard_drawn(self.current_player_id)
assert result is True
def test_cannot_discard_discard_draw(self):
"""Card drawn from discard pile CANNOT be re-discarded."""
self.game.draw_card("p1", "discard")
self.game.draw_card(self.current_player_id, "discard")
assert self.game.can_discard_drawn() is False
result = self.game.discard_drawn("p1")
result = self.game.discard_drawn(self.current_player_id)
assert result is False
def test_must_swap_discard_draw(self):
"""When drawing from discard, must swap with a hand card."""
self.game.draw_card("p1", "discard")
self.game.draw_card(self.current_player_id, "discard")
# Can't discard, must swap
assert self.game.can_discard_drawn() is False
# Swap works
old_card = self.game.swap_card("p1", 0)
old_card = self.game.swap_card(self.current_player_id, 0)
assert old_card is not None
assert self.game.drawn_card is None
def test_swap_makes_card_face_up(self):
"""Swapped card is placed face up."""
player = self.game.get_player("p1")
player = self.game.get_player(self.current_player_id)
assert player.cards[0].face_up is False # Initially face down
self.game.draw_card("p1", "deck")
self.game.swap_card("p1", 0)
self.game.draw_card(self.current_player_id, "deck")
self.game.swap_card(self.current_player_id, 0)
assert player.cards[0].face_up is True
def test_cannot_peek_before_swap(self):
"""Face-down cards stay hidden until swapped/revealed."""
player = self.game.get_player("p1")
player = self.game.get_player(self.current_player_id)
# Card is face down
assert player.cards[0].face_up is False
# to_client_dict hides face-down card details from clients
@@ -274,38 +276,42 @@ class TestTurnFlow:
self.game.add_player(Player(id="p3", name="Player 3"))
# Skip initial flip phase
self.game.start_game(options=GameOptions(initial_flips=0))
# With dealer rotation (V3_01): dealer=p1(idx 0), first player=p2(idx 1)
def test_turn_advances_after_discard(self):
"""Turn advances to next player after discarding."""
assert self.game.current_player().id == "p1"
self.game.draw_card("p1", "deck")
self.game.discard_drawn("p1")
# First player after dealer is p2
assert self.game.current_player().id == "p2"
def test_turn_advances_after_swap(self):
"""Turn advances to next player after swapping."""
assert self.game.current_player().id == "p1"
self.game.draw_card("p1", "deck")
self.game.swap_card("p1", 0)
assert self.game.current_player().id == "p2"
def test_turn_wraps_around(self):
"""Turn wraps from last player to first."""
# Complete turns for p1 and p2
self.game.draw_card("p1", "deck")
self.game.discard_drawn("p1")
self.game.draw_card("p2", "deck")
self.game.discard_drawn("p2")
assert self.game.current_player().id == "p3"
def test_turn_advances_after_swap(self):
"""Turn advances to next player after swapping."""
assert self.game.current_player().id == "p2"
self.game.draw_card("p2", "deck")
self.game.swap_card("p2", 0)
assert self.game.current_player().id == "p3"
def test_turn_wraps_around(self):
"""Turn wraps from last player to first."""
# Order is: p2 -> p3 -> p1 -> p2 (wraps)
# Complete turns for p2 and p3
self.game.draw_card("p2", "deck")
self.game.discard_drawn("p2")
self.game.draw_card("p3", "deck")
self.game.discard_drawn("p3")
assert self.game.current_player().id == "p1" # Wrapped
assert self.game.current_player().id == "p1"
self.game.draw_card("p1", "deck")
self.game.discard_drawn("p1")
assert self.game.current_player().id == "p2" # Wrapped
def test_only_current_player_can_act(self):
"""Only current player can draw."""
assert self.game.current_player().id == "p1"
card = self.game.draw_card("p2", "deck") # Wrong player
# First player is p2 (after dealer p1)
assert self.game.current_player().id == "p2"
card = self.game.draw_card("p1", "deck") # Wrong player (dealer can't go first)
assert card is None
@@ -321,6 +327,7 @@ class TestRoundEnd:
self.game.add_player(Player(id="p1", name="Player 1"))
self.game.add_player(Player(id="p2", name="Player 2"))
self.game.start_game(options=GameOptions(initial_flips=0))
# With dealer rotation: dealer=p1(idx 0), first player=p2(idx 1)
def reveal_all_cards(self, player_id: str):
"""Helper to flip all cards for a player."""
@@ -330,61 +337,64 @@ class TestRoundEnd:
def test_revealing_all_triggers_final_turn(self):
"""When a player reveals all cards, final turn phase begins."""
# Reveal 5 cards for p1
player = self.game.get_player("p1")
# First player is p2 (after dealer p1)
# Reveal 5 cards for p2
player = self.game.get_player("p2")
for i in range(5):
player.cards[i].face_up = True
assert self.game.phase == GamePhase.PLAYING
# Draw and swap into last face-down position
self.game.draw_card("p1", "deck")
self.game.swap_card("p1", 5) # Last card
self.game.draw_card("p2", "deck")
self.game.swap_card("p2", 5) # Last card
assert self.game.phase == GamePhase.FINAL_TURN
assert self.game.finisher_id == "p1"
assert self.game.finisher_id == "p2"
def test_other_players_get_final_turn(self):
"""After one player finishes, others each get one more turn."""
# P1 reveals all
self.reveal_all_cards("p1")
self.game.draw_card("p1", "deck")
self.game.discard_drawn("p1")
assert self.game.phase == GamePhase.FINAL_TURN
assert self.game.current_player().id == "p2"
# P2 takes final turn
# First player is p2, they reveal all
self.reveal_all_cards("p2")
self.game.draw_card("p2", "deck")
self.game.discard_drawn("p2")
assert self.game.phase == GamePhase.FINAL_TURN
assert self.game.current_player().id == "p1"
# P1 takes final turn
self.game.draw_card("p1", "deck")
self.game.discard_drawn("p1")
# Round should be over
assert self.game.phase == GamePhase.ROUND_OVER
def test_finisher_does_not_get_extra_turn(self):
"""The player who went out doesn't get another turn."""
# P1 reveals all and triggers final turn
self.reveal_all_cards("p1")
self.game.draw_card("p1", "deck")
self.game.discard_drawn("p1")
# P2's turn
assert self.game.current_player().id == "p2"
# p2 goes first, reveals all and triggers final turn
self.reveal_all_cards("p2")
self.game.draw_card("p2", "deck")
self.game.discard_drawn("p2")
# Should be round over, not p1's turn again
# P1's turn (the other player)
assert self.game.current_player().id == "p1"
self.game.draw_card("p1", "deck")
self.game.discard_drawn("p1")
# Should be round over, not p2's turn again
assert self.game.phase == GamePhase.ROUND_OVER
def test_all_cards_revealed_at_round_end(self):
"""At round end, all cards are revealed."""
self.reveal_all_cards("p1")
self.game.draw_card("p1", "deck")
self.game.discard_drawn("p1")
# p2 goes first, reveals all
self.reveal_all_cards("p2")
self.game.draw_card("p2", "deck")
self.game.discard_drawn("p2")
# p1 takes final turn
self.game.draw_card("p1", "deck")
self.game.discard_drawn("p1")
assert self.game.phase == GamePhase.ROUND_OVER
# All cards should be face up now
@@ -536,9 +546,11 @@ class TestEdgeCases:
game.add_player(Player(id="p1", name="Player 1"))
game.add_player(Player(id="p2", name="Player 2"))
game.start_game(options=GameOptions(initial_flips=0))
# First player is p2 after dealer rotation
current_id = game.current_player().id
game.draw_card("p1", "deck")
second_draw = game.draw_card("p1", "deck")
game.draw_card(current_id, "deck")
second_draw = game.draw_card(current_id, "deck")
assert second_draw is None
def test_swap_position_bounds(self):
@@ -547,16 +559,17 @@ class TestEdgeCases:
game.add_player(Player(id="p1", name="Player 1"))
game.add_player(Player(id="p2", name="Player 2"))
game.start_game(options=GameOptions(initial_flips=0))
current_id = game.current_player().id
game.draw_card("p1", "deck")
game.draw_card(current_id, "deck")
result = game.swap_card("p1", -1)
result = game.swap_card(current_id, -1)
assert result is None
result = game.swap_card("p1", 6)
result = game.swap_card(current_id, 6)
assert result is None
result = game.swap_card("p1", 3) # Valid
result = game.swap_card(current_id, 3) # Valid
assert result is not None
def test_empty_discard_pile(self):
@@ -565,11 +578,12 @@ class TestEdgeCases:
game.add_player(Player(id="p1", name="Player 1"))
game.add_player(Player(id="p2", name="Player 2"))
game.start_game(options=GameOptions(initial_flips=0))
current_id = game.current_player().id
# Clear discard pile (normally has 1 card)
game.discard_pile = []
card = game.draw_card("p1", "discard")
card = game.draw_card(current_id, "discard")
assert card is None

293
server/test_handlers.py Normal file
View File

@@ -0,0 +1,293 @@
"""
Test suite for WebSocket message handlers.
Tests handler basic flows and validation using mock WebSocket/Room.
Run with: pytest test_handlers.py -v
"""
import asyncio
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from game import Game, GamePhase, GameOptions, Player
from room import Room, RoomPlayer, RoomManager
from handlers import (
ConnectionContext,
handle_create_room,
handle_join_room,
handle_draw,
handle_swap,
handle_discard,
)
# =============================================================================
# Mock helpers
# =============================================================================
class MockWebSocket:
"""Mock WebSocket that collects sent messages."""
def __init__(self):
self.messages: list[dict] = []
async def send_json(self, data: dict):
self.messages.append(data)
def last_message(self) -> dict:
return self.messages[-1] if self.messages else {}
def messages_of_type(self, msg_type: str) -> list[dict]:
return [m for m in self.messages if m.get("type") == msg_type]
def make_ctx(websocket=None, player_id="test_player", room=None):
"""Create a ConnectionContext with sensible defaults."""
ws = websocket or MockWebSocket()
return ConnectionContext(
websocket=ws,
connection_id="conn_123",
player_id=player_id,
auth_user_id=None,
authenticated_user=None,
current_room=room,
)
def make_room_manager():
"""Create a RoomManager for testing."""
return RoomManager()
def make_room_with_game(num_players=2):
"""Create a Room with players and a game in PLAYING phase."""
room = Room(code="TEST")
for i in range(num_players):
ws = MockWebSocket()
room.add_player(f"p{i}", f"Player {i}", ws)
room.game.start_game(num_decks=1, num_rounds=1, options=GameOptions(initial_flips=0))
# Skip initial flip phase
for p in room.game.players:
room.game.flip_initial_cards(p.id, [0, 1])
return room
# =============================================================================
# Lobby handlers
# =============================================================================
class TestHandleCreateRoom:
@pytest.mark.asyncio
async def test_creates_room(self):
ws = MockWebSocket()
ctx = make_ctx(websocket=ws)
rm = make_room_manager()
await handle_create_room(
{"player_name": "Alice"},
ctx,
room_manager=rm,
count_user_games=lambda uid: 0,
max_concurrent=5,
)
assert ctx.current_room is not None
assert len(rm.rooms) == 1
assert ws.messages_of_type("room_created")
@pytest.mark.asyncio
async def test_max_concurrent_rejects(self):
ws = MockWebSocket()
ctx = make_ctx(websocket=ws)
ctx.auth_user_id = "user1"
rm = make_room_manager()
await handle_create_room(
{"player_name": "Alice"},
ctx,
room_manager=rm,
count_user_games=lambda uid: 5,
max_concurrent=5,
)
assert ctx.current_room is None
assert ws.messages_of_type("error")
class TestHandleJoinRoom:
@pytest.mark.asyncio
async def test_join_existing_room(self):
rm = make_room_manager()
room = rm.create_room()
host_ws = MockWebSocket()
room.add_player("host", "Host", host_ws)
ws = MockWebSocket()
ctx = make_ctx(websocket=ws, player_id="joiner")
await handle_join_room(
{"room_code": room.code, "player_name": "Bob"},
ctx,
room_manager=rm,
count_user_games=lambda uid: 0,
max_concurrent=5,
)
assert ctx.current_room is room
assert ws.messages_of_type("room_joined")
assert len(room.players) == 2
@pytest.mark.asyncio
async def test_join_nonexistent_room(self):
rm = make_room_manager()
ws = MockWebSocket()
ctx = make_ctx(websocket=ws)
await handle_join_room(
{"room_code": "ZZZZ", "player_name": "Bob"},
ctx,
room_manager=rm,
count_user_games=lambda uid: 0,
max_concurrent=5,
)
assert ctx.current_room is None
assert ws.messages_of_type("error")
assert "not found" in ws.last_message().get("message", "").lower()
@pytest.mark.asyncio
async def test_join_full_room(self):
rm = make_room_manager()
room = rm.create_room()
for i in range(6):
room.add_player(f"p{i}", f"Player {i}", MockWebSocket())
ws = MockWebSocket()
ctx = make_ctx(websocket=ws, player_id="extra")
await handle_join_room(
{"room_code": room.code, "player_name": "Extra"},
ctx,
room_manager=rm,
count_user_games=lambda uid: 0,
max_concurrent=5,
)
assert ws.messages_of_type("error")
assert "full" in ws.last_message().get("message", "").lower()
@pytest.mark.asyncio
async def test_join_in_progress_game(self):
rm = make_room_manager()
room = rm.create_room()
room.add_player("host", "Host", MockWebSocket())
room.add_player("p2", "Player 2", MockWebSocket())
room.game.start_game(1, 1, GameOptions(initial_flips=0))
ws = MockWebSocket()
ctx = make_ctx(websocket=ws, player_id="late")
await handle_join_room(
{"room_code": room.code, "player_name": "Late"},
ctx,
room_manager=rm,
count_user_games=lambda uid: 0,
max_concurrent=5,
)
assert ws.messages_of_type("error")
assert "in progress" in ws.last_message().get("message", "").lower()
# =============================================================================
# Turn action handlers
# =============================================================================
class TestHandleDraw:
@pytest.mark.asyncio
async def test_draw_from_deck(self):
room = make_room_with_game()
current_pid = room.game.players[room.game.current_player_index].id
ws = room.players[current_pid].websocket
ctx = make_ctx(websocket=ws, player_id=current_pid, room=room)
broadcast = AsyncMock()
await handle_draw(
{"source": "deck"},
ctx,
broadcast_game_state=broadcast,
)
assert ws.messages_of_type("card_drawn")
broadcast.assert_called_once()
@pytest.mark.asyncio
async def test_draw_no_room(self):
ws = MockWebSocket()
ctx = make_ctx(websocket=ws, room=None)
broadcast = AsyncMock()
await handle_draw(
{"source": "deck"},
ctx,
broadcast_game_state=broadcast,
)
assert len(ws.messages) == 0
broadcast.assert_not_called()
class TestHandleSwap:
@pytest.mark.asyncio
async def test_swap_card(self):
room = make_room_with_game()
current_pid = room.game.players[room.game.current_player_index].id
ws = room.players[current_pid].websocket
# Draw a card first
room.game.draw_card(current_pid, "deck")
ctx = make_ctx(websocket=ws, player_id=current_pid, room=room)
broadcast = AsyncMock()
check_cpu = AsyncMock()
await handle_swap(
{"position": 0},
ctx,
broadcast_game_state=broadcast,
check_and_run_cpu_turn=check_cpu,
)
broadcast.assert_called_once()
class TestHandleDiscard:
@pytest.mark.asyncio
async def test_discard_drawn_card(self):
room = make_room_with_game()
current_pid = room.game.players[room.game.current_player_index].id
ws = room.players[current_pid].websocket
room.game.draw_card(current_pid, "deck")
ctx = make_ctx(websocket=ws, player_id=current_pid, room=room)
broadcast = AsyncMock()
check_cpu = AsyncMock()
await handle_discard(
{},
ctx,
broadcast_game_state=broadcast,
check_and_run_cpu_turn=check_cpu,
)
broadcast.assert_called_once()

317
server/test_room.py Normal file
View File

@@ -0,0 +1,317 @@
"""
Test suite for Room and RoomManager CRUD operations.
Covers:
- Room creation and uniqueness
- Player add/remove with host reassignment
- CPU player management
- Case-insensitive room lookup
- Cross-room player search
- Message broadcast and send_to
Run with: pytest test_room.py -v
"""
import asyncio
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from room import Room, RoomPlayer, RoomManager
# =============================================================================
# Mock helpers
# =============================================================================
class MockWebSocket:
"""Mock WebSocket that collects sent messages."""
def __init__(self):
self.messages: list[dict] = []
async def send_json(self, data: dict):
self.messages.append(data)
# =============================================================================
# RoomManager tests
# =============================================================================
class TestRoomManagerCreate:
def test_create_room_returns_room(self):
rm = RoomManager()
room = rm.create_room()
assert room is not None
assert len(room.code) == 4
assert room.code in rm.rooms
def test_create_multiple_rooms_unique_codes(self):
rm = RoomManager()
codes = set()
for _ in range(20):
room = rm.create_room()
codes.add(room.code)
assert len(codes) == 20
def test_remove_room(self):
rm = RoomManager()
room = rm.create_room()
code = room.code
rm.remove_room(code)
assert code not in rm.rooms
def test_remove_nonexistent_room(self):
rm = RoomManager()
rm.remove_room("ZZZZ") # Should not raise
class TestRoomManagerLookup:
def test_get_room_case_insensitive(self):
rm = RoomManager()
room = rm.create_room()
code = room.code
assert rm.get_room(code.lower()) is room
assert rm.get_room(code.upper()) is room
def test_get_room_not_found(self):
rm = RoomManager()
assert rm.get_room("ZZZZ") is None
def test_find_player_room(self):
rm = RoomManager()
room = rm.create_room()
ws = MockWebSocket()
room.add_player("player1", "Alice", ws)
found = rm.find_player_room("player1")
assert found is room
def test_find_player_room_not_found(self):
rm = RoomManager()
rm.create_room()
assert rm.find_player_room("nobody") is None
def test_find_player_room_cross_room(self):
rm = RoomManager()
room1 = rm.create_room()
room2 = rm.create_room()
room1.add_player("p1", "Alice", MockWebSocket())
room2.add_player("p2", "Bob", MockWebSocket())
assert rm.find_player_room("p1") is room1
assert rm.find_player_room("p2") is room2
# =============================================================================
# Room player management
# =============================================================================
class TestRoomPlayers:
def test_add_player_first_is_host(self):
room = Room(code="TEST")
ws = MockWebSocket()
rp = room.add_player("p1", "Alice", ws)
assert rp.is_host is True
def test_add_player_second_is_not_host(self):
room = Room(code="TEST")
room.add_player("p1", "Alice", MockWebSocket())
rp2 = room.add_player("p2", "Bob", MockWebSocket())
assert rp2.is_host is False
def test_remove_player(self):
room = Room(code="TEST")
room.add_player("p1", "Alice", MockWebSocket())
removed = room.remove_player("p1")
assert removed is not None
assert removed.id == "p1"
assert "p1" not in room.players
def test_remove_nonexistent_player(self):
room = Room(code="TEST")
result = room.remove_player("nobody")
assert result is None
def test_host_reassignment_on_remove(self):
room = Room(code="TEST")
room.add_player("p1", "Alice", MockWebSocket())
room.add_player("p2", "Bob", MockWebSocket())
room.remove_player("p1")
assert room.players["p2"].is_host is True
def test_get_player(self):
room = Room(code="TEST")
room.add_player("p1", "Alice", MockWebSocket())
assert room.get_player("p1") is not None
assert room.get_player("p1").name == "Alice"
assert room.get_player("nobody") is None
def test_is_empty(self):
room = Room(code="TEST")
assert room.is_empty() is True
room.add_player("p1", "Alice", MockWebSocket())
assert room.is_empty() is False
def test_player_list(self):
room = Room(code="TEST")
room.add_player("p1", "Alice", MockWebSocket())
room.add_player("p2", "Bob", MockWebSocket())
plist = room.player_list()
assert len(plist) == 2
assert plist[0]["name"] == "Alice"
assert plist[0]["is_host"] is True
assert plist[1]["is_cpu"] is False
def test_human_player_count(self):
room = Room(code="TEST")
room.add_player("p1", "Alice", MockWebSocket())
assert room.human_player_count() == 1
def test_auth_user_id_stored(self):
room = Room(code="TEST")
rp = room.add_player("p1", "Alice", MockWebSocket(), auth_user_id="auth_123")
assert rp.auth_user_id == "auth_123"
# =============================================================================
# CPU player management
# =============================================================================
class TestCPUPlayers:
def test_add_cpu_player(self):
room = Room(code="TEST")
room.add_player("host", "Host", MockWebSocket())
with patch("room.assign_profile") as mock_assign:
from ai import CPUProfile
mock_assign.return_value = CPUProfile(
name="TestBot", style="balanced",
pair_hope=0.5, aggression=0.5,
swap_threshold=4, unpredictability=0.1,
)
rp = room.add_cpu_player("cpu_1")
assert rp is not None
assert rp.is_cpu is True
assert rp.name == "TestBot"
def test_add_cpu_player_no_profile(self):
room = Room(code="TEST")
room.add_player("host", "Host", MockWebSocket())
with patch("room.assign_profile", return_value=None):
rp = room.add_cpu_player("cpu_1")
assert rp is None
def test_get_cpu_players(self):
room = Room(code="TEST")
room.add_player("host", "Host", MockWebSocket())
with patch("room.assign_profile") as mock_assign:
from ai import CPUProfile
mock_assign.return_value = CPUProfile(
name="Bot", style="balanced",
pair_hope=0.5, aggression=0.5,
swap_threshold=4, unpredictability=0.1,
)
room.add_cpu_player("cpu_1")
cpus = room.get_cpu_players()
assert len(cpus) == 1
assert cpus[0].is_cpu is True
def test_remove_cpu_releases_profile(self):
room = Room(code="TEST")
room.add_player("host", "Host", MockWebSocket())
with patch("room.assign_profile") as mock_assign:
from ai import CPUProfile
mock_assign.return_value = CPUProfile(
name="Bot", style="balanced",
pair_hope=0.5, aggression=0.5,
swap_threshold=4, unpredictability=0.1,
)
room.add_cpu_player("cpu_1")
with patch("room.release_profile") as mock_release:
room.remove_player("cpu_1")
mock_release.assert_called_once_with("Bot", "TEST")
# =============================================================================
# Broadcast / send_to
# =============================================================================
class TestMessaging:
@pytest.mark.asyncio
async def test_broadcast_to_all_humans(self):
room = Room(code="TEST")
ws1 = MockWebSocket()
ws2 = MockWebSocket()
room.add_player("p1", "Alice", ws1)
room.add_player("p2", "Bob", ws2)
await room.broadcast({"type": "test_msg"})
assert len(ws1.messages) == 1
assert len(ws2.messages) == 1
assert ws1.messages[0]["type"] == "test_msg"
@pytest.mark.asyncio
async def test_broadcast_excludes_player(self):
room = Room(code="TEST")
ws1 = MockWebSocket()
ws2 = MockWebSocket()
room.add_player("p1", "Alice", ws1)
room.add_player("p2", "Bob", ws2)
await room.broadcast({"type": "test_msg"}, exclude="p1")
assert len(ws1.messages) == 0
assert len(ws2.messages) == 1
@pytest.mark.asyncio
async def test_broadcast_skips_cpu(self):
room = Room(code="TEST")
ws1 = MockWebSocket()
room.add_player("p1", "Alice", ws1)
# Add a CPU player manually (no websocket)
room.players["cpu_1"] = RoomPlayer(
id="cpu_1", name="Bot", websocket=None, is_cpu=True
)
await room.broadcast({"type": "test_msg"})
assert len(ws1.messages) == 1
# CPU has no websocket, no error
@pytest.mark.asyncio
async def test_send_to_specific_player(self):
room = Room(code="TEST")
ws1 = MockWebSocket()
ws2 = MockWebSocket()
room.add_player("p1", "Alice", ws1)
room.add_player("p2", "Bob", ws2)
await room.send_to("p1", {"type": "private_msg"})
assert len(ws1.messages) == 1
assert len(ws2.messages) == 0
@pytest.mark.asyncio
async def test_send_to_nonexistent_player(self):
room = Room(code="TEST")
await room.send_to("nobody", {"type": "test"}) # Should not raise
@pytest.mark.asyncio
async def test_send_to_cpu_is_noop(self):
room = Room(code="TEST")
room.players["cpu_1"] = RoomPlayer(
id="cpu_1", name="Bot", websocket=None, is_cpu=True
)
await room.send_to("cpu_1", {"type": "test"}) # Should not raise

318
server/test_v3_features.py Normal file
View File

@@ -0,0 +1,318 @@
"""
Test suite for V3 features in 6-Card Golf.
Covers:
- V3_01: Dealer rotation
- V3_03/V3_05: Finisher tracking, knock penalty/bonus
- V3_09: Knock early
Run with: pytest test_v3_features.py -v
"""
import pytest
from game import (
Card, Deck, Player, Game, GamePhase, GameOptions,
Suit, Rank, RANK_VALUES
)
# =============================================================================
# Helper: create a game with N players in PLAYING phase
# =============================================================================
def make_game(num_players=2, options=None, rounds=1):
"""Create a game with N players, dealt and in PLAYING phase."""
opts = options or GameOptions()
game = Game(num_rounds=rounds, options=opts)
for i in range(num_players):
game.add_player(Player(id=f"p{i}", name=f"Player {i}"))
game.start_round()
# Force into PLAYING phase (skip initial flip)
if game.phase == GamePhase.INITIAL_FLIP:
for p in game.players:
game.flip_initial_cards(p.id, [0, 1])
return game
def flip_all_but(player, keep_down=0):
"""Flip all cards face-up except `keep_down` cards."""
for i, card in enumerate(player.cards):
if i < len(player.cards) - keep_down:
card.face_up = True
else:
card.face_up = False
def set_hand(player, ranks):
"""Set player hand to specific ranks (all hearts, all face-up)."""
player.cards = [
Card(Suit.HEARTS, rank, face_up=True) for rank in ranks
]
# =============================================================================
# V3_01: Dealer Rotation
# =============================================================================
class TestDealerRotation:
"""Verify dealer rotates each round and first player is after dealer."""
def test_initial_dealer_is_zero(self):
game = make_game(3)
assert game.dealer_idx == 0
def test_first_player_is_after_dealer(self):
game = make_game(3)
# Dealer is 0, first player should be 1
assert game.current_player_index == 1
def test_dealer_rotates_after_round(self):
game = make_game(3, rounds=3)
assert game.dealer_idx == 0
# End the round by having a player flip all cards
player = game.players[game.current_player_index]
for card in player.cards:
card.face_up = True
game.finisher_id = player.id
game.phase = GamePhase.FINAL_TURN
# Give remaining players their final turns
game.players_with_final_turn = {p.id for p in game.players}
game._end_round()
# Start next round
game.start_next_round()
assert game.dealer_idx == 1
# First player should be after new dealer
assert game.current_player_index == 2
def test_dealer_wraps_around(self):
game = make_game(3, rounds=4)
# Simulate 3 rounds to wrap dealer
for expected_dealer in [0, 1, 2]:
assert game.dealer_idx == expected_dealer
# Force round end
player = game.players[game.current_player_index]
for card in player.cards:
card.face_up = True
game.finisher_id = player.id
game.phase = GamePhase.FINAL_TURN
game.players_with_final_turn = {p.id for p in game.players}
game._end_round()
game.start_next_round()
# After 3 rotations with 3 players, wraps back to 0
assert game.dealer_idx == 0
def test_dealer_in_state_dict(self):
game = make_game(3)
state = game.get_state("p0")
assert "dealer_id" in state
assert "dealer_idx" in state
assert state["dealer_id"] == "p0"
assert state["dealer_idx"] == 0
# =============================================================================
# V3_03/V3_05: Finisher Tracking + Knock Penalty/Bonus
# =============================================================================
class TestFinisherTracking:
"""Verify finisher_id is set and penalties/bonuses apply."""
def test_finisher_id_initially_none(self):
game = make_game(2)
assert game.finisher_id is None
def test_finisher_set_when_all_flipped(self):
game = make_game(2)
# Get current player and flip all their cards
player = game.players[game.current_player_index]
for card in player.cards:
card.face_up = True
# Draw and discard to trigger _check_end_turn
card = game.deck.draw()
if card:
game.drawn_card = card
game.discard_drawn(player.id)
assert game.finisher_id == player.id
assert game.phase == GamePhase.FINAL_TURN
def test_finisher_in_state_dict(self):
game = make_game(2)
game.finisher_id = "p0"
state = game.get_state("p0")
assert state["finisher_id"] == "p0"
def test_knock_penalty_applied(self):
"""Finisher gets +10 if they don't have the lowest score."""
opts = GameOptions(knock_penalty=True, initial_flips=0)
game = make_game(2, options=opts)
# Set hands with different ranks per column to avoid column pairing
# Layout: [0][1][2] / [3][4][5], columns: (0,3),(1,4),(2,5)
set_hand(game.players[0], [Rank.TEN, Rank.NINE, Rank.EIGHT,
Rank.SEVEN, Rank.SIX, Rank.FIVE]) # 10+9+8+7+6+5 = 45
set_hand(game.players[1], [Rank.ACE, Rank.THREE, Rank.FOUR,
Rank.TWO, Rank.KING, Rank.ACE]) # 1+3+4+(-2)+0+1 = 7
game.finisher_id = "p0"
game.phase = GamePhase.FINAL_TURN
game.players_with_final_turn = {"p0", "p1"}
game._end_round()
# p0 had score 45, gets +10 penalty = 55
assert game.players[0].score == 55
# p1 unaffected
assert game.players[1].score == 7
def test_knock_bonus_applied(self):
"""Finisher gets -5 bonus."""
opts = GameOptions(knock_bonus=True, initial_flips=0)
game = make_game(2, options=opts)
# Different ranks per column to avoid pairing
set_hand(game.players[0], [Rank.ACE, Rank.THREE, Rank.FOUR,
Rank.TWO, Rank.KING, Rank.ACE]) # 1+3+4+(-2)+0+1 = 7
set_hand(game.players[1], [Rank.TEN, Rank.NINE, Rank.EIGHT,
Rank.SEVEN, Rank.SIX, Rank.FIVE]) # 10+9+8+7+6+5 = 45
game.finisher_id = "p0"
game.phase = GamePhase.FINAL_TURN
game.players_with_final_turn = {"p0", "p1"}
game._end_round()
# p0 gets -5 bonus: 7 - 5 = 2
assert game.players[0].score == 2
assert game.players[1].score == 45
# =============================================================================
# V3_09: Knock Early
# =============================================================================
class TestKnockEarly:
"""Verify knock_early house rule mechanics."""
def test_knock_early_disabled_by_default(self):
opts = GameOptions()
assert opts.knock_early is False
def test_knock_early_requires_option(self):
game = make_game(2)
player = game.players[game.current_player_index]
# Flip 4 cards, leave 2 face-down
for i in range(4):
player.cards[i].face_up = True
result = game.knock_early(player.id)
assert result is False
def test_knock_early_with_option_enabled(self):
opts = GameOptions(knock_early=True, initial_flips=0)
game = make_game(2, options=opts)
player = game.players[game.current_player_index]
# Flip 4 cards, leave 2 face-down
for i in range(4):
player.cards[i].face_up = True
for i in range(4, 6):
player.cards[i].face_up = False
result = game.knock_early(player.id)
assert result is True
def test_knock_early_requires_face_up_cards(self):
"""Must have at least 4 face-up (at most 2 face-down) to knock."""
opts = GameOptions(knock_early=True, initial_flips=0)
game = make_game(2, options=opts)
player = game.players[game.current_player_index]
# Only 3 face-up, 3 face-down — too many hidden
for i in range(3):
player.cards[i].face_up = True
for i in range(3, 6):
player.cards[i].face_up = False
result = game.knock_early(player.id)
assert result is False
def test_knock_early_triggers_final_turn(self):
opts = GameOptions(knock_early=True, initial_flips=0)
game = make_game(2, options=opts)
player = game.players[game.current_player_index]
flip_all_but(player, keep_down=2)
game.knock_early(player.id)
assert game.phase == GamePhase.FINAL_TURN
def test_knock_early_sets_finisher(self):
opts = GameOptions(knock_early=True, initial_flips=0)
game = make_game(2, options=opts)
player = game.players[game.current_player_index]
flip_all_but(player, keep_down=1)
game.knock_early(player.id)
assert game.finisher_id == player.id
def test_knock_early_not_during_initial_flip(self):
"""Knock early should fail during initial flip phase."""
opts = GameOptions(knock_early=True, initial_flips=2)
game = Game(num_rounds=1, options=opts)
game.add_player(Player(id="p0", name="Player 0"))
game.add_player(Player(id="p1", name="Player 1"))
game.start_round()
# Should be in INITIAL_FLIP
assert game.phase == GamePhase.INITIAL_FLIP
player = game.players[0]
flip_all_but(player, keep_down=2)
result = game.knock_early(player.id)
assert result is False
def test_knock_early_fails_with_drawn_card(self):
"""Can't knock if you've already drawn a card."""
opts = GameOptions(knock_early=True, initial_flips=0)
game = make_game(2, options=opts)
player = game.players[game.current_player_index]
flip_all_but(player, keep_down=2)
# Simulate having drawn a card
game.drawn_card = Card(Suit.HEARTS, Rank.ACE)
result = game.knock_early(player.id)
assert result is False
def test_knock_early_fails_all_face_up(self):
"""Can't knock early if all cards are already face-up (0 face-down)."""
opts = GameOptions(knock_early=True, initial_flips=0)
game = make_game(2, options=opts)
player = game.players[game.current_player_index]
for card in player.cards:
card.face_up = True
result = game.knock_early(player.id)
assert result is False
def test_knock_early_in_state_dict(self):
opts = GameOptions(knock_early=True)
game = make_game(2, options=opts)
state = game.get_state("p0")
assert state["knock_early"] is True
def test_knock_early_active_rules(self):
"""Knock early should appear in active_rules list."""
opts = GameOptions(knock_early=True)
game = make_game(2, options=opts)
state = game.get_state("p0")
assert "Early Knock" in state["active_rules"]