Features: - Multiplayer WebSocket game server (FastAPI) - 8 AI personalities with distinct play styles - 15+ house rule variants - SQLite game logging for AI analysis - Comprehensive test suite (80+ tests) AI improvements: - Fixed Maya bug (taking bad cards, discarding good ones) - Personality traits influence style without overriding competence - Zero blunders detected in 1000+ game simulations Testing infrastructure: - Game rules verification (test_game.py) - AI decision analysis (game_analyzer.py) - Score distribution analysis (score_analysis.py) - House rules testing (test_house_rules.py) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
436 lines
13 KiB
Python
436 lines
13 KiB
Python
"""
|
|
Golf AI Simulation Runner
|
|
|
|
Runs AI-vs-AI games to generate decision logs for analysis.
|
|
No server/websocket needed - runs games directly.
|
|
|
|
Usage:
|
|
python simulate.py [num_games] [num_players]
|
|
|
|
Examples:
|
|
python simulate.py 10 # Run 10 games with 4 players each
|
|
python simulate.py 50 2 # Run 50 games with 2 players each
|
|
"""
|
|
|
|
import asyncio
|
|
import random
|
|
import sys
|
|
from typing import Optional
|
|
|
|
from game import Game, Player, GamePhase, GameOptions
|
|
from ai import (
|
|
GolfAI, CPUProfile, CPU_PROFILES,
|
|
get_ai_card_value, has_worse_visible_card
|
|
)
|
|
from game_log import GameLogger
|
|
|
|
|
|
class SimulationStats:
    """Accumulate counters across simulated games and render a text summary.

    Attributes are plain counters and dictionaries keyed by player name; they
    are filled in by ``record_game`` / ``record_turn`` and read by ``report``.
    """

    def __init__(self):
        self.games_played = 0
        self.total_rounds = 0
        self.total_turns = 0
        self.player_wins: dict[str, int] = {}
        self.player_scores: dict[str, list[int]] = {}
        self.decisions: dict[str, dict] = {}  # player -> {action: count}

    def record_game(self, game: Game, winner_name: str):
        """Fold one finished game into the totals.

        Adds ``game.current_round`` to the round total, credits one win to
        ``winner_name`` and appends every player's ``total_score`` to that
        player's score history.
        """
        self.games_played += 1
        self.total_rounds += game.current_round

        self.player_wins[winner_name] = self.player_wins.get(winner_name, 0) + 1

        for participant in game.players:
            history = self.player_scores.setdefault(participant.name, [])
            history.append(participant.total_score)

    def record_turn(self, player_name: str, action: str):
        """Count one ``action`` for ``player_name``.

        Every call also increments ``total_turns``, regardless of the action.
        """
        self.total_turns += 1
        tally = self.decisions.setdefault(player_name, {})
        tally[action] = tally.get(action, 0) + 1

    def report(self) -> str:
        """Return the collected statistics as a newline-joined text block.

        Sections, in order: overall totals, win rates (most wins first),
        average scores (lowest first) and per-player action counts
        (alphabetical by player, then by action).
        """
        divider = "=" * 50
        out = [
            divider,
            "SIMULATION RESULTS",
            divider,
            f"Games played: {self.games_played}",
            f"Total rounds: {self.total_rounds}",
            f"Total turns: {self.total_turns}",
            f"Avg turns/game: {self.total_turns / max(1, self.games_played):.1f}",
            "",
            "WIN RATES:",
        ]

        # max(1, ...) keeps the percentage defined when nothing was recorded.
        win_total = max(1, sum(self.player_wins.values()))
        by_wins = sorted(
            self.player_wins.items(), key=lambda item: item[1], reverse=True
        )
        out.extend(
            f" {who}: {won} wins ({won / win_total * 100:.1f}%)"
            for who, won in by_wins
        )

        out += ["", "AVERAGE SCORES (lower is better):"]

        def mean_or_last(entry):
            # Players without any recorded score sort after everyone else.
            history = entry[1]
            return sum(history) / len(history) if history else 999

        for who, history in sorted(self.player_scores.items(), key=mean_or_last):
            mean = sum(history) / len(history) if history else 0
            out.append(f" {who}: {mean:.1f}")

        out += ["", "DECISION BREAKDOWN:"]

        for who in sorted(self.decisions):
            tally = self.decisions[who]
            action_total = max(1, sum(tally.values()))
            out.append(f" {who}:")
            for action in sorted(tally):
                count = tally[action]
                out.append(
                    f" {action}: {count} ({count / action_total * 100:.1f}%)"
                )

        return "\n".join(out)
