307 lines
8.9 KiB
Python
307 lines
8.9 KiB
Python
"""
|
|
Redis pub/sub for cross-server game events.

In a multi-server deployment, each server has its own WebSocket connections.
When a game action occurs, the server handling that action needs to notify
all other servers so they can update their connected clients.

This module provides:
- Pub/sub channels per room for targeted broadcasting
- Message types for state updates, player events, and broadcasts
- Async listener loop for handling incoming messages
- Clean subscription management

Usage:
    pubsub = GamePubSub(redis_client)
    await pubsub.start()

    # Subscribe to room events
    async def handle_message(msg: PubSubMessage):
        print(f"Received: {msg.type} for room {msg.room_code}")

    await pubsub.subscribe("ABCD", handle_message)

    # Publish to room
    await pubsub.publish(PubSubMessage(
        type=MessageType.GAME_STATE_UPDATE,
        room_code="ABCD",
        data={"game_state": {...}},
    ))

    await pubsub.stop()
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from typing import Callable, Awaitable, Optional
|
|
|
|
import redis.asyncio as redis
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MessageType(str, Enum):
    """
    Kinds of events carried over the room pub/sub channels.

    Members are also ``str`` instances, so each one compares equal to its
    wire value (the string placed in the serialized message).
    """

    # The game state changed; other servers should refresh their cache.
    GAME_STATE_UPDATE = "game_state_update"

    # A player connected to the room (presence tracking).
    PLAYER_JOINED = "player_joined"

    # A player disconnected from the room.
    PLAYER_LEFT = "player_left"

    # The room is closing (game ended or abandoned).
    ROOM_CLOSED = "room_closed"

    # Generic broadcast to every client in the room.
    BROADCAST = "broadcast"
|
|
|
|
|
|
@dataclass
class PubSubMessage:
    """
    Envelope for one event sent through Redis pub/sub.

    Attributes:
        type: Message type (determines how handlers process it).
        room_code: Room this message is for.
        data: Message payload (type-specific).
        sender_id: Optional server ID of the sender (to avoid echo).
    """

    type: MessageType
    room_code: str
    data: dict
    sender_id: Optional[str] = None

    def to_json(self) -> str:
        """Return the JSON text that is published to Redis."""
        payload = {
            "type": self.type.value,
            "room_code": self.room_code,
            "data": self.data,
            "sender_id": self.sender_id,
        }
        return json.dumps(payload)

    @classmethod
    def from_json(cls, raw: str) -> "PubSubMessage":
        """
        Rebuild a message from its JSON text.

        ``type`` and ``room_code`` are required keys; ``data`` falls back to
        an empty dict and ``sender_id`` to ``None`` when absent.
        """
        fields = json.loads(raw)
        # Evaluated in the same order as the constructor arguments so the
        # first failing field determines which exception is raised.
        message_type = MessageType(fields["type"])
        room_code = fields["room_code"]
        payload = fields.get("data", {})
        sender_id = fields.get("sender_id")
        return cls(
            type=message_type,
            room_code=room_code,
            data=payload,
            sender_id=sender_id,
        )
|
|
|
|
|
|
# Type alias for message handlers: an async callable that receives the decoded
# PubSubMessage and returns nothing.
MessageHandler = Callable[[PubSubMessage], Awaitable[None]]
|
|
|
|
|
|
class GamePubSub:
    """
    Redis pub/sub for cross-server game events.

    Manages subscriptions to room channels and dispatches incoming
    messages to registered handlers.

    Handlers are stored per channel in ``_handlers``. A background task
    created by ``start()`` polls the Redis pub/sub object and passes every
    message that did not originate from this server to the handlers
    registered for its channel.
    """

    CHANNEL_PREFIX = "golf:room:"

    # Seconds the listener sleeps between checks while no room is subscribed.
    IDLE_POLL_INTERVAL = 0.1

    def __init__(
        self,
        redis_client: redis.Redis,
        server_id: str = "default",
    ):
        """
        Initialize pub/sub with Redis client.

        Args:
            redis_client: Async Redis client.
            server_id: Unique ID for this server instance.
        """
        self.redis = redis_client
        self.server_id = server_id
        self.pubsub = redis_client.pubsub()
        # Channel name -> handlers to call for messages on that channel.
        self._handlers: dict[str, list[MessageHandler]] = {}
        self._running = False
        self._task: Optional[asyncio.Task] = None

    def _channel(self, room_code: str) -> str:
        """Get Redis channel name for a room."""
        return f"{self.CHANNEL_PREFIX}{room_code}"

    async def subscribe(
        self,
        room_code: str,
        handler: MessageHandler,
    ) -> None:
        """
        Subscribe to room events.

        The Redis subscription is issued only when the room has no handlers
        yet; additional handlers for the same room share it.

        Args:
            room_code: Room to subscribe to.
            handler: Async function to call on each message.

        Raises:
            Whatever the Redis client raises if the subscription fails. In
            that case no handler is registered, so a later call retries the
            subscription instead of assuming it already exists.
        """
        channel = self._channel(room_code)
        if channel not in self._handlers:
            # Subscribe before recording anything: a failed SUBSCRIBE must not
            # leave an entry behind that makes later calls skip it.
            await self.pubsub.subscribe(channel)
            logger.debug(f"Subscribed to channel {channel}")
        # setdefault keeps handlers registered by a concurrent subscribe()
        # call for the same room while this one was awaiting Redis.
        self._handlers.setdefault(channel, []).append(handler)

    async def unsubscribe(self, room_code: str) -> None:
        """
        Unsubscribe from room events.

        Drops every handler for the room and unsubscribes from its channel.
        Does nothing if the room has no registered handlers.

        Args:
            room_code: Room to unsubscribe from.
        """
        channel = self._channel(room_code)
        if channel in self._handlers:
            del self._handlers[channel]
            await self.pubsub.unsubscribe(channel)
            logger.debug(f"Unsubscribed from channel {channel}")

    async def remove_handler(self, room_code: str, handler: MessageHandler) -> None:
        """
        Remove a specific handler from a room subscription.

        When the last handler of a room is removed, the room's channel is
        unsubscribed as well.

        Args:
            room_code: Room the handler was registered for.
            handler: Handler to remove.
        """
        channel = self._channel(room_code)
        handlers = self._handlers.get(channel)
        if handlers is None:
            return
        if handler in handlers:
            handlers.remove(handler)
        # If no handlers left, unsubscribe
        if not handlers:
            await self.unsubscribe(room_code)

    async def publish(self, message: PubSubMessage) -> int:
        """
        Publish a message to a room's channel.

        Note that ``message.sender_id`` is overwritten with this server's ID.

        Args:
            message: Message to publish.

        Returns:
            Number of subscribers that received the message.
        """
        # Add sender ID so we can filter out our own messages
        message.sender_id = self.server_id
        channel = self._channel(message.room_code)
        count = await self.redis.publish(channel, message.to_json())
        logger.debug(f"Published {message.type.value} to {channel} ({count} receivers)")
        return count

    async def start(self) -> None:
        """Start listening for messages (no-op if already running)."""
        if self._running:
            return

        self._running = True
        self._task = asyncio.create_task(self._listen())
        logger.info("GamePubSub listener started")

    async def stop(self) -> None:
        """Stop listening and clean up."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

        await self.pubsub.close()
        self._handlers.clear()
        logger.info("GamePubSub listener stopped")

    async def _listen(self) -> None:
        """
        Main listener loop.

        Runs until ``_running`` is cleared or the task is cancelled. Errors
        are logged and followed by a one-second pause so a broken connection
        does not turn into a busy loop.
        """
        while self._running:
            try:
                if not self._handlers:
                    # Nothing is subscribed yet. redis-py's asyncio PubSub has
                    # no connection until the first subscribe(), and polling
                    # it in that state raises RuntimeError - which would be
                    # logged as an error every second. Idle instead.
                    await asyncio.sleep(self.IDLE_POLL_INTERVAL)
                    continue

                message = await self.pubsub.get_message(
                    ignore_subscribe_messages=True,
                    timeout=1.0,
                )
                if message and message["type"] == "message":
                    await self._handle_message(message)

            except asyncio.CancelledError:
                break
            except redis.ConnectionError as e:
                logger.error(f"PubSub connection error: {e}")
                await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"PubSub listener error: {e}", exc_info=True)
                await asyncio.sleep(1)

    async def _handle_message(self, raw_message: dict) -> None:
        """
        Handle an incoming Redis message.

        Decodes the channel and payload, skips messages this server published
        itself, and awaits each handler registered for the channel in turn.
        A failing handler is logged and does not stop the remaining ones.

        Args:
            raw_message: Message dict as returned by the Redis pub/sub client;
                its ``channel`` and ``data`` entries may be ``bytes`` or ``str``.
        """
        try:
            channel = raw_message["channel"]
            if isinstance(channel, bytes):
                channel = channel.decode()

            data = raw_message["data"]
            if isinstance(data, bytes):
                data = data.decode()

            msg = PubSubMessage.from_json(data)

            # Skip messages from ourselves
            if msg.sender_id == self.server_id:
                return

            # Iterate over a snapshot: a handler (or another task running
            # while we await it) may add or remove handlers for this channel,
            # and mutating the live list mid-iteration would skip entries.
            handlers = list(self._handlers.get(channel, ()))
            for handler in handlers:
                try:
                    await handler(msg)
                except Exception as e:
                    logger.error(f"Error in pubsub handler: {e}", exc_info=True)

        except json.JSONDecodeError as e:
            logger.warning(f"Invalid JSON in pubsub message: {e}")
        except Exception as e:
            logger.error(f"Error processing pubsub message: {e}", exc_info=True)
