-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmultiprocess.py
178 lines (163 loc) · 5.77 KB
/
multiprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# -*- coding: utf-8 -*-
# file: multiprocess.py
# author: joddiyzhang@gmail.com
# time: 11/12/2017 5:07 PM
# Copyright (C) <2017> <Joddiy Zhang>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# ------------------------------------------------------------------------
import os
import importlib
import multiprocessing
from functools import partial
from abc import ABCMeta, abstractmethod
from multiprocessing.pool import ThreadPool
from multiprocessing import Pool, Pipe, Process, set_start_method
class MultiProcess(object):
    """
    Common multiprocess module.

    The parent process starts one manager sub-process (see ``start``) which owns
    a ``multiprocessing.Pool`` of workers. Tasks are described by ``InfoTask``
    objects sent through a pipe (see ``push``); the manager resolves the task
    class, instantiates it and hands its ``runnable`` to a pool worker.
    """

    pool_num, parent_conn, child_conn, pro = None, None, None, None

    def __init__(self, pool_num=10):
        """
        Init function
        :param pool_num: process-pool size
        """
        self.pool_num = pool_num

    def start(self):
        """
        Start a sub-process which manages the process-pool of Worker, and connects with its
        parent-process by pipeline. Parent-process will send Task to the sub-process, which then
        gets a process from the pool to execute it.
        :return:
        """
        # The parent asks a server process to start the sub-process instead of forking itself,
        # which avoids the thread-unsafety of forking a process that already has threads.
        try:
            set_start_method('forkserver')
        except RuntimeError:
            # The start method may only be set once per interpreter; it was already chosen
            # (e.g. by an earlier start() call), so keep going with an explicit context below.
            pass
        ctx = multiprocessing.get_context('forkserver')
        self.parent_conn, self.child_conn = Pipe()
        self.pro = ctx.Process(target=self._start_pool, args=(self.child_conn,))
        self.pro.start()

    def push(self, task):
        """
        Push Task to pipeline. ``start`` must have been called first.
        :param task: the InfoTask to hand to the sub-process
        :return:
        """
        self.parent_conn.send(task)

    def stop(self):
        """
        Send the exit signal, wait for the sub-process to exit and close the pipeline.
        Calling it before ``start`` or a second time is a no-op.
        :return:
        """
        if self.pro is None:
            # never started (or already stopped): nothing to signal, join or close
            return
        # push exit signal to pipeline
        self.push(InfoTask(False, 1))
        # wait sub-process exit
        self.pro.join()
        # close pipeline
        self.parent_conn.close()
        self.child_conn.close()
        # forget the dead handles so that a repeated stop() cannot touch a closed pipe
        self.parent_conn, self.child_conn, self.pro = None, None, None

    def _start_pool(self, child_conn):
        """
        Start a process-pool, and then serve the parent-process through the pipeline.
        After getting a task, it will assign a worker to execute it. The callbacks are passed
        to ``pool.apply_async`` from here, i.e. from the sub-process that owns the pool.
        :param child_conn: the sub-process end of the pipeline
        :return:
        """
        pool = multiprocessing.Pool(processes=self.pool_num)
        try:
            while True:
                # hold and wait task
                info_task = child_conn.recv()
                if info_task.task_module is False:
                    # when receive False, exit
                    break
                # "package.module.ClassName" -> module path + class name
                module_name, _, class_name = info_task.task_module.rpartition('.')
                user_class = getattr(importlib.import_module(module_name), class_name)
                task_module = user_class()
                # start a worker
                monitor_func = partial(self.monitor_worker, task_module.runnable,
                                       timeout=info_task.timeout)
                pool.apply_async(monitor_func, args=info_task.args,
                                 callback=task_module.callback,
                                 error_callback=task_module.error_callback)
        finally:
            # Even when the loop dies on a bad task description, let the already
            # submitted tasks finish and release the workers.
            pool.close()
            pool.join()

    @staticmethod
    def monitor_worker(func, *args, **kwargs):
        """
        Run ``func(*args)`` in a helper thread of the worker process and wait for its result,
        giving up after ``timeout`` seconds.
        :param func: callable to execute
        :param args: positional arguments passed to ``func``
        :param kwargs: only ``timeout`` (seconds, None means wait forever) is read
        :return: whatever ``func`` returns
        :raises Exception: ``Exception("timeout", args)`` when the timeout expires,
                           ``Exception(str(error), args)`` when ``func`` itself fails
        """
        timeout = kwargs.get('timeout', None)
        # single helper thread, so that the wait below can be bounded by the timeout
        p = ThreadPool(1)
        try:
            # wait result
            return p.apply_async(func, args=args).get(timeout)
        except multiprocessing.TimeoutError as exc:
            # must stay ahead of the generic handler: TimeoutError is an Exception too
            raise Exception("timeout", args) from exc
        except Exception as exc:
            # other error, keep the original one attached as the cause
            raise Exception(str(exc), args) from exc
        finally:
            # Workers are reused for many tasks: always tear the helper pool down,
            # also on success, otherwise one pool leaks per executed task.
            # NOTE(review): terminate() presumably cannot interrupt a thread that is still
            # executing func after a timeout -- confirm whether that matters for long tasks.
            p.terminate()
class InfoTask(object):
    """
    Common Task: a small record describing one job, or the exit signal,
    together with the pid of the process that created it.
    """

    # class-level defaults, overwritten per instance in __init__
    from_pid = None
    task_module = None
    timeout = None
    args = None

    def __init__(self, task_module, timeout, *args):
        """
        Build a task description.
        :param task_module: dotted path of the task class
                            (eg: src.components.multiprocess.example.Demo);
                            False means that no more new Task will follow
        :param timeout: timeout (second)
        :param args: remaining positional params, transferred to the module
        """
        # remember which process created the task
        self.from_pid, self.task_module = os.getpid(), task_module
        self.timeout, self.args = timeout, args
class Model(metaclass=ABCMeta):
    """
    Abstract interface of a Task: ``runnable`` does the work, ``callback`` receives
    the result on success and ``error_callback`` receives the error on failure.
    All three must be overridden before a subclass can be instantiated.
    """

    @abstractmethod
    def runnable(self, *args):
        """
        Main function, executed by a Worker.
        :param args: params of the task
        :return:
        """

    @abstractmethod
    def callback(self, result):
        """
        Called when the task succeeded.
        PLEASE NOTE: don't raise any Exception here, otherwise the process cannot exit normally
        :param result: result of the task
        :return:
        """

    @abstractmethod
    def error_callback(self, error):
        """
        Called when the task failed.
        PLEASE NOTE: don't raise any Exception here, otherwise the process cannot exit normally
        :param error: error of the task
        :return:
        """