Huge v2 uplift, now deployable with real user management and tooling!
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
306
server/stores/pubsub.py
Normal file
306
server/stores/pubsub.py
Normal file
@@ -0,0 +1,306 @@
|
||||
"""
|
||||
Redis pub/sub for cross-server game events.
|
||||
|
||||
In a multi-server deployment, each server has its own WebSocket connections.
|
||||
When a game action occurs, the server handling that action needs to notify
|
||||
all other servers so they can update their connected clients.
|
||||
|
||||
This module provides:
|
||||
- Pub/sub channels per room for targeted broadcasting
|
||||
- Message types for state updates, player events, and broadcasts
|
||||
- Async listener loop for handling incoming messages
|
||||
- Clean subscription management
|
||||
|
||||
Usage:
|
||||
pubsub = GamePubSub(redis_client)
|
||||
await pubsub.start()
|
||||
|
||||
# Subscribe to room events
|
||||
async def handle_message(msg: PubSubMessage):
|
||||
print(f"Received: {msg.type} for room {msg.room_code}")
|
||||
|
||||
await pubsub.subscribe("ABCD", handle_message)
|
||||
|
||||
# Publish to room
|
||||
await pubsub.publish(PubSubMessage(
|
||||
type=MessageType.GAME_STATE_UPDATE,
|
||||
room_code="ABCD",
|
||||
data={"game_state": {...}},
|
||||
))
|
||||
|
||||
await pubsub.stop()
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Callable, Awaitable, Optional
|
||||
|
||||
import redis.asyncio as redis
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MessageType(str, Enum):
|
||||
"""Types of messages that can be published via pub/sub."""
|
||||
|
||||
# Game state changed (other servers should update their cache)
|
||||
GAME_STATE_UPDATE = "game_state_update"
|
||||
|
||||
# Player connected to room (for presence tracking)
|
||||
PLAYER_JOINED = "player_joined"
|
||||
|
||||
# Player disconnected from room
|
||||
PLAYER_LEFT = "player_left"
|
||||
|
||||
# Room is being closed (game ended or abandoned)
|
||||
ROOM_CLOSED = "room_closed"
|
||||
|
||||
# Generic broadcast to all clients in room
|
||||
BROADCAST = "broadcast"
|
||||
|
||||
|
||||
@dataclass
|
||||
class PubSubMessage:
|
||||
"""
|
||||
Message sent via Redis pub/sub.
|
||||
|
||||
Attributes:
|
||||
type: Message type (determines how handlers process it).
|
||||
room_code: Room this message is for.
|
||||
data: Message payload (type-specific).
|
||||
sender_id: Optional server ID of sender (to avoid echo).
|
||||
"""
|
||||
|
||||
type: MessageType
|
||||
room_code: str
|
||||
data: dict
|
||||
sender_id: Optional[str] = None
|
||||
|
||||
def to_json(self) -> str:
|
||||
"""Serialize to JSON for Redis."""
|
||||
return json.dumps({
|
||||
"type": self.type.value,
|
||||
"room_code": self.room_code,
|
||||
"data": self.data,
|
||||
"sender_id": self.sender_id,
|
||||
})
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, raw: str) -> "PubSubMessage":
|
||||
"""Deserialize from JSON."""
|
||||
d = json.loads(raw)
|
||||
return cls(
|
||||
type=MessageType(d["type"]),
|
||||
room_code=d["room_code"],
|
||||
data=d.get("data", {}),
|
||||
sender_id=d.get("sender_id"),
|
||||
)
|
||||
|
||||
|
||||
# Type alias for message handlers
|
||||
MessageHandler = Callable[[PubSubMessage], Awaitable[None]]
|
||||
|
||||
|
||||
class GamePubSub:
|
||||
"""
|
||||
Redis pub/sub for cross-server game events.
|
||||
|
||||
Manages subscriptions to room channels and dispatches incoming
|
||||
messages to registered handlers.
|
||||
"""
|
||||
|
||||
CHANNEL_PREFIX = "golf:room:"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
redis_client: redis.Redis,
|
||||
server_id: str = "default",
|
||||
):
|
||||
"""
|
||||
Initialize pub/sub with Redis client.
|
||||
|
||||
Args:
|
||||
redis_client: Async Redis client.
|
||||
server_id: Unique ID for this server instance.
|
||||
"""
|
||||
self.redis = redis_client
|
||||
self.server_id = server_id
|
||||
self.pubsub = redis_client.pubsub()
|
||||
self._handlers: dict[str, list[MessageHandler]] = {}
|
||||
self._running = False
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
|
||||
def _channel(self, room_code: str) -> str:
|
||||
"""Get Redis channel name for a room."""
|
||||
return f"{self.CHANNEL_PREFIX}{room_code}"
|
||||
|
||||
async def subscribe(
|
||||
self,
|
||||
room_code: str,
|
||||
handler: MessageHandler,
|
||||
) -> None:
|
||||
"""
|
||||
Subscribe to room events.
|
||||
|
||||
Args:
|
||||
room_code: Room to subscribe to.
|
||||
handler: Async function to call on each message.
|
||||
"""
|
||||
channel = self._channel(room_code)
|
||||
if channel not in self._handlers:
|
||||
self._handlers[channel] = []
|
||||
await self.pubsub.subscribe(channel)
|
||||
logger.debug(f"Subscribed to channel {channel}")
|
||||
self._handlers[channel].append(handler)
|
||||
|
||||
async def unsubscribe(self, room_code: str) -> None:
|
||||
"""
|
||||
Unsubscribe from room events.
|
||||
|
||||
Args:
|
||||
room_code: Room to unsubscribe from.
|
||||
"""
|
||||
channel = self._channel(room_code)
|
||||
if channel in self._handlers:
|
||||
del self._handlers[channel]
|
||||
await self.pubsub.unsubscribe(channel)
|
||||
logger.debug(f"Unsubscribed from channel {channel}")
|
||||
|
||||
async def remove_handler(self, room_code: str, handler: MessageHandler) -> None:
|
||||
"""
|
||||
Remove a specific handler from a room subscription.
|
||||
|
||||
Args:
|
||||
room_code: Room the handler was registered for.
|
||||
handler: Handler to remove.
|
||||
"""
|
||||
channel = self._channel(room_code)
|
||||
if channel in self._handlers:
|
||||
handlers = self._handlers[channel]
|
||||
if handler in handlers:
|
||||
handlers.remove(handler)
|
||||
# If no handlers left, unsubscribe
|
||||
if not handlers:
|
||||
await self.unsubscribe(room_code)
|
||||
|
||||
async def publish(self, message: PubSubMessage) -> int:
|
||||
"""
|
||||
Publish a message to a room's channel.
|
||||
|
||||
Args:
|
||||
message: Message to publish.
|
||||
|
||||
Returns:
|
||||
Number of subscribers that received the message.
|
||||
"""
|
||||
# Add sender ID so we can filter out our own messages
|
||||
message.sender_id = self.server_id
|
||||
channel = self._channel(message.room_code)
|
||||
count = await self.redis.publish(channel, message.to_json())
|
||||
logger.debug(f"Published {message.type.value} to {channel} ({count} receivers)")
|
||||
return count
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start listening for messages."""
|
||||
if self._running:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
self._task = asyncio.create_task(self._listen())
|
||||
logger.info("GamePubSub listener started")
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Stop listening and clean up."""
|
||||
self._running = False
|
||||
if self._task:
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._task = None
|
||||
|
||||
await self.pubsub.close()
|
||||
self._handlers.clear()
|
||||
logger.info("GamePubSub listener stopped")
|
||||
|
||||
async def _listen(self) -> None:
|
||||
"""Main listener loop."""
|
||||
while self._running:
|
||||
try:
|
||||
message = await self.pubsub.get_message(
|
||||
ignore_subscribe_messages=True,
|
||||
timeout=1.0,
|
||||
)
|
||||
if message and message["type"] == "message":
|
||||
await self._handle_message(message)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except redis.ConnectionError as e:
|
||||
logger.error(f"PubSub connection error: {e}")
|
||||
await asyncio.sleep(1)
|
||||
except Exception as e:
|
||||
logger.error(f"PubSub listener error: {e}", exc_info=True)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def _handle_message(self, raw_message: dict) -> None:
|
||||
"""Handle an incoming Redis message."""
|
||||
try:
|
||||
channel = raw_message["channel"]
|
||||
if isinstance(channel, bytes):
|
||||
channel = channel.decode()
|
||||
|
||||
data = raw_message["data"]
|
||||
if isinstance(data, bytes):
|
||||
data = data.decode()
|
||||
|
||||
msg = PubSubMessage.from_json(data)
|
||||
|
||||
# Skip messages from ourselves
|
||||
if msg.sender_id == self.server_id:
|
||||
return
|
||||
|
||||
handlers = self._handlers.get(channel, [])
|
||||
for handler in handlers:
|
||||
try:
|
||||
await handler(msg)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in pubsub handler: {e}", exc_info=True)
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.warning(f"Invalid JSON in pubsub message: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing pubsub message: {e}", exc_info=True)
|
||||
|
||||
|
||||
# Global pub/sub instance
|
||||
_pubsub: Optional[GamePubSub] = None
|
||||
|
||||
|
||||
async def get_pubsub(redis_client: redis.Redis, server_id: str = "default") -> GamePubSub:
|
||||
"""
|
||||
Get or create the global pub/sub instance.
|
||||
|
||||
Args:
|
||||
redis_client: Redis client to use.
|
||||
server_id: Unique ID for this server.
|
||||
|
||||
Returns:
|
||||
GamePubSub instance.
|
||||
"""
|
||||
global _pubsub
|
||||
if _pubsub is None:
|
||||
_pubsub = GamePubSub(redis_client, server_id)
|
||||
return _pubsub
|
||||
|
||||
|
||||
async def close_pubsub() -> None:
|
||||
"""Stop and close the global pub/sub instance."""
|
||||
global _pubsub
|
||||
if _pubsub is not None:
|
||||
await _pubsub.stop()
|
||||
_pubsub = None
|
||||
Reference in New Issue
Block a user