forked from TitorX/httppy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
socketserver.py
191 lines (156 loc) · 5.71 KB
/
socketserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# coding=utf-8
import socket
import threading
import logging
import os
import traceback
class BaseTCPServer(object):
    """
    Single-threaded server built on a TCP socket.

    Performs the socket-level network operations and hands every accepted
    connection to ``request_handler_class``.

    Workflow::

        bind()
          |
        listen()
          |
          |      loop <----------+
          v                      |
        get_request()  --->  handle()
    """

    def __init__(self, server_address, request_handler_class):
        """
        Create the listening socket (it is not bound yet).

        :type server_address: (str, int)
        :param request_handler_class: callable invoked as
            ``request_handler_class(socket_request, client_address, server)``
            for every accepted connection
        """
        self.logger = logging.getLogger(str(os.getpid()))
        self.server_address = server_address
        self.request_handler_class = request_handler_class
        # Backlog passed to listen().
        self.request_queue_size = 5
        # Timeout applied to every accepted connection (see server_start).
        self.connect_timeout = 5
        self.socket = socket.socket()
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    def server_bind(self):
        """Bind the socket to the server address."""
        self.socket.bind(self.server_address)
        # Remember the address that was actually bound.
        self.server_address = self.socket.getsockname()

    def server_listen(self):
        """Start listening for incoming connections."""
        self.socket.listen(self.request_queue_size)

    def server_start(self):
        """
        Bind, listen and serve connections in an endless loop.

        A failure while handling one connection is logged and that
        connection is closed; the server then carries on with the next
        connection instead of terminating.
        """
        self.server_bind()
        self.logger.info('Server start. pid:%s', os.getpid())
        self.server_listen()
        self.logger.info('bind:%s', self.server_address)
        while True:
            socket_request, client_address = self.get_request()
            try:
                socket_request.settimeout(self.connect_timeout)
                self.handle_socket_request(socket_request, client_address)
            except Exception:
                # One misbehaving client (timeout, reset, handler bug) must
                # not bring the whole accept loop down.
                self.logger.warning(
                    'Error while handling request from %s',
                    client_address, exc_info=True)
                socket_request.close()

    def get_request(self):
        """
        Accept the next connection.

        :return: ``(socket_request, client_address)`` as returned by
            ``socket.accept()``
        """
        return self.socket.accept()

    def handle_socket_request(self, socket_request, client_address):
        """
        Handle one accepted connection by instantiating the handler class.

        :type socket_request: socket._socketobject
        :type client_address: (str, int)
        """
        self.request_handler_class(socket_request, client_address, self)

    def server_close(self):
        """Close the listening socket."""
        self.socket.close()
