golfgame/docs/v2/V2_01_EVENT_SOURCING.md
Aaron D. Lee bea85e6b28 Huge v2 uplift, now deployable with real user management and tooling!
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-27 11:32:15 -05:00

26 KiB

V2-01: Event Sourcing Infrastructure

Overview

This document covers the foundational event sourcing system. All game actions will be stored as immutable events, enabling replay, audit trails, and stats aggregation.

Dependencies: None (this is the foundation) Dependents: All other V2 documents


Goals

  1. Define event classes for all game actions
  2. Create PostgreSQL event store
  3. Implement dual-write (events + current mutations)
  4. Build state rebuilder from events
  5. Validate that event replay produces identical state

Current State

The game currently uses direct mutation:

# Current approach in game.py
def draw_card(self, player_id: str, source: str) -> Optional[Card]:
    card = self.deck.pop() if source == "deck" else self.discard.pop()
    self.drawn_card = card
    self.phase = GamePhase.PLAY
    return card

Move logging exists in game_logger.py but stores denormalized state snapshots, not replayable events.


Event Design

Base Event Class

# server/models/events.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Any
from enum import Enum
import uuid

class EventType(str, Enum):
    # Lifecycle
    GAME_CREATED = "game_created"
    PLAYER_JOINED = "player_joined"
    PLAYER_LEFT = "player_left"
    GAME_STARTED = "game_started"
    ROUND_STARTED = "round_started"
    ROUND_ENDED = "round_ended"
    GAME_ENDED = "game_ended"

    # Gameplay
    INITIAL_FLIP = "initial_flip"
    CARD_DRAWN = "card_drawn"
    CARD_SWAPPED = "card_swapped"
    CARD_DISCARDED = "card_discarded"
    CARD_FLIPPED = "card_flipped"
    FLIP_SKIPPED = "flip_skipped"
    FLIP_AS_ACTION = "flip_as_action"
    KNOCK_EARLY = "knock_early"


@dataclass
class GameEvent:
    """Base class for all game events."""
    event_type: EventType
    game_id: str
    sequence_num: int
    timestamp: datetime = field(default_factory=datetime.utcnow)
    player_id: Optional[str] = None
    data: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        return {
            "event_type": self.event_type.value,
            "game_id": self.game_id,
            "sequence_num": self.sequence_num,
            "timestamp": self.timestamp.isoformat(),
            "player_id": self.player_id,
            "data": self.data,
        }

    @classmethod
    def from_dict(cls, d: dict) -> "GameEvent":
        return cls(
            event_type=EventType(d["event_type"]),
            game_id=d["game_id"],
            sequence_num=d["sequence_num"],
            timestamp=datetime.fromisoformat(d["timestamp"]),
            player_id=d.get("player_id"),
            data=d.get("data", {}),
        )

Lifecycle Events

# Lifecycle event data structures

@dataclass
class GameCreatedData:
    room_code: str
    host_id: str
    options: dict  # GameOptions as dict

@dataclass
class PlayerJoinedData:
    player_name: str
    is_cpu: bool
    cpu_profile: Optional[str] = None

@dataclass
class GameStartedData:
    deck_seed: int  # For deterministic replay
    player_order: list[str]  # Player IDs in turn order
    num_decks: int
    num_rounds: int
    dealt_cards: dict[str, list[dict]]  # player_id -> cards dealt

@dataclass
class RoundStartedData:
    round_num: int
    deck_seed: int
    dealt_cards: dict[str, list[dict]]

@dataclass
class RoundEndedData:
    scores: dict[str, int]  # player_id -> score
    winner_id: Optional[str]
    final_hands: dict[str, list[dict]]  # For verification

@dataclass
class GameEndedData:
    final_scores: dict[str, int]  # player_id -> total score
    winner_id: str
    rounds_won: dict[str, int]

Gameplay Events

# Gameplay event data structures

@dataclass
class InitialFlipData:
    positions: list[int]
    cards: list[dict]  # The cards revealed