|
|
|
|
|
|
def create_cpu_players(num_players: int) -> list[tuple[Player, CPUProfile]]:
    """Build CPU players, each paired with a distinct randomly chosen profile.

    Profiles are drawn without replacement from ``CPU_PROFILES``. The request
    is capped at the number of available profiles, so the returned list can
    be shorter than ``num_players``.

    Returns:
        ``(player, profile)`` pairs; player ids are ``cpu_0``, ``cpu_1``, ...
        and each player is named after its profile.
    """
    seat_count = min(num_players, len(CPU_PROFILES))
    chosen = random.sample(CPU_PROFILES, seat_count)
    return [
        (Player(id=f"cpu_{seat}", name=persona.name), persona)
        for seat, persona in enumerate(chosen)
    ]
|
|
|
|
|
|
def run_cpu_turn(
    game: Game,
    player: Player,
    profile: CPUProfile,
    logger: Optional[GameLogger],
    game_id: Optional[str],
    stats: SimulationStats
) -> str:
    """Play one full CPU turn synchronously: draw, then swap or discard.

    Args:
        game: Game being played; its draw/swap/discard methods are called.
        player: The CPU player whose turn it is.
        profile: Profile handed to the ``GolfAI`` decision helpers.
        logger: Optional move logger; used only when ``game_id`` is also set.
        game_id: Identifier passed along with every logged move.
        stats: Receives one ``record_turn`` call for the draw and one for the
            swap/discard that follows.

    Returns:
        ``"no_card"`` if nothing could be drawn (nothing is recorded or
        logged in that case), otherwise ``"swap"`` or ``"discard"``.
    """
    should_log = bool(logger and game_id)

    # --- Draw phase: pick a source and draw from it -----------------------
    top_of_discard = game.discard_top()
    wants_discard = GolfAI.should_take_discard(top_of_discard, player, profile, game)
    drawn = game.draw_card(player.id, "discard" if wants_discard else "deck")
    if not drawn:
        return "no_card"

    draw_action = "take_discard" if wants_discard else "draw_deck"
    stats.record_turn(player.name, draw_action)

    if should_log:
        if wants_discard:
            draw_reason = f"took {top_of_discard.rank.value} from discard"
        else:
            draw_reason = "drew from deck"
        logger.log_move(
            game_id=game_id,
            player=player,
            is_cpu=True,
            action=draw_action,
            card=drawn,
            game=game,
            decision_reason=draw_reason,
        )

    # --- Placement phase: swap into the hand or discard -------------------
    swap_pos = GolfAI.choose_swap_or_discard(drawn, player, profile, game)

    # A card taken from the discard pile must be swapped in, so when the AI
    # declined to pick a position one is chosen for it here.
    if swap_pos is None and game.drawn_from_discard:
        hidden_slots = [
            idx for idx, held in enumerate(player.cards) if not held.face_up
        ]
        if hidden_slots:
            swap_pos = random.choice(hidden_slots)
        else:
            # Everything is face up: replace the highest-valued card as
            # scored under the active house rules.
            worst_pos, worst_val = 0, -999
            for idx, held in enumerate(player.cards):
                held_val = get_ai_card_value(held, game.options)
                if held_val > worst_val:
                    worst_pos, worst_val = idx, held_val
            swap_pos = worst_pos

    if swap_pos is None:
        game.discard_drawn(player.id)
        stats.record_turn(player.name, "discard")

        if should_log:
            logger.log_move(
                game_id=game_id,
                player=player,
                is_cpu=True,
                action="discard",
                card=drawn,
                game=game,
                decision_reason=f"discarded {drawn.rank.value}",
            )

        if game.flip_on_discard:
            flip_pos = GolfAI.choose_flip_after_discard(player, profile)
            game.flip_and_end_turn(player.id, flip_pos)

            if should_log:
                revealed = player.cards[flip_pos]
                logger.log_move(
                    game_id=game_id,
                    player=player,
                    is_cpu=True,
                    action="flip",
                    card=revealed,
                    position=flip_pos,
                    game=game,
                    decision_reason=f"flipped position {flip_pos}",
                )

        return "discard"

    # Capture the outgoing card before the swap so it can be named in the log.
    replaced = player.cards[swap_pos]
    game.swap_card(player.id, swap_pos)
    stats.record_turn(player.name, "swap")

    if should_log:
        logger.log_move(
            game_id=game_id,
            player=player,
            is_cpu=True,
            action="swap",
            card=drawn,
            position=swap_pos,
            game=game,
            decision_reason=f"swapped {drawn.rank.value} for {replaced.rank.value} at pos {swap_pos}",
        )

    return "swap"
