import asyncio import random from game_flow import PlayerWebSocketHandler, TEST_SCENARIOS async def simulate_matchmaking(num_players: int = 6): print(f"\n๐ŸŽฎ Spawning {num_players} players...\n") # 1) Login + connect players = await asyncio.gather(*[ PlayerWebSocketHandler.setup_player(f"player_{i}") for i in range(num_players) ]) print("\nโœ… All players authenticated + connected\n") # 2) Start listeners BEFORE matchmaking for p in players: p.start_listener() print("\n๐Ÿ‘‚ WebSocket listeners active\n") await asyncio.sleep(0.3) # โœ… 3) Split evenly between classic & blitz half = num_players // 2 assignments = ["classic"] * half + ["blitz"] * (num_players - half) random.shuffle(assignments) print("\n๐ŸŽฏ Queuing players:") for p, mode in zip(players, assignments): print(f" - {p.label} -> {mode}") await asyncio.gather(*[ p.join_matchmaking(mode) for p, mode in zip(players, assignments) ]) print("\nโœ… All players queued โ€” waiting for matches...\n") # โœ… 4) Collect matches matches = {} timeout = 15 start = asyncio.get_event_loop().time() matches = dict() while asyncio.get_event_loop().time() - start < timeout: for p in players: # print(f'player = {p.label} for match_id = {p.match_id}') # print(f'players = {len(matches.get(p.match_id, list()))} for match = {p.match_id}') if p.match_id: # matches.setdefault(p.match_id, []).append(p) if p.match_id not in matches: matches[p.match_id] = [p] elif p not in matches[p.match_id]: matches[p.match_id].append(p) # print(f'player = {p.label} for match = {p.match_id}') # print(f'players = {len(matches[p.match_id])} for match = {p.match_id}') # stop early if all assigned if sum(len(v) for v in matches.values()) >= num_players: break await asyncio.sleep(0.25) print(f"\nโœ… Matchmaking complete โ€” {len(matches)} matches formed\n") # โœ… 5) Assign random scenarios per match & run tasks = [] for match_id, grouped_players in matches.items(): if len(grouped_players) != 2: print(f"โš ๏ธ Skipping match {match_id} โ€” not 1v1") continue p1, p2 = grouped_players scenario = random.choice(TEST_SCENARIOS) print( f"๐ŸŽญ Running scenario '{scenario.__name__}' โ€” " f"{p1.label} vs {p2.label} | match={match_id}" ) tasks.append(asyncio.create_task( scenario(match_id, p1, p2) )) # โœ… 6) Wait for all mock games to finish if tasks: await asyncio.gather(*tasks) else: print("โš ๏ธ No playable matches found") # โœ… 7) Cleanup connections print("\n๐Ÿงน Closing player connections...\n") await asyncio.gather(*[p.close() for p in players]) print("\n๐Ÿ Matchmaking test run complete โœ…\n") if __name__ == "__main__": asyncio.run(simulate_matchmaking(6))