class TreadPoolTCPServer(BaseTCPServer):
    """
    TCP server that dispatches connections to a fixed pool of worker threads.

    Idle workers are kept in ``self.thread_pool``. An accepted connection is
    given to an idle worker; when no worker is idle the connection is closed
    immediately.
    """

    class _Handler(threading.Thread):
        """
        Pooled worker thread.

        The thread sleeps on ``work_signal``; once signalled it runs the
        request handler on the assigned connection and then puts itself back
        into the server's pool.
        """

        def __init__(self, server):
            """Create the worker for ``server`` and start it right away."""
            threading.Thread.__init__(self)
            self.handler = None
            self.work_signal = threading.Event()
            self.server = server
            self.socket_request = None
            self.client_address = None
            # ``setDaemon`` is deprecated; the attribute works everywhere.
            self.daemon = True
            self.start()

        def set_socket_request(self, socket_request, client_address):
            """Assign the connection the next work cycle will serve."""
            self.socket_request = socket_request
            self.client_address = client_address

        def run(self):
            """Serve assigned connections, one per ``work_signal``."""
            while True:
                # Wait for the signal that work has been assigned.
                self.work_signal.wait()
                try:
                    self.handle_request()
                except Exception as e:
                    # ``Logger.warn`` no longer exists on recent Python
                    # versions; ``warning`` is available on all of them.
                    self.server.logger.warning(
                        '\n'.join([str(e), traceback.format_exc()]))
                    try:
                        self.socket_request.close()
                    except Exception as close_error:
                        # A failing close must not kill the worker, or the
                        # pool would shrink for good.
                        self.server.logger.warning(
                            'Failed to close client socket: %s', close_error)
                # Drop the finished request on success and on failure alike.
                self.socket_request = None
                self.client_address = None
                self.work_signal.clear()
                # Work is done: put this thread back into the pool.
                self.server.thread_pool.append(self)

        def set_request_handler(self, request_handler_class):
            """Set the class used to handle requests."""
            self.handler = request_handler_class

        def handle_request(self):
            """Run the request handler on the assigned connection."""
            self.handler(self.socket_request, self.client_address, self.server)

    def __init__(self, server_address, request_handler_class):
        """
        :type server_address: (str, int)
        :param request_handler_class: class run by the workers for every
            accepted connection
        """
        BaseTCPServer.__init__(self, server_address, request_handler_class)
        # Maximum number of threads in the pool.
        self.thread_pool_size = 10
        self.thread_pool = []

    def build_thread_pool(self):
        """Create the worker threads and fill the pool with them."""
        for _ in range(self.thread_pool_size):
            handler = self.create_thread()
            handler.set_request_handler(self.request_handler_class)
            self.thread_pool.append(handler)

    def create_thread(self):
        """Create one worker thread."""
        return self._Handler(self)

    def server_start(self):
        """Build the thread pool, then run the base server loop."""
        self.build_thread_pool()
        BaseTCPServer.server_start(self)

    def handle_socket_request(self, socket_request, client_address):
        """
        Hand the connection to an idle worker thread.

        When the pool has no idle thread the connection is closed without
        being served.
        """
        if not self.thread_pool:
            # No idle thread in the pool: close the connection directly.
            socket_request.close()
            return
        # Take an idle thread out of the pool.
        handler = self.thread_pool.pop()
        handler.set_socket_request(socket_request, client_address)
        # Tell the thread to start working.
        handler.work_signal.set()
class BaseSocketHandler:
    """
    Base class for per-connection request handlers.

    Instantiating the class runs the whole request cycle::

        setup() -> recv() -> handle_socket_request() -> finish()

    ``handle_socket_request`` is only called when some data was received.
    Subclasses override ``setup`` and ``handle_socket_request``; the request
    is available in ``self.data`` and the reply is taken from
    ``self.result``.
    """

    def __init__(self, socket_request, client_address, server):
        """
        :type socket_request: socket._socketobject
        :type client_address: (str, int)
        """
        self.recv_size = 1024
        # Byte strings: identical to '' on Python 2, and the type that
        # socket.recv() returns on Python 3.
        self.data = b''
        self.result = b''
        self.socket_request = socket_request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.recv()
            if self.data:
                self.handle_socket_request()
        finally:
            self.finish()

    def setup(self):
        """Hook called before any data is read; does nothing by default."""
        pass

    def recv(self):
        """
        Read the request into ``self.data``.

        Reads ``recv_size`` bytes at a time and stops at the first chunk
        that is shorter than ``recv_size`` (including an empty one).
        """
        chunks = []
        try:
            while True:
                chunk = self.socket_request.recv(self.recv_size)
                chunks.append(chunk)
                if len(chunk) < self.recv_size:
                    break
        finally:
            # Keep whatever arrived even if a later read raised.
            self.data += b''.join(chunks)

    def handle_socket_request(self):
        """Hook that processes ``self.data``; does nothing by default."""
        pass

    def finish(self):
        """
        Send ``self.result`` to the client and close the connection.

        The socket is closed even when sending fails; the send error is
        still raised to the caller.
        """
        try:
            result = self.result
            if isinstance(result, type(u'')):
                # Sockets carry bytes; encode text replies.
                result = result.encode('utf-8')
            self.socket_request.sendall(result)
        finally:
            self.socket_request.close()