""" PostgreSQL-backed event store for Golf game. The event store is an append-only log of all game events. Events are immutable and ordered by sequence number within each game. Features: - Optimistic concurrency via unique constraint on (game_id, sequence_num) - Batch appends for atomic multi-event writes - Streaming for memory-efficient large game replay - Game metadata table for efficient queries """ import json import logging from datetime import datetime, timezone from typing import Optional, AsyncIterator import asyncpg from models.events import GameEvent, EventType logger = logging.getLogger(__name__) class ConcurrencyError(Exception): """Raised when optimistic concurrency check fails.""" pass # SQL schema for event store SCHEMA_SQL = """ -- Events table (append-only log) CREATE TABLE IF NOT EXISTS events ( id BIGSERIAL PRIMARY KEY, game_id UUID NOT NULL, sequence_num INT NOT NULL, event_type VARCHAR(50) NOT NULL, player_id VARCHAR(50), event_data JSONB NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW(), -- Ensure events are ordered and unique per game UNIQUE(game_id, sequence_num) ); -- Games metadata (denormalized for queries, not source of truth) CREATE TABLE IF NOT EXISTS games_v2 ( id UUID PRIMARY KEY, room_code VARCHAR(10) NOT NULL, status VARCHAR(20) DEFAULT 'active', -- active, completed, abandoned created_at TIMESTAMPTZ DEFAULT NOW(), started_at TIMESTAMPTZ, completed_at TIMESTAMPTZ, num_players INT, num_rounds INT, options JSONB, winner_id VARCHAR(50), host_id VARCHAR(50), -- Denormalized for efficient queries player_ids VARCHAR(50)[] DEFAULT '{}' ); -- Moves table (denormalized for AI decision analysis) -- Replaces SQLite game_log.py - provides efficient queries for move-level analysis CREATE TABLE IF NOT EXISTS moves ( id BIGSERIAL PRIMARY KEY, game_id UUID NOT NULL, sequence_num INT NOT NULL, timestamp TIMESTAMPTZ DEFAULT NOW(), player_id VARCHAR(50) NOT NULL, player_name VARCHAR(100), is_cpu BOOLEAN DEFAULT FALSE, -- Action details action VARCHAR(30) NOT NULL, -- draw_deck, take_discard, swap, discard, flip, etc. card_rank VARCHAR(5), card_suit VARCHAR(10), position INT, -- AI context (JSONB for flexibility) hand_state JSONB, -- Player's hand at decision time discard_top JSONB, -- Top of discard pile visible_opponents JSONB, -- Face-up cards of opponents decision_reason TEXT, -- AI reasoning UNIQUE(game_id, sequence_num) ); -- Indexes for common queries CREATE INDEX IF NOT EXISTS idx_events_game_seq ON events(game_id, sequence_num); CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type); CREATE INDEX IF NOT EXISTS idx_events_player ON events(player_id) WHERE player_id IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at); CREATE INDEX IF NOT EXISTS idx_games_status ON games_v2(status); CREATE INDEX IF NOT EXISTS idx_games_room ON games_v2(room_code) WHERE status = 'active'; CREATE INDEX IF NOT EXISTS idx_games_players ON games_v2 USING GIN(player_ids); CREATE INDEX IF NOT EXISTS idx_games_completed ON games_v2(completed_at) WHERE status = 'completed'; CREATE INDEX IF NOT EXISTS idx_moves_game ON moves(game_id); CREATE INDEX IF NOT EXISTS idx_moves_action ON moves(action); CREATE INDEX IF NOT EXISTS idx_moves_is_cpu ON moves(is_cpu); CREATE INDEX IF NOT EXISTS idx_moves_player ON moves(player_id); """ class EventStore: """ PostgreSQL-backed event store. Provides methods for appending events and querying event history. Uses asyncpg for async database access. """ def __init__(self, pool: asyncpg.Pool): """ Initialize event store with connection pool. Args: pool: asyncpg connection pool. """ self.pool = pool @classmethod async def create(cls, postgres_url: str) -> "EventStore": """ Create an EventStore with a new connection pool. Args: postgres_url: PostgreSQL connection URL. Returns: Configured EventStore instance. """ pool = await asyncpg.create_pool(postgres_url, min_size=2, max_size=10) store = cls(pool) await store.initialize_schema() return store async def initialize_schema(self) -> None: """Create database tables if they don't exist.""" async with self.pool.acquire() as conn: await conn.execute(SCHEMA_SQL) logger.info("Event store schema initialized") async def close(self) -> None: """Close the connection pool.""" await self.pool.close() # ------------------------------------------------------------------------- # Event Writes # ------------------------------------------------------------------------- async def append(self, event: GameEvent) -> int: """ Append an event to the store. Args: event: The event to append. Returns: The database ID of the inserted event. Raises: ConcurrencyError: If sequence_num already exists for this game. """ async with self.pool.acquire() as conn: try: row = await conn.fetchrow( """ INSERT INTO events (game_id, sequence_num, event_type, player_id, event_data) VALUES ($1, $2, $3, $4, $5) RETURNING id """, event.game_id, event.sequence_num, event.event_type.value, event.player_id, json.dumps(event.data), ) return row["id"] except asyncpg.UniqueViolationError: raise ConcurrencyError( f"Event {event.sequence_num} already exists for game {event.game_id}" ) async def append_batch(self, events: list[GameEvent]) -> list[int]: """ Append multiple events atomically. All events are inserted in a single transaction. If any event fails (e.g., duplicate sequence), all are rolled back. Args: events: List of events to append. Returns: List of database IDs for inserted events. Raises: ConcurrencyError: If any sequence_num already exists. """ if not events: return [] async with self.pool.acquire() as conn: async with conn.transaction(): ids = [] for event in events: try: row = await conn.fetchrow( """ INSERT INTO events (game_id, sequence_num, event_type, player_id, event_data) VALUES ($1, $2, $3, $4, $5) RETURNING id """, event.game_id, event.sequence_num, event.event_type.value, event.player_id, json.dumps(event.data), ) ids.append(row["id"]) except asyncpg.UniqueViolationError: raise ConcurrencyError( f"Event {event.sequence_num} already exists for game {event.game_id}" ) return ids # ------------------------------------------------------------------------- # Event Reads # ------------------------------------------------------------------------- async def get_events( self, game_id: str, from_sequence: int = 0, to_sequence: Optional[int] = None, ) -> list[GameEvent]: """ Get events for a game, optionally within a sequence range. Args: game_id: Game UUID. from_sequence: Start sequence (inclusive). to_sequence: End sequence (inclusive), or None for all. Returns: List of events in sequence order. """ async with self.pool.acquire() as conn: if to_sequence is not None: rows = await conn.fetch( """ SELECT event_type, game_id, sequence_num, player_id, event_data, created_at FROM events WHERE game_id = $1 AND sequence_num >= $2 AND sequence_num <= $3 ORDER BY sequence_num """, game_id, from_sequence, to_sequence, ) else: rows = await conn.fetch( """ SELECT event_type, game_id, sequence_num, player_id, event_data, created_at FROM events WHERE game_id = $1 AND sequence_num >= $2 ORDER BY sequence_num """, game_id, from_sequence, ) return [self._row_to_event(row) for row in rows] async def get_latest_sequence(self, game_id: str) -> int: """ Get the latest sequence number for a game. Args: game_id: Game UUID. Returns: Latest sequence number, or -1 if no events exist. """ async with self.pool.acquire() as conn: row = await conn.fetchrow( """ SELECT COALESCE(MAX(sequence_num), -1) as seq FROM events WHERE game_id = $1 """, game_id, ) return row["seq"] async def stream_events( self, game_id: str, from_sequence: int = 0, ) -> AsyncIterator[GameEvent]: """ Stream events for memory-efficient processing. Use this for replaying large games without loading all events into memory. Args: game_id: Game UUID. from_sequence: Start sequence (inclusive). Yields: Events in sequence order. """ async with self.pool.acquire() as conn: async with conn.transaction(): async for row in conn.cursor( """ SELECT event_type, game_id, sequence_num, player_id, event_data, created_at FROM events WHERE game_id = $1 AND sequence_num >= $2 ORDER BY sequence_num """, game_id, from_sequence, ): yield self._row_to_event(row) async def get_event_count(self, game_id: str) -> int: """ Get the total number of events for a game. Args: game_id: Game UUID. Returns: Event count. """ async with self.pool.acquire() as conn: row = await conn.fetchrow( "SELECT COUNT(*) as count FROM events WHERE game_id = $1", game_id, ) return row["count"] # ------------------------------------------------------------------------- # Game Metadata # ------------------------------------------------------------------------- async def create_game( self, game_id: str, room_code: str, host_id: str, options: Optional[dict] = None, ) -> None: """ Create a game metadata record. Args: game_id: Game UUID. room_code: 4-letter room code. host_id: Host player ID. options: GameOptions as dict. """ async with self.pool.acquire() as conn: await conn.execute( """ INSERT INTO games_v2 (id, room_code, host_id, options) VALUES ($1, $2, $3, $4) ON CONFLICT (id) DO NOTHING """, game_id, room_code, host_id, json.dumps(options) if options else None, ) async def update_game_started( self, game_id: str, num_players: int, num_rounds: int, player_ids: list[str], ) -> None: """ Update game metadata when game starts. Args: game_id: Game UUID. num_players: Number of players. num_rounds: Number of rounds. player_ids: List of player IDs. """ async with self.pool.acquire() as conn: await conn.execute( """ UPDATE games_v2 SET started_at = NOW(), num_players = $2, num_rounds = $3, player_ids = $4 WHERE id = $1 """, game_id, num_players, num_rounds, player_ids, ) async def update_game_completed( self, game_id: str, winner_id: Optional[str] = None, ) -> None: """ Update game metadata when game completes. Args: game_id: Game UUID. winner_id: ID of the winner. """ async with self.pool.acquire() as conn: await conn.execute( """ UPDATE games_v2 SET status = 'completed', completed_at = NOW(), winner_id = $2 WHERE id = $1 """, game_id, winner_id, ) async def get_active_games(self) -> list[dict]: """ Get all active games for recovery on server restart. Returns: List of active game metadata dicts. """ async with self.pool.acquire() as conn: rows = await conn.fetch( """ SELECT id, room_code, status, created_at, started_at, num_players, num_rounds, options, host_id, player_ids FROM games_v2 WHERE status = 'active' ORDER BY created_at DESC """ ) return [dict(row) for row in rows] async def get_game(self, game_id: str) -> Optional[dict]: """ Get game metadata by ID. Args: game_id: Game UUID. Returns: Game metadata dict, or None if not found. """ async with self.pool.acquire() as conn: row = await conn.fetchrow( """ SELECT id, room_code, status, created_at, started_at, completed_at, num_players, num_rounds, options, winner_id, host_id, player_ids FROM games_v2 WHERE id = $1 """, game_id, ) return dict(row) if row else None # ------------------------------------------------------------------------- # Move Logging (for AI decision analysis) # ------------------------------------------------------------------------- async def append_move( self, game_id: str, player_id: str, player_name: str, is_cpu: bool, action: str, card_rank: Optional[str] = None, card_suit: Optional[str] = None, position: Optional[int] = None, hand_state: Optional[list] = None, discard_top: Optional[dict] = None, visible_opponents: Optional[dict] = None, decision_reason: Optional[str] = None, ) -> int: """ Append a move to the moves table for AI decision analysis. Args: game_id: Game UUID. player_id: Player who made the move. player_name: Display name of the player. is_cpu: Whether this is a CPU player. action: Action type (draw_deck, take_discard, swap, discard, flip, etc.). card_rank: Rank of the card involved. card_suit: Suit of the card involved. position: Hand position (0-5) for swaps/flips. hand_state: Player's hand at decision time. discard_top: Top of discard pile at decision time. visible_opponents: Face-up cards of opponents. decision_reason: AI reasoning for the decision. Returns: The database ID of the inserted move. """ async with self.pool.acquire() as conn: # Get next sequence number for this game seq_row = await conn.fetchrow( "SELECT COALESCE(MAX(sequence_num), 0) + 1 as seq FROM moves WHERE game_id = $1", game_id, ) sequence_num = seq_row["seq"] row = await conn.fetchrow( """ INSERT INTO moves ( game_id, sequence_num, player_id, player_name, is_cpu, action, card_rank, card_suit, position, hand_state, discard_top, visible_opponents, decision_reason ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING id """, game_id, sequence_num, player_id, player_name, is_cpu, action, card_rank, card_suit, position, json.dumps(hand_state) if hand_state else None, json.dumps(discard_top) if discard_top else None, json.dumps(visible_opponents) if visible_opponents else None, decision_reason, ) return row["id"] async def get_moves( self, game_id: str, player_id: Optional[str] = None, is_cpu: Optional[bool] = None, action: Optional[str] = None, limit: int = 100, ) -> list[dict]: """ Get moves for a game with optional filters. Args: game_id: Game UUID. player_id: Filter by player ID. is_cpu: Filter by CPU status. action: Filter by action type. limit: Maximum number of moves to return. Returns: List of move dicts. """ conditions = ["game_id = $1"] params = [game_id] param_idx = 2 if player_id is not None: conditions.append(f"player_id = ${param_idx}") params.append(player_id) param_idx += 1 if is_cpu is not None: conditions.append(f"is_cpu = ${param_idx}") params.append(is_cpu) param_idx += 1 if action is not None: conditions.append(f"action = ${param_idx}") params.append(action) param_idx += 1 params.append(limit) where_clause = " AND ".join(conditions) async with self.pool.acquire() as conn: rows = await conn.fetch( f""" SELECT id, game_id, sequence_num, timestamp, player_id, player_name, is_cpu, action, card_rank, card_suit, position, hand_state, discard_top, visible_opponents, decision_reason FROM moves WHERE {where_clause} ORDER BY sequence_num LIMIT ${param_idx} """, *params, ) return [self._row_to_move(row) for row in rows] async def get_player_decisions( self, game_id: str, player_name: str, ) -> list[dict]: """ Get all decisions made by a specific player in a game. Args: game_id: Game UUID. player_name: Display name of the player. Returns: List of move dicts. """ async with self.pool.acquire() as conn: rows = await conn.fetch( """ SELECT id, game_id, sequence_num, timestamp, player_id, player_name, is_cpu, action, card_rank, card_suit, position, hand_state, discard_top, visible_opponents, decision_reason FROM moves WHERE game_id = $1 AND player_name = $2 ORDER BY sequence_num """, game_id, player_name, ) return [self._row_to_move(row) for row in rows] async def find_suspicious_discards(self, limit: int = 50) -> list[dict]: """ Find cases where CPU discarded good cards (Ace, 2, King). Used for AI decision quality analysis. Args: limit: Maximum number of results. Returns: List of suspicious move dicts with game room_code. """ async with self.pool.acquire() as conn: rows = await conn.fetch( """ SELECT m.*, g.room_code FROM moves m JOIN games_v2 g ON m.game_id = g.id WHERE m.action = 'discard' AND m.card_rank IN ('A', '2', 'K') AND m.is_cpu = TRUE ORDER BY m.timestamp DESC LIMIT $1 """, limit, ) return [self._row_to_move(row) for row in rows] async def get_recent_games_with_stats(self, limit: int = 10) -> list[dict]: """ Get recent games with move counts. Args: limit: Maximum number of games. Returns: List of game dicts with total_moves count. """ async with self.pool.acquire() as conn: rows = await conn.fetch( """ SELECT g.*, COUNT(m.id) as total_moves FROM games_v2 g LEFT JOIN moves m ON g.id = m.game_id GROUP BY g.id ORDER BY g.created_at DESC LIMIT $1 """, limit, ) return [dict(row) for row in rows] def _row_to_move(self, row: asyncpg.Record) -> dict: """Convert a database row to a move dict.""" result = dict(row) # Parse JSON fields if result.get("hand_state"): result["hand_state"] = json.loads(result["hand_state"]) if result.get("discard_top"): result["discard_top"] = json.loads(result["discard_top"]) if result.get("visible_opponents"): result["visible_opponents"] = json.loads(result["visible_opponents"]) return result # ------------------------------------------------------------------------- # Helpers # ------------------------------------------------------------------------- def _row_to_event(self, row: asyncpg.Record) -> GameEvent: """Convert a database row to a GameEvent.""" return GameEvent( event_type=EventType(row["event_type"]), game_id=str(row["game_id"]), sequence_num=row["sequence_num"], player_id=row["player_id"], data=json.loads(row["event_data"]) if row["event_data"] else {}, timestamp=row["created_at"].replace(tzinfo=timezone.utc), ) # Global event store instance (initialized on first use) _event_store: Optional[EventStore] = None async def get_event_store(postgres_url: str) -> EventStore: """ Get or create the global event store instance. Args: postgres_url: PostgreSQL connection URL. Returns: EventStore instance. """ global _event_store if _event_store is None: _event_store = await EventStore.create(postgres_url) return _event_store async def close_event_store() -> None: """Close the global event store connection pool.""" global _event_store if _event_store is not None: await _event_store.close() _event_store = None