|
|
|
|
|
|
def run_game(
    players_with_profiles: list[tuple[Player, CPUProfile]],
    options: GameOptions,
    logger: Optional[GameLogger],
    stats: SimulationStats,
    verbose: bool = False
) -> tuple[str, int]:
    """Play one single-round, single-deck game between the given CPU players.

    Args:
        players_with_profiles: ``(player, profile)`` pairs; each player's
            cards and score fields are reset before being seated.
        options: Game options passed to ``start_game`` and to the logger.
        logger: Optional logger; when given, game start, moves and game end
            are logged.
        stats: Receives per-turn records and the final game record.
        verbose: When true, prints every tenth turn.

    Returns:
        ``(winner_name, winner_score)`` where the winner is the player with
        the lowest ``total_score`` once the turn loop stops.
    """
    game = Game()
    profile_by_id: dict[str, CPUProfile] = {}

    for seat, persona in players_with_profiles:
        # Clear anything left over on the player object before seating it.
        seat.cards = []
        seat.score = 0
        seat.total_score = 0
        seat.rounds_won = 0

        game.add_player(seat)
        profile_by_id[seat.id] = persona

    game.start_game(num_decks=1, num_rounds=1, options=options)

    # Log game start
    game_id = (
        logger.log_game_start(
            room_code="SIM",
            num_players=len(players_with_profiles),
            options=options
        )
        if logger
        else None
    )

    # Do initial flips for all players
    if options.initial_flips > 0:
        for seat, _persona in players_with_profiles:
            flip_positions = GolfAI.choose_initial_flips(options.initial_flips)
            game.flip_initial_cards(seat.id, flip_positions)

    # Play until the game leaves the active phases or the safety cap is hit.
    active_phases = (GamePhase.PLAYING, GamePhase.FINAL_TURN)
    turn_cap = 200  # Safety limit
    turns_taken = 0

    while game.phase in active_phases and turns_taken < turn_cap:
        mover = game.current_player()
        if not mover:
            break

        outcome = run_cpu_turn(
            game, mover, profile_by_id[mover.id], logger, game_id, stats
        )

        if verbose and turns_taken % 10 == 0:
            print(f" Turn {turns_taken}: {mover.name} - {outcome}")

        turns_taken += 1

    # Log game end
    if logger and game_id:
        logger.log_game_end(game_id)

    # Lowest total score wins.
    champion = min(game.players, key=lambda contender: contender.total_score)
    stats.record_game(game, champion.name)

    return champion.name, champion.total_score
|
|
|
|
|
|
def run_simulation(
    num_games: int = 10,
    num_players: int = 4,
    verbose: bool = True
):
    """Play ``num_games`` games back to back and print aggregate statistics.

    A fresh set of CPU players is created for every game; one logger and one
    statistics object are shared across all of them.

    Args:
        num_games: Number of games to run.
        num_players: Players requested per game.
        verbose: When true, prints each game's line-up and winner.
    """
    rule = "=" * 50

    print(f"\nRunning {num_games} games with {num_players} players each...")
    print(rule)

    logger = GameLogger()
    stats = SimulationStats()

    # Default options
    options = GameOptions(initial_flips=2, flip_on_discard=False, use_jokers=False)

    for game_number in range(1, num_games + 1):
        lineup = create_cpu_players(num_players)

        if verbose:
            roster = ", ".join(seat.name for seat, _ in lineup)
            print(f"\nGame {game_number}/{num_games}: {roster}")

        winner, score = run_game(lineup, options, logger, stats, verbose=False)

        if verbose:
            print(f" Winner: {winner} (score: {score})")

    print("\n")
    print(stats.report())

    # Closing pointer to the follow-up analysis commands.
    for closing_line in (
        "\n" + rule,
        "ANALYSIS",
        rule,
        "\nRun analysis with:",
        " python game_analyzer.py blunders",
        " python game_analyzer.py summary",
    ):
        print(closing_line)
