# test_websocket.py
"""Manual smoke test: connect to the node WebSocket endpoint and register."""

import asyncio
import json

import websockets

# Endpoint and credentials used by this smoke test.
NODE_WS_URI = "ws://47.101.162.164:9600/ws/node"
AUTH_TOKEN = "family-member-x9"

# Upper bound on how long to wait for the server's reply to the registration
# message, so the script cannot hang forever on a silent server.
RESPONSE_TIMEOUT_SECONDS = 10


async def test_connection():
    """Connect, send a registration message, and print the first reply.

    Opens a WebSocket connection to ``NODE_WS_URI`` with a bearer token in
    the ``Authorization`` header, sends one JSON ``register`` message, and
    waits up to ``RESPONSE_TIMEOUT_SECONDS`` for a single response.

    Progress and failures are reported with ``print``; exceptions are not
    propagated to the caller.
    """
    uri = NODE_WS_URI
    headers = {
        "Authorization": f"Bearer {AUTH_TOKEN}",
    }

    try:
        print(f"Connecting to {uri}...")
        # NOTE(review): newer websockets releases name this keyword
        # `additional_headers` in the default asyncio client - confirm
        # against the installed websockets version.
        async with websockets.connect(uri, extra_headers=headers) as ws:
            print("✅ Connection successful!")
            print("WebSocket connection established.")

            # Send the registration message.
            register_msg = {
                "type": "register",
                "node_id": "test-node",
                "display_name": "Test Node",
                "serves_users": ["test-user"],
                "working_dir": "/tmp",
                "capabilities": ["claude_code", "shell", "file_ops"],
            }
            await ws.send(json.dumps(register_msg))
            print("Sent registration message")

            # Wait for the response, but only for a bounded time: a server
            # that accepts the connection and never answers would otherwise
            # block this script indefinitely.
            try:
                response = await asyncio.wait_for(
                    ws.recv(), timeout=RESPONSE_TIMEOUT_SECONDS
                )
            except asyncio.TimeoutError:
                print(
                    "❌ Timed out after "
                    f"{RESPONSE_TIMEOUT_SECONDS}s waiting for a response"
                )
                return
            print(f"Received response: {response}")
    except Exception as e:
        # Top-level boundary of a diagnostic script: report and exit cleanly.
        print(f"❌ Connection failed: {e}")


if __name__ == "__main__":
    asyncio.run(test_connection())