import random import asyncio import base64 import json from typing import Awaitable, Callable import requests import websockets HOST = "http://127.0.0.1:7350" WS = "ws://127.0.0.1:7350" SERVER_KEY = "defaultkey" def print_board(board): # board = [["X","",""], ["","O",""], ["","",""]] symbols = lambda c: c if c else " " rows = [" | ".join(symbols(c) for c in row) for row in board] separator = "\n---------\n" print("\n" + separator.join(rows) + "\n") class WebSocketHandler(object): def __init__( self, custom_id: str, label: str, ): self.custom_id = custom_id self.label = label self.token = None self.ws = None self.listener_task = None async def on_message(self, msg: dict): raise NotImplementedError("Override me!") # ---------- Auth & Connect ---------- def login(self): """Authenticate via custom ID and store token.""" basic = base64.b64encode(f"{SERVER_KEY}:".encode()).decode() r = requests.post( f"{HOST}/v2/account/authenticate/custom?create=true", headers={"Authorization": f"Basic {basic}"}, json={"id": self.custom_id}, ) r.raise_for_status() self.token = r.json()["token"] return self.token async def connect(self): """Open Nakama WebSocket using stored auth token.""" if not self.token: self.login() url = f"{WS}/ws?token={self.token}" self.ws = await websockets.connect(url) # ---------- Listener ---------- async def listener(self): """Continuously receive events + decode match data.""" try: while True: raw = await self.ws.recv() msg = json.loads(raw) await self.on_message(msg) # ✅ handoff except websockets.exceptions.ConnectionClosedOK: print(f"[{self.label}] WebSocket closed gracefully") except websockets.exceptions.ConnectionClosedError as e: print(f"[{self.label}] WebSocket closed with error: {e}") def start_listener(self): """Spawn background task for async message stream.""" if self.ws: self.listener_task = asyncio.create_task(self.listener()) # ---------- Cleanup ---------- async def close(self): if self.listener_task: self.listener_task.cancel() if self.ws: await self.ws.close() print(f"[{self.label}] Connection closed") class PlayerWebSocketHandler(WebSocketHandler): @classmethod async def setup_player( cls, name: str ) -> 'PlayerWebSocketHandler': """Create WS handler, login, connect, start listener.""" handler = cls(name, name) handler.login() await handler.connect() return handler def __init__(self, custom_id: str, label: str): super().__init__(custom_id, label) self.ticket = None self.match_id = None async def on_message(self, msg): if "matchmaker_matched" in msg: match_id = msg["matchmaker_matched"]["match_id"] print(f"[{self.label}] ✅ Match found: {match_id}") await self.join_match(match_id) return if "matchmaker_ticket" in msg: self.ticket = msg["matchmaker_ticket"]["ticket"] print(f"[{self.label}] ✅ Received ticket: {self.ticket}") return if "match_data" not in msg: print(f"[{self.label}] {msg}") return md = msg["match_data"] op = int(md.get("op_code", 0)) data = md.get("data") if not data: return payload = json.loads(base64.b64decode(data).decode()) if op == 1: print(f"[{self.label}] MOVE -> {payload}") elif op == 2: print(f"\n[{self.label}] GAME STATE") # print_board(payload["board"]) print(f"Turn={payload['turn']} Winner={payload['winner']}\n") else: print(f"[{self.label}] UNKNOWN OPCODE {op}: {payload}") # ---------- Match Helpers ---------- async def join_matchmaking(self, mode: str = "classic"): """Queue into Nakama matchmaker.""" await self.ws.send(json.dumps({ "matchmaker_add": { "min_count": 2, "max_count": 2, "string_properties": {"mode": mode} } })) print(f"[{self.label}] Searching match for mode={mode}...") async def leave_matchmaking(self, ticket: str): """Remove player from Nakama matchmaking queue.""" await self.ws.send(json.dumps({ "matchmaker_remove": { "ticket": ticket } })) print(f"[{self.label}] Left matchmaking queue") async def create_match(self) -> str: await self.ws.send(json.dumps({"match_create": {}})) msg = json.loads(await self.ws.recv()) match_id = msg["match"]["match_id"] print(f"[{self.label}] Created match: {match_id}") return match_id async def join_match(self, match_id: str): await self.ws.send(json.dumps({"match_join": {"match_id": match_id}})) print(f"[{self.label}] Joined match: {match_id}") self.match_id = match_id # ---------- Gameplay ---------- async def send_move(self, match_id: str, row: int, col: int): payload = {"row": row, "col": col} encoded = base64.b64encode(json.dumps(payload).encode()).decode() await self.ws.send(json.dumps({ "match_data_send": { "match_id": match_id, "op_code": 1, "data": encoded, } })) print(f"[{self.label}] Sent move: {payload}") async def happy_path( match_id: str, p1: PlayerWebSocketHandler, p2: PlayerWebSocketHandler ): # Play moves await p1.send_move(match_id, 0, 0) await asyncio.sleep(0.3) await p2.send_move(match_id, 1, 1) await asyncio.sleep(0.3) await p1.send_move(match_id, 0, 1) await asyncio.sleep(0.3) await p2.send_move(match_id, 2, 2) await asyncio.sleep(0.3) await p1.send_move(match_id, 0, 2) async def p2_wins_diagonal( match_id: str, p1: PlayerWebSocketHandler, p2: PlayerWebSocketHandler ): await p1.send_move(match_id, 0, 0) await asyncio.sleep(0.3) await p2.send_move(match_id, 0, 2) await asyncio.sleep(0.3) await p1.send_move(match_id, 1, 0) await asyncio.sleep(0.3) await p2.send_move(match_id, 1, 1) await asyncio.sleep(0.3) await p1.send_move(match_id, 2, 1) await asyncio.sleep(0.3) await p2.send_move(match_id, 2, 0) # P2 wins async def draw_game( match_id: str, p1: PlayerWebSocketHandler, p2: PlayerWebSocketHandler ): moves = [ (p1, 0, 0), (p2, 0, 1), (p1, 0, 2), (p2, 1, 0), (p1, 1, 2), (p2, 1, 1), (p1, 2, 1), (p2, 2, 2), (p1, 2, 0), ] for player, r, c in moves: await player.send_move(match_id, r, c) await asyncio.sleep(0.25) async def illegal_occupied_cell( match_id: str, p1: PlayerWebSocketHandler, p2: PlayerWebSocketHandler ): await p1.send_move(match_id, 0, 0) await asyncio.sleep(0.3) # P2 tries same spot → should trigger server rejection await p2.send_move(match_id, 0, 0) async def illegal_out_of_turn( match_id: str, p1: PlayerWebSocketHandler, p2: PlayerWebSocketHandler ): await p1.send_move(match_id, 2, 2) await asyncio.sleep(0.3) # P1 tries again before P2 await p1.send_move(match_id, 1, 1) async def illegal_out_of_bounds( match_id: str, p1: PlayerWebSocketHandler, p2: PlayerWebSocketHandler ): await p1.send_move(match_id, 3, 3) # Invalid indices async def midgame_disconnect( match_id: str, p1: PlayerWebSocketHandler, p2: PlayerWebSocketHandler ): await p1.send_move(match_id, 0, 0) await asyncio.sleep(0.3) await p2.close() # simulate rage quit async def abandoned_lobby( match_id: str, p1: PlayerWebSocketHandler, p2: PlayerWebSocketHandler ): # No p2.join_match await asyncio.sleep(5) # test timeout, cleanup, match state async def spam_moves( match_id: str, p1: PlayerWebSocketHandler, p2: PlayerWebSocketHandler ): for _ in range(10): await p1.send_move(match_id, 0, 0) await asyncio.sleep(0.05) async def random_game( match_id: str, p1: PlayerWebSocketHandler, p2: PlayerWebSocketHandler ): board = {(r, c) for r in range(3) for c in range(3)} players = [p1, p2] for i in range(9): player = players[i % 2] print(f"[{player.label}] Playing move...") r, c = random.choice(list(board)) board.remove((r, c)) await player.send_move(match_id, r, c) await asyncio.sleep(0.25) TEST_SCENARIOS = [ happy_path, p2_wins_diagonal, draw_game, illegal_occupied_cell, illegal_out_of_turn, illegal_out_of_bounds, midgame_disconnect, abandoned_lobby, spam_moves, random_game, ] async def main(): # Initialize players (login + connect + start listener) p1 = await PlayerWebSocketHandler.setup_player("player_one_123456") p2 = await PlayerWebSocketHandler.setup_player("player_two_123456") # Start listeners p1.start_listener() p2.start_listener() # Match create + join match_id = await p1.create_match() await p2.join_match(match_id) print(f"\n✅ Match ready: {match_id}\n") await asyncio.sleep(1) for test_scenario in TEST_SCENARIOS: print(f"\n🚀 Running '{test_scenario.__name__}'...\n") await test_scenario(match_id, p1, p2) await asyncio.sleep(1.0) print("\n✅ All scenarios executed.\n") await p1.close() await p2.close() if __name__ == "__main__": asyncio.run(main())