@dataclass
class CardDrawnData:
    source: str  # "deck" or "discard"
    card: dict  # Card drawn

@dataclass
class CardSwappedData:
    position: int
    new_card: dict  # Card placed (was drawn)
    old_card: dict  # Card removed (goes to discard)

@dataclass
class CardDiscardedData:
    card: dict  # Card discarded

@dataclass
class CardFlippedData:
    position: int
    card: dict  # Card revealed

@dataclass
class FlipAsActionData:
    position: int
    card: dict  # Card revealed

@dataclass
class KnockEarlyData:
    positions: list[int]  # Positions flipped
    cards: list[dict]  # Cards revealed

Event Store Schema

-- migrations/versions/001_create_events.sql

-- Events table (append-only log)
CREATE TABLE events (
    id BIGSERIAL PRIMARY KEY,
    game_id UUID NOT NULL,
    sequence_num INT NOT NULL,
    event_type VARCHAR(50) NOT NULL,
    player_id VARCHAR(50),
    event_data JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),

    -- Ensure events are ordered and unique per game
    UNIQUE(game_id, sequence_num)
);

-- Games metadata (for queries, not source of truth)
CREATE TABLE games_v2 (
    id UUID PRIMARY KEY,
    room_code VARCHAR(10) NOT NULL,
    status VARCHAR(20) DEFAULT 'active',  -- active, completed, abandoned
    created_at TIMESTAMPTZ DEFAULT NOW(),
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    num_players INT,
    num_rounds INT,
    options JSONB,
    winner_id VARCHAR(50),
    host_id VARCHAR(50),

    -- Denormalized for efficient queries
    player_ids VARCHAR(50)[] DEFAULT '{}'
);

-- Indexes
CREATE INDEX idx_events_game_seq ON events(game_id, sequence_num);
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_player ON events(player_id) WHERE player_id IS NOT NULL;
CREATE INDEX idx_events_created ON events(created_at);

CREATE INDEX idx_games_status ON games_v2(status);
CREATE INDEX idx_games_room ON games_v2(room_code) WHERE status = 'active';
CREATE INDEX idx_games_players ON games_v2 USING GIN(player_ids);
CREATE INDEX idx_games_completed ON games_v2(completed_at) WHERE status = 'completed';

Event Store Implementation

# server/stores/event_store.py
from typing import Optional, AsyncIterator
from datetime import datetime
import asyncpg
import json

from models.events import GameEvent, EventType


