# V2_06: Game Replay & Export System > **Scope**: Replay viewer, game export/import, share links, spectator mode > **Dependencies**: V2_01 (Event Sourcing), V2_02 (Persistence), V2_03 (User Accounts) > **Complexity**: Medium --- ## Overview The replay system leverages our event-sourced architecture to provide: - **Replay Viewer**: Step through any completed game move-by-move - **Export/Import**: Download games as JSON, share with others - **Share Links**: Generate public links to specific games - **Spectator Mode**: Watch live games in progress --- ## 1. Database Schema ### Shared Games Table ```sql -- Public share links for completed games CREATE TABLE shared_games ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), game_id UUID NOT NULL REFERENCES games(id), share_code VARCHAR(12) UNIQUE NOT NULL, -- Short shareable code created_by UUID REFERENCES users(id), created_at TIMESTAMPTZ DEFAULT NOW(), expires_at TIMESTAMPTZ, -- NULL = never expires view_count INTEGER DEFAULT 0, is_public BOOLEAN DEFAULT true, title VARCHAR(100), -- Optional custom title description TEXT -- Optional description ); CREATE INDEX idx_shared_games_code ON shared_games(share_code); CREATE INDEX idx_shared_games_game ON shared_games(game_id); -- Track replay views for analytics CREATE TABLE replay_views ( id SERIAL PRIMARY KEY, shared_game_id UUID REFERENCES shared_games(id), viewer_id UUID REFERENCES users(id), -- NULL for anonymous viewed_at TIMESTAMPTZ DEFAULT NOW(), ip_hash VARCHAR(64), -- Hashed IP for rate limiting watch_duration_seconds INTEGER ); ``` --- ## 2. Replay Service ### Core Implementation ```python # server/replay.py from dataclasses import dataclass from typing import Optional import secrets import json from server.events import EventStore, GameEvent from server.game import Game, GameOptions @dataclass class ReplayFrame: """Single frame in a replay.""" event_index: int event: GameEvent game_state: dict # Serialized game state after event timestamp: float @dataclass class GameReplay: """Complete replay of a game.""" game_id: str frames: list[ReplayFrame] total_duration_seconds: float player_names: list[str] final_scores: dict[str, int] winner: Optional[str] options: GameOptions class ReplayService: def __init__(self, event_store: EventStore, db_pool): self.event_store = event_store self.db = db_pool async def build_replay(self, game_id: str) -> GameReplay: """Build complete replay from event store.""" events = await self.event_store.get_events(game_id) if not events: raise ValueError(f"No events found for game {game_id}") frames = [] game = None start_time = None for i, event in enumerate(events): if start_time is None: start_time = event.timestamp # Apply event to get state if event.event_type == "game_started": game = Game.from_event(event) else: game.apply_event(event) frames.append(ReplayFrame( event_index=i, event=event, game_state=game.to_dict(reveal_all=True), timestamp=(event.timestamp - start_time).total_seconds() )) return GameReplay( game_id=game_id, frames=frames, total_duration_seconds=frames[-1].timestamp if frames else 0, player_names=[p.name for p in game.players], final_scores={p.name: p.score for p in game.players}, winner=game.winner.name if game.winner else None, options=game.options ) async def create_share_link( self, game_id: str, user_id: Optional[str] = None, title: Optional[str] = None, expires_days: Optional[int] = None ) -> str: """Generate shareable link for a game.""" share_code = secrets.token_urlsafe(8)[:12] # 12-char code expires_at = None if expires_days: expires_at = f"NOW() + INTERVAL '{expires_days} days'" async with self.db.acquire() as conn: await conn.execute(""" INSERT INTO shared_games (game_id, share_code, created_by, title, expires_at) VALUES ($1, $2, $3, $4, $5) """, game_id, share_code, user_id, title, expires_at) return share_code async def get_shared_game(self, share_code: str) -> Optional[dict]: """Retrieve shared game by code.""" async with self.db.acquire() as conn: row = await conn.fetchrow(""" SELECT sg.*, g.room_code, g.completed_at FROM shared_games sg JOIN games g ON sg.game_id = g.id WHERE sg.share_code = $1 AND sg.is_public = true AND (sg.expires_at IS NULL OR sg.expires_at > NOW()) """, share_code) if row: # Increment view count await conn.execute(""" UPDATE shared_games SET view_count = view_count + 1 WHERE share_code = $1 """, share_code) return dict(row) return None async def export_game(self, game_id: str) -> dict: """Export game as portable JSON format.""" replay = await self.build_replay(game_id) return { "version": "1.0", "exported_at": datetime.utcnow().isoformat(), "game": { "id": replay.game_id, "players": replay.player_names, "winner": replay.winner, "final_scores": replay.final_scores, "duration_seconds": replay.total_duration_seconds, "options": asdict(replay.options) }, "events": [ { "type": f.event.event_type, "data": f.event.data, "timestamp": f.timestamp } for f in replay.frames ] } async def import_game(self, export_data: dict, user_id: str) -> str: """Import a game from exported JSON.""" if export_data.get("version") != "1.0": raise ValueError("Unsupported export version") # Generate new game ID for import new_game_id = str(uuid.uuid4()) # Store events with new game ID for event_data in export_data["events"]: event = GameEvent( game_id=new_game_id, event_type=event_data["type"], data=event_data["data"], timestamp=datetime.fromisoformat(event_data["timestamp"]) ) await self.event_store.append(event) # Mark as imported game async with self.db.acquire() as conn: await conn.execute(""" INSERT INTO games (id, imported_by, imported_at, is_imported) VALUES ($1, $2, NOW(), true) """, new_game_id, user_id) return new_game_id ``` --- ## 3. Spectator Mode ### Live Game Watching ```python # server/spectator.py from typing import Set from fastapi import WebSocket class SpectatorManager: """Manage spectators watching live games.""" def __init__(self): # game_id -> set of spectator websockets self.spectators: dict[str, Set[WebSocket]] = {} async def add_spectator(self, game_id: str, ws: WebSocket): """Add spectator to game.""" if game_id not in self.spectators: self.spectators[game_id] = set() self.spectators[game_id].add(ws) # Send current game state game = await self.get_game_state(game_id) await ws.send_json({ "type": "spectator_joined", "game": game.to_dict(reveal_all=False), "spectator_count": len(self.spectators[game_id]) }) async def remove_spectator(self, game_id: str, ws: WebSocket): """Remove spectator from game.""" if game_id in self.spectators: self.spectators[game_id].discard(ws) if not self.spectators[game_id]: del self.spectators[game_id] async def broadcast_to_spectators(self, game_id: str, message: dict): """Send update to all spectators of a game.""" if game_id not in self.spectators: return dead_connections = set() for ws in self.spectators[game_id]: try: await ws.send_json(message) except: dead_connections.add(ws) # Clean up dead connections self.spectators[game_id] -= dead_connections def get_spectator_count(self, game_id: str) -> int: return len(self.spectators.get(game_id, set())) # Integration with main game loop async def handle_game_event(game_id: str, event: GameEvent): """Called after each game event to notify spectators.""" await spectator_manager.broadcast_to_spectators(game_id, { "type": "game_update", "event": event.to_dict(), "timestamp": event.timestamp.isoformat() }) ``` --- ## 4. API Endpoints ```python # server/routes/replay.py from fastapi import APIRouter, HTTPException, Query from fastapi.responses import JSONResponse router = APIRouter(prefix="/api/replay", tags=["replay"]) @router.get("/game/{game_id}") async def get_replay(game_id: str, user: Optional[User] = Depends(get_current_user)): """Get full replay for a game.""" # Check if user has permission (played in game or game is public) if not await can_view_game(user, game_id): raise HTTPException(403, "Cannot view this game") replay = await replay_service.build_replay(game_id) return { "game_id": replay.game_id, "frames": [ { "index": f.event_index, "event_type": f.event.event_type, "timestamp": f.timestamp, "state": f.game_state } for f in replay.frames ], "metadata": { "players": replay.player_names, "winner": replay.winner, "final_scores": replay.final_scores, "duration": replay.total_duration_seconds } } @router.post("/game/{game_id}/share") async def create_share_link( game_id: str, title: Optional[str] = None, expires_days: Optional[int] = Query(None, ge=1, le=365), user: User = Depends(require_auth) ): """Create shareable link for a game.""" if not await user_played_in_game(user.id, game_id): raise HTTPException(403, "Can only share games you played in") share_code = await replay_service.create_share_link( game_id, user.id, title, expires_days ) return { "share_code": share_code, "share_url": f"/replay/{share_code}", "expires_days": expires_days } @router.get("/shared/{share_code}") async def get_shared_replay(share_code: str): """Get replay via share code (public endpoint).""" shared = await replay_service.get_shared_game(share_code) if not shared: raise HTTPException(404, "Shared game not found or expired") replay = await replay_service.build_replay(shared["game_id"]) return { "title": shared.get("title"), "view_count": shared["view_count"], "replay": replay } @router.get("/game/{game_id}/export") async def export_game(game_id: str, user: User = Depends(require_auth)): """Export game as downloadable JSON.""" if not await can_view_game(user, game_id): raise HTTPException(403, "Cannot export this game") export_data = await replay_service.export_game(game_id) return JSONResponse( content=export_data, headers={ "Content-Disposition": f'attachment; filename="golf-game-{game_id[:8]}.json"' } ) @router.post("/import") async def import_game( export_data: dict, user: User = Depends(require_auth) ): """Import a game from JSON export.""" try: new_game_id = await replay_service.import_game(export_data, user.id) return {"game_id": new_game_id, "message": "Game imported successfully"} except ValueError as e: raise HTTPException(400, str(e)) # Spectator endpoints @router.websocket("/spectate/{room_code}") async def spectate_game(websocket: WebSocket, room_code: str): """WebSocket endpoint for spectating live games.""" await websocket.accept() game_id = await get_game_id_by_room(room_code) if not game_id: await websocket.close(code=4004, reason="Game not found") return try: await spectator_manager.add_spectator(game_id, websocket) while True: # Keep connection alive, handle pings data = await websocket.receive_text() if data == "ping": await websocket.send_text("pong") except WebSocketDisconnect: pass finally: await spectator_manager.remove_spectator(game_id, websocket) ``` --- ## 5. Frontend: Replay Viewer ### Replay Component ```javascript // client/replay.js class ReplayViewer { constructor(container) { this.container = container; this.frames = []; this.currentFrame = 0; this.isPlaying = false; this.playbackSpeed = 1.0; this.playInterval = null; } async loadReplay(gameId) { const response = await fetch(`/api/replay/game/${gameId}`); const data = await response.json(); this.frames = data.frames; this.metadata = data.metadata; this.currentFrame = 0; this.render(); this.renderControls(); } async loadSharedReplay(shareCode) { const response = await fetch(`/api/replay/shared/${shareCode}`); if (!response.ok) { this.showError("Replay not found or expired"); return; } const data = await response.json(); this.frames = data.replay.frames; this.metadata = data.replay; this.title = data.title; this.currentFrame = 0; this.render(); } render() { if (!this.frames.length) return; const frame = this.frames[this.currentFrame]; const state = frame.state; // Render game board at this state this.renderBoard(state); // Show event description this.renderEventInfo(frame); // Update timeline this.updateTimeline(); } renderBoard(state) { // Similar to main game rendering but read-only const boardHtml = `