golfgame/server/stores/pubsub.py
Aaron D. Lee bea85e6b28 Huge v2 uplift, now deployable with real user management and tooling!
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-27 11:32:15 -05:00

307 lines
8.9 KiB
Python

"""
Redis pub/sub for cross-server game events.
In a multi-server deployment, each server has its own WebSocket connections.
When a game action occurs, the server handling that action needs to notify
all other servers so they can update their connected clients.
This module provides:
- Pub/sub channels per room for targeted broadcasting
- Message types for state updates, player events, and broadcasts
- Async listener loop for handling incoming messages
- Clean subscription management
Usage:
    pubsub = GamePubSub(redis_client)
    await pubsub.start()

    # Subscribe to room events
    async def handle_message(msg: PubSubMessage):
        print(f"Received: {msg.type} for room {msg.room_code}")

    await pubsub.subscribe("ABCD", handle_message)

    # Publish to room
    await pubsub.publish(PubSubMessage(
        type=MessageType.GAME_STATE_UPDATE,
        room_code="ABCD",
        data={"game_state": {...}},
    ))

    await pubsub.stop()
"""
import asyncio
import json
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Awaitable, Optional
import redis.asyncio as redis
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class MessageType(str, Enum):
    """
    Kinds of events carried over the room pub/sub channels.

    Each member is also a ``str`` whose value is the name used on the wire.

    Members:
        GAME_STATE_UPDATE: Game state changed; other servers should update
            their cache.
        PLAYER_JOINED: A player connected to the room (for presence
            tracking).
        PLAYER_LEFT: A player disconnected from the room.
        ROOM_CLOSED: The room is being closed (game ended or abandoned).
        BROADCAST: Generic broadcast to all clients in the room.
    """

    GAME_STATE_UPDATE = "game_state_update"
    PLAYER_JOINED = "player_joined"
    PLAYER_LEFT = "player_left"
    ROOM_CLOSED = "room_closed"
    BROADCAST = "broadcast"
@dataclass
class PubSubMessage:
    """
    Message sent via Redis pub/sub.

    Attributes:
        type: Message type (determines how handlers process it).
        room_code: Room this message is for.
        data: Message payload (type-specific).
        sender_id: Optional server ID of sender (to avoid echo).
    """

    type: MessageType
    room_code: str
    data: dict
    sender_id: Optional[str] = None

    def to_json(self) -> str:
        """
        Serialize to JSON for Redis.

        Returns:
            A JSON object string with the keys ``type`` (the enum's string
            value), ``room_code``, ``data`` and ``sender_id``.
        """
        return json.dumps({
            "type": self.type.value,
            "room_code": self.room_code,
            "data": self.data,
            "sender_id": self.sender_id,
        })

    @classmethod
    def from_json(cls, raw: str) -> "PubSubMessage":
        """
        Deserialize from JSON.

        Args:
            raw: JSON object text as produced by ``to_json``.

        Returns:
            The decoded message. A missing or ``null`` ``data`` field becomes
            an empty dict; a missing ``sender_id`` becomes ``None``.

        Raises:
            json.JSONDecodeError: If ``raw`` is not valid JSON.
            KeyError: If ``type`` or ``room_code`` is absent.
            ValueError: If ``type`` is not a known ``MessageType`` value.
        """
        d = json.loads(raw)
        # dict.get's default only covers a missing key; an explicit JSON null
        # would otherwise put None into a field that is declared as a dict.
        payload = d.get("data")
        return cls(
            type=MessageType(d["type"]),
            room_code=d["room_code"],
            data={} if payload is None else payload,
            sender_id=d.get("sender_id"),
        )
