Version 2.0.0: Animation fixes, timing improvements, and E2E test suite

Animation fixes:
- Fix held card positioning bug (was appearing at bottom of page)
- Fix discard pile blank/white flash on turn transitions
- Fix blank card at round end by skipping animations during round_over/game_over
- Set card content before triggering flip animation to prevent flash
- Center suit symbol on 10 cards

Timing improvements:
- Reduce post-discard delay from 700ms to 500ms
- Reduce post-swap delay from 1800ms to 1000ms
- Speed up swap flip animation from 1150ms to 550ms
- Reduce CPU initial thinking delay from 150-250ms to 80-150ms
- Pause now happens after swap completes (showing result) instead of before

E2E test suite:
- Add Playwright-based test bot that plays full games
- State parser extracts game state from DOM for validation
- AI brain ports decision logic for automated play
- Freeze detector monitors for UI hangs
- Visual validator checks CSS states
- Full game, stress, and visual test specs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Aaron D. Lee
2026-01-29 18:33:28 -05:00
parent 724bf87c43
commit 6950769bc3
29 changed files with 5153 additions and 348 deletions

View File

@@ -47,15 +47,74 @@ def can_make_pair(card1: Card, card2: Card) -> bool:
return card1.rank == card2.rank
def estimate_opponent_min_score(player: Player, game: Game) -> int:
"""Estimate minimum opponent score from visible cards."""
def get_discard_thinking_time(card: Optional[Card], options: GameOptions) -> float:
"""Calculate CPU 'thinking time' based on how obvious the discard decision is.
Easy decisions (obviously good or bad cards) = quick (400-600ms)
Hard decisions (medium value cards) = slower (900-1100ms)
Returns time in seconds.
"""
if not card:
# No discard available - quick decision to draw from deck
return random.uniform(0.4, 0.5)
value = get_card_value(card, options)
# Obviously good cards (easy take): 2 (-2), Joker (-2/-5), K (0), A (1)
if value <= 1:
return random.uniform(0.4, 0.6)
# Obviously bad cards (easy pass): 10, J, Q (value 10)
if value >= 10:
return random.uniform(0.4, 0.6)
# Medium cards require more thought: 3-9
# 5, 6, 7 are the hardest decisions (middle of the range)
if value in (5, 6, 7):
return random.uniform(0.9, 1.1)
# 3, 4, 8, 9 - moderate difficulty
return random.uniform(0.6, 0.85)
def estimate_opponent_min_score(player: Player, game: Game, optimistic: bool = False) -> int:
"""Estimate minimum opponent score from visible cards.
Args:
player: The player making the estimation (excluded from opponents)
game: The game state
optimistic: If True, assume opponents' hidden cards are average (4.5).
If False, assume opponents could get lucky (lower estimate).
"""
min_est = 999
for p in game.players:
if p.id == player.id:
continue
visible = sum(get_ai_card_value(c, game.options) for c in p.cards if c.face_up)
hidden = sum(1 for c in p.cards if not c.face_up)
estimate = visible + int(hidden * 4.5) # Assume ~4.5 avg for hidden
if optimistic:
# Assume average hidden cards
estimate = visible + int(hidden * 4.5)
else:
# Assume opponents could get lucky - hidden cards might be low
# or could complete pairs, so use lower estimate
# Check for potential pairs in opponent's hand
pair_potential = 0
for col in range(3):
top, bot = p.cards[col], p.cards[col + 3]
# If one card is visible and the other is hidden, there's pair potential
if top.face_up and not bot.face_up:
pair_potential += get_ai_card_value(top, game.options)
elif bot.face_up and not top.face_up:
pair_potential += get_ai_card_value(bot, game.options)
# Conservative estimate: assume 2.5 avg for hidden (could be low cards)
# and subtract some pair potential (hidden cards might match visible)
base_estimate = visible + int(hidden * 2.5)
estimate = base_estimate - int(pair_potential * 0.25) # 25% chance of pair
min_est = min(min_est, estimate)
return min_est
@@ -365,60 +424,91 @@ CPU_PROFILES = [
),
]
# Track which profiles are in use
_used_profiles: set[str] = set()
_cpu_profiles: dict[str, CPUProfile] = {}
# Track profiles per room (room_code -> set of used profile names)
_room_used_profiles: dict[str, set[str]] = {}
# Track cpu_id -> (room_code, profile) mapping
_cpu_profiles: dict[str, tuple[str, CPUProfile]] = {}
def get_available_profile() -> Optional[CPUProfile]:
"""Get a random available CPU profile."""
available = [p for p in CPU_PROFILES if p.name not in _used_profiles]
def get_available_profile(room_code: str) -> Optional[CPUProfile]:
"""Get a random available CPU profile for a specific room."""
used_in_room = _room_used_profiles.get(room_code, set())
available = [p for p in CPU_PROFILES if p.name not in used_in_room]
if not available:
return None
profile = random.choice(available)
_used_profiles.add(profile.name)
if room_code not in _room_used_profiles:
_room_used_profiles[room_code] = set()
_room_used_profiles[room_code].add(profile.name)
return profile
def release_profile(name: str):
"""Release a CPU profile back to the pool."""
_used_profiles.discard(name)
# Also remove from cpu_profiles by finding the cpu_id with this profile
to_remove = [cpu_id for cpu_id, profile in _cpu_profiles.items() if profile.name == name]
def release_profile(name: str, room_code: str):
"""Release a CPU profile back to the room's pool."""
if room_code in _room_used_profiles:
_room_used_profiles[room_code].discard(name)
# Clean up empty room entries
if not _room_used_profiles[room_code]:
del _room_used_profiles[room_code]
# Also remove from cpu_profiles by finding the cpu_id with this profile in this room
to_remove = [
cpu_id for cpu_id, (rc, profile) in _cpu_profiles.items()
if profile.name == name and rc == room_code
]
for cpu_id in to_remove:
del _cpu_profiles[cpu_id]
def cleanup_room_profiles(room_code: str):
"""Clean up all profile tracking for a room when it's deleted."""
if room_code in _room_used_profiles:
del _room_used_profiles[room_code]
# Remove all cpu_profiles for this room
to_remove = [cpu_id for cpu_id, (rc, _) in _cpu_profiles.items() if rc == room_code]
for cpu_id in to_remove:
del _cpu_profiles[cpu_id]
def reset_all_profiles():
"""Reset all profile tracking (for cleanup)."""
_used_profiles.clear()
_room_used_profiles.clear()
_cpu_profiles.clear()
def get_profile(cpu_id: str) -> Optional[CPUProfile]:
"""Get the profile for a CPU player."""
return _cpu_profiles.get(cpu_id)
entry = _cpu_profiles.get(cpu_id)
return entry[1] if entry else None
def assign_profile(cpu_id: str) -> Optional[CPUProfile]:
"""Assign a random profile to a CPU player."""
profile = get_available_profile()
def assign_profile(cpu_id: str, room_code: str) -> Optional[CPUProfile]:
"""Assign a random profile to a CPU player in a specific room."""
profile = get_available_profile(room_code)
if profile:
_cpu_profiles[cpu_id] = profile
_cpu_profiles[cpu_id] = (room_code, profile)
return profile
def assign_specific_profile(cpu_id: str, profile_name: str) -> Optional[CPUProfile]:
"""Assign a specific profile to a CPU player by name."""
# Check if profile exists and is available
def assign_specific_profile(cpu_id: str, profile_name: str, room_code: str) -> Optional[CPUProfile]:
"""Assign a specific profile to a CPU player by name in a specific room."""
used_in_room = _room_used_profiles.get(room_code, set())
# Check if profile exists and is available in this room
for profile in CPU_PROFILES:
if profile.name == profile_name and profile.name not in _used_profiles:
_used_profiles.add(profile.name)
_cpu_profiles[cpu_id] = profile
if profile.name == profile_name and profile.name not in used_in_room:
if room_code not in _room_used_profiles:
_room_used_profiles[room_code] = set()
_room_used_profiles[room_code].add(profile.name)
_cpu_profiles[cpu_id] = (room_code, profile)
return profile
return None
def get_available_profiles(room_code: str) -> list[dict]:
"""Get available CPU profiles for a specific room."""
used_in_room = _room_used_profiles.get(room_code, set())
return [p.to_dict() for p in CPU_PROFILES if p.name not in used_in_room]
def get_all_profiles() -> list[dict]:
"""Get all CPU profiles for display."""
return [p.to_dict() for p in CPU_PROFILES]
@@ -1150,7 +1240,7 @@ class GolfAI:
# Knock Penalty (+10 if not lowest): Need to be confident we're lowest
if options.knock_penalty:
opponent_min = estimate_opponent_min_score(player, game)
opponent_min = estimate_opponent_min_score(player, game, optimistic=False)
# Conservative players require bigger lead
safety_margin = 5 if profile.aggression < 0.4 else 2
if estimated_score > opponent_min - safety_margin:
@@ -1174,8 +1264,37 @@ class GolfAI:
if options.underdog_bonus:
go_out_threshold -= 1
# HIGH SCORE CAUTION: When our score is >10, be extra careful
# Opponents' hidden cards could easily beat us with pairs or low cards
if estimated_score > 10:
# Get pessimistic estimate of opponent's potential score
opponent_min_pessimistic = estimate_opponent_min_score(player, game, optimistic=False)
opponent_min_optimistic = estimate_opponent_min_score(player, game, optimistic=True)
ai_log(f" High score caution: our score={estimated_score}, "
f"opponent estimates: optimistic={opponent_min_optimistic}, pessimistic={opponent_min_pessimistic}")
# If opponents could potentially beat us, reduce our willingness to go out
if opponent_min_pessimistic < estimated_score:
# Calculate how risky this is
risk_margin = estimated_score - opponent_min_pessimistic
# Reduce threshold based on risk (more risk = lower threshold)
risk_penalty = min(risk_margin, 8) # Cap at 8 point penalty
go_out_threshold -= risk_penalty
ai_log(f" Risk penalty: -{risk_penalty} (opponents could score {opponent_min_pessimistic})")
# Additional penalty for very high scores (>15) - almost never go out
if estimated_score > 15:
extra_penalty = (estimated_score - 15) * 2
go_out_threshold -= extra_penalty
ai_log(f" Very high score penalty: -{extra_penalty}")
ai_log(f" Go-out decision: score={estimated_score}, threshold={go_out_threshold}, "
f"aggression={profile.aggression:.2f}")
if estimated_score <= go_out_threshold:
if random.random() < profile.aggression:
ai_log(f" >> GOING OUT with score {estimated_score}")
return True
return False
@@ -1196,11 +1315,22 @@ async def process_cpu_turn(
# Get logger if game_id provided
logger = get_logger() if game_id else None
# Add delay based on unpredictability (chaotic players are faster/slower)
delay = 0.8 + random.uniform(0, 0.5)
# Brief initial delay before CPU "looks at" the discard pile
await asyncio.sleep(random.uniform(0.08, 0.15))
# "Thinking" delay based on how obvious the discard decision is
# Easy decisions (good/bad cards) are quick, medium cards take longer
discard_top = game.discard_top()
thinking_time = get_discard_thinking_time(discard_top, game.options)
# Adjust for personality - chaotic players have more variance
if profile.unpredictability > 0.2:
delay = random.uniform(0.3, 1.2)
await asyncio.sleep(delay)
thinking_time *= random.uniform(0.6, 1.4)
discard_str = f"{discard_top.rank.value}" if discard_top else "empty"
ai_log(f"{cpu_player.name} thinking for {thinking_time:.2f}s (discard: {discard_str})")
await asyncio.sleep(thinking_time)
ai_log(f"{cpu_player.name} done thinking, making decision")
# Check if we should try to go out early
GolfAI.should_go_out_early(cpu_player, game, profile)
@@ -1243,8 +1373,7 @@ async def process_cpu_turn(
await broadcast_callback()
return # Turn is over
# Decide whether to draw from discard or deck
discard_top = game.discard_top()
# Decide whether to draw from discard or deck (discard_top already fetched above)
take_discard = GolfAI.should_take_discard(discard_top, cpu_player, profile, game)
source = "discard" if take_discard else "deck"

View File

@@ -15,7 +15,7 @@ import redis.asyncio as redis
from config import config
from room import RoomManager, Room
from game import GamePhase, GameOptions
from ai import GolfAI, process_cpu_turn, get_all_profiles, reset_all_profiles
from ai import GolfAI, process_cpu_turn, get_all_profiles, reset_all_profiles, cleanup_room_profiles
from game_log import get_logger
# Import production components
@@ -407,12 +407,17 @@ async def require_admin(user: User = Depends(require_user)) -> User:
@app.get("/api/debug/cpu-profiles")
async def get_cpu_profile_status():
"""Get current CPU profile allocation status."""
from ai import _used_profiles, _cpu_profiles, CPU_PROFILES
from ai import _room_used_profiles, _cpu_profiles, CPU_PROFILES
return {
"total_profiles": len(CPU_PROFILES),
"used_count": len(_used_profiles),
"used_profiles": list(_used_profiles),
"cpu_mappings": {cpu_id: profile.name for cpu_id, profile in _cpu_profiles.items()},
"room_profiles": {
room_code: list(profiles)
for room_code, profiles in _room_used_profiles.items()
},
"cpu_mappings": {
cpu_id: {"room": room_code, "profile": profile.name}
for cpu_id, (room_code, profile) in _cpu_profiles.items()
},
"active_rooms": len(room_manager.rooms),
"rooms": {
code: {
@@ -431,6 +436,19 @@ async def reset_cpu_profiles():
return {"status": "ok", "message": "All CPU profiles reset"}
MAX_CONCURRENT_GAMES = 4
def count_user_games(user_id: str) -> int:
"""Count how many games this authenticated user is currently in."""
count = 0
for room in room_manager.rooms.values():
for player in room.players.values():
if player.auth_user_id == user_id:
count += 1
return count
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
@@ -444,13 +462,17 @@ async def websocket_endpoint(websocket: WebSocket):
except Exception as e:
logger.debug(f"WebSocket auth failed: {e}")
# Use authenticated user ID if available, otherwise generate random UUID
# Each connection gets a unique ID (allows multi-tab play)
connection_id = str(uuid.uuid4())
player_id = connection_id
# Track auth user separately for stats/limits (can be None)
auth_user_id = str(authenticated_user.id) if authenticated_user else None
if authenticated_user:
player_id = str(authenticated_user.id)
logger.debug(f"WebSocket authenticated as user {player_id}")
logger.debug(f"WebSocket authenticated as user {auth_user_id}, connection {connection_id}")
else:
player_id = str(uuid.uuid4())
logger.debug(f"WebSocket connected anonymously as {player_id}")
logger.debug(f"WebSocket connected anonymously as {connection_id}")
current_room: Room | None = None
@@ -460,12 +482,20 @@ async def websocket_endpoint(websocket: WebSocket):
msg_type = data.get("type")
if msg_type == "create_room":
# Check concurrent game limit for authenticated users
if auth_user_id and count_user_games(auth_user_id) >= MAX_CONCURRENT_GAMES:
await websocket.send_json({
"type": "error",
"message": f"Maximum {MAX_CONCURRENT_GAMES} concurrent games allowed",
})
continue
player_name = data.get("player_name", "Player")
# Use authenticated user's name if available
if authenticated_user and authenticated_user.display_name:
player_name = authenticated_user.display_name
room = room_manager.create_room()
room.add_player(player_id, player_name, websocket)
room.add_player(player_id, player_name, websocket, auth_user_id)
current_room = room
await websocket.send_json({
@@ -484,6 +514,14 @@ async def websocket_endpoint(websocket: WebSocket):
room_code = data.get("room_code", "").upper()
player_name = data.get("player_name", "Player")
# Check concurrent game limit for authenticated users
if auth_user_id and count_user_games(auth_user_id) >= MAX_CONCURRENT_GAMES:
await websocket.send_json({
"type": "error",
"message": f"Maximum {MAX_CONCURRENT_GAMES} concurrent games allowed",
})
continue
room = room_manager.get_room(room_code)
if not room:
await websocket.send_json({
@@ -509,7 +547,7 @@ async def websocket_endpoint(websocket: WebSocket):
# Use authenticated user's name if available
if authenticated_user and authenticated_user.display_name:
player_name = authenticated_user.display_name
room.add_player(player_id, player_name, websocket)
room.add_player(player_id, player_name, websocket, auth_user_id)
current_room = room
await websocket.send_json({
@@ -744,6 +782,9 @@ async def websocket_endpoint(websocket: WebSocket):
)
await broadcast_game_state(current_room)
# Let client swap animation complete (~550ms), then pause to show result
# Total 1.0s = 550ms animation + 450ms visible pause
await asyncio.sleep(1.0)
await check_and_run_cpu_turn(current_room)
elif msg_type == "discard":
@@ -782,9 +823,15 @@ async def websocket_endpoint(websocket: WebSocket):
"optional": current_room.game.flip_is_optional,
})
else:
# Let client animation complete before CPU turn
await asyncio.sleep(0.5)
await check_and_run_cpu_turn(current_room)
else:
# Turn ended, check for CPU
# Turn ended - let client animation complete before CPU turn
# (player discard swoop animation is ~500ms: 350ms swoop + 150ms settle)
logger.debug(f"Player discarded, waiting 0.5s before CPU turn")
await asyncio.sleep(0.5)
logger.debug(f"Post-discard delay complete, checking for CPU turn")
await check_and_run_cpu_turn(current_room)
elif msg_type == "cancel_draw":
@@ -954,9 +1001,11 @@ async def websocket_endpoint(websocket: WebSocket):
})
# Clean up the room
room_code = current_room.code
for cpu in list(current_room.get_cpu_players()):
current_room.remove_player(cpu.id)
room_manager.remove_room(current_room.code)
cleanup_room_profiles(room_code)
room_manager.remove_room(room_code)
current_room = None
except WebSocketDisconnect:
@@ -972,12 +1021,12 @@ async def _process_stats_safe(room: Room):
notifications while stats are being processed.
"""
try:
# Build mapping - for non-CPU players, the player_id is their user_id
# (assigned during authentication or as a session UUID)
# Build mapping - use auth_user_id for authenticated players
# Only authenticated players get their stats tracked
player_user_ids = {}
for player_id, room_player in room.players.items():
if not room_player.is_cpu:
player_user_ids[player_id] = player_id
if not room_player.is_cpu and room_player.auth_user_id:
player_user_ids[player_id] = room_player.auth_user_id
# Find winner
winner_id = None
@@ -1095,6 +1144,7 @@ async def check_and_run_cpu_turn(room: Room):
async def handle_player_leave(room: Room, player_id: str):
"""Handle a player leaving a room."""
room_code = room.code
room_player = room.remove_player(player_id)
# If no human players left, clean up the room entirely
@@ -1102,7 +1152,9 @@ async def handle_player_leave(room: Room, player_id: str):
# Remove all remaining CPU players to release their profiles
for cpu in list(room.get_cpu_players()):
room.remove_player(cpu.id)
room_manager.remove_room(room.code)
# Clean up any remaining profile tracking for this room
cleanup_room_profiles(room_code)
room_manager.remove_room(room_code)
elif room_player:
await room.broadcast({
"type": "player_left",

View File

@@ -19,7 +19,7 @@ from typing import Optional
from fastapi import WebSocket
from ai import assign_profile, assign_specific_profile, get_profile, release_profile
from ai import assign_profile, assign_specific_profile, get_profile, release_profile, cleanup_room_profiles
from game import Game, Player
@@ -33,11 +33,12 @@ class RoomPlayer:
in-game state like cards and scores.
Attributes:
id: Unique player identifier.
id: Unique player identifier (connection_id for multi-tab support).
name: Display name.
websocket: WebSocket connection (None for CPU players).
is_host: Whether this player controls game settings.
is_cpu: Whether this is an AI-controlled player.
auth_user_id: Authenticated user ID for stats/limits (None for guests).
"""
id: str
@@ -45,6 +46,7 @@ class RoomPlayer:
websocket: Optional[WebSocket] = None
is_host: bool = False
is_cpu: bool = False
auth_user_id: Optional[str] = None
@dataclass
@@ -73,6 +75,7 @@ class Room:
player_id: str,
name: str,
websocket: WebSocket,
auth_user_id: Optional[str] = None,
) -> RoomPlayer:
"""
Add a human player to the room.
@@ -80,9 +83,10 @@ class Room:
The first player to join becomes the host.
Args:
player_id: Unique identifier for the player.
player_id: Unique identifier for the player (connection_id).
name: Display name.
websocket: The player's WebSocket connection.
auth_user_id: Authenticated user ID for stats/limits (None for guests).
Returns:
The created RoomPlayer object.
@@ -93,6 +97,7 @@ class Room:
name=name,
websocket=websocket,
is_host=is_host,
auth_user_id=auth_user_id,
)
self.players[player_id] = room_player
@@ -117,9 +122,9 @@ class Room:
The created RoomPlayer, or None if profile unavailable.
"""
if profile_name:
profile = assign_specific_profile(cpu_id, profile_name)
profile = assign_specific_profile(cpu_id, profile_name, self.code)
else:
profile = assign_profile(cpu_id)
profile = assign_profile(cpu_id, self.code)
if not profile:
return None
@@ -157,9 +162,9 @@ class Room:
room_player = self.players.pop(player_id)
self.game.remove_player(player_id)
# Release CPU profile back to the pool
# Release CPU profile back to the room's pool
if room_player.is_cpu:
release_profile(room_player.name)
release_profile(room_player.name, self.code)
# Assign new host if needed
if room_player.is_host and self.players: