golfgame/docs/v2/V2_02_PERSISTENCE.md
Aaron D. Lee bea85e6b28 Huge v2 uplift, now deployable with real user management and tooling!
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-27 11:32:15 -05:00

V2-02: Persistence & Recovery

Overview

This document covers the live state cache and the game recovery system. Live game state is cached in Redis, and games are rebuilt from the PostgreSQL event store on startup, so active games survive a server restart.

Dependencies: V2-01 (Event Sourcing)
Dependents: V2-03 (User Accounts), V2-06 (Replay)


Goals

  1. Cache live game state in Redis
  2. Implement Redis pub/sub for multi-server support
  3. Enable game recovery from events on server restart
  4. Implement graceful shutdown with state preservation

Current State

Games are stored in-memory in main.py:

# Current approach
rooms: dict[str, Room] = {}  # Lost on restart!

On server restart, all active games are lost.


Architecture

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   FastAPI #1    │     │   FastAPI #2    │     │   FastAPI #N    │
│   (WebSocket)   │     │   (WebSocket)   │     │   (WebSocket)   │
└────────┬────────┘     └────────┬────────┘     └────────┬────────┘
         │                       │                       │
         └───────────────────────┼───────────────────────┘
                                 │
                    ┌────────────▼────────────┐
                    │         Redis           │
                    │  ┌─────────────────┐   │
                    │  │  State Cache    │   │  <- Live game state
                    │  │  (Hash/JSON)    │   │
                    │  └─────────────────┘   │
                    │  ┌─────────────────┐   │
                    │  │    Pub/Sub      │   │  <- Cross-server events
                    │  │   (Channels)    │   │
                    │  └─────────────────┘   │
                    │  ┌─────────────────┐   │
                    │  │   Room Index    │   │  <- Active room codes
                    │  │    (Set)        │   │
                    │  └─────────────────┘   │
                    └─────────────────────────┘
                                 │
                    ┌────────────▼────────────┐
                    │      PostgreSQL         │
                    │    (Event Store)        │  <- Source of truth
                    └─────────────────────────┘

Redis Data Model

Key Patterns

golf:room:{room_code}          -> Hash (room metadata)
golf:game:{game_id}            -> JSON (full game state)
golf:room:{room_code}:players  -> Set (connected player IDs)
golf:rooms:active              -> Set (active room codes)
golf:player:{player_id}:room   -> String (player's current room)
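
The key patterns above can be centralized in a few small helpers so that no call site formats key strings by hand. This is a minimal sketch; the function names are hypothetical, and only the key layout comes from the list above.

```python
# Hypothetical helper names; the key layout mirrors the patterns listed above.
PREFIX = "golf"

ACTIVE_ROOMS_KEY = f"{PREFIX}:rooms:active"


def room_key(room_code: str) -> str:
    """Hash holding room metadata."""
    return f"{PREFIX}:room:{room_code}"


def room_players_key(room_code: str) -> str:
    """Set of connected player IDs for a room."""
    return f"{room_key(room_code)}:players"


def game_key(game_id: str) -> str:
    """JSON string holding the full game state."""
    return f"{PREFIX}:game:{game_id}"


def player_room_key(player_id: str) -> str:
    """String holding the room code a player is currently in."""
    return f"{PREFIX}:player:{player_id}:room"
```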

Room Metadata Hash

golf:room:ABCD
├── game_id: "uuid-..."
├── host_id: "player-uuid"
├── created_at: "2024-01-15T10:30:00Z"
├── status: "waiting" | "playing" | "finished"
└── server_id: "server-1"  # Which server owns this room
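
As an illustration of how the metadata hash might be consumed, the sketch below decodes a raw hash reply into a typed record and rejects unknown status values. It assumes the Redis client returns bytes (no response decoding); `RoomMeta` and `room_from_hash` are hypothetical names, not part of the existing code.

```python
from dataclasses import dataclass

# Status values taken from the hash layout above.
VALID_STATUSES = {"waiting", "playing", "finished"}


@dataclass(frozen=True)
class RoomMeta:
    """Hypothetical typed view of the golf:room:{room_code} hash."""
    game_id: str
    host_id: str
    created_at: str
    status: str
    server_id: str


def room_from_hash(raw: dict[bytes, bytes]) -> RoomMeta:
    """Decode a bytes-keyed hash reply and validate the status field."""
    fields = {k.decode(): v.decode() for k, v in raw.items()}
    if fields["status"] not in VALID_STATUSES:
        raise ValueError(f"unknown room status: {fields['status']!r}")
    return RoomMeta(**fields)
```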

Game State JSON

{
  "game_id": "uuid-...",
  "room_code": "ABCD",
  "phase": "playing",
  "current_round": 3,
  "total_rounds": 9,
  "current_player_idx": 1,
  "player_order": ["p1", "p2", "p3"],
  "players": {
    "p1": {
      "id": "p1",
      "name": "Alice",
      "cards": [{"rank": "K", "suit": "hearts", "face_up": true}, ...],
      "score": null,
      "total_score": 15,
      "rounds_won": 1,
      "is_cpu": false
    }
  },
  "deck_count": 32,
  "discard_top": {"rank": "7", "suit": "clubs"},
  "drawn_card": null,
  "options": {...},
  "sequence_num": 47
}
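
The state above stores the turn as an index into player_order rather than as a player ID. A small sketch of how a consumer might derive the current player from a cached snapshot (the helper name is hypothetical, and the snapshot is trimmed to the fields used):

```python
import json


def current_player_id(state: dict) -> str:
    """Resolve the player whose turn it is from player_order and current_player_idx."""
    return state["player_order"][state["current_player_idx"]]


# Trimmed snapshot using the same field names as the JSON above.
snapshot = json.loads(
    '{"player_order": ["p1", "p2", "p3"], "current_player_idx": 1, "sequence_num": 47}'
)
```

With the sample values, `current_player_id(snapshot)` resolves to the second entry of player_order.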

State Cache Implementation

# server/stores/state_cache.py
import json
from typing import Optional
from datetime import datetime, timedelta
import redis.asyncio as redis

from models.game_state import RebuiltGameState


class StateCache:
    """Redis-backed live game state cache."""

    # Key patterns
    ROOM_KEY = "golf:room:{room_code}"
    GAME_KEY = "golf:game:{game_id}"
    ROOM_PLAYERS_KEY = "golf:room:{room_code}:players"
    ACTIVE_ROOMS_KEY = "golf:rooms:active"
    PLAYER_ROOM_KEY = "golf:player:{player_id}:room"

    # TTLs
    ROOM_TTL = timedelta(hours=4)  # Inactive rooms expire
    GAME_TTL = timedelta(hours=4)

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    # --- Room Operations ---

    async def create_room(
        self,
        room_code: str,
        game_id: str,
        host_id: str,
        server_id: str,
    ) -> None:
        """Create a new room."""
        pipe = self.redis.pipeline()

        # Room metadata
        pipe.hset(
            self.ROOM_KEY.format(room_code=room_code),
            mapping={
                "game_id": game_id,
                "host_id": host_id,
                "status": "waiting",
                "server_id": server_id,
                "created_at": datetime.utcnow().isoformat(),
            },
        )
        pipe.expire(self.ROOM_KEY.format(room_code=room_code), self.ROOM_TTL)

        # Add to active rooms
        pipe.sadd(self.ACTIVE_ROOMS_KEY, room_code)

        # Track host's room
        pipe.set(
            self.PLAYER_ROOM_KEY.format(player_id=host_id),
            room_code,
            ex=self.ROOM_TTL,
        )

        await pipe.execute()

    async def get_room(self, room_code: str) -> Optional[dict]:
        """Get room metadata."""
        data = await self.redis.hgetall(self.ROOM_KEY.format(room_code=room_code))
        if not data:
            return None
        return {k.decode(): v.decode() for k, v in data.items()}

    async def room_exists(self, room_code: str) -> bool:
        """Check if room exists."""
        return await self.redis.exists(self.ROOM_KEY.format(room_code=room_code)) > 0

    async def delete_room(self, room_code: str) -> None:
        """Delete a room and all associated data."""
        room = await self.get_room(room_code)
        if not room:
            return

        pipe = self.redis.pipeline()

        # Get players to clean up their mappings
        players = await self.redis.smembers(
            self.ROOM_PLAYERS_KEY.format(room_code=room_code)
        )
        for player_id in players:
            pipe.delete(self.PLAYER_ROOM_KEY.format(player_id=player_id.decode()))

        # Delete room data
        pipe.delete(self.ROOM_KEY.format(room_code=room_code))
        pipe.delete(self.ROOM_PLAYERS_KEY.format(room_code=room_code))
        pipe.srem(self.ACTIVE_ROOMS_KEY, room_code)

        # Delete game state if exists
        if "game_id" in room:
            pipe.delete(self.GAME_KEY.format(game_id=room["game_id"]))

        await pipe.execute()

    async def get_active_rooms(self) -> set[str]:
        """Get all active room codes."""
        rooms = await self.redis.smembers(self.ACTIVE_ROOMS_KEY)
        return {r.decode() for r in rooms}

    # --- Player Operations ---

    async def add_player_to_room(self, room_code: str, player_id: str) -> None:
        """Add a player to a room."""
        pipe = self.redis.pipeline()
        pipe.sadd(self.ROOM_PLAYERS_KEY.format(room_code=room_code), player_id)
        pipe.set(
            self.PLAYER_ROOM_KEY.format(player_id=player_id),
            room_code,
            ex=self.ROOM_TTL,
        )
        # Refresh room TTL on activity
        pipe.expire(self.ROOM_KEY.format(room_code=room_code), self.ROOM_TTL)
        await pipe.execute()

    async def remove_player_from_room(self, room_code: str, player_id: str) -> None:
        """Remove a player from a room."""
        pipe = self.redis.pipeline()
        pipe.srem(self.ROOM_PLAYERS_KEY.format(room_code=room_code), player_id)
        pipe.delete(self.PLAYER_ROOM_KEY.format(player_id=player_id))
        await pipe.execute()

    async def get_room_players(self, room_code: str) -> set[str]:
        """Get player IDs in a room."""
        players = await self.redis.smembers(
            self.ROOM_PLAYERS_KEY.format(room_code=room_code)
        )
        return {p.decode() for p in players}

    async def get_player_room(self, player_id: str) -> Optional[str]:
        """Get the room a player is in."""
        room = await self.redis.get(self.PLAYER_ROOM_KEY.format(player_id=player_id))
        return room.decode() if room else None

    # --- Game State Operations ---

    async def save_game_state(self, game_id: str, state: dict) -> None:
        """Save full game state."""
        await self.redis.set(
            self.GAME_KEY.format(game_id=game_id),
            json.dumps(state),
            ex=self.GAME_TTL,
        )

    async def get_game_state(self, game_id: str) -> Optional[dict]:
        """Get full game state."""
        data = await self.redis.get(self.GAME_KEY.format(game_id=game_id))
        if not data:
            return None
        return json.loads(data)

    async def update_game_state(self, game_id: str, updates: dict) -> None:
        """Partial update to game state (get, merge, set)."""
        state = await self.get_game_state(game_id)
        if state:
            state.update(updates)
            await self.save_game_state(game_id, state)

    async def delete_game_state(self, game_id: str) -> None:
        """Delete game state."""
        await self.redis.delete(self.GAME_KEY.format(game_id=game_id))

    # --- Room Status ---

    async def set_room_status(self, room_code: str, status: str) -> None:
        """Update room status."""
        await self.redis.hset(
            self.ROOM_KEY.format(room_code=room_code),
            "status",
            status,
        )

    async def refresh_room_ttl(self, room_code: str) -> None:
        """Refresh room TTL on activity."""
        pipe = self.redis.pipeline()
        pipe.expire(self.ROOM_KEY.format(room_code=room_code), self.ROOM_TTL)

        room = await self.get_room(room_code)
        if room and "game_id" in room:
            pipe.expire(self.GAME_KEY.format(game_id=room["game_id"]), self.GAME_TTL)

        await pipe.execute()
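
Because update_game_state is a get, merge, set sequence, two writers can overwrite each other. One possible mitigation is to refuse writes whose sequence_num is not newer than the cached one. The sketch below shows that guard against any client exposing async `get` and `set`; `save_if_newer` and `KeyValueClient` are hypothetical names, the TTL is omitted for brevity, and the check-then-set is still not atomic on its own (closing the race fully would need WATCH/MULTI or a server-side script).

```python
import asyncio  # used by callers, e.g. asyncio.run(save_if_newer(client, "g1", state))
import json
from typing import Optional, Protocol


class KeyValueClient(Protocol):
    """Minimal subset of an async Redis client that this sketch relies on."""

    async def get(self, name: str) -> Optional[bytes]: ...

    async def set(self, name: str, value: str) -> object: ...


async def save_if_newer(client: KeyValueClient, game_id: str, state: dict) -> bool:
    """Write state only if its sequence_num is higher than the cached one.

    Returns True when the write happened, False when it was rejected as stale.
    """
    key = f"golf:game:{game_id}"
    raw = await client.get(key)
    if raw is not None:
        cached = json.loads(raw)
        if cached.get("sequence_num", -1) >= state["sequence_num"]:
            return False  # stale or duplicate write
    await client.set(key, json.dumps(state))
    return True
```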

Pub/Sub for Multi-Server

# server/stores/pubsub.py
import asyncio
import json
from typing import Awaitable, Callable, Optional
from dataclasses import dataclass
from enum import Enum
import redis.asyncio as redis


class MessageType(str, Enum):
    GAME_STATE_UPDATE = "game_state_update"
    PLAYER_JOINED = "player_joined"
    PLAYER_LEFT = "player_left"
    ROOM_CLOSED = "room_closed"
    BROADCAST = "broadcast"


@dataclass
class PubSubMessage:
    type: MessageType
    room_code: str
    data: dict

    def to_json(self) -> str:
        return json.dumps({
            "type": self.type.value,
            "room_code": self.room_code,
            "data": self.data,
        })

    @classmethod
    def from_json(cls, raw: str) -> "PubSubMessage":
        d = json.loads(raw)
        return cls(
            type=MessageType(d["type"]),
            room_code=d["room_code"],
            data=d["data"],
        )


class GamePubSub:
    """Redis pub/sub for cross-server game events."""

    CHANNEL_PREFIX = "golf:room:"

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.pubsub = redis_client.pubsub()
        self._handlers: dict[str, list[Callable[[PubSubMessage], Awaitable[None]]]] = {}
        self._running = False
        self._task: Optional[asyncio.Task] = None

    def _channel(self, room_code: str) -> str:
        return f"{self.CHANNEL_PREFIX}{room_code}"

    async def subscribe(
        self,
        room_code: str,
        handler: Callable[[PubSubMessage], Awaitable[None]],
    ) -> None:
        """Subscribe to room events."""
        channel = self._channel(room_code)
        if channel not in self._handlers:
            self._handlers[channel] = []
            await self.pubsub.subscribe(channel)
        self._handlers[channel].append(handler)

    async def unsubscribe(self, room_code: str) -> None:
        """Unsubscribe from room events."""
        channel = self._channel(room_code)
        if channel in self._handlers:
            del self._handlers[channel]
            await self.pubsub.unsubscribe(channel)

    async def publish(self, message: PubSubMessage) -> None:
        """Publish a message to a room's channel."""
        channel = self._channel(message.room_code)
        await self.redis.publish(channel, message.to_json())

    async def start(self) -> None:
        """Start listening for messages."""
        self._running = True
        self._task = asyncio.create_task(self._listen())

    async def stop(self) -> None:
        """Stop listening."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        await self.pubsub.close()

    async def _listen(self) -> None:
        """Main listener loop."""
        while self._running:
            try:
                message = await self.pubsub.get_message(
                    ignore_subscribe_messages=True,
                    timeout=1.0,
                )
                if message and message["type"] == "message":
                    channel = message["channel"].decode()
                    handlers = self._handlers.get(channel, [])

                    try:
                        msg = PubSubMessage.from_json(message["data"].decode())
                        for handler in handlers:
                            await handler(msg)
                    except Exception as e:
                        print(f"Error handling pubsub message: {e}")

            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"PubSub listener error: {e}")
                await asyncio.sleep(1)
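
A handler registered through subscribe() would typically forward each message to the WebSocket connections held by the local server. The sketch below shows one way that fan-out could look. `LocalRoomConnections` is a hypothetical class, and the `origin_server_id` field used to skip a server's own messages is an assumption; the messages defined above do not carry it.

```python
import asyncio  # used by callers to drive the coroutine
from typing import Awaitable, Callable

# A "sender" is any coroutine function that delivers a payload to one client.
Send = Callable[[dict], Awaitable[None]]


class LocalRoomConnections:
    """Tracks this server's connections per room and fans messages out to them."""

    def __init__(self, server_id: str):
        self.server_id = server_id
        self._senders: dict[str, list[Send]] = {}

    def add(self, room_code: str, send: Send) -> None:
        self._senders.setdefault(room_code, []).append(send)

    async def on_message(self, room_code: str, data: dict) -> int:
        """Deliver data to local connections; returns how many were notified."""
        # Assumed field: skip messages this server published itself.
        if data.get("origin_server_id") == self.server_id:
            return 0
        senders = self._senders.get(room_code, [])
        for send in senders:
            await send(data)
        return len(senders)
```

To plug this into GamePubSub, a thin adapter such as `lambda msg: conns.on_message(msg.room_code, msg.data)` could be passed as the handler.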

Game Recovery

# server/services/recovery_service.py
from typing import Any, Optional
import asyncio

from stores.event_store import EventStore
from stores.state_cache import StateCache
from models.events import rebuild_state, EventType
from models.game_state import RebuiltGameState


class RecoveryService:
    """Recovers games from event store on startup."""

    def __init__(self, event_store: EventStore, state_cache: StateCache):
        self.event_store = event_store
        self.state_cache = state_cache

    async def recover_all_games(self) -> dict[str, Any]:
        """
        Recover all active games from event store.
        Returns a summary dict with recovered/failed/skipped counts
        and per-game details.
        """
        results = {
            "recovered": 0,
            "failed": 0,
            "skipped": 0,
            "games": [],
        }

        # Get active rooms from Redis (may be stale)
        active_rooms = await self.state_cache.get_active_rooms()

        for room_code in active_rooms:
            room = await self.state_cache.get_room(room_code)
            if not room:
                results["skipped"] += 1
                continue

            game_id = room.get("game_id")
            if not game_id:
                results["skipped"] += 1
                continue

            try:
                game = await self.recover_game(game_id)
                if game:
                    results["recovered"] += 1
                    results["games"].append({
                        "game_id": game_id,
                        "room_code": room_code,
                        "phase": game.phase.value,
                        "sequence": game.sequence_num,
                    })
                else:
                    results["skipped"] += 1
            except Exception as e:
                print(f"Failed to recover game {game_id}: {e}")
                results["failed"] += 1

        return results

    async def recover_game(self, game_id: str) -> Optional[RebuiltGameState]:
        """
        Recover a single game from event store.
        Returns the rebuilt game state, or None if there are no events
        or the game has already ended.
        """
        # Get all events for this game
        events = await self.event_store.get_events(game_id)

        if not events:
            return None

        # Check if game is actually active (not ended)
        last_event = events[-1]
        if last_event.event_type == EventType.GAME_ENDED:
            return None  # Game is finished, don't recover

        # Rebuild state
        state = rebuild_state(events)

        # Save to cache
        await self.state_cache.save_game_state(
            game_id,
            self._state_to_dict(state),
        )

        return state

    async def recover_from_sequence(
        self,
        game_id: str,
        cached_state: dict,
        cached_sequence: int,
    ) -> Optional[RebuiltGameState]:
        """
        Recover game by applying only new events to cached state.
        More efficient than full rebuild.
        """
        # Get events after cached sequence
        new_events = await self.event_store.get_events(
            game_id,
            from_sequence=cached_sequence + 1,
        )

        if not new_events:
            return None  # No new events

        # Rebuild state from cache + new events
        state = self._dict_to_state(cached_state)
        for event in new_events:
            state.apply(event)

        # Update cache
        await self.state_cache.save_game_state(
            game_id,
            self._state_to_dict(state),
        )

        return state

    def _state_to_dict(self, state) -> dict:
        """Convert RebuiltGameState to dict for caching."""
        return {
            "game_id": state.game_id,
            "room_code": state.room_code,
            "phase": state.phase.value,
            "current_round": state.current_round,
            "total_rounds": state.total_rounds,
            "current_player_idx": state.current_player_idx,
            "player_order": state.player_order,
            "players": {
                pid: {
                    "id": p.id,
                    "name": p.name,
                    "cards": [c.to_dict() for c in p.cards],
                    "score": p.score,
                    "total_score": p.total_score,
                    "rounds_won": p.rounds_won,
                    "is_cpu": p.is_cpu,
                    "cpu_profile": p.cpu_profile,
                }
                for pid, p in state.players.items()
            },
            "deck_count": len(state.deck),
            "discard_top": state.discard[-1].to_dict() if state.discard else None,
            "drawn_card": state.drawn_card.to_dict() if state.drawn_card else None,
            "options": state.options,
            "sequence_num": state.sequence_num,
            "finisher_id": state.finisher_id,
        }

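    def _dict_to_state(self, d: dict):
        """Convert dict back to RebuiltGameState."""
        # Implementation depends on RebuiltGameState structure. Note that the
        # cached dict keeps only deck_count and discard_top, not the full deck
        # and discard pile, so those cannot be restored without extra fields.
        raise NotImplementedError("_dict_to_state is not implemented yet")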
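
Incremental recovery only makes sense if the new events follow the cached sequence without holes. A minimal sketch of such a check is shown below, assuming each event exposes an integer sequence number; `first_gap` is a hypothetical helper, and falling back to a full rebuild on a gap is one possible policy rather than something the design above prescribes.

```python
from typing import Optional


def first_gap(cached_sequence: int, event_sequences: list[int]) -> Optional[int]:
    """Return the first missing sequence number, or None if the events are contiguous.

    event_sequences is expected to start at cached_sequence + 1 and increase by 1.
    """
    expected = cached_sequence + 1
    for seq in event_sequences:
        if seq != expected:
            return expected
        expected += 1
    return None
```

A caller could run this on the sequence numbers returned for `from_sequence=cached_sequence + 1` and, if it returns a number, discard the cached state and rebuild from the first event.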

Graceful Shutdown

# server/main.py additions
import signal
import asyncio
from contextlib import asynccontextmanager

from stores.state_cache import StateCache
from stores.event_store import EventStore
from stores.pubsub import GamePubSub
from services.recovery_service import RecoveryService


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler."""
    # Startup
    print("Starting up...")

    # Initialize connections
    app.state.redis = await create_redis_pool()
    app.state.pg_pool = await create_pg_pool()

    app.state.state_cache = StateCache(app.state.redis)
    app.state.event_store = EventStore(app.state.pg_pool)
    app.state.recovery_service = RecoveryService(
        app.state.event_store,
        app.state.state_cache,
    )

    # Recover games
    print("Recovering games from event store...")
    results = await app.state.recovery_service.recover_all_games()
    print(f"Recovery complete: {results['recovered']} recovered, "
          f"{results['failed']} failed, {results['skipped']} skipped")

    # Start pub/sub
    app.state.pubsub = GamePubSub(app.state.redis)
    await app.state.pubsub.start()

    yield

    # Shutdown
    print("Shutting down...")

    # Stop the pub/sub listener
    await app.state.pubsub.stop()

    # Flush any pending state to Redis
    await flush_pending_states(app)

    # Close connections
    await app.state.redis.close()
    await app.state.pg_pool.close()

    print("Shutdown complete")


async def flush_pending_states(app: FastAPI):
    """Flush any in-memory state to Redis before shutdown."""
    # If we have any rooms with unsaved state, save them now
    for room_code, room in rooms.items():
        if room.game and room.game.game_id:
            try:
                state = room.game.get_full_state()
                await app.state.state_cache.save_game_state(
                    room.game.game_id,
                    state,
                )
            except Exception as e:
                print(f"Error flushing state for room {room_code}: {e}")


app = FastAPI(lifespan=lifespan)


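# Handle SIGTERM gracefully.
# Note: ASGI servers such as uvicorn install their own SIGTERM handler and run
# the lifespan shutdown, so this fallback only matters when the process is
# started some other way.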
def handle_sigterm(signum, frame):
    """Handle SIGTERM by initiating graceful shutdown."""
    raise KeyboardInterrupt()

signal.signal(signal.SIGTERM, handle_sigterm)
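
During shutdown, a single slow or failing save should not block the remaining games from being flushed. The sketch below flushes all states concurrently with a per-game timeout and reports which ones failed. `flush_all` and its `save` callback are hypothetical; in the code above the callback would correspond to `state_cache.save_game_state`.

```python
import asyncio
from typing import Awaitable, Callable


async def flush_all(
    states: dict[str, dict],
    save: Callable[[str, dict], Awaitable[None]],
    timeout: float = 5.0,
) -> dict[str, str]:
    """Save every state concurrently; return {game_id: error} for failures."""

    async def _one(game_id: str, state: dict) -> None:
        # Bound each save so one stuck call cannot stall shutdown.
        await asyncio.wait_for(save(game_id, state), timeout)

    game_ids = list(states)
    results = await asyncio.gather(
        *(_one(g, states[g]) for g in game_ids),
        return_exceptions=True,  # collect errors instead of aborting the batch
    )
    return {
        g: repr(r) for g, r in zip(game_ids, results) if isinstance(r, Exception)
    }
```

The returned mapping can be logged before the connections are closed, so that games which could not be flushed are at least visible in the shutdown output.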

Integration with Game Service

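# server/services/game_service.py
from stores.state_cache import StateCache
from stores.event_store import EventStore
from stores.pubsub import GamePubSub, PubSubMessage, MessageType
from models.events import EventType, GameEvent  # GameEvent location assumed (V2-01)

# GameNotFoundError, NotYourTurnError and InvalidMoveError are assumed to be
# defined alongside the existing game logic; import them from wherever they live.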


class GameService:
    """
    Handles game commands with event sourcing.
    Coordinates between event store, state cache, and pub/sub.
    """

    def __init__(
        self,
        event_store: EventStore,
        state_cache: StateCache,
        pubsub: GamePubSub,
    ):
        self.event_store = event_store
        self.state_cache = state_cache
        self.pubsub = pubsub

    async def handle_draw(
        self,
        game_id: str,
        player_id: str,
        source: str,
    ) -> dict:
        """Handle draw card command."""
        # 1. Get current state from cache
        state = await self.state_cache.get_game_state(game_id)
        if not state:
            raise GameNotFoundError(game_id)

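        # 2. Validate command (the cached state stores player_order and
        #    current_player_idx rather than a current player ID)
        current_player_id = state["player_order"][state["current_player_idx"]]
        if current_player_id != player_id:
            raise NotYourTurnError()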

        # 3. Execute command (get card from deck/discard)
        # This uses the existing game logic
        game = self._load_game_from_state(state)
        card = game.draw_card(player_id, source)

        if not card:
            raise InvalidMoveError("Cannot draw from that source")

        # 4. Create event
        event = GameEvent(
            event_type=EventType.CARD_DRAWN,
            game_id=game_id,
            sequence_num=state["sequence_num"] + 1,
            player_id=player_id,
            data={"source": source, "card": card.to_dict()},
        )

        # 5. Persist event
        await self.event_store.append(event)

        # 6. Update cache
        new_state = game.get_full_state()
        new_state["sequence_num"] = event.sequence_num
        await self.state_cache.save_game_state(game_id, new_state)

        # 7. Publish to other servers
        await self.pubsub.publish(PubSubMessage(
            type=MessageType.GAME_STATE_UPDATE,
            room_code=state["room_code"],
            data={"game_state": new_state},
        ))

        return new_state
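
The command flow above reads state, appends an event, and then writes the cache, so two commands for the same game that interleave on one server could both compute the same next sequence number. One way to avoid that within a single process is a per-game asyncio lock, sketched below. `GameLocks` and `bump_sequence` are hypothetical names, and this does not coordinate across servers; cross-server conflicts would still have to be caught elsewhere, for example by the event store rejecting a duplicate sequence number, if it does so.

```python
import asyncio
from collections import defaultdict


class GameLocks:
    """Hands out one asyncio.Lock per game ID (process-local only)."""

    def __init__(self) -> None:
        self._locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    def for_game(self, game_id: str) -> asyncio.Lock:
        return self._locks[game_id]


async def bump_sequence(store: dict, locks: GameLocks, game_id: str) -> int:
    """Read-modify-write of a sequence number, serialized per game."""
    async with locks.for_game(game_id):
        current = store[game_id]       # 1. read cached sequence
        await asyncio.sleep(0)         # 2. stand-in for the event-store append
        store[game_id] = current + 1   # 3. write the new sequence
        return store[game_id]
```

In a command handler, the body of the method would sit inside `async with locks.for_game(game_id):`, so that the read in step 1 always sees the result of the previous command.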

Acceptance Criteria

  1. Redis State Cache Working

    • Can create/get/delete rooms
    • Can add/remove players from rooms
    • Can save/get/delete game state
    • TTL expiration works correctly
    • Room code uniqueness enforced
  2. Pub/Sub Working

    • Can subscribe to room channels
    • Can publish messages
    • Messages received by all subscribers
    • Handles disconnections gracefully
    • Multiple servers can communicate
  3. Game Recovery Working

    • Games recovered on startup
    • State matches what was saved
    • Partial recovery (from sequence) works
    • Ended games not recovered
    • Failed recoveries logged and skipped
  4. Graceful Shutdown Working

    • SIGTERM triggers clean shutdown
    • In-flight requests complete
    • State flushed to Redis
    • Connections closed cleanly
    • No data loss on restart
  5. Integration Tests

    • Server restart doesn't lose games
    • Multi-server state sync works
    • State cache matches event store
    • Performance acceptable (<100ms for state ops)
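
For the "Room code uniqueness enforced" criterion, one option is to claim the room code with a set-if-absent write before filling in the rest of the metadata. The sketch below uses the HSETNX command through a client exposing redis-py's async `hsetnx(name, key, value)`; `claim_room_code` and `HashClient` are hypothetical names, and TTL handling is left out.

```python
import asyncio  # used by callers to drive the coroutine
from typing import Protocol


class HashClient(Protocol):
    """Minimal subset of an async Redis client that this sketch relies on."""

    async def hsetnx(self, name: str, key: str, value: str) -> int: ...


async def claim_room_code(client: HashClient, room_code: str, game_id: str) -> bool:
    """Atomically claim a room code; returns False if it is already taken.

    HSETNX only sets the field when it does not exist, so exactly one
    caller can succeed for a given room code.
    """
    created = await client.hsetnx(f"golf:room:{room_code}", "game_id", game_id)
    return bool(created)
```

A caller that gets False back would generate a new code and retry, and only proceed to write the remaining room fields after a successful claim.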

Implementation Order

  1. Set up Redis locally (docker)
  2. Implement StateCache class
  3. Write StateCache tests
  4. Implement GamePubSub class
  5. Implement RecoveryService
  6. Add lifespan handler to main.py
  7. Integrate with game commands
  8. Test full recovery cycle
  9. Test multi-server pub/sub

Docker Setup for Development

# docker-compose.dev.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

  postgres:
    image: postgres:16-alpine
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: golf
      POSTGRES_PASSWORD: devpassword
      POSTGRES_DB: golf
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  redis_data:
  postgres_data:

# Start services
docker-compose -f docker-compose.dev.yml up -d

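# Connect to Redis CLI (by service name; container names differ between Compose versions)
docker-compose -f docker-compose.dev.yml exec redis redis-cli

# Connect to PostgreSQL
docker-compose -f docker-compose.dev.yml exec postgres psql -U golf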
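
On the application side, the connection settings for these two services could be read from the environment with defaults that match the compose file. This is a sketch only: the `REDIS_URL` and `DATABASE_URL` variable names and the `DevSettings` class are assumptions, while the host ports and credentials are taken from the compose file above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class DevSettings:
    redis_url: str
    postgres_url: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> DevSettings:
    """Read connection URLs from env, falling back to the dev compose defaults."""
    env = os.environ if env is None else env
    return DevSettings(
        # Variable names are hypothetical; defaults mirror docker-compose.dev.yml.
        redis_url=env.get("REDIS_URL", "redis://localhost:6379/0"),
        postgres_url=env.get(
            "DATABASE_URL",
            "postgresql://golf:devpassword@localhost:5432/golf",
        ),
    )
```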

Notes for Agent

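  • Group related Redis writes in a pipeline; redis-py pipelines run as a MULTI/EXEC transaction by default, but reads issued outside the pipeline (as in delete_room and update_game_state) are not part of that transaction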
  • Consider Redis Cluster for production (but not needed initially)
  • The state cache is a cache, not source of truth (events are)
  • Pub/sub is best-effort; state sync should handle missed messages
  • Test with multiple server instances locally
  • Use connection pooling for both Redis and PostgreSQL
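
Since pub/sub is best-effort, a receiving server may miss an update and later see an older or duplicate one. Because update messages carry the full game state together with sequence_num, a receiver can treat a pub/sub message and a fresh read from the cache the same way: keep whichever state has the higher sequence number. A minimal sketch, with `newer_state` as a hypothetical helper:

```python
from typing import Optional


def newer_state(local: Optional[dict], incoming: Optional[dict]) -> Optional[dict]:
    """Return whichever state has the higher sequence_num.

    Ties keep the local copy, so duplicate deliveries are harmless.
    """
    if incoming is None:
        return local
    if local is None:
        return incoming
    if incoming["sequence_num"] > local["sequence_num"]:
        return incoming
    return local
```

Running the same comparison against a periodic or on-reconnect read of the cached state is one way to recover from a missed message without waiting for the next update.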