584 lines
19 KiB
Python
584 lines
19 KiB
Python
"""
|
|
Replay service for Golf game.
|
|
|
|
Provides game replay functionality, share link generation, and game export/import.
|
|
Leverages the event-sourced architecture for perfect game reconstruction.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import secrets
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Optional, List
|
|
|
|
import asyncpg
|
|
|
|
from stores.event_store import EventStore
|
|
from models.events import GameEvent, EventType
|
|
from models.game_state import rebuild_state, RebuiltGameState, CardState
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# SQL schema for replay/sharing tables
|
|
REPLAY_SCHEMA_SQL = """
|
|
-- Public share links for completed games
|
|
CREATE TABLE IF NOT EXISTS shared_games (
|
|
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
|
game_id UUID NOT NULL,
|
|
share_code VARCHAR(12) UNIQUE NOT NULL,
|
|
created_by VARCHAR(50),
|
|
created_at TIMESTAMPTZ DEFAULT NOW(),
|
|
expires_at TIMESTAMPTZ,
|
|
view_count INTEGER DEFAULT 0,
|
|
is_public BOOLEAN DEFAULT true,
|
|
title VARCHAR(100),
|
|
description TEXT
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_shared_games_code ON shared_games(share_code);
|
|
CREATE INDEX IF NOT EXISTS idx_shared_games_game ON shared_games(game_id);
|
|
|
|
-- Track replay views for analytics
|
|
CREATE TABLE IF NOT EXISTS replay_views (
|
|
id SERIAL PRIMARY KEY,
|
|
shared_game_id UUID REFERENCES shared_games(id),
|
|
viewer_id VARCHAR(50),
|
|
viewed_at TIMESTAMPTZ DEFAULT NOW(),
|
|
ip_hash VARCHAR(64),
|
|
watch_duration_seconds INTEGER
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_replay_views_shared ON replay_views(shared_game_id);
|
|
"""
|
|
|
|
|
|
@dataclass
|
|
class ReplayFrame:
|
|
"""Single frame in a replay."""
|
|
event_index: int
|
|
event_type: str
|
|
event_data: dict
|
|
game_state: dict
|
|
timestamp: float # Seconds from start
|
|
player_id: Optional[str] = None
|
|
|
|
|
|
@dataclass
|
|
class GameReplay:
|
|
"""Complete replay of a game."""
|
|
game_id: str
|
|
frames: List[ReplayFrame]
|
|
total_duration_seconds: float
|
|
player_names: List[str]
|
|
final_scores: dict
|
|
winner: Optional[str]
|
|
options: dict
|
|
room_code: str
|
|
total_rounds: int
|
|
|
|
|
|
class ReplayService:
|
|
"""
|
|
Service for game replay, export, and sharing.
|
|
|
|
Provides:
|
|
- Replay building from event store
|
|
- Share link creation and retrieval
|
|
- Game export/import
|
|
"""
|
|
|
|
EXPORT_VERSION = "1.0"
|
|
|
|
def __init__(self, pool: asyncpg.Pool, event_store: EventStore):
|
|
"""
|
|
Initialize replay service.
|
|
|
|
Args:
|
|
pool: asyncpg connection pool.
|
|
event_store: Event store for retrieving game events.
|
|
"""
|
|
self.pool = pool
|
|
self.event_store = event_store
|
|
|
|
async def initialize_schema(self) -> None:
|
|
"""Create replay tables if they don't exist."""
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(REPLAY_SCHEMA_SQL)
|
|
logger.info("Replay schema initialized")
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Replay Building
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def build_replay(self, game_id: str) -> GameReplay:
|
|
"""
|
|
Build complete replay from event store.
|
|
|
|
Args:
|
|
game_id: Game UUID.
|
|
|
|
Returns:
|
|
GameReplay with all frames and metadata.
|
|
|
|
Raises:
|
|
ValueError: If no events found for game.
|
|
"""
|
|
events = await self.event_store.get_events(game_id)
|
|
if not events:
|
|
raise ValueError(f"No events found for game {game_id}")
|
|
|
|
frames = []
|
|
state = RebuiltGameState(game_id=game_id)
|
|
start_time = None
|
|
|
|
for i, event in enumerate(events):
|
|
if start_time is None:
|
|
start_time = event.timestamp
|
|
|
|
# Apply event to get state
|
|
state.apply(event)
|
|
|
|
# Calculate timestamp relative to start
|
|
elapsed = (event.timestamp - start_time).total_seconds()
|
|
|
|
frames.append(ReplayFrame(
|
|
event_index=i,
|
|
event_type=event.event_type.value,
|
|
event_data=event.data,
|
|
game_state=self._state_to_dict(state),
|
|
timestamp=elapsed,
|
|
player_id=event.player_id,
|
|
))
|
|
|
|
# Extract final game info
|
|
player_names = [p.name for p in state.players.values()]
|
|
final_scores = {p.name: p.total_score for p in state.players.values()}
|
|
|
|
# Determine winner (lowest total score)
|
|
winner = None
|
|
if state.phase.value == "game_over" and state.players:
|
|
winner_player = min(state.players.values(), key=lambda p: p.total_score)
|
|
winner = winner_player.name
|
|
|
|
return GameReplay(
|
|
game_id=game_id,
|
|
frames=frames,
|
|
total_duration_seconds=frames[-1].timestamp if frames else 0,
|
|
player_names=player_names,
|
|
final_scores=final_scores,
|
|
winner=winner,
|
|
options=state.options,
|
|
room_code=state.room_code,
|
|
total_rounds=state.total_rounds,
|
|
)
|
|
|
|
async def get_replay_frame(
|
|
self,
|
|
game_id: str,
|
|
frame_index: int
|
|
) -> Optional[ReplayFrame]:
|
|
"""
|
|
Get a specific frame from a replay.
|
|
|
|
Useful for seeking to a specific point without loading entire replay.
|
|
|
|
Args:
|
|
game_id: Game UUID.
|
|
frame_index: Index of frame to retrieve (0-based).
|
|
|
|
Returns:
|
|
ReplayFrame or None if index out of range.
|
|
"""
|
|
events = await self.event_store.get_events(
|
|
game_id,
|
|
from_sequence=1,
|
|
to_sequence=frame_index + 1
|
|
)
|
|
|
|
if not events or len(events) <= frame_index:
|
|
return None
|
|
|
|
state = RebuiltGameState(game_id=game_id)
|
|
start_time = events[0].timestamp if events else None
|
|
|
|
for event in events:
|
|
state.apply(event)
|
|
|
|
last_event = events[-1]
|
|
elapsed = (last_event.timestamp - start_time).total_seconds() if start_time else 0
|
|
|
|
return ReplayFrame(
|
|
event_index=frame_index,
|
|
event_type=last_event.event_type.value,
|
|
event_data=last_event.data,
|
|
game_state=self._state_to_dict(state),
|
|
timestamp=elapsed,
|
|
player_id=last_event.player_id,
|
|
)
|
|
|
|
def _state_to_dict(self, state: RebuiltGameState) -> dict:
|
|
"""Convert RebuiltGameState to serializable dict."""
|
|
players = []
|
|
for pid in state.player_order:
|
|
if pid in state.players:
|
|
p = state.players[pid]
|
|
players.append({
|
|
"id": p.id,
|
|
"name": p.name,
|
|
"cards": [c.to_dict() for c in p.cards],
|
|
"score": p.score,
|
|
"total_score": p.total_score,
|
|
"rounds_won": p.rounds_won,
|
|
"is_cpu": p.is_cpu,
|
|
"all_face_up": p.all_face_up(),
|
|
})
|
|
|
|
return {
|
|
"phase": state.phase.value,
|
|
"players": players,
|
|
"current_player_idx": state.current_player_idx,
|
|
"current_player_id": state.player_order[state.current_player_idx] if state.player_order else None,
|
|
"deck_remaining": state.deck_remaining,
|
|
"discard_pile": [c.to_dict() for c in state.discard_pile],
|
|
"discard_top": state.discard_pile[-1].to_dict() if state.discard_pile else None,
|
|
"drawn_card": state.drawn_card.to_dict() if state.drawn_card else None,
|
|
"current_round": state.current_round,
|
|
"total_rounds": state.total_rounds,
|
|
"finisher_id": state.finisher_id,
|
|
"options": state.options,
|
|
}
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Share Links
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def create_share_link(
|
|
self,
|
|
game_id: str,
|
|
user_id: Optional[str] = None,
|
|
title: Optional[str] = None,
|
|
description: Optional[str] = None,
|
|
expires_days: Optional[int] = None,
|
|
) -> str:
|
|
"""
|
|
Generate shareable link for a game.
|
|
|
|
Args:
|
|
game_id: Game UUID.
|
|
user_id: ID of user creating the share.
|
|
title: Optional custom title.
|
|
description: Optional description.
|
|
expires_days: Days until link expires (None = never).
|
|
|
|
Returns:
|
|
12-character share code.
|
|
"""
|
|
share_code = secrets.token_urlsafe(9)[:12]
|
|
|
|
expires_at = None
|
|
if expires_days:
|
|
expires_at = datetime.now(timezone.utc) + timedelta(days=expires_days)
|
|
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute("""
|
|
INSERT INTO shared_games
|
|
(game_id, share_code, created_by, title, description, expires_at)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
""", game_id, share_code, user_id, title, description, expires_at)
|
|
|
|
logger.info(f"Created share link {share_code} for game {game_id}")
|
|
return share_code
|
|
|
|
async def get_shared_game(self, share_code: str) -> Optional[dict]:
|
|
"""
|
|
Retrieve shared game by code.
|
|
|
|
Args:
|
|
share_code: 12-character share code.
|
|
|
|
Returns:
|
|
Shared game metadata dict, or None if not found/expired.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
row = await conn.fetchrow("""
|
|
SELECT sg.*, g.room_code, g.completed_at, g.num_players, g.num_rounds
|
|
FROM shared_games sg
|
|
JOIN games_v2 g ON sg.game_id = g.id
|
|
WHERE sg.share_code = $1
|
|
AND sg.is_public = true
|
|
AND (sg.expires_at IS NULL OR sg.expires_at > NOW())
|
|
""", share_code)
|
|
|
|
if row:
|
|
# Increment view count
|
|
await conn.execute("""
|
|
UPDATE shared_games SET view_count = view_count + 1
|
|
WHERE share_code = $1
|
|
""", share_code)
|
|
|
|
return dict(row)
|
|
return None
|
|
|
|
async def record_replay_view(
|
|
self,
|
|
shared_game_id: str,
|
|
viewer_id: Optional[str] = None,
|
|
ip_hash: Optional[str] = None,
|
|
duration_seconds: Optional[int] = None,
|
|
) -> None:
|
|
"""
|
|
Record a replay view for analytics.
|
|
|
|
Args:
|
|
shared_game_id: UUID of the shared_games record.
|
|
viewer_id: Optional user ID of viewer.
|
|
ip_hash: Optional hashed IP for rate limiting.
|
|
duration_seconds: Optional watch duration.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute("""
|
|
INSERT INTO replay_views
|
|
(shared_game_id, viewer_id, ip_hash, watch_duration_seconds)
|
|
VALUES ($1, $2, $3, $4)
|
|
""", shared_game_id, viewer_id, ip_hash, duration_seconds)
|
|
|
|
async def get_user_shared_games(self, user_id: str) -> List[dict]:
|
|
"""
|
|
Get all shared games created by a user.
|
|
|
|
Args:
|
|
user_id: User ID.
|
|
|
|
Returns:
|
|
List of shared game metadata dicts.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
rows = await conn.fetch("""
|
|
SELECT sg.*, g.room_code, g.completed_at
|
|
FROM shared_games sg
|
|
JOIN games_v2 g ON sg.game_id = g.id
|
|
WHERE sg.created_by = $1
|
|
ORDER BY sg.created_at DESC
|
|
""", user_id)
|
|
return [dict(row) for row in rows]
|
|
|
|
async def delete_share_link(self, share_code: str, user_id: str) -> bool:
|
|
"""
|
|
Delete a share link.
|
|
|
|
Args:
|
|
share_code: Share code to delete.
|
|
user_id: User requesting deletion (must be creator).
|
|
|
|
Returns:
|
|
True if deleted, False if not found or not authorized.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
result = await conn.execute("""
|
|
DELETE FROM shared_games
|
|
WHERE share_code = $1 AND created_by = $2
|
|
""", share_code, user_id)
|
|
return result == "DELETE 1"
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Export/Import
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def export_game(self, game_id: str) -> dict:
|
|
"""
|
|
Export game as portable JSON format.
|
|
|
|
Args:
|
|
game_id: Game UUID.
|
|
|
|
Returns:
|
|
Export data dict suitable for JSON serialization.
|
|
"""
|
|
replay = await self.build_replay(game_id)
|
|
|
|
# Get raw events for export
|
|
events = await self.event_store.get_events(game_id)
|
|
start_time = events[0].timestamp if events else datetime.now(timezone.utc)
|
|
|
|
return {
|
|
"version": self.EXPORT_VERSION,
|
|
"exported_at": datetime.now(timezone.utc).isoformat(),
|
|
"game": {
|
|
"id": replay.game_id,
|
|
"room_code": replay.room_code,
|
|
"players": replay.player_names,
|
|
"winner": replay.winner,
|
|
"final_scores": replay.final_scores,
|
|
"duration_seconds": replay.total_duration_seconds,
|
|
"total_rounds": replay.total_rounds,
|
|
"options": replay.options,
|
|
},
|
|
"events": [
|
|
{
|
|
"type": event.event_type.value,
|
|
"sequence": event.sequence_num,
|
|
"player_id": event.player_id,
|
|
"data": event.data,
|
|
"timestamp": (event.timestamp - start_time).total_seconds(),
|
|
}
|
|
for event in events
|
|
],
|
|
}
|
|
|
|
async def import_game(self, export_data: dict, user_id: str) -> str:
|
|
"""
|
|
Import a game from exported JSON.
|
|
|
|
Creates a new game record with the imported events.
|
|
|
|
Args:
|
|
export_data: Exported game data.
|
|
user_id: User performing the import.
|
|
|
|
Returns:
|
|
New game ID.
|
|
|
|
Raises:
|
|
ValueError: If export format is invalid.
|
|
"""
|
|
version = export_data.get("version")
|
|
if version != self.EXPORT_VERSION:
|
|
raise ValueError(f"Unsupported export version: {version}")
|
|
|
|
if "events" not in export_data or not export_data["events"]:
|
|
raise ValueError("Export contains no events")
|
|
|
|
# Generate new game ID
|
|
import uuid
|
|
new_game_id = str(uuid.uuid4())
|
|
|
|
# Calculate base timestamp
|
|
base_time = datetime.now(timezone.utc)
|
|
|
|
# Import events with new game ID
|
|
events = []
|
|
for event_data in export_data["events"]:
|
|
event = GameEvent(
|
|
event_type=EventType(event_data["type"]),
|
|
game_id=new_game_id,
|
|
sequence_num=event_data["sequence"],
|
|
player_id=event_data.get("player_id"),
|
|
data=event_data["data"],
|
|
timestamp=base_time + timedelta(seconds=event_data.get("timestamp", 0)),
|
|
)
|
|
events.append(event)
|
|
|
|
# Batch insert events
|
|
await self.event_store.append_batch(events)
|
|
|
|
# Create game metadata record
|
|
game_info = export_data.get("game", {})
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute("""
|
|
INSERT INTO games_v2
|
|
(id, room_code, status, num_rounds, options, completed_at)
|
|
VALUES ($1, $2, 'imported', $3, $4, NOW())
|
|
""",
|
|
new_game_id,
|
|
f"IMP-{secrets.token_hex(2).upper()}", # Generate room code for imported games
|
|
game_info.get("total_rounds", 1),
|
|
json.dumps(game_info.get("options", {})),
|
|
)
|
|
|
|
logger.info(f"Imported game as {new_game_id} by user {user_id}")
|
|
return new_game_id
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Game History Queries
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def get_user_game_history(
|
|
self,
|
|
user_id: str,
|
|
limit: int = 20,
|
|
offset: int = 0,
|
|
) -> List[dict]:
|
|
"""
|
|
Get game history for a user.
|
|
|
|
Args:
|
|
user_id: User ID.
|
|
limit: Max games to return.
|
|
offset: Pagination offset.
|
|
|
|
Returns:
|
|
List of game summary dicts.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
rows = await conn.fetch("""
|
|
SELECT g.id, g.room_code, g.status, g.completed_at,
|
|
g.num_players, g.num_rounds, g.winner_id,
|
|
$1 = ANY(g.player_ids) as participated
|
|
FROM games_v2 g
|
|
WHERE $1 = ANY(g.player_ids)
|
|
AND g.status IN ('completed', 'imported')
|
|
ORDER BY g.completed_at DESC NULLS LAST
|
|
LIMIT $2 OFFSET $3
|
|
""", user_id, limit, offset)
|
|
|
|
return [dict(row) for row in rows]
|
|
|
|
async def can_view_game(self, user_id: Optional[str], game_id: str) -> bool:
|
|
"""
|
|
Check if user can view a game replay.
|
|
|
|
Users can view games they played in or games that are shared publicly.
|
|
|
|
Args:
|
|
user_id: User ID (None for anonymous).
|
|
game_id: Game UUID.
|
|
|
|
Returns:
|
|
True if user can view the game.
|
|
"""
|
|
async with self.pool.acquire() as conn:
|
|
# Check if user played in the game
|
|
if user_id:
|
|
row = await conn.fetchrow("""
|
|
SELECT 1 FROM games_v2
|
|
WHERE id = $1 AND $2 = ANY(player_ids)
|
|
""", game_id, user_id)
|
|
if row:
|
|
return True
|
|
|
|
# Check if game has a public share link
|
|
row = await conn.fetchrow("""
|
|
SELECT 1 FROM shared_games
|
|
WHERE game_id = $1
|
|
AND is_public = true
|
|
AND (expires_at IS NULL OR expires_at > NOW())
|
|
""", game_id)
|
|
return row is not None
|
|
|
|
|
|
# Global instance
|
|
_replay_service: Optional[ReplayService] = None
|
|
|
|
|
|
async def get_replay_service(pool: asyncpg.Pool, event_store: EventStore) -> ReplayService:
|
|
"""Get or create the replay service instance."""
|
|
global _replay_service
|
|
if _replay_service is None:
|
|
_replay_service = ReplayService(pool, event_store)
|
|
await _replay_service.initialize_schema()
|
|
return _replay_service
|
|
|
|
|
|
def set_replay_service(service: ReplayService) -> None:
|
|
"""Set the global replay service instance."""
|
|
global _replay_service
|
|
_replay_service = service
|
|
|
|
|
|
def close_replay_service() -> None:
|
|
"""Close the replay service."""
|
|
global _replay_service
|
|
_replay_service = None
|