fix(server): winner_id on completed games + stats idempotency latch

Two issues in the GAME_OVER broadcast path:

1. log_game_end called update_game_completed with winner_id=None default,
   so games_v2.winner_id was NULL on all 17 completed staging rows. The
   denormalized column existed but carried no information. Compute winner
   (lowest total; None on tie) in broadcast_game_state and thread through.

2. _process_stats_safe had no idempotency guard. log_game_end was already
   self-guarding via game_log_id=None after first fire, but nothing
   stopped repeated GAME_OVER broadcasts from re-firing stats and
   double-counting games_played/games_won. Add Room.stats_processed latch;
   reset it in handle_start_game so a re-used room still records.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
adlee-was-taken
2026-04-18 00:47:53 -04:00
parent 8030a3c171
commit c02b0054c2
5 changed files with 117 additions and 28 deletions

View File

@@ -209,6 +209,8 @@ async def handle_start_game(data: dict, ctx: ConnectionContext, *, broadcast_gam
async with ctx.current_room.game_lock:
ctx.current_room.game.start_game(num_decks, num_rounds, options)
# Reset the per-game idempotency latch so this game's stats can fire.
ctx.current_room.stats_processed = False
game_logger = get_logger()
if game_logger:

View File

@@ -754,11 +754,24 @@ async def broadcast_game_state(room: Room):
# Process game completion BEFORE the per-player loop so it runs exactly
# once and isn't gated on any player still being connected.
if room.game.phase == GamePhase.GAME_OVER:
# Determine winner (lowest total; None on tie) so games_v2.winner_id
# is actually populated and stats/rating agree with each other.
winner_id: Optional[str] = None
if room.game.players:
lowest = min(p.total_score for p in room.game.players)
leaders = [p for p in room.game.players if p.total_score == lowest]
if len(leaders) == 1:
winner_id = leaders[0].id
game_logger = get_logger()
if game_logger and room.game_log_id:
game_logger.log_game_end(room.game_log_id)
game_logger.log_game_end(room.game_log_id, winner_id=winner_id)
room.game_log_id = None
if _stats_service and room.game.players:
# Idempotency: latch on room so repeat GAME_OVER broadcasts don't
# double-count. Set before scheduling the task — the task itself is
# fire-and-forget and might outlive this function.
if _stats_service and room.game.players and not room.stats_processed:
room.stats_processed = True
asyncio.create_task(_process_stats_safe(room))
for pid, player in list(room.players.items()):

View File

@@ -73,6 +73,10 @@ class Room:
game_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
cpu_turn_task: Optional[asyncio.Task] = None
last_activity: float = field(default_factory=time.time)
# Latched True after _process_stats_safe fires for this game; prevents
# double-counting if broadcast_game_state is invoked multiple times
# with phase=GAME_OVER (double-click on next-round, reconnect flush).
stats_processed: bool = False
def touch(self) -> None:
"""Update last_activity timestamp to mark room as active."""

View File

@@ -143,20 +143,22 @@ class GameLogger:
)
)
async def log_game_end_async(self, game_id: str) -> None:
async def log_game_end_async(
self,
game_id: str,
winner_id: Optional[str] = None,
) -> None:
"""
Mark game as ended.
Args:
game_id: Game UUID.
Mark game as ended. winner_id is the player who finished with the
lowest total — None when tied or when the caller doesn't have it.
"""
try:
await self.event_store.update_game_completed(game_id)
log.debug(f"Logged game end: {game_id}")
await self.event_store.update_game_completed(game_id, winner_id)
log.debug(f"Logged game end: {game_id} winner={winner_id}")
except Exception as e:
log.error(f"Failed to log game end: {e}")
def log_game_end(self, game_id: str) -> None:
def log_game_end(self, game_id: str, winner_id: Optional[str] = None) -> None:
"""
Sync wrapper for log_game_end_async.
@@ -166,8 +168,8 @@ class GameLogger:
return
try:
loop = asyncio.get_running_loop()
asyncio.create_task(self.log_game_end_async(game_id))
asyncio.get_running_loop()
asyncio.create_task(self.log_game_end_async(game_id, winner_id))
except RuntimeError:
# Not in async context - skip (simulations don't need this)
pass

View File

@@ -233,27 +233,95 @@ class TestLogGameStartPopulatesMetadata:
)
event_store.update_game_started.assert_not_awaited()
"""Only the host ends games — non-host requests are rejected unchanged."""
class TestLogGameEndWinnerId:
"""
update_game_completed accepts winner_id but the existing sync wrapper
called it with the default None → every completed games_v2 row had
winner_id NULL. Thread the winner through so the denormalized column
is actually useful.
"""
@pytest.mark.asyncio
async def test_winner_id_passed_through(self):
event_store = MagicMock()
event_store.update_game_completed = AsyncMock()
logger = GameLogger(event_store)
await logger.log_game_end_async("game-uuid", winner_id="player-7")
event_store.update_game_completed.assert_awaited_once_with("game-uuid", "player-7")
@pytest.mark.asyncio
async def test_winner_id_optional(self):
"""A tie or abandonment-style end without a clear winner still works."""
event_store = MagicMock()
event_store.update_game_completed = AsyncMock()
logger = GameLogger(event_store)
await logger.log_game_end_async("game-uuid")
event_store.update_game_completed.assert_awaited_once_with("game-uuid", None)
@pytest.mark.asyncio
async def test_sync_wrapper_forwards_winner(self):
event_store = MagicMock()
event_store.update_game_completed = AsyncMock()
logger = GameLogger(event_store)
logger.log_game_end("game-uuid", winner_id="player-9")
await asyncio.sleep(0)
await asyncio.sleep(0)
event_store.update_game_completed.assert_awaited_once_with("game-uuid", "player-9")
class TestStatsIdempotency:
"""
broadcast_game_state can fire multiple times with phase=GAME_OVER
(double-click next-round, reconnect flush, etc.). log_game_end is
already idempotent because it nulls game_log_id immediately after.
_process_stats_safe had no such guard → every extra broadcast would
double-count games_played/games_won on the same game.
Solution: Room.stats_processed flag. Set True before firing the task.
"""
def test_room_has_stats_processed_flag_defaulting_false(self):
room = Room(code="TEST")
assert room.stats_processed is False
def test_stats_processed_survives_touch(self):
"""touch() updates last_activity but must not clobber stats_processed."""
room = Room(code="TEST")
room.stats_processed = True
room.touch()
assert room.stats_processed is True
@pytest.mark.asyncio
async def test_start_game_resets_stats_processed(self):
"""When a room is reused for a second game, the latch must reset —
otherwise the new game's stats would be silently dropped."""
from handlers import handle_start_game
rm = RoomManager()
room = rm.create_room()
room.add_player("host", "Host", MockWebSocket())
host_ws = MockWebSocket()
room.add_player("host", "Host", host_ws)
room.get_player("host").is_host = True
joiner_ws = MockWebSocket()
room.add_player("joiner", "Joiner", joiner_ws)
room.game_log_id = "game-uuid-untouchable"
room.add_player("p2", "P2", MockWebSocket())
# Previous game already had stats processed
room.stats_processed = True
mock_logger = MagicMock()
mock_logger.log_game_abandoned = MagicMock()
ctx = make_ctx(websocket=host_ws, player_id="host", room=room)
ctx = make_ctx(websocket=joiner_ws, player_id="joiner", room=room)
with patch("handlers.get_logger", return_value=mock_logger):
await handle_end_game(
{},
with patch("handlers.get_logger", return_value=None):
await handle_start_game(
{"decks": 1, "rounds": 1},
ctx,
room_manager=rm,
cleanup_room_profiles=lambda _code: None,
broadcast_game_state=AsyncMock(),
check_and_run_cpu_turn=lambda _r: None,
)
mock_logger.log_game_abandoned.assert_not_called()
assert room.code in rm.rooms # room still exists
assert room.stats_processed is False