-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
executable file
·139 lines (98 loc) · 3.08 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python3
import socket
import threading
import sys
import getopt
import re
from typing import Tuple
from config import SERVER_TIMEOUT
from robot import Robot
def read_message(conn: socket) -> list:
    r"""Receive terminator-delimited messages from ``conn``.

    Data is read in chunks of up to 1000 bytes until everything received
    so far ends with the two-byte terminator ``\a\b``. The accumulated
    bytes are then cut at every terminator and each piece is decoded
    with the default codec.

    Args:
        conn: Connected socket-like object providing ``recv``.

    Returns:
        The decoded messages (terminators removed) in arrival order, or
        ``None`` if the peer closed the connection before the data
        ended with a terminator.

    Raises:
        Whatever ``conn.recv`` raises, and ``UnicodeDecodeError`` when a
        message is not valid text.
    """
    terminator = b'\a\b'
    received = b''

    # Keep reading until the stream ends on a message boundary.
    while not received.endswith(terminator):
        chunk = conn.recv(1000)
        if not chunk:  # an empty read means the socket was closed
            return None
        received += chunk

    # The data ends with a terminator, so the last split piece is always
    # the empty remainder after it; drop that one.
    pieces = received.split(terminator)[:-1]
    return [piece.decode() for piece in pieces]
def handle_client(sock: socket, conn: socket, addr, clientId: int) -> None:
    """Serve one client connection until it ends, then close it.

    Repeatedly reads a batch of messages with ``read_message`` and feeds
    them one by one to a ``Robot`` created for this connection. The loop
    ends when the peer closes the connection, when reading fails, or as
    soon as ``robot.process_message`` returns a falsy value.

    Args:
        sock: Listening socket; accepted for interface compatibility and
            not used here.
        conn: Accepted client connection. It is always closed before
            this function returns or raises.
        addr: Peer address, used only in log output.
        clientId: Identifier passed to the ``Robot`` constructor.
    """
    try:
        robot = Robot(conn, clientId)

        while True:
            try:
                output = read_message(conn)
            except Exception:
                # Thread boundary: any receive/decode failure (timeout,
                # reset, bad encoding) ends this client only. Unlike a
                # bare ``except``, SystemExit/KeyboardInterrupt still
                # propagate.
                print(f"[WARN] Connection {addr} closed with ERROR")
                return

            if output is None:  # peer closed the connection
                break

            is_error = False
            for msg in output:
                print(f" ∟ [{robot.id}] Recv: {msg}")
                if not robot.process_message(msg):
                    is_error = True
                    break
            if is_error:
                break

        # NOTE: this is also logged when the robot stopped processing.
        print(f"[INFO] Connection {addr} closed with success")
    finally:
        # Release the connection on every exit path, including an
        # exception raised by the Robot itself.
        conn.close()
def parse_args(argv) -> Tuple[str, int]:
    """Parse the server's command-line options.

    Recognised options are ``-h``/``--help``, ``-a``/``--address`` and
    ``-p``/``--port``.

    Args:
        argv: Argument list without the program name
            (e.g. ``sys.argv[1:]``).

    Returns:
        ``(host, port)``; ``host`` defaults to ``''`` and ``port`` to
        ``0`` when the corresponding option is not given.

    Raises:
        SystemExit: With status 2 on an unknown option, a missing option
            argument, or a port that is not an integer in 0-65535; with
            the default (success) status after printing help.
    """
    usage_options = "[-a IP_ADDRESS] [--address IP_ADDRESS] [-p PORT] [--port PORT]"
    host = ''
    port = 0

    # Parse command line arguments
    try:
        opts, _ = getopt.getopt(
            argv, "ha:p:", ["help", "address=", "port="])
    except getopt.GetoptError:
        print(f"{__file__} {usage_options}")
        sys.exit(2)

    for opt, arg in opts:
        # "--help" is accepted by getopt above, so it must be handled
        # here as well; otherwise it would be silently ignored.
        if opt in ("-h", "--help"):
            print(f"{sys.argv[0]} {usage_options}")
            sys.exit()
        elif opt in ("-a", "--address"):
            host = arg
        elif opt in ("-p", "--port"):
            try:
                port = int(arg)
            except ValueError:
                port = -1  # funnel into the range check below
            if not 0 <= port <= 65535:
                print(f"[ERROR] Invalid port '{arg}' (expected an integer in 0-65535)")
                print(f"{sys.argv[0]} {usage_options}")
                sys.exit(2)

    return (host, port)
def main():
    """Run the TCP server: bind, listen and serve each client in a thread.

    The bind address and port come from the command line via
    ``parse_args``. An empty host binds to all interfaces, which is also
    what happens when ``-a/--address`` is omitted. Every accepted
    connection gets ``SERVER_TIMEOUT`` as its socket timeout and is
    handed to ``handle_client`` on its own thread. Ctrl-C stops
    accepting new connections.
    """
    host, port = parse_args(sys.argv[1:])

    sock = None
    try:
        # Create IPv4 socket stream
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Honour the requested address instead of always binding to ''.
        sock.bind((host, port))
    except socket.error as e:
        print(f"[ERROR] Can't bind to port {port} ({str(e)})")
        if sock is not None:
            sock.close()  # do not leak the half-initialised socket
        return

    client_count = 0
    try:
        # Start listening on host:port
        sock.listen()
        print(f"[INFO] Listening on {sock.getsockname()}")

        # Wait for connections
        while True:
            # Create connection with client
            conn, addr = sock.accept()
            print(f"[INFO] Recieved a connection from {addr}")
            conn.settimeout(SERVER_TIMEOUT)

            thread = threading.Thread(
                target=handle_client, args=(sock, conn, addr, client_count))
            thread.start()
            client_count += 1
    except KeyboardInterrupt:
        print("[INFO] Stopping the server")
    finally:
        # Close the listening socket on every exit path, not only Ctrl-C.
        sock.close()
# Start the server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()