class EventStore:
    """PostgreSQL-backed event store."""

    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append(self, event: GameEvent) -> int:
        """
        Append an event to the store.
        Returns the event ID.
        Raises if sequence_num already exists (optimistic concurrency).
        """
        async with self.pool.acquire() as conn:
            try:
                row = await conn.fetchrow("""
                    INSERT INTO events (game_id, sequence_num, event_type, player_id, event_data)
                    VALUES ($1, $2, $3, $4, $5)
                    RETURNING id
                """,
                    event.game_id,
                    event.sequence_num,
                    event.event_type.value,
                    event.player_id,
                    json.dumps(event.data),
                )
                return row["id"]
            except asyncpg.UniqueViolationError:
                raise ConcurrencyError(
                    f"Event {event.sequence_num} already exists for game {event.game_id}"
                )

    async def append_batch(self, events: list[GameEvent]) -> list[int]:
        """Append multiple events atomically."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                ids = []
                for event in events:
                    row = await conn.fetchrow("""
                        INSERT INTO events (game_id, sequence_num, event_type, player_id, event_data)
                        VALUES ($1, $2, $3, $4, $5)
                        RETURNING id
                    """,
                        event.game_id,
                        event.sequence_num,
                        event.event_type.value,
                        event.player_id,
                        json.dumps(event.data),
                    )
                    ids.append(row["id"])
                return ids

    async def get_events(
        self,
        game_id: str,
        from_sequence: int = 0,
        to_sequence: Optional[int] = None,
    ) -> list[GameEvent]:
        """Get events for a game, optionally within a sequence range."""
        async with self.pool.acquire() as conn:
            if to_sequence is not None:
                rows = await conn.fetch("""
                    SELECT event_type, game_id, sequence_num, player_id, event_data, created_at
                    FROM events
                    WHERE game_id = $1 AND sequence_num >= $2 AND sequence_num <= $3
                    ORDER BY sequence_num
                """, game_id, from_sequence, to_sequence)
            else:
                rows = await conn.fetch("""
                    SELECT event_type, game_id, sequence_num, player_id, event_data, created_at
                    FROM events
                    WHERE game_id = $1 AND sequence_num >= $2
                    ORDER BY sequence_num
                """, game_id, from_sequence)

            return [
                GameEvent(
                    event_type=EventType(row["event_type"]),
                    game_id=row["game_id"],
                    sequence_num=row["sequence_num"],
                    player_id=row["player_id"],
                    data=json.loads(row["event_data"]),
                    timestamp=row["created_at"],
                )
                for row in rows
            ]

    async def get_latest_sequence(self, game_id: str) -> int:
        """Get the latest sequence number for a game."""
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow("""
                SELECT COALESCE(MAX(sequence_num), -1) as seq
                FROM events
                WHERE game_id = $1
            """, game_id)
            return row["seq"]

    async def stream_events(
        self,
        game_id: str,
        from_sequence: int = 0,
    ) -> AsyncIterator[GameEvent]:
        """Stream events for memory-efficient processing."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                async for row in conn.cursor("""
                    SELECT event_type, game_id, sequence_num, player_id, event_data, created_at
                    FROM events
                    WHERE game_id = $1 AND sequence_num >= $2
                    ORDER BY sequence_num
                """, game_id, from_sequence):
                    yield GameEvent(
                        event_type=EventType(row["event_type"]),
                        game_id=row["game_id"],
                        sequence_num=row["sequence_num"],
                        player_id=row["player_id"],
                        data=json.loads(row["event_data"]),
                        timestamp=row["created_at"],
                    )


class ConcurrencyError(Exception):
    """Raised when optimistic concurrency check fails."""
    pass

State Rebuilder

# server/models/game_state.py
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

from models.events import GameEvent, EventType


class GamePhase(str, Enum):
    WAITING = "waiting"
    INITIAL_FLIP = "initial_flip"
    PLAYING = "playing"
    FINAL_TURN = "final_turn"
    ROUND_OVER = "round_over"
    GAME_OVER = "game_over"


@dataclass
class Card:
    rank: str
    suit: str
    face_up: bool = False

    def to_dict(self) -> dict:
        return {"rank": self.rank, "suit": self.suit, "face_up": self.face_up}

    @classmethod
    def from_dict(cls, d: dict) -> "Card":
        return cls(rank=d["rank"], suit=d["suit"], face_up=d.get("face_up", False))


@dataclass
class PlayerState:
    id: str
    name: str
    cards: list[Card] = field(default_factory=list)
    score: Optional[int] = None
    total_score: int = 0
    rounds_won: int = 0
    is_cpu: bool = False
    cpu_profile: Optional[str] = None


@dataclass
class RebuiltGameState:
    """Game state rebuilt from events."""
    game_id: str
    room_code: str = ""
    phase: GamePhase = GamePhase.WAITING
    players: dict[str, PlayerState] = field(default_factory=dict)
    player_order: list[str] = field(default_factory=list)
    current_player_idx: int = 0

    deck: list[Card] = field(default_factory=list)
    discard: list[Card] = field(default_factory=list)
    drawn_card: Optional[Card] = None

    current_round: int = 0
    total_rounds: int = 9
    options: dict = field(default_factory=dict)

    sequence_num: int = 0
    finisher_id: Optional[str] = None

    def apply(self, event: GameEvent) -> "RebuiltGameState":
        """
        Apply an event to produce new state.
        Returns self for chaining.
        """
        assert event.sequence_num == self.sequence_num + 1 or self.sequence_num == 0, \
            f"Expected sequence {self.sequence_num + 1}, got {event.sequence_num}"

        handler = getattr(self, f"_apply_{event.event_type.value}", None)
        if handler:
            handler(event)
        else:
            raise ValueError(f"Unknown event type: {event.event_type}")

        self.sequence_num = event.sequence_num
        return self

    def _apply_game_created(self, event: GameEvent):
        self.room_code = event.data["room_code"]
        self.options = event.data.get("options", {})
        self.players[event.data["host_id"]] = PlayerState(
            id=event.data["host_id"],
            name="Host",  # Will be updated by player_joined
        )

    def _apply_player_joined(self, event: GameEvent):
        self.players[event.player_id] = PlayerState(
            id=event.player_id,
            name=event.data["player_name"],
            is_cpu=event.data.get("is_cpu", False),
            cpu_profile=event.data.get("cpu_profile"),
        )

    def _apply_player_left(self, event: GameEvent):
        if event.player_id in self.players:
            del self.players[event.player_id]
        if event.player_id in self.player_order:
            self.player_order.remove(event.player_id)

    def _apply_game_started(self, event: GameEvent):
        self.player_order = event.data["player_order"]
        self.total_rounds = event.data["num_rounds"]
        self.current_round = 1
        self.phase = GamePhase.INITIAL_FLIP

        # Deal cards
        for player_id, cards_data in event.data["dealt_cards"].items():
            if player_id in self.players:
                self.players[player_id].cards = [
                    Card.from_dict(c) for c in cards_data
                ]

        # Rebuild deck from seed would go here for full determinism
        # For now, we trust the dealt_cards data

    def _apply_round_started(self, event: GameEvent):
        self.current_round = event.data["round_num"]
        self.phase = GamePhase.INITIAL_FLIP
        self.finisher_id = None
        self.drawn_card = None

        for player_id, cards_data in event.data["dealt_cards"].items():
            if player_id in self.players:
                self.players[player_id].cards = [
                    Card.from_dict(c) for c in cards_data
                ]
                self.players[player_id].score = None

    def _apply_initial_flip(self, event: GameEvent):
        player = self.players.get(event.player_id)
        if player:
            for pos, card_data in zip(event.data["positions"], event.data["cards"]):
                if 0 <= pos < len(player.cards):
                    player.cards[pos] = Card.from_dict(card_data)
                    player.cards[pos].face_up = True

        # Check if all players have flipped
        required = self.options.get("initial_flips", 2)
        all_flipped = all(
            sum(1 for c in p.cards if c.face_up) >= required
            for p in self.players.values()
        )
        if all_flipped and required > 0:
            self.phase = GamePhase.PLAYING

    def _apply_card_drawn(self, event: GameEvent):
        card = Card.from_dict(event.data["card"])
        card.face_up = True
        self.drawn_card = card

        if event.data["source"] == "discard" and self.discard:
            self.discard.pop()

    def _apply_card_swapped(self, event: GameEvent):
        player = self.players.get(event.player_id)
        if player and self.drawn_card:
            pos = event.data["position"]
            old_card = player.cards[pos]

            new_card = Card.from_dict(event.data["new_card"])
            new_card.face_up = True
            player.cards[pos] = new_card

            old_card.face_up = True
            self.discard.append(old_card)
            self.drawn_card = None

            self._advance_turn(player)

    def _apply_card_discarded(self, event: GameEvent):
        if self.drawn_card:
            self.discard.append(self.drawn_card)
            self.drawn_card = None

        player = self.players.get(event.player_id)
        if player:
            self._advance_turn(player)

    def _apply_card_flipped(self, event: GameEvent):
        player = self.players.get(event.player_id)
        if player:
            pos = event.data["position"]
            card = Card.from_dict(event.data["card"])
            card.face_up = True
            player.cards[pos] = card

            self._advance_turn(player)

    def _apply_flip_skipped(self, event: GameEvent):
        player = self.players.get(event.player_id)
        if player:
            self._advance_turn(player)

    def _apply_flip_as_action(self, event: GameEvent):
        player = self.players.get(event.player_id)
        if player:
            pos = event.data["position"]
            card = Card.from_dict(event.data["card"])
            card.face_up = True
            player.cards[pos] = card

            self._advance_turn(player)

    def _apply_knock_early(self, event: GameEvent):
        player = self.players.get(event.player_id)
        if player:
            for pos, card_data in zip(event.data["positions"], event.data["cards"]):
                card = Card.from_dict(card_data)
                card.face_up = True
                player.cards[pos] = card

            self._check_all_face_up(player)
            self._advance_turn(player)

    def _apply_round_ended(self, event: GameEvent):
        self.phase = GamePhase.ROUND_OVER
        for player_id, score in event.data["scores"].items():
            if player_id in self.players:
                self.players[player_id].score = score
                self.players[player_id].total_score += score

        winner_id = event.data.get("winner_id")
        if winner_id and winner_id in self.players:
            self.players[winner_id].rounds_won += 1

    def _apply_game_ended(self, event: GameEvent):
        self.phase = GamePhase.GAME_OVER

    def _advance_turn(self, player: PlayerState):
        """Advance to next player's turn."""
        self._check_all_face_up(player)

        if self.phase == GamePhase.ROUND_OVER:
            return

        self.current_player_idx = (self.current_player_idx + 1) % len(self.player_order)

        # Check if we've come back to finisher
        if self.finisher_id:
            current_id = self.player_order[self.current_player_idx]
            if current_id == self.finisher_id:
                self.phase = GamePhase.ROUND_OVER

    def _check_all_face_up(self, player: PlayerState):
        """Check if player has all cards face up (triggers final turn)."""
        if all(c.face_up for c in player.cards):
            if self.phase == GamePhase.PLAYING and not self.finisher_id:
                self.finisher_id = player.id
                self.phase = GamePhase.FINAL_TURN

    @property
    def current_player_id(self) -> Optional[str]:
        if self.player_order and 0 <= self.current_player_idx < len(self.player_order):
            return self.player_order[self.current_player_idx]
        return None


