Skip to content

Commit

Permalink
Parallelize interpreter discovery. (pex-tool#842)
Browse files Browse the repository at this point in the history
This speeds up both pex builds and pex executions on machines with
multiple interpreters and `--interpreter-constraint`s in-play.

Work towards pex-tool#782.
  • Loading branch information
jsirois authored Jan 23, 2020
1 parent 2fb8b6f commit aa6a843
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 125 deletions.
192 changes: 125 additions & 67 deletions pex/interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import json
import os
import re
import subprocess
import sys
from textwrap import dedent

from pex import third_party
from pex.compatibility import string
from pex.executor import Executor
from pex.jobs import Job, SpawnedJob, execute_parallel
from pex.third_party.packaging import markers, tags
from pex.third_party.pkg_resources import Distribution, Requirement
from pex.tracer import TRACER
Expand Down Expand Up @@ -203,7 +205,7 @@ def __hash__(self):


class PythonInterpreter(object):
REGEXEN = (
_REGEXEN = (
re.compile(r'jython$'),

# NB: OSX ships python binaries named Python so we allow for capital-P.
Expand All @@ -220,7 +222,11 @@ class PythonInterpreter(object):
re.compile(r'pypy-1.[0-9]$'),
)

CACHE = {} # memoize executable => PythonInterpreter
_PYTHON_INTERPRETER_BY_NORMALIZED_PATH = {}

@staticmethod
def _normalize_path(path):
return os.path.realpath(path)

class Error(Exception): pass
class IdentificationError(Error): pass
Expand All @@ -230,6 +236,10 @@ class InterpreterNotFound(Error): pass
def get(cls):
return cls.from_binary(sys.executable)

@staticmethod
def _paths(paths=None):
return paths or os.getenv('PATH', '').split(os.pathsep)

@classmethod
def iter(cls, paths=None):
"""Iterate all interpreters found in `paths`.
Expand All @@ -240,19 +250,12 @@ def iter(cls, paths=None):
:param paths: The paths to look for python interpreters; by default the `PATH`.
:type paths: list str
"""
if paths is None:
paths = os.getenv('PATH', '').split(os.pathsep)
for interpreter in cls._filter(cls._find(paths)):
yield interpreter
return cls._filter(cls._find(cls._paths(paths=paths)))

@classmethod
def all(cls, paths=None):
return list(cls.iter(paths=paths))

@classmethod
def _from_binary_internal(cls):
return cls(sys.executable, PythonIdentity.get())

@classmethod
def _create_isolated_cmd(cls, binary, args=None, pythonpath=None, env=None):
cmd = [binary]
Expand All @@ -263,7 +266,7 @@ def _create_isolated_cmd(cls, binary, args=None, pythonpath=None, env=None):
# some python distributions include portions of the standard library there.
cmd.append('-s')

env = cls.sanitized_environment(env=env)
env = cls._sanitized_environment(env=env)
pythonpath = list(pythonpath or ())
if pythonpath:
env['PYTHONPATH'] = os.pathsep.join(pythonpath)
Expand All @@ -288,9 +291,9 @@ def _execute(cls, binary, args=None, pythonpath=None, env=None, stdin_payload=No
return cmd, stdout, stderr

@classmethod
def _from_binary_external(cls, binary):
def _spawn_from_binary_external(cls, binary):
pythonpath = third_party.expose(['pex'])
_, stdout, _ = cls._execute(
cmd, env = cls._create_isolated_cmd(
binary,
args=[
'-c',
Expand All @@ -303,13 +306,19 @@ def _from_binary_external(cls, binary):
],
pythonpath=pythonpath
)
identity = stdout.strip()
if not identity:
raise cls.IdentificationError('Could not establish identity of %s' % binary)
return cls(binary, PythonIdentity.decode(identity))
process = Executor.open_process(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
job = Job(command=cmd, process=process)

def create_interpreter(stdout):
identity = stdout.decode('utf-8').strip()
if not identity:
raise cls.IdentificationError('Could not establish identity of %s' % binary)
return cls(binary, PythonIdentity.decode(identity))

return SpawnedJob.stdout(job, result_func=create_interpreter)

@classmethod
def expand_path(cls, path):
def _expand_path(cls, path):
if os.path.isfile(path):
return [path]
elif os.path.isdir(path):
Expand All @@ -320,67 +329,73 @@ def expand_path(cls, path):
def from_env(cls, hashbang):
"""Resolve a PythonInterpreter as /usr/bin/env would.
:param hashbang: A string, e.g. "python3.3" representing some binary on the $PATH.
:param hashbang: A string, e.g. "python3.3" representing some binary on the $PATH.
:return: the first matching interpreter found or `None`.
:rtype: :class:`PythonInterpreter`
"""
paths = os.getenv('PATH', '').split(':')
for path in paths:
for fn in cls.expand_path(path):
basefile = os.path.basename(fn)
if hashbang == basefile:
try:
return cls.from_binary(fn)
except Exception as e:
TRACER.log('Could not identify %s: %s' % (fn, e))
def hashbang_matches(fn):
basefile = os.path.basename(fn)
return hashbang == basefile

for interpreter in cls._identify_interpreters(filter=hashbang_matches):
return interpreter

@classmethod
def _spawn_from_binary(cls, binary):
normalized_binary = cls._normalize_path(binary)

# N.B.: The CACHE is written as the last step in PythonInterpreter instance initialization.
cached_interpreter = cls._PYTHON_INTERPRETER_BY_NORMALIZED_PATH.get(normalized_binary)
if cached_interpreter is not None:
return SpawnedJob.completed(cached_interpreter)
if normalized_binary == cls._normalize_path(sys.executable):
current_interpreter = cls(sys.executable, PythonIdentity.get())
return SpawnedJob.completed(current_interpreter)
return cls._spawn_from_binary_external(normalized_binary)

@classmethod
def from_binary(cls, binary):
"""Create an interpreter from the given `binary`.
:param str binary: The path to the python interpreter binary.
:return: an interpreter created from the given `binary` with only the specified
extras.
:return: an interpreter created from the given `binary`.
:rtype: :class:`PythonInterpreter`
"""
normalized_binary = os.path.realpath(binary)
if normalized_binary not in cls.CACHE:
if normalized_binary == os.path.realpath(sys.executable):
cls.CACHE[normalized_binary] = cls._from_binary_internal()
else:
cls.CACHE[normalized_binary] = cls._from_binary_external(normalized_binary)
return cls.CACHE[normalized_binary]
return cls._spawn_from_binary(binary).await_result()

@classmethod
def _matches_binary_name(cls, basefile):
return any(matcher.match(basefile) is not None for matcher in cls.REGEXEN)
def _matches_binary_name(cls, path):
basefile = os.path.basename(path)
return any(matcher.match(basefile) is not None for matcher in cls._REGEXEN)

@classmethod
def _find(cls, paths):
"""Given a list of files or directories, try to detect python interpreters amongst them.
Returns an iterator over PythonInterpreter objects.
"""
Given a list of files or directories, try to detect python interpreters amongst them.
Returns an iterator over PythonInterpreter objects.
"""
for path in paths:
for fn in cls.expand_path(path):
basefile = os.path.basename(fn)
if cls._matches_binary_name(basefile):
try:
yield cls.from_binary(fn)
except Exception as e:
TRACER.log('Could not identify %s: %s' % (fn, e))
continue
return cls._identify_interpreters(filter=cls._matches_binary_name, paths=paths)

@classmethod
def _identify_interpreters(cls, filter, paths=None):
def iter_candidates():
for path in cls._paths(paths=paths):
for fn in cls._expand_path(path):
if filter(fn):
yield fn

return execute_parallel(inputs=list(iter_candidates()), spawn_func=cls._spawn_from_binary)

@classmethod
def _filter(cls, pythons):
"""
Given an iterator over python interpreters filter out duplicate versions and versions we would
prefer not to use.
"""Filters duplicate python interpreters and versions we don't support.
Returns an iterator over PythonInterpreters.
Returns an iterator over PythonInterpreters.
"""
MAJOR, MINOR, SUBMINOR = range(3)
def version_filter(version):
return (version[MAJOR] == 2 and version[MINOR] >= 7 or
version[MAJOR] == 3 and version[MINOR] >= 4)
version[MAJOR] == 3 and version[MINOR] >= 5)

seen = set()
for interp in pythons:
Expand All @@ -390,7 +405,7 @@ def version_filter(version):
yield interp

@classmethod
def sanitized_environment(cls, env=None):
def _sanitized_environment(cls, env=None):
# N.B. This is merely a hack because sysconfig.py on the default OS X
# installation of 2.7 breaks.
env_copy = (env or os.environ).copy()
Expand All @@ -400,14 +415,16 @@ def sanitized_environment(cls, env=None):
def __init__(self, binary, identity):
"""Construct a PythonInterpreter.
You should probably PythonInterpreter.from_binary instead.
You should probably use `PythonInterpreter.from_binary` instead.
:param binary: The full path of the python binary.
:param identity: The :class:`PythonIdentity` of the PythonInterpreter.
:param binary: The full path of the python binary.
:param identity: The :class:`PythonIdentity` of the PythonInterpreter.
"""
self._binary = os.path.realpath(binary)
self._binary = self._normalize_path(binary)
self._identity = identity

self._PYTHON_INTERPRETER_BY_NORMALIZED_PATH[self._binary] = self

@property
def binary(self):
return self._binary
Expand Down Expand Up @@ -441,18 +458,59 @@ def open_process(self, args=None, pythonpath=None, env=None, **kwargs):
process = Executor.open_process(cmd, env=env, **kwargs)
return cmd, process

def _tup(self):
return self._binary, self._identity

def __hash__(self):
return hash((self._binary, self._identity))
return hash(self._tup())

def __eq__(self, other):
if not isinstance(other, PythonInterpreter):
return False
return (self._binary, self._identity) == (other._binary, other._identity)
if type(other) is not type(self):
return NotImplemented
return self._tup() == other._tup()

def __lt__(self, other):
if not isinstance(other, PythonInterpreter):
return False
if type(other) is not type(self):
return NotImplemented
return self.version < other.version

def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self._binary, self._identity)


def spawn_python_job(args, env=None, interpreter=None, expose=None, **subprocess_kwargs):
"""Spawns a python job.
:param args: The arguments to pass to the python interpreter.
:type args: list of str
:param env: The environment to spawn the python interpreter process in. Defaults to the ambient
environment.
:type env: dict of (str, str)
:param interpreter: The interpreter to use to spawn the python job. Defaults to the current
interpreter.
:type interpreter: :class:`PythonInterpreter`
:param expose: The names of any vendored distributions to expose to the spawned python process.
:type expose: list of str
:param subprocess_kwargs: Any additional :class:`subprocess.Popen` kwargs to pass through.
:returns: A job handle to the spawned python process.
:rtype: :class:`Job`
"""
if expose:
subprocess_env = (env or os.environ).copy()
# In order to expose vendored distributions with their un-vendored import paths in-tact, we
# need to set `__PEX_UNVENDORED__`. See: vendor.__main__.ImportRewriter._modify_import.
subprocess_env['__PEX_UNVENDORED__'] = '1'

pythonpath = third_party.expose(expose)
else:
subprocess_env = env
pythonpath = None

interpreter = interpreter or PythonInterpreter.get()
cmd, process = interpreter.open_process(
args=args,
pythonpath=pythonpath,
env=subprocess_env,
**subprocess_kwargs
)
return Job(command=cmd, process=process)
Loading

0 comments on commit aa6a843

Please sign in to comment.