Skip to content

Commit

Permalink
Implemented another mode of operation
Browse files Browse the repository at this point in the history
  • Loading branch information
mdboom committed Jul 20, 2023
1 parent edef889 commit e03bfa6
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 31 deletions.
123 changes: 116 additions & 7 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ["Pool", "ThreadPool", "SubinterpreterPool"]
__all__ = ["Pool", "ThreadPool", "SubinterpreterPool", "SubinterpreterPool2"]

#
# Imports
Expand Down Expand Up @@ -969,9 +969,7 @@ def _wait_for_updates(cls, sentinels, change_notifier, timeout=None):
import pickle
target = pickle.loads({target!r})
args = pickle.loads({args!r})
kwargs = pickle.loads({kwargs!r})
target, args, kwargs = pickle.loads({pickle!r})
target(*args, **kwargs)
"""
Expand Down Expand Up @@ -1009,9 +1007,7 @@ class SubinterpreterProcess(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
def spawn_subinterpreter():
code = template.format(
target=pickle.dumps(target),
args=pickle.dumps(args),
kwargs=pickle.dumps(kwargs),
pickle=pickle.dumps((target, args, kwargs))
)
interpreter = interpreters.create()
interpreter.run(code)
Expand Down Expand Up @@ -1080,3 +1076,116 @@ def _help_stuff_finish(inqueue, task_handler, size):
@classmethod
def _wait_for_updates(cls, sentinels, change_notifier, timeout=None):
time.sleep(timeout or 0)


subinterpreter2_setup_code = """
import sys
sys.path.insert(0, '.')
import pickle
def _f(p):
func, args, kwargs = pickle.loads(p)
return pickle.dumps(func(*args, **kwargs))
"""


subinterpreter2_template = """
_f({pickle!r})
"""


def run_in_interpreter(inter, func, args, kwargs):
code = subinterpreter2_template.format(pickle=pickle.dumps((func, args, kwargs)))
result = pickle.loads(inter.run(code, eval=True))
return result


def subinterpreter_worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
wrap_exception=False):
if (maxtasks is not None) and not (isinstance(maxtasks, int)
and maxtasks >= 1):
raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
put = outqueue.put
get = inqueue.get
if hasattr(inqueue, '_writer'):
inqueue._writer.close()
outqueue._reader.close()

interpreter = interpreters.create()
interpreter.run(subinterpreter2_setup_code)

try:
if initializer is not None:
# TODO
initializer(*initargs)

completed = 0
while maxtasks is None or (maxtasks and completed < maxtasks):
try:
task = get()
except (EOFError, OSError):
util.debug('worker got EOFError or OSError -- exiting')
break

if task is None:
util.debug('worker got sentinel -- exiting')
break

job, i, func, args, kwds = task
try:
result = (True, run_in_interpreter(interpreter, func, args, kwds))
except Exception as e:
if wrap_exception and func is not _helper_reraises_exception:
e = ExceptionWithTraceback(e, e.__traceback__)
result = (False, e)
try:
put((job, i, result))
except Exception as e:
wrapped = MaybeEncodingError(e, result[1])
util.debug("Possible encoding error while sending result: %s" % (
wrapped))
put((job, i, (False, wrapped)))

task = job = result = func = args = kwds = None
completed += 1
finally:
interpreter.close()
util.debug('worker exiting after %d tasks' % completed)


class SubinterpreterProcess2(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
assert target is worker
target = subinterpreter_worker
threading.Thread.__init__(self, group, subinterpreter_worker, name, args, kwargs)
self._pid = None
self._children = weakref.WeakKeyDictionary()
self._start_called = False
self._parent = threading.current_thread()

def terminate(self):
self.join()

def start(self):
if self._parent != threading.current_thread():
raise RuntimeError(
"Parent is {0!r} but current_process is {1!r}".format(
self._parent, threading.current_thread()
)
)
self._start_called = True
threading.Thread.start(self)

@property
def exitcode(self):
if self._start_called and not self.is_alive():
return 0
else:
return None


class SubinterpreterPool2(ThreadPool):
@staticmethod
def Process(ctx, *args, **kwds):
return SubinterpreterProcess2(*args, **kwds)
8 changes: 6 additions & 2 deletions Lib/test/support/interpreters.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,16 @@ def close(self):
"""
return _interpreters.destroy(self._id)

