golfgame/server/game_analyzer.py
adlee-was-taken 49b2490c25 Add PostgreSQL game logging system
- Add GameLogger service for move logging to PostgreSQL
- Add moves table to event_store.py for AI decision analysis
- Update main.py to initialize GameLogger in lifespan
- Update game_analyzer.py to query PostgreSQL instead of SQLite
- Add VDD documentation V2_08_GAME_LOGGING.md

Replaces SQLite game_log.py with unified PostgreSQL backend.
See docs/v2/V2_08_GAME_LOGGING.md for architecture and API.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-14 08:42:49 -05:00

670 lines
25 KiB
Python

"""
Game Analyzer for 6-Card Golf AI decisions.
Evaluates AI decisions against optimal play baselines and generates
reports on decision quality, mistake rates, and areas for improvement.
NOTE: This analyzer has been updated to use PostgreSQL. It requires
POSTGRES_URL to be configured. For quick analysis during simulations,
use the SimulationStats class in simulate.py instead.
Usage:
python game_analyzer.py blunders [limit]
python game_analyzer.py recent [limit]
"""
import asyncio
import json
import os
import sqlite3 # For legacy GameAnalyzer class (deprecated)
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional
from enum import Enum
from game import Rank, RANK_VALUES, GameOptions
from constants import get_card_value_for_rank
# =============================================================================
# Card Value Utilities
# =============================================================================
def get_card_value(rank: str, options: Optional[dict] = None) -> int:
"""Get point value for a card rank string.
This is a wrapper around constants.get_card_value_for_rank() for
backwards compatibility with existing analyzer code.
"""
return get_card_value_for_rank(rank, options)
def rank_quality(rank: str, options: Optional[dict] = None) -> str:
"""Categorize a card as excellent, good, neutral, bad, or terrible."""
value = get_card_value(rank, options)
if value <= -2:
return "excellent" # Jokers, 2s
if value <= 0:
return "good" # Kings (or lucky 7s, ten_penny 10s)
if value <= 2:
return "decent" # Aces, 2s without special rules
if value <= 5:
return "neutral" # 3-5
if value <= 7:
return "bad" # 6-7
return "terrible" # 8-10, J, Q
# =============================================================================
# Decision Classification
# =============================================================================
class DecisionQuality(Enum):
"""Classification of decision quality."""
OPTIMAL = "optimal" # Best possible decision
GOOD = "good" # Reasonable decision, minor suboptimality
QUESTIONABLE = "questionable" # Debatable, might be personality-driven
MISTAKE = "mistake" # Clear suboptimal play (2-5 point cost)
BLUNDER = "blunder" # Severe error (5+ point cost)
@dataclass
class DecisionAnalysis:
"""Analysis of a single decision."""
move_id: int
action: str
card_rank: Optional[str]
position: Optional[int]
quality: DecisionQuality
expected_value: float # EV impact of this decision
reasoning: str
optimal_play: Optional[str] = None # What should have been done
@dataclass
class GameSummary:
"""Summary analysis of a complete game."""
game_id: str
player_name: str
total_decisions: int
optimal_count: int
good_count: int
questionable_count: int
mistake_count: int
blunder_count: int
total_ev_lost: float # Points "left on table"
decisions: list[DecisionAnalysis]
@property
def accuracy(self) -> float:
"""Percentage of optimal/good decisions."""
if self.total_decisions == 0:
return 100.0
return (self.optimal_count + self.good_count) / self.total_decisions * 100
@property
def mistake_rate(self) -> float:
"""Percentage of mistakes + blunders."""
if self.total_decisions == 0:
return 0.0
return (self.mistake_count + self.blunder_count) / self.total_decisions * 100
# =============================================================================
# Decision Evaluators
# =============================================================================
class DecisionEvaluator:
"""Evaluates individual decisions against optimal play."""
def __init__(self, options: Optional[dict] = None):
self.options = options or {}
def evaluate_take_discard(
self,
discard_rank: str,
hand: list[dict],
took_discard: bool
) -> DecisionAnalysis:
"""
Evaluate decision to take from discard vs draw from deck.
Optimal play:
- Always take: Jokers, Kings, 2s
- Take if: Value < worst visible card
- Don't take: High cards (8+) with good hand
"""
discard_value = get_card_value(discard_rank, self.options)
discard_qual = rank_quality(discard_rank, self.options)
# Find worst visible card in hand
visible_cards = [c for c in hand if c.get('face_up')]
worst_visible_value = max(
(get_card_value(c['rank'], self.options) for c in visible_cards),
default=5 # Assume average if no visible
)
# Determine if taking was correct
should_take = False
reasoning = ""
# Auto-take excellent cards
if discard_qual == "excellent":
should_take = True
reasoning = f"{discard_rank} is excellent (value={discard_value}), always take"
# Auto-take good cards
elif discard_qual == "good":
should_take = True
reasoning = f"{discard_rank} is good (value={discard_value}), should take"
# Take if better than worst visible
elif discard_value < worst_visible_value - 1:
should_take = True
reasoning = f"{discard_rank} ({discard_value}) better than worst visible ({worst_visible_value})"
# Don't take bad cards
elif discard_qual in ("bad", "terrible"):
should_take = False
reasoning = f"{discard_rank} is {discard_qual} (value={discard_value}), should not take"
else:
# Neutral - personality can influence
should_take = None # Either is acceptable
reasoning = f"{discard_rank} is neutral, either choice reasonable"
# Evaluate the actual decision
if should_take is None:
quality = DecisionQuality.GOOD
ev = 0
elif took_discard == should_take:
quality = DecisionQuality.OPTIMAL
ev = 0
else:
# Wrong decision
if discard_qual == "excellent" and not took_discard:
quality = DecisionQuality.BLUNDER
ev = -abs(discard_value) # Lost opportunity
reasoning = f"Failed to take {discard_rank} - significant missed opportunity"
elif discard_qual == "terrible" and took_discard:
quality = DecisionQuality.BLUNDER
ev = discard_value - 5 # Expected deck draw ~5
reasoning = f"Took terrible card {discard_rank} when should have drawn from deck"
elif discard_qual == "good" and not took_discard:
quality = DecisionQuality.MISTAKE
ev = -2
reasoning = f"Missed good card {discard_rank}"
elif discard_qual == "bad" and took_discard:
quality = DecisionQuality.MISTAKE
ev = discard_value - 5
reasoning = f"Took bad card {discard_rank}"
else:
quality = DecisionQuality.QUESTIONABLE
ev = -1
reasoning = f"Suboptimal choice with {discard_rank}"
return DecisionAnalysis(
move_id=0,
action="take_discard" if took_discard else "draw_deck",
card_rank=discard_rank,
position=None,
quality=quality,
expected_value=ev,
reasoning=reasoning,
optimal_play="take" if should_take else "draw" if should_take is False else "either"
)
def evaluate_swap(
self,
drawn_rank: str,
hand: list[dict],
swapped: bool,
swap_position: Optional[int],
was_from_discard: bool
) -> DecisionAnalysis:
"""
Evaluate swap vs discard decision.
Optimal play:
- Swap excellent cards into face-down positions
- Swap if drawn card better than position card
- Don't discard good cards
"""
drawn_value = get_card_value(drawn_rank, self.options)
drawn_qual = rank_quality(drawn_rank, self.options)
# If from discard, must swap - evaluate position choice
if was_from_discard and not swapped:
# This shouldn't happen per rules
return DecisionAnalysis(
move_id=0,
action="invalid",
card_rank=drawn_rank,
position=swap_position,
quality=DecisionQuality.BLUNDER,
expected_value=-10,
reasoning="Must swap when drawing from discard",
optimal_play="swap"
)
if not swapped:
# Discarded the drawn card
if drawn_qual == "excellent":
return DecisionAnalysis(
move_id=0,
action="discard",
card_rank=drawn_rank,
position=None,
quality=DecisionQuality.BLUNDER,
expected_value=abs(drawn_value) + 5, # Lost value + avg replacement
reasoning=f"Discarded excellent card {drawn_rank}!",
optimal_play="swap into face-down"
)
elif drawn_qual == "good":
return DecisionAnalysis(
move_id=0,
action="discard",
card_rank=drawn_rank,
position=None,
quality=DecisionQuality.MISTAKE,
expected_value=3,
reasoning=f"Discarded good card {drawn_rank}",
optimal_play="swap into face-down"
)
else:
# Discarding neutral/bad card is fine
return DecisionAnalysis(
move_id=0,
action="discard",
card_rank=drawn_rank,
position=None,
quality=DecisionQuality.OPTIMAL,
expected_value=0,
reasoning=f"Correctly discarded {drawn_qual} card {drawn_rank}",
)
# Swapped - evaluate position choice
if swap_position is not None and 0 <= swap_position < len(hand):
replaced_card = hand[swap_position]
if replaced_card.get('face_up'):
replaced_rank = replaced_card.get('rank', '?')
replaced_value = get_card_value(replaced_rank, self.options)
ev_change = replaced_value - drawn_value
if ev_change > 0:
quality = DecisionQuality.OPTIMAL
reasoning = f"Good swap: {drawn_rank} ({drawn_value}) for {replaced_rank} ({replaced_value})"
elif ev_change < -3:
quality = DecisionQuality.MISTAKE
reasoning = f"Bad swap: lost {-ev_change} points swapping {replaced_rank} for {drawn_rank}"
elif ev_change < 0:
quality = DecisionQuality.QUESTIONABLE
reasoning = f"Marginal swap: {drawn_rank} for {replaced_rank}"
else:
quality = DecisionQuality.GOOD
reasoning = f"Neutral swap: {drawn_rank} for {replaced_rank}"
return DecisionAnalysis(
move_id=0,
action="swap",
card_rank=drawn_rank,
position=swap_position,
quality=quality,
expected_value=ev_change,
reasoning=reasoning,
)
else:
# Swapped into face-down - generally good for good cards
if drawn_qual in ("excellent", "good", "decent"):
return DecisionAnalysis(
move_id=0,
action="swap",
card_rank=drawn_rank,
position=swap_position,
quality=DecisionQuality.OPTIMAL,
expected_value=5 - drawn_value, # vs expected ~5 hidden
reasoning=f"Good: swapped {drawn_rank} into unknown position",
)
else:
return DecisionAnalysis(
move_id=0,
action="swap",
card_rank=drawn_rank,
position=swap_position,
quality=DecisionQuality.QUESTIONABLE,
expected_value=0,
reasoning=f"Risky: swapped {drawn_qual} card {drawn_rank} into unknown",
)
return DecisionAnalysis(
move_id=0,
action="swap",
card_rank=drawn_rank,
position=swap_position,
quality=DecisionQuality.GOOD,
expected_value=0,
reasoning="Swap decision",
)
# =============================================================================
# Game Analyzer
# =============================================================================
class GameAnalyzer:
"""Analyzes logged games for decision quality.
DEPRECATED: This class uses SQLite which has been replaced by PostgreSQL.
Use the CLI commands (blunders, recent) instead, or query the moves table
in PostgreSQL directly.
"""
def __init__(self, db_path: str = "games.db"):
self.db_path = Path(db_path)
if not self.db_path.exists():
raise FileNotFoundError(f"Database not found: {db_path}")
def get_game_options(self, game_id: str) -> Optional[dict]:
"""Load game options from database."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"SELECT options_json FROM games WHERE id = ?",
(game_id,)
)
row = cursor.fetchone()
if row and row[0]:
return json.loads(row[0])
return None
def get_player_moves(self, game_id: str, player_name: str) -> list[dict]:
"""Get all moves for a player in a game."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("""
SELECT * FROM moves
WHERE game_id = ? AND player_name = ?
ORDER BY move_number
""", (game_id, player_name))
return [dict(row) for row in cursor.fetchall()]
def analyze_player_game(self, game_id: str, player_name: str) -> GameSummary:
"""Analyze all decisions made by a player in a game."""
options = self.get_game_options(game_id)
moves = self.get_player_moves(game_id, player_name)
evaluator = DecisionEvaluator(options)
decisions = []
draw_context = None # Track the draw for evaluating subsequent swap
for move in moves:
action = move['action']
card_rank = move['card_rank']
position = move['position']
hand = json.loads(move['hand_json']) if move['hand_json'] else []
discard_top = json.loads(move['discard_top_json']) if move['discard_top_json'] else None
if action in ('take_discard', 'draw_deck'):
# Evaluate draw decision
if discard_top:
analysis = evaluator.evaluate_take_discard(
discard_rank=discard_top.get('rank', '?'),
hand=hand,
took_discard=(action == 'take_discard')
)
analysis.move_id = move['id']
decisions.append(analysis)
# Store context for swap evaluation
draw_context = {
'rank': card_rank,
'from_discard': action == 'take_discard',
'hand': hand
}
elif action == 'swap':
if draw_context:
analysis = evaluator.evaluate_swap(
drawn_rank=draw_context['rank'],
hand=draw_context['hand'],
swapped=True,
swap_position=position,
was_from_discard=draw_context['from_discard']
)
analysis.move_id = move['id']
decisions.append(analysis)
draw_context = None
elif action == 'discard':
if draw_context:
analysis = evaluator.evaluate_swap(
drawn_rank=draw_context['rank'],
hand=draw_context['hand'],
swapped=False,
swap_position=None,
was_from_discard=draw_context['from_discard']
)
analysis.move_id = move['id']
decisions.append(analysis)
draw_context = None
# Tally results
counts = {q: 0 for q in DecisionQuality}
total_ev_lost = 0.0
for d in decisions:
counts[d.quality] += 1
if d.expected_value < 0:
total_ev_lost += abs(d.expected_value)
return GameSummary(
game_id=game_id,
player_name=player_name,
total_decisions=len(decisions),
optimal_count=counts[DecisionQuality.OPTIMAL],
good_count=counts[DecisionQuality.GOOD],
questionable_count=counts[DecisionQuality.QUESTIONABLE],
mistake_count=counts[DecisionQuality.MISTAKE],
blunder_count=counts[DecisionQuality.BLUNDER],
total_ev_lost=total_ev_lost,
decisions=decisions
)
def find_blunders(self, limit: int = 20) -> list[dict]:
"""Find all blunders across all games."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("""
SELECT m.*, g.room_code
FROM moves m
JOIN games g ON m.game_id = g.id
WHERE m.is_cpu = 1
ORDER BY m.timestamp DESC
LIMIT ?
""", (limit * 10,)) # Get more, then filter
blunders = []
options_cache = {}
for row in cursor:
move = dict(row)
game_id = move['game_id']
# Cache options lookup
if game_id not in options_cache:
options_cache[game_id] = self.get_game_options(game_id)
options = options_cache[game_id]
card_rank = move['card_rank']
if not card_rank:
continue
# Check for obvious blunders
quality = rank_quality(card_rank, options)
action = move['action']
is_blunder = False
reason = ""
if action == 'discard' and quality in ('excellent', 'good'):
is_blunder = True
reason = f"Discarded {quality} card {card_rank}"
elif action == 'take_discard' and quality == 'terrible':
# Check if this was for pairing - that's smart play!
hand = json.loads(move['hand_json']) if move['hand_json'] else []
card_value = get_card_value(card_rank, options)
has_matching_visible = any(
c.get('rank') == card_rank and c.get('face_up')
for c in hand
)
# Also check if player has worse visible cards (taking to swap is smart)
has_worse_visible = any(
c.get('face_up') and get_card_value(c.get('rank', '?'), options) > card_value
for c in hand
)
if has_matching_visible:
# Taking to pair - this is good play, not a blunder
pass
elif has_worse_visible:
# Taking to swap for a worse card - reasonable play
pass
else:
is_blunder = True
reason = f"Took terrible card {card_rank} with no improvement path"
if is_blunder:
blunders.append({
**move,
'blunder_reason': reason
})
if len(blunders) >= limit:
break
return blunders
# =============================================================================
# Report Generation
# =============================================================================
def generate_player_report(summary: GameSummary) -> str:
"""Generate a text report for a player's game performance."""
lines = [
f"=== Decision Analysis: {summary.player_name} ===",
f"Game: {summary.game_id[:8]}...",
f"",
f"Total Decisions: {summary.total_decisions}",
f"Accuracy: {summary.accuracy:.1f}%",
f"",
f"Breakdown:",
f" Optimal: {summary.optimal_count}",
f" Good: {summary.good_count}",
f" Questionable: {summary.questionable_count}",
f" Mistakes: {summary.mistake_count}",
f" Blunders: {summary.blunder_count}",
f"",
f"Points Lost to Errors: {summary.total_ev_lost:.1f}",
f"",
]
# List specific issues
issues = [d for d in summary.decisions
if d.quality in (DecisionQuality.MISTAKE, DecisionQuality.BLUNDER)]
if issues:
lines.append("Issues Found:")
for d in issues:
marker = "!!!" if d.quality == DecisionQuality.BLUNDER else "!"
lines.append(f" {marker} {d.reasoning}")
return "\n".join(lines)
def print_blunder_report(blunders: list[dict]):
"""Print a report of found blunders."""
print(f"\n=== Blunder Report ({len(blunders)} found) ===\n")
for b in blunders:
print(f"Player: {b['player_name']}")
print(f"Action: {b['action']} {b['card_rank']}")
print(f"Reason: {b['blunder_reason']}")
print(f"Room: {b.get('room_code', 'N/A')}")
print("-" * 40)
# =============================================================================
# CLI Interface (PostgreSQL version)
# =============================================================================
async def run_cli():
"""Async CLI entry point."""
import sys
if len(sys.argv) < 2:
print("Usage:")
print(" python game_analyzer.py blunders [limit]")
print(" python game_analyzer.py recent [limit]")
print("")
print("Requires POSTGRES_URL environment variable.")
sys.exit(1)
postgres_url = os.environ.get("POSTGRES_URL")
if not postgres_url:
print("Error: POSTGRES_URL environment variable not set.")
print("")
print("Set it like: export POSTGRES_URL=postgresql://golf:devpassword@localhost:5432/golf")
print("")
print("For simulation analysis without PostgreSQL, use:")
print(" python simulate.py 100 --preset baseline")
sys.exit(1)
from stores.event_store import EventStore
try:
event_store = await EventStore.create(postgres_url)
except Exception as e:
print(f"Error connecting to PostgreSQL: {e}")
sys.exit(1)
command = sys.argv[1]
try:
if command == "blunders":
limit = int(sys.argv[2]) if len(sys.argv) > 2 else 20
blunders = await event_store.find_suspicious_discards(limit)
print(f"\n=== Suspicious Discards ({len(blunders)} found) ===\n")
for b in blunders:
print(f"Player: {b.get('player_name', 'Unknown')}")
print(f"Action: discard {b.get('card_rank', '?')}")
print(f"Room: {b.get('room_code', 'N/A')}")
print(f"Reason: {b.get('decision_reason', 'N/A')}")
print("-" * 40)
elif command == "recent":
limit = int(sys.argv[2]) if len(sys.argv) > 2 else 10
games = await event_store.get_recent_games_with_stats(limit)
print("\n=== Recent Games ===\n")
for game in games:
game_id = str(game.get('id', ''))[:8]
room_code = game.get('room_code', 'N/A')
status = game.get('status', 'unknown')
moves = game.get('total_moves', 0)
print(f"{game_id}... | Room: {room_code} | Status: {status} | Moves: {moves}")
else:
print(f"Unknown command: {command}")
print("Available: blunders, recent")
finally:
await event_store.close()
if __name__ == "__main__":
# Note: The detailed analysis (GameAnalyzer class) still uses the old SQLite
# schema format. For now, use the CLI commands above for PostgreSQL queries.
# Full migration of the analysis logic is TODO.
asyncio.run(run_cli())