def rebuild_state(events: list[GameEvent]) -> RebuiltGameState:
    """Rebuild game state from a list of events."""
    if not events:
        raise ValueError("Cannot rebuild state from empty event list")

    state = RebuiltGameState(game_id=events[0].game_id)
    for event in events:
        state.apply(event)

    return state

Dual-Write Integration

Modify existing game.py to emit events alongside mutations:

# server/game.py additions

class Game:
    def __init__(self):
        # ... existing init ...
        self._event_emitter: Optional[Callable[[GameEvent], None]] = None
        self._sequence_num = 0

    def set_event_emitter(self, emitter: Callable[[GameEvent], None]):
        """Set callback for event emission."""
        self._event_emitter = emitter

    def _emit(self, event_type: EventType, player_id: Optional[str] = None, **data):
        """Emit an event if emitter is configured."""
        if self._event_emitter:
            self._sequence_num += 1
            event = GameEvent(
                event_type=event_type,
                game_id=self.game_id,
                sequence_num=self._sequence_num,
                player_id=player_id,
                data=data,
            )
            self._event_emitter(event)

    # Example: modify draw_card
    def draw_card(self, player_id: str, source: str) -> Optional[Card]:
        # ... existing validation ...

        if source == "deck":
            card = self.deck.pop()
        else:
            card = self.discard_pile.pop()

        self.drawn_card = card

        # NEW: Emit event
        self._emit(
            EventType.CARD_DRAWN,
            player_id=player_id,
            source=source,
            card=card.to_dict(),
        )

        return card