# Type alias for message handlers: an async callable that receives one
# PubSubMessage and returns nothing.
MessageHandler = Callable[[PubSubMessage], Awaitable[None]]
class GamePubSub:
    """
    Redis pub/sub for cross-server game events.

    Manages subscriptions to room channels and dispatches incoming
    messages to registered handlers.

    Messages whose ``sender_id`` equals this instance's ``server_id`` are
    dropped on receipt, so instances that share a ``server_id`` will not
    deliver each other's messages.
    """

    CHANNEL_PREFIX = "golf:room:"

    # Seconds the listener waits before re-checking for subscriptions while
    # none are active.
    _IDLE_POLL_SECONDS = 0.1

    def __init__(
        self,
        redis_client: redis.Redis,
        server_id: str = "default",
    ):
        """
        Initialize pub/sub with Redis client.

        Args:
            redis_client: Async Redis client.
            server_id: Unique ID for this server instance.
        """
        self.redis = redis_client
        self.server_id = server_id
        self.pubsub = redis_client.pubsub()
        # Channel name -> handlers registered for that channel.
        self._handlers: dict[str, list[MessageHandler]] = {}
        self._running = False
        self._task: Optional[asyncio.Task] = None

    def _channel(self, room_code: str) -> str:
        """Get Redis channel name for a room."""
        return f"{self.CHANNEL_PREFIX}{room_code}"

    async def subscribe(
        self,
        room_code: str,
        handler: MessageHandler,
    ) -> None:
        """
        Subscribe to room events.

        The Redis subscription is issued only for the first handler of a
        room; further handlers are added to the existing subscription.

        Args:
            room_code: Room to subscribe to.
            handler: Async function to call on each message.
        """
        channel = self._channel(room_code)
        if channel not in self._handlers:
            # Subscribe before recording the channel: if Redis raises here,
            # no empty handler list is left behind that would make later
            # calls skip the real subscription.
            await self.pubsub.subscribe(channel)
            logger.debug(f"Subscribed to channel {channel}")
        self._handlers.setdefault(channel, []).append(handler)

    async def unsubscribe(self, room_code: str) -> None:
        """
        Unsubscribe from room events.

        Drops every handler registered for the room. Does nothing if the
        room has no registered handlers.

        Args:
            room_code: Room to unsubscribe from.
        """
        channel = self._channel(room_code)
        if channel in self._handlers:
            del self._handlers[channel]
            await self.pubsub.unsubscribe(channel)
            logger.debug(f"Unsubscribed from channel {channel}")

    async def remove_handler(self, room_code: str, handler: MessageHandler) -> None:
        """
        Remove a specific handler from a room subscription.

        Args:
            room_code: Room the handler was registered for.
            handler: Handler to remove.
        """
        channel = self._channel(room_code)
        if channel in self._handlers:
            handlers = self._handlers[channel]
            if handler in handlers:
                handlers.remove(handler)
            # If no handlers left, unsubscribe
            if not handlers:
                await self.unsubscribe(room_code)

    async def publish(self, message: PubSubMessage) -> int:
        """
        Publish a message to a room's channel.

        Note that ``message.sender_id`` is overwritten in place with this
        instance's ``server_id``.

        Args:
            message: Message to publish.

        Returns:
            Number of subscribers that received the message.
        """
        # Add sender ID so we can filter out our own messages
        message.sender_id = self.server_id
        channel = self._channel(message.room_code)
        count = await self.redis.publish(channel, message.to_json())
        logger.debug(f"Published {message.type.value} to {channel} ({count} receivers)")
        return count

    async def start(self) -> None:
        """Start listening for messages. Does nothing if already running."""
        if self._running:
            return
        self._running = True
        self._task = asyncio.create_task(self._listen())
        logger.info("GamePubSub listener started")

    async def stop(self) -> None:
        """Stop listening and clean up."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        # Newer redis-py releases deprecate the asyncio PubSub.close() in
        # favour of aclose(); use it when present, else fall back.
        close = getattr(self.pubsub, "aclose", None) or self.pubsub.close
        await close()
        self._handlers.clear()
        logger.info("GamePubSub listener stopped")

    async def _listen(self) -> None:
        """
        Main listener loop.

        Polls Redis for messages until stopped or cancelled. Errors are
        logged and followed by a one-second pause so that a failure does
        not end the loop.
        """
        while self._running:
            try:
                if not self.pubsub.subscribed:
                    # redis-py's asyncio PubSub has no connection until the
                    # first subscribe(), and get_message() raises
                    # RuntimeError in that state. Idle quietly instead of
                    # logging an error every second when start() is called
                    # before any subscribe().
                    await asyncio.sleep(self._IDLE_POLL_SECONDS)
                    continue
                message = await self.pubsub.get_message(
                    ignore_subscribe_messages=True,
                    timeout=1.0,
                )
                if message and message["type"] == "message":
                    await self._handle_message(message)
            except asyncio.CancelledError:
                break
            except redis.ConnectionError as e:
                logger.error(f"PubSub connection error: {e}")
                await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"PubSub listener error: {e}", exc_info=True)
                await asyncio.sleep(1)

    async def _handle_message(self, raw_message: dict) -> None:
        """
        Handle an incoming Redis message.

        Decodes the channel and payload, skips messages this server
        published itself, and awaits each handler registered for the
        channel in turn. A failing handler is logged and does not stop
        the remaining handlers.

        Args:
            raw_message: Message dict returned by ``get_message``; its
                ``channel`` and ``data`` values may be ``bytes`` or ``str``.
        """
        try:
            channel = raw_message["channel"]
            if isinstance(channel, bytes):
                channel = channel.decode()
            data = raw_message["data"]
            if isinstance(data, bytes):
                data = data.decode()
            msg = PubSubMessage.from_json(data)
            # Skip messages from ourselves
            if msg.sender_id == self.server_id:
                return
            # Iterate a snapshot: a handler may call subscribe() or
            # remove_handler() for this channel, which mutates the live
            # list and would otherwise cause handlers to be skipped.
            handlers = list(self._handlers.get(channel, ()))
            for handler in handlers:
                try:
                    await handler(msg)
                except Exception as e:
                    logger.error(f"Error in pubsub handler: {e}", exc_info=True)
        except json.JSONDecodeError as e:
            logger.warning(f"Invalid JSON in pubsub message: {e}")
        except Exception as e:
            logger.error(f"Error processing pubsub message: {e}", exc_info=True)
# Global pub/sub instance: None until get_pubsub() creates it, and reset to
# None by close_pubsub().
_pubsub: Optional[GamePubSub] = None
async def get_pubsub(redis_client: redis.Redis, server_id: str = "default") -> GamePubSub:
    """
    Return the shared pub/sub instance, building it on first use.

    The arguments are only used when no instance exists yet; once one has
    been created it is returned as-is and the arguments are ignored.

    Args:
        redis_client: Redis client to use.
        server_id: Unique ID for this server.

    Returns:
        GamePubSub instance.
    """
    global _pubsub
    if _pubsub is not None:
        return _pubsub
    _pubsub = GamePubSub(redis_client, server_id)
    return _pubsub
async def close_pubsub() -> None:
    """
    Shut down the shared pub/sub instance, if there is one.

    Awaits the instance's ``stop()`` and then clears the module-level
    reference. Does nothing when no instance exists.
    """
    global _pubsub
    if _pubsub is None:
        return
    await _pubsub.stop()
    _pubsub = None