# V2-02: Persistence & Recovery ## Overview This document covers the live state caching and game recovery system. Games will survive server restarts by storing live state in Redis and rebuilding from events. **Dependencies:** V2-01 (Event Sourcing) **Dependents:** V2-03 (User Accounts), V2-06 (Replay) --- ## Goals 1. Cache live game state in Redis 2. Implement Redis pub/sub for multi-server support 3. Enable game recovery from events on server restart 4. Implement graceful shutdown with state preservation --- ## Current State Games are stored in-memory in `main.py`: ```python # Current approach rooms: dict[str, Room] = {} # Lost on restart! ``` On server restart, all active games are lost. --- ## Architecture ``` ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ FastAPI #1 │ │ FastAPI #2 │ │ FastAPI #N │ │ (WebSocket) │ │ (WebSocket) │ │ (WebSocket) │ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │ │ │ └───────────────────────┼───────────────────────┘ │ ┌────────────▼────────────┐ │ Redis │ │ ┌─────────────────┐ │ │ │ State Cache │ │ <- Live game state │ │ (Hash/JSON) │ │ │ └─────────────────┘ │ │ ┌─────────────────┐ │ │ │ Pub/Sub │ │ <- Cross-server events │ │ (Channels) │ │ │ └─────────────────┘ │ │ ┌─────────────────┐ │ │ │ Room Index │ │ <- Active room codes │ │ (Set) │ │ │ └─────────────────┘ │ └─────────────────────────┘ │ ┌────────────▼────────────┐ │ PostgreSQL │ │ (Event Store) │ <- Source of truth └─────────────────────────┘ ``` --- ## Redis Data Model ### Key Patterns ``` golf:room:{room_code} -> Hash (room metadata) golf:game:{game_id} -> JSON (full game state) golf:room:{room_code}:players -> Set (connected player IDs) golf:rooms:active -> Set (active room codes) golf:player:{player_id}:room -> String (player's current room) ``` ### Room Metadata Hash ``` golf:room:ABCD ├── game_id: "uuid-..." ├── host_id: "player-uuid" ├── created_at: "2024-01-15T10:30:00Z" ├── status: "waiting" | "playing" | "finished" └── server_id: "server-1" # Which server owns this room ``` ### Game State JSON ```json { "game_id": "uuid-...", "room_code": "ABCD", "phase": "playing", "current_round": 3, "total_rounds": 9, "current_player_idx": 1, "player_order": ["p1", "p2", "p3"], "players": { "p1": { "id": "p1", "name": "Alice", "cards": [{"rank": "K", "suit": "hearts", "face_up": true}, ...], "score": null, "total_score": 15, "rounds_won": 1, "is_cpu": false } }, "deck_count": 32, "discard_top": {"rank": "7", "suit": "clubs"}, "drawn_card": null, "options": {...}, "sequence_num": 47 } ``` --- ## State Cache Implementation ```python # server/stores/state_cache.py import json from typing import Optional from datetime import timedelta import redis.asyncio as redis from models.game_state import RebuiltGameState class StateCache: """Redis-backed live game state cache.""" # Key patterns ROOM_KEY = "golf:room:{room_code}" GAME_KEY = "golf:game:{game_id}" ROOM_PLAYERS_KEY = "golf:room:{room_code}:players" ACTIVE_ROOMS_KEY = "golf:rooms:active" PLAYER_ROOM_KEY = "golf:player:{player_id}:room" # TTLs ROOM_TTL = timedelta(hours=4) # Inactive rooms expire GAME_TTL = timedelta(hours=4) def __init__(self, redis_client: redis.Redis): self.redis = redis_client # --- Room Operations --- async def create_room( self, room_code: str, game_id: str, host_id: str, server_id: str, ) -> None: """Create a new room.""" pipe = self.redis.pipeline() # Room metadata pipe.hset( self.ROOM_KEY.format(room_code=room_code), mapping={ "game_id": game_id, "host_id": host_id, "status": "waiting", "server_id": server_id, "created_at": datetime.utcnow().isoformat(), }, ) pipe.expire(self.ROOM_KEY.format(room_code=room_code), self.ROOM_TTL) # Add to active rooms pipe.sadd(self.ACTIVE_ROOMS_KEY, room_code) # Track host's room pipe.set( self.PLAYER_ROOM_KEY.format(player_id=host_id), room_code, ex=self.ROOM_TTL, ) await pipe.execute() async def get_room(self, room_code: str) -> Optional[dict]: """Get room metadata.""" data = await self.redis.hgetall(self.ROOM_KEY.format(room_code=room_code)) if not data: return None return {k.decode(): v.decode() for k, v in data.items()} async def room_exists(self, room_code: str) -> bool: """Check if room exists.""" return await self.redis.exists(self.ROOM_KEY.format(room_code=room_code)) > 0 async def delete_room(self, room_code: str) -> None: """Delete a room and all associated data.""" room = await self.get_room(room_code) if not room: return pipe = self.redis.pipeline() # Get players to clean up their mappings players = await self.redis.smembers( self.ROOM_PLAYERS_KEY.format(room_code=room_code) ) for player_id in players: pipe.delete(self.PLAYER_ROOM_KEY.format(player_id=player_id.decode())) # Delete room data pipe.delete(self.ROOM_KEY.format(room_code=room_code)) pipe.delete(self.ROOM_PLAYERS_KEY.format(room_code=room_code)) pipe.srem(self.ACTIVE_ROOMS_KEY, room_code) # Delete game state if exists if "game_id" in room: pipe.delete(self.GAME_KEY.format(game_id=room["game_id"])) await pipe.execute() async def get_active_rooms(self) -> set[str]: """Get all active room codes.""" rooms = await self.redis.smembers(self.ACTIVE_ROOMS_KEY) return {r.decode() for r in rooms} # --- Player Operations --- async def add_player_to_room(self, room_code: str, player_id: str) -> None: """Add a player to a room.""" pipe = self.redis.pipeline() pipe.sadd(self.ROOM_PLAYERS_KEY.format(room_code=room_code), player_id) pipe.set( self.PLAYER_ROOM_KEY.format(player_id=player_id), room_code, ex=self.ROOM_TTL, ) # Refresh room TTL on activity pipe.expire(self.ROOM_KEY.format(room_code=room_code), self.ROOM_TTL) await pipe.execute() async def remove_player_from_room(self, room_code: str, player_id: str) -> None: """Remove a player from a room.""" pipe = self.redis.pipeline() pipe.srem(self.ROOM_PLAYERS_KEY.format(room_code=room_code), player_id) pipe.delete(self.PLAYER_ROOM_KEY.format(player_id=player_id)) await pipe.execute() async def get_room_players(self, room_code: str) -> set[str]: """Get player IDs in a room.""" players = await self.redis.smembers( self.ROOM_PLAYERS_KEY.format(room_code=room_code) ) return {p.decode() for p in players} async def get_player_room(self, player_id: str) -> Optional[str]: """Get the room a player is in.""" room = await self.redis.get(self.PLAYER_ROOM_KEY.format(player_id=player_id)) return room.decode() if room else None # --- Game State Operations --- async def save_game_state(self, game_id: str, state: dict) -> None: """Save full game state.""" await self.redis.set( self.GAME_KEY.format(game_id=game_id), json.dumps(state), ex=self.GAME_TTL, ) async def get_game_state(self, game_id: str) -> Optional[dict]: """Get full game state.""" data = await self.redis.get(self.GAME_KEY.format(game_id=game_id)) if not data: return None return json.loads(data) async def update_game_state(self, game_id: str, updates: dict) -> None: """Partial update to game state (get, merge, set).""" state = await self.get_game_state(game_id) if state: state.update(updates) await self.save_game_state(game_id, state) async def delete_game_state(self, game_id: str) -> None: """Delete game state.""" await self.redis.delete(self.GAME_KEY.format(game_id=game_id)) # --- Room Status --- async def set_room_status(self, room_code: str, status: str) -> None: """Update room status.""" await self.redis.hset( self.ROOM_KEY.format(room_code=room_code), "status", status, ) async def refresh_room_ttl(self, room_code: str) -> None: """Refresh room TTL on activity.""" pipe = self.redis.pipeline() pipe.expire(self.ROOM_KEY.format(room_code=room_code), self.ROOM_TTL) room = await self.get_room(room_code) if room and "game_id" in room: pipe.expire(self.GAME_KEY.format(game_id=room["game_id"]), self.GAME_TTL) await pipe.execute() ``` --- ## Pub/Sub for Multi-Server ```python # server/stores/pubsub.py import asyncio import json from typing import Callable, Awaitable from dataclasses import dataclass from enum import Enum import redis.asyncio as redis class MessageType(str, Enum): GAME_STATE_UPDATE = "game_state_update" PLAYER_JOINED = "player_joined" PLAYER_LEFT = "player_left" ROOM_CLOSED = "room_closed" BROADCAST = "broadcast" @dataclass class PubSubMessage: type: MessageType room_code: str data: dict def to_json(self) -> str: return json.dumps({ "type": self.type.value, "room_code": self.room_code, "data": self.data, }) @classmethod def from_json(cls, raw: str) -> "PubSubMessage": d = json.loads(raw) return cls( type=MessageType(d["type"]), room_code=d["room_code"], data=d["data"], ) class GamePubSub: """Redis pub/sub for cross-server game events.""" CHANNEL_PREFIX = "golf:room:" def __init__(self, redis_client: redis.Redis): self.redis = redis_client self.pubsub = redis_client.pubsub() self._handlers: dict[str, list[Callable[[PubSubMessage], Awaitable[None]]]] = {} self._running = False self._task: Optional[asyncio.Task] = None def _channel(self, room_code: str) -> str: return f"{self.CHANNEL_PREFIX}{room_code}" async def subscribe( self, room_code: str, handler: Callable[[PubSubMessage], Awaitable[None]], ) -> None: """Subscribe to room events.""" channel = self._channel(room_code) if channel not in self._handlers: self._handlers[channel] = [] await self.pubsub.subscribe(channel) self._handlers[channel].append(handler) async def unsubscribe(self, room_code: str) -> None: """Unsubscribe from room events.""" channel = self._channel(room_code) if channel in self._handlers: del self._handlers[channel] await self.pubsub.unsubscribe(channel) async def publish(self, message: PubSubMessage) -> None: """Publish a message to a room's channel.""" channel = self._channel(message.room_code) await self.redis.publish(channel, message.to_json()) async def start(self) -> None: """Start listening for messages.""" self._running = True self._task = asyncio.create_task(self._listen()) async def stop(self) -> None: """Stop listening.""" self._running = False if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass await self.pubsub.close() async def _listen(self) -> None: """Main listener loop.""" while self._running: try: message = await self.pubsub.get_message( ignore_subscribe_messages=True, timeout=1.0, ) if message and message["type"] == "message": channel = message["channel"].decode() handlers = self._handlers.get(channel, []) try: msg = PubSubMessage.from_json(message["data"].decode()) for handler in handlers: await handler(msg) except Exception as e: print(f"Error handling pubsub message: {e}") except asyncio.CancelledError: break except Exception as e: print(f"PubSub listener error: {e}") await asyncio.sleep(1) ``` --- ## Game Recovery ```python # server/services/recovery_service.py from typing import Optional import asyncio from stores.event_store import EventStore from stores.state_cache import StateCache from models.events import rebuild_state, EventType class RecoveryService: """Recovers games from event store on startup.""" def __init__(self, event_store: EventStore, state_cache: StateCache): self.event_store = event_store self.state_cache = state_cache async def recover_all_games(self) -> dict[str, any]: """ Recover all active games from event store. Returns dict of recovered games. """ results = { "recovered": 0, "failed": 0, "skipped": 0, "games": [], } # Get active rooms from Redis (may be stale) active_rooms = await self.state_cache.get_active_rooms() for room_code in active_rooms: room = await self.state_cache.get_room(room_code) if not room: results["skipped"] += 1 continue game_id = room.get("game_id") if not game_id: results["skipped"] += 1 continue try: game = await self.recover_game(game_id) if game: results["recovered"] += 1 results["games"].append({ "game_id": game_id, "room_code": room_code, "phase": game.phase.value, "sequence": game.sequence_num, }) else: results["skipped"] += 1 except Exception as e: print(f"Failed to recover game {game_id}: {e}") results["failed"] += 1 return results async def recover_game(self, game_id: str) -> Optional[any]: """ Recover a single game from event store. Returns the rebuilt game state. """ # Get all events for this game events = await self.event_store.get_events(game_id) if not events: return None # Check if game is actually active (not ended) last_event = events[-1] if last_event.event_type == EventType.GAME_ENDED: return None # Game is finished, don't recover # Rebuild state state = rebuild_state(events) # Save to cache await self.state_cache.save_game_state( game_id, self._state_to_dict(state), ) return state async def recover_from_sequence( self, game_id: str, cached_state: dict, cached_sequence: int, ) -> Optional[any]: """ Recover game by applying only new events to cached state. More efficient than full rebuild. """ # Get events after cached sequence new_events = await self.event_store.get_events( game_id, from_sequence=cached_sequence + 1, ) if not new_events: return None # No new events # Rebuild state from cache + new events state = self._dict_to_state(cached_state) for event in new_events: state.apply(event) # Update cache await self.state_cache.save_game_state( game_id, self._state_to_dict(state), ) return state def _state_to_dict(self, state) -> dict: """Convert RebuiltGameState to dict for caching.""" return { "game_id": state.game_id, "room_code": state.room_code, "phase": state.phase.value, "current_round": state.current_round, "total_rounds": state.total_rounds, "current_player_idx": state.current_player_idx, "player_order": state.player_order, "players": { pid: { "id": p.id, "name": p.name, "cards": [c.to_dict() for c in p.cards], "score": p.score, "total_score": p.total_score, "rounds_won": p.rounds_won, "is_cpu": p.is_cpu, "cpu_profile": p.cpu_profile, } for pid, p in state.players.items() }, "deck_count": len(state.deck), "discard_top": state.discard[-1].to_dict() if state.discard else None, "drawn_card": state.drawn_card.to_dict() if state.drawn_card else None, "options": state.options, "sequence_num": state.sequence_num, "finisher_id": state.finisher_id, } def _dict_to_state(self, d: dict): """Convert dict back to RebuiltGameState.""" # Implementation depends on RebuiltGameState structure pass ``` --- ## Graceful Shutdown ```python # server/main.py additions import signal import asyncio from contextlib import asynccontextmanager from stores.state_cache import StateCache from stores.event_store import EventStore from services.recovery_service import RecoveryService @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan handler.""" # Startup print("Starting up...") # Initialize connections app.state.redis = await create_redis_pool() app.state.pg_pool = await create_pg_pool() app.state.state_cache = StateCache(app.state.redis) app.state.event_store = EventStore(app.state.pg_pool) app.state.recovery_service = RecoveryService( app.state.event_store, app.state.state_cache, ) # Recover games print("Recovering games from event store...") results = await app.state.recovery_service.recover_all_games() print(f"Recovery complete: {results['recovered']} recovered, " f"{results['failed']} failed, {results['skipped']} skipped") # Start pub/sub app.state.pubsub = GamePubSub(app.state.redis) await app.state.pubsub.start() yield # Shutdown print("Shutting down...") # Stop accepting new connections await app.state.pubsub.stop() # Flush any pending state to Redis await flush_pending_states(app) # Close connections await app.state.redis.close() await app.state.pg_pool.close() print("Shutdown complete") async def flush_pending_states(app: FastAPI): """Flush any in-memory state to Redis before shutdown.""" # If we have any rooms with unsaved state, save them now for room_code, room in rooms.items(): if room.game and room.game.game_id: try: state = room.game.get_full_state() await app.state.state_cache.save_game_state( room.game.game_id, state, ) except Exception as e: print(f"Error flushing state for room {room_code}: {e}") app = FastAPI(lifespan=lifespan) # Handle SIGTERM gracefully def handle_sigterm(signum, frame): """Handle SIGTERM by initiating graceful shutdown.""" raise KeyboardInterrupt() signal.signal(signal.SIGTERM, handle_sigterm) ``` --- ## Integration with Game Service ```python # server/services/game_service.py from stores.state_cache import StateCache from stores.event_store import EventStore from stores.pubsub import GamePubSub, PubSubMessage, MessageType class GameService: """ Handles game commands with event sourcing. Coordinates between event store, state cache, and pub/sub. """ def __init__( self, event_store: EventStore, state_cache: StateCache, pubsub: GamePubSub, ): self.event_store = event_store self.state_cache = state_cache self.pubsub = pubsub async def handle_draw( self, game_id: str, player_id: str, source: str, ) -> dict: """Handle draw card command.""" # 1. Get current state from cache state = await self.state_cache.get_game_state(game_id) if not state: raise GameNotFoundError(game_id) # 2. Validate command if state["current_player_id"] != player_id: raise NotYourTurnError() # 3. Execute command (get card from deck/discard) # This uses the existing game logic game = self._load_game_from_state(state) card = game.draw_card(player_id, source) if not card: raise InvalidMoveError("Cannot draw from that source") # 4. Create event event = GameEvent( event_type=EventType.CARD_DRAWN, game_id=game_id, sequence_num=state["sequence_num"] + 1, player_id=player_id, data={"source": source, "card": card.to_dict()}, ) # 5. Persist event await self.event_store.append(event) # 6. Update cache new_state = game.get_full_state() new_state["sequence_num"] = event.sequence_num await self.state_cache.save_game_state(game_id, new_state) # 7. Publish to other servers await self.pubsub.publish(PubSubMessage( type=MessageType.GAME_STATE_UPDATE, room_code=state["room_code"], data={"game_state": new_state}, )) return new_state ``` --- ## Acceptance Criteria 1. **Redis State Cache Working** - [ ] Can create/get/delete rooms - [ ] Can add/remove players from rooms - [ ] Can save/get/delete game state - [ ] TTL expiration works correctly - [ ] Room code uniqueness enforced 2. **Pub/Sub Working** - [ ] Can subscribe to room channels - [ ] Can publish messages - [ ] Messages received by all subscribers - [ ] Handles disconnections gracefully - [ ] Multiple servers can communicate 3. **Game Recovery Working** - [ ] Games recovered on startup - [ ] State matches what was saved - [ ] Partial recovery (from sequence) works - [ ] Ended games not recovered - [ ] Failed recoveries logged and skipped 4. **Graceful Shutdown Working** - [ ] SIGTERM triggers clean shutdown - [ ] In-flight requests complete - [ ] State flushed to Redis - [ ] Connections closed cleanly - [ ] No data loss on restart 5. **Integration Tests** - [ ] Server restart doesn't lose games - [ ] Multi-server state sync works - [ ] State cache matches event store - [ ] Performance acceptable (<100ms for state ops) --- ## Implementation Order 1. Set up Redis locally (docker) 2. Implement StateCache class 3. Write StateCache tests 4. Implement GamePubSub class 5. Implement RecoveryService 6. Add lifespan handler to main.py 7. Integrate with game commands 8. Test full recovery cycle 9. Test multi-server pub/sub --- ## Docker Setup for Development ```yaml # docker-compose.dev.yml version: '3.8' services: redis: image: redis:7-alpine ports: - "6379:6379" volumes: - redis_data:/data command: redis-server --appendonly yes postgres: image: postgres:16-alpine ports: - "5432:5432" environment: POSTGRES_USER: golf POSTGRES_PASSWORD: devpassword POSTGRES_DB: golf volumes: - postgres_data:/var/lib/postgresql/data volumes: redis_data: postgres_data: ``` ```bash # Start services docker-compose -f docker-compose.dev.yml up -d # Connect to Redis CLI docker exec -it golfgame_redis_1 redis-cli # Connect to PostgreSQL docker exec -it golfgame_postgres_1 psql -U golf ``` --- ## Notes for Agent - Redis operations should use pipelines for atomicity - Consider Redis Cluster for production (but not needed initially) - The state cache is a cache, not source of truth (events are) - Pub/sub is best-effort; state sync should handle missed messages - Test with multiple server instances locally - Use connection pooling for both Redis and PostgreSQL