Validation Test

# server/tests/test_event_replay.py
import pytest
from game import Game, GameOptions
from models.events import GameEvent, rebuild_state


class TestEventReplay:
    """Verify that event replay produces identical state."""

    def test_full_game_replay(self):
        """Play a complete game and verify replay matches."""
        events = []

        def collect_events(event: GameEvent):
            events.append(event)

        # Play a real game
        game = Game()
        game.set_event_emitter(collect_events)

        game.add_player("p1", "Alice")
        game.add_player("p2", "Bob")
        game.start_game(num_decks=1, num_rounds=1, options=GameOptions())

        # Play through initial flips
        game.flip_initial_cards("p1", [0, 1])
        game.flip_initial_cards("p2", [0, 1])

        # Play some turns
        while game.phase not in (GamePhase.ROUND_OVER, GamePhase.GAME_OVER):
            current = game.current_player()
            if not current:
                break

            # Simple bot: always draw from deck and discard
            game.draw_card(current.id, "deck")
            game.discard_drawn(current.id)

            if len(events) > 100:  # Safety limit
                break

        # Get final state
        final_state = game.get_state("p1")

        # Rebuild from events
        rebuilt = rebuild_state(events)

        # Verify key state matches
        assert rebuilt.phase == game.phase
        assert rebuilt.current_round == game.current_round
        assert len(rebuilt.players) == len(game.players)

        for player_id, player in rebuilt.players.items():
            original = game.get_player(player_id)
            assert player.score == original.score
            assert player.total_score == original.total_score
            assert len(player.cards) == len(original.cards)

            for i, card in enumerate(player.cards):
                orig_card = original.cards[i]
                assert card.rank == orig_card.rank
                assert card.suit == orig_card.suit
                assert card.face_up == orig_card.face_up

    def test_partial_replay(self):
        """Verify we can replay to any point in the game."""
        events = []

        def collect_events(event: GameEvent):
            events.append(event)

        game = Game()
        game.set_event_emitter(collect_events)

        # ... setup and play ...

        # Replay only first N events
        for n in range(1, len(events) + 1):
            partial = rebuild_state(events[:n])
            assert partial.sequence_num == n

    def test_event_order_enforced(self):
        """Verify events must be applied in order."""
        events = []

        # ... collect some events ...

        state = RebuiltGameState(game_id="test")

        # Skip an event - should fail
        with pytest.raises(AssertionError):
            state.apply(events[1])  # Skipping events[0]