def run(self, src_str, /, *, channels=None):
def run(self, src_str, /, *, channels=None, eval=False):
"""Run the given source code in the interpreter.
This blocks the current Python thread until done.
If `eval` is True, the code is evaluated as an expression, and the
result is returned. The return type must be a shareable object.
"""
_interpreters.run_string(self._id, src_str, channels)
return _interpreters.run_string(self._id, src_str, channels, eval)


def create_channel():
Expand Down
54 changes: 32 additions & 22 deletions Modules/_xxsubinterpretersmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ _ensure_not_running(PyInterpreterState *interp)

static int
_run_script(PyInterpreterState *interp, const char *codestr,
_sharedns *shared, _sharedexception *sharedexc)
_sharedns *shared, _sharedexception *sharedexc, int eval, PyObject **result)
{
PyObject *excval = NULL;
PyObject *main_mod = _PyInterpreterState_GetMainModule(interp);
Expand All @@ -420,15 +420,25 @@ _run_script(PyInterpreterState *interp, const char *codestr,
}

// Run the string (see PyRun_SimpleStringFlags).
PyObject *result = PyRun_StringFlags(codestr, Py_file_input, ns, ns, NULL);
Py_DECREF(ns);
if (result == NULL) {
goto error;
}
else {
Py_DECREF(result); // We throw away the result.
if (eval == 0) {
*result = PyRun_StringFlags(codestr, Py_file_input, ns, ns, NULL);
Py_DECREF(ns);
if (*result == NULL) {
goto error;
}
} else {
*result = PyRun_StringFlags(codestr, Py_eval_input, ns, ns, NULL);
Py_DECREF(ns);
if (*result == NULL) {
goto error;
}
if (_PyObject_CheckCrossInterpreterData(*result) != 0) {
Py_DECREF(*result);
*result = NULL;
PyErr_SetString(PyExc_TypeError, "Result is not shareable between interpreters");
goto error;
}
}

*sharedexc = no_exception;
return 0;

Expand All @@ -446,18 +456,20 @@ _run_script(PyInterpreterState *interp, const char *codestr,
return -1;
}

static int
static PyObject*
_run_script_in_interpreter(PyObject *mod, PyInterpreterState *interp,
const char *codestr, PyObject *shareables)
const char *codestr, PyObject *shareables, int eval)
{
PyObject* result = NULL;

if (_ensure_not_running(interp) < 0) {
return -1;
return result;
}
module_state *state = get_module_state(mod);

_sharedns *shared = _get_shared_ns(shareables);
if (shared == NULL && PyErr_Occurred()) {
return -1;
return result;
}

// Switch to interpreter.
Expand All @@ -471,7 +483,7 @@ _run_script_in_interpreter(PyObject *mod, PyInterpreterState *interp,

// Run the script.
_sharedexception exc = {NULL, NULL};
int result = _run_script(interp, codestr, shared, &exc);
int result_code = _run_script(interp, codestr, shared, &exc, eval, &result);

// Switch back.
if (save_tstate != NULL) {
Expand All @@ -483,7 +495,7 @@ _run_script_in_interpreter(PyObject *mod, PyInterpreterState *interp,
assert(state != NULL);
_sharedexception_apply(&exc, state->RunFailedError);
}
else if (result != 0) {
else if (result_code != 0) {
// We were unable to allocate a shared exception.
PyErr_NoMemory();
}
Expand Down Expand Up @@ -674,12 +686,13 @@ Return the ID of main interpreter.");
static PyObject *
interp_run_string(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"id", "script", "shared", NULL};
static char *kwlist[] = {"id", "script", "shared", "eval", NULL};
PyObject *id, *code;
PyObject *shared = NULL;
int eval = 0;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
"OU|O:run_string", kwlist,
&id, &code, &shared)) {
"OU|Op:run_string", kwlist,
&id, &code, &shared, &eval)) {
return NULL;
}

Expand All @@ -702,10 +715,7 @@ interp_run_string(PyObject *self, PyObject *args, PyObject *kwds)
}

// Run the code in the interpreter.
if (_run_script_in_interpreter(self, interp, codestr, shared) != 0) {
return NULL;
}
Py_RETURN_NONE;
return _run_script_in_interpreter(self, interp, codestr, shared, eval);
}

PyDoc_STRVAR(run_string_doc,
Expand Down

0 comments on commit e03bfa6

Please sign in to comment.