-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebsocket_client.py
68 lines (56 loc) · 1.88 KB
/
websocket_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import json
from typing import List, Optional

import websockets
from loguru import logger

from model import InputModel, ResponseModel
class WebSocketClient:
    """Asynchronous WebSocket client used to exchange messages with the core.

    Outgoing messages whose delivery fails because the connection was closed
    abnormally are parked in ``temp_mq`` and can be retried one at a time with
    :meth:`send_tmq`.
    """

    def __init__(self):
        """Create a client that is not connected yet."""
        # URI passed to the last ``connect`` call; reused when reconnecting.
        self.uri: Optional[str] = None
        # Connection object, or None until ``connect`` has been awaited.
        self.websocket = None
        # Messages that could not be delivered, oldest first.
        self.temp_mq: List[InputModel] = []

    async def connect(self, uri: str):
        """Open a WebSocket connection to *uri* and remember the URI.

        :param uri: WebSocket endpoint to connect to.
        :return: None
        """
        self.uri = uri
        self.websocket = await websockets.connect(self.uri)
        logger.success("Connected!")

    async def _try_send(self, message: InputModel) -> bool:
        """Send *message* as JSON; return True only if it was actually sent.

        Returns False when there is no connection or when the send fails with
        ``ConnectionClosedError``. Any other exception propagates unchanged.
        """
        if not self.websocket:
            return False
        payload = json.dumps(message.model_dump(), ensure_ascii=False)
        try:
            await self.websocket.send(payload)
        except websockets.exceptions.ConnectionClosedError:
            return False
        logger.success("Process -> engine")
        return True

    async def send_message(self, message: InputModel):
        """Send a message to the core.

        If the connection was closed abnormally, the message is appended to
        ``temp_mq`` for a later retry. If no connection has been opened at
        all, the message is dropped and a warning is logged.

        :param message: message to serialize and send.
        :return: None
        """
        if not self.websocket:
            # Previously this path dropped the message without any trace.
            logger.warning("Not connected, message dropped!")
            return
        if not await self._try_send(message):
            self.temp_mq.append(message)

    async def send_tmq(self):
        """Retry the oldest message of the temporary message queue.

        At most one message is sent per call. The message leaves the queue
        only after a successful send; on failure it stays at the head so the
        original ordering of pending messages is preserved.

        :return: None
        """
        if not self.temp_mq:
            return
        if await self._try_send(self.temp_mq[0]):
            self.temp_mq.pop(0)

    async def receive_message(self) -> Optional[ResponseModel]:
        """Receive one message from the core.

        :return: the parsed ``ResponseModel``, or None when the client is not
            connected, when the payload is not valid JSON (an error is
            logged), or when the connection was lost (a reconnect to the
            stored URI is attempted before returning).
        """
        if not self.websocket:
            return None
        try:
            raw = await self.websocket.recv()
        except websockets.exceptions.ConnectionClosedError:
            logger.critical("Cyber Engine Connection Lost!")
            # Reconnect so that the next call can receive again; errors raised
            # by the reconnect attempt itself propagate to the caller.
            await self.connect(self.uri)
            return None
        try:
            return ResponseModel(**json.loads(raw))
        except json.JSONDecodeError:
            logger.error(f"JSON Decode Error!\n{raw}")
            return None

    async def close(self):
        """Close the connection if one has been opened.

        :return: None
        """
        if self.websocket:
            await self.websocket.close()
            logger.success("Connection closed!")
# Module-level client instance, created unconnected when this module is imported.
client = WebSocketClient()