|
|
|
|
|
|
def run_detailed_game(num_players: int = 4):
    """Run a single game with detailed, turn-by-turn console output.

    Creates fresh CPU players, plays one single-round, single-deck game and
    prints each player's visible hand, the discard pile, the action taken on
    every turn and the final scores. The game is also written to a
    ``GameLogger`` so it can be inspected with ``game_analyzer.py``.

    Args:
        num_players: Players requested. ``create_cpu_players`` may return
            fewer, so the logged player count is taken from the list it
            actually returned.
    """

    def discard_label(card):
        # The discard pile may have no top card; print a placeholder then
        # instead of dereferencing a missing card.
        return card.rank.value if card else 'empty'

    print(f"\nRunning detailed game with {num_players} players...")
    print("=" * 50)

    logger = GameLogger()
    stats = SimulationStats()

    options = GameOptions(
        initial_flips=2,
        flip_on_discard=False,
        use_jokers=False,
    )

    players_with_profiles = create_cpu_players(num_players)

    game = Game()
    profiles: dict[str, CPUProfile] = {}

    for player, profile in players_with_profiles:
        game.add_player(player)
        profiles[player.id] = profile
        print(f" {player.name} ({profile.style})")

    game.start_game(num_decks=1, num_rounds=1, options=options)

    # Log the number of players actually seated, not the number requested.
    game_id = logger.log_game_start(
        room_code="DETAIL",
        num_players=len(players_with_profiles),
        options=options
    )

    # Initial flips
    print("\nInitial flips:")
    for player, profile in players_with_profiles:
        positions = GolfAI.choose_initial_flips(options.initial_flips)
        game.flip_initial_cards(player.id, positions)
        visible = [(i, c.rank.value) for i, c in enumerate(player.cards) if c.face_up]
        print(f" {player.name}: {visible}")

    print(f"\nDiscard pile: {discard_label(game.discard_top())}")
    print("\n" + "-" * 50)

    # Play game; the turn cap is a safety limit against a game that never ends.
    turn = 0
    while game.phase in (GamePhase.PLAYING, GamePhase.FINAL_TURN) and turn < 100:
        current = game.current_player()
        if not current:
            break

        profile = profiles[current.id]
        discard_before = game.discard_top()

        # Show state before turn
        visible = [(i, c.rank.value) for i, c in enumerate(current.cards) if c.face_up]
        hidden = sum(1 for c in current.cards if not c.face_up)

        print(f"\nTurn {turn + 1}: {current.name}")
        print(f" Hand: {visible} + {hidden} hidden")
        print(f" Discard: {discard_label(discard_before)}")

        # Run turn
        action = run_cpu_turn(game, current, profile, logger, game_id, stats)

        # Show result
        discard_after = game.discard_top()
        print(f" Action: {action}")
        print(f" New discard: {discard_label(discard_after)}")

        if game.phase == GamePhase.FINAL_TURN and game.finisher_id == current.id:
            print(f" >>> {current.name} went out! Final turn phase.")

        turn += 1

    # Game over
    logger.log_game_end(game_id)

    print("\n" + "=" * 50)
    print("FINAL SCORES")
    print("=" * 50)

    for player in sorted(game.players, key=lambda p: p.total_score):
        cards = [c.rank.value for c in player.cards]
        print(f" {player.name}: {player.total_score} points")
        print(f" Cards: {cards}")

    # Lowest total score wins.
    winner = min(game.players, key=lambda p: p.total_score)
    print(f"\nWinner: {winner.name}!")

    print(f"\nGame logged as: {game_id[:8]}...")
    print("Run: python game_analyzer.py game", game_id, winner.name)
|
|
|
|
|
|
if __name__ == "__main__":
    # Command line: "detail [num_players]" plays one narrated game; otherwise
    # "[num_games] [num_players]" runs a batch. Defaults: 10 games, 4 players.
    cli_args = sys.argv[1:]

    if cli_args and cli_args[0] == "detail":
        # Detailed single game
        num_players = int(cli_args[1]) if len(cli_args) > 1 else 4
        run_detailed_game(num_players)
    else:
        # Batch simulation
        num_games = int(cli_args[0]) if cli_args else 10
        num_players = int(cli_args[1]) if len(cli_args) > 1 else 4
        run_simulation(num_games, num_players)
|