Acceptance Criteria

  1. Event Classes Complete

    • All lifecycle events defined (created, joined, left, started, ended)
    • All gameplay events defined (draw, swap, discard, flip, etc.)
    • Events are serializable to/from JSON
    • Events include all data needed for replay
  2. Event Store Working

    • PostgreSQL schema created via migration
    • Can append single events
    • Can append batches atomically
    • Can retrieve events by game_id
    • Can retrieve events by sequence range
    • Concurrent writes to same sequence fail cleanly
  3. State Rebuilder Working

    • Can rebuild state from any event sequence
    • Handles all event types
    • Enforces event ordering
    • Matches original game state exactly
  4. Dual-Write Enabled

    • Game class has event emitter hook
    • All state-changing methods emit events
    • Events don't affect existing game behavior
    • Can be enabled/disabled via config
  5. Validation Tests Pass

    • Full game replay test
    • Partial replay test
    • Event order enforcement test
    • At least 95% of games replay correctly

Implementation Order

  1. Create event dataclasses (models/events.py)
  2. Create database migration for events table
  3. Implement EventStore class
  4. Implement RebuiltGameState class
  5. Add event emitter to Game class
  6. Add _emit() calls to all game methods
  7. Write validation tests
  8. Run tests until 100% pass

Notes for Agent

  • The existing game.py has good test coverage - don't break existing tests
  • Start with lifecycle events, then gameplay events
  • The deck seed is important for deterministic replay
  • Consider edge cases: player disconnects, CPU players, house rules
  • Events should be immutable - never modify after creation