|
|
|
|
|
|
# Global pub/sub instance: created on first use by get_pubsub() and reset to
# None by close_pubsub().
_pubsub: Optional[GamePubSub] = None
|
|
|
|
|
|
async def get_pubsub(redis_client: redis.Redis, server_id: str = "default") -> GamePubSub:
    """
    Return the shared pub/sub instance, building it on first use.

    The arguments are only consulted when no instance exists yet; once one
    has been created, later calls return it unchanged.

    Args:
        redis_client: Redis client to use.
        server_id: Unique ID for this server.

    Returns:
        GamePubSub instance.
    """
    global _pubsub
    if _pubsub is not None:
        return _pubsub
    _pubsub = GamePubSub(redis_client, server_id)
    return _pubsub
|
|
|
|
|
|
async def close_pubsub() -> None:
    """
    Stop and close the global pub/sub instance.

    Does nothing when no instance exists. The global reference is cleared
    before the instance is stopped, so it is reset even if ``stop()`` raises
    and no caller can obtain the instance while it is shutting down.

    Raises:
        Whatever the instance's ``stop()`` raises; the global is already
        cleared at that point.
    """
    global _pubsub
    instance = _pubsub
    if instance is None:
        return
    # Detach first: stop() awaits, and a failure there must not leave a
    # half-closed instance registered as the shared one.
    _pubsub = None
    await instance.stop()
|