- Enforce invite codes on registration (INVITE_ONLY=true by default) - Bootstrap admin account for first-time setup - Require authentication for WebSocket connections and room creation - Add Glicko-2 rating system with multiplayer pairwise comparisons - Add Redis-backed matchmaking queue with expanding rating window - Auto-start matched games with standard rules after countdown - Add "Find Game" button and matchmaking UI to client - Add rating column to leaderboard - Scale down docker-compose.prod.yml for 512MB droplet Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
394 lines
14 KiB
Python
394 lines
14 KiB
Python
"""
|
|
Matchmaking service for public skill-based games.
|
|
|
|
Uses Redis sorted sets to maintain a queue of players looking for games,
|
|
grouped by rating. A background task periodically scans the queue and
|
|
creates matches when enough similar-skill players are available.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
from fastapi import WebSocket
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class QueuedPlayer:
|
|
"""A player waiting in the matchmaking queue."""
|
|
user_id: str
|
|
username: str
|
|
rating: float
|
|
queued_at: float # time.time()
|
|
connection_id: str
|
|
|
|
|
|
@dataclass
|
|
class MatchmakingConfig:
|
|
"""Configuration for the matchmaking system."""
|
|
enabled: bool = True
|
|
min_players: int = 2
|
|
max_players: int = 4
|
|
initial_rating_window: int = 100 # +/- rating range to start
|
|
expand_interval: int = 15 # seconds between range expansions
|
|
expand_amount: int = 50 # rating points to expand by
|
|
max_rating_window: int = 500 # maximum +/- range
|
|
match_check_interval: float = 3.0 # seconds between match attempts
|
|
countdown_seconds: int = 5 # countdown before matched game starts
|
|
|
|
|
|
class MatchmakingService:
|
|
"""
|
|
Manages the matchmaking queue and creates matches.
|
|
|
|
Players join the queue with their rating. A background task
|
|
periodically scans for groups of similarly-rated players and
|
|
creates games when matches are found.
|
|
"""
|
|
|
|
def __init__(self, redis_client, config: Optional[MatchmakingConfig] = None):
|
|
self.redis = redis_client
|
|
self.config = config or MatchmakingConfig()
|
|
self._queue: dict[str, QueuedPlayer] = {} # user_id -> QueuedPlayer
|
|
self._websockets: dict[str, WebSocket] = {} # user_id -> WebSocket
|
|
self._connection_ids: dict[str, str] = {} # user_id -> connection_id
|
|
self._running = False
|
|
self._task: Optional[asyncio.Task] = None
|
|
|
|
async def join_queue(
|
|
self,
|
|
user_id: str,
|
|
username: str,
|
|
rating: float,
|
|
websocket: WebSocket,
|
|
connection_id: str,
|
|
) -> dict:
|
|
"""
|
|
Add a player to the matchmaking queue.
|
|
|
|
Returns:
|
|
Queue status dict.
|
|
"""
|
|
if user_id in self._queue:
|
|
return {"position": self._get_position(user_id), "queue_size": len(self._queue)}
|
|
|
|
player = QueuedPlayer(
|
|
user_id=user_id,
|
|
username=username,
|
|
rating=rating,
|
|
queued_at=time.time(),
|
|
connection_id=connection_id,
|
|
)
|
|
|
|
self._queue[user_id] = player
|
|
self._websockets[user_id] = websocket
|
|
self._connection_ids[user_id] = connection_id
|
|
|
|
# Also add to Redis for persistence across restarts
|
|
if self.redis:
|
|
try:
|
|
await self.redis.zadd("matchmaking:queue", {user_id: rating})
|
|
await self.redis.hset(
|
|
"matchmaking:players",
|
|
user_id,
|
|
json.dumps({
|
|
"username": username,
|
|
"rating": rating,
|
|
"queued_at": player.queued_at,
|
|
"connection_id": connection_id,
|
|
}),
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"Redis matchmaking write failed: {e}")
|
|
|
|
position = self._get_position(user_id)
|
|
logger.info(f"Player {username} ({user_id[:8]}) joined queue (rating={rating:.0f}, pos={position})")
|
|
|
|
return {"position": position, "queue_size": len(self._queue)}
|
|
|
|
async def leave_queue(self, user_id: str) -> bool:
|
|
"""Remove a player from the matchmaking queue."""
|
|
if user_id not in self._queue:
|
|
return False
|
|
|
|
player = self._queue.pop(user_id, None)
|
|
self._websockets.pop(user_id, None)
|
|
self._connection_ids.pop(user_id, None)
|
|
|
|
if self.redis:
|
|
try:
|
|
await self.redis.zrem("matchmaking:queue", user_id)
|
|
await self.redis.hdel("matchmaking:players", user_id)
|
|
except Exception as e:
|
|
logger.warning(f"Redis matchmaking remove failed: {e}")
|
|
|
|
if player:
|
|
logger.info(f"Player {player.username} ({user_id[:8]}) left queue")
|
|
|
|
return True
|
|
|
|
async def get_queue_status(self, user_id: str) -> dict:
|
|
"""Get current queue status for a player."""
|
|
if user_id not in self._queue:
|
|
return {"in_queue": False}
|
|
|
|
player = self._queue[user_id]
|
|
wait_time = time.time() - player.queued_at
|
|
current_window = self._get_rating_window(wait_time)
|
|
|
|
return {
|
|
"in_queue": True,
|
|
"position": self._get_position(user_id),
|
|
"queue_size": len(self._queue),
|
|
"wait_time": int(wait_time),
|
|
"rating_window": current_window,
|
|
}
|
|
|
|
async def find_matches(self, room_manager, broadcast_game_state_fn) -> list[dict]:
|
|
"""
|
|
Scan the queue and create matches.
|
|
|
|
Returns:
|
|
List of match info dicts for matches created.
|
|
"""
|
|
if len(self._queue) < self.config.min_players:
|
|
return []
|
|
|
|
matches_created = []
|
|
matched_user_ids = set()
|
|
|
|
# Sort players by rating
|
|
sorted_players = sorted(self._queue.values(), key=lambda p: p.rating)
|
|
|
|
for player in sorted_players:
|
|
if player.user_id in matched_user_ids:
|
|
continue
|
|
|
|
wait_time = time.time() - player.queued_at
|
|
window = self._get_rating_window(wait_time)
|
|
|
|
# Find compatible players
|
|
candidates = []
|
|
for other in sorted_players:
|
|
if other.user_id == player.user_id or other.user_id in matched_user_ids:
|
|
continue
|
|
if abs(other.rating - player.rating) <= window:
|
|
candidates.append(other)
|
|
|
|
# Include the player themselves
|
|
group = [player] + candidates
|
|
|
|
if len(group) >= self.config.min_players:
|
|
# Take up to max_players
|
|
match_group = group[:self.config.max_players]
|
|
matched_user_ids.update(p.user_id for p in match_group)
|
|
|
|
# Create the match
|
|
match_info = await self._create_match(match_group, room_manager)
|
|
if match_info:
|
|
matches_created.append(match_info)
|
|
|
|
return matches_created
|
|
|
|
async def _create_match(self, players: list[QueuedPlayer], room_manager) -> Optional[dict]:
|
|
"""
|
|
Create a room for matched players and notify them.
|
|
|
|
Returns:
|
|
Match info dict, or None if creation failed.
|
|
"""
|
|
try:
|
|
# Create room
|
|
room = room_manager.create_room()
|
|
|
|
# Add all matched players to the room
|
|
for player in players:
|
|
ws = self._websockets.get(player.user_id)
|
|
if not ws:
|
|
continue
|
|
|
|
room.add_player(
|
|
player.connection_id,
|
|
player.username,
|
|
ws,
|
|
player.user_id,
|
|
)
|
|
|
|
# Remove matched players from queue
|
|
for player in players:
|
|
await self.leave_queue(player.user_id)
|
|
|
|
# Notify all matched players
|
|
match_info = {
|
|
"room_code": room.code,
|
|
"players": [
|
|
{"username": p.username, "rating": round(p.rating)}
|
|
for p in players
|
|
],
|
|
}
|
|
|
|
for player in players:
|
|
ws = self._websockets.get(player.user_id)
|
|
if ws:
|
|
try:
|
|
await ws.send_json({
|
|
"type": "queue_matched",
|
|
"room_code": room.code,
|
|
"players": match_info["players"],
|
|
"countdown": self.config.countdown_seconds,
|
|
})
|
|
except Exception as e:
|
|
logger.warning(f"Failed to notify matched player {player.user_id[:8]}: {e}")
|
|
|
|
# Also send room_joined to each player so the client switches screens
|
|
for player in players:
|
|
ws = self._websockets.get(player.user_id)
|
|
if ws:
|
|
try:
|
|
await ws.send_json({
|
|
"type": "room_joined",
|
|
"room_code": room.code,
|
|
"player_id": player.connection_id,
|
|
"authenticated": True,
|
|
})
|
|
# Send player list
|
|
await ws.send_json({
|
|
"type": "player_joined",
|
|
"players": room.player_list(),
|
|
})
|
|
except Exception:
|
|
pass
|
|
|
|
avg_rating = sum(p.rating for p in players) / len(players)
|
|
logger.info(
|
|
f"Match created: room={room.code}, "
|
|
f"players={[p.username for p in players]}, "
|
|
f"avg_rating={avg_rating:.0f}"
|
|
)
|
|
|
|
# Schedule auto-start after countdown
|
|
asyncio.create_task(self._auto_start_game(room, self.config.countdown_seconds))
|
|
|
|
return match_info
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to create match: {e}")
|
|
return None
|
|
|
|
async def _auto_start_game(self, room, countdown: int):
|
|
"""Auto-start a matched game after countdown."""
|
|
from game import GamePhase, GameOptions
|
|
|
|
await asyncio.sleep(countdown)
|
|
|
|
if room.game.phase != GamePhase.WAITING:
|
|
return # Game already started or room closed
|
|
|
|
if len(room.players) < 2:
|
|
return # Not enough players
|
|
|
|
# Standard rules for ranked games
|
|
options = GameOptions()
|
|
options.flip_mode = "never"
|
|
options.initial_flips = 2
|
|
|
|
try:
|
|
async with room.game_lock:
|
|
room.game.start_game(1, 9, options) # 1 deck, 9 rounds, standard rules
|
|
|
|
# Send game started to all players
|
|
for pid, rp in room.players.items():
|
|
if rp.websocket and not rp.is_cpu:
|
|
try:
|
|
state = room.game.get_state(pid)
|
|
await rp.websocket.send_json({
|
|
"type": "game_started",
|
|
"game_state": state,
|
|
})
|
|
except Exception:
|
|
pass
|
|
|
|
logger.info(f"Auto-started matched game in room {room.code}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to auto-start matched game: {e}")
|
|
|
|
def _get_rating_window(self, wait_time: float) -> int:
|
|
"""Calculate the current rating window based on wait time."""
|
|
expansions = int(wait_time / self.config.expand_interval)
|
|
window = self.config.initial_rating_window + (expansions * self.config.expand_amount)
|
|
return min(window, self.config.max_rating_window)
|
|
|
|
def _get_position(self, user_id: str) -> int:
|
|
"""Get a player's position in the queue (1-indexed)."""
|
|
sorted_ids = sorted(
|
|
self._queue.keys(),
|
|
key=lambda uid: self._queue[uid].queued_at,
|
|
)
|
|
try:
|
|
return sorted_ids.index(user_id) + 1
|
|
except ValueError:
|
|
return 0
|
|
|
|
async def start(self, room_manager, broadcast_fn):
|
|
"""Start the matchmaking background task."""
|
|
if self._running:
|
|
return
|
|
|
|
self._running = True
|
|
self._task = asyncio.create_task(
|
|
self._matchmaking_loop(room_manager, broadcast_fn)
|
|
)
|
|
logger.info("Matchmaking service started")
|
|
|
|
async def stop(self):
|
|
"""Stop the matchmaking background task."""
|
|
self._running = False
|
|
if self._task:
|
|
self._task.cancel()
|
|
try:
|
|
await self._task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
logger.info("Matchmaking service stopped")
|
|
|
|
async def _matchmaking_loop(self, room_manager, broadcast_fn):
|
|
"""Background task that periodically checks for matches."""
|
|
while self._running:
|
|
try:
|
|
matches = await self.find_matches(room_manager, broadcast_fn)
|
|
if matches:
|
|
logger.info(f"Created {len(matches)} match(es)")
|
|
|
|
# Send queue status updates to all queued players
|
|
for user_id in list(self._queue.keys()):
|
|
ws = self._websockets.get(user_id)
|
|
if ws:
|
|
try:
|
|
status = await self.get_queue_status(user_id)
|
|
await ws.send_json({
|
|
"type": "queue_status",
|
|
**status,
|
|
})
|
|
except Exception:
|
|
# Player disconnected, remove from queue
|
|
await self.leave_queue(user_id)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Matchmaking error: {e}")
|
|
|
|
await asyncio.sleep(self.config.match_check_interval)
|
|
|
|
async def cleanup(self):
|
|
"""Clean up Redis queue data on shutdown."""
|
|
if self.redis:
|
|
try:
|
|
await self.redis.delete("matchmaking:queue")
|
|
await self.redis.delete("matchmaking:players")
|
|
except Exception:
|
|
pass
|