diff --git a/pex/interpreter.py b/pex/interpreter.py index 87aef04c7..192ff5277 100644 --- a/pex/interpreter.py +++ b/pex/interpreter.py @@ -8,12 +8,14 @@ import json import os import re +import subprocess import sys from textwrap import dedent from pex import third_party from pex.compatibility import string from pex.executor import Executor +from pex.jobs import Job, SpawnedJob, execute_parallel from pex.third_party.packaging import markers, tags from pex.third_party.pkg_resources import Distribution, Requirement from pex.tracer import TRACER @@ -203,7 +205,7 @@ def __hash__(self): class PythonInterpreter(object): - REGEXEN = ( + _REGEXEN = ( re.compile(r'jython$'), # NB: OSX ships python binaries named Python so we allow for capital-P. @@ -220,7 +222,11 @@ class PythonInterpreter(object): re.compile(r'pypy-1.[0-9]$'), ) - CACHE = {} # memoize executable => PythonInterpreter + _PYTHON_INTERPRETER_BY_NORMALIZED_PATH = {} + + @staticmethod + def _normalize_path(path): + return os.path.realpath(path) class Error(Exception): pass class IdentificationError(Error): pass @@ -230,6 +236,10 @@ class InterpreterNotFound(Error): pass def get(cls): return cls.from_binary(sys.executable) + @staticmethod + def _paths(paths=None): + return paths or os.getenv('PATH', '').split(os.pathsep) + @classmethod def iter(cls, paths=None): """Iterate all interpreters found in `paths`. @@ -240,19 +250,12 @@ def iter(cls, paths=None): :param paths: The paths to look for python interpreters; by default the `PATH`. 
:type paths: list str """ - if paths is None: - paths = os.getenv('PATH', '').split(os.pathsep) - for interpreter in cls._filter(cls._find(paths)): - yield interpreter + return cls._filter(cls._find(cls._paths(paths=paths))) @classmethod def all(cls, paths=None): return list(cls.iter(paths=paths)) - @classmethod - def _from_binary_internal(cls): - return cls(sys.executable, PythonIdentity.get()) - @classmethod def _create_isolated_cmd(cls, binary, args=None, pythonpath=None, env=None): cmd = [binary] @@ -263,7 +266,7 @@ def _create_isolated_cmd(cls, binary, args=None, pythonpath=None, env=None): # some python distributions include portions of the standard library there. cmd.append('-s') - env = cls.sanitized_environment(env=env) + env = cls._sanitized_environment(env=env) pythonpath = list(pythonpath or ()) if pythonpath: env['PYTHONPATH'] = os.pathsep.join(pythonpath) @@ -288,9 +291,9 @@ def _execute(cls, binary, args=None, pythonpath=None, env=None, stdin_payload=No return cmd, stdout, stderr @classmethod - def _from_binary_external(cls, binary): + def _spawn_from_binary_external(cls, binary): pythonpath = third_party.expose(['pex']) - _, stdout, _ = cls._execute( + cmd, env = cls._create_isolated_cmd( binary, args=[ '-c', @@ -303,13 +306,19 @@ def _from_binary_external(cls, binary): ], pythonpath=pythonpath ) - identity = stdout.strip() - if not identity: - raise cls.IdentificationError('Could not establish identity of %s' % binary) - return cls(binary, PythonIdentity.decode(identity)) + process = Executor.open_process(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + job = Job(command=cmd, process=process) + + def create_interpreter(stdout): + identity = stdout.decode('utf-8').strip() + if not identity: + raise cls.IdentificationError('Could not establish identity of %s' % binary) + return cls(binary, PythonIdentity.decode(identity)) + + return SpawnedJob.stdout(job, result_func=create_interpreter) @classmethod - def expand_path(cls, path): + def 
_expand_path(cls, path): if os.path.isfile(path): return [path] elif os.path.isdir(path): @@ -320,67 +329,73 @@ def expand_path(cls, path): def from_env(cls, hashbang): """Resolve a PythonInterpreter as /usr/bin/env would. - :param hashbang: A string, e.g. "python3.3" representing some binary on the $PATH. + :param hashbang: A string, e.g. "python3.3" representing some binary on the $PATH. + :return: the first matching interpreter found or `None`. + :rtype: :class:`PythonInterpreter` """ - paths = os.getenv('PATH', '').split(':') - for path in paths: - for fn in cls.expand_path(path): - basefile = os.path.basename(fn) - if hashbang == basefile: - try: - return cls.from_binary(fn) - except Exception as e: - TRACER.log('Could not identify %s: %s' % (fn, e)) + def hashbang_matches(fn): + basefile = os.path.basename(fn) + return hashbang == basefile + + for interpreter in cls._identify_interpreters(filter=hashbang_matches): + return interpreter + + @classmethod + def _spawn_from_binary(cls, binary): + normalized_binary = cls._normalize_path(binary) + + # N.B.: The cache is written as the last step in PythonInterpreter instance initialization. + cached_interpreter = cls._PYTHON_INTERPRETER_BY_NORMALIZED_PATH.get(normalized_binary) + if cached_interpreter is not None: + return SpawnedJob.completed(cached_interpreter) + if normalized_binary == cls._normalize_path(sys.executable): + current_interpreter = cls(sys.executable, PythonIdentity.get()) + return SpawnedJob.completed(current_interpreter) + return cls._spawn_from_binary_external(normalized_binary) @classmethod def from_binary(cls, binary): """Create an interpreter from the given `binary`. :param str binary: The path to the python interpreter binary. - :return: an interpreter created from the given `binary` with only the specified - extras. + :return: an interpreter created from the given `binary`.
:rtype: :class:`PythonInterpreter` """ - normalized_binary = os.path.realpath(binary) - if normalized_binary not in cls.CACHE: - if normalized_binary == os.path.realpath(sys.executable): - cls.CACHE[normalized_binary] = cls._from_binary_internal() - else: - cls.CACHE[normalized_binary] = cls._from_binary_external(normalized_binary) - return cls.CACHE[normalized_binary] + return cls._spawn_from_binary(binary).await_result() @classmethod - def _matches_binary_name(cls, basefile): - return any(matcher.match(basefile) is not None for matcher in cls.REGEXEN) + def _matches_binary_name(cls, path): + basefile = os.path.basename(path) + return any(matcher.match(basefile) is not None for matcher in cls._REGEXEN) @classmethod def _find(cls, paths): + """Given a list of files or directories, try to detect python interpreters amongst them. + + Returns an iterator over PythonInterpreter objects. """ - Given a list of files or directories, try to detect python interpreters amongst them. - Returns an iterator over PythonInterpreter objects. - """ - for path in paths: - for fn in cls.expand_path(path): - basefile = os.path.basename(fn) - if cls._matches_binary_name(basefile): - try: - yield cls.from_binary(fn) - except Exception as e: - TRACER.log('Could not identify %s: %s' % (fn, e)) - continue + return cls._identify_interpreters(filter=cls._matches_binary_name, paths=paths) + + @classmethod + def _identify_interpreters(cls, filter, paths=None): + def iter_candidates(): + for path in cls._paths(paths=paths): + for fn in cls._expand_path(path): + if filter(fn): + yield fn + + return execute_parallel(inputs=list(iter_candidates()), spawn_func=cls._spawn_from_binary) @classmethod def _filter(cls, pythons): - """ - Given an iterator over python interpreters filter out duplicate versions and versions we would - prefer not to use. + """Filters duplicate python interpreters and versions we don't support. - Returns an iterator over PythonInterpreters. 
+ Returns an iterator over PythonInterpreters. """ MAJOR, MINOR, SUBMINOR = range(3) def version_filter(version): return (version[MAJOR] == 2 and version[MINOR] >= 7 or - version[MAJOR] == 3 and version[MINOR] >= 4) + version[MAJOR] == 3 and version[MINOR] >= 5) seen = set() for interp in pythons: @@ -390,7 +405,7 @@ def version_filter(version): yield interp @classmethod - def sanitized_environment(cls, env=None): + def _sanitized_environment(cls, env=None): # N.B. This is merely a hack because sysconfig.py on the default OS X # installation of 2.7 breaks. env_copy = (env or os.environ).copy() @@ -400,14 +415,16 @@ def sanitized_environment(cls, env=None): def __init__(self, binary, identity): """Construct a PythonInterpreter. - You should probably PythonInterpreter.from_binary instead. + You should probably use `PythonInterpreter.from_binary` instead. - :param binary: The full path of the python binary. - :param identity: The :class:`PythonIdentity` of the PythonInterpreter. + :param binary: The full path of the python binary. + :param identity: The :class:`PythonIdentity` of the PythonInterpreter. 
""" - self._binary = os.path.realpath(binary) + self._binary = self._normalize_path(binary) self._identity = identity + self._PYTHON_INTERPRETER_BY_NORMALIZED_PATH[self._binary] = self + @property def binary(self): return self._binary @@ -441,18 +458,59 @@ def open_process(self, args=None, pythonpath=None, env=None, **kwargs): process = Executor.open_process(cmd, env=env, **kwargs) return cmd, process + def _tup(self): + return self._binary, self._identity + def __hash__(self): - return hash((self._binary, self._identity)) + return hash(self._tup()) def __eq__(self, other): - if not isinstance(other, PythonInterpreter): - return False - return (self._binary, self._identity) == (other._binary, other._identity) + if type(other) is not type(self): + return NotImplemented + return self._tup() == other._tup() def __lt__(self, other): - if not isinstance(other, PythonInterpreter): - return False + if type(other) is not type(self): + return NotImplemented return self.version < other.version def __repr__(self): return '%s(%r, %r)' % (self.__class__.__name__, self._binary, self._identity) + + +def spawn_python_job(args, env=None, interpreter=None, expose=None, **subprocess_kwargs): + """Spawns a python job. + + :param args: The arguments to pass to the python interpreter. + :type args: list of str + :param env: The environment to spawn the python interpreter process in. Defaults to the ambient + environment. + :type env: dict of (str, str) + :param interpreter: The interpreter to use to spawn the python job. Defaults to the current + interpreter. + :type interpreter: :class:`PythonInterpreter` + :param expose: The names of any vendored distributions to expose to the spawned python process. + :type expose: list of str + :param subprocess_kwargs: Any additional :class:`subprocess.Popen` kwargs to pass through. + :returns: A job handle to the spawned python process. 
+ :rtype: :class:`Job` + """ + if expose: + subprocess_env = (env or os.environ).copy() + # In order to expose vendored distributions with their un-vendored import paths intact, we + # need to set `__PEX_UNVENDORED__`. See: vendor.__main__.ImportRewriter._modify_import. + subprocess_env['__PEX_UNVENDORED__'] = '1' + + pythonpath = third_party.expose(expose) + else: + subprocess_env = env + pythonpath = None + + interpreter = interpreter or PythonInterpreter.get() + cmd, process = interpreter.open_process( + args=args, + pythonpath=pythonpath, + env=subprocess_env, + **subprocess_kwargs + ) + return Job(command=cmd, process=process) diff --git a/pex/jobs.py b/pex/jobs.py index 8b3777b85..7c8539dbc 100644 --- a/pex/jobs.py +++ b/pex/jobs.py @@ -4,12 +4,9 @@ from __future__ import absolute_import import errno -import os from threading import BoundedSemaphore, Event, Thread -from pex import third_party from pex.compatibility import Queue, cpu_count -from pex.interpreter import PythonInterpreter from pex.tracer import TRACER @@ -48,7 +45,7 @@ def communicate(self, input=None): :raises: :class:`Job.Error` if the job exited non-zero.
""" stdout, stderr = self._process.communicate(input=input) - self._check_returncode() + self._check_returncode(stderr) return stdout, stderr def kill(self): @@ -62,55 +59,41 @@ def kill(self): if e.errno != errno.ESRCH: raise e - def _check_returncode(self): + def _check_returncode(self, stderr=None): if self._process.returncode != 0: - raise self.Error('Executing {} failed with {}' - .format(' '.join(self._command), self._process.returncode)) + msg = 'Executing {} failed with {}'.format(' '.join(self._command), self._process.returncode) + if stderr: + msg += '\nSTDERR:\n{}'.format(stderr.decode('utf-8')) + raise self.Error(msg) def __str__(self): return 'pid: {pid} -> {command}'.format(pid=self._process.pid, command=' '.join(self._command)) -def spawn_python_job(args, env=None, interpreter=None, expose=None, **subprocess_kwargs): - """Spawns a python job. - - :param args: The arguments to pass to the python interpreter. - :type args: list of str - :param env: The environment to spawn the python interpreter process in. Defaults to the ambient - environment. - :type env: dict of (str, str) - :param interpreter: The interpreter to use to spawn the python job. Defaults to the current - interpreter. - :type interpreter: :class:`PythonInterpreter` - :param expose: The names of any vendored distributions to expose to the spawned python process. - :type expose: list of str - :param subprocess_kwargs: Any additional :class:`subprocess.Popen` kwargs to pass through. - :returns: A job handle to the spawned python process. - :rtype: :class:`Job` - """ - if expose: - subprocess_env = (env or os.environ).copy() - # In order to expose vendored distributions with their un-vendored import paths in-tact, we - # need to set `__PEX_UNVENDORED__`. See: vendor.__main__.ImportRewriter._modify_import. 
- subprocess_env['__PEX_UNVENDORED__'] = '1' +class SpawnedJob(object): + """A handle to a spawned :class:`Job` and its associated result.""" - pythonpath = third_party.expose(expose) - else: - subprocess_env = env - pythonpath = None + @classmethod + def completed(cls, result): + """Wrap an already completed result in a SpawnedJob. - interpreter = interpreter or PythonInterpreter.get() - cmd, process = interpreter.open_process( - args=args, - pythonpath=pythonpath, - env=subprocess_env, - **subprocess_kwargs - ) - return Job(command=cmd, process=process) + The returned job will no-op when `kill` is called since the job is already completed. + :param result: The completed result. + :return: A spawned job whose result is already complete. + :rtype: :class:`SpawnedJob` + """ + class Completed(SpawnedJob): + def __init__(self): + super(Completed, self).__init__(job=None, result_func=lambda: result) -class SpawnedJob(object): - """A handle to a spawned :class:`Job` and its associated result.""" + def kill(self): + pass + + def __str__(self): + return 'SpawnedJob.completed({})'.format(result) + + return Completed() @classmethod def wait(cls, job, result): @@ -181,26 +164,34 @@ def _sanitize_max_jobs(max_jobs=None): return min(max_jobs, _ABSOLUTE_MAX_JOBS) -def execute_parallel(max_jobs, inputs, spawn_func, raise_type): +def execute_parallel(inputs, spawn_func, raise_type=None, max_jobs=None): """Execute jobs for the given inputs in parallel. :param int max_jobs: The maximum number of parallel jobs to spawn. :param inputs: An iterable of the data to parallelize over `spawn_func`. :param spawn_func: A function taking a single input and returning a :class:`SpawnedJob`. - :param raise_type: A type that takes a single string argument and will be used to construct a - raiseable value when any of the spawned jobs errors. 
+ :param raise_type: An optional type that takes a single string argument and will be used to + construct a raiseable value when any of the spawned jobs errors. `None` by + default which indicates spawned jobs that error should be skipped; i.e.: the + returned iterator over spawned job results will return fewer results than + inputs. :returns: An iterator over the spawned job results as they come in. - :raises: A `raise_type` exception if any individual job errors. + :raises: A `raise_type` exception if any individual job errors and `raise_type` is not `None`. """ size = _sanitize_max_jobs(max_jobs) TRACER.log('Spawning a maximum of {} parallel jobs to process:\n {}' .format(size, '\n '.join(map(str, inputs))), V=9) + class SpawnError(Exception): + def __init__(self, item, error): + self.item = item + self.error = error + stop = Event() # Used as a signal to stop spawning further jobs once any one job fails. job_slots = BoundedSemaphore(value=size) done_sentinel = object() - spawned_job_queue = Queue() # Queue[Union[SpawnedJob, Exception, Literal[done_sentinel]]] + spawned_job_queue = Queue() # Queue[Union[SpawnedJob, SpawnError, Literal[done_sentinel]]] def spawn_jobs(): for item in inputs: @@ -208,11 +199,11 @@ def spawn_jobs(): break job_slots.acquire() try: - resut = spawn_func(item) + result = spawn_func(item) except Exception as e: - resut = e + result = SpawnError(item, e) finally: - spawned_job_queue.put(resut) + spawned_job_queue.put(result) spawned_job_queue.put(done_sentinel) spawner = Thread(name='PEX Parallel Job Spawner', target=spawn_jobs) @@ -229,15 +220,25 @@ def spawn_jobs(): return try: - if isinstance(item, Exception): - error = item + if isinstance(item, SpawnError): + if raise_type: + error = item.error + else: + # Otherwise, if we were given no `raise_type`, we skip over the input that raised.
+ TRACER.log('Failed to spawn a job for {}: {} raised {}' + .format(item.item, spawn_func, item.error)) elif error is not None: # I.E.: `item` is not an exception, but there was a prior exception. item.kill() else: try: yield item.await_result() except Job.Error as e: - stop.set() - error = raise_type('{} raised {}'.format(item, e)) + if raise_type: + # Fail fast and proceed to kill all outstanding spawned jobs. + stop.set() + error = raise_type('{} raised {}'.format(item, e)) + else: + # Otherwise, if we were given no `raise_type`, we continue on and just log the failure. + TRACER.log('{} raised {}'.format(item, e)) finally: job_slots.release() diff --git a/pex/resolver.py b/pex/resolver.py index 32c35e2c3..79a09ee24 100644 --- a/pex/resolver.py +++ b/pex/resolver.py @@ -13,7 +13,8 @@ from pex.common import AtomicDirectory, atomic_directory, safe_mkdtemp from pex.distribution_target import DistributionTarget -from pex.jobs import SpawnedJob, execute_parallel, spawn_python_job +from pex.interpreter import spawn_python_job +from pex.jobs import SpawnedJob, execute_parallel from pex.orderedset import OrderedSet from pex.pex_info import PexInfo from pex.pip import get_pip @@ -360,7 +361,12 @@ def _iter_local_projects(self): yield BuildRequest.create(target=target, source_path=local_project) def _run_parallel(self, inputs, spawn_func, raise_type): - for result in execute_parallel(self._max_parallel_jobs, inputs, spawn_func, raise_type): + for result in execute_parallel( + inputs=inputs, + spawn_func=spawn_func, + raise_type=raise_type, + max_jobs=self._max_parallel_jobs + ): yield result def _spawn_resolve(self, resolved_dists_dir, target): diff --git a/tests/test_bdist_pex.py b/tests/test_bdist_pex.py index a7187c8e1..f4a0a5353 100644 --- a/tests/test_bdist_pex.py +++ b/tests/test_bdist_pex.py @@ -7,7 +7,7 @@ from textwrap import dedent from pex.common import open_zip, temporary_dir -from pex.jobs import spawn_python_job +from pex.interpreter import spawn_python_job 
from pex.testing import WheelBuilder, make_project, temporary_content