-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsocketmanager.py
70 lines (33 loc) · 1.44 KB
/
socketmanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import json
import socket
from utility.configuration.configuration import ConfigMetaclass, ConfigAttribute
class SocketConfig(metaclass=ConfigMetaclass):
buffer_size = ConfigAttribute('buffer_size', int, 1024)
encoding = ConfigAttribute('encoding', str, 'utf-8')
host = ConfigAttribute('host', str, 'localhost')
port = ConfigAttribute('port', int, 8000)
def __init__(self, path=None):
if path:
with open(path, 'r') as file:
config_json = json.load(file)
for name, value in config_json.items():
setattr(self, name, value)
class SocketManager(metaclass=ConfigMetaclass):
def __init__(self):
self._config = SocketConfig()
@property
def address(self):
return (self._config.host, self._config.port)
def send_request_message(self, **kwargs):
with socket.socket() as sock:
sock.connect(self.address)
context = {'action':'sendall', 'message':kwargs}
response_str = json.dumps(context)
response_bytes = response_str.encode(self._config.encoding)
sock.send(response_bytes)
def receive_response_message(self):
with socket.socket() as sock:
sock.connect(self.address)
while True:
response_bytes = sock.recvmsg(self._config.buffer_size)[0]
return response_bytes.decode